From bacee66965205206f33098d640b36983d0a95a53 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 12 Apr 2026 10:51:01 -0700 Subject: [PATCH 01/14] add count distinct group benchmarks --- .../benches/count_distinct.rs | 115 +++++++++++++++++- 1 file changed, 113 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index e742a3e5c1267..a095e1871afcb 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -23,7 +23,7 @@ use arrow::array::{ use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::function::AccumulatorArgs; -use datafusion_expr::{Accumulator, AggregateUDFImpl}; +use datafusion_expr::{Accumulator, AggregateUDFImpl, EmitTo}; use datafusion_functions_aggregate::count::Count; use datafusion_physical_expr::expressions::col; use rand::rngs::StdRng; @@ -87,6 +87,30 @@ fn create_i16_array(n_distinct: usize) -> Int16Array { .collect() } +fn prepare_args(data_type: DataType) -> (Arc, AccumulatorArgs<'static>) { + let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)])); + let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone())); + let expr = col("f", schema_leaked).unwrap(); + let expr_leaked: &'static _ = Box::leak(Box::new(expr)); + let return_field: Arc = Field::new("f", DataType::Int64, true).into(); + let return_field_leaked: &'static _ = Box::leak(Box::new(return_field.clone())); + let expr_field = expr_leaked.return_field(schema_leaked).unwrap(); + let expr_field_leaked: &'static _ = Box::leak(Box::new(expr_field)); + + let accumulator_args = AccumulatorArgs { + return_field: return_field_leaked.clone(), + schema: schema_leaked, + expr_fields: std::slice::from_ref(expr_field_leaked), + ignore_nulls: false, + order_bys: &[], + is_reversed: false, + name: "count(distinct f)", + is_distinct: true, + exprs: std::slice::from_ref(expr_leaked), + }; + (schema, accumulator_args) +} + fn count_distinct_benchmark(c: &mut Criterion) { for pct in [80, 99] { let n_distinct = BATCH_SIZE * pct / 100; @@ -150,5 +174,92 @@ fn count_distinct_benchmark(c: &mut Criterion) { }); } -criterion_group!(benches, count_distinct_benchmark); +/// Create group indices with uniform distribution +fn create_uniform_groups(num_groups: usize) -> Vec { + let mut rng = StdRng::seed_from_u64(42); + (0..BATCH_SIZE) + .map(|_| rng.random_range(0..num_groups)) + .collect() +} + +/// Create group indices with skewed distribution (80% in 20% of groups) +fn create_skewed_groups(num_groups: usize) -> Vec { + let mut rng = StdRng::seed_from_u64(42); + let hot_groups = (num_groups / 5).max(1); + (0..BATCH_SIZE) + .map(|_| { + if rng.random_range(0..100) < 80 { + rng.random_range(0..hot_groups) + } else { + rng.random_range(0..num_groups) + } + }) + .collect() +} + +fn count_distinct_groups_benchmark(c: &mut Criterion) { + let count_fn = Count::new(); + + // bench different scenarios + let scenarios = [ + // (name, num_groups, distinct_pct, group_fn) + ("sparse_uniform", 10, 80, "uniform"), + ("moderate_uniform", 100, 80, "uniform"), + ("dense_uniform", 1000, 80, "uniform"), + ("sparse_skewed", 10, 80, "skewed"), + ("dense_skewed", 1000, 80, "skewed"), + ("sparse_high_cardinality", 10, 99, "uniform"), + ("dense_low_cardinality", 1000, 20, "uniform"), + ]; + + for (name, num_groups, distinct_pct, group_type) in scenarios { + let n_distinct = BATCH_SIZE * distinct_pct / 100; + let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; + let group_indices = if group_type == "uniform" { + create_uniform_groups(num_groups) + } else { + create_skewed_groups(num_groups) + }; + + let (_schema, args) = prepare_args(DataType::Int64); + + if count_fn.groups_accumulator_supported(args.clone()) { + c.bench_function(&format!("count_distinct_groups {name}"), |b| { + b.iter(|| { + let (_schema, args) = prepare_args(DataType::Int64); + let mut acc = count_fn.create_groups_accumulator(args).unwrap(); + acc.update_batch(&[values.clone()], &group_indices, None, num_groups) + .unwrap(); + acc.evaluate(EmitTo::All).unwrap() + }) + }); + } else { + c.bench_function(&format!("count_distinct_groups {name}"), |b| { + b.iter(|| { + let mut accumulators: Vec<_> = (0..num_groups) + .map(|_| prepare_accumulator(DataType::Int64)) + .collect(); + + let arr = values.as_any().downcast_ref::().unwrap(); + for (idx, group_idx) in group_indices.iter().enumerate() { + if let Some(val) = arr.value(idx).into() { + let single_val = + Arc::new(Int64Array::from(vec![Some(val)])) as ArrayRef; + accumulators[*group_idx] + .update_batch(std::slice::from_ref(&single_val)) + .unwrap(); + } + } + + let _results: Vec<_> = accumulators + .iter_mut() + .map(|acc| acc.evaluate().unwrap()) + .collect(); + }) + }); + } + } +} + +criterion_group!(benches, count_distinct_benchmark, count_distinct_groups_benchmark); criterion_main!(benches); From c4461b7fc8954abac874093a87e3e3f875201aaf Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 12 Apr 2026 11:27:58 -0700 Subject: [PATCH 02/14] add count distinct group benchmarks --- datafusion/functions-aggregate/benches/count_distinct.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index a095e1871afcb..38f184c710ea8 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -261,5 +261,9 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { } } -criterion_group!(benches, count_distinct_benchmark, count_distinct_groups_benchmark); +criterion_group!( + benches, + count_distinct_benchmark, + count_distinct_groups_benchmark +); criterion_main!(benches); From 45a19b0ad43fbbd24527414ff1b0e2efb7891a18 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 12 Apr 2026 11:39:08 -0700 Subject: [PATCH 03/14] add count distinct group benchmarks --- datafusion/functions-aggregate/benches/count_distinct.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 38f184c710ea8..4e56cb6b82974 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -228,8 +228,13 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { b.iter(|| { let (_schema, args) = prepare_args(DataType::Int64); let mut acc = count_fn.create_groups_accumulator(args).unwrap(); - acc.update_batch(&[values.clone()], &group_indices, None, num_groups) - .unwrap(); + acc.update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + num_groups, + ) + .unwrap(); acc.evaluate(EmitTo::All).unwrap() }) }); From 659754f8da7a8b1d4f0648ef53a7c2ff1786f0a2 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Mon, 13 Apr 2026 19:07:03 -0700 Subject: [PATCH 04/14] count group benchmark check --- .../benches/count_distinct.rs | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 4e56cb6b82974..427acd46f99cb 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -224,6 +224,7 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { let (_schema, args) = prepare_args(DataType::Int64); if count_fn.groups_accumulator_supported(args.clone()) { +<<<<<<< Updated upstream c.bench_function(&format!("count_distinct_groups {name}"), |b| { b.iter(|| { let (_schema, args) = prepare_args(DataType::Int64); @@ -238,6 +239,25 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { acc.evaluate(EmitTo::All).unwrap() }) }); +======= + c.bench_function( + &format!("count_distinct_groups i64 {num_groups} groups"), + |b| { + b.iter(|| { + let mut acc = + count_fn.create_groups_accumulator(args.clone()).unwrap(); + acc.update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + num_groups, + ) + .unwrap(); + acc.evaluate(EmitTo::All).unwrap() + }) + }, + ); +>>>>>>> Stashed changes } else { c.bench_function(&format!("count_distinct_groups {name}"), |b| { b.iter(|| { From 5f2d9bb771f5b3279760c03d008b73d0458f84ec Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Mon, 13 Apr 2026 19:11:57 -0700 Subject: [PATCH 05/14] count group benchmark check --- .../benches/count_distinct.rs | 24 ++----------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 427acd46f99cb..d8d6e68158478 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -224,11 +224,10 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { let (_schema, args) = prepare_args(DataType::Int64); if count_fn.groups_accumulator_supported(args.clone()) { -<<<<<<< Updated upstream c.bench_function(&format!("count_distinct_groups {name}"), |b| { b.iter(|| { - let (_schema, args) = prepare_args(DataType::Int64); - let mut acc = count_fn.create_groups_accumulator(args).unwrap(); + let mut acc = + count_fn.create_groups_accumulator(args.clone()).unwrap(); acc.update_batch( std::slice::from_ref(&values), &group_indices, @@ -239,25 +238,6 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { acc.evaluate(EmitTo::All).unwrap() }) }); -======= - c.bench_function( - &format!("count_distinct_groups i64 {num_groups} groups"), - |b| { - b.iter(|| { - let mut acc = - count_fn.create_groups_accumulator(args.clone()).unwrap(); - acc.update_batch( - std::slice::from_ref(&values), - &group_indices, - None, - num_groups, - ) - .unwrap(); - acc.evaluate(EmitTo::All).unwrap() - }) - }, - ); ->>>>>>> Stashed changes } else { c.bench_function(&format!("count_distinct_groups {name}"), |b| { b.iter(|| { From a9cdec87cde0dcf215bc942f57b9121be62530fd Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 15 Apr 2026 23:27:15 -0700 Subject: [PATCH 06/14] groups_acc --- datafusion/functions-aggregate/benches/count_distinct.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index b2bb30db1de28..8639df2f7ec33 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use arrow::array::{ - ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array, + Array, ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array, }; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; @@ -258,7 +258,7 @@ fn count_distinct_groups_benchmark(c: &mut Criterion) { .collect(); for (group_idx, batch) in group_arrays.iter().enumerate() { - if batch.len() > 0 { + if !batch.is_empty() { accumulators[group_idx] .update_batch(std::slice::from_ref(batch)) .unwrap(); From f982d8de2cf6ee712e261378508a5658f3cb2150 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 12 Apr 2026 10:51:01 -0700 Subject: [PATCH 07/14] add count distinct group benchmarks add count distinct group benchmarks add count distinct group benchmarks count group benchmark check count group benchmark check init implement_group_accumulators_count_distinct implement_group_accumulators_count_distinct implement_group_accumulators_count_distinct implement_group_accumulators_count_distinct implement_group_accumulators_count_distinct_use_hashtable implement_group_accumulators_count_distinct_use_hashtable add group benches Use same benchmark names for comparison count group benchmark check count group benchmark check --- .../src/aggregate/count_distinct.rs | 2 + .../src/aggregate/count_distinct/groups.rs | 182 ++++++++++++++++++ .../benches/count_distinct.rs | 140 +++++--------- datafusion/functions-aggregate/src/count.rs | 63 +++++- 4 files changed, 295 insertions(+), 92 deletions(-) create mode 100644 datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs index 25b40382299b4..2300447c7b976 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs @@ -17,10 +17,12 @@ mod bytes; mod dict; +mod groups; mod native; pub use bytes::BytesDistinctCountAccumulator; pub use bytes::BytesViewDistinctCountAccumulator; pub use dict::DictionaryCountAccumulator; +pub use groups::PrimitiveDistinctCountGroupsAccumulator; pub use native::FloatDistinctCountAccumulator; pub use native::PrimitiveDistinctCountAccumulator; diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs new file mode 100644 index 0000000000000..13a967dfb66b3 --- /dev/null +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + ArrayRef, AsArray, BooleanArray, Int64Array, ListArray, PrimitiveArray, +}; +use arrow::buffer::OffsetBuffer; +use arrow::datatypes::{ArrowPrimitiveType, Field}; +use datafusion_common::HashSet; +use datafusion_common::hash_utils::RandomState; +use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; +use std::hash::Hash; +use std::mem::size_of; +use std::sync::Arc; + +use crate::aggregate::groups_accumulator::accumulate::accumulate; + +pub struct PrimitiveDistinctCountGroupsAccumulator +where + T::Native: Eq + Hash, +{ + seen: HashSet<(usize, T::Native), RandomState>, + num_groups: usize, +} + +impl PrimitiveDistinctCountGroupsAccumulator +where + T::Native: Eq + Hash, +{ + pub fn new() -> Self { + Self { + seen: HashSet::default(), + num_groups: 0, + } + } +} + +impl Default for PrimitiveDistinctCountGroupsAccumulator +where + T::Native: Eq + Hash, +{ + fn default() -> Self { + Self::new() + } +} + +impl GroupsAccumulator + for PrimitiveDistinctCountGroupsAccumulator +where + T::Native: Eq + Hash, +{ + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> datafusion_common::Result<()> { + debug_assert_eq!(values.len(), 1); + self.num_groups = self.num_groups.max(total_num_groups); + let arr = values[0].as_primitive::(); + accumulate(group_indices, arr, opt_filter, |group_idx, value| { + self.seen.insert((group_idx, value)); + }); + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> datafusion_common::Result { + let num_emitted = match emit_to { + EmitTo::All => self.num_groups, + EmitTo::First(n) => n, + }; + + let mut counts = vec![0i64; num_emitted]; + + if matches!(emit_to, EmitTo::All) { + for &(group_idx, _) in self.seen.iter() { + counts[group_idx] += 1; + } + self.seen.clear(); + self.num_groups = 0; + } else { + let mut remaining = HashSet::default(); + for (group_idx, value) in self.seen.drain() { + if group_idx < num_emitted { + counts[group_idx] += 1; + } else { + remaining.insert((group_idx - num_emitted, value)); + } + } + self.seen = remaining; + self.num_groups = self.num_groups.saturating_sub(num_emitted); + } + + Ok(Arc::new(Int64Array::from(counts))) + } + + fn state(&mut self, emit_to: EmitTo) -> datafusion_common::Result> { + let num_emitted = match emit_to { + EmitTo::All => self.num_groups, + EmitTo::First(n) => n, + }; + + let mut group_values: Vec> = vec![Vec::new(); num_emitted]; + + if matches!(emit_to, EmitTo::All) { + for (group_idx, value) in self.seen.drain() { + group_values[group_idx].push(value); + } + self.num_groups = 0; + } else { + let mut remaining = HashSet::default(); + for (group_idx, value) in self.seen.drain() { + if group_idx < num_emitted { + group_values[group_idx].push(value); + } else { + remaining.insert((group_idx - num_emitted, value)); + } + } + self.seen = remaining; + self.num_groups = self.num_groups.saturating_sub(num_emitted); + } + + let mut offsets = vec![0i32]; + let mut all_values = Vec::new(); + for values in &group_values { + all_values.extend(values.iter().copied()); + offsets.push(all_values.len() as i32); + } + + let values_array = Arc::new(PrimitiveArray::::from_iter_values(all_values)); + let list_array = ListArray::new( + Arc::new(Field::new_list_field(T::DATA_TYPE, true)), + OffsetBuffer::new(offsets.into()), + values_array, + None, + ); + + Ok(vec![Arc::new(list_array)]) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + _opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> datafusion_common::Result<()> { + debug_assert_eq!(values.len(), 1); + self.num_groups = self.num_groups.max(total_num_groups); + let list_array = values[0].as_list::(); + + for (row_idx, group_idx) in group_indices.iter().enumerate() { + let inner = list_array.value(row_idx); + let inner_arr = inner.as_primitive::(); + for value in inner_arr.values().iter() { + self.seen.insert((*group_idx, *value)); + } + } + + Ok(()) + } + + fn size(&self) -> usize { + size_of::() + + self.seen.capacity() * (size_of::<(usize, T::Native)>() + size_of::()) + } +} diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 8639df2f7ec33..ff3556a7994b5 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use arrow::array::{ - Array, ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array, + ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array, }; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; @@ -87,6 +87,13 @@ fn create_i16_array(n_distinct: usize) -> Int16Array { .collect() } +fn create_group_indices(num_groups: usize) -> Vec { + let mut rng = StdRng::seed_from_u64(42); + (0..BATCH_SIZE) + .map(|_| rng.random_range(0..num_groups)) + .collect() +} + fn prepare_args(data_type: DataType) -> (Arc, AccumulatorArgs<'static>) { let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)])); let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone())); @@ -174,103 +181,62 @@ fn count_distinct_benchmark(c: &mut Criterion) { }); } -/// Create group indices with uniform distribution -fn create_uniform_groups(num_groups: usize) -> Vec { - let mut rng = StdRng::seed_from_u64(42); - (0..BATCH_SIZE) - .map(|_| rng.random_range(0..num_groups)) - .collect() -} - -/// Create group indices with skewed distribution (80% in 20% of groups) -fn create_skewed_groups(num_groups: usize) -> Vec { - let mut rng = StdRng::seed_from_u64(42); - let hot_groups = (num_groups / 5).max(1); - (0..BATCH_SIZE) - .map(|_| { - if rng.random_range(0..100) < 80 { - rng.random_range(0..hot_groups) - } else { - rng.random_range(0..num_groups) - } - }) - .collect() -} - fn count_distinct_groups_benchmark(c: &mut Criterion) { let count_fn = Count::new(); - // bench different scenarios - let scenarios = [ - // (name, num_groups, distinct_pct, group_fn) - ("sparse_uniform", 10, 80, "uniform"), - ("moderate_uniform", 100, 80, "uniform"), - ("dense_uniform", 1000, 80, "uniform"), - ("sparse_skewed", 10, 80, "skewed"), - ("dense_skewed", 1000, 80, "skewed"), - ("sparse_high_cardinality", 10, 99, "uniform"), - ("dense_low_cardinality", 1000, 20, "uniform"), - ]; - - for (name, num_groups, distinct_pct, group_type) in scenarios { - let n_distinct = BATCH_SIZE * distinct_pct / 100; + for num_groups in [10, 100, 1000] { + let n_distinct = BATCH_SIZE * 80 / 100; let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; - let group_indices = if group_type == "uniform" { - create_uniform_groups(num_groups) - } else { - create_skewed_groups(num_groups) - }; + let group_indices = create_group_indices(num_groups); let (_schema, args) = prepare_args(DataType::Int64); if count_fn.groups_accumulator_supported(args.clone()) { - c.bench_function(&format!("count_distinct_groups {name}"), |b| { - b.iter(|| { - let mut acc = - count_fn.create_groups_accumulator(args.clone()).unwrap(); - acc.update_batch( - std::slice::from_ref(&values), - &group_indices, - None, - num_groups, - ) - .unwrap(); - acc.evaluate(EmitTo::All).unwrap() - }) - }); + c.bench_function( + &format!("count_distinct_groups i64 {num_groups} groups"), + |b| { + b.iter(|| { + let (_schema, args) = prepare_args(DataType::Int64); + let mut acc = count_fn.create_groups_accumulator(args).unwrap(); + acc.update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + num_groups, + ) + .unwrap(); + acc.evaluate(EmitTo::All).unwrap() + }) + }, + ); } else { - let arr = values.as_any().downcast_ref::().unwrap(); - let mut group_rows: Vec> = vec![Vec::new(); num_groups]; - for (idx, &group_idx) in group_indices.iter().enumerate() { - if arr.is_valid(idx) { - group_rows[group_idx].push(arr.value(idx)); - } - } - let group_arrays: Vec = group_rows - .iter() - .map(|rows| Arc::new(Int64Array::from(rows.clone())) as ArrayRef) - .collect(); - - c.bench_function(&format!("count_distinct_groups {name}"), |b| { - b.iter(|| { - let mut accumulators: Vec<_> = (0..num_groups) - .map(|_| prepare_accumulator(DataType::Int64)) - .collect(); - - for (group_idx, batch) in group_arrays.iter().enumerate() { - if !batch.is_empty() { - accumulators[group_idx] - .update_batch(std::slice::from_ref(batch)) - .unwrap(); + c.bench_function( + &format!("count_distinct_groups i64 {num_groups} groups"), + |b| { + b.iter(|| { + let mut accumulators: Vec<_> = (0..num_groups) + .map(|_| prepare_accumulator(DataType::Int64)) + .collect(); + + let arr = values.as_any().downcast_ref::().unwrap(); + for (idx, group_idx) in group_indices.iter().enumerate() { + if let Some(val) = arr.value(idx).into() { + let single_val = + Arc::new(Int64Array::from(vec![Some(val)])) + as ArrayRef; + accumulators[*group_idx] + .update_batch(std::slice::from_ref(&single_val)) + .unwrap(); + } } - } - let _results: Vec<_> = accumulators - .iter_mut() - .map(|acc| acc.evaluate().unwrap()) - .collect(); - }) - }); + let _results: Vec<_> = accumulators + .iter_mut() + .map(|acc| acc.evaluate().unwrap()) + .collect(); + }) + }, + ); } } } diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 0edfc152ff1b4..f74c94bcf6c79 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -41,6 +41,7 @@ use datafusion_expr::{ function::{AccumulatorArgs, StateFieldsArgs}, utils::format_state_name, }; +use datafusion_functions_aggregate_common::aggregate::count_distinct::PrimitiveDistinctCountGroupsAccumulator; use datafusion_functions_aggregate_common::aggregate::{ count_distinct::BytesDistinctCountAccumulator, count_distinct::BytesViewDistinctCountAccumulator, @@ -336,18 +337,70 @@ impl AggregateUDFImpl for Count { } fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { - // groups accumulator only supports `COUNT(c1)`, not - // `COUNT(c1, c2)`, etc - if args.is_distinct { + if args.exprs.len() != 1 { return false; } - args.exprs.len() == 1 + if args.is_distinct { + // Only support primitive integer types for now + matches!( + args.expr_fields[0].data_type(), + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + ) + } else { + true + } } fn create_groups_accumulator( &self, - _args: AccumulatorArgs, + args: AccumulatorArgs, ) -> Result> { + if args.is_distinct { + let data_type = args.expr_fields[0].data_type(); + return match data_type { + DataType::Int8 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::Int16 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::Int32 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::Int64 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::UInt8 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::UInt16 => { + Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt16Type, + >::new())) + } + DataType::UInt32 => { + Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt32Type, + >::new())) + } + DataType::UInt64 => { + Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt64Type, + >::new())) + } + _ => not_impl_err!( + "GroupsAccumulator not supported for COUNT(DISTINCT) with {}", + data_type + ), + }; + } // instantiate specialized accumulator Ok(Box::new(CountGroupsAccumulator::new())) } From 929e081ae16016f3c3a7bd84525dc9631f297526 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 15 Apr 2026 19:03:44 -0700 Subject: [PATCH 08/14] hashtable_with_count_vector_approach --- .../src/aggregate/count_distinct/groups.rs | 58 +++++----- datafusion/functions-aggregate/src/count.rs | 108 +++++++++--------- 2 files changed, 80 insertions(+), 86 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs index 13a967dfb66b3..5f8c9b3c3cc49 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs @@ -34,7 +34,7 @@ where T::Native: Eq + Hash, { seen: HashSet<(usize, T::Native), RandomState>, - num_groups: usize, + counts: Vec, } impl PrimitiveDistinctCountGroupsAccumulator @@ -44,7 +44,7 @@ where pub fn new() -> Self { Self { seen: HashSet::default(), - num_groups: 0, + counts: Vec::new(), } } } @@ -71,39 +71,32 @@ where total_num_groups: usize, ) -> datafusion_common::Result<()> { debug_assert_eq!(values.len(), 1); - self.num_groups = self.num_groups.max(total_num_groups); + self.counts.resize(total_num_groups, 0); let arr = values[0].as_primitive::(); accumulate(group_indices, arr, opt_filter, |group_idx, value| { - self.seen.insert((group_idx, value)); + if self.seen.insert((group_idx, value)) { + self.counts[group_idx] += 1; + } }); Ok(()) } fn evaluate(&mut self, emit_to: EmitTo) -> datafusion_common::Result { - let num_emitted = match emit_to { - EmitTo::All => self.num_groups, - EmitTo::First(n) => n, - }; + let counts = emit_to.take_needed(&mut self.counts); - let mut counts = vec![0i64; num_emitted]; - - if matches!(emit_to, EmitTo::All) { - for &(group_idx, _) in self.seen.iter() { - counts[group_idx] += 1; + match emit_to { + EmitTo::All => { + self.seen.clear(); } - self.seen.clear(); - self.num_groups = 0; - } else { - let mut remaining = HashSet::default(); - for (group_idx, value) in self.seen.drain() { - if group_idx < num_emitted { - counts[group_idx] += 1; - } else { - remaining.insert((group_idx - num_emitted, value)); + EmitTo::First(n) => { + let mut remaining = HashSet::default(); + for (group_idx, value) in self.seen.drain() { + if group_idx >= n { + remaining.insert((group_idx - n, value)); + } } + self.seen = remaining; } - self.seen = remaining; - self.num_groups = self.num_groups.saturating_sub(num_emitted); } Ok(Arc::new(Int64Array::from(counts))) @@ -111,7 +104,7 @@ where fn state(&mut self, emit_to: EmitTo) -> datafusion_common::Result> { let num_emitted = match emit_to { - EmitTo::All => self.num_groups, + EmitTo::All => self.counts.len(), EmitTo::First(n) => n, }; @@ -121,7 +114,7 @@ where for (group_idx, value) in self.seen.drain() { group_values[group_idx].push(value); } - self.num_groups = 0; + self.counts.clear(); } else { let mut remaining = HashSet::default(); for (group_idx, value) in self.seen.drain() { @@ -132,7 +125,7 @@ where } } self.seen = remaining; - self.num_groups = self.num_groups.saturating_sub(num_emitted); + let _ = emit_to.take_needed(&mut self.counts); } let mut offsets = vec![0i32]; @@ -161,14 +154,16 @@ where total_num_groups: usize, ) -> datafusion_common::Result<()> { debug_assert_eq!(values.len(), 1); - self.num_groups = self.num_groups.max(total_num_groups); + self.counts.resize(total_num_groups, 0); let list_array = values[0].as_list::(); - for (row_idx, group_idx) in group_indices.iter().enumerate() { + for (row_idx, &group_idx) in group_indices.iter().enumerate() { let inner = list_array.value(row_idx); let inner_arr = inner.as_primitive::(); - for value in inner_arr.values().iter() { - self.seen.insert((*group_idx, *value)); + for &value in inner_arr.values().iter() { + if self.seen.insert((group_idx, value)) { + self.counts[group_idx] += 1; + } } } @@ -178,5 +173,6 @@ where fn size(&self) -> usize { size_of::() + self.seen.capacity() * (size_of::<(usize, T::Native)>() + size_of::()) + + self.counts.capacity() * size_of::() } } diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index f74c94bcf6c79..8a6bc425a4815 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -340,69 +340,30 @@ impl AggregateUDFImpl for Count { if args.exprs.len() != 1 { return false; } - if args.is_distinct { - // Only support primitive integer types for now - matches!( - args.expr_fields[0].data_type(), - DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - ) - } else { - true + if !args.is_distinct { + return true; } + matches!( + args.expr_fields[0].data_type(), + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + ) } fn create_groups_accumulator( &self, args: AccumulatorArgs, ) -> Result> { - if args.is_distinct { - let data_type = args.expr_fields[0].data_type(); - return match data_type { - DataType::Int8 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::Int16 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::Int32 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::Int64 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::UInt8 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::UInt16 => { - Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< - UInt16Type, - >::new())) - } - DataType::UInt32 => { - Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< - UInt32Type, - >::new())) - } - DataType::UInt64 => { - Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< - UInt64Type, - >::new())) - } - _ => not_impl_err!( - "GroupsAccumulator not supported for COUNT(DISTINCT) with {}", - data_type - ), - }; + if !args.is_distinct { + return Ok(Box::new(CountGroupsAccumulator::new())); } - // instantiate specialized accumulator - Ok(Box::new(CountGroupsAccumulator::new())) + create_distinct_count_groups_accumulator(args) } fn reverse_expr(&self) -> ReversedUDAF { @@ -475,6 +436,43 @@ impl AggregateUDFImpl for Count { } } +#[cold] +fn create_distinct_count_groups_accumulator( + args: AccumulatorArgs, +) -> Result> { + let data_type = args.expr_fields[0].data_type(); + match data_type { + DataType::Int8 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::Int16 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::Int32 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::Int64 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::UInt8 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::UInt16 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::UInt32 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + DataType::UInt64 => Ok(Box::new( + PrimitiveDistinctCountGroupsAccumulator::::new(), + )), + _ => not_impl_err!( + "GroupsAccumulator not supported for COUNT(DISTINCT) with {}", + data_type + ), + } +} + // DistinctCountAccumulator does not support retract_batch and sliding window // this is a specialized accumulator for distinct count that supports retract_batch // and sliding window. From 72cea6280e01ad08e3bf40e7238f435454e999f3 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 15 Apr 2026 21:47:41 -0700 Subject: [PATCH 09/14] hashtable_with_count_vector_approach --- datafusion/functions-aggregate/src/count.rs | 46 ++++++++++----------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 8a6bc425a4815..d098cc3b00c19 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -363,7 +363,7 @@ impl AggregateUDFImpl for Count { if !args.is_distinct { return Ok(Box::new(CountGroupsAccumulator::new())); } - create_distinct_count_groups_accumulator(args) + create_distinct_count_groups_accumulator(&args) } fn reverse_expr(&self) -> ReversedUDAF { @@ -438,34 +438,34 @@ impl AggregateUDFImpl for Count { #[cold] fn create_distinct_count_groups_accumulator( - args: AccumulatorArgs, + args: &AccumulatorArgs, ) -> Result> { let data_type = args.expr_fields[0].data_type(); match data_type { DataType::Int8 => Ok(Box::new( PrimitiveDistinctCountGroupsAccumulator::::new(), )), - DataType::Int16 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::Int32 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::Int64 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::UInt8 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::UInt16 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::UInt32 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), - DataType::UInt64 => Ok(Box::new( - PrimitiveDistinctCountGroupsAccumulator::::new(), - )), + DataType::Int16 => Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Int16Type, + >::new())), + DataType::Int32 => Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Int32Type, + >::new())), + DataType::Int64 => Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + Int64Type, + >::new())), + DataType::UInt8 => Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt8Type, + >::new())), + DataType::UInt16 => Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt16Type, + >::new())), + DataType::UInt32 => Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt32Type, + >::new())), + DataType::UInt64 => Ok(Box::new(PrimitiveDistinctCountGroupsAccumulator::< + UInt64Type, + >::new())), _ => not_impl_err!( "GroupsAccumulator not supported for COUNT(DISTINCT) with {}", data_type From ba140a00af402b7f8e6ff3470d1f50b369766d69 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 16 Apr 2026 06:17:58 -0700 Subject: [PATCH 10/14] groups_acc_more_robust_checks --- .../benches/count_distinct.rs | 149 +++++++++++------- 1 file changed, 90 insertions(+), 59 deletions(-) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index ff3556a7994b5..b58cfaa9a965a 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use arrow::array::{ - ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array, + Array, ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array, }; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; @@ -87,13 +87,6 @@ fn create_i16_array(n_distinct: usize) -> Int16Array { .collect() } -fn create_group_indices(num_groups: usize) -> Vec { - let mut rng = StdRng::seed_from_u64(42); - (0..BATCH_SIZE) - .map(|_| rng.random_range(0..num_groups)) - .collect() -} - fn prepare_args(data_type: DataType) -> (Arc, AccumulatorArgs<'static>) { let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)])); let schema_leaked: &'static Schema = Box::leak(Box::new((*schema).clone())); @@ -181,62 +174,100 @@ fn count_distinct_benchmark(c: &mut Criterion) { }); } +/// Create group indices with uniform distribution +fn create_uniform_groups(num_groups: usize) -> Vec { + let mut rng = StdRng::seed_from_u64(42); + (0..BATCH_SIZE) + .map(|_| rng.random_range(0..num_groups)) + .collect() +} + +/// Create group indices with skewed distribution (80% in 20% of groups) +fn create_skewed_groups(num_groups: usize) -> Vec { + let mut rng = StdRng::seed_from_u64(42); + let hot_groups = (num_groups / 5).max(1); + (0..BATCH_SIZE) + .map(|_| { + if rng.random_range(0..100) < 80 { + rng.random_range(0..hot_groups) + } else { + rng.random_range(0..num_groups) + } + }) + .collect() +} + fn count_distinct_groups_benchmark(c: &mut Criterion) { let count_fn = Count::new(); - for num_groups in [10, 100, 1000] { - let n_distinct = BATCH_SIZE * 80 / 100; - let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; - let group_indices = create_group_indices(num_groups); - - let (_schema, args) = prepare_args(DataType::Int64); - - if count_fn.groups_accumulator_supported(args.clone()) { - c.bench_function( - &format!("count_distinct_groups i64 {num_groups} groups"), - |b| { - b.iter(|| { - let (_schema, args) = prepare_args(DataType::Int64); - let mut acc = count_fn.create_groups_accumulator(args).unwrap(); - acc.update_batch( - std::slice::from_ref(&values), - &group_indices, - None, - num_groups, - ) - .unwrap(); - acc.evaluate(EmitTo::All).unwrap() - }) - }, - ); - } else { - c.bench_function( - &format!("count_distinct_groups i64 {num_groups} groups"), - |b| { - b.iter(|| { - let mut accumulators: Vec<_> = (0..num_groups) - .map(|_| prepare_accumulator(DataType::Int64)) - .collect(); - - let arr = values.as_any().downcast_ref::().unwrap(); - for (idx, group_idx) in group_indices.iter().enumerate() { - if let Some(val) = arr.value(idx).into() { - let single_val = - Arc::new(Int64Array::from(vec![Some(val)])) - as ArrayRef; - accumulators[*group_idx] - .update_batch(std::slice::from_ref(&single_val)) - .unwrap(); - } + let group_counts = [100, 1000, 10000]; + let cardinalities = [("low", 20), ("mid", 80), ("high", 99)]; + let distributions = ["uniform", "skewed"]; + + for num_groups in group_counts { + for (card_name, distinct_pct) in cardinalities { + for dist in distributions { + let name = format!("g{num_groups}_{card_name}_{dist}"); + let n_distinct = BATCH_SIZE * distinct_pct / 100; + let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; + let group_indices = if dist == "uniform" { + create_uniform_groups(num_groups) + } else { + create_skewed_groups(num_groups) + }; + + let (_schema, args) = prepare_args(DataType::Int64); + + if count_fn.groups_accumulator_supported(args.clone()) { + c.bench_function(&format!("count_distinct_groups {name}"), |b| { + b.iter(|| { + let mut acc = + count_fn.create_groups_accumulator(args.clone()).unwrap(); + acc.update_batch( + std::slice::from_ref(&values), + &group_indices, + None, + num_groups, + ) + .unwrap(); + acc.evaluate(EmitTo::All).unwrap() + }) + }); + } else { + let arr = values.as_any().downcast_ref::().unwrap(); + let mut group_rows: Vec> = vec![Vec::new(); num_groups]; + for (idx, &group_idx) in group_indices.iter().enumerate() { + if arr.is_valid(idx) { + group_rows[group_idx].push(arr.value(idx)); } + } + let group_arrays: Vec = group_rows + .iter() + .map(|rows| Arc::new(Int64Array::from(rows.clone())) as ArrayRef) + .collect(); + + c.bench_function(&format!("count_distinct_groups {name}"), |b| { + b.iter(|| { + let mut accumulators: Vec<_> = (0..num_groups) + .map(|_| prepare_accumulator(DataType::Int64)) + .collect(); + + for (group_idx, batch) in group_arrays.iter().enumerate() { + if !batch.is_empty() { + accumulators[group_idx] + .update_batch(std::slice::from_ref(batch)) + .unwrap(); + } + } - let _results: Vec<_> = accumulators - .iter_mut() - .map(|acc| acc.evaluate().unwrap()) - .collect(); - }) - }, - ); + let _results: Vec<_> = accumulators + .iter_mut() + .map(|acc| acc.evaluate().unwrap()) + .collect(); + }) + }); + } + } } } } From c2bc9d376c96c3ff907011c1f3e3589373febbc3 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 16 Apr 2026 17:47:00 -0700 Subject: [PATCH 11/14] fix_compilation_issues --- datafusion/functions-aggregate/src/count.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 193c9e45bb567..eab36d4951a9c 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -21,11 +21,11 @@ use arrow::{ compute, datatypes::{ DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, - FieldRef, Float16Type, Float32Type, Float64Type, Int32Type, Int64Type, - Time32MillisecondType, Time32SecondType, Time64MicrosecondType, + FieldRef, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, + Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, - UInt32Type, UInt64Type, + UInt8Type, UInt16Type, UInt32Type, UInt64Type, }, }; use datafusion_common::hash_utils::RandomState; From 888f556e9501b0e16e28c99bf99d165d229afe24 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Fri, 17 Apr 2026 00:15:18 -0700 Subject: [PATCH 12/14] single_state_vec --- .../src/aggregate/count_distinct/groups.rs | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs index 5f8c9b3c3cc49..8db242dc9b19d 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs @@ -108,18 +108,32 @@ where EmitTo::First(n) => n, }; - let mut group_values: Vec> = vec![Vec::new(); num_emitted]; + // Prefix-sum counts[..num_emitted] into offsets + let mut offsets = Vec::with_capacity(num_emitted + 1); + offsets.push(0i32); + let mut total = 0i32; + for &c in &self.counts[..num_emitted] { + total += c as i32; + offsets.push(total); + } + + let mut all_values = vec![T::Native::default(); total as usize]; + let mut cursors: Vec = offsets[..num_emitted].to_vec(); if matches!(emit_to, EmitTo::All) { for (group_idx, value) in self.seen.drain() { - group_values[group_idx].push(value); + let pos = cursors[group_idx] as usize; + all_values[pos] = value; + cursors[group_idx] += 1; } self.counts.clear(); } else { let mut remaining = HashSet::default(); for (group_idx, value) in self.seen.drain() { if group_idx < num_emitted { - group_values[group_idx].push(value); + let pos = cursors[group_idx] as usize; + all_values[pos] = value; + cursors[group_idx] += 1; } else { remaining.insert((group_idx - num_emitted, value)); } @@ -128,13 +142,6 @@ where let _ = emit_to.take_needed(&mut self.counts); } - let mut offsets = vec![0i32]; - let mut all_values = Vec::new(); - for values in &group_values { - all_values.extend(values.iter().copied()); - offsets.push(all_values.len() as i32); - } - let values_array = Arc::new(PrimitiveArray::::from_iter_values(all_values)); let list_array = ListArray::new( Arc::new(Field::new_list_field(T::DATA_TYPE, true)), From 617e79032b8930cd89a52ec0f27e761d5b71c5c5 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Fri, 17 Apr 2026 00:18:15 -0700 Subject: [PATCH 13/14] slice_merge_batch --- .../src/aggregate/count_distinct/groups.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs index 8db242dc9b19d..956270e17de8b 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs @@ -163,11 +163,14 @@ where debug_assert_eq!(values.len(), 1); self.counts.resize(total_num_groups, 0); let list_array = values[0].as_list::(); + let inner = list_array.values().as_primitive::(); + let inner_values = inner.values(); + let offsets = list_array.offsets(); for (row_idx, &group_idx) in group_indices.iter().enumerate() { - let inner = list_array.value(row_idx); - let inner_arr = inner.as_primitive::(); - for &value in inner_arr.values().iter() { + let start = offsets[row_idx] as usize; + let end = offsets[row_idx + 1] as usize; + for &value in &inner_values[start..end] { if self.seen.insert((group_idx, value)) { self.counts[group_idx] += 1; } From 918036ee8b25ff5a5669e8b6a176adf395ef26ff Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sat, 18 Apr 2026 23:17:55 -0700 Subject: [PATCH 14/14] fix_pr_comments --- .../src/aggregate/count_distinct/groups.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs index 956270e17de8b..d370d59c90012 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/groups.rs @@ -18,7 +18,7 @@ use arrow::array::{ ArrayRef, AsArray, BooleanArray, Int64Array, ListArray, PrimitiveArray, }; -use arrow::buffer::OffsetBuffer; +use arrow::buffer::{OffsetBuffer, ScalarBuffer}; use arrow::datatypes::{ArrowPrimitiveType, Field}; use datafusion_common::HashSet; use datafusion_common::hash_utils::RandomState; @@ -142,7 +142,10 @@ where let _ = emit_to.take_needed(&mut self.counts); } - let values_array = Arc::new(PrimitiveArray::::from_iter_values(all_values)); + let values_array = Arc::new(PrimitiveArray::::new( + ScalarBuffer::from(all_values), + None, + )); let list_array = ListArray::new( Arc::new(Field::new_list_field(T::DATA_TYPE, true)), OffsetBuffer::new(offsets.into()),