diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfig.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfig.java
index a4933f65c..be952f602 100644
--- a/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfig.java
+++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfig.java
@@ -2,6 +2,7 @@
 
 import com.google.gson.annotations.JsonAdapter;
 import io.odpf.dagger.core.source.config.adapter.FileDateRangeAdaptor;
+import io.odpf.dagger.core.source.config.adapter.SourceParquetFilePathsAdapter;
 import io.odpf.dagger.core.source.config.models.SourceDetails;
 import io.odpf.dagger.core.source.config.models.SourceName;
 import io.odpf.dagger.core.source.config.models.SourceType;
@@ -22,6 +23,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 import static io.odpf.dagger.common.core.Constants.INPUT_STREAMS;
 import static io.odpf.dagger.common.core.Constants.STREAM_INPUT_SCHEMA_PROTO_CLASS;
@@ -92,10 +94,10 @@ public class StreamConfig {
 
     @SerializedName(STREAM_SOURCE_PARQUET_FILE_PATHS_KEY)
     @Getter
+    @JsonAdapter(value = SourceParquetFilePathsAdapter.class)
     private String[] parquetFilePaths;
 
     @SerializedName(STREAM_SOURCE_PARQUET_READ_ORDER_STRATEGY_KEY)
-    @Getter
     private SourceParquetReadOrderStrategy parquetFilesReadOrderStrategy;
 
     @SerializedName(STREAM_SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY_KEY)
@@ -122,6 +124,14 @@ public SourceDetails[] getSourceDetails() {
         }
     }
 
