Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/python_release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Release workflow: bundles the Python UDF sources and attaches the bundle to the GitHub release.
name: Python Package
# Runs when a release is created.
on:
release:
types: [created]

jobs:
publishPythonZip:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
# Builds dagger-py-functions.zip inside dagger-py-functions. The bundle contains requirements.txt
# plus two nested archives: python_udfs.zip (the udfs directory, excluding __init__.py files) and
# data.zip (the files under data, stored without their directory paths because of -j).
- name: Zip Python Udf
run: |
cd dagger-py-functions
zip -r python_udfs.zip udfs -x "*/__init__.py"
zip -jr data.zip data
zip -r dagger-py-functions.zip requirements.txt data.zip python_udfs.zip
# Uploads the bundle as a release artifact. allowUpdates permits updating a release that already
# exists; the omit*DuringUpdate flags leave its name, body and prerelease state untouched.
- name: Upload Release
uses: ncipollo/release-action@v1
with:
artifacts: dagger-py-functions/dagger-py-functions.zip
allowUpdates: true
omitNameDuringUpdate: true
omitBodyDuringUpdate: true
omitPrereleaseDuringUpdate: true
token: ${{ secrets.GITHUB_TOKEN }}
27 changes: 27 additions & 0 deletions .github/workflows/python_validation.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Validation workflow: lints and tests the Python UDF code.
name: Python Validation

# Runs on every push.
on: push

jobs:
pythonValidation:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.8
uses: actions/setup-python@v3
with:
python-version: '3.8'
# Installs the pinned apache-flink release, then dagger-py-functions/requirements.txt if it exists.
# NOTE(review): flake8 and pytest are invoked by the steps below but are not installed explicitly
# here - presumably they are listed in requirements.txt; confirm.
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install apache-flink==1.14.3
cd dagger-py-functions
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
# This step has no cd, so flake8 runs against "." of the step's default working directory.
# Only the selected error classes (E9, F63, F7, F82) are reported.
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# Runs pytest from inside dagger-py-functions with warnings suppressed in the output.
- name: Test with pytest
run: |
cd dagger-py-functions
pytest --disable-warnings
4 changes: 4 additions & 0 deletions dagger-core/env/local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ METRIC_TELEMETRY_ENABLE=true
# == Others ==
FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.FunctionFactory
FLINK_ROWTIME_ATTRIBUTE_NAME=rowtime

# == Python Udf ==
PYTHON_UDF_ENABLE=false
PYTHON_UDF_CONFIG={"PYTHON_FILES":"/path/to/files.zip", "PYTHON_REQUIREMENTS": "requirements.txt", "PYTHON_FN_EXECUTION_BUNDLE_SIZE": "1000"}
13 changes: 12 additions & 1 deletion dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.odpf.dagger.core.source.Stream;
import io.odpf.dagger.core.source.StreamsFactory;
import io.odpf.dagger.core.utils.Constants;
import io.odpf.dagger.functions.udfs.python.PythonUdfConfig;
import io.odpf.dagger.functions.udfs.python.PythonUdfManager;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -28,12 +30,15 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.List;

import static io.odpf.dagger.core.utils.Constants.*;
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_DEFAULT;
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_KEY;
import static org.apache.flink.table.api.Expressions.$;

/**
Expand Down Expand Up @@ -138,7 +143,13 @@ private ApiExpression[] getApiExpressions(StreamInfo streamInfo) {
*
* @return the stream manager
*/
public StreamManager registerFunctions() {
public StreamManager registerFunctions() throws IOException {
if (configuration.getBoolean(PYTHON_UDF_ENABLE_KEY, PYTHON_UDF_ENABLE_DEFAULT)) {
PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration);
PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig);
pythonUdfManager.registerPythonFunctions();
}

