Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ impl SessionContext {
Box::pin(self.drop_schema(cmd)).await
}
DdlStatement::CreateFunction(cmd) => {
Box::pin(self.create_function(cmd)).await
Box::pin(self.create_function(*cmd)).await
}
DdlStatement::DropFunction(cmd) => {
Box::pin(self.drop_function(cmd)).await
Expand Down
28 changes: 14 additions & 14 deletions datafusion/expr/src/logical_plan/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ use sqlparser::ast::Ident;
/// Various types of DDL (CREATE / DROP) catalog manipulation
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum DdlStatement {
/// Creates an external table.
CreateExternalTable(CreateExternalTable),
/// Creates an external table. Boxed to keep `LogicalPlan` enum size down
/// — `CreateExternalTable` is ~312 bytes, dwarfing every other variant
/// in the plan tree and forcing the whole enum to that width.
CreateExternalTable(Box<CreateExternalTable>),
/// Creates an in memory table.
CreateMemoryTable(CreateMemoryTable),
/// Creates a new view.
Expand All @@ -56,8 +58,9 @@ pub enum DdlStatement {
DropView(DropView),
/// Drops a catalog schema
DropCatalogSchema(DropCatalogSchema),
/// Create function statement
CreateFunction(CreateFunction),
/// Create function statement. Boxed for the same reason as
/// [`Self::CreateExternalTable`] (~288 bytes).
CreateFunction(Box<CreateFunction>),
/// Drop function statement
DropFunction(DropFunction),
}
Expand All @@ -66,9 +69,7 @@ impl DdlStatement {
/// Get a reference to the logical plan's schema
pub fn schema(&self) -> &DFSchemaRef {
match self {
DdlStatement::CreateExternalTable(CreateExternalTable { schema, .. }) => {
schema
}
DdlStatement::CreateExternalTable(ce) => &ce.schema,
DdlStatement::CreateMemoryTable(CreateMemoryTable { input, .. })
| DdlStatement::CreateView(CreateView { input, .. }) => input.schema(),
DdlStatement::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
Expand All @@ -79,7 +80,7 @@ impl DdlStatement {
DdlStatement::DropTable(DropTable { schema, .. }) => schema,
DdlStatement::DropView(DropView { schema, .. }) => schema,
DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema,
DdlStatement::CreateFunction(CreateFunction { schema, .. }) => schema,
DdlStatement::CreateFunction(cf) => &cf.schema,
DdlStatement::DropFunction(DropFunction { schema, .. }) => schema,
}
}
Expand Down Expand Up @@ -131,11 +132,9 @@ impl DdlStatement {
impl Display for Wrapper<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.0 {
DdlStatement::CreateExternalTable(CreateExternalTable {
name,
constraints,
..
}) => {
DdlStatement::CreateExternalTable(ce) => {
let name = &ce.name;
let constraints = &ce.constraints;
if constraints.is_empty() {
write!(f, "CreateExternalTable: {name:?}")
} else {
Expand Down Expand Up @@ -191,7 +190,8 @@ impl DdlStatement {
"DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}"
)
}
DdlStatement::CreateFunction(CreateFunction { name, .. }) => {
DdlStatement::CreateFunction(cf) => {
let name = &cf.name;
write!(f, "CreateFunction: name {name:?}")
}
DdlStatement::DropFunction(DropFunction { name, .. }) => {
Expand Down
39 changes: 39 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4740,6 +4740,45 @@ mod tests {
use insta::{assert_debug_snapshot, assert_snapshot};
use std::hash::DefaultHasher;

/// `LogicalPlan` is moved/swapped on every step of the planning hot path
/// (every `mem::take` in an in-place rewriter, every `Arc<LogicalPlan>`
/// write, every owned `map_*` traversal). Its size is set by the largest
/// variant, so an oversized variant balloons cost for every other variant.
///
/// Today the size-setter should be `Join` (~176 bytes); `DdlStatement` is
/// boxed precisely so it does not dominate. If you grow a variant, please
/// box the new large fields rather than letting this number creep up —
/// see the analogous `test_size_of_expr` in `expr.rs`.
#[test]
fn test_size_of_logical_plan() {
// `LogicalPlan` enum on aarch64 / x86_64. Today this matches
// `Join`'s 176 bytes (the enum discriminant fits in `Join`'s
// alignment padding); if `Join` grows or another variant overtakes
// it, this number will move with the new size-setter.
assert_eq!(size_of::<LogicalPlan>(), 176);
// `DdlStatement` is `Ddl(DdlStatement)`'s payload; keep it below the
// `Join` ceiling so it never re-becomes the size-setter.
assert!(
size_of::<DdlStatement>() < size_of::<Join>(),
"DdlStatement ({} bytes) should stay smaller than Join ({} bytes); \
box the new large variant rather than letting it dominate `LogicalPlan`.",
size_of::<DdlStatement>(),
size_of::<Join>(),
);
// Sanity check the two boxed variants stay boxed (so the payload
// sits on the heap, not in the enum).
assert_eq!(
size_of::<Box<crate::CreateExternalTable>>(),
8,
"CreateExternalTable should be Box'd inside DdlStatement"
);
assert_eq!(
size_of::<Box<crate::CreateFunction>>(),
8,
"CreateFunction should be Box'd inside DdlStatement"
);
}

fn employee_schema() -> Schema {
Schema::new(vec![
Field::new("id", DataType::Int32, false),
Expand Down
4 changes: 2 additions & 2 deletions datafusion/ffi/src/table_provider_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl FFI_TableProviderFactory {
let plan = LogicalPlanNode::decode(cmd_serialized.as_ref())
.map_err(|e| DataFusionError::Internal(format!("{e:?}")))?;
match plan.try_into_logical_plan(&task_ctx, logical_codec.as_ref())? {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd),
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(*cmd),
_ => Err(DataFusionError::Internal(
"Invalid logical plan in FFI_TableProviderFactory.".to_owned(),
)),
Expand Down Expand Up @@ -272,7 +272,7 @@ impl ForeignTableProviderFactory {
let logical_codec: Arc<dyn LogicalExtensionCodec> =
(&self.0.logical_codec).into();

let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd));
let plan = LogicalPlan::Ddl(DdlStatement::CreateExternalTable(Box::new(cmd)));
let plan: LogicalPlanNode =
AsLogicalPlan::try_from_logical_plan(&plan, logical_codec.as_ref())?;

Expand Down
51 changes: 27 additions & 24 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,26 +790,30 @@ impl AsLogicalPlan for LogicalPlanNode {
}

Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
CreateExternalTable::builder(
from_table_reference(
create_extern_table.name.as_ref(),
"CreateExternalTable",
)?,
create_extern_table.location.clone(),
create_extern_table.file_type.clone(),
pb_schema.try_into()?,
)
.with_partition_cols(create_extern_table.table_partition_cols.clone())
.with_order_exprs(order_exprs)
.with_if_not_exists(create_extern_table.if_not_exists)
.with_or_replace(create_extern_table.or_replace)
.with_temporary(create_extern_table.temporary)
.with_definition(definition)
.with_unbounded(create_extern_table.unbounded)
.with_options(create_extern_table.options.clone())
.with_constraints(constraints.into())
.with_column_defaults(column_defaults)
.build(),
Box::new(
CreateExternalTable::builder(
from_table_reference(
create_extern_table.name.as_ref(),
"CreateExternalTable",
)?,
create_extern_table.location.clone(),
create_extern_table.file_type.clone(),
pb_schema.try_into()?,
)
.with_partition_cols(
create_extern_table.table_partition_cols.clone(),
)
.with_order_exprs(order_exprs)
.with_if_not_exists(create_extern_table.if_not_exists)
.with_or_replace(create_extern_table.or_replace)
.with_temporary(create_extern_table.temporary)
.with_definition(definition)
.with_unbounded(create_extern_table.unbounded)
.with_options(create_extern_table.options.clone())
.with_constraints(constraints.into())
.with_column_defaults(column_defaults)
.build(),
),
)))
}
LogicalPlanType::CreateView(create_view) => {
Expand Down Expand Up @@ -1774,8 +1778,8 @@ impl AsLogicalPlan for LogicalPlanNode {
},
)),
}),
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
CreateExternalTable {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(ce)) => {
let CreateExternalTable {
name,
location,
file_type,
Expand All @@ -1790,8 +1794,7 @@ impl AsLogicalPlan for LogicalPlanNode {
constraints,
column_defaults,
temporary,
},
)) => {
} = ce.as_ref();
let mut converted_order_exprs: Vec<SortExprNodeCollection> = vec![];
for order in order_exprs {
let temp = SortExprNodeCollection {
Expand Down
30 changes: 16 additions & 14 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1468,15 +1468,15 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
function_body,
};

let statement = DdlStatement::CreateFunction(CreateFunction {
let statement = DdlStatement::CreateFunction(Box::new(CreateFunction {
or_replace,
temporary,
name,
return_type: return_type.map(|f| f.data_type().clone()),
args,
params,
schema: DFSchemaRef::new(DFSchema::empty()),
});
}));

Ok(LogicalPlan::Ddl(statement))
}
Expand Down Expand Up @@ -1855,18 +1855,20 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
let constraints =
self.new_constraint_from_table_constraints(&all_constraints, &df_schema)?;
Ok(LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
PlanCreateExternalTable::builder(name, location, file_type, df_schema)
.with_partition_cols(table_partition_cols)
.with_if_not_exists(if_not_exists)
.with_or_replace(or_replace)
.with_temporary(temporary)
.with_definition(definition)
.with_order_exprs(ordered_exprs)
.with_unbounded(unbounded)
.with_options(options_map)
.with_constraints(constraints)
.with_column_defaults(column_defaults)
.build(),
Box::new(
PlanCreateExternalTable::builder(name, location, file_type, df_schema)
.with_partition_cols(table_partition_cols)
.with_if_not_exists(if_not_exists)
.with_or_replace(or_replace)
.with_temporary(temporary)
.with_definition(definition)
.with_order_exprs(ordered_exprs)
.with_unbounded(unbounded)
.with_options(options_map)
.with_constraints(constraints)
.with_column_defaults(column_defaults)
.build(),
),
)))
}

Expand Down
106 changes: 106 additions & 0 deletions docs/source/library-user-guide/upgrading/55.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,112 @@ as a supertrait:
+ pub trait QueryPlanner: Any + Debug
```

### `DdlStatement::CreateExternalTable` and `CreateFunction` are now boxed

The two largest variants of `datafusion_expr::DdlStatement` are now
`Box`ed:

```rust,ignore
// Before
pub enum DdlStatement {
CreateExternalTable(CreateExternalTable),
// ...
CreateFunction(CreateFunction),
// ...
}

// After
pub enum DdlStatement {
CreateExternalTable(Box<CreateExternalTable>),
// ...
CreateFunction(Box<CreateFunction>),
// ...
}
```

`CreateExternalTable` is 312 bytes and `CreateFunction` is 288 bytes, so
without boxing they forced the entire `LogicalPlan` enum to 320 bytes
even on SELECT-only query paths that never instantiate them. Boxing
shrinks `LogicalPlan` from 320 → 176 bytes (−45%), making every
`mem::take` / `mem::swap` / `Arc<LogicalPlan>` store on the planning
hot path move a smaller payload.

**Who is affected:**

- Users who construct `DdlStatement::CreateExternalTable(...)` or
`DdlStatement::CreateFunction(...)` from an owned struct.
- Users who pattern-match these variants and destructure the inner
struct in the same pattern (e.g.
`DdlStatement::CreateExternalTable(CreateExternalTable { name, .. })`).
- Code that consumes the inner struct out of these variants (e.g. to
pass `CreateExternalTable` by value to another function).

**Migration guide:**

When constructing the variants, wrap the inner struct in `Box::new`:

```rust,ignore
// Before
let stmt = DdlStatement::CreateFunction(CreateFunction { name, args, .. });

// After
let stmt = DdlStatement::CreateFunction(Box::new(CreateFunction {
name,
args,
..
}));
```

When pattern-matching, bind the boxed value and either access fields
through it (Rust auto-derefs the `Box`) or destructure via `.as_ref()`:

```rust,ignore
// Before
match ddl {
DdlStatement::CreateExternalTable(CreateExternalTable {
name, location, ..
}) => { /* use name, location */ }
}

// After — access fields through the box
match ddl {
DdlStatement::CreateExternalTable(ce) => {
let name = &ce.name;
let location = &ce.location;
/* ... */
}
}

// After — destructure the dereferenced struct
match ddl {
DdlStatement::CreateExternalTable(ce) => {
let CreateExternalTable { name, location, .. } = ce.as_ref();
/* ... */
}
}
```

When you need an owned `CreateExternalTable` / `CreateFunction` out of
the variant, dereference the box with `*`:

```rust,ignore
// Before
match plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(cmd),
_ => { /* ... */ }
}

// After
match plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => Ok(*cmd),
_ => { /* ... */ }
}
```

See [PR #22733](https://github.com/apache/datafusion/pull/22733) for
details, including the per-variant size breakdown and benchmark
results.

### Spark map functions now reject duplicate keys by default

The Spark-compatibility map-construction functions (`map_from_arrays`,
Expand Down
Loading