diff --git a/datafusion/spark/src/function/map/utils.rs b/datafusion/spark/src/function/map/utils.rs index f5fff0c4b4c46..0cf5b45ffd0b7 100644 --- a/datafusion/spark/src/function/map/utils.rs +++ b/datafusion/spark/src/function/map/utils.rs @@ -202,17 +202,20 @@ fn map_deduplicate_keys( cur_keys_offset + cur_entry_idx, )? .compacted(); + // Enforce Spark's default `spark.sql.mapKeyDedupPolicy=EXCEPTION`. + // Native LAST_WIN support is deferred to a follow-up. if seen_keys.contains(&key) { - // TODO: implement configuration and logic for spark.sql.mapKeyDedupPolicy=EXCEPTION (this is default spark-config) - // exec_err!("invalid argument: duplicate keys in map") - // https://github.com/apache/spark/blob/cf3a34e19dfcf70e2d679217ff1ba21302212472/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L4961 - } else { - // This code implements deduplication logic for spark.sql.mapKeyDedupPolicy=LAST_WIN (this is NOT default spark-config) - keys_mask_one[cur_entry_idx] = true; - values_mask_one[cur_entry_idx] = true; - seen_keys.insert(key); - new_last_offset += 1; + return exec_err!( + "[DUPLICATED_MAP_KEY] Duplicate map key {key} was found, \ + please check the input data. If you want to remove the \ + duplicated keys, you can set spark.sql.mapKeyDedupPolicy \ + to LAST_WIN so that the key inserted at last takes precedence." + ); } + keys_mask_one[cur_entry_idx] = true; + values_mask_one[cur_entry_idx] = true; + seen_keys.insert(key); + new_last_offset += 1; } } } else { diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt index a26b0435c9291..c961f183befd9 100644 --- a/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt +++ b/datafusion/sqllogictest/test_files/spark/map/map_from_arrays.slt @@ -118,11 +118,9 @@ SELECT ---- {outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, inner_z: 30}} -# Test with duplicate keys -query ? +# Test with duplicate keys: raises DUPLICATED_MAP_KEY under Spark's default policy +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found SELECT map_from_arrays(array(true, false, true), array('a', NULL, 'b')); ----- -{false: NULL, true: b} # Tests with different list types query ? diff --git a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt index 19b46886a027e..7aec6b3264f4a 100644 --- a/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt +++ b/datafusion/sqllogictest/test_files/spark/map/map_from_entries.slt @@ -151,14 +151,12 @@ SELECT ---- {outer_key1: {inner_a: 1, inner_b: 2}, outer_key2: {inner_x: 10, inner_y: 20, inner_z: 30}} -# Test with duplicate keys -query ? +# Test with duplicate keys: raises DUPLICATED_MAP_KEY under Spark's default policy +query error DataFusion error: Execution error: \[DUPLICATED_MAP_KEY\] Duplicate map key true was found SELECT map_from_entries(array( - struct(true, 'a'), - struct(false, 'b'), + struct(true, 'a'), + struct(false, 'b'), struct(true, 'c'), - struct(false, cast(NULL as string)), + struct(false, cast(NULL as string)), struct(true, 'd') )); ----- -{false: NULL, true: d}