Skip to content

Commit 82f3ae0

Browse files
committed
Merge remote-tracking branch 'upstream/main' into dagger-parquet-file-processing
# Conflicts: # dagger-common/build.gradle # dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java # version.txt
2 parents b6d76d3 + 8893853 commit 82f3ae0

74 files changed

Lines changed: 1931 additions & 156 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
name: Python Package
2+
on:
3+
release:
4+
types: [created]
5+
6+
jobs:
7+
publishPythonZip:
8+
runs-on: ubuntu-latest
9+
steps:
10+
- uses: actions/checkout@v2
11+
- name: Zip Python Udf
12+
run: |
13+
cd dagger-py-functions
14+
zip -r python_udfs.zip udfs -x "*/__init__.py"
15+
zip -jr data.zip data
16+
zip -r dagger-py-functions.zip requirements.txt data.zip python_udfs.zip
17+
- name: Upload Release
18+
uses: ncipollo/release-action@v1
19+
with:
20+
artifacts: dagger-py-functions/dagger-py-functions.zip
21+
allowUpdates: true
22+
omitNameDuringUpdate: true
23+
omitBodyDuringUpdate: true
24+
omitPrereleaseDuringUpdate: true
25+
token: ${{ secrets.GITHUB_TOKEN }}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
name: Python Validation
2+
3+
on: push
4+
5+
jobs:
6+
pythonValidation:
7+
runs-on: ubuntu-latest
8+
steps:
9+
- uses: actions/checkout@v3
10+
- name: Set up Python 3.8
11+
uses: actions/setup-python@v3
12+
with:
13+
python-version: '3.8'
14+
- name: Install dependencies
15+
run: |
16+
python -m pip install --upgrade pip
17+
pip install apache-flink==1.14.3
18+
cd dagger-py-functions
19+
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
20+
- name: Lint with flake8
21+
run: |
22+
# stop the build if there are Python syntax errors or undefined names
23+
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
24+
- name: Test with pytest
25+
run: |
26+
cd dagger-py-functions
27+
pytest --disable-warnings

