Skip to content

Commit 0a2b0a7

Browse files
authored
Add serialization of ScalarValue::Binary and ScalarValue::LargeBinary, ScalarValue::Time64 (#3534)
1 parent 6be3301 commit 0a2b0a7

4 files changed

Lines changed: 76 additions & 40 deletions

File tree

datafusion/proto/proto/datafusion.proto

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,9 @@ message ScalarValue{
761761
int64 interval_daytime_value = 25;
762762
ScalarTimestampValue timestamp_value = 26;
763763
ScalarDictionaryValue dictionary_value = 27;
764+
bytes binary_value = 28;
765+
bytes large_binary_value = 29;
766+
int64 time64_value = 30;
764767
}
765768
}
766769

@@ -788,16 +791,21 @@ enum PrimitiveScalarType{
788791
UTF8 = 11;
789792
LARGE_UTF8 = 12;
790793
DATE32 = 13;
791-
TIME_MICROSECOND = 14;
792-
TIME_NANOSECOND = 15;
794+
TIMESTAMP_MICROSECOND = 14;
795+
TIMESTAMP_NANOSECOND = 15;
793796
NULL = 16;
794797

795798
DECIMAL128 = 17;
796799
DATE64 = 20;
797-
TIME_SECOND = 21;
798-
TIME_MILLISECOND = 22;
800+
TIMESTAMP_SECOND = 21;
801+
TIMESTAMP_MILLISECOND = 22;
799802
INTERVAL_YEARMONTH = 23;
800803
INTERVAL_DAYTIME = 24;
804+
805+
BINARY = 25;
806+
LARGE_BINARY = 26;
807+
808+
TIME64 = 27;
801809
}
802810

