diff --git a/.github/workflows/pipe-it.yml b/.github/workflows/pipe-it.yml index ba09fb4e27e31..2c1b03623d1fd 100644 --- a/.github/workflows/pipe-it.yml +++ b/.github/workflows/pipe-it.yml @@ -438,7 +438,7 @@ jobs: matrix: java: [ 17 ] # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ ScalableSingleNodeMode ] + cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} @@ -606,7 +606,7 @@ jobs: matrix: java: [ 17 ] # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] + cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} @@ -690,7 +690,7 @@ jobs: matrix: java: [ 17 ] # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] + cluster1: [ ScalableSingleNodeMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java index 5b8ec39327414..3162139fb665e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java @@ -19,10 +19,15 @@ package org.apache.iotdb.subscription.it; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.session.Session; + import org.awaitility.Awaitility; import org.awaitility.core.ConditionFactory; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; public class IoTDBSubscriptionITConstant { @@ -40,4 +45,27 @@ public class IoTDBSubscriptionITConstant { public static final long SLEEP_NS = 1_000_000_000L; public static final long POLL_TIMEOUT_MS = 10_000L; + + @FunctionalInterface + public interface WrappedVoidSupplier { + void get() throws Throwable; + } + + public static void AWAIT_WITH_FLUSH(final Session session, final WrappedVoidSupplier assertions) { + AWAIT.untilAsserted( + () -> { + session.executeNonQueryStatement("flush"); + assertions.get(); + }); + } + + public static Consumer FORCE_SCALABLE_SINGLE_NODE_MODE = + env -> + env.getConfig() + .getCommonConfig() + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setSchemaReplicationFactor(1) + .setDataReplicationFactor(1); } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java index 4b0164801cf18..288f202ee9647 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/AbstractSubscriptionTreeRegressionIT.java @@ -29,6 +29,7 @@ import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.WrappedVoidSupplier; import org.apache.iotdb.subscription.it.triple.AbstractSubscriptionTripleIT; import org.apache.thrift.TException; @@ -57,6 +58,7 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT; import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS; public abstract class AbstractSubscriptionTreeRegressionIT extends AbstractSubscriptionTripleIT { @@ -359,26 +361,6 @@ public List consume_tsfile(SubscriptionTreePullConsumer consumer, List< return results; } - public static void consume_data_long( - SubscriptionTreePullConsumer consumer, Session session, Long timeout) - throws StatementExecutionException, InterruptedException, IoTDBConnectionException { - timeout = System.currentTimeMillis() + timeout; - while (System.currentTimeMillis() < timeout) { - List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); - if (messages.isEmpty()) { - Thread.sleep(1000); - } - for (final SubscriptionMessage message : messages) { - for (final Iterator it = message.getSessionDataSetsHandler().tabletIterator(); - it.hasNext(); ) { - final Tablet tablet = it.next(); - session.insertTablet(tablet); - } - } - consumer.commitSync(messages); - } - } - public void consume_data(SubscriptionTreePullConsumer consumer) throws TException, IOException, @@ -388,6 +370,66 @@ public void consume_data(SubscriptionTreePullConsumer consumer) consume_data(consumer, session_dest); } + public void consume_data_await( + SubscriptionTreePullConsumer consumer, + Session session, + List assertions) { + AWAIT.untilAsserted( + () -> { + List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); + if (messages.isEmpty()) { + session_src.executeNonQueryStatement("flush"); + } + for (final SubscriptionMessage message : messages) { + for (final Iterator it = message.getSessionDataSetsHandler().tabletIterator(); + it.hasNext(); ) { + final Tablet tablet = it.next(); + session.insertTablet(tablet); + } + } + consumer.commitSync(messages); + for (final WrappedVoidSupplier assertion : assertions) { + assertion.get(); + } + }); + } + + public void consume_tsfile_await( + SubscriptionTreePullConsumer consumer, List devices, List expected) { + final List counters = new ArrayList<>(devices.size()); + for (int i = 0; i < devices.size(); i++) { + counters.add(new AtomicInteger(0)); + } + AWAIT.untilAsserted( + () -> { + List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); + if (messages.isEmpty()) { + session_src.executeNonQueryStatement("flush"); + } + for (final SubscriptionMessage message : messages) { + final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler(); + try (final TsFileReader tsFileReader = tsFileHandler.openReader()) { + for (int i = 0; i < devices.size(); i++) { + final Path path = new Path(devices.get(i), "s_0", true); + final QueryDataSet dataSet = + tsFileReader.query( + QueryExpression.create(Collections.singletonList(path), null)); + while (dataSet.hasNext()) { + dataSet.next(); + counters.get(i).addAndGet(1); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + consumer.commitSync(messages); + for (int i = 0; i < devices.size(); i++) { + assertEquals(counters.get(i).get(), expected.get(i)); + } + }); + } + //////////////////////////// strict assertions //////////////////////////// public static void assertEquals(int actual, int expected) { diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java index f99d88b8a29c8..3468a6e93caea 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java @@ -26,6 +26,7 @@ import org.apache.iotdb.session.subscription.consumer.AckStrategy; import org.apache.iotdb.session.subscription.consumer.ConsumeResult; import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT; import org.apache.thrift.TException; @@ -81,6 +82,15 @@ public void setUp() throws Exception { } } + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { @@ -107,6 +117,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java index d340363a41bec..9ec58ef95f4bb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java @@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT; import org.apache.thrift.TException; @@ -69,6 +70,15 @@ public void setUp() throws Exception { session_src.executeNonQueryStatement("create database root.RootPullConsumeTsfile"); } + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java index a9fecef2dc90c..1ae3269f23943 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java @@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT; import org.apache.thrift.TException; @@ -83,6 +84,15 @@ public void setUp() throws Exception { assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2"); } + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java index 2cad94fc4ac73..c24326fed9563 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; @RunWith(IoTDBTestRunner.class) @@ -115,6 +116,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test @@ -132,13 +134,18 @@ public void do_test() assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after subscription"); insert_data(System.currentTimeMillis() - 30000L); // Consumption data - consume_data(consumer, session_dest); String sql = "select count(s_0) from " + device; - System.out.println("src: " + getCount(session_src, sql)); - check_count(8, sql, "Consumption data:" + pattern); - check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); - check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1"); - check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2"); + consume_data_await( + consumer, + session_dest, + Collections.singletonList( + () -> { + System.out.println("src: " + getCount(session_src, sql)); + check_count(8, sql, "Consumption data:" + pattern); + check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); + check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1"); + check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2"); + })); insert_data(System.currentTimeMillis()); // Unsubscribe consumer.unsubscribe(topicName); @@ -149,8 +156,14 @@ public void do_test() System.out.println("src: " + getCount(session_src, sql)); // Consumption data: Progress is not retained after unsubscribing and then re-subscribing. Full // synchronization. - consume_data(consumer, session_dest); - check_count(12, "select count(s_0) from " + device, "consume data again:s_0"); - check_count(12, "select count(s_1) from " + device, "Consumption data: s_1"); + consume_data_await( + consumer, + session_dest, + Collections.singletonList( + () -> { + System.out.println("src: " + getCount(session_src, sql)); + check_count(12, "select count(s_0) from " + device, "consume data again:s_0"); + check_count(12, "select count(s_1) from " + device, "Consumption data: s_1"); + })); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java index 2572c41de9369..49562e5314c96 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java @@ -24,6 +24,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT; import org.apache.thrift.TException; @@ -41,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /*** @@ -94,6 +96,16 @@ public void setUp() throws Exception { assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics"); } + // TODO: remove it later + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { @@ -151,11 +163,7 @@ public void do_test() devices.add(device); devices.add(device2); devices.add(database2 + ".d_2"); - - List rowCounts = consume_tsfile(consumer, devices); - assertEquals(rowCounts.get(0), 10); - assertEquals(rowCounts.get(1), 1); - assertEquals(rowCounts.get(2), 1); + consume_tsfile_await(consumer, devices, Arrays.asList(10, 1, 1)); // Unsubscribe consumer.unsubscribe(topicName); assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after cancellation"); @@ -165,14 +173,6 @@ public void do_test() insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00 // Consumption data: Progress is not retained after canceling and re-subscribing. Full // synchronization. - rowCounts = consume_tsfile(consumer, devices); - - assertEquals( - rowCounts.get(0), - 15, - "Unsubscribe and resubscribe, progress is not retained. Full synchronization."); - assertEquals( - rowCounts.get(1), 1, "Cancel subscription and subscribe again," + database + ".d_1"); - assertEquals(rowCounts.get(2), 1, "Unsubscribe and resubscribe," + database2 + ".d_2"); + consume_tsfile_await(consumer, devices, Arrays.asList(15, 1, 1)); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java index d003cbceaedab..b90b46a3e2da9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java @@ -28,6 +28,7 @@ import org.apache.iotdb.session.subscription.consumer.ConsumeResult; import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT; import org.apache.thrift.TException; @@ -100,6 +101,16 @@ public void setUp() throws Exception { assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics"); } + // TODO: remove it later + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { @@ -188,7 +199,8 @@ public void do_test() // Consumption data: Progress is not retained when re-subscribing after cancellation. Full // synchronization. - AWAIT.untilAsserted( + IoTDBSubscriptionITConstant.AWAIT_WITH_FLUSH( + session_src, () -> { check_count(12, "select count(s_0) from " + device, "consume data again:s_0 " + device); check_count(0, "select count(s_1) from " + device, "Consumption data: s_1 " + device); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java index 17fa08cd9e2ce..ecd515964d0b0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/treemodel/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java @@ -27,6 +27,7 @@ import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePushConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType; import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.treemodel.regression.AbstractSubscriptionTreeRegressionIT; import org.apache.thrift.TException; @@ -122,6 +123,15 @@ public void setUp() throws Exception { subs.getTopics().forEach(System.out::println); } + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception {