From 562e8855a0d15446c0e49892fe9ebfed748116a8 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Mon, 2 Mar 2026 12:52:16 -0500 Subject: [PATCH 1/2] Add benchmark for pad with scalar length and fill --- datafusion/functions/benches/pad.rs | 195 ++++++++++++++++++++++++++++ 1 file changed, 195 insertions(+) diff --git a/datafusion/functions/benches/pad.rs b/datafusion/functions/benches/pad.rs index 0f856f0fef384..a3bd698c5b4cd 100644 --- a/datafusion/functions/benches/pad.rs +++ b/datafusion/functions/benches/pad.rs @@ -24,6 +24,7 @@ use arrow::util::bench_util::{ create_string_array_with_len, create_string_view_array_with_len, }; use criterion::{Criterion, SamplingMode, criterion_group, criterion_main}; +use datafusion_common::ScalarValue; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; use datafusion_functions::unicode; @@ -172,6 +173,32 @@ fn create_pad_args( } } +/// Create args for pad benchmark with scalar length and fill (common pattern: +/// `lpad(column, 20, '0')`). +fn create_scalar_pad_args( + size: usize, + str_len: usize, + target_len: i64, + fill: &str, + use_string_view: bool, +) -> Vec { + if use_string_view { + let string_array = create_string_view_array_with_len(size, 0.1, str_len, false); + vec![ + ColumnarValue::Array(Arc::new(string_array)), + ColumnarValue::Scalar(ScalarValue::Int64(Some(target_len))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some(fill.to_string()))), + ] + } else { + let string_array = create_string_array_with_len::(size, 0.1, str_len); + vec![ + ColumnarValue::Array(Arc::new(string_array)), + ColumnarValue::Scalar(ScalarValue::Int64(Some(target_len))), + ColumnarValue::Scalar(ScalarValue::Utf8(Some(fill.to_string()))), + ] + } +} + fn criterion_benchmark(c: &mut Criterion) { for size in [1024, 4096] { let mut group = c.benchmark_group(format!("lpad size={size}")); @@ -336,6 +363,90 @@ fn criterion_benchmark(c: &mut Criterion) { }, ); + // --- Scalar length + fill benchmarks --- + + // Utf8 with scalar length and fill (3-arg) + let args = create_scalar_pad_args::(size, 5, 20, "x", false); + let arg_fields = args + .iter() + .enumerate() + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) + .collect::>(); + + group.bench_function( + format!("lpad utf8 scalar [size={size}, str_len=5, target=20, fill='x']"), + |b| { + b.iter(|| { + let args_cloned = args.clone(); + black_box(unicode::lpad().invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: arg_fields.clone(), + number_rows: size, + return_field: Field::new("f", DataType::Utf8, true).into(), + config_options: Arc::clone(&config_options), + })) + }) + }, + ); + + // StringView with scalar length and fill (3-arg) + let args = create_scalar_pad_args::(size, 5, 20, "x", true); + let arg_fields = args + .iter() + .enumerate() + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) + .collect::>(); + + group.bench_function( + format!( + "lpad stringview scalar [size={size}, str_len=5, target=20, fill='x']" + ), + |b| { + b.iter(|| { + let args_cloned = args.clone(); + black_box(unicode::lpad().invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: arg_fields.clone(), + number_rows: size, + return_field: Field::new("f", DataType::Utf8, true).into(), + config_options: Arc::clone(&config_options), + })) + }) + }, + ); + + // Utf8 with scalar length and unicode fill + let args = create_scalar_pad_args::(size, 5, 20, "é", false); + let arg_fields = args + .iter() + .enumerate() + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) + .collect::>(); + + group.bench_function( + format!( + "lpad utf8 scalar unicode [size={size}, str_len=5, target=20, fill='é']" + ), + |b| { + b.iter(|| { + let args_cloned = args.clone(); + black_box(unicode::lpad().invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: arg_fields.clone(), + number_rows: size, + return_field: Field::new("f", DataType::Utf8, true).into(), + config_options: Arc::clone(&config_options), + })) + }) + }, + ); + group.finish(); } @@ -502,6 +613,90 @@ fn criterion_benchmark(c: &mut Criterion) { }, ); + // --- Scalar length + fill benchmarks --- + + // Utf8 with scalar length and fill (3-arg) + let args = create_scalar_pad_args::(size, 5, 20, "x", false); + let arg_fields = args + .iter() + .enumerate() + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) + .collect::>(); + + group.bench_function( + format!("rpad utf8 scalar [size={size}, str_len=5, target=20, fill='x']"), + |b| { + b.iter(|| { + let args_cloned = args.clone(); + black_box(unicode::rpad().invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: arg_fields.clone(), + number_rows: size, + return_field: Field::new("f", DataType::Utf8, true).into(), + config_options: Arc::clone(&config_options), + })) + }) + }, + ); + + // StringView with scalar length and fill (3-arg) + let args = create_scalar_pad_args::(size, 5, 20, "x", true); + let arg_fields = args + .iter() + .enumerate() + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) + .collect::>(); + + group.bench_function( + format!( + "rpad stringview scalar [size={size}, str_len=5, target=20, fill='x']" + ), + |b| { + b.iter(|| { + let args_cloned = args.clone(); + black_box(unicode::rpad().invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: arg_fields.clone(), + number_rows: size, + return_field: Field::new("f", DataType::Utf8, true).into(), + config_options: Arc::clone(&config_options), + })) + }) + }, + ); + + // Utf8 with scalar length and unicode fill + let args = create_scalar_pad_args::(size, 5, 20, "é", false); + let arg_fields = args + .iter() + .enumerate() + .map(|(idx, arg)| { + Field::new(format!("arg_{idx}"), arg.data_type(), true).into() + }) + .collect::>(); + + group.bench_function( + format!( + "rpad utf8 scalar unicode [size={size}, str_len=5, target=20, fill='é']" + ), + |b| { + b.iter(|| { + let args_cloned = args.clone(); + black_box(unicode::rpad().invoke_with_args(ScalarFunctionArgs { + args: args_cloned, + arg_fields: arg_fields.clone(), + number_rows: size, + return_field: Field::new("f", DataType::Utf8, true).into(), + config_options: Arc::clone(&config_options), + })) + }) + }, + ); + group.finish(); } } From 96bd9531c0c87117af6ab9852ec08a89b56fa067 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Mon, 2 Mar 2026 12:52:45 -0500 Subject: [PATCH 2/2] Optimize lpad and rpad --- datafusion/functions/src/unicode/common.rs | 18 + datafusion/functions/src/unicode/lpad.rs | 247 ++++++-- datafusion/functions/src/unicode/rpad.rs | 578 +++++++++++------- datafusion/functions/src/unicode/translate.rs | 8 +- 4 files changed, 601 insertions(+), 250 deletions(-) diff --git a/datafusion/functions/src/unicode/common.rs b/datafusion/functions/src/unicode/common.rs index 93f0c7900961e..002776e6c6538 100644 --- a/datafusion/functions/src/unicode/common.rs +++ b/datafusion/functions/src/unicode/common.rs @@ -23,14 +23,32 @@ use arrow::array::{ }; use arrow::datatypes::DataType; use arrow_buffer::{NullBuffer, ScalarBuffer}; +use datafusion_common::ScalarValue; use datafusion_common::cast::{ as_generic_string_array, as_int64_array, as_string_view_array, }; use datafusion_common::exec_err; +use datafusion_expr::ColumnarValue; use std::cmp::Ordering; use std::ops::Range; use std::sync::Arc; +/// If `cv` is a non-null scalar string, return its value. +pub(crate) fn try_as_scalar_str(cv: &ColumnarValue) -> Option<&str> { + match cv { + ColumnarValue::Scalar(s) => s.try_as_str().flatten(), + _ => None, + } +} + +/// If `cv` is a non-null scalar Int64, return its value. +pub(crate) fn try_as_scalar_i64(cv: &ColumnarValue) -> Option { + match cv { + ColumnarValue::Scalar(ScalarValue::Int64(v)) => *v, + _ => None, + } +} + /// A trait for `left` and `right` byte slicing operations pub(crate) trait LeftRightSlicer { fn slice(string: &str, n: i64) -> Range; diff --git a/datafusion/functions/src/unicode/lpad.rs b/datafusion/functions/src/unicode/lpad.rs index 50d15c7d62a65..ae2d7cb8653aa 100644 --- a/datafusion/functions/src/unicode/lpad.rs +++ b/datafusion/functions/src/unicode/lpad.rs @@ -32,7 +32,8 @@ use datafusion_common::cast::as_int64_array; use datafusion_common::{Result, exec_err}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ - ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, + Volatility, }; use datafusion_macros::user_doc; @@ -112,14 +113,64 @@ impl ScalarUDFImpl for LPadFunc { utf8_to_str_type(&arg_types[0], "lpad") } - fn invoke_with_args( - &self, - args: datafusion_expr::ScalarFunctionArgs, - ) -> Result { - let args = &args.args; + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let ScalarFunctionArgs { + args, number_rows, .. + } = args; + + const MAX_SCALAR_TARGET_LEN: usize = 8192; + + // If target_len and fill (if specified) are constants, use the scalar + // fast path. + if let Some(target_len) = try_as_scalar_i64(&args[1]) { + let target_len: usize = match usize::try_from(target_len) { + Ok(n) if n <= i32::MAX as usize => n, + Ok(n) => { + return exec_err!("lpad requested length {n} too large"); + } + Err(_) => 0, // negative → 0 + }; + + let fill_str = if args.len() == 3 { + try_as_scalar_str(&args[2]) + } else { + Some(" ") + }; + + // Skip the fast path for very large `target_len` values to avoid + // consuming too much memory. Such large padding values are uncommon + // in practice. + if target_len <= MAX_SCALAR_TARGET_LEN + && let Some(fill) = fill_str + { + let string_array = args[0].to_array_of_size(number_rows)?; + let result = match string_array.data_type() { + Utf8View => lpad_scalar_args::<_, i32>( + string_array.as_string_view(), + target_len, + fill, + ), + Utf8 => lpad_scalar_args::<_, i32>( + string_array.as_string::(), + target_len, + fill, + ), + LargeUtf8 => lpad_scalar_args::<_, i64>( + string_array.as_string::(), + target_len, + fill, + ), + other => { + exec_err!("Unsupported data type {other:?} for function lpad") + } + }?; + return Ok(ColumnarValue::Array(result)); + } + } + match args[0].data_type() { - Utf8 | Utf8View => make_scalar_function(lpad::, vec![])(args), - LargeUtf8 => make_scalar_function(lpad::, vec![])(args), + Utf8 | Utf8View => make_scalar_function(lpad::, vec![])(&args), + LargeUtf8 => make_scalar_function(lpad::, vec![])(&args), other => exec_err!("Unsupported data type {other:?} for function lpad"), } } @@ -129,8 +180,126 @@ impl ScalarUDFImpl for LPadFunc { } } -/// Extends the string to length 'length' by prepending the characters fill (a space by default). -/// If the string is already longer than length then it is truncated (on the right). +use super::common::{try_as_scalar_i64, try_as_scalar_str}; + +/// Optimized lpad for constant target_len and fill arguments. +fn lpad_scalar_args<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>( + string_array: V, + target_len: usize, + fill: &str, +) -> Result { + if string_array.is_ascii() && fill.is_ascii() { + lpad_scalar_ascii::(string_array, target_len, fill) + } else { + lpad_scalar_unicode::(string_array, target_len, fill) + } +} + +fn lpad_scalar_ascii<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>( + string_array: V, + target_len: usize, + fill: &str, +) -> Result { + // With a scalar `target_len` and `fill`, we can precompute a padding + // buffer of `target_len` fill characters repeated cyclically. + let padding_buf = if !fill.is_empty() { + let mut buf = String::with_capacity(target_len); + while buf.len() < target_len { + let remaining = target_len - buf.len(); + if remaining >= fill.len() { + buf.push_str(fill); + } else { + buf.push_str(&fill[..remaining]); + } + } + buf + } else { + String::new() + }; + + // Each output row is exactly `target_len` ASCII bytes (padding + string). + let data_capacity = string_array.len().saturating_mul(target_len); + let mut builder = + GenericStringBuilder::::with_capacity(string_array.len(), data_capacity); + + for maybe_string in string_array.iter() { + match maybe_string { + Some(string) => { + let str_len = string.len(); + if target_len <= str_len { + builder.append_value(&string[..target_len]); + } else if fill.is_empty() { + builder.append_value(string); + } else { + let pad_needed = target_len - str_len; + builder.write_str(&padding_buf[..pad_needed])?; + builder.append_value(string); + } + } + None => builder.append_null(), + } + } + + Ok(Arc::new(builder.finish()) as ArrayRef) +} + +fn lpad_scalar_unicode<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>( + string_array: V, + target_len: usize, + fill: &str, +) -> Result { + let fill_chars: Vec = fill.chars().collect(); + + // With a scalar `target_len` and `fill`, we can precompute a padding buffer + // of `target_len` fill characters repeated cyclically. Because Unicode + // characters are variable-width, we build a byte-offset table to map from + // character count to the corresponding byte position in the padding buffer. + let (padding_buf, char_byte_offsets) = if !fill_chars.is_empty() { + let mut buf = String::new(); + let mut offsets = Vec::with_capacity(target_len + 1); + offsets.push(0usize); + for i in 0..target_len { + buf.push(fill_chars[i % fill_chars.len()]); + offsets.push(buf.len()); + } + (buf, offsets) + } else { + (String::new(), vec![0]) + }; + + // Each output row is `target_len` chars; multiply by 4 (max UTF-8 bytes + // per char) for an upper bound in bytes. + let data_capacity = string_array.len().saturating_mul(target_len * 4); + let mut builder = + GenericStringBuilder::::with_capacity(string_array.len(), data_capacity); + let mut graphemes_buf = Vec::new(); + + for maybe_string in string_array.iter() { + match maybe_string { + Some(string) => { + graphemes_buf.clear(); + graphemes_buf.extend(string.graphemes(true)); + + if target_len < graphemes_buf.len() { + builder.append_value(graphemes_buf[..target_len].concat()); + } else if fill_chars.is_empty() { + builder.append_value(string); + } else { + let pad_chars = target_len - graphemes_buf.len(); + let pad_bytes = char_byte_offsets[pad_chars]; + builder.write_str(&padding_buf[..pad_bytes])?; + builder.append_value(string); + } + } + None => builder.append_null(), + } + } + + Ok(Arc::new(builder.finish()) as ArrayRef) +} + +/// Left-pads `string` to `target_len` using the fill string (default: space). +/// Truncates from the right if `string` is already longer than `target_len`. /// lpad('hi', 5, 'xy') = 'xyxhi' fn lpad(args: &[ArrayRef]) -> Result { if args.len() <= 1 || args.len() > 3 { @@ -212,18 +381,24 @@ where let mut graphemes_buf = Vec::new(); let mut fill_chars_buf = Vec::new(); - for ((string, length), fill) in string_array + for ((string, target_len), fill) in string_array .iter() .zip(length_array.iter()) .zip(fill_array.iter()) { - if let (Some(string), Some(length), Some(fill)) = (string, length, fill) { - if length > i32::MAX as i64 { - return exec_err!("lpad requested length {length} too large"); + if let (Some(string), Some(target_len), Some(fill)) = + (string, target_len, fill) + { + if target_len > i32::MAX as i64 { + return exec_err!("lpad requested length {target_len} too large"); } - let length = if length < 0 { 0 } else { length as usize }; - if length == 0 { + let target_len = if target_len < 0 { + 0 + } else { + target_len as usize + }; + if target_len == 0 { builder.append_value(""); continue; } @@ -232,12 +407,12 @@ where // ASCII fast path: byte length == character length, // so we skip expensive grapheme segmentation. let str_len = string.len(); - if length < str_len { - builder.append_value(&string[..length]); + if target_len < str_len { + builder.append_value(&string[..target_len]); } else if fill.is_empty() { builder.append_value(string); } else { - let pad_len = length - str_len; + let pad_len = target_len - str_len; let fill_len = fill.len(); let full_reps = pad_len / fill_len; let remainder = pad_len % fill_len; @@ -257,12 +432,12 @@ where fill_chars_buf.clear(); fill_chars_buf.extend(fill.chars()); - if length < graphemes_buf.len() { - builder.append_value(graphemes_buf[..length].concat()); + if target_len < graphemes_buf.len() { + builder.append_value(graphemes_buf[..target_len].concat()); } else if fill_chars_buf.is_empty() { builder.append_value(string); } else { - for l in 0..length - graphemes_buf.len() { + for l in 0..target_len - graphemes_buf.len() { let c = *fill_chars_buf.get(l % fill_chars_buf.len()).unwrap(); builder.write_char(c)?; @@ -280,14 +455,18 @@ where let mut builder: GenericStringBuilder = GenericStringBuilder::new(); let mut graphemes_buf = Vec::new(); - for (string, length) in string_array.iter().zip(length_array.iter()) { - if let (Some(string), Some(length)) = (string, length) { - if length > i32::MAX as i64 { - return exec_err!("lpad requested length {length} too large"); + for (string, target_len) in string_array.iter().zip(length_array.iter()) { + if let (Some(string), Some(target_len)) = (string, target_len) { + if target_len > i32::MAX as i64 { + return exec_err!("lpad requested length {target_len} too large"); } - let length = if length < 0 { 0 } else { length as usize }; - if length == 0 { + let target_len = if target_len < 0 { + 0 + } else { + target_len as usize + }; + if target_len == 0 { builder.append_value(""); continue; } @@ -295,10 +474,10 @@ where if string.is_ascii() { // ASCII fast path: byte length == character length let str_len = string.len(); - if length < str_len { - builder.append_value(&string[..length]); + if target_len < str_len { + builder.append_value(&string[..target_len]); } else { - for _ in 0..(length - str_len) { + for _ in 0..(target_len - str_len) { builder.write_str(" ")?; } builder.append_value(string); @@ -308,10 +487,10 @@ where graphemes_buf.clear(); graphemes_buf.extend(string.graphemes(true)); - if length < graphemes_buf.len() { - builder.append_value(graphemes_buf[..length].concat()); + if target_len < graphemes_buf.len() { + builder.append_value(graphemes_buf[..target_len].concat()); } else { - for _ in 0..(length - graphemes_buf.len()) { + for _ in 0..(target_len - graphemes_buf.len()) { builder.write_str(" ")?; } builder.append_value(string); diff --git a/datafusion/functions/src/unicode/rpad.rs b/datafusion/functions/src/unicode/rpad.rs index 95d9492bc246d..dc604943db418 100644 --- a/datafusion/functions/src/unicode/rpad.rs +++ b/datafusion/functions/src/unicode/rpad.rs @@ -15,25 +15,27 @@ // specific language governing permissions and limitations // under the License. -use crate::utils::{make_scalar_function, utf8_to_str_type}; +use std::any::Any; +use std::fmt::Write; +use std::sync::Arc; + use DataType::{LargeUtf8, Utf8, Utf8View}; use arrow::array::{ ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, Int64Array, OffsetSizeTrait, StringArrayType, StringViewArray, }; use arrow::datatypes::DataType; -use datafusion_common::DataFusionError; +use unicode_segmentation::UnicodeSegmentation; + +use crate::utils::{make_scalar_function, utf8_to_str_type}; use datafusion_common::cast::as_int64_array; use datafusion_common::{Result, exec_err}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ - ColumnarValue, Documentation, ScalarUDFImpl, Signature, Volatility, + ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature, + Volatility, }; use datafusion_macros::user_doc; -use std::any::Any; -use std::fmt::Write; -use std::sync::Arc; -use unicode_segmentation::UnicodeSegmentation; #[user_doc( doc_section(label = "String Functions"), @@ -111,36 +113,66 @@ impl ScalarUDFImpl for RPadFunc { utf8_to_str_type(&arg_types[0], "rpad") } - fn invoke_with_args( - &self, - args: datafusion_expr::ScalarFunctionArgs, - ) -> Result { - let args = &args.args; - match ( - args.len(), - args[0].data_type(), - args.get(2).map(|arg| arg.data_type()), - ) { - (2, Utf8 | Utf8View, _) => { - make_scalar_function(rpad::, vec![])(args) - } - (2, LargeUtf8, _) => make_scalar_function(rpad::, vec![])(args), - (3, Utf8 | Utf8View, Some(Utf8 | Utf8View)) => { - make_scalar_function(rpad::, vec![])(args) - } - (3, LargeUtf8, Some(LargeUtf8)) => { - make_scalar_function(rpad::, vec![])(args) - } - (3, Utf8 | Utf8View, Some(LargeUtf8)) => { - make_scalar_function(rpad::, vec![])(args) - } - (3, LargeUtf8, Some(Utf8 | Utf8View)) => { - make_scalar_function(rpad::, vec![])(args) - } - (_, _, _) => { - exec_err!("Unsupported combination of data types for function rpad") + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let ScalarFunctionArgs { + args, number_rows, .. + } = args; + + const MAX_SCALAR_TARGET_LEN: usize = 8192; + + // If target_len and fill (if specified) are constants, use the + // scalar fast path. + if let Some(target_len) = try_as_scalar_i64(&args[1]) { + let target_len: usize = match usize::try_from(target_len) { + Ok(n) if n <= i32::MAX as usize => n, + Ok(n) => { + return exec_err!("rpad requested length {n} too large"); + } + Err(_) => 0, // negative → 0 + }; + + let fill_str = if args.len() == 3 { + try_as_scalar_str(&args[2]) + } else { + Some(" ") + }; + + // Skip the fast path for very large `target_len` values to avoid + // consuming too much memory. Such large padding values are uncommon + // in practice. + if target_len <= MAX_SCALAR_TARGET_LEN + && let Some(fill) = fill_str + { + let string_array = args[0].to_array_of_size(number_rows)?; + let result = match string_array.data_type() { + Utf8View => rpad_scalar_args::<_, i32>( + string_array.as_string_view(), + target_len, + fill, + ), + Utf8 => rpad_scalar_args::<_, i32>( + string_array.as_string::(), + target_len, + fill, + ), + LargeUtf8 => rpad_scalar_args::<_, i64>( + string_array.as_string::(), + target_len, + fill, + ), + other => { + exec_err!("Unsupported data type {other:?} for function rpad") + } + }?; + return Ok(ColumnarValue::Array(result)); } } + + match args[0].data_type() { + Utf8 | Utf8View => make_scalar_function(rpad::, vec![])(&args), + LargeUtf8 => make_scalar_function(rpad::, vec![])(&args), + other => exec_err!("Unsupported data type {other:?} for function rpad"), + } } fn documentation(&self) -> Option<&Documentation> { @@ -148,204 +180,332 @@ impl ScalarUDFImpl for RPadFunc { } } -fn rpad( - args: &[ArrayRef], +use super::common::{try_as_scalar_i64, try_as_scalar_str}; + +/// Optimized rpad for constant target_len and fill arguments. +fn rpad_scalar_args<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>( + string_array: V, + target_len: usize, + fill: &str, +) -> Result { + if string_array.is_ascii() && fill.is_ascii() { + rpad_scalar_ascii::(string_array, target_len, fill) + } else { + rpad_scalar_unicode::(string_array, target_len, fill) + } +} + +fn rpad_scalar_ascii<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>( + string_array: V, + target_len: usize, + fill: &str, +) -> Result { + // With a scalar `target_len` and `fill`, we can precompute a padding + // buffer of `target_len` fill characters repeated cyclically. + let padding_buf = if !fill.is_empty() { + let mut buf = String::with_capacity(target_len); + while buf.len() < target_len { + let remaining = target_len - buf.len(); + if remaining >= fill.len() { + buf.push_str(fill); + } else { + buf.push_str(&fill[..remaining]); + } + } + buf + } else { + String::new() + }; + + // Each output row is exactly `target_len` ASCII bytes (string + padding). + let data_capacity = string_array.len().saturating_mul(target_len); + let mut builder = + GenericStringBuilder::::with_capacity(string_array.len(), data_capacity); + + for maybe_string in string_array.iter() { + match maybe_string { + Some(string) => { + let str_len = string.len(); + if target_len <= str_len { + builder.append_value(&string[..target_len]); + } else if fill.is_empty() { + builder.append_value(string); + } else { + let pad_needed = target_len - str_len; + builder.write_str(string)?; + builder.write_str(&padding_buf[..pad_needed])?; + builder.append_value(""); + } + } + None => builder.append_null(), + } + } + + Ok(Arc::new(builder.finish()) as ArrayRef) +} + +fn rpad_scalar_unicode<'a, V: StringArrayType<'a> + Copy, T: OffsetSizeTrait>( + string_array: V, + target_len: usize, + fill: &str, ) -> Result { - if args.len() < 2 || args.len() > 3 { + let fill_chars: Vec = fill.chars().collect(); + + // With a scalar `target_len` and `fill`, we can precompute a padding buffer + // of `target_len` fill characters repeated cyclically. Because Unicode + // characters are variable-width, we build a byte-offset table to map from + // character count to the corresponding byte position in the padding buffer. + let (padding_buf, char_byte_offsets) = if !fill_chars.is_empty() { + let mut buf = String::new(); + let mut offsets = Vec::with_capacity(target_len + 1); + offsets.push(0usize); + for i in 0..target_len { + buf.push(fill_chars[i % fill_chars.len()]); + offsets.push(buf.len()); + } + (buf, offsets) + } else { + (String::new(), vec![0]) + }; + + // Each output row is `target_len` chars; multiply by 4 (max UTF-8 bytes + // per char) for an upper bound in bytes. + let data_capacity = string_array.len().saturating_mul(target_len * 4); + let mut builder = + GenericStringBuilder::::with_capacity(string_array.len(), data_capacity); + let mut graphemes_buf = Vec::new(); + + for maybe_string in string_array.iter() { + match maybe_string { + Some(string) => { + graphemes_buf.clear(); + graphemes_buf.extend(string.graphemes(true)); + + if target_len < graphemes_buf.len() { + builder.append_value(graphemes_buf[..target_len].concat()); + } else if fill_chars.is_empty() { + builder.append_value(string); + } else { + let pad_chars = target_len - graphemes_buf.len(); + let pad_bytes = char_byte_offsets[pad_chars]; + builder.write_str(string)?; + builder.write_str(&padding_buf[..pad_bytes])?; + builder.append_value(""); + } + } + None => builder.append_null(), + } + } + + Ok(Arc::new(builder.finish()) as ArrayRef) +} + +fn rpad(args: &[ArrayRef]) -> Result { + if args.len() <= 1 || args.len() > 3 { return exec_err!( - "rpad was called with {} arguments. It requires 2 or 3 arguments.", + "rpad was called with {} arguments. It requires at least 2 and at most 3.", args.len() ); } let length_array = as_int64_array(&args[1])?; - match ( - args.len(), - args[0].data_type(), - args.get(2).map(|arg| arg.data_type()), - ) { - (2, Utf8View, _) => { - rpad_impl::<&StringViewArray, &StringViewArray, StringArrayLen>( - &args[0].as_string_view(), - length_array, - None, - ) - } - (3, Utf8View, Some(Utf8View)) => { - rpad_impl::<&StringViewArray, &StringViewArray, StringArrayLen>( - &args[0].as_string_view(), - length_array, - Some(args[2].as_string_view()), - ) - } - (3, Utf8View, Some(Utf8 | LargeUtf8)) => { - rpad_impl::<&StringViewArray, &GenericStringArray, StringArrayLen>( - &args[0].as_string_view(), - length_array, - Some(args[2].as_string::()), - ) - } - (3, Utf8 | LargeUtf8, Some(Utf8View)) => rpad_impl::< - &GenericStringArray, - &StringViewArray, - StringArrayLen, - >( - &args[0].as_string::(), + + match (args.len(), args[0].data_type()) { + (2, Utf8View) => rpad_impl::<&StringViewArray, &GenericStringArray, T>( + &args[0].as_string_view(), + length_array, + None, + ), + (2, Utf8 | LargeUtf8) => rpad_impl::< + &GenericStringArray, + &GenericStringArray, + T, + >(&args[0].as_string::(), length_array, None), + (3, Utf8View) => rpad_with_replace::<&StringViewArray, T>( + &args[0].as_string_view(), length_array, - Some(args[2].as_string_view()), + &args[2], ), - (_, _, _) => rpad_impl::< - &GenericStringArray, - &GenericStringArray, - StringArrayLen, - >( - &args[0].as_string::(), + (3, Utf8 | LargeUtf8) => rpad_with_replace::<&GenericStringArray, T>( + &args[0].as_string::(), length_array, - args.get(2).map(|arg| arg.as_string::()), + &args[2], ), + (_, _) => unreachable!("rpad"), } } -/// Extends the string to length 'length' by appending the characters fill (a space by default). -/// If the string is already longer than length then it is truncated (on the right). -/// rpad('hi', 5, 'xy') = 'hixyx' -fn rpad_impl<'a, StringArrType, FillArrType, StringArrayLen>( - string_array: &StringArrType, +fn rpad_with_replace<'a, V, T: OffsetSizeTrait>( + string_array: &V, length_array: &Int64Array, - fill_array: Option, + fill_array: &'a ArrayRef, ) -> Result where - StringArrType: StringArrayType<'a>, - FillArrType: StringArrayType<'a>, - StringArrayLen: OffsetSizeTrait, + V: StringArrayType<'a>, { - let mut builder: GenericStringBuilder = GenericStringBuilder::new(); - let mut graphemes_buf = Vec::new(); - let mut fill_chars_buf = Vec::new(); - - match fill_array { - None => { - string_array.iter().zip(length_array.iter()).try_for_each( - |(string, length)| -> Result<(), DataFusionError> { - match (string, length) { - (Some(string), Some(length)) => { - if length > i32::MAX as i64 { - return exec_err!( - "rpad requested length {} too large", - length - ); - } - let length = if length < 0 { 0 } else { length as usize }; - if length == 0 { - builder.append_value(""); - } else if string.is_ascii() { - // ASCII fast path: byte length == character length - let str_len = string.len(); - if length < str_len { - builder.append_value(&string[..length]); - } else { - builder.write_str(string)?; - for _ in 0..(length - str_len) { - builder.write_str(" ")?; - } - builder.append_value(""); - } - } else { - // Reuse buffer by clearing and refilling - graphemes_buf.clear(); - graphemes_buf.extend(string.graphemes(true)); - - if length < graphemes_buf.len() { - builder - .append_value(graphemes_buf[..length].concat()); - } else { - builder.write_str(string)?; - for _ in 0..(length - graphemes_buf.len()) { - builder.write_str(" ")?; - } - builder.append_value(""); - } - } + match fill_array.data_type() { + Utf8View => rpad_impl::( + string_array, + length_array, + Some(fill_array.as_string_view()), + ), + LargeUtf8 => rpad_impl::, T>( + string_array, + length_array, + Some(fill_array.as_string::()), + ), + Utf8 => rpad_impl::, T>( + string_array, + length_array, + Some(fill_array.as_string::()), + ), + other => { + exec_err!("Unsupported data type {other:?} for function rpad") + } + } +} + +fn rpad_impl<'a, V, V2, T>( + string_array: &V, + length_array: &Int64Array, + fill_array: Option, +) -> Result +where + V: StringArrayType<'a>, + V2: StringArrayType<'a>, + T: OffsetSizeTrait, +{ + let array = if let Some(fill_array) = fill_array { + let mut builder: GenericStringBuilder = GenericStringBuilder::new(); + let mut graphemes_buf = Vec::new(); + let mut fill_chars_buf = Vec::new(); + + for ((string, target_len), fill) in string_array + .iter() + .zip(length_array.iter()) + .zip(fill_array.iter()) + { + if let (Some(string), Some(target_len), Some(fill)) = + (string, target_len, fill) + { + if target_len > i32::MAX as i64 { + return exec_err!("rpad requested length {target_len} too large"); + } + + let target_len = if target_len < 0 { + 0 + } else { + target_len as usize + }; + if target_len == 0 { + builder.append_value(""); + continue; + } + + if string.is_ascii() && fill.is_ascii() { + // ASCII fast path: byte length == character length, + // so we skip expensive grapheme segmentation. + let str_len = string.len(); + if target_len < str_len { + builder.append_value(&string[..target_len]); + } else if fill.is_empty() { + builder.append_value(string); + } else { + let pad_len = target_len - str_len; + let fill_len = fill.len(); + let full_reps = pad_len / fill_len; + let remainder = pad_len % fill_len; + builder.write_str(string)?; + for _ in 0..full_reps { + builder.write_str(fill)?; } - _ => builder.append_null(), + if remainder > 0 { + builder.write_str(&fill[..remainder])?; + } + builder.append_value(""); + } + } else { + graphemes_buf.clear(); + graphemes_buf.extend(string.graphemes(true)); + + fill_chars_buf.clear(); + fill_chars_buf.extend(fill.chars()); + + if target_len < graphemes_buf.len() { + builder.append_value(graphemes_buf[..target_len].concat()); + } else if fill_chars_buf.is_empty() { + builder.append_value(string); + } else { + builder.write_str(string)?; + for l in 0..target_len - graphemes_buf.len() { + let c = + *fill_chars_buf.get(l % fill_chars_buf.len()).unwrap(); + builder.write_char(c)?; + } + builder.append_value(""); } - Ok(()) - }, - )?; + } + } else { + builder.append_null(); + } } - Some(fill_array) => { - string_array - .iter() - .zip(length_array.iter()) - .zip(fill_array.iter()) - .try_for_each( - |((string, length), fill)| -> Result<(), DataFusionError> { - match (string, length, fill) { - (Some(string), Some(length), Some(fill)) => { - if length > i32::MAX as i64 { - return exec_err!( - "rpad requested length {} too large", - length - ); - } - let length = if length < 0 { 0 } else { length as usize }; - if string.is_ascii() && fill.is_ascii() { - // ASCII fast path: byte length == character length, - // so we skip expensive grapheme segmentation. - let str_len = string.len(); - if length < str_len { - builder.append_value(&string[..length]); - } else if fill.is_empty() { - builder.append_value(string); - } else { - let pad_len = length - str_len; - let fill_len = fill.len(); - let full_reps = pad_len / fill_len; - let remainder = pad_len % fill_len; - builder.write_str(string)?; - for _ in 0..full_reps { - builder.write_str(fill)?; - } - if remainder > 0 { - builder.write_str(&fill[..remainder])?; - } - builder.append_value(""); - } - } else { - // Reuse buffer by clearing and refilling - graphemes_buf.clear(); - graphemes_buf.extend(string.graphemes(true)); - - if length < graphemes_buf.len() { - builder.append_value( - graphemes_buf[..length].concat(), - ); - } else if fill.is_empty() { - builder.append_value(string); - } else { - builder.write_str(string)?; - // Reuse fill_chars_buf by clearing and refilling - fill_chars_buf.clear(); - fill_chars_buf.extend(fill.chars()); - for l in 0..length - graphemes_buf.len() { - let c = *fill_chars_buf - .get(l % fill_chars_buf.len()) - .unwrap(); - builder.write_char(c)?; - } - builder.append_value(""); - } - } - } - _ => builder.append_null(), + + builder.finish() + } else { + let mut builder: GenericStringBuilder = GenericStringBuilder::new(); + let mut graphemes_buf = Vec::new(); + + for (string, target_len) in string_array.iter().zip(length_array.iter()) { + if let (Some(string), Some(target_len)) = (string, target_len) { + if target_len > i32::MAX as i64 { + return exec_err!("rpad requested length {target_len} too large"); + } + + let target_len = if target_len < 0 { + 0 + } else { + target_len as usize + }; + if target_len == 0 { + builder.append_value(""); + continue; + } + + if string.is_ascii() { + // ASCII fast path: byte length == character length + let str_len = string.len(); + if target_len < str_len { + builder.append_value(&string[..target_len]); + } else { + builder.write_str(string)?; + for _ in 0..(target_len - str_len) { + builder.write_str(" ")?; + } + builder.append_value(""); + } + } else { + graphemes_buf.clear(); + graphemes_buf.extend(string.graphemes(true)); + + if target_len < graphemes_buf.len() { + builder.append_value(graphemes_buf[..target_len].concat()); + } else { + builder.write_str(string)?; + for _ in 0..(target_len - graphemes_buf.len()) { + builder.write_str(" ")?; } - Ok(()) - }, - )?; + builder.append_value(""); + } + } + } else { + builder.append_null(); + } } - } - Ok(Arc::new(builder.finish()) as ArrayRef) + builder.finish() + }; + + Ok(Arc::new(array) as ArrayRef) } #[cfg(test)] diff --git a/datafusion/functions/src/unicode/translate.rs b/datafusion/functions/src/unicode/translate.rs index e86eaf8111b1c..164c3f5adae66 100644 --- a/datafusion/functions/src/unicode/translate.rs +++ b/datafusion/functions/src/unicode/translate.rs @@ -163,13 +163,7 @@ impl ScalarUDFImpl for TranslateFunc { } } -/// If `cv` is a non-null scalar string, return its value. -fn try_as_scalar_str(cv: &ColumnarValue) -> Option<&str> { - match cv { - ColumnarValue::Scalar(s) => s.try_as_str().flatten(), - _ => None, - } -} +use super::common::try_as_scalar_str; fn invoke_translate(args: &[ArrayRef]) -> Result { match args[0].data_type() {