diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java index 10a83a7e67456..aca980b88e07a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java @@ -147,5 +147,8 @@ public void do_test() consume_data(consumer, session_dest); check_count(8, "select count(s_0) from " + device, "Consume data again:" + pattern); check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); + while (!consumer.allTopicMessagesHaveBeenConsumed()) { + Thread.sleep(1000); + } } } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java index 34a927ef29c42..7bf02c7c6e765 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java @@ -19,6 +19,7 @@ package org.apache.iotdb.rpc.subscription.payload.response; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.rpc.subscription.config.TopicConfig; import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp; @@ -29,8 +30,10 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; @@ -38,10 +41,23 @@ public class PipeSubscribeHeartbeatResp extends TPipeSubscribeResp { private transient Map topics = new HashMap<>(); // subscribed topics + private transient Map endPoints = new HashMap<>(); // available endpoints + + private transient List topicNamesToUnsubscribe = + new ArrayList<>(); // topics should be unsubscribed + public Map getTopics() { return topics; } + public Map getEndPoints() { + return endPoints; + } + + public List getTopicNamesToUnsubscribe() { + return topicNamesToUnsubscribe; + } + /////////////////////////////// Thrift /////////////////////////////// /** @@ -63,7 +79,11 @@ public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(final TSStatus sta * server. */ public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp( - final TSStatus status, final Map topics) throws IOException { + final TSStatus status, + final Map topics, + final Map endPoints, + final List topicNamesToUnsubscribe) + throws IOException { final PipeSubscribeHeartbeatResp resp = toTPipeSubscribeResp(status); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); @@ -73,6 +93,13 @@ public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp( ReadWriteIOUtils.write(entry.getKey(), outputStream); entry.getValue().serialize(outputStream); } + ReadWriteIOUtils.write(endPoints.size(), outputStream); + for (final Map.Entry entry : endPoints.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue().getIp(), outputStream); + ReadWriteIOUtils.write(entry.getValue().getPort(), outputStream); + } + ReadWriteIOUtils.writeStringList(topicNamesToUnsubscribe, outputStream); resp.body = Collections.singletonList( ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); @@ -89,14 +116,28 @@ public static PipeSubscribeHeartbeatResp fromTPipeSubscribeResp( if (Objects.nonNull(heartbeatResp.body)) { for (final ByteBuffer byteBuffer : heartbeatResp.body) { if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) { - final int size = ReadWriteIOUtils.readInt(byteBuffer); - final Map topics = new HashMap<>(); - for (int i = 0; i < size; i++) { - final String topicName = ReadWriteIOUtils.readString(byteBuffer); - final TopicConfig topicConfig = TopicConfig.deserialize(byteBuffer); - topics.put(topicName, topicConfig); + { + final int size = ReadWriteIOUtils.readInt(byteBuffer); + final Map topics = new HashMap<>(); + for (int i = 0; i < size; i++) { + final String topicName = ReadWriteIOUtils.readString(byteBuffer); + final TopicConfig topicConfig = TopicConfig.deserialize(byteBuffer); + topics.put(topicName, topicConfig); + } + resp.topics = topics; + } + { + final int size = ReadWriteIOUtils.readInt(byteBuffer); + final Map endPoints = new HashMap<>(); + for (int i = 0; i < size; i++) { + final int nodeId = ReadWriteIOUtils.readInt(byteBuffer); + final String ip = ReadWriteIOUtils.readString(byteBuffer); + final int port = ReadWriteIOUtils.readInt(byteBuffer); + endPoints.put(nodeId, new TEndPoint(ip, port)); + } + resp.endPoints = endPoints; } - resp.topics = topics; + resp.topicNamesToUnsubscribe = ReadWriteIOUtils.readStringList(byteBuffer); break; } } @@ -122,6 +163,8 @@ public boolean equals(final Object obj) { } final PipeSubscribeHeartbeatResp that = (PipeSubscribeHeartbeatResp) obj; return Objects.equals(this.topics, that.topics) + && Objects.equals(this.endPoints, that.endPoints) + && Objects.equals(this.topicNamesToUnsubscribe, that.topicNamesToUnsubscribe) && Objects.equals(this.status, that.status) && this.version == that.version && this.type == that.type @@ -130,6 +173,6 @@ public boolean equals(final Object obj) { @Override public int hashCode() { - return Objects.hash(topics, status, version, type, body); + return Objects.hash(topics, endPoints, topicNamesToUnsubscribe, status, version, type, body); } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java index 135e1055f47c0..1245a39744280 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java @@ -20,9 +20,7 @@ package org.apache.iotdb.session.subscription; import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq; import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp; import org.apache.iotdb.session.Session; @@ -31,20 +29,11 @@ import org.apache.thrift.TException; import java.time.ZoneId; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.function.Supplier; public class SubscriptionSessionConnection extends SessionConnection { - private static final String SHOW_DATA_NODES_COMMAND = "SHOW DATANODES"; - private static final String NODE_ID_COLUMN_NAME = "NodeID"; - private static final String STATUS_COLUMN_NAME = "Status"; - private static final String IP_COLUMN_NAME = "RpcAddress"; - private static final String PORT_COLUMN_NAME = "RpcPort"; - private static final String REMOVING_STATUS = "Removing"; - public SubscriptionSessionConnection( Session session, TEndPoint endPoint, @@ -56,27 +45,6 @@ public SubscriptionSessionConnection( super(session, endPoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs); } - // from org.apache.iotdb.session.NodesSupplier.updateDataNodeList - public Map fetchAllEndPoints() - throws IoTDBConnectionException, StatementExecutionException { - SessionDataSet dataSet = session.executeQueryStatement(SHOW_DATA_NODES_COMMAND); - SessionDataSet.DataIterator iterator = dataSet.iterator(); - Map endPoints = new HashMap<>(); - while (iterator.next()) { - // ignore removing DN - if (REMOVING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))) { - continue; - } - String ip = iterator.getString(IP_COLUMN_NAME); - String port = iterator.getString(PORT_COLUMN_NAME); - if (ip != null && port != null) { - endPoints.put( - iterator.getInt(NODE_ID_COLUMN_NAME), new TEndPoint(ip, Integer.parseInt(port))); - } - } - return endPoints; - } - public TPipeSubscribeResp pipeSubscribe(final TPipeSubscribeReq req) throws TException { return client.pipeSubscribe(req); } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java index 7add29b4d7c31..9e7242de2009b 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java @@ -1359,7 +1359,7 @@ Map fetchAllEndPointsWithRedirection() throws SubscriptionEx } for (final SubscriptionProvider provider : providers) { try { - return provider.getSessionConnection().fetchAllEndPoints(); + return provider.heartbeat().getEndPoints(); } catch (final Exception e) { LOGGER.warn( "{} failed to fetch all endpoints from subscription provider {}, try next subscription provider...", diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java index dc717e6a4c6b6..8d50fda60709e 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java @@ -218,7 +218,7 @@ void closeInternal() throws SubscriptionException { /////////////////////////////// subscription APIs /////////////////////////////// - Map heartbeat() throws SubscriptionException { + PipeSubscribeHeartbeatResp heartbeat() throws SubscriptionException { final TPipeSubscribeResp resp; try { resp = getSessionConnection().pipeSubscribe(PipeSubscribeHeartbeatReq.toTPipeSubscribeReq()); @@ -232,9 +232,7 @@ Map heartbeat() throws SubscriptionException { throw new SubscriptionConnectionException(e.getMessage(), e); } verifyPipeSubscribeSuccess(resp.status); - final PipeSubscribeHeartbeatResp heartbeatResp = - PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp); - return heartbeatResp.getTopics(); + return PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp); } Map subscribe(final Set topicNames) throws SubscriptionException { diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java index 3d09cc3516a0a..6c0b7d03b184f 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java @@ -23,6 +23,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,7 +95,7 @@ void openProviders(final SubscriptionConsumer consumer) throws SubscriptionExcep final Map allEndPoints; try { - allEndPoints = defaultProvider.getSessionConnection().fetchAllEndPoints(); + allEndPoints = defaultProvider.heartbeat().getEndPoints(); } catch (final Exception e) { LOGGER.warn( "{} failed to fetch all endpoints from {} because of {}", consumer, endPoint, e, e); @@ -243,7 +244,17 @@ void heartbeat(final SubscriptionConsumer consumer) { private void heartbeatInternal(final SubscriptionConsumer consumer) { for (final SubscriptionProvider provider : getAllProviders()) { try { - consumer.subscribedTopics = provider.heartbeat(); + final PipeSubscribeHeartbeatResp resp = provider.heartbeat(); + // update subscribed topics + consumer.subscribedTopics = resp.getTopics(); + // unsubscribe completed topics + for (final String topicName : resp.getTopicNamesToUnsubscribe()) { + LOGGER.info( + "Termination occurred when SubscriptionConsumer {} polling topics, unsubscribe topic {} automatically", + consumer.coreReportMessage(), + topicName); + consumer.unsubscribe(topicName); + } provider.setAvailable(); } catch (final Exception e) { LOGGER.warn( @@ -308,7 +319,7 @@ private void syncInternal(final SubscriptionConsumer consumer) { } else { // existing provider try { - consumer.subscribedTopics = provider.heartbeat(); + consumer.subscribedTopics = provider.heartbeat().getTopics(); provider.setAvailable(); } catch (final Exception e) { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java index b750834f4ce8d..2ea4f0a731efd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -125,6 +126,16 @@ public boolean isCommitContextOutdated(final SubscriptionCommitContext commitCon return broker.isCommitContextOutdated(commitContext); } + public List fetchTopicNamesToUnsubscribe( + final ConsumerConfig consumerConfig, final Set topicNames) { + final String consumerGroupId = consumerConfig.getConsumerGroupId(); + final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId); + if (Objects.isNull(broker)) { + return Collections.emptyList(); + } + return broker.fetchTopicNamesToUnsubscribe(topicNames); + } + /////////////////////////////// broker /////////////////////////////// public boolean isBrokerExist(final String consumerGroupId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java index 6106b60cabfc0..0b434b7b331b7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java @@ -339,6 +339,25 @@ public boolean isCommitContextOutdated(final SubscriptionCommitContext commitCon return prefetchingQueue.isCommitContextOutdated(commitContext); } + public List fetchTopicNamesToUnsubscribe(final Set topicNames) { + final List topicNamesToUnsubscribe = new ArrayList<>(); + + for (final String topicName : topicNames) { + final SubscriptionPrefetchingQueue prefetchingQueue = + topicNameToPrefetchingQueue.get(topicName); + // If there is no prefetching queue for the topic, check if it's completed + if (Objects.isNull(prefetchingQueue) && completedTopicNames.containsKey(topicName)) { + LOGGER.info( + "Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, reply to client heartbeat request", + topicName, + brokerId); + topicNamesToUnsubscribe.add(topicName); + } + } + + return topicNamesToUnsubscribe; + } + /////////////////////////////// prefetching queue /////////////////////////////// public void bindPrefetchingQueue( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java index 87fde40a43211..34acb35d880c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java @@ -19,13 +19,17 @@ package org.apache.iotdb.db.subscription.receiver; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.consensus.ConfigRegionId; import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq; import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq; +import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq; import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -39,6 +43,7 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.subscription.config.ConsumerConfig; +import org.apache.iotdb.rpc.subscription.config.TopicConfig; import org.apache.iotdb.rpc.subscription.exception.SubscriptionException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionPayloadExceedException; import org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException; @@ -78,7 +83,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -260,13 +267,51 @@ private TPipeSubscribeResp handlePipeSubscribeHeartbeatInternal( // TODO: do something LOGGER.info("Subscription: consumer {} heartbeat successfully", consumerConfig); - return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp( - RpcUtils.SUCCESS_STATUS, + + // fetch subscribed topics + final Map topics = SubscriptionAgent.topic() .getTopicConfigs( SubscriptionAgent.consumer() .getTopicNamesSubscribedByConsumer( - consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId()))); + consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId())); + + // fetch available endpoints + final Map endPoints = new HashMap<>(); + try (final ConfigNodeClient configNodeClient = + CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + final TShowDataNodesResp resp = configNodeClient.showDataNodes(); + // refer to org.apache.iotdb.session.NodesSupplier.updateDataNodeList + + for (final TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) { + // ignore removing DN + if (Objects.equals(NodeStatus.Removing.getStatus(), dataNodeInfo.getStatus())) { + continue; + } + final String ip = dataNodeInfo.getRpcAddresss(); + final int port = dataNodeInfo.getRpcPort(); + if (ip != null && port != 0) { + endPoints.put(dataNodeInfo.getDataNodeId(), new TEndPoint(ip, port)); + } + } + } catch (final ClientManagerException | TException e) { + LOGGER.warn( + "Exception occurred when fetch endpoints for consumer {} in config node", + consumerConfig, + e); + final String exceptionMessage = + String.format( + "Subscription: Failed to fetch endpoints for consumer %s in config node, exception is %s.", + consumerConfig, e); + throw new SubscriptionException(exceptionMessage); + } + + // fetch topics should be unsubscribed + final List topicNamesToUnsubscribe = + SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, topics.keySet()); + + return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp( + RpcUtils.SUCCESS_STATUS, topics, endPoints, topicNamesToUnsubscribe); } private TPipeSubscribeResp handlePipeSubscribeSubscribe(final PipeSubscribeSubscribeReq req) {