@@ -47,10 +47,11 @@ use crate::pipelines::processors::transforms::hash_join::util::min_max_filter;
4747///
4848/// Each runtime filter (identified by packet.id) is built once and then applied to multiple scans.
4949/// The probe_targets in RuntimeFilterDesc specify all (probe_key, scan_id) pairs where this filter should be applied.
50- pub fn build_runtime_filter_infos (
50+ pub async fn build_runtime_filter_infos (
5151 packet : JoinRuntimeFilterPacket ,
5252 runtime_filter_descs : HashMap < usize , & RuntimeFilterDesc > ,
5353 selectivity_threshold : u64 ,
54+ max_threads : usize ,
5455) -> Result < HashMap < usize , RuntimeFilterInfo > > {
5556 let total_build_rows = packet. build_rows ;
5657 let Some ( packets) = packet. packets else {
@@ -73,7 +74,7 @@ pub fn build_runtime_filter_infos(
7374 probe_expr : probe_key. clone ( ) ,
7475 bloom : if enabled {
7576 if let Some ( ref bloom) = packet. bloom {
76- Some ( build_bloom_filter ( bloom. clone ( ) , probe_key) ?)
77+ Some ( build_bloom_filter ( bloom. clone ( ) , probe_key, max_threads ) . await ?)
7778 } else {
7879 None
7980 }
@@ -243,15 +244,82 @@ fn build_min_max_filter(
243244 Ok ( min_max_filter)
244245}
245246
246- fn build_bloom_filter ( bloom : Vec < u64 > , probe_key : & Expr < String > ) -> Result < RuntimeFilterBloom > {
247+ async fn build_bloom_filter (
248+ bloom : Vec < u64 > ,
249+ probe_key : & Expr < String > ,
250+ max_threads : usize ,
251+ ) -> Result < RuntimeFilterBloom > {
247252 let probe_key = probe_key. as_column_ref ( ) . unwrap ( ) ;
248- let filter = BloomFilter :: with_false_pos ( 0.01 ) . items ( bloom) ;
253+ let column_name = probe_key. id . to_string ( ) ;
254+ let total_items = bloom. len ( ) ;
255+
256+ if total_items < 50000 {
257+ let filter = BloomFilter :: with_false_pos ( 0.01 ) . items ( bloom) ;
258+ return Ok ( RuntimeFilterBloom {
259+ column_name,
260+ filter,
261+ } ) ;
262+ }
263+
264+ let chunk_size = total_items. div_ceil ( max_threads) ;
265+
266+ let chunks: Vec < Vec < u64 > > = bloom
267+ . chunks ( chunk_size)
268+ . map ( |chunk| chunk. to_vec ( ) )
269+ . collect ( ) ;
270+
271+ let tasks: Vec < _ > = chunks
272+ . into_iter ( )
273+ . map ( |chunk| {
274+ databend_common_base:: runtime:: spawn ( async move {
275+ let mut filter = BloomFilter :: with_false_pos ( 0.01 ) . expected_items ( total_items) ;
276+ for hash in chunk {
277+ filter. insert_hash ( hash) ;
278+ }
279+ Ok :: < BloomFilter , ErrorCode > ( filter)
280+ } )
281+ } )
282+ . collect ( ) ;
283+
284+ let task_results = futures:: future:: join_all ( tasks) . await ;
285+
286+ let filters: Vec < BloomFilter > = task_results
287+ . into_iter ( )
288+ . map ( |r| r. expect ( "Task panicked" ) )
289+ . collect :: < Result < Vec < _ > > > ( ) ?;
290+
291+ let merged_filter = merge_bloom_filters_tree ( filters) ;
292+
249293 Ok ( RuntimeFilterBloom {
250- column_name : probe_key . id . to_string ( ) ,
251- filter,
294+ column_name,
295+ filter : merged_filter ,
252296 } )
253297}
254298
299+ fn merge_bloom_filters_tree ( mut filters : Vec < BloomFilter > ) -> BloomFilter {
300+ if filters. is_empty ( ) {
301+ return BloomFilter :: with_false_pos ( 0.01 ) . expected_items ( 1 ) ;
302+ }
303+
304+ while filters. len ( ) > 1 {
305+ let mut next_level = Vec :: new ( ) ;
306+ let mut iter = filters. into_iter ( ) ;
307+
308+ while let Some ( mut left) = iter. next ( ) {
309+ if let Some ( right) = iter. next ( ) {
310+ left. union ( & right) ;
311+ next_level. push ( left) ;
312+ } else {
313+ next_level. push ( left) ;
314+ }
315+ }
316+
317+ filters = next_level;
318+ }
319+
320+ filters. pop ( ) . unwrap ( )
321+ }
322+
255323#[ cfg( test) ]
256324mod tests {
257325 use std:: collections:: HashMap ;
0 commit comments