String[] functionFactoryClasses = configuration
.getString(Constants.FUNCTION_FACTORY_CLASSES_KEY, Constants.FUNCTION_FACTORY_CLASSES_DEFAULT)
.split(",");
Expand Down
3 changes: 2 additions & 1 deletion dagger-functions/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ sourceSets {
dependencies {
compileOnly 'org.projectlombok:lombok:1.18.8'
annotationProcessor 'org.projectlombok:lombok:1.18.8'

compileOnly project(path: ':dagger-common', configuration: 'minimalCommonJar')
compileOnly project(path: ':dagger-common', configuration: 'dependenciesCommonJar')
compileOnly 'org.apache.flink:flink-streaming-java_2.11:' + flinkVersion
Expand All @@ -62,7 +63,7 @@ dependencies {
dependenciesFunctionsJar group: 'org.apache.commons', name: 'commons-jexl3', version: '3.1'
dependenciesFunctionsJar group: 'org.isuper', name: 's2-geometry-library-java', version: '0.0.1'
dependenciesFunctionsJar group: 'com.google.cloud', name: 'google-cloud-storage', version: '1.67.0'

testImplementation project(':dagger-common').sourceSets.test.output
testImplementation group: 'junit', name: 'junit', version: '4.12'
testImplementation 'org.mockito:mockito-core:2.0.99-beta'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,17 @@ public class Constants {
public static final String UDF_DART_GCS_PROJECT_ID_DEFAULT = "";
public static final String UDF_DART_GCS_BUCKET_ID_KEY = "UDF_DART_GCS_BUCKET_ID";
public static final String UDF_DART_GCS_BUCKET_ID_DEFAULT = "";

public static final String PYTHON_UDF_CONFIG = "PYTHON_UDF_CONFIG";
public static final String PYTHON_UDF_ENABLE_KEY = "PYTHON_UDF_ENABLE";
public static final boolean PYTHON_UDF_ENABLE_DEFAULT = false;
public static final String PYTHON_FILES_KEY = "PYTHON_FILES";
public static final String PYTHON_REQUIREMENTS_KEY = "PYTHON_REQUIREMENTS";
public static final String PYTHON_ARCHIVES_KEY = "PYTHON_ARCHIVES";
public static final String PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY = "PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE";
public static final Integer PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT = 10000;
public static final String PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY = "PYTHON_FN_EXECUTION_BUNDLE_SIZE";
public static final Integer PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT = 100000;
public static final String PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY = "PYTHON_FN_EXECUTION_BUNDLE_TIME";
public static final long PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT = 1000;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.odpf.dagger.functions.exceptions;

/**
 * Unchecked exception signalling that no Python files were configured for Python UDF registration.
 *
 * <p>{@code PythonUdfManager#registerPythonFunctions()} throws it when the configured
 * Python files value is {@code null}.
 */
public class PythonFilesEmptyException extends RuntimeException {

    /**
     * Creates the exception with the given detail message.
     *
     * @param message the detail message, later available through {@link #getMessage()}
     */
    public PythonFilesEmptyException(String message) {
        super(message);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.odpf.dagger.functions.exceptions;

/**
 * Unchecked exception related to the format of the configured Python files.
 *
 * <p>NOTE(review): the throw site is not part of this file; presumably it is raised when a
 * configured Python file has an unsupported type or extension - confirm against the code
 * that resolves file types.
 */
public class PythonFilesFormatException extends RuntimeException {

    /**
     * Creates the exception with the given detail message.
     *
     * @param message the detail message, later available through {@link #getMessage()}
     */
    public PythonFilesFormatException(String message) {
        super(message);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package io.odpf.dagger.functions.udfs.python;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import io.odpf.dagger.common.configuration.Configuration;
import lombok.Getter;

import static io.odpf.dagger.functions.common.Constants.*;

/**
 * Python UDF settings, deserialized by Gson from the JSON held in the
 * {@code PYTHON_UDF_CONFIG} configuration entry.
 *
 * <p>Every JSON property is optional. The String getters return {@code null} for a missing
 * property, while the numeric getters fall back to the {@code *_DEFAULT} constants.
 */
public class PythonUdfConfig {
    private static final Gson GSON = new GsonBuilder()
            .enableComplexMapKeySerialization()
            .setPrettyPrinting()
            .create();

    @SerializedName(PYTHON_FILES_KEY)
    private String pythonFiles;

    @SerializedName(PYTHON_REQUIREMENTS_KEY)
    @Getter
    private String pythonRequirements;

    @SerializedName(PYTHON_ARCHIVES_KEY)
    private String pythonArchives;

    @SerializedName(PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_KEY)
    private Integer pythonArrowBatchSize;

    @SerializedName(PYTHON_FN_EXECUTION_BUNDLE_SIZE_KEY)
    private Integer pythonBundleSize;

    @SerializedName(PYTHON_FN_EXECUTION_BUNDLE_TIME_KEY)
    private Long pythonBundleTime;

    /**
     * Returns the configured Python files with every whitespace character removed.
     *
     * @return the whitespace-free Python files value, or {@code null} when it was not configured
     */
    public String getPythonFiles() {
        if (pythonFiles != null) {
            return pythonFiles.replaceAll("\\s+", "");
        }
        return null;
    }

    /**
     * Returns the configured Python archives with every whitespace character removed.
     *
     * @return the whitespace-free Python archives value, or {@code null} when it was not configured
     */
    public String getPythonArchives() {
        if (pythonArchives != null) {
            return pythonArchives.replaceAll("\\s+", "");
        }
        return null;
    }

    /**
     * Returns the arrow batch size.
     *
     * @return the configured value, or {@code PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT} when absent
     */
    public int getPythonArrowBatchSize() {
        if (pythonArrowBatchSize == null) {
            return PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE_DEFAULT;
        }
        return pythonArrowBatchSize;
    }

    /**
     * Returns the bundle size.
     *
     * @return the configured value, or {@code PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT} when absent
     */
    public int getPythonBundleSize() {
        if (pythonBundleSize == null) {
            return PYTHON_FN_EXECUTION_BUNDLE_SIZE_DEFAULT;
        }
        return pythonBundleSize;
    }

    /**
     * Returns the bundle time.
     *
     * @return the configured value, or {@code PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT} when absent
     */
    public long getPythonBundleTime() {
        if (pythonBundleTime == null) {
            return PYTHON_FN_EXECUTION_BUNDLE_TIME_DEFAULT;
        }
        return pythonBundleTime;
    }

    /**
     * Builds a {@link PythonUdfConfig} from the JSON stored under {@code PYTHON_UDF_CONFIG}.
     *
     * <p>The result is never {@code null}: when the entry is missing, empty, or the JSON
     * literal {@code null}, an empty config is returned, so callers see the documented
     * "absent property" behaviour of the getters instead of a {@link NullPointerException}.
     *
     * @param configuration the configuration to read the JSON from
     * @return the parsed config, or an empty config when there is nothing to parse
     */
    public static PythonUdfConfig parse(Configuration configuration) {
        String jsonString = configuration.getString(PYTHON_UDF_CONFIG, "");

        // Gson yields null rather than an instance for null/empty input.
        PythonUdfConfig parsedConfig = GSON.fromJson(jsonString, PythonUdfConfig.class);
        return parsedConfig != null ? parsedConfig : new PythonUdfConfig();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.odpf.dagger.functions.udfs.python;

import io.odpf.dagger.functions.exceptions.PythonFilesEmptyException;
import io.odpf.dagger.functions.udfs.python.file.type.FileType;
import io.odpf.dagger.functions.udfs.python.file.type.FileTypeFactory;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/**
 * Applies the Python UDF settings to a {@link StreamTableEnvironment} and registers one
 * temporary Python function per file name found in the configured Python files.
 */
public class PythonUdfManager {

    private static final String PYTHON_FILE_EXTENSION = ".py";

    private final StreamTableEnvironment tableEnvironment;
    private final PythonUdfConfig pythonUdfConfig;

    /**
     * Instantiates a new Python udf manager.
     *
     * @param tableEnvironment the table environment the functions are registered in
     * @param pythonUdfConfig  the python udf config to read the settings from
     */
    public PythonUdfManager(StreamTableEnvironment tableEnvironment, PythonUdfConfig pythonUdfConfig) {
        this.tableEnvironment = tableEnvironment;
        this.pythonUdfConfig = pythonUdfConfig;
    }

    /**
     * Pushes the Python settings into the table environment, then, for every comma-separated
     * entry of the configured Python files, resolves its file names and executes one
     * {@code CREATE TEMPORARY FUNCTION} statement per file name.
     *
     * @throws PythonFilesEmptyException when no Python files are configured
     * @throws IOException               propagated from resolving the configured files
     */
    public void registerPythonFunctions() throws IOException {
        String inputFiles = pythonUdfConfig.getPythonFiles();
        if (inputFiles == null) {
            throw new PythonFilesEmptyException("Python files can not be null");
        }
        registerPythonConfig();

        for (String pythonFile : inputFiles.split(",")) {
            FileType fileType = FileTypeFactory.getFileType(pythonFile);
            List<String> fileNames = fileType.getFileNames();
            List<String> sqlQueries = createQuery(fileNames);
            executeSql(sqlQueries);
        }
    }

    /**
     * Copies the settings into the table environment configuration. Requirements and archives
     * are set only when configured; files, batch size, bundle size and bundle time are always set.
     */
    private void registerPythonConfig() {
        if (pythonUdfConfig.getPythonRequirements() != null) {
            tableEnvironment.getConfig().getConfiguration().setString("python.requirements", pythonUdfConfig.getPythonRequirements());
        }
        if (pythonUdfConfig.getPythonArchives() != null) {
            tableEnvironment.getConfig().getConfiguration().setString("python.archives", pythonUdfConfig.getPythonArchives());
        }
        tableEnvironment.getConfig().getConfiguration().setString("python.files", pythonUdfConfig.getPythonFiles());
        tableEnvironment.getConfig().getConfiguration().setInteger("python.fn-execution.arrow.batch.size", pythonUdfConfig.getPythonArrowBatchSize());
        tableEnvironment.getConfig().getConfiguration().setInteger("python.fn-execution.bundle.size", pythonUdfConfig.getPythonBundleSize());
        tableEnvironment.getConfig().getConfiguration().setLong("python.fn-execution.bundle.time", pythonUdfConfig.getPythonBundleTime());
    }

    /** Executes the given statements, in order, against the table environment. */
    private void executeSql(List<String> sqlQueries) {
        for (String query : sqlQueries) {
            tableEnvironment.executeSql(query);
        }
    }

    /**
     * Builds one {@code CREATE TEMPORARY FUNCTION} statement per file name. The path without
     * its {@code .py} extension, with {@code /} turned into {@code .}, is the module path; its
     * last segment is the function name, which is registered in upper case.
     */
    private List<String> createQuery(List<String> fileNames) {
        List<String> sqlQueries = new ArrayList<>();
        for (String fileName : fileNames) {
            String modulePath = stripPythonExtension(fileName).replace("/", ".");
            String functionName = modulePath.substring(modulePath.lastIndexOf(".") + 1);
            // Locale.ROOT keeps the registered name independent of the JVM default locale
            // (e.g. a Turkish locale would otherwise upper-case "i" to a dotted capital I).
            String query = "CREATE TEMPORARY FUNCTION " + functionName.toUpperCase(Locale.ROOT)
                    + " AS '" + modulePath + "." + functionName + "' LANGUAGE PYTHON";
            sqlQueries.add(query);
        }
        return sqlQueries;
    }

    /** Removes a trailing {@code .py} only, leaving any other ".py" inside the path untouched. */
    private static String stripPythonExtension(String fileName) {
        if (fileName.endsWith(PYTHON_FILE_EXTENSION)) {
            return fileName.substring(0, fileName.length() - PYTHON_FILE_EXTENSION.length());
        }
        return fileName;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.odpf.dagger.functions.udfs.python.file.source;

import java.io.IOException;

/**
 * A source that a file can be obtained from as a byte array.
 */
public interface FileSource {

    /**
     * Returns the file as a byte array.
     *
     * <p>NOTE(review): presumably this is the complete raw content of the file the source
     * points at - confirm against the implementations.
     *
     * @return the file bytes
     * @throws IOException if an implementation fails to obtain the file
     */
    byte[] getObjectFile() throws IOException;
}
Loading