Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 66 additions & 8 deletions datafusion/functions-nested/src/array_has.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ fn array_has_dispatch_for_scalar(
haystack: ArrayWrapper<'_>,
needle: &dyn Datum,
) -> Result<ArrayRef> {
let values = haystack.values();
let is_nested = values.data_type().is_nested();
// If first argument is empty list (second argument is non-null), return false
// i.e. array_has([], non-null element) -> false
if haystack.len() == 0 {
Expand All @@ -362,7 +360,17 @@ fn array_has_dispatch_for_scalar(
None,
)));
}
let eq_array = compare_with_eq(values, needle, is_nested)?;

// For sliced ListArrays, values() returns the full underlying array but
// only elements between the first and last offset are visible.
let offsets: Vec<usize> = haystack.offsets().collect();
let first_offset = offsets[0];
let visible_values = haystack
.values()
.slice(first_offset, offsets[offsets.len() - 1] - first_offset);

let is_nested = visible_values.data_type().is_nested();
let eq_array = compare_with_eq(&visible_values, needle, is_nested)?;

// When a haystack element is null, `eq()` returns null (not false).
// In Arrow, a null BooleanArray entry has validity=0 but an
Expand All @@ -382,10 +390,14 @@ fn array_has_dispatch_for_scalar(
ArrayWrapper::LargeList(arr) => arr.nulls(),
};
let mut matches = eq_bits.set_indices().peekable();
let mut values = BooleanBufferBuilder::new(haystack.len());
values.append_n(haystack.len(), false);
let mut result = BooleanBufferBuilder::new(haystack.len());
result.append_n(haystack.len(), false);

// Match positions are relative to visible_values (0-based), so
// subtract first_offset from each offset when comparing.
for (i, window) in offsets.windows(2).enumerate() {
let end = window[1] - first_offset;

for (i, (_start, end)) in haystack.offsets().tuple_windows().enumerate() {
let has_match = matches.peek().is_some_and(|&p| p < end);

// Advance past all match positions in this row's range.
Expand All @@ -394,14 +406,14 @@ fn array_has_dispatch_for_scalar(
}

if has_match && validity.is_none_or(|v| v.is_valid(i)) {
values.set_bit(i, true);
result.set_bit(i, true);
}
}

// A null haystack row always produces a null output, so we can
// reuse the haystack's null buffer directly.
Ok(Arc::new(BooleanArray::new(
values.finish(),
result.finish(),
validity.cloned(),
)))
}
Expand Down Expand Up @@ -1066,6 +1078,52 @@ mod tests {
Ok(())
}

#[test]
fn test_array_has_sliced_list() -> Result<(), DataFusionError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Double-checked that this test fails on main. It would be nice to backport this fix to #19692.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I haven't done a backport before, but I'd guess we should wait for this to land in main, then I'll open a new PR to backport to the 53 branch?

// [[10, 20], [30, 40], [50, 60], [70, 80]] → slice(1,2) → [[30, 40], [50, 60]]
let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(10), Some(20)]),
Some(vec![Some(30), Some(40)]),
Some(vec![Some(50), Some(60)]),
Some(vec![Some(70), Some(80)]),
]);
let sliced = list.slice(1, 2);
let haystack_field =
Arc::new(Field::new("haystack", sliced.data_type().clone(), true));
let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
let return_field = Arc::new(Field::new("return", DataType::Boolean, true));

// Search for elements that exist only in sliced-away rows:
// 10 is in the prefix row, 70 is in the suffix row.
let invoke = |needle: i32| -> Result<ArrayRef, DataFusionError> {
ArrayHas::new()
.invoke_with_args(ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(sliced.clone())),
ColumnarValue::Scalar(ScalarValue::Int32(Some(needle))),
],
arg_fields: vec![
Arc::clone(&haystack_field),
Arc::clone(&needle_field),
],
number_rows: 2,
return_field: Arc::clone(&return_field),
config_options: Arc::new(ConfigOptions::default()),
})?
.into_array(2)
};

let output = invoke(10)?.as_boolean().clone();
assert!(!output.value(0));
assert!(!output.value(1));

let output = invoke(70)?.as_boolean().clone();
assert!(!output.value(0));
assert!(!output.value(1));

Ok(())
}

