Skip to content

Commit 047fb33

Browse files
authored
Minor: refactor data_trunc to reduce duplicated code (#8430)
* refactor data_trunc * fix cast to timestamp array * fix cast to timestamp scalar * fix doc
1 parent 8f9d6e3 commit 047fb33

2 files changed

Lines changed: 53 additions & 99 deletions

File tree

datafusion/common/src/scalar.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use arrow::{
4646
},
4747
};
4848
use arrow_array::cast::as_list_array;
49+
use arrow_array::types::ArrowTimestampType;
4950
use arrow_array::{ArrowNativeTypeOp, Scalar};
5051

5152
/// A dynamically typed, nullable single value, (the single-valued counter-part
@@ -774,6 +775,20 @@ impl ScalarValue {
774775
ScalarValue::IntervalMonthDayNano(Some(val))
775776
}
776777

778+
/// Returns a [`ScalarValue`] representing
779+
/// `value` and `tz_opt` timezone
780+
pub fn new_timestamp<T: ArrowTimestampType>(
781+
value: Option<i64>,
782+
tz_opt: Option<Arc<str>>,
783+
) -> Self {
784+
match T::UNIT {
785+
TimeUnit::Second => ScalarValue::TimestampSecond(value, tz_opt),
786+
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(value, tz_opt),
787+
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(value, tz_opt),
788+
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(value, tz_opt),
789+
}
790+
}
791+
777792
/// Create a zero value in the given type.
778793
pub fn new_zero(datatype: &DataType) -> Result<ScalarValue> {
779794
assert!(datatype.is_primitive());

datafusion/physical-expr/src/datetime_expressions.rs

Lines changed: 38 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,15 @@ use arrow::{
3636
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
3737
},
3838
};
39+
use arrow_array::types::ArrowTimestampType;
3940
use arrow_array::{
4041
timezone::Tz, TimestampMicrosecondArray, TimestampMillisecondArray,
4142
TimestampSecondArray,
4243
};
4344
use chrono::prelude::*;
4445
use chrono::{Duration, Months, NaiveDate};
4546
use datafusion_common::cast::{
46-
as_date32_array, as_date64_array, as_generic_string_array,
47+
as_date32_array, as_date64_array, as_generic_string_array, as_primitive_array,
4748
as_timestamp_microsecond_array, as_timestamp_millisecond_array,
4849
as_timestamp_nanosecond_array, as_timestamp_second_array,
4950
};
@@ -335,7 +336,7 @@ fn date_trunc_coarse(granularity: &str, value: i64, tz: Option<Tz>) -> Result<i6
335336
}
336337

