Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions datafusion/datasource/src/file_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,13 @@ impl FileGroupPartitioner {
.iter()
.map(|f| f.object_meta.size as i64)
.sum::<i64>();
if total_size < (repartition_file_min_size as i64) || total_size == 0 {

// bail if we are asked to *split* a set of files that are already too small
// if we are being asked to consolidate, we proceed
if (total_size < (repartition_file_min_size as i64)
&& target_partitions >= file_groups.len())
|| total_size == 0
{
return None;
}

Expand All @@ -228,30 +234,44 @@ impl FileGroupPartitioner {
.scan(
(current_partition_index, current_partition_size),
|state, source_file| {
let mut produced_files = vec![];
let mut range_start = 0;
while range_start < source_file.object_meta.size {
let range_end = min(
range_start + (target_partition_size - state.1),
source_file.object_meta.size,
);

let mut produced_file = source_file.clone();
produced_file.range = Some(FileRange {
start: range_start as i64,
end: range_end as i64,
});
produced_files.push((state.0, produced_file));

if state.1 + (range_end - range_start) >= target_partition_size {
// Skip splitting files smaller than repartition_file_min_size
// This may result in a few more partitions than requested (maybe 1 more)
if source_file.object_meta.size < repartition_file_min_size {
state.1 += source_file.object_meta.size;
if state.1 > target_partition_size {
state.0 += 1;
state.1 = 0;
} else {
state.1 += range_end - range_start;
}
range_start = range_end;
let small_file = (state.0, source_file.clone());
Some(vec![small_file])
} else {
let mut produced_files = vec![];
let mut range_start = 0;
while range_start < source_file.object_meta.size {
let range_end = min(
range_start + (target_partition_size - state.1),
source_file.object_meta.size,
);

let mut produced_file = source_file.clone();
produced_file.range = Some(FileRange {
start: range_start as i64,
end: range_end as i64,
});
produced_files.push((state.0, produced_file));

if state.1 + (range_end - range_start)
>= target_partition_size
{
state.0 += 1;
state.1 = 0;
} else {
state.1 += range_end - range_start;
}
range_start = range_end;
}
Some(produced_files)
}
Some(produced_files)
},
)
.flatten()
Expand Down
Loading