Skip to content

Commit 0cb08d3

Browse files
VGalaxiesSyncGithubBot
authored andcommitted
Subscription: support unsubscribe from completed topics under client heartbeat thread (apache#15595) (apache#15603)
1 parent da6fdc7 commit 0cb08d3

9 files changed

Lines changed: 150 additions & 52 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,5 +147,8 @@ public void do_test()
147147
consume_data(consumer, session_dest);
148148
check_count(8, "select count(s_0) from " + device, "Consume data again:" + pattern);
149149
check_count(8, "select count(s_1) from " + device, "Consumption data: s_1");
150+
while (!consumer.allTopicMessagesHaveBeenConsumed()) {
151+
Thread.sleep(1000);
152+
}
150153
}
151154
}

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.rpc.subscription.payload.response;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2223
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2324
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
2425
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
@@ -29,19 +30,34 @@
2930
import java.io.DataOutputStream;
3031
import java.io.IOException;
3132
import java.nio.ByteBuffer;
33+
import java.util.ArrayList;
3234
import java.util.Collections;
3335
import java.util.HashMap;
36+
import java.util.List;
3437
import java.util.Map;
3538
import java.util.Objects;
3639

3740
public class PipeSubscribeHeartbeatResp extends TPipeSubscribeResp {
3841

3942
private transient Map<String, TopicConfig> topics = new HashMap<>(); // subscribed topics
4043

44+
private transient Map<Integer, TEndPoint> endPoints = new HashMap<>(); // available endpoints
45+
46+
private transient List<String> topicNamesToUnsubscribe =
47+
new ArrayList<>(); // topics should be unsubscribed
48+
4149
public Map<String, TopicConfig> getTopics() {
4250
return topics;
4351
}
4452

53+
public Map<Integer, TEndPoint> getEndPoints() {
54+
return endPoints;
55+
}
56+
57+
public List<String> getTopicNamesToUnsubscribe() {
58+
return topicNamesToUnsubscribe;
59+
}
60+
4561
/////////////////////////////// Thrift ///////////////////////////////
4662

4763
/**
@@ -63,7 +79,11 @@ public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(final TSStatus sta
6379
* server.
6480
*/
6581
public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(
66-
final TSStatus status, final Map<String, TopicConfig> topics) throws IOException {
82+
final TSStatus status,
83+
final Map<String, TopicConfig> topics,
84+
final Map<Integer, TEndPoint> endPoints,
85+
final List<String> topicNamesToUnsubscribe)
86+
throws IOException {
6787
final PipeSubscribeHeartbeatResp resp = toTPipeSubscribeResp(status);
6888

6989
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -73,6 +93,13 @@ public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(
7393
ReadWriteIOUtils.write(entry.getKey(), outputStream);
7494
entry.getValue().serialize(outputStream);
7595
}
96+
ReadWriteIOUtils.write(endPoints.size(), outputStream);
97+
for (final Map.Entry<Integer, TEndPoint> entry : endPoints.entrySet()) {
98+
ReadWriteIOUtils.write(entry.getKey(), outputStream);
99+
ReadWriteIOUtils.write(entry.getValue().getIp(), outputStream);
100+
ReadWriteIOUtils.write(entry.getValue().getPort(), outputStream);
101+
}
102+
ReadWriteIOUtils.writeStringList(topicNamesToUnsubscribe, outputStream);
76103
resp.body =
77104
Collections.singletonList(
78105
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()));
@@ -89,14 +116,28 @@ public static PipeSubscribeHeartbeatResp fromTPipeSubscribeResp(
89116
if (Objects.nonNull(heartbeatResp.body)) {
90117
for (final ByteBuffer byteBuffer : heartbeatResp.body) {
91118
if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
92-
final int size = ReadWriteIOUtils.readInt(byteBuffer);
93-
final Map<String, TopicConfig> topics = new HashMap<>();
94-
for (int i = 0; i < size; i++) {
95-
final String topicName = ReadWriteIOUtils.readString(byteBuffer);
96-
final TopicConfig topicConfig = TopicConfig.deserialize(byteBuffer);
97-
topics.put(topicName, topicConfig);
119+
{
120+
final int size = ReadWriteIOUtils.readInt(byteBuffer);
121+
final Map<String, TopicConfig> topics = new HashMap<>();
122+
for (int i = 0; i < size; i++) {
123+
final String topicName = ReadWriteIOUtils.readString(byteBuffer);
124+
final TopicConfig topicConfig = TopicConfig.deserialize(byteBuffer);
125+
topics.put(topicName, topicConfig);
126+
}
127+
resp.topics = topics;
128+
}
129+
{
130+
final int size = ReadWriteIOUtils.readInt(byteBuffer);
131+
final Map<Integer, TEndPoint> endPoints = new HashMap<>();
132+
for (int i = 0; i < size; i++) {
133+
final int nodeId = ReadWriteIOUtils.readInt(byteBuffer);
134+
final String ip = ReadWriteIOUtils.readString(byteBuffer);
135+
final int port = ReadWriteIOUtils.readInt(byteBuffer);
136+
endPoints.put(nodeId, new TEndPoint(ip, port));
137+
}
138+
resp.endPoints = endPoints;
98139
}
99-
resp.topics = topics;
140+
resp.topicNamesToUnsubscribe = ReadWriteIOUtils.readStringList(byteBuffer);
100141
break;
101142
}
102143
}
@@ -122,6 +163,8 @@ public boolean equals(final Object obj) {
122163
}
123164
final PipeSubscribeHeartbeatResp that = (PipeSubscribeHeartbeatResp) obj;
124165
return Objects.equals(this.topics, that.topics)
166+
&& Objects.equals(this.endPoints, that.endPoints)
167+
&& Objects.equals(this.topicNamesToUnsubscribe, that.topicNamesToUnsubscribe)
125168
&& Objects.equals(this.status, that.status)
126169
&& this.version == that.version
127170
&& this.type == that.type
@@ -130,6 +173,6 @@ public boolean equals(final Object obj) {
130173

131174
@Override
132175
public int hashCode() {
133-
return Objects.hash(topics, status, version, type, body);
176+
return Objects.hash(topics, endPoints, topicNamesToUnsubscribe, status, version, type, body);
134177
}
135178
}

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@
2020
package org.apache.iotdb.session.subscription;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
23-
import org.apache.iotdb.isession.SessionDataSet;
2423
import org.apache.iotdb.rpc.IoTDBConnectionException;
25-
import org.apache.iotdb.rpc.StatementExecutionException;
2624
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
2725
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
2826
import org.apache.iotdb.session.Session;
@@ -31,20 +29,11 @@
3129
import org.apache.thrift.TException;
3230

3331
import java.time.ZoneId;
34-
import java.util.HashMap;
3532
import java.util.List;
36-
import java.util.Map;
3733
import java.util.function.Supplier;
3834

3935
public class SubscriptionSessionConnection extends SessionConnection {
4036

41-
private static final String SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
42-
private static final String NODE_ID_COLUMN_NAME = "NodeID";
43-
private static final String STATUS_COLUMN_NAME = "Status";
44-
private static final String IP_COLUMN_NAME = "RpcAddress";
45-
private static final String PORT_COLUMN_NAME = "RpcPort";
46-
private static final String REMOVING_STATUS = "Removing";
47-
4837
public SubscriptionSessionConnection(
4938
Session session,
5039
TEndPoint endPoint,
@@ -56,27 +45,6 @@ public SubscriptionSessionConnection(
5645
super(session, endPoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
5746
}
5847

59-
// from org.apache.iotdb.session.NodesSupplier.updateDataNodeList
60-
public Map<Integer, TEndPoint> fetchAllEndPoints()
61-
throws IoTDBConnectionException, StatementExecutionException {
62-
SessionDataSet dataSet = session.executeQueryStatement(SHOW_DATA_NODES_COMMAND);
63-
SessionDataSet.DataIterator iterator = dataSet.iterator();
64-
Map<Integer, TEndPoint> endPoints = new HashMap<>();
65-
while (iterator.next()) {
66-
// ignore removing DN
67-
if (REMOVING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))) {
68-
continue;
69-
}
70-
String ip = iterator.getString(IP_COLUMN_NAME);
71-
String port = iterator.getString(PORT_COLUMN_NAME);
72-
if (ip != null && port != null) {
73-
endPoints.put(
74-
iterator.getInt(NODE_ID_COLUMN_NAME), new TEndPoint(ip, Integer.parseInt(port)));
75-
}
76-
}
77-
return endPoints;
78-
}
79-
8048
public TPipeSubscribeResp pipeSubscribe(final TPipeSubscribeReq req) throws TException {
8149
return client.pipeSubscribe(req);
8250
}

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1359,7 +1359,7 @@ Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection() throws SubscriptionEx
13591359
}
13601360
for (final SubscriptionProvider provider : providers) {
13611361
try {
1362-
return provider.getSessionConnection().fetchAllEndPoints();
1362+
return provider.heartbeat().getEndPoints();
13631363
} catch (final Exception e) {
13641364
LOGGER.warn(
13651365
"{} failed to fetch all endpoints from subscription provider {}, try next subscription provider...",

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ void closeInternal() throws SubscriptionException {
218218

219219
/////////////////////////////// subscription APIs ///////////////////////////////
220220

221-
Map<String, TopicConfig> heartbeat() throws SubscriptionException {
221+
PipeSubscribeHeartbeatResp heartbeat() throws SubscriptionException {
222222
final TPipeSubscribeResp resp;
223223
try {
224224
resp = getSessionConnection().pipeSubscribe(PipeSubscribeHeartbeatReq.toTPipeSubscribeReq());
@@ -232,9 +232,7 @@ Map<String, TopicConfig> heartbeat() throws SubscriptionException {
232232
throw new SubscriptionConnectionException(e.getMessage(), e);
233233
}
234234
verifyPipeSubscribeSuccess(resp.status);
235-
final PipeSubscribeHeartbeatResp heartbeatResp =
236-
PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
237-
return heartbeatResp.getTopics();
235+
return PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
238236
}
239237

240238
Map<String, TopicConfig> subscribe(final Set<String> topicNames) throws SubscriptionException {

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.rpc.IoTDBConnectionException;
2424
import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
2525
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
26+
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp;
2627

2728
import org.slf4j.Logger;
2829
import org.slf4j.LoggerFactory;
@@ -94,7 +95,7 @@ void openProviders(final SubscriptionConsumer consumer) throws SubscriptionExcep
9495

9596
final Map<Integer, TEndPoint> allEndPoints;
9697
try {
97-
allEndPoints = defaultProvider.getSessionConnection().fetchAllEndPoints();
98+
allEndPoints = defaultProvider.heartbeat().getEndPoints();
9899
} catch (final Exception e) {
99100
LOGGER.warn(
100101
"{} failed to fetch all endpoints from {} because of {}", consumer, endPoint, e, e);
@@ -243,7 +244,17 @@ void heartbeat(final SubscriptionConsumer consumer) {
243244
private void heartbeatInternal(final SubscriptionConsumer consumer) {
244245
for (final SubscriptionProvider provider : getAllProviders()) {
245246
try {
246-
consumer.subscribedTopics = provider.heartbeat();
247+
final PipeSubscribeHeartbeatResp resp = provider.heartbeat();
248+
// update subscribed topics
249+
consumer.subscribedTopics = resp.getTopics();
250+
// unsubscribe completed topics
251+
for (final String topicName : resp.getTopicNamesToUnsubscribe()) {
252+
LOGGER.info(
253+
"Termination occurred when SubscriptionConsumer {} polling topics, unsubscribe topic {} automatically",
254+
consumer.coreReportMessage(),
255+
topicName);
256+
consumer.unsubscribe(topicName);
257+
}
247258
provider.setAvailable();
248259
} catch (final Exception e) {
249260
LOGGER.warn(
@@ -308,7 +319,7 @@ private void syncInternal(final SubscriptionConsumer consumer) {
308319
} else {
309320
// existing provider
310321
try {
311-
consumer.subscribedTopics = provider.heartbeat();
322+
consumer.subscribedTopics = provider.heartbeat().getTopics();
312323
provider.setAvailable();
313324
} catch (final Exception e) {
314325
LOGGER.warn(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.slf4j.Logger;
3131
import org.slf4j.LoggerFactory;
3232

33+
import java.util.Collections;
3334
import java.util.List;
3435
import java.util.Map;
3536
import java.util.Objects;
@@ -125,6 +126,16 @@ public boolean isCommitContextOutdated(final SubscriptionCommitContext commitCon
125126
return broker.isCommitContextOutdated(commitContext);
126127
}
127128

129+
public List<String> fetchTopicNamesToUnsubscribe(
130+
final ConsumerConfig consumerConfig, final Set<String> topicNames) {
131+
final String consumerGroupId = consumerConfig.getConsumerGroupId();
132+
final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
133+
if (Objects.isNull(broker)) {
134+
return Collections.emptyList();
135+
}
136+
return broker.fetchTopicNamesToUnsubscribe(topicNames);
137+
}
138+
128139
/////////////////////////////// broker ///////////////////////////////
129140

130141
public boolean isBrokerExist(final String consumerGroupId) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,25 @@ public boolean isCommitContextOutdated(final SubscriptionCommitContext commitCon
339339
return prefetchingQueue.isCommitContextOutdated(commitContext);
340340
}
341341

342+
public List<String> fetchTopicNamesToUnsubscribe(final Set<String> topicNames) {
343+
final List<String> topicNamesToUnsubscribe = new ArrayList<>();
344+
345+
for (final String topicName : topicNames) {
346+
final SubscriptionPrefetchingQueue prefetchingQueue =
347+
topicNameToPrefetchingQueue.get(topicName);
348+
// If there is no prefetching queue for the topic, check if it's completed
349+
if (Objects.isNull(prefetchingQueue) && completedTopicNames.containsKey(topicName)) {
350+
LOGGER.info(
351+
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, reply to client heartbeat request",
352+
topicName,
353+
brokerId);
354+
topicNamesToUnsubscribe.add(topicName);
355+
}
356+
}
357+
358+
return topicNamesToUnsubscribe;
359+
}
360+
342361
/////////////////////////////// prefetching queue ///////////////////////////////
343362

344363
public void bindPrefetchingQueue(

0 commit comments

Comments
 (0)