diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfigValidator.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfigValidator.java index 5f546e169..c9c63b57e 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfigValidator.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfigValidator.java @@ -13,6 +13,9 @@ public class StreamConfigValidator { public static StreamConfig validateSourceDetails(StreamConfig streamConfig) { SourceDetails[] sourceDetailsArray = streamConfig.getSourceDetails(); + Preconditions.checkArgument(sourceDetailsArray.length != 0, "%s config is set to " + + "an empty array. Please check the documentation and specify in a valid format.", + STREAM_SOURCE_DETAILS_KEY); for (SourceDetails sourceDetails : sourceDetailsArray) { Preconditions.checkArgument(sourceDetails != null, "One or more elements inside %s " + "is either null or invalid.", STREAM_SOURCE_DETAILS_KEY); @@ -41,7 +44,8 @@ public static StreamConfig validateParquetDataSourceStreamConfigs(StreamConfig s private static StreamConfig validateParquetFilePaths(StreamConfig streamConfig) { String[] parquetFilePaths = streamConfig.getParquetFilePaths(); - Preconditions.checkArgument(parquetFilePaths != null, "%s is required for configuring a Parquet Data Source Stream, but is set to null.", STREAM_SOURCE_PARQUET_FILE_PATHS_KEY); + Preconditions.checkArgument(parquetFilePaths != null, "%s is required for configuring a " + + "Parquet Data Source Stream, but is set to null.", STREAM_SOURCE_PARQUET_FILE_PATHS_KEY); Arrays.stream(parquetFilePaths) .forEach(filePath -> Preconditions.checkArgument(!filePath.equals("null"), "One or more file path inside %s is null.", STREAM_SOURCE_PARQUET_FILE_PATHS_KEY)); diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/config/StreamConfigTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/config/StreamConfigTest.java index bbc5ee7a0..47085e99a 100644 --- a/dagger-core/src/test/java/io/odpf/dagger/core/source/config/StreamConfigTest.java +++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/config/StreamConfigTest.java @@ -351,6 +351,20 @@ public void shouldThrowRuntimeExceptionIfSourceDetailsArrayContainsInvalidSource + "is a valid SourceType and ensure no trailing/leading whitespaces are present", exception.getMessage()); } + @Test + public void shouldThrowRuntimeExceptionIfSourceDetailsSetToEmptyArray() { + when(configuration.getString(INPUT_STREAMS, "")) + .thenReturn("[{\"SOURCE_PARQUET_FILE_PATHS\": [\"gs://some-parquet-path\", \"gs://another-parquet-path\"]," + + "\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\"," + + "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\"," + + "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\"," + + "\"SOURCE_DETAILS\": []" + + "}]"); + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> StreamConfig.parse(configuration)); + assertEquals("SOURCE_DETAILS config is set to an empty array. Please check the documentation and specify " + + "in a valid format.", exception.getMessage()); + } + @Test public void shouldThrowRuntimeExceptionForParquetSourceIfSourceParquetFilePathsArrayContainsInvalidFilePath() { when(configuration.getString(INPUT_STREAMS, ""))