diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java index f9315ac787f9e..a2fe36a3ed5ab 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java @@ -24,9 +24,10 @@ import org.apache.commons.lang3.SystemUtils; import org.apache.tsfile.utils.Pair; -import java.io.File; -import java.io.IOException; +import java.io.*; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -98,15 +99,71 @@ public static int[] searchAvailablePorts() { } private static boolean checkPortsAvailable(final List ports) { - final String cmd = getSearchAvailablePortCmd(ports); try { - return Runtime.getRuntime().exec(cmd).waitFor() == 1; - } catch (final IOException ignore) { - // ignore - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); + return listPortOccupation(ports).isEmpty(); + } catch (IOException e) { + IoTDBTestLogger.logger.error("Cannot check available ports", e); + return false; } - return false; + } + + public static Map listPortOccupation(final List ports) + throws IOException { + return SystemUtils.IS_OS_WINDOWS + ? 
listPortOccupationWindows(ports) + : listPortOccupationUnix(ports); + } + + public static Map listPortOccupation( + final List ports, + String cmd, + int targetColumnLength, + int addressColumnIndex, + int pidColumnIndex) + throws IOException { + Process process = Runtime.getRuntime().exec(cmd); + Map result = new HashMap<>(); + try (BufferedReader reader = + new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + String[] split = line.trim().split("\\s+"); + if (split.length != targetColumnLength) { + continue; + } + String localAddress = split[addressColumnIndex]; + for (Integer port : ports) { + if (localAddress.endsWith(":" + port)) { + result.put(port, Long.parseLong(split[pidColumnIndex])); + break; + } + } + } + } catch (EOFException ignored) { + } + return result; + } + + /** + * List occupied port and the associated pid on windows. + * + * @param ports ports to be checked + * @return (occupiedPort, pid) pairs + */ + public static Map listPortOccupationWindows(final List ports) + throws IOException { + return listPortOccupation(ports, "netstat -aon -p tcp", 5, 1, 4); + } + + /** + * List occupied port and the associated pid on Unix. 
+ * + * @param ports ports to be checked + * @return (occupiedPort, pid) pairs + */ + public static Map listPortOccupationUnix(final List ports) + throws IOException { + return listPortOccupation(ports, "lsof -iTCP -sTCP:LISTEN -P -n", 10, 9, 1); } private static String getSearchAvailablePortCmd(final List ports) { @@ -115,7 +172,7 @@ private static String getSearchAvailablePortCmd(final List ports) { private static String getWindowsSearchPortCmd(final List ports) { return "netstat -aon -p tcp | findStr " - + ports.stream().map(v -> "/C:'127.0.0.1:" + v + "'").collect(Collectors.joining(" ")); + + ports.stream().map(v -> "/C:\"127.0.0.1:" + v + "\"").collect(Collectors.joining(" ")); } private static String getUnixSearchPortCmd(final List ports) { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index fa2cb85d19191..e68a6d550c28b 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -21,11 +21,13 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.exception.PortOccupiedException; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; @@ -78,13 +80,7 @@ import java.sql.DriverManager; import java.sql.SQLException; 
import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -223,25 +219,9 @@ protected void initEnvironment( final RequestDelegate configNodesDelegate = new SerialRequestDelegate<>(configNodeEndpoints); for (int i = 1; i < configNodesNum; i++) { - final ConfigNodeWrapper configNodeWrapper = - new ConfigNodeWrapper( - false, - seedConfigNode, - testClassName, - testMethodName, - EnvUtils.searchAvailablePorts(), - index, - this instanceof MultiClusterEnv, - startTime); + ConfigNodeWrapper configNodeWrapper = newConfigNode(); this.configNodeWrapperList.add(configNodeWrapper); configNodeEndpoints.add(configNodeWrapper.getIpAndPortString()); - configNodeWrapper.createNodeDir(); - configNodeWrapper.changeConfig( - (MppConfigNodeConfig) clusterConfig.getConfigNodeConfig(), - (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(), - (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig()); - configNodeWrapper.createLogDir(); - configNodeWrapper.setKillPoints(configNodeKillPoints); configNodesDelegate.addRequest( () -> { configNodeWrapper.start(); @@ -259,24 +239,9 @@ protected void initEnvironment( final RequestDelegate dataNodesDelegate = new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT); for (int i = 0; i < dataNodesNum; i++) { - final DataNodeWrapper dataNodeWrapper = - new DataNodeWrapper( - seedConfigNode, - testClassName, - testMethodName, - EnvUtils.searchAvailablePorts(), - index, - this instanceof MultiClusterEnv, - startTime); - this.dataNodeWrapperList.add(dataNodeWrapper); + DataNodeWrapper dataNodeWrapper = newDataNode(); dataNodeEndpoints.add(dataNodeWrapper.getIpAndPortString()); - dataNodeWrapper.createNodeDir(); - dataNodeWrapper.changeConfig( - 
(MppDataNodeConfig) clusterConfig.getDataNodeConfig(), - (MppCommonConfig) clusterConfig.getDataNodeCommonConfig(), - (MppJVMConfig) clusterConfig.getDataNodeJVMConfig()); - dataNodeWrapper.createLogDir(); - dataNodeWrapper.setKillPoints(dataNodeKillPoints); + this.dataNodeWrapperList.add(dataNodeWrapper); dataNodesDelegate.addRequest( () -> { dataNodeWrapper.start(); @@ -299,6 +264,49 @@ protected void initEnvironment( checkClusterStatusWithoutUnknown(); } + private ConfigNodeWrapper newConfigNode() { + final ConfigNodeWrapper configNodeWrapper = + new ConfigNodeWrapper( + false, + configNodeWrapperList.get(0).getIpAndPortString(), + getTestClassName(), + testMethodName, + EnvUtils.searchAvailablePorts(), + index, + this instanceof MultiClusterEnv, + startTime); + + configNodeWrapper.createNodeDir(); + configNodeWrapper.changeConfig( + (MppConfigNodeConfig) clusterConfig.getConfigNodeConfig(), + (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(), + (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig()); + configNodeWrapper.createLogDir(); + configNodeWrapper.setKillPoints(configNodeKillPoints); + return configNodeWrapper; + } + + private DataNodeWrapper newDataNode() { + final DataNodeWrapper dataNodeWrapper = + new DataNodeWrapper( + configNodeWrapperList.get(0).getIpAndPortString(), + getTestClassName(), + testMethodName, + EnvUtils.searchAvailablePorts(), + index, + this instanceof MultiClusterEnv, + startTime); + + dataNodeWrapper.createNodeDir(); + dataNodeWrapper.changeConfig( + (MppDataNodeConfig) clusterConfig.getDataNodeConfig(), + (MppCommonConfig) clusterConfig.getDataNodeCommonConfig(), + (MppJVMConfig) clusterConfig.getDataNodeJVMConfig()); + dataNodeWrapper.createLogDir(); + dataNodeWrapper.setKillPoints(dataNodeKillPoints); + return dataNodeWrapper; + } + private void startAINode(final String seedConfigNode, final String testClassName) { final String aiNodeEndPoint; final AINodeWrapper aiNodeWrapper = @@ -351,12 +359,14 @@ private Map 
countNodeStatus(final Map nodeStat } public void checkNodeInStatus(int nodeId, NodeStatus expectation) { - checkClusterStatus(nodeStatusMap -> expectation.getStatus().equals(nodeStatusMap.get(nodeId))); + checkClusterStatus( + nodeStatusMap -> expectation.getStatus().equals(nodeStatusMap.get(nodeId)), m -> true); } public void checkClusterStatusWithoutUnknown() { checkClusterStatus( - nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals)); + nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals), + processStatus -> processStatus.values().stream().noneMatch(i -> i != 0)); testJDBCConnection(); } @@ -366,6 +376,10 @@ public void checkClusterStatusOneUnknownOtherRunning() { Map count = countNodeStatus(nodeStatus); return count.getOrDefault("Unknown", 0) == 1 && count.getOrDefault("Running", 0) == nodeStatus.size() - 1; + }, + processStatus -> { + long aliveProcessCount = processStatus.values().stream().filter(i -> i == 0).count(); + return aliveProcessCount == processStatus.size() - 1; }); testJDBCConnection(); } @@ -374,22 +388,41 @@ public void checkClusterStatusOneUnknownOtherRunning() { * check whether all nodes' status match the provided predicate with RPC. after retryCount times, * if the status of all nodes still not match the predicate, throw AssertionError. 
* - * @param statusCheck the predicate to test the status of nodes + * @param nodeStatusCheck the predicate to test the status of nodes */ - public void checkClusterStatus(final Predicate> statusCheck) { + public void checkClusterStatus( + final Predicate> nodeStatusCheck, + final Predicate> processStatusCheck) { logger.info("Testing cluster environment..."); TShowClusterResp showClusterResp; Exception lastException = null; - boolean flag; + boolean passed; + boolean showClusterPassed = true; + boolean nodeSizePassed = true; + boolean nodeStatusPassed = true; + boolean processStatusPassed = true; + TSStatus showClusterStatus = null; + int actualNodeSize = 0; + Map lastNodeStatus = null; + Map processStatusMap = new HashMap<>(); + for (int i = 0; i < retryCount; i++) { try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { - flag = true; + passed = true; + showClusterPassed = true; + nodeSizePassed = true; + nodeStatusPassed = true; + processStatusPassed = true; + processStatusMap.clear(); + showClusterResp = client.showCluster(); // Check resp status if (showClusterResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - flag = false; + passed = false; + showClusterPassed = false; + showClusterStatus = showClusterResp.getStatus(); } // Check the number of nodes @@ -397,18 +430,66 @@ public void checkClusterStatus(final Predicate> statusCheck != configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size()) { - flag = false; + passed = false; + nodeSizePassed = false; + actualNodeSize = showClusterResp.getNodeStatusSize(); } // Check the status of nodes - if (flag) { - flag = statusCheck.test(showClusterResp.getNodeStatus()); + if (passed) { + passed = nodeStatusCheck.test(showClusterResp.getNodeStatus()); + if (!passed) { + nodeStatusPassed = false; + lastNodeStatus = showClusterResp.getNodeStatus(); + } + } + + // check the status of processes + for 
(DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { + boolean alive = dataNodeWrapper.getInstance().isAlive(); + if (!alive) { + processStatusMap.put(dataNodeWrapper, dataNodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(dataNodeWrapper, 0); + } + } + for (ConfigNodeWrapper nodeWrapper : configNodeWrapperList) { + boolean alive = nodeWrapper.getInstance().isAlive(); + if (!alive) { + processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(nodeWrapper, 0); + } + } + for (AINodeWrapper nodeWrapper : aiNodeWrapperList) { + boolean alive = nodeWrapper.getInstance().isAlive(); + if (!alive) { + processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(nodeWrapper, 0); + } + } + + processStatusPassed = processStatusCheck.test(processStatusMap); + if (!processStatusPassed) { + passed = false; } - if (flag) { + if (!processStatusPassed) { + handleProcessStatus(processStatusMap); + } + + if (passed) { logger.info("The cluster is now ready for testing!"); return; } + logger.info( + "Retry {}: showClusterPassed={}, nodeSizePassed={}, nodeStatusPassed={}, processStatusPassed={}", + i, + showClusterPassed, + nodeSizePassed, + nodeStatusPassed, + processStatusPassed); } catch (final Exception e) { lastException = e; } @@ -425,10 +506,83 @@ public void checkClusterStatus(final Predicate> statusCheck lastException.getMessage(), lastException); } + if (!showClusterPassed) { + logger.error("Show cluster failed: {}", showClusterStatus); + } + if (!nodeSizePassed) { + logger.error("Only {} nodes detected", actualNodeSize); + } + if (!nodeStatusPassed) { + logger.error("Some node status incorrect: {}", lastNodeStatus); + } + if (!processStatusPassed) { + logger.error( + "Some process status incorrect: {}", + processStatusMap.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().getId(), Map.Entry::getValue))); + + if 
(processStatusMap.containsValue(TSStatusCode.PORT_OCCUPIED.getStatusCode())) { + throw new PortOccupiedException(); + } + } + throw new AssertionError( String.format("After %d times retry, the cluster can't work!", retryCount)); } + private void handleProcessStatus(Map processStatusMap) { + for (Map.Entry entry : processStatusMap.entrySet()) { + Integer statusCode = entry.getValue(); + AbstractNodeWrapper nodeWrapper = entry.getKey(); + if (statusCode != 0) { + logger.info("Node {} is not running due to {}", nodeWrapper.getId(), statusCode); + } + if (statusCode == TSStatusCode.PORT_OCCUPIED.getStatusCode()) { + try { + Map portOccupationMap = + EnvUtils.listPortOccupation( + Arrays.stream(nodeWrapper.getPortList()).boxed().collect(Collectors.toList())); + logger.info("Check port result: {}", portOccupationMap); + for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { + if (portOccupationMap.containsValue(dataNodeWrapper.getPid())) { + logger.info( + "A port is occupied by another DataNode {}-{}, restart it", + dataNodeWrapper.getIpAndPortString(), + dataNodeWrapper.getPid()); + dataNodeWrapper.stop(); + dataNodeWrapper.start(); + } + } + for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { + if (portOccupationMap.containsValue(configNodeWrapper.getPid())) { + logger.info( + "A port is occupied by another ConfigNode {}-{}, restart it", + configNodeWrapper.getIpAndPortString(), + configNodeWrapper.getPid()); + configNodeWrapper.stop(); + configNodeWrapper.start(); + } + } + for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) { + if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) { + logger.info( + "A port is occupied by another AINode {}-{}, restart it", + aiNodeWrapper.getIpAndPortString(), + aiNodeWrapper.getPid()); + aiNodeWrapper.stop(); + aiNodeWrapper.start(); + } + } + } catch (IOException e) { + logger.error("Cannot check port occupation", e); + } + logger.info("Restarting it"); + nodeWrapper.stop(); + nodeWrapper.start(); + 
} + } + } + @Override public void cleanClusterEnvironment() { final List allNodeWrappers = @@ -890,6 +1044,10 @@ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() for (int i = 0; i < retryCount; i++) { for (final ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { try { + if (!configNodeWrapper.getInstance().isAlive()) { + throw new IOException("ConfigNode " + configNodeWrapper.getId() + " is not alive"); + } + lastErrorNode = configNodeWrapper; final SyncConfigNodeIServiceClient client = clientManager.borrowClient( @@ -1335,4 +1493,8 @@ public void registerConfigNodeKillPoints(final List killPoints) { public void registerDataNodeKillPoints(final List killPoints) { this.dataNodeKillPoints = killPoints; } + + public void clearClientManager() { + clientManager.clearAll(); + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 0e33674fdead3..27c688dc32842 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -546,7 +546,10 @@ public void stop() { this.instance.destroy(); try { if (!this.instance.waitFor(20, TimeUnit.SECONDS)) { - this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS); + logger.warn("Node {} does not exit within 20s, killing it", getId()); + if (!this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS)) { + logger.error("Cannot forcibly stop node {}", getId()); + } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -822,4 +825,8 @@ public long getPid() { public Process getInstance() { return instance; } + + public int[] getPortList() { + return portList; + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index 5812e255d1639..07517324e1f39 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -19,10 +19,12 @@ package org.apache.iotdb.db.it; +import org.apache.iotdb.commons.exception.PortOccupiedException; +import org.apache.iotdb.it.env.cluster.env.AbstractEnv; import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; -import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.DailyIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; @@ -33,21 +35,142 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.*; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Date; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** Tests that may not be satisfied with the default cluster settings. 
*/ @RunWith(IoTDBTestRunner.class) -@Category({ClusterIT.class}) public class IoTDBCustomizedClusterIT { private final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class); + @FunctionalInterface + private interface RestartAction { + void act(Statement statement, int round, AbstractEnv env) throws Exception; + } + + @Category(DailyIT.class) + @Test + public void testRepeatedlyRestartWholeClusterWithWrite() throws Exception { + testRepeatedlyRestartWholeCluster( + (s, i, env) -> { + if (i != 0) { + ResultSet resultSet = s.executeQuery("SELECT last s1 FROM root.**"); + ResultSetMetaData metaData = resultSet.getMetaData(); + assertEquals(4, metaData.getColumnCount()); + int cnt = 0; + while (resultSet.next()) { + cnt++; + StringBuilder result = new StringBuilder(); + for (int j = 0; j < metaData.getColumnCount(); j++) { + result + .append(metaData.getColumnName(j + 1)) + .append(":") + .append(resultSet.getString(j + 1)) + .append(","); + } + System.out.println(result); + } + } + s.execute("INSERT INTO root.db1.d1 (time, s1) VALUES (1, 1)"); + s.execute("INSERT INTO root.db2.d1 (time, s1) VALUES (1, 1)"); + s.execute("INSERT INTO root.db3.d1 (time, s1) VALUES (1, 1)"); + }); + } + + @Category(DailyIT.class) + @Test + public void testRepeatedlyRestartWholeClusterWithPipeCreation() throws Exception { + SimpleEnv receiverEnv = new SimpleEnv(); + receiverEnv.initClusterEnvironment(1, 1); + try { + testRepeatedlyRestartWholeCluster( + (s, i, env) -> { + // use another thread to make creating and restart concurrent + // otherwise, all tasks will be done before restart and the cluster will not attempt to + // recover tasks + s.execute( + String.format( + "CREATE PIPE p%d_1 WITH SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", + i, receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + s.execute( + String.format( + "CREATE PIPE p%d_2 WITH SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", + i, 
receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + s.execute( + String.format( + "CREATE PIPE p%d_3 WITH SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", + i, receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + }); + } finally { + receiverEnv.cleanClusterEnvironment(); + } + } + + private void testRepeatedlyRestartWholeCluster(RestartAction restartAction) throws Exception { + SimpleEnv simpleEnv = new SimpleEnv(); + try { + simpleEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(3) + .setSchemaReplicationFactor(3) + .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus") + .setConfigNodeConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus") + .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus"); + simpleEnv.initClusterEnvironment(3, 3); + + int repeat = 100; + for (int i = 0; i < repeat; i++) { + logger.info("Round {} restart", i); + try (Connection connection = simpleEnv.getConnection(); + Statement statement = connection.createStatement()) { + ResultSet resultSet = statement.executeQuery("SHOW CLUSTER"); + ResultSetMetaData metaData = resultSet.getMetaData(); + int columnCount = metaData.getColumnCount(); + while (resultSet.next()) { + StringBuilder row = new StringBuilder(); + for (int j = 0; j < columnCount; j++) { + row.append(metaData.getColumnName(j + 1)) + .append(":") + .append(resultSet.getString(j + 1)) + .append(","); + } + System.out.println(row); + } + + restartAction.act(statement, i, simpleEnv); + } + + simpleEnv.shutdownAllConfigNodes(); + simpleEnv.shutdownAllDataNodes(); + + simpleEnv.startAllConfigNodes(); + simpleEnv.startAllDataNodes(); + + simpleEnv.clearClientManager(); + + try { + simpleEnv.checkClusterStatusWithoutUnknown(); + } catch (PortOccupiedException e) { + logger.info( + "Some ports are occupied during restart, which cannot be processed, just pass the test."); + return; + } + } + + } finally { + 
simpleEnv.cleanClusterEnvironment(); + } + } + /** * When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try * deleting wal forever, which will block the DataNode from exiting, because task of deleting wal diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 3f7eb8f6507b7..6359eafde5c53 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -40,6 +40,8 @@ public enum TSStatusCode { UNSUPPORTED_SQL_DIALECT(205), + PORT_OCCUPIED(206), + // General Error UNSUPPORTED_OPERATION(300), EXECUTE_STATEMENT_ERROR(301), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 7f0db7a37840b..39612202720b6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -40,6 +40,7 @@ import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.cpu.CpuUsageMetrics; +import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.client.CnToCnNodeRequestType; import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool; @@ -67,6 +68,7 @@ import org.apache.iotdb.metrics.metricsets.system.SystemMetrics; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.ratis.util.ExitUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,6 +108,8 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { protected 
ConfigManager configManager; + private int exitStatusCode = 0; + public ConfigNode() { super("ConfigNode"); // We do not init anything here, so that we can re-initialize the instance in IT. @@ -121,6 +125,8 @@ public static void main(String[] args) throws Exception { "{} default charset is: {}", ConfigNodeConstant.GLOBAL_NAME, Charset.defaultCharset().displayName()); + // let IoTDB handle the exception instead of ratis + ExitUtils.disableSystemExit(); ConfigNode configNode = new ConfigNode(); int returnCode = configNode.run(args); if (returnCode != 0) { @@ -140,6 +146,7 @@ protected void start() throws IoTDBException { throw new IoTDBException("Error starting", -1); } active(); + LOGGER.info("IoTDB started"); } @Override @@ -266,8 +273,9 @@ public void active() { "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect."); stop(); } - } catch (StartupException | IOException | IllegalPathException e) { + } catch (Throwable e) { LOGGER.error("Meet error while starting up.", e); + exitStatusCode = StatusUtils.retrieveExitStatusCode(e); stop(); } } @@ -467,7 +475,7 @@ public void stop() { } catch (IOException e) { LOGGER.error("Meet error when deactivate ConfigNode", e); } - System.exit(-1); + System.exit(exitStatusCode); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 714edde57c2a4..55268d72ab253 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -114,6 +114,7 @@ import org.apache.iotdb.udf.api.exception.UDFManagementException; import org.antlr.v4.runtime.CommonTokenStream; +import org.apache.ratis.util.ExitUtils; import org.apache.thrift.TException; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ 
-134,6 +135,7 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME; +import static org.apache.iotdb.commons.utils.StatusUtils.retrieveExitStatusCode; import static org.apache.iotdb.db.conf.IoTDBStartCheck.PROPERTIES_FILE_NAME; public class DataNode extends ServerCommandLine implements DataNodeMBean { @@ -193,6 +195,8 @@ public static DataNode getInstance() { public static void main(String[] args) { logger.info("IoTDB-DataNode environment variables: {}", IoTDBConfig.getEnvironmentVariables()); logger.info("IoTDB-DataNode default charset is: {}", Charset.defaultCharset().displayName()); + // let IoTDB handle the exception instead of ratis + ExitUtils.disableSystemExit(); DataNode dataNode = new DataNode(); int returnCode = dataNode.run(args); if (returnCode != 0) { @@ -202,6 +206,7 @@ public static void main(String[] args) { @Override protected void start() { + logger.info("Starting DataNode..."); boolean isFirstStart; try { // Check if this DataNode is start for the first time and do other pre-checks @@ -274,11 +279,13 @@ protected void start() { dataRegionConsensusStarted = true; } - } catch (StartupException | IOException e) { + } catch (Throwable e) { + int exitStatusCode = retrieveExitStatusCode(e); logger.error("Fail to start server", e); stop(); - System.exit(-1); + System.exit(exitStatusCode); } + logger.info("DataNode started"); } @Override @@ -683,7 +690,7 @@ private void active() throws StartupException { setUp(); } catch (StartupException e) { logger.error("Meet error while starting up.", e); - throw new StartupException("Error in activating IoTDB DataNode."); + throw e; } logger.info("IoTDB DataNode has started."); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 56bc67b63993e..79fcc799ae93c 100644 --- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -88,6 +88,11 @@ public void clear(K node) { }); } + @Override + public void clearAll() { + pool.clear(); + } + @Override public void close() { pool.close(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java index 81344e46719f4..cba9b840740de 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java @@ -44,6 +44,9 @@ public interface IClientManager { */ void clear(K node); + /** Clear all clients for all nodes. */ + void clearAll(); + /** close IClientManager, which means closing all clients for all nodes. */ void close(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java new file mode 100644 index 0000000000000..bbde0c8857644 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.commons.exception; + +import java.util.Arrays; + +public class PortOccupiedException extends RuntimeException { + public PortOccupiedException() { + super("Some ports are occupied"); + } + + public PortOccupiedException(int... ports) { + super(String.format("Ports %s are occupied", Arrays.toString(ports))); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java index d05fb8c9d7e2f..ac64699b9cf46 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java @@ -25,6 +25,8 @@ import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.ratis.util.ExitUtils; + import java.util.Arrays; import java.util.HashSet; import java.util.Map; @@ -248,4 +250,16 @@ private static boolean needRetryHelperForSingleStatus(int statusCode) { public static boolean isUnknownError(int statusCode) { return UNKNOWN_ERRORS.contains(statusCode); } + + public static int retrieveExitStatusCode(Throwable e) { + if (e instanceof ExitUtils.ExitException && e.getCause() != null) { + e = e.getCause(); + } + if (e.getMessage() != null + && (e.getMessage().contains("because Could not create ServerSocket") || e.getMessage().contains("Failed to bind to address") + || e.getMessage().contains("Address already in use: bind"))) { + return TSStatusCode.PORT_OCCUPIED.getStatusCode(); + } +
return -1; + } }