[RTO/RPO] Topology awareness for query plan #15014
OneSizeFitsQuorum merged 19 commits into apache:master
OneSizeFitsQuorum
left a comment
- Please fix this problem
2025-03-11 14:52:12,134 [AsyncDataNodeHeartbeatServiceClientPool-selector-99] ERROR o.a.t.a.TAsyncClientManager$SelectThread:117 - Ignoring uncaught exception in SelectThread
java.util.ConcurrentModificationException: null
at java.base/java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:970)
at java.base/java.util.LinkedList$ListItr.next(LinkedList.java:892)
at java.base/java.util.Collections$UnmodifiableCollection$1.next(Collections.java:1054)
at org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector.create(PhiAccrualDetector.java:94)
at org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector.isAvailable(PhiAccrualDetector.java:72)
at org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache.updateCurrentStatistics(ConfigNodeHeartbeatCache.java:70)
at org.apache.iotdb.confignode.manager.load.cache.LoadCache.lambda$updateNodeStatistics$21(LoadCache.java:380)
at java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780)
at org.apache.iotdb.confignode.manager.load.cache.LoadCache.updateNodeStatistics(LoadCache.java:380)
at org.apache.iotdb.confignode.manager.load.LoadManager.forceUpdateNodeCache(LoadManager.java:275)
at org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler.onError(DataNodeHeartbeatHandler.java:166)
at org.apache.thrift.async.TAsyncMethodCall.onError(TAsyncMethodCall.java:216)
at org.apache.thrift.async.TAsyncMethodCall.transition(TAsyncMethodCall.java:210)
at org.apache.thrift.async.TAsyncClientManager$SelectThread.transitionMethods(TAsyncClientManager.java:143)
at org.apache.thrift.async.TAsyncClientManager$SelectThread.run(TAsyncClientManager.java:113)
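The trace above shows a `LinkedList` iterator failing inside `PhiAccrualDetector.create`, which typically happens when one thread iterates a list while another thread appends to it. A minimal sketch of one possible fix follows, assuming the heartbeat history is a plain `LinkedList` shared between the heartbeat handler and the statistics updater. The class `HeartbeatHistory` and its methods are hypothetical and do not mirror the actual IoTDB code; the idea is only that readers iterate over a copy taken under the same lock the writers hold.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical stand-in for a heartbeat history; not the actual IoTDB class.
class HeartbeatHistory {
  private final LinkedList<Long> samples = new LinkedList<>();
  private final int capacity;

  HeartbeatHistory(int capacity) {
    this.capacity = capacity;
  }

  // Writers append under the same lock that readers use to copy.
  synchronized void add(long timestampMs) {
    if (samples.size() >= capacity) {
      samples.removeFirst();
    }
    samples.addLast(timestampMs);
  }

  // Readers receive an independent copy, so later writes cannot invalidate their iteration.
  synchronized List<Long> snapshot() {
    return new ArrayList<>(samples);
  }

  // Mean interval between consecutive samples, computed on the snapshot only.
  double meanIntervalMs() {
    List<Long> copy = snapshot();
    if (copy.size() < 2) {
      return 0.0;
    }
    long total = 0;
    for (int i = 1; i < copy.size(); i++) {
      total += copy.get(i) - copy.get(i - 1);
    }
    return (double) total / (copy.size() - 1);
  }
}
```

Copying costs one allocation per read, which is usually acceptable for a bounded heartbeat window; a concurrent collection would be an alternative if the copy shows up in profiles.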
public synchronized void stopTopologyService() {
  shouldRun.set(false);
  topologyThread.shutdown();
Maybe we don't need to shut down the thread pool. Instead, make sure the thread finishes its task and then waits for 60 seconds before being recycled.
Careful design is required to ensure that this single-threaded thread pool holds only a single task when the leader switches.
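One way to read this suggestion is sketched below, under stated assumptions: the pool is kept alive across leader switches, its only thread is reclaimed after 60 idle seconds, and a stored `Future` guarantees that at most one probing task is active. `ProbeService`, `start`, and `stop` are hypothetical names, not the actual `TopologyService` API.

```java
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names do not mirror the actual TopologyService.
class ProbeService {
  // At most one thread; it is reclaimed after 60 idle seconds instead of being shut down.
  private final ThreadPoolExecutor executor =
      new ThreadPoolExecutor(
          1,
          1,
          60L,
          TimeUnit.SECONDS,
          new LinkedBlockingQueue<>(),
          r -> {
            Thread t = new Thread(r, "topology-probe");
            t.setDaemon(true); // assumption: the probe must not block JVM exit
            return t;
          });
  private final Runnable probeLoop;
  private Future<?> current;

  ProbeService(Runnable probeLoop) {
    this.probeLoop = probeLoop;
    executor.allowCoreThreadTimeOut(true);
  }

  // Called when this node becomes leader; refuses to submit while a task is still active.
  synchronized boolean start() {
    if (current != null && !current.isDone()) {
      return false;
    }
    current = executor.submit(probeLoop);
    return true;
  }

  // Called when leadership is lost; interrupts the task but keeps the pool reusable.
  synchronized void stop() {
    if (current != null) {
      current.cancel(true);
      current = null;
    }
  }
}
```

Because the pool has a single thread, a task submitted right after `stop()` cannot begin until the interrupted one has returned, so two probing loops never run at the same time. The probe loop itself must react to interruption for this to work.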
// Record FragmentInstances dispatched to same DataNode
private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap;
private final ClusterTopology topology = ClusterTopology.getInstance();
Better not to use the singleton here; passing it in through the constructor will make the UT code easier to write.
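The constructor-injection idea can be sketched as follows. The `Topology` interface and the `ReachabilityFilter` class are hypothetical simplifications; the real `ClusterTopology` API may look different. The point is only that the collaborator arrives as a constructor argument, so a test can supply a stub.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical minimal interface; the real ClusterTopology API may differ.
interface Topology {
  boolean isReachable(int fromNodeId, int toNodeId);
}

// Hypothetical planner-like class that receives its topology instead of calling getInstance().
class ReachabilityFilter {
  private final Topology topology;
  private final int localNodeId;

  ReachabilityFilter(Topology topology, int localNodeId) {
    this.topology = topology;
    this.localNodeId = localNodeId;
  }

  // Keeps only the candidate node ids that the local node can reach.
  List<Integer> reachable(List<Integer> candidates) {
    return candidates.stream()
        .filter(id -> topology.isReachable(localNodeId, id))
        .collect(Collectors.toList());
  }
}
```

In a unit test the topology can then be a one-line lambda such as `(from, to) -> to != 3`, while production code passes the shared instance at the call site.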
// Record FragmentInstances dispatched to same DataNode
private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap = new HashMap<>();
private final ClusterTopology topology = ClusterTopology.getInstance();

private final SymbolAllocator symbolAllocator;
private final Map<PlanNodeId, OrderingScheme> nodeOrderingMap = new HashMap<>();
private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier;
private final ClusterTopology topology = ClusterTopology.getInstance();
Avoid using the singleton here.
final List<TRegionReplicaSet> allSets =
    input.stream().map(Map.Entry::getKey).collect(Collectors.toList());
final List<TRegionReplicaSet> candidates = getReachableCandidates(allSets);
Why do we need to convert it to a List first?
OneSizeFitsQuorum
left a comment
LGTM, only minor issues
long retryIntervalMS = 1000;
while (true) {
  try (Connection connection = EnvFactory.getEnv().getConnection()) {
    final List<BaseNodeWrapper> allDataNodes =
Remove this in the next PR.
private void logAsymmetricPartition(final Map<Integer, Set<Integer>> partitioned) {
  for (final int fromId : partitioned.keySet()) {
    for (final int toId : partitioned.get(fromId)) {
      if (partitioned.get(toId) == null || !partitioned.get(toId).contains(fromId)) {
It seems that if one node suffers a symmetric network partition, all other nodes will log an asymmetric network partition.
The ConfigNode leader should, in theory, be able to tell whether a partition is symmetric or asymmetric. I think we can use latestTopology to detect which kind is present: its keys cover the full set of nodes, whereas partitioned only contains the parts that succeeded, so a failed node can never appear in it as from.
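A minimal sketch of the distinction discussed here, under a loudly stated assumption: the input is a map whose keys cover every node and whose values are the node ids that key can currently reach. That shape is a guess at what `latestTopology` holds, and `PartitionClassifier`, `Finding`, and `Kind` are hypothetical names. A pair is symmetric when neither direction works and asymmetric when exactly one does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; the map shape is an assumption, not the actual latestTopology type.
class PartitionClassifier {
  enum Kind {
    SYMMETRIC,
    ASYMMETRIC
  }

  static final class Finding {
    final int a;
    final int b;
    final Kind kind;

    Finding(int a, int b, Kind kind) {
      this.a = a;
      this.b = b;
      this.kind = kind;
    }
  }

  // reachable: every node id -> the set of node ids it can currently reach.
  static List<Finding> classify(Map<Integer, Set<Integer>> reachable) {
    List<Integer> ids = new ArrayList<>(reachable.keySet());
    Collections.sort(ids);
    List<Finding> findings = new ArrayList<>();
    for (int i = 0; i < ids.size(); i++) {
      for (int j = i + 1; j < ids.size(); j++) {
        int a = ids.get(i);
        int b = ids.get(j);
        boolean aToB = reachable.get(a).contains(b);
        boolean bToA = reachable.get(b).contains(a);
        if (!aToB && !bToA) {
          findings.add(new Finding(a, b, Kind.SYMMETRIC));
        } else if (aToB != bToA) {
          findings.add(new Finding(a, b, Kind.ASYMMETRIC));
        }
      }
    }
    return findings;
  }
}
```

Iterating over all keys, rather than only over the nodes that answered, is what lets a fully isolated node show up as symmetric instead of being reported as asymmetric by its peers.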
public class RootFIPlacementException extends IoTDBRuntimeException {
  public RootFIPlacementException(Collection<TRegionReplicaSet> replicaSets) {
    super(
        "root FragmentInstance placement error: " + replicaSets.toString(),

} else if (t instanceof QueryInBatchStatementException) {
  return RpcUtils.getStatus(
      TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR + rootCause.getMessage());
} else if (t instanceof RootFIPlacementException) {
Merge them into one if.






Introduction
This patch implements two functionalities:
Part 1. Topology awareness
Part 2. How it affects query plan and execution
- FragmentInstance, the TRegionReplicaSet will be rewritten; only reachable DataNodeLocations will survive.
- TRegionReplicaSet in query: fails immediately with ReplicaSetUnreachableException. It will be caught and the corresponding TSStatus returned.
- TRegionReplicaSet in write: won't be scheduled and will be added to the failed list during the schedule phase.
- TRegionReplicaSet as RootFI in query: fails immediately with RootFIPlacementException. It will be caught and the corresponding TSStatus returned.
Part 3. TODOs
- ReplicaSetUnreachableException & RootFIPlacementException TSStatus to retry in session connection.
- registerDataNode, removeDataNode, restartDataNode.
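The replica-set rewrite described in Part 2 can be illustrated with a small sketch. It assumes that node ids stand in for `DataNodeLocation`s and that a query fails when no location of a replica set survives the filter; both are simplifications. `ReplicaSetRewriter` and `UnreachableReplicaSetException` are hypothetical stand-ins, not the classes added by this patch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of filtering a replica set down to its reachable locations.
class ReplicaSetRewriter {
  // Hypothetical stand-in for ReplicaSetUnreachableException.
  static class UnreachableReplicaSetException extends RuntimeException {
    UnreachableReplicaSetException(String message) {
      super(message);
    }
  }

  // Returns the replica node ids that are reachable, preserving their original order.
  // Throws when nothing survives (assumed to match the "fail immediately" query behaviour).
  static List<Integer> rewrite(
      int regionId, List<Integer> replicaNodeIds, Set<Integer> reachableNodeIds) {
    List<Integer> survivors = new ArrayList<>();
    for (Integer nodeId : replicaNodeIds) {
      if (reachableNodeIds.contains(nodeId)) {
        survivors.add(nodeId);
      }
    }
    if (survivors.isEmpty()) {
      throw new UnreachableReplicaSetException(
          "region " + regionId + " has no reachable replica");
    }
    return survivors;
  }
}
```

For a write, the same filter could return an empty list instead of throwing, so that the caller can add the request to its failed list during scheduling.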