Skip to content

Commit c577227

Browse files
committed
HDDS-12935. Support unsigned chunked upload with STREAMING-UNSIGNED-PAYLOAD-TRAILER (apache#8366)
(cherry picked from commit e9e8b30)
1 parent fe4cb7c commit c577227

23 files changed

Lines changed: 1024 additions & 129 deletions

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import com.amazonaws.AmazonServiceException;
3131
import com.amazonaws.AmazonServiceException.ErrorType;
32+
import com.amazonaws.HttpMethod;
3233
import com.amazonaws.services.s3.AmazonS3;
3334
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
3435
import com.amazonaws.services.s3.model.AccessControlList;
@@ -37,6 +38,7 @@
3738
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
3839
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
3940
import com.amazonaws.services.s3.model.CreateBucketRequest;
41+
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
4042
import com.amazonaws.services.s3.model.GetObjectRequest;
4143
import com.amazonaws.services.s3.model.Grantee;
4244
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
@@ -69,17 +71,22 @@
6971
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
7072
import com.amazonaws.services.s3.transfer.Upload;
7173
import com.amazonaws.services.s3.transfer.model.UploadResult;
74+
import com.amazonaws.util.IOUtils;
7275
import java.io.ByteArrayInputStream;
7376
import java.io.ByteArrayOutputStream;
7477
import java.io.File;
7578
import java.io.IOException;
7679
import java.io.InputStream;
80+
import java.net.HttpURLConnection;
81+
import java.net.URL;
7782
import java.nio.charset.StandardCharsets;
7883
import java.nio.file.Files;
7984
import java.nio.file.Path;
85+
import java.time.Instant;
8086
import java.util.ArrayList;
8187
import java.util.Arrays;
8288
import java.util.Collections;
89+
import java.util.Date;
8390
import java.util.HashMap;
8491
import java.util.HashSet;
8592
import java.util.List;
@@ -955,6 +962,42 @@ public void testQuotaExceeded() throws IOException {
955962
assertEquals("QuotaExceeded", ase.getErrorCode());
956963
}
957964

965+
@Test
966+
public void testPresignedUrlGet() throws IOException {
967+
final String bucketName = getBucketName();
968+
final String keyName = getKeyName();
969+
final String content = "bar";
970+
s3Client.createBucket(bucketName);
971+
972+
InputStream is = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
973+
974+
s3Client.putObject(bucketName, keyName, is, new ObjectMetadata());
975+
976+
// Set the presigned URL to expire after one hour.
977+
Date expiration = Date.from(Instant.now().plusMillis(1000 * 60 * 60));
978+
979+
// Generate the presigned URL
980+
GeneratePresignedUrlRequest generatePresignedUrlRequest =
981+
new GeneratePresignedUrlRequest(bucketName, keyName)
982+
.withMethod(HttpMethod.GET)
983+
.withExpiration(expiration);
984+
generatePresignedUrlRequest.addRequestParameter("x-custom-parameter", "custom-value");
985+
URL url = s3Client.generatePresignedUrl(generatePresignedUrlRequest);
986+
987+
// Download the object using HttpURLConnection (available since JDK 1.1)
988+
// Capture the response body to a byte array.
989+
URL presignedUrl = new URL(url.toExternalForm());
990+
HttpURLConnection connection = (HttpURLConnection) presignedUrl.openConnection();
991+
connection.setRequestMethod("GET");
992+
// Download the result of executing the request.
993+
try (InputStream s3is = connection.getInputStream();
994+
ByteArrayOutputStream bos = new ByteArrayOutputStream(
995+
content.getBytes(StandardCharsets.UTF_8).length)) {
996+
IOUtils.copy(s3is, bos);
997+
assertEquals(content, bos.toString("UTF-8"));
998+
}
999+
}
1000+
9581001
private boolean isBucketEmpty(Bucket bucket) {
9591002
ObjectListing objectListing = s3Client.listObjects(bucket.getName());
9601003
return objectListing.getObjectSummaries().isEmpty();

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,18 @@
2323
import static org.junit.jupiter.api.Assertions.assertEquals;
2424
import static org.junit.jupiter.api.Assertions.assertTrue;
2525

26+
import java.io.ByteArrayOutputStream;
2627
import java.io.File;
2728
import java.io.IOException;
2829
import java.io.InputStream;
2930
import java.io.RandomAccessFile;
31+
import java.net.HttpURLConnection;
32+
import java.net.URL;
3033
import java.nio.ByteBuffer;
34+
import java.nio.charset.StandardCharsets;
3135
import java.nio.file.Files;
3236
import java.nio.file.Path;
37+
import java.time.Duration;
3338
import java.util.ArrayList;
3439
import java.util.Arrays;
3540
import java.util.HashMap;
@@ -48,20 +53,33 @@
4853
import org.junit.jupiter.api.io.TempDir;
4954
import software.amazon.awssdk.core.ResponseBytes;
5055
import software.amazon.awssdk.core.sync.RequestBody;
56+
import software.amazon.awssdk.http.HttpExecuteRequest;
57+
import software.amazon.awssdk.http.HttpExecuteResponse;
58+
import software.amazon.awssdk.http.SdkHttpClient;
59+
import software.amazon.awssdk.http.SdkHttpMethod;
60+
import software.amazon.awssdk.http.SdkHttpRequest;
61+
import software.amazon.awssdk.http.apache.ApacheHttpClient;
62+
import software.amazon.awssdk.services.s3.S3AsyncClient;
5163
import software.amazon.awssdk.services.s3.S3Client;
64+
import software.amazon.awssdk.services.s3.S3Configuration;
5265
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
5366
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
5467
import software.amazon.awssdk.services.s3.model.CompletedPart;
5568
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
5669
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
5770
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
71+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
5872
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
5973
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
6074
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
6175
import software.amazon.awssdk.services.s3.model.Tag;
6276
import software.amazon.awssdk.services.s3.model.Tagging;
6377
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
6478
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
79+
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
80+
import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest;
81+
import software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest;
82+
import software.amazon.awssdk.utils.IoUtils;
6583

6684
/**
6785
* This is an abstract class to test the AWS Java S3 SDK operations.
@@ -195,6 +213,78 @@ public void testLowLevelMultipartUpload(@TempDir Path tempDir) throws Exception
195213
assertEquals(userMetadata, headObjectResponse.metadata());
196214
}
197215

216+
@Test
217+
public void testPresignedUrlGet() throws Exception {
218+
final String bucketName = getBucketName();
219+
final String keyName = getKeyName();
220+
final String content = "bar";
221+
s3Client.createBucket(b -> b.bucket(bucketName));
222+
223+
s3Client.putObject(b -> b
224+
.bucket(bucketName)
225+
.key(keyName),
226+
RequestBody.fromString(content));
227+
228+
try (S3Presigner presigner = S3Presigner.builder()
229+
// TODO: Find a way to retrieve the path style configuration from S3Client instead
230+
.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
231+
.endpointOverride(s3Client.serviceClientConfiguration().endpointOverride().get())
232+
.region(s3Client.serviceClientConfiguration().region())
233+
.credentialsProvider(s3Client.serviceClientConfiguration().credentialsProvider()).build()) {
234+
GetObjectRequest objectRequest = GetObjectRequest.builder()
235+
.bucket(bucketName)
236+
.key(keyName)
237+
.build();
238+
239+
GetObjectPresignRequest presignRequest = GetObjectPresignRequest.builder()
240+
.signatureDuration(Duration.ofMinutes(10)) // The URL will expire in 10 minutes.
241+
.getObjectRequest(objectRequest)
242+
.build();
243+
244+
PresignedGetObjectRequest presignedRequest = presigner.presignGetObject(presignRequest);
245+
246+
// Download the object using HttpURLConnection (available since JDK 1.1)
247+
// Capture the response body to a byte array.
248+
URL presignedUrl = presignedRequest.url();
249+
HttpURLConnection connection = (HttpURLConnection) presignedUrl.openConnection();
250+
connection.setRequestMethod("GET");
251+
// Download the result of executing the request.
252+
try (InputStream s3is = connection.getInputStream();
253+
ByteArrayOutputStream bos = new ByteArrayOutputStream(
254+
content.getBytes(StandardCharsets.UTF_8).length)) {
255+
IoUtils.copy(s3is, bos);
256+
assertEquals(content, bos.toString("UTF-8"));
257+
}
258+
259+
// Use the AWS SDK for Java SdkHttpClient class to do the download
260+
SdkHttpRequest request = SdkHttpRequest.builder()
261+
.method(SdkHttpMethod.GET)
262+
.uri(presignedUrl.toURI())
263+
.build();
264+
265+
HttpExecuteRequest executeRequest = HttpExecuteRequest.builder()
266+
.request(request)
267+
.build();
268+
269+
try (SdkHttpClient sdkHttpClient = ApacheHttpClient.create();
270+
ByteArrayOutputStream bos = new ByteArrayOutputStream(
271+
content.getBytes(StandardCharsets.UTF_8).length)) {
272+
HttpExecuteResponse response = sdkHttpClient.prepareRequest(executeRequest).call();
273+
assertTrue(response.responseBody().isPresent(), () -> "The presigned url download request " +
274+
"should have a response body");
275+
response.responseBody().ifPresent(
276+
abortableInputStream -> {
277+
try {
278+
IoUtils.copy(abortableInputStream, bos);
279+
} catch (IOException e) {
280+
throw new RuntimeException(e);
281+
}
282+
});
283+
assertEquals(content, bos.toString("UTF-8"));
284+
}
285+
}
286+
}
287+
198288
private String getBucketName() {
199289
return getBucketName(null);
200290
}

hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/SignedChunksInputStream.java

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,57 @@
1717

1818
package org.apache.hadoop.ozone.s3;
1919

20+
import static org.apache.hadoop.ozone.s3.util.S3Utils.eol;
21+
2022
import java.io.IOException;
2123
import java.io.InputStream;
24+
import java.util.Objects;
2225
import java.util.regex.Matcher;
2326
import java.util.regex.Pattern;
2427

2528
/**
26-
* Input stream implementation to read body with chunked signatures. This should also work
29+
* Input stream implementation to read body of a signed chunked upload. This should also work
2730
* with the chunked payloads with trailer.
2831
*
32+
* <p>
33+
* Example chunk data:
34+
* <pre>
35+
* 10000;chunk-signature=b474d8862b1487a5145d686f57f013e54db672cee1c953b3010fb58501ef5aa2\r\n
36+
* &lt;65536-bytes&gt;\r\n
37+
* 400;chunk-signature=1c1344b170168f8e65b41376b44b20fe354e373826ccbbe2c1d40a8cae51e5c7\r\n
38+
* &lt;1024-bytes&gt;\r\n
39+
* 0;chunk-signature=b6c6ea8a5354eaf15b3cb7646744f4275b71ea724fed81ceb9323e279d449df9\r\n
40+
* x-amz-checksum-crc32c:sOO8/Q==\r\n
41+
* x-amz-trailer-signature:63bddb248ad2590c92712055f51b8e78ab024eead08276b24f010b0efd74843f\r\n
42+
* </pre>
43+
* </p>
44+
* For the first chunk 10000 will be read and decoded from base-16 representation to 65536, which is the size of
45+
* the first chunk payload. Every chunked upload ends with an additional, final zero-byte chunk.
46+
* At the end, there might be a trailer checksum payload and signature, depending on whether the x-amz-content-sha256
47+
* header value ends with the "-TRAILER" suffix (e.g. STREAMING-AWS4-HMAC-SHA256-PAYLOAD-TRAILER
48+
* and STREAMING-AWS4-ECDSA-P256-SHA256-PAYLOAD-TRAILER) and "x-amz-trailer" is specified (e.g. x-amz-checksum-crc32c).
49+
* <p>
50+
*
51+
* <p>
52+
* The logic is similar to {@link UnsignedChunksInputStream}, but there is a "chunk-signature" to parse.
53+
* </p>
54+
*
55+
* <p>
2956
* Note that there is no actual chunk signature verification taking place. The InputStream only
3057
* returns the actual chunk payload from chunked signatures format.
58+
* </p>
3159
*
32-
* See
33-
* - https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
34-
* - https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming-trailers.html
60+
* Reference:
61+
* <ul>
62+
* <li>
63+
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html">
64+
* Signature Calculation: Transfer Payload in Multiple Chunks</a>
65+
* </li>
66+
* <li>
67+
* <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming-trailers.html">
68+
* Signature Calculation: Including Trailing Headers</a>
69+
* </li>
70+
* </ul>
3571
*/
3672
public class SignedChunksInputStream extends InputStream {
3773

@@ -46,12 +82,21 @@ public class SignedChunksInputStream extends InputStream {
4682
*/
4783
private int remainingData = 0;
4884

85+
/**
86+
* Every chunked upload (multiple chunks) contains an additional final zero-byte
87+
* chunk. This can be used as the end-of-file marker.
88+
*/
89+
private boolean isFinalChunkEncountered = false;
90+
4991
public SignedChunksInputStream(InputStream inputStream) {
5092
originalStream = inputStream;
5193
}
5294

5395
@Override
5496
public int read() throws IOException {
97+
if (isFinalChunkEncountered) {
98+
return -1;
99+
}
55100
if (remainingData > 0) {
56101
int curr = originalStream.read();
57102
remainingData--;
@@ -63,7 +108,10 @@ public int read() throws IOException {
63108
return curr;
64109
} else {
65110
remainingData = readContentLengthFromHeader();
66-
if (remainingData == -1) {
111+
if (remainingData <= 0) {
112+
// there is always a final zero byte chunk so we can stop reading
113+
// if we encounter this chunk
114+
isFinalChunkEncountered = true;
67115
return -1;
68116
}
69117
return read();
@@ -72,12 +120,14 @@ public int read() throws IOException {
72120

73121
@Override
74122
public int read(byte[] b, int off, int len) throws IOException {
75-
if (b == null) {
76-
throw new NullPointerException();
77-
} else if (off < 0 || len < 0 || len > b.length - off) {
78-
throw new IndexOutOfBoundsException();
123+
Objects.requireNonNull(b, "b == null");
124+
if (off < 0 || len < 0 || len > b.length - off) {
125+
throw new IndexOutOfBoundsException("Offset=" + off + " and len="
126+
+ len + " don't match the array length of " + b.length);
79127
} else if (len == 0) {
80128
return 0;
129+
} else if (isFinalChunkEncountered) {
130+
return -1;
81131
}
82132
int currentOff = off;
83133
int currentLen = len;
@@ -103,7 +153,12 @@ public int read(byte[] b, int off, int len) throws IOException {
103153
}
104154
} else {
105155
remainingData = readContentLengthFromHeader();
106-
if (remainingData == -1) {
156+
if (remainingData == 0) {
157+
// there is always a final zero byte chunk so we can stop reading
158+
// if we encounter this chunk
159+
isFinalChunkEncountered = true;
160+
}
161+
if (isFinalChunkEncountered || remainingData == -1) {
107162
break;
108163
}
109164
}
@@ -125,10 +180,9 @@ private int readContentLengthFromHeader() throws IOException {
125180
prev = curr;
126181
curr = next;
127182
}
128-
// Example
129-
// The chunk data sent:
130-
// 10000;chunk-signature=b474d8862b1487a5145d686f57f013e54db672cee1c953b3010fb58501ef5aa2
131-
// <65536-bytes>
183+
// Example of a single chunk's data:
184+
// 10000;chunk-signature=b474d8862b1487a5145d686f57f013e54db672cee1c953b3010fb58501ef5aa2\r\n
185+
// <65536-bytes>\r\n
132186
//
133187
// 10000 will be read and decoded from base-16 representation to 65536, which is the size of
134188
// the subsequent chunk payload.
@@ -145,8 +199,4 @@ private int readContentLengthFromHeader() throws IOException {
145199
throw new IOException("Invalid signature line: " + signatureLine);
146200
}
147201
}
148-
149-
private boolean eol(int prev, int curr) {
150-
return prev == 13 && curr == 10;
151-
}
152202
}

0 commit comments

Comments
 (0)