perf: improve approx_distinct performance 100x when there are fewer distinct values with many groups#22768
perf: improve approx_distinct performance 100x when there are fewer distinct values with many groups#22768haohuaijin wants to merge 4 commits into
Conversation
|
benchmark result |
There was a problem hiding this comment.
@haohuaijin
Thanks for the optimization here. I think there is one nullable filter case that needs to be fixed before this can land. I also left a few smaller suggestions around consistency and malformed state handling.
| delta += groups[group_indices[row]].add_hash(hash); | ||
| }), | ||
| Some(filter) => H::for_each_hash(values[0].as_ref(), |row, hash| { | ||
| if filter.value(row) { |
There was a problem hiding this comment.
I think this fast path needs to treat a NULL aggregate filter the same as false.
The generic adapter handled this through Arrow filter, but this path only checks filter.value(row). For a nullable boolean filter, the value bit can still be true on a null row, which would incorrectly add that row to approx_distinct.
Could we gate on validity too, for example filter.is_valid(row) && filter.value(row)? It would also be great to add a grouped approx_distinct(...) FILTER (WHERE nullable_bool) regression test with a null filter row.
There was a problem hiding this comment.
Nulls are checked inside for_each_hash now.
But I think we should first fold(bitwise or) the nulls array and opt_filter, then process row by row. This way we can make for_each_hash simpler, and also faster.
I vaguely remember there are some existing utility function/pattern to do so in other GroupsAccumulator implementation.
| let states = downcast_value!(values[0], BinaryArray); | ||
| let mut delta: isize = 0; | ||
| for (row, &group_index) in group_indices.iter().enumerate() { | ||
| if let Some(filter) = opt_filter |
There was a problem hiding this comment.
Same nullable filter concern here if merge_batch continues to accept opt_filter.
Could we skip rows when filter.is_null(row) || !filter.value(row)? If final-stage filters are not expected here, another option would be to assert opt_filter.is_none(), similar to some other aggregate implementations.
There was a problem hiding this comment.
I am also a bit confused for this API, what's the semantics for opt_filter? In update_filter it's quite obvious, they refer to the filter in the original query like avg(x) filter x>0, here can we assume they're always None, and for groups with Null state, it should be encoded in the values null mask?
I can't find the doc on trait, it seem to depend on implementation now. I suggest we can figure it out the existing practice and update the doc. (in follow up PR for sure)
| /// Returns true for the data types backed by the HyperLogLog | ||
| /// [`HllGroupsAccumulator`]. The fixed-domain types (booleans / small ints) and | ||
| /// `Null` fall back to the per-group [`Accumulator`] path. | ||
| fn is_hll_groups_type(data_type: &DataType) -> bool { |
There was a problem hiding this comment.
is_hll_groups_type looks a little broader than create_groups_accumulator. For example, it allows Time32(_) and Time64(_), while creation only accepts specific valid units.
Could we make this predicate exactly match the creation logic, or derive both from a shared helper? That would avoid groups_accumulator_supported() returning true for a type that creation later rejects.
| } else { | ||
| // capacity is unchanged by sort/dedup | ||
| 0 | ||
| } |
There was a problem hiding this comment.
merge_serialized should probably reject sparse states whose length is not a multiple of 8 in release builds too.
Right now this is only covered by debug_assert_eq!, and chunks_exact would silently drop trailing bytes if a malformed state reaches this boundary.
2010YOUY01
left a comment
There was a problem hiding this comment.
Thank you. I think this design achieves a good balance between performance and simplicity.
My only concern is have we handled groups with null value correct (see comment), otherwise LGTM.
| let other: HyperLogLog<u8> = bytes.try_into()?; | ||
| Ok(self.merge_dense(&other)) | ||
| } else { | ||
| debug_assert_eq!(bytes.len() % size_of::<u64>(), 0); |
There was a problem hiding this comment.
I suggest to also assert the serialized size here not exceeding sparse limit
| /// fallback for high-cardinality `GROUP BY`s: it processes the whole input in a | ||
| /// single vectorized pass (no per-group `take`/slice and no dynamic dispatch), | ||
| /// and the sparse representation avoids allocating a 16 KiB sketch for every | ||
| /// group when most groups only see a few distinct values. |
There was a problem hiding this comment.
| /// group when most groups only see a few distinct values. | |
| /// group when most groups only see a few distinct values. | |
| /// | |
| /// | |
| /// # Example | |
| /// | |
| /// For `SELECT k, approx_distinct(v) FROM t GROUP BY k`, each group owns one | |
| /// independent sketch: | |
| /// | |
| /// ```text | |
| /// group state | |
| /// a Sparse([h1, h2, h3, h2]) | |
| /// b Dense(HLL registers) | |
| /// ... | |
| /// ``` | |
| /// | |
| /// Group `a` has fewer than [`SPARSE_LIMIT`] distinct hashes, so it stays in | |
| /// the sparse representation. Before emitting state or estimating the count, the | |
| /// hash list is sorted and deduplicated to `[h1, h2, h3]`, then those hashes are | |
| /// interpreted exactly as if they had been added to a dense [`HyperLogLog`]. | |
| /// | |
| /// Group `b` has crossed the sparse limit, so its hashes have already been | |
| /// replayed into a dense sketch. New values for `b` update the dense registers | |
| /// directly, and serialized state is the raw [`NUM_REGISTERS`]-byte register | |
| /// array. |
| delta += groups[group_indices[row]].add_hash(hash); | ||
| }), | ||
| Some(filter) => H::for_each_hash(values[0].as_ref(), |row, hash| { | ||
| if filter.value(row) { |
There was a problem hiding this comment.
Nulls are checked inside for_each_hash now.
But I think we should first fold(bitwise or) the nulls array and opt_filter, then process row by row. This way we can make for_each_hash simpler, and also faster.
I vaguely remember there are some existing utility function/pattern to do so in other GroupsAccumulator implementation.
| let states = downcast_value!(values[0], BinaryArray); | ||
| let mut delta: isize = 0; | ||
| for (row, &group_index) in group_indices.iter().enumerate() { | ||
| if let Some(filter) = opt_filter |
There was a problem hiding this comment.
I am also a bit confused for this API, what's the semantics for opt_filter? In update_filter it's quite obvious, they refer to the filter in the original query like avg(x) filter x>0, here can we assume they're always None, and for groups with Null state, it should be encoded in the values null mask?
I can't find the doc on trait, it seem to depend on implementation now. I suggest we can figure it out the existing practice and update the doc. (in follow up PR for sure)
| for g in groups.iter_mut() { | ||
| freed += g.heap_bytes(); | ||
| g.serialize(&mut scratch); | ||
| builder.append_value(&scratch); |
There was a problem hiding this comment.
I'm wondering how is Null handled here
e.g. select k, approx_distinct(v) ..., and for group key a, the only row is NULL, should we return count as NULL for group 'a'
Which issue does this PR close?
apporx_distinctwhen each group do no have many distinct value #22767Rationale for this change
approx_distinctis very slow withGROUP BYon high-cardinality keys.On a dataset (~3.9M rows, ~512K groups), one file from the dataset describe in #22767
approx_count_distinct): ~0.1sThe reason is that
approx_distinctonly implementedAccumulator, notGroupsAccumulator. So grouped queries fell back toGroupsAccumulatorAdapter, which allocates a full 16 KiB HyperLogLog per group (~8 GB for 512K groups) and re-slices the input per group on every batch — even though most groups only see a few distinct values.What changes are included in this PR?
GroupsAccumulatorforapprox_distinctthat processes each batch in a single pass (no per-group slicing or dynamic dispatch).count_from_hashesso small groups are estimated directly from their stored hashes, avoiding a 16 KiB alloc + scan per group at output time.Nullkeep using the old path.Result on the query above: ~32.6s → ~0.12s (~270x, on par with DuckDB), with identical output.
Are these changes tested?
Yes.
aggregate.sltcases: groupedapprox_distinctoverUtf8,Utf8View, andInt32(small groups are exact), null-only groups (= 0), and a sparse→dense case (2000 distinct/group, within HyperLogLog error).aggregate.sltandaggregate_skip_partial.sltstill pass; clippy and fmt are clean.Are there any user-facing changes?
No API or result changes — only a large speedup for
approx_distinctwithGROUP BYon high-cardinality keys.