Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 56 additions & 51 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1319,19 +1319,33 @@ impl DisplayAs for FileScanConfig {
}
}

/// Get the indices of columns in a projection if the projection is a simple
/// list of columns.
/// If there are any expressions other than columns, returns None.
fn ordered_column_indices_from_projection(
/// Build a projection index mapping for the sort columns in `ordering`.
///
/// Returns a `Vec<usize>` of the same length as `projection`, where each entry
/// maps a projected-schema column index to the corresponding table-schema column
/// index. Only the entries referenced by sort columns in `ordering` are required
/// to be simple `Column` expressions; non-sort-column positions are filled with a
/// placeholder (0) since they will never be accessed by `MinMaxStatistics`.
///
/// Returns `None` if any sort column is not a simple `Column` reference in the
/// projected ordering, if its index is out of range for `projection`, or if its
/// corresponding projection expression is not a simple `Column`.
fn resolve_sort_column_projection(
ordering: &LexOrdering,
projection: &ProjectionExprs,
) -> Option<Vec<usize>> {
projection
.expr_iter()
.map(|e| {
let index = e.as_any().downcast_ref::<Column>()?.index();
Some(index)
})
.collect::<Option<Vec<usize>>>()
let proj_slice = projection.as_ref();
let mut indices = vec![0usize; proj_slice.len()];

for sort_expr in ordering.iter() {
let col = sort_expr.expr.as_any().downcast_ref::<Column>()?;
let proj_idx = col.index();
let proj_expr = proj_slice.get(proj_idx)?;
let table_col = proj_expr.expr.as_any().downcast_ref::<Column>()?;
indices[proj_idx] = table_col.index();
}

Some(indices)
}

/// Check whether a given ordering is valid for all file groups by verifying
Expand Down Expand Up @@ -1445,47 +1459,38 @@ fn get_projected_output_ordering(
let projected_orderings =
project_orderings(&base_config.output_ordering, projected_schema);

let indices = base_config
.file_source
.projection()
.as_ref()
.map(|p| ordered_column_indices_from_projection(p));

match indices {
Some(Some(indices)) => {
// Simple column projection — validate with statistics
validate_orderings(
&projected_orderings,
projected_schema,
&base_config.file_groups,
Some(indices.as_slice()),
)
}
None => {
// No projection — validate with statistics (no remapping needed)
validate_orderings(
&projected_orderings,
projected_schema,
&base_config.file_groups,
None,
)
}
Some(None) => {
// Complex projection (expressions, not simple columns) — can't
// determine column indices for statistics. Still valid if all
// file groups have at most one file.
if base_config.file_groups.iter().all(|g| g.len() <= 1) {
projected_orderings
} else {
debug!(
"Skipping specified output orderings. \
Some file groups couldn't be determined to be sorted: {:?}",
base_config.file_groups
);
vec![]
let projection = base_config.file_source.projection();

projected_orderings
.into_iter()
.filter(|ordering| match projection.as_ref() {
None => {
// No projection — validate directly with statistics
is_ordering_valid_for_file_groups(
&base_config.file_groups,
ordering,
projected_schema,
None,
)
}
}
}
Some(proj) => match resolve_sort_column_projection(ordering, proj) {
Some(indices) => {
// All sort columns resolved — validate with statistics
is_ordering_valid_for_file_groups(
&base_config.file_groups,
ordering,
projected_schema,
Some(&indices),
)
}
None => {
// Some sort column is a complex expression — can't
// look up statistics. Keep the ordering only if every
// file group has at most one file.
base_config.file_groups.iter().all(|g| g.len() <= 1)
}
},
})
.collect()
}

/// Convert type to a type suitable for use as a `ListingTable`
Expand Down