diff --git a/.github/workflows/python_release.yml b/.github/workflows/python_release.yml new file mode 100644 index 000000000..3721e2368 --- /dev/null +++ b/.github/workflows/python_release.yml @@ -0,0 +1,25 @@ +name: Python Package +on: + release: + types: [created] + +jobs: + publishPythonZip: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Zip Python Udf + run: | + cd dagger-py-functions + zip -r python_udfs.zip udfs -x "*/__init__.py" + zip -jr data.zip data + zip -r dagger-py-functions.zip requirements.txt data.zip python_udfs.zip + - name: Upload Release + uses: ncipollo/release-action@v1 + with: + artifacts: dagger-py-functions/dagger-py-functions.zip + allowUpdates: true + omitNameDuringUpdate: true + omitBodyDuringUpdate: true + omitPrereleaseDuringUpdate: true + token: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.github/workflows/python_validation.yml b/.github/workflows/python_validation.yml new file mode 100644 index 000000000..0a8b04c40 --- /dev/null +++ b/.github/workflows/python_validation.yml @@ -0,0 +1,27 @@ +name: Python Validation + +on: push + +jobs: + pythonValidation: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.8 + uses: actions/setup-python@v3 + with: + python-version: '3.8' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install apache-flink==1.14.3 + cd dagger-py-functions + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . 
--count --select=E9,F63,F7,F82 --show-source --statistics + - name: Test with pytest + run: | + cd dagger-py-functions + pytest --disable-warnings \ No newline at end of file diff --git a/dagger-core/env/local.properties b/dagger-core/env/local.properties index 6127bbae2..8d4372bf8 100644 --- a/dagger-core/env/local.properties +++ b/dagger-core/env/local.properties @@ -26,3 +26,7 @@ METRIC_TELEMETRY_ENABLE=true # == Others == FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.FunctionFactory FLINK_ROWTIME_ATTRIBUTE_NAME=rowtime + +# == Python Udf == +PYTHON_UDF_ENABLE=false +PYTHON_UDF_CONFIG={"PYTHON_FILES":"/path/to/files.zip", "PYTHON_REQUIREMENTS": "requirements.txt", "PYTHON_FN_EXECUTION_BUNDLE_SIZE": "1000"} \ No newline at end of file diff --git a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java index a2ed9b426..969a0bb21 100644 --- a/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java +++ b/dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java @@ -19,6 +19,8 @@ import io.odpf.dagger.core.source.Stream; import io.odpf.dagger.core.source.StreamsFactory; import io.odpf.dagger.core.utils.Constants; +import io.odpf.dagger.functions.udfs.python.PythonUdfConfig; +import io.odpf.dagger.functions.udfs.python.PythonUdfManager; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -28,12 +30,15 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.util.List; import static io.odpf.dagger.core.utils.Constants.*; +import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_DEFAULT; 
+import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_KEY; import static org.apache.flink.table.api.Expressions.$; /** @@ -138,7 +143,13 @@ private ApiExpression[] getApiExpressions(StreamInfo streamInfo) { * * @return the stream manager */ - public StreamManager registerFunctions() { + public StreamManager registerFunctions() throws IOException { + if (configuration.getBoolean(PYTHON_UDF_ENABLE_KEY, PYTHON_UDF_ENABLE_DEFAULT)) { + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + } + String[] functionFactoryClasses = configuration .getString(Constants.FUNCTION_FACTORY_CLASSES_KEY, Constants.FUNCTION_FACTORY_CLASSES_DEFAULT) .split(","); diff --git a/dagger-functions/build.gradle b/dagger-functions/build.gradle index 21720686e..1303f9494 100644 --- a/dagger-functions/build.gradle +++ b/dagger-functions/build.gradle @@ -46,6 +46,7 @@ sourceSets { dependencies { compileOnly 'org.projectlombok:lombok:1.18.8' annotationProcessor 'org.projectlombok:lombok:1.18.8' + compileOnly project(path: ':dagger-common', configuration: 'minimalCommonJar') compileOnly project(path: ':dagger-common', configuration: 'dependenciesCommonJar') compileOnly 'org.apache.flink:flink-streaming-java_2.11:' + flinkVersion @@ -62,7 +63,7 @@ dependencies { dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1' dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1' dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '1.67.0' - + testImplementation project(':dagger-common').sourceSets.test.output testImplementation group: 'junit', name: 'junit', version: '4.12' testImplementation 'org.mockito:mockito-core:2.0.99-beta' diff --git 
a/dagger-functions/src/main/java/io/odpf/dagger/functions/common/Constants.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/common/Constants.java index 485bcd275..fff13666c 100644 --- a/dagger-functions/src/main/java/io/odpf/dagger/functions/common/Constants.java +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/common/Constants.java @@ -7,4 +7,17 @@ public class Constants { public static final String UDF_DART_GCS_PROJECT_ID_DEFAULT = ""; public static final String UDF_DART_GCS_BUCKET_ID_KEY = "UDF_DART_GCS_BUCKET_ID"; public static final String UDF_DART_GCS_BUCKET_ID_DEFAULT = ""; + + public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG"; + public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE"; + public static final boolean PYTHON_UDF_ENABLE_DEFAULT = false; + public static final String PYTHON_FILES_KEY = "PYTHON_FILES"; + public static final String PYTHON_REQUIREMENTS_KEY = "PYTHON_REQUIREMENTS"; + public static final String PYTHON_ARCHIVES_KEY = "PYTHON_ARCHIVES"; + public static final String PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY = "PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE"; + public static final Integer PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT = 10000; + public static final String PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY = "PYTHON_FN_EXECUTION_BUNDLE_SIZE"; + public static final Integer PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT = 100000; + public static final String PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY = "PYTHON_FN_EXECUTION_BUNDLE_TIME"; + public static final long PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT = 1000; } diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesEmptyException.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesEmptyException.java new file mode 100644 index 000000000..e849dc208 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesEmptyException.java @@ -0,0 +1,17 @@ +package 
io.odpf.dagger.functions.exceptions; + +/** + * The type Python files empty exception. + */ +public class PythonFilesEmptyException extends RuntimeException { + + /** + * Instantiates a new Python files empty exception. + * + * @param message the message + */ + public PythonFilesEmptyException(String message) { + super(message); + } + +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesFormatException.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesFormatException.java new file mode 100644 index 000000000..02771fda6 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/exceptions/PythonFilesFormatException.java @@ -0,0 +1,17 @@ +package io.odpf.dagger.functions.exceptions; + +/** + * The type Python files format exception. + */ +public class PythonFilesFormatException extends RuntimeException { + + /** + * Instantiates a new Python files format exception. + * + * @param message the message + */ + public PythonFilesFormatException(String message) { + super(message); + } + +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java new file mode 100644 index 000000000..2f33f75c8 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfig.java @@ -0,0 +1,110 @@ +package io.odpf.dagger.functions.udfs.python; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; +import io.odpf.dagger.common.configuration.Configuration; +import lombok.Getter; + +import static io.odpf.dagger.functions.common.Constants.*; + +/** + * The type Python udf config. 
+ */ +public class PythonUdfConfig { + private static final Gson GSON = new GsonBuilder() + .enableComplexMapKeySerialization() + .setPrettyPrinting() + .create(); + + @SerializedName(PYTHON_FILES_KEY) + private String pythonFiles; + + @SerializedName(PYTHON_REQUIREMENTS_KEY) + @Getter + private String pythonRequirements; + + @SerializedName(PYTHON_ARCHIVES_KEY) + private String pythonArchives; + + @SerializedName(PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY) + private Integer pythonArrowBatchSize; + + @SerializedName(PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY) + private Integer pythonBundleSize; + + @SerializedName(PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY) + private Long pythonBundleTime; + + /** + * Gets python files. + * + * @return the python files + */ + public String getPythonFiles() { + if (pythonFiles != null) { + return pythonFiles.replaceAll("\\s+", ""); + } + return null; + } + + /** + * Gets python archives. + * + * @return the python archives + */ + public String getPythonArchives() { + if (pythonArchives != null) { + return pythonArchives.replaceAll("\\s+", ""); + } + return null; + } + + /** + * Gets python arrow batch size. + * + * @return the python arrow batch size + */ + public int getPythonArrowBatchSize() { + if (pythonArrowBatchSize == null) { + return PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT; + } + return pythonArrowBatchSize; + } + + /** + * Gets python bundle size. + * + * @return the python bundle size + */ + public int getPythonBundleSize() { + if (pythonBundleSize == null) { + return PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT; + } + return pythonBundleSize; + } + + /** + * Gets python bundle time. + * + * @return the python bundle time + */ + public long getPythonBundleTime() { + if (pythonBundleTime == null) { + return PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT; + } + return pythonBundleTime; + } + + /** + * Parse python udf config. 
+ * + * @param configuration the configuration + * @return the python udf config + */ + public static PythonUdfConfig parse(Configuration configuration) { + String jsonString = configuration.getString(PYTHON_UDF_CONFIG, ""); + + return GSON.fromJson(jsonString, PythonUdfConfig.class); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java new file mode 100644 index 000000000..59fb99d2a --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/PythonUdfManager.java @@ -0,0 +1,81 @@ +package io.odpf.dagger.functions.udfs.python; + +import io.odpf.dagger.functions.exceptions.PythonFilesEmptyException; +import io.odpf.dagger.functions.udfs.python.file.type.FileType; +import io.odpf.dagger.functions.udfs.python.file.type.FileTypeFactory; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * The type Python udf manager. + */ +public class PythonUdfManager { + + private StreamTableEnvironment tableEnvironment; + private PythonUdfConfig pythonUdfConfig; + + /** + * Instantiates a new Python udf manager. + * + * @param tableEnvironment the table environment + * @param pythonUdfConfig the python udf config + */ + public PythonUdfManager(StreamTableEnvironment tableEnvironment, PythonUdfConfig pythonUdfConfig) { + this.tableEnvironment = tableEnvironment; + this.pythonUdfConfig = pythonUdfConfig; + } + + /** + * Register python functions. 
+ */ + public void registerPythonFunctions() throws IOException { + String inputFiles = pythonUdfConfig.getPythonFiles(); + String[] pythonFiles; + if (inputFiles != null) { + registerPythonConfig(); + pythonFiles = inputFiles.split(","); + } else { + throw new PythonFilesEmptyException("Python files can not be null"); + } + + for (String pythonFile : pythonFiles) { + FileType fileType = FileTypeFactory.getFileType(pythonFile); + List fileNames = fileType.getFileNames(); + List sqlQueries = createQuery(fileNames); + executeSql(sqlQueries); + } + } + + private void registerPythonConfig() { + if (pythonUdfConfig.getPythonRequirements() != null) { + tableEnvironment.getConfig().getConfiguration().setString("python.requirements", pythonUdfConfig.getPythonRequirements()); + } + if (pythonUdfConfig.getPythonArchives() != null) { + tableEnvironment.getConfig().getConfiguration().setString("python.archives", pythonUdfConfig.getPythonArchives()); + } + tableEnvironment.getConfig().getConfiguration().setString("python.files", pythonUdfConfig.getPythonFiles()); + tableEnvironment.getConfig().getConfiguration().setInteger("python.fn-execution.arrow.batch.size", pythonUdfConfig.getPythonArrowBatchSize()); + tableEnvironment.getConfig().getConfiguration().setInteger("python.fn-execution.bundle.size", pythonUdfConfig.getPythonBundleSize()); + tableEnvironment.getConfig().getConfiguration().setLong("python.fn-execution.bundle.time", pythonUdfConfig.getPythonBundleTime()); + } + + private void executeSql(List sqlQueries) { + for (String query : sqlQueries) { + tableEnvironment.executeSql(query); + } + } + + private List createQuery(List fileNames) { + List sqlQueries = new ArrayList<>(); + for (String fileName : fileNames) { + fileName = fileName.replace(".py", "").replace("/", "."); + String functionName = fileName.substring(fileName.lastIndexOf(".") + 1); + String query = "CREATE TEMPORARY FUNCTION " + functionName.toUpperCase() + " AS '" + fileName + "." 
+ functionName + "' LANGUAGE PYTHON"; + sqlQueries.add(query); + } + return sqlQueries; + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSource.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSource.java new file mode 100644 index 000000000..915f81d32 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSource.java @@ -0,0 +1,16 @@ +package io.odpf.dagger.functions.udfs.python.file.source; + +import java.io.IOException; + +/** + * The interface File source. + */ +public interface FileSource { + + /** + * Get object file byte [ ]. + * + * @return the byte [ ] + */ + byte[] getObjectFile() throws IOException; +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactory.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactory.java new file mode 100644 index 000000000..8053f19fd --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactory.java @@ -0,0 +1,29 @@ +package io.odpf.dagger.functions.udfs.python.file.source; + +import io.odpf.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; +import io.odpf.dagger.functions.udfs.python.file.source.local.LocalFileSource; + +/** + * The type File source factory. + */ +public class FileSourceFactory { + + /** + * Gets file source. 
+ * + * @param pythonFile the python file + * @return the file source + */ + public static FileSource getFileSource(String pythonFile) { + if ("GS".equals(getFileSourcePrefix(pythonFile))) { + return new GcsFileSource(pythonFile); + } else { + return new LocalFileSource(pythonFile); + } + } + + private static String getFileSourcePrefix(String pythonFile) { + String[] files = pythonFile.split("://"); + return files[0].toUpperCase(); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClient.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClient.java new file mode 100644 index 000000000..e3bd69a3f --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClient.java @@ -0,0 +1,56 @@ +package io.odpf.dagger.functions.udfs.python.file.source.gcs; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** + * The type Gcs client. + */ +public class GcsClient { + + private Storage storage; + + /** + * Instantiates a new Gcs client. + */ + public GcsClient() { + + if (storage == null) { + storage = StorageOptions.newBuilder() + .build().getService(); + } + } + + /** + * Instantiates a new Gcs client. + * This constructor used for unit test purposes. + * + * @param storage the storage + */ + public GcsClient(Storage storage) { + this.storage = storage; + } + + /** + * Get file byte [ ]. 
+ * + * @param pythonFile the python file + * @return the byte [ ] + */ + public byte[] getFile(String pythonFile) { + List file = Arrays.asList(pythonFile.replace("gs://", "").split("/")); + + String bucketName = file.get(0); + String objectName = file.stream().skip(1).collect(Collectors.joining("/")); + + Blob blob = storage.get(BlobId.of(bucketName, objectName)); + + return blob.getContent(); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSource.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSource.java new file mode 100644 index 000000000..d7b7a5bef --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSource.java @@ -0,0 +1,51 @@ +package io.odpf.dagger.functions.udfs.python.file.source.gcs; + +import io.odpf.dagger.functions.udfs.python.file.source.FileSource; + + +/** + * The type Gcs file source. + */ +public class GcsFileSource implements FileSource { + + private GcsClient gcsClient; + private String pythonFile; + + /** + * Instantiates a new Gcs file source. + * + * @param pythonFile the python file + */ + public GcsFileSource(String pythonFile) { + this.pythonFile = pythonFile; + } + + /** + * Instantiates a new Gcs file source. + * This constructor used for unit test purposes. + * + * @param pythonFile the python file + * @param gcsClient the gcs client + */ + public GcsFileSource(String pythonFile, GcsClient gcsClient) { + this.pythonFile = pythonFile; + this.gcsClient = gcsClient; + } + + @Override + public byte[] getObjectFile() { + return getGcsClient().getFile(pythonFile); + } + + /** + * Gets gcs client. 
+ * + * @return the gcs client + */ + private GcsClient getGcsClient() { + if (this.gcsClient == null) { + this.gcsClient = new GcsClient(); + } + return this.gcsClient; + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSource.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSource.java new file mode 100644 index 000000000..3f3aff624 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSource.java @@ -0,0 +1,29 @@ +package io.odpf.dagger.functions.udfs.python.file.source.local; + +import io.odpf.dagger.functions.udfs.python.file.source.FileSource; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +/** + * The type Local file source. + */ +public class LocalFileSource implements FileSource { + + private String pythonFile; + + /** + * Instantiates a new Local file source. + * + * @param pythonFile the python file + */ + public LocalFileSource(String pythonFile) { + this.pythonFile = pythonFile; + } + + @Override + public byte[] getObjectFile() throws IOException { + return Files.readAllBytes(Paths.get(pythonFile)); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileType.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileType.java new file mode 100644 index 000000000..1123b42f1 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileType.java @@ -0,0 +1,17 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import java.io.IOException; +import java.util.List; + +/** + * The interface File type. + */ +public interface FileType { + + /** + * Gets file names. 
+ * + * @return the file names + */ + List getFileNames() throws IOException; +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactory.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactory.java new file mode 100644 index 000000000..83543ad19 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactory.java @@ -0,0 +1,34 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.exceptions.PythonFilesFormatException; +import io.odpf.dagger.functions.udfs.python.file.source.FileSource; +import io.odpf.dagger.functions.udfs.python.file.source.FileSourceFactory; + +/** + * The type File type factory. + */ +public class FileTypeFactory { + + /** + * Gets file type. + * + * @param pythonFile the python file + * @return the file type + */ + public static FileType getFileType(String pythonFile) { + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + switch (getFileTypeFormat(pythonFile)) { + case "PY": + return new PythonFileType(pythonFile); + case "ZIP": + return new ZipFileType(fileSource); + default: + throw new PythonFilesFormatException("Python files should be in .py or .zip format"); + } + } + + private static String getFileTypeFormat(String pythonFile) { + String[] files = pythonFile.split("\\."); + return files[files.length - 1].toUpperCase(); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileType.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileType.java new file mode 100644 index 000000000..2bc993e04 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileType.java @@ -0,0 +1,33 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import 
io.odpf.dagger.functions.exceptions.PythonFilesEmptyException; + +import java.util.Collections; +import java.util.List; + +/** + * The type Python file type. + */ +public class PythonFileType implements FileType { + + private String pythonFile; + + /** + * Instantiates a new Python file type. + * + * @param pythonFile the python file + */ + public PythonFileType(String pythonFile) { + this.pythonFile = pythonFile; + } + + @Override + public List getFileNames() { + if (pythonFile == null) { + throw new PythonFilesEmptyException("Python files can not be null"); + } + String name = pythonFile.substring(pythonFile.lastIndexOf('/') + 1); + + return Collections.singletonList(name); + } +} diff --git a/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileType.java b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileType.java new file mode 100644 index 000000000..31fc00cd0 --- /dev/null +++ b/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileType.java @@ -0,0 +1,47 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.udfs.python.file.source.FileSource; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +/** + * The type Zip file type. 
+ */ +public class ZipFileType implements FileType { + + private FileSource fileSource; + + public ZipFileType(FileSource fileSource) { + this.fileSource = fileSource; + } + + @Override + public List getFileNames() throws IOException { + byte[] object = fileSource.getObjectFile(); + + ZipInputStream zi = new ZipInputStream(new ByteArrayInputStream(object)); + ZipEntry zipEntry; + List entries = new ArrayList<>(); + while ((zipEntry = zi.getNextEntry()) != null) { + entries.add(zipEntry); + } + + List fileNames = new ArrayList<>(); + for (ZipEntry entry : entries) { + String name = entry.getName(); + if (isPythonFile(name)) { + fileNames.add(name); + } + } + return fileNames; + } + + private boolean isPythonFile(String fileName) { + return fileName.endsWith(".py"); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java new file mode 100644 index 000000000..01be1b7af --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfConfigTest.java @@ -0,0 +1,82 @@ +package io.odpf.dagger.functions.udfs.python; + +import io.odpf.dagger.common.configuration.Configuration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import static io.odpf.dagger.functions.common.Constants.*; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class PythonUdfConfigTest { + + @Mock + private Configuration configuration; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldParseConfig() { + String pythonJsonConfig = "{ \"PYTHON_FILES\": \"/path/to/function.zip\", \"PYTHON_ARCHIVES\": \"/path/to/file.txt\", \"PYTHON_REQUIREMENTS\": \"requirements.txt\", \"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE\": \"10000\", \"PYTHON_FN_EXECUTION_BUNDLE_SIZE\": \"100000\", 
\"PYTHON_FN_EXECUTION_BUNDLE_TIME\": \"1000\" }"; + + when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertNotNull(pythonUdfConfig); + Assert.assertEquals(pythonUdfConfig.getPythonFiles(), "/path/to/function.zip"); + Assert.assertEquals(pythonUdfConfig.getPythonArchives(), "/path/to/file.txt"); + Assert.assertEquals(pythonUdfConfig.getPythonRequirements(), "requirements.txt"); + Assert.assertEquals(pythonUdfConfig.getPythonArrowBatchSize(), 10000); + Assert.assertEquals(pythonUdfConfig.getPythonBundleSize(), 100000); + Assert.assertEquals(pythonUdfConfig.getPythonBundleTime(), 1000); + } + + @Test + public void shouldUseDefaultValueIfConfigIsNotGiven() { + String pythonJsonConfig = "{ \"PYTHON_FILES\": \"/path/to/function.zip\", \"PYTHON_ARCHIVES\": \"/path/to/file.txt\", \"PYTHON_REQUIREMENTS\": \"requirements.txt\" }"; + + when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertEquals(pythonUdfConfig.getPythonArrowBatchSize(), 10000); + Assert.assertEquals(pythonUdfConfig.getPythonBundleSize(), 100000); + Assert.assertEquals(pythonUdfConfig.getPythonBundleTime(), 1000); + } + + @Test + public void shouldReturnNullIfPythonFilesConfigIsNotGiven() { + String pythonJsonConfig = "{\"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE\": \"10000\", \"PYTHON_FN_EXECUTION_BUNDLE_SIZE\": \"100000\", \"PYTHON_FN_EXECUTION_BUNDLE_TIME\": \"1000\"}"; + + when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertNull(pythonUdfConfig.getPythonFiles()); + Assert.assertNull(pythonUdfConfig.getPythonArchives()); + Assert.assertNull(pythonUdfConfig.getPythonRequirements()); + } + + @Test + public void shouldRemoveWhitespaceInPythonFilesConfig() { 
+ String pythonJsonConfig = "{ \"PYTHON_FILES\": \" /path/to/function.zip, /path/to/files/test.py \"}"; + + when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertEquals(pythonUdfConfig.getPythonFiles(), "/path/to/function.zip,/path/to/files/test.py"); + } + + @Test + public void shouldRemoveWhitespaceInPythonArchivesConfig() { + String pythonJsonConfig = "{ \"PYTHON_FILES\": \"/path/to/function.zip\", \"PYTHON_ARCHIVES\": \" /path/to/data.zip, /path/to/files/second_data.zip \"}"; + + when(configuration.getString(PYTHON_UDF_CONFIG, "")).thenReturn(pythonJsonConfig); + PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration); + + Assert.assertEquals(pythonUdfConfig.getPythonArchives(), "/path/to/data.zip,/path/to/files/second_data.zip"); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java new file mode 100644 index 000000000..d7a210db6 --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/PythonUdfManagerTest.java @@ -0,0 +1,190 @@ +package io.odpf.dagger.functions.udfs.python; + +import io.odpf.dagger.functions.exceptions.PythonFilesFormatException; +import io.odpf.dagger.functions.exceptions.PythonFilesEmptyException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.Mock; + +import java.io.File; +import java.io.IOException; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import static 
org.mockito.MockitoAnnotations.initMocks; + +public class PythonUdfManagerTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Mock + private StreamTableEnvironment tableEnvironment; + + @Mock + private PythonUdfConfig pythonUdfConfig; + + @Mock + private TableConfig tableConfig; + + @Mock + private Configuration configuration; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldRegisterPythonUdfConfig() throws IOException { + String pathFile = getPath("python_udf.zip"); + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(pathFile); + when(pythonUdfConfig.getPythonArchives()).thenReturn("/path/to/file.txt"); + when(pythonUdfConfig.getPythonRequirements()).thenReturn("requirements.txt"); + when(pythonUdfConfig.getPythonArrowBatchSize()).thenReturn(10000); + when(pythonUdfConfig.getPythonBundleSize()).thenReturn(100000); + when(pythonUdfConfig.getPythonBundleTime()).thenReturn(1000L); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", pathFile); + verify(configuration, times(1)).setString("python.archives", "/path/to/file.txt"); + verify(configuration, times(1)).setString("python.requirements", "requirements.txt"); + verify(configuration, times(1)).setInteger("python.fn-execution.arrow.batch.size", 10000); + verify(configuration, times(1)).setInteger("python.fn-execution.bundle.size", 100000); + verify(configuration, times(1)).setLong("python.fn-execution.bundle.time", 1000); + } + + @Test + public void shouldNotRegisterConfigIfNotSet() throws IOException { + String pathFile = getPath("python_udf.zip"); + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + 
when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(pathFile); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", pathFile); + verify(configuration, times(0)).setString("python.archives", "/path/to/file.txt"); + verify(configuration, times(0)).setString("python.requirements", "requirements.txt"); + } + + @Test + public void shouldRegisterPythonUdfFromPyFile() throws IOException { + String pathFile = getPath("test_udf.py"); + String sqlRegisterUdf = "CREATE TEMPORARY FUNCTION TEST_UDF AS 'test_udf.test_udf' LANGUAGE PYTHON"; + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(pathFile); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", pathFile); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterUdf); + } + + @Test + public void shouldOnlyExecutePyFormatInsideZipFile() throws IOException { + String pathFile = getPath("python_udf.zip"); + + String sqlRegisterFirstUdf = "CREATE TEMPORARY FUNCTION MULTIPLY AS 'python_udf.scalar.multiply.multiply' LANGUAGE PYTHON"; + String sqlRegisterSecondUdf = "CREATE TEMPORARY FUNCTION ADD AS 'python_udf.scalar.add.add' LANGUAGE PYTHON"; + String sqlRegisterThirdUdf = "CREATE TEMPORARY FUNCTION SUBSTRACT AS 'python_udf.vectorized.substract.substract' LANGUAGE PYTHON"; + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(pathFile); + + PythonUdfManager pythonUdfManager = new 
PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", pathFile); + verify(tableEnvironment, times(0)).executeSql(sqlRegisterFirstUdf); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterSecondUdf); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterThirdUdf); + } + + @Test + public void shouldRegisterPythonUdfFromPyAndZipFile() throws IOException { + String zipPathFile = getPath("python_udf.zip"); + String pyPathFile = getPath("test_udf.py"); + + String sqlRegisterFirstUdf = "CREATE TEMPORARY FUNCTION ADD AS 'python_udf.scalar.add.add' LANGUAGE PYTHON"; + String sqlRegisterSecondUdf = "CREATE TEMPORARY FUNCTION SUBSTRACT AS 'python_udf.vectorized.substract.substract' LANGUAGE PYTHON"; + String sqlRegisterThirdUdf = "CREATE TEMPORARY FUNCTION TEST_UDF AS 'test_udf.test_udf' LANGUAGE PYTHON"; + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(zipPathFile + "," + pyPathFile); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + + verify(configuration, times(1)).setString("python.files", zipPathFile + "," + pyPathFile); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterFirstUdf); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterSecondUdf); + verify(tableEnvironment, times(1)).executeSql(sqlRegisterThirdUdf); + } + + @Test + public void shouldThrowExceptionIfPythonFilesNotInZipOrPyFormat() throws IOException { + expectedEx.expect(PythonFilesFormatException.class); + expectedEx.expectMessage("Python files should be in .py or .zip format"); + + File file = File.createTempFile("test_file", ".txt"); + file.deleteOnExit(); + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + 
when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn("test_file.txt"); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + } + + @Test + public void shouldThrowExceptionIfPythonFilesIsEmpty() throws IOException { + expectedEx.expect(PythonFilesFormatException.class); + expectedEx.expectMessage("Python files should be in .py or .zip format"); + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + when(pythonUdfConfig.getPythonFiles()).thenReturn(""); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + } + + @Test + public void shouldThrowExceptionIfPythonFilesIsNull() throws IOException { + expectedEx.expect(PythonFilesEmptyException.class); + expectedEx.expectMessage("Python files can not be null"); + + when(tableEnvironment.getConfig()).thenReturn(tableConfig); + when(tableConfig.getConfiguration()).thenReturn(configuration); + + PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig); + pythonUdfManager.registerPythonFunctions(); + } + + private String getPath(String filename) { + ClassLoader classLoader = getClass().getClassLoader(); + + return classLoader.getResource(filename).getPath(); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java new file mode 100644 index 000000000..2cd5a472f --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/FileSourceFactoryTest.java @@ -0,0 +1,27 @@ +package io.odpf.dagger.functions.udfs.python.file.source; + +import 
io.odpf.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; +import io.odpf.dagger.functions.udfs.python.file.source.local.LocalFileSource; +import org.junit.Assert; +import org.junit.Test; + +public class FileSourceFactoryTest { + + @Test + public void shouldGetLocalFileSource() { + String pythonFile = "/path/to/file/test_function.py"; + + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + + Assert.assertTrue(fileSource instanceof LocalFileSource); + } + + @Test + public void shouldGetGcsFileSource() { + String pythonFile = "gs://bucket-name/path/to/file/test_function.py"; + + FileSource fileSource = FileSourceFactory.getFileSource(pythonFile); + + Assert.assertTrue(fileSource instanceof GcsFileSource); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClientTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClientTest.java new file mode 100644 index 000000000..5c0c7950e --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsClientTest.java @@ -0,0 +1,47 @@ +package io.odpf.dagger.functions.udfs.python.file.source.gcs; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.util.Arrays; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +public class GcsClientTest { + + @Mock + private Storage storage; + + @Mock + private Blob blob; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() { + + String pythonFile = "gs://bucket_name/path/to/file/python_udf.zip"; + String bucketName = "bucket_name"; + String objectName = "path/to/file/python_udf.zip"; + String expectedValue = 
Arrays.toString("objectFile".getBytes()); + + when(storage.get(BlobId.of(bucketName, objectName))).thenReturn(blob); + when(blob.getContent()).thenReturn("objectFile".getBytes()); + + GcsClient gcsClient = new GcsClient(storage); + byte[] actualValue = gcsClient.getFile(pythonFile); + + verify(storage, times(1)).get(BlobId.of(bucketName, objectName)); + verify(blob, times(1)).getContent(); + Assert.assertEquals(expectedValue, Arrays.toString(actualValue)); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSourceTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSourceTest.java new file mode 100644 index 000000000..d72d6a8f2 --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/gcs/GcsFileSourceTest.java @@ -0,0 +1,38 @@ +package io.odpf.dagger.functions.udfs.python.file.source.gcs; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class GcsFileSourceTest { + + @Mock + private GcsClient gcsClient; + + @Before + public void setup() { + initMocks(this); + } + + @Test + public void shouldGetObjectFile() throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + String pythonFile = classLoader.getResource("python_udf.zip").getFile(); + byte[] expectedObject = Files.readAllBytes(Paths.get(pythonFile)); + + when(gcsClient.getFile(pythonFile)).thenReturn(expectedObject); + GcsFileSource gcsFileSource = new GcsFileSource(pythonFile, gcsClient); + + byte[] actualObject = gcsFileSource.getObjectFile(); + + Assert.assertArrayEquals(expectedObject, actualObject); /* compare byte[] contents, not array references */ + } +} diff --git
a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSourceTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSourceTest.java new file mode 100644 index 000000000..4094e186a --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/source/local/LocalFileSourceTest.java @@ -0,0 +1,28 @@ +package io.odpf.dagger.functions.udfs.python.file.source.local; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; + +public class LocalFileSourceTest { + + @Test + public void shouldGetObjectFile() throws IOException { + ClassLoader classLoader = getClass().getClassLoader(); + + String pythonFile = classLoader.getResource("python_udf.zip").getPath(); + + byte[] object = Files.readAllBytes(Paths.get(pythonFile)); + String stringObject = new String(object, StandardCharsets.UTF_8); + + LocalFileSource localFileSource = new LocalFileSource(pythonFile); + byte[] actualObject = localFileSource.getObjectFile(); + + String actualStringObject = new String(actualObject, StandardCharsets.UTF_8); + Assert.assertEquals(stringObject, actualStringObject); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactoryTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactoryTest.java new file mode 100644 index 000000000..67f58cf63 --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/FileTypeFactoryTest.java @@ -0,0 +1,41 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.exceptions.PythonFilesFormatException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class 
FileTypeFactoryTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Test + public void shouldGetPythonFileType() { + String pythonFile = "/path/to/file/test_udf.py"; + + FileType fileType = FileTypeFactory.getFileType(pythonFile); + + Assert.assertTrue(fileType instanceof PythonFileType); + } + + @Test + public void shouldGetZipFileType() { + String pythonFile = "/path/to/file/python_udf.zip"; + + FileType fileType = FileTypeFactory.getFileType(pythonFile); + + Assert.assertTrue(fileType instanceof ZipFileType); + } + + @Test + public void shouldThrowExceptionIfPythonFilesNotInZipOrPyFormat() { + expectedEx.expect(PythonFilesFormatException.class); + expectedEx.expectMessage("Python files should be in .py or .zip format"); + + String pythonFile = "/path/to/file/test_file.txt"; + + FileTypeFactory.getFileType(pythonFile); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileTypeTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileTypeTest.java new file mode 100644 index 000000000..7ff4e9c69 --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/PythonFileTypeTest.java @@ -0,0 +1,45 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.exceptions.PythonFilesEmptyException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.List; + +public class PythonFileTypeTest { + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Test + public void shouldGetFileNames() { + ClassLoader classLoader = getClass().getClassLoader(); + String pythonFile = classLoader.getResource("test_udf.py").getPath(); + + PythonFileType pythonFileType = new PythonFileType(pythonFile); + List fileNames = pythonFileType.getFileNames(); + + 
Assert.assertEquals("[test_udf.py]", fileNames.toString()); + } + + @Test + public void shouldGetEmptyFileNamesIfPythonFilesIsEmpty() { + String pythonFile = ""; + + PythonFileType pythonFileType = new PythonFileType(pythonFile); + List fileNames = pythonFileType.getFileNames(); + + Assert.assertEquals("[]", fileNames.toString()); + } + + @Test + public void shouldThrowNullPointerExceptionIfPythonFilesIsNull() { + expectedEx.expect(PythonFilesEmptyException.class); + expectedEx.expectMessage("Python files can not be null"); + + PythonFileType pythonFileType = new PythonFileType(null); + pythonFileType.getFileNames(); + } +} diff --git a/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileTypeTest.java b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileTypeTest.java new file mode 100644 index 000000000..99acd37ae --- /dev/null +++ b/dagger-functions/src/test/java/io/odpf/dagger/functions/udfs/python/file/type/ZipFileTypeTest.java @@ -0,0 +1,72 @@ +package io.odpf.dagger.functions.udfs.python.file.type; + +import io.odpf.dagger.functions.udfs.python.file.source.gcs.GcsFileSource; +import io.odpf.dagger.functions.udfs.python.file.source.local.LocalFileSource; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.*; + +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.initMocks; + +public class ZipFileTypeTest { + + @Mock + private GcsFileSource gcsFileSource; + + @Mock + private LocalFileSource localFileSource; + + private byte[] zipInBytes; + + @Before + public void setup() throws IOException { + initMocks(this); + ClassLoader classLoader = getClass().getClassLoader(); + File file = new File(Objects.requireNonNull(classLoader.getResource("python_udf.zip")).getFile()); + zipInBytes = Files.readAllBytes(file.toPath()); + 
} + + @Test + public void shouldGetFileNamesFromLocalZip() throws IOException { + + when(localFileSource.getObjectFile()).thenReturn(zipInBytes); + + ZipFileType zipFileType = new ZipFileType(localFileSource); + List fileNames = zipFileType.getFileNames(); + + Assert.assertEquals("[python_udf/scalar/add.py, python_udf/vectorized/substract.py]", fileNames.toString()); + } + + @Test + public void shouldGetFileNamesFromGcsZip() throws IOException { + + when(gcsFileSource.getObjectFile()).thenReturn(zipInBytes); + + ZipFileType zipFileType = new ZipFileType(gcsFileSource); + List fileNames = zipFileType.getFileNames(); + + Assert.assertEquals("[python_udf/scalar/add.py, python_udf/vectorized/substract.py]", fileNames.toString()); + } + + @Test + public void shouldGetEmptyFileNamesIfZipFileNotContainPyFile() throws IOException { + + ClassLoader classLoader = getClass().getClassLoader(); + File file = new File(Objects.requireNonNull(classLoader.getResource("test_no_py.zip")).getFile()); + zipInBytes = Files.readAllBytes(file.toPath()); + + when(gcsFileSource.getObjectFile()).thenReturn(zipInBytes); + + ZipFileType zipFileType = new ZipFileType(gcsFileSource); + List fileNames = zipFileType.getFileNames(); + + Assert.assertEquals("[]", fileNames.toString()); + } +} diff --git a/dagger-functions/src/test/resources/python_udf.zip b/dagger-functions/src/test/resources/python_udf.zip new file mode 100644 index 000000000..d55b8d1e2 Binary files /dev/null and b/dagger-functions/src/test/resources/python_udf.zip differ diff --git a/dagger-functions/src/test/resources/test_no_py.zip b/dagger-functions/src/test/resources/test_no_py.zip new file mode 100644 index 000000000..ffcfef40f Binary files /dev/null and b/dagger-functions/src/test/resources/test_no_py.zip differ diff --git a/dagger-functions/src/test/resources/test_udf.py b/dagger-functions/src/test/resources/test_udf.py new file mode 100644 index 000000000..39e64f8be --- /dev/null +++ 
b/dagger-functions/src/test/resources/test_udf.py @@ -0,0 +1,7 @@ +from pyflink.table import DataTypes +from pyflink.table.udf import udf + + +@udf(result_type=DataTypes.STRING()) +def test_udf(text: str): + return text + "_added_text" diff --git a/dagger-py-functions/data/sample_data.txt b/dagger-py-functions/data/sample_data.txt new file mode 100644 index 000000000..157f4b043 --- /dev/null +++ b/dagger-py-functions/data/sample_data.txt @@ -0,0 +1 @@ +sample_text \ No newline at end of file diff --git a/dagger-py-functions/requirements.txt b/dagger-py-functions/requirements.txt new file mode 100644 index 000000000..64e47665c --- /dev/null +++ b/dagger-py-functions/requirements.txt @@ -0,0 +1,2 @@ +pytest==7.1.2 +flake8==4.0.1 \ No newline at end of file diff --git a/dagger-py-functions/tests/__init__.py b/dagger-py-functions/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dagger-py-functions/tests/udfs/__init__.py b/dagger-py-functions/tests/udfs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dagger-py-functions/tests/udfs/scalar/__init__.py b/dagger-py-functions/tests/udfs/scalar/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dagger-py-functions/tests/udfs/scalar/multiply_test.py b/dagger-py-functions/tests/udfs/scalar/multiply_test.py new file mode 100644 index 000000000..12aced4ef --- /dev/null +++ b/dagger-py-functions/tests/udfs/scalar/multiply_test.py @@ -0,0 +1,6 @@ +from udfs.scalar.multiply import multiply + + +def testMultiply(): + value = multiply._func + assert value(5,10) == 50 diff --git a/dagger-py-functions/tests/udfs/scalar/sample_test.py b/dagger-py-functions/tests/udfs/scalar/sample_test.py new file mode 100644 index 000000000..e6c9dc7d9 --- /dev/null +++ b/dagger-py-functions/tests/udfs/scalar/sample_test.py @@ -0,0 +1,6 @@ +from udfs.scalar.sample import sample + + +def testSample(): + value = sample._func + assert value("input_text_") ==
"input_text_sample_text" diff --git a/dagger-py-functions/udfs/scalar/multiply.py b/dagger-py-functions/udfs/scalar/multiply.py new file mode 100644 index 000000000..4078293c5 --- /dev/null +++ b/dagger-py-functions/udfs/scalar/multiply.py @@ -0,0 +1,7 @@ +from pyflink.table import DataTypes +from pyflink.table.udf import udf + + +@udf(result_type=DataTypes.FLOAT()) +def multiply(i, j): + return i * j \ No newline at end of file diff --git a/dagger-py-functions/udfs/scalar/sample.py b/dagger-py-functions/udfs/scalar/sample.py new file mode 100644 index 000000000..99103d4bc --- /dev/null +++ b/dagger-py-functions/udfs/scalar/sample.py @@ -0,0 +1,9 @@ +from pyflink.table import DataTypes +from pyflink.table.udf import udf + + +@udf(result_type=DataTypes.STRING()) +def sample(text): +    with open("data/sample_data.txt", "r") as file: +        data = file.read() +    return text + data diff --git a/docs/docs/contribute/add_transformer.md b/docs/docs/contribute/add_transformer.md index 9368f9588..8c7463d6e 100644 --- a/docs/docs/contribute/add_transformer.md +++ b/docs/docs/contribute/add_transformer.md @@ -12,7 +12,7 @@ For adding custom Transformers follow these steps - To define a new Transformer implement Transformer interface. The contract of Transformers is defined [here](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/core/Transformer.java). -- Since an input DataStream is available in Transformer, all the Flink supported operators which transform `DataStream -> DataStream` can be applied/used by default for the transformations. Operators are how Flink exposes classic Map-reduce type functionalities. Read more about Flink Operators [here](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/). +- Since an input DataStream is available in Transformer, all the Flink supported operators which transform `DataStream -> DataStream` can be applied/used by default for the transformations.
Operators are how Flink exposes classic Map-reduce type functionalities. Read more about Flink Operators [here](https://ci.apache.org/projects/flink/flink-docs-release-1.14/dev/stream/operators/). - In the case of single Operator Transformation you can extend the desired Operator in the Transformer class itself. For example, follow this code of [HashTransformer](https://github.com/odpf/dagger/blob/main/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java). You can also define multiple chaining operators to Transform Data. diff --git a/docs/docs/contribute/add_udf.md b/docs/docs/contribute/add_udf.md index 419f9e93c..647607b79 100644 --- a/docs/docs/contribute/add_udf.md +++ b/docs/docs/contribute/add_udf.md @@ -4,21 +4,29 @@ Want a function to use in SQL which is not supported both by Flink and one of th `Note`: _Please go through the [Contribution guide](../contribute/contribution.md) to know about all the conventions and practices we tend to follow and to know about the contribution process to the dagger._ -For adding custom UDFs follow these steps +For adding custom UDFs follow these steps: -- Ensure none of the [built-in functions](https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/systemfunctions) or [existing UDF](../reference/udfs.md) suits your requirement. -- For adding a UDF, figure out which type of UDF you required. Flink supports three types of [User defined function](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html). Choose one of them according to the requirement. +- Ensure none of the [built-in functions](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/systemfunctions/) or [existing UDF](../reference/udfs.md) suits your requirement. -- For getting more insights on writing your UDF, follow [this](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html) to create a UDF. 
It needs to be written in Java/Scala. +- For adding a UDF, figure out which type of UDF you require. Flink supports three types of [User defined function](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/udfs/). Choose one of them according to the requirement. -- UDF need to be the `function-type` directory inside [this](https://github.com/odpf/dagger/tree/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs) on `dagger-functions` subproject. +- You can write the UDF in Java, Scala or Python. -- Extend either of [ScalarUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/ScalarUdf.java), [TableUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/TableUdf.java) or [AggregateUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/AggregateUdf.java) from `dagger-common`. They are boilerplate contracts extending Flink UDF classes. These classes do some more preprocessing(like exposing some metrics) in the `open` method behind the scene. - -- Register the UDF in [this](https://github.com/odpf/dagger/blob/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/FunctionFactory.java) class. This is required to let Flink know about your function. +- For adding UDF with Java/Scala: + - Follow [this](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/udfs/) for more insights on writing your UDF. + - UDF needs to be added in the `function-type` folder inside [this](https://github.com/odpf/dagger/tree/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs) on `dagger-functions` subproject.
+ - Extend either of [ScalarUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/ScalarUdf.java), [TableUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/TableUdf.java) or [AggregateUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/AggregateUdf.java) from `dagger-common`. They are boilerplate contracts extending Flink UDF classes. These classes do some more preprocessing (like exposing some metrics) in the `open` method behind the scenes. + - Register the UDF in [this](https://github.com/odpf/dagger/blob/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/FunctionFactory.java) class. This is required to let Flink know about your function. + - If you have some business-specific use-cases and you don't want to add UDFs to the open-sourced repo, you can have a separate local codebase for those UDFs. Those UDFs need to be registered in a similar class like the [`UDFFactory`](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/UdfFactory.java). Keep both the UDF classes and the factory class in the classpath of Dagger. Configure the fully qualified Factory class in the `FUNCTION_FACTORY_CLASSES` parameter and you will be able to use the desired UDF in your query. +- For adding UDF with Python: + - Follow [this](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/table/udfs/overview/) for more insights on writing your UDF. + - UDF needs to be added inside [this](https://github.com/odpf/dagger/tree/main/dagger-py-functions/udfs) folder in the `dagger-py-functions` directory. + - Ensure that the filename and the method name of the python function are the same. This name will be registered by dagger as the function name, which can later be used in the query.
+ - Ensure to add dependency needed for the python function on the [requirements.txt](https://github.com/odpf/dagger/tree/main/dagger-py-functions/requirements.txt) file. + - Add python unit test and the make sure the test is passed. + - If you have some business-specific use-cases and you don't want to add UDFs to the open-sourced repo, you can have a separate local codebase for those UDFs and specify that file on the python configuration. + - Bump up the version and raise a PR for the same. Also please add the registered function to the [list of udfs doc](../reference/udfs.md). -- If you have some business-specific use-cases and you don't want to add UDFs to the open-sourced repo, you can have a separate local codebase for those UDFs. Those UDFs need to be registered in a similar class like the [`UDFFactory`](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/UdfFactory.java). Keep both the UDF classes and the factory class in the classpath of Dagger. Configure the fully qualified Factory class in the `FUNCTION_FACTORY_CLASSES` parameter and you will be able to use the desired UDF in your query. - In the subsequent release of the dagger, your functions should be useable in the query. diff --git a/docs/docs/guides/use_udf.md b/docs/docs/guides/use_udf.md index e8ac5a8e7..a8e049513 100644 --- a/docs/docs/guides/use_udf.md +++ b/docs/docs/guides/use_udf.md @@ -26,6 +26,24 @@ Some of the use-cases can not be solved using Flink SQL & the Apache Calcite fun Maps zero or more values to multiple rows and each row may have multiple columns. -All the supported udfs present in the `dagger-functions` subproject in [this](https://github.com/odpf/dagger/tree/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs) directory. Follow this to find more details about the already supported UDFs in the dagger. 
+All the supported java udfs present in the `dagger-functions` subproject in [this](https://github.com/odpf/dagger/tree/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs) directory. + +All the supported python udfs present in the [dagger-py-functions](https://github.com/odpf/dagger/tree/main/dagger-py-functions/udfs/) directory. + +Follow [this](../reference/udfs.md) to find more details about the already supported UDFs in the dagger. If any of the predefined functions do not meet your requirement you can create your custom UDFs by extending some implementation. Follow [this](../contribute/add_udf.md) to add your custom UDFs in the dagger. + +## Python Environment Setup + +Python UDF execution requires Python version (3.6, 3.7 or 3.8) with PyFlink installed. + +PyFlink is available in PyPi and can be installed as follows: +``` +$ python -m pip install apache-flink==1.14.3 +``` + +To satisfy the PyFlink requirement regarding the Python environment version, you need to soft link python to point to your python3 interpreter: +``` +ln -s /usr/bin/python3 python +``` diff --git a/docs/docs/reference/configuration.md b/docs/docs/reference/configuration.md index e03cbe8ab..4130b3321 100644 --- a/docs/docs/reference/configuration.md +++ b/docs/docs/reference/configuration.md @@ -14,6 +14,8 @@ This page contains references for all the application configurations for Dagger. * [PreProcessor](configuration.md#preprocessor) * [PostProcessor](configuration.md#postprocessor) * [Telemetry](configuration.md#telemetry) +* [Python Udfs](configuration.md#python-udfs) + ### Generic @@ -496,4 +498,96 @@ Shutdown period of metric telemetry in milliseconds. * Example value: `10000` * Type: `optional` -* Default value: `10000` \ No newline at end of file +* Default value: `10000` + +### Python Udfs + +#### `PYTHON_UDF_ENABLE` + +Enable/Disable using python udf. 
+ +* Example value: `false` +* Type: `optional` +* Default value: `false` + +#### `PYTHON_UDF_CONFIG` + +All the configuration needed to use python udfs. + +The following variables need to be configured: + +##### `PYTHON_FILES` + +Defines the path of python udf files. Currently only the `.py` and `.zip` file formats are supported. Comma (',') could be used as the separator to specify multiple files. + +* Example value: `/path/to/files.zip` +* Type: `required` + +##### `PYTHON_ARCHIVES` + +Defines the path of files that are used by the python udf. Only the `.zip` file format is supported. Comma (',') could be used as the separator to specify multiple archive files. +The archive files will be extracted to the working directory of python UDF worker. For each archive file, a target directory is specified. If the target directory name is specified, the archive file will be extracted to a directory with the specified name. Otherwise, the archive file will be extracted to a directory with the same name of the archive file. '#' could be used as the separator of the archive file path and the target directory name. + +Example: +* PYTHON_ARCHIVES=/path/to/data.zip + + You should set the path name to `data.zip/data/sample.txt` on the udf to be able to open the files. + +* PYTHON_ARCHIVES=/path/to/data.zip#data + + You should set the path name to `data/sample.txt` on the udf to be able to open the files. + +* An example of how to use this can be found in this [udf](https://github.com/odpf/dagger/tree/main/dagger-py-functions/udfs/scalar/sample.py) + +* Type: `optional` +* Default value: `(none)` + +##### `PYTHON_REQUIREMENTS` + +Defines the path of python dependency files. + +* Example value: `/path/to/requirements.txt` +* Type: `optional` +* Default value: `(none)` + +##### `PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE` + +The maximum number of elements to include in an arrow batch for python user-defined function execution.
+ +* Example value: `10000` +* Type: `optional` +* Default value: `10000` + +##### `PYTHON_FN_EXECUTION_BUNDLE_SIZE` + +The maximum number of elements to include in a bundle for python user-defined function execution. + +* Example value: `100000` +* Type: `optional` +* Default value: `100000` + +##### `PYTHON_FN_EXECUTION_BUNDLE_TIME` + +Sets the waiting timeout (in milliseconds) before processing a bundle for Python user-defined function execution. The timeout defines how long the elements of a bundle will be buffered before being processed. + +* Example value: `1000` +* Type: `optional` +* Default value: `1000` + +##### Sample Configuration +``` +PYTHON_UDF_CONFIG = [ + { + "PYTHON_FILES": "/path/to/files.py", + "PYTHON_ARCHIVES": "/path/to/data.zip", + "PYTHON_REQUIREMENTS": "/path/to/requirements.txt", + "PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE": "10000", + "PYTHON_FN_EXECUTION_BUNDLE_SIZE": "100000", + "PYTHON_FN_EXECUTION_BUNDLE_TIME": "1000" + } +] +``` + +Find more details on python udf config [here](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-options). + + diff --git a/docs/docs/reference/udfs.md b/docs/docs/reference/udfs.md index 529cb2ee2..bc0e26b1d 100644 --- a/docs/docs/reference/udfs.md +++ b/docs/docs/reference/udfs.md @@ -1,8 +1,12 @@ # Udfs -This page contains references for all the custom udfs available on Dagger. +This page contains references for all the custom udfs available on Dagger. 
-## List of Udfs +The udfs on Dagger are divided into two parts: +* [Java Udfs](udfs.md#list-of-java-udfs) +* [Python Udfs](udfs.md#list-of-python-udfs) + +## List of Java Udfs - [Scalar Functions](udfs.md#scalar-functions) - [ArrayAggregate](udfs.md#ArrayAggregate) @@ -795,3 +799,45 @@ LATERAL TABLE( `isOutlier` ) ``` + +## List of Python Udfs + +- [Scalar Functions](udfs.md#python-scalar-functions) + - [Sample](udfs.md#Sample) + - [Multiply](udfs.md#Multiply) +- [Aggregate Functions](udfs.md#python-aggregate-functions) +- [Table Functions](udfs.md#python-table-functions) + +### Python Scalar Functions + +#### Sample +* Contract: + * **String** `Sample(input_string)`. +* Functionality: + * This is one of the sample python udfs. + * Appends an extra string from [data/sample.txt](../../../dagger-py-functions/data/sample_data.txt) to the input_string. +* Example: + ``` + SELECT + sample(inputString) as input_with_additional_text + FROM + data_stream + ``` + +#### Multiply +* Contract: + * **Float** `Multiply(input_number1, input_number2)`. +* Functionality: + * This is one of the sample python udfs. + * Multiplies the two specified input numbers. +* Example: + ``` + SELECT + multiply(input_number1, input_number2) as multiply_result + FROM + data_stream + ``` + +### Python Aggregate Functions + +### Python Table Functions \ No newline at end of file diff --git a/version.txt b/version.txt index 1866a362b..d156ab466 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.2.9 +0.2.10 \ No newline at end of file