#[test]
fn test_array_has_list_null_haystack() -> Result<(), DataFusionError> {
let haystack_field = Arc::new(Field::new("haystack", DataType::Null, true));
Expand Down
81 changes: 74 additions & 7 deletions datafusion/functions-nested/src/position.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,26 +230,36 @@ fn array_position_scalar<O: OffsetSizeTrait>(
"array_position",
&[list_array.values(), element_array],
)?;
let element_datum = Scalar::new(Arc::clone(element_array));

let offsets = list_array.offsets();
let validity = list_array.nulls();

if list_array.len() == 0 {
return Ok(Arc::new(UInt64Array::new_null(0)));
}

let element_datum = Scalar::new(Arc::clone(element_array));
let validity = list_array.nulls();

// Only compare the visible portion of the values buffer, which avoids
// wasted work for sliced ListArrays.
let offsets = list_array.offsets();
let first_offset = offsets[0].as_usize();
let last_offset = offsets[list_array.len()].as_usize();
let visible_values = list_array
.values()
.slice(first_offset, last_offset - first_offset);

// `not_distinct` treats NULL=NULL as true, matching the semantics of
// `array_position`
let eq_array = arrow_ord::cmp::not_distinct(list_array.values(), &element_datum)?;
let eq_array = arrow_ord::cmp::not_distinct(&visible_values, &element_datum)?;
let eq_bits = eq_array.values();

let mut result: Vec<Option<u64>> = Vec::with_capacity(list_array.len());
let mut matches = eq_bits.set_indices().peekable();

// Match positions are relative to visible_values (0-based), so
// subtract first_offset from each offset when comparing.
for i in 0..list_array.len() {
let start = offsets[i].as_usize();
let end = offsets[i + 1].as_usize();
let start = offsets[i].as_usize() - first_offset;
let end = offsets[i + 1].as_usize() - first_offset;

if validity.is_some_and(|v| v.is_null(i)) {
// Null row -> null output; advance past matches in range
Expand Down Expand Up @@ -474,3 +484,60 @@ fn general_positions<OffsetSize: OffsetSizeTrait>(
ListArray::from_iter_primitive::<UInt64Type, _, _>(data),
))
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::AsArray;
    use arrow::datatypes::Int32Type;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::ScalarFunctionArgs;

    /// Regression test for `array_position` with a scalar needle on a sliced
    /// `ListArray`.
    ///
    /// A sliced list still shares the full child values buffer with its
    /// parent, so the implementation must (a) ignore values that belong to
    /// sliced-away rows and (b) report positions relative to each visible
    /// row rather than to the underlying buffer.
    #[test]
    fn test_array_position_sliced_list() -> Result<()> {
        // [[10, 20], [30, 40], [50, 60], [70, 80]]  →  slice(1,2)  →  [[30, 40], [50, 60]]
        let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(10), Some(20)]),
            Some(vec![Some(30), Some(40)]),
            Some(vec![Some(50), Some(60)]),
            Some(vec![Some(70), Some(80)]),
        ]);
        let sliced = list.slice(1, 2);
        let haystack_field =
            Arc::new(Field::new("haystack", sliced.data_type().clone(), true));
        let needle_field = Arc::new(Field::new("needle", DataType::Int32, true));
        let return_field = Arc::new(Field::new("return", UInt64, true));

        // Runs `array_position(sliced, needle)` through the scalar-needle path.
        let invoke = |needle: i32| -> Result<ArrayRef> {
            ArrayPosition::new()
                .invoke_with_args(ScalarFunctionArgs {
                    args: vec![
                        ColumnarValue::Array(Arc::new(sliced.clone())),
                        ColumnarValue::Scalar(ScalarValue::Int32(Some(needle))),
                    ],
                    arg_fields: vec![
                        Arc::clone(&haystack_field),
                        Arc::clone(&needle_field),
                    ],
                    number_rows: 2,
                    return_field: Arc::clone(&return_field),
                    config_options: Arc::new(ConfigOptions::default()),
                })?
                .into_array(2)
        };

        // Collects the per-row result so that both the values and the number
        // of output rows are checked by a single comparison.
        let positions = |needle: i32| -> Result<Vec<Option<u64>>> {
            let output = invoke(needle)?;
            Ok(output.as_primitive::<UInt64Type>().iter().collect())
        };

        // Elements that exist only in sliced-away rows must not be found:
        // 10 is in the prefix row, 70 is in the suffix row.
        assert_eq!(positions(10)?, vec![None, None]);
        assert_eq!(positions(70)?, vec![None, None]);

        // Elements inside the visible rows must be found, and the reported
        // position is 1-based and relative to the row that contains it, not
        // to the underlying values buffer.
        assert_eq!(positions(30)?, vec![Some(1), None]);
        assert_eq!(positions(60)?, vec![None, Some(2)]);

        Ok(())
    }
}