@@ -221,6 +221,7 @@ fn attempt_place_unassigned_shards(
221221 place_unassigned_shards_single_source (
222222 source,
223223 indexers_with_most_available_capacity,
224+ problem. num_indexers ( ) ,
224225 & mut solution,
225226 ) ?;
226227 }
@@ -257,6 +258,7 @@ fn place_unassigned_shards_with_affinity(
257258 let _ = place_unassigned_shards_single_source (
258259 source,
259260 indexers_with_affinity_and_available_capacity,
261+ problem. num_indexers ( ) ,
260262 solution,
261263 ) ;
262264 }
@@ -348,15 +350,28 @@ struct NotEnoughCapacity;
348350fn place_unassigned_shards_single_source (
349351 source : & Source ,
350352 mut indexer_with_capacities : impl Iterator < Item = ( IndexerOrd , CpuCapacity ) > ,
353+ num_indexers : usize ,
351354 solution : & mut SchedulingSolution ,
352355) -> Result < ( ) , NotEnoughCapacity > {
353356 let mut num_shards = source. num_shards ;
357+ // To ensure that merges can keep up, try not to assign more than 3
358+ // shards per indexer for a source (except if there aren't enough nodes)
359+ let limit_num_shards_per_indexer_per_source = 3 . max ( num_shards. div_ceil ( num_indexers as u32 ) ) ;
354360 while num_shards > 0 {
355361 let Some ( ( indexer_ord, available_capacity) ) = indexer_with_capacities. next ( ) else {
356362 return Err ( NotEnoughCapacity ) ;
357363 } ;
358- let num_placable_shards = available_capacity. cpu_millis ( ) / source. load_per_shard ;
359- let num_shards_to_place = num_placable_shards. min ( num_shards) ;
364+ let current_num_shards_for_indexer_and_source = * solution. indexer_assignments [ indexer_ord]
365+ . num_shards_per_source
366+ . get ( & source. source_ord )
367+ . unwrap_or ( & 0 ) ;
368+ let num_placable_shards_for_available_capacity =
369+ available_capacity. cpu_millis ( ) / source. load_per_shard ;
370+ let num_placable_shards_for_limit = limit_num_shards_per_indexer_per_source
371+ . saturating_sub ( current_num_shards_for_indexer_and_source) ;
372+ let num_shards_to_place = num_shards
373+ . min ( num_placable_shards_for_available_capacity)
374+ . min ( num_placable_shards_for_limit) ;
360375 // Update the solution, the shard load, and the number of shards to place.
361376 if num_shards_to_place == 0u32 {
362377 // No need to fill indexer_assignments with empty assignments.
@@ -596,11 +611,15 @@ mod tests {
596611 problem. add_source ( 4 , NonZeroU32 :: new ( 1_000 ) . unwrap ( ) ) ;
597612 problem. add_source ( 4 , NonZeroU32 :: new ( 1_000 ) . unwrap ( ) ) ;
598613 problem. inc_affinity ( 0 , 1 ) ;
614+ problem. inc_affinity ( 0 , 1 ) ;
615+ problem. inc_affinity ( 0 , 0 ) ;
599616 problem. inc_affinity ( 1 , 0 ) ;
600617 let mut solution = problem. new_solution ( ) ;
601618 place_unassigned_shards_with_affinity ( & problem, & mut solution) ;
602- assert_eq ! ( solution. indexer_assignments[ 0 ] . num_shards( 1 ) , 4 ) ;
603- assert_eq ! ( solution. indexer_assignments[ 1 ] . num_shards( 0 ) , 4 ) ;
619+ assert_eq ! ( solution. indexer_assignments[ 0 ] . num_shards( 1 ) , 3 ) ;
620+ assert_eq ! ( solution. indexer_assignments[ 0 ] . num_shards( 0 ) , 1 ) ;
621+ assert_eq ! ( solution. indexer_assignments[ 1 ] . num_shards( 0 ) , 3 ) ;
622+ assert_eq ! ( solution. indexer_assignments[ 1 ] . num_shards( 1 ) , 0 ) ;
604623 }
605624
606625 #[ test]
0 commit comments