From 05b37c0c468af3c6bb09a426af7eeaa7ceb3a780 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 12 Feb 2026 10:45:45 +0100 Subject: [PATCH 1/9] Add draft for basic extension type support --- Cargo.lock | 1 + datafusion/common/Cargo.toml | 1 + .../src/types/canonical_extensions/mod.rs | 3 + .../src/types/canonical_extensions/uuid.rs | 94 +++++++ datafusion/common/src/types/extension.rs | 53 ++++ datafusion/common/src/types/mod.rs | 4 + datafusion/core/src/dataframe/mod.rs | 6 + .../core/src/execution/session_state.rs | 91 ++++++- .../src/execution/session_state_defaults.rs | 8 + .../array_formatter_factory.rs | 50 ++++ datafusion/core/src/extension_types/mod.rs | 5 + datafusion/core/src/lib.rs | 7 +- datafusion/core/tests/core_integration.rs | 3 + datafusion/core/tests/extension_types/mod.rs | 1 + .../tests/extension_types/pretty_printing.rs | 58 ++++ datafusion/expr/src/registry.rs | 250 +++++++++++++++++- datafusion/ffi/src/session/mod.rs | 8 +- datafusion/session/src/session.rs | 6 +- 18 files changed, 640 insertions(+), 9 deletions(-) create mode 100644 datafusion/common/src/types/canonical_extensions/mod.rs create mode 100644 datafusion/common/src/types/canonical_extensions/uuid.rs create mode 100644 datafusion/common/src/types/extension.rs create mode 100644 datafusion/core/src/extension_types/array_formatter_factory.rs create mode 100644 datafusion/core/src/extension_types/mod.rs create mode 100644 datafusion/core/tests/extension_types/mod.rs create mode 100644 datafusion/core/tests/extension_types/pretty_printing.rs diff --git a/Cargo.lock b/Cargo.lock index 18e1b8b7f723f..12a25a1788134 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1915,6 +1915,7 @@ dependencies = [ "recursive", "sqlparser", "tokio", + "uuid", "web-time", ] diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 82e7aafcee2b1..65f1f053f6965 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -81,6 +81,7 @@ paste = { workspace = true } recursive = { workspace = true, optional = true } sqlparser = { workspace = true, optional = true } tokio = { workspace = true } +uuid = { workspace = true, features = ["v4"] } [target.'cfg(target_family = "wasm")'.dependencies] web-time = "1.1.0" diff --git a/datafusion/common/src/types/canonical_extensions/mod.rs b/datafusion/common/src/types/canonical_extensions/mod.rs new file mode 100644 index 0000000000000..f9dba8417d2eb --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/mod.rs @@ -0,0 +1,3 @@ +mod uuid; + +pub use uuid::*; diff --git a/datafusion/common/src/types/canonical_extensions/uuid.rs b/datafusion/common/src/types/canonical_extensions/uuid.rs new file mode 100644 index 0000000000000..4a1c158918821 --- /dev/null +++ b/datafusion/common/src/types/canonical_extensions/uuid.rs @@ -0,0 +1,94 @@ +use crate::error::_internal_err; +use crate::types::extension::DFExtensionType; +use arrow::array::{Array, FixedSizeBinaryArray}; +use arrow::datatypes::DataType; +use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; +use std::fmt::Write; +use uuid::{Bytes, Uuid}; + +/// Defines the extension type logic for the canonical `arrow.uuid` extension type. +/// +/// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. +#[derive(Debug)] +pub struct UuidDFExtensionType(); + +impl UuidDFExtensionType { + /// Create a new instance of [`UuidDFExtensionType`]. + pub fn new() -> Self { + Self {} + } +} + +impl Default for UuidDFExtensionType { + fn default() -> Self { + Self::new() + } +} + +impl DFExtensionType for UuidDFExtensionType { + fn create_array_formatter<'fmt>( + &self, + array: &'fmt dyn Array, + options: &FormatOptions<'fmt>, + ) -> crate::Result>> { + if array.data_type() != &DataType::FixedSizeBinary(16) { + return _internal_err!("Wrong array type for Uuid"); + } + + let display_index = UuidValueDisplayIndex { + array: array.as_any().downcast_ref().unwrap(), + null_str: options.null(), + }; + Ok(Some(ArrayFormatter::new( + Box::new(display_index), + options.safe(), + ))) + } +} + +/// Pretty printer for binary UUID values. +#[derive(Debug, Clone, Copy)] +struct UuidValueDisplayIndex<'a> { + array: &'a FixedSizeBinaryArray, + null_str: &'a str, +} + +impl DisplayIndex for UuidValueDisplayIndex<'_> { + fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { + if self.array.is_null(idx) { + write!(f, "arrow.uuid({})", self.null_str)?; + return Ok(()); + } + + let bytes = Bytes::try_from(self.array.value(idx)) + .expect("FixedSizeBinaryArray length checked in create_array_formatter"); + let uuid = Uuid::from_bytes(bytes); + write!(f, "arrow.uuid({uuid})")?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ScalarValue; + + #[test] + pub fn test_pretty_print_uuid() { + let my_uuid = Uuid::nil(); + let uuid = ScalarValue::FixedSizeBinary(16, Some(my_uuid.as_bytes().to_vec())) + .to_array_of_size(1) + .unwrap(); + + let extension_type = UuidDFExtensionType::new(); + let formatter = extension_type + .create_array_formatter(uuid.as_ref(), &FormatOptions::default()) + .unwrap() + .unwrap(); + + assert_eq!( + formatter.value(0).to_string(), + "arrow.uuid(00000000-0000-0000-0000-000000000000)" + ); + } +} diff --git a/datafusion/common/src/types/extension.rs b/datafusion/common/src/types/extension.rs new file mode 100644 index 0000000000000..520b2fceb3975 --- /dev/null +++ b/datafusion/common/src/types/extension.rs @@ -0,0 +1,53 @@ +use crate::error::Result; +use arrow::array::Array; +use arrow::datatypes::DataType; +use arrow::util::display::{ArrayFormatter, FormatOptions}; +use std::fmt::Debug; +use std::sync::Arc; + +/// A cheaply cloneable pointer to a [`DFExtensionType`]. +pub type DFExtensionTypeRef = Arc; + +/// Represents an implementation of a DataFusion extension type, allowing users to customize the +/// behavior of DataFusion for custom extension types. +/// +/// Extension types may change the semantics of a column. For example, adding two values of +/// [`DataType::Int64`] is a sensible thing to do. However, if the same data type is annotated with +/// an extension type like `custom.id`, the correct interpretation of a column changes. For example, +/// adding together two `custom.id` values (represented as a 64-bit integer) may no longer make +/// sense. +/// +/// Note that while helping users to navigate the semantic gap between the data type and extension +/// types is a goal of this trait, DataFusion's extension type support is still evolving and does +/// not cover all use cases. Currently, the following capabilities can be customized: +/// - Pretty-printing values in record batches +/// +/// # Relation to Arrow's `ExtensionType` +/// +/// The purpose of Arrow's `ExtensionType` trait, for the time being, is to provide a way to handle +/// metadata of an extension type in a type-safe manner. The trait does not provide any +/// customization options such that users can customize the behavior of any kernels (e.g., +/// [`DFExtensionType::create_array_formatter`] for formatting record batches). Therefore, +/// downstream users (such as DataFusion) have the flexibility to implement the extension type +/// mechanism according to their needs. [`DFExtensionType`] is DataFusion's implementation of this +/// extension type mechanism. +/// +/// Furthermore, Arrow's current trait is not dyn-compatible which we need for implementing +/// extension type registries. In the future, the two implementations may increasingly converge. +/// +/// # Example +/// +/// +pub trait DFExtensionType: Debug + Send + Sync { + /// Returns an [`ArrayFormatter`] that can format values of this type. + /// + /// If `Ok(None)` is returned, the default implementation will be used. + /// If an error is returned, there was an error creating the formatter. + fn create_array_formatter<'fmt>( + &self, + _array: &'fmt dyn Array, + _options: &FormatOptions<'fmt>, + ) -> Result>> { + Ok(None) + } +} diff --git a/datafusion/common/src/types/mod.rs b/datafusion/common/src/types/mod.rs index 2f9ce4ce02827..57bf921a6d564 100644 --- a/datafusion/common/src/types/mod.rs +++ b/datafusion/common/src/types/mod.rs @@ -16,11 +16,15 @@ // under the License. mod builtin; +mod canonical_extensions; +mod extension; mod field; mod logical; mod native; pub use builtin::*; +pub use canonical_extensions::*; +pub use extension::*; pub use field::*; pub use logical::*; pub use native::*; diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 2292f5855bfde..6591c943264da 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -69,6 +69,7 @@ use datafusion_functions_aggregate::expr_fn::{ avg, count, max, median, min, stddev, sum, }; +use crate::extension_types::DFArrayFormatterFactory; use async_trait::async_trait; use datafusion_catalog::Session; @@ -1516,6 +1517,11 @@ impl DataFrame { let options = self.session_state.config().options().format.clone(); let arrow_options: arrow::util::display::FormatOptions = (&options).try_into()?; + let registry = self.session_state.extension_type_registry(); + let formatter_factory = DFArrayFormatterFactory::new(Arc::clone(registry)); + let arrow_options = + arrow_options.with_formatter_factory(Some(&formatter_factory)); + let results = self.collect().await?; Ok( pretty::pretty_format_batches_with_options(&results, &arrow_options)? diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 9560616c1b6da..8e4e4a51d7c54 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -30,6 +30,7 @@ use crate::datasource::provider_as_source; use crate::execution::SessionStateDefaults; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; +use arrow_schema::extension::ExtensionType; use arrow_schema::{DataType, FieldRef}; use datafusion_catalog::MemoryCatalogProviderList; use datafusion_catalog::information_schema::{ @@ -56,7 +57,7 @@ use datafusion_expr::expr_rewriter::FunctionRewrite; use datafusion_expr::planner::ExprPlanner; #[cfg(feature = "sql")] use datafusion_expr::planner::{RelationPlanner, TypePlanner}; -use datafusion_expr::registry::{FunctionRegistry, SerializerRegistry}; +use datafusion_expr::registry::{ExtensionTypeRegistration, ExtensionTypeRegistrationRef, ExtensionTypeRegistry, ExtensionTypeRegistryRef, FunctionRegistry, MemoryExtensionTypeRegistry, SerializerRegistry, SimpleExtensionTypeRegistration}; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{AggregateUDF, Explain, Expr, LogicalPlan, ScalarUDF, WindowUDF}; use datafusion_optimizer::simplify_expressions::ExprSimplifier; @@ -77,6 +78,7 @@ use datafusion_sql::{ use async_trait::async_trait; use chrono::{DateTime, Utc}; +use datafusion_common::types::UuidDFExtensionType; use itertools::Itertools; use log::{debug, info}; use object_store::ObjectStore; @@ -158,6 +160,8 @@ pub struct SessionState { aggregate_functions: HashMap>, /// Window functions registered in the context window_functions: HashMap>, + /// Extension types registry for extensions. + extension_types: ExtensionTypeRegistryRef, /// Deserializer registry for extensions. serializer_registry: Arc, /// Holds registered external FileFormat implementations @@ -266,6 +270,10 @@ impl Session for SessionState { &self.window_functions } + fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef { + &self.extension_types + } + fn runtime_env(&self) -> &Arc { self.runtime_env() } @@ -986,6 +994,7 @@ pub struct SessionStateBuilder { scalar_functions: Option>>, aggregate_functions: Option>>, window_functions: Option>>, + extension_types: Option, serializer_registry: Option>, file_formats: Option>>, config: Option, @@ -1026,6 +1035,7 @@ impl SessionStateBuilder { scalar_functions: None, aggregate_functions: None, window_functions: None, + extension_types: None, serializer_registry: None, file_formats: None, table_options: None, @@ -1081,6 +1091,7 @@ impl SessionStateBuilder { existing.aggregate_functions.into_values().collect_vec(), ), window_functions: Some(existing.window_functions.into_values().collect_vec()), + extension_types: Some(existing.extension_types), serializer_registry: Some(existing.serializer_registry), file_formats: Some(existing.file_formats.into_values().collect_vec()), config: Some(new_config), @@ -1126,6 +1137,11 @@ impl SessionStateBuilder { .get_or_insert_with(Vec::new) .extend(SessionStateDefaults::default_window_functions()); + self.extension_types + .get_or_insert_with(|| Arc::new(MemoryExtensionTypeRegistry::new())) + .extend(&SessionStateDefaults::default_extension_types()) + .expect("MemoryExtensionTypeRegistry is not read-only."); + self.table_functions .get_or_insert_with(HashMap::new) .extend( @@ -1316,6 +1332,44 @@ impl SessionStateBuilder { self } + /// Set the map of [`ExtensionTypeRegistration`]s + pub fn with_extension_type( + mut self, + registry: ExtensionTypeRegistryRef, + ) -> Self { + self.extension_types = Some(registry); + self + } + + /// Registers [canonical extension types](https://arrow.apache.org/docs/format/CanonicalExtensions.html) + /// in DataFusion's extension type registry. For more information see [`ExtensionTypeRegistry`]. + /// + /// # Errors + /// + /// May fail if an already registered [`ExtensionTypeRegistry`] raises an error while + /// registering the canonical extension types. + pub fn with_canonical_extension_types(mut self) -> datafusion_common::Result { + let canonical_extension_types = vec![SimpleExtensionTypeRegistration::new_arc( + arrow_schema::extension::Uuid::NAME, + Arc::new(UuidDFExtensionType::new()), + )]; + + match &self.extension_types { + None => { + let registry = Arc::new(MemoryExtensionTypeRegistry::new()); + registry + .extend(&canonical_extension_types) + .expect("Adding valid extension types to MemoryExtensionTypeRegistry always succeeds."); + self.extension_types = Some(registry); + } + Some(registry) => { + registry.extend(&canonical_extension_types)?; + } + } + + Ok(self) + } + /// Set the [`SerializerRegistry`] pub fn with_serializer_registry( mut self, @@ -1454,6 +1508,7 @@ impl SessionStateBuilder { scalar_functions, aggregate_functions, window_functions, + extension_types, serializer_registry, file_formats, table_options, @@ -1490,6 +1545,7 @@ impl SessionStateBuilder { scalar_functions: HashMap::new(), aggregate_functions: HashMap::new(), window_functions: HashMap::new(), + extension_types: Arc::new(MemoryExtensionTypeRegistry::default()), serializer_registry: serializer_registry .unwrap_or_else(|| Arc::new(EmptySerializerRegistry)), file_formats: HashMap::new(), @@ -1559,6 +1615,10 @@ impl SessionStateBuilder { }); } + if let Some(extension_types) = extension_types { + state.extension_types = extension_types; + } + if state.config.create_default_catalog_and_schema() { let default_catalog = SessionStateDefaults::default_catalog( &state.config, @@ -2071,6 +2131,35 @@ impl datafusion_execution::TaskContextProvider for SessionState { } } +impl ExtensionTypeRegistry for SessionState { + fn extension_type_registration( + &self, + name: &str, + ) -> datafusion_common::Result { + self.extension_types.extension_type_registration(name) + } + + fn extension_type_registrations(&self) -> Vec> { + self.extension_types.extension_type_registrations() + } + + fn add_extension_type_registration( + &self, + extension_type: ExtensionTypeRegistrationRef, + ) -> datafusion_common::Result> { + self.extension_types + .add_extension_type_registration(extension_type) + } + + fn remove_extension_type_registration( + &self, + name: &str, + ) -> datafusion_common::Result> { + self.extension_types + .remove_extension_type_registration(name) + } +} + impl OptimizerConfig for SessionState { fn query_execution_start_time(&self) -> Option> { self.execution_props.query_execution_start_time diff --git a/datafusion/core/src/execution/session_state_defaults.rs b/datafusion/core/src/execution/session_state_defaults.rs index 721710d4e057e..8ef041e5bf640 100644 --- a/datafusion/core/src/execution/session_state_defaults.rs +++ b/datafusion/core/src/execution/session_state_defaults.rs @@ -36,6 +36,7 @@ use datafusion_execution::config::SessionConfig; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::planner::ExprPlanner; +use datafusion_expr::registry::ExtensionTypeRegistrationRef; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; use std::collections::HashMap; use std::sync::Arc; @@ -122,6 +123,13 @@ impl SessionStateDefaults { functions_window::all_default_window_functions() } + /// Returns the list of default extension types. + /// + /// For now, we do not register any extension types by default. + pub fn default_extension_types() -> Vec { + vec![] + } + /// returns the list of default [`TableFunction`]s pub fn default_table_functions() -> Vec> { functions_table::all_default_table_functions() diff --git a/datafusion/core/src/extension_types/array_formatter_factory.rs b/datafusion/core/src/extension_types/array_formatter_factory.rs new file mode 100644 index 0000000000000..2c0ba5e4e3b9d --- /dev/null +++ b/datafusion/core/src/extension_types/array_formatter_factory.rs @@ -0,0 +1,50 @@ +use arrow::array::Array; +use arrow::util::display::{ArrayFormatter, ArrayFormatterFactory, FormatOptions}; +use arrow_schema::{ArrowError, Field}; +use datafusion_expr::registry::ExtensionTypeRegistryRef; + +/// A factory for creating [`ArrayFormatter`]s that checks whether a registered extension type can +/// format a given array based on its metadata. +#[derive(Debug)] +pub struct DFArrayFormatterFactory { + /// The extension type registry + registry: ExtensionTypeRegistryRef, +} + +impl DFArrayFormatterFactory { + /// Creates a new [`DFArrayFormatterFactory`]. + pub fn new(registry: ExtensionTypeRegistryRef) -> Self { + Self { registry } + } +} + +impl ArrayFormatterFactory for DFArrayFormatterFactory { + fn create_array_formatter<'formatter>( + &self, + array: &'formatter dyn Array, + options: &FormatOptions<'formatter>, + field: Option<&'formatter Field>, + ) -> Result>, ArrowError> { + let Some(field) = field else { + return Ok(None); + }; + + let Some(extension_type_name) = field.extension_type_name() else { + return Ok(None); + }; + + let Some(registration) = self + .registry + .extension_type_registration(extension_type_name) + .ok() + else { + // If the extension type is not registered, we fall back to the default formatter + return Ok(None); + }; + + registration + .create_df_extension_type(field.extension_type_metadata())? + .create_array_formatter(array, options) + .map_err(ArrowError::from) + } +} diff --git a/datafusion/core/src/extension_types/mod.rs b/datafusion/core/src/extension_types/mod.rs new file mode 100644 index 0000000000000..da5e54099e685 --- /dev/null +++ b/datafusion/core/src/extension_types/mod.rs @@ -0,0 +1,5 @@ +//! This module contains code that enables DataFusion's extension type capabilities. + +mod array_formatter_factory; + +pub use array_formatter_factory::*; diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 349eee5592abe..40705d79f645a 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -762,13 +762,11 @@ //! [`RecordBatchReader`]: arrow::record_batch::RecordBatchReader //! [`Array`]: arrow::array::Array -/// DataFusion crate version -pub const DATAFUSION_VERSION: &str = env!("CARGO_PKG_VERSION"); - extern crate core; - #[cfg(feature = "sql")] extern crate sqlparser; +/// DataFusion crate version +pub const DATAFUSION_VERSION: &str = env!("CARGO_PKG_VERSION"); pub mod dataframe; pub mod datasource; @@ -896,6 +894,7 @@ pub mod variable { #[cfg(not(target_arch = "wasm32"))] pub mod test; +mod extension_types; mod schema_equivalence; pub mod test_util; diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs index bdbe72245323d..99783427f022e 100644 --- a/datafusion/core/tests/core_integration.rs +++ b/datafusion/core/tests/core_integration.rs @@ -60,6 +60,9 @@ mod catalog_listing; /// Run all tests that are found in the `tracing` directory mod tracing; +/// Run all tests that are found in the `extension_types` directory +mod extension_types; + #[cfg(test)] #[ctor::ctor] fn init() { diff --git a/datafusion/core/tests/extension_types/mod.rs b/datafusion/core/tests/extension_types/mod.rs new file mode 100644 index 0000000000000..98ca7fa47330a --- /dev/null +++ b/datafusion/core/tests/extension_types/mod.rs @@ -0,0 +1 @@ +mod pretty_printing; diff --git a/datafusion/core/tests/extension_types/pretty_printing.rs b/datafusion/core/tests/extension_types/pretty_printing.rs new file mode 100644 index 0000000000000..cead1958ef83d --- /dev/null +++ b/datafusion/core/tests/extension_types/pretty_printing.rs @@ -0,0 +1,58 @@ +use arrow::array::{FixedSizeBinaryArray, RecordBatch}; +use arrow_schema::extension::Uuid; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion::dataframe::DataFrame; +use datafusion::error::Result; +use datafusion::execution::SessionStateBuilder; +use datafusion::prelude::SessionContext; +use insta::assert_snapshot; +use std::sync::Arc; + +fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![uuid_field()])) +} + +fn uuid_field() -> Field { + Field::new("my_uuids", DataType::FixedSizeBinary(16), false).with_extension_type(Uuid) +} + +async fn create_test_table() -> Result { + let schema = test_schema(); + + // define data. + let batch = RecordBatch::try_new( + schema, + vec![Arc::new(FixedSizeBinaryArray::from(vec![ + &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 5, 6], + ]))], + )?; + + let state = SessionStateBuilder::default() + .with_canonical_extension_types()? + .build(); + let ctx = SessionContext::new_with_state(state); + + ctx.register_batch("test", batch)?; + + ctx.table("test").await +} + +#[tokio::test] +async fn test_pretty_print_logical_plan() -> Result<()> { + let result = create_test_table().await?.to_string().await?; + + assert_snapshot!( + result, + @r" + +--------------------------------------------------+ + | my_uuids | + +--------------------------------------------------+ + | arrow.uuid(00000000-0000-0000-0000-000000000000) | + | arrow.uuid(00010203-0405-0607-0809-000102030506) | + +--------------------------------------------------+ + " + ); + + Ok(()) +} diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 472e065211aac..147897beabd7c 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -20,10 +20,12 @@ use crate::expr_rewriter::FunctionRewrite; use crate::planner::ExprPlanner; use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; -use datafusion_common::{HashMap, Result, not_impl_err, plan_datafusion_err}; +use arrow::datatypes::Field; +use datafusion_common::types::DFExtensionTypeRef; +use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result}; use std::collections::HashSet; use std::fmt::Debug; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; /// A registry knows how to build logical expressions out of user-defined function' names pub trait FunctionRegistry { @@ -215,3 +217,247 @@ impl FunctionRegistry for MemoryFunctionRegistry { self.udwfs.keys().cloned().collect() } } + +/// A cheaply cloneable pointer to an [ExtensionTypeRegistration]. +pub type ExtensionTypeRegistrationRef = Arc; + +/// The registration of an extension type. Implementations of this trait are responsible for +/// *creating* instances of [`DFExtensionType`] that represent the entire semantics of an extension +/// type. +/// +/// # Why do we need a Registration? +/// +/// A good question is why this trait is even necessary. Why not directly register the +/// [`DFExtensionType`] in a registration? +/// +/// While this works for extension types without parameters (e.g., `arrow.uuid`), it does not work +/// for more complex extension types that may have another extension type as a parameter. For +/// example, consider an extension type `custom.shortened(n)` that aims to short the pretty-printing +/// string to `n` characters. Here, `n` is a parameter of the extension type and should be a field +/// in the concrete struct that implements the [`DFExtensionType`]. The job of the registration is +/// to read the metadata from the field and create the corresponding [`DFExtensionType`] instance +/// with the correct `n` set. +/// +/// The [`SimpleExtensionTypeRegistration`] provides a convenient way of avoiding this complexity +/// if the extension type has no parameters. +pub trait ExtensionTypeRegistration: Debug + Send + Sync { + /// The name of the extension type. + /// + /// This name will be used to find the correct [ExtensionTypeRegistration] when an extension + /// type is encountered. + fn type_name(&self) -> &str; + + /// Creates an extension type instance from the optional metadata. The name of the extension + /// type is not a parameter as it's already defined by the registration itself. + fn create_df_extension_type( + &self, + metadata: Option<&str>, + ) -> Result; +} + +/// A cheaply cloneable pointer to an [ExtensionTypeRegistry]. +pub type ExtensionTypeRegistryRef = Arc; + +/// Supports registering custom [LogicalType]s, including native types. +pub trait ExtensionTypeRegistry: Debug + Send + Sync { + /// Returns a reference to registration of an extension type named `name`. + /// + /// Returns an error if there is no extension type with that name. + fn extension_type_registration( + &self, + name: &str, + ) -> Result; + + /// Creates a [`DFExtensionTypeRef`] from the type information in the `field`. + /// + /// The result `Ok(None)` indicates that there is no extension type metadata. Returns an error + /// if the extension type in the metadata is not found. + fn create_extension_type_for_field( + &self, + field: &Field, + ) -> Result> { + let Some(extension_type_name) = field.extension_type_name() else { + return Ok(None); + }; + + let registration = self.extension_type_registration(extension_type_name)?; + registration + .create_df_extension_type(field.extension_type_metadata()) + .map(Some) + } + + /// Returns all registered [ExtensionTypeRegistration]. + fn extension_type_registrations(&self) -> Vec>; + + /// Registers a new [ExtensionTypeRegistrationRef], returning any previously registered + /// implementation. + /// + /// Returns an error if the type cannot be registered, for example, if the registry is + /// read-only. + fn add_extension_type_registration( + &self, + extension_type: ExtensionTypeRegistrationRef, + ) -> Result>; + + /// Extends the registry with the provided extension types. + /// + /// Returns an error if the type cannot be registered, for example, if the registry is + /// read-only. + fn extend(&self, extension_types: &[ExtensionTypeRegistrationRef]) -> Result<()> { + for extension_type in extension_types.iter().cloned() { + self.add_extension_type_registration(extension_type)?; + } + Ok(()) + } + + /// Deregisters an extension type registration with the name `name`, returning the + /// implementation that was deregistered. + /// + /// Returns an error if the type cannot be deregistered, for example, if the registry is + /// read-only. + fn remove_extension_type_registration( + &self, + name: &str, + ) -> Result>; +} + +/// A simple implementation of [ExtensionTypeRegistration] where the extension type instance does +/// not have any parameters. As a result, the given [`DFExtensionType`] cannot depend on the +/// metadata that is stored in the field. See [`ExtensionTypeRegistration`] for more details. +#[derive(Debug)] +pub struct SimpleExtensionTypeRegistration { + /// The name of the extension type. + name: String, + /// The extension type instance. + extension_type: DFExtensionTypeRef, +} + +impl SimpleExtensionTypeRegistration { + /// Creates a new registration for the given `name` and `logical_type`. + pub fn new_arc( + name: &str, + extension_type: DFExtensionTypeRef, + ) -> ExtensionTypeRegistrationRef { + Arc::new(Self { + name: name.to_string(), + extension_type, + }) + } +} + +impl ExtensionTypeRegistration for SimpleExtensionTypeRegistration { + fn type_name(&self) -> &str { + &self.name + } + + fn create_df_extension_type( + &self, + _metadata: Option<&str>, + ) -> Result { + Ok(Arc::clone(&self.extension_type)) + } +} + +/// An [`ExtensionTypeRegistry`] that uses in memory [`HashMap`]s. +#[derive(Clone, Debug)] +pub struct MemoryExtensionTypeRegistry { + /// Holds a mapping between the name of an extension type and its logical type. + extension_types: Arc>>, +} + +impl Default for MemoryExtensionTypeRegistry { + fn default() -> Self { + MemoryExtensionTypeRegistry { + extension_types: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +impl MemoryExtensionTypeRegistry { + /// Creates an empty [MemoryExtensionTypeRegistry]. + pub fn new() -> Self { + Self { + extension_types: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Creates a new [MemoryExtensionTypeRegistry] with the provided `types`. + /// + /// # Errors + /// + /// Returns an error if one of the `types` is a native type. + pub fn new_with_types( + types: impl IntoIterator, + ) -> Result { + let extension_types = types + .into_iter() + .map(|t| (t.type_name().to_owned(), t)) + .collect::>(); + Ok(Self { + extension_types: Arc::new(RwLock::new(extension_types)), + }) + } + + /// Returns a list of all registered types. + pub fn all_extension_types(&self) -> Vec { + self.extension_types + .read() + .expect("Extension type registry lock poisoned") + .values() + .cloned() + .collect() + } +} + +impl ExtensionTypeRegistry for MemoryExtensionTypeRegistry { + fn extension_type_registration( + &self, + name: &str, + ) -> Result { + self.extension_types + .write() + .expect("Extension type registry lock poisoned") + .get(name) + .ok_or_else(|| plan_datafusion_err!("Logical type not found.")) + .cloned() + } + + fn extension_type_registrations(&self) -> Vec> { + self.extension_types + .read() + .expect("Extension type registry lock poisoned") + .values() + .cloned() + .collect() + } + + fn add_extension_type_registration( + &self, + extension_type: ExtensionTypeRegistrationRef, + ) -> Result> { + Ok(self + .extension_types + .write() + .expect("Extension type registry lock poisoned") + .insert(extension_type.type_name().to_owned(), extension_type)) + } + + fn remove_extension_type_registration( + &self, + name: &str, + ) -> Result> { + Ok(self + .extension_types + .write() + .expect("Extension type registry lock poisoned") + .remove(name)) + } +} + +impl From> for MemoryExtensionTypeRegistry { + fn from(value: HashMap) -> Self { + Self { + extension_types: Arc::new(RwLock::new(value)), + } + } +} diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index aa910abb9149a..6dae9cf1e6b23 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -46,7 +46,7 @@ use datafusion_proto::protobuf::LogicalExprNode; use datafusion_session::Session; use prost::Message; use tokio::runtime::Handle; - +use datafusion_expr::registry::{ExtensionTypeRegistry, ExtensionTypeRegistryRef}; use crate::arrow_wrappers::WrappedSchema; use crate::execution::FFI_TaskContext; use crate::execution_plan::FFI_ExecutionPlan; @@ -356,6 +356,7 @@ pub struct ForeignSession { scalar_functions: HashMap>, aggregate_functions: HashMap>, window_functions: HashMap>, + extension_types: ExtensionTypeRegistryRef, table_options: TableOptions, runtime_env: Arc, props: ExecutionProps, @@ -424,6 +425,7 @@ impl TryFrom<&FFI_SessionRef> for ForeignSession { scalar_functions, aggregate_functions, window_functions, + extension_types: Default::default(), runtime_env: Default::default(), props: Default::default(), }) @@ -515,6 +517,10 @@ impl Session for ForeignSession { &self.window_functions } + fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef { + &self.extension_types + } + fn runtime_env(&self) -> &Arc { &self.runtime_env } diff --git a/datafusion/session/src/session.rs b/datafusion/session/src/session.rs index 2593e8cd71f4c..6eb85348c6a8d 100644 --- a/datafusion/session/src/session.rs +++ b/datafusion/session/src/session.rs @@ -18,10 +18,11 @@ use async_trait::async_trait; use datafusion_common::config::{ConfigOptions, TableOptions}; use datafusion_common::{DFSchema, Result}; -use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::registry::ExtensionTypeRegistryRef; use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF}; use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr}; use parking_lot::{Mutex, RwLock}; @@ -116,6 +117,9 @@ pub trait Session: Send + Sync { /// Return reference to window functions fn window_functions(&self) -> &HashMap>; + /// Return a reference to the extension type registry + fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef; + /// Return the runtime env fn runtime_env(&self) -> &Arc; From 2a48e735b1b33bef68c25402b61cfac70129258b Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 12 Feb 2026 11:34:12 +0100 Subject: [PATCH 2/9] Add an example for custom extension types --- datafusion-examples/README.md | 10 + .../examples/extension_types/event_id.rs | 328 ++++++++++++++++++ .../examples/extension_types/main.rs | 84 +++++ datafusion/common/src/types/extension.rs | 1 - .../core/src/execution/session_state.rs | 8 +- datafusion/expr/src/registry.rs | 2 +- datafusion/ffi/src/session/mod.rs | 24 +- datafusion/session/src/session.rs | 2 +- 8 files changed, 442 insertions(+), 17 deletions(-) create mode 100644 datafusion-examples/examples/extension_types/event_id.rs create mode 100644 datafusion-examples/examples/extension_types/main.rs diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 2cf0ec52409f8..e691bc5c54614 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -125,6 +125,16 @@ cargo run --example dataframe -- dataframe | mem_pool_tracking | [`execution_monitoring/memory_pool_tracking.rs`](examples/execution_monitoring/memory_pool_tracking.rs) | Demonstrates memory tracking | | tracing | [`execution_monitoring/tracing.rs`](examples/execution_monitoring/tracing.rs) | Demonstrates tracing integration | +## Extension Types Examples + +### Group: `extension_types` + +#### Category: Single Process + +| Subcommand | File Path | Description | +| --- | --- | --- | +| my_id | [`extension_types/event_id.rs`](examples/extension_types/event_id.rs) | A custom wrapper around integers that represent event ids | + ## External Dependency Examples ### Group: `external_dependency` diff --git a/datafusion-examples/examples/extension_types/event_id.rs b/datafusion-examples/examples/extension_types/event_id.rs new file mode 100644 index 0000000000000..bcdb048d237d4 --- /dev/null +++ b/datafusion-examples/examples/extension_types/event_id.rs @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Array, RecordBatch, StringArray, UInt32Array}; +use arrow::util::display::{ArrayFormatter, DisplayIndex, FormatOptions, FormatResult}; +use arrow_schema::extension::ExtensionType; +use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef}; +use datafusion::dataframe::DataFrame; +use datafusion::error::Result; +use datafusion::execution::SessionStateBuilder; +use datafusion::prelude::SessionContext; +use datafusion_common::internal_err; +use datafusion_common::types::{DFExtensionType, DFExtensionTypeRef}; +use datafusion_expr::registry::{ + ExtensionTypeRegistration, ExtensionTypeRegistry, MemoryExtensionTypeRegistry, +}; +use std::fmt::Write; +use std::sync::Arc; + +/// This example demonstrates using DataFusion's extension type API to create a custom identifier +/// type [`EventIdExtensionType`]. +/// +/// The following use cases are demonstrated: +/// - Use a custom implementation for pretty-printing data frames. +pub async fn event_id_example() -> Result<()> { + let ctx = create_session_context()?; + register_events_table(&ctx).await?; + + // Print the example table with the custom pretty-printer. + ctx.table("example").await?.show().await +} + +/// Creates the DataFusion session context with the custom extension type implementation. +fn create_session_context() -> Result { + // Create a registry with a reference to the custom extension type implementation. + let registry = MemoryExtensionTypeRegistry::new(); + let event_id_registration = Arc::new(EventIdExtensionTypeRegistration {}); + registry.add_extension_type_registration(event_id_registration)?; + + // Set the extension type registry in the session state so that DataFusion can use it. + let state = SessionStateBuilder::default() + .with_extension_type_registry(Arc::new(registry)) + .build(); + Ok(SessionContext::new_with_state(state)) +} + +/// Registers the example table and returns the data frame. +async fn register_events_table(ctx: &SessionContext) -> Result { + let schema = example_schema(); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(UInt32Array::from(vec![ + 20_01_000000, + 20_01_000001, + 21_03_000000, + 21_03_000001, + 21_03_000002, + ])), + Arc::new(UInt32Array::from(vec![ + 2020_01_0000, + 2020_01_0001, + 2021_03_0000, + 2021_03_0001, + 2021_03_0002, + ])), + Arc::new(StringArray::from(vec![ + "First Event Jan 2020", + "Second Event Jan 2020", + "First Event Mar 2021", + "Second Event Mar 2021", + "Third Event Mar 2021", + ])), + ], + )?; + + // Register the table and return the data frame. + ctx.register_batch("example", batch)?; + ctx.table("example").await +} + +/// The schema of the example table. +fn example_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("event_id_short", DataType::UInt32, false) + .with_extension_type(EventIdExtensionType(IdYearMode::Short)), + Field::new("event_id_long", DataType::UInt32, false) + .with_extension_type(EventIdExtensionType(IdYearMode::Long)), + Field::new("name", DataType::Utf8, false), + ])) +} + +/// Represents a 32-bit custom identifier that represents a single event. Using this format is not +/// a good idea in practice, but it is useful for demonstrating the API usage. +/// +/// An event is constructed of three parts: +/// - The year +/// - The month +/// - An auto-incrementing counter within the month +/// +/// For example, the event id `2024-01-0000` represents the first event in 2024. +/// +/// # Year Mode +/// +/// In addition, each event id can be represented in two modes. A short year mode `24-01-000000` and +/// a long year mode `2024-01-0000`. This showcases how extension types can be parameterized using +/// metadata. +#[derive(Debug)] +pub struct EventIdExtensionType(IdYearMode); + +/// Represents whether the id uses the short or long format. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum IdYearMode { + /// The short year format (e.g., `24-01-000000`). Allows for more events per month. + Short, + /// The long year format (e.g., `2024-01-0000`). Allows distinguishing between centuries. + Long, +} + +/// Implementation of [`ExtensionType`] for [`EventIdExtensionType`]. +/// +/// This is for the arrow-rs side of the API usage. The [`ExtensionType::Metadata`] type provides +/// static guarantees on the deserialized metadata for the extension type. We will use this +/// implementation to read and write the type metadata to arrow [`Field`]s. +/// +/// This trait does allow users to customize the behavior of DataFusion for this extension type. +/// This is done in [`DFExtensionType`]. +impl ExtensionType for EventIdExtensionType { + const NAME: &'static str = "custom.event_id"; + type Metadata = IdYearMode; + + fn metadata(&self) -> &Self::Metadata { + &self.0 + } + + fn serialize_metadata(&self) -> Option { + // Arrow extension type metadata is encoded as a string. We simply use the lowercase name. + // In a more involved scenario, more complex serialization formats such as JSON are + // appropriate. + Some(format!("{:?}", self.0).to_lowercase()) + } + + fn deserialize_metadata( + metadata: Option<&str>, + ) -> std::result::Result { + match metadata { + None => Err(ArrowError::InvalidArgumentError( + "Event id type requires metadata".to_owned(), + )), + Some(metadata) => match metadata { + "short" => Ok(IdYearMode::Short), + "long" => Ok(IdYearMode::Long), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid metadata for event id type: {}", + metadata + ))), + }, + } + } + + fn supports_data_type( + &self, + data_type: &DataType, + ) -> std::result::Result<(), ArrowError> { + match data_type { + DataType::UInt32 => Ok(()), + _ => Err(ArrowError::InvalidArgumentError(format!( + "Invalid data type: {data_type} for event id type", + ))), + } + } + + fn try_new( + data_type: &DataType, + metadata: Self::Metadata, + ) -> std::result::Result { + let instance = Self(metadata); + instance.supports_data_type(data_type)?; // Check that the data type is supported. + Ok(instance) + } +} + +/// Implementation of [`ExtensionType`] for [`EventIdExtensionType`]. +/// +/// This is for the DataFusion side of the API usage. Here users can override the default behavior +/// of DataFusion for supported extension points. +impl DFExtensionType for EventIdExtensionType { + fn create_array_formatter<'fmt>( + &self, + array: &'fmt dyn Array, + options: &FormatOptions<'fmt>, + ) -> Result>> { + if array.data_type() != &DataType::UInt32 { + return internal_err!("Wrong array type for Event Id"); + } + + // Create the formatter and pass in the year formatting mode of the type + let display_index = EventIdDisplayIndex { + array: array.as_any().downcast_ref().unwrap(), + null_str: options.null(), + mode: self.0, + }; + Ok(Some(ArrayFormatter::new( + Box::new(display_index), + options.safe(), + ))) + } +} + +/// Pretty printer for event ids. +#[derive(Debug)] +struct EventIdDisplayIndex<'a> { + array: &'a UInt32Array, + null_str: &'a str, + mode: IdYearMode, +} + +/// This implements the arrow-rs API for printing individual values of a column. DataFusion will +/// automatically pass in the reference to this implementation if a column is annotated with the +/// extension type metadata. +impl DisplayIndex for EventIdDisplayIndex<'_> { + fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { + // Handle nulls first + if self.array.is_null(idx) { + write!(f, "{}", self.null_str)?; + return Ok(()); + } + + let value = self.array.value(idx); + + match self.mode { + IdYearMode::Short => { + // Format: YY-MM-CCCCCC + // Logic: + // - The last 6 digits are the counter. + // - The next 2 digits are the month. + // - The remaining digits are the year. + let counter = value % 1_000_000; + let rest = value / 1_000_000; + let month = rest % 100; + let year = rest / 100; + + write!(f, "{:02}-{:02}-{:06}", year, month, counter)?; + } + IdYearMode::Long => { + // Format: YYYY-MM-CCCC + // Logic: + // - The last 4 digits are the counter. + // - The next 2 digits are the month. + // - The remaining digits are the year. + let counter = value % 10_000; + let rest = value / 10_000; + let month = rest % 100; + let year = rest / 100; + + write!(f, "{:04}-{:02}-{:04}", year, month, counter)?; + } + } + Ok(()) + } +} + +/// The registration is the last piece missing for the extension type implementation. It contains +/// the logic for deserializing the metadata from the arrow [`Field`]s and creating the extension +/// type instance. We cannot use the trait from arrow-rs as it's not dyn-compatible (the Metadata +/// type must be known at compile time). +/// +/// If an extension type does not have any parameters, the [`SimpleExtensionTypeRegistration`] +/// provides an easier way of registering it. +#[derive(Debug)] +pub struct EventIdExtensionTypeRegistration(); + +impl ExtensionTypeRegistration for EventIdExtensionTypeRegistration { + fn type_name(&self) -> &str { + EventIdExtensionType::NAME + } + + fn create_df_extension_type( + &self, + metadata: Option<&str>, + ) -> Result { + let metadata = EventIdExtensionType::deserialize_metadata(metadata)?; + Ok(Arc::new(EventIdExtensionType(metadata))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use insta::assert_snapshot; + + #[tokio::test] + async fn test_print_example_table() -> Result<()> { + let ctx = create_session_context()?; + let table = register_events_table(&ctx).await?; + + assert_snapshot!( + table.to_string().await?, + @r" + +----------------+---------------+-----------------------+ + | event_id_short | event_id_long | name | + +----------------+---------------+-----------------------+ + | 20-01-000000 | 2020-01-0000 | First Event Jan 2020 | + | 20-01-000001 | 2020-01-0001 | Second Event Jan 2020 | + | 21-03-000000 | 2021-03-0000 | First Event Mar 2021 | + | 21-03-000001 | 2021-03-0001 | Second Event Mar 2021 | + | 21-03-000002 | 2021-03-0002 | Third Event Mar 2021 | + +----------------+---------------+-----------------------+ + " + ); + + Ok(()) + } +} diff --git a/datafusion-examples/examples/extension_types/main.rs b/datafusion-examples/examples/extension_types/main.rs new file mode 100644 index 0000000000000..75fa4d913d6c8 --- /dev/null +++ b/datafusion-examples/examples/extension_types/main.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Extension type usage examples +//! +//! These examples demonstrate the API for creating and using custom extension types. +//! +//! ## Usage +//! ```bash +//! cargo run --example dataframe -- [all|my_id] +//! ``` +//! +//! Each subcommand runs a corresponding example: +//! - `all` — run all examples included in this module +//! +//! - `my_id` +//! (file: event_id.rs, desc: A custom wrapper around integers that represent event ids) + +mod event_id; + +use datafusion::error::{DataFusionError, Result}; +use strum::{IntoEnumIterator, VariantNames}; +use strum_macros::{Display, EnumIter, EnumString, VariantNames}; + +#[derive(EnumIter, EnumString, Display, VariantNames)] +#[strum(serialize_all = "snake_case")] +enum ExampleKind { + All, + EventId, +} + +impl ExampleKind { + const EXAMPLE_NAME: &str = "extension_types"; + + fn runnable() -> impl Iterator { + ExampleKind::iter().filter(|v| !matches!(v, ExampleKind::All)) + } + + async fn run(&self) -> Result<()> { + match self { + ExampleKind::All => { + for example in ExampleKind::runnable() { + println!("Running example: {example}"); + Box::pin(example.run()).await?; + } + } + ExampleKind::EventId => { + event_id::event_id_example().await?; + } + } + Ok(()) + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let usage = format!( + "Usage: cargo run --example {} -- [{}]", + ExampleKind::EXAMPLE_NAME, + ExampleKind::VARIANTS.join("|") + ); + + let example: ExampleKind = std::env::args() + .nth(1) + .unwrap_or_else(|| ExampleKind::All.to_string()) + .parse() + .map_err(|_| DataFusionError::Execution(format!("Unknown example. {usage}")))?; + + example.run().await +} diff --git a/datafusion/common/src/types/extension.rs b/datafusion/common/src/types/extension.rs index 520b2fceb3975..9f2ee28ceeff2 100644 --- a/datafusion/common/src/types/extension.rs +++ b/datafusion/common/src/types/extension.rs @@ -1,6 +1,5 @@ use crate::error::Result; use arrow::array::Array; -use arrow::datatypes::DataType; use arrow::util::display::{ArrayFormatter, FormatOptions}; use std::fmt::Debug; use std::sync::Arc; diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 8e4e4a51d7c54..3e16eabf0b524 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -57,7 +57,11 @@ use datafusion_expr::expr_rewriter::FunctionRewrite; use datafusion_expr::planner::ExprPlanner; #[cfg(feature = "sql")] use datafusion_expr::planner::{RelationPlanner, TypePlanner}; -use datafusion_expr::registry::{ExtensionTypeRegistration, ExtensionTypeRegistrationRef, ExtensionTypeRegistry, ExtensionTypeRegistryRef, FunctionRegistry, MemoryExtensionTypeRegistry, SerializerRegistry, SimpleExtensionTypeRegistration}; +use datafusion_expr::registry::{ + ExtensionTypeRegistration, ExtensionTypeRegistrationRef, ExtensionTypeRegistry, + ExtensionTypeRegistryRef, FunctionRegistry, MemoryExtensionTypeRegistry, + SerializerRegistry, SimpleExtensionTypeRegistration, +}; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{AggregateUDF, Explain, Expr, LogicalPlan, ScalarUDF, WindowUDF}; use datafusion_optimizer::simplify_expressions::ExprSimplifier; @@ -1333,7 +1337,7 @@ impl SessionStateBuilder { } /// Set the map of [`ExtensionTypeRegistration`]s - pub fn with_extension_type( + pub fn with_extension_type_registry( mut self, registry: ExtensionTypeRegistryRef, ) -> Self { diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 147897beabd7c..1f41ececab6b3 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -22,7 +22,7 @@ use crate::planner::ExprPlanner; use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; use arrow::datatypes::Field; use datafusion_common::types::DFExtensionTypeRef; -use datafusion_common::{not_impl_err, plan_datafusion_err, HashMap, Result}; +use datafusion_common::{HashMap, Result, not_impl_err, plan_datafusion_err}; use std::collections::HashSet; use std::fmt::Debug; use std::sync::{Arc, RwLock}; diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index 6dae9cf1e6b23..5af2f27693f54 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -20,6 +20,17 @@ use std::collections::HashMap; use std::ffi::c_void; use std::sync::Arc; +use crate::arrow_wrappers::WrappedSchema; +use crate::execution::FFI_TaskContext; +use crate::execution_plan::FFI_ExecutionPlan; +use crate::physical_expr::FFI_PhysicalExpr; +use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; +use crate::session::config::FFI_SessionConfig; +use crate::udaf::FFI_AggregateUDF; +use crate::udf::FFI_ScalarUDF; +use crate::udwf::FFI_WindowUDF; +use crate::util::FFIResult; +use crate::{df_result, rresult, rresult_return}; use abi_stable::StableAbi; use abi_stable::std_types::{RHashMap, RResult, RStr, RString, RVec}; use arrow_schema::SchemaRef; @@ -32,6 +43,7 @@ use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::registry::{ExtensionTypeRegistry, ExtensionTypeRegistryRef}; use datafusion_expr::{ AggregateUDF, AggregateUDFImpl, Expr, LogicalPlan, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl, @@ -46,18 +58,6 @@ use datafusion_proto::protobuf::LogicalExprNode; use datafusion_session::Session; use prost::Message; use tokio::runtime::Handle; -use datafusion_expr::registry::{ExtensionTypeRegistry, ExtensionTypeRegistryRef}; -use crate::arrow_wrappers::WrappedSchema; -use crate::execution::FFI_TaskContext; -use crate::execution_plan::FFI_ExecutionPlan; -use crate::physical_expr::FFI_PhysicalExpr; -use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec; -use crate::session::config::FFI_SessionConfig; -use crate::udaf::FFI_AggregateUDF; -use crate::udf::FFI_ScalarUDF; -use crate::udwf::FFI_WindowUDF; -use crate::util::FFIResult; -use crate::{df_result, rresult, rresult_return}; pub mod config; diff --git a/datafusion/session/src/session.rs b/datafusion/session/src/session.rs index 6eb85348c6a8d..34506a0a1718d 100644 --- a/datafusion/session/src/session.rs +++ b/datafusion/session/src/session.rs @@ -18,9 +18,9 @@ use async_trait::async_trait; use datafusion_common::config::{ConfigOptions, TableOptions}; use datafusion_common::{DFSchema, Result}; +use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; -use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::registry::ExtensionTypeRegistryRef; use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF}; From 0eabd10ca26bfc0782f92935ffe1749d4c4c5524 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 12 Feb 2026 12:52:40 +0100 Subject: [PATCH 3/9] Further improvements of the extension type API proposal --- Cargo.lock | 2 + datafusion-examples/README.md | 2 +- .../examples/extension_types/event_id.rs | 36 +++-------- .../examples/extension_types/main.rs | 4 +- datafusion/common/Cargo.toml | 1 + .../src/types/canonical_extensions/mod.rs | 2 - .../src/types/canonical_extensions/uuid.rs | 24 ++------ datafusion/common/src/types/extension.rs | 46 ++++++++------- datafusion/common/src/types/mod.rs | 1 - .../core/src/execution/session_state.rs | 16 +++-- .../tests/extension_types/pretty_printing.rs | 12 ++-- datafusion/expr/Cargo.toml | 3 +- datafusion/expr/src/registry.rs | 59 ++++++++++++------- datafusion/ffi/src/session/mod.rs | 12 ++-- 14 files changed, 100 insertions(+), 120 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 12a25a1788134..a5c23f212bb18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1898,6 +1898,7 @@ dependencies = [ "apache-avro", "arrow", "arrow-ipc", + "arrow-schema", "chrono", "criterion", "half", @@ -2152,6 +2153,7 @@ name = "datafusion-expr" version = "52.1.0" dependencies = [ "arrow", + "arrow-schema", "async-trait", "chrono", "ctor", diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index e691bc5c54614..5d67630ba6406 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -133,7 +133,7 @@ cargo run --example dataframe -- dataframe | Subcommand | File Path | Description | | --- | --- | --- | -| my_id | [`extension_types/event_id.rs`](examples/extension_types/event_id.rs) | A custom wrapper around integers that represent event ids | +| event_id | [`extension_types/event_id.rs`](examples/extension_types/event_id.rs) | A custom wrapper around integers that represent event ids | ## External Dependency Examples diff --git a/datafusion-examples/examples/extension_types/event_id.rs b/datafusion-examples/examples/extension_types/event_id.rs index bcdb048d237d4..6ee11754b6cbe 100644 --- a/datafusion-examples/examples/extension_types/event_id.rs +++ b/datafusion-examples/examples/extension_types/event_id.rs @@ -24,9 +24,9 @@ use datafusion::error::Result; use datafusion::execution::SessionStateBuilder; use datafusion::prelude::SessionContext; use datafusion_common::internal_err; -use datafusion_common::types::{DFExtensionType, DFExtensionTypeRef}; +use datafusion_common::types::DFExtensionType; use datafusion_expr::registry::{ - ExtensionTypeRegistration, ExtensionTypeRegistry, MemoryExtensionTypeRegistry, + DefaultExtensionTypeRegistration, ExtensionTypeRegistry, MemoryExtensionTypeRegistry, }; use std::fmt::Write; use std::sync::Arc; @@ -48,7 +48,9 @@ pub async fn event_id_example() -> Result<()> { fn create_session_context() -> Result { // Create a registry with a reference to the custom extension type implementation. let registry = MemoryExtensionTypeRegistry::new(); - let event_id_registration = Arc::new(EventIdExtensionTypeRegistration {}); + let event_id_registration = DefaultExtensionTypeRegistration::new_arc(|metadata| { + Ok(EventIdExtensionType(metadata)) + }); registry.add_extension_type_registration(event_id_registration)?; // Set the extension type registry in the session state so that DataFusion can use it. @@ -104,8 +106,8 @@ fn example_schema() -> SchemaRef { ])) } -/// Represents a 32-bit custom identifier that represents a single event. Using this format is not -/// a good idea in practice, but it is useful for demonstrating the API usage. +/// Represents a 32-bit custom identifier that represents a single event. Using this format is +/// probably not a good idea in practice, but it is useful for demonstrating the API usage. /// /// An event is constructed of three parts: /// - The year @@ -274,30 +276,6 @@ impl DisplayIndex for EventIdDisplayIndex<'_> { } } -/// The registration is the last piece missing for the extension type implementation. It contains -/// the logic for deserializing the metadata from the arrow [`Field`]s and creating the extension -/// type instance. We cannot use the trait from arrow-rs as it's not dyn-compatible (the Metadata -/// type must be known at compile time). -/// -/// If an extension type does not have any parameters, the [`SimpleExtensionTypeRegistration`] -/// provides an easier way of registering it. -#[derive(Debug)] -pub struct EventIdExtensionTypeRegistration(); - -impl ExtensionTypeRegistration for EventIdExtensionTypeRegistration { - fn type_name(&self) -> &str { - EventIdExtensionType::NAME - } - - fn create_df_extension_type( - &self, - metadata: Option<&str>, - ) -> Result { - let metadata = EventIdExtensionType::deserialize_metadata(metadata)?; - Ok(Arc::new(EventIdExtensionType(metadata))) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/datafusion-examples/examples/extension_types/main.rs b/datafusion-examples/examples/extension_types/main.rs index 75fa4d913d6c8..672f339871f95 100644 --- a/datafusion-examples/examples/extension_types/main.rs +++ b/datafusion-examples/examples/extension_types/main.rs @@ -21,13 +21,13 @@ //! //! ## Usage //! ```bash -//! cargo run --example dataframe -- [all|my_id] +//! cargo run --example extension_types -- [all|event_id] //! ``` //! //! Each subcommand runs a corresponding example: //! - `all` — run all examples included in this module //! -//! - `my_id` +//! - `event_id` //! (file: event_id.rs, desc: A custom wrapper around integers that represent event ids) mod event_id; diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 65f1f053f6965..a8dc05d3ff41c 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -66,6 +66,7 @@ apache-avro = { workspace = true, features = [ "zstandard", ], optional = true } arrow = { workspace = true } +arrow-schema = { workspace = true, features = ["canonical_extension_types"] } arrow-ipc = { workspace = true } chrono = { workspace = true } half = { workspace = true } diff --git a/datafusion/common/src/types/canonical_extensions/mod.rs b/datafusion/common/src/types/canonical_extensions/mod.rs index f9dba8417d2eb..14aae177b45c6 100644 --- a/datafusion/common/src/types/canonical_extensions/mod.rs +++ b/datafusion/common/src/types/canonical_extensions/mod.rs @@ -1,3 +1 @@ mod uuid; - -pub use uuid::*; diff --git a/datafusion/common/src/types/canonical_extensions/uuid.rs b/datafusion/common/src/types/canonical_extensions/uuid.rs index 4a1c158918821..7711ce935d27e 100644 --- a/datafusion/common/src/types/canonical_extensions/uuid.rs +++ b/datafusion/common/src/types/canonical_extensions/uuid.rs @@ -9,23 +9,7 @@ use uuid::{Bytes, Uuid}; /// Defines the extension type logic for the canonical `arrow.uuid` extension type. /// /// See [`DFExtensionType`] for information on DataFusion's extension type mechanism. -#[derive(Debug)] -pub struct UuidDFExtensionType(); - -impl UuidDFExtensionType { - /// Create a new instance of [`UuidDFExtensionType`]. - pub fn new() -> Self { - Self {} - } -} - -impl Default for UuidDFExtensionType { - fn default() -> Self { - Self::new() - } -} - -impl DFExtensionType for UuidDFExtensionType { +impl DFExtensionType for arrow_schema::extension::Uuid { fn create_array_formatter<'fmt>( &self, array: &'fmt dyn Array, @@ -56,14 +40,14 @@ struct UuidValueDisplayIndex<'a> { impl DisplayIndex for UuidValueDisplayIndex<'_> { fn write(&self, idx: usize, f: &mut dyn Write) -> FormatResult { if self.array.is_null(idx) { - write!(f, "arrow.uuid({})", self.null_str)?; + write!(f, "{}", self.null_str)?; return Ok(()); } let bytes = Bytes::try_from(self.array.value(idx)) .expect("FixedSizeBinaryArray length checked in create_array_formatter"); let uuid = Uuid::from_bytes(bytes); - write!(f, "arrow.uuid({uuid})")?; + write!(f, "{uuid}")?; Ok(()) } } @@ -88,7 +72,7 @@ mod tests { assert_eq!( formatter.value(0).to_string(), - "arrow.uuid(00000000-0000-0000-0000-000000000000)" + "00000000-0000-0000-0000-000000000000" ); } } diff --git a/datafusion/common/src/types/extension.rs b/datafusion/common/src/types/extension.rs index 9f2ee28ceeff2..4ead97ae674d9 100644 --- a/datafusion/common/src/types/extension.rs +++ b/datafusion/common/src/types/extension.rs @@ -7,36 +7,38 @@ use std::sync::Arc; /// A cheaply cloneable pointer to a [`DFExtensionType`]. pub type DFExtensionTypeRef = Arc; -/// Represents an implementation of a DataFusion extension type, allowing users to customize the -/// behavior of DataFusion for custom extension types. -/// -/// Extension types may change the semantics of a column. For example, adding two values of -/// [`DataType::Int64`] is a sensible thing to do. However, if the same data type is annotated with -/// an extension type like `custom.id`, the correct interpretation of a column changes. For example, -/// adding together two `custom.id` values (represented as a 64-bit integer) may no longer make -/// sense. -/// -/// Note that while helping users to navigate the semantic gap between the data type and extension -/// types is a goal of this trait, DataFusion's extension type support is still evolving and does -/// not cover all use cases. Currently, the following capabilities can be customized: +/// Represents an implementation of a DataFusion extension type. +/// +/// This allows users to customize the behavior of DataFusion for certain types. Having this ability +/// is necessary because extension types affect how columns should be treated by the query engine. +/// This effect includes which operations are possible on a column and what are the expected results +/// from these operations. The extension type mechanism allows users to define how these operations +/// apply to a particular extension type. +/// +/// For example, adding two values of [`DataType::Int64`] is a sensible thing to do. However, if the +/// same column is annotated with an extension type like `custom.id`, the correct interpretation of +/// a column changes. Adding together two `custom.id` values, even though they are stored as +/// integers, may no longer make sense. +/// +/// Note that DataFusion's extension type support is still young and therefore might not cover all +/// relevant use cases. Currently, the following operations can be customized: /// - Pretty-printing values in record batches /// /// # Relation to Arrow's `ExtensionType` /// -/// The purpose of Arrow's `ExtensionType` trait, for the time being, is to provide a way to handle -/// metadata of an extension type in a type-safe manner. The trait does not provide any -/// customization options such that users can customize the behavior of any kernels (e.g., -/// [`DFExtensionType::create_array_formatter`] for formatting record batches). Therefore, -/// downstream users (such as DataFusion) have the flexibility to implement the extension type -/// mechanism according to their needs. [`DFExtensionType`] is DataFusion's implementation of this -/// extension type mechanism. +/// The purpose of Arrow's `ExtensionType` trait, for the time being, is to allow reading and +/// writing extension type metadata in a type-safe manner. The trait does not provide any +/// customization options. Therefore, downstream users (such as DataFusion) have the flexibility to +/// implement the extension type mechanism according to their needs. [`DFExtensionType`] is +/// DataFusion's implementation of this extension type mechanism. /// -/// Furthermore, Arrow's current trait is not dyn-compatible which we need for implementing +/// Furthermore, the current trait in arrow-rs is not dyn-compatible, which we need for implementing /// extension type registries. In the future, the two implementations may increasingly converge. /// -/// # Example -/// +/// # Examples /// +/// Examples for using the extension type machinery can be found in the DataFusion examples +/// directory. pub trait DFExtensionType: Debug + Send + Sync { /// Returns an [`ArrayFormatter`] that can format values of this type. /// diff --git a/datafusion/common/src/types/mod.rs b/datafusion/common/src/types/mod.rs index 57bf921a6d564..82455063bc6ce 100644 --- a/datafusion/common/src/types/mod.rs +++ b/datafusion/common/src/types/mod.rs @@ -23,7 +23,6 @@ mod logical; mod native; pub use builtin::*; -pub use canonical_extensions::*; pub use extension::*; pub use field::*; pub use logical::*; diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 3e16eabf0b524..8b215f1718cd5 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -30,7 +30,6 @@ use crate::datasource::provider_as_source; use crate::execution::SessionStateDefaults; use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner}; use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner}; -use arrow_schema::extension::ExtensionType; use arrow_schema::{DataType, FieldRef}; use datafusion_catalog::MemoryCatalogProviderList; use datafusion_catalog::information_schema::{ @@ -58,9 +57,9 @@ use datafusion_expr::planner::ExprPlanner; #[cfg(feature = "sql")] use datafusion_expr::planner::{RelationPlanner, TypePlanner}; use datafusion_expr::registry::{ - ExtensionTypeRegistration, ExtensionTypeRegistrationRef, ExtensionTypeRegistry, - ExtensionTypeRegistryRef, FunctionRegistry, MemoryExtensionTypeRegistry, - SerializerRegistry, SimpleExtensionTypeRegistration, + DefaultExtensionTypeRegistration, ExtensionTypeRegistration, + ExtensionTypeRegistrationRef, ExtensionTypeRegistry, ExtensionTypeRegistryRef, + FunctionRegistry, MemoryExtensionTypeRegistry, SerializerRegistry, }; use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{AggregateUDF, Explain, Expr, LogicalPlan, ScalarUDF, WindowUDF}; @@ -82,7 +81,6 @@ use datafusion_sql::{ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion_common::types::UuidDFExtensionType; use itertools::Itertools; use log::{debug, info}; use object_store::ObjectStore; @@ -1353,10 +1351,10 @@ impl SessionStateBuilder { /// May fail if an already registered [`ExtensionTypeRegistry`] raises an error while /// registering the canonical extension types. pub fn with_canonical_extension_types(mut self) -> datafusion_common::Result { - let canonical_extension_types = vec![SimpleExtensionTypeRegistration::new_arc( - arrow_schema::extension::Uuid::NAME, - Arc::new(UuidDFExtensionType::new()), - )]; + let uuid = DefaultExtensionTypeRegistration::new_arc(|_| { + Ok(arrow_schema::extension::Uuid {}) + }); + let canonical_extension_types = vec![uuid]; match &self.extension_types { None => { diff --git a/datafusion/core/tests/extension_types/pretty_printing.rs b/datafusion/core/tests/extension_types/pretty_printing.rs index cead1958ef83d..fd30ea93cfbd5 100644 --- a/datafusion/core/tests/extension_types/pretty_printing.rs +++ b/datafusion/core/tests/extension_types/pretty_printing.rs @@ -45,12 +45,12 @@ async fn test_pretty_print_logical_plan() -> Result<()> { assert_snapshot!( result, @r" - +--------------------------------------------------+ - | my_uuids | - +--------------------------------------------------+ - | arrow.uuid(00000000-0000-0000-0000-000000000000) | - | arrow.uuid(00010203-0405-0607-0809-000102030506) | - +--------------------------------------------------+ + +--------------------------------------+ + | my_uuids | + +--------------------------------------+ + | 00000000-0000-0000-0000-000000000000 | + | 00010203-0405-0607-0809-000102030506 | + +--------------------------------------+ " ); diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 75aa59595bed5..e9887f2d21a1f 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -46,7 +46,8 @@ recursive_protection = ["dep:recursive"] sql = ["sqlparser"] [dependencies] -arrow = { workspace = true } +arrow = { workspace = true, features = ["canonical_extension_types"] } +arrow-schema = { workspace = true, features = ["canonical_extension_types"] } async-trait = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = false } diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 1f41ececab6b3..c6acc0be1a8d9 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -21,10 +21,11 @@ use crate::expr_rewriter::FunctionRewrite; use crate::planner::ExprPlanner; use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; use arrow::datatypes::Field; -use datafusion_common::types::DFExtensionTypeRef; +use arrow_schema::extension::ExtensionType; +use datafusion_common::types::{DFExtensionType, DFExtensionTypeRef}; use datafusion_common::{HashMap, Result, not_impl_err, plan_datafusion_err}; use std::collections::HashSet; -use std::fmt::Debug; +use std::fmt::{Debug, Formatter}; use std::sync::{Arc, RwLock}; /// A registry knows how to build logical expressions out of user-defined function' names @@ -238,7 +239,7 @@ pub type ExtensionTypeRegistrationRef = Arc; /// to read the metadata from the field and create the corresponding [`DFExtensionType`] instance /// with the correct `n` set. /// -/// The [`SimpleExtensionTypeRegistration`] provides a convenient way of avoiding this complexity +/// The [`DefaultExtensionTypeRegistration`] provides a convenient way of avoiding this complexity /// if the extension type has no parameters. pub trait ExtensionTypeRegistration: Debug + Send + Sync { /// The name of the extension type. @@ -321,40 +322,56 @@ pub trait ExtensionTypeRegistry: Debug + Send + Sync { ) -> Result>; } -/// A simple implementation of [ExtensionTypeRegistration] where the extension type instance does -/// not have any parameters. As a result, the given [`DFExtensionType`] cannot depend on the -/// metadata that is stored in the field. See [`ExtensionTypeRegistration`] for more details. -#[derive(Debug)] -pub struct SimpleExtensionTypeRegistration { - /// The name of the extension type. - name: String, - /// The extension type instance. - extension_type: DFExtensionTypeRef, +/// A default implementation of [ExtensionTypeRegistration] that parses the metadata from the +/// given extension type and passes it to a constructor function. +pub struct DefaultExtensionTypeRegistration< + TExtensionType: ExtensionType + DFExtensionType + 'static, +> { + /// A function that creates an instance of [`DFExtensionTypeRef`] from the metadata. + factory: + Box Result + Send + Sync>, } -impl SimpleExtensionTypeRegistration { +impl + DefaultExtensionTypeRegistration +{ /// Creates a new registration for the given `name` and `logical_type`. pub fn new_arc( - name: &str, - extension_type: DFExtensionTypeRef, + factory: impl Fn(TExtensionType::Metadata) -> Result + + Send + + Sync + + 'static, ) -> ExtensionTypeRegistrationRef { Arc::new(Self { - name: name.to_string(), - extension_type, + factory: Box::new(factory), }) } } -impl ExtensionTypeRegistration for SimpleExtensionTypeRegistration { +impl ExtensionTypeRegistration + for DefaultExtensionTypeRegistration +{ fn type_name(&self) -> &str { - &self.name + TExtensionType::NAME } fn create_df_extension_type( &self, - _metadata: Option<&str>, + metadata: Option<&str>, ) -> Result { - Ok(Arc::clone(&self.extension_type)) + let metadata = TExtensionType::deserialize_metadata(metadata)?; + self.factory.as_ref()(metadata) + .map(|extension_type| Arc::new(extension_type) as DFExtensionTypeRef) + } +} + +impl Debug + for DefaultExtensionTypeRegistration +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DefaultExtensionTypeRegistration") + .field("type_name", &TExtensionType::NAME) + .finish() } } diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index 5af2f27693f54..70f69817d858b 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -31,19 +31,19 @@ use crate::udf::FFI_ScalarUDF; use crate::udwf::FFI_WindowUDF; use crate::util::FFIResult; use crate::{df_result, rresult, rresult_return}; -use abi_stable::StableAbi; use abi_stable::std_types::{RHashMap, RResult, RStr, RString, RVec}; -use arrow_schema::SchemaRef; +use abi_stable::StableAbi; use arrow_schema::ffi::FFI_ArrowSchema; +use arrow_schema::SchemaRef; use async_ffi::{FfiFuture, FutureExt}; use async_trait::async_trait; use datafusion_common::config::{ConfigOptions, TableOptions}; use datafusion_common::{DFSchema, DataFusionError}; -use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::registry::{ExtensionTypeRegistry, ExtensionTypeRegistryRef}; +use datafusion_expr::registry::{ExtensionTypeRegistryRef, MemoryExtensionTypeRegistry}; use datafusion_expr::{ AggregateUDF, AggregateUDFImpl, Expr, LogicalPlan, ScalarUDF, ScalarUDFImpl, WindowUDF, WindowUDFImpl, @@ -51,9 +51,9 @@ use datafusion_expr::{ use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; -use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::logical_plan::from_proto::parse_expr; use datafusion_proto::logical_plan::to_proto::serialize_expr; +use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::protobuf::LogicalExprNode; use datafusion_session::Session; use prost::Message; @@ -425,7 +425,7 @@ impl TryFrom<&FFI_SessionRef> for ForeignSession { scalar_functions, aggregate_functions, window_functions, - extension_types: Default::default(), + extension_types: Arc::new(MemoryExtensionTypeRegistry::default()), runtime_env: Default::default(), props: Default::default(), }) From f36534a7cc00230d1e1e55680dbd1b94f76c2c7a Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 12 Feb 2026 12:54:52 +0100 Subject: [PATCH 4/9] Formatting --- datafusion/ffi/src/session/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/ffi/src/session/mod.rs b/datafusion/ffi/src/session/mod.rs index 70f69817d858b..69cc8d9c6954f 100644 --- a/datafusion/ffi/src/session/mod.rs +++ b/datafusion/ffi/src/session/mod.rs @@ -31,17 +31,17 @@ use crate::udf::FFI_ScalarUDF; use crate::udwf::FFI_WindowUDF; use crate::util::FFIResult; use crate::{df_result, rresult, rresult_return}; -use abi_stable::std_types::{RHashMap, RResult, RStr, RString, RVec}; use abi_stable::StableAbi; -use arrow_schema::ffi::FFI_ArrowSchema; +use abi_stable::std_types::{RHashMap, RResult, RStr, RString, RVec}; use arrow_schema::SchemaRef; +use arrow_schema::ffi::FFI_ArrowSchema; use async_ffi::{FfiFuture, FutureExt}; use async_trait::async_trait; use datafusion_common::config::{ConfigOptions, TableOptions}; use datafusion_common::{DFSchema, DataFusionError}; +use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; -use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::registry::{ExtensionTypeRegistryRef, MemoryExtensionTypeRegistry}; use datafusion_expr::{ @@ -51,9 +51,9 @@ use datafusion_expr::{ use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; +use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::logical_plan::from_proto::parse_expr; use datafusion_proto::logical_plan::to_proto::serialize_expr; -use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::protobuf::LogicalExprNode; use datafusion_session::Session; use prost::Message; From f5d5b1236ee593f53445041a6dfe1d29f35cdc63 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 12 Feb 2026 12:57:21 +0100 Subject: [PATCH 5/9] Docs --- datafusion/expr/src/registry.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index c6acc0be1a8d9..859dcace8a399 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -239,8 +239,7 @@ pub type ExtensionTypeRegistrationRef = Arc; /// to read the metadata from the field and create the corresponding [`DFExtensionType`] instance /// with the correct `n` set. /// -/// The [`DefaultExtensionTypeRegistration`] provides a convenient way of avoiding this complexity -/// if the extension type has no parameters. +/// The [`DefaultExtensionTypeRegistration`] provides a convenient way of creating registrations. pub trait ExtensionTypeRegistration: Debug + Send + Sync { /// The name of the extension type. /// From 561601847ecb5475033f52e6f06786e26b194e48 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 12 Feb 2026 13:10:33 +0100 Subject: [PATCH 6/9] License headers and formatting --- datafusion-examples/README.md | 6 +++--- .../src/types/canonical_extensions/mod.rs | 17 +++++++++++++++++ .../src/types/canonical_extensions/uuid.rs | 19 ++++++++++++++++++- datafusion/common/src/types/extension.rs | 17 +++++++++++++++++ .../array_formatter_factory.rs | 17 +++++++++++++++++ datafusion/core/src/extension_types/mod.rs | 17 +++++++++++++++++ datafusion/core/tests/extension_types/mod.rs | 17 +++++++++++++++++ .../tests/extension_types/pretty_printing.rs | 17 +++++++++++++++++ 8 files changed, 123 insertions(+), 4 deletions(-) diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 5d67630ba6406..45fb8159ec451 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -131,9 +131,9 @@ cargo run --example dataframe -- dataframe #### Category: Single Process -| Subcommand | File Path | Description | -| --- | --- | --- | -| event_id | [`extension_types/event_id.rs`](examples/extension_types/event_id.rs) | A custom wrapper around integers that represent event ids | +| Subcommand | File Path | Description | +| ---------- | --------------------------------------------------------------------- | --------------------------------------------------------- | +| event_id | [`extension_types/event_id.rs`](examples/extension_types/event_id.rs) | A custom wrapper around integers that represent event ids | ## External Dependency Examples diff --git a/datafusion/common/src/types/canonical_extensions/mod.rs b/datafusion/common/src/types/canonical_extensions/mod.rs index 14aae177b45c6..e61c415b44811 100644 --- a/datafusion/common/src/types/canonical_extensions/mod.rs +++ b/datafusion/common/src/types/canonical_extensions/mod.rs @@ -1 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + mod uuid; diff --git a/datafusion/common/src/types/canonical_extensions/uuid.rs b/datafusion/common/src/types/canonical_extensions/uuid.rs index 7711ce935d27e..1eb3a09bf3fdc 100644 --- a/datafusion/common/src/types/canonical_extensions/uuid.rs +++ b/datafusion/common/src/types/canonical_extensions/uuid.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::error::_internal_err; use crate::types::extension::DFExtensionType; use arrow::array::{Array, FixedSizeBinaryArray}; @@ -64,7 +81,7 @@ mod tests { .to_array_of_size(1) .unwrap(); - let extension_type = UuidDFExtensionType::new(); + let extension_type = arrow_schema::extension::Uuid {}; let formatter = extension_type .create_array_formatter(uuid.as_ref(), &FormatOptions::default()) .unwrap() diff --git a/datafusion/common/src/types/extension.rs b/datafusion/common/src/types/extension.rs index 4ead97ae674d9..41411f1401baf 100644 --- a/datafusion/common/src/types/extension.rs +++ b/datafusion/common/src/types/extension.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use crate::error::Result; use arrow::array::Array; use arrow::util::display::{ArrayFormatter, FormatOptions}; diff --git a/datafusion/core/src/extension_types/array_formatter_factory.rs b/datafusion/core/src/extension_types/array_formatter_factory.rs index 2c0ba5e4e3b9d..f10576e816bda 100644 --- a/datafusion/core/src/extension_types/array_formatter_factory.rs +++ b/datafusion/core/src/extension_types/array_formatter_factory.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use arrow::array::Array; use arrow::util::display::{ArrayFormatter, ArrayFormatterFactory, FormatOptions}; use arrow_schema::{ArrowError, Field}; diff --git a/datafusion/core/src/extension_types/mod.rs b/datafusion/core/src/extension_types/mod.rs index da5e54099e685..55ec1ad95b5a1 100644 --- a/datafusion/core/src/extension_types/mod.rs +++ b/datafusion/core/src/extension_types/mod.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + //! This module contains code that enables DataFusion's extension type capabilities. mod array_formatter_factory; diff --git a/datafusion/core/tests/extension_types/mod.rs b/datafusion/core/tests/extension_types/mod.rs index 98ca7fa47330a..bfe0c2e34927e 100644 --- a/datafusion/core/tests/extension_types/mod.rs +++ b/datafusion/core/tests/extension_types/mod.rs @@ -1 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + mod pretty_printing; diff --git a/datafusion/core/tests/extension_types/pretty_printing.rs b/datafusion/core/tests/extension_types/pretty_printing.rs index fd30ea93cfbd5..0b99504b97b34 100644 --- a/datafusion/core/tests/extension_types/pretty_printing.rs +++ b/datafusion/core/tests/extension_types/pretty_printing.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use arrow::array::{FixedSizeBinaryArray, RecordBatch}; use arrow_schema::extension::Uuid; use arrow_schema::{DataType, Field, Schema, SchemaRef}; From 493265624d5911e33c559d5b4649f86ad053a410 Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 12 Feb 2026 13:11:42 +0100 Subject: [PATCH 7/9] Add extension type registry implementation for mock sessions --- .../datasource-arrow/src/file_format.rs | 21 ++++++++++++------- datafusion/datasource/src/url.rs | 13 ++++++++---- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index 9997d23d4c61f..32a898d877edb 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -30,22 +30,22 @@ use arrow::error::ArrowError; use arrow::ipc::convert::fb_to_schema; use arrow::ipc::reader::{FileReader, StreamReader}; use arrow::ipc::writer::IpcWriteOptions; -use arrow::ipc::{CompressionType, root_as_message}; +use arrow::ipc::{root_as_message, CompressionType}; use datafusion_common::error::Result; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - DEFAULT_ARROW_EXTENSION, DataFusionError, GetExt, Statistics, - internal_datafusion_err, not_impl_err, + internal_datafusion_err, not_impl_err, DataFusionError, GetExt, + Statistics, DEFAULT_ARROW_EXTENSION, }; use datafusion_common_runtime::{JoinSet, SpawnedTask}; -use datafusion_datasource::TableSchema; use datafusion_datasource::display::FileGroupDisplay; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_datasource::write::{ - ObjectWriterBuilder, SharedBuffer, get_writer_schema, + get_writer_schema, ObjectWriterBuilder, SharedBuffer, }; +use datafusion_datasource::TableSchema; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::LexRequirement; @@ -60,10 +60,10 @@ use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::write::demux::DemuxedStreamReceiver; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; -use futures::StreamExt; use futures::stream::BoxStream; +use futures::StreamExt; use object_store::{ - GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore, path::Path, + path::Path, GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore, }; use tokio::io::AsyncWriteExt; @@ -549,11 +549,12 @@ mod tests { use super::*; use chrono::DateTime; - use datafusion_common::DFSchema; use datafusion_common::config::TableOptions; + use datafusion_common::DFSchema; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::execution_props::ExecutionProps; + use datafusion_expr::registry::ExtensionTypeRegistryRef; use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use object_store::{chunked::ChunkedStore, memory::InMemory, path::Path}; @@ -609,6 +610,10 @@ mod tests { unimplemented!() } + fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef { + unimplemented!() + } + fn runtime_env(&self) -> &Arc { &self.runtime_env } diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index 0c274806c09c3..dcda936aba6d8 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use datafusion_common::{DataFusionError, Result, TableReference}; -use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::CachedFileList; +use datafusion_execution::cache::TableScopedPath; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_session::Session; @@ -28,8 +28,8 @@ use futures::{StreamExt, TryStreamExt}; use glob::Pattern; use itertools::Itertools; use log::debug; -use object_store::path::DELIMITER; use object_store::path::Path; +use object_store::path::DELIMITER; use object_store::{ObjectMeta, ObjectStore}; use url::Url; @@ -511,12 +511,13 @@ mod tests { use super::*; use async_trait::async_trait; use bytes::Bytes; - use datafusion_common::DFSchema; use datafusion_common::config::TableOptions; - use datafusion_execution::TaskContext; + use datafusion_common::DFSchema; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; + use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; + use datafusion_expr::registry::ExtensionTypeRegistryRef; use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_plan::ExecutionPlan; @@ -1220,6 +1221,10 @@ mod tests { unimplemented!() } + fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef { + unimplemented!() + } + fn runtime_env(&self) -> &Arc { &self.runtime_env } From 6e1522e43d9e19d8577473d8d25567943c221feb Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 12 Feb 2026 14:33:39 +0100 Subject: [PATCH 8/9] Fix error in listing_table_factory.rs, Formatting --- .../core/src/datasource/listing_table_factory.rs | 6 ++++++ datafusion/datasource-arrow/src/file_format.rs | 16 ++++++++-------- datafusion/datasource/src/url.rs | 8 ++++---- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index f85f15a6d8c63..196bc35671bc0 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -241,6 +241,7 @@ mod tests { use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{DFSchema, TableReference}; + use datafusion_expr::registry::ExtensionTypeRegistryRef; #[tokio::test] async fn test_create_using_non_std_file_ext() { @@ -609,6 +610,11 @@ mod tests { ) -> &HashMap> { unimplemented!() } + + fn extension_type_registry(&self) -> &ExtensionTypeRegistryRef { + unreachable!() + } + fn runtime_env(&self) -> &Arc { unimplemented!() } diff --git a/datafusion/datasource-arrow/src/file_format.rs b/datafusion/datasource-arrow/src/file_format.rs index 32a898d877edb..de164e70d1cb8 100644 --- a/datafusion/datasource-arrow/src/file_format.rs +++ b/datafusion/datasource-arrow/src/file_format.rs @@ -30,22 +30,22 @@ use arrow::error::ArrowError; use arrow::ipc::convert::fb_to_schema; use arrow::ipc::reader::{FileReader, StreamReader}; use arrow::ipc::writer::IpcWriteOptions; -use arrow::ipc::{root_as_message, CompressionType}; +use arrow::ipc::{CompressionType, root_as_message}; use datafusion_common::error::Result; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - internal_datafusion_err, not_impl_err, DataFusionError, GetExt, - Statistics, DEFAULT_ARROW_EXTENSION, + DEFAULT_ARROW_EXTENSION, DataFusionError, GetExt, Statistics, + internal_datafusion_err, not_impl_err, }; use datafusion_common_runtime::{JoinSet, SpawnedTask}; +use datafusion_datasource::TableSchema; use datafusion_datasource::display::FileGroupDisplay; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; use datafusion_datasource::write::{ - get_writer_schema, ObjectWriterBuilder, SharedBuffer, + ObjectWriterBuilder, SharedBuffer, get_writer_schema, }; -use datafusion_datasource::TableSchema; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::dml::InsertOp; use datafusion_physical_expr_common::sort_expr::LexRequirement; @@ -60,10 +60,10 @@ use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::write::demux::DemuxedStreamReceiver; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; -use futures::stream::BoxStream; use futures::StreamExt; +use futures::stream::BoxStream; use object_store::{ - path::Path, GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore, + GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore, path::Path, }; use tokio::io::AsyncWriteExt; @@ -549,8 +549,8 @@ mod tests { use super::*; use chrono::DateTime; - use datafusion_common::config::TableOptions; use datafusion_common::DFSchema; + use datafusion_common::config::TableOptions; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::execution_props::ExecutionProps; diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs index dcda936aba6d8..c80f9f097d1db 100644 --- a/datafusion/datasource/src/url.rs +++ b/datafusion/datasource/src/url.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use datafusion_common::{DataFusionError, Result, TableReference}; -use datafusion_execution::cache::cache_manager::CachedFileList; use datafusion_execution::cache::TableScopedPath; +use datafusion_execution::cache::cache_manager::CachedFileList; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_session::Session; @@ -28,8 +28,8 @@ use futures::{StreamExt, TryStreamExt}; use glob::Pattern; use itertools::Itertools; use log::debug; -use object_store::path::Path; use object_store::path::DELIMITER; +use object_store::path::Path; use object_store::{ObjectMeta, ObjectStore}; use url::Url; @@ -511,11 +511,11 @@ mod tests { use super::*; use async_trait::async_trait; use bytes::Bytes; - use datafusion_common::config::TableOptions; use datafusion_common::DFSchema; + use datafusion_common::config::TableOptions; + use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnv; - use datafusion_execution::TaskContext; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::registry::ExtensionTypeRegistryRef; use datafusion_expr::{AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF}; From f9b0b36552c3ecc7441f0799fad297ad5725bfdd Mon Sep 17 00:00:00 2001 From: Tobias Schwarzinger Date: Thu, 12 Feb 2026 15:24:13 +0100 Subject: [PATCH 9/9] Lints and formatting --- .../examples/extension_types/event_id.rs | 19 +++++-------------- datafusion/common/Cargo.toml | 2 +- datafusion/common/src/types/extension.rs | 8 ++++---- datafusion/expr/src/registry.rs | 6 +++++- 4 files changed, 15 insertions(+), 20 deletions(-) diff --git a/datafusion-examples/examples/extension_types/event_id.rs b/datafusion-examples/examples/extension_types/event_id.rs index 6ee11754b6cbe..11a27c0742f62 100644 --- a/datafusion-examples/examples/extension_types/event_id.rs +++ b/datafusion-examples/examples/extension_types/event_id.rs @@ -67,18 +67,10 @@ async fn register_events_table(ctx: &SessionContext) -> Result { schema, vec![ Arc::new(UInt32Array::from(vec![ - 20_01_000000, - 20_01_000001, - 21_03_000000, - 21_03_000001, - 21_03_000002, + 2001000000, 2001000001, 2103000000, 2103000001, 2103000002, ])), Arc::new(UInt32Array::from(vec![ - 2020_01_0000, - 2020_01_0001, - 2021_03_0000, - 2021_03_0001, - 2021_03_0002, + 2020010000, 2020010001, 2021030000, 2021030001, 2021030002, ])), Arc::new(StringArray::from(vec![ "First Event Jan 2020", @@ -167,8 +159,7 @@ impl ExtensionType for EventIdExtensionType { "short" => Ok(IdYearMode::Short), "long" => Ok(IdYearMode::Long), _ => Err(ArrowError::InvalidArgumentError(format!( - "Invalid metadata for event id type: {}", - metadata + "Invalid metadata for event id type: {metadata}" ))), }, } @@ -256,7 +247,7 @@ impl DisplayIndex for EventIdDisplayIndex<'_> { let month = rest % 100; let year = rest / 100; - write!(f, "{:02}-{:02}-{:06}", year, month, counter)?; + write!(f, "{year:02}-{month:02}-{counter:06}")?; } IdYearMode::Long => { // Format: YYYY-MM-CCCC @@ -269,7 +260,7 @@ impl DisplayIndex for EventIdDisplayIndex<'_> { let month = rest % 100; let year = rest / 100; - write!(f, "{:04}-{:02}-{:04}", year, month, counter)?; + write!(f, "{year:04}-{month:02}-{counter:04}")?; } } Ok(()) diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index a8dc05d3ff41c..ab67c5a3f2f6b 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -66,8 +66,8 @@ apache-avro = { workspace = true, features = [ "zstandard", ], optional = true } arrow = { workspace = true } -arrow-schema = { workspace = true, features = ["canonical_extension_types"] } arrow-ipc = { workspace = true } +arrow-schema = { workspace = true, features = ["canonical_extension_types"] } chrono = { workspace = true } half = { workspace = true } hashbrown = { workspace = true } diff --git a/datafusion/common/src/types/extension.rs b/datafusion/common/src/types/extension.rs index 41411f1401baf..1cd19b595f649 100644 --- a/datafusion/common/src/types/extension.rs +++ b/datafusion/common/src/types/extension.rs @@ -32,10 +32,10 @@ pub type DFExtensionTypeRef = Arc; /// from these operations. The extension type mechanism allows users to define how these operations /// apply to a particular extension type. /// -/// For example, adding two values of [`DataType::Int64`] is a sensible thing to do. However, if the -/// same column is annotated with an extension type like `custom.id`, the correct interpretation of -/// a column changes. Adding together two `custom.id` values, even though they are stored as -/// integers, may no longer make sense. +/// For example, adding two values of [`Int64`](arrow::datatypes::DataType::Int64) is a sensible +/// thing to do. However, if the same column is annotated with an extension type like `custom.id`, +/// the correct interpretation of a column changes. Adding together two `custom.id` values, even +/// though they are stored as integers, may no longer make sense. /// /// Note that DataFusion's extension type support is still young and therefore might not cover all /// relevant use cases. Currently, the following operations can be customized: diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 859dcace8a399..53d67126b04ae 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -258,7 +258,11 @@ pub trait ExtensionTypeRegistration: Debug + Send + Sync { /// A cheaply cloneable pointer to an [ExtensionTypeRegistry]. pub type ExtensionTypeRegistryRef = Arc; -/// Supports registering custom [LogicalType]s, including native types. +/// Manages [`ExtensionTypeRegistration`]s, which allow users to register custom behavior for +/// extension types. +/// +/// Each registration is connected to the extension type name, which can also be looked up to get +/// the registration. pub trait ExtensionTypeRegistry: Debug + Send + Sync { /// Returns a reference to registration of an extension type named `name`. ///