Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions datafusion/spark/src/function/map/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,17 +202,20 @@ fn map_deduplicate_keys(
cur_keys_offset + cur_entry_idx,
)?
.compacted();
// Enforce Spark's default `spark.sql.mapKeyDedupPolicy=EXCEPTION`.
// Native LAST_WIN support is deferred to a follow-up.
if seen_keys.contains(&key) {
// TODO: implement configuration and logic for spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config)
// exec_err!("invalid argument: duplicate keys in map")
// https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961
} else {
// This code implements deduplication logic for spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config)
keys_mask_one[cur_entry_idx] = true;
values_mask_one[cur_entry_idx] = true;
seen_keys.insert(key);
new_last_offset += 1;
return exec_err!(
"[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \
please check the input data. If you want to remove the \
duplicated keys, you can set spark.sql.mapKeyDedupPolicy \
to LAST_WIN so that the key inserted at last takes precedence."
);
}
keys_mask_one[cur_entry_idx] = true;
values_mask_one[cur_entry_idx] = true;
seen_keys.insert(key);
new_last_offset += 1;
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ SELECT
----
{outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, inner_z: 30}}

# Test with duplicate keys
query ?
# Test with duplicate keys: raises DUPLICATED_MAP_KEY under Spark's default policy
query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found
SELECT map_from_arrays(array(true, false, true), array('a', NULL, 'b'));
----
{false: NULL, true: b}

# Tests with different list types
query ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,12 @@ SELECT
----
{outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, inner_z: 30}}

# Test with duplicate keys
query ?
# Test with duplicate keys: raises DUPLICATED_MAP_KEY under Spark's default policy
query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found
SELECT map_from_entries(array(
struct(true, 'a'),
struct(false, 'b'),
struct(true, 'a'),
struct(false, 'b'),
struct(true, 'c'),
struct(false, cast(NULL as string)),
struct(false, cast(NULL as string)),
struct(true, 'd')
));
----
{false: NULL, true: d}
Loading