Skip to content

Commit ad1617d

Browse files
timmylichengxiaoyuyao
authored and committed
HDDS-1574 Average out pipeline allocation on datanodes and add metrics/test (#291)
1 parent 01eb8cd commit ad1617d

14 files changed

Lines changed: 339 additions & 20 deletions

File tree

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public final class Pipeline {
5757
private UUID leaderId;
5858
// Timestamp for pipeline upon creation
5959
private Long creationTimestamp;
60+
// Only valid for Ratis THREE pipeline. No need to persist.
61+
private int nodeIdsHash;
6062

6163
/**
6264
* The immutable properties of pipeline object is used in
@@ -72,6 +74,7 @@ private Pipeline(PipelineID id, ReplicationType type,
7274
this.state = state;
7375
this.nodeStatus = nodeStatus;
7476
this.creationTimestamp = System.currentTimeMillis();
77+
this.nodeIdsHash = 0;
7578
}
7679

7780
/**
@@ -128,6 +131,14 @@ void setCreationTimestamp(Long creationTimestamp) {
128131
this.creationTimestamp = creationTimestamp;
129132
}
130133

134+
public int getNodeIdsHash() {
135+
return nodeIdsHash;
136+
}
137+
138+
void setNodeIdsHash(int nodeIdsHash) {
139+
this.nodeIdsHash = nodeIdsHash;
140+
}
141+
131142
/**
132143
* Return the pipeline leader's UUID.
133144
*
@@ -328,6 +339,7 @@ public static class Builder {
328339
private List<DatanodeDetails> nodesInOrder = null;
329340
private UUID leaderId = null;
330341
private Long creationTimestamp = null;
342+
private int nodeIdsHash = 0;
331343

332344
public Builder() {}
333345

@@ -340,6 +352,7 @@ public Builder(Pipeline pipeline) {
340352
this.nodesInOrder = pipeline.nodesInOrder.get();
341353
this.leaderId = pipeline.getLeaderId();
342354
this.creationTimestamp = pipeline.getCreationTimestamp();
355+
this.nodeIdsHash = 0;
343356
}
344357

345358
public Builder setId(PipelineID id1) {
@@ -378,6 +391,11 @@ public Builder setNodesInOrder(List<Integer> orders) {
378391
return this;
379392
}
380393

394+
public Builder setNodeIdsHash(int nodeIdsHash1) {
395+
this.nodeIdsHash = nodeIdsHash1;
396+
return this;
397+
}
398+
381399
public Pipeline build() {
382400
Preconditions.checkNotNull(id);
383401
Preconditions.checkNotNull(type);
@@ -386,6 +404,7 @@ public Pipeline build() {
386404
Preconditions.checkNotNull(nodeStatus);
387405
Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
388406
pipeline.setLeaderId(leaderId);
407+
pipeline.setNodeIdsHash(nodeIdsHash);
389408
// overwrite with original creationTimestamp
390409
if (creationTimestamp != null) {
391410
pipeline.setCreationTimestamp(creationTimestamp);

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ List<DatanodeDetails> filterViableNodes(
162162
// filter nodes that meet the size and pipeline engagement criteria.
163163
// Pipeline placement doesn't take node space left into account.
164164
List<DatanodeDetails> healthyList = healthyNodes.stream()
165-
.filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
165+
.filter(d -> meetCriteria(d, nodesRequired))
166166
.collect(Collectors.toList());
167167

168168
if (healthyList.size() < nodesRequired) {
@@ -308,6 +308,7 @@ public DatanodeDetails chooseNode(
308308
}
309309
// the pick is decided and it should be removed from candidates.
310310
healthyNodes.remove(datanodeDetails);
311+
311312
return datanodeDetails;
312313
}
313314

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,13 @@ Pipeline openPipeline(PipelineID pipelineId) throws IOException {
133133
pipeline = pipelineStateMap
134134
.updatePipelineState(pipelineId, PipelineState.OPEN);
135135
}
136+
// Amend nodeIdsHash if needed.
137+
if (pipeline.getType() == ReplicationType.RATIS &&
138+
pipeline.getFactor() == ReplicationFactor.THREE &&
139+
pipeline.getNodeIdsHash() == 0) {
140+
pipeline.setNodeIdsHash(RatisPipelineUtils
141+
.encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes()));
142+
}
136143
return pipeline;
137144
}
138145

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
157157
}
158158

159159
List<DatanodeDetails> dns;
160+
int nodeIdHash = 0;
160161

161162
switch(factor) {
162163
case ONE:
@@ -165,6 +166,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
165166
case THREE:
166167
dns = placementPolicy.chooseDatanodes(null,
167168
null, factor.getNumber(), 0);
169+
nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(dns);
168170
break;
169171
default:
170172
throw new IllegalStateException("Unknown factor: " + factor.name());
@@ -176,6 +178,7 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
176178
.setType(ReplicationType.RATIS)
177179
.setFactor(factor)
178180
.setNodes(dns)
181+
.setNodeIdsHash(nodeIdHash)
179182
.build();
180183

181184
// Send command to datanodes to create pipeline
@@ -197,12 +200,17 @@ public Pipeline create(ReplicationFactor factor) throws IOException {
197200
@Override
198201
public Pipeline create(ReplicationFactor factor,
199202
List<DatanodeDetails> nodes) {
203+
int nodeIdHash = 0;
204+
if (factor == ReplicationFactor.THREE) {
205+
nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes);
206+
}
200207
return Pipeline.newBuilder()
201208
.setId(PipelineID.randomId())
202209
.setState(PipelineState.ALLOCATED)
203210
.setType(ReplicationType.RATIS)
204211
.setFactor(factor)
205212
.setNodes(nodes)
213+
.setNodeIdsHash(nodeIdHash)
206214
.build();
207215
}
208216

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
package org.apache.hadoop.hdds.scm.pipeline;
1919

2020
import java.io.IOException;
21+
import java.util.List;
22+
import java.util.stream.Collectors;
2123

2224
import org.apache.hadoop.conf.Configuration;
2325
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
26+
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
2427
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
2528
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
2629
import org.apache.hadoop.hdds.ratis.RatisHelper;
@@ -35,7 +38,6 @@
3538
import org.slf4j.Logger;
3639
import org.slf4j.LoggerFactory;
3740

38-
3941
/**
4042
* Utility class for Ratis pipelines. Contains methods to create and destroy
4143
* ratis pipelines.
@@ -100,4 +102,38 @@ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
100102
true, p.getId());
101103
}
102104
}
105+
106+
static int encodeNodeIdsOfFactorThreePipeline(List<DatanodeDetails> nodes) {
107+
if (nodes.size() != HddsProtos.ReplicationFactor.THREE.getNumber()) {
108+
return 0;
109+
}
110+
return nodes.get(0).getUuid().hashCode() ^
111+
nodes.get(1).getUuid().hashCode() ^
112+
nodes.get(2).getUuid().hashCode();
113+
}
114+
115+
/**
116+
* Return first existed pipeline which share the same set of datanodes
117+
* with the input pipeline.
118+
* @param stateManager PipelineStateManager
119+
* @param pipeline input pipeline
120+
* @return first matched pipeline
121+
*/
122+
static Pipeline checkPipelineContainSameDatanodes(
123+
PipelineStateManager stateManager, Pipeline pipeline) {
124+
List<Pipeline> matchedPipelines = stateManager.getPipelines(
125+
HddsProtos.ReplicationType.RATIS,
126+
HddsProtos.ReplicationFactor.THREE)
127+
.stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
128+
(// For all OPEN or ALLOCATED pipelines
129+
p.getPipelineState() == Pipeline.PipelineState.OPEN ||
130+
p.getPipelineState() == Pipeline.PipelineState.ALLOCATED) &&
131+
p.getNodeIdsHash() == pipeline.getNodeIdsHash())
132+
.collect(Collectors.toList());
133+
if (matchedPipelines.size() == 0) {
134+
return null;
135+
} else {
136+
return matchedPipelines.stream().findFirst().get();
137+
}
138+
}
103139
}

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,19 @@ public void setPipelineProvider(ReplicationType replicationType,
128128
pipelineFactory.setProvider(replicationType, provider);
129129
}
130130

