From 1a2ae1bc3eb74290a40b605c7a6483b6d7b732d0 Mon Sep 17 00:00:00 2001 From: sjhddh <151469562+sjhddh@users.noreply.github.com> Date: Wed, 3 Jun 2026 20:23:58 +0200 Subject: [PATCH] feat: implement Spark-compatible weekday function Implements the Spark `weekday` scalar function (issue #22599), returning the day of the week as a 0-indexed integer where Monday = 0 .. Sunday = 6. This differs from `dayofweek` (1-indexed, Sunday = 1). The implementation is modeled on the sibling `monthname` function and uses Arrow's `DatePart::DayOfWeekMonday0` kernel, which returns 0..=6 with Monday = 0 -- an exact match for Spark semantics. It accepts Date and implicitly coerces Timestamp input, returns Int32, and propagates null input to null output. Adds Rust unit tests and sqllogictest coverage for scalar/array/timestamp/ null/error cases. Closes #22599 Signed-off-by: sjhddh <151469562+sjhddh@users.noreply.github.com> --- datafusion/spark/src/function/datetime/mod.rs | 8 + .../spark/src/function/datetime/weekday.rs | 191 ++++++++++++++++++ .../test_files/spark/datetime/weekday.slt | 108 +++++++++- 3 files changed, 305 insertions(+), 2 deletions(-) create mode 100644 datafusion/spark/src/function/datetime/weekday.rs diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs index 98afa91ddc834..70ab024c329aa 100644 --- a/datafusion/spark/src/function/datetime/mod.rs +++ b/datafusion/spark/src/function/datetime/mod.rs @@ -32,6 +32,7 @@ pub mod time_trunc; pub mod to_utc_timestamp; pub mod trunc; pub mod unix; +pub mod weekday; use datafusion_expr::ScalarUDF; use datafusion_functions::make_udf_function; @@ -59,6 +60,7 @@ make_udf_function!(time_trunc::SparkTimeTrunc, time_trunc); make_udf_function!(to_utc_timestamp::SparkToUtcTimestamp, to_utc_timestamp); make_udf_function!(trunc::SparkTrunc, trunc); make_udf_function!(unix::SparkUnixDate, unix_date); +make_udf_function!(weekday::SparkWeekDay, weekday); make_udf_function!( unix::SparkUnixTimestamp, unix_micros, @@ -186,6 +188,11 @@ pub mod expr_fn { "Returns the number of seconds since epoch (1970-01-01 00:00:00 UTC) for the given timestamp `ts`.", ts )); + export_functions!(( + weekday, + "Returns the day of the week for date/timestamp as an integer where Monday = 0, Tuesday = 1, ..., Sunday = 6.", + arg1 + )); } pub fn functions() -> Vec> { @@ -212,5 +219,6 @@ pub fn functions() -> Vec> { unix_micros(), unix_millis(), unix_seconds(), + weekday(), ] } diff --git a/datafusion/spark/src/function/datetime/weekday.rs b/datafusion/spark/src/function/datetime/weekday.rs new file mode 100644 index 0000000000000..b9ac7e43750ba --- /dev/null +++ b/datafusion/spark/src/function/datetime/weekday.rs @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::array::AsArray; +use arrow::compute::{DatePart, date_part}; +use arrow::datatypes::{DataType, Field, FieldRef, Int32Type}; +use datafusion_common::types::{NativeType, logical_date}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{Result, ScalarValue, internal_err}; +use datafusion_expr::{ + Coercion, ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, + Signature, TypeSignatureClass, Volatility, +}; + +/// Spark-compatible `weekday` expression. +/// Returns the day of the week for a date or timestamp as an integer index where +/// Monday = 0, Tuesday = 1, ..., Sunday = 6. +/// +/// Note: this differs from `dayofweek`, which is 1-indexed with Sunday = 1. +/// +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkWeekDay { + signature: Signature, +} + +impl Default for SparkWeekDay { + fn default() -> Self { + Self::new() + } +} + +impl SparkWeekDay { + pub fn new() -> Self { + Self { + signature: Signature::coercible( + vec![Coercion::new_implicit( + TypeSignatureClass::Native(logical_date()), + vec![TypeSignatureClass::Timestamp], + NativeType::Date, + )], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkWeekDay { + fn name(&self) -> &str { + "weekday" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_field_from_args should be used instead") + } + + fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result { + let nullable = args.arg_fields.iter().any(|f| f.is_nullable()); + Ok(Arc::new(Field::new(self.name(), DataType::Int32, nullable))) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let [arg] = take_function_args(self.name(), args.args)?; + match arg { + ColumnarValue::Scalar(scalar) => { + if scalar.is_null() { + return Ok(ColumnarValue::Scalar(ScalarValue::Int32(None))); + } + let arr = scalar.to_array_of_size(1)?; + // `DayOfWeekMonday0` returns 0..=6 with Monday = 0, which + // matches Spark `weekday` semantics exactly. + let weekday_arr = date_part(&arr, DatePart::DayOfWeekMonday0)?; + let value = weekday_arr.as_primitive::().value(0); + Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(value)))) + } + ColumnarValue::Array(arr) => { + let weekday_arr = date_part(&arr, DatePart::DayOfWeekMonday0)?; + Ok(ColumnarValue::Array(weekday_arr)) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Date32Array, Int32Array}; + + #[test] + fn test_weekday_return_field_nullability_matches_input() { + let func = SparkWeekDay::new(); + + let non_nullable_arg = Arc::new(Field::new("arg", DataType::Date32, false)); + let nullable_arg = Arc::new(Field::new("arg", DataType::Date32, true)); + + let non_nullable_out = func + .return_field_from_args(ReturnFieldArgs { + arg_fields: &[Arc::clone(&non_nullable_arg)], + scalar_arguments: &[None], + }) + .expect("non-nullable arg should succeed"); + assert_eq!(non_nullable_out.data_type(), &DataType::Int32); + assert!(!non_nullable_out.is_nullable()); + + let nullable_out = func + .return_field_from_args(ReturnFieldArgs { + arg_fields: &[Arc::clone(&nullable_arg)], + scalar_arguments: &[None], + }) + .expect("nullable arg should succeed"); + assert_eq!(nullable_out.data_type(), &DataType::Int32); + assert!(nullable_out.is_nullable()); + } + + #[test] + fn test_weekday_scalar() -> Result<()> { + let func = SparkWeekDay::new(); + + // 2024-03-15 is a Friday -> Spark weekday = 4 (Mon=0). + let result = func.invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Date32(Some(19797)))], + arg_fields: vec![Arc::new(Field::new("arg", DataType::Date32, true))], + number_rows: 1, + return_field: Arc::new(Field::new("weekday", DataType::Int32, true)), + config_options: Arc::new(Default::default()), + })?; + match result { + ColumnarValue::Scalar(ScalarValue::Int32(Some(v))) => assert_eq!(v, 4), + other => panic!("unexpected result: {other:?}"), + } + + // NULL input -> NULL output. + let result = func.invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Scalar(ScalarValue::Date32(None))], + arg_fields: vec![Arc::new(Field::new("arg", DataType::Date32, true))], + number_rows: 1, + return_field: Arc::new(Field::new("weekday", DataType::Int32, true)), + config_options: Arc::new(Default::default()), + })?; + match result { + ColumnarValue::Scalar(ScalarValue::Int32(None)) => {} + other => panic!("unexpected result: {other:?}"), + } + + Ok(()) + } + + #[test] + fn test_weekday_array() -> Result<()> { + let func = SparkWeekDay::new(); + + // 2024-01-01 Mon(0), 2024-01-06 Sat(5), 2024-01-07 Sun(6), NULL. + let input = Date32Array::from(vec![Some(19723), Some(19728), Some(19729), None]); + let result = func.invoke_with_args(ScalarFunctionArgs { + args: vec![ColumnarValue::Array(Arc::new(input))], + arg_fields: vec![Arc::new(Field::new("arg", DataType::Date32, true))], + number_rows: 4, + return_field: Arc::new(Field::new("weekday", DataType::Int32, true)), + config_options: Arc::new(Default::default()), + })?; + match result { + ColumnarValue::Array(arr) => { + let expected = Int32Array::from(vec![Some(0), Some(5), Some(6), None]); + assert_eq!(arr.as_primitive::(), &expected); + } + other => panic!("unexpected result: {other:?}"), + } + + Ok(()) + } +} diff --git a/datafusion/sqllogictest/test_files/spark/datetime/weekday.slt b/datafusion/sqllogictest/test_files/spark/datetime/weekday.slt index b4f5444e8a2da..efa6b898c2a6a 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/weekday.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/weekday.slt @@ -23,5 +23,109 @@ ## Original Query: SELECT weekday('2009-07-30'); ## PySpark 3.5.5 Result: {'weekday(2009-07-30)': 3, 'typeof(weekday(2009-07-30))': 'int', 'typeof(2009-07-30)': 'string'} -#query -#SELECT weekday('2009-07-30'::string); +# Spark `weekday` is 0-indexed with Monday = 0 .. Sunday = 6. +# 2009-07-30 is a Thursday -> 3. +query I +SELECT weekday('2009-07-30'::DATE); +---- +3 + +# All seven days of one week (2024-01-01 is a Monday). +query I +SELECT weekday('2024-01-01'::DATE); +---- +0 + +query I +SELECT weekday('2024-01-02'::DATE); +---- +1 + +query I +SELECT weekday('2024-01-03'::DATE); +---- +2 + +query I +SELECT weekday('2024-01-04'::DATE); +---- +3 + +query I +SELECT weekday('2024-01-05'::DATE); +---- +4 + +query I +SELECT weekday('2024-01-06'::DATE); +---- +5 + +query I +SELECT weekday('2024-01-07'::DATE); +---- +6 + +# NULL handling +query I +SELECT weekday(NULL::DATE); +---- +NULL + +# Array input (mix of weekdays and NULL) +query I +SELECT weekday(d) FROM (VALUES ('2024-01-01'::DATE), ('2024-01-06'::DATE), ('2024-01-07'::DATE), (NULL::DATE)) AS t(d); +---- +0 +5 +6 +NULL + +# Timestamp input: Spark coerces TIMESTAMP/TIMESTAMP_NTZ to DATE before evaluation +query I +SELECT weekday('2009-07-30 12:34:56'::TIMESTAMP); +---- +3 + +query I +SELECT weekday(NULL::TIMESTAMP); +---- +NULL + +# Timestamp array input +query I +SELECT weekday(ts) FROM (VALUES + ('2024-01-01 01:02:03'::TIMESTAMP), + ('2024-01-06 10:20:30'::TIMESTAMP), + ('2024-01-07 23:59:59'::TIMESTAMP), + (NULL::TIMESTAMP) +) AS t(ts); +---- +0 +5 +6 +NULL + +# TIMESTAMP_NTZ (Timestamp without timezone) — explicit Microsecond precision +query I +SELECT weekday(arrow_cast('2009-07-30 09:15:00', 'Timestamp(Microsecond, None)')); +---- +3 + +# TIMESTAMP with timezone (Spark TIMESTAMP / LTZ) — coerces to Date32 +query I +SELECT weekday(arrow_cast('2024-01-07 03:00:00', 'Timestamp(Nanosecond, Some("UTC"))')); +---- +6 + +# Error: wrong argument type (string without cast) +statement error Function 'weekday' requires Date, but received String +SELECT weekday('not-a-date'); + +# Error: wrong argument type (integer) +statement error Function 'weekday' requires Date, but received Int64 +SELECT weekday(123); + +# Error: no arguments +statement error 'weekday' does not support zero arguments +SELECT weekday();