dagger-common/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,13 @@ dependencies {
6161

6262
dependenciesCommonJar ('org.apache.hadoop:hadoop-client:2.8.3') {
6363
exclude module:"commons-cli"
64+
exclude module:"commons-compress"
6465
}
6566
dependenciesCommonJar 'com.google.cloud.bigdataoss:gcs-connector:1.9.0-hadoop2'
6667
dependenciesCommonJar 'org.apache.flink:flink-metrics-dropwizard:' + flinkVersion
6768
dependenciesCommonJar 'org.apache.flink:flink-json:' + flinkVersion
6869
dependenciesCommonJar 'com.jayway.jsonpath:json-path:2.4.0'
69-
dependenciesCommonJar 'io.odpf:stencil:0.1.6'
70+
dependenciesCommonJar 'io.odpf:stencil:0.2.1'
7071
dependenciesCommonJar 'com.google.code.gson:gson:2.8.2'
7172
dependenciesCommonJar 'org.apache.parquet:parquet-column:1.12.2'
7273

dagger-core/env/local.properties

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,7 @@ METRIC_TELEMETRY_ENABLE=true
2626
# == Others ==
2727
FUNCTION_FACTORY_CLASSES=io.odpf.dagger.functions.udfs.factories.FunctionFactory
2828
FLINK_ROWTIME_ATTRIBUTE_NAME=rowtime
29+
30+
# == Python Udf ==
31+
PYTHON_UDF_ENABLE=false
32+
PYTHON_UDF_CONFIG={"PYTHON_FILES":"/path/to/files.zip", "PYTHON_REQUIREMENTS": "requirements.txt", "PYTHON_FN_EXECUTION_BUNDLE_SIZE": "1000"}

dagger-core/src/main/java/io/odpf/dagger/core/StreamManager.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,18 @@
2828
import io.odpf.dagger.core.sink.SinkOrchestrator;
2929
import io.odpf.dagger.core.source.StreamsFactory;
3030
import io.odpf.dagger.core.utils.Constants;
31+
import io.odpf.dagger.functions.udfs.python.PythonUdfConfig;
32+
import io.odpf.dagger.functions.udfs.python.PythonUdfManager;
3133

34+
import java.io.IOException;
3235
import java.lang.reflect.Constructor;
3336
import java.lang.reflect.InvocationTargetException;
3437
import java.time.Duration;
3538
import java.util.List;
3639

3740
import static io.odpf.dagger.core.utils.Constants.*;
41+
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_DEFAULT;
42+
import static io.odpf.dagger.functions.common.Constants.PYTHON_UDF_ENABLE_KEY;
3843
import static org.apache.flink.table.api.Expressions.$;
3944

4045
/**
@@ -138,7 +143,13 @@ private ApiExpression[] getApiExpressions(StreamInfo streamInfo) {
138143
*
139144
* @return the stream manager
140145
*/
141-
public StreamManager registerFunctions() {
146+
public StreamManager registerFunctions() throws IOException {
147+
if (configuration.getBoolean(PYTHON_UDF_ENABLE_KEY, PYTHON_UDF_ENABLE_DEFAULT)) {
148+
PythonUdfConfig pythonUdfConfig = PythonUdfConfig.parse(configuration);
149+
PythonUdfManager pythonUdfManager = new PythonUdfManager(tableEnvironment, pythonUdfConfig);
150+
pythonUdfManager.registerPythonFunctions();
151+
}
152+
142153
String[] functionFactoryClasses = configuration
143154
.getString(Constants.FUNCTION_FACTORY_CLASSES_KEY, Constants.FUNCTION_FACTORY_CLASSES_DEFAULT)
144155
.split(",");

dagger-core/src/main/java/io/odpf/dagger/core/processors/common/EndpointHandler.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.odpf.dagger.core.processors.common;
22

33
import io.odpf.dagger.core.processors.ColumnNameManager;
4-
import io.odpf.dagger.core.processors.types.SourceConfig;
4+
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
55
import org.apache.flink.streaming.api.functions.async.ResultFuture;
66
import org.apache.flink.types.Row;
77

@@ -29,7 +29,6 @@
2929
*/
3030
public class EndpointHandler {
3131
private static final Logger LOGGER = LoggerFactory.getLogger(EndpointHandler.class.getName());
32-
private SourceConfig sourceConfig;
3332
private MeterStatsManager meterStatsManager;
3433
private ErrorReporter errorReporter;
3534
private String[] inputProtoClasses;
@@ -48,13 +47,11 @@ public class EndpointHandler {
4847
* @param columnNameManager the column name manager
4948
* @param descriptorManager the descriptor manager
5049
*/
51-
public EndpointHandler(SourceConfig sourceConfig,
52-
MeterStatsManager meterStatsManager,
50+
public EndpointHandler(MeterStatsManager meterStatsManager,
5351
ErrorReporter errorReporter,
5452
String[] inputProtoClasses,
5553
ColumnNameManager columnNameManager,
5654
DescriptorManager descriptorManager) {
57-
this.sourceConfig = sourceConfig;
5855
this.meterStatsManager = meterStatsManager;
5956
this.errorReporter = errorReporter;
6057
this.inputProtoClasses = inputProtoClasses;
@@ -63,19 +60,20 @@ public EndpointHandler(SourceConfig sourceConfig,
6360
}
6461

6562
/**
66-
* Get endpoint or query variables values.
63+
* Get external post processor variable values.
6764
*
6865
* @param rowManager the row manager
66+
* @param variableType the variable type
67+
* @param variables the comma-separated variable list
6968
* @param resultFuture the result future
7069
* @return the array object
7170
*/
72-
public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultFuture<Row> resultFuture) {
73-
String queryVariables = sourceConfig.getVariables();
74-
if (StringUtils.isEmpty(queryVariables)) {
71+
public Object[] getVariablesValue(RowManager rowManager, ExternalPostProcessorVariableType variableType, String variables, ResultFuture<Row> resultFuture) {
72+
if (StringUtils.isEmpty(variables)) {
7573
return new Object[0];
7674
}
7775

78-
String[] requiredInputColumns = queryVariables.split(",");
76+
String[] requiredInputColumns = variables.split(",");
7977
ArrayList<Object> inputColumnValues = new ArrayList<>();
8078
if (descriptorMap == null) {
8179
descriptorMap = createDescriptorMap(requiredInputColumns, inputProtoClasses, resultFuture);
@@ -84,7 +82,7 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF
8482
for (String inputColumnName : requiredInputColumns) {
8583
int inputColumnIndex = columnNameManager.getInputIndex(inputColumnName);
8684
if (inputColumnIndex == -1) {
87-
throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the endpoint/query variable", inputColumnName));
85+
throw new InvalidConfigurationException(String.format("Column '%s' not found as configured in the '%s' variable", inputColumnName, variableType));
8886
}
8987

9088
Descriptors.FieldDescriptor fieldDescriptor = descriptorMap.get(inputColumnName);
@@ -105,11 +103,12 @@ public Object[] getEndpointOrQueryVariablesValues(RowManager rowManager, ResultF
105103
*
106104
* @param resultFuture the result future
107105
* @param rowManager the row manager
108-
* @param endpointVariablesValues the endpoint variables values
106+
* @param variables the request/header variables
107+
* @param variablesValue the variables value
109108
* @return the boolean
110109
*/
111-
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, Object[] endpointVariablesValues) {
112-
if (!StringUtils.isEmpty(sourceConfig.getVariables()) && (Arrays.asList(endpointVariablesValues).isEmpty() || Arrays.stream(endpointVariablesValues).allMatch(""::equals))) {
110+
public boolean isQueryInvalid(ResultFuture<Row> resultFuture, RowManager rowManager, String variables, Object[] variablesValue) {
111+
if (!StringUtils.isEmpty(variables) && (Arrays.asList(variablesValue).isEmpty() || Arrays.stream(variablesValue).allMatch(""::equals))) {
113112
LOGGER.warn("Could not populate any request variable. Skipping external calls");
114113
meterStatsManager.markEvent(ExternalSourceAspects.EMPTY_INPUT);
115114
resultFuture.complete(singleton(rowManager.getAll()));

dagger-core/src/main/java/io/odpf/dagger/core/processors/external/AsyncConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public void open(Configuration configuration) throws Exception {
161161
meterStatsManager = new MeterStatsManager(getRuntimeContext().getMetricGroup(), true);
162162
}
163163
if (endpointHandler == null) {
164-
endpointHandler = new EndpointHandler(sourceConfig, meterStatsManager, errorReporter,
164+
endpointHandler = new EndpointHandler(meterStatsManager, errorReporter,
165165
schemaConfig.getInputProtoClasses(), schemaConfig.getColumnNameManager(), descriptorManager);
166166
}
167167

dagger-core/src/main/java/io/odpf/dagger/core/processors/external/es/EsAsyncConnector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
import io.odpf.dagger.core.processors.external.AsyncConnector;
1010
import org.apache.flink.streaming.api.functions.async.ResultFuture;
1111
import org.apache.flink.types.Row;
12-
1312
import io.odpf.dagger.core.utils.Constants;
13+
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
1414
import org.apache.http.HttpHost;
1515
import org.apache.http.auth.AuthScope;
1616
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -79,8 +79,8 @@ protected void createClient() {
7979
protected void process(Row input, ResultFuture<Row> resultFuture) {
8080
RowManager rowManager = new RowManager(input);
8181
Object[] endpointVariablesValues = getEndpointHandler()
82-
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
83-
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, endpointVariablesValues)) {
82+
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.ENDPOINT_VARIABLE, esSourceConfig.getVariables(), resultFuture);
83+
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, esSourceConfig.getVariables(), endpointVariablesValues)) {
8484
return;
8585
}
8686
String esEndpoint = String.format(esSourceConfig.getPattern(), endpointVariablesValues);

dagger-core/src/main/java/io/odpf/dagger/core/processors/external/grpc/GrpcAsyncConnector.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.protobuf.Descriptors;
2121
import com.google.protobuf.DynamicMessage;
2222
import io.odpf.dagger.core.utils.Constants;
23+
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
2324
import org.apache.commons.lang3.StringUtils;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
@@ -90,8 +91,8 @@ protected void process(Row input, ResultFuture<Row> resultFuture) throws Excepti
9091
RowManager rowManager = new RowManager(input);
9192

9293
Object[] requestVariablesValues = getEndpointHandler()
93-
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
94-
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
94+
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, grpcSourceConfig.getVariables(), resultFuture);
95+
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, grpcSourceConfig.getVariables(), requestVariablesValues)) {
9596
return;
9697
}
9798

dagger-core/src/main/java/io/odpf/dagger/core/processors/external/http/HttpAsyncConnector.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.flink.types.Row;
1616

1717
import io.odpf.dagger.core.utils.Constants;
18+
import io.odpf.dagger.core.utils.Constants.ExternalPostProcessorVariableType;
1819
import org.asynchttpclient.AsyncHttpClient;
1920
import org.asynchttpclient.BoundRequestBuilder;
2021
import org.slf4j.Logger;
@@ -94,12 +95,14 @@ protected void process(Row input, ResultFuture<Row> resultFuture) {
9495
RowManager rowManager = new RowManager(input);
9596

9697
Object[] requestVariablesValues = getEndpointHandler()
97-
.getEndpointOrQueryVariablesValues(rowManager, resultFuture);
98-
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, requestVariablesValues)) {
98+
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.REQUEST_VARIABLES, httpSourceConfig.getRequestVariables(), resultFuture);
99+
Object[] dynamicHeaderVariablesValues = getEndpointHandler()
100+
.getVariablesValue(rowManager, ExternalPostProcessorVariableType.HEADER_VARIABLES, httpSourceConfig.getHeaderVariables(), resultFuture);
101+
if (getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getRequestVariables(), requestVariablesValues) || getEndpointHandler().isQueryInvalid(resultFuture, rowManager, httpSourceConfig.getHeaderVariables(), dynamicHeaderVariablesValues)) {
99102
return;
100103
}
101104

102-
BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues);
105+
BoundRequestBuilder request = HttpRequestFactory.createRequest(httpSourceConfig, httpClient, requestVariablesValues, dynamicHeaderVariablesValues);
103106
HttpResponseHandler httpResponseHandler = new HttpResponseHandler(httpSourceConfig, getMeterStatsManager(),
104107
rowManager, getColumnNameManager(), getOutputDescriptor(resultFuture), resultFuture, getErrorReporter(), new PostResponseTelemetry());
105108
httpResponseHandler.startTimer();

0 commit comments

Comments
 (0)