From 6030334b94d279dd0d5a7ff0110c2f0da3f50b6e Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 5 Jun 2026 10:02:52 +0800 Subject: [PATCH 1/7] improve approx_distinct for small value --- .../src/approx_distinct.rs | 561 +++++++++++++++++- .../functions-aggregate/src/hyperloglog.rs | 71 ++- .../sqllogictest/test_files/aggregate.slt | 57 ++ 3 files changed, 677 insertions(+), 12 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index ee12d9050e1d0..c890af34c2e0d 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -17,10 +17,11 @@ //! Defines physical expressions that can evaluated at runtime during query execution -use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog}; +use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog, NUM_REGISTERS, count_from_hashes}; use arrow::array::{Array, BinaryArray, StringViewArray}; use arrow::array::{ - GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, + AsArray, BinaryBuilder, BooleanArray, GenericBinaryArray, GenericStringArray, + OffsetSizeTrait, PrimitiveArray, UInt64Array, }; use arrow::datatypes::{ ArrowPrimitiveType, Date32Type, Date64Type, FieldRef, Int32Type, Int64Type, @@ -37,7 +38,8 @@ use datafusion_common::{ use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; use datafusion_expr::utils::format_state_name; use datafusion_expr::{ - Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility, + Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator, Signature, + Volatility, }; use datafusion_functions_aggregate_common::aggregate::count_distinct::{ Bitmap65536DistinctCountAccumulator, Bitmap65536DistinctCountAccumulatorI16, @@ -49,6 +51,7 @@ use datafusion_macros::user_doc; use std::fmt::{Debug, Formatter}; use std::hash::{BuildHasher, Hash}; use std::marker::PhantomData; +use std::sync::Arc; make_udaf_expr_and_func!( ApproxDistinct, @@ -294,6 +297,356 @@ where default_accumulator_impl!(); } +/// Maximum number of distinct hashes kept in the sparse representation of a +/// per-group sketch before it is promoted to a dense [`HyperLogLog`]. +/// +/// A dense sketch always occupies [`NUM_REGISTERS`] (16 KiB) regardless of how +/// many values it has seen. The vast majority of groups in a high-cardinality +/// `GROUP BY` only observe a handful of distinct values, so keeping their state +/// as a small list of hashes saves a huge amount of memory (both while +/// aggregating and when serializing the partial state for the final phase). +const SPARSE_LIMIT: usize = 256; + +/// Per-group HyperLogLog state used by [`HllGroupsAccumulator`]. +/// +/// Starts out as a compact list of the (deduplicated) hashes observed for the +/// group and only switches to a full dense [`HyperLogLog`] once it has seen more +/// than [`SPARSE_LIMIT`] distinct values. Folding the stored hashes into a dense +/// sketch produces exactly the same registers as adding the original values one +/// by one, so the cardinality estimate is identical to the per-group +/// [`Accumulator`] path. +#[derive(Clone, Debug)] +enum GroupHll { + /// Distinct hashes seen so far. May contain duplicates between compactions. + Sparse(Vec), + Dense(Box>), +} + +impl Default for GroupHll { + fn default() -> Self { + GroupHll::Sparse(Vec::new()) + } +} + +impl GroupHll { + /// Add a pre-computed hash, returning the change in heap-allocated bytes so + /// the accumulator can track its memory usage incrementally. + #[inline] + fn add_hash(&mut self, hash: u64) -> isize { + match self { + GroupHll::Dense(hll) => { + hll.add_hashed(hash); + 0 + } + GroupHll::Sparse(v) => { + let cap_before = v.capacity(); + v.push(hash); + if v.len() >= 2 * SPARSE_LIMIT { + return self.compact_or_promote(cap_before); + } + ((v.capacity() - cap_before) * size_of::()) as isize + } + } + } + + /// Deduplicate the sparse hash list and, if it still exceeds + /// [`SPARSE_LIMIT`] distinct values, promote it to a dense sketch. + #[cold] + fn compact_or_promote(&mut self, cap_before: usize) -> isize { + let GroupHll::Sparse(v) = self else { + return 0; + }; + v.sort_unstable(); + v.dedup(); + if v.len() > SPARSE_LIMIT { + let mut hll = HyperLogLog::::new(); + for &h in v.iter() { + hll.add_hashed(h); + } + *self = GroupHll::Dense(Box::new(hll)); + (NUM_REGISTERS as isize) - ((cap_before * size_of::()) as isize) + } else { + // capacity is unchanged by sort/dedup + 0 + } + } + + /// Merge a serialized state (produced by [`Self::serialize`] or by the + /// per-group [`Accumulator`]) into this sketch. + fn merge_serialized(&mut self, bytes: &[u8]) -> Result { + if bytes.is_empty() { + return Ok(0); + } + if bytes.len() == NUM_REGISTERS { + let other: HyperLogLog = bytes.try_into()?; + Ok(self.merge_dense(&other)) + } else { + debug_assert_eq!(bytes.len() % size_of::(), 0); + let mut delta = 0; + for chunk in bytes.chunks_exact(size_of::()) { + let h = u64::from_le_bytes(chunk.try_into().unwrap()); + delta += self.add_hash(h); + } + Ok(delta) + } + } + + /// Merge a dense sketch into this one, promoting to dense if necessary. + fn merge_dense(&mut self, other: &HyperLogLog) -> isize { + match self { + GroupHll::Dense(hll) => { + hll.merge(other); + 0 + } + GroupHll::Sparse(v) => { + let cap_before = v.capacity(); + let mut hll = other.clone(); + for &h in v.iter() { + hll.add_hashed(h); + } + *self = GroupHll::Dense(Box::new(hll)); + (NUM_REGISTERS as isize) - ((cap_before * size_of::()) as isize) + } + } + } + + /// The approximate number of distinct values seen by this group. + fn count(&self) -> u64 { + match self { + GroupHll::Dense(hll) => hll.count() as u64, + // Estimate directly from the stored hashes; this produces exactly the + // same value as folding them into a dense sketch but avoids + // allocating and scanning a 16 KiB register array for every group. + GroupHll::Sparse(v) => count_from_hashes(v) as u64, + } + } + + /// Serialize the sketch into `scratch` (which is cleared first). A dense + /// sketch is written as its raw [`NUM_REGISTERS`] registers (wire-compatible + /// with the per-group [`Accumulator`]); a sparse sketch is written as its + /// distinct hashes in little-endian order. + fn serialize(&mut self, scratch: &mut Vec) { + scratch.clear(); + match self { + GroupHll::Dense(hll) => { + let registers: &[u8] = (**hll).as_ref(); + scratch.extend_from_slice(registers); + } + GroupHll::Sparse(v) => { + v.sort_unstable(); + v.dedup(); + for &h in v.iter() { + scratch.extend_from_slice(&h.to_le_bytes()); + } + } + } + } +} + +/// Computes HyperLogLog hashes for the rows of an input array, type by type. +/// +/// The hashing matches the per-group [`Accumulator`] implementations exactly so +/// that the grouped and ungrouped paths produce identical estimates. +trait HllValueHasher: Send + Sync + 'static { + /// Invoke `f(row_index, hash)` for every non-null row of `array`. + fn for_each_hash(array: &dyn Array, f: impl FnMut(usize, u64)); +} + +struct NumericHasher(PhantomData); + +impl HllValueHasher for NumericHasher +where + T: ArrowPrimitiveType + Send + Sync + 'static, + T::Native: Hash, +{ + #[inline] + fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) { + let array: &PrimitiveArray = array.as_primitive::(); + if array.null_count() == 0 { + for (i, v) in array.values().iter().enumerate() { + f(i, HLL_HASH_STATE.hash_one(v)); + } + } else { + for i in 0..array.len() { + if array.is_valid(i) { + f(i, HLL_HASH_STATE.hash_one(array.value(i))); + } + } + } + } +} + +struct Utf8Hasher(PhantomData); + +impl HllValueHasher for Utf8Hasher { + #[inline] + fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) { + let array: &GenericStringArray = array.as_string::(); + for i in 0..array.len() { + if array.is_valid(i) { + f(i, HLL_HASH_STATE.hash_one(array.value(i))); + } + } + } +} + +struct Utf8ViewHasher; + +impl HllValueHasher for Utf8ViewHasher { + #[inline] + fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) { + let array: &StringViewArray = array.as_string_view(); + // Mirror `StringViewHLLAccumulator`: hash the raw inline view when all + // strings are stored inline (≤ 12 bytes), avoiding `&str` materialization. + if array.data_buffers().is_empty() { + let views = array.views(); + for i in 0..array.len() { + if array.is_valid(i) { + f(i, HLL_HASH_STATE.hash_one(views[i])); + } + } + } else { + for i in 0..array.len() { + if array.is_valid(i) { + f(i, HLL_HASH_STATE.hash_one(array.value(i))); + } + } + } + } +} + +struct BinaryHasher(PhantomData); + +impl HllValueHasher for BinaryHasher { + #[inline] + fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) { + let array: &GenericBinaryArray = array.as_binary::(); + for i in 0..array.len() { + if array.is_valid(i) { + f(i, HLL_HASH_STATE.hash_one(array.value(i))); + } + } + } +} + +/// A [`GroupsAccumulator`] for `approx_distinct` that keeps one adaptive +/// (sparse → dense) HyperLogLog sketch per group. +/// +/// This is dramatically faster than the generic `GroupsAccumulatorAdapter` +/// fallback for high-cardinality `GROUP BY`s: it processes the whole input in a +/// single vectorized pass (no per-group `take`/slice and no dynamic dispatch), +/// and the sparse representation avoids allocating a 16 KiB sketch for every +/// group when most groups only see a few distinct values. +struct HllGroupsAccumulator { + /// Per-group sketches, indexed by `group_index`. + groups: Vec, + /// Incrementally maintained estimate of heap bytes used by `groups`. + allocated_bytes: usize, + phantom: PhantomData, +} + +impl HllGroupsAccumulator { + fn new() -> Self { + Self { + groups: Vec::new(), + allocated_bytes: 0, + phantom: PhantomData, + } + } + + #[inline] + fn ensure_groups(&mut self, total_num_groups: usize) { + if total_num_groups > self.groups.len() { + self.groups.resize_with(total_num_groups, GroupHll::default); + } + } + + #[inline] + fn apply_delta(&mut self, delta: isize) { + self.allocated_bytes = + (self.allocated_bytes as isize).saturating_add(delta).max(0) as usize; + } +} + +impl GroupsAccumulator for HllGroupsAccumulator { + fn update_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.ensure_groups(total_num_groups); + let groups = &mut self.groups; + let mut delta: isize = 0; + match opt_filter { + None => H::for_each_hash(values[0].as_ref(), |row, hash| { + delta += groups[group_indices[row]].add_hash(hash); + }), + Some(filter) => H::for_each_hash(values[0].as_ref(), |row, hash| { + if filter.value(row) { + delta += groups[group_indices[row]].add_hash(hash); + } + }), + } + self.apply_delta(delta); + Ok(()) + } + + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + self.ensure_groups(total_num_groups); + let states = downcast_value!(values[0], BinaryArray); + let mut delta: isize = 0; + for (row, &group_index) in group_indices.iter().enumerate() { + if let Some(filter) = opt_filter + && !filter.value(row) + { + continue; + } + if states.is_valid(row) { + delta += self.groups[group_index].merge_serialized(states.value(row))?; + } + } + self.apply_delta(delta); + Ok(()) + } + + fn evaluate(&mut self, emit_to: EmitTo) -> Result { + let groups = emit_to.take_needed(&mut self.groups); + let counts: UInt64Array = groups.iter().map(|g| Some(g.count())).collect(); + // The emitted groups are gone; recompute the memory estimate lazily on + // the next batch rather than tracking the freed bytes precisely. + if matches!(emit_to, EmitTo::All) { + self.allocated_bytes = 0; + } + Ok(Arc::new(counts)) + } + + fn state(&mut self, emit_to: EmitTo) -> Result> { + let mut groups = emit_to.take_needed(&mut self.groups); + let mut builder = BinaryBuilder::new(); + let mut scratch: Vec = Vec::new(); + for g in groups.iter_mut() { + g.serialize(&mut scratch); + builder.append_value(&scratch); + } + if matches!(emit_to, EmitTo::All) { + self.allocated_bytes = 0; + } + Ok(vec![Arc::new(builder.finish())]) + } + + fn size(&self) -> usize { + self.groups.capacity() * size_of::() + self.allocated_bytes + } +} + impl Debug for ApproxDistinct { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("ApproxDistinct") @@ -481,7 +834,209 @@ impl AggregateUDFImpl for ApproxDistinct { Ok(accumulator) } + fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { + is_hll_groups_type(args.expr_fields[0].data_type()) + } + + fn create_groups_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + let data_type = args.expr_fields[0].data_type(); + let accumulator: Box = match data_type { + DataType::UInt32 => { + Box::new(HllGroupsAccumulator::>::new()) + } + DataType::UInt64 => { + Box::new(HllGroupsAccumulator::>::new()) + } + DataType::Int32 => { + Box::new(HllGroupsAccumulator::>::new()) + } + DataType::Int64 => { + Box::new(HllGroupsAccumulator::>::new()) + } + DataType::Date32 => { + Box::new(HllGroupsAccumulator::>::new()) + } + DataType::Date64 => { + Box::new(HllGroupsAccumulator::>::new()) + } + DataType::Time32(TimeUnit::Second) => { + Box::new(HllGroupsAccumulator::>::new()) + } + DataType::Time32(TimeUnit::Millisecond) => Box::new(HllGroupsAccumulator::< + NumericHasher, + >::new()), + DataType::Time64(TimeUnit::Microsecond) => Box::new(HllGroupsAccumulator::< + NumericHasher, + >::new()), + DataType::Time64(TimeUnit::Nanosecond) => Box::new(HllGroupsAccumulator::< + NumericHasher, + >::new()), + DataType::Timestamp(TimeUnit::Second, _) => Box::new(HllGroupsAccumulator::< + NumericHasher, + >::new()), + DataType::Timestamp(TimeUnit::Millisecond, _) => { + Box::new(HllGroupsAccumulator::< + NumericHasher, + >::new()) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + Box::new(HllGroupsAccumulator::< + NumericHasher, + >::new()) + } + DataType::Timestamp(TimeUnit::Nanosecond, _) => Box::new( + HllGroupsAccumulator::>::new(), + ), + DataType::Utf8 => Box::new(HllGroupsAccumulator::>::new()), + DataType::LargeUtf8 => { + Box::new(HllGroupsAccumulator::>::new()) + } + DataType::Utf8View => Box::new(HllGroupsAccumulator::::new()), + DataType::Binary => { + Box::new(HllGroupsAccumulator::>::new()) + } + DataType::LargeBinary => { + Box::new(HllGroupsAccumulator::>::new()) + } + other => { + return not_impl_err!( + "GroupsAccumulator for 'approx_distinct' is not implemented for data type {other}" + ); + } + }; + Ok(accumulator) + } + fn documentation(&self) -> Option<&Documentation> { self.doc() } } + +/// Returns true for the data types backed by the HyperLogLog +/// [`HllGroupsAccumulator`]. The fixed-domain types (booleans / small ints) and +/// `Null` fall back to the per-group [`Accumulator`] path. +fn is_hll_groups_type(data_type: &DataType) -> bool { + matches!( + data_type, + DataType::UInt32 + | DataType::UInt64 + | DataType::Int32 + | DataType::Int64 + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Binary + | DataType::LargeBinary + ) +} + +#[cfg(test)] +mod groups_tests { + use super::*; + + /// Hash a value the same way the accumulators do. + fn h(v: u64) -> u64 { + HLL_HASH_STATE.hash_one(v) + } + + /// Reference count: fold the given distinct hashes straight into a dense + /// HyperLogLog. The grouped sketch must agree with this exactly. + fn reference_count(hashes: &[u64]) -> u64 { + let mut hll = HyperLogLog::::new(); + for &hash in hashes { + hll.add_hashed(hash); + } + hll.count() as u64 + } + + fn serialize(g: &mut GroupHll) -> Vec { + let mut buf = Vec::new(); + g.serialize(&mut buf); + buf + } + + #[test] + fn sparse_stays_sparse_for_small_groups() { + let mut g = GroupHll::default(); + let hashes: Vec = (0..50).map(h).collect(); + for &hash in &hashes { + g.add_hash(hash); + } + // duplicates must not change the estimate or trigger promotion + for &hash in &hashes { + g.add_hash(hash); + } + assert!( + matches!(g, GroupHll::Sparse(_)), + "small group must be sparse" + ); + assert_eq!(g.count(), reference_count(&hashes)); + // sparse serialized state is far smaller than a dense 16 KiB sketch + assert!(serialize(&mut g).len() < NUM_REGISTERS); + } + + #[test] + fn promotes_to_dense_for_large_groups() { + let mut g = GroupHll::default(); + let hashes: Vec = (0..(SPARSE_LIMIT as u64 * 4)).map(h).collect(); + for &hash in &hashes { + g.add_hash(hash); + } + assert!(matches!(g, GroupHll::Dense(_)), "large group must be dense"); + assert_eq!(g.count(), reference_count(&hashes)); + } + + #[test] + fn serialize_then_merge_roundtrips() { + for n in [0u64, 10, SPARSE_LIMIT as u64 * 4] { + let hashes: Vec = (0..n).map(h).collect(); + let mut src = GroupHll::default(); + for &hash in &hashes { + src.add_hash(hash); + } + let bytes = serialize(&mut src); + let mut dst = GroupHll::default(); + dst.merge_serialized(&bytes).unwrap(); + assert_eq!(dst.count(), reference_count(&hashes), "n = {n}"); + } + } + + #[test] + fn merge_combines_disjoint_groups() { + // sparse + sparse, sparse + dense, dense + dense + let left: Vec = (0..100).map(h).collect(); + let right: Vec = (100..(SPARSE_LIMIT as u64 * 4)).map(h).collect(); + let all: Vec = left.iter().chain(right.iter()).copied().collect(); + + let mut a = GroupHll::default(); + for &hash in &left { + a.add_hash(hash); + } + let mut b = GroupHll::default(); + for &hash in &right { + b.add_hash(hash); + } + let b_bytes = serialize(&mut b); + a.merge_serialized(&b_bytes).unwrap(); + assert_eq!(a.count(), reference_count(&all)); + } + + #[test] + fn empty_group_counts_zero() { + let mut g = GroupHll::default(); + assert_eq!(g.count(), 0); + let bytes = serialize(&mut g); + assert!(bytes.is_empty()); + let mut dst = GroupHll::default(); + dst.merge_serialized(&bytes).unwrap(); + assert_eq!(dst.count(), 0); + } +} diff --git a/datafusion/functions-aggregate/src/hyperloglog.rs b/datafusion/functions-aggregate/src/hyperloglog.rs index 3861800847edb..182fe15cf0f24 100644 --- a/datafusion/functions-aggregate/src/hyperloglog.rs +++ b/datafusion/functions-aggregate/src/hyperloglog.rs @@ -42,7 +42,7 @@ use std::marker::PhantomData; const HLL_P: usize = 14_usize; /// The number of bits of the hash value used determining the number of leading zeros const HLL_Q: usize = 64_usize - HLL_P; -const NUM_REGISTERS: usize = 1_usize << HLL_P; +pub(crate) const NUM_REGISTERS: usize = 1_usize << HLL_P; /// Mask to obtain index into the registers const HLL_P_MASK: u64 = (NUM_REGISTERS as u64) - 1; @@ -145,16 +145,69 @@ where /// Guess the number of unique elements seen by the HyperLogLog. pub fn count(&self) -> usize { - let histogram = self.get_histogram(); - let m = NUM_REGISTERS as f64; - let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m); - for i in histogram[1..=HLL_Q].iter().rev() { - z += *i as f64; - z *= 0.5; + count_from_histogram(&self.get_histogram()) + } +} + +/// Compute `index` and `rho` (register value) for a precomputed hash, exactly as +/// [`HyperLogLog::add_hashed`] does. +#[inline] +pub(crate) fn register_for_hash(hash: u64) -> (usize, u8) { + let index = (hash & HLL_P_MASK) as usize; + let rho = (((hash >> HLL_P) | (1_u64 << HLL_Q)).trailing_zeros() + 1) as u8; + (index, rho) +} + +/// Estimate the cardinality of a set of precomputed hashes without +/// materializing a full [`NUM_REGISTERS`]-byte register array. +/// +/// This is equivalent to adding every hash to a fresh [`HyperLogLog`] via +/// [`HyperLogLog::add_hashed`] and calling [`HyperLogLog::count`], but only does +/// work proportional to the number of hashes. It is used to cheaply estimate the +/// many small groups produced by a high-cardinality `GROUP BY`, where allocating +/// and scanning a 16 KiB sketch per group would dominate the runtime. +/// +/// `hashes` may contain duplicates (duplicate hashes are idempotent). +pub(crate) fn count_from_hashes(hashes: &[u64]) -> usize { + if hashes.is_empty() { + return 0; + } + // For each touched register index keep the maximum rho. Sorting by + // (index, rho) groups equal indices together with the max rho last. + let mut idx_rho: Vec<(usize, u8)> = + hashes.iter().map(|&hash| register_for_hash(hash)).collect(); + idx_rho.sort_unstable(); + + let mut histogram = [0u32; HLL_Q + 2]; + let mut touched = 0u32; + let mut i = 0; + while i < idx_rho.len() { + let index = idx_rho[i].0; + let mut max_rho = idx_rho[i].1; + i += 1; + while i < idx_rho.len() && idx_rho[i].0 == index { + max_rho = idx_rho[i].1; // ascending rho => last is the max + i += 1; } - z += m * hll_sigma(histogram[0] as f64 / m); - (0.5 / 2_f64.ln() * m * m / z).round() as usize + histogram[max_rho as usize] += 1; + touched += 1; + } + // All remaining registers are still zero. + histogram[0] = NUM_REGISTERS as u32 - touched; + count_from_histogram(&histogram) +} + +/// Apply the HyperLogLog cardinality estimator to a register histogram. +#[inline] +fn count_from_histogram(histogram: &[u32; HLL_Q + 2]) -> usize { + let m = NUM_REGISTERS as f64; + let mut z = m * hll_tau((m - histogram[HLL_Q + 1] as f64) / m); + for i in histogram[1..=HLL_Q].iter().rev() { + z += *i as f64; + z *= 0.5; } + z += m * hll_sigma(histogram[0] as f64 / m); + (0.5 / 2_f64.ln() * m * m / z).round() as usize } /// Helper function sigma as defined in diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 2861b50580407..8dc6f4df7d79c 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1905,6 +1905,63 @@ SELECT approx_distinct(b) FROM approx_distinct_bool_test; statement ok DROP TABLE approx_distinct_bool_test; +# Grouped approx_distinct uses a dedicated GroupsAccumulator (sparse -> dense +# HyperLogLog per group). For small groups the estimate is exact, so the grouped +# result must match the per-group scalar result. +statement ok +CREATE TABLE approx_distinct_group_test (g INT, s VARCHAR, i INT) AS VALUES + (1, 'a', 10), (1, 'a', 10), (1, 'b', 20), + (2, 'c', 30), (2, 'd', 30), (2, 'c', 40), + (3, NULL, NULL), (3, NULL, NULL), + (4, 'e', 50); + +# Strings (Utf8): group 1 -> {a,b}=2, group 2 -> {c,d}=2, group 3 -> all null=0, group 4 -> {e}=1 +query II +SELECT g, approx_distinct(s) FROM approx_distinct_group_test GROUP BY g ORDER BY g; +---- +1 2 +2 2 +3 0 +4 1 + +# Utf8View takes the inline-view hashing path +query II +SELECT g, approx_distinct(arrow_cast(s, 'Utf8View')) FROM approx_distinct_group_test GROUP BY g ORDER BY g; +---- +1 2 +2 2 +3 0 +4 1 + +# Integers (Int32): group 1 -> {10,20}=2, group 2 -> {30,40}=2, group 3 -> 0, group 4 -> {50}=1 +query II +SELECT g, approx_distinct(i) FROM approx_distinct_group_test GROUP BY g ORDER BY g; +---- +1 2 +2 2 +3 0 +4 1 + +statement ok +DROP TABLE approx_distinct_group_test; + +# Grouped approx_distinct that crosses the sparse -> dense promotion threshold: +# 2000 distinct values in group 0 and 2000 in group 1. The estimate should be +# within HyperLogLog's error margin (~0.8%) of the true cardinality. +statement ok +CREATE TABLE approx_distinct_dense_test AS + SELECT (v % 2) AS g, v AS i FROM generate_series(0, 3999) AS t(v); + +query B +SELECT min(c) > 1900 AND max(c) < 2100 FROM ( + SELECT g, approx_distinct(i) AS c FROM approx_distinct_dense_test GROUP BY g +); +---- +true + +statement ok +DROP TABLE approx_distinct_dense_test; + ## This test executes the APPROX_PERCENTILE_CONT aggregation against the test ## data, asserting the estimated quantiles are ±5% their actual values. ## From 9660fc01400d7a6efdac248fd828bc8dc1403fd4 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 5 Jun 2026 10:43:49 +0800 Subject: [PATCH 2/7] add benchmark --- .../benches/approx_distinct.rs | 115 +++++++++++++++++- 1 file changed, 113 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index cc85c2163c180..44b45431e3eb1 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::hint::black_box; use std::sync::Arc; use arrow::array::{ @@ -24,8 +25,12 @@ use arrow::array::{ use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::function::AccumulatorArgs; -use datafusion_expr::{Accumulator, AggregateUDFImpl}; +use datafusion_expr::{ + Accumulator, AggregateUDF, AggregateUDFImpl, EmitTo, GroupsAccumulator, +}; use datafusion_functions_aggregate::approx_distinct::ApproxDistinct; +use datafusion_physical_expr::GroupsAccumulatorAdapter; +use datafusion_physical_expr::aggregate::AggregateExprBuilder; use datafusion_physical_expr::expressions::col; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; @@ -34,6 +39,11 @@ const BATCH_SIZE: usize = 8192; const SHORT_STRING_LENGTH: usize = 8; const LONG_STRING_LENGTH: usize = 20; +// Grouped (high-cardinality `GROUP BY`) benchmark parameters. +const N_GROUPS: usize = 50_000; +const AVG_ROWS_PER_GROUP: usize = 8; +const STRING_POOL_SIZE: usize = 100_000; + fn prepare_accumulator(data_type: DataType) -> Box { let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)])); let expr = col("f", &schema).unwrap(); @@ -216,5 +226,106 @@ fn approx_distinct_benchmark(c: &mut Criterion) { }); } -criterion_group!(benches, approx_distinct_benchmark); +/// Build a `GroupsAccumulator` the same way the aggregate operator does: use the +/// specialized one if the function supports it, otherwise fall back to wrapping +/// the per-group `Accumulator` in a `GroupsAccumulatorAdapter`. +fn prepare_groups_accumulator(data_type: DataType) -> Box { + let schema = Arc::new(Schema::new(vec![Field::new("f", data_type, true)])); + let expr = col("f", &schema).unwrap(); + let udf = Arc::new(AggregateUDF::from(ApproxDistinct::new())); + let agg = Arc::new( + AggregateExprBuilder::new(udf, vec![expr]) + .schema(schema) + .alias("approx_distinct(f)") + .build() + .unwrap(), + ); + + if agg.groups_accumulator_supported() { + agg.create_groups_accumulator().unwrap() + } else { + let agg = Arc::clone(&agg); + let factory = move || agg.create_accumulator(); + Box::new(GroupsAccumulatorAdapter::new(factory)) + } +} + +fn grouped_total_rows() -> usize { + N_GROUPS * AVG_ROWS_PER_GROUP +} + +/// A random group index in `0..N_GROUPS` for each row of a batch. +fn make_group_indices(rng: &mut StdRng) -> Vec { + (0..BATCH_SIZE) + .map(|_| rng.random_range(0..N_GROUPS)) + .collect() +} + +/// Pre-build all input batches `(values, group_indices)` for the grouped run, so +/// the measured loop only times the accumulator, not data generation. +fn build_grouped_batches(data_type: &DataType) -> Vec<(ArrayRef, Vec)> { + let n_batches = grouped_total_rows().div_ceil(BATCH_SIZE); + let mut rng = StdRng::seed_from_u64(7); + let pool = create_string_pool(STRING_POOL_SIZE, SHORT_STRING_LENGTH); + + (0..n_batches) + .map(|_| { + let group_indices = make_group_indices(&mut rng); + let values: ArrayRef = match data_type { + DataType::Int64 => Arc::new( + (0..BATCH_SIZE) + .map(|_| Some(rng.random::())) + .collect::(), + ), + DataType::Utf8 => Arc::new( + (0..BATCH_SIZE) + .map(|_| Some(pool[rng.random_range(0..pool.len())].as_str())) + .collect::(), + ), + DataType::Utf8View => Arc::new( + (0..BATCH_SIZE) + .map(|_| Some(pool[rng.random_range(0..pool.len())].as_str())) + .collect::(), + ), + other => panic!("unsupported grouped bench type: {other}"), + }; + (values, group_indices) + }) + .collect() +} + +/// Benchmark grouped `approx_distinct` over many groups. Each iteration feeds all batches into a +/// fresh accumulator and emits the result for every group. +fn approx_distinct_grouped_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("approx_distinct_grouped"); + group.sample_size(10); + + for data_type in [DataType::Int64, DataType::Utf8, DataType::Utf8View] { + let batches = build_grouped_batches(&data_type); + let label = format!("{data_type:?} {N_GROUPS} groups"); + group.bench_function(&label, |b| { + b.iter(|| { + let mut acc = prepare_groups_accumulator(data_type.clone()); + for (values, group_indices) in &batches { + acc.update_batch( + std::slice::from_ref(values), + group_indices, + None, + N_GROUPS, + ) + .unwrap(); + } + black_box(acc.evaluate(EmitTo::All).unwrap()); + }) + }); + } + + group.finish(); +} + +criterion_group!( + benches, + approx_distinct_benchmark, + approx_distinct_grouped_benchmark +); criterion_main!(benches); From 5a2203393fbf9cdc308f4237d0f0a9a28480998a Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 5 Jun 2026 11:25:12 +0800 Subject: [PATCH 3/7] update --- .../src/approx_distinct.rs | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index c890af34c2e0d..a2210f01d2f3b 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -421,6 +421,16 @@ impl GroupHll { } } + /// Heap bytes held by this sketch. Mirrors the deltas accrued in + /// [`Self::add_hash`] / [`Self::merge_dense`] so emitting a group can + /// precisely reverse them. + fn heap_bytes(&self) -> usize { + match self { + GroupHll::Sparse(v) => v.capacity() * size_of::(), + GroupHll::Dense(_) => NUM_REGISTERS, + } + } + /// Serialize the sketch into `scratch` (which is cleared first). A dense /// sketch is written as its raw [`NUM_REGISTERS`] registers (wire-compatible /// with the per-group [`Accumulator`]); a sparse sketch is written as its @@ -619,12 +629,16 @@ impl GroupsAccumulator for HllGroupsAccumulator { fn evaluate(&mut self, emit_to: EmitTo) -> Result { let groups = emit_to.take_needed(&mut self.groups); - let counts: UInt64Array = groups.iter().map(|g| Some(g.count())).collect(); - // The emitted groups are gone; recompute the memory estimate lazily on - // the next batch rather than tracking the freed bytes precisely. - if matches!(emit_to, EmitTo::All) { - self.allocated_bytes = 0; - } + let mut freed = 0; + let counts: UInt64Array = groups + .iter() + .map(|g| { + freed += g.heap_bytes(); + Some(g.count()) + }) + .collect(); + // The emitted groups have been removed; reclaim their tracked bytes. + self.allocated_bytes = self.allocated_bytes.saturating_sub(freed); Ok(Arc::new(counts)) } @@ -632,13 +646,14 @@ impl GroupsAccumulator for HllGroupsAccumulator { let mut groups = emit_to.take_needed(&mut self.groups); let mut builder = BinaryBuilder::new(); let mut scratch: Vec = Vec::new(); + let mut freed = 0; for g in groups.iter_mut() { + freed += g.heap_bytes(); g.serialize(&mut scratch); builder.append_value(&scratch); } - if matches!(emit_to, EmitTo::All) { - self.allocated_bytes = 0; - } + // The emitted groups have been removed; reclaim their tracked bytes. + self.allocated_bytes = self.allocated_bytes.saturating_sub(freed); Ok(vec![Arc::new(builder.finish())]) } From 0d668535bb4c245478a497bad9b4804779441728 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 5 Jun 2026 11:56:27 +0800 Subject: [PATCH 4/7] update test case --- .../sqllogictest/test_files/aggregate.slt | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 8dc6f4df7d79c..c0be055cdcc36 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1905,9 +1905,12 @@ SELECT approx_distinct(b) FROM approx_distinct_bool_test; statement ok DROP TABLE approx_distinct_bool_test; -# Grouped approx_distinct uses a dedicated GroupsAccumulator (sparse -> dense -# HyperLogLog per group). For small groups the estimate is exact, so the grouped -# result must match the per-group scalar result. +# Grouped approx_distinct uses a dedicated GroupsAccumulator (adaptive +# sparse -> dense HyperLogLog per group). Results are deterministic (the HLL uses +# a fixed hash seed); for these specific small inputs the 16384-register HLL +# estimates the true distinct count exactly. The key invariant is that the +# grouped path agrees with the scalar (no GROUP BY) path on the same data, which +# is checked explicitly below. statement ok CREATE TABLE approx_distinct_group_test (g INT, s VARCHAR, i INT) AS VALUES (1, 'a', 10), (1, 'a', 10), (1, 'b', 20), @@ -1942,6 +1945,19 @@ SELECT g, approx_distinct(i) FROM approx_distinct_group_test GROUP BY g ORDER BY 3 0 4 1 +# Invariant: the scalar (no GROUP BY) path must agree with the grouped path on +# the same data. The grouped result for g = 2 above is 2, and so is the scalar +# result over only g = 2's rows. +query I +SELECT approx_distinct(s) FROM approx_distinct_group_test WHERE g = 2; +---- +2 + +query I +SELECT approx_distinct(i) FROM approx_distinct_group_test WHERE g = 2; +---- +2 + statement ok DROP TABLE approx_distinct_group_test; From db8baf2ee4f51086b71a601d65322d09608565fa Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 5 Jun 2026 19:36:20 +0800 Subject: [PATCH 5/7] apply suggestion --- .../src/approx_distinct.rs | 172 ++++++++++++++---- 1 file changed, 135 insertions(+), 37 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index a2210f01d2f3b..45e1277c52c70 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -23,6 +23,7 @@ use arrow::array::{ AsArray, BinaryBuilder, BooleanArray, GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, UInt64Array, }; +use arrow::buffer::NullBuffer; use arrow::datatypes::{ ArrowPrimitiveType, Date32Type, Date64Type, FieldRef, Int32Type, Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, @@ -46,6 +47,7 @@ use datafusion_functions_aggregate_common::aggregate::count_distinct::{ BoolArray256DistinctCountAccumulator, BoolArray256DistinctCountAccumulatorI8, BooleanDistinctCountAccumulator, }; +use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filter_to_nulls; use datafusion_functions_aggregate_common::noop_accumulator::NoopAccumulator; use datafusion_macros::user_doc; use std::fmt::{Debug, Formatter}; @@ -381,7 +383,20 @@ impl GroupHll { let other: HyperLogLog = bytes.try_into()?; Ok(self.merge_dense(&other)) } else { - debug_assert_eq!(bytes.len() % size_of::(), 0); + if !bytes.len().is_multiple_of(size_of::()) { + return internal_err!( + "approx_distinct: malformed sparse state: length {} is not a multiple of {}", + bytes.len(), + size_of::() + ); + } + if bytes.len() > SPARSE_LIMIT * size_of::() { + return internal_err!( + "approx_distinct: malformed sparse state: length {} exceeds sparse limit {}", + bytes.len(), + SPARSE_LIMIT * size_of::() + ); + } let mut delta = 0; for chunk in bytes.chunks_exact(size_of::()) { let h = u64::from_le_bytes(chunk.try_into().unwrap()); @@ -458,8 +473,14 @@ impl GroupHll { /// The hashing matches the per-group [`Accumulator`] implementations exactly so /// that the grouped and ungrouped paths produce identical estimates. trait HllValueHasher: Send + Sync + 'static { - /// Invoke `f(row_index, hash)` for every non-null row of `array`. - fn for_each_hash(array: &dyn Array, f: impl FnMut(usize, u64)); + /// Invoke `f(row_index, hash)` for every row that is valid according to + /// `nulls`. `nulls = None` means every row is valid (caller has + /// pre-combined value-nulls and filter into a single buffer). + fn for_each_hash( + array: &dyn Array, + nulls: Option<&NullBuffer>, + f: impl FnMut(usize, u64), + ); } struct NumericHasher(PhantomData); @@ -470,16 +491,23 @@ where T::Native: Hash, { #[inline] - fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) { + fn for_each_hash( + array: &dyn Array, + nulls: Option<&NullBuffer>, + mut f: impl FnMut(usize, u64), + ) { let array: &PrimitiveArray = array.as_primitive::(); - if array.null_count() == 0 { - for (i, v) in array.values().iter().enumerate() { - f(i, HLL_HASH_STATE.hash_one(v)); + match nulls { + None => { + for (i, v) in array.values().iter().enumerate() { + f(i, HLL_HASH_STATE.hash_one(v)); + } } - } else { - for i in 0..array.len() { - if array.is_valid(i) { - f(i, HLL_HASH_STATE.hash_one(array.value(i))); + Some(nulls) => { + for i in 0..array.len() { + if nulls.is_valid(i) { + f(i, HLL_HASH_STATE.hash_one(array.value(i))); + } } } } @@ -490,10 +518,14 @@ struct Utf8Hasher(PhantomData); impl HllValueHasher for Utf8Hasher { #[inline] - fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) { + fn for_each_hash( + array: &dyn Array, + nulls: Option<&NullBuffer>, + mut f: impl FnMut(usize, u64), + ) { let array: &GenericStringArray = array.as_string::(); for i in 0..array.len() { - if array.is_valid(i) { + if nulls.is_none_or(|n| n.is_valid(i)) { f(i, HLL_HASH_STATE.hash_one(array.value(i))); } } @@ -504,20 +536,24 @@ struct Utf8ViewHasher; impl HllValueHasher for Utf8ViewHasher { #[inline] - fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) { + fn for_each_hash( + array: &dyn Array, + nulls: Option<&NullBuffer>, + mut f: impl FnMut(usize, u64), + ) { let array: &StringViewArray = array.as_string_view(); // Mirror `StringViewHLLAccumulator`: hash the raw inline view when all // strings are stored inline (≤ 12 bytes), avoiding `&str` materialization. if array.data_buffers().is_empty() { let views = array.views(); for i in 0..array.len() { - if array.is_valid(i) { + if nulls.is_none_or(|n| n.is_valid(i)) { f(i, HLL_HASH_STATE.hash_one(views[i])); } } } else { for i in 0..array.len() { - if array.is_valid(i) { + if nulls.is_none_or(|n| n.is_valid(i)) { f(i, HLL_HASH_STATE.hash_one(array.value(i))); } } @@ -529,10 +565,14 @@ struct BinaryHasher(PhantomData); impl HllValueHasher for BinaryHasher { #[inline] - fn for_each_hash(array: &dyn Array, mut f: impl FnMut(usize, u64)) { + fn for_each_hash( + array: &dyn Array, + nulls: Option<&NullBuffer>, + mut f: impl FnMut(usize, u64), + ) { let array: &GenericBinaryArray = array.as_binary::(); for i in 0..array.len() { - if array.is_valid(i) { + if nulls.is_none_or(|n| n.is_valid(i)) { f(i, HLL_HASH_STATE.hash_one(array.value(i))); } } @@ -547,6 +587,29 @@ impl HllValueHasher for BinaryHasher { /// single vectorized pass (no per-group `take`/slice and no dynamic dispatch), /// and the sparse representation avoids allocating a 16 KiB sketch for every /// group when most groups only see a few distinct values. +/// +/// +/// # Example +/// +/// For `SELECT k, approx_distinct(v) FROM t GROUP BY k`, each group owns one +/// independent sketch: +/// +/// ```text +/// group state +/// a Sparse([h1, h2, h3, h2]) +/// b Dense(HLL registers) +/// ... +/// ``` +/// +/// Group `a` has fewer than [`SPARSE_LIMIT`] distinct hashes, so it stays in +/// the sparse representation. Before emitting state or estimating the count, the +/// hash list is sorted and deduplicated to `[h1, h2, h3]`, then those hashes are +/// interpreted exactly as if they had been added to a dense [`HyperLogLog`]. +/// +/// Group `b` has crossed the sparse limit, so its hashes have already been +/// replayed into a dense sketch. New values for `b` update the dense registers +/// directly, and serialized state is the raw [`NUM_REGISTERS`]-byte register +/// array. struct HllGroupsAccumulator { /// Per-group sketches, indexed by `group_index`. groups: Vec, @@ -589,16 +652,15 @@ impl GroupsAccumulator for HllGroupsAccumulator { self.ensure_groups(total_num_groups); let groups = &mut self.groups; let mut delta: isize = 0; - match opt_filter { - None => H::for_each_hash(values[0].as_ref(), |row, hash| { - delta += groups[group_indices[row]].add_hash(hash); - }), - Some(filter) => H::for_each_hash(values[0].as_ref(), |row, hash| { - if filter.value(row) { - delta += groups[group_indices[row]].add_hash(hash); - } - }), - } + // Pre-combine value-nulls and filter into one mask so the callback + // needs no per-row branching. + let filter_nulls = opt_filter.map(filter_to_nulls); + let value_nulls = values[0].logical_nulls(); + let combined_nulls = + NullBuffer::union(filter_nulls.as_ref(), value_nulls.as_ref()); + H::for_each_hash(values[0].as_ref(), combined_nulls.as_ref(), |row, hash| { + delta += groups[group_indices[row]].add_hash(hash); + }); self.apply_delta(delta); Ok(()) } @@ -607,18 +669,19 @@ impl GroupsAccumulator for HllGroupsAccumulator { &mut self, values: &[ArrayRef], group_indices: &[usize], + // Since aggregate filter should be applied in partial stage, in final stage there should be no filter opt_filter: Option<&BooleanArray>, total_num_groups: usize, ) -> Result<()> { + assert!( + opt_filter.is_none(), + "aggregate filter should be applied in partial stage, there should be no filter in final stage" + ); + self.ensure_groups(total_num_groups); let states = downcast_value!(values[0], BinaryArray); let mut delta: isize = 0; for (row, &group_index) in group_indices.iter().enumerate() { - if let Some(filter) = opt_filter - && !filter.value(row) - { - continue; - } if states.is_valid(row) { delta += self.groups[group_index].merge_serialized(states.value(row))?; } @@ -942,9 +1005,14 @@ fn is_hll_groups_type(data_type: &DataType) -> bool { | DataType::Int64 | DataType::Date32 | DataType::Date64 - | DataType::Time32(_) - | DataType::Time64(_) - | DataType::Timestamp(_, _) + | DataType::Time32(TimeUnit::Second) + | DataType::Time32(TimeUnit::Millisecond) + | DataType::Time64(TimeUnit::Microsecond) + | DataType::Time64(TimeUnit::Nanosecond) + | DataType::Timestamp(TimeUnit::Second, _) + | DataType::Timestamp(TimeUnit::Millisecond, _) + | DataType::Timestamp(TimeUnit::Microsecond, _) + | DataType::Timestamp(TimeUnit::Nanosecond, _) | DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View @@ -995,7 +1063,10 @@ mod groups_tests { ); assert_eq!(g.count(), reference_count(&hashes)); // sparse serialized state is far smaller than a dense 16 KiB sketch - assert!(serialize(&mut g).len() < NUM_REGISTERS); + // and must not exceed the sparse limit contract enforced by merge_serialized + let serialized = serialize(&mut g); + assert!(serialized.len() < NUM_REGISTERS); + assert!(serialized.len() <= SPARSE_LIMIT * size_of::()); } #[test] @@ -1054,4 +1125,31 @@ mod groups_tests { dst.merge_serialized(&bytes).unwrap(); assert_eq!(dst.count(), 0); } + + /// `approx_distinct(v) FILTER (WHERE nullable_bool)` — a NULL filter row + /// must not be counted (null filter is treated the same as false). + #[test] + fn update_batch_nullable_filter_excludes_null_filter_rows() { + use arrow::array::Int64Array; + use std::sync::Arc; + + let values: ArrayRef = Arc::new(Int64Array::from(vec![1i64, 2, 3, 4, 5])); + // row 0: filter=true, row 1: filter=NULL, row 2: filter=false, + // row 3: filter=NULL, row 4: filter=true + let filter = + BooleanArray::from(vec![Some(true), None, Some(false), None, Some(true)]); + + let mut acc = HllGroupsAccumulator::>::new(); + // put all rows in group 0 + let group_indices = vec![0usize; 5]; + acc.update_batch(&[values], &group_indices, Some(&filter), 1) + .unwrap(); + + // Only rows 0 and 4 (values 1 and 5) should be counted. + let result = acc.evaluate(EmitTo::All).unwrap(); + let counts = result.as_any().downcast_ref::().unwrap(); + // reference: hash 1 and 5 into a dense sketch + let expected = reference_count(&[h(1), h(5)]); + assert_eq!(counts.value(0), expected); + } } From 0b65bc96e61e21a1f1bf3f821a1240259bef9804 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 5 Jun 2026 21:09:30 +0800 Subject: [PATCH 6/7] update test case --- .../src/approx_distinct.rs | 55 ++++++++++++++++++- 1 file changed, 52 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 45e1277c52c70..111a130d3b710 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -449,7 +449,9 @@ impl GroupHll { /// Serialize the sketch into `scratch` (which is cleared first). A dense /// sketch is written as its raw [`NUM_REGISTERS`] registers (wire-compatible /// with the per-group [`Accumulator`]); a sparse sketch is written as its - /// distinct hashes in little-endian order. + /// distinct hashes in little-endian order unless it has crossed + /// [`SPARSE_LIMIT`], in which case it is emitted as dense state so the final + /// merge path accepts it. fn serialize(&mut self, scratch: &mut Vec) { scratch.clear(); match self { @@ -460,8 +462,17 @@ impl GroupHll { GroupHll::Sparse(v) => { v.sort_unstable(); v.dedup(); - for &h in v.iter() { - scratch.extend_from_slice(&h.to_le_bytes()); + if v.len() > SPARSE_LIMIT { + let mut hll = HyperLogLog::::new(); + for &h in v.iter() { + hll.add_hashed(h); + } + let registers: &[u8] = hll.as_ref(); + scratch.extend_from_slice(registers); + } else { + for &h in v.iter() { + scratch.extend_from_slice(&h.to_le_bytes()); + } } } } @@ -1095,6 +1106,44 @@ mod groups_tests { } } + #[test] + fn sparse_limit_group_serializes_as_mergeable_sparse_state() { + let hashes: Vec = (0..SPARSE_LIMIT as u64).map(h).collect(); + let mut src = GroupHll::default(); + for &hash in &hashes { + src.add_hash(hash); + } + assert!(matches!(src, GroupHll::Sparse(_))); + + let bytes = serialize(&mut src); + assert_eq!(bytes.len(), SPARSE_LIMIT * size_of::()); + + let mut dst = GroupHll::default(); + dst.merge_serialized(&bytes).unwrap(); + assert_eq!(dst.count(), reference_count(&hashes)); + } + + #[test] + fn medium_sparse_group_serializes_as_mergeable_dense_state() { + let n = SPARSE_LIMIT as u64 + 44; + let hashes: Vec = (0..n).map(h).collect(); + let mut src = GroupHll::default(); + for &hash in &hashes { + src.add_hash(hash); + } + assert!( + matches!(src, GroupHll::Sparse(_)), + "group should not promote during update before the compaction threshold" + ); + + let bytes = serialize(&mut src); + assert_eq!(bytes.len(), NUM_REGISTERS); + + let mut dst = GroupHll::default(); + dst.merge_serialized(&bytes).unwrap(); + assert_eq!(dst.count(), reference_count(&hashes)); + } + #[test] fn merge_combines_disjoint_groups() { // sparse + sparse, sparse + dense, dense + dense From 7bffb023c2165de63c0a1aa18f0be8f0580dacc6 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Fri, 5 Jun 2026 22:06:38 +0800 Subject: [PATCH 7/7] make code better --- .../src/approx_distinct.rs | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 111a130d3b710..3550035635647 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -330,6 +330,15 @@ impl Default for GroupHll { } } +/// Fold a slice of pre-computed hashes into a fresh [`HyperLogLog`] sketch. +fn fold_sparse_to_hll(hashes: &[u64]) -> HyperLogLog { + let mut hll = HyperLogLog::::new(); + for &h in hashes { + hll.add_hashed(h); + } + hll +} + impl GroupHll { /// Add a pre-computed hash, returning the change in heap-allocated bytes so /// the accumulator can track its memory usage incrementally. @@ -361,15 +370,15 @@ impl GroupHll { v.sort_unstable(); v.dedup(); if v.len() > SPARSE_LIMIT { - let mut hll = HyperLogLog::::new(); - for &h in v.iter() { - hll.add_hashed(h); - } - *self = GroupHll::Dense(Box::new(hll)); + // cap_before is the capacity already reflected in allocated_bytes. + // Any reallocation caused by the triggering push was never counted and + // is also freed here, so the two cancel out. + *self = GroupHll::Dense(Box::new(fold_sparse_to_hll(v))); (NUM_REGISTERS as isize) - ((cap_before * size_of::()) as isize) } else { - // capacity is unchanged by sort/dedup - 0 + // Account for any Vec growth caused by the triggering push. + // sort/dedup do not reallocate, so v.capacity() is the post-push capacity. + ((v.capacity() - cap_before) * size_of::()) as isize } } @@ -463,12 +472,7 @@ impl GroupHll { v.sort_unstable(); v.dedup(); if v.len() > SPARSE_LIMIT { - let mut hll = HyperLogLog::::new(); - for &h in v.iter() { - hll.add_hashed(h); - } - let registers: &[u8] = hll.as_ref(); - scratch.extend_from_slice(registers); + scratch.extend_from_slice(fold_sparse_to_hll(v).as_ref()); } else { for &h in v.iter() { scratch.extend_from_slice(&h.to_le_bytes());