Skip to content

Commit de8cf16

Browse files
authored
HDDS-12483. Quasi Closed Stuck should have 2 replicas of each origin (#8014)
1 parent 57a139e commit de8cf16

File tree

12 files changed

+1450
-130
lines changed

12 files changed

+1450
-130
lines changed
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.scm.container.replication;
19+
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.HashSet;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.UUID;
28+
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
29+
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
30+
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
31+
32+
/**
33+
* Class to count the replicas in a quasi-closed stuck container.
34+
*/
35+
public class QuasiClosedStuckReplicaCount {
36+
37+
private final Map<UUID, Set<ContainerReplica>> replicasByOrigin = new HashMap<>();
38+
private final Map<UUID, Set<ContainerReplica>> inServiceReplicasByOrigin = new HashMap<>();
39+
private final Map<UUID, Set<ContainerReplica>> maintenanceReplicasByOrigin = new HashMap<>();
40+
private boolean hasOutOfServiceReplicas = false;
41+
private int minHealthyForMaintenance;
42+
private boolean hasHealthyReplicas = false;
43+
44+
public QuasiClosedStuckReplicaCount(Set<ContainerReplica> replicas, int minHealthyForMaintenance) {
45+
this.minHealthyForMaintenance = minHealthyForMaintenance;
46+
for (ContainerReplica r : replicas) {
47+
if (r.getState() != StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.UNHEALTHY) {
48+
hasHealthyReplicas = true;
49+
}
50+
replicasByOrigin.computeIfAbsent(r.getOriginDatanodeId(), k -> new HashSet<>()).add(r);
51+
HddsProtos.NodeOperationalState opState = r.getDatanodeDetails().getPersistedOpState();
52+
if (opState == HddsProtos.NodeOperationalState.IN_SERVICE) {
53+
inServiceReplicasByOrigin.computeIfAbsent(r.getOriginDatanodeId(), k -> new HashSet<>()).add(r);
54+
} else if (opState == HddsProtos.NodeOperationalState.IN_MAINTENANCE
55+
|| opState == HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE) {
56+
maintenanceReplicasByOrigin.computeIfAbsent(r.getOriginDatanodeId(), k -> new HashSet<>()).add(r);
57+
hasOutOfServiceReplicas = true;
58+
} else {
59+
hasOutOfServiceReplicas = true;
60+
}
61+
}
62+
}
63+
64+
public int availableOrigins() {
65+
return replicasByOrigin.size();
66+
}
67+
68+
public boolean hasOutOfServiceReplicas() {
69+
return hasOutOfServiceReplicas;
70+
}
71+
72+
public boolean hasHealthyReplicas() {
73+
return hasHealthyReplicas;
74+
}
75+
76+
public boolean isUnderReplicated() {
77+
return !getUnderReplicatedReplicas().isEmpty();
78+
}
79+
80+
public List<MisReplicatedOrigin> getUnderReplicatedReplicas() {
81+
List<MisReplicatedOrigin> misReplicatedOrigins = new ArrayList<>();
82+
83+
if (replicasByOrigin.size() == 1) {
84+
Map.Entry<UUID, Set<ContainerReplica>> entry = replicasByOrigin.entrySet().iterator().next();
85+
Set<ContainerReplica> inService = inServiceReplicasByOrigin.get(entry.getKey());
86+
if (inService == null) {
87+
inService = Collections.emptySet();
88+
}
89+
Set<ContainerReplica> maintenance = maintenanceReplicasByOrigin.get(entry.getKey());
90+
int maintenanceCount = maintenance == null ? 0 : maintenance.size();
91+
92+
if (maintenanceCount > 0) {
93+
if (inService.size() < minHealthyForMaintenance) {
94+
int additionalReplicas = minHealthyForMaintenance - inService.size();
95+
misReplicatedOrigins.add(new MisReplicatedOrigin(entry.getValue(), additionalReplicas));
96+
}
97+
} else {
98+
if (inService.size() < 3) {
99+
int additionalReplicas = 3 - inService.size();
100+
misReplicatedOrigins.add(new MisReplicatedOrigin(entry.getValue(), additionalReplicas));
101+
}
102+
}
103+
return misReplicatedOrigins;
104+
}
105+
106+
// If there are multiple origins, we expect 2 copies of each origin
107+
// For maintenance, we expect 1 copy of each origin and ignore the minHealthyForMaintenance parameter
108+
for (Map.Entry<UUID, Set<ContainerReplica>> entry : replicasByOrigin.entrySet()) {
109+
Set<ContainerReplica> inService = inServiceReplicasByOrigin.get(entry.getKey());
110+
if (inService == null) {
111+
inService = Collections.emptySet();
112+
}
113+
Set<ContainerReplica> maintenance = maintenanceReplicasByOrigin.get(entry.getKey());
114+
int maintenanceCount = maintenance == null ? 0 : maintenance.size();
115+
116+
if (inService.size() < 2) {
117+
if (maintenanceCount > 0) {
118+
if (inService.isEmpty()) {
119+
// We need 1 copy online for maintenance
120+
misReplicatedOrigins.add(new MisReplicatedOrigin(entry.getValue(), 1));
121+
}
122+
} else {
123+
misReplicatedOrigins.add(new MisReplicatedOrigin(entry.getValue(), 2 - inService.size()));
124+
}
125+
}
126+
}
127+
return misReplicatedOrigins;
128+
}
129+
130+
/**
131+
* Returns True is the container is over-replicated. This means that if we have a single origin, there are more than
132+
* 3 copies. If we have multiple origins, there are more than 2 copies of each origin.
133+
* The over replication check ignore maintenance replicas. The container may become over replicated when maintenance
134+
* ends.
135+
*
136+
* @return True if the container is over-replicated, otherwise false
137+
*/
138+
public boolean isOverReplicated() {
139+
return !getOverReplicatedOrigins().isEmpty();
140+
}
141+
142+
public List<MisReplicatedOrigin> getOverReplicatedOrigins() {
143+
// If there is only a single origin, we expect 3 copies, otherwise we expect 2 copies of each origin
144+
if (replicasByOrigin.size() == 1) {
145+
UUID origin = replicasByOrigin.keySet().iterator().next();
146+
Set<ContainerReplica> inService = inServiceReplicasByOrigin.get(origin);
147+
if (inService != null && inService.size() > 3) {
148+
return Collections.singletonList(new MisReplicatedOrigin(inService, inService.size() - 3));
149+
}
150+
return Collections.emptyList();
151+
}
152+
153+
// If there are multiple origins, we expect 2 copies of each origin
154+
List<MisReplicatedOrigin> overReplicatedOrigins = new ArrayList<>();
155+
for (UUID origin : replicasByOrigin.keySet()) {
156+
Set<ContainerReplica> replicas = inServiceReplicasByOrigin.get(origin);
157+
if (replicas != null && replicas.size() > 2) {
158+
overReplicatedOrigins.add(new MisReplicatedOrigin(replicas, replicas.size() - 2));
159+
}
160+
}
161+
// If we have 2 copies or less of each origin, we are not over-replicated
162+
return overReplicatedOrigins;
163+
}
164+
165+
/**
166+
* Class to represent the origin of under replicated replicas and the number of additional replicas required.
167+
*/
168+
public static class MisReplicatedOrigin {
169+
170+
private final Set<ContainerReplica> sources;
171+
private final int replicaDelta;
172+
173+
public MisReplicatedOrigin(Set<ContainerReplica> sources, int replicaDelta) {
174+
this.sources = sources;
175+
this.replicaDelta = replicaDelta;
176+
}
177+
178+
public Set<ContainerReplica> getSources() {
179+
return sources;
180+
}
181+
182+
public int getReplicaDelta() {
183+
return replicaDelta;
184+
}
185+
}
186+
187+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.hdds.scm.container.replication;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Set;
25+
import java.util.stream.Collectors;
26+
import org.apache.hadoop.hdds.conf.ConfigurationSource;
27+
import org.apache.hadoop.hdds.conf.StorageUnit;
28+
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
29+
import org.apache.hadoop.hdds.scm.PlacementPolicy;
30+
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
31+
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
32+
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
33+
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
34+
import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
/**
39+
* Class to correct under replicated QuasiClosed Stuck Ratis containers.
40+
*/
41+
public class QuasiClosedStuckUnderReplicationHandler implements UnhealthyReplicationHandler {
42+
public static final Logger LOG = LoggerFactory.getLogger(QuasiClosedStuckUnderReplicationHandler.class);
43+
44+
private final PlacementPolicy placementPolicy;
45+
private final ReplicationManager replicationManager;
46+
private final long currentContainerSize;
47+
private final ReplicationManagerMetrics metrics;
48+
49+
public QuasiClosedStuckUnderReplicationHandler(final PlacementPolicy placementPolicy,
50+
final ConfigurationSource conf, final ReplicationManager replicationManager) {
51+
this.placementPolicy = placementPolicy;
52+
this.currentContainerSize = (long) conf.getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
53+
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
54+
this.replicationManager = replicationManager;
55+
this.metrics = replicationManager.getMetrics();
56+
}
57+
58+
@Override
59+
public int processAndSendCommands(Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
60+
ContainerHealthResult result, int remainingMaintenanceRedundancy) throws IOException {
61+
ContainerInfo containerInfo = result.getContainerInfo();
62+
LOG.debug("Handling under replicated QuasiClosed Stuck Ratis container {}", containerInfo);
63+
64+
int pendingAdd = 0;
65+
for (ContainerReplicaOp op : pendingOps) {
66+
if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) {
67+
pendingAdd++;
68+
}
69+
}
70+
71+
if (pendingAdd > 0) {
72+
LOG.debug("Container {} has pending add operations. No more replication will be scheduled until they complete",
73+
containerInfo);
74+
return 0;
75+
}
76+
77+
QuasiClosedStuckReplicaCount replicaCount =
78+
new QuasiClosedStuckReplicaCount(replicas, remainingMaintenanceRedundancy);
79+
80+
List<QuasiClosedStuckReplicaCount.MisReplicatedOrigin> misReplicatedOrigins
81+
= replicaCount.getUnderReplicatedReplicas();
82+
83+
if (misReplicatedOrigins.isEmpty()) {
84+
LOG.debug("Container {} is not under replicated", containerInfo);
85+
return 0;
86+
}
87+
88+
// Schedule Replicas for the under replicated origins.
89+
int totalRequiredReplicas = 0;
90+
int totalCommandsSent = 0;
91+
IOException firstException = null;
92+
List<ContainerReplicaOp> mutablePendingOps = new ArrayList<>(pendingOps);
93+
for (QuasiClosedStuckReplicaCount.MisReplicatedOrigin origin : misReplicatedOrigins) {
94+
totalRequiredReplicas += origin.getReplicaDelta();
95+
List<DatanodeDetails> targets;
96+
try {
97+
targets = getTargets(containerInfo, replicas, origin.getReplicaDelta(), mutablePendingOps);
98+
} catch (SCMException e) {
99+
if (firstException == null) {
100+
firstException = e;
101+
}
102+
LOG.warn("Cannot replicate container {} because no suitable targets were found.", containerInfo, e);
103+
continue;
104+
}
105+
106+
List<DatanodeDetails> sourceDatanodes = origin.getSources().stream()
107+
.map(ContainerReplica::getDatanodeDetails)
108+
.collect(Collectors.toList());
109+
for (DatanodeDetails target : targets) {
110+
try {
111+
replicationManager.sendThrottledReplicationCommand(containerInfo, sourceDatanodes, target, 0);
112+
// Add the pending op, so we exclude the node for subsequent origins
113+
mutablePendingOps.add(ContainerReplicaOp.create(ContainerReplicaOp.PendingOpType.ADD, target, 0));
114+
totalCommandsSent++;
115+
} catch (CommandTargetOverloadedException e) {
116+
LOG.warn("Cannot replicate container {} because all sources are overloaded.", containerInfo);
117+
if (firstException == null) {
118+
firstException = e;
119+
}
120+
}
121+
}
122+
}
123+
124+
if (firstException != null || totalCommandsSent < totalRequiredReplicas) {
125+
// Some commands were not sent as expected (not enough nodes found or overloaded nodes), so we just rethrow
126+
// the first exception we encountered.
127+
LOG.info("A command was not sent for all required new replicas for container {}. Total sent {}, required {} ",
128+
containerInfo, totalCommandsSent, totalRequiredReplicas);
129+
metrics.incrPartialReplicationTotal();
130+
if (firstException != null) {
131+
throw firstException;
132+
} else {
133+
throw new InsufficientDatanodesException(totalRequiredReplicas, totalCommandsSent);
134+
}
135+
}
136+
return totalCommandsSent;
137+
}
138+
139+
private List<DatanodeDetails> getTargets(ContainerInfo containerInfo,
140+
Set<ContainerReplica> replicas, int additionalRequired, List<ContainerReplicaOp> pendingOps) throws IOException {
141+
LOG.debug("Need {} target datanodes for container {}. Current replicas: {}.",
142+
additionalRequired, containerInfo, replicas);
143+
144+
ReplicationManagerUtil.ExcludedAndUsedNodes excludedAndUsedNodes =
145+
ReplicationManagerUtil.getExcludedAndUsedNodes(containerInfo, new ArrayList<>(replicas), Collections.emptySet(),
146+
pendingOps, replicationManager);
147+
148+
List<DatanodeDetails> excluded = excludedAndUsedNodes.getExcludedNodes();
149+
List<DatanodeDetails> used = excludedAndUsedNodes.getUsedNodes();
150+
151+
LOG.debug("UsedList: {}, size {}. ExcludeList: {}, size: {}. ",
152+
used, used.size(), excluded, excluded.size());
153+
154+
return ReplicationManagerUtil.getTargetDatanodes(placementPolicy,
155+
additionalRequired, used, excluded, currentContainerSize, containerInfo);
156+
}
157+
158+
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import org.apache.hadoop.hdds.scm.container.replication.health.MismatchedReplicasHandler;
7272
import org.apache.hadoop.hdds.scm.container.replication.health.OpenContainerHandler;
7373
import org.apache.hadoop.hdds.scm.container.replication.health.QuasiClosedContainerHandler;
74+
import org.apache.hadoop.hdds.scm.container.replication.health.QuasiClosedStuckReplicationCheck;
7475
import org.apache.hadoop.hdds.scm.container.replication.health.RatisReplicationCheckHandler;
7576
import org.apache.hadoop.hdds.scm.container.replication.health.RatisUnhealthyReplicationCheckHandler;
7677
import org.apache.hadoop.hdds.scm.container.replication.health.VulnerableUnhealthyReplicasHandler;
@@ -182,6 +183,7 @@ public class ReplicationManager implements SCMService, ContainerReplicaPendingOp
182183
private final RatisUnderReplicationHandler ratisUnderReplicationHandler;
183184
private final RatisOverReplicationHandler ratisOverReplicationHandler;
184185
private final RatisMisReplicationHandler ratisMisReplicationHandler;
186+
private final QuasiClosedStuckUnderReplicationHandler quasiClosedStuckUnderReplicationHandler;
185187
private Thread underReplicatedProcessorThread;
186188
private Thread overReplicatedProcessorThread;
187189
private final UnderReplicatedProcessor underReplicatedProcessor;
@@ -248,6 +250,8 @@ public ReplicationManager(final ConfigurationSource conf,
248250
new RatisOverReplicationHandler(ratisContainerPlacement, this);
249251
ratisMisReplicationHandler = new RatisMisReplicationHandler(
250252
ratisContainerPlacement, conf, this);
253+
quasiClosedStuckUnderReplicationHandler =
254+
new QuasiClosedStuckUnderReplicationHandler(ratisContainerPlacement, conf, this);
251255
underReplicatedProcessor =
252256
new UnderReplicatedProcessor(this, rmConf::getUnderReplicatedInterval);
253257
overReplicatedProcessor =
@@ -262,6 +266,7 @@ public ReplicationManager(final ConfigurationSource conf,
262266
.addNext(new MismatchedReplicasHandler(this))
263267
.addNext(new EmptyContainerHandler(this))
264268
.addNext(new DeletingContainerHandler(this))
269+
.addNext(new QuasiClosedStuckReplicationCheck())
265270
.addNext(ecReplicationCheckHandler)
266271
.addNext(ratisReplicationCheckHandler)
267272
.addNext(new ClosedWithUnhealthyReplicasHandler(this))
@@ -746,8 +751,15 @@ int processUnderReplicatedContainer(
746751

747752
if (result.getHealthState()
748753
== ContainerHealthResult.HealthState.UNDER_REPLICATED) {
749-
handler = isEC ? ecUnderReplicationHandler
750-
: ratisUnderReplicationHandler;
754+
if (isEC) {
755+
handler = ecUnderReplicationHandler;
756+
} else {
757+
if (QuasiClosedStuckReplicationCheck.shouldHandleAsQuasiClosedStuck(result.getContainerInfo(), replicas)) {
758+
handler = quasiClosedStuckUnderReplicationHandler;
759+
} else {
760+
handler = ratisUnderReplicationHandler;
761+
}
762+
}
751763
} else if (result.getHealthState()
752764
== ContainerHealthResult.HealthState.MIS_REPLICATED) {
753765
handler = isEC ? ecMisReplicationHandler : ratisMisReplicationHandler;

0 commit comments

Comments
 (0)