From fb888d1984e9d7e56e47ecef58d87e2a11a66035 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Thu, 25 May 2023 14:42:00 -0400 Subject: [PATCH 1/9] checkpoint commit --- .gitignore | 5 +- datafusion/cudf.py | 116 ++++++++++- datafusion/datacontainer.py | 290 ++++++++++++++++++++++++++++ datafusion/input_utils/__init__.py | 10 + datafusion/input_utils/base.py | 11 ++ datafusion/input_utils/convert.py | 91 +++++++++ datafusion/input_utils/cudflike.py | 22 +++ datafusion/input_utils/location.py | 32 ++++ src/common.rs | 7 + src/common/function.rs | 55 ++++++ src/common/schema.rs | 296 +++++++++++++++++++++++++++++ 11 files changed, 931 insertions(+), 4 deletions(-) create mode 100644 datafusion/datacontainer.py create mode 100644 datafusion/input_utils/__init__.py create mode 100644 datafusion/input_utils/base.py create mode 100644 datafusion/input_utils/convert.py create mode 100644 datafusion/input_utils/cudflike.py create mode 100644 datafusion/input_utils/location.py create mode 100644 src/common/function.rs create mode 100644 src/common/schema.rs diff --git a/.gitignore b/.gitignore index 1d0a84a43..86157dd58 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,9 @@ __pycache__/ # C extensions *.so +# Python dist +dist + # pyenv # For a library or package, you might want to ignore these files since the code is # intended to run in multiple environments; otherwise, check them in: @@ -24,4 +27,4 @@ apache-rat-*.jar .env CHANGELOG.md.bak -docs/mdbook/book \ No newline at end of file +docs/mdbook/book diff --git a/datafusion/cudf.py b/datafusion/cudf.py index d5f02156f..9a64be218 100644 --- a/datafusion/cudf.py +++ b/datafusion/cudf.py @@ -15,15 +15,125 @@ # specific language governing permissions and limitations # under the License. 
+import logging import cudf import datafusion from datafusion.expr import Projection, TableScan, Column +from datafusion.datacontainer import ( + UDF, + DataContainer, + FunctionDescription, + SchemaContainer, + Statistics, +) + +from datafusion.common import ( + SqlTable, + SqlSchema, +) + +from datafusion import input_utils +from datafusion.input_utils import InputType, InputUtil + +logger = logging.getLogger(__name__) class SessionContext: - def __init__(self): - self.datafusion_ctx = datafusion.SessionContext() - self.parquet_tables = {} + + DEFAULT_CATALOG_NAME = "datafusion" + DEFAULT_SCHEMA_NAME = "root" + + def __init__(self, context=None, logging_level=logging.INFO): + """ + Create a new Session. + """ + self.context = context if not context else datafusion.SessionContext() + + # Set the logging level for this SQL context + logging.basicConfig(level=logging_level) + + # Name of the root catalog + self.catalog_name = self.DEFAULT_CATALOG_NAME + # Name of the root schema + self.schema_name = self.DEFAULT_SCHEMA_NAME + # All schema information + self.schema = {self.schema_name: SchemaContainer(self.schema_name)} + + self.context.register_schema(self.schema_name, SqlSchema(self.schema_name)) + + # Register default `InputPlugins` that will be used to + # understand and consume data in different formats + InputUtil.add_plugin_class(input_utils.DataFrameInputPlugin, replace=False) # Existing cudf.DataFrame object + InputUtil.add_plugin_class(input_utils.LocationInputPlugin, replace=False) # File location on disk + + def create_table( + self, + table_name: str, + input_table: InputType, + format: str = None, + schema_name: str = None, + statistics: Statistics = None, + **kwargs, + ): + """ + Registering a cudf DataFrame/table makes it usable in SQL queries. + The name you give here can be used as table name in the SQL later. + + Please note, that the table is stored as it is now. + If you change the table later, you need to re-register. 
+ + Example: + This code registers a data frame as table "data" + and then uses it in a query. + + .. code-block:: python + + c.create_table("data", df) + df_result = c.sql("SELECT a, b FROM data") + + This code reads a file from disk. + Please note that we assume that the file(s) are reachable under this path + from every node in the cluster + + .. code-block:: python + + c.create_table("data", "/home/user/data.csv") + df_result = c.sql("SELECT a, b FROM data") + + Args: + table_name: (:obj:`str`): Under which name should the new table be addressable + input_table (:class:`dask.dataframe.DataFrame` or :class:`pandas.DataFrame` or :obj:`str` or :class:`hive.Cursor`): + The data frame/location/hive connection to register. + format (:obj:`str`): Only used when passing a string into the ``input`` parameter. + Specify the file format directly here if it can not be deduced from the extension. + If set to "memory", load the data from a published dataset in the dask cluster. + schema_name: (:obj:`str`): in which schema to create the table. By default, will use the currently selected schema. + statistics: (:obj:`Statistics`): if given, use these statistics during the cost-based optimization. + **kwargs: Additional arguments for specific formats. See :ref:`data_input` for more information. 
+ + """ + logger.debug( + f"Creating table: '{table_name}' of format type '{format}' in schema '{schema_name}'" + ) + + schema_name = schema_name or self.schema_name + + dc = InputUtil.to_dc( + input_table, + table_name=table_name, + format=format, + **kwargs, + ) + + if type(input_table) == str: + dc.filepath = input_table + self.schema[schema_name].filepaths[table_name.lower()] = input_table + + # TODO: Implement reading physical statistics + dc.statistics = Statistics(float("nan")) + + self.schema[schema_name].tables[table_name.lower()] = dc + self.schema[schema_name].statistics[table_name.lower()] = statistics def register_parquet(self, name, path): self.parquet_tables[name] = path diff --git a/datafusion/datacontainer.py b/datafusion/datacontainer.py new file mode 100644 index 000000000..76bf7a14a --- /dev/null +++ b/datafusion/datacontainer.py @@ -0,0 +1,290 @@ +from collections import namedtuple +from typing import Any, Dict, List, Tuple, Union + +import cudf +import xdf as pd + +ColumnType = Union[str, int] + +FunctionDescription = namedtuple( + "FunctionDescription", ["name", "parameters", "return_type", "aggregation"] +) + + +class ColumnContainer: + # Forward declaration + pass + + +class ColumnContainer: + """ + Helper class to store a list of columns, + which do not necessarily be the ones of the dask dataframe. + Instead, the container also stores a mapping from "frontend" + columns (columns with the names and order expected by SQL) + to "backend" columns (the real column names used by dask) + to prevent unnecessary renames. 
+ """ + + def __init__( + self, + frontend_columns: List[str], + frontend_backend_mapping: Union[Dict[str, ColumnType], None] = None, + ): + assert all( + isinstance(col, str) for col in frontend_columns + ), "All frontend columns need to be of string type" + self._frontend_columns = list(frontend_columns) + if frontend_backend_mapping is None: + self._frontend_backend_mapping = { + col: col for col in self._frontend_columns + } + else: + self._frontend_backend_mapping = frontend_backend_mapping + + def _copy(self) -> ColumnContainer: + """ + Internal function to copy this container + """ + return ColumnContainer( + self._frontend_columns.copy(), self._frontend_backend_mapping.copy() + ) + + def limit_to(self, fields: List[str]) -> ColumnContainer: + """ + Create a new ColumnContainer, which has frontend columns + limited to only the ones given as parameter. + Also uses the order of these as the new column order. + """ + if not fields: + return self # pragma: no cover + + assert all(f in self._frontend_backend_mapping for f in fields) + cc = self._copy() + cc._frontend_columns = [str(x) for x in fields] + return cc + + def rename(self, columns: Dict[str, str]) -> ColumnContainer: + """ + Return a new ColumnContainer where the frontend columns + are renamed according to the given mapping. + Columns not present in the mapping are not touched, + the order is preserved. 
+ """ + cc = self._copy() + for column_from, column_to in columns.items(): + backend_column = self._frontend_backend_mapping[str(column_from)] + cc._frontend_backend_mapping[str(column_to)] = backend_column + + cc._frontend_columns = [ + str(columns[col]) if col in columns else col + for col in self._frontend_columns + ] + + return cc + + def rename_handle_duplicates( + self, from_columns: List[str], to_columns: List[str] + ) -> ColumnContainer: + """ + Same as `rename` but additionally handles presence of + duplicates in `from_columns` + """ + cc = self._copy() + cc._frontend_backend_mapping.update( + { + str(column_to): self._frontend_backend_mapping[str(column_from)] + for column_from, column_to in zip(from_columns, to_columns) + } + ) + + columns = dict(zip(from_columns, to_columns)) + cc._frontend_columns = [ + str(columns.get(col, col)) for col in self._frontend_columns + ] + + return cc + + def mapping(self) -> List[Tuple[str, ColumnType]]: + """ + The mapping from frontend columns to backend columns. + """ + return list(self._frontend_backend_mapping.items()) + + @property + def columns(self) -> List[str]: + """ + The stored frontend columns in the correct order + """ + return self._frontend_columns.copy() + + def add( + self, frontend_column: str, backend_column: Union[str, None] = None + ) -> ColumnContainer: + """ + Return a new ColumnContainer with the + given column added. + The column is added at the last position in the column list. + """ + cc = self._copy() + + frontend_column = str(frontend_column) + + cc._frontend_backend_mapping[frontend_column] = str( + backend_column or frontend_column + ) + if frontend_column not in cc._frontend_columns: + cc._frontend_columns.append(frontend_column) + + return cc + + def get_backend_by_frontend_index(self, index: int) -> str: + """ + Get back the dask column, which is referenced by the + frontend (SQL) column with the given index. 
+ """ + frontend_column = self._frontend_columns[index] + backend_column = self._frontend_backend_mapping[frontend_column] + return backend_column + + def get_backend_by_frontend_name(self, column: str) -> str: + """ + Get back the dask column, which is referenced by the + frontend (SQL) column with the given name. + """ + + try: + return self._frontend_backend_mapping[column] + except KeyError: + return column + + def make_unique(self, prefix="col"): + """ + Make sure we have unique column names by calling each column + + _ + + where is the column index. + """ + return self.rename( + columns={str(col): f"{prefix}_{i}" for i, col in enumerate(self.columns)} + ) + + +class Statistics: + """ + Statistics are used during the cost-based optimization. + Currently, only the row count is supported, more + properties might follow. It needs to be provided by the user. + """ + + def __init__(self, row_count: int) -> None: + self.row_count = row_count + + def __eq__(self, other): + if isinstance(other, Statistics): + return self.row_count == other.row_count + return False + + +class DataContainer: + """ + In SQL, every column operation or reference is done via + the column index. Some dask operations, such as grouping, + joining or concatenating preserve the columns in a different + order than SQL would expect. + However, we do not want to change the column data itself + all the time (because this would lead to computational overhead), + but still would like to keep the columns accessible by name and index. + For this, we add an additional `ColumnContainer` to each dataframe, + which does all the column mapping between "frontend" + (what SQL expects, also in the correct order) + and "backend" (what dask has). 
+ """ + + def __init__( + self, + df: cudf.DataFrame, + column_container: ColumnContainer, + statistics: Statistics = None, + filepath: str = None, + ): + self.df = df + self.column_container = column_container + self.statistics = statistics + self.filepath = filepath + + def assign(self) -> cudf.DataFrame: + """ + Combine the column mapping with the actual data and return + a dataframe which has the the columns specified in the + stored ColumnContainer. + """ + df = self.df[ + [ + self.column_container._frontend_backend_mapping[out_col] + for out_col in self.column_container.columns + ] + ] + df.columns = self.column_container.columns + + return df + + +class UDF: + def __init__(self, func, row_udf: bool, params, return_type=None): + """ + Helper class that handles different types of UDFs and manages + how they should be mapped to dask operations. Two versions of + UDFs are supported - when `row_udf=False`, the UDF is treated + as expecting series-like objects as arguments and will simply + run those through the function. 
When `row_udf=True` a row udf + is expected and should be written to expect a dictlike object + containing scalars + """ + self.row_udf = row_udf + self.func = func + + self.names = [param[0] for param in params] + + self.meta = (None, return_type) + + def __call__(self, *args, **kwargs): + if self.row_udf: + column_args = [] + scalar_args = [] + for operand in args: + if isinstance(operand, cudf.Series): + column_args.append(operand) + else: + scalar_args.append(operand) + + df = column_args[0].to_frame(self.names[0]) + for name, col in zip(self.names[1:], column_args[1:]): + df[name] = col + result = df.apply( + self.func, axis=1, args=tuple(scalar_args), meta=self.meta + ).astype(self.meta[1]) + else: + result = self.func(*args, **kwargs) + return result + + def __eq__(self, other): + if isinstance(other, UDF): + return self.func == other.func and self.row_udf == other.row_udf + return NotImplemented + + def __hash__(self): + return (self.func, self.row_udf).__hash__() + + +class SchemaContainer: + def __init__(self, name: str): + self.__name__ = name + self.tables: Dict[str, DataContainer] = {} + self.statistics: Dict[str, Statistics] = {} + self.experiments: Dict[str, pd.DataFrame] = {} + self.models: Dict[str, Tuple[Any, List[str]]] = {} + self.functions: Dict[str, UDF] = {} + self.function_lists: List[FunctionDescription] = [] + self.filepaths: Dict[str, str] = {} diff --git a/datafusion/input_utils/__init__.py b/datafusion/input_utils/__init__.py new file mode 100644 index 000000000..9e8558d69 --- /dev/null +++ b/datafusion/input_utils/__init__.py @@ -0,0 +1,10 @@ +from .convert import InputType, InputUtil +from .location import LocationInputPlugin +from .cudflike import CudfLikeInputPlugin + +__all__ = [ + InputUtil, + InputType, + LocationInputPlugin, + CudfLikeInputPlugin, +] diff --git a/datafusion/input_utils/base.py b/datafusion/input_utils/base.py new file mode 100644 index 000000000..90db678e7 --- /dev/null +++ b/datafusion/input_utils/base.py @@ 
-0,0 +1,11 @@ +from typing import Any + + +class BaseInputPlugin: + def is_correct_input( + self, input_item: Any, table_name: str, format: str = None, **kwargs + ): + raise NotImplementedError + + def to_dc(self, input_item: Any, table_name: str, format: str = None, **kwargs): + raise NotImplementedError diff --git a/datafusion/input_utils/convert.py b/datafusion/input_utils/convert.py new file mode 100644 index 000000000..60499beec --- /dev/null +++ b/datafusion/input_utils/convert.py @@ -0,0 +1,91 @@ +import logging +from typing import TYPE_CHECKING, Union + +try: + # Attempt to use GPU acceleration if present + import cudf as xdf +except ImportError: + import pandas as xdf + +from cusql.datacontainer import ColumnContainer, DataContainer +from cusql.input_utils.base import BaseInputPlugin +from cusql.utils import Pluggable + +logger = logging.Logger(__name__) + +InputType = Union[ + xdf.DataFrame, + str, + Union[ + "sqlalchemy.engine.base.Connection", + "hive.Cursor", + "cudf.core.dataframe.DataFrame", + ], +] + + +class InputUtil(Pluggable): + """ + Plugin list and helper class for transforming the inputs to + create table into a dask dataframe + """ + + @classmethod + def add_plugin_class(cls, plugin_class: BaseInputPlugin, replace=True): + """Convenience function to add a class directly to the plugins""" + logger.debug(f"Registering Input plugin for {plugin_class}") + cls.add_plugin(str(plugin_class), plugin_class(), replace=replace) + + @classmethod + def to_dc( + cls, + input_item: InputType, + table_name: str, + format: str = None, + persist: bool = True, + gpu: bool = False, + **kwargs, + ) -> DataContainer: + """ + Turn possible input descriptions or formats (e.g. dask dataframes, pandas dataframes, + locations as string, hive tables) into the loaded data containers, + maybe persist them to cluster memory before. 
+ """ + filled_get_dask_dataframe = lambda *args: cls._get_dask_dataframe( + *args, + table_name=table_name, + format=format, + gpu=gpu, + **kwargs, + ) + + if isinstance(input_item, list): + table = dd.concat([filled_get_dask_dataframe(item) for item in input_item]) + else: + table = filled_get_dask_dataframe(input_item) + + if persist: + table = table.persist() + + return DataContainer(table.copy(), ColumnContainer(table.columns)) + + @classmethod + def _get_dask_dataframe( + cls, + input_item: InputType, + table_name: str, + format: str = None, + gpu: bool = False, + **kwargs, + ): + plugin_list = cls.get_plugins() + + for plugin in plugin_list: + if plugin.is_correct_input( + input_item, table_name=table_name, format=format, **kwargs + ): + return plugin.to_dc( + input_item, table_name=table_name, format=format, gpu=gpu, **kwargs + ) + + raise ValueError(f"Do not understand the input type {type(input_item)}") diff --git a/datafusion/input_utils/cudflike.py b/datafusion/input_utils/cudflike.py new file mode 100644 index 000000000..082a2786c --- /dev/null +++ b/datafusion/input_utils/cudflike.py @@ -0,0 +1,22 @@ +import cudf + +from datafusion.input_utils.base import BaseInputPlugin + + +class CudfLikeInputPlugin(BaseInputPlugin): + """Input Plugin for Pandas Like DataFrames, which get converted to dask DataFrames""" + + def is_correct_input( + self, input_item, table_name: str, format: str = None, **kwargs + ): + return hasattr(input_item, "__dataframe__") + + def to_dc( + self, + input_item, + table_name: str, + format: str = None, + gpu: bool = False, + **kwargs, + ): + return input_item diff --git a/datafusion/input_utils/location.py b/datafusion/input_utils/location.py new file mode 100644 index 000000000..b5fc7b99f --- /dev/null +++ b/datafusion/input_utils/location.py @@ -0,0 +1,32 @@ +import os +import cudf +from typing import Any + +from cusql.input_utils.base import BaseInputPlugin +from cusql.input_utils.convert import InputUtil + + +class 
LocationInputPlugin(BaseInputPlugin): + """Input Plugin for everything, which can be read in from a file (on disk, remote etc.)""" + + def is_correct_input( + self, input_item: Any, table_name: str, format: str = None, **kwargs + ): + return isinstance(input_item, str) + + def to_dc( + self, + input_item: Any, + format: str = None, + **kwargs, + ): + if not format: + _, extension = os.path.splitext(input_item) + + format = extension.lstrip(".") + try: + read_function = getattr(cudf, f"read_{format}") + except AttributeError: + raise AttributeError(f"Can not read files of format {format}") + + return read_function(input_item, **kwargs) diff --git a/src/common.rs b/src/common.rs index 8a8e2adf5..45523173c 100644 --- a/src/common.rs +++ b/src/common.rs @@ -20,6 +20,8 @@ use pyo3::prelude::*; pub mod data_type; pub mod df_field; pub mod df_schema; +pub mod function; +pub mod schema; /// Initializes the `common` module to match the pattern of `datafusion-common` https://docs.rs/datafusion-common/18.0.0/datafusion_common/index.html pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { @@ -29,5 +31,10 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/common/function.rs b/src/common/function.rs new file mode 100644 index 000000000..a8d752f16 --- /dev/null +++ b/src/common/function.rs @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use datafusion::arrow::datatypes::DataType; +use pyo3::prelude::*; + +use super::data_type::PyDataType; + +#[pyclass(name = "SqlFunction", module = "datafusion.common", subclass)] +#[derive(Debug, Clone)] +pub struct SqlFunction { + pub name: String, + pub return_types: HashMap, DataType>, + pub aggregation: bool, +} + +impl SqlFunction { + pub fn new( + function_name: String, + input_types: Vec, + return_type: PyDataType, + aggregation_bool: bool, + ) -> Self { + let mut func = Self { + name: function_name, + return_types: HashMap::new(), + aggregation: aggregation_bool, + }; + func.add_type_mapping(input_types, return_type); + func + } + + pub fn add_type_mapping(&mut self, input_types: Vec, return_type: PyDataType) { + self.return_types.insert( + input_types.iter().map(|t| t.clone().into()).collect(), + return_type.into(), + ); + } +} diff --git a/src/common/schema.rs b/src/common/schema.rs new file mode 100644 index 000000000..197915906 --- /dev/null +++ b/src/common/schema.rs @@ -0,0 +1,296 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; + +use datafusion::arrow::datatypes::SchemaRef; +use datafusion_expr::{TableSource, TableProviderFilterPushDown, Expr}; +use pyo3::prelude::*; + +use datafusion_optimizer::utils::split_conjunction; + +use super::{function::SqlFunction, data_type::DataTypeMap}; + +#[pyclass(name = "SqlSchema", module = "datafusion.common", subclass)] +#[derive(Debug, Clone)] +pub struct SqlSchema { + #[pyo3(get, set)] + pub name: String, + #[pyo3(get, set)] + pub tables: Vec, + #[pyo3(get, set)] + pub views: Vec, + #[pyo3(get, set)] + pub functions: Vec, +} + +#[pyclass(name = "SqlTable", module = "datafusion.common", subclass)] +#[derive(Debug, Clone)] +pub struct SqlTable { + #[pyo3(get, set)] + pub name: String, + #[pyo3(get, set)] + pub columns: Vec<(String, DataTypeMap)>, + #[pyo3(get, set)] + pub primary_key: Option, + #[pyo3(get, set)] + pub foreign_keys: Vec, + #[pyo3(get, set)] + pub indexes: Vec, + #[pyo3(get, set)] + pub constraints: Vec, + #[pyo3(get, set)] + pub statistics: SqlStatistics, + #[pyo3(get, set)] + pub filepath: Option, +} + +#[pymethods] +impl SqlTable { + #[new] + pub fn new( + _schema_name: String, + table_name: String, + columns: Vec<(String, DataTypeMap)>, + // primary_key: Option, + // foreign_keys: Vec, + // indexes: Vec, + // constraints: Vec, + row_count: f64, + filepath: Option, + ) -> Self { + Self { + name: table_name, + columns, + primary_key: None, + foreign_keys: Vec::new(), + indexes: Vec::new(), + constraints: Vec::new(), + statistics: SqlStatistics::new(row_count), + filepath, + } + } + + // // TODO: 
Really wish we could accept a SqlTypeName instance here instead of a String for `column_type` .... + // #[pyo3(name = "add_column")] + // pub fn add_column(&mut self, column_name: &str, type_map: DaskTypeMap) { + // self.columns.push((column_name.to_owned(), type_map)); + // } + + // #[pyo3(name = "getSchema")] + // pub fn get_schema(&self) -> PyResult> { + // Ok(self.schema_name.clone()) + // } + + // #[pyo3(name = "getTableName")] + // pub fn get_table_name(&self) -> PyResult { + // Ok(self.table_name.clone()) + // } + + // #[pyo3(name = "getQualifiedName")] + // pub fn qualified_name(&self, plan: logical::PyLogicalPlan) -> Vec { + // let mut qualified_name = match &self.schema_name { + // Some(schema_name) => vec![schema_name.clone()], + // None => vec![], + // }; + + // match plan.original_plan { + // LogicalPlan::TableScan(table_scan) => { + // qualified_name.push(table_scan.table_name.to_string()); + // } + // _ => { + // qualified_name.push(self.table_name.clone()); + // } + // } + + // qualified_name + // } + + // #[pyo3(name = "getRowType")] + // pub fn row_type(&self) -> RelDataType { + // let mut fields: Vec = Vec::new(); + // for (name, data_type) in &self.columns { + // fields.push(RelDataTypeField::new(name.as_str(), data_type.clone(), 255)); + // } + // RelDataType::new(false, fields) + // } +} + +#[pyclass(name = "SqlView", module = "datafusion.common", subclass)] +#[derive(Debug, Clone)] +pub struct SqlView { + #[pyo3(get, set)] + pub name: String, + #[pyo3(get, set)] + pub definition: String, // SQL code that defines the view +} + + +#[pymethods] +impl SqlSchema { + #[new] + pub fn new(schema_name: &str) -> Self { + Self { + name: schema_name.to_owned(), + tables: Vec::new(), + views: Vec::new(), + functions: Vec::new(), + } + } + + pub fn table_by_name(&self, table_name: &str) -> Option { + for tbl in &self.tables { + if tbl.name.eq(table_name) { + return Some(tbl.clone()); + } + } + None + } + + pub fn add_table(&mut self, table: SqlTable) { + 
self.tables.push(table); + } + + // pub fn add_or_overload_function( + // &mut self, + // name: String, + // input_types: Vec, + // return_type: PyDataType, + // aggregation: bool, + // ) { + // self.functions + // .entry(name.clone()) + // .and_modify(|e| { + // (*e).lock() + // .unwrap() + // .add_type_mapping(input_types.clone(), return_type.clone()); + // }) + // .or_insert_with(|| { + // Arc::new(Mutex::new(SQLFunction::new( + // name, + // input_types, + // return_type, + // aggregation, + // ))) + // }); + // } +} + + +/// SqlTable wrapper that is compatible with DataFusion logical query plans +pub struct SqlTableSource { + schema: SchemaRef, + statistics: Option, + filepath: Option, +} + +impl SqlTableSource { + /// Initialize a new `EmptyTable` from a schema + pub fn new( + schema: SchemaRef, + statistics: Option, + filepath: Option, + ) -> Self { + Self { + schema, + statistics, + filepath, + } + } + + /// Access optional statistics associated with this table source + pub fn statistics(&self) -> Option<&SqlStatistics> { + self.statistics.as_ref() + } + + /// Access optional filepath associated with this table source + #[allow(dead_code)] + pub fn filepath(&self) -> Option<&String> { + self.filepath.as_ref() + } +} + +/// Implement TableSource, used in the logical query plan and in logical query optimizations +impl TableSource for SqlTableSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn supports_filter_pushdown( + &self, + filter: &Expr, + ) -> datafusion_common::Result { + let filters = split_conjunction(filter); + if filters.iter().all(|f| is_supported_push_down_expr(f)) { + // Push down filters to the tablescan operation if all are supported + Ok(TableProviderFilterPushDown::Exact) + } else if filters.iter().any(|f| is_supported_push_down_expr(f)) { + // Partially apply the filter in the TableScan but retain + // the Filter operator in the plan as well + 
Ok(TableProviderFilterPushDown::Inexact) + } else { + Ok(TableProviderFilterPushDown::Unsupported) + } + } + + fn table_type(&self) -> datafusion_expr::TableType { + datafusion_expr::TableType::Base + } + + #[allow(deprecated)] + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion_common::Result> { + filters + .iter() + .map(|f| self.supports_filter_pushdown(f)) + .collect() + } + + fn get_logical_plan(&self) -> Option<&datafusion_expr::LogicalPlan> { + None + } +} + +fn is_supported_push_down_expr(_expr: &Expr) -> bool { + // For now we support all kinds of expr's at this level + true +} + +#[pyclass(name = "SqlStatistics", module = "datafusion.common", subclass)] +#[derive(Debug, Clone)] +pub struct SqlStatistics { + row_count: f64, +} + +#[pymethods] +impl SqlStatistics { + #[new] + pub fn new(row_count: f64) -> Self { + Self { row_count } + } + + #[pyo3(name = "getRowCount")] + pub fn get_row_count(&self) -> f64 { + self.row_count + } +} From fdb07bba61449023ead1b48cb4d5cefa38b0defb Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 26 May 2023 11:06:40 -0400 Subject: [PATCH 2/9] Introduce BaseSessionContext abstract class --- .gitignore | 5 ++- datafusion/context.py | 45 +++++++++++++++++++++++++++ datafusion/cudf.py | 14 ++++++--- datafusion/pandas.py | 14 ++++++--- datafusion/polars.py | 14 ++++++--- docs/mdbook/src/usage/create-table.md | 2 +- examples/sql-on-cudf.py | 2 +- examples/sql-on-pandas.py | 2 +- examples/sql-on-polars.py | 2 +- 9 files changed, 80 insertions(+), 20 deletions(-) create mode 100644 datafusion/context.py diff --git a/.gitignore b/.gitignore index 1d0a84a43..365b89d5c 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ __pycache__/ *.py[cod] *$py.class +# Python dist ignore +dist + # C extensions *.so @@ -24,4 +27,4 @@ apache-rat-*.jar .env CHANGELOG.md.bak -docs/mdbook/book \ No newline at end of file +docs/mdbook/book diff --git a/datafusion/context.py b/datafusion/context.py new file mode 100644 
index 000000000..aa9c9a8af --- /dev/null +++ b/datafusion/context.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from abc import ABC, abstractmethod + + +class BaseSessionContext(ABC): + """ + Abstraction defining all methods, properties, and common functionality + shared amongst implementations using DataFusion as their SQL Parser/Engine + """ + + @abstractmethod + def register_table( + self, + table_name: str, + path: str, + **kwargs, + ): + pass + + # TODO: Remove abstraction, this functionality can be shared + # between all implementing classes since it just prints the + # logical plan from DataFusion + @abstractmethod + def explain(self, sql): + pass + + @abstractmethod + def sql(self, sql): + pass diff --git a/datafusion/cudf.py b/datafusion/cudf.py index d5f02156f..594e5efea 100644 --- a/datafusion/cudf.py +++ b/datafusion/cudf.py @@ -17,18 +17,15 @@ import cudf import datafusion +from datafusion.context import BaseSessionContext from datafusion.expr import Projection, TableScan, Column -class SessionContext: +class SessionContext(BaseSessionContext): def __init__(self): self.datafusion_ctx = datafusion.SessionContext() self.parquet_tables = {} - def register_parquet(self, name, path): - 
self.parquet_tables[name] = path - self.datafusion_ctx.register_parquet(name, path) - def to_cudf_expr(self, expr): # get Python wrapper for logical expression expr = expr.to_variant() @@ -55,6 +52,13 @@ def to_cudf_df(self, plan): "unsupported logical operator: {}".format(type(node)) ) + def register_table(self, name, path, **kwargs): + self.parquet_tables[name] = path + self.datafusion_ctx.register_parquet(name, path) + + def explain(self, sql): + super.explain() + def sql(self, sql): datafusion_df = self.datafusion_ctx.sql(sql) plan = datafusion_df.logical_plan() diff --git a/datafusion/pandas.py b/datafusion/pandas.py index f8e56512b..935d9619b 100644 --- a/datafusion/pandas.py +++ b/datafusion/pandas.py @@ -17,18 +17,15 @@ import pandas as pd import datafusion +from datafusion.context import BaseSessionContext from datafusion.expr import Projection, TableScan, Column -class SessionContext: +class SessionContext(BaseSessionContext): def __init__(self): self.datafusion_ctx = datafusion.SessionContext() self.parquet_tables = {} - def register_parquet(self, name, path): - self.parquet_tables[name] = path - self.datafusion_ctx.register_parquet(name, path) - def to_pandas_expr(self, expr): # get Python wrapper for logical expression expr = expr.to_variant() @@ -55,6 +52,13 @@ def to_pandas_df(self, plan): "unsupported logical operator: {}".format(type(node)) ) + def register_table(self, name, path, **kwargs): + self.parquet_tables[name] = path + self.datafusion_ctx.register_parquet(name, path) + + def explain(self, sql): + super.explain() + def sql(self, sql): datafusion_df = self.datafusion_ctx.sql(sql) plan = datafusion_df.logical_plan() diff --git a/datafusion/polars.py b/datafusion/polars.py index a1bafbef8..bbc1fd7c2 100644 --- a/datafusion/polars.py +++ b/datafusion/polars.py @@ -17,19 +17,16 @@ import polars import datafusion +from datafusion.context import BaseSessionContext from datafusion.expr import Projection, TableScan, Aggregate from datafusion.expr 
import Column, AggregateFunction -class SessionContext: +class SessionContext(BaseSessionContext): def __init__(self): self.datafusion_ctx = datafusion.SessionContext() self.parquet_tables = {} - def register_parquet(self, name, path): - self.parquet_tables[name] = path - self.datafusion_ctx.register_parquet(name, path) - def to_polars_expr(self, expr): # get Python wrapper for logical expression expr = expr.to_variant() @@ -78,6 +75,13 @@ def to_polars_df(self, plan): "unsupported logical operator: {}".format(type(node)) ) + def register_table(self, name, path, **kwargs): + self.parquet_tables[name] = path + self.datafusion_ctx.register_parquet(name, path) + + def explain(self, sql): + super.explain() + def sql(self, sql): datafusion_df = self.datafusion_ctx.sql(sql) plan = datafusion_df.logical_plan() diff --git a/docs/mdbook/src/usage/create-table.md b/docs/mdbook/src/usage/create-table.md index 332863a16..98870fac0 100644 --- a/docs/mdbook/src/usage/create-table.md +++ b/docs/mdbook/src/usage/create-table.md @@ -55,5 +55,5 @@ ctx.register_csv("csv_1e8", "G1_1e8_1e2_0_0.csv") You can read a Parquet file into a DataFusion DataFrame. Here's how to read the `yellow_tripdata_2021-01.parquet` file into a table named `taxi`. 
```python -ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet") +ctx.register_table("taxi", "yellow_tripdata_2021-01.parquet") ``` diff --git a/examples/sql-on-cudf.py b/examples/sql-on-cudf.py index 999756fc8..b64d8f046 100644 --- a/examples/sql-on-cudf.py +++ b/examples/sql-on-cudf.py @@ -19,6 +19,6 @@ ctx = SessionContext() -ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet") +ctx.register_table("taxi", "yellow_tripdata_2021-01.parquet") df = ctx.sql("select passenger_count from taxi") print(df) diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py index 0efd77631..e3312a201 100644 --- a/examples/sql-on-pandas.py +++ b/examples/sql-on-pandas.py @@ -19,6 +19,6 @@ ctx = SessionContext() -ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet") +ctx.register_table("taxi", "yellow_tripdata_2021-01.parquet") df = ctx.sql("select passenger_count from taxi") print(df) diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py index c208114c1..dd7a9e021 100644 --- a/examples/sql-on-polars.py +++ b/examples/sql-on-polars.py @@ -19,7 +19,7 @@ ctx = SessionContext() -ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet") +ctx.register_table("taxi", "yellow_tripdata_2021-01.parquet") df = ctx.sql( "select passenger_count, count(*) from taxi group by passenger_count" ) From 585fe3e0e971d5a78edcffb4979a64d323889c4b Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 26 May 2023 14:43:36 -0400 Subject: [PATCH 3/9] Introduce abstract methods for CRUD schema operations --- datafusion/context.py | 52 ++++++ datafusion/cudf.py | 124 +++--------- datafusion/datacontainer.py | 290 ----------------------------- datafusion/input_utils/__init__.py | 10 - datafusion/input_utils/base.py | 11 -- datafusion/input_utils/convert.py | 91 --------- datafusion/input_utils/cudflike.py | 22 --- datafusion/input_utils/location.py | 32 ---- datafusion/pandas.py | 32 +++- datafusion/polars.py | 22 ++- src/common/schema.rs | 6 +- 11 files 
changed, 129 insertions(+), 563 deletions(-) delete mode 100644 datafusion/datacontainer.py delete mode 100644 datafusion/input_utils/__init__.py delete mode 100644 datafusion/input_utils/base.py delete mode 100644 datafusion/input_utils/convert.py delete mode 100644 datafusion/input_utils/cudflike.py delete mode 100644 datafusion/input_utils/location.py diff --git a/datafusion/context.py b/datafusion/context.py index aa9c9a8af..a36462123 100644 --- a/datafusion/context.py +++ b/datafusion/context.py @@ -16,6 +16,9 @@ # under the License. from abc import ABC, abstractmethod +from typing import Dict + +from datafusion.common import SqlSchema class BaseSessionContext(ABC): @@ -24,6 +27,55 @@ class BaseSessionContext(ABC): shared amongst implementations using DataFusion as their SQL Parser/Engine """ + DEFAULT_CATALOG_NAME = "root" + DEFAULT_SCHEMA_NAME = "datafusion" + + @abstractmethod + def create_schema( + self, + schema_name: str, + **kwargs, + ): + """ + Creates/Registers a logical container that holds database + objects such as tables, views, indexes, and other + related objects. It provides a way to group related database + objects together. A schema can be owned by a database + user and can be used to separate objects in different + logical groups for easy management. + """ + pass + + @abstractmethod + def update_schema( + self, + schema_name: str, + new_schema: SqlSchema, + **kwargs, + ): + """ + Updates an existing schema in the SessionContext + """ + pass + + @abstractmethod + def drop_schema( + self, + schema_name: str, + **kwargs, + ): + """ + Drops the specified Schema, based on name, from the current context + """ + pass + + @abstractmethod + def show_schemas(self, **kwargs) -> Dict[str, SqlSchema]: + """ + Return all schemas in the current SessionContext impl. 
+ """ + pass + @abstractmethod def register_table( self, diff --git a/datafusion/cudf.py b/datafusion/cudf.py index a594197ff..e39daea31 100644 --- a/datafusion/cudf.py +++ b/datafusion/cudf.py @@ -17,38 +17,21 @@ import logging import cudf -import datafusion from datafusion.context import BaseSessionContext from datafusion.expr import Projection, TableScan, Column -from datafusion.datacontainer import ( - UDF, - DataContainer, - FunctionDescription, - SchemaContainer, - Statistics, -) - -from datafusion.common import ( - SqlTable, - SqlSchema, -) - -from datafusion import input_utils -from datafusion.input_utils import InputType, InputUtil +from datafusion.common import SqlSchema logger = logging.getLogger(__name__) -class SessionContext(BaseSessionContext): - - DEFAULT_CATALOG_NAME = "datafusion" - DEFAULT_SCHEMA_NAME = "root" - def __init__(self, context=None, logging_level=logging.INFO): +class SessionContext(BaseSessionContext): + def __init__(self, context, logging_level=logging.INFO): """ Create a new Session. 
""" - self.context = context if not context else datafusion.SessionContext() + # Cudf requires a provided context + self.context = context # Set the logging level for this SQL context logging.basicConfig(level=logging_level) @@ -57,84 +40,11 @@ def __init__(self, context=None, logging_level=logging.INFO): self.catalog_name = self.DEFAULT_CATALOG_NAME # Name of the root schema self.schema_name = self.DEFAULT_SCHEMA_NAME - # All schema information - self.schema = {self.schema_name: SchemaContainer(self.schema_name)} - - self.context.register_schema(self.schema_name, SqlSchema(self.schema_name)) - - # Register default `InputPlugins` that will be used to - # understand and consume data in different formats - InputUtil.add_plugin_class(input_utils.DataFrameInputPlugin, replace=False) # Existing cudf.DataFrame object - InputUtil.add_plugin_class(input_utils.LocationInputPlugin, replace=False) # File location on disk - - def create_table( - self, - table_name: str, - input_table: InputType, - format: str = None, - schema_name: str = None, - statistics: Statistics = None, - **kwargs, - ): - """ - Registering a cudf DataFrame/table makes it usable in SQL queries. - The name you give here can be used as table name in the SQL later. - - Please note, that the table is stored as it is now. - If you change the table later, you need to re-register. - - Example: - This code registers a data frame as table "data" - and then uses it in a query. - - .. code-block:: python - - c.create_table("data", df) - df_result = c.sql("SELECT a, b FROM data") - - This code reads a file from disk. - Please note that we assume that the file(s) are reachable under this path - from every node in the cluster - - .. 
code-block:: python - - c.create_table("data", "/home/user/data.csv") - df_result = c.sql("SELECT a, b FROM data") - - Args: - table_name: (:obj:`str`): Under which name should the new table be addressable - input_table (:class:`dask.dataframe.DataFrame` or :class:`pandas.DataFrame` or :obj:`str` or :class:`hive.Cursor`): - The data frame/location/hive connection to register. - format (:obj:`str`): Only used when passing a string into the ``input`` parameter. - Specify the file format directly here if it can not be deduced from the extension. - If set to "memory", load the data from a published dataset in the dask cluster. - schema_name: (:obj:`str`): in which schema to create the table. By default, will use the currently selected schema. - statistics: (:obj:`Statistics`): if given, use these statistics during the cost-based optimization. - **kwargs: Additional arguments for specific formats. See :ref:`data_input` for more information. - - """ - logger.debug( - f"Creating table: '{table_name}' of format type '{format}' in schema '{schema_name}'" - ) - - schema_name = schema_name or self.schema_name - - dc = InputUtil.to_dc( - input_table, - table_name=table_name, - format=format, - **kwargs, - ) - - if type(input_table) == str: - dc.filepath = input_table - self.schema[schema_name].filepaths[table_name.lower()] = input_table - - # TODO: Implement reading physical statistics - dc.statistics = Statistics(float("nan")) - - self.schema[schema_name].tables[table_name.lower()] = dc - self.schema[schema_name].statistics[table_name.lower()] = statistics + # Add the schema to the context + sch = SqlSchema(self.schema_name) + self.schemas = {} + self.schemas[self.schema_name] = sch + self.context.register_schema(self.schema_name, sch) def to_cudf_expr(self, expr): # get Python wrapper for logical expression @@ -162,6 +72,20 @@ def to_cudf_df(self, plan): "unsupported logical operator: {}".format(type(node)) ) + def create_schema(self, schema_name: str, **kwargs): + 
logger.debug(f"Creating schema: {schema_name}") + self.schemas[schema_name] = SqlSchema(schema_name) + self.context.register_schema(schema_name, SqlSchema(schema_name)) + + def update_schema(self, schema_name: str, new_schema: SqlSchema, **kwargs): + self.schemas[schema_name] = new_schema + + def drop_schema(self, schema_name, **kwargs): + del self.schemas[schema_name] + + def show_schemas(self, **kwargs): + return self.schemas + def register_table(self, name, path, **kwargs): self.parquet_tables[name] = path self.datafusion_ctx.register_parquet(name, path) diff --git a/datafusion/datacontainer.py b/datafusion/datacontainer.py deleted file mode 100644 index 76bf7a14a..000000000 --- a/datafusion/datacontainer.py +++ /dev/null @@ -1,290 +0,0 @@ -from collections import namedtuple -from typing import Any, Dict, List, Tuple, Union - -import cudf -import xdf as pd - -ColumnType = Union[str, int] - -FunctionDescription = namedtuple( - "FunctionDescription", ["name", "parameters", "return_type", "aggregation"] -) - - -class ColumnContainer: - # Forward declaration - pass - - -class ColumnContainer: - """ - Helper class to store a list of columns, - which do not necessarily be the ones of the dask dataframe. - Instead, the container also stores a mapping from "frontend" - columns (columns with the names and order expected by SQL) - to "backend" columns (the real column names used by dask) - to prevent unnecessary renames. 
- """ - - def __init__( - self, - frontend_columns: List[str], - frontend_backend_mapping: Union[Dict[str, ColumnType], None] = None, - ): - assert all( - isinstance(col, str) for col in frontend_columns - ), "All frontend columns need to be of string type" - self._frontend_columns = list(frontend_columns) - if frontend_backend_mapping is None: - self._frontend_backend_mapping = { - col: col for col in self._frontend_columns - } - else: - self._frontend_backend_mapping = frontend_backend_mapping - - def _copy(self) -> ColumnContainer: - """ - Internal function to copy this container - """ - return ColumnContainer( - self._frontend_columns.copy(), self._frontend_backend_mapping.copy() - ) - - def limit_to(self, fields: List[str]) -> ColumnContainer: - """ - Create a new ColumnContainer, which has frontend columns - limited to only the ones given as parameter. - Also uses the order of these as the new column order. - """ - if not fields: - return self # pragma: no cover - - assert all(f in self._frontend_backend_mapping for f in fields) - cc = self._copy() - cc._frontend_columns = [str(x) for x in fields] - return cc - - def rename(self, columns: Dict[str, str]) -> ColumnContainer: - """ - Return a new ColumnContainer where the frontend columns - are renamed according to the given mapping. - Columns not present in the mapping are not touched, - the order is preserved. 
- """ - cc = self._copy() - for column_from, column_to in columns.items(): - backend_column = self._frontend_backend_mapping[str(column_from)] - cc._frontend_backend_mapping[str(column_to)] = backend_column - - cc._frontend_columns = [ - str(columns[col]) if col in columns else col - for col in self._frontend_columns - ] - - return cc - - def rename_handle_duplicates( - self, from_columns: List[str], to_columns: List[str] - ) -> ColumnContainer: - """ - Same as `rename` but additionally handles presence of - duplicates in `from_columns` - """ - cc = self._copy() - cc._frontend_backend_mapping.update( - { - str(column_to): self._frontend_backend_mapping[str(column_from)] - for column_from, column_to in zip(from_columns, to_columns) - } - ) - - columns = dict(zip(from_columns, to_columns)) - cc._frontend_columns = [ - str(columns.get(col, col)) for col in self._frontend_columns - ] - - return cc - - def mapping(self) -> List[Tuple[str, ColumnType]]: - """ - The mapping from frontend columns to backend columns. - """ - return list(self._frontend_backend_mapping.items()) - - @property - def columns(self) -> List[str]: - """ - The stored frontend columns in the correct order - """ - return self._frontend_columns.copy() - - def add( - self, frontend_column: str, backend_column: Union[str, None] = None - ) -> ColumnContainer: - """ - Return a new ColumnContainer with the - given column added. - The column is added at the last position in the column list. - """ - cc = self._copy() - - frontend_column = str(frontend_column) - - cc._frontend_backend_mapping[frontend_column] = str( - backend_column or frontend_column - ) - if frontend_column not in cc._frontend_columns: - cc._frontend_columns.append(frontend_column) - - return cc - - def get_backend_by_frontend_index(self, index: int) -> str: - """ - Get back the dask column, which is referenced by the - frontend (SQL) column with the given index. 
- """ - frontend_column = self._frontend_columns[index] - backend_column = self._frontend_backend_mapping[frontend_column] - return backend_column - - def get_backend_by_frontend_name(self, column: str) -> str: - """ - Get back the dask column, which is referenced by the - frontend (SQL) column with the given name. - """ - - try: - return self._frontend_backend_mapping[column] - except KeyError: - return column - - def make_unique(self, prefix="col"): - """ - Make sure we have unique column names by calling each column - - _ - - where is the column index. - """ - return self.rename( - columns={str(col): f"{prefix}_{i}" for i, col in enumerate(self.columns)} - ) - - -class Statistics: - """ - Statistics are used during the cost-based optimization. - Currently, only the row count is supported, more - properties might follow. It needs to be provided by the user. - """ - - def __init__(self, row_count: int) -> None: - self.row_count = row_count - - def __eq__(self, other): - if isinstance(other, Statistics): - return self.row_count == other.row_count - return False - - -class DataContainer: - """ - In SQL, every column operation or reference is done via - the column index. Some dask operations, such as grouping, - joining or concatenating preserve the columns in a different - order than SQL would expect. - However, we do not want to change the column data itself - all the time (because this would lead to computational overhead), - but still would like to keep the columns accessible by name and index. - For this, we add an additional `ColumnContainer` to each dataframe, - which does all the column mapping between "frontend" - (what SQL expects, also in the correct order) - and "backend" (what dask has). 
- """ - - def __init__( - self, - df: cudf.DataFrame, - column_container: ColumnContainer, - statistics: Statistics = None, - filepath: str = None, - ): - self.df = df - self.column_container = column_container - self.statistics = statistics - self.filepath = filepath - - def assign(self) -> cudf.DataFrame: - """ - Combine the column mapping with the actual data and return - a dataframe which has the the columns specified in the - stored ColumnContainer. - """ - df = self.df[ - [ - self.column_container._frontend_backend_mapping[out_col] - for out_col in self.column_container.columns - ] - ] - df.columns = self.column_container.columns - - return df - - -class UDF: - def __init__(self, func, row_udf: bool, params, return_type=None): - """ - Helper class that handles different types of UDFs and manages - how they should be mapped to dask operations. Two versions of - UDFs are supported - when `row_udf=False`, the UDF is treated - as expecting series-like objects as arguments and will simply - run those through the function. 
When `row_udf=True` a row udf - is expected and should be written to expect a dictlike object - containing scalars - """ - self.row_udf = row_udf - self.func = func - - self.names = [param[0] for param in params] - - self.meta = (None, return_type) - - def __call__(self, *args, **kwargs): - if self.row_udf: - column_args = [] - scalar_args = [] - for operand in args: - if isinstance(operand, cudf.Series): - column_args.append(operand) - else: - scalar_args.append(operand) - - df = column_args[0].to_frame(self.names[0]) - for name, col in zip(self.names[1:], column_args[1:]): - df[name] = col - result = df.apply( - self.func, axis=1, args=tuple(scalar_args), meta=self.meta - ).astype(self.meta[1]) - else: - result = self.func(*args, **kwargs) - return result - - def __eq__(self, other): - if isinstance(other, UDF): - return self.func == other.func and self.row_udf == other.row_udf - return NotImplemented - - def __hash__(self): - return (self.func, self.row_udf).__hash__() - - -class SchemaContainer: - def __init__(self, name: str): - self.__name__ = name - self.tables: Dict[str, DataContainer] = {} - self.statistics: Dict[str, Statistics] = {} - self.experiments: Dict[str, pd.DataFrame] = {} - self.models: Dict[str, Tuple[Any, List[str]]] = {} - self.functions: Dict[str, UDF] = {} - self.function_lists: List[FunctionDescription] = [] - self.filepaths: Dict[str, str] = {} diff --git a/datafusion/input_utils/__init__.py b/datafusion/input_utils/__init__.py deleted file mode 100644 index 9e8558d69..000000000 --- a/datafusion/input_utils/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from .convert import InputType, InputUtil -from .location import LocationInputPlugin -from .cudflike import CudfLikeInputPlugin - -__all__ = [ - InputUtil, - InputType, - LocationInputPlugin, - CudfLikeInputPlugin, -] diff --git a/datafusion/input_utils/base.py b/datafusion/input_utils/base.py deleted file mode 100644 index 90db678e7..000000000 --- a/datafusion/input_utils/base.py +++ 
/dev/null @@ -1,11 +0,0 @@ -from typing import Any - - -class BaseInputPlugin: - def is_correct_input( - self, input_item: Any, table_name: str, format: str = None, **kwargs - ): - raise NotImplementedError - - def to_dc(self, input_item: Any, table_name: str, format: str = None, **kwargs): - raise NotImplementedError diff --git a/datafusion/input_utils/convert.py b/datafusion/input_utils/convert.py deleted file mode 100644 index 60499beec..000000000 --- a/datafusion/input_utils/convert.py +++ /dev/null @@ -1,91 +0,0 @@ -import logging -from typing import TYPE_CHECKING, Union - -try: - # Attempt to use GPU acceleration if present - import cudf as xdf -except ImportError: - import pandas as xdf - -from cusql.datacontainer import ColumnContainer, DataContainer -from cusql.input_utils.base import BaseInputPlugin -from cusql.utils import Pluggable - -logger = logging.Logger(__name__) - -InputType = Union[ - xdf.DataFrame, - str, - Union[ - "sqlalchemy.engine.base.Connection", - "hive.Cursor", - "cudf.core.dataframe.DataFrame", - ], -] - - -class InputUtil(Pluggable): - """ - Plugin list and helper class for transforming the inputs to - create table into a dask dataframe - """ - - @classmethod - def add_plugin_class(cls, plugin_class: BaseInputPlugin, replace=True): - """Convenience function to add a class directly to the plugins""" - logger.debug(f"Registering Input plugin for {plugin_class}") - cls.add_plugin(str(plugin_class), plugin_class(), replace=replace) - - @classmethod - def to_dc( - cls, - input_item: InputType, - table_name: str, - format: str = None, - persist: bool = True, - gpu: bool = False, - **kwargs, - ) -> DataContainer: - """ - Turn possible input descriptions or formats (e.g. dask dataframes, pandas dataframes, - locations as string, hive tables) into the loaded data containers, - maybe persist them to cluster memory before. 
- """ - filled_get_dask_dataframe = lambda *args: cls._get_dask_dataframe( - *args, - table_name=table_name, - format=format, - gpu=gpu, - **kwargs, - ) - - if isinstance(input_item, list): - table = dd.concat([filled_get_dask_dataframe(item) for item in input_item]) - else: - table = filled_get_dask_dataframe(input_item) - - if persist: - table = table.persist() - - return DataContainer(table.copy(), ColumnContainer(table.columns)) - - @classmethod - def _get_dask_dataframe( - cls, - input_item: InputType, - table_name: str, - format: str = None, - gpu: bool = False, - **kwargs, - ): - plugin_list = cls.get_plugins() - - for plugin in plugin_list: - if plugin.is_correct_input( - input_item, table_name=table_name, format=format, **kwargs - ): - return plugin.to_dc( - input_item, table_name=table_name, format=format, gpu=gpu, **kwargs - ) - - raise ValueError(f"Do not understand the input type {type(input_item)}") diff --git a/datafusion/input_utils/cudflike.py b/datafusion/input_utils/cudflike.py deleted file mode 100644 index 082a2786c..000000000 --- a/datafusion/input_utils/cudflike.py +++ /dev/null @@ -1,22 +0,0 @@ -import cudf - -from datafusion.input_utils.base import BaseInputPlugin - - -class CudfLikeInputPlugin(BaseInputPlugin): - """Input Plugin for Pandas Like DataFrames, which get converted to dask DataFrames""" - - def is_correct_input( - self, input_item, table_name: str, format: str = None, **kwargs - ): - return hasattr(input_item, "__dataframe__") - - def to_dc( - self, - input_item, - table_name: str, - format: str = None, - gpu: bool = False, - **kwargs, - ): - return input_item diff --git a/datafusion/input_utils/location.py b/datafusion/input_utils/location.py deleted file mode 100644 index b5fc7b99f..000000000 --- a/datafusion/input_utils/location.py +++ /dev/null @@ -1,32 +0,0 @@ -import os -import cudf -from typing import Any - -from cusql.input_utils.base import BaseInputPlugin -from cusql.input_utils.convert import InputUtil - - -class 
LocationInputPlugin(BaseInputPlugin): - """Input Plugin for everything, which can be read in from a file (on disk, remote etc.)""" - - def is_correct_input( - self, input_item: Any, table_name: str, format: str = None, **kwargs - ): - return isinstance(input_item, str) - - def to_dc( - self, - input_item: Any, - format: str = None, - **kwargs, - ): - if not format: - _, extension = os.path.splitext(input_item) - - format = extension.lstrip(".") - try: - read_function = getattr(cudf, f"read_{format}") - except AttributeError: - raise AttributeError(f"Can not read files of format {format}") - - return read_function(input_item, **kwargs) diff --git a/datafusion/pandas.py b/datafusion/pandas.py index 935d9619b..c2da83ff6 100644 --- a/datafusion/pandas.py +++ b/datafusion/pandas.py @@ -15,17 +15,33 @@ # specific language governing permissions and limitations # under the License. +import logging import pandas as pd import datafusion +from datafusion.common import SqlSchema from datafusion.context import BaseSessionContext from datafusion.expr import Projection, TableScan, Column +logger = logging.getLogger(__name__) + class SessionContext(BaseSessionContext): - def __init__(self): + def __init__(self, logging_level=logging.INFO): self.datafusion_ctx = datafusion.SessionContext() self.parquet_tables = {} + # Set the logging level for this SQL context + logging.basicConfig(level=logging_level) + + # Name of the root catalog + self.catalog_name = self.DEFAULT_CATALOG_NAME + # Name of the root schema + self.schema_name = self.DEFAULT_SCHEMA_NAME + # Add the schema to the context + sch = SqlSchema(self.schema_name) + self.schemas[self.schema_name] = sch + self.context.register_schema(self.schema_name, sch) + def to_pandas_expr(self, expr): # get Python wrapper for logical expression expr = expr.to_variant() @@ -52,6 +68,20 @@ def to_pandas_df(self, plan): "unsupported logical operator: {}".format(type(node)) ) + def create_schema(self, schema_name: str, **kwargs): + 
logger.debug(f"Creating schema: {schema_name}") + self.schemas[schema_name] = SqlSchema(schema_name) + self.context.register_schema(schema_name, SqlSchema(schema_name)) + + def update_schema(self, schema_name: str, new_schema: SqlSchema, **kwargs): + self.schemas[schema_name] = new_schema + + def drop_schema(self, schema_name, **kwargs): + del self.schemas[schema_name] + + def show_schemas(self, **kwargs): + return self.schemas + def register_table(self, name, path, **kwargs): self.parquet_tables[name] = path self.datafusion_ctx.register_parquet(name, path) diff --git a/datafusion/polars.py b/datafusion/polars.py index bbc1fd7c2..e4eb966fc 100644 --- a/datafusion/polars.py +++ b/datafusion/polars.py @@ -14,16 +14,20 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - +import logging import polars import datafusion from datafusion.context import BaseSessionContext from datafusion.expr import Projection, TableScan, Aggregate from datafusion.expr import Column, AggregateFunction +from datafusion.common import SqlSchema + +logger = logging.getLogger(__name__) + class SessionContext(BaseSessionContext): - def __init__(self): + def __init__(self, logging_level=logging.INFO): self.datafusion_ctx = datafusion.SessionContext() self.parquet_tables = {} @@ -75,6 +79,20 @@ def to_polars_df(self, plan): "unsupported logical operator: {}".format(type(node)) ) + def create_schema(self, schema_name: str, **kwargs): + logger.debug(f"Creating schema: {schema_name}") + self.schemas[schema_name] = SqlSchema(schema_name) + self.context.register_schema(schema_name, SqlSchema(schema_name)) + + def update_schema(self, schema_name: str, new_schema: SqlSchema, **kwargs): + self.schemas[schema_name] = new_schema + + def drop_schema(self, schema_name, **kwargs): + del self.schemas[schema_name] + + def show_schemas(self, **kwargs): + return self.schemas + def register_table(self, name, path, **kwargs): 
self.parquet_tables[name] = path self.datafusion_ctx.register_parquet(name, path) diff --git a/src/common/schema.rs b/src/common/schema.rs index 197915906..d552f0d23 100644 --- a/src/common/schema.rs +++ b/src/common/schema.rs @@ -18,12 +18,12 @@ use std::any::Any; use datafusion::arrow::datatypes::SchemaRef; -use datafusion_expr::{TableSource, TableProviderFilterPushDown, Expr}; +use datafusion_expr::{Expr, TableProviderFilterPushDown, TableSource}; use pyo3::prelude::*; use datafusion_optimizer::utils::split_conjunction; -use super::{function::SqlFunction, data_type::DataTypeMap}; +use super::{data_type::DataTypeMap, function::SqlFunction}; #[pyclass(name = "SqlSchema", module = "datafusion.common", subclass)] #[derive(Debug, Clone)] @@ -139,7 +139,6 @@ pub struct SqlView { pub definition: String, // SQL code that defines the view } - #[pymethods] impl SqlSchema { #[new] @@ -190,7 +189,6 @@ impl SqlSchema { // } } - /// SqlTable wrapper that is compatible with DataFusion logical query plans pub struct SqlTableSource { schema: SchemaRef, From 9312d40fad3d081229e782e463337db361ab454d Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 26 May 2023 15:48:18 -0400 Subject: [PATCH 4/9] Clean up schema.rs file --- src/common/schema.rs | 72 -------------------------------------------- 1 file changed, 72 deletions(-) diff --git a/src/common/schema.rs b/src/common/schema.rs index d552f0d23..304319369 100644 --- a/src/common/schema.rs +++ b/src/common/schema.rs @@ -66,10 +66,6 @@ impl SqlTable { _schema_name: String, table_name: String, columns: Vec<(String, DataTypeMap)>, - // primary_key: Option, - // foreign_keys: Vec, - // indexes: Vec, - // constraints: Vec, row_count: f64, filepath: Option, ) -> Self { @@ -84,50 +80,6 @@ impl SqlTable { filepath, } } - - // // TODO: Really wish we could accept a SqlTypeName instance here instead of a String for `column_type` .... 
- // #[pyo3(name = "add_column")] - // pub fn add_column(&mut self, column_name: &str, type_map: DaskTypeMap) { - // self.columns.push((column_name.to_owned(), type_map)); - // } - - // #[pyo3(name = "getSchema")] - // pub fn get_schema(&self) -> PyResult> { - // Ok(self.schema_name.clone()) - // } - - // #[pyo3(name = "getTableName")] - // pub fn get_table_name(&self) -> PyResult { - // Ok(self.table_name.clone()) - // } - - // #[pyo3(name = "getQualifiedName")] - // pub fn qualified_name(&self, plan: logical::PyLogicalPlan) -> Vec { - // let mut qualified_name = match &self.schema_name { - // Some(schema_name) => vec![schema_name.clone()], - // None => vec![], - // }; - - // match plan.original_plan { - // LogicalPlan::TableScan(table_scan) => { - // qualified_name.push(table_scan.table_name.to_string()); - // } - // _ => { - // qualified_name.push(self.table_name.clone()); - // } - // } - - // qualified_name - // } - - // #[pyo3(name = "getRowType")] - // pub fn row_type(&self) -> RelDataType { - // let mut fields: Vec = Vec::new(); - // for (name, data_type) in &self.columns { - // fields.push(RelDataTypeField::new(name.as_str(), data_type.clone(), 255)); - // } - // RelDataType::new(false, fields) - // } } #[pyclass(name = "SqlView", module = "datafusion.common", subclass)] @@ -163,30 +115,6 @@ impl SqlSchema { pub fn add_table(&mut self, table: SqlTable) { self.tables.push(table); } - - // pub fn add_or_overload_function( - // &mut self, - // name: String, - // input_types: Vec, - // return_type: PyDataType, - // aggregation: bool, - // ) { - // self.functions - // .entry(name.clone()) - // .and_modify(|e| { - // (*e).lock() - // .unwrap() - // .add_type_mapping(input_types.clone(), return_type.clone()); - // }) - // .or_insert_with(|| { - // Arc::new(Mutex::new(SQLFunction::new( - // name, - // input_types, - // return_type, - // aggregation, - // ))) - // }); - // } } /// SqlTable wrapper that is compatible with DataFusion logical query plans From 
8b567926423f902ffb4920fe35bebd27f992c66b Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 26 May 2023 15:54:50 -0400 Subject: [PATCH 5/9] Introduce CRUD methods for table instances --- datafusion/context.py | 46 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/datafusion/context.py b/datafusion/context.py index a36462123..d402dd410 100644 --- a/datafusion/context.py +++ b/datafusion/context.py @@ -16,9 +16,9 @@ # under the License. from abc import ABC, abstractmethod -from typing import Dict +from typing import Dict, List -from datafusion.common import SqlSchema +from datafusion.common import SqlSchema, SqlTable class BaseSessionContext(ABC): @@ -76,6 +76,48 @@ def show_schemas(self, **kwargs) -> Dict[str, SqlSchema]: """ pass + @abstractmethod + def create_table( + self, + table_name: str, + schema_name: str = None, + **kwargs, + ): + """ + Creates/Registers a table in the specified schema instance + """ + pass + + @abstractmethod + def update_table( + self, + table_name: str, + new_table: SqlTable, + **kwargs, + ): + """ + Updates an existing table in the SessionContext + """ + pass + + @abstractmethod + def drop_table( + self, + table_name: str, + **kwargs, + ): + """ + Drops the specified table, based on name, from the current context + """ + pass + + @abstractmethod + def show_tables(self, **kwargs) -> List[SqlTable]: + """ + Return all tables in the current SessionContext impl.
+ """ + pass + @abstractmethod def register_table( self, From 57ea1f8e2df126bda4eac81db26885f5db5e5db5 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 26 May 2023 17:05:06 -0400 Subject: [PATCH 6/9] Add function to drop_table --- src/common/schema.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/common/schema.rs b/src/common/schema.rs index 304319369..f8a1ed635 100644 --- a/src/common/schema.rs +++ b/src/common/schema.rs @@ -115,6 +115,10 @@ impl SqlSchema { pub fn add_table(&mut self, table: SqlTable) { self.tables.push(table); } + + pub fn drop_table(&mut self, table_name: String) { + self.tables.retain(|x| !x.name.eq(&table_name)); + } } /// SqlTable wrapper that is compatible with DataFusion logical query plans From 97e44ebcc33576bf74217e67f84f6956229e1464 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 26 May 2023 17:08:02 -0400 Subject: [PATCH 7/9] Add schema_name to drop_table function --- datafusion/context.py | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/context.py b/datafusion/context.py index d402dd410..e13d61552 100644 --- a/datafusion/context.py +++ b/datafusion/context.py @@ -103,6 +103,7 @@ def update_table( @abstractmethod def drop_table( self, + schema_name: str, table_name: str, **kwargs, ): From e1c41e7f1b80112995e69f8d3df113d9febf3d7b Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 26 May 2023 17:11:22 -0400 Subject: [PATCH 8/9] remove unused parameter in SqlTable new --- src/common/schema.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/common/schema.rs b/src/common/schema.rs index f8a1ed635..a003d0ca1 100644 --- a/src/common/schema.rs +++ b/src/common/schema.rs @@ -63,7 +63,6 @@ pub struct SqlTable { impl SqlTable { #[new] pub fn new( - _schema_name: String, table_name: String, columns: Vec<(String, DataTypeMap)>, row_count: f64, From 434e256a50e9200b014dc2abd778038fdef237ab Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 26 May 2023 17:46:51 -0400 Subject: [PATCH 9/9] Update function 
to allow for modifying existing tables --- datafusion/context.py | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/context.py b/datafusion/context.py index e13d61552..6292177f7 100644 --- a/datafusion/context.py +++ b/datafusion/context.py @@ -91,6 +91,7 @@ def create_table( @abstractmethod def update_table( self, + schema_name: str, table_name: str, new_table: SqlTable, **kwargs,