diff --git a/datafusion/__init__.py b/datafusion/__init__.py index e640c041f..bb1beacd9 100644 --- a/datafusion/__init__.py +++ b/datafusion/__init__.py @@ -83,6 +83,8 @@ CreateView, Distinct, DropTable, + Repartition, + Partitioning, ) __version__ = importlib_metadata.version(__name__) @@ -141,6 +143,8 @@ "CreateView", "Distinct", "DropTable", + "Repartition", + "Partitioning", ] diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py index eaa230221..766ddce89 100644 --- a/datafusion/tests/test_imports.py +++ b/datafusion/tests/test_imports.py @@ -84,6 +84,8 @@ CreateView, Distinct, DropTable, + Repartition, + Partitioning, ) @@ -157,6 +159,8 @@ def test_class_module_is_datafusion(): CreateView, Distinct, DropTable, + Repartition, + Partitioning, ]: assert klass.__module__ == "datafusion.expr" diff --git a/src/expr.rs b/src/expr.rs index 579f5098d..7c80d0d82 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -68,6 +68,7 @@ pub mod literal; pub mod logical_node; pub mod placeholder; pub mod projection; +pub mod repartition; pub mod scalar_function; pub mod scalar_subquery; pub mod scalar_variable; @@ -287,5 +288,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/expr/repartition.rs b/src/expr/repartition.rs new file mode 100644 index 000000000..e3e14f878 --- /dev/null +++ b/src/expr/repartition.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{self, Display, Formatter}; + +use datafusion_expr::{logical_plan::Repartition, Expr, Partitioning}; +use pyo3::prelude::*; + +use crate::{errors::py_type_err, sql::logical::PyLogicalPlan}; + +use super::{logical_node::LogicalNode, PyExpr}; + +#[pyclass(name = "Repartition", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyRepartition { + repartition: Repartition, +} + +#[pyclass(name = "Partitioning", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyPartitioning { + partitioning: Partitioning, +} + +impl From for Partitioning { + fn from(partitioning: PyPartitioning) -> Self { + partitioning.partitioning + } +} + +impl From for PyPartitioning { + fn from(partitioning: Partitioning) -> Self { + PyPartitioning { partitioning } + } +} + +impl From for Repartition { + fn from(repartition: PyRepartition) -> Self { + repartition.repartition + } +} + +impl From for PyRepartition { + fn from(repartition: Repartition) -> PyRepartition { + PyRepartition { repartition } + } +} + +impl Display for PyRepartition { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "Repartition + input: {:?} + partitioning_scheme: {:?}", + &self.repartition.input, &self.repartition.partitioning_scheme, + ) + } +} + +#[pymethods] +impl PyRepartition { + fn input(&self) -> PyResult> { + Ok(Self::inputs(self)) + } + + fn partitioning_scheme(&self) -> PyResult { + Ok(PyPartitioning { + partitioning: self.repartition.partitioning_scheme.clone(), + }) + } + + fn distribute_list(&self) -> PyResult> { + match &self.repartition.partitioning_scheme { + Partitioning::DistributeBy(distribute_list) => Ok(distribute_list + .iter() + .map(|e| PyExpr::from(e.clone())) + .collect()), + _ => Err(py_type_err("unexpected repartition strategy")), + } + } + + fn distribute_columns(&self) -> PyResult { + match &self.repartition.partitioning_scheme { + Partitioning::DistributeBy(distribute_list) => Ok(distribute_list + .iter() + .map(|e| match &e { + Expr::Column(column) => column.name.clone(), + _ => panic!("Encountered a type other than Expr::Column"), + }) + .collect()), + _ => Err(py_type_err("unexpected repartition strategy")), + } + } + + fn __repr__(&self) -> PyResult { + Ok(format!("Repartition({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("Repartition".to_string()) + } +} + +impl LogicalNode for PyRepartition { + fn inputs(&self) -> Vec { + vec![PyLogicalPlan::from((*self.repartition.input).clone())] + } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } +}