From 76016239f85ebc1cec4d0e5f3cc47ea0081745ee Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 7 Apr 2026 22:28:35 -0700 Subject: [PATCH 01/10] bitmap_smaller_datatypes --- .../src/approx_distinct.rs | 363 +++++++++++++++++- 1 file changed, 358 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 40da98c3eb3a2..6f590d9502ee2 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -18,7 +18,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog}; -use arrow::array::{Array, BinaryArray, StringViewArray}; +use arrow::array::{Array, AsArray, BinaryArray, BooleanArray, StringViewArray}; use arrow::array::{ GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; @@ -160,6 +160,358 @@ where } } +#[derive(Debug)] +struct BoolDistinctAccumulator { + seen_true: bool, + seen_false: bool, +} + +impl BoolDistinctAccumulator { + fn new() -> Self { + Self { + seen_true: false, + seen_false: false, + } + } +} + +impl Accumulator for BoolDistinctAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array: &BooleanArray = downcast_value!(values[0], BooleanArray); + for value in array.iter().flatten() { + if value { + self.seen_true = true; + } else { + self.seen_false = true; + } + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let count = (self.seen_true as u64) + (self.seen_false as u64); + Ok(ScalarValue::UInt64(Some(count))) + } + + fn size(&self) -> usize { + size_of::() + } + + fn state(&mut self) -> Result> { + // Pack into 1 byte: bit 0 = seen_false, bit 1 = seen_true + let packed = (self.seen_false as u8) | ((self.seen_true as u8) << 1); + Ok(vec![ScalarValue::Binary(Some(vec![packed]))]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if !data.is_empty() { + self.seen_false |= (data[0] & 1) != 0; + self.seen_true |= (data[0] & 2) != 0; + } + } + Ok(()) + } +} + +#[derive(Debug)] +struct Bitmap256Accumulator { + /// 256 bits = 4 x u64, tracks values 0-255 + bitmap: [u64; 4], +} + +impl Bitmap256Accumulator { + fn new() -> Self { + Self { bitmap: [0; 4] } + } + + #[inline] + fn set_bit(&mut self, value: u8) { + let word = (value / 64) as usize; + let bit = value % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> u64 { + self.bitmap.iter().map(|w| w.count_ones() as u64).sum() + } + + fn merge(&mut self, other: &[u64; 4]) { + for i in 0..4 { + self.bitmap[i] |= other[i]; + } + } +} + +impl Accumulator for Bitmap256Accumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 32 { + // Convert &[u8] to [u64; 4] + let mut other = [0u64; 4]; + for i in 0..4 { + let offset = i * 8; + other[i] = + u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); + } + self.merge(&other); + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + // Serialize [u64; 4] as 32 bytes + let mut bytes = Vec::with_capacity(32); + for word in &self.bitmap { + bytes.extend_from_slice(&word.to_le_bytes()); + } + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + } +} + +#[derive(Debug)] +struct Bitmap256AccumulatorI8 { + bitmap: [u64; 4], +} + +impl Bitmap256AccumulatorI8 { + fn new() -> Self { + Self { bitmap: [0; 4] } + } + + #[inline] + fn set_bit(&mut self, value: i8) { + // Convert i8 to u8 by reinterpreting bits + let idx = value as u8; + let word = (idx / 64) as usize; + let bit = idx % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> u64 { + self.bitmap.iter().map(|w| w.count_ones() as u64).sum() + } + + fn merge(&mut self, other: &[u64; 4]) { + for i in 0..4 { + self.bitmap[i] |= other[i]; + } + } +} + +impl Accumulator for Bitmap256AccumulatorI8 { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 32 { + let mut other = [0u64; 4]; + for i in 0..4 { + let offset = i * 8; + other[i] = + u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); + } + self.merge(&other); + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let mut bytes = Vec::with_capacity(32); + for word in &self.bitmap { + bytes.extend_from_slice(&word.to_le_bytes()); + } + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + } +} + +/// Accumulator for u16 distinct counting using a 65536-bit bitmap +#[derive(Debug)] +struct Bitmap65536Accumulator { + /// 65536 bits = 1024 x u64, tracks values 0-65535 + bitmap: Box<[u64; 1024]>, +} + +impl Bitmap65536Accumulator { + fn new() -> Self { + Self { + bitmap: Box::new([0; 1024]), + } + } + + #[inline] + fn set_bit(&mut self, value: u16) { + let word = (value / 64) as usize; + let bit = value % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> u64 { + self.bitmap.iter().map(|w| w.count_ones() as u64).sum() + } + + fn merge(&mut self, other: &[u64; 1024]) { + for i in 0..1024 { + self.bitmap[i] |= other[i]; + } + } +} + +impl Accumulator for Bitmap65536Accumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 8192 { + let mut other = [0u64; 1024]; + for i in 0..1024 { + let offset = i * 8; + other[i] = + u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); + } + self.merge(&other); + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let mut bytes = Vec::with_capacity(8192); + for word in self.bitmap.iter() { + bytes.extend_from_slice(&word.to_le_bytes()); + } + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + 8192 + } +} + +/// Accumulator for i16 distinct counting using a 65536-bit bitmap +#[derive(Debug)] +struct Bitmap65536AccumulatorI16 { + bitmap: Box<[u64; 1024]>, +} + +impl Bitmap65536AccumulatorI16 { + fn new() -> Self { + Self { + bitmap: Box::new([0; 1024]), + } + } + + #[inline] + fn set_bit(&mut self, value: i16) { + let idx = value as u16; + let word = (idx / 64) as usize; + let bit = idx % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> u64 { + self.bitmap.iter().map(|w| w.count_ones() as u64).sum() + } + + fn merge(&mut self, other: &[u64; 1024]) { + for i in 0..1024 { + self.bitmap[i] |= other[i]; + } + } +} + +impl Accumulator for Bitmap65536AccumulatorI16 { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 8192 { + let mut other = [0u64; 1024]; + for i in 0..1024 { + let offset = i * 8; + other[i] = + u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); + } + self.merge(&other); + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let mut bytes = Vec::with_capacity(8192); + for word in self.bitmap.iter() { + bytes.extend_from_slice(&word.to_le_bytes()); + } + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + 8192 + } +} + macro_rules! default_accumulator_impl { () => { fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { @@ -344,12 +696,12 @@ impl AggregateUDFImpl for ApproxDistinct { // TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL // TODO support for boolean (trivial case) // https://github.com/apache/datafusion/issues/1109 - DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), - DataType::UInt16 => Box::new(NumericHLLAccumulator::::new()), + DataType::UInt8 => Box::new(Bitmap256Accumulator::new()), + DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int16 => Box::new(NumericHLLAccumulator::::new()), + DataType::Int8 => Box::new(Bitmap256AccumulatorI8::new()), + DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), DataType::Date32 => Box::new(NumericHLLAccumulator::::new()), @@ -383,6 +735,7 @@ impl AggregateUDFImpl for ApproxDistinct { DataType::Utf8View => Box::new(StringViewHLLAccumulator::new()), DataType::Binary => Box::new(BinaryHLLAccumulator::::new()), DataType::LargeBinary => Box::new(BinaryHLLAccumulator::::new()), + DataType::Boolean => Box::new(BoolDistinctAccumulator::new()), DataType::Null => { Box::new(NoopAccumulator::new(ScalarValue::UInt64(Some(0)))) } From b320233c477600d1b1066f0619d5d2bb6731b9b2 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 7 Apr 2026 22:43:29 -0700 Subject: [PATCH 02/10] bitmap_smaller_datatypes --- .../benches/approx_distinct.rs | 101 +++++++++++++++++- 1 file changed, 100 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index 9c22194e0384c..61650aaa2e3fa 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -17,7 +17,10 @@ use std::sync::Arc; -use arrow::array::{ArrayRef, Int64Array, StringArray, StringViewArray}; +use arrow::array::{ + ArrayRef, BooleanArray, Int8Array, Int16Array, Int64Array, StringArray, + StringViewArray, UInt8Array, UInt16Array, +}; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::function::AccumulatorArgs; @@ -56,6 +59,45 @@ fn create_i64_array(n_distinct: usize) -> Int64Array { .collect() } +fn create_u8_array(n_distinct: usize) -> UInt8Array { + let mut rng = StdRng::seed_from_u64(42); + let max_val = n_distinct.min(256) as u8; + (0..BATCH_SIZE) + .map(|_| Some(rng.random_range(0..max_val))) + .collect() +} + +fn create_i8_array(n_distinct: usize) -> Int8Array { + let mut rng = StdRng::seed_from_u64(42); + let max_val = (n_distinct.min(256) / 2) as i8; + (0..BATCH_SIZE) + .map(|_| Some(rng.random_range(-max_val..max_val))) + .collect() +} + +fn create_u16_array(n_distinct: usize) -> UInt16Array { + let mut rng = StdRng::seed_from_u64(42); + let max_val = n_distinct.min(65536) as u16; + (0..BATCH_SIZE) + .map(|_| Some(rng.random_range(0..max_val))) + .collect() +} + +fn create_i16_array(n_distinct: usize) -> Int16Array { + let mut rng = StdRng::seed_from_u64(42); + let max_val = (n_distinct.min(65536) / 2) as i16; + (0..BATCH_SIZE) + .map(|_| Some(rng.random_range(-max_val..max_val))) + .collect() +} + +fn create_bool_array() -> BooleanArray { + let mut rng = StdRng::seed_from_u64(42); + (0..BATCH_SIZE) + .map(|_| Some(rng.random_bool(0.5))) + .collect() +} + /// Creates a pool of `n_distinct` random strings of the given length. fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec { let mut rng = StdRng::seed_from_u64(42); @@ -133,6 +175,63 @@ fn approx_distinct_benchmark(c: &mut Criterion) { ); } } + + // --- Bitmap type benchmarks (our optimization) --- + + // UInt8 + let values = Arc::new(create_u8_array(200)) as ArrayRef; + c.bench_function("approx_distinct u8 bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::UInt8); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); + + // Int8 + let values = Arc::new(create_i8_array(200)) as ArrayRef; + c.bench_function("approx_distinct i8 bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Int8); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); + + // UInt16 + let values = Arc::new(create_u16_array(50000)) as ArrayRef; + c.bench_function("approx_distinct u16 bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::UInt16); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); + + // Int16 + let values = Arc::new(create_i16_array(50000)) as ArrayRef; + c.bench_function("approx_distinct i16 bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Int16); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); + + // Boolean + let values = Arc::new(create_bool_array()) as ArrayRef; + c.bench_function("approx_distinct bool bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Boolean); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); } criterion_group!(benches, approx_distinct_benchmark); From ecee8eb38a13efae940b6210553cace3fbdd353a Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 7 Apr 2026 23:35:25 -0700 Subject: [PATCH 03/10] bitmap_instead_of_hll_smaller_datatypes --- .../benches/approx_distinct.rs | 20 +++++++++---------- .../src/approx_distinct.rs | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index 61650aaa2e3fa..d73794c4fd6ca 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -222,16 +222,16 @@ fn approx_distinct_benchmark(c: &mut Criterion) { }) }); - // Boolean - let values = Arc::new(create_bool_array()) as ArrayRef; - c.bench_function("approx_distinct bool bitmap", |b| { - b.iter(|| { - let mut accumulator = prepare_accumulator(DataType::Boolean); - accumulator - .update_batch(std::slice::from_ref(&values)) - .unwrap() - }) - }); + // // Boolean - commented out for main comparison (not supported on main) + // let values = Arc::new(create_bool_array()) as ArrayRef; + // c.bench_function("approx_distinct bool bitmap", |b| { + // b.iter(|| { + // let mut accumulator = prepare_accumulator(DataType::Boolean); + // accumulator + // .update_batch(std::slice::from_ref(&values)) + // .unwrap() + // }) + // }); } criterion_group!(benches, approx_distinct_benchmark); diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 6f590d9502ee2..04b27cc247965 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -228,8 +228,8 @@ impl Bitmap256Accumulator { #[inline] fn set_bit(&mut self, value: u8) { - let word = (value / 64) as usize; - let bit = value % 64; + let word = (value >> 6) as usize; + let bit = value & 63; self.bitmap[word] |= 1u64 << bit; } @@ -303,8 +303,8 @@ impl Bitmap256AccumulatorI8 { fn set_bit(&mut self, value: i8) { // Convert i8 to u8 by reinterpreting bits let idx = value as u8; - let word = (idx / 64) as usize; - let bit = idx % 64; + let word = (idx >> 6) as usize; + let bit = idx & 63; self.bitmap[word] |= 1u64 << bit; } From 47bb82e411421a0464da272da964b2df08870631 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 8 Apr 2026 00:00:42 -0700 Subject: [PATCH 04/10] bitmap_instead_of_hll_smaller_datatypes --- .../src/approx_distinct.rs | 171 ++---------------- 1 file changed, 11 insertions(+), 160 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 04b27cc247965..5858f295c0a9e 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -215,153 +215,6 @@ impl Accumulator for BoolDistinctAccumulator { } } -#[derive(Debug)] -struct Bitmap256Accumulator { - /// 256 bits = 4 x u64, tracks values 0-255 - bitmap: [u64; 4], -} - -impl Bitmap256Accumulator { - fn new() -> Self { - Self { bitmap: [0; 4] } - } - - #[inline] - fn set_bit(&mut self, value: u8) { - let word = (value >> 6) as usize; - let bit = value & 63; - self.bitmap[word] |= 1u64 << bit; - } - - #[inline] - fn count(&self) -> u64 { - self.bitmap.iter().map(|w| w.count_ones() as u64).sum() - } - - fn merge(&mut self, other: &[u64; 4]) { - for i in 0..4 { - self.bitmap[i] |= other[i]; - } - } -} - -impl Accumulator for Bitmap256Accumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.set_bit(value); - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 32 { - // Convert &[u8] to [u64; 4] - let mut other = [0u64; 4]; - for i in 0..4 { - let offset = i * 8; - other[i] = - u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - } - self.merge(&other); - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - // Serialize [u64; 4] as 32 bytes - let mut bytes = Vec::with_capacity(32); - for word in &self.bitmap { - bytes.extend_from_slice(&word.to_le_bytes()); - } - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() - } -} - -#[derive(Debug)] -struct Bitmap256AccumulatorI8 { - bitmap: [u64; 4], -} - -impl Bitmap256AccumulatorI8 { - fn new() -> Self { - Self { bitmap: [0; 4] } - } - - #[inline] - fn set_bit(&mut self, value: i8) { - // Convert i8 to u8 by reinterpreting bits - let idx = value as u8; - let word = (idx >> 6) as usize; - let bit = idx & 63; - self.bitmap[word] |= 1u64 << bit; - } - - #[inline] - fn count(&self) -> u64 { - self.bitmap.iter().map(|w| w.count_ones() as u64).sum() - } - - fn merge(&mut self, other: &[u64; 4]) { - for i in 0..4 { - self.bitmap[i] |= other[i]; - } - } -} - -impl Accumulator for Bitmap256AccumulatorI8 { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.set_bit(value); - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 32 { - let mut other = [0u64; 4]; - for i in 0..4 { - let offset = i * 8; - other[i] = - u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - } - self.merge(&other); - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let mut bytes = Vec::with_capacity(32); - for word in &self.bitmap { - bytes.extend_from_slice(&word.to_le_bytes()); - } - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() - } -} - /// Accumulator for u16 distinct counting using a 65536-bit bitmap #[derive(Debug)] struct Bitmap65536Accumulator { @@ -389,8 +242,8 @@ impl Bitmap65536Accumulator { } fn merge(&mut self, other: &[u64; 1024]) { - for i in 0..1024 { - self.bitmap[i] |= other[i]; + for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) { + *dst |= src; } } } @@ -409,9 +262,9 @@ impl Accumulator for Bitmap65536Accumulator { for data in array.iter().flatten() { if data.len() == 8192 { let mut other = [0u64; 1024]; - for i in 0..1024 { + for (i, word) in other.iter_mut().enumerate() { let offset = i * 8; - other[i] = + *word = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); } self.merge(&other); @@ -464,8 +317,8 @@ impl Bitmap65536AccumulatorI16 { } fn merge(&mut self, other: &[u64; 1024]) { - for i in 0..1024 { - self.bitmap[i] |= other[i]; + for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) { + *dst |= src; } } } @@ -484,9 +337,9 @@ impl Accumulator for Bitmap65536AccumulatorI16 { for data in array.iter().flatten() { if data.len() == 8192 { let mut other = [0u64; 1024]; - for i in 0..1024 { + for (i, word) in other.iter_mut().enumerate() { let offset = i * 8; - other[i] = + *word = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); } self.merge(&other); @@ -693,14 +546,12 @@ impl AggregateUDFImpl for ApproxDistinct { let data_type = acc_args.expr_fields[0].data_type(); let accumulator: Box = match data_type { - // TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL - // TODO support for boolean (trivial case) - // https://github.com/apache/datafusion/issues/1109 - DataType::UInt8 => Box::new(Bitmap256Accumulator::new()), + // Benchmarked HLL to be faster than bitmap for u8/i8 + DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(Bitmap256AccumulatorI8::new()), + DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), From 2291af5e45159213b43fa96af5ea6c0cc57f6933 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 8 Apr 2026 00:49:34 -0700 Subject: [PATCH 05/10] bitmap_instead_of_hll_smaller_datatypes --- .../benches/approx_distinct.rs | 22 +--- .../src/approx_distinct.rs | 118 +++++++++++++++++- 2 files changed, 117 insertions(+), 23 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index d73794c4fd6ca..25235f87fe031 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use arrow::array::{ - ArrayRef, BooleanArray, Int8Array, Int16Array, Int64Array, StringArray, - StringViewArray, UInt8Array, UInt16Array, + ArrayRef, Int8Array, Int16Array, Int64Array, StringArray, StringViewArray, + UInt8Array, UInt16Array, }; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; @@ -91,13 +91,6 @@ fn create_i16_array(n_distinct: usize) -> Int16Array { .collect() } -fn create_bool_array() -> BooleanArray { - let mut rng = StdRng::seed_from_u64(42); - (0..BATCH_SIZE) - .map(|_| Some(rng.random_bool(0.5))) - .collect() -} - /// Creates a pool of `n_distinct` random strings of the given length. fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec { let mut rng = StdRng::seed_from_u64(42); @@ -221,17 +214,6 @@ fn approx_distinct_benchmark(c: &mut Criterion) { .unwrap() }) }); - - // // Boolean - commented out for main comparison (not supported on main) - // let values = Arc::new(create_bool_array()) as ArrayRef; - // c.bench_function("approx_distinct bool bitmap", |b| { - // b.iter(|| { - // let mut accumulator = prepare_accumulator(DataType::Boolean); - // accumulator - // .update_batch(std::slice::from_ref(&values)) - // .unwrap() - // }) - // }); } criterion_group!(benches, approx_distinct_benchmark); diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 5858f295c0a9e..428f72e3417ae 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -215,6 +215,118 @@ impl Accumulator for BoolDistinctAccumulator { } } +/// Accumulator for u8 distinct counting using a bool array +#[derive(Debug)] +struct BoolArray256Accumulator { + seen: Box<[bool; 256]>, +} + +impl BoolArray256Accumulator { + fn new() -> Self { + Self { + seen: Box::new([false; 256]), + } + } + + #[inline] + fn count(&self) -> u64 { + self.seen.iter().filter(|&&b| b).count() as u64 + } +} + +impl Accumulator for BoolArray256Accumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.seen[value as usize] = true; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 256 { + for (i, &b) in data.iter().enumerate() { + if b != 0 { + self.seen[i] = true; + } + } + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let bytes: Vec = self.seen.iter().map(|&b| b as u8).collect(); + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + 256 + } +} + +/// Accumulator for i8 distinct counting using a bool array +#[derive(Debug)] +struct BoolArray256AccumulatorI8 { + seen: Box<[bool; 256]>, +} + +impl BoolArray256AccumulatorI8 { + fn new() -> Self { + Self { + seen: Box::new([false; 256]), + } + } + + #[inline] + fn count(&self) -> u64 { + self.seen.iter().filter(|&&b| b).count() as u64 + } +} + +impl Accumulator for BoolArray256AccumulatorI8 { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.seen[value as u8 as usize] = true; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 256 { + for (i, &b) in data.iter().enumerate() { + if b != 0 { + self.seen[i] = true; + } + } + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let bytes: Vec = self.seen.iter().map(|&b| b as u8).collect(); + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + 256 + } +} + /// Accumulator for u16 distinct counting using a 65536-bit bitmap #[derive(Debug)] struct Bitmap65536Accumulator { @@ -546,12 +658,12 @@ impl AggregateUDFImpl for ApproxDistinct { let data_type = acc_args.expr_fields[0].data_type(); let accumulator: Box = match data_type { - // Benchmarked HLL to be faster than bitmap for u8/i8 - DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), + // Testing bool array for u8 + DataType::UInt8 => Box::new(BoolArray256Accumulator::new()), DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), + DataType::Int8 => Box::new(BoolArray256AccumulatorI8::new()), DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), From b03262bce9683bc8a518ddeb56daf069900d08b7 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 8 Apr 2026 13:11:08 -0700 Subject: [PATCH 06/10] init_fmt_check --- datafusion/functions-aggregate/benches/approx_distinct.rs | 2 +- datafusion/functions-aggregate/src/approx_distinct.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index 25235f87fe031..cc85c2163c180 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -169,7 +169,7 @@ fn approx_distinct_benchmark(c: &mut Criterion) { } } - // --- Bitmap type benchmarks (our optimization) --- + // Small integer types // UInt8 let values = Arc::new(create_u8_array(200)) as ArrayRef; diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 428f72e3417ae..d0f736596d68e 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -658,7 +658,7 @@ impl AggregateUDFImpl for ApproxDistinct { let data_type = acc_args.expr_fields[0].data_type(); let accumulator: Box = match data_type { - // Testing bool array for u8 + // Use bitmap accumulators for small integer types DataType::UInt8 => Box::new(BoolArray256Accumulator::new()), DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), From 8a95f3f8e825c6f6d416ea8ea43b4bcda8726441 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Fri, 17 Apr 2026 17:05:19 -0700 Subject: [PATCH 07/10] use_count_distinct_bitmap_instead_of_dupe_code --- .../src/approx_distinct.rs | 440 +++++------------- 1 file changed, 113 insertions(+), 327 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index d0f736596d68e..78d197b4b293e 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -18,16 +18,15 @@ //! Defines physical expressions that can evaluated at runtime during query execution use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog}; -use arrow::array::{Array, AsArray, BinaryArray, BooleanArray, StringViewArray}; +use arrow::array::{Array, BinaryArray, BooleanArray, StringViewArray}; use arrow::array::{ GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; use arrow::datatypes::{ - ArrowPrimitiveType, Date32Type, Date64Type, FieldRef, Int8Type, Int16Type, Int32Type, - Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, - Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, - TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, - UInt64Type, + ArrowPrimitiveType, Date32Type, Date64Type, FieldRef, Int32Type, Int64Type, + Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, UInt32Type, UInt64Type, }; use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field}; use datafusion_common::ScalarValue; @@ -40,11 +39,16 @@ use datafusion_expr::utils::format_state_name; use datafusion_expr::{ Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility, }; +use datafusion_functions_aggregate_common::aggregate::count_distinct::{ + Bitmap65536DistinctCountAccumulator, Bitmap65536DistinctCountAccumulatorI16, + BoolArray256DistinctCountAccumulator, BoolArray256DistinctCountAccumulatorI8, +}; use datafusion_functions_aggregate_common::noop_accumulator::NoopAccumulator; use datafusion_macros::user_doc; use std::fmt::{Debug, Formatter}; use std::hash::{BuildHasher, Hash}; use std::marker::PhantomData; +use std::mem::size_of_val; make_udaf_expr_and_func!( ApproxDistinct, @@ -84,6 +88,36 @@ impl TryFrom<&ScalarValue> for HyperLogLog { } } +#[derive(Debug)] +struct ApproxDistinctBitmapWrapper { + inner: A, +} + +impl Accumulator for ApproxDistinctBitmapWrapper { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + self.inner.update_batch(values) + } + + fn evaluate(&mut self) -> Result { + match self.inner.evaluate()? { + ScalarValue::Int64(Some(v)) => Ok(ScalarValue::UInt64(Some(v as u64))), + other => internal_err!("unexpected: {other}"), + } + } + + fn size(&self) -> usize { + self.inner.size() + } + + fn state(&mut self) -> Result> { + self.inner.state() + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.inner.merge_batch(states) + } +} + #[derive(Debug)] struct NumericHLLAccumulator where @@ -160,323 +194,6 @@ where } } -#[derive(Debug)] -struct BoolDistinctAccumulator { - seen_true: bool, - seen_false: bool, -} - -impl BoolDistinctAccumulator { - fn new() -> Self { - Self { - seen_true: false, - seen_false: false, - } - } -} - -impl Accumulator for BoolDistinctAccumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array: &BooleanArray = downcast_value!(values[0], BooleanArray); - for value in array.iter().flatten() { - if value { - self.seen_true = true; - } else { - self.seen_false = true; - } - } - Ok(()) - } - - fn evaluate(&mut self) -> Result { - let count = (self.seen_true as u64) + (self.seen_false as u64); - Ok(ScalarValue::UInt64(Some(count))) - } - - fn size(&self) -> usize { - size_of::() - } - - fn state(&mut self) -> Result> { - // Pack into 1 byte: bit 0 = seen_false, bit 1 = seen_true - let packed = (self.seen_false as u8) | ((self.seen_true as u8) << 1); - Ok(vec![ScalarValue::Binary(Some(vec![packed]))]) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if !data.is_empty() { - self.seen_false |= (data[0] & 1) != 0; - self.seen_true |= (data[0] & 2) != 0; - } - } - Ok(()) - } -} - -/// Accumulator for u8 distinct counting using a bool array -#[derive(Debug)] -struct BoolArray256Accumulator { - seen: Box<[bool; 256]>, -} - -impl BoolArray256Accumulator { - fn new() -> Self { - Self { - seen: Box::new([false; 256]), - } - } - - #[inline] - fn count(&self) -> u64 { - self.seen.iter().filter(|&&b| b).count() as u64 - } -} - -impl Accumulator for BoolArray256Accumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.seen[value as usize] = true; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 256 { - for (i, &b) in data.iter().enumerate() { - if b != 0 { - self.seen[i] = true; - } - } - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let bytes: Vec = self.seen.iter().map(|&b| b as u8).collect(); - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() + 256 - } -} - -/// Accumulator for i8 distinct counting using a bool array -#[derive(Debug)] -struct BoolArray256AccumulatorI8 { - seen: Box<[bool; 256]>, -} - -impl BoolArray256AccumulatorI8 { - fn new() -> Self { - Self { - seen: Box::new([false; 256]), - } - } - - #[inline] - fn count(&self) -> u64 { - self.seen.iter().filter(|&&b| b).count() as u64 - } -} - -impl Accumulator for BoolArray256AccumulatorI8 { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.seen[value as u8 as usize] = true; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 256 { - for (i, &b) in data.iter().enumerate() { - if b != 0 { - self.seen[i] = true; - } - } - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let bytes: Vec = self.seen.iter().map(|&b| b as u8).collect(); - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() + 256 - } -} - -/// Accumulator for u16 distinct counting using a 65536-bit bitmap -#[derive(Debug)] -struct Bitmap65536Accumulator { - /// 65536 bits = 1024 x u64, tracks values 0-65535 - bitmap: Box<[u64; 1024]>, -} - -impl Bitmap65536Accumulator { - fn new() -> Self { - Self { - bitmap: Box::new([0; 1024]), - } - } - - #[inline] - fn set_bit(&mut self, value: u16) { - let word = (value / 64) as usize; - let bit = value % 64; - self.bitmap[word] |= 1u64 << bit; - } - - #[inline] - fn count(&self) -> u64 { - self.bitmap.iter().map(|w| w.count_ones() as u64).sum() - } - - fn merge(&mut self, other: &[u64; 1024]) { - for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) { - *dst |= src; - } - } -} - -impl Accumulator for Bitmap65536Accumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.set_bit(value); - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 8192 { - let mut other = [0u64; 1024]; - for (i, word) in other.iter_mut().enumerate() { - let offset = i * 8; - *word = - u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - } - self.merge(&other); - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let mut bytes = Vec::with_capacity(8192); - for word in self.bitmap.iter() { - bytes.extend_from_slice(&word.to_le_bytes()); - } - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() + 8192 - } -} - -/// Accumulator for i16 distinct counting using a 65536-bit bitmap -#[derive(Debug)] -struct Bitmap65536AccumulatorI16 { - bitmap: Box<[u64; 1024]>, -} - -impl Bitmap65536AccumulatorI16 { - fn new() -> Self { - Self { - bitmap: Box::new([0; 1024]), - } - } - - #[inline] - fn set_bit(&mut self, value: i16) { - let idx = value as u16; - let word = (idx / 64) as usize; - let bit = idx % 64; - self.bitmap[word] |= 1u64 << bit; - } - - #[inline] - fn count(&self) -> u64 { - self.bitmap.iter().map(|w| w.count_ones() as u64).sum() - } - - fn merge(&mut self, other: &[u64; 1024]) { - for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) { - *dst |= src; - } - } -} - -impl Accumulator for Bitmap65536AccumulatorI16 { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.set_bit(value); - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 8192 { - let mut other = [0u64; 1024]; - for (i, word) in other.iter_mut().enumerate() { - let offset = i * 8; - *word = - u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - } - self.merge(&other); - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let mut bytes = Vec::with_capacity(8192); - for word in self.bitmap.iter() { - bytes.extend_from_slice(&word.to_le_bytes()); - } - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() + 8192 - } -} - macro_rules! default_accumulator_impl { () => { fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { @@ -586,6 +303,67 @@ impl Debug for ApproxDistinct { } } +/// Optimized accumulator for Boolean type - only 2 possible distinct values. +#[derive(Debug)] +struct BoolDistinctAccumulator { + seen_true: bool, + seen_false: bool, +} + +impl BoolDistinctAccumulator { + fn new() -> Self { + Self { + seen_true: false, + seen_false: false, + } + } +} + +impl Accumulator for BoolDistinctAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let array: &BooleanArray = downcast_value!(values[0], BooleanArray); + for value in array.iter().flatten() { + if value { + self.seen_true = true; + } else { + self.seen_false = true; + } + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let count = self.seen_true as u64 + self.seen_false as u64; + Ok(ScalarValue::UInt64(Some(count))) + } + + fn size(&self) -> usize { + size_of_val(self) + } + + fn state(&mut self) -> Result> { + // State: two booleans packed into a binary + let state = vec![self.seen_false as u8, self.seen_true as u8]; + Ok(vec![ScalarValue::Binary(Some(state))]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + assert_eq!(1, states.len(), "expect only 1 element in the states"); + let binary_array = downcast_value!(states[0], BinaryArray); + for v in binary_array.iter().flatten() { + if v.len() >= 2 { + self.seen_false = self.seen_false || v[0] != 0; + self.seen_true = self.seen_true || v[1] != 0; + } + } + Ok(()) + } +} + impl Default for ApproxDistinct { fn default() -> Self { Self::new() @@ -659,12 +437,20 @@ impl AggregateUDFImpl for ApproxDistinct { let accumulator: Box = match data_type { // Use bitmap accumulators for small integer types - DataType::UInt8 => Box::new(BoolArray256Accumulator::new()), - DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), + DataType::UInt8 => Box::new(ApproxDistinctBitmapWrapper { + inner: BoolArray256DistinctCountAccumulator::new(), + }), + DataType::UInt16 => Box::new(ApproxDistinctBitmapWrapper { + inner: Bitmap65536DistinctCountAccumulator::new(), + }), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(BoolArray256AccumulatorI8::new()), - DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()), + DataType::Int8 => Box::new(ApproxDistinctBitmapWrapper { + inner: BoolArray256DistinctCountAccumulatorI8::new(), + }), + DataType::Int16 => Box::new(ApproxDistinctBitmapWrapper { + inner: Bitmap65536DistinctCountAccumulatorI16::new(), + }), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), DataType::Date32 => Box::new(NumericHLLAccumulator::::new()), From 63452483a9ebf9d96dcd3f392885b177965f0887 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Fri, 17 Apr 2026 22:14:40 -0700 Subject: [PATCH 08/10] update_state_fields --- .../src/approx_distinct.rs | 44 ++++++++++++++++--- 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 78d197b4b293e..0a4283172884d 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -411,24 +411,56 @@ impl AggregateUDFImpl for ApproxDistinct { } fn state_fields(&self, args: StateFieldsArgs) -> Result> { - if args.input_fields[0].data_type().is_null() { - Ok(vec![ + let data_type = args.input_fields[0].data_type(); + match data_type { + DataType::Null => Ok(vec![ Field::new( format_state_name(args.name, self.name()), DataType::Null, true, ) .into(), - ]) - } else { - Ok(vec![ + ]), + DataType::UInt8 => Ok(vec![ + Field::new_list( + format_state_name(args.name, "approx_distinct"), + Field::new_list_field(DataType::UInt8, true), + false, + ) + .into(), + ]), + DataType::Int8 => Ok(vec![ + Field::new_list( + format_state_name(args.name, "approx_distinct"), + Field::new_list_field(DataType::Int8, true), + false, + ) + .into(), + ]), + DataType::UInt16 => Ok(vec![ + Field::new_list( + format_state_name(args.name, "approx_distinct"), + Field::new_list_field(DataType::UInt16, true), + false, + ) + .into(), + ]), + DataType::Int16 => Ok(vec![ + Field::new_list( + format_state_name(args.name, "approx_distinct"), + Field::new_list_field(DataType::Int16, true), + false, + ) + .into(), + ]), + _ => Ok(vec![ Field::new( format_state_name(args.name, "hll_registers"), DataType::Binary, false, ) .into(), - ]) + ]), } } From e0f2aa9cd78305f33aa9ca3145c503263968c876 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Fri, 17 Apr 2026 23:15:52 -0700 Subject: [PATCH 09/10] update_state_fields --- .../src/approx_distinct.rs | 84 +++++++++---------- 1 file changed, 39 insertions(+), 45 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 0a4283172884d..3471f952d22ba 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -397,6 +397,39 @@ impl ApproxDistinct { } } +#[cold] +fn get_small_int_approx_accumulator( + data_type: &DataType, +) -> Result> { + match data_type { + DataType::UInt8 => Ok(Box::new(ApproxDistinctBitmapWrapper { + inner: BoolArray256DistinctCountAccumulator::new(), + })), + DataType::Int8 => Ok(Box::new(ApproxDistinctBitmapWrapper { + inner: BoolArray256DistinctCountAccumulatorI8::new(), + })), + DataType::UInt16 => Ok(Box::new(ApproxDistinctBitmapWrapper { + inner: Bitmap65536DistinctCountAccumulator::new(), + })), + DataType::Int16 => Ok(Box::new(ApproxDistinctBitmapWrapper { + inner: Bitmap65536DistinctCountAccumulatorI16::new(), + })), + _ => internal_err!("unsupported small int type: {}", data_type), + } +} + +#[cold] +fn get_small_int_state_field(name: &str, data_type: &DataType) -> Result> { + Ok(vec![ + Field::new_list( + format_state_name(name, "approx_distinct"), + Field::new_list_field(data_type.clone(), true), + false, + ) + .into(), + ]) +} + impl AggregateUDFImpl for ApproxDistinct { fn name(&self) -> &str { "approx_distinct" @@ -421,38 +454,9 @@ impl AggregateUDFImpl for ApproxDistinct { ) .into(), ]), - DataType::UInt8 => Ok(vec![ - Field::new_list( - format_state_name(args.name, "approx_distinct"), - Field::new_list_field(DataType::UInt8, true), - false, - ) - .into(), - ]), - DataType::Int8 => Ok(vec![ - Field::new_list( - format_state_name(args.name, "approx_distinct"), - Field::new_list_field(DataType::Int8, true), - false, - ) - .into(), - ]), - DataType::UInt16 => Ok(vec![ - Field::new_list( - format_state_name(args.name, "approx_distinct"), - Field::new_list_field(DataType::UInt16, true), - false, - ) - .into(), - ]), - DataType::Int16 => Ok(vec![ - Field::new_list( - format_state_name(args.name, "approx_distinct"), - Field::new_list_field(DataType::Int16, true), - false, - ) - .into(), - ]), + DataType::UInt8 | DataType::Int8 | DataType::UInt16 | DataType::Int16 => { + get_small_int_state_field(args.name, data_type) + } _ => Ok(vec![ Field::new( format_state_name(args.name, "hll_registers"), @@ -468,21 +472,11 @@ impl AggregateUDFImpl for ApproxDistinct { let data_type = acc_args.expr_fields[0].data_type(); let accumulator: Box = match data_type { - // Use bitmap accumulators for small integer types - DataType::UInt8 => Box::new(ApproxDistinctBitmapWrapper { - inner: BoolArray256DistinctCountAccumulator::new(), - }), - DataType::UInt16 => Box::new(ApproxDistinctBitmapWrapper { - inner: Bitmap65536DistinctCountAccumulator::new(), - }), + DataType::UInt8 | DataType::Int8 | DataType::UInt16 | DataType::Int16 => { + return get_small_int_approx_accumulator(data_type); + } DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(ApproxDistinctBitmapWrapper { - inner: BoolArray256DistinctCountAccumulatorI8::new(), - }), - DataType::Int16 => Box::new(ApproxDistinctBitmapWrapper { - inner: Bitmap65536DistinctCountAccumulatorI16::new(), - }), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), DataType::Date32 => Box::new(NumericHLLAccumulator::::new()), From c16592dfb764430ff83d83b6719dc6aa57ffdfb5 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sat, 18 Apr 2026 08:46:42 -0700 Subject: [PATCH 10/10] remove_bool_accumulator --- .../src/approx_distinct.rs | 65 +------------------ 1 file changed, 1 insertion(+), 64 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 3471f952d22ba..cc42b6c22bdbe 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -18,7 +18,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog}; -use arrow::array::{Array, BinaryArray, BooleanArray, StringViewArray}; +use arrow::array::{Array, BinaryArray, StringViewArray}; use arrow::array::{ GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; @@ -48,7 +48,6 @@ use datafusion_macros::user_doc; use std::fmt::{Debug, Formatter}; use std::hash::{BuildHasher, Hash}; use std::marker::PhantomData; -use std::mem::size_of_val; make_udaf_expr_and_func!( ApproxDistinct, @@ -303,67 +302,6 @@ impl Debug for ApproxDistinct { } } -/// Optimized accumulator for Boolean type - only 2 possible distinct values. -#[derive(Debug)] -struct BoolDistinctAccumulator { - seen_true: bool, - seen_false: bool, -} - -impl BoolDistinctAccumulator { - fn new() -> Self { - Self { - seen_true: false, - seen_false: false, - } - } -} - -impl Accumulator for BoolDistinctAccumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - let array: &BooleanArray = downcast_value!(values[0], BooleanArray); - for value in array.iter().flatten() { - if value { - self.seen_true = true; - } else { - self.seen_false = true; - } - } - Ok(()) - } - - fn evaluate(&mut self) -> Result { - let count = self.seen_true as u64 + self.seen_false as u64; - Ok(ScalarValue::UInt64(Some(count))) - } - - fn size(&self) -> usize { - size_of_val(self) - } - - fn state(&mut self) -> Result> { - // State: two booleans packed into a binary - let state = vec![self.seen_false as u8, self.seen_true as u8]; - Ok(vec![ScalarValue::Binary(Some(state))]) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - assert_eq!(1, states.len(), "expect only 1 element in the states"); - let binary_array = downcast_value!(states[0], BinaryArray); - for v in binary_array.iter().flatten() { - if v.len() >= 2 { - self.seen_false = self.seen_false || v[0] != 0; - self.seen_true = self.seen_true || v[1] != 0; - } - } - Ok(()) - } -} - impl Default for ApproxDistinct { fn default() -> Self { Self::new() @@ -510,7 +448,6 @@ impl AggregateUDFImpl for ApproxDistinct { DataType::Utf8View => Box::new(StringViewHLLAccumulator::new()), DataType::Binary => Box::new(BinaryHLLAccumulator::::new()), DataType::LargeBinary => Box::new(BinaryHLLAccumulator::::new()), - DataType::Boolean => Box::new(BoolDistinctAccumulator::new()), DataType::Null => { Box::new(NoopAccumulator::new(ScalarValue::UInt64(Some(0)))) }