+    public SourceParquetReadOrderStrategy getParquetFilesReadOrderStrategy() {
+        if (parquetFilesReadOrderStrategy == null) {
+            return SourceParquetReadOrderStrategy.EARLIEST_TIME_URL_FIRST;
+        } else {
+            return parquetFilesReadOrderStrategy;
+        }
+    }
+
     public String getAutoOffsetReset() {
         if (autoOffsetReset == null) {
             autoOffsetReset = "latest";
@@ -134,7 +144,10 @@ public static StreamConfig[] parse(Configuration configuration) {
         JsonReader reader = new JsonReader(new StringReader(jsonArrayString));
         reader.setLenient(true);
 
-        return GSON.fromJson(jsonArrayString, StreamConfig[].class);
+        return Stream.of(GSON.fromJson(jsonArrayString, StreamConfig[].class))
+                .map(StreamConfigValidator::validateSourceDetails)
+                .map(StreamConfigValidator::validateParquetDataSourceStreamConfigs)
+                .toArray(StreamConfig[]::new);
     }
 
     public Properties getKafkaProps(Configuration configuration) {
diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfigValidator.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfigValidator.java
new file mode 100644
index 000000000..5f546e169
--- /dev/null
+++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/config/StreamConfigValidator.java
@@ -0,0 +1,50 @@
+package io.odpf.dagger.core.source.config;
+
+import com.google.common.base.Preconditions;
+import io.odpf.dagger.core.source.config.models.SourceDetails;
+import io.odpf.dagger.core.source.config.models.SourceName;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static io.odpf.dagger.core.utils.Constants.STREAM_SOURCE_DETAILS_KEY;
+import static io.odpf.dagger.core.utils.Constants.STREAM_SOURCE_PARQUET_FILE_PATHS_KEY;
+
+public class StreamConfigValidator {
+    public static StreamConfig validateSourceDetails(StreamConfig streamConfig) {
+        SourceDetails[] sourceDetailsArray = streamConfig.getSourceDetails();
+        for (SourceDetails sourceDetails : sourceDetailsArray) {
+            Preconditions.checkArgument(sourceDetails != null, "One or more elements inside %s "
+                    + "is either null or invalid.", STREAM_SOURCE_DETAILS_KEY);
+            Preconditions.checkArgument(sourceDetails.getSourceName() != null, "One or more "
+                    + "elements inside %s has null or invalid SourceName. Check if it is a valid SourceName and ensure "
+                    + "no trailing/leading whitespaces are present", STREAM_SOURCE_DETAILS_KEY);
+            Preconditions.checkArgument(sourceDetails.getSourceType() != null, "One or more "
+                    + "elements inside %s has null or invalid SourceType. Check if it is a valid SourceType and ensure "
+                    + "no trailing/leading whitespaces are present", STREAM_SOURCE_DETAILS_KEY);
+        }
+        return streamConfig;
+    }
+
+    public static StreamConfig validateParquetDataSourceStreamConfigs(StreamConfig streamConfig) {
+        SourceDetails[] sourceDetailsArray = streamConfig.getSourceDetails();
+        for (SourceDetails sourceDetails : sourceDetailsArray) {
+            if (sourceDetails.getSourceName().equals(SourceName.PARQUET_SOURCE)) {
+                return Stream.of(streamConfig)
+                        .map(StreamConfigValidator::validateParquetFilePaths)
+                        .findFirst()
+                        .get();
+            }
+        }
+        return streamConfig;
+    }
+
+    private static StreamConfig validateParquetFilePaths(StreamConfig streamConfig) {
+        String[] parquetFilePaths = streamConfig.getParquetFilePaths();
+        Preconditions.checkArgument(parquetFilePaths != null, "%s is required for configuring a Parquet Data Source Stream, but is set to null.", STREAM_SOURCE_PARQUET_FILE_PATHS_KEY);
+        Arrays.stream(parquetFilePaths)
+                .forEach(filePath -> Preconditions.checkArgument(!filePath.equals("null"),
+                        "One or more file path inside %s is null.", STREAM_SOURCE_PARQUET_FILE_PATHS_KEY));
+        return streamConfig;
+    }
+}
diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/source/config/adapter/SourceParquetFilePathsAdapter.java b/dagger-core/src/main/java/io/odpf/dagger/core/source/config/adapter/SourceParquetFilePathsAdapter.java
new file mode 100644
index 000000000..3b21f055e
--- /dev/null
+++ b/dagger-core/src/main/java/io/odpf/dagger/core/source/config/adapter/SourceParquetFilePathsAdapter.java
@@ -0,0 +1,24 @@
+package io.odpf.dagger.core.source.config.adapter;
+
+import com.google.gson.Gson;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class SourceParquetFilePathsAdapter extends TypeAdapter<String[]> {
+    @Override
+    public void write(JsonWriter jsonWriter, String[] strings) {
+    }
+
+    @Override
+    public String[] read(JsonReader jsonReader) throws IOException {
+        Gson gson = new Gson();
+        String[] filePathArray = gson.fromJson(jsonReader, String[].class);
+        return Arrays.stream(filePathArray)
+                .map(String::valueOf)
+                .map(String::trim).toArray(String[]::new);
+    }
+}
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/config/StreamConfigTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/config/StreamConfigTest.java
index 4429ea061..0d95bbdd1 100644
--- a/dagger-core/src/test/java/io/odpf/dagger/core/source/config/StreamConfigTest.java
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/config/StreamConfigTest.java
@@ -21,10 +21,12 @@
 
 import static io.odpf.dagger.common.core.Constants.INPUT_STREAMS;
 import static io.odpf.dagger.core.source.config.models.SourceName.KAFKA_CONSUMER;
 import static io.odpf.dagger.core.source.config.models.SourceType.UNBOUNDED;
+import static io.odpf.dagger.core.source.parquet.SourceParquetReadOrderStrategy.EARLIEST_TIME_URL_FIRST;
 import static io.odpf.dagger.core.utils.Constants.SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_DEFAULT;
 import static io.odpf.dagger.core.utils.Constants.SOURCE_KAFKA_CONSUME_LARGE_MESSAGE_ENABLE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
@@ -93,7 +95,6 @@ public void shouldParseMultipleStreamsFromStreamConfigJson() {
         assertEquals("local-kafka-stream", currConfigNext.getKafkaName());
     }
 
-
     @Test
     public void shouldParseKafkaProperties() {
         when(configuration.getString(INPUT_STREAMS, "")).thenReturn("[ { \"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\", \"INPUT_SCHEMA_TABLE\": \"data_stream\", \"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\", \"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\", \"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"\", \"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\", \"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"dummy-consumer-group\", \"SOURCE_KAFKA_NAME\": \"local-kafka-stream\" } ]");
@@ -178,7 +179,8 @@ public void shouldGetSourceDetails() {
                 .thenReturn("[{"
                         + "\"SOURCE_DETAILS\": "
                         + "[{\"SOURCE_TYPE\": \"BOUNDED\", \"SOURCE_NAME\": \"PARQUET_SOURCE\"},"
-                        + "{\"SOURCE_TYPE\": \"UNBOUNDED\", \"SOURCE_NAME\": \"KAFKA_SOURCE\"}]"
+                        + "{\"SOURCE_TYPE\": \"UNBOUNDED\", \"SOURCE_NAME\": \"KAFKA_SOURCE\"}],"
+                        + "\"SOURCE_PARQUET_FILE_PATHS\": [\"gs://some-parquet-path\", \"gs://another-parquet-path\"]"
                         + "}]");
 
         StreamConfig[] streamConfigs = StreamConfig.parse(configuration);
@@ -210,6 +212,25 @@ public void shouldGetUnboundedKafkaConsumerAsSourceDetailsWhenNotGiven() {
         assertEquals(KAFKA_CONSUMER, sourceDetails[0].getSourceName());
     }
 
+    @Test
+    public void shouldGetEarliestTimeUrlStrategyAsParquetReadOrderStrategyWhenNotGiven() {
+        when(configuration.getString(INPUT_STREAMS, ""))
+                .thenReturn("[{\"INPUT_SCHEMA_TABLE\": \"data_stream\","
+                        + "\"SOURCE_KAFKA_TOPIC_NAMES\": \"test-topic\","
+                        + "\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\": \"localhost:9092\","
+                        + "\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\": \"true\","
+                        + "\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\": \"latest\","
+                        + "\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\": \"test-group-13\","
+                        + "\"SOURCE_KAFKA_NAME\": \"local-kafka-stream\","
+                        + "\"INPUT_SCHEMA_PROTO_CLASS\": \"com.tests.TestMessage\","
+                        + "\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\": \"41\"}]");
+
+        StreamConfig[] streamConfigs = StreamConfig.parse(configuration);
+        SourceParquetReadOrderStrategy actualReadOrderStrategy = streamConfigs[0].getParquetFilesReadOrderStrategy();
+
+        assertEquals(EARLIEST_TIME_URL_FIRST, actualReadOrderStrategy);
+    }
+
     @Test
     public void shouldGetParquetSourceProperties() {
         when(configuration.getString(INPUT_STREAMS, ""))
@@ -226,7 +247,6 @@ public void shouldGetParquetSourceProperties() {
         assertEquals(SourceParquetSchemaMatchStrategy.valueOf("BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH"), streamConfigs[0].getParquetSchemaMatchStrategy());
     }
 
-
    @Test
     public void shouldParseParquetFileDateRange() {
         when(configuration.getString(INPUT_STREAMS, ""))
@@ -256,4 +276,122 @@ public void shouldReturnEmptyTimeRangeIfParquetFileDateRangeNotGiven() {
 
         assertNull(streamConfigs[0].getParquetFileDateRange());
     }
+
+    @Test
+    public void shouldThrowRuntimeExceptionIfSourceDetailsArrayContainsInvalidData() {
+        when(configuration.getString(INPUT_STREAMS, ""))
+                .thenReturn("[{\"SOURCE_PARQUET_FILE_PATHS\": [\"gs://some-parquet-path\", \"gs://another-parquet-path\"],"
+                        + "\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\","
+                        + "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\","
+                        + "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\","
+                        + "\"SOURCE_DETAILS\": "
+                        + "[null, {\"SOURCE_TYPE\": \"BOUNDED\", \"SOURCE_NAME\": \"PARQUET_SOURCE\"}]"
+                        + "}]");
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> StreamConfig.parse(configuration));
+        assertEquals("One or more elements inside SOURCE_DETAILS is either null or invalid.", exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowRuntimeExceptionIfSourceDetailsArrayHasMissingSourceName() {
+        when(configuration.getString(INPUT_STREAMS, ""))
+                .thenReturn("[{\"SOURCE_PARQUET_FILE_PATHS\": [\"gs://some-parquet-path\", \"gs://another-parquet-path\"],"
+                        + "\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\","
+                        + "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\","
+                        + "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\","
+                        + "\"SOURCE_DETAILS\": "
+                        + "[{\"SOURCE_TYPE\": \"BOUNDED\"}]"
+                        + "}]");
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> StreamConfig.parse(configuration));
+        assertEquals("One or more elements inside SOURCE_DETAILS has null or invalid SourceName. "
+                + "Check if it is a valid SourceName and ensure no trailing/leading whitespaces are present", exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowRuntimeExceptionIfSourceDetailsArrayContainsInvalidSourceName() {
+        when(configuration.getString(INPUT_STREAMS, ""))
+                .thenReturn("[{\"SOURCE_PARQUET_FILE_PATHS\": [\"gs://some-parquet-path\", \"gs://another-parquet-path\"],"
+                        + "\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\","
+                        + "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\","
+                        + "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\","
+                        + "\"SOURCE_DETAILS\": "
+                        + "[{\"SOURCE_TYPE\": \"BOUNDED\", \"SOURCE_NAME\": \" KAFKA_SOURCE\"}]"
+                        + "}]");
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> StreamConfig.parse(configuration));
+        assertEquals("One or more elements inside SOURCE_DETAILS has null or invalid SourceName. "
+                + "Check if it is a valid SourceName and ensure no trailing/leading whitespaces are present", exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowRuntimeExceptionIfSourceDetailsArrayHasMissingSourceType() {
+        when(configuration.getString(INPUT_STREAMS, ""))
+                .thenReturn("[{\"SOURCE_PARQUET_FILE_PATHS\": [\"gs://some-parquet-path\", \"gs://another-parquet-path\"],"
+                        + "\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\","
+                        + "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\","
+                        + "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\","
+                        + "\"SOURCE_DETAILS\": "
+                        + "[{\"SOURCE_NAME\": \"PARQUET_SOURCE\"}]"
+                        + "}]");
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> StreamConfig.parse(configuration));
+        assertEquals("One or more elements inside SOURCE_DETAILS has null or invalid SourceType. Check if it "
+                + "is a valid SourceType and ensure no trailing/leading whitespaces are present", exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowRuntimeExceptionIfSourceDetailsArrayContainsInvalidSourceType() {
+        when(configuration.getString(INPUT_STREAMS, ""))
+                .thenReturn("[{\"SOURCE_PARQUET_FILE_PATHS\": [\"gs://some-parquet-path\", \"gs://another-parquet-path\"],"
+                        + "\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\","
+                        + "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\","
+                        + "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\","
+                        + "\"SOURCE_DETAILS\": "
+                        + "[{\"SOURCE_TYPE\": \" BOUNDED\", \"SOURCE_NAME\": \"KAFKA_SOURCE\"}]"
+                        + "}]");
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> StreamConfig.parse(configuration));
+        assertEquals("One or more elements inside SOURCE_DETAILS has null or invalid SourceType. Check if it "
+                + "is a valid SourceType and ensure no trailing/leading whitespaces are present", exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowRuntimeExceptionForParquetSourceIfSourceParquetFilePathsArrayContainsInvalidFilePath() {
+        when(configuration.getString(INPUT_STREAMS, ""))
+                .thenReturn("[{\"SOURCE_PARQUET_FILE_PATHS\": [null, \"gs://another-parquet-path\"],"
+                        + "\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\","
+                        + "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\","
+                        + "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\","
+                        + "\"SOURCE_DETAILS\": "
+                        + "[{\"SOURCE_TYPE\": \"BOUNDED\", \"SOURCE_NAME\": \"PARQUET_SOURCE\"}]"
+                        + "}]");
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> StreamConfig.parse(configuration));
+        assertEquals("One or more file path inside SOURCE_PARQUET_FILE_PATHS is null.", exception.getMessage());
+    }
+
+    @Test
+    public void shouldThrowRuntimeExceptionForParquetSourceIfSourceParquetFilePathsArrayIsNull() {
+        when(configuration.getString(INPUT_STREAMS, ""))
+                .thenReturn("[{\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\","
+                        + "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\","
+                        + "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\","
+                        + "\"SOURCE_DETAILS\": "
+                        + "[{\"SOURCE_TYPE\": \"BOUNDED\", \"SOURCE_NAME\": \"PARQUET_SOURCE\"}]"
+                        + "}]");
+        IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> StreamConfig.parse(configuration));
+        assertEquals("SOURCE_PARQUET_FILE_PATHS is required for configuring a Parquet Data Source Stream, "
+                + "but is set to null.", exception.getMessage());
+    }
+
+    @Test
+    public void shouldTrimLeadingAndTrailingWhitespacesFromParquetFilePathsWhenParquetSourceConfigured() {
+        when(configuration.getString(INPUT_STREAMS, ""))
+                .thenReturn("[{\"SOURCE_PARQUET_FILE_PATHS\": [\" gs://some-parquet-path\", \" gs://another-parquet-path \"],"
+                        + "\"SOURCE_PARQUET_BILLING_PROJECT\": \"data-project\","
+                        + "\"SOURCE_PARQUET_READ_ORDER_STRATEGY\": \"EARLIEST_TIME_URL_FIRST\","
+                        + "\"SOURCE_PARQUET_SCHEMA_MATCH_STRATEGY\": \"BACKWARD_COMPATIBLE_SCHEMA_WITH_FAIL_ON_TYPE_MISMATCH\","
+                        + "\"SOURCE_DETAILS\": "
+                        + "[{\"SOURCE_TYPE\": \"BOUNDED\", \"SOURCE_NAME\": \"PARQUET_SOURCE\"},"
+                        + "{\"SOURCE_TYPE\": \"UNBOUNDED\", \"SOURCE_NAME\": \"KAFKA_SOURCE\"}]"
+                        + "}]");
+        StreamConfig[] streamConfigs = StreamConfig.parse(configuration);
+
+        Assert.assertArrayEquals(new String[]{"gs://some-parquet-path", "gs://another-parquet-path"}, streamConfigs[0].getParquetFilePaths());
+    }
 }
diff --git a/dagger-core/src/test/java/io/odpf/dagger/core/source/config/adapter/SourceParquetFilePathsAdapterTest.java b/dagger-core/src/test/java/io/odpf/dagger/core/source/config/adapter/SourceParquetFilePathsAdapterTest.java
new file mode 100644
index 000000000..4212ebca2
--- /dev/null
+++ b/dagger-core/src/test/java/io/odpf/dagger/core/source/config/adapter/SourceParquetFilePathsAdapterTest.java
@@ -0,0 +1,52 @@
+package io.odpf.dagger.core.source.config.adapter;
+
+import com.google.gson.stream.JsonReader;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import static org.junit.Assert.assertArrayEquals;
+
+public class SourceParquetFilePathsAdapterTest {
+    @Test
+    public void shouldDeserializeJsonArrayToStringArray() throws IOException {
+        String parquetFilePathJSONString = "[\"gs://something\", \"gs://anything\"]";
+        JsonReader reader = new JsonReader(new StringReader(parquetFilePathJSONString));
+
+        SourceParquetFilePathsAdapter adapter = new SourceParquetFilePathsAdapter();
+
+        assertArrayEquals(new String[]{"gs://something", "gs://anything"}, adapter.read(reader));
+    }
+
+    @Test
+    public void shouldDeserializeEmptyJsonArrayToEmptyStringArray() throws IOException {
+        String parquetFilePathJSONString = "[]";
+        JsonReader reader = new JsonReader(new StringReader(parquetFilePathJSONString));
+
+        SourceParquetFilePathsAdapter adapter = new SourceParquetFilePathsAdapter();
+
+        assertArrayEquals(new String[]{}, adapter.read(reader));
+    }
+
+    @Test
+    public void shouldDeserializeJsonArrayContainingNullsToStringArray() throws IOException {
+        String parquetFilePathJSONString = "[null, \"gs://anything\"]";
+        JsonReader reader = new JsonReader(new StringReader(parquetFilePathJSONString));
+
+        SourceParquetFilePathsAdapter adapter = new SourceParquetFilePathsAdapter();
+
+        assertArrayEquals(new String[]{"null", "gs://anything"}, adapter.read(reader));
+    }
+
+    @Test
+    public void shouldDeserializeByTrimmingLeadingAndTrailingWhitespacesFromEachElementIfAny() throws IOException {
+        String parquetFilePathJSONString = "[null, \" gs://something\", \"gs://anything \"]";
+
+        JsonReader reader = new JsonReader(new StringReader(parquetFilePathJSONString));
+
+        SourceParquetFilePathsAdapter adapter = new SourceParquetFilePathsAdapter();
+
+        assertArrayEquals(new String[]{"null", "gs://something", "gs://anything"}, adapter.read(reader));
+    }
+}