diff --git a/datafusion/expr/src/preimage.rs b/datafusion/expr/src/preimage.rs index 67ca7a91bbf38..0ef8673cd256e 100644 --- a/datafusion/expr/src/preimage.rs +++ b/datafusion/expr/src/preimage.rs @@ -23,7 +23,23 @@ use crate::Expr; pub enum PreimageResult { /// No preimage exists for the specified value None, - /// The expression always evaluates to the specified constant - /// given that `expr` is within the interval - Range { expr: Expr, interval: Box }, + /// For some UDF, a `preimage` implementation determines that: + /// the result `udf_result` in `udf_result = UDF(expr)` + /// is equivalent to `udf_result = UDF(i)` for any `i` in `interval`. + /// + /// Then, `is_boundary` indicates a boundary condition where: + /// the original expression `UDF(expr)` is compared to a value `lit` where: + /// `UDF(lit) == lit` + /// This condition is important for two scenarios: + /// 1. `<` and `>=` operators: + /// if `Some(false)`, expression rewrite should use `interval.upper` + /// 2. `=` and `!=` operators: + /// if `Some(false)`, expression rewrite can use constant (false and true, respectively) + /// + /// if is_boundary is `None`, then the boundary condition never applies. + Range { + expr: Expr, + interval: Box, + is_boundary: Option, + }, } diff --git a/datafusion/functions/src/datetime/date_part.rs b/datafusion/functions/src/datetime/date_part.rs index e3080c9d1a007..de9d372cdbd6b 100644 --- a/datafusion/functions/src/datetime/date_part.rs +++ b/datafusion/functions/src/datetime/date_part.rs @@ -304,6 +304,7 @@ impl ScalarUDFImpl for DatePartFunc { Ok(PreimageResult::Range { expr: col_expr.clone(), interval, + is_boundary: None, }) } diff --git a/datafusion/functions/src/datetime/date_trunc.rs b/datafusion/functions/src/datetime/date_trunc.rs index 8497e583ba4bc..36cb6873fb9f3 100644 --- a/datafusion/functions/src/datetime/date_trunc.rs +++ b/datafusion/functions/src/datetime/date_trunc.rs @@ -37,19 +37,22 @@ use arrow::datatypes::TimeUnit::{self, Microsecond, Millisecond, Nanosecond, Sec use arrow::datatypes::{Field, FieldRef}; use datafusion_common::cast::as_primitive_array; use datafusion_common::types::{NativeType, logical_date, logical_string}; +use datafusion_common::utils::take_function_args; use datafusion_common::{ DataFusionError, Result, ScalarValue, exec_datafusion_err, exec_err, internal_err, }; +use datafusion_expr::interval_arithmetic::Interval; use datafusion_expr::sort_properties::{ExprProperties, SortProperties}; use datafusion_expr::{ - ColumnarValue, Documentation, ReturnFieldArgs, ScalarUDFImpl, Signature, - TypeSignature, Volatility, + ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDFImpl, Signature, + TypeSignature, Volatility, preimage::PreimageResult, simplify::SimplifyContext, }; use datafusion_expr_common::signature::{Coercion, TypeSignatureClass}; use datafusion_macros::user_doc; use chrono::{ - DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset, TimeDelta, Timelike, + DateTime, Datelike, Duration, LocalResult, Months, NaiveDateTime, Offset, TimeDelta, + Timelike, }; /// Represents the granularity for date truncation operations @@ -107,6 +110,22 @@ impl DateTruncGranularity { } } + /// Convert a DateTruncGranularity enum into a granularity string for error reporting + fn as_str(&self) -> &str { + match self { + Self::Microsecond => "microsecond", + Self::Millisecond => "millisecond", + Self::Second => "second", + Self::Minute => "minute", + Self::Hour => "hour", + Self::Day => "day", + Self::Week => "week", + Self::Month => "month", + Self::Quarter => "quarter", + Self::Year => "year", + } + } + /// Returns true if this granularity can be handled with simple arithmetic /// (fine granularity: second, minute, millisecond, microsecond) fn is_fine_granularity(&self) -> bool { @@ -412,6 +431,145 @@ impl ScalarUDFImpl for DateTruncFunc { }) } + fn preimage( + &self, + args: &[Expr], + lit_expr: &Expr, + _info: &SimplifyContext, + ) -> Result { + // Determine what datetime granularity to use for preimage calculation + let [trunc_part, col_expr] = take_function_args(self.name(), args)?; + let granular_part = trunc_part + .as_literal() + .and_then(|sv| sv.try_as_str().flatten()) + .map(part_normalization); + + let granularity = match granular_part { + Some(trunc_granularity) => { + match DateTruncGranularity::from_str(trunc_granularity) { + Ok(granularity_instance) => granularity_instance, + Err(granularity_err) => { + return Err(granularity_err); + } + } + } + None => { + return Ok(PreimageResult::None); + } + }; + + let truncated_literal = match lit_expr.as_literal() { + // Timestamp types (smallest to largest granularity) + Some(ScalarValue::TimestampNanosecond(Some(ts_val), ts_tz)) => { + trunc_interval_for_ts::( + ts_val, + ts_tz, + granularity, + )? + } + Some(ScalarValue::TimestampMicrosecond(Some(ts_val), ts_tz)) => { + trunc_interval_for_ts::( + ts_val, + ts_tz, + granularity, + )? + } + Some(ScalarValue::TimestampMillisecond(Some(ts_val), ts_tz)) => { + trunc_interval_for_ts::( + ts_val, + ts_tz, + granularity, + )? + } + Some(ScalarValue::TimestampSecond(Some(ts_val), ts_tz)) => { + trunc_interval_for_ts::(ts_val, ts_tz, granularity)? + } + + // Time types (smallest to largest granularity) + Some(ScalarValue::Time64Nanosecond(Some(ts_val))) => { + let trunc_tval = truncate_time_nanos(*ts_val, granularity); + let next_tval = increment_time_nanos(trunc_tval, granularity); + if trunc_tval == next_tval { + return exec_err!( + "{:?} too coarse for time in Nanoseconds", + granularity.as_str() + ); + } + + Interval::try_new( + ScalarValue::Time64Nanosecond(Some(trunc_tval)), + ScalarValue::Time64Nanosecond(Some(next_tval)), + )? + } + Some(ScalarValue::Time64Microsecond(Some(ts_val))) => { + let trunc_tval = truncate_time_micros(*ts_val, granularity); + let next_tval = increment_time_micros(trunc_tval, granularity); + if trunc_tval == next_tval { + return exec_err!( + "{:?} too coarse for time in Microseconds", + granularity.as_str() + ); + } + + Interval::try_new( + ScalarValue::Time64Microsecond(Some(trunc_tval)), + ScalarValue::Time64Microsecond(Some(next_tval)), + )? + } + Some(ScalarValue::Time32Millisecond(Some(ts_val))) => { + let trunc_tval = truncate_time_millis(*ts_val, granularity); + let next_tval = increment_time_millis(trunc_tval, granularity); + if trunc_tval == next_tval { + return exec_err!( + "{:?} too coarse for time in Milliseconds", + granularity.as_str() + ); + } + + Interval::try_new( + ScalarValue::Time32Millisecond(Some(trunc_tval)), + ScalarValue::Time32Millisecond(Some(next_tval)), + )? + } + Some(ScalarValue::Time32Second(Some(ts_val))) => { + let trunc_tval = truncate_time_secs(*ts_val, granularity); + let next_tval = increment_time_secs(trunc_tval, granularity); + if trunc_tval == next_tval { + return exec_err!( + "{:?} too coarse for time in Seconds", + granularity.as_str() + ); + } + + Interval::try_new( + ScalarValue::Time32Second(Some(trunc_tval)), + ScalarValue::Time32Second(Some(next_tval)), + )? + } + + // Null or empty types (no pre-image) + Some(ScalarValue::Null) => return Ok(PreimageResult::None), + None => { + return Ok(PreimageResult::None); + } + _ => { + return Err(DataFusionError::NotImplemented(format!( + "Preimage not implemented for type {lit_expr}" + ))); + } + }; + + // Determine if the literal is aligned with the interval boundary + let is_boundary = + Some(truncated_literal.lower() == lit_expr.as_literal().unwrap()); + + Ok(PreimageResult::Range { + expr: col_expr.clone(), + interval: Box::new(truncated_literal), + is_boundary, + }) + } + fn aliases(&self) -> &[String] { &self.aliases } @@ -502,6 +660,155 @@ fn truncate_time_secs(value: i32, granularity: DateTruncGranularity) -> i32 { } } +/// Dispatch function for the increment functions by time unit +fn increment_time_unit( + time_unit: TimeUnit, + value: i64, + granularity: DateTruncGranularity, +) -> i64 { + match time_unit { + Nanosecond => increment_time_nanos(value, granularity), + Microsecond => increment_time_micros(value, granularity), + Millisecond => increment_time_millis(value as i32, granularity) as i64, + Second => increment_time_secs(value as i32, granularity) as i64, + } +} + +/// Increment time in nanoseconds by the specified granularity +fn increment_time_nanos(value: i64, granularity: DateTruncGranularity) -> i64 { + match granularity { + DateTruncGranularity::Hour => value + NANOS_PER_HOUR, + DateTruncGranularity::Minute => value + NANOS_PER_MINUTE, + DateTruncGranularity::Second => value + NANOS_PER_SECOND, + DateTruncGranularity::Millisecond => value + NANOS_PER_MILLISECOND, + DateTruncGranularity::Microsecond => value + NANOS_PER_MICROSECOND, + // Other granularities are invalid; return same value + _ => value, + } +} + +/// Increment time in microseconds by the specified granularity +fn increment_time_micros(value: i64, granularity: DateTruncGranularity) -> i64 { + match granularity { + DateTruncGranularity::Hour => value + MICROS_PER_HOUR, + DateTruncGranularity::Minute => value + MICROS_PER_MINUTE, + DateTruncGranularity::Second => value + MICROS_PER_SECOND, + DateTruncGranularity::Millisecond => value + MICROS_PER_MILLISECOND, + DateTruncGranularity::Microsecond => value + 1, + // Other granularities are invalid; return same value + _ => value, + } +} + +/// Increment time in milliseconds by the specified granularity +fn increment_time_millis(value: i32, granularity: DateTruncGranularity) -> i32 { + match granularity { + DateTruncGranularity::Hour => value + MILLIS_PER_HOUR, + DateTruncGranularity::Minute => value + MILLIS_PER_MINUTE, + DateTruncGranularity::Second => value + MILLIS_PER_SECOND, + DateTruncGranularity::Millisecond => value + 1, + // Other granularities are invalid; return same value + _ => value, + } +} + +/// Increment time in seconds by the specified granularity +fn increment_time_secs(value: i32, granularity: DateTruncGranularity) -> i32 { + match granularity { + DateTruncGranularity::Hour => value + SECS_PER_HOUR, + DateTruncGranularity::Minute => value + SECS_PER_MINUTE, + DateTruncGranularity::Second => value + 1, + // Other granularities are invalid; return same value + _ => value, + } +} + +/// Increment a calendar `DateTime` (with timezone) by the specified granularity +fn _increment_calendar_with_tz( + granularity: DateTruncGranularity, + dt: DateTime, +) -> Result { + let next_dt = match granularity { + DateTruncGranularity::Year => dt.with_year(dt.year() + 1), + DateTruncGranularity::Month => dt.checked_add_months(Months::new(1)), + DateTruncGranularity::Quarter => dt.checked_add_months(Months::new(3)), + DateTruncGranularity::Week => { + dt.checked_add_signed(TimeDelta::try_days(7).unwrap()) + } + DateTruncGranularity::Day => { + dt.checked_add_signed(TimeDelta::try_days(1).unwrap()) + } + _ => return exec_err!("Unsupported calendar granularity: {granularity:?}"), + }; + + next_dt + .and_then(|dt| dt.timestamp_nanos_opt()) + .ok_or(exec_datafusion_err!("Timestamp overflow")) +} + +/// Increment a calendar `NaiveDateTime` (without timezone) by the specified granularity +fn _increment_calendar_without_tz( + granularity: DateTruncGranularity, + dt: NaiveDateTime, +) -> Result { + let next_dt = match granularity { + DateTruncGranularity::Year => dt.with_year(dt.year() + 1), + DateTruncGranularity::Month => dt.checked_add_months(Months::new(1)), + DateTruncGranularity::Quarter => dt.checked_add_months(Months::new(3)), + DateTruncGranularity::Week => { + dt.checked_add_signed(TimeDelta::try_days(7).unwrap()) + } + DateTruncGranularity::Day => { + dt.checked_add_signed(TimeDelta::try_days(1).unwrap()) + } + _ => return exec_err!("Unsupported calendar granularity: {granularity:?}"), + }; + next_dt + .and_then(|dt| dt.and_utc().timestamp_nanos_opt()) + .ok_or(exec_datafusion_err!("Timestamp overflow")) +} + +/// Increment timestamp in nanoseconds by calendar-based granularity (year, month, quarter, week, day) +/// Handles timezone-aware and timezone-naive timestamps (follows pattern from date_trunc_coarse) +fn increment_timestamp_nanos_calendar( + ts_unit: TimeUnit, + ts_value: i64, + tz: Option, + granularity: DateTruncGranularity, +) -> Result { + // For calendar granularities, convert to nanos, increment, then convert back + let value_ns = match ts_unit { + Second => NANOSECONDS * ts_value, + Millisecond => NANOS_PER_MILLISECOND * ts_value, + Microsecond => NANOS_PER_MICROSECOND * ts_value, + Nanosecond => ts_value, + }; + + let incremented_ns = match tz { + Some(tz) => { + let Some(dt) = + as_datetime_with_timezone::(value_ns, tz) + else { + return Err(exec_datafusion_err!("Timestamp {value_ns} out of range")); + }; + _increment_calendar_with_tz(granularity, dt) + } + None => { + let Some(dt) = timestamp_ns_to_datetime(value_ns) else { + return Err(exec_datafusion_err!("Timestamp {value_ns} out of range")); + }; + _increment_calendar_without_tz(granularity, dt) + } + }?; + + match ts_unit { + Second => Ok(incremented_ns / NANOSECONDS), + Millisecond => Ok(incremented_ns / NANOS_PER_MILLISECOND), + Microsecond => Ok(incremented_ns / NANOS_PER_MICROSECOND), + Nanosecond => Ok(incremented_ns), + } +} + fn _date_trunc_coarse( granularity: DateTruncGranularity, value: Option, @@ -745,6 +1052,35 @@ fn general_date_trunc( Ok(result) } +fn trunc_interval_for_ts( + ts_val: &i64, + ts_tz: &Option>, + ts_granularity: DateTruncGranularity, +) -> Result { + let parsed_tz = parse_tz(ts_tz)?; + + // general_date_trunc returns values in TsType::UNIT (seconds/millis/micros/nanos) + let lower_val = general_date_trunc(TsType::UNIT, *ts_val, parsed_tz, ts_granularity)?; + + // Increment based on timestamp unit and granularity + let upper_val = if ts_granularity.valid_for_time() { + increment_time_unit(TsType::UNIT, lower_val, ts_granularity) + } else { + increment_timestamp_nanos_calendar( + TsType::UNIT, + lower_val, + parsed_tz, + ts_granularity, + )? + }; + + // Create the actual interval + Interval::try_new( + ScalarValue::new_timestamp::(Some(lower_val), ts_tz.clone()), + ScalarValue::new_timestamp::(Some(upper_val), ts_tz.clone()), + ) +} + fn parse_tz(tz: &Option>) -> Result> { tz.as_ref() .map(|tz| { @@ -754,6 +1090,15 @@ fn parse_tz(tz: &Option>) -> Result> { .transpose() } +// Copied from `crate::datetime::date_part` because it's private there +// Try to remove quote if exist, if the quote is invalid, return original string and let the +// calling function handle the error +fn part_normalization(part: &str) -> &str { + part.strip_prefix(|c| c == '\'' || c == '\"') + .and_then(|s| s.strip_suffix(|c| c == '\'' || c == '\"')) + .unwrap_or(part) +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -1399,4 +1744,76 @@ mod tests { } }); } + + #[test] + fn test_date_trunc_preimage_time_granularity_too_fine() { + // Note: This scenario is unreachable from SQL because invalid granularities + // are rejected earlier in invoke_with_args (line 272). These tests verify + // defensive error checking in the preimage computation. + use datafusion_expr::{ + Expr, col, lit, preimage::PreimageResult, simplify::SimplifyContext, + }; + + let date_trunc_func = DateTruncFunc::new(); + let info = SimplifyContext::default(); + + // Test Time32Second (second precision) with millisecond granularity + // Should error because milliseconds cannot be represented in second precision + let args = vec![lit("millisecond"), col("x")]; + let lit_expr = Expr::Literal(ScalarValue::Time32Second(Some(45296)), None); + let result = date_trunc_func.preimage(&args, &lit_expr, &info); + assert!(result.is_err()); + if let Err(e) = result { + assert!(e.to_string().contains("too coarse for time in Seconds")); + } + + // Test Time64Microsecond (microsecond precision) with microsecond granularity + // Should succeed because microseconds can be represented + let args = vec![lit("microsecond"), col("x")]; + let lit_expr = + Expr::Literal(ScalarValue::Time64Microsecond(Some(45296000000)), None); + let result = date_trunc_func.preimage(&args, &lit_expr, &info); + assert!(result.is_ok()); + assert!(matches!(result.unwrap(), PreimageResult::Range { .. })); + } + + #[test] + fn test_date_trunc_preimage_time_interval_bounds() { + // Verify that preimage creates correct interval bounds with truncated lower value + use datafusion_expr::{ + Expr, col, lit, preimage::PreimageResult, simplify::SimplifyContext, + }; + + let date_trunc_func = DateTruncFunc::new(); + let info = SimplifyContext::default(); + + // Time32Second: 12:34:56 (45296 secs) truncated to minute should be 12:34:00 (45240 secs) + let args = vec![lit("minute"), col("x")]; + let lit_expr = Expr::Literal(ScalarValue::Time32Second(Some(45296)), None); + let result = date_trunc_func.preimage(&args, &lit_expr, &info).unwrap(); + if let PreimageResult::Range { interval, .. } = result { + assert_eq!(interval.lower(), &ScalarValue::Time32Second(Some(45240))); + assert_eq!(interval.upper(), &ScalarValue::Time32Second(Some(45300))); + } else { + panic!("Expected Range result"); + } + + // Time32Millisecond: verify bounds are truncated + let args = vec![lit("second"), col("x")]; + let lit_expr = + Expr::Literal(ScalarValue::Time32Millisecond(Some(45296500)), None); + let result = date_trunc_func.preimage(&args, &lit_expr, &info).unwrap(); + if let PreimageResult::Range { interval, .. } = result { + assert_eq!( + interval.lower(), + &ScalarValue::Time32Millisecond(Some(45296000)) + ); + assert_eq!( + interval.upper(), + &ScalarValue::Time32Millisecond(Some(45297000)) + ); + } else { + panic!("Expected Range result"); + } + } } diff --git a/datafusion/functions/src/math/floor.rs b/datafusion/functions/src/math/floor.rs index d4f25716ff7ee..bbe4943b80ae1 100644 --- a/datafusion/functions/src/math/floor.rs +++ b/datafusion/functions/src/math/floor.rs @@ -302,7 +302,8 @@ impl ScalarUDFImpl for FloorFunc { Ok(PreimageResult::Range { expr: arg, - interval: Box::new(Interval::try_new(lower, upper)?), + interval: Box::new(Interval::try_new(lower.clone(), upper)?), + is_boundary: Some(lower == *lit_value), }) } @@ -403,7 +404,7 @@ mod tests { let result = floor_func.preimage(&args, &lit_expr, &info).unwrap(); match result { - PreimageResult::Range { expr, interval } => { + PreimageResult::Range { expr, interval, .. } => { assert_eq!(expr, col("x")); assert_eq!(interval.lower().clone(), expected_lower); assert_eq!(interval.upper().clone(), expected_upper); diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs index c6644e008645a..312ed6cf3b22e 100644 --- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs +++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs @@ -2028,15 +2028,21 @@ impl TreeNodeRewriter for Simplifier<'_> { }))); } - if let PreimageResult::Range { interval, expr } = - get_preimage(left.as_ref(), right.as_ref(), info)? + if let PreimageResult::Range { + interval, + expr, + is_boundary, + } = get_preimage(left.as_ref(), right.as_ref(), info)? { - rewrite_with_preimage(*interval, op, expr)? + rewrite_with_preimage(*interval, op, expr, is_boundary)? } else if let Some(swapped) = op.swap() { - if let PreimageResult::Range { interval, expr } = - get_preimage(right.as_ref(), left.as_ref(), info)? + if let PreimageResult::Range { + interval, + expr, + is_boundary, + } = get_preimage(right.as_ref(), left.as_ref(), info)? { - rewrite_with_preimage(*interval, swapped, expr)? + rewrite_with_preimage(*interval, swapped, expr, is_boundary)? } else { Transformed::no(Expr::BinaryExpr(BinaryExpr { left, op, right })) } @@ -2064,8 +2070,11 @@ impl TreeNodeRewriter for Simplifier<'_> { let mut rewritten: Option = None; for item in &list { - let PreimageResult::Range { interval, expr } = - get_preimage(expr.as_ref(), item, info)? + let PreimageResult::Range { + interval, + expr, + is_boundary, + } = get_preimage(expr.as_ref(), item, info)? else { return Ok(Transformed::no(Expr::InList(InList { expr, @@ -2074,7 +2083,8 @@ impl TreeNodeRewriter for Simplifier<'_> { }))); }; - let range_expr = rewrite_with_preimage(*interval, op, expr)?.data; + let range_expr = + rewrite_with_preimage(*interval, op, expr, is_boundary)?.data; rewritten = Some(match rewritten { None => range_expr, Some(acc) => combiner(acc, range_expr), diff --git a/datafusion/optimizer/src/simplify_expressions/udf_preimage.rs b/datafusion/optimizer/src/simplify_expressions/udf_preimage.rs index da2716d13cb47..5a41f0bf1e13f 100644 --- a/datafusion/optimizer/src/simplify_expressions/udf_preimage.rs +++ b/datafusion/optimizer/src/simplify_expressions/udf_preimage.rs @@ -30,33 +30,43 @@ pub(super) fn rewrite_with_preimage( preimage_interval: Interval, op: Operator, expr: Expr, + is_boundary: Option, ) -> Result> { let (lower, upper) = preimage_interval.into_bounds(); let (lower, upper) = (lit(lower), lit(upper)); - let rewritten_expr = match op { - // < x ==> < lower - Operator::Lt => expr.lt(lower), - // >= x ==> >= lower - Operator::GtEq => expr.gt_eq(lower), - // > x ==> >= upper - Operator::Gt => expr.gt_eq(upper), - // <= x ==> < upper - Operator::LtEq => expr.lt(upper), - // = x ==> ( >= lower) and ( < upper) - Operator::Eq => and(expr.clone().gt_eq(lower), expr.lt(upper)), - // != x ==> ( < lower) or ( >= upper) - Operator::NotEq => or(expr.clone().lt(lower), expr.gt_eq(upper)), + // When is_boundary is Some, intervals are adjacent and we need to choose one + let rewritten_expr = match (op, is_boundary) { + // operators that use upper bound if is_boundary is Some(false): + // udf(expr) < x ==> udf(expr) < interval.upper + (Operator::Lt, Some(false)) => expr.lt(upper), + (Operator::GtEq, Some(false)) => expr.gt_eq(upper), + (Operator::Eq, Some(false)) => lit(false), + (Operator::NotEq, Some(false)) => lit(true), + + // otherwise, use lower bound + // udf(expr) < x ==> udf(expr) < interval.lower + (Operator::Lt, _) => expr.lt(lower), + (Operator::GtEq, _) => expr.gt_eq(lower), + (Operator::Eq, _) => and(expr.clone().gt_eq(lower), expr.lt(upper)), + (Operator::NotEq, _) => or(expr.clone().lt(lower), expr.gt_eq(upper)), + + // Operators that don't depend on interval boundaries + // udf(expr) > x ==> expr >= upper + (Operator::Gt, _) => expr.gt_eq(upper), + (Operator::LtEq, _) => expr.lt(upper), + // is not distinct from x ==> ( is NULL and x is NULL) or (( >= lower) and ( < upper)) - // but since x is always not NULL => ( is not NULL) and ( >= lower) and ( < upper) - Operator::IsNotDistinctFrom => expr + // but since x is always not NULL => ( is not NULL) and (= lower) and ( < upper) + (Operator::IsNotDistinctFrom, _) => expr .clone() .is_not_null() .and(expr.clone().gt_eq(lower)) .and(expr.lt(upper)), + // is distinct from x ==> ( < lower) or ( >= upper) or ( is NULL and x is not NULL) or ( is not NULL and x is NULL) - // but given that x is always not NULL => ( < lower) or ( >= upper) or ( is NULL) - Operator::IsDistinctFrom => expr + // but given that x is always not NULL => ( < lower) or ( >= upper) or ( expr .clone() .lt(lower) .or(expr.clone().gt_eq(upper)) @@ -136,10 +146,34 @@ mod test { Ok(DataType::Int32) } - fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { - Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(500)))) + /// Simple UDF that maps: [100, 200) -> 500 and [300, 400) -> 600 + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let arg = &args.args[0]; + + fn is_small_range(val: &i32) -> bool { + *val >= 100 && *val < 200 + } + fn is_large_range(val: &i32) -> bool { + *val >= 300 && *val < 400 + } + + let map_result = match arg { + ColumnarValue::Scalar(ScalarValue::Int32(Some(val))) => { + if is_small_range(val) { + Some(500) + } else if is_large_range(val) { + Some(600) + } else { + None + } + } + _ => None, + }; + + Ok(ColumnarValue::Scalar(ScalarValue::Int32(map_result))) } + /// Simple preimage that maps: 500 -> [100, 200) and 600 -> [300, 400) fn preimage( &self, args: &[Expr], @@ -162,6 +196,7 @@ mod test { ScalarValue::Int32(Some(100)), ScalarValue::Int32(Some(200)), )?), + is_boundary: None, }) } Expr::Literal(ScalarValue::Int32(Some(600)), _) => { @@ -171,8 +206,11 @@ mod test { ScalarValue::Int32(Some(300)), ScalarValue::Int32(Some(400)), )?), + is_boundary: None, }) } + + // Any other value has no preimage _ => Ok(PreimageResult::None), } } diff --git a/datafusion/sqllogictest/test_files/datetime/timestamps.slt b/datafusion/sqllogictest/test_files/datetime/timestamps.slt index 9526ccebfd16e..a0d241504e952 100644 --- a/datafusion/sqllogictest/test_files/datetime/timestamps.slt +++ b/datafusion/sqllogictest/test_files/datetime/timestamps.slt @@ -5370,3 +5370,379 @@ SELECT to_timestamp(arrow_cast(100.5, 'Float16'), name) FROM test_to_timestamp_s statement ok drop table test_to_timestamp_scalar + +## date_trunc Preimage tests + +# Test with timestamp data +statement ok +CREATE TABLE t1(ts TIMESTAMP) AS VALUES + (NULL), + ('2023-01-15T10:30:45'::timestamp), + ('2024-06-20T14:25:30'::timestamp), + ('2024-12-31T23:59:59'::timestamp), + ('2025-03-10T08:15:20'::timestamp); + +# Test YEAR granularity - basic comparisons + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) = timestamp '2024-01-01T00:00:00' ORDER BY ts; +---- +2024-06-20T14:25:30 +2024-12-31T23:59:59 + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) <> timestamp '2024-01-01T00:00:00' ORDER BY ts; +---- +2023-01-15T10:30:45 +2025-03-10T08:15:20 + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) > timestamp '2024-01-01T00:00:00' ORDER BY ts; +---- +2025-03-10T08:15:20 + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) < timestamp '2024-01-01T00:00:00' ORDER BY ts; +---- +2023-01-15T10:30:45 + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) >= timestamp '2024-01-01T00:00:00' ORDER BY ts; +---- +2024-06-20T14:25:30 +2024-12-31T23:59:59 +2025-03-10T08:15:20 + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) <= timestamp '2024-01-01T00:00:00' ORDER BY ts; +---- +2023-01-15T10:30:45 +2024-06-20T14:25:30 +2024-12-31T23:59:59 + +# Test IS [NOT] DISTINCT FROM + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) IS NOT DISTINCT FROM timestamp '2024-01-01T00:00:00' ORDER BY ts; +---- +2024-06-20T14:25:30 +2024-12-31T23:59:59 + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) IS DISTINCT FROM timestamp '2024-01-01T00:00:00' ORDER BY ts; +---- +2023-01-15T10:30:45 +2025-03-10T08:15:20 +NULL + +# Verify the plan shows the optimization (date_trunc should be rewritten) + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('year', ts) = timestamp '2024-01-01T00:00:00'; +---- +logical_plan +01)Filter: t1.ts >= TimestampNanosecond(1704067200000000000, None) AND t1.ts < TimestampNanosecond(1735689600000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1704067200000000000 AND ts@0 < 1735689600000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('year', ts) <> timestamp '2024-01-01T00:00:00'; +---- +logical_plan +01)Filter: t1.ts < TimestampNanosecond(1704067200000000000, None) OR t1.ts >= TimestampNanosecond(1735689600000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 < 1704067200000000000 OR ts@0 >= 1735689600000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('year', ts) > timestamp '2024-01-01T00:00:00'; +---- +logical_plan +01)Filter: t1.ts >= TimestampNanosecond(1735689600000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1735689600000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test YEAR granularity with non-aligned literal (2024-06-20 instead of 2024-01-01) +# date_trunc('year', x) can never equal '2024-06-20' because date_trunc always sets month=01, day=01 +# So this should return no rows (optimized to EmptyRelation) + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) = timestamp '2024-06-20T14:25:30' ORDER BY ts; +---- + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('year', ts) = timestamp '2024-06-20T14:25:30'; +---- +logical_plan EmptyRelation: rows=0 +physical_plan EmptyExec + +# Test YEAR granularity with non-aligned literal and <> operator +# date_trunc('year', x) can never equal '2024-06-20', so <> is always true +# Should return all rows (optimized to full table scan) + +query P +SELECT ts FROM t1 WHERE date_trunc('year', ts) <> timestamp '2024-06-20T14:25:30' ORDER BY ts; +---- +2023-01-15T10:30:45 +2024-06-20T14:25:30 +2024-12-31T23:59:59 +2025-03-10T08:15:20 +NULL + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('year', ts) <> timestamp '2024-06-20T14:25:30'; +---- +logical_plan TableScan: t1 projection=[ts] +physical_plan DataSourceExec: partitions=1, partition_sizes=[1] + +# Test MONTH granularity + +query P +SELECT ts FROM t1 WHERE date_trunc('month', ts) = timestamp '2024-06-01T00:00:00' ORDER BY ts; +---- +2024-06-20T14:25:30 + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('month', ts) = timestamp '2024-06-01T00:00:00'; +---- +logical_plan +01)Filter: t1.ts >= TimestampNanosecond(1717200000000000000, None) AND t1.ts < TimestampNanosecond(1719792000000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1717200000000000000 AND ts@0 < 1719792000000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test MONTH granularity with non-aligned literal (2024-06-20 instead of 2024-06-01) +# date_trunc('month', x) can never equal '2024-06-20' because date_trunc always sets day=01 +# So this should return no rows (optimized to EmptyRelation) + +query P +SELECT ts FROM t1 WHERE date_trunc('month', ts) = timestamp '2024-06-20T14:25:30' ORDER BY ts; +---- + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('month', ts) = timestamp '2024-06-20T14:25:30'; +---- +logical_plan EmptyRelation: rows=0 +physical_plan EmptyExec + +# Test QUARTER granularity + +query P +SELECT ts FROM t1 WHERE date_trunc('quarter', ts) = timestamp '2024-04-01T00:00:00' ORDER BY ts; +---- +2024-06-20T14:25:30 + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('quarter', ts) = timestamp '2024-04-01T00:00:00'; +---- +logical_plan +01)Filter: t1.ts >= TimestampNanosecond(1711929600000000000, None) AND t1.ts < TimestampNanosecond(1719792000000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1711929600000000000 AND ts@0 < 1719792000000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test WEEK granularity + +query P +SELECT ts FROM t1 WHERE date_trunc('week', ts) = timestamp '2024-06-17T00:00:00' ORDER BY ts; +---- +2024-06-20T14:25:30 + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('week', ts) = timestamp '2024-06-17T00:00:00'; +---- +logical_plan +01)Filter: t1.ts >= TimestampNanosecond(1718582400000000000, None) AND t1.ts < TimestampNanosecond(1719187200000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1718582400000000000 AND ts@0 < 1719187200000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test DAY granularity + +query P +SELECT ts FROM t1 WHERE date_trunc('day', ts) = timestamp '2024-06-20T00:00:00' ORDER BY ts; +---- +2024-06-20T14:25:30 + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('day', ts) = timestamp '2024-06-20T00:00:00'; +---- +logical_plan +01)Filter: t1.ts >= TimestampNanosecond(1718841600000000000, None) AND t1.ts < TimestampNanosecond(1718928000000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1718841600000000000 AND ts@0 < 1718928000000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test HOUR granularity + +query P +SELECT ts FROM t1 WHERE date_trunc('hour', ts) = timestamp '2024-06-20T14:00:00' ORDER BY ts; +---- +2024-06-20T14:25:30 + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('hour', ts) = timestamp '2024-06-20T14:00:00'; +---- +logical_plan +01)Filter: t1.ts >= TimestampNanosecond(1718892000000000000, None) AND t1.ts < TimestampNanosecond(1718895600000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1718892000000000000 AND ts@0 < 1718895600000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test MINUTE granularity + +query P +SELECT ts FROM t1 WHERE date_trunc('minute', ts) = timestamp '2024-06-20T14:25:00' ORDER BY ts; +---- +2024-06-20T14:25:30 + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('minute', ts) = timestamp '2024-06-20T14:25:00'; +---- +logical_plan +01)Filter: t1.ts >= TimestampNanosecond(1718893500000000000, None) AND t1.ts < TimestampNanosecond(1718893560000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1718893500000000000 AND ts@0 < 1718893560000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test SECOND granularity + +query P +SELECT ts FROM t1 WHERE date_trunc('second', ts) = timestamp '2024-06-20T14:25:30' ORDER BY ts; +---- +2024-06-20T14:25:30 + +query TT +EXPLAIN SELECT ts FROM t1 WHERE date_trunc('second', ts) = timestamp '2024-06-20T14:25:30'; +---- +logical_plan +01)Filter: t1.ts >= TimestampNanosecond(1718893530000000000, None) AND t1.ts < TimestampNanosecond(1718893531000000000, None) +02)--TableScan: t1 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1718893530000000000 AND ts@0 < 1718893531000000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test MILLISECOND granularity + +statement ok +CREATE TABLE t2(ts TIMESTAMP) AS VALUES + ('2024-06-20T14:25:30.123456'::timestamp), + ('2024-06-20T14:25:30.123999'::timestamp), + ('2024-06-20T14:25:30.124001'::timestamp); + +query P +SELECT ts FROM t2 WHERE date_trunc('millisecond', ts) = timestamp '2024-06-20T14:25:30.123' ORDER BY ts; +---- +2024-06-20T14:25:30.123456 +2024-06-20T14:25:30.123999 + +query TT +EXPLAIN SELECT ts FROM t2 WHERE date_trunc('millisecond', ts) = timestamp '2024-06-20T14:25:30.123'; +---- +logical_plan +01)Filter: t2.ts >= TimestampNanosecond(1718893530123000000, None) AND t2.ts < TimestampNanosecond(1718893530124000000, None) +02)--TableScan: t2 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1718893530123000000 AND ts@0 < 1718893530124000000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test MICROSECOND granularity + +query P +SELECT ts FROM t2 WHERE date_trunc('microsecond', ts) = timestamp '2024-06-20T14:25:30.123456' ORDER BY ts; +---- +2024-06-20T14:25:30.123456 + +query TT +EXPLAIN SELECT ts FROM t2 WHERE date_trunc('microsecond', ts) = timestamp '2024-06-20T14:25:30.123456'; +---- +logical_plan +01)Filter: t2.ts >= TimestampNanosecond(1718893530123456000, None) AND t2.ts < TimestampNanosecond(1718893530123457000, None) +02)--TableScan: t2 projection=[ts] +physical_plan +01)FilterExec: ts@0 >= 1718893530123456000 AND ts@0 < 1718893530123457000 +02)--DataSourceExec: partitions=1, partition_sizes=[1] + +# Test with timestamp with timezone + +statement ok +SET datafusion.execution.time_zone = 'America/New_York'; + +statement ok +CREATE TABLE t3(ts TIMESTAMPTZ) AS VALUES + (NULL), + ('2023-12-31T23:59:59-05:00'::timestamptz), + ('2024-01-01T00:00:00-05:00'::timestamptz), + ('2024-12-31T23:59:59-05:00'::timestamptz), + ('2025-01-01T00:00:00-05:00'::timestamptz); + +query P +SELECT ts FROM t3 WHERE date_trunc('year', ts) = timestamp '2024-01-01T00:00:00-05:00' ORDER BY ts; +---- + +query TT +EXPLAIN SELECT ts FROM t3 WHERE date_trunc('year', ts) = timestamp '2024-01-01T00:00:00-05:00'; +---- +logical_plan EmptyRelation: rows=0 +physical_plan EmptyExec + +statement ok +RESET datafusion.execution.time_zone; + +# Test date_trunc preimage optimization with Time types +# These tests verify that the preimage computation works correctly for Time types with valid granularities + +statement ok +CREATE TABLE time_test (id INT, val VARCHAR) AS VALUES (1, '12:34:56'); + +# Valid granularity for Time32Second: the preimage optimization should create a range filter +query TT +EXPLAIN SELECT * FROM time_test WHERE date_trunc('second', arrow_cast(val, 'Time32(Second)')) = arrow_cast('12:34:56', 'Time32(Second)'); +---- +logical_plan +01)Projection: time_test.id, time_test.val +02)--Filter: __common_expr_3 >= Time32Second("45296") AND __common_expr_3 < Time32Second("45297") +03)----Projection: CAST(time_test.val AS Time32(s)) AS __common_expr_3, time_test.id, time_test.val +04)------TableScan: time_test projection=[id, val] +physical_plan +01)FilterExec: __common_expr_3@0 >= 45296 AND __common_expr_3@0 < 45297, projection=[id@1, val@2] +02)--ProjectionExec: expr=[CAST(val@1 AS Time32(s)) as __common_expr_3, id@0 as id, val@1 as val] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +# Valid granularity for Time32Millisecond: the preimage optimization should create a range filter +query TT +EXPLAIN SELECT * FROM time_test WHERE date_trunc('millisecond', arrow_cast(val, 'Time32(Millisecond)')) = arrow_cast('12:34:56', 'Time32(Millisecond)'); +---- +logical_plan +01)Projection: time_test.id, time_test.val +02)--Filter: __common_expr_3 >= Time32Millisecond("45296000") AND __common_expr_3 < Time32Millisecond("45296001") +03)----Projection: CAST(time_test.val AS Time32(ms)) AS __common_expr_3, time_test.id, time_test.val +04)------TableScan: time_test projection=[id, val] +physical_plan +01)FilterExec: __common_expr_3@0 >= 45296000 AND __common_expr_3@0 < 45296001, projection=[id@1, val@2] +02)--ProjectionExec: expr=[CAST(val@1 AS Time32(ms)) as __common_expr_3, id@0 as id, val@1 as val] +03)----DataSourceExec: partitions=1, partition_sizes=[1] + +statement ok +DROP TABLE time_test; + +# Cleanup +statement ok +DROP TABLE t1; + +statement ok +DROP TABLE t2; + +statement ok +DROP TABLE t3;