Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
public class StreamConfigValidator {
public static StreamConfig validateSourceDetails(StreamConfig streamConfig) {
SourceDetails[] sourceDetailsArray = streamConfig.getSourceDetails();
Preconditions.checkArgument(sourceDetailsArray.length != 0, "%s config is set to "
+ "an empty array. Please check the documentation and specify in a valid format.",
STREAM_SOURCE_DETAILS_KEY);
for (SourceDetails sourceDetails : sourceDetailsArray) {
Preconditions.checkArgument(sourceDetails != null, "One or more elements inside %s "
+ "is either null or invalid.", STREAM_SOURCE_DETAILS_KEY);
Expand Down Expand Up @@ -41,7 +44,8 @@ public static StreamConfig validateParquetDataSourceStreamConfigs(StreamConfig s

private static StreamConfig validateParquetFilePaths(StreamConfig streamConfig) {
String[] parquetFilePaths = streamConfig.getParquetFilePaths();
Preconditions.checkArgument(parquetFilePaths != null, "%s is required for configuring a Parquet Data Source Stream, but is set to null.", STREAM_SOURCE_PARQUET_FILE_PATHS_KEY);
Preconditions.checkArgument(parquetFilePaths != null, "%s is required for configuring a "
+ "Parquet Data Source Stream, but is set to null.", STREAM_SOURCE_PARQUET_FILE_PATHS_KEY);
Arrays.stream(parquetFilePaths)
.forEach(filePath -> Preconditions.checkArgument(!filePath.equals("null"),
"One or more file path inside %s is null.", STREAM_SOURCE_PARQUET_FILE_PATHS_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,20 @@ public void shouldThrowRuntimeExceptionIfSourceDetailsArrayContainsInvalidSource
+ "is a valid SourceType and ensure no trailing/leading whitespaces are present", exception.getMessage());
}

@Test
public void shouldThrowRuntimeExceptionIfSourceDetailsSetToEmptyArray() {
when(configuration.getString(INPUT_STREAMS, ""))
.thenReturn("[{\"SOURCE_PARQUET_FILE_PATHS\": [\"gs://some-parquet-path\", \"gs://another-parquet-path\"],"
+ "\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\","
+ "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\","
+ "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\","
+ "\"SOURCE_DETAILS\": []"
+ "}]");
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> StreamConfig.parse(configuration));
assertEquals("SOURCE_DETAILS config is set to an empty array. Please check the documentation and specify "
+ "in a valid format.", exception.getMessage());
}

@Test
public void shouldThrowRuntimeExceptionForParquetSourceIfSourceParquetFilePathsArrayContainsInvalidFilePath() {
when(configuration.getString(INPUT_STREAMS, ""))
Expand Down