2 changes: 1 addition & 1 deletion docs/docs/contribute/add_transformer.md
@@ -12,7 +12,7 @@ For adding custom Transformers follow these steps

- To define a new Transformer implement Transformer interface. The contract of Transformers is defined [here](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/core/Transformer.java).

- Since an input DataStream is available in Transformer, all the Flink supported operators which transform `DataStream -> DataStream` can be applied/used by default for the transformations. Operators are how Flink exposes classic Map-reduce type functionalities. Read more about Flink Operators [here](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/).
- Since an input DataStream is available in Transformer, all the Flink supported operators which transform `DataStream -> DataStream` can be applied/used by default for the transformations. Operators are how Flink exposes classic Map-reduce type functionalities. Read more about Flink Operators [here](https://ci.apache.org/projects/flink/flink-docs-release-1.14/dev/stream/operators/).

- In the case of single Operator Transformation you can extend the desired Operator in the Transformer class itself. For example, follow this code of [HashTransformer](https://github.com/odpf/dagger/blob/main/dagger-functions/src/main/java/io/odpf/dagger/functions/transformers/HashTransformer.java). You can also define multiple chaining operators to Transform Data.

28 changes: 18 additions & 10 deletions docs/docs/contribute/add_udf.md
@@ -4,21 +4,29 @@ Want a function to use in SQL which is not supported both by Flink and one of th

`Note`: _Please go through the [Contribution guide](../contribute/contribution.md) to know about all the conventions and practices we tend to follow and to know about the contribution process to the dagger._

For adding custom UDFs follow these steps
For adding custom UDFs follow these steps:

- Ensure none of the [built-in functions](https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/systemfunctions) or [existing UDF](../reference/udfs.md) suits your requirement.
- For adding a UDF, figure out which type of UDF you required. Flink supports three types of [User defined function](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html). Choose one of them according to the requirement.
- Ensure none of the [built-in functions](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/systemfunctions/) or [existing UDF](../reference/udfs.md) suits your requirement.

- For getting more insights on writing your UDF, follow [this](https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/udfs.html) to create a UDF. It needs to be written in Java/Scala.
- For adding a UDF, figure out which type of UDF you require. Flink supports three types of [User-defined functions](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/udfs/). Choose one of them according to your requirement.

- UDF need to be the `function-type` directory inside [this](https://github.com/odpf/dagger/tree/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs) on `dagger-functions` subproject.
- You can choose among three programming languages for adding a UDF: Java, Scala, and Python.

- Extend either of [ScalarUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/ScalarUdf.java), [TableUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/TableUdf.java) or [AggregateUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/AggregateUdf.java) from `dagger-common`. They are boilerplate contracts extending Flink UDF classes. These classes do some more preprocessing(like exposing some metrics) in the `open` method behind the scene.

- Register the UDF in [this](https://github.com/odpf/dagger/blob/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/FunctionFactory.java) class. This is required to let Flink know about your function.
- For adding a UDF with Java/Scala:
  - Follow [this](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/functions/udfs/) for more insights on writing your UDF.
  - The UDF needs to be added in the corresponding `function-type` folder inside [this](https://github.com/odpf/dagger/tree/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs) directory in the `dagger-functions` subproject.
  - Extend either of [ScalarUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/ScalarUdf.java), [TableUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/TableUdf.java) or [AggregateUdf](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/AggregateUdf.java) from `dagger-common`. They are boilerplate contracts extending Flink UDF classes. These classes do some more preprocessing (like exposing some metrics) in the `open` method behind the scenes.
  - Register the UDF in [this](https://github.com/odpf/dagger/blob/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs/factories/FunctionFactory.java) class. This is required to let Flink know about your function.
  - If you have some business-specific use-cases and you don't want to add UDFs to the open-sourced repo, you can keep a separate local codebase for those UDFs. Those UDFs need to be registered in a similar class like the [`UDFFactory`](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/UdfFactory.java). Keep both the UDF classes and the factory class in the classpath of Dagger. Configure the fully qualified factory class in the `FUNCTION_FACTORY_CLASSES` parameter, and you will be able to use the desired UDF in your query.

- For adding a UDF with Python:
  - Follow [this](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/table/udfs/overview/) for more insights on writing your UDF.
  - The UDF needs to be added inside [this](https://github.com/odpf/dagger/tree/main/dagger-py-functions/udfs) directory in `dagger-py-functions`.
  - Ensure that the file name and the method name of the Python function are the same. Dagger registers this name as the function name, which can later be used in the query.
  - Add any dependencies needed by the Python function to the [requirements.txt](https://github.com/odpf/dagger/tree/main/dagger-py-functions/requirements.txt) file.
  - Add a Python unit test and make sure it passes.
  - If you have some business-specific use-cases and you don't want to add UDFs to the open-sourced repo, you can keep a separate local codebase for those UDFs and specify that file in the Python configuration.

- Bump up the version and raise a PR for the same. Also please add the registered function to the [list of udfs doc](../reference/udfs.md).

- If you have some business-specific use-cases and you don't want to add UDFs to the open-sourced repo, you can have a separate local codebase for those UDFs. Those UDFs need to be registered in a similar class like the [`UDFFactory`](https://github.com/odpf/dagger/blob/main/dagger-common/src/main/java/io/odpf/dagger/common/udfs/UdfFactory.java). Keep both the UDF classes and the factory class in the classpath of Dagger. Configure the fully qualified Factory class in the `FUNCTION_FACTORY_CLASSES` parameter and you will be able to use the desired UDF in your query.

In the subsequent release of the dagger, your functions should be usable in the query.
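
As a minimal sketch of the Python conventions described above (a hypothetical `multiply.py`; in Dagger the function would additionally be decorated with PyFlink's `@udf`, omitted here to keep the sketch dependency-free):

```python
# multiply.py — the file name matches the function name, as the steps above require.
# In Dagger/PyFlink this function would carry a decorator such as
#   @udf(result_type=DataTypes.FLOAT())
# before registration; the body itself is plain Python.

def multiply(input_number1, input_number2):
    """Multiplies the two input numbers and returns a float."""
    return float(input_number1) * float(input_number2)
```

Once registered, it could be called in a query as `multiply(col_a, col_b)`.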
20 changes: 19 additions & 1 deletion docs/docs/guides/use_udf.md
@@ -26,6 +26,24 @@ Some of the use-cases can not be solved using Flink SQL & the Apache Calcite fun

Maps zero or more values to multiple rows and each row may have multiple columns.

All the supported udfs present in the `dagger-functions` subproject in [this](https://github.com/odpf/dagger/tree/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs) directory. Follow this to find more details about the already supported UDFs in the dagger.
All the supported Java UDFs are present in the `dagger-functions` subproject in [this](https://github.com/odpf/dagger/tree/main/dagger-functions/src/main/java/io/odpf/dagger/functions/udfs) directory.

All the supported Python UDFs are present in the [dagger-py-functions](https://github.com/odpf/dagger/tree/main/dagger-py-functions/udfs/) directory.

Follow [this](../reference/udfs.md) to find more details about the already supported UDFs in the dagger.

If any of the predefined functions do not meet your requirement you can create your custom UDFs by extending some implementation. Follow [this](../contribute/add_udf.md) to add your custom UDFs in the dagger.
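
The three UDF types can be illustrated in plain Python (a semantic sketch only, not the PyFlink API; the function names are made up for illustration):

```python
# Scalar function: maps one row's value(s) to exactly one output value.
def scalar_upper(text):
    return text.upper()

# Aggregate function: maps the values of many rows to a single output value.
def aggregate_sum(values):
    total = 0
    for value in values:
        total += value
    return total

# Table function: maps one input value to zero or more output rows
# (here modeled as a generator yielding one row per comma-separated part).
def table_split(csv_line):
    for part in csv_line.split(","):
        yield part
```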

## Python Environment Setup

Python UDF execution requires Python 3.6, 3.7, or 3.8 with PyFlink installed.

PyFlink is available in PyPi and can be installed as follows:
```
$ python -m pip install apache-flink==1.14.3
```

To satisfy the PyFlink requirement regarding the Python environment version, you may need to create a soft link so that `python` points to your `python3` interpreter:
```
ln -s /usr/bin/python3 python
```
96 changes: 95 additions & 1 deletion docs/docs/reference/configuration.md
@@ -14,6 +14,8 @@ This page contains references for all the application configurations for Dagger.
* [PreProcessor](configuration.md#preprocessor)
* [PostProcessor](configuration.md#postprocessor)
* [Telemetry](configuration.md#telemetry)
* [Python Udfs](configuration.md#python-udfs)


### Generic

@@ -496,4 +498,96 @@ Shutdown period of metric telemetry in milliseconds.

* Example value: `10000`
* Type: `optional`
* Default value: `10000`
* Default value: `10000`

### Python Udfs

#### `PYTHON_UDF_ENABLE`

Enables or disables the use of Python UDFs.

* Example value: `true`
* Type: `optional`
* Default value: `false`

#### `PYTHON_UDF_CONFIG`

All the configurations needed to use Python UDFs.

The following variables need to be configured:

##### `PYTHON_FILES`

Defines the path of the Python UDF files. Currently only `.py` and `.zip` file types are supported. A comma (`,`) can be used as the separator to specify multiple files.

* Example value: `/path/to/files.zip`
* Type: `required`

##### `PYTHON_ARCHIVES`

Defines the path of files used by the Python UDF. Only the `.zip` file type is supported. A comma (`,`) can be used as the separator to specify multiple archive files.
The archive files will be extracted to the working directory of the Python UDF worker. For each archive file, a target directory can be specified. If the target directory name is specified, the archive file will be extracted to a directory with that name. Otherwise, the archive file will be extracted to a directory with the same name as the archive file. `#` can be used as the separator between the archive file path and the target directory name.

Example:
* `PYTHON_ARCHIVES=/path/to/data.zip`

  You should set the path to `data.zip/data/sample.txt` in the UDF to be able to open the file.

* `PYTHON_ARCHIVES=/path/to/data.zip#data`

  You should set the path to `data/sample.txt` in the UDF to be able to open the file.

* An example of how to use this can be found in this [udf](https://github.com/odpf/dagger/tree/main/dagger-py-functions/udfs/scalar/sample.py).

* Type: `optional`
* Default value: `(none)`
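
The extraction rules above can be sketched as a small helper (hypothetical, not part of Dagger):

```python
import os


def archive_target_dir(archive_spec):
    """Return the directory name an archive is extracted into.

    A '#' separates the archive path from an optional target directory
    name; without it, the directory is named after the archive file.
    """
    if "#" in archive_spec:
        _, target = archive_spec.split("#", 1)
        return target
    return os.path.basename(archive_spec)
```

So `/path/to/data.zip` extracts into `data.zip/`, while `/path/to/data.zip#data` extracts into `data/`, matching the examples above.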

##### `PYTHON_REQUIREMENTS`

Defines the path of the Python dependency (requirements) file.

* Example value: `/path/to/requirements.txt`
* Type: `optional`
* Default value: `(none)`

##### `PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE`

The maximum number of elements to include in an Arrow batch for Python user-defined function execution.

* Example value: `10000`
* Type: `optional`
* Default value: `10000`

##### `PYTHON_FN_EXECUTION_BUNDLE_SIZE`

The maximum number of elements to include in a bundle for Python user-defined function execution.

* Example value: `100000`
* Type: `optional`
* Default value: `100000`

##### `PYTHON_FN_EXECUTION_BUNDLE_TIME`

Sets the waiting timeout (in milliseconds) before processing a bundle for Python user-defined function execution. The timeout defines how long the elements of a bundle will be buffered before being processed.

* Example value: `1000`
* Type: `optional`
* Default value: `1000`

##### Sample Configuration
```
PYTHON_UDF_CONFIG = [
{
"PYTHON_FILES": "/path/to/files.py",
"PYTHON_ARCHIVES": "/path/to/data.zip",
"PYTHON_REQUIREMENTS": "/path/to/requirements.txt",
"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE": "10000",
"PYTHON_FN_EXECUTION_BUNDLE_SIZE": "100000",
"PYTHON_FN_EXECUTION_BUNDLE_TIME": "1000"
}
]
```

Find more details on the Python UDF configuration [here](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/python_config/#python-options).


50 changes: 48 additions & 2 deletions docs/docs/reference/udfs.md
@@ -1,8 +1,12 @@
# Udfs

This page contains references for all the custom udfs available on Dagger.
This page contains references for all the custom udfs available on Dagger.

## List of Udfs
The udfs on Dagger are divided into two parts:
* [Java Udfs](udfs.md#list-of-java-udfs)
* [Python Udfs](udfs.md#list-of-python-udfs)

## List of Java Udfs

- [Scalar Functions](udfs.md#scalar-functions)
- [ArrayAggregate](udfs.md#ArrayAggregate)
@@ -795,3 +799,45 @@ LATERAL TABLE(
`isOutlier`
)
```

## List of Python Udfs

- [Scalar Functions](udfs.md#python-scalar-functions)
- [Sample](udfs.md#Sample)
- [Multiply](udfs.md#Multiply)
- [Aggregate Functions](udfs.md#python-aggregate-functions)
- [Table Functions](udfs.md#python-table-functions)

### Python Scalar Functions

#### Sample
* Contract:
* **String** `Sample(input_string)`.
* Functionality:
* This is a sample Python UDF.
* Appends an extra string from [data/sample.txt](../../../dagger-py-functions/data/sample_data.txt) to the input_string.
* Example:
```
SELECT
  sample(inputString) as input_with_additional_text
FROM
data_stream
```

#### Multiply
* Contract:
* **Float** `Multiply(input_number1, input_number2)`.
* Functionality:
* This is a sample Python UDF.
* Multiplies the two specified input numbers.
* Example:
```
SELECT
multiply(input_number1, input_number2) as multiply_result
FROM
data_stream
```

### Python Aggregate Functions

### Python Table Functions