803811
message ScalarType{

datafusion/proto/src/from_proto.rs

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -218,20 +218,25 @@ impl From<protobuf::PrimitiveScalarType> for DataType {
218218
protobuf::PrimitiveScalarType::Float64 => DataType::Float64,
219219
protobuf::PrimitiveScalarType::Utf8 => DataType::Utf8,
220220
protobuf::PrimitiveScalarType::LargeUtf8 => DataType::LargeUtf8,
221+
protobuf::PrimitiveScalarType::Binary => DataType::Binary,
222+
protobuf::PrimitiveScalarType::LargeBinary => DataType::LargeBinary,
221223
protobuf::PrimitiveScalarType::Date32 => DataType::Date32,
222-
protobuf::PrimitiveScalarType::TimeMicrosecond => {
223-
DataType::Time64(TimeUnit::Microsecond)
224-
}
225-
protobuf::PrimitiveScalarType::TimeNanosecond => {
224+
protobuf::PrimitiveScalarType::Time64 => {
226225
DataType::Time64(TimeUnit::Nanosecond)
227226
}
227+
protobuf::PrimitiveScalarType::TimestampMicrosecond => {
228+
DataType::Timestamp(TimeUnit::Microsecond, None)
229+
}
230+
protobuf::PrimitiveScalarType::TimestampNanosecond => {
231+
DataType::Timestamp(TimeUnit::Nanosecond, None)
232+
}
228233
protobuf::PrimitiveScalarType::Null => DataType::Null,
229234
protobuf::PrimitiveScalarType::Decimal128 => DataType::Decimal128(0, 0),
230235
protobuf::PrimitiveScalarType::Date64 => DataType::Date64,
231-
protobuf::PrimitiveScalarType::TimeSecond => {
236+
protobuf::PrimitiveScalarType::TimestampSecond => {
232237
DataType::Timestamp(TimeUnit::Second, None)
233238
}
234-
protobuf::PrimitiveScalarType::TimeMillisecond => {
239+
protobuf::PrimitiveScalarType::TimestampMillisecond => {
235240
DataType::Timestamp(TimeUnit::Millisecond, None)
236241
}
237242
protobuf::PrimitiveScalarType::IntervalYearmonth => {
@@ -643,15 +648,20 @@ impl TryFrom<&protobuf::PrimitiveScalarType> for ScalarValue {
643648
PrimitiveScalarType::Float64 => Self::Float64(None),
644649
PrimitiveScalarType::Utf8 => Self::Utf8(None),
645650
PrimitiveScalarType::LargeUtf8 => Self::LargeUtf8(None),
651+
PrimitiveScalarType::Binary => Self::Binary(None),
652+
PrimitiveScalarType::LargeBinary => Self::LargeBinary(None),
646653
PrimitiveScalarType::Date32 => Self::Date32(None),
647-
PrimitiveScalarType::TimeMicrosecond => {
654+
PrimitiveScalarType::Time64 => Self::Time64(None),
655+
PrimitiveScalarType::TimestampMicrosecond => {
648656
Self::TimestampMicrosecond(None, None)
649657
}
650-
PrimitiveScalarType::TimeNanosecond => Self::TimestampNanosecond(None, None),
658+
PrimitiveScalarType::TimestampNanosecond => {
659+
Self::TimestampNanosecond(None, None)
660+
}
651661
PrimitiveScalarType::Decimal128 => Self::Decimal128(None, 0, 0),
652662
PrimitiveScalarType::Date64 => Self::Date64(None),
653-
PrimitiveScalarType::TimeSecond => Self::TimestampSecond(None, None),
654-
PrimitiveScalarType::TimeMillisecond => {
663+
PrimitiveScalarType::TimestampSecond => Self::TimestampSecond(None, None),
664+
PrimitiveScalarType::TimestampMillisecond => {
655665
Self::TimestampMillisecond(None, None)
656666
}
657667
PrimitiveScalarType::IntervalYearmonth => Self::IntervalYearMonth(None),
@@ -749,6 +759,7 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
749759
)
750760
}
751761
Value::Date64Value(v) => Self::Date64(Some(*v)),
762+
Value::Time64Value(v) => Self::Time64(Some(*v)),
752763
Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
753764
Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(*v)),
754765
Value::TimestampValue(v) => {
@@ -792,6 +803,8 @@ impl TryFrom<&protobuf::ScalarValue> for ScalarValue {
792803

793804
Self::Dictionary(Box::new(index_type), Box::new(value))
794805
}
806+
Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
807+
Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
795808
})
796809
}
797810
}
@@ -1419,30 +1432,30 @@ fn typechecked_scalar_value_conversion(
14191432
value:
14201433
Some(protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(v)),
14211434
}),
1422-
PrimitiveScalarType::TimeMicrosecond,
1435+
PrimitiveScalarType::TimestampMicrosecond,
14231436
) => ScalarValue::TimestampMicrosecond(Some(*v), unwrap_timezone(timezone)),
14241437
(
14251438
Value::TimestampValue(protobuf::ScalarTimestampValue {
14261439
timezone,
14271440
value:
14281441
Some(protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(v)),
14291442
}),
1430-
PrimitiveScalarType::TimeNanosecond,
1443+
PrimitiveScalarType::TimestampNanosecond,
14311444
) => ScalarValue::TimestampNanosecond(Some(*v), unwrap_timezone(timezone)),
14321445
(
14331446
Value::TimestampValue(protobuf::ScalarTimestampValue {
14341447
timezone,
14351448
value: Some(protobuf::scalar_timestamp_value::Value::TimeSecondValue(v)),
14361449
}),
1437-
PrimitiveScalarType::TimeSecond,
1450+
PrimitiveScalarType::TimestampSecond,
14381451
) => ScalarValue::TimestampSecond(Some(*v), unwrap_timezone(timezone)),
14391452
(
14401453
Value::TimestampValue(protobuf::ScalarTimestampValue {
14411454
timezone,
14421455
value:
14431456
Some(protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(v)),
14441457
}),
1445-
PrimitiveScalarType::TimeMillisecond,
1458+
PrimitiveScalarType::TimestampMillisecond,
14461459
) => ScalarValue::TimestampMillisecond(Some(*v), unwrap_timezone(timezone)),
14471460
(Value::Utf8Value(v), PrimitiveScalarType::Utf8) => {
14481461
ScalarValue::Utf8(Some(v.to_owned()))
@@ -1469,10 +1482,11 @@ fn typechecked_scalar_value_conversion(
14691482
PrimitiveScalarType::Utf8 => ScalarValue::Utf8(None),
14701483
PrimitiveScalarType::LargeUtf8 => ScalarValue::LargeUtf8(None),
14711484
PrimitiveScalarType::Date32 => ScalarValue::Date32(None),
1472-
PrimitiveScalarType::TimeMicrosecond => {
1485+
PrimitiveScalarType::Time64 => ScalarValue::Time64(None),
1486+
PrimitiveScalarType::TimestampMicrosecond => {
14731487
ScalarValue::TimestampMicrosecond(None, None)
14741488
}
1475-
PrimitiveScalarType::TimeNanosecond => {
1489+
PrimitiveScalarType::TimestampNanosecond => {
14761490
ScalarValue::TimestampNanosecond(None, None)
14771491
}
14781492
PrimitiveScalarType::Null => {
@@ -1484,10 +1498,10 @@ fn typechecked_scalar_value_conversion(
14841498
ScalarValue::Decimal128(None, 0, 0)
14851499
}
14861500
PrimitiveScalarType::Date64 => ScalarValue::Date64(None),
1487-
PrimitiveScalarType::TimeSecond => {
1501+
PrimitiveScalarType::TimestampSecond => {
14881502
ScalarValue::TimestampSecond(None, None)
14891503
}
1490-
PrimitiveScalarType::TimeMillisecond => {
1504+
PrimitiveScalarType::TimestampMillisecond => {
14911505
ScalarValue::TimestampMillisecond(None, None)
14921506
}
14931507
PrimitiveScalarType::IntervalYearmonth => {
@@ -1496,6 +1510,8 @@ fn typechecked_scalar_value_conversion(
14961510
PrimitiveScalarType::IntervalDaytime => {
14971511
ScalarValue::IntervalDayTime(None)
14981512
}
1513+
PrimitiveScalarType::Binary => ScalarValue::Binary(None),
1514+
PrimitiveScalarType::LargeBinary => ScalarValue::LargeBinary(None),
14991515
};
15001516
scalar_value
15011517
} else {

datafusion/proto/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,10 @@ mod roundtrip_tests {
402402
ScalarValue::LargeUtf8(Some(String::from("Test Large utf8"))),
403403
ScalarValue::Date32(Some(0)),
404404
ScalarValue::Date32(Some(i32::MAX)),
405+
ScalarValue::Date32(None),
406+
ScalarValue::Time64(Some(0)),
407+
ScalarValue::Time64(Some(i64::MAX)),
408+
ScalarValue::Time64(None),
405409
ScalarValue::TimestampNanosecond(Some(0), None),
406410
ScalarValue::TimestampNanosecond(Some(i64::MAX), None),
407411
ScalarValue::TimestampNanosecond(Some(0), Some("UTC".to_string())),
@@ -459,6 +463,10 @@ mod roundtrip_tests {
459463
Box::new(DataType::Int32),
460464
Box::new(ScalarValue::Utf8(None)),
461465
),
466+
ScalarValue::Binary(Some(b"bar".to_vec())),
467+
ScalarValue::Binary(None),
468+
ScalarValue::LargeBinary(Some(b"bar".to_vec())),
469+
ScalarValue::LargeBinary(None),
462470
];
463471

464472
for test_case in should_pass.into_iter() {

datafusion/proto/src/to_proto.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
10981098
})
10991099
}
11001100
datafusion::scalar::ScalarValue::TimestampMicrosecond(val, tz) => {
1101-
create_proto_scalar(val, PrimitiveScalarType::TimeMicrosecond, |s| {
1101+
create_proto_scalar(val, PrimitiveScalarType::TimestampMicrosecond, |s| {
11021102
Value::TimestampValue(protobuf::ScalarTimestampValue {
11031103
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
11041104
value: Some(
@@ -1110,7 +1110,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
11101110
})
11111111
}
11121112
datafusion::scalar::ScalarValue::TimestampNanosecond(val, tz) => {
1113-
create_proto_scalar(val, PrimitiveScalarType::TimeNanosecond, |s| {
1113+
create_proto_scalar(val, PrimitiveScalarType::TimestampNanosecond, |s| {
11141114
Value::TimestampValue(protobuf::ScalarTimestampValue {
11151115
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
11161116
value: Some(
@@ -1145,7 +1145,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
11451145
})
11461146
}
11471147
datafusion::scalar::ScalarValue::TimestampSecond(val, tz) => {
1148-
create_proto_scalar(val, PrimitiveScalarType::TimeSecond, |s| {
1148+
create_proto_scalar(val, PrimitiveScalarType::TimestampSecond, |s| {
11491149
Value::TimestampValue(protobuf::ScalarTimestampValue {
11501150
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
11511151
value: Some(
@@ -1155,7 +1155,7 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
11551155
})
11561156
}
11571157
datafusion::scalar::ScalarValue::TimestampMillisecond(val, tz) => {
1158-
create_proto_scalar(val, PrimitiveScalarType::TimeMillisecond, |s| {
1158+
create_proto_scalar(val, PrimitiveScalarType::TimestampMillisecond, |s| {
11591159
Value::TimestampValue(protobuf::ScalarTimestampValue {
11601160
timezone: tz.as_ref().unwrap_or(&"".to_string()).clone(),
11611161
value: Some(
@@ -1180,19 +1180,21 @@ impl TryFrom<&ScalarValue> for protobuf::ScalarValue {
11801180
value: Some(Value::NullValue(PrimitiveScalarType::Null as i32)),
11811181
},
11821182

1183-
datafusion::scalar::ScalarValue::Binary(_) => {
1184-
// not yet implemented (TODO file ticket)
1185-
return Err(Error::invalid_scalar_value(val));
1183+
scalar::ScalarValue::Binary(val) => {
1184+
create_proto_scalar(val, PrimitiveScalarType::Binary, |s| {
1185+
Value::BinaryValue(s.to_owned())
1186+
})
11861187
}
1187-
1188-
datafusion::scalar::ScalarValue::LargeBinary(_) => {
1189-
// not yet implemented (TODO file ticket)
1190-
return Err(Error::invalid_scalar_value(val));
1188+
scalar::ScalarValue::LargeBinary(val) => {
1189+
create_proto_scalar(val, PrimitiveScalarType::LargeBinary, |s| {
1190+
Value::LargeBinaryValue(s.to_owned())
1191+
})
11911192
}
11921193

1193-
datafusion::scalar::ScalarValue::Time64(_) => {
1194-
// not yet implemented (TODO file ticket)
1195-
return Err(Error::invalid_scalar_value(val));
1194+
datafusion::scalar::ScalarValue::Time64(v) => {
1195+
create_proto_scalar(v, PrimitiveScalarType::Time64, |v| {
1196+
Value::Time64Value(*v)
1197+
})
11961198
}
11971199

11981200
datafusion::scalar::ScalarValue::IntervalMonthDayNano(_) => {
@@ -1335,10 +1337,10 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
13351337
DataType::Date32 => Self::Scalar(PrimitiveScalarType::Date32 as i32),
13361338
DataType::Time64(time_unit) => match time_unit {
13371339
TimeUnit::Microsecond => {
1338-
Self::Scalar(PrimitiveScalarType::TimeMicrosecond as i32)
1340+
Self::Scalar(PrimitiveScalarType::TimestampMicrosecond as i32)
13391341
}
13401342
TimeUnit::Nanosecond => {
1341-
Self::Scalar(PrimitiveScalarType::TimeNanosecond as i32)
1343+
Self::Scalar(PrimitiveScalarType::TimestampNanosecond as i32)
13421344
}
13431345
_ => {
13441346
return Err(Error::invalid_time_unit(time_unit));
@@ -1379,8 +1381,10 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype {
13791381
DataType::Float64 => PrimitiveScalarType::Float64,
13801382
DataType::Date32 => PrimitiveScalarType::Date32,
13811383
DataType::Time64(time_unit) => match time_unit {
1382-
TimeUnit::Microsecond => PrimitiveScalarType::TimeMicrosecond,
1383-
TimeUnit::Nanosecond => PrimitiveScalarType::TimeNanosecond,
1384+
TimeUnit::Microsecond => {
1385+
PrimitiveScalarType::TimestampMicrosecond
1386+
}
1387+
TimeUnit::Nanosecond => PrimitiveScalarType::TimestampNanosecond,
13841388
_ => {
13851389
return Err(Error::invalid_time_unit(time_unit));
13861390
}

0 commit comments

Comments
 (0)