From 7d09c446422506b8f4964dade05a3622d68811fd Mon Sep 17 00:00:00 2001 From: Xiangpeng Hu <65238551+HxpSerein@users.noreply.github.com> Date: Sun, 27 Apr 2025 16:46:27 +0800 Subject: [PATCH 1/4] [remove datanode] GCR load balancing implement for removing datanode (#15282) (cherry picked from commit 7650b4793474b11405698bbe9290f814ef3afc06) --- .../GreedyCopySetRegionGroupAllocator.java | 358 +++++++++++++++++- .../region/GreedyRegionGroupAllocator.java | 13 + .../region/IRegionGroupAllocator.java | 21 + ...eGraphReplicationRegionGroupAllocator.java | 13 + .../procedure/env/RemoveDataNodeHandler.java | 186 +++++++++ .../impl/node/RemoveDataNodesProcedure.java | 6 +- ...edyCopySetRemoveNodeReplicaSelectTest.java | 342 +++++++++++++++++ 7 files changed, 925 insertions(+), 14 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java index 2577455d41cd1..5e36023053625 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java @@ -27,10 +27,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.stream.Collectors; import static java.util.Map.Entry.comparingByValue; @@ -50,6 +53,8 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator private int[] databaseRegionCounter; // The number of 2-Region combinations in current cluster private int[][] combinationCounter; + // The initial load for each database on each datanode + private Map initialDbLoad; // First Key: the sum of Regions at the DataNodes in the allocation result is minimal int optimalRegionSum; @@ -103,12 +108,9 @@ public TRegionReplicaSet generateOptimalRegionReplicasDistribution( int replicationFactor, TConsensusGroupId consensusGroupId) { try { - prepare( - replicationFactor, - availableDataNodeMap, - allocatedRegionGroups, - databaseAllocatedRegionGroups); - dfs(-1, 0, new int[replicationFactor], 0, 0); + this.replicationFactor = replicationFactor; + prepare(availableDataNodeMap, allocatedRegionGroups, databaseAllocatedRegionGroups); + dfsAllocateReplica(-1, 0, new int[replicationFactor], 0, 0); // Randomly pick one optimal plan as result Collections.shuffle(optimalReplicaSets); @@ -125,21 +127,355 @@ public TRegionReplicaSet generateOptimalRegionReplicasDistribution( } } + @Override + public Map removeNodeReplicaSelect( + Map availableDataNodeMap, + Map freeDiskSpaceMap, + List allocatedRegionGroups, + Map regionDatabaseMap, + Map> databaseAllocatedRegionGroupMap, + Map remainReplicasMap) { + try { + // 1. prepare: compute regionCounter, databaseRegionCounter, and combinationCounter + + List databaseAllocatedRegionGroups = + new ArrayList<>(databaseAllocatedRegionGroupMap.values()).get(0); + prepare(availableDataNodeMap, allocatedRegionGroups, databaseAllocatedRegionGroups); + computeInitialDbLoad(databaseAllocatedRegionGroupMap); + + // 2. Build allowed candidate set for each region that needs to be migrated. + // For each region in remainReplicasMap, the candidate destination nodes are all nodes in + // availableDataNodeMap + // excluding those already in the remain replica set. + List regionKeys = new ArrayList<>(remainReplicasMap.keySet()); + Map> allowedCandidatesMap = new HashMap<>(); + for (TConsensusGroupId regionId : regionKeys) { + TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId); + Set notAllowedNodes = new HashSet<>(); + + // Exclude nodes already present in the remain replica set + for (TDataNodeLocation location : remainReplicaSet.getDataNodeLocations()) { + notAllowedNodes.add(location.getDataNodeId()); + } + + // Allowed candidates are the nodes not in the exclusion set + List candidates = + availableDataNodeMap.keySet().stream() + .filter(nodeId -> !notAllowedNodes.contains(nodeId)) + .sorted( + (a, b) -> { + int cmp = Integer.compare(regionCounter[a], regionCounter[b]); + if (cmp == 0) { + cmp = Integer.compare(databaseRegionCounter[a], databaseRegionCounter[b]); + } + return cmp; + }) + .collect(Collectors.toList()); + + // Sort candidates in ascending order of current global load (regionCounter) + allowedCandidatesMap.put(regionId, candidates); + } + + // Optionally, sort regionKeys by the size of its candidate list (smaller candidate sets + // first) + regionKeys.sort(Comparator.comparingInt(id -> allowedCandidatesMap.get(id).size())); + + int n = regionKeys.size(); + // Each element holds the candidate nodeId chosen for the region at that index + int[] currentAssignment = new int[n]; + // additionalLoad holds the number of extra regions assigned to each node in this migration + // solution. + int[] additionalLoad = new int[regionCounter.length]; + + // 3. Create a buffer for candidate solutions + List optimalAssignments = new ArrayList<>(); + // bestMetrics holds the best found metrics: [maxGlobalLoad, maxDatabaseLoad, scatterValue]. + // Initialize to high values. + int[] bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE}; + + dfsRemoveNodeReplica( + 0, + regionKeys, + allowedCandidatesMap, + currentAssignment, + additionalLoad, + optimalAssignments, + bestMetrics, + remainReplicasMap, + regionDatabaseMap); + + // 4. Randomly select one solution from the candidate buffer + if (optimalAssignments.isEmpty()) { + // This should not happen if there is at least one valid assignment + return Collections.emptyMap(); + } + Collections.shuffle(optimalAssignments); + int[] bestAssignment = optimalAssignments.get(0); + + // 5. Build and return the result mapping: region -> chosen destination TDataNodeConfiguration + Map result = new HashMap<>(); + for (int i = 0; i < n; i++) { + TConsensusGroupId regionId = regionKeys.get(i); + int chosenNodeId = bestAssignment[i]; + result.put(regionId, availableDataNodeMap.get(chosenNodeId)); + } + return result; + } finally { + // Clear any temporary state to avoid impacting subsequent calls + clear(); + } + } + + /** + * DFS method that searches for migration target assignments. + * + *

It enumerates all possible assignments (one candidate for each region) and collects + * candidate solutions in the optimalAssignments buffer. The evaluation metrics for each complete + * assignment (i.e. when index == regionKeys.size()) are: + * + *

1. Max global load: the maximum over nodes of (regionCounter[node] + additionalLoad[node]) + * 2. Max database load: the maximum over nodes of (databaseRegionCounter[node] + + * additionalLoad[node]) 3. Scatter value: computed per region, summing the combinationCounter for + * every pair in the complete replica set. The complete replica set for a region includes nodes in + * its remain replica set plus the newly assigned node. + * + *

The candidates are compared lexicographically (first by global load, then by database load, + * then by scatter). When a better candidate is found, the optimalAssignments buffer is cleared + * and updated; if the new candidate matches the best found metrics, it is added to the buffer. + * + *

DFS search is pruned if the optimalAssignments buffer reaches CAPACITY. + * + * @param index Current DFS level, corresponding to regionKeys.get(index) + * @param regionKeys A list of regions that need to be migrated. + * @param allowedCandidatesMap For each region, the allowed candidate destination node IDs. + * @param currentAssignment Current partial assignment; its length equals the number of regions. + * @param additionalLoad Extra load currently assigned to each node. + * @param optimalAssignments Buffer holding candidate assignment arrays. + * @param bestMetrics An int array holding the best metrics found so far: [maxGlobalLoad, + * maxDatabaseLoad, scatterValue]. + * @param remainReplicasMap Mapping from region to its current remain replica set. + */ + private void dfsRemoveNodeReplica( + int index, + List regionKeys, + Map> allowedCandidatesMap, + int[] currentAssignment, + int[] additionalLoad, + List optimalAssignments, + int[] bestMetrics, + Map remainReplicasMap, + Map regionDatabaseMap) { + int n = regionKeys.size(); + if (index == n) { + // A complete assignment has been generated. + // Compute metrics for this complete migration assignment. + + // Compute the scatter value for the complete assignment. + int currentScatter = 0; + // For each region, calculate the scatter based on the combinationCounter among all nodes + // in the full replica set (which includes the nodes in the remain replica and the new + // candidate). + for (int r = 0; r < n; r++) { + TConsensusGroupId regionId = regionKeys.get(r); + for (TDataNodeLocation location : remainReplicasMap.get(regionId).getDataNodeLocations()) { + int nodeA = currentAssignment[r]; + int nodeB = location.getDataNodeId(); + currentScatter += combinationCounter[nodeA][nodeB]; + } + } + + // Compute the maximum global load and maximum database load among all nodes that received + // additional load. + int[] currentMetrics = + getCurrentMetrics( + additionalLoad, currentScatter, regionKeys, regionDatabaseMap, currentAssignment); + + // Lexicographically compare currentMetrics with bestMetrics. + // If currentMetrics is better than bestMetrics, update bestMetrics and clear the candidate + // buffer. + boolean isBetter = false; + boolean isEqual = true; + for (int i = 0; i < 3; i++) { + if (currentMetrics[i] < bestMetrics[i]) { + isBetter = true; + isEqual = false; + break; + } else if (currentMetrics[i] > bestMetrics[i]) { + isEqual = false; + break; + } + } + if (isBetter) { + bestMetrics[0] = currentMetrics[0]; + bestMetrics[1] = currentMetrics[1]; + bestMetrics[2] = currentMetrics[2]; + optimalAssignments.clear(); + optimalAssignments.add(Arrays.copyOf(currentAssignment, n)); + } else if (isEqual) { + optimalAssignments.add(Arrays.copyOf(currentAssignment, n)); + // Prune search if we already have enough candidate solutions + if (optimalAssignments.size() >= GCR_MAX_OPTIMAL_PLAN_NUM) { + return; + } + } + return; + } + + // Process the region at the current index. + TConsensusGroupId regionId = regionKeys.get(index); + List candidates = allowedCandidatesMap.get(regionId); + for (Integer candidate : candidates) { + currentAssignment[index] = candidate; + additionalLoad[candidate]++; + dfsRemoveNodeReplica( + index + 1, + regionKeys, + allowedCandidatesMap, + currentAssignment, + additionalLoad, + optimalAssignments, + bestMetrics, + remainReplicasMap, + regionDatabaseMap); + // Backtrack + additionalLoad[candidate]--; + } + } + + /** + * Computes the squared sum of the maximum load for each database. + * + *

For each database, this method calculates the maximum load on any data node by summing the + * initial load (from {@code initialDbLoad}) with the additional load assigned during migration + * (accumulated in {@code currentAssignment}), and then squares this maximum load. Finally, it + * returns the sum of these squared maximum loads across all databases. + * + * @param currentAssignment an array where each element is the nodeId assigned for the + * corresponding region in {@code regionKeys}. + * @param regionKeys a list of region identifiers (TConsensusGroupId) representing the regions + * under migration. + * @param regionDatabaseMap a mapping from each region identifier to its corresponding database + * name. + * @return the sum of the squares of the maximum loads computed for each database. + */ + private int computeDatabaseLoadSquaredSum( + int[] currentAssignment, + List regionKeys, + Map regionDatabaseMap) { + Map extraLoadPerDb = new HashMap<>(); + // Initialize extra load counters for each database using the number of nodes from + // regionCounter. + for (String db : initialDbLoad.keySet()) { + extraLoadPerDb.put(db, new int[regionCounter.length]); + } + // Accumulate extra load per database based on the current assignment. + for (int i = 0; i < regionKeys.size(); i++) { + TConsensusGroupId regionId = regionKeys.get(i); + String db = regionDatabaseMap.get(regionId); + int nodeId = currentAssignment[i]; + extraLoadPerDb.get(db)[nodeId]++; + } + int sumSquared = 0; + // For each database, compute the maximum load across nodes and add its square to the sum. + for (String db : initialDbLoad.keySet()) { + int[] initLoads = initialDbLoad.get(db); + int[] extras = extraLoadPerDb.get(db); + int maxLoad = 0; + for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) { + int load = initLoads[nodeId] + extras[nodeId]; + if (load > maxLoad) { + maxLoad = load; + } + } + sumSquared += maxLoad * maxLoad; + } + return sumSquared; + } + + /** + * Computes the current migration metrics. + * + *

This method calculates three key metrics: + * + *

    + *
  1. Max Global Load: The maximum load among all nodes, computed as the sum + * of the initial region load (from {@code regionCounter}) and the additional load (from + * {@code additionalLoad}). + *
  2. Database Load Squared Sum: The squared sum of the maximum load per + * database, which is computed by {@link #computeDatabaseLoadSquaredSum(int[], List, Map)}. + *
  3. Scatter Value: A provided metric that reflects additional balancing + * criteria. + *
+ * + * The metrics are returned as an array of three integers in the order: [maxGlobalLoad, + * databaseLoadSquaredSum, scatterValue]. + * + * @param additionalLoad an array representing the additional load assigned to each node during + * migration. + * @param currentScatter the current scatter value metric. + * @param regionKeys a list of region identifiers (TConsensusGroupId) for which migration is being + * computed. + * @param regionDatabaseMap a mapping from each region identifier to its corresponding database + * name. + * @param currentAssignment an array where each element is the nodeId assigned for the + * corresponding region in {@code regionKeys}. + * @return an integer array of size 3: [maxGlobalLoad, databaseLoadSquaredSum, scatterValue]. + */ + private int[] getCurrentMetrics( + int[] additionalLoad, + int currentScatter, + List regionKeys, + Map regionDatabaseMap, + int[] currentAssignment) { + int currentMaxGlobalLoad = 0; + // Calculate the maximum global load across all data nodes. + for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) { + int globalLoad = regionCounter[nodeId] + additionalLoad[nodeId]; + currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, globalLoad); + } + // Compute the database load squared sum using the helper method. + int dbLoadSquaredSum = + computeDatabaseLoadSquaredSum(currentAssignment, regionKeys, regionDatabaseMap); + // Build current metrics in order [maxGlobalLoad, maxDatabaseLoad, scatterValue] + return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter}; + } + + /** + * Compute the initial load for each database on each data node. + * + * @param databaseAllocatedRegionGroupMap Mapping of each database to its list of replica sets. + */ + private void computeInitialDbLoad( + Map> databaseAllocatedRegionGroupMap) { + initialDbLoad = new HashMap<>(); + + // Iterate over each database and count the number of regions on each data node across all its + // replica sets. + for (String database : databaseAllocatedRegionGroupMap.keySet()) { + List replicaSets = databaseAllocatedRegionGroupMap.get(database); + int[] load = new int[regionCounter.length]; + for (TRegionReplicaSet replicaSet : replicaSets) { + for (TDataNodeLocation location : replicaSet.getDataNodeLocations()) { + int nodeId = location.getDataNodeId(); + load[nodeId]++; + } + } + initialDbLoad.put(database, load); + } + } + /** * Prepare some statistics before dfs. * - * @param replicationFactor replication factor in the cluster * @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor * @param allocatedRegionGroups already allocated RegionGroups in the cluster * @param databaseAllocatedRegionGroups already allocated RegionGroups in the same Database */ private void prepare( - int replicationFactor, Map availableDataNodeMap, List allocatedRegionGroups, List databaseAllocatedRegionGroups) { - this.replicationFactor = replicationFactor; // Store the maximum DataNodeId int maxDataNodeId = Math.max( @@ -225,7 +561,7 @@ private void prepare( * current allocation plan * @param regionSum the sum of Regions at the DataNodes in the current allocation plan */ - private void dfs( + private void dfsAllocateReplica( int lastIndex, int currentReplica, int[] currentReplicaSet, @@ -274,7 +610,7 @@ private void dfs( for (int i = lastIndex + 1; i < dataNodeIds.length; i++) { // Decide the next DataNodeId in the allocation plan currentReplicaSet[currentReplica] = dataNodeIds[i]; - dfs( + dfsAllocateReplica( i, currentReplica + 1, currentReplicaSet, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java index d05a8accbec3b..9c8f032cee082 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java @@ -57,6 +57,19 @@ public TRegionReplicaSet generateOptimalRegionReplicasDistribution( weightList.stream().limit(replicationFactor).collect(Collectors.toList())); } + @Override + public Map removeNodeReplicaSelect( + Map availableDataNodeMap, + Map freeDiskSpaceMap, + List allocatedRegionGroups, + Map regionDatabaseMap, + Map> databaseAllocatedRegionGroupMap, + Map remainReplicasMap) { + // TODO: Implement this method + throw new UnsupportedOperationException( + "The removeNodeReplicaSelect method of GreedyRegionGroupAllocator is yet to be implemented."); + } + private List buildWeightList( Map availableDataNodeMap, Map freeDiskSpaceMap, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java index 554168d849735..24200548163dd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java @@ -47,4 +47,25 @@ TRegionReplicaSet generateOptimalRegionReplicasDistribution( List databaseAllocatedRegionGroups, int replicationFactor, TConsensusGroupId consensusGroupId); + + /** + * Select the optimal DataNode to place the new replica on along with the remaining replica set. + * + * @param availableDataNodeMap DataNodes that can be used for allocation + * @param freeDiskSpaceMap The free disk space of the DataNodes + * @param allocatedRegionGroups Allocated RegionGroups + * @param regionDatabaseMap A mapping from each region identifier to its corresponding database + * name + * @param databaseAllocatedRegionGroupMap Allocated RegionGroups within the same Database with the + * replica set + * @param remainReplicasMap the remaining replica set excluding the removed DataNodes + * @return The optimal DataNode to place the new replica on along with the remaining replicas + */ + Map removeNodeReplicaSelect( + Map availableDataNodeMap, + Map freeDiskSpaceMap, + List allocatedRegionGroups, + Map regionDatabaseMap, + Map> databaseAllocatedRegionGroupMap, + Map remainReplicasMap); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java index 1205e0abc067c..4a2237805ad8a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java @@ -113,6 +113,19 @@ public TRegionReplicaSet generateOptimalRegionReplicasDistribution( return result; } + @Override + public Map removeNodeReplicaSelect( + Map availableDataNodeMap, + Map freeDiskSpaceMap, + List allocatedRegionGroups, + Map regionDatabaseMap, + Map> databaseAllocatedRegionGroupMap, + Map remainReplicasMap) { + // TODO: Implement this method + throw new UnsupportedOperationException( + "The removeNodeReplicaSelect method of PartiteGraphPlacementRegionGroupAllocator is yet to be implemented."); + } + private void prepare( int replicationFactor, Map availableDataNodeMap, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java index eaa16f47907f4..a60d732c4d92b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.procedure.env; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; @@ -37,6 +38,8 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp; import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator; +import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; @@ -51,10 +54,13 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS; @@ -70,8 +76,22 @@ public class RemoveDataNodeHandler { private final ConfigManager configManager; + private final IRegionGroupAllocator regionGroupAllocator; + public RemoveDataNodeHandler(ConfigManager configManager) { this.configManager = configManager; + + switch (ConfigNodeDescriptor.getInstance().getConf().getRegionGroupAllocatePolicy()) { + case GREEDY: + this.regionGroupAllocator = new GreedyRegionGroupAllocator(); + break; + case PGR: + this.regionGroupAllocator = new GreedyRegionGroupAllocator(); + break; + case GCR: + default: + this.regionGroupAllocator = new GreedyRegionGroupAllocator(); + } } /** @@ -193,6 +213,172 @@ public List getRegionMigrationPlans( return regionMigrationPlans; } + /** + * Retrieves all region migration plans for the specified removed DataNodes and selects the + * destination. + * + * @param removedDataNodes the list of DataNodes from which to obtain migration plans + * @return a list of region migration plans associated with the removed DataNodes + */ + public List selectedRegionMigrationPlans( + List removedDataNodes) { + + Set removedDataNodesSet = new HashSet<>(); + for (TDataNodeLocation removedDataNode : removedDataNodes) { + removedDataNodesSet.add(removedDataNode.dataNodeId); + } + + final List availableDataNodes = + configManager + .getNodeManager() + .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown) + .stream() + .filter(node -> !removedDataNodesSet.contains(node.getLocation().getDataNodeId())) + .collect(Collectors.toList()); + + List regionMigrationPlans = new ArrayList<>(); + + regionMigrationPlans.addAll( + selectMigrationPlans(availableDataNodes, TConsensusGroupType.DataRegion, removedDataNodes)); + + regionMigrationPlans.addAll( + selectMigrationPlans( + availableDataNodes, TConsensusGroupType.SchemaRegion, removedDataNodes)); + + return regionMigrationPlans; + } + + public List selectMigrationPlans( + List availableDataNodes, + TConsensusGroupType consensusGroupType, + List removedDataNodes) { + + // Retrieve all allocated replica sets for the given consensus group type + List allocatedReplicaSets = + configManager.getPartitionManager().getAllReplicaSets(consensusGroupType); + + // Step 1: Identify affected replica sets and record the removed DataNode for each replica set + Map removedNodeMap = new HashMap<>(); + Set affectedReplicaSets = + identifyAffectedReplicaSets(allocatedReplicaSets, removedDataNodes, removedNodeMap); + + // Step 2: Update affected replica sets by removing the removed DataNode + updateReplicaSets(allocatedReplicaSets, affectedReplicaSets, removedNodeMap); + + // Build a mapping of available DataNodes and their free disk space (computed only once) + Map availableDataNodeMap = + buildAvailableDataNodeMap(availableDataNodes); + Map freeDiskSpaceMap = buildFreeDiskSpaceMap(availableDataNodes); + + // Step 3: For each affected replica set, select a new destination DataNode and create a + // migration plan + List migrationPlans = new ArrayList<>(); + + Map remainReplicasMap = new HashMap<>(); + Map regionDatabaseMap = new HashMap<>(); + Map> databaseAllocatedRegionGroupMap = new HashMap<>(); + + for (TRegionReplicaSet replicaSet : affectedReplicaSets) { + remainReplicasMap.put(replicaSet.getRegionId(), replicaSet); + String database = + configManager.getPartitionManager().getRegionDatabase(replicaSet.getRegionId()); + List databaseAllocatedReplicaSets = + configManager.getPartitionManager().getAllReplicaSets(database, consensusGroupType); + regionDatabaseMap.put(replicaSet.getRegionId(), database); + databaseAllocatedRegionGroupMap.put(database, databaseAllocatedReplicaSets); + } + + Map result = + regionGroupAllocator.removeNodeReplicaSelect( + availableDataNodeMap, + freeDiskSpaceMap, + allocatedReplicaSets, + regionDatabaseMap, + databaseAllocatedRegionGroupMap, + remainReplicasMap); + + for (TConsensusGroupId regionId : result.keySet()) { + + TDataNodeConfiguration selectedNode = result.get(regionId); + LOGGER.info( + "Selected DataNode {} for Region {}", + selectedNode.getLocation().getDataNodeId(), + regionId); + + // Create the migration plan + RegionMigrationPlan plan = RegionMigrationPlan.create(regionId, removedNodeMap.get(regionId)); + plan.setToDataNode(selectedNode.getLocation()); + migrationPlans.add(plan); + } + return migrationPlans; + } + + /** + * Identifies affected replica sets from allocatedReplicaSets that contain any DataNode in + * removedDataNodes, and records the removed DataNode for each replica set. + */ + private Set identifyAffectedReplicaSets( + List allocatedReplicaSets, + List removedDataNodes, + Map removedNodeMap) { + + Set affectedReplicaSets = new HashSet<>(); + // Create a copy of allocatedReplicaSets to avoid concurrent modifications + List allocatedCopy = new ArrayList<>(allocatedReplicaSets); + + for (TDataNodeLocation removedNode : removedDataNodes) { + allocatedCopy.stream() + .filter(replicaSet -> replicaSet.getDataNodeLocations().contains(removedNode)) + .forEach( + replicaSet -> { + removedNodeMap.put(replicaSet.getRegionId(), removedNode); + affectedReplicaSets.add(replicaSet); + }); + } + return affectedReplicaSets; + } + + /** + * Updates each affected replica set by removing the removed DataNode from its list. The + * allocatedReplicaSets list is updated accordingly. + */ + private void updateReplicaSets( + List allocatedReplicaSets, + Set affectedReplicaSets, + Map removedNodeMap) { + for (TRegionReplicaSet replicaSet : affectedReplicaSets) { + // Remove the replica set, update its node list, then re-add it + allocatedReplicaSets.remove(replicaSet); + replicaSet.getDataNodeLocations().remove(removedNodeMap.get(replicaSet.getRegionId())); + allocatedReplicaSets.add(replicaSet); + } + } + + /** + * Constructs a mapping from DataNodeId to TDataNodeConfiguration from the available DataNodes. + */ + private Map buildAvailableDataNodeMap( + List availableDataNodes) { + return availableDataNodes.stream() + .collect( + Collectors.toMap( + dataNode -> dataNode.getLocation().getDataNodeId(), Function.identity())); + } + + /** Constructs a mapping of free disk space for each DataNode. */ + private Map buildFreeDiskSpaceMap( + List availableDataNodes) { + Map freeDiskSpaceMap = new HashMap<>(availableDataNodes.size()); + availableDataNodes.forEach( + dataNode -> + freeDiskSpaceMap.put( + dataNode.getLocation().getDataNodeId(), + configManager + .getLoadManager() + .getFreeDiskSpace(dataNode.getLocation().getDataNodeId()))); + return freeDiskSpaceMap; + } + /** * Broadcasts DataNodes' status change, preventing disabled DataNodes from accepting read or write * requests. diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java index 68b0550e0dfd0..a531d67955cec 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java @@ -121,7 +121,8 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState removedDataNodes.forEach( dataNode -> removedNodeStatusMap.put(dataNode.getDataNodeId(), NodeStatus.Removing)); removeDataNodeHandler.changeDataNodeStatus(removedDataNodes, removedNodeStatusMap); - regionMigrationPlans = removeDataNodeHandler.getRegionMigrationPlans(removedDataNodes); + regionMigrationPlans = + removeDataNodeHandler.selectedRegionMigrationPlans(removedDataNodes); LOG.info( "{}, DataNode regions to be removed is {}", REMOVE_DATANODE_PROCESS, @@ -165,8 +166,7 @@ private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) { regionMigrationPlan -> { TConsensusGroupId regionId = regionMigrationPlan.getRegionId(); TDataNodeLocation removedDataNode = regionMigrationPlan.getFromDataNode(); - TDataNodeLocation destDataNode = - env.getRegionMaintainHandler().findDestDataNode(regionId); + TDataNodeLocation destDataNode = regionMigrationPlan.getToDataNode(); // TODO: need to improve the coordinator selection method here, maybe through load // balancing and other means. final TDataNodeLocation coordinatorForAddPeer = diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java new file mode 100644 index 0000000000000..557fba8ba7a21 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.manager.load.balancer.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class GreedyCopySetRemoveNodeReplicaSelectTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(GreedyCopySetRemoveNodeReplicaSelectTest.class); + + private static final IRegionGroupAllocator GCR_ALLOCATOR = + new GreedyCopySetRegionGroupAllocator(); + + private static final TDataNodeLocation REMOVE_DATANODE_LOCATION = + new TDataNodeLocation().setDataNodeId(5); + + private static final int TEST_DATA_NODE_NUM = 5; + + private static final int DATA_REGION_PER_DATA_NODE = 4; + + private static final int DATA_REPLICATION_FACTOR = 2; + + private static final Map AVAILABLE_DATA_NODE_MAP = + new HashMap<>(); + + private static final Map FREE_SPACE_MAP = new HashMap<>(); + + @Before + public void setUp() { + // Construct TEST_DATA_NODE_NUM DataNodes + AVAILABLE_DATA_NODE_MAP.clear(); + FREE_SPACE_MAP.clear(); + for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) { + AVAILABLE_DATA_NODE_MAP.put( + i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i))); + FREE_SPACE_MAP.put(i, Math.random()); + } + } + + @Test + public void testSelectDestNode() { + final int dataRegionGroupNum = + DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / DATA_REPLICATION_FACTOR; + + List allocateResult = new ArrayList<>(); + for (int index = 0; index < dataRegionGroupNum; index++) { + allocateResult.add( + GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + allocateResult, + allocateResult, + DATA_REPLICATION_FACTOR, + new TConsensusGroupId(TConsensusGroupType.DataRegion, index))); + } + + List migratedReplicas = + allocateResult.stream() + .filter( + replicaSet -> replicaSet.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION)) + .collect(Collectors.toList()); + + AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId()); + FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId()); + + List remainReplicas = new ArrayList<>(); + for (TRegionReplicaSet replicaSet : migratedReplicas) { + List dataNodeLocations = replicaSet.getDataNodeLocations(); + allocateResult.remove(replicaSet); + dataNodeLocations.remove(REMOVE_DATANODE_LOCATION); + allocateResult.add(replicaSet); + remainReplicas.add(replicaSet); + } + + Map randomRegionCounter = new HashMap<>(); + Map PGPRegionCounter = new HashMap<>(); + Set randomSelectedNodeIds = new HashSet<>(); + Set PGPSelectedNodeIds = new HashSet<>(); + + int randomMaxRegionCount = 0; + int randomMinRegionCount = Integer.MAX_VALUE; + int PGPMaxRegionCount = 0; + int PGPMinRegionCount = Integer.MAX_VALUE; + + AVAILABLE_DATA_NODE_MAP + .keySet() + .forEach( + nodeId -> { + randomRegionCounter.put(nodeId, 0); + PGPRegionCounter.put(nodeId, 0); + }); + + for (TRegionReplicaSet remainReplicaSet : remainReplicas) { + TDataNodeLocation selectedNode = + randomSelectNodeForRegion(remainReplicaSet.getDataNodeLocations()).get(); + LOGGER.info( + "Random Selected DataNode {} for Region {}", + selectedNode.getDataNodeId(), + remainReplicaSet.regionId); + randomSelectedNodeIds.add(selectedNode.getDataNodeId()); + randomRegionCounter.put( + selectedNode.getDataNodeId(), randomRegionCounter.get(selectedNode.getDataNodeId()) + 1); + } + + LOGGER.info("Remain Replicas... :"); + for (TRegionReplicaSet remainReplicaSet : remainReplicas) { + LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id); + List dataNodeLocations = remainReplicaSet.getDataNodeLocations(); + for (TDataNodeLocation dataNodeLocation : dataNodeLocations) { + LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId()); + } + } + Map remainReplicasMap = new HashMap<>(); + Map> databaseAllocatedRegionGroupMap = new HashMap<>(); + databaseAllocatedRegionGroupMap.put("database", allocateResult); + + for (TRegionReplicaSet remainReplicaSet : remainReplicas) { + remainReplicasMap.put(remainReplicaSet.getRegionId(), remainReplicaSet); + } + Map regionDatabaseMap = new HashMap<>(); + for (TRegionReplicaSet replicaSet : allocateResult) { + regionDatabaseMap.put(replicaSet.getRegionId(), "database"); + } + Map result = + GCR_ALLOCATOR.removeNodeReplicaSelect( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + allocateResult, + regionDatabaseMap, + databaseAllocatedRegionGroupMap, + remainReplicasMap); + + for (TConsensusGroupId regionId : result.keySet()) { + TDataNodeConfiguration selectedNode = result.get(regionId); + + LOGGER.info( + "GCR Selected DataNode {} for Region {}", + selectedNode.getLocation().getDataNodeId(), + regionId); + PGPSelectedNodeIds.add(selectedNode.getLocation().getDataNodeId()); + PGPRegionCounter.put( + selectedNode.getLocation().getDataNodeId(), + PGPRegionCounter.get(selectedNode.getLocation().getDataNodeId()) + 1); + } + + for (Integer i : randomRegionCounter.keySet()) { + Integer value = randomRegionCounter.get(i); + randomMaxRegionCount = Math.max(randomMaxRegionCount, value); + randomMinRegionCount = Math.min(randomMinRegionCount, value); + } + + for (Integer i : PGPRegionCounter.keySet()) { + Integer value = PGPRegionCounter.get(i); + PGPMaxRegionCount = Math.max(PGPMaxRegionCount, value); + PGPMinRegionCount = Math.min(PGPMinRegionCount, value); + } + + Assert.assertEquals(TEST_DATA_NODE_NUM - 1, PGPSelectedNodeIds.size()); + Assert.assertTrue(PGPSelectedNodeIds.size() >= randomSelectedNodeIds.size()); + Assert.assertTrue(randomMaxRegionCount >= PGPMaxRegionCount); + Assert.assertTrue(randomMinRegionCount <= PGPMinRegionCount); + } + + @Test + public void testSelectDestNodeMultiDatabase() { + // Pre‑allocate RegionReplicaSets for multiple databases + final String[] DB_NAMES = {"db0", "db1", "db2"}; + final int TOTAL_RG_NUM = + DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / DATA_REPLICATION_FACTOR; + + int basePerDb = TOTAL_RG_NUM / DB_NAMES.length; + int remainder = TOTAL_RG_NUM % DB_NAMES.length; // first DBs get one extra + + Map> dbAllocatedMap = new HashMap<>(); + List globalAllocatedList = new ArrayList<>(); + int globalIndex = 0; + + for (int dbIdx = 0; dbIdx < DB_NAMES.length; dbIdx++) { + String db = DB_NAMES[dbIdx]; + int rgToCreate = basePerDb + (dbIdx < remainder ? 1 : 0); + List perDbList = new ArrayList<>(); + dbAllocatedMap.put(db, perDbList); + + for (int i = 0; i < rgToCreate; i++) { + TRegionReplicaSet rs = + GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + globalAllocatedList, + perDbList, + DATA_REPLICATION_FACTOR, + new TConsensusGroupId(TConsensusGroupType.DataRegion, globalIndex++)); + globalAllocatedList.add(rs); + perDbList.add(rs); + } + } + + // Identify the replica‑sets that contain the node to be removed + List impactedReplicas = + globalAllocatedList.stream() + .filter(rs -> rs.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION)) + .collect(Collectors.toList()); + + // Simulate removing the faulty/offline node + AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId()); + FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId()); + + List remainReplicas = new ArrayList<>(); + for (TRegionReplicaSet rs : impactedReplicas) { + globalAllocatedList.remove(rs); + rs.getDataNodeLocations().remove(REMOVE_DATANODE_LOCATION); + globalAllocatedList.add(rs); + remainReplicas.add(rs); + } + + // Build helper maps for removeNodeReplicaSelect + Map remainMap = new HashMap<>(); + remainReplicas.forEach(r -> remainMap.put(r.getRegionId(), r)); + + Map regionDbMap = new HashMap<>(); + dbAllocatedMap.forEach((db, list) -> list.forEach(r -> regionDbMap.put(r.getRegionId(), db))); + + // Baseline: random selection for comparison + Map rndCount = new HashMap<>(); + Map planCount = new HashMap<>(); + Set rndNodes = new HashSet<>(); + Set planNodes = new HashSet<>(); + int rndMax = 0, rndMin = Integer.MAX_VALUE; + int planMax = 0, planMin = Integer.MAX_VALUE; + + AVAILABLE_DATA_NODE_MAP + .keySet() + .forEach( + n -> { + rndCount.put(n, 0); + planCount.put(n, 0); + }); + + for (TRegionReplicaSet r : remainReplicas) { + TDataNodeLocation pick = randomSelectNodeForRegion(r.getDataNodeLocations()).get(); + LOGGER.info("Random Selected DataNode {} for Region {}", pick.getDataNodeId(), r.regionId); + rndNodes.add(pick.getDataNodeId()); + rndCount.merge(pick.getDataNodeId(), 1, Integer::sum); + } + + LOGGER.info("Remain Replicas... :"); + for (TRegionReplicaSet remainReplicaSet : remainReplicas) { + LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id); + List dataNodeLocations = remainReplicaSet.getDataNodeLocations(); + for (TDataNodeLocation dataNodeLocation : dataNodeLocations) { + LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId()); + } + } + + // Call the method under test + Map result = + GCR_ALLOCATOR.removeNodeReplicaSelect( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + globalAllocatedList, + regionDbMap, + dbAllocatedMap, + remainMap); + + for (TConsensusGroupId regionId : result.keySet()) { + TDataNodeConfiguration selectedNode = result.get(regionId); + + LOGGER.info( + "GCR Selected DataNode {} for Region {}", + selectedNode.getLocation().getDataNodeId(), + regionId); + planNodes.add(selectedNode.getLocation().getDataNodeId()); + planCount.merge(selectedNode.getLocation().getDataNodeId(), 1, Integer::sum); + } + + // Calculate load distribution + for (int c : rndCount.values()) { + rndMax = Math.max(rndMax, c); + rndMin = Math.min(rndMin, c); + } + for (int c : planCount.values()) { + planMax = Math.max(planMax, c); + planMin = Math.min(planMin, c); + } + + // Assertions + Assert.assertEquals(TEST_DATA_NODE_NUM - 1, planNodes.size()); + Assert.assertTrue(planNodes.size() >= rndNodes.size()); + Assert.assertTrue(rndMax >= planMax); + Assert.assertTrue(rndMin <= planMin); + } + + private Optional randomSelectNodeForRegion( + List regionReplicaNodes) { + List dataNodeConfigurations = + new ArrayList<>(AVAILABLE_DATA_NODE_MAP.values()); + // Randomly selected to ensure a basic load balancing + Collections.shuffle(dataNodeConfigurations); + return dataNodeConfigurations.stream() + .map(TDataNodeConfiguration::getLocation) + .filter(e -> !regionReplicaNodes.contains(e)) + .findAny(); + } +} From b12b6ff1ae3b40d87e5f4d02d392f0b3eb01b275 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hu <65238551+HxpSerein@users.noreply.github.com> Date: Mon, 28 Apr 2025 18:44:45 +0800 Subject: [PATCH 2/4] [remove datanode] Fix IoTDBRemoveDataNodeNormalIT #15429 (cherry picked from commit 953780620df07eb6282d5391c0069fabd63cb40a) --- .../region/GreedyCopySetRegionGroupAllocator.java | 4 +--- .../confignode/procedure/env/RemoveDataNodeHandler.java | 8 ++++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java index 5e36023053625..da1205c10ba61 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java @@ -138,9 +138,7 @@ public Map removeNodeReplicaSelect( try { // 1. prepare: compute regionCounter, databaseRegionCounter, and combinationCounter - List databaseAllocatedRegionGroups = - new ArrayList<>(databaseAllocatedRegionGroupMap.values()).get(0); - prepare(availableDataNodeMap, allocatedRegionGroups, databaseAllocatedRegionGroups); + prepare(availableDataNodeMap, allocatedRegionGroups, Collections.emptyList()); computeInitialDbLoad(databaseAllocatedRegionGroupMap); // 2. Build allowed candidate set for each region that needs to be migrated. diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java index a60d732c4d92b..980dd712eb0fe 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java @@ -38,7 +38,7 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp; import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator; +import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; @@ -83,14 +83,14 @@ public RemoveDataNodeHandler(ConfigManager configManager) { switch (ConfigNodeDescriptor.getInstance().getConf().getRegionGroupAllocatePolicy()) { case GREEDY: - this.regionGroupAllocator = new GreedyRegionGroupAllocator(); + this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator(); break; case PGR: - this.regionGroupAllocator = new GreedyRegionGroupAllocator(); + this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator(); break; case GCR: default: - this.regionGroupAllocator = new GreedyRegionGroupAllocator(); + this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator(); } } From 7196559e21d33883c3e3c7400cf166ca74a9a407 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hu <65238551+HxpSerein@users.noreply.github.com> Date: Fri, 6 Jun 2025 13:02:39 +0800 Subject: [PATCH 3/4] [remove datanode] Accelerate GCR load balancing implement (#15535) (cherry picked from commit 51bad1ec88d5d20aa4e84f3ccd6841fbfba75390) --- .../GreedyCopySetRegionGroupAllocator.java | 253 +++++++++--------- ...edyCopySetRemoveNodeReplicaSelectTest.java | 29 +- 2 files changed, 148 insertions(+), 134 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java index da1205c10ba61..bc71bd3996e06 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java @@ -29,7 +29,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; @@ -42,7 +41,7 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator { private static final Random RANDOM = new Random(); - private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 100; + private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 10; private int replicationFactor; // Sorted available DataNodeIds @@ -57,14 +56,31 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator private Map initialDbLoad; // First Key: the sum of Regions at the DataNodes in the allocation result is minimal - int optimalRegionSum; + private int optimalRegionSum; // Second Key: the sum of Regions at the DataNodes within the same Database // in the allocation result is minimal - int optimalDatabaseRegionSum; + private int optimalDatabaseRegionSum; // Third Key: the sum of overlapped 2-Region combination Regions with // other allocated RegionGroups is minimal - int optimalCombinationSum; - List optimalReplicaSets; + private int optimalCombinationSum; + private List optimalReplicaSets; + + // Pre-calculation, scatterDelta[i][j] means the scatter increment between region i and the old + // replica set when region i is placed on node j + private int[][] scatterDelta; + // For each region, the allowed candidate destination node IDs. + private Map> allowedCandidatesMap; + // A list of regions that need to be migrated. + private List dfsRegionKeys; + // A mapping from each region identifier to its corresponding database name. + private Map regionDatabaseMap; + // Buffer holding best assignment arrays. + private int[] bestAssignment; + // An int array holding the best metrics found so far: [maxGlobalLoad, maxDatabaseLoad, + // scatterValue]. + private int[] bestMetrics; + // dfsRemoveNodeReplica batch size + private static final int BATCH_SIZE = 12; private static class DataNodeEntry { @@ -146,15 +162,14 @@ public Map removeNodeReplicaSelect( // availableDataNodeMap // excluding those already in the remain replica set. List regionKeys = new ArrayList<>(remainReplicasMap.keySet()); - Map> allowedCandidatesMap = new HashMap<>(); + allowedCandidatesMap = new HashMap<>(); + this.regionDatabaseMap = regionDatabaseMap; for (TConsensusGroupId regionId : regionKeys) { TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId); - Set notAllowedNodes = new HashSet<>(); - - // Exclude nodes already present in the remain replica set - for (TDataNodeLocation location : remainReplicaSet.getDataNodeLocations()) { - notAllowedNodes.add(location.getDataNodeId()); - } + Set notAllowedNodes = + remainReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet()); // Allowed candidates are the nodes not in the exclusion set List candidates = @@ -163,12 +178,12 @@ public Map removeNodeReplicaSelect( .sorted( (a, b) -> { int cmp = Integer.compare(regionCounter[a], regionCounter[b]); - if (cmp == 0) { - cmp = Integer.compare(databaseRegionCounter[a], databaseRegionCounter[b]); - } - return cmp; + return (cmp != 0) + ? cmp + : Integer.compare(databaseRegionCounter[a], databaseRegionCounter[b]); }) .collect(Collectors.toList()); + Collections.shuffle(candidates); // Sort candidates in ascending order of current global load (regionCounter) allowedCandidatesMap.put(regionId, candidates); @@ -178,44 +193,63 @@ public Map removeNodeReplicaSelect( // first) regionKeys.sort(Comparator.comparingInt(id -> allowedCandidatesMap.get(id).size())); - int n = regionKeys.size(); - // Each element holds the candidate nodeId chosen for the region at that index - int[] currentAssignment = new int[n]; - // additionalLoad holds the number of extra regions assigned to each node in this migration - // solution. - int[] additionalLoad = new int[regionCounter.length]; + // 3. Batch DFS + Map result = new HashMap<>(); - // 3. Create a buffer for candidate solutions - List optimalAssignments = new ArrayList<>(); - // bestMetrics holds the best found metrics: [maxGlobalLoad, maxDatabaseLoad, scatterValue]. - // Initialize to high values. - int[] bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE}; + for (int start = 0; start < regionKeys.size(); start += BATCH_SIZE) { + dfsRegionKeys = regionKeys.subList(start, Math.min(start + BATCH_SIZE, regionKeys.size())); + int batchSize = dfsRegionKeys.size(); + + // Initialize buffer + bestAssignment = new int[batchSize]; + // bestMetrics holds the best found metrics: [maxGlobalLoad, maxDatabaseLoad, scatterValue]. + bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE}; + // currentAssignment holds the candidate nodeId chosen for the region at that index + int[] currentAssignment = new int[batchSize]; + // additionalLoad holds the number of extra regions assigned to each node in this migration + // solution. + int[] additionalLoad = new int[regionCounter.length]; + + scatterDelta = new int[batchSize][regionCounter.length]; + for (int r = 0; r < batchSize; r++) { + TConsensusGroupId regionId = dfsRegionKeys.get(r); + for (int nodeId : allowedCandidatesMap.get(regionId)) { + int inc = 0; + for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) { + inc += combinationCounter[nodeId][loc.getDataNodeId()]; + } + scatterDelta[r][nodeId] = inc; + } + } - dfsRemoveNodeReplica( - 0, - regionKeys, - allowedCandidatesMap, - currentAssignment, - additionalLoad, - optimalAssignments, - bestMetrics, - remainReplicasMap, - regionDatabaseMap); - - // 4. Randomly select one solution from the candidate buffer - if (optimalAssignments.isEmpty()) { - // This should not happen if there is at least one valid assignment - return Collections.emptyMap(); - } - Collections.shuffle(optimalAssignments); - int[] bestAssignment = optimalAssignments.get(0); + int currentMaxGlobalLoad = 0; + for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) { + currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, regionCounter[nodeId]); + } - // 5. Build and return the result mapping: region -> chosen destination TDataNodeConfiguration - Map result = new HashMap<>(); - for (int i = 0; i < n; i++) { - TConsensusGroupId regionId = regionKeys.get(i); - int chosenNodeId = bestAssignment[i]; - result.put(regionId, availableDataNodeMap.get(chosenNodeId)); + dfsRemoveNodeReplica(0, currentMaxGlobalLoad, 0, currentAssignment, additionalLoad); + + if (bestMetrics[0] == Integer.MAX_VALUE) { + // This should not happen if there is at least one valid assignment + return Collections.emptyMap(); + } + + for (int i = 0; i < batchSize; i++) { + TConsensusGroupId regionId = dfsRegionKeys.get(i); + int chosenNodeId = bestAssignment[i]; + result.put(regionId, availableDataNodeMap.get(chosenNodeId)); + + regionCounter[chosenNodeId]++; + String db = regionDatabaseMap.get(regionId); + if (db != null) { + int[] dbLoad = initialDbLoad.computeIfAbsent(db, k -> new int[regionCounter.length]); + dbLoad[chosenNodeId]++; + } + for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) { + combinationCounter[chosenNodeId][loc.getDataNodeId()]++; + combinationCounter[loc.getDataNodeId()][chosenNodeId]++; + } + } } return result; } finally { @@ -244,99 +278,64 @@ public Map removeNodeReplicaSelect( *

DFS search is pruned if the optimalAssignments buffer reaches CAPACITY. * * @param index Current DFS level, corresponding to regionKeys.get(index) - * @param regionKeys A list of regions that need to be migrated. - * @param allowedCandidatesMap For each region, the allowed candidate destination node IDs. + * @param currentMaxGlobalLoad The maximum global load across all data nodes. + * @param currentScatter The scatter value for the complete assignment. * @param currentAssignment Current partial assignment; its length equals the number of regions. * @param additionalLoad Extra load currently assigned to each node. - * @param optimalAssignments Buffer holding candidate assignment arrays. - * @param bestMetrics An int array holding the best metrics found so far: [maxGlobalLoad, - * maxDatabaseLoad, scatterValue]. - * @param remainReplicasMap Mapping from region to its current remain replica set. */ private void dfsRemoveNodeReplica( int index, - List regionKeys, - Map> allowedCandidatesMap, + int currentMaxGlobalLoad, + int currentScatter, int[] currentAssignment, - int[] additionalLoad, - List optimalAssignments, - int[] bestMetrics, - Map remainReplicasMap, - Map regionDatabaseMap) { - int n = regionKeys.size(); - if (index == n) { - // A complete assignment has been generated. - // Compute metrics for this complete migration assignment. - - // Compute the scatter value for the complete assignment. - int currentScatter = 0; - // For each region, calculate the scatter based on the combinationCounter among all nodes - // in the full replica set (which includes the nodes in the remain replica and the new - // candidate). - for (int r = 0; r < n; r++) { - TConsensusGroupId regionId = regionKeys.get(r); - for (TDataNodeLocation location : remainReplicasMap.get(regionId).getDataNodeLocations()) { - int nodeA = currentAssignment[r]; - int nodeB = location.getDataNodeId(); - currentScatter += combinationCounter[nodeA][nodeB]; - } + int[] additionalLoad) { + // Compute the maximum global load and maximum database load among all nodes that received + // additional load. + int[] currentMetrics = getCurrentMetrics(additionalLoad, currentScatter, currentAssignment); + // Lexicographically compare currentMetrics with bestMetrics. + // If currentMetrics is better than bestMetrics, update bestMetrics and clear the candidate + // buffer. + boolean isBetter = false; + boolean isEqual = true; + for (int i = 0; i < 3; i++) { + if (currentMetrics[i] < bestMetrics[i]) { + isBetter = true; + isEqual = false; + break; + } else if (currentMetrics[i] > bestMetrics[i]) { + isEqual = false; + break; } + } + if (!isBetter && !isEqual) { + return; + } - // Compute the maximum global load and maximum database load among all nodes that received - // additional load. - int[] currentMetrics = - getCurrentMetrics( - additionalLoad, currentScatter, regionKeys, regionDatabaseMap, currentAssignment); - - // Lexicographically compare currentMetrics with bestMetrics. - // If currentMetrics is better than bestMetrics, update bestMetrics and clear the candidate - // buffer. - boolean isBetter = false; - boolean isEqual = true; - for (int i = 0; i < 3; i++) { - if (currentMetrics[i] < bestMetrics[i]) { - isBetter = true; - isEqual = false; - break; - } else if (currentMetrics[i] > bestMetrics[i]) { - isEqual = false; - break; - } - } + if (index == dfsRegionKeys.size()) { if (isBetter) { bestMetrics[0] = currentMetrics[0]; bestMetrics[1] = currentMetrics[1]; bestMetrics[2] = currentMetrics[2]; - optimalAssignments.clear(); - optimalAssignments.add(Arrays.copyOf(currentAssignment, n)); - } else if (isEqual) { - optimalAssignments.add(Arrays.copyOf(currentAssignment, n)); - // Prune search if we already have enough candidate solutions - if (optimalAssignments.size() >= GCR_MAX_OPTIMAL_PLAN_NUM) { - return; - } + System.arraycopy(currentAssignment, 0, bestAssignment, 0, dfsRegionKeys.size()); } return; } // Process the region at the current index. - TConsensusGroupId regionId = regionKeys.get(index); + TConsensusGroupId regionId = dfsRegionKeys.get(index); List candidates = allowedCandidatesMap.get(regionId); for (Integer candidate : candidates) { currentAssignment[index] = candidate; + currentScatter += scatterDelta[index][currentAssignment[index]]; additionalLoad[candidate]++; + int nextMaxGlobalLoad = + Math.max(currentMaxGlobalLoad, regionCounter[candidate] + additionalLoad[candidate]); + dfsRemoveNodeReplica( - index + 1, - regionKeys, - allowedCandidatesMap, - currentAssignment, - additionalLoad, - optimalAssignments, - bestMetrics, - remainReplicasMap, - regionDatabaseMap); + index + 1, nextMaxGlobalLoad, currentScatter, currentAssignment, additionalLoad); // Backtrack additionalLoad[candidate]--; + currentScatter -= scatterDelta[index][currentAssignment[index]]; } } @@ -411,20 +410,12 @@ private int computeDatabaseLoadSquaredSum( * @param additionalLoad an array representing the additional load assigned to each node during * migration. * @param currentScatter the current scatter value metric. - * @param regionKeys a list of region identifiers (TConsensusGroupId) for which migration is being - * computed. - * @param regionDatabaseMap a mapping from each region identifier to its corresponding database - * name. * @param currentAssignment an array where each element is the nodeId assigned for the * corresponding region in {@code regionKeys}. * @return an integer array of size 3: [maxGlobalLoad, databaseLoadSquaredSum, scatterValue]. */ private int[] getCurrentMetrics( - int[] additionalLoad, - int currentScatter, - List regionKeys, - Map regionDatabaseMap, - int[] currentAssignment) { + int[] additionalLoad, int currentScatter, int[] currentAssignment) { int currentMaxGlobalLoad = 0; // Calculate the maximum global load across all data nodes. for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) { @@ -433,7 +424,7 @@ private int[] getCurrentMetrics( } // Compute the database load squared sum using the helper method. int dbLoadSquaredSum = - computeDatabaseLoadSquaredSum(currentAssignment, regionKeys, regionDatabaseMap); + computeDatabaseLoadSquaredSum(currentAssignment, dfsRegionKeys, regionDatabaseMap); // Build current metrics in order [maxGlobalLoad, maxDatabaseLoad, scatterValue] return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter}; } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java index 557fba8ba7a21..0dd73b77f167d 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java @@ -54,7 +54,7 @@ public class GreedyCopySetRemoveNodeReplicaSelectTest { private static final int TEST_DATA_NODE_NUM = 5; - private static final int DATA_REGION_PER_DATA_NODE = 4; + private static final int DATA_REGION_PER_DATA_NODE = 30; private static final int DATA_REPLICATION_FACTOR = 2; @@ -128,6 +128,14 @@ public void testSelectDestNode() { PGPRegionCounter.put(nodeId, 0); }); + for (TRegionReplicaSet replicaSet : allocateResult) { + for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) { + randomRegionCounter.put( + loc.getDataNodeId(), randomRegionCounter.get(loc.getDataNodeId()) + 1); + PGPRegionCounter.put(loc.getDataNodeId(), PGPRegionCounter.get(loc.getDataNodeId()) + 1); + } + } + for (TRegionReplicaSet remainReplicaSet : remainReplicas) { TDataNodeLocation selectedNode = randomSelectNodeForRegion(remainReplicaSet.getDataNodeLocations()).get(); @@ -181,22 +189,31 @@ public void testSelectDestNode() { PGPRegionCounter.get(selectedNode.getLocation().getDataNodeId()) + 1); } + LOGGER.info("randomRegionCount:"); + for (Integer i : randomRegionCounter.keySet()) { Integer value = randomRegionCounter.get(i); randomMaxRegionCount = Math.max(randomMaxRegionCount, value); randomMinRegionCount = Math.min(randomMinRegionCount, value); + LOGGER.info("{} : {}", i, value); } + LOGGER.info("PGPRegionCount:"); + for (Integer i : PGPRegionCounter.keySet()) { Integer value = PGPRegionCounter.get(i); PGPMaxRegionCount = Math.max(PGPMaxRegionCount, value); PGPMinRegionCount = Math.min(PGPMinRegionCount, value); + LOGGER.info("{} : {}", i, value); } + LOGGER.info("PGPSelectedNodeIds size: {}", PGPSelectedNodeIds.size()); Assert.assertEquals(TEST_DATA_NODE_NUM - 1, PGPSelectedNodeIds.size()); + LOGGER.info("randomSelectedNodeIds size: {}", randomSelectedNodeIds.size()); Assert.assertTrue(PGPSelectedNodeIds.size() >= randomSelectedNodeIds.size()); + LOGGER.info( + "randomMaxRegionCount: {}, PGPMaxRegionCount: {}", randomMaxRegionCount, PGPMaxRegionCount); Assert.assertTrue(randomMaxRegionCount >= PGPMaxRegionCount); - Assert.assertTrue(randomMinRegionCount <= PGPMinRegionCount); } @Test @@ -274,6 +291,13 @@ public void testSelectDestNodeMultiDatabase() { planCount.put(n, 0); }); + for (TRegionReplicaSet replicaSet : globalAllocatedList) { + for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) { + rndCount.merge(loc.getDataNodeId(), 1, Integer::sum); + planCount.merge(loc.getDataNodeId(), 1, Integer::sum); + } + } + for (TRegionReplicaSet r : remainReplicas) { TDataNodeLocation pick = randomSelectNodeForRegion(r.getDataNodeLocations()).get(); LOGGER.info("Random Selected DataNode {} for Region {}", pick.getDataNodeId(), r.regionId); @@ -325,7 +349,6 @@ public void testSelectDestNodeMultiDatabase() { Assert.assertEquals(TEST_DATA_NODE_NUM - 1, planNodes.size()); Assert.assertTrue(planNodes.size() >= rndNodes.size()); Assert.assertTrue(rndMax >= planMax); - Assert.assertTrue(rndMin <= planMin); } private Optional randomSelectNodeForRegion( From fd2ef1e0a2a4ff3de3638f6f8de646bdea120c20 Mon Sep 17 00:00:00 2001 From: Xiangpeng Hu <65238551+HxpSerein@users.noreply.github.com> Date: Tue, 17 Jun 2025 10:21:45 +0800 Subject: [PATCH 4/4] [remove datanode] Fix ArrayIndexOutOfBoundsException in computeInitialDbLoad (#15718) (cherry picked from commit 346ee720e7a4a561e28bc210f9ec6111c45a1a46) --- .../region/GreedyCopySetRegionGroupAllocator.java | 8 ++++++-- .../GreedyCopySetRemoveNodeReplicaSelectTest.java | 11 ++++++++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java index bc71bd3996e06..ae5f1c3eb44ad 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java @@ -155,7 +155,7 @@ public Map removeNodeReplicaSelect( // 1. prepare: compute regionCounter, databaseRegionCounter, and combinationCounter prepare(availableDataNodeMap, allocatedRegionGroups, Collections.emptyList()); - computeInitialDbLoad(databaseAllocatedRegionGroupMap); + computeInitialDbLoad(availableDataNodeMap, databaseAllocatedRegionGroupMap); // 2. Build allowed candidate set for each region that needs to be migrated. // For each region in remainReplicasMap, the candidate destination nodes are all nodes in @@ -432,9 +432,11 @@ private int[] getCurrentMetrics( /** * Compute the initial load for each database on each data node. * + * @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor * @param databaseAllocatedRegionGroupMap Mapping of each database to its list of replica sets. */ private void computeInitialDbLoad( + Map availableDataNodeMap, Map> databaseAllocatedRegionGroupMap) { initialDbLoad = new HashMap<>(); @@ -446,7 +448,9 @@ private void computeInitialDbLoad( for (TRegionReplicaSet replicaSet : replicaSets) { for (TDataNodeLocation location : replicaSet.getDataNodeLocations()) { int nodeId = location.getDataNodeId(); - load[nodeId]++; + if (availableDataNodeMap.containsKey(nodeId)) { + load[nodeId]++; + } } } initialDbLoad.put(database, load); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java index 0dd73b77f167d..e28b4dda18f71 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java @@ -81,15 +81,20 @@ public void testSelectDestNode() { DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / DATA_REPLICATION_FACTOR; List allocateResult = new ArrayList<>(); + List databaseAllocateResult = new ArrayList<>(); for (int index = 0; index < dataRegionGroupNum; index++) { - allocateResult.add( + TRegionReplicaSet replicaSet = GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution( AVAILABLE_DATA_NODE_MAP, FREE_SPACE_MAP, allocateResult, allocateResult, DATA_REPLICATION_FACTOR, - new TConsensusGroupId(TConsensusGroupType.DataRegion, index))); + new TConsensusGroupId(TConsensusGroupType.DataRegion, index)); + TRegionReplicaSet replicaSetCopy = new TRegionReplicaSet(replicaSet); + + allocateResult.add(replicaSet); + databaseAllocateResult.add(replicaSetCopy); } List migratedReplicas = @@ -158,7 +163,7 @@ public void testSelectDestNode() { } Map remainReplicasMap = new HashMap<>(); Map> databaseAllocatedRegionGroupMap = new HashMap<>(); - databaseAllocatedRegionGroupMap.put("database", allocateResult); + databaseAllocatedRegionGroupMap.put("database", databaseAllocateResult); for (TRegionReplicaSet remainReplicaSet : remainReplicas) { remainReplicasMap.put(remainReplicaSet.getRegionId(), remainReplicaSet);