diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index 3ecb83655..eb69a9b8f 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -52,6 +52,12 @@ def struct_df(): return ctx.create_dataframe([[batch]]) +@pytest.fixture +def aggregate_df(): + ctx = SessionContext() + ctx.register_csv('test', 'testing/data/csv/aggregate_test_100.csv') + return ctx.sql('select c1, sum(c2) from test group by c1') + def test_select(df): df = df.select( @@ -258,19 +264,58 @@ def test_explain(df): df.explain() -def test_logical_plan(df): - plan = df.logical_plan() - assert plan is not None +def test_logical_plan(aggregate_df): + plan = aggregate_df.logical_plan() + + expected = "Projection: test.c1, SUM(test.c2)" + + assert expected == plan.display() + + expected = \ + "Projection: test.c1, SUM(test.c2)\n" \ + " Aggregate: groupBy=[[test.c1]], aggr=[[SUM(test.c2)]]\n" \ + " TableScan: test" + + assert expected == plan.display_indent() + + +def test_optimized_logical_plan(aggregate_df): + plan = aggregate_df.optimized_logical_plan() + + expected = "Projection: test.c1, SUM(test.c2)" + + assert expected == plan.display() + + expected = \ + "Projection: test.c1, SUM(test.c2)\n" \ + " Aggregate: groupBy=[[test.c1]], aggr=[[SUM(test.c2)]]\n" \ + " TableScan: test projection=[c1, c2]" + + assert expected == plan.display_indent() + + +def test_execution_plan(aggregate_df): + plan = aggregate_df.execution_plan() + + expected = "ProjectionExec: expr=[c1@0 as c1, SUM(test.c2)@1 as SUM(test.c2)]\n" + + assert expected == plan.display() + expected = \ + "ProjectionExec: expr=[c1@0 as c1, SUM(test.c2)@1 as SUM(test.c2)]\n" \ + " Aggregate: groupBy=[[test.c1]], aggr=[[SUM(test.c2)]]\n" \ + " TableScan: test projection=[c1, c2]" -def test_optimized_logical_plan(df): - plan = df.optimized_logical_plan() - assert plan is not None + indent = plan.display_indent() + # indent plan will be different for everyone due to absolute path to filename, so + # we just check for some expected content + assert "ProjectionExec:" in indent + assert "AggregateExec:" in indent + assert "CoalesceBatchesExec:" in indent + assert "RepartitionExec:" in indent + assert "CsvExec:" in indent -def test_execution_plan(df): - plan = df.execution_plan() - assert plan is not None def test_repartition(df): diff --git a/src/dataframe.rs b/src/dataframe.rs index 749b7bd57..c6162e4e4 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -206,7 +206,7 @@ impl PyDataFrame { /// Get the logical plan for this `DataFrame` fn logical_plan(&self) -> PyResult { - Ok(self.df.as_ref().clone().into_optimized_plan()?.into()) + Ok(self.df.as_ref().clone().logical_plan().clone().into()) } /// Get the optimized logical plan for this `DataFrame` diff --git a/src/logical.rs b/src/logical.rs index 304cdf991..dcd7baa58 100644 --- a/src/logical.rs +++ b/src/logical.rs @@ -35,6 +35,34 @@ impl PyLogicalPlan { } } +#[pymethods] +impl PyLogicalPlan { + /// Get the inputs to this plan + pub fn inputs(&self) -> Vec { + let mut inputs = vec![]; + for input in self.plan.inputs() { + inputs.push(input.to_owned().into()); + } + inputs + } + + pub fn display(&self) -> String { + format!("{}", self.plan.display()) + } + + pub fn display_indent(&self) -> String { + format!("{}", self.plan.display_indent()) + } + + pub fn display_indent_schema(&self) -> String { + format!("{}", self.plan.display_indent_schema()) + } + + pub fn display_graphviz(&self) -> String { + format!("{}", self.plan.display_indent_schema()) + } +} + impl From for LogicalPlan { fn from(logical_plan: PyLogicalPlan) -> LogicalPlan { logical_plan.plan.as_ref().clone() diff --git a/src/physical_plan.rs b/src/physical_plan.rs index b4c68b9f9..01bf3eefa 100644 --- a/src/physical_plan.rs +++ b/src/physical_plan.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::{displayable, ExecutionPlan}; use std::sync::Arc; use pyo3::prelude::*; @@ -33,6 +33,28 @@ impl PyExecutionPlan { } } +#[pymethods] +impl PyExecutionPlan { + /// Get the inputs to this plan + pub fn children(&self) -> Vec { + self.plan + .children() + .iter() + .map(|p| p.to_owned().into()) + .collect() + } + + pub fn display(&self) -> String { + let d = displayable(self.plan.as_ref()); + format!("{}", d.one_line()) + } + + pub fn display_indent(&self) -> String { + let d = displayable(self.plan.as_ref()); + format!("{}", d.indent()) + } +} + impl From for Arc { fn from(plan: PyExecutionPlan) -> Arc { plan.plan.clone()