Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions datafusion/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def struct_df():

return ctx.create_dataframe([[batch]])

@pytest.fixture
def aggregate_df():
    """Provide a DataFrame for a grouped-sum SQL query over the `test` CSV table."""
    session = SessionContext()
    session.register_csv("test", "testing/data/csv/aggregate_test_100.csv")
    query = "select c1, sum(c2) from test group by c1"
    return session.sql(query)


def test_select(df):
df = df.select(
Expand Down Expand Up @@ -258,19 +264,58 @@ def test_explain(df):
df.explain()


def test_logical_plan(df):
plan = df.logical_plan()
assert plan is not None
def test_logical_plan(aggregate_df):
    """Check the one-line and indented renderings of the unoptimized logical plan."""
    plan = aggregate_df.logical_plan()

    # display() renders only the root node of the plan.
    expected = "Projection: test.c1, SUM(test.c2)"

    assert expected == plan.display()

    # display_indent() renders the whole tree; each nesting level is indented
    # by two further spaces, so the leading whitespace below is significant.
    expected = (
        "Projection: test.c1, SUM(test.c2)\n"
        "  Aggregate: groupBy=[[test.c1]], aggr=[[SUM(test.c2)]]\n"
        "    TableScan: test"
    )

    assert expected == plan.display_indent()


def test_optimized_logical_plan(aggregate_df):
    """Check the one-line and indented renderings of the optimized logical plan."""
    plan = aggregate_df.optimized_logical_plan()

    # display() renders only the root node of the plan.
    expected = "Projection: test.c1, SUM(test.c2)"

    assert expected == plan.display()

    # display_indent() renders the whole tree; each nesting level is indented
    # by two further spaces, so the leading whitespace below is significant.
    # Unlike the unoptimized plan, the table scan carries a column projection.
    expected = (
        "Projection: test.c1, SUM(test.c2)\n"
        "  Aggregate: groupBy=[[test.c1]], aggr=[[SUM(test.c2)]]\n"
        "    TableScan: test projection=[c1, c2]"
    )

    assert expected == plan.display_indent()


def test_execution_plan(aggregate_df):
plan = aggregate_df.execution_plan()

expected = "ProjectionExec: expr=[c1@0 as c1, SUM(test.c2)@1 as SUM(test.c2)]\n"

assert expected == plan.display()

expected = \
"ProjectionExec: expr=[c1@0 as c1, SUM(test.c2)@1 as SUM(test.c2)]\n" \
" Aggregate: groupBy=[[test.c1]], aggr=[[SUM(test.c2)]]\n" \
" TableScan: test projection=[c1, c2]"

def test_optimized_logical_plan(df):
plan = df.optimized_logical_plan()
assert plan is not None
indent = plan.display_indent()

# indent plan will be different for everyone due to absolute path to filename, so
# we just check for some expected content
assert "ProjectionExec:" in indent
assert "AggregateExec:" in indent
assert "CoalesceBatchesExec:" in indent
assert "RepartitionExec:" in indent
assert "CsvExec:" in indent

def test_execution_plan(df):
plan = df.execution_plan()
assert plan is not None


def test_repartition(df):
Expand Down
2 changes: 1 addition & 1 deletion src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl PyDataFrame {

/// Get the logical plan for this `DataFrame`
fn logical_plan(&self) -> PyResult<PyLogicalPlan> {
Ok(self.df.as_ref().clone().into_optimized_plan()?.into())
Ok(self.df.as_ref().clone().logical_plan().clone().into())
}

/// Get the optimized logical plan for this `DataFrame`
Expand Down
28 changes: 28 additions & 0 deletions src/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,34 @@ impl PyLogicalPlan {
}
}

#[pymethods]
impl PyLogicalPlan {
    /// Get the inputs to this plan
    ///
    /// Every input is cloned and wrapped in its own `PyLogicalPlan`.
    pub fn inputs(&self) -> Vec<PyLogicalPlan> {
        self.plan
            .inputs()
            .into_iter()
            .map(|input| input.clone().into())
            .collect()
    }

    /// Render this plan with `LogicalPlan::display`.
    pub fn display(&self) -> String {
        self.plan.display().to_string()
    }

    /// Render this plan and its inputs with `LogicalPlan::display_indent`.
    pub fn display_indent(&self) -> String {
        self.plan.display_indent().to_string()
    }

    /// Render this plan with `LogicalPlan::display_indent_schema`.
    pub fn display_indent_schema(&self) -> String {
        self.plan.display_indent_schema().to_string()
    }

    /// Render this plan with `LogicalPlan::display_graphviz`.
    pub fn display_graphviz(&self) -> String {
        // Must delegate to the Graphviz formatter, not the indented-schema one.
        self.plan.display_graphviz().to_string()
    }
}

impl From<PyLogicalPlan> for LogicalPlan {
fn from(logical_plan: PyLogicalPlan) -> LogicalPlan {
logical_plan.plan.as_ref().clone()
Expand Down
24 changes: 23 additions & 1 deletion src/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use std::sync::Arc;

use pyo3::prelude::*;
Expand All @@ -33,6 +33,28 @@ impl PyExecutionPlan {
}
}

#[pymethods]
impl PyExecutionPlan {
/// Get the inputs to this plan
pub fn children(&self) -> Vec<PyExecutionPlan> {
self.plan
.children()
.iter()
.map(|p| p.to_owned().into())
.collect()
}

pub fn display(&self) -> String {
let d = displayable(self.plan.as_ref());
format!("{}", d.one_line())
}

pub fn display_indent(&self) -> String {
let d = displayable(self.plan.as_ref());
format!("{}", d.indent())
}
}

impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
fn from(plan: PyExecutionPlan) -> Arc<dyn ExecutionPlan> {
plan.plan.clone()
Expand Down