@@ -211,24 +211,12 @@ impl IPhysicalPlan for AggregatePartial {
211211 . cluster_with_partial ( true , builder. ctx . get_cluster ( ) . nodes . len ( ) )
212212 } ;
213213
214- // For rank limit, we can filter data using sort with rank before partial
215- if let Some ( rank_limit) = & self . rank_limit {
216- let sort_desc = rank_limit
217- . 0
218- . iter ( )
219- . map ( |desc| {
220- let offset = schema_before_group_by. index_of ( & desc. order_by . to_string ( ) ) ?;
221- Ok ( SortColumnDescription {
222- offset,
223- asc : desc. asc ,
224- nulls_first : desc. nulls_first ,
225- } )
226- } )
227- . collect :: < Result < Vec < _ > > > ( ) ?;
228- let sort_desc: Arc < [ _ ] > = sort_desc. into ( ) ;
229-
214+ // For rank limit, we can filter data using sort with rank before partial.
215+ if let Some ( ( sort_desc, limit) ) =
216+ self . resolve_rank_limit_descriptions ( & schema_before_group_by)
217+ {
230218 builder. main_pipeline . add_transformer ( || {
231- TransformSortPartial :: new ( LimitType :: LimitRank ( rank_limit . 1 ) , sort_desc. clone ( ) )
219+ TransformSortPartial :: new ( LimitType :: LimitRank ( limit ) , sort_desc. clone ( ) )
232220 } ) ;
233221 }
234222
@@ -283,3 +271,28 @@ impl IPhysicalPlan for AggregatePartial {
283271 Ok ( ( ) )
284272 }
285273}
274+
275+ impl AggregatePartial {
276+ fn resolve_rank_limit_descriptions (
277+ & self ,
278+ schema_before_group_by : & DataSchemaRef ,
279+ ) -> Option < ( Arc < [ SortColumnDescription ] > , usize ) > {
280+ let ( sort_descs, limit) = self . rank_limit . as_ref ( ) ?;
281+ let mut resolved = Vec :: with_capacity ( sort_descs. len ( ) ) ;
282+ for desc in sort_descs {
283+ let field_name = desc. order_by . to_string ( ) ;
284+ let offset = match schema_before_group_by. index_of ( & field_name) {
285+ Ok ( offset) => offset,
286+ Err ( _) => {
287+ return None ;
288+ }
289+ } ;
290+ resolved. push ( SortColumnDescription {
291+ offset,
292+ asc : desc. asc ,
293+ nulls_first : desc. nulls_first ,
294+ } ) ;
295+ }
296+ Some ( ( resolved. into ( ) , * limit) )
297+ }
298+ }
0 commit comments