From cc7d2cc72499c659943547a60cf887c77bf996c2 Mon Sep 17 00:00:00 2001 From: Sudarshan Date: Sat, 18 Apr 2026 13:21:34 +0530 Subject: [PATCH 1/2] added support for MapFromEntries --- datafusion/common/src/config.rs | 10 + .../spark/src/function/map/map_from_arrays.rs | 3 +- .../src/function/map/map_from_entries.rs | 242 ++++++++++++++++-- datafusion/spark/src/function/map/utils.rs | 56 +++- 4 files changed, 275 insertions(+), 36 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 85361ef5e17e1..b5021d6bd05f8 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -692,6 +692,16 @@ config_namespace! { /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false + /// Duplicate-key policy used by Spark-compatible map construction functions + /// (e.g. `map_from_entries`, `map_from_arrays`). + /// + /// The flag is experimental and relevant only for DataFusion Spark built-in functions. + /// It mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala). + /// + /// Accepted values (case-insensitive): `"EXCEPTION"` (default, matches Spark's + /// default) and `"LAST_WIN"`. Any other value falls back to `"EXCEPTION"`. + pub map_key_dedup_policy: String, default = "EXCEPTION".to_string() + /// How many bytes to buffer in the probe side of hash joins while the build side is /// concurrently being built. /// diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs b/datafusion/spark/src/function/map/map_from_arrays.rs index 692e837d00f5e..b096e2faf5350 100644 --- a/datafusion/spark/src/function/map/map_from_arrays.rs +++ b/datafusion/spark/src/function/map/map_from_arrays.rs @@ -16,7 +16,7 @@ // under the License. 
use crate::function::map::utils::{ - get_element_type, get_list_offsets, get_list_values, + MapKeyDedupPolicy, get_element_type, get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, map_type_from_key_value_types, }; use arrow::array::{Array, ArrayRef, NullArray}; @@ -105,6 +105,7 @@ fn map_from_arrays_inner(args: &[ArrayRef]) -> Result { &get_list_offsets(values)?, keys.nulls(), values.nulls(), + MapKeyDedupPolicy::Exception, ) } diff --git a/datafusion/spark/src/function/map/map_from_entries.rs b/datafusion/spark/src/function/map/map_from_entries.rs index facf9f8c53473..698ca719006d1 100644 --- a/datafusion/spark/src/function/map/map_from_entries.rs +++ b/datafusion/spark/src/function/map/map_from_entries.rs @@ -18,11 +18,10 @@ use std::sync::Arc; use crate::function::map::utils::{ - get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, - map_type_from_key_value_types, + MapKeyDedupPolicy, get_list_offsets, get_list_values, + map_from_keys_values_offsets_nulls, map_type_from_key_value_types, }; -use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray}; -use arrow::buffer::NullBuffer; +use arrow::array::{Array, ArrayRef, StructArray}; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::utils::take_function_args; use datafusion_common::{Result, exec_err, internal_err}; @@ -101,11 +100,20 @@ impl ScalarUDFImpl for MapFromEntries { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - make_scalar_function(map_from_entries_inner, vec![])(&args.args) + let dedup_policy = MapKeyDedupPolicy::from_config_str( + &args.config_options.execution.map_key_dedup_policy, + ); + make_scalar_function( + move |arrays: &[ArrayRef]| map_from_entries_inner(arrays, dedup_policy), + vec![], + )(&args.args) } } -fn map_from_entries_inner(args: &[ArrayRef]) -> Result { +fn map_from_entries_inner( + args: &[ArrayRef], + dedup_policy: MapKeyDedupPolicy, +) -> Result { let [entries] = 
take_function_args("map_from_entries", args)?; let entries_offsets = get_list_offsets(entries)?; let entries_values = get_list_values(entries)?; @@ -119,27 +127,36 @@ fn map_from_entries_inner(args: &[ArrayRef]) -> Result { ), }?; - let entries_with_nulls = entries_values.nulls().and_then(|entries_inner_nulls| { - let mut builder = NullBufferBuilder::new_with_len(0); - let mut cur_offset = entries_offsets + // Spark throws on: + // * a null struct entry inside a non-null list row — Spark error class `NULL_MAP_KEY` + // (see `QueryExecutionErrors.nullAsMapKeyNotAllowedError`) + // * a null key inside a non-null struct entry — Spark error class `NULL_MAP_KEY` + // A null outer list row is valid and propagates to a null output row. + let outer_nulls = entries.nulls(); + let struct_nulls = entries_values.nulls(); + let key_nulls = flat_keys.nulls(); + + if struct_nulls.is_some() || key_nulls.is_some() { + let start = entries_offsets .first() .map(|offset| *offset as usize) .unwrap_or(0); - - for next_offset in entries_offsets.iter().skip(1) { - let num_entries = *next_offset as usize - cur_offset; - builder.append( - entries_inner_nulls - .slice(cur_offset, num_entries) - .null_count() - == 0, - ); - cur_offset = *next_offset as usize; + let mut cur_offset = start; + for (row_idx, next_offset) in entries_offsets.iter().skip(1).enumerate() { + let next = *next_offset as usize; + let row_is_null = outer_nulls.is_some_and(|n| n.is_null(row_idx)); + if !row_is_null { + for i in cur_offset..next { + if struct_nulls.is_some_and(|n| n.is_null(i)) + || key_nulls.is_some_and(|n| n.is_null(i)) + { + return exec_err!("[NULL_MAP_KEY] Cannot use null as map key."); + } + } + } + cur_offset = next; } - builder.finish() - }); - - let res_nulls = NullBuffer::union(entries.nulls(), entries_with_nulls.as_ref()); + } map_from_keys_values_offsets_nulls( flat_keys, @@ -147,13 +164,19 @@ fn map_from_entries_inner(args: &[ArrayRef]) -> Result { &entries_offsets, &entries_offsets, None, - 
res_nulls.as_ref(), + outer_nulls, + dedup_policy, ) } #[cfg(test)] mod tests { use super::*; + use arrow::array::{ + Int32Array, Int32Builder, ListArray, MapArray, StringArray, StringBuilder, + StructArray, + }; + use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::Fields; fn make_entries_field(array_nullable: bool, element_nullable: bool) -> FieldRef { @@ -207,4 +230,175 @@ mod tests { assert!(result.is_nullable()); assert_eq!(result.data_type(), &expected_type); } + + fn struct_fields() -> Fields { + Fields::from(vec![ + Field::new("key", DataType::Int32, false), + Field::new("value", DataType::Utf8, true), + ]) + } + + type TestRow<'a> = Option)>>; + + /// Build a `List>` from per-row entries. + /// `rows` is a list of rows; `None` means the outer list row is null; a row is a vector + /// of `(key, value)` pairs where `key` is always present and `value` may be `None`. + fn build_list(rows: Vec) -> ArrayRef { + let fields = struct_fields(); + let mut key_builder = Int32Builder::new(); + let mut val_builder = StringBuilder::new(); + let mut offsets: Vec = vec![0]; + let mut nulls = vec![]; + let mut cur: i32 = 0; + for row in rows { + match row { + Some(entries) => { + for (k, v) in entries { + key_builder.append_value(k); + match v { + Some(s) => val_builder.append_value(s), + None => val_builder.append_null(), + } + cur += 1; + } + nulls.push(true); + } + None => nulls.push(false), + } + offsets.push(cur); + } + let keys: ArrayRef = Arc::new(key_builder.finish()); + let values: ArrayRef = Arc::new(val_builder.finish()); + let entries = StructArray::try_new(fields.clone(), vec![keys, values], None) + .expect("struct array"); + let list_field = Arc::new(Field::new("item", DataType::Struct(fields), false)); + let list = ListArray::try_new( + list_field, + OffsetBuffer::new(offsets.into()), + Arc::new(entries), + Some(NullBuffer::from(nulls)), + ) + .expect("list array"); + Arc::new(list) + } + + #[test] + fn test_map_from_entries_happy_path() { + 
let input = build_list(vec![ + Some(vec![(1, Some("a")), (2, Some("b"))]), + Some(vec![]), + None, + ]); + let out = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception).unwrap(); + let map = out.as_any().downcast_ref::().unwrap(); + assert_eq!(map.len(), 3); + assert!(!map.is_null(0)); + assert!(!map.is_null(1)); + assert!(map.is_null(2)); + let row0 = map.value(0); + let row0 = row0.as_any().downcast_ref::().unwrap(); + let keys = row0 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let values = row0 + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(keys.values(), &[1, 2]); + assert_eq!(values.value(0), "a"); + assert_eq!(values.value(1), "b"); + } + + #[test] + fn test_map_from_entries_duplicate_keys_exception() { + let input = build_list(vec![Some(vec![(1, Some("a")), (1, Some("b"))])]); + let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) + .expect_err("should error on duplicate key under Exception policy"); + assert!( + err.to_string().contains("[DUPLICATED_MAP_KEY]"), + "unexpected error: {err}" + ); + } + + #[test] + fn test_map_from_entries_duplicate_keys_last_win() { + let input = build_list(vec![Some(vec![(1, Some("a")), (1, Some("b"))])]); + let out = map_from_entries_inner(&[input], MapKeyDedupPolicy::LastWin).unwrap(); + let map = out.as_any().downcast_ref::().unwrap(); + assert_eq!(map.len(), 1); + let row0 = map.value(0); + let row0 = row0.as_any().downcast_ref::().unwrap(); + let keys = row0 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let values = row0 + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(keys.len(), 1); + assert_eq!(keys.value(0), 1); + assert_eq!(values.value(0), "b"); + } + + #[test] + fn test_map_from_entries_null_struct_entry_throws() { + // Build List where the struct element has a null at position 1 + // inside a non-null list row. 
+ let fields = struct_fields(); + let keys: ArrayRef = Arc::new(Int32Array::from(vec![1, 0])); + let values: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("x")])); + let struct_nulls = NullBuffer::from(vec![true, false]); + let entries = + StructArray::try_new(fields.clone(), vec![keys, values], Some(struct_nulls)) + .unwrap(); + let list_field = Arc::new(Field::new("item", DataType::Struct(fields), true)); + let list = ListArray::try_new( + list_field, + OffsetBuffer::new(vec![0, 2].into()), + Arc::new(entries), + None, + ) + .unwrap(); + let input: ArrayRef = Arc::new(list); + let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) + .expect_err("should error on null struct entry"); + assert!( + err.to_string().contains("[NULL_MAP_KEY]"), + "unexpected error: {err}" + ); + } + + #[test] + fn test_map_from_entries_null_key_throws() { + // Build List where the struct itself is non-null but the key column has a null. + let fields = Fields::from(vec![ + Field::new("key", DataType::Int32, true), + Field::new("value", DataType::Utf8, true), + ]); + let keys: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None])); + let values: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b")])); + let entries = + StructArray::try_new(fields.clone(), vec![keys, values], None).unwrap(); + let list_field = Arc::new(Field::new("item", DataType::Struct(fields), false)); + let list = ListArray::try_new( + list_field, + OffsetBuffer::new(vec![0, 2].into()), + Arc::new(entries), + None, + ) + .unwrap(); + let input: ArrayRef = Arc::new(list); + let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) + .expect_err("should error on null key"); + assert!( + err.to_string().contains("[NULL_MAP_KEY]"), + "unexpected error: {err}" + ); + } } diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs index f5fff0c4b4c46..75954dbc35147 100644 --- a/datafusion/spark/src/function/map/utils.rs +++ 
b/datafusion/spark/src/function/map/utils.rs @@ -25,6 +25,31 @@ use arrow::compute::filter; use arrow::datatypes::{DataType, Field, Fields}; use datafusion_common::{Result, ScalarValue, exec_err}; +/// Policy for handling duplicate keys when constructing a Spark `MapType`. +/// +/// Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961). +/// Spark's default is [`Exception`](Self::Exception). +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +pub enum MapKeyDedupPolicy { + /// Raise a runtime error when a duplicate key is encountered (Spark default). + #[default] + Exception, + /// Keep the last occurrence of each key. + LastWin, +} + +impl MapKeyDedupPolicy { + /// Parse from a case-insensitive string. Unknown values fall back to + /// [`MapKeyDedupPolicy::Exception`] (Spark's default). + pub fn from_config_str(value: &str) -> Self { + if value.eq_ignore_ascii_case("LAST_WIN") { + Self::LastWin + } else { + Self::Exception + } + } +} + /// Helper function to get element [`DataType`] /// from [`List`](DataType::List)/[`LargeList`](DataType::LargeList)/[`FixedSizeList`](DataType::FixedSizeList)
/// [`Null`](DataType::Null) can be coerced to `ListType`([`Null`](DataType::Null)), so [`Null`](DataType::Null) is returned
@@ -111,13 +136,8 @@ pub fn map_type_from_key_value_types( /// So the inputs can be [`ListArray`](`arrow::array::ListArray`)/[`LargeListArray`](`arrow::array::LargeListArray`)/[`FixedSizeListArray`](`arrow::array::FixedSizeListArray`)
/// To preserve the row info, [`offsets`](arrow::array::ListArray::offsets) and [`nulls`](arrow::array::ListArray::nulls) for both keys and values need to be provided
/// [`FixedSizeListArray`](`arrow::array::FixedSizeListArray`) has no `offsets`, so they can be generated as a cumulative sum of it's `Size` -/// 2. Spark provides [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961) -/// to handle duplicate keys
-/// For now, configurable functions are not supported by Datafusion
-/// So more permissive `LAST_WIN` option is used in this implementation (instead of `EXCEPTION`)
-/// `EXCEPTION` behaviour can still be achieved externally in cost of performance:
-/// `when(array_length(array_distinct(keys)) == array_length(keys), constructed_map)`
-/// `.otherwise(raise_error("duplicate keys occurred during map construction"))` +/// 2. Duplicate key handling follows [`MapKeyDedupPolicy`], mirroring Spark's +/// [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961). pub fn map_from_keys_values_offsets_nulls( flat_keys: &ArrayRef, flat_values: &ArrayRef, @@ -125,6 +145,7 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, + dedup_policy: MapKeyDedupPolicy, ) -> Result { let (keys, values, offsets) = map_deduplicate_keys( flat_keys, @@ -133,6 +154,7 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets, keys_nulls, values_nulls, + dedup_policy, )?; let nulls = NullBuffer::union(keys_nulls, values_nulls); @@ -155,6 +177,7 @@ fn map_deduplicate_keys( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, + dedup_policy: MapKeyDedupPolicy, ) -> Result<(ArrayRef, ArrayRef, OffsetBuffer)> { let offsets_len = keys_offsets.len(); let mut new_offsets = Vec::with_capacity(offsets_len); @@ -203,11 +226,22 @@ fn map_deduplicate_keys( )? .compacted(); if seen_keys.contains(&key) { - // TODO: implement configuration and logic for spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config) - // exec_err!("invalid argument: duplicate keys in map") - // https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961 + match dedup_policy { + MapKeyDedupPolicy::Exception => { + // Message matches Spark's `duplicateMapKeyFoundError` + // (org.apache.spark.sql.errors.QueryExecutionErrors). + return exec_err!( + "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \ + please check the input data. 
If you want to remove the \ + duplicated keys, you can set spark.sql.mapKeyDedupPolicy \ + to LAST_WIN so that the key inserted at last takes precedence." + ); + } + MapKeyDedupPolicy::LastWin => { + // Earlier occurrence is dropped; the later (already-kept) entry wins. + } + } } else { - // This code implements deduplication logic for spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config) keys_mask_one[cur_entry_idx] = true; values_mask_one[cur_entry_idx] = true; seen_keys.insert(key); From ba3c933d0851fadb34108da74c06100ef997fc6d Mon Sep 17 00:00:00 2001 From: Sudarshan Date: Sat, 18 Apr 2026 22:04:06 +0530 Subject: [PATCH 2/2] removed MapKeyDedupPolicy --- datafusion/common/src/config.rs | 10 - .../spark/src/function/map/map_from_arrays.rs | 3 +- .../src/function/map/map_from_entries.rs | 242 ++---------------- datafusion/spark/src/function/map/utils.rs | 69 ++--- .../test_files/spark/map/map_from_arrays.slt | 6 +- .../test_files/spark/map/map_from_entries.slt | 12 +- 6 files changed, 51 insertions(+), 291 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b5021d6bd05f8..85361ef5e17e1 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -692,16 +692,6 @@ config_namespace! { /// `false` — ANSI SQL mode is disabled by default. pub enable_ansi_mode: bool, default = false - /// Duplicate-key policy used by Spark-compatible map construction functions - /// (e.g. `map_from_entries`, `map_from_arrays`). - /// - /// The flag is experimental and relevant only for DataFusion Spark built-in functions. - /// It mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala). - /// - /// Accepted values (case-insensitive): `"EXCEPTION"` (default, matches Spark's - /// default) and `"LAST_WIN"`. Any other value falls back to `"EXCEPTION"`. 
- pub map_key_dedup_policy: String, default = "EXCEPTION".to_string() - /// How many bytes to buffer in the probe side of hash joins while the build side is /// concurrently being built. /// diff --git a/datafusion/spark/src/function/map/map_from_arrays.rs b/datafusion/spark/src/function/map/map_from_arrays.rs index b096e2faf5350..692e837d00f5e 100644 --- a/datafusion/spark/src/function/map/map_from_arrays.rs +++ b/datafusion/spark/src/function/map/map_from_arrays.rs @@ -16,7 +16,7 @@ // under the License. use crate::function::map::utils::{ - MapKeyDedupPolicy, get_element_type, get_list_offsets, get_list_values, + get_element_type, get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, map_type_from_key_value_types, }; use arrow::array::{Array, ArrayRef, NullArray}; @@ -105,7 +105,6 @@ fn map_from_arrays_inner(args: &[ArrayRef]) -> Result { &get_list_offsets(values)?, keys.nulls(), values.nulls(), - MapKeyDedupPolicy::Exception, ) } diff --git a/datafusion/spark/src/function/map/map_from_entries.rs b/datafusion/spark/src/function/map/map_from_entries.rs index 698ca719006d1..facf9f8c53473 100644 --- a/datafusion/spark/src/function/map/map_from_entries.rs +++ b/datafusion/spark/src/function/map/map_from_entries.rs @@ -18,10 +18,11 @@ use std::sync::Arc; use crate::function::map::utils::{ - MapKeyDedupPolicy, get_list_offsets, get_list_values, - map_from_keys_values_offsets_nulls, map_type_from_key_value_types, + get_list_offsets, get_list_values, map_from_keys_values_offsets_nulls, + map_type_from_key_value_types, }; -use arrow::array::{Array, ArrayRef, StructArray}; +use arrow::array::{Array, ArrayRef, NullBufferBuilder, StructArray}; +use arrow::buffer::NullBuffer; use arrow::datatypes::{DataType, Field, FieldRef}; use datafusion_common::utils::take_function_args; use datafusion_common::{Result, exec_err, internal_err}; @@ -100,20 +101,11 @@ impl ScalarUDFImpl for MapFromEntries { } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { - 
let dedup_policy = MapKeyDedupPolicy::from_config_str( - &args.config_options.execution.map_key_dedup_policy, - ); - make_scalar_function( - move |arrays: &[ArrayRef]| map_from_entries_inner(arrays, dedup_policy), - vec![], - )(&args.args) + make_scalar_function(map_from_entries_inner, vec![])(&args.args) } } -fn map_from_entries_inner( - args: &[ArrayRef], - dedup_policy: MapKeyDedupPolicy, -) -> Result { +fn map_from_entries_inner(args: &[ArrayRef]) -> Result { let [entries] = take_function_args("map_from_entries", args)?; let entries_offsets = get_list_offsets(entries)?; let entries_values = get_list_values(entries)?; @@ -127,36 +119,27 @@ fn map_from_entries_inner( ), }?; - // Spark throws on: - // * a null struct entry inside a non-null list row — Spark error class `NULL_MAP_KEY` - // (see `QueryExecutionErrors.nullAsMapKeyNotAllowedError`) - // * a null key inside a non-null struct entry — Spark error class `NULL_MAP_KEY` - // A null outer list row is valid and propagates to a null output row. 
- let outer_nulls = entries.nulls(); - let struct_nulls = entries_values.nulls(); - let key_nulls = flat_keys.nulls(); - - if struct_nulls.is_some() || key_nulls.is_some() { - let start = entries_offsets + let entries_with_nulls = entries_values.nulls().and_then(|entries_inner_nulls| { + let mut builder = NullBufferBuilder::new_with_len(0); + let mut cur_offset = entries_offsets .first() .map(|offset| *offset as usize) .unwrap_or(0); - let mut cur_offset = start; - for (row_idx, next_offset) in entries_offsets.iter().skip(1).enumerate() { - let next = *next_offset as usize; - let row_is_null = outer_nulls.is_some_and(|n| n.is_null(row_idx)); - if !row_is_null { - for i in cur_offset..next { - if struct_nulls.is_some_and(|n| n.is_null(i)) - || key_nulls.is_some_and(|n| n.is_null(i)) - { - return exec_err!("[NULL_MAP_KEY] Cannot use null as map key."); - } - } - } - cur_offset = next; + + for next_offset in entries_offsets.iter().skip(1) { + let num_entries = *next_offset as usize - cur_offset; + builder.append( + entries_inner_nulls + .slice(cur_offset, num_entries) + .null_count() + == 0, + ); + cur_offset = *next_offset as usize; } - } + builder.finish() + }); + + let res_nulls = NullBuffer::union(entries.nulls(), entries_with_nulls.as_ref()); map_from_keys_values_offsets_nulls( flat_keys, @@ -164,19 +147,13 @@ fn map_from_entries_inner( &entries_offsets, &entries_offsets, None, - outer_nulls, - dedup_policy, + res_nulls.as_ref(), ) } #[cfg(test)] mod tests { use super::*; - use arrow::array::{ - Int32Array, Int32Builder, ListArray, MapArray, StringArray, StringBuilder, - StructArray, - }; - use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::Fields; fn make_entries_field(array_nullable: bool, element_nullable: bool) -> FieldRef { @@ -230,175 +207,4 @@ mod tests { assert!(result.is_nullable()); assert_eq!(result.data_type(), &expected_type); } - - fn struct_fields() -> Fields { - Fields::from(vec![ - Field::new("key", DataType::Int32, false), - 
Field::new("value", DataType::Utf8, true), - ]) - } - - type TestRow<'a> = Option)>>; - - /// Build a `List>` from per-row entries. - /// `rows` is a list of rows; `None` means the outer list row is null; a row is a vector - /// of `(key, value)` pairs where `key` is always present and `value` may be `None`. - fn build_list(rows: Vec) -> ArrayRef { - let fields = struct_fields(); - let mut key_builder = Int32Builder::new(); - let mut val_builder = StringBuilder::new(); - let mut offsets: Vec = vec![0]; - let mut nulls = vec![]; - let mut cur: i32 = 0; - for row in rows { - match row { - Some(entries) => { - for (k, v) in entries { - key_builder.append_value(k); - match v { - Some(s) => val_builder.append_value(s), - None => val_builder.append_null(), - } - cur += 1; - } - nulls.push(true); - } - None => nulls.push(false), - } - offsets.push(cur); - } - let keys: ArrayRef = Arc::new(key_builder.finish()); - let values: ArrayRef = Arc::new(val_builder.finish()); - let entries = StructArray::try_new(fields.clone(), vec![keys, values], None) - .expect("struct array"); - let list_field = Arc::new(Field::new("item", DataType::Struct(fields), false)); - let list = ListArray::try_new( - list_field, - OffsetBuffer::new(offsets.into()), - Arc::new(entries), - Some(NullBuffer::from(nulls)), - ) - .expect("list array"); - Arc::new(list) - } - - #[test] - fn test_map_from_entries_happy_path() { - let input = build_list(vec![ - Some(vec![(1, Some("a")), (2, Some("b"))]), - Some(vec![]), - None, - ]); - let out = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception).unwrap(); - let map = out.as_any().downcast_ref::().unwrap(); - assert_eq!(map.len(), 3); - assert!(!map.is_null(0)); - assert!(!map.is_null(1)); - assert!(map.is_null(2)); - let row0 = map.value(0); - let row0 = row0.as_any().downcast_ref::().unwrap(); - let keys = row0 - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let values = row0 - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - 
assert_eq!(keys.values(), &[1, 2]); - assert_eq!(values.value(0), "a"); - assert_eq!(values.value(1), "b"); - } - - #[test] - fn test_map_from_entries_duplicate_keys_exception() { - let input = build_list(vec![Some(vec![(1, Some("a")), (1, Some("b"))])]); - let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) - .expect_err("should error on duplicate key under Exception policy"); - assert!( - err.to_string().contains("[DUPLICATED_MAP_KEY]"), - "unexpected error: {err}" - ); - } - - #[test] - fn test_map_from_entries_duplicate_keys_last_win() { - let input = build_list(vec![Some(vec![(1, Some("a")), (1, Some("b"))])]); - let out = map_from_entries_inner(&[input], MapKeyDedupPolicy::LastWin).unwrap(); - let map = out.as_any().downcast_ref::().unwrap(); - assert_eq!(map.len(), 1); - let row0 = map.value(0); - let row0 = row0.as_any().downcast_ref::().unwrap(); - let keys = row0 - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - let values = row0 - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(keys.len(), 1); - assert_eq!(keys.value(0), 1); - assert_eq!(values.value(0), "b"); - } - - #[test] - fn test_map_from_entries_null_struct_entry_throws() { - // Build List where the struct element has a null at position 1 - // inside a non-null list row. 
- let fields = struct_fields(); - let keys: ArrayRef = Arc::new(Int32Array::from(vec![1, 0])); - let values: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("x")])); - let struct_nulls = NullBuffer::from(vec![true, false]); - let entries = - StructArray::try_new(fields.clone(), vec![keys, values], Some(struct_nulls)) - .unwrap(); - let list_field = Arc::new(Field::new("item", DataType::Struct(fields), true)); - let list = ListArray::try_new( - list_field, - OffsetBuffer::new(vec![0, 2].into()), - Arc::new(entries), - None, - ) - .unwrap(); - let input: ArrayRef = Arc::new(list); - let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) - .expect_err("should error on null struct entry"); - assert!( - err.to_string().contains("[NULL_MAP_KEY]"), - "unexpected error: {err}" - ); - } - - #[test] - fn test_map_from_entries_null_key_throws() { - // Build List where the struct itself is non-null but the key column has a null. - let fields = Fields::from(vec![ - Field::new("key", DataType::Int32, true), - Field::new("value", DataType::Utf8, true), - ]); - let keys: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None])); - let values: ArrayRef = Arc::new(StringArray::from(vec![Some("a"), Some("b")])); - let entries = - StructArray::try_new(fields.clone(), vec![keys, values], None).unwrap(); - let list_field = Arc::new(Field::new("item", DataType::Struct(fields), false)); - let list = ListArray::try_new( - list_field, - OffsetBuffer::new(vec![0, 2].into()), - Arc::new(entries), - None, - ) - .unwrap(); - let input: ArrayRef = Arc::new(list); - let err = map_from_entries_inner(&[input], MapKeyDedupPolicy::Exception) - .expect_err("should error on null key"); - assert!( - err.to_string().contains("[NULL_MAP_KEY]"), - "unexpected error: {err}" - ); - } } diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs index 75954dbc35147..0cf5b45ffd0b7 100644 --- a/datafusion/spark/src/function/map/utils.rs +++ 
b/datafusion/spark/src/function/map/utils.rs @@ -25,31 +25,6 @@ use arrow::compute::filter; use arrow::datatypes::{DataType, Field, Fields}; use datafusion_common::{Result, ScalarValue, exec_err}; -/// Policy for handling duplicate keys when constructing a Spark `MapType`. -/// -/// Mirrors Spark's [`spark.sql.mapKeyDedupPolicy`](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961). -/// Spark's default is [`Exception`](Self::Exception). -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] -pub enum MapKeyDedupPolicy { - /// Raise a runtime error when a duplicate key is encountered (Spark default). - #[default] - Exception, - /// Keep the last occurrence of each key. - LastWin, -} - -impl MapKeyDedupPolicy { - /// Parse from a case-insensitive string. Unknown values fall back to - /// [`MapKeyDedupPolicy::Exception`] (Spark's default). - pub fn from_config_str(value: &str) -> Self { - if value.eq_ignore_ascii_case("LAST_WIN") { - Self::LastWin - } else { - Self::Exception - } - } -} - /// Helper function to get element [`DataType`] /// from [`List`](DataType::List)/[`LargeList`](DataType::LargeList)/[`FixedSizeList`](DataType::FixedSizeList)
/// [`Null`](DataType::Null) can be coerced to `ListType`([`Null`](DataType::Null)), so [`Null`](DataType::Null) is returned
@@ -136,8 +111,13 @@ pub fn map_type_from_key_value_types( /// So the inputs can be [`ListArray`](`arrow::array::ListArray`)/[`LargeListArray`](`arrow::array::LargeListArray`)/[`FixedSizeListArray`](`arrow::array::FixedSizeListArray`)
/// To preserve the row info, [`offsets`](arrow::array::ListArray::offsets) and [`nulls`](arrow::array::ListArray::nulls) for both keys and values need to be provided
/// [`FixedSizeListArray`](`arrow::array::FixedSizeListArray`) has no `offsets`, so they can be generated as a cumulative sum of it's `Size` -/// 2. Duplicate key handling follows [`MapKeyDedupPolicy`], mirroring Spark's -/// [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961). +/// 2. Spark provides [spark.sql.mapKeyDedupPolicy](https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961) +/// to handle duplicate keys
+/// For now, configurable functions are not supported by Datafusion
+/// So Spark's default `EXCEPTION` policy is always enforced in this implementation (`LAST_WIN` is not supported yet)
+/// A duplicate key raises a `[DUPLICATED_MAP_KEY]` execution error, which matches the external check:
+/// `when(array_length(array_distinct(keys)) == array_length(keys), constructed_map)`
+/// `.otherwise(raise_error("duplicate keys occurred during map construction"))` pub fn map_from_keys_values_offsets_nulls( flat_keys: &ArrayRef, flat_values: &ArrayRef, @@ -145,7 +125,6 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, - dedup_policy: MapKeyDedupPolicy, ) -> Result { let (keys, values, offsets) = map_deduplicate_keys( flat_keys, @@ -154,7 +133,6 @@ pub fn map_from_keys_values_offsets_nulls( values_offsets, keys_nulls, values_nulls, - dedup_policy, )?; let nulls = NullBuffer::union(keys_nulls, values_nulls); @@ -177,7 +155,6 @@ fn map_deduplicate_keys( values_offsets: &[i32], keys_nulls: Option<&NullBuffer>, values_nulls: Option<&NullBuffer>, - dedup_policy: MapKeyDedupPolicy, ) -> Result<(ArrayRef, ArrayRef, OffsetBuffer)> { let offsets_len = keys_offsets.len(); let mut new_offsets = Vec::with_capacity(offsets_len); @@ -225,28 +202,20 @@ fn map_deduplicate_keys( cur_keys_offset + cur_entry_idx, )? .compacted(); + // Enforce Spark's default `spark.sql.mapKeyDedupPolicy=EXCEPTION`. + // Native LAST_WIN support is deferred to a follow-up. if seen_keys.contains(&key) { - match dedup_policy { - MapKeyDedupPolicy::Exception => { - // Message matches Spark's `duplicateMapKeyFoundError` - // (org.apache.spark.sql.errors.QueryExecutionErrors). - return exec_err!( - "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \ - please check the input data. If you want to remove the \ - duplicated keys, you can set spark.sql.mapKeyDedupPolicy \ - to LAST_WIN so that the key inserted at last takes precedence." - ); - } - MapKeyDedupPolicy::LastWin => { - // Earlier occurrence is dropped; the later (already-kept) entry wins. 
- } - } - } else { - keys_mask_one[cur_entry_idx] = true; - values_mask_one[cur_entry_idx] = true; - seen_keys.insert(key); - new_last_offset += 1; + return exec_err!( + "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \ + please check the input data. If you want to remove the \ + duplicated keys, you can set spark.sql.mapKeyDedupPolicy \ + to LAST_WIN so that the key inserted at last takes precedence." + ); } + keys_mask_one[cur_entry_idx] = true; + values_mask_one[cur_entry_idx] = true; + seen_keys.insert(key); + new_last_offset += 1; } } } else { diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt index a26b0435c9291..c961f183befd9 100644 --- a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt +++ b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt @@ -118,11 +118,9 @@ SELECT ---- {outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, inner_z: 30}} -# Test with duplicate keys -query ? +# Test with duplicate keys: raises DUPLICATED_MAP_KEY under Spark's default policy +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found SELECT map_from_arrays(array(true, false, true), array('a', NULL, 'b')); ----- -{false: NULL, true: b} # Tests with different list types query ? diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt index 19b46886a027e..7aec6b3264f4a 100644 --- a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt +++ b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt @@ -151,14 +151,12 @@ SELECT ---- {outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, inner_z: 30}} -# Test with duplicate keys -query ? 
+# Test with duplicate keys: raises DUPLICATED_MAP_KEY under Spark's default policy +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found SELECT map_from_entries(array( - struct(true, 'a'), - struct(false, 'b'), + struct(true, 'a'), + struct(false, 'b'), struct(true, 'c'), - struct(false, cast(NULL as string)), + struct(false, cast(NULL as string)), struct(true, 'd') )); ----- -{false: NULL, true: d}