Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,8 @@ public void do_test()
consume_data(consumer, session_dest);
check_count(8, "select count(s_0) from " + device, "Consume data again:" + pattern);
check_count(8, "select count(s_1) from " + device, "Consumption data: s_1");
while (!consumer.allTopicMessagesHaveBeenConsumed()) {
Thread.sleep(1000);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.rpc.subscription.payload.response;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
Expand All @@ -29,19 +30,34 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class PipeSubscribeHeartbeatResp extends TPipeSubscribeResp {

private transient Map<String, TopicConfig> topics = new HashMap<>(); // subscribed topics

private transient Map<Integer, TEndPoint> endPoints = new HashMap<>(); // available endpoints

private transient List<String> topicNamesToUnsubscribe =
new ArrayList<>(); // topics should be unsubscribed

public Map<String, TopicConfig> getTopics() {
return topics;
}

public Map<Integer, TEndPoint> getEndPoints() {
return endPoints;
}

public List<String> getTopicNamesToUnsubscribe() {
return topicNamesToUnsubscribe;
}

/////////////////////////////// Thrift ///////////////////////////////

/**
Expand All @@ -63,7 +79,11 @@ public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(final TSStatus sta
* server.
*/
public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(
final TSStatus status, final Map<String, TopicConfig> topics) throws IOException {
final TSStatus status,
final Map<String, TopicConfig> topics,
final Map<Integer, TEndPoint> endPoints,
final List<String> topicNamesToUnsubscribe)
throws IOException {
final PipeSubscribeHeartbeatResp resp = toTPipeSubscribeResp(status);

try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
Expand All @@ -73,6 +93,13 @@ public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(
ReadWriteIOUtils.write(entry.getKey(), outputStream);
entry.getValue().serialize(outputStream);
}
ReadWriteIOUtils.write(endPoints.size(), outputStream);
for (final Map.Entry<Integer, TEndPoint> entry : endPoints.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue().getIp(), outputStream);
ReadWriteIOUtils.write(entry.getValue().getPort(), outputStream);
}
ReadWriteIOUtils.writeStringList(topicNamesToUnsubscribe, outputStream);
resp.body =
Collections.singletonList(
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()));
Expand All @@ -89,14 +116,28 @@ public static PipeSubscribeHeartbeatResp fromTPipeSubscribeResp(
if (Objects.nonNull(heartbeatResp.body)) {
for (final ByteBuffer byteBuffer : heartbeatResp.body) {
if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
final int size = ReadWriteIOUtils.readInt(byteBuffer);
final Map<String, TopicConfig> topics = new HashMap<>();
for (int i = 0; i < size; i++) {
final String topicName = ReadWriteIOUtils.readString(byteBuffer);
final TopicConfig topicConfig = TopicConfig.deserialize(byteBuffer);
topics.put(topicName, topicConfig);
{
final int size = ReadWriteIOUtils.readInt(byteBuffer);
final Map<String, TopicConfig> topics = new HashMap<>();
for (int i = 0; i < size; i++) {
final String topicName = ReadWriteIOUtils.readString(byteBuffer);
final TopicConfig topicConfig = TopicConfig.deserialize(byteBuffer);
topics.put(topicName, topicConfig);
}
resp.topics = topics;
}
{
final int size = ReadWriteIOUtils.readInt(byteBuffer);
final Map<Integer, TEndPoint> endPoints = new HashMap<>();
for (int i = 0; i < size; i++) {
final int nodeId = ReadWriteIOUtils.readInt(byteBuffer);
final String ip = ReadWriteIOUtils.readString(byteBuffer);
final int port = ReadWriteIOUtils.readInt(byteBuffer);
endPoints.put(nodeId, new TEndPoint(ip, port));
}
resp.endPoints = endPoints;
}
resp.topics = topics;
resp.topicNamesToUnsubscribe = ReadWriteIOUtils.readStringList(byteBuffer);
break;
}
}
Expand All @@ -122,6 +163,8 @@ public boolean equals(final Object obj) {
}
final PipeSubscribeHeartbeatResp that = (PipeSubscribeHeartbeatResp) obj;
return Objects.equals(this.topics, that.topics)
&& Objects.equals(this.endPoints, that.endPoints)
&& Objects.equals(this.topicNamesToUnsubscribe, that.topicNamesToUnsubscribe)
&& Objects.equals(this.status, that.status)
&& this.version == that.version
&& this.type == that.type
Expand All @@ -130,6 +173,6 @@ public boolean equals(final Object obj) {

@Override
public int hashCode() {
return Objects.hash(topics, status, version, type, body);
return Objects.hash(topics, endPoints, topicNamesToUnsubscribe, status, version, type, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package org.apache.iotdb.session.subscription;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.session.Session;
Expand All @@ -31,20 +29,11 @@
import org.apache.thrift.TException;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class SubscriptionSessionConnection extends SessionConnection {

private static final String SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
private static final String NODE_ID_COLUMN_NAME = "NodeID";
private static final String STATUS_COLUMN_NAME = "Status";
private static final String IP_COLUMN_NAME = "RpcAddress";
private static final String PORT_COLUMN_NAME = "RpcPort";
private static final String REMOVING_STATUS = "Removing";

public SubscriptionSessionConnection(
Session session,
TEndPoint endPoint,
Expand All @@ -56,27 +45,6 @@ public SubscriptionSessionConnection(
super(session, endPoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
}

// from org.apache.iotdb.session.NodesSupplier.updateDataNodeList
public Map<Integer, TEndPoint> fetchAllEndPoints()
throws IoTDBConnectionException, StatementExecutionException {
SessionDataSet dataSet = session.executeQueryStatement(SHOW_DATA_NODES_COMMAND);
SessionDataSet.DataIterator iterator = dataSet.iterator();
Map<Integer, TEndPoint> endPoints = new HashMap<>();
while (iterator.next()) {
// ignore removing DN
if (REMOVING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))) {
continue;
}
String ip = iterator.getString(IP_COLUMN_NAME);
String port = iterator.getString(PORT_COLUMN_NAME);
if (ip != null && port != null) {
endPoints.put(
iterator.getInt(NODE_ID_COLUMN_NAME), new TEndPoint(ip, Integer.parseInt(port)));
}
}
return endPoints;
}

public TPipeSubscribeResp pipeSubscribe(final TPipeSubscribeReq req) throws TException {
return client.pipeSubscribe(req);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection() throws SubscriptionEx
}
for (final SubscriptionProvider provider : providers) {
try {
return provider.getSessionConnection().fetchAllEndPoints();
return provider.heartbeat().getEndPoints();
} catch (final Exception e) {
LOGGER.warn(
"{} failed to fetch all endpoints from subscription provider {}, try next subscription provider...",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ void closeInternal() throws SubscriptionException {

/////////////////////////////// subscription APIs ///////////////////////////////

Map<String, TopicConfig> heartbeat() throws SubscriptionException {
PipeSubscribeHeartbeatResp heartbeat() throws SubscriptionException {
final TPipeSubscribeResp resp;
try {
resp = getSessionConnection().pipeSubscribe(PipeSubscribeHeartbeatReq.toTPipeSubscribeReq());
Expand All @@ -232,9 +232,7 @@ Map<String, TopicConfig> heartbeat() throws SubscriptionException {
throw new SubscriptionConnectionException(e.getMessage(), e);
}
verifyPipeSubscribeSuccess(resp.status);
final PipeSubscribeHeartbeatResp heartbeatResp =
PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
return heartbeatResp.getTopics();
return PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
}

Map<String, TopicConfig> subscribe(final Set<String> topicNames) throws SubscriptionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -94,7 +95,7 @@ void openProviders(final SubscriptionConsumer consumer) throws SubscriptionExcep

final Map<Integer, TEndPoint> allEndPoints;
try {
allEndPoints = defaultProvider.getSessionConnection().fetchAllEndPoints();
allEndPoints = defaultProvider.heartbeat().getEndPoints();
} catch (final Exception e) {
LOGGER.warn(
"{} failed to fetch all endpoints from {} because of {}", consumer, endPoint, e, e);
Expand Down Expand Up @@ -243,7 +244,17 @@ void heartbeat(final SubscriptionConsumer consumer) {
private void heartbeatInternal(final SubscriptionConsumer consumer) {
for (final SubscriptionProvider provider : getAllProviders()) {
try {
consumer.subscribedTopics = provider.heartbeat();
final PipeSubscribeHeartbeatResp resp = provider.heartbeat();
// update subscribed topics
consumer.subscribedTopics = resp.getTopics();
// unsubscribe completed topics
for (final String topicName : resp.getTopicNamesToUnsubscribe()) {
LOGGER.info(
"Termination occurred when SubscriptionConsumer {} polling topics, unsubscribe topic {} automatically",
consumer.coreReportMessage(),
topicName);
consumer.unsubscribe(topicName);
}
provider.setAvailable();
} catch (final Exception e) {
LOGGER.warn(
Expand Down Expand Up @@ -308,7 +319,7 @@ private void syncInternal(final SubscriptionConsumer consumer) {
} else {
// existing provider
try {
consumer.subscribedTopics = provider.heartbeat();
consumer.subscribedTopics = provider.heartbeat().getTopics();
provider.setAvailable();
} catch (final Exception e) {
LOGGER.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -125,6 +126,16 @@ public boolean isCommitContextOutdated(final SubscriptionCommitContext commitCon
return broker.isCommitContextOutdated(commitContext);
}

public List<String> fetchTopicNamesToUnsubscribe(
final ConsumerConfig consumerConfig, final Set<String> topicNames) {
final String consumerGroupId = consumerConfig.getConsumerGroupId();
final SubscriptionBroker broker = consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
return Collections.emptyList();
}
return broker.fetchTopicNamesToUnsubscribe(topicNames);
}

/////////////////////////////// broker ///////////////////////////////

public boolean isBrokerExist(final String consumerGroupId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,25 @@ public boolean isCommitContextOutdated(final SubscriptionCommitContext commitCon
return prefetchingQueue.isCommitContextOutdated(commitContext);
}

public List<String> fetchTopicNamesToUnsubscribe(final Set<String> topicNames) {
final List<String> topicNamesToUnsubscribe = new ArrayList<>();

for (final String topicName : topicNames) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
// If there is no prefetching queue for the topic, check if it's completed
if (Objects.isNull(prefetchingQueue) && completedTopicNames.containsKey(topicName)) {
LOGGER.info(
"Subscription: prefetching queue bound to topic [{}] for consumer group [{}] is completed, reply to client heartbeat request",
topicName,
brokerId);
topicNamesToUnsubscribe.add(topicName);
}
}

return topicNamesToUnsubscribe;
}

/////////////////////////////// prefetching queue ///////////////////////////////

public void bindPrefetchingQueue(
Expand Down
Loading
Loading