Skip to content

Commit 9d0c8e8

Browse files
committed
feat: check if open is called
[raystack#108]
1 parent 7f26745 commit 9d0c8e8

1 file changed

Lines changed: 8 additions & 1 deletion

File tree

dagger-core/src/main/java/io/odpf/dagger/core/source/parquet/reader/ParquetReader.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.odpf.dagger.common.serde.parquet.deserialization.SimpleGroupDeserializer;
44
import io.odpf.dagger.core.exception.ParquetFileSourceReaderInitializationException;
5+
import org.apache.flink.api.common.functions.AbstractRichFunction;
56
import org.apache.flink.connector.file.src.reader.FileRecordFormat;
67
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
78
import org.apache.flink.types.Row;
@@ -23,7 +24,7 @@
2324
import javax.annotation.Nullable;
2425
import java.io.IOException;
2526

26-
public class ParquetReader implements FileRecordFormat.Reader<Row> {
27+
public class ParquetReader extends AbstractRichFunction implements FileRecordFormat.Reader<Row> {
2728
private final Path hadoopFilePath;
2829
private final SimpleGroupDeserializer simpleGroupDeserializer;
2930
private long currentRecordIndex;
@@ -42,6 +43,12 @@ private ParquetReader(Path hadoopFilePath, SimpleGroupDeserializer simpleGroupDe
4243
this.schema = this.parquetFileReader.getFileMetaData().getSchema();
4344
this.isRecordReaderInitialized = false;
4445
this.totalEmittedRowCount = 0L;
46+
LOGGER.info("Constructor called in ParquetReader class");
47+
}
48+
49+
@Override
50+
public void open(org.apache.flink.configuration.Configuration internalFlinkConfig) throws Exception {
51+
LOGGER.info("Lifecycle function open called in ParquetReader class");
4552
}
4653

4754
private boolean checkIfNullPage(PageReadStore page) {

0 commit comments

Comments
 (0)