diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java index 2577455d41cd1..ae5f1c3eb44ad 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java @@ -27,10 +27,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.stream.Collectors; import static java.util.Map.Entry.comparingByValue; @@ -39,7 +41,7 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator { private static final Random RANDOM = new Random(); - private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 100; + private static final int GCR_MAX_OPTIMAL_PLAN_NUM = 10; private int replicationFactor; // Sorted available DataNodeIds @@ -50,16 +52,35 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator private int[] databaseRegionCounter; // The number of 2-Region combinations in current cluster private int[][] combinationCounter; + // The initial load for each database on each datanode + private Map initialDbLoad; // First Key: the sum of Regions at the DataNodes in the allocation result is minimal - int optimalRegionSum; + private int optimalRegionSum; // Second Key: the sum of Regions at the DataNodes within the same Database // in the allocation result is minimal - int optimalDatabaseRegionSum; + private int optimalDatabaseRegionSum; // Third Key: the sum of overlapped 2-Region combination Regions with // other 
allocated RegionGroups is minimal - int optimalCombinationSum; - List optimalReplicaSets; + private int optimalCombinationSum; + private List optimalReplicaSets; + + // Pre-calculation, scatterDelta[i][j] means the scatter increment between region i and the old + // replica set when region i is placed on node j + private int[][] scatterDelta; + // For each region, the allowed candidate destination node IDs. + private Map> allowedCandidatesMap; + // A list of regions that need to be migrated. + private List dfsRegionKeys; + // A mapping from each region identifier to its corresponding database name. + private Map regionDatabaseMap; + // Buffer holding best assignment arrays. + private int[] bestAssignment; + // An int array holding the best metrics found so far: [maxGlobalLoad, maxDatabaseLoad, + // scatterValue]. + private int[] bestMetrics; + // dfsRemoveNodeReplica batch size + private static final int BATCH_SIZE = 12; private static class DataNodeEntry { @@ -103,12 +124,9 @@ public TRegionReplicaSet generateOptimalRegionReplicasDistribution( int replicationFactor, TConsensusGroupId consensusGroupId) { try { - prepare( - replicationFactor, - availableDataNodeMap, - allocatedRegionGroups, - databaseAllocatedRegionGroups); - dfs(-1, 0, new int[replicationFactor], 0, 0); + this.replicationFactor = replicationFactor; + prepare(availableDataNodeMap, allocatedRegionGroups, databaseAllocatedRegionGroups); + dfsAllocateReplica(-1, 0, new int[replicationFactor], 0, 0); // Randomly pick one optimal plan as result Collections.shuffle(optimalReplicaSets); @@ -125,21 +143,332 @@ public TRegionReplicaSet generateOptimalRegionReplicasDistribution( } } + @Override + public Map removeNodeReplicaSelect( + Map availableDataNodeMap, + Map freeDiskSpaceMap, + List allocatedRegionGroups, + Map regionDatabaseMap, + Map> databaseAllocatedRegionGroupMap, + Map remainReplicasMap) { + try { + // 1. 
prepare: compute regionCounter, databaseRegionCounter, and combinationCounter + + prepare(availableDataNodeMap, allocatedRegionGroups, Collections.emptyList()); + computeInitialDbLoad(availableDataNodeMap, databaseAllocatedRegionGroupMap); + + // 2. Build allowed candidate set for each region that needs to be migrated. + // For each region in remainReplicasMap, the candidate destination nodes are all nodes in + // availableDataNodeMap + // excluding those already in the remain replica set. + List regionKeys = new ArrayList<>(remainReplicasMap.keySet()); + allowedCandidatesMap = new HashMap<>(); + this.regionDatabaseMap = regionDatabaseMap; + for (TConsensusGroupId regionId : regionKeys) { + TRegionReplicaSet remainReplicaSet = remainReplicasMap.get(regionId); + Set notAllowedNodes = + remainReplicaSet.getDataNodeLocations().stream() + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet()); + + // Allowed candidates are the nodes not in the exclusion set + List candidates = + availableDataNodeMap.keySet().stream() + .filter(nodeId -> !notAllowedNodes.contains(nodeId)) + .sorted( + (a, b) -> { + int cmp = Integer.compare(regionCounter[a], regionCounter[b]); + return (cmp != 0) + ? cmp + : Integer.compare(databaseRegionCounter[a], databaseRegionCounter[b]); + }) + .collect(Collectors.toList()); + Collections.shuffle(candidates); + + // Sort candidates in ascending order of current global load (regionCounter) + allowedCandidatesMap.put(regionId, candidates); + } + + // Optionally, sort regionKeys by the size of its candidate list (smaller candidate sets + // first) + regionKeys.sort(Comparator.comparingInt(id -> allowedCandidatesMap.get(id).size())); + + // 3. 
Batch DFS + Map result = new HashMap<>(); + + for (int start = 0; start < regionKeys.size(); start += BATCH_SIZE) { + dfsRegionKeys = regionKeys.subList(start, Math.min(start + BATCH_SIZE, regionKeys.size())); + int batchSize = dfsRegionKeys.size(); + + // Initialize buffer + bestAssignment = new int[batchSize]; + // bestMetrics holds the best found metrics: [maxGlobalLoad, maxDatabaseLoad, scatterValue]. + bestMetrics = new int[] {Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE}; + // currentAssignment holds the candidate nodeId chosen for the region at that index + int[] currentAssignment = new int[batchSize]; + // additionalLoad holds the number of extra regions assigned to each node in this migration + // solution. + int[] additionalLoad = new int[regionCounter.length]; + + scatterDelta = new int[batchSize][regionCounter.length]; + for (int r = 0; r < batchSize; r++) { + TConsensusGroupId regionId = dfsRegionKeys.get(r); + for (int nodeId : allowedCandidatesMap.get(regionId)) { + int inc = 0; + for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) { + inc += combinationCounter[nodeId][loc.getDataNodeId()]; + } + scatterDelta[r][nodeId] = inc; + } + } + + int currentMaxGlobalLoad = 0; + for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) { + currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, regionCounter[nodeId]); + } + + dfsRemoveNodeReplica(0, currentMaxGlobalLoad, 0, currentAssignment, additionalLoad); + + if (bestMetrics[0] == Integer.MAX_VALUE) { + // This should not happen if there is at least one valid assignment + return Collections.emptyMap(); + } + + for (int i = 0; i < batchSize; i++) { + TConsensusGroupId regionId = dfsRegionKeys.get(i); + int chosenNodeId = bestAssignment[i]; + result.put(regionId, availableDataNodeMap.get(chosenNodeId)); + + regionCounter[chosenNodeId]++; + String db = regionDatabaseMap.get(regionId); + if (db != null) { + int[] dbLoad = initialDbLoad.computeIfAbsent(db, k -> new 
int[regionCounter.length]); + dbLoad[chosenNodeId]++; + } + for (TDataNodeLocation loc : remainReplicasMap.get(regionId).getDataNodeLocations()) { + combinationCounter[chosenNodeId][loc.getDataNodeId()]++; + combinationCounter[loc.getDataNodeId()][chosenNodeId]++; + } + } + } + return result; + } finally { + // Clear any temporary state to avoid impacting subsequent calls + clear(); + } + } + + /** + * DFS method that searches for migration target assignments. + * + *

<p>It enumerates assignments (one candidate node for each region in the current batch) and + * records the best one found in {@code bestAssignment}. The evaluation metrics for each complete + * assignment (i.e. when index == dfsRegionKeys.size()) are: + * + *

<p>1. Max global load: the maximum over nodes of (regionCounter[node] + additionalLoad[node]) + * 2. Database load: the sum, over all databases, of the squared maximum per-node load of that + * database (initialDbLoad plus the regions assigned in this batch). 3. Scatter value: the sum, + * over the regions in the batch, of combinationCounter between the newly assigned node and + * each node in the region's remain replica set (precomputed in scatterDelta). + * + *

<p>The candidates are compared lexicographically (first by global load, then by database load, + * then by scatter). Only a complete assignment that is strictly better than bestMetrics replaces + * bestMetrics and bestAssignment; an assignment that merely ties the best metrics is discarded. + * + *

DFS search is pruned if the optimalAssignments buffer reaches CAPACITY. + * + * @param index Current DFS level, corresponding to regionKeys.get(index) + * @param currentMaxGlobalLoad The maximum global load across all data nodes. + * @param currentScatter The scatter value for the complete assignment. + * @param currentAssignment Current partial assignment; its length equals the number of regions. + * @param additionalLoad Extra load currently assigned to each node. + */ + private void dfsRemoveNodeReplica( + int index, + int currentMaxGlobalLoad, + int currentScatter, + int[] currentAssignment, + int[] additionalLoad) { + // Compute the maximum global load and maximum database load among all nodes that received + // additional load. + int[] currentMetrics = getCurrentMetrics(additionalLoad, currentScatter, currentAssignment); + // Lexicographically compare currentMetrics with bestMetrics. + // If currentMetrics is better than bestMetrics, update bestMetrics and clear the candidate + // buffer. + boolean isBetter = false; + boolean isEqual = true; + for (int i = 0; i < 3; i++) { + if (currentMetrics[i] < bestMetrics[i]) { + isBetter = true; + isEqual = false; + break; + } else if (currentMetrics[i] > bestMetrics[i]) { + isEqual = false; + break; + } + } + if (!isBetter && !isEqual) { + return; + } + + if (index == dfsRegionKeys.size()) { + if (isBetter) { + bestMetrics[0] = currentMetrics[0]; + bestMetrics[1] = currentMetrics[1]; + bestMetrics[2] = currentMetrics[2]; + System.arraycopy(currentAssignment, 0, bestAssignment, 0, dfsRegionKeys.size()); + } + return; + } + + // Process the region at the current index. 
+ TConsensusGroupId regionId = dfsRegionKeys.get(index); + List candidates = allowedCandidatesMap.get(regionId); + for (Integer candidate : candidates) { + currentAssignment[index] = candidate; + currentScatter += scatterDelta[index][currentAssignment[index]]; + additionalLoad[candidate]++; + int nextMaxGlobalLoad = + Math.max(currentMaxGlobalLoad, regionCounter[candidate] + additionalLoad[candidate]); + + dfsRemoveNodeReplica( + index + 1, nextMaxGlobalLoad, currentScatter, currentAssignment, additionalLoad); + // Backtrack + additionalLoad[candidate]--; + currentScatter -= scatterDelta[index][currentAssignment[index]]; + } + } + + /** + * Computes the squared sum of the maximum load for each database. + * + *

For each database, this method calculates the maximum load on any data node by summing the + * initial load (from {@code initialDbLoad}) with the additional load assigned during migration + * (accumulated in {@code currentAssignment}), and then squares this maximum load. Finally, it + * returns the sum of these squared maximum loads across all databases. + * + * @param currentAssignment an array where each element is the nodeId assigned for the + * corresponding region in {@code regionKeys}. + * @param regionKeys a list of region identifiers (TConsensusGroupId) representing the regions + * under migration. + * @param regionDatabaseMap a mapping from each region identifier to its corresponding database + * name. + * @return the sum of the squares of the maximum loads computed for each database. + */ + private int computeDatabaseLoadSquaredSum( + int[] currentAssignment, + List regionKeys, + Map regionDatabaseMap) { + Map extraLoadPerDb = new HashMap<>(); + // Initialize extra load counters for each database using the number of nodes from + // regionCounter. + for (String db : initialDbLoad.keySet()) { + extraLoadPerDb.put(db, new int[regionCounter.length]); + } + // Accumulate extra load per database based on the current assignment. + for (int i = 0; i < regionKeys.size(); i++) { + TConsensusGroupId regionId = regionKeys.get(i); + String db = regionDatabaseMap.get(regionId); + int nodeId = currentAssignment[i]; + extraLoadPerDb.get(db)[nodeId]++; + } + int sumSquared = 0; + // For each database, compute the maximum load across nodes and add its square to the sum. 
+ for (String db : initialDbLoad.keySet()) { + int[] initLoads = initialDbLoad.get(db); + int[] extras = extraLoadPerDb.get(db); + int maxLoad = 0; + for (int nodeId = 0; nodeId < regionCounter.length; nodeId++) { + int load = initLoads[nodeId] + extras[nodeId]; + if (load > maxLoad) { + maxLoad = load; + } + } + sumSquared += maxLoad * maxLoad; + } + return sumSquared; + } + + /** + * Computes the current migration metrics. + * + *

This method calculates three key metrics: + * + *

    + *
  1. Max Global Load: The maximum load among all nodes, computed as the sum + * of the initial region load (from {@code regionCounter}) and the additional load (from + * {@code additionalLoad}). + *
  2. Database Load Squared Sum: The sum, over all databases, of the squared maximum + * per-node load, which is computed by {@link #computeDatabaseLoadSquaredSum(int[], List, Map)}. + *
  3. Scatter Value: The {@code currentScatter} argument, returned unchanged as the + * third metric. + *
+ * + * The metrics are returned as an array of three integers in the order: [maxGlobalLoad, + * databaseLoadSquaredSum, scatterValue]. + * + * @param additionalLoad an array representing the additional load assigned to each node during + * migration. + * @param currentScatter the current scatter value metric. + * @param currentAssignment an array where each element is the nodeId assigned for the + * corresponding region in {@code regionKeys}. + * @return an integer array of size 3: [maxGlobalLoad, databaseLoadSquaredSum, scatterValue]. + */ + private int[] getCurrentMetrics( + int[] additionalLoad, int currentScatter, int[] currentAssignment) { + int currentMaxGlobalLoad = 0; + // Calculate the maximum global load across all data nodes. + for (int nodeId = 0; nodeId < additionalLoad.length; nodeId++) { + int globalLoad = regionCounter[nodeId] + additionalLoad[nodeId]; + currentMaxGlobalLoad = Math.max(currentMaxGlobalLoad, globalLoad); + } + // Compute the database load squared sum using the helper method. + int dbLoadSquaredSum = + computeDatabaseLoadSquaredSum(currentAssignment, dfsRegionKeys, regionDatabaseMap); + // Build current metrics in order [maxGlobalLoad, maxDatabaseLoad, scatterValue] + return new int[] {currentMaxGlobalLoad, dbLoadSquaredSum, currentScatter}; + } + + /** + * Compute the initial load for each database on each data node. + * + * @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor + * @param databaseAllocatedRegionGroupMap Mapping of each database to its list of replica sets. + */ + private void computeInitialDbLoad( + Map availableDataNodeMap, + Map> databaseAllocatedRegionGroupMap) { + initialDbLoad = new HashMap<>(); + + // Iterate over each database and count the number of regions on each data node across all its + // replica sets. 
+ for (String database : databaseAllocatedRegionGroupMap.keySet()) { + List replicaSets = databaseAllocatedRegionGroupMap.get(database); + int[] load = new int[regionCounter.length]; + for (TRegionReplicaSet replicaSet : replicaSets) { + for (TDataNodeLocation location : replicaSet.getDataNodeLocations()) { + int nodeId = location.getDataNodeId(); + if (availableDataNodeMap.containsKey(nodeId)) { + load[nodeId]++; + } + } + } + initialDbLoad.put(database, load); + } + } + /** * Prepare some statistics before dfs. * - * @param replicationFactor replication factor in the cluster * @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor * @param allocatedRegionGroups already allocated RegionGroups in the cluster * @param databaseAllocatedRegionGroups already allocated RegionGroups in the same Database */ private void prepare( - int replicationFactor, Map availableDataNodeMap, List allocatedRegionGroups, List databaseAllocatedRegionGroups) { - this.replicationFactor = replicationFactor; // Store the maximum DataNodeId int maxDataNodeId = Math.max( @@ -225,7 +554,7 @@ private void prepare( * current allocation plan * @param regionSum the sum of Regions at the DataNodes in the current allocation plan */ - private void dfs( + private void dfsAllocateReplica( int lastIndex, int currentReplica, int[] currentReplicaSet, @@ -274,7 +603,7 @@ private void dfs( for (int i = lastIndex + 1; i < dataNodeIds.length; i++) { // Decide the next DataNodeId in the allocation plan currentReplicaSet[currentReplica] = dataNodeIds[i]; - dfs( + dfsAllocateReplica( i, currentReplica + 1, currentReplicaSet, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java index d05a8accbec3b..9c8f032cee082 100644 --- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyRegionGroupAllocator.java @@ -57,6 +57,19 @@ public TRegionReplicaSet generateOptimalRegionReplicasDistribution( weightList.stream().limit(replicationFactor).collect(Collectors.toList())); } + @Override + public Map removeNodeReplicaSelect( + Map availableDataNodeMap, + Map freeDiskSpaceMap, + List allocatedRegionGroups, + Map regionDatabaseMap, + Map> databaseAllocatedRegionGroupMap, + Map remainReplicasMap) { + // TODO: Implement this method + throw new UnsupportedOperationException( + "The removeNodeReplicaSelect method of GreedyRegionGroupAllocator is yet to be implemented."); + } + private List buildWeightList( Map availableDataNodeMap, Map freeDiskSpaceMap, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java index 554168d849735..24200548163dd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/IRegionGroupAllocator.java @@ -47,4 +47,25 @@ TRegionReplicaSet generateOptimalRegionReplicasDistribution( List databaseAllocatedRegionGroups, int replicationFactor, TConsensusGroupId consensusGroupId); + + /** + * Select the optimal DataNode to place the new replica on along with the remaining replica set. 
+ * + * @param availableDataNodeMap DataNodes that can be used for allocation + * @param freeDiskSpaceMap The free disk space of the DataNodes + * @param allocatedRegionGroups Allocated RegionGroups + * @param regionDatabaseMap A mapping from each region identifier to its corresponding database + * name + * @param databaseAllocatedRegionGroupMap Allocated RegionGroups within the same Database with the + * replica set + * @param remainReplicasMap the remaining replica set excluding the removed DataNodes + * @return The optimal DataNode to place the new replica on along with the remaining replicas + */ + Map removeNodeReplicaSelect( + Map availableDataNodeMap, + Map freeDiskSpaceMap, + List allocatedRegionGroups, + Map regionDatabaseMap, + Map> databaseAllocatedRegionGroupMap, + Map remainReplicasMap); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java index 1205e0abc067c..4a2237805ad8a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphReplicationRegionGroupAllocator.java @@ -113,6 +113,19 @@ public TRegionReplicaSet generateOptimalRegionReplicasDistribution( return result; } + @Override + public Map removeNodeReplicaSelect( + Map availableDataNodeMap, + Map freeDiskSpaceMap, + List allocatedRegionGroups, + Map regionDatabaseMap, + Map> databaseAllocatedRegionGroupMap, + Map remainReplicasMap) { + // TODO: Implement this method + throw new UnsupportedOperationException( + "The removeNodeReplicaSelect method of PartiteGraphPlacementRegionGroupAllocator is yet to be implemented."); + } + private void prepare( 
int replicationFactor, Map availableDataNodeMap, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java index eaa16f47907f4..980dd712eb0fe 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RemoveDataNodeHandler.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.procedure.env; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; @@ -37,6 +38,8 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp; import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator; +import org.apache.iotdb.confignode.manager.load.balancer.region.IRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; @@ -51,10 +54,13 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS; @@ -70,8 +76,22 @@ 
public class RemoveDataNodeHandler { private final ConfigManager configManager; + private final IRegionGroupAllocator regionGroupAllocator; + public RemoveDataNodeHandler(ConfigManager configManager) { this.configManager = configManager; + + switch (ConfigNodeDescriptor.getInstance().getConf().getRegionGroupAllocatePolicy()) { + case GREEDY: + this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator(); + break; + case PGR: + this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator(); + break; + case GCR: + default: + this.regionGroupAllocator = new GreedyCopySetRegionGroupAllocator(); + } } /** @@ -193,6 +213,172 @@ public List getRegionMigrationPlans( return regionMigrationPlans; } + /** + * Retrieves all region migration plans for the specified removed DataNodes and selects the + * destination. + * + * @param removedDataNodes the list of DataNodes from which to obtain migration plans + * @return a list of region migration plans associated with the removed DataNodes + */ + public List selectedRegionMigrationPlans( + List removedDataNodes) { + + Set removedDataNodesSet = new HashSet<>(); + for (TDataNodeLocation removedDataNode : removedDataNodes) { + removedDataNodesSet.add(removedDataNode.dataNodeId); + } + + final List availableDataNodes = + configManager + .getNodeManager() + .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown) + .stream() + .filter(node -> !removedDataNodesSet.contains(node.getLocation().getDataNodeId())) + .collect(Collectors.toList()); + + List regionMigrationPlans = new ArrayList<>(); + + regionMigrationPlans.addAll( + selectMigrationPlans(availableDataNodes, TConsensusGroupType.DataRegion, removedDataNodes)); + + regionMigrationPlans.addAll( + selectMigrationPlans( + availableDataNodes, TConsensusGroupType.SchemaRegion, removedDataNodes)); + + return regionMigrationPlans; + } + + public List selectMigrationPlans( + List availableDataNodes, + TConsensusGroupType consensusGroupType, + List removedDataNodes) 
{ + + // Retrieve all allocated replica sets for the given consensus group type + List allocatedReplicaSets = + configManager.getPartitionManager().getAllReplicaSets(consensusGroupType); + + // Step 1: Identify affected replica sets and record the removed DataNode for each replica set + Map removedNodeMap = new HashMap<>(); + Set affectedReplicaSets = + identifyAffectedReplicaSets(allocatedReplicaSets, removedDataNodes, removedNodeMap); + + // Step 2: Update affected replica sets by removing the removed DataNode + updateReplicaSets(allocatedReplicaSets, affectedReplicaSets, removedNodeMap); + + // Build a mapping of available DataNodes and their free disk space (computed only once) + Map availableDataNodeMap = + buildAvailableDataNodeMap(availableDataNodes); + Map freeDiskSpaceMap = buildFreeDiskSpaceMap(availableDataNodes); + + // Step 3: For each affected replica set, select a new destination DataNode and create a + // migration plan + List migrationPlans = new ArrayList<>(); + + Map remainReplicasMap = new HashMap<>(); + Map regionDatabaseMap = new HashMap<>(); + Map> databaseAllocatedRegionGroupMap = new HashMap<>(); + + for (TRegionReplicaSet replicaSet : affectedReplicaSets) { + remainReplicasMap.put(replicaSet.getRegionId(), replicaSet); + String database = + configManager.getPartitionManager().getRegionDatabase(replicaSet.getRegionId()); + List databaseAllocatedReplicaSets = + configManager.getPartitionManager().getAllReplicaSets(database, consensusGroupType); + regionDatabaseMap.put(replicaSet.getRegionId(), database); + databaseAllocatedRegionGroupMap.put(database, databaseAllocatedReplicaSets); + } + + Map result = + regionGroupAllocator.removeNodeReplicaSelect( + availableDataNodeMap, + freeDiskSpaceMap, + allocatedReplicaSets, + regionDatabaseMap, + databaseAllocatedRegionGroupMap, + remainReplicasMap); + + for (TConsensusGroupId regionId : result.keySet()) { + + TDataNodeConfiguration selectedNode = result.get(regionId); + LOGGER.info( + "Selected 
DataNode {} for Region {}", + selectedNode.getLocation().getDataNodeId(), + regionId); + + // Create the migration plan + RegionMigrationPlan plan = RegionMigrationPlan.create(regionId, removedNodeMap.get(regionId)); + plan.setToDataNode(selectedNode.getLocation()); + migrationPlans.add(plan); + } + return migrationPlans; + } + + /** + * Identifies affected replica sets from allocatedReplicaSets that contain any DataNode in + * removedDataNodes, and records the removed DataNode for each replica set. + */ + private Set identifyAffectedReplicaSets( + List allocatedReplicaSets, + List removedDataNodes, + Map removedNodeMap) { + + Set affectedReplicaSets = new HashSet<>(); + // Create a copy of allocatedReplicaSets to avoid concurrent modifications + List allocatedCopy = new ArrayList<>(allocatedReplicaSets); + + for (TDataNodeLocation removedNode : removedDataNodes) { + allocatedCopy.stream() + .filter(replicaSet -> replicaSet.getDataNodeLocations().contains(removedNode)) + .forEach( + replicaSet -> { + removedNodeMap.put(replicaSet.getRegionId(), removedNode); + affectedReplicaSets.add(replicaSet); + }); + } + return affectedReplicaSets; + } + + /** + * Updates each affected replica set by removing the removed DataNode from its list. The + * allocatedReplicaSets list is updated accordingly. + */ + private void updateReplicaSets( + List allocatedReplicaSets, + Set affectedReplicaSets, + Map removedNodeMap) { + for (TRegionReplicaSet replicaSet : affectedReplicaSets) { + // Remove the replica set, update its node list, then re-add it + allocatedReplicaSets.remove(replicaSet); + replicaSet.getDataNodeLocations().remove(removedNodeMap.get(replicaSet.getRegionId())); + allocatedReplicaSets.add(replicaSet); + } + } + + /** + * Constructs a mapping from DataNodeId to TDataNodeConfiguration from the available DataNodes. 
+ */ + private Map buildAvailableDataNodeMap( + List availableDataNodes) { + return availableDataNodes.stream() + .collect( + Collectors.toMap( + dataNode -> dataNode.getLocation().getDataNodeId(), Function.identity())); + } + + /** Constructs a mapping of free disk space for each DataNode. */ + private Map buildFreeDiskSpaceMap( + List availableDataNodes) { + Map freeDiskSpaceMap = new HashMap<>(availableDataNodes.size()); + availableDataNodes.forEach( + dataNode -> + freeDiskSpaceMap.put( + dataNode.getLocation().getDataNodeId(), + configManager + .getLoadManager() + .getFreeDiskSpace(dataNode.getLocation().getDataNodeId()))); + return freeDiskSpaceMap; + } + /** * Broadcasts DataNodes' status change, preventing disabled DataNodes from accepting read or write * requests. diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java index 68b0550e0dfd0..a531d67955cec 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodesProcedure.java @@ -121,7 +121,8 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveDataNodeState removedDataNodes.forEach( dataNode -> removedNodeStatusMap.put(dataNode.getDataNodeId(), NodeStatus.Removing)); removeDataNodeHandler.changeDataNodeStatus(removedDataNodes, removedNodeStatusMap); - regionMigrationPlans = removeDataNodeHandler.getRegionMigrationPlans(removedDataNodes); + regionMigrationPlans = + removeDataNodeHandler.selectedRegionMigrationPlans(removedDataNodes); LOG.info( "{}, DataNode regions to be removed is {}", REMOVE_DATANODE_PROCESS, @@ -165,8 +166,7 @@ private void submitChildRegionMigrate(ConfigNodeProcedureEnv env) { regionMigrationPlan -> { 
TConsensusGroupId regionId = regionMigrationPlan.getRegionId(); TDataNodeLocation removedDataNode = regionMigrationPlan.getFromDataNode(); - TDataNodeLocation destDataNode = - env.getRegionMaintainHandler().findDestDataNode(regionId); + TDataNodeLocation destDataNode = regionMigrationPlan.getToDataNode(); // TODO: need to improve the coordinator selection method here, maybe through load // balancing and other means. final TDataNodeLocation coordinatorForAddPeer = diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java new file mode 100644 index 0000000000000..e28b4dda18f71 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRemoveNodeReplicaSelectTest.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.manager.load.balancer.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class GreedyCopySetRemoveNodeReplicaSelectTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(GreedyCopySetRemoveNodeReplicaSelectTest.class); + + private static final IRegionGroupAllocator GCR_ALLOCATOR = + new GreedyCopySetRegionGroupAllocator(); + + private static final TDataNodeLocation REMOVE_DATANODE_LOCATION = + new TDataNodeLocation().setDataNodeId(5); + + private static final int TEST_DATA_NODE_NUM = 5; + + private static final int DATA_REGION_PER_DATA_NODE = 30; + + private static final int DATA_REPLICATION_FACTOR = 2; + + private static final Map AVAILABLE_DATA_NODE_MAP = + new HashMap<>(); + + private static final Map FREE_SPACE_MAP = new HashMap<>(); + + @Before + public void setUp() { + // Construct TEST_DATA_NODE_NUM DataNodes + AVAILABLE_DATA_NODE_MAP.clear(); + FREE_SPACE_MAP.clear(); + for (int i = 1; i <= TEST_DATA_NODE_NUM; i++) { + AVAILABLE_DATA_NODE_MAP.put( + i, new TDataNodeConfiguration().setLocation(new TDataNodeLocation().setDataNodeId(i))); + FREE_SPACE_MAP.put(i, Math.random()); + } + } + + @Test + public void testSelectDestNode() { + final int dataRegionGroupNum = + DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / 
DATA_REPLICATION_FACTOR; + + List allocateResult = new ArrayList<>(); + List databaseAllocateResult = new ArrayList<>(); + for (int index = 0; index < dataRegionGroupNum; index++) { + TRegionReplicaSet replicaSet = + GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + allocateResult, + allocateResult, + DATA_REPLICATION_FACTOR, + new TConsensusGroupId(TConsensusGroupType.DataRegion, index)); + TRegionReplicaSet replicaSetCopy = new TRegionReplicaSet(replicaSet); + + allocateResult.add(replicaSet); + databaseAllocateResult.add(replicaSetCopy); + } + + List migratedReplicas = + allocateResult.stream() + .filter( + replicaSet -> replicaSet.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION)) + .collect(Collectors.toList()); + + AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId()); + FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId()); + + List remainReplicas = new ArrayList<>(); + for (TRegionReplicaSet replicaSet : migratedReplicas) { + List dataNodeLocations = replicaSet.getDataNodeLocations(); + allocateResult.remove(replicaSet); + dataNodeLocations.remove(REMOVE_DATANODE_LOCATION); + allocateResult.add(replicaSet); + remainReplicas.add(replicaSet); + } + + Map randomRegionCounter = new HashMap<>(); + Map PGPRegionCounter = new HashMap<>(); + Set randomSelectedNodeIds = new HashSet<>(); + Set PGPSelectedNodeIds = new HashSet<>(); + + int randomMaxRegionCount = 0; + int randomMinRegionCount = Integer.MAX_VALUE; + int PGPMaxRegionCount = 0; + int PGPMinRegionCount = Integer.MAX_VALUE; + + AVAILABLE_DATA_NODE_MAP + .keySet() + .forEach( + nodeId -> { + randomRegionCounter.put(nodeId, 0); + PGPRegionCounter.put(nodeId, 0); + }); + + for (TRegionReplicaSet replicaSet : allocateResult) { + for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) { + randomRegionCounter.put( + loc.getDataNodeId(), randomRegionCounter.get(loc.getDataNodeId()) + 1); + 
PGPRegionCounter.put(loc.getDataNodeId(), PGPRegionCounter.get(loc.getDataNodeId()) + 1); + } + } + + for (TRegionReplicaSet remainReplicaSet : remainReplicas) { + TDataNodeLocation selectedNode = + randomSelectNodeForRegion(remainReplicaSet.getDataNodeLocations()).get(); + LOGGER.info( + "Random Selected DataNode {} for Region {}", + selectedNode.getDataNodeId(), + remainReplicaSet.regionId); + randomSelectedNodeIds.add(selectedNode.getDataNodeId()); + randomRegionCounter.put( + selectedNode.getDataNodeId(), randomRegionCounter.get(selectedNode.getDataNodeId()) + 1); + } + + LOGGER.info("Remain Replicas... :"); + for (TRegionReplicaSet remainReplicaSet : remainReplicas) { + LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id); + List dataNodeLocations = remainReplicaSet.getDataNodeLocations(); + for (TDataNodeLocation dataNodeLocation : dataNodeLocations) { + LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId()); + } + } + Map remainReplicasMap = new HashMap<>(); + Map> databaseAllocatedRegionGroupMap = new HashMap<>(); + databaseAllocatedRegionGroupMap.put("database", databaseAllocateResult); + + for (TRegionReplicaSet remainReplicaSet : remainReplicas) { + remainReplicasMap.put(remainReplicaSet.getRegionId(), remainReplicaSet); + } + Map regionDatabaseMap = new HashMap<>(); + for (TRegionReplicaSet replicaSet : allocateResult) { + regionDatabaseMap.put(replicaSet.getRegionId(), "database"); + } + Map result = + GCR_ALLOCATOR.removeNodeReplicaSelect( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + allocateResult, + regionDatabaseMap, + databaseAllocatedRegionGroupMap, + remainReplicasMap); + + for (TConsensusGroupId regionId : result.keySet()) { + TDataNodeConfiguration selectedNode = result.get(regionId); + + LOGGER.info( + "GCR Selected DataNode {} for Region {}", + selectedNode.getLocation().getDataNodeId(), + regionId); + PGPSelectedNodeIds.add(selectedNode.getLocation().getDataNodeId()); + PGPRegionCounter.put( + 
selectedNode.getLocation().getDataNodeId(), + PGPRegionCounter.get(selectedNode.getLocation().getDataNodeId()) + 1); + } + + LOGGER.info("randomRegionCount:"); + + for (Integer i : randomRegionCounter.keySet()) { + Integer value = randomRegionCounter.get(i); + randomMaxRegionCount = Math.max(randomMaxRegionCount, value); + randomMinRegionCount = Math.min(randomMinRegionCount, value); + LOGGER.info("{} : {}", i, value); + } + + LOGGER.info("PGPRegionCount:"); + + for (Integer i : PGPRegionCounter.keySet()) { + Integer value = PGPRegionCounter.get(i); + PGPMaxRegionCount = Math.max(PGPMaxRegionCount, value); + PGPMinRegionCount = Math.min(PGPMinRegionCount, value); + LOGGER.info("{} : {}", i, value); + } + + LOGGER.info("PGPSelectedNodeIds size: {}", PGPSelectedNodeIds.size()); + Assert.assertEquals(TEST_DATA_NODE_NUM - 1, PGPSelectedNodeIds.size()); + LOGGER.info("randomSelectedNodeIds size: {}", randomSelectedNodeIds.size()); + Assert.assertTrue(PGPSelectedNodeIds.size() >= randomSelectedNodeIds.size()); + LOGGER.info( + "randomMaxRegionCount: {}, PGPMaxRegionCount: {}", randomMaxRegionCount, PGPMaxRegionCount); + Assert.assertTrue(randomMaxRegionCount >= PGPMaxRegionCount); + } + + @Test + public void testSelectDestNodeMultiDatabase() { + // Pre‑allocate RegionReplicaSets for multiple databases + final String[] DB_NAMES = {"db0", "db1", "db2"}; + final int TOTAL_RG_NUM = + DATA_REGION_PER_DATA_NODE * TEST_DATA_NODE_NUM / DATA_REPLICATION_FACTOR; + + int basePerDb = TOTAL_RG_NUM / DB_NAMES.length; + int remainder = TOTAL_RG_NUM % DB_NAMES.length; // first DBs get one extra + + Map> dbAllocatedMap = new HashMap<>(); + List globalAllocatedList = new ArrayList<>(); + int globalIndex = 0; + + for (int dbIdx = 0; dbIdx < DB_NAMES.length; dbIdx++) { + String db = DB_NAMES[dbIdx]; + int rgToCreate = basePerDb + (dbIdx < remainder ? 
1 : 0); + List perDbList = new ArrayList<>(); + dbAllocatedMap.put(db, perDbList); + + for (int i = 0; i < rgToCreate; i++) { + TRegionReplicaSet rs = + GCR_ALLOCATOR.generateOptimalRegionReplicasDistribution( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + globalAllocatedList, + perDbList, + DATA_REPLICATION_FACTOR, + new TConsensusGroupId(TConsensusGroupType.DataRegion, globalIndex++)); + globalAllocatedList.add(rs); + perDbList.add(rs); + } + } + + // Identify the replica‑sets that contain the node to be removed + List impactedReplicas = + globalAllocatedList.stream() + .filter(rs -> rs.getDataNodeLocations().contains(REMOVE_DATANODE_LOCATION)) + .collect(Collectors.toList()); + + // Simulate removing the faulty/offline node + AVAILABLE_DATA_NODE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId()); + FREE_SPACE_MAP.remove(REMOVE_DATANODE_LOCATION.getDataNodeId()); + + List remainReplicas = new ArrayList<>(); + for (TRegionReplicaSet rs : impactedReplicas) { + globalAllocatedList.remove(rs); + rs.getDataNodeLocations().remove(REMOVE_DATANODE_LOCATION); + globalAllocatedList.add(rs); + remainReplicas.add(rs); + } + + // Build helper maps for removeNodeReplicaSelect + Map remainMap = new HashMap<>(); + remainReplicas.forEach(r -> remainMap.put(r.getRegionId(), r)); + + Map regionDbMap = new HashMap<>(); + dbAllocatedMap.forEach((db, list) -> list.forEach(r -> regionDbMap.put(r.getRegionId(), db))); + + // Baseline: random selection for comparison + Map rndCount = new HashMap<>(); + Map planCount = new HashMap<>(); + Set rndNodes = new HashSet<>(); + Set planNodes = new HashSet<>(); + int rndMax = 0, rndMin = Integer.MAX_VALUE; + int planMax = 0, planMin = Integer.MAX_VALUE; + + AVAILABLE_DATA_NODE_MAP + .keySet() + .forEach( + n -> { + rndCount.put(n, 0); + planCount.put(n, 0); + }); + + for (TRegionReplicaSet replicaSet : globalAllocatedList) { + for (TDataNodeLocation loc : replicaSet.getDataNodeLocations()) { + rndCount.merge(loc.getDataNodeId(), 1, 
Integer::sum); + planCount.merge(loc.getDataNodeId(), 1, Integer::sum); + } + } + + for (TRegionReplicaSet r : remainReplicas) { + TDataNodeLocation pick = randomSelectNodeForRegion(r.getDataNodeLocations()).get(); + LOGGER.info("Random Selected DataNode {} for Region {}", pick.getDataNodeId(), r.regionId); + rndNodes.add(pick.getDataNodeId()); + rndCount.merge(pick.getDataNodeId(), 1, Integer::sum); + } + + LOGGER.info("Remain Replicas... :"); + for (TRegionReplicaSet remainReplicaSet : remainReplicas) { + LOGGER.info("Region Group Id: {}", remainReplicaSet.regionId.id); + List dataNodeLocations = remainReplicaSet.getDataNodeLocations(); + for (TDataNodeLocation dataNodeLocation : dataNodeLocations) { + LOGGER.info("DataNode: {}", dataNodeLocation.getDataNodeId()); + } + } + + // Call the method under test + Map result = + GCR_ALLOCATOR.removeNodeReplicaSelect( + AVAILABLE_DATA_NODE_MAP, + FREE_SPACE_MAP, + globalAllocatedList, + regionDbMap, + dbAllocatedMap, + remainMap); + + for (TConsensusGroupId regionId : result.keySet()) { + TDataNodeConfiguration selectedNode = result.get(regionId); + + LOGGER.info( + "GCR Selected DataNode {} for Region {}", + selectedNode.getLocation().getDataNodeId(), + regionId); + planNodes.add(selectedNode.getLocation().getDataNodeId()); + planCount.merge(selectedNode.getLocation().getDataNodeId(), 1, Integer::sum); + } + + // Calculate load distribution + for (int c : rndCount.values()) { + rndMax = Math.max(rndMax, c); + rndMin = Math.min(rndMin, c); + } + for (int c : planCount.values()) { + planMax = Math.max(planMax, c); + planMin = Math.min(planMin, c); + } + + // Assertions + Assert.assertEquals(TEST_DATA_NODE_NUM - 1, planNodes.size()); + Assert.assertTrue(planNodes.size() >= rndNodes.size()); + Assert.assertTrue(rndMax >= planMax); + } + + private Optional randomSelectNodeForRegion( + List regionReplicaNodes) { + List dataNodeConfigurations = + new ArrayList<>(AVAILABLE_DATA_NODE_MAP.values()); + // Randomly selected to 
ensure a basic load balancing + Collections.shuffle(dataNodeConfigurations); + return dataNodeConfigurations.stream() + .map(TDataNodeConfiguration::getLocation) + .filter(e -> !regionReplicaNodes.contains(e)) + .findAny(); + } +}