337338
// truncates a single value with the given timeunit to the specified granularity
338-
fn _date_trunc(
339+
fn general_date_trunc(
339340
tu: TimeUnit,
340341
value: &Option<i64>,
341342
tz: Option<Tz>,
@@ -403,123 +404,61 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result<ColumnarValue> {
403404
return exec_err!("Granularity of `date_trunc` must be non-null scalar Utf8");
404405
};
405406

407+
fn process_array<T: ArrowTimestampType>(
408+
array: &dyn Array,
409+
granularity: String,
410+
tz_opt: &Option<Arc<str>>,
411+
) -> Result<ColumnarValue> {
412+
let parsed_tz = parse_tz(tz_opt)?;
413+
let array = as_primitive_array::<T>(array)?;
414+
let array = array
415+
.iter()
416+
.map(|x| general_date_trunc(T::UNIT, &x, parsed_tz, granularity.as_str()))
417+
.collect::<Result<PrimitiveArray<T>>>()?
418+
.with_timezone_opt(tz_opt.clone());
419+
Ok(ColumnarValue::Array(Arc::new(array)))
420+
}
421+
422+
fn process_scalr<T: ArrowTimestampType>(
423+
v: &Option<i64>,
424+
granularity: String,
425+
tz_opt: &Option<Arc<str>>,
426+
) -> Result<ColumnarValue> {
427+
let parsed_tz = parse_tz(tz_opt)?;
428+
let value = general_date_trunc(T::UNIT, v, parsed_tz, granularity.as_str())?;
429+
let value = ScalarValue::new_timestamp::<T>(value, tz_opt.clone());
430+
Ok(ColumnarValue::Scalar(value))
431+
}
432+
406433
Ok(match array {
407434
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
408-
let parsed_tz = parse_tz(tz_opt)?;
409-
let value =
410-
_date_trunc(TimeUnit::Nanosecond, v, parsed_tz, granularity.as_str())?;
411-
let value = ScalarValue::TimestampNanosecond(value, tz_opt.clone());
412-
ColumnarValue::Scalar(value)
435+
process_scalr::<TimestampNanosecondType>(v, granularity, tz_opt)?
413436
}
414437
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
415-
let parsed_tz = parse_tz(tz_opt)?;
416-
let value =
417-
_date_trunc(TimeUnit::Microsecond, v, parsed_tz, granularity.as_str())?;
418-
let value = ScalarValue::TimestampMicrosecond(value, tz_opt.clone());
419-
ColumnarValue::Scalar(value)
438+
process_scalr::<TimestampMicrosecondType>(v, granularity, tz_opt)?
420439
}
421440
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
422-
let parsed_tz = parse_tz(tz_opt)?;
423-
let value =
424-
_date_trunc(TimeUnit::Millisecond, v, parsed_tz, granularity.as_str())?;
425-
let value = ScalarValue::TimestampMillisecond(value, tz_opt.clone());
426-
ColumnarValue::Scalar(value)
441+
process_scalr::<TimestampMillisecondType>(v, granularity, tz_opt)?
427442
}
428443
ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
429-
let parsed_tz = parse_tz(tz_opt)?;
430-
let value =
431-
_date_trunc(TimeUnit::Second, v, parsed_tz, granularity.as_str())?;
432-
let value = ScalarValue::TimestampSecond(value, tz_opt.clone());
433-
ColumnarValue::Scalar(value)
444+
process_scalr::<TimestampSecondType>(v, granularity, tz_opt)?
434445
}
435446
ColumnarValue::Array(array) => {
436447
let array_type = array.data_type();
437448
match array_type {
438449
DataType::Timestamp(TimeUnit::Second, tz_opt) => {
439-
let parsed_tz = parse_tz(tz_opt)?;
440-
let array = as_timestamp_second_array(array)?;
441-
let array = array
442-
.iter()
443-
.map(|x| {
444-
_date_trunc(
445-
TimeUnit::Second,
446-
&x,
447-
parsed_tz,
448-
granularity.as_str(),
449-
)
450-
})
451-
.collect::<Result<TimestampSecondArray>>()?
452-
.with_timezone_opt(tz_opt.clone());
453-
ColumnarValue::Array(Arc::new(array))
450+
process_array::<TimestampSecondType>(array, granularity, tz_opt)?
454451
}
455452
DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => {
456-
let parsed_tz = parse_tz(tz_opt)?;
457-
let array = as_timestamp_millisecond_array(array)?;
458-
let array = array
459-
.iter()
460-
.map(|x| {
461-
_date_trunc(
462-
TimeUnit::Millisecond,
463-
&x,
464-
parsed_tz,
465-
granularity.as_str(),
466-
)
467-
})
468-
.collect::<Result<TimestampMillisecondArray>>()?
469-
.with_timezone_opt(tz_opt.clone());
470-
ColumnarValue::Array(Arc::new(array))
453+
process_array::<TimestampMillisecondType>(array, granularity, tz_opt)?
471454
}
472455
DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => {
473-
let parsed_tz = parse_tz(tz_opt)?;
474-
let array = as_timestamp_microsecond_array(array)?;
475-
let array = array
476-
.iter()
477-
.map(|x| {
478-
_date_trunc(
479-
TimeUnit::Microsecond,
480-
&x,
481-
parsed_tz,
482-
granularity.as_str(),
483-
)
484-
})
485-
.collect::<Result<TimestampMicrosecondArray>>()?
486-
.with_timezone_opt(tz_opt.clone());
487-
ColumnarValue::Array(Arc::new(array))
456+
process_array::<TimestampMicrosecondType>(array, granularity, tz_opt)?
488457
}
489458
DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => {
490-
let parsed_tz = parse_tz(tz_opt)?;
491-
let array = as_timestamp_nanosecond_array(array)?;
492-
let array = array
493-
.iter()
494-
.map(|x| {
495-
_date_trunc(
496-
TimeUnit::Nanosecond,
497-
&x,
498-
parsed_tz,
499-
granularity.as_str(),
500-
)
501-
})
502-
.collect::<Result<TimestampNanosecondArray>>()?
503-
.with_timezone_opt(tz_opt.clone());
504-
ColumnarValue::Array(Arc::new(array))
505-
}
506-
_ => {
507-
let parsed_tz = None;
508-
let array = as_timestamp_nanosecond_array(array)?;
509-
let array = array
510-
.iter()
511-
.map(|x| {
512-
_date_trunc(
513-
TimeUnit::Nanosecond,
514-
&x,
515-
parsed_tz,
516-
granularity.as_str(),
517-
)
518-
})
519-
.collect::<Result<TimestampNanosecondArray>>()?;
520-
521-
ColumnarValue::Array(Arc::new(array))
459+
process_array::<TimestampNanosecondType>(array, granularity, tz_opt)?
522460
}
461+
_ => process_array::<TimestampNanosecondType>(array, granularity, &None)?,
523462
}
524463
}
525464
_ => {

0 commit comments

Comments
 (0)