diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 58ac4d596bcc2..b7fa3bdee2ce6 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -119,4 +119,16 @@ public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad)
     setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad));
     return this;
   }
+
+  @Override
+  public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+    setProperty("wal_throttle_threshold_in_byte", String.valueOf(walThrottleSize));
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
+    setProperty("delete_wal_files_period_in_ms", String.valueOf(deleteWalFilesPeriodInMs));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 3b6d840698190..0e33674fdead3 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -818,4 +818,8 @@ public long getPid() {
       return -1;
     }
   }
+
+  public Process getInstance() {
+    return instance;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 63f50d15958d3..1f61ff4289cfc 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -78,4 +78,14 @@ public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
   public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
     return this;
   }
+
+  @Override
+  public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+    return this;
+  }
+
+  @Override
+  public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 353fdef7f2532..4d2e4435bdbe4 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -45,4 +45,8 @@ DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
   DataNodeConfig setLoadLastCacheStrategy(String strategyName);
 
   DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
+
+  DataNodeConfig setWalThrottleSize(long walThrottleSize);
+
+  DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
new file mode 100644
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+/** Tests that may not be satisfied with the default cluster settings. */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBCustomizedClusterIT {
+
+  private static final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class);
+
+  /**
+   * When the WAL size exceeds `walThrottleSize` * 0.8, the timed wal-delete thread keeps trying
+   * to delete WAL files forever. This used to block the DataNode from exiting, because the WAL
+   * deletion task submitted by the ShutdownHook could not be executed. This test ensures that the
+   * blocking is fixed.
+   */
+  @Test
+  public void testWalThrottleStuck()
+      throws SQLException,
+          IoTDBConnectionException,
+          StatementExecutionException,
+          InterruptedException {
+    SimpleEnv simpleEnv = new SimpleEnv();
+    simpleEnv
+        .getConfig()
+        .getDataNodeConfig()
+        .setWalThrottleSize(1)
+        .setDeleteWalFilesPeriodInMs(100);
+    simpleEnv
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(3)
+        .setSchemaReplicationFactor(3)
+        .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
+        .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+    try {
+      simpleEnv.initClusterEnvironment(1, 3);
+
+      int leaderIndex = -1;
+      try (Connection connection = simpleEnv.getConnection();
+          Statement statement = connection.createStatement()) {
+        // write the first data
+        statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+        // find the leader of the data region
+        int port = -1;
+
+        try (ResultSet resultSet = statement.executeQuery("SHOW REGIONS")) {
+          while (resultSet.next()) {
+            String regionType = resultSet.getString("Type");
+            if (regionType.equals("DataRegion")) {
+              String role = resultSet.getString("Role");
+              if (role.equals("Leader")) {
+                port = resultSet.getInt("RpcPort");
+                break;
+              }
+            }
+          }
+        }
+
+        if (port == -1) {
+          fail("Leader not found");
+        }
+
+        for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) {
+          if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) {
+            leaderIndex = i;
+            break;
+          }
+        }
+
+        // fail fast here instead of indexing the wrapper list with -1 further below
+        if (leaderIndex == -1) {
+          fail("No DataNode wrapper listens on the leader's rpc port " + port);
+        }
+      }
+
+      // stop a follower
+      int followerIndex = (leaderIndex + 1) % simpleEnv.getDataNodeWrapperList().size();
+      simpleEnv.getDataNodeWrapperList().get(followerIndex).stop();
+      logger.info(
+          "Stopping data node {}",
+          simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString());
+
+      DataNodeWrapper leader = simpleEnv.getDataNodeWrapperList().get(leaderIndex);
+      // write to the leader to generate wal that cannot be synced
+      try (Session session = new Session(leader.getIp(), leader.getPort())) {
+        session.open();
+
+        for (int i = 0; i < 5; i++) {
+          session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+        }
+      }
+
+      // wait for wal-delete thread to be scheduled
+      Thread.sleep(1000);
+
+      // stop the leader
+      leader.getInstance().destroy();
+      logger.info("Stopping data node {}", leader.getIpAndPortString());
+      // confirm the death of the leader, sleeping between polls instead of busy-spinning
+      long startTime = System.currentTimeMillis();
+      while (leader.isAlive()) {
+        if (System.currentTimeMillis() - startTime > 30000) {
+          fail("Leader does not exit after 30s");
+        }
+        Thread.sleep(100);
+      }
+    } finally {
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
+}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index c3a0665be6adf..e196df432091a 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -105,8 +105,8 @@ public synchronized void start() {
   public synchronized void stop() {
     if (!threads.isEmpty()) {
       threads.forEach(LogDispatcherThread::setStopped);
-      threads.forEach(LogDispatcherThread::processStopped);
       executorService.shutdownNow();
+      threads.forEach(LogDispatcherThread::processStopped);
       int timeout = 10;
       try {
         if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index fe00939050e1e..c5a426d88b8be 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -44,8 +44,9 @@ public SyncStatus(IndexController controller, IoTConsensusConfig config) {
    * @throws InterruptedException
    */
   public synchronized void addNextBatch(Batch batch) throws InterruptedException {
-    while (pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
-        || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false)) {
+    while ((pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
+            || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false))
+        && !Thread.interrupted()) {
       wait();
     }
     pendingBatches.add(batch);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 9fb29e7c6124d..731c4b09da1bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -49,15 +49,43 @@ public class DataNodeShutdownHook extends Thread {
 
   private static final Logger logger = LoggerFactory.getLogger(DataNodeShutdownHook.class);
   private final TDataNodeLocation nodeLocation;
+  private Thread watcherThread;
 
   public DataNodeShutdownHook(TDataNodeLocation nodeLocation) {
     super(ThreadName.DATANODE_SHUTDOWN_HOOK.getName());
     this.nodeLocation = nodeLocation;
   }
 
+  private void startWatcher() {
+    Thread hookThread = Thread.currentThread();
+    watcherThread =
+        new Thread(
+            () -> {
+              while (!Thread.interrupted()) {
+                try {
+                  Thread.sleep(10000);
+                  StackTraceElement[] stackTrace = hookThread.getStackTrace();
+                  StringBuilder stackTraceBuilder =
+                      new StringBuilder("Stack trace of shutdown hook:\n");
+                  for (StackTraceElement traceElement : stackTrace) {
+                    stackTraceBuilder.append(traceElement.toString()).append("\n");
+                  }
+                  logger.info(stackTraceBuilder.toString());
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  return;
+                }
+              }
+            },
+            "ShutdownHookWatcher");
+    watcherThread.setDaemon(true);
+    watcherThread.start();
+  }
+
   @Override
   public void run() {
     logger.info("DataNode exiting...");
+    startWatcher();
     // Stop external rpc service firstly.
     ExternalRPCService.getInstance().stop();
 
@@ -77,7 +105,6 @@ public void run() {
         .equals(ConsensusFactory.RATIS_CONSENSUS)) {
       StorageEngine.getInstance().syncCloseAllProcessor();
     }
-    WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
 
     // We did this work because the RatisConsensus recovery mechanism is different from other
     // consensus algorithms, which will replace the underlying storage engine based on its
@@ -114,6 +141,8 @@ public void run() {
         "DataNode exits. Jvm memory usage: {}",
         MemUtils.bytesCntToStr(
             Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()));
+
+    watcherThread.interrupt();
   }
 
   private void triggerSnapshotForAllDataRegion() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
index 7d9f7bb467951..300cc10ad81bd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
@@ -180,7 +180,7 @@ private void deleteOutdatedFiles() {
     // threshold, the system continues to delete expired files until the disk size is smaller than
     // the threshold.
     boolean firstLoop = true;
-    while ((firstLoop || shouldThrottle()) && !Thread.interrupted()) {
+    while ((firstLoop || shouldThrottle())) {
       deleteOutdatedFilesInWALNodes();
       if (firstLoop && shouldThrottle()) {
         logger.warn(
@@ -189,6 +189,10 @@ private void deleteOutdatedFiles() {
             getThrottleThreshold());
       }
       firstLoop = false;
+      if (Thread.interrupted()) {
+        logger.info("Timed wal delete thread is interrupted.");
+        return;
+      }
     }
   }
 
@@ -267,12 +271,15 @@ public void stop() {
     if (config.getWalMode() == WALMode.DISABLE) {
       return;
     }
-
+    logger.info("Stopping WALManager");
     if (walDeleteThread != null) {
       shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
       walDeleteThread = null;
     }
+    logger.info("Deleting outdated files before exiting");
+    deleteOutdatedFilesInWALNodes();
     clear();
+    logger.info("WALManager stopped");
   }
 
   private void shutdownThread(ExecutorService thread, ThreadName threadName) {