Skip to content

Commit 8522633

Browse files
committed
Merge conflict
2 parents 2afeca1 + b8805d4 commit 8522633

16 files changed

Lines changed: 235 additions & 122 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ members = [
2020
"datafusion",
2121
"datafusion-cli",
2222
"datafusion-examples",
23-
"benchmarks",
23+
"benchmarks",
2424
"ballista/rust/client",
2525
"ballista/rust/core",
2626
"ballista/rust/executor",

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ the convenience of an SQL interface or a DataFrame API.
4848

4949
Here are some of the projects known to use DataFusion:
5050

51-
* [Ballista](https://github.com/ballista-compute/ballista) Distributed Compute Platform
51+
* [Ballista](ballista) Distributed Compute Platform
5252
* [Cloudfuse Buzz](https://github.com/cloudfuse-io/buzz-rust)
5353
* [Cube Store](https://github.com/cube-js/cube.js/tree/master/rust)
5454
* [datafusion-python](https://pypi.org/project/datafusion)

ballista/rust/core/proto/ballista.proto

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ message LogicalExprNode {
3333
oneof ExprType {
3434
// column references
3535
string column_name = 1;
36-
36+
3737
// alias
3838
AliasNode alias = 2;
3939

@@ -42,15 +42,15 @@ message LogicalExprNode {
4242

4343
// binary expressions
4444
BinaryExprNode binary_expr = 4;
45-
45+
4646
// aggregate expressions
4747
AggregateExprNode aggregate_expr = 5;
48-
48+
4949
// null checks
5050
IsNull is_null_expr = 6;
5151
IsNotNull is_not_null_expr = 7;
5252
Not not_expr = 8;
53-
53+
5454
BetweenNode between = 9;
5555
CaseNode case_ = 10;
5656
CastNode cast = 11;
@@ -130,6 +130,7 @@ enum ScalarFunction {
130130
SHA256 = 30;
131131
SHA384 = 31;
132132
SHA512 = 32;
133+
LN = 33;
133134
}
134135

135136
message ScalarFunctionNode {
@@ -361,7 +362,7 @@ message CsvScanExecNode {
361362
bool has_header = 5;
362363
uint32 batch_size = 6;
363364
string delimiter = 7;
364-
365+
365366
// partition filenames
366367
repeated string filename = 8;
367368
}
@@ -466,7 +467,7 @@ message Action {
466467
// Fetch a partition from an executor
467468
PartitionId fetch_partition = 3;
468469
}
469-
470+
470471
// configuration settings
471472
repeated KeyValuePair settings = 100;
472473
}
@@ -742,10 +743,10 @@ message ScalarValue{
742743
}
743744
}
744745

745-
// Contains all valid datafusion scalar type except for
746+
// Contains all valid datafusion scalar type except for
746747
// List
747748
enum PrimitiveScalarType{
748-
749+
749750
BOOL = 0; // arrow::Type::BOOL
750751
UINT8 = 1; // arrow::Type::UINT8
751752
INT8 = 2; // arrow::Type::INT8
@@ -777,7 +778,7 @@ message ScalarListType{
777778
PrimitiveScalarType deepest_type = 2;
778779
}
779780

780-
// Broke out into multiple message types so that type
781+
// Broke out into multiple message types so that type
781782
// metadata did not need to be in separate message
782783
//All types that are of the empty message types contain no additional metadata
783784
// about the type
@@ -794,7 +795,7 @@ message ArrowType{
794795
EmptyMessage UINT64 =9;
795796
EmptyMessage INT64 =10 ;
796797
EmptyMessage FLOAT16 =11 ;
797-
EmptyMessage FLOAT32 =12 ;
798+
EmptyMessage FLOAT32 =12 ;
798799
EmptyMessage FLOAT64 =13 ;
799800
EmptyMessage UTF8 =14 ;
800801
EmptyMessage LARGE_UTF8 = 32;
@@ -824,7 +825,7 @@ message ArrowType{
824825

825826
//Useful for representing an empty enum variant in rust
826827
// E.G. enum example{One, Two(i32)}
827-
// maps to
828+
// maps to
828829
// message example{
829830
// oneof{
830831
// EmptyMessage One = 1;

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use crate::{convert_box_required, convert_required};
2828

2929
use arrow::datatypes::{DataType, Field, Schema};
3030
use datafusion::logical_plan::{
31-
abs, acos, asin, atan, ceil, cos, exp, floor, log10, log2, round, signum, sin, sqrt,
32-
tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
31+
abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
32+
sqrt, tan, trunc, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Operator,
3333
};
3434
use datafusion::physical_plan::aggregates::AggregateFunction;
3535
use datafusion::physical_plan::csv::CsvReadOptions;
@@ -1013,6 +1013,7 @@ impl TryInto<Expr> for &protobuf::LogicalExprNode {
10131013
protobuf::ScalarFunction::Log2 => {
10141014
Ok(log2((&expr.expr[0]).try_into()?))
10151015
}
1016+
protobuf::ScalarFunction::Ln => Ok(ln((&expr.expr[0]).try_into()?)),
10161017
protobuf::ScalarFunction::Log10 => {
10171018
Ok(log10((&expr.expr[0]).try_into()?))
10181019
}

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,6 +1200,7 @@ impl TryInto<protobuf::ScalarFunction> for &BuiltinScalarFunction {
12001200
BuiltinScalarFunction::Atan => Ok(protobuf::ScalarFunction::Atan),
12011201
BuiltinScalarFunction::Exp => Ok(protobuf::ScalarFunction::Exp),
12021202
BuiltinScalarFunction::Log => Ok(protobuf::ScalarFunction::Log),
1203+
BuiltinScalarFunction::Ln => Ok(protobuf::ScalarFunction::Ln),
12031204
BuiltinScalarFunction::Log10 => Ok(protobuf::ScalarFunction::Log10),
12041205
BuiltinScalarFunction::Floor => Ok(protobuf::ScalarFunction::Floor),
12051206
BuiltinScalarFunction::Ceil => Ok(protobuf::ScalarFunction::Ceil),

datafusion-cli/Dockerfile

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
FROM rust:latest
18+
FROM rust:latest as builder
1919

20+
COPY ./datafusion /usr/src/datafusion
2021

21-
COPY ./datafusion ./usr/src/datafusion
22-
COPY ./datafusion-cli ./usr/src/datafusion-cli
22+
COPY ./datafusion-cli /usr/src/datafusion-cli
2323

2424
WORKDIR /usr/src/datafusion-cli
25-
RUN cargo install --path .
2625

26+
RUN cargo build --release
2727

28-
CMD ["datafusion-cli", "--data-path", "/data"]
28+
FROM debian:buster-slim
29+
30+
COPY --from=builder /usr/src/datafusion-cli/target/release/datafusion-cli /usr/local/bin
31+
32+
ENTRYPOINT ["datafusion-cli"]
33+
34+
CMD ["--data-path", "/data"]

datafusion/src/execution/context.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,6 +1766,25 @@ mod tests {
17661766
"+-----+-------------+",
17671767
];
17681768
assert_batches_sorted_eq!(expected, &results);
1769+
1770+
// Now, use dict as an aggregate
1771+
let results = plan_and_collect(
1772+
&mut ctx,
1773+
"SELECT val, count(distinct dict) FROM t GROUP BY val",
1774+
)
1775+
.await
1776+
.expect("ran plan correctly");
1777+
1778+
let expected = vec![
1779+
"+-----+----------------------+",
1780+
"| val | count(DISTINCT dict) |",
1781+
"+-----+----------------------+",
1782+
"| 1 | 2 |",
1783+
"| 2 | 2 |",
1784+
"| 4 | 1 |",
1785+
"+-----+----------------------+",
1786+
];
1787+
assert_batches_sorted_eq!(expected, &results);
17691788
}
17701789

17711790
run_test_case::<Int8Type>().await;

datafusion/src/execution/dataframe_impl.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -177,9 +177,11 @@ impl DataFrame for DataFrameImpl {
177177

178178
#[cfg(test)]
179179
mod tests {
180+
use std::vec;
181+
180182
use super::*;
181-
use crate::execution::context::ExecutionContext;
182183
use crate::logical_plan::*;
184+
use crate::{assert_batches_sorted_eq, execution::context::ExecutionContext};
183185
use crate::{datasource::csv::CsvReadOptions, physical_plan::ColumnarValue};
184186
use crate::{physical_plan::functions::ScalarFunctionImplementation, test};
185187
use arrow::datatypes::DataType;
@@ -216,8 +218,8 @@ mod tests {
216218
Ok(())
217219
}
218220

219-
#[test]
220-
fn aggregate() -> Result<()> {
221+
#[tokio::test]
222+
async fn aggregate() -> Result<()> {
221223
// build plan using DataFrame API
222224
let df = test_table()?;
223225
let group_expr = vec![col("c1")];
@@ -230,18 +232,22 @@ mod tests {
230232
count_distinct(col("c12")),
231233
];
232234

233-
let df = df.aggregate(group_expr, aggr_expr)?;
234-
235-
let plan = df.to_logical_plan();
236-
237-
// build same plan using SQL API
238-
let sql = "SELECT c1, MIN(c12), MAX(c12), AVG(c12), SUM(c12), COUNT(c12), COUNT(DISTINCT c12) \
239-
FROM aggregate_test_100 \
240-
GROUP BY c1";
241-
let sql_plan = create_plan(sql)?;
242-
243-
// the two plans should be identical
244-
assert_same_plan(&plan, &sql_plan);
235+
let df: Vec<RecordBatch> = df.aggregate(group_expr, aggr_expr)?.collect().await?;
236+
237+
assert_batches_sorted_eq!(
238+
vec![
239+
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
240+
"| c1 | MIN(c12) | MAX(c12) | AVG(c12) | SUM(c12) | COUNT(c12) | COUNT(DISTINCT c12) |",
241+
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
242+
"| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |",
243+
"| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |",
244+
"| c | 0.0494924465469434 | 0.991517828651004 | 0.6600456536439784 | 13.860958726523545 | 21 | 21 |",
245+
"| d | 0.061029375346466685 | 0.9748360509016578 | 0.48855379387549824 | 8.793968289758968 | 18 | 18 |",
246+
"| e | 0.01479305307777301 | 0.9965400387585364 | 0.48600669271341534 | 10.206140546981722 | 21 | 21 |",
247+
"+----+----------------------+--------------------+---------------------+--------------------+------------+---------------------+",
248+
],
249+
&df
250+
);
245251

246252
Ok(())
247253
}

datafusion/src/logical_plan/expr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1086,9 +1086,9 @@ unary_scalar_expr!(Trunc, trunc);
10861086
unary_scalar_expr!(Abs, abs);
10871087
unary_scalar_expr!(Signum, signum);
10881088
unary_scalar_expr!(Exp, exp);
1089-
unary_scalar_expr!(Log, ln);
10901089
unary_scalar_expr!(Log2, log2);
10911090
unary_scalar_expr!(Log10, log10);
1091+
unary_scalar_expr!(Ln, ln);
10921092

10931093
// string functions
10941094
unary_scalar_expr!(Ascii, ascii);

datafusion/src/physical_plan/distinct_expressions.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ pub struct DistinctCount {
4747
name: String,
4848
/// The DataType for the final count
4949
data_type: DataType,
50-
/// The DataType for each input argument
51-
input_data_types: Vec<DataType>,
50+
/// The DataType used to hold the state for each input
51+
state_data_types: Vec<DataType>,
5252
/// The input arguments
5353
exprs: Vec<Arc<dyn PhysicalExpr>>,
5454
}
@@ -61,15 +61,26 @@ impl DistinctCount {
6161
name: String,
6262
data_type: DataType,
6363
) -> Self {
64+
let state_data_types = input_data_types.into_iter().map(state_type).collect();
65+
6466
Self {
65-
input_data_types,
66-
exprs,
6767
name,
6868
data_type,
69+
state_data_types,
70+
exprs,
6971
}
7072
}
7173
}
7274

75+
/// return the type to use to accumulate state for the specified input type
76+
fn state_type(data_type: DataType) -> DataType {
77+
match data_type {
78+
// when aggregating dictionary values, use the underlying value type
79+
DataType::Dictionary(_key_type, value_type) => *value_type,
80+
t => t,
81+
}
82+
}
83+
7384
impl AggregateExpr for DistinctCount {
7485
/// Return a reference to Any that can be used for downcasting
7586
fn as_any(&self) -> &dyn Any {
@@ -82,12 +93,16 @@ impl AggregateExpr for DistinctCount {
8293

8394
fn state_fields(&self) -> Result<Vec<Field>> {
8495
Ok(self
85-
.input_data_types
96+
.state_data_types
8697
.iter()
87-
.map(|data_type| {
98+
.map(|state_data_type| {
8899
Field::new(
89100
&format_state_name(&self.name, "count distinct"),
90-
DataType::List(Box::new(Field::new("item", data_type.clone(), true))),
101+
DataType::List(Box::new(Field::new(
102+
"item",
103+
state_data_type.clone(),
104+
true,
105+
))),
91106
false,
92107
)
93108
})
@@ -101,7 +116,7 @@ impl AggregateExpr for DistinctCount {
101116
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
102117
Ok(Box::new(DistinctCountAccumulator {
103118
values: HashSet::default(),
104-
data_types: self.input_data_types.clone(),
119+
state_data_types: self.state_data_types.clone(),
105120
count_data_type: self.data_type.clone(),
106121
}))
107122
}
@@ -110,7 +125,7 @@ impl AggregateExpr for DistinctCount {
110125
#[derive(Debug)]
111126
struct DistinctCountAccumulator {
112127
values: HashSet<DistinctScalarValues, RandomState>,
113-
data_types: Vec<DataType>,
128+
state_data_types: Vec<DataType>,
114129
count_data_type: DataType,
115130
}
116131

@@ -156,9 +171,11 @@ impl Accumulator for DistinctCountAccumulator {
156171

157172
fn state(&self) -> Result<Vec<ScalarValue>> {
158173
let mut cols_out = self
159-
.data_types
174+
.state_data_types
160175
.iter()
161-
.map(|data_type| ScalarValue::List(Some(Vec::new()), data_type.clone()))
176+
.map(|state_data_type| {
177+
ScalarValue::List(Some(Vec::new()), state_data_type.clone())
178+
})
162179
.collect::<Vec<_>>();
163180

164181
let mut cols_vec = cols_out

0 commit comments

Comments
 (0)