Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.hadoop.hdds.scm.storage;

import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.EOFException;
Expand All @@ -32,7 +30,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
Expand Down Expand Up @@ -295,10 +292,7 @@ private void setPipeline(Pipeline pipeline) throws IOException {
boolean okForRead =
pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
|| pipeline.getType() == HddsProtos.ReplicationType.EC;
Pipeline readPipeline = okForRead ? pipeline : Pipeline.newBuilder(pipeline)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
getLegacyFactor(pipeline.getReplicationConfig())))
.build();
Pipeline readPipeline = okForRead ? pipeline : pipeline.copyForRead();
pipelineRef.set(readPipeline);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.DatanodeID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
Expand All @@ -49,6 +51,7 @@
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto2Codec;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -425,8 +428,32 @@ public Pipeline copyWithNodesInOrder(List<? extends DatanodeDetails> nodes) {
return toBuilder().setNodesInOrder(nodes).build();
}

/**
 * Returns a pipeline usable for reads: same nodes, but with STANDALONE
 * replication. If this pipeline is already STANDALONE, it is returned
 * unchanged (no copy is made).
 */
public Pipeline copyForRead() {
  if (replicationConfig.getReplicationType() == ReplicationType.STAND_ALONE) {
    return this;
  }

  final HddsProtos.ReplicationFactor factor;
  if (replicationConfig instanceof ReplicatedReplicationConfig) {
    // Keep the original factor for replicated (e.g. RATIS) pipelines.
    factor = ((ReplicatedReplicationConfig) replicationConfig).getReplicationFactor();
  } else {
    // Non-replicated configs (e.g. EC) fall back to factor ONE.
    factor = HddsProtos.ReplicationFactor.ONE;
  }

  return toBuilder()
      .setReplicationConfig(StandaloneReplicationConfig.getInstance(factor))
      .build();
}

/**
 * Creates a single-node STANDALONE/ONE pipeline for reading directly from
 * {@code node}, preserving that node's replica index.
 *
 * @param node the datanode to read from; must be a member of this pipeline
 * @throws IllegalStateException if {@code node} is not part of this pipeline
 */
public Pipeline copyForReadFromNode(DatanodeDetails node) {
  Preconditions.assertTrue(nodeStatus.containsKey(node), () -> node + " is not part of the pipeline " + id.getId());

  Map<DatanodeDetails, Integer> replicaIndexForNode =
      Collections.singletonMap(node, getReplicaIndex(node));

  return toBuilder()
      .setReplicationConfig(StandaloneReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE))
      .setNodes(Collections.singletonList(node))
      .setReplicaIndexes(replicaIndexForNode)
      .build();
}

/**
 * Returns a builder pre-populated with all fields of this pipeline,
 * suitable for producing a modified copy.
 */
public Builder toBuilder() {
  return new Builder(this);
}

public static Builder toBuilder(HddsProtos.Pipeline pipeline) {
Expand Down Expand Up @@ -465,7 +492,8 @@ public static Builder toBuilder(HddsProtos.Pipeline pipeline) {
final ReplicationConfig config = ReplicationConfig
.fromProto(pipeline.getType(), pipeline.getFactor(),
pipeline.getEcReplicationConfig());
return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId()))
return newBuilder()
.setId(PipelineID.getFromProtobuf(pipeline.getId()))
.setReplicationConfig(config)
.setState(PipelineState.fromProtobuf(pipeline.getState()))
.setNodes(new ArrayList<>(nodes.keySet()))
Expand Down Expand Up @@ -531,14 +559,10 @@ public static Builder newBuilder() {
return new Builder();
}

public static Builder newBuilder(Pipeline pipeline) {
return new Builder(pipeline);
}

/**
* Builder class for Pipeline.
*/
public static class Builder {
public static final class Builder {
private PipelineID id = null;
private ReplicationConfig replicationConfig = null;
private PipelineState state = null;
Expand All @@ -550,9 +574,9 @@ public static class Builder {
private DatanodeID suggestedLeaderId = null;
private Map<DatanodeDetails, Integer> replicaIndexes = ImmutableMap.of();

public Builder() { }
private Builder() { }

public Builder(Pipeline pipeline) {
private Builder(Pipeline pipeline) {
this.id = pipeline.id;
this.replicationConfig = pipeline.replicationConfig;
this.state = pipeline.state;
Expand Down Expand Up @@ -599,8 +623,20 @@ public Builder setLeaderId(DatanodeID leaderId1) {
}

public Builder setNodes(List<DatanodeDetails> nodes) {
this.nodeStatus = new LinkedHashMap<>();
nodes.forEach(node -> nodeStatus.put(node, -1L));
Map<DatanodeDetails, Long> newNodeStatus = new LinkedHashMap<>();
nodes.forEach(node -> newNodeStatus.put(node, -1L));

// replace pipeline ID if nodes are not the same
if (nodeStatus != null && !nodeStatus.keySet().equals(newNodeStatus.keySet())) {
if (nodes.size() == 1) {
setId(nodes.iterator().next().getID());
} else {
setId(PipelineID.randomId());
}
}

nodeStatus = newNodeStatus;

if (nodesInOrder != null) {
// nodesInOrder may belong to another pipeline, avoid overwriting it
nodesInOrder = new LinkedList<>(nodesInOrder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,22 @@

package org.apache.hadoop.hdds.scm.pipeline;

import static java.util.Collections.singletonList;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.ALL_PORTS;
import static org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name.V0_PORTS;
import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
import static org.apache.hadoop.hdds.protocol.TestDatanodeDetails.assertPorts;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static org.apache.hadoop.ozone.ClientVersion.DEFAULT_VERSION;
import static org.apache.hadoop.ozone.ClientVersion.VERSION_HANDLES_UNKNOWN_DN_PORTS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -91,14 +98,11 @@ public void testECPipelineIsAlwaysHealthy() {
@Test
public void testBuilderCopiesAllFieldsFromOtherPipeline() {
Pipeline original = MockPipeline.createEcPipeline();
Pipeline copied = Pipeline.newBuilder(original).build();
Pipeline copied = original.toBuilder().build();
assertEquals(original.getId(), copied.getId());
assertEquals(original.getReplicationConfig(),
copied.getReplicationConfig());
assertEquals(original.getPipelineState(), copied.getPipelineState());
assertEquals(original.getId(), copied.getId());
assertEquals(original.getId(), copied.getId());
assertEquals(original.getId(), copied.getId());
assertEquals(original.getNodeSet(), copied.getNodeSet());
assertEquals(original.getNodesInOrder(), copied.getNodesInOrder());
assertEquals(original.getLeaderId(), copied.getLeaderId());
Expand All @@ -111,4 +115,36 @@ public void testBuilderCopiesAllFieldsFromOtherPipeline() {
copied.getReplicaIndex(dn));
}
}

@Test
void idChangedIfNodesReplaced() {
  Pipeline original = MockPipeline.createRatisPipeline();

  Pipeline withDifferentNodes = original.toBuilder()
      .setNodes(Arrays.asList(randomDatanodeDetails(), randomDatanodeDetails(), randomDatanodeDetails()))
      .build();

  // Swapping in a different node set must mint a fresh pipeline ID.
  assertNotEquals(original.getId(), withDifferentNodes.getId());
  // With more than one node, the ID must not be derived from any single node.
  for (DatanodeDetails node : withDifferentNodes.getNodes()) {
    assertNotEquals(node.getID().toPipelineID(), withDifferentNodes.getId());
  }
}

@Test
void testCopyForReadFromNode() {
  Pipeline subject = MockPipeline.createRatisPipeline();
  DatanodeDetails node = subject.getNodes().get(0);

  Pipeline copy = subject.copyForReadFromNode(node);

  // The read pipeline contains exactly the requested node,
  assertEquals(singletonList(node), copy.getNodes());
  // its ID is derived from that node,
  assertEquals(node.getID().toPipelineID(), copy.getId());
  // the node's replica index is carried over,
  assertEquals(subject.getReplicaIndex(node), copy.getReplicaIndex(node));
  // and replication is downgraded to STANDALONE/ONE for direct reads.
  assertEquals(StandaloneReplicationConfig.getInstance(ONE), copy.getReplicationConfig());
}

@Test
void testCopyForReadFromNodeRejectsUnknownNode() {
  Pipeline subject = MockPipeline.createRatisPipeline();
  DatanodeDetails stranger = randomDatanodeDetails();
  // A node that is not a member of the pipeline must be rejected.
  assertThrows(IllegalStateException.class, () -> subject.copyForReadFromNode(stranger));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ Pipeline updatePipelineState(PipelineID pipelineID, PipelineState state)
return pipeline;
}
Pipeline updatedPipeline = pipelineMap.compute(pipelineID,
(id, p) -> Pipeline.newBuilder(pipeline).setState(state).build());
(id, p) -> pipeline.toBuilder().setState(state).build());

List<Pipeline> pipelineList =
query2OpenPipelines.get(pipeline.getReplicationConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ public void testQueryPipeline() throws IOException, TimeoutException {

Pipeline pipeline2 = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, 3);
pipeline2 = Pipeline.newBuilder(pipeline2)
pipeline2 = pipeline2.toBuilder()
.setState(Pipeline.PipelineState.OPEN)
.build();
HddsProtos.Pipeline pipelineProto2 = pipeline2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ private List<ContainerReplicaInfo> getReplicas(boolean includeIndex) {
}

private ContainerWithPipeline getContainerWithPipeline(long containerID) {
Pipeline pipeline = new Pipeline.Builder()
Pipeline pipeline = Pipeline.newBuilder()
.setState(Pipeline.PipelineState.CLOSED)
.setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
.setId(PipelineID.randomId())
Expand All @@ -349,7 +349,7 @@ private ContainerWithPipeline getContainerWithPipeline(long containerID) {
}

private ContainerWithPipeline getECContainerWithPipeline() {
Pipeline pipeline = new Pipeline.Builder()
Pipeline pipeline = Pipeline.newBuilder()
.setState(Pipeline.PipelineState.CLOSED)
.setReplicationConfig(new ECReplicationConfig(3, 2))
.setId(PipelineID.randomId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private List<Pipeline> createPipelines() {

private Pipeline createPipeline(ReplicationConfig repConfig,
Pipeline.PipelineState state) {
return new Pipeline.Builder()
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setCreateTimestamp(System.currentTimeMillis())
.setState(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ private List<Pipeline> createPipelines() {

private Pipeline createPipeline(ReplicationConfig repConfig,
Pipeline.PipelineState state) {
return new Pipeline.Builder()
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setCreateTimestamp(System.currentTimeMillis())
.setState(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected List<ContainerProtos.ChunkInfo> getChunkInfos(OmKeyLocationInfo
}
}

pipeline = Pipeline.newBuilder(pipeline)
pipeline = pipeline.toBuilder()
.setReplicationConfig(StandaloneReplicationConfig
.getInstance(HddsProtos.ReplicationFactor.THREE))
.setNodes(nodes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
Expand Down Expand Up @@ -73,15 +70,8 @@ protected List<ContainerProtos.ChunkInfo> getChunkInfos(
// irrespective of the container state, we will always read via Standalone
// protocol.
Token<OzoneBlockTokenIdentifier> token = keyLocationInfo.getToken();
Pipeline pipeline = keyLocationInfo.getPipeline();
Pipeline pipeline = keyLocationInfo.getPipeline().copyForRead();
BlockID blockID = keyLocationInfo.getBlockID();
if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
pipeline = Pipeline.newBuilder(pipeline)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
ReplicationConfig
.getLegacyFactor(pipeline.getReplicationConfig())))
.build();
}

List<ContainerProtos.ChunkInfo> chunks;
XceiverClientSpi xceiverClientSpi = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
import org.apache.hadoop.hdds.scm.storage.MultipartInputStream;
import org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
Expand Down Expand Up @@ -1579,11 +1578,7 @@ public OzoneInputStream getKey(
List<DatanodeDetails> datanodes = pipelineBefore.getNodes();

for (DatanodeDetails dn : datanodes) {
List<DatanodeDetails> nodes = new ArrayList<>();
nodes.add(dn);
Pipeline pipeline
= new Pipeline.Builder(pipelineBefore).setNodes(nodes)
.setId(PipelineID.randomId()).build();
Pipeline pipeline = pipelineBefore.copyForReadFromNode(dn);
long length = replicationConfig instanceof ECReplicationConfig
? ECBlockInputStream.internalBlockLength(pipelineBefore.getReplicaIndex(dn),
(ECReplicationConfig) replicationConfig, locationInfo.getLength())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@

package org.apache.hadoop.ozone.freon;

import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;

import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.XceiverClientCreator;
Expand Down Expand Up @@ -144,10 +141,7 @@ public Void call() throws Exception {
LOG.warn("Read only is not set to true for GRPC, setting it to true");
readOnly = true;
}
pipeline = Pipeline.newBuilder(pipeline)
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
getLegacyFactor(pipeline.getReplicationConfig())))
.build();
pipeline = pipeline.copyForRead();
}
encodedContainerToken = scmClient.getEncodedContainerToken(containerID);
XceiverClientFactory xceiverClientManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

package org.apache.hadoop.ozone.debug.replicas;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;

import java.io.IOException;
import java.util.Collections;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
Expand Down Expand Up @@ -51,16 +47,10 @@ public BlockExistenceVerifier(OzoneConfiguration conf) throws IOException {
}

@Override
public BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation,
int replicaIndex) {
public BlockVerificationResult verifyBlock(DatanodeDetails datanode, OmKeyLocationInfo keyLocation) {
XceiverClientSpi client = null;
try {
Pipeline pipeline = Pipeline.newBuilder(keyLocation.getPipeline())
.setId(datanode.getID())
.setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
.setNodes(Collections.singletonList(datanode))
.setReplicaIndexes(Collections.singletonMap(datanode, replicaIndex))
.build();
Pipeline pipeline = keyLocation.getPipeline().copyForReadFromNode(datanode);

client = xceiverClientManager.acquireClientForReadData(pipeline);
ContainerProtos.GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
Expand Down
Loading