diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs index 2c2b791f23657..db49a363b1733 100644 --- a/datafusion/datasource/src/file_groups.rs +++ b/datafusion/datasource/src/file_groups.rs @@ -213,7 +213,13 @@ impl FileGroupPartitioner { .iter() .map(|f| f.object_meta.size as i64) .sum::(); - if total_size < (repartition_file_min_size as i64) || total_size == 0 { + + // bail if we are asked to *split* a set of files that are already too small + // if we are being asked to consolidate, we proceed + if (total_size < (repartition_file_min_size as i64) + && target_partitions >= file_groups.len()) + || total_size == 0 + { return None; } @@ -228,30 +234,44 @@ impl FileGroupPartitioner { .scan( (current_partition_index, current_partition_size), |state, source_file| { - let mut produced_files = vec![]; - let mut range_start = 0; - while range_start < source_file.object_meta.size { - let range_end = min( - range_start + (target_partition_size - state.1), - source_file.object_meta.size, - ); - - let mut produced_file = source_file.clone(); - produced_file.range = Some(FileRange { - start: range_start as i64, - end: range_end as i64, - }); - produced_files.push((state.0, produced_file)); - - if state.1 + (range_end - range_start) >= target_partition_size { + // Skip splitting files smaller than repartition_file_min_size + // This may result in a few more partitions than requested (maybe 1 more) + if source_file.object_meta.size < repartition_file_min_size { + state.1 += source_file.object_meta.size; + if state.1 > target_partition_size { state.0 += 1; state.1 = 0; - } else { - state.1 += range_end - range_start; } - range_start = range_end; + let small_file = (state.0, source_file.clone()); + Some(vec![small_file]) + } else { + let mut produced_files = vec![]; + let mut range_start = 0; + while range_start < source_file.object_meta.size { + let range_end = min( + range_start + (target_partition_size - state.1), + source_file.object_meta.size, + ); + + let mut produced_file = source_file.clone(); + produced_file.range = Some(FileRange { + start: range_start as i64, + end: range_end as i64, + }); + produced_files.push((state.0, produced_file)); + + if state.1 + (range_end - range_start) + >= target_partition_size + { + state.0 += 1; + state.1 = 0; + } else { + state.1 += range_end - range_start; + } + range_start = range_end; + } + Some(produced_files) } - Some(produced_files) }, ) .flatten()