131+
private int computeNodeIdHash(Pipeline pipeline) {
132+
if (pipeline.getType() != ReplicationType.RATIS) {
133+
return 0;
134+
}
135+
136+
if (pipeline.getFactor() != ReplicationFactor.THREE) {
137+
return 0;
138+
}
139+
140+
return RatisPipelineUtils.
141+
encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes());
142+
}
143+
131144
private void initializePipelineState() throws IOException {
132145
if (pipelineStore.isEmpty()) {
133146
LOG.info("No pipeline exists in current db");
@@ -143,6 +156,7 @@ private void initializePipelineState() throws IOException {
143156
Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
144157
HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
145158
Preconditions.checkNotNull(pipeline);
159+
pipeline.setNodeIdsHash(computeNodeIdHash(pipeline));
146160
stateManager.addPipeline(pipeline);
147161
nodeManager.addPipeline(pipeline);
148162
}
@@ -163,6 +177,18 @@ public synchronized Pipeline createPipeline(ReplicationType type,
163177
metrics.incNumPipelineCreated();
164178
metrics.createPerPipelineMetrics(pipeline);
165179
}
180+
Pipeline overlapPipeline = RatisPipelineUtils
181+
.checkPipelineContainSameDatanodes(stateManager, pipeline);
182+
if (overlapPipeline != null) {
183+
metrics.incNumPipelineContainSameDatanodes();
184+
//TODO remove until pipeline allocation is proved equally distributed.
185+
LOG.info("Pipeline: " + pipeline.getId().toString() +
186+
" contains same datanodes as previous pipeline: " +
187+
overlapPipeline.getId().toString() + " nodeIds: " +
188+
pipeline.getNodes().get(0).getUuid().toString() +
189+
", " + pipeline.getNodes().get(1).getUuid().toString() +
190+
", " + pipeline.getNodes().get(2).getUuid().toString());
191+
}
166192
return pipeline;
167193
} catch (IOException ex) {
168194
metrics.incNumPipelineCreationFailed();

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
5454
private @Metric MutableCounterLong numPipelineDestroyFailed;
5555
private @Metric MutableCounterLong numPipelineReportProcessed;
5656
private @Metric MutableCounterLong numPipelineReportProcessingFailed;
57+
private @Metric MutableCounterLong numPipelineContainSameDatanodes;
5758
private Map<PipelineID, MutableCounterLong> numBlocksAllocated;
5859

5960
/** Private constructor. */
@@ -92,6 +93,7 @@ public void getMetrics(MetricsCollector collector, boolean all) {
9293
numPipelineDestroyFailed.snapshot(recordBuilder, true);
9394
numPipelineReportProcessed.snapshot(recordBuilder, true);
9495
numPipelineReportProcessingFailed.snapshot(recordBuilder, true);
96+
numPipelineContainSameDatanodes.snapshot(recordBuilder, true);
9597
numBlocksAllocated
9698
.forEach((pid, metric) -> metric.snapshot(recordBuilder, true));
9799
}
@@ -176,4 +178,11 @@ void incNumPipelineReportProcessed() {
176178
void incNumPipelineReportProcessingFailed() {
177179
numPipelineReportProcessingFailed.incr();
178180
}
181+
182+
/**
183+
* Increments number of pipeline who contains same set of datanodes.
184+
*/
185+
void incNumPipelineContainSameDatanodes() {
186+
numPipelineContainSameDatanodes.incr();
187+
}
179188
}

hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.hadoop.hdds.scm.container;
1818

19-
import org.apache.hadoop.conf.Configuration;
19+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
2020
import org.apache.hadoop.hdds.protocol.proto
2121
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
2222
import org.apache.hadoop.hdds.scm.TestUtils;
@@ -93,16 +93,17 @@ public class MockNodeManager implements NodeManager {
9393
private NetworkTopology clusterMap;
9494
private ConcurrentMap<String, Set<String>> dnsToUuidMap;
9595

96-
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
96+
public MockNodeManager(NetworkTopologyImpl clusterMap,
97+
boolean initializeFakeNodes, int nodeCount) {
9798
this.healthyNodes = new LinkedList<>();
9899
this.staleNodes = new LinkedList<>();
99100
this.deadNodes = new LinkedList<>();
100101
this.nodeMetricMap = new HashMap<>();
101102
this.node2PipelineMap = new Node2PipelineMap();
102103
this.node2ContainerMap = new Node2ContainerMap();
103104
this.dnsToUuidMap = new ConcurrentHashMap<>();
104-
aggregateStat = new SCMNodeStat();
105-
clusterMap = new NetworkTopologyImpl(new Configuration());
105+
this.aggregateStat = new SCMNodeStat();
106+
this.clusterMap = clusterMap;
106107
if (initializeFakeNodes) {
107108
for (int x = 0; x < nodeCount; x++) {
108109
DatanodeDetails dd = TestUtils.randomDatanodeDetails();
@@ -114,6 +115,11 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
114115
this.commandMap = new HashMap<>();
115116
}
116117

118+
/**
 * Creates a MockNodeManager backed by a freshly built network topology.
 * Delegates to the topology-accepting constructor so tests that need a
 * shared {@code NetworkTopologyImpl} can inject their own.
 *
 * @param initializeFakeNodes whether to register randomly generated
 *        datanodes with the manager
 * @param nodeCount number of fake nodes to create when enabled
 */
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
  this(new NetworkTopologyImpl(new OzoneConfiguration()),
      initializeFakeNodes, nodeCount);
}
122+
117123
/**
118124
* Invoked from ctor to create some node Metrics.
119125
*

hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.hadoop.hdds.HddsConfigKeys;
2525
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2626
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
27+
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
2728
import org.apache.hadoop.hdds.scm.TestUtils;
2829
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
2930
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
@@ -67,13 +68,14 @@ public static void setUp() throws Exception {
6768
.getTestDir(TestCloseContainerEventHandler.class.getSimpleName());
6869
configuration
6970
.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
71+
configuration.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, 16);
7072
nodeManager = new MockNodeManager(true, 10);
7173
eventQueue = new EventQueue();
7274
pipelineManager =
7375
new SCMPipelineManager(configuration, nodeManager, eventQueue);
7476
PipelineProvider mockRatisProvider =
7577
new MockRatisPipelineProvider(nodeManager,
76-
pipelineManager.getStateManager(), configuration);
78+
pipelineManager.getStateManager(), configuration, eventQueue);
7779
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
7880
mockRatisProvider);
7981
containerManager = new
@@ -93,6 +95,9 @@ public static void tearDown() throws Exception {
9395
if (containerManager != null) {
9496
containerManager.close();
9597
}
98+
if (pipelineManager != null) {
99+
pipelineManager.close();
100+
}
96101
FileUtil.fullyDelete(testDir);
97102
}
98103

hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public Pipeline create(HddsProtos.ReplicationFactor factor)
7373
.setType(initialPipeline.getType())
7474
.setFactor(factor)
7575
.setNodes(initialPipeline.getNodes())
76+
.setNodeIdsHash(RatisPipelineUtils
77+
.encodeNodeIdsOfFactorThreePipeline(initialPipeline.getNodes()))
7678
.build();
7779
}
7880
}
@@ -91,6 +93,8 @@ public Pipeline create(HddsProtos.ReplicationFactor factor,
9193
.setType(HddsProtos.ReplicationType.RATIS)
9294
.setFactor(factor)
9395
.setNodes(nodes)
96+
.setNodeIdsHash(RatisPipelineUtils
97+
.encodeNodeIdsOfFactorThreePipeline(nodes))
9498
.build();
9599
}
96100
}

0 commit comments

Comments
 (0)