Skip to content

Commit e3206f4

Browse files
authored
HDDS-7930. input stream does not refresh expired block token. (#4378)
1 parent 0ebb555 commit e3206f4

File tree

15 files changed

+110
-82
lines changed

15 files changed

+110
-82
lines changed

hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
6464
private final BlockID blockID;
6565
private final long length;
6666
private Pipeline pipeline;
67-
private final Token<OzoneBlockTokenIdentifier> token;
67+
private Token<OzoneBlockTokenIdentifier> token;
6868
private final boolean verifyChecksum;
6969
private XceiverClientFactory xceiverClientFactory;
7070
private XceiverClientSpi xceiverClient;
@@ -103,19 +103,19 @@ public class BlockInputStream extends BlockExtendedInputStream {
103103
// can be reset if a new position is seeked.
104104
private int chunkIndexOfPrevPosition;
105105

106-
private final Function<BlockID, Pipeline> refreshPipelineFunction;
106+
private final Function<BlockID, BlockLocationInfo> refreshFunction;
107107

108108
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
109109
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
110110
XceiverClientFactory xceiverClientFactory,
111-
Function<BlockID, Pipeline> refreshPipelineFunction) {
111+
Function<BlockID, BlockLocationInfo> refreshFunction) {
112112
this.blockID = blockId;
113113
this.length = blockLen;
114114
this.pipeline = pipeline;
115115
this.token = token;
116116
this.verifyChecksum = verifyChecksum;
117117
this.xceiverClientFactory = xceiverClientFactory;
118-
this.refreshPipelineFunction = refreshPipelineFunction;
118+
this.refreshFunction = refreshFunction;
119119
}
120120

121121
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
@@ -150,12 +150,12 @@ public synchronized void initialize() throws IOException {
150150
} catch (SCMSecurityException ex) {
151151
throw ex;
152152
} catch (StorageContainerException ex) {
153-
refreshPipeline(ex);
153+
refreshBlockInfo(ex);
154154
catchEx = ex;
155155
} catch (IOException ex) {
156156
LOG.debug("Retry to get chunk info fail", ex);
157157
if (isConnectivityIssue(ex)) {
158-
refreshPipeline(ex);
158+
refreshBlockInfo(ex);
159159
}
160160
catchEx = ex;
161161
}
@@ -199,17 +199,19 @@ private boolean isConnectivityIssue(IOException ex) {
199199
return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
200200
}
201201

202-
private void refreshPipeline(IOException cause) throws IOException {
202+
private void refreshBlockInfo(IOException cause) throws IOException {
203203
LOG.info("Unable to read information for block {} from pipeline {}: {}",
204204
blockID, pipeline.getId(), cause.getMessage());
205-
if (refreshPipelineFunction != null) {
206-
LOG.debug("Re-fetching pipeline for block {}", blockID);
207-
Pipeline newPipeline = refreshPipelineFunction.apply(blockID);
208-
if (newPipeline == null) {
209-
LOG.debug("No new pipeline for block {}", blockID);
205+
if (refreshFunction != null) {
206+
LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
207+
BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
208+
if (blockLocationInfo == null) {
209+
LOG.debug("No new block location info for block {}", blockID);
210210
} else {
211-
LOG.debug("New pipeline for block {}: {}", blockID, newPipeline);
212-
this.pipeline = newPipeline;
211+
LOG.debug("New block location info for block {}: {}",
212+
blockID, blockLocationInfo);
213+
this.pipeline = blockLocationInfo.getPipeline();
214+
this.token = blockLocationInfo.getToken();
213215
}
214216
} else {
215217
throw cause;
@@ -526,7 +528,7 @@ private void handleReadError(IOException cause) throws IOException {
526528
}
527529
}
528530

529-
refreshPipeline(cause);
531+
refreshBlockInfo(cause);
530532
}
531533

532534
@VisibleForTesting

hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ public interface BlockInputStreamFactory {
4343
* @param token The block Access Token
4444
* @param verifyChecksum Whether to verify checksums or not.
4545
* @param xceiverFactory Factory to create the xceiver in the client
46-
* @param refreshFunction Function to refresh the pipeline if needed
46+
* @param refreshFunction Function to refresh the block location if needed
4747
* @return BlockExtendedInputStream of the correct type.
4848
*/
4949
BlockExtendedInputStream create(ReplicationConfig repConfig,
5050
BlockLocationInfo blockInfo, Pipeline pipeline,
5151
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
5252
XceiverClientFactory xceiverFactory,
53-
Function<BlockID, Pipeline> refreshFunction);
53+
Function<BlockID, BlockLocationInfo> refreshFunction);
5454

5555
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public BlockExtendedInputStream create(ReplicationConfig repConfig,
7878
BlockLocationInfo blockInfo, Pipeline pipeline,
7979
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
8080
XceiverClientFactory xceiverFactory,
81-
Function<BlockID, Pipeline> refreshFunction) {
81+
Function<BlockID, BlockLocationInfo> refreshFunction) {
8282
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
8383
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
8484
blockInfo, verifyChecksum, xceiverFactory, refreshFunction,

hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class ECBlockInputStream extends BlockExtendedInputStream {
6262
private final BlockInputStreamFactory streamFactory;
6363
private final boolean verifyChecksum;
6464
private final XceiverClientFactory xceiverClientFactory;
65-
private final Function<BlockID, Pipeline> refreshFunction;
65+
private final Function<BlockID, BlockLocationInfo> refreshFunction;
6666
private final BlockLocationInfo blockInfo;
6767
private final DatanodeDetails[] dataLocations;
6868
private final BlockExtendedInputStream[] blockStreams;
@@ -120,8 +120,9 @@ protected int availableParityLocations() {
120120

121121
public ECBlockInputStream(ECReplicationConfig repConfig,
122122
BlockLocationInfo blockInfo, boolean verifyChecksum,
123-
XceiverClientFactory xceiverClientFactory, Function<BlockID,
124-
Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
123+
XceiverClientFactory xceiverClientFactory,
124+
Function<BlockID, BlockLocationInfo> refreshFunction,
125+
BlockInputStreamFactory streamFactory) {
125126
this.repConfig = repConfig;
126127
this.ecChunkSize = repConfig.getEcChunkSize();
127128
this.verifyChecksum = verifyChecksum;
@@ -215,27 +216,30 @@ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
215216
* @param refreshFunc
216217
* @return
217218
*/
218-
protected Function<BlockID, Pipeline> ecPipelineRefreshFunction(
219-
int replicaIndex, Function<BlockID, Pipeline> refreshFunc) {
219+
protected Function<BlockID, BlockLocationInfo> ecPipelineRefreshFunction(
220+
int replicaIndex, Function<BlockID, BlockLocationInfo> refreshFunc) {
220221
return (blockID) -> {
221-
Pipeline ecPipeline = refreshFunc.apply(blockID);
222-
if (ecPipeline == null) {
222+
BlockLocationInfo blockLocationInfo = refreshFunc.apply(blockID);
223+
if (blockLocationInfo == null) {
223224
return null;
224225
}
226+
Pipeline ecPipeline = blockLocationInfo.getPipeline();
225227
DatanodeDetails curIndexNode = ecPipeline.getNodes()
226228
.stream().filter(dn ->
227229
ecPipeline.getReplicaIndex(dn) == replicaIndex)
228230
.findAny().orElse(null);
229231
if (curIndexNode == null) {
230232
return null;
231233
}
232-
return Pipeline.newBuilder().setReplicationConfig(
234+
Pipeline pipeline = Pipeline.newBuilder().setReplicationConfig(
233235
StandaloneReplicationConfig.getInstance(
234236
HddsProtos.ReplicationFactor.ONE))
235237
.setNodes(Collections.singletonList(curIndexNode))
236238
.setId(PipelineID.randomId())
237239
.setState(Pipeline.PipelineState.CLOSED)
238240
.build();
241+
blockLocationInfo.setPipeline(pipeline);
242+
return blockLocationInfo;
239243
};
240244
}
241245

hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.hadoop.hdds.client.ReplicationConfig;
2222
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2323
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
24-
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
2524
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
2625
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
2726

@@ -47,12 +46,12 @@ public interface ECBlockInputStreamFactory {
4746
* @param blockInfo The blockInfo representing the block.
4847
* @param verifyChecksum Whether to verify checksums or not.
4948
* @param xceiverFactory Factory to create the xceiver in the client
50-
* @param refreshFunction Function to refresh the pipeline if needed
49+
* @param refreshFunction Function to refresh the block location if needed
5150
* @return BlockExtendedInputStream of the correct type.
5251
*/
5352
BlockExtendedInputStream create(boolean missingLocations,
5453
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
5554
BlockLocationInfo blockInfo, boolean verifyChecksum,
5655
XceiverClientFactory xceiverFactory,
57-
Function<BlockID, Pipeline> refreshFunction);
56+
Function<BlockID, BlockLocationInfo> refreshFunction);
5857
}

hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.hadoop.hdds.client.ReplicationConfig;
2323
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2424
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
25-
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
2625
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
2726
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
2827
import org.apache.hadoop.io.ByteBufferPool;
@@ -77,7 +76,7 @@ public BlockExtendedInputStream create(boolean missingLocations,
7776
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
7877
BlockLocationInfo blockInfo, boolean verifyChecksum,
7978
XceiverClientFactory xceiverFactory,
80-
Function<BlockID, Pipeline> refreshFunction) {
79+
Function<BlockID, BlockLocationInfo> refreshFunction) {
8180
if (missingLocations) {
8281
// We create the reconstruction reader
8382
ECBlockReconstructedStripeInputStream sis =

hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public class ECBlockInputStreamProxy extends BlockExtendedInputStream {
5151
private final ECReplicationConfig repConfig;
5252
private final boolean verifyChecksum;
5353
private final XceiverClientFactory xceiverClientFactory;
54-
private final Function<BlockID, Pipeline> refreshFunction;
54+
private final Function<BlockID, BlockLocationInfo> refreshFunction;
5555
private final BlockLocationInfo blockInfo;
5656
private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
5757

@@ -99,7 +99,8 @@ public static int availableDataLocations(Pipeline pipeline,
9999
public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
100100
BlockLocationInfo blockInfo, boolean verifyChecksum,
101101
XceiverClientFactory xceiverClientFactory, Function<BlockID,
102-
Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
102+
BlockLocationInfo> refreshFunction,
103+
ECBlockInputStreamFactory streamFactory) {
103104
this.repConfig = repConfig;
104105
this.verifyChecksum = verifyChecksum;
105106
this.blockInfo = blockInfo;

hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.hadoop.hdds.client.ECReplicationConfig;
2525
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
2626
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
27-
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
2827
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
2928
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
3029
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
@@ -152,8 +151,9 @@ public class ECBlockReconstructedStripeInputStream extends ECBlockInputStream {
152151
@SuppressWarnings("checkstyle:ParameterNumber")
153152
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
154153
BlockLocationInfo blockInfo, boolean verifyChecksum,
155-
XceiverClientFactory xceiverClientFactory, Function<BlockID,
156-
Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
154+
XceiverClientFactory xceiverClientFactory,
155+
Function<BlockID, BlockLocationInfo> refreshFunction,
156+
BlockInputStreamFactory streamFactory,
157157
ByteBufferPool byteBufferPool,
158158
ExecutorService ecReconstructExecutor) {
159159
super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class DummyBlockInputStream extends BlockInputStream {
4646
Token<OzoneBlockTokenIdentifier> token,
4747
boolean verifyChecksum,
4848
XceiverClientFactory xceiverClientManager,
49-
Function<BlockID, Pipeline> refreshFunction,
49+
Function<BlockID, BlockLocationInfo> refreshFunction,
5050
List<ChunkInfo> chunkList,
5151
Map<String, byte[]> chunks) {
5252
super(blockId, blockLen, pipeline, token, verifyChecksum,

hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,22 @@
1818
package org.apache.hadoop.hdds.scm.storage;
1919

2020
import java.io.IOException;
21-
import java.util.Collections;
2221
import java.util.List;
2322
import java.util.Map;
2423
import java.util.concurrent.atomic.AtomicBoolean;
2524

2625
import org.apache.hadoop.hdds.client.BlockID;
27-
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
2826
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
29-
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
3027
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
3128
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
29+
import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
3230
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
33-
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
3431
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
3532
import org.apache.hadoop.security.token.Token;
3633

3734
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.when;
3837

3938
/**
4039
* A dummy BlockInputStream with pipeline refresh function to mock read
@@ -60,13 +59,15 @@ final class DummyBlockInputStreamWithRetry
6059
super(blockId, blockLen, pipeline, token, verifyChecksum,
6160
xceiverClientManager, blockID -> {
6261
isRerfreshed.set(true);
63-
return Pipeline.newBuilder()
64-
.setState(Pipeline.PipelineState.OPEN)
65-
.setId(PipelineID.randomId())
66-
.setReplicationConfig(StandaloneReplicationConfig.getInstance(
67-
ReplicationFactor.ONE))
68-
.setNodes(Collections.emptyList())
69-
.build();
62+
try {
63+
BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
64+
Pipeline mockPipeline = MockPipeline.createPipeline(1);
65+
when(blockLocationInfo.getPipeline()).thenReturn(mockPipeline);
66+
return blockLocationInfo;
67+
} catch (IOException e) {
68+
throw new RuntimeException(e);
69+
}
70+
7071
}, chunkList, chunkMap);
7172
this.ioException = ioException;
7273
}

0 commit comments

Comments
 (0)