From 27fbac52289ec1b9dd02a0933598d57961fe52ec Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 27 May 2026 13:05:24 +0100 Subject: [PATCH 1/5] Idiomatic parquet-variant import/export Signed-off-by: Adam Gutglick --- encodings/parquet-variant/src/arrow.rs | 123 +++++++++++++++++++++++ encodings/parquet-variant/src/lib.rs | 99 ++++++++++++++++++ vortex-array/src/arrow/session.rs | 21 ++-- vortex-array/src/extension/uuid/arrow.rs | 6 +- vortex-tensor/src/types/vector/arrow.rs | 23 +++-- 5 files changed, 252 insertions(+), 20 deletions(-) create mode 100644 encodings/parquet-variant/src/arrow.rs diff --git a/encodings/parquet-variant/src/arrow.rs b/encodings/parquet-variant/src/arrow.rs new file mode 100644 index 00000000000..bcad493f0ad --- /dev/null +++ b/encodings/parquet-variant/src/arrow.rs @@ -0,0 +1,123 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use arrow_array::Array as _; +use arrow_array::ArrayRef as ArrowArrayRef; +use arrow_array::cast::AsArray; +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; +use parquet_variant_compute::VariantArray as ArrowVariantArray; +use vortex_array::ArrayRef; +use vortex_array::ExecutionCtx; +use vortex_array::IntoArray; +use vortex_array::VTable; +use vortex_array::arrow::ArrowExport; +use vortex_array::arrow::ArrowExportVTable; +use vortex_array::arrow::ArrowImport; +use vortex_array::arrow::ArrowImportVTable; +use vortex_array::arrow::ArrowSession; +use vortex_array::dtype::DType; +use vortex_array::dtype::extension::ExtDTypeRef; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_session::registry::CachedId; +use vortex_session::registry::Id; + +use crate::ParquetVariant; +use crate::ParquetVariantArrayExt; + +/// Arrow canonical extension name for Parquet Variant storage. +pub const PARQUET_VARIANT_ARROW_EXTENSION_NAME: &str = "arrow.parquet.variant"; + +static ARROW_PARQUET_VARIANT: CachedId = CachedId::new(PARQUET_VARIANT_ARROW_EXTENSION_NAME); + +impl ArrowExportVTable for ParquetVariant { + fn arrow_ext_id(&self) -> Id { + *ARROW_PARQUET_VARIANT + } + + fn vortex_ext_id(&self) -> Id { + ParquetVariant.id() + } + + fn to_arrow_field( + &self, + _name: &str, + _dtype: &ExtDTypeRef, + _session: &ArrowSession, + ) -> VortexResult> { + Ok(None) + } + + fn execute_arrow( + &self, + array: ArrayRef, + target: &Field, + ctx: &mut ExecutionCtx, + ) -> VortexResult { + if target + .metadata() + .get(EXTENSION_TYPE_NAME_KEY) + .map(String::as_str) + != Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME) + || !array.dtype().is_variant() + { + return Ok(ArrowExport::Unsupported(array)); + } + + let executed = array.execute_until::(ctx)?; + let parquet_array = executed + .as_opt::() + .ok_or_else(|| vortex_err!("cannot export Variant without ParquetVariant storage"))?; + let arrow_variant = parquet_array.to_arrow(ctx)?; + Ok(ArrowExport::Exported(Arc::new(arrow_variant.into_inner()))) + } +} + +impl ArrowImportVTable for ParquetVariant { + fn arrow_ext_id(&self) -> Id { + *ARROW_PARQUET_VARIANT + } + + fn from_arrow_field(&self, field: &Field) -> VortexResult> { + if field + .metadata() + .get(EXTENSION_TYPE_NAME_KEY) + .map(String::as_str) + != Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME) + { + return Ok(None); + } + + Ok(Some(DType::Variant(field.is_nullable().into()))) + } + + fn from_arrow_array( + &self, + array: ArrowArrayRef, + field: &Field, + dtype: &DType, + ) -> VortexResult { + if !matches!(dtype, DType::Variant(_)) + || field + .metadata() + .get(EXTENSION_TYPE_NAME_KEY) + .map(String::as_str) + != Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME) + || !matches!(array.data_type(), DataType::Struct(_)) + { + return Ok(ArrowImport::Unsupported(array)); + } + + let arrow_variant = ArrowVariantArray::try_new(array.as_struct())?; + let imported = if dtype.is_nullable() { + ParquetVariant::from_arrow_variant_nullable(&arrow_variant)? + } else { + ParquetVariant::from_arrow_variant(&arrow_variant)? + }; + Ok(ArrowImport::Imported(imported.into_array())) + } +} diff --git a/encodings/parquet-variant/src/lib.rs b/encodings/parquet-variant/src/lib.rs index e606e7b4e8c..2f87fffb374 100644 --- a/encodings/parquet-variant/src/lib.rs +++ b/encodings/parquet-variant/src/lib.rs @@ -25,11 +25,110 @@ //! [Arrow canonical extension type]: https://arrow.apache.org/docs/format/CanonicalExtensions.html#parquet-variant mod array; +mod arrow; mod kernel; mod operations; mod validity; mod vtable; +use std::sync::Arc; + pub use array::ParquetVariantArrayExt; +pub use arrow::PARQUET_VARIANT_ARROW_EXTENSION_NAME; +use vortex_array::arrow::ArrowSessionExt; +use vortex_array::session::ArraySessionExt; +use vortex_session::VortexSession; pub use vtable::ParquetVariant; pub use vtable::ParquetVariantArray; + +/// Register Parquet Variant array and Arrow extension support with a session. +pub fn initialize(session: &VortexSession) { + session.arrays().register(ParquetVariant); + session.arrow().register_exporter(Arc::new(ParquetVariant)); + session.arrow().register_importer(Arc::new(ParquetVariant)); +} + +#[cfg(test)] +mod arrow_session_tests { + use std::sync::Arc; + + use arrow_array::Array as _; + use arrow_array::ArrayRef as ArrowArrayRef; + use arrow_array::StructArray; + use arrow_array::cast::AsArray; + use arrow_schema::Field; + use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; + use parquet_variant::Variant as PqVariant; + use parquet_variant_compute::VariantArrayBuilder; + use vortex_array::VortexSessionExecute; + use vortex_array::arrow::ArrowSessionExt; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::session::ArraySession; + use vortex_error::VortexResult; + use vortex_session::VortexSession; + + use crate::ParquetVariant; + + fn session() -> VortexSession { + let session = VortexSession::empty().with::(); + crate::initialize(&session); + session + } + + fn arrow_variant_storage() -> StructArray { + let mut builder = VariantArrayBuilder::new(3); + builder.append_variant(PqVariant::from(42i8)); + builder.append_variant(PqVariant::from(true)); + builder.append_variant(PqVariant::from("vortex")); + builder.build().into_inner() + } + + fn arrow_variant_field(storage: &StructArray) -> Field { + Field::new("variant", storage.data_type().clone(), false).with_metadata( + [( + EXTENSION_TYPE_NAME_KEY.to_string(), + "arrow.parquet.variant".to_string(), + )] + .into(), + ) + } + + #[test] + fn arrow_session_imports_parquet_variant_extension_array() -> VortexResult<()> { + let session = session(); + let storage = arrow_variant_storage(); + let field = arrow_variant_field(&storage); + let imported = session + .arrow() + .from_arrow_array(Arc::new(storage) as ArrowArrayRef, &field)?; + + assert_eq!(imported.dtype(), &DType::Variant(Nullability::NonNullable)); + assert!(imported.as_opt::().is_some()); + Ok(()) + } + + #[test] + fn arrow_session_exports_parquet_variant_extension_array() -> VortexResult<()> { + let session = session(); + let storage = arrow_variant_storage(); + let field = arrow_variant_field(&storage); + let imported = session + .arrow() + .from_arrow_array(Arc::new(storage.clone()) as ArrowArrayRef, &field)?; + + let mut ctx = session.create_execution_ctx(); + let exported = session + .arrow() + .execute_arrow(imported, Some(&field), &mut ctx)?; + let exported = exported.as_struct(); + + assert_eq!(exported.len(), storage.len()); + assert_eq!(exported.column_names(), storage.column_names()); + assert_eq!(exported.fields(), storage.fields()); + for (actual, expected) in exported.columns().iter().zip(storage.columns()) { + assert_eq!(actual.to_data(), expected.to_data()); + } + Ok(()) + } +} diff --git a/vortex-array/src/arrow/session.rs b/vortex-array/src/arrow/session.rs index 012b9001bd2..0be4d8e0c50 100644 --- a/vortex-array/src/arrow/session.rs +++ b/vortex-array/src/arrow/session.rs @@ -125,7 +125,7 @@ pub trait ArrowExportVTable: 'static + Send + Sync + Debug { ) -> VortexResult; } -/// Plugin layer for importing an Arrow extension-typed array into a Vortex extension array. +/// Plugin layer for importing an Arrow extension-typed array into a Vortex array. /// /// Plugins are dispatched by `arrow_ext_id`. /// @@ -140,7 +140,7 @@ pub trait ArrowImportVTable: 'static + Send + Sync + Debug { #[allow(clippy::wrong_self_convention)] fn from_arrow_field(&self, field: &Field) -> VortexResult>; - /// Convert an Arrow array into a Vortex extension array of `dtype`. + /// Convert an Arrow array into a Vortex array of `dtype`. /// /// Returns ownership of `array` via [`ArrowImport::Unsupported`] when the plugin cannot /// handle the input. @@ -148,7 +148,8 @@ pub trait ArrowImportVTable: 'static + Send + Sync + Debug { fn from_arrow_array( &self, array: ArrowArrayRef, - dtype: &ExtDTypeRef, + field: &Field, + dtype: &DType, ) -> VortexResult; } @@ -490,16 +491,14 @@ impl ArrowSession { let importers = self.importers(&Id::new(extension_name)); if !importers.is_empty() { let dtype = self.from_arrow_field(field)?; - if let DType::Extension(ext_dtype) = dtype { - let mut current = array; - for plugin in importers.iter() { - match plugin.from_arrow_array(current, &ext_dtype)? { - ArrowImport::Imported(arr) => return Ok(arr), - ArrowImport::Unsupported(arr) => current = arr, - } + let mut current = array; + for plugin in importers.iter() { + match plugin.from_arrow_array(current, field, &dtype)? { + ArrowImport::Imported(arr) => return Ok(arr), + ArrowImport::Unsupported(arr) => current = arr, } - return ArrayRef::from_arrow(current.as_ref(), field.is_nullable()); } + return ArrayRef::from_arrow(current.as_ref(), field.is_nullable()); } } self.from_arrow_array_canonical(array, field) diff --git a/vortex-array/src/extension/uuid/arrow.rs b/vortex-array/src/extension/uuid/arrow.rs index eacbe975a55..7d683ae0af0 100644 --- a/vortex-array/src/extension/uuid/arrow.rs +++ b/vortex-array/src/extension/uuid/arrow.rs @@ -125,8 +125,12 @@ impl ArrowImportVTable for Uuid { fn from_arrow_array( &self, array: ArrowArrayRef, - dtype: &ExtDTypeRef, + _field: &Field, + dtype: &DType, ) -> VortexResult { + let DType::Extension(dtype) = dtype else { + return Ok(ArrowImport::Unsupported(array)); + }; if !matches!(array.data_type(), DataType::FixedSizeBinary(UUID_BYTE_LEN)) || !dtype.is::() { diff --git a/vortex-tensor/src/types/vector/arrow.rs b/vortex-tensor/src/types/vector/arrow.rs index b7e5e7577a7..e186a72bfe4 100644 --- a/vortex-tensor/src/types/vector/arrow.rs +++ b/vortex-tensor/src/types/vector/arrow.rs @@ -141,8 +141,12 @@ impl ArrowImportVTable for Vector { fn from_arrow_array( &self, array: ArrowArrayRef, - dtype: &ExtDTypeRef, + _field: &Field, + dtype: &DType, ) -> VortexResult { + let DType::Extension(dtype) = dtype else { + return Ok(ArrowImport::Unsupported(array)); + }; if !dtype.is::() { return Ok(ArrowImport::Unsupported(array)); } @@ -362,13 +366,11 @@ mod tests { #[test] fn from_arrow_array_returns_unsupported_for_non_fsl() -> VortexResult<()> { let dtype = vector_dtype(false); - let ext = dtype - .as_extension_opt() - .expect("vector dtype should be an extension dtype") - .clone(); + let field = Field::new("embedding", DataType::Int32, false); let int_array: ArrowArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); - let result = ::from_arrow_array(&Vector, int_array, &ext)?; + let result = + ::from_arrow_array(&Vector, int_array, &field, &dtype)?; assert!(matches!(result, ArrowImport::Unsupported(_))); Ok(()) } @@ -386,8 +388,13 @@ mod tests { ExtDType::try_with_vtable(Uuid, UuidMetadata::default(), uuid_storage)?.erased(); let fsl_arrow = arrow_fsl_f32(vec![1.0, 2.0, 3.0], DIM as i32); - let result = - ::from_arrow_array(&Vector, fsl_arrow, &uuid_ext)?; + let field = Field::new("embedding", fsl_arrow.data_type().clone(), false); + let result = ::from_arrow_array( + &Vector, + fsl_arrow, + &field, + &DType::Extension(uuid_ext), + )?; assert!(matches!(result, ArrowImport::Unsupported(_))); Ok(()) } From 644067c5db7ef361cdf0b9449a39c1b795840a54 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 27 May 2026 17:07:10 +0100 Subject: [PATCH 2/5] Fix thing Signed-off-by: Adam Gutglick --- encodings/parquet-variant/src/arrow.rs | 5 ++--- vortex-array/src/arrow/session.rs | 22 +++++++++------------- vortex-array/src/extension/uuid/arrow.rs | 6 ++---- vortex-tensor/src/types/vector/arrow.rs | 10 ++++++---- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/encodings/parquet-variant/src/arrow.rs b/encodings/parquet-variant/src/arrow.rs index bcad493f0ad..cadc4715c61 100644 --- a/encodings/parquet-variant/src/arrow.rs +++ b/encodings/parquet-variant/src/arrow.rs @@ -20,7 +20,6 @@ use vortex_array::arrow::ArrowImport; use vortex_array::arrow::ArrowImportVTable; use vortex_array::arrow::ArrowSession; use vortex_array::dtype::DType; -use vortex_array::dtype::extension::ExtDTypeRef; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_session::registry::CachedId; @@ -39,14 +38,14 @@ impl ArrowExportVTable for ParquetVariant { *ARROW_PARQUET_VARIANT } - fn vortex_ext_id(&self) -> Id { + fn vortex_id(&self) -> Id { ParquetVariant.id() } fn to_arrow_field( &self, _name: &str, - _dtype: &ExtDTypeRef, + _dtype: &DType, _session: &ArrowSession, ) -> VortexResult> { Ok(None) diff --git a/vortex-array/src/arrow/session.rs b/vortex-array/src/arrow/session.rs index 0be4d8e0c50..73a09fbcb2f 100644 --- a/vortex-array/src/arrow/session.rs +++ b/vortex-array/src/arrow/session.rs @@ -61,8 +61,6 @@ use crate::dtype::Nullability; use crate::dtype::StructFields; use crate::dtype::arrow::FromArrowType; use crate::dtype::arrow::to_data_type_naive; -use crate::dtype::extension::ExtDTypeRef; -use crate::dtype::extension::ExtId; use crate::extension::datetime::AnyTemporal; use crate::extension::uuid::Uuid; use crate::validity::Validity; @@ -99,17 +97,17 @@ pub trait ArrowExportVTable: 'static + Send + Sync + Debug { /// The Arrow extension ID this plugin produces. fn arrow_ext_id(&self) -> Id; - /// The Vortex extension ID this plugin maps from. Used only for inference by + /// The Vortex array or extension ID this plugin maps from. Used only for inference by /// [`ArrowSession::to_arrow_field`] / [`ArrowSession::to_arrow_schema`]; never as a /// dispatch key for [`execute_arrow`][Self::execute_arrow]. - fn vortex_ext_id(&self) -> ExtId; + fn vortex_id(&self) -> Id; /// Build the Arrow [`Field`] this plugin produces for the given Vortex extension /// `dtype`. Used during schema inference. fn to_arrow_field( &self, name: &str, - dtype: &ExtDTypeRef, + dtype: &DType, session: &ArrowSession, ) -> VortexResult>; @@ -158,7 +156,7 @@ pub type ArrowImportVTableRef = Arc; type ExportMap = HashMap>; type ImportMap = HashMap>; -type ExportDTypeMap = HashMap>; +type ExportDTypeMap = HashMap>; /// Session-scoped registry of Arrow extension plugins. /// @@ -200,11 +198,7 @@ impl ArrowSession { exporter.arrow_ext_id(), ArrowExportVTableRef::clone(&exporter), ); - Self::insert( - &self.exporters_by_vortex, - exporter.vortex_ext_id(), - exporter, - ); + Self::insert(&self.exporters_by_vortex, exporter.vortex_id(), exporter); } /// Register an [`ArrowImportVTable`] under its source Arrow extension name. @@ -235,7 +229,7 @@ impl ArrowSession { .unwrap_or_else(|| Arc::from([])) } - fn exporters_by_vortex(&self, id: &ExtId) -> Arc<[ArrowExportVTableRef]> { + fn exporters_by_vortex(&self, id: &Id) -> Arc<[ArrowExportVTableRef]> { self.exporters_by_vortex .load() .get(id) @@ -287,7 +281,9 @@ impl ArrowSession { } DType::Extension(ext) if !ext.is::() => { for plugin in self.exporters_by_vortex(&ext.id()).iter() { - if let Some(field) = plugin.to_arrow_field(name, ext, self)? { + if let Some(field) = + plugin.to_arrow_field(name, &DType::Extension(ext.clone()), self)? + { return Ok(field); } } diff --git a/vortex-array/src/extension/uuid/arrow.rs b/vortex-array/src/extension/uuid/arrow.rs index 7d683ae0af0..3d66ce99d58 100644 --- a/vortex-array/src/extension/uuid/arrow.rs +++ b/vortex-array/src/extension/uuid/arrow.rs @@ -45,8 +45,6 @@ use crate::dtype::DType; use crate::dtype::Nullability; use crate::dtype::PType; use crate::dtype::extension::ExtDType; -use crate::dtype::extension::ExtDTypeRef; -use crate::dtype::extension::ExtId; use crate::dtype::extension::ExtVTable; use crate::extension::uuid::Uuid; use crate::extension::uuid::UuidMetadata; @@ -61,7 +59,7 @@ impl ArrowExportVTable for Uuid { *ARROW_UUID } - fn vortex_ext_id(&self) -> ExtId { + fn vortex_id(&self) -> Id { Uuid.id() } @@ -69,7 +67,7 @@ impl ArrowExportVTable for Uuid { fn to_arrow_field( &self, name: &str, - dtype: &ExtDTypeRef, + dtype: &DType, _session: &ArrowSession, ) -> VortexResult> { let mut field = Field::new( diff --git a/vortex-tensor/src/types/vector/arrow.rs b/vortex-tensor/src/types/vector/arrow.rs index e186a72bfe4..85690ea0cd5 100644 --- a/vortex-tensor/src/types/vector/arrow.rs +++ b/vortex-tensor/src/types/vector/arrow.rs @@ -29,8 +29,6 @@ use vortex_array::arrow::FromArrowArray; use vortex_array::dtype::DType; use vortex_array::dtype::arrow::FromArrowType; use vortex_array::dtype::extension::ExtDType; -use vortex_array::dtype::extension::ExtDTypeRef; -use vortex_array::dtype::extension::ExtId; use vortex_array::dtype::extension::ExtVTable; use vortex_array::extension::EmptyMetadata; use vortex_error::VortexResult; @@ -68,16 +66,20 @@ impl ArrowExportVTable for Vector { *ARROW_VECTOR } - fn vortex_ext_id(&self) -> ExtId { + fn vortex_id(&self) -> Id { Vector.id() } fn to_arrow_field( &self, name: &str, - dtype: &ExtDTypeRef, + dtype: &DType, session: &ArrowSession, ) -> VortexResult> { + let DType::Extension(dtype) = dtype else { + return Ok(None); + }; + if !dtype.is::() { return Ok(None); } From 556345fdf302ac3aa5f67fc618476489091adfcd Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 28 May 2026 11:39:20 +0100 Subject: [PATCH 3/5] Fixes Signed-off-by: Adam Gutglick --- encodings/parquet-variant/src/arrow.rs | 215 +++++++++++++++++++++++-- encodings/parquet-variant/src/lib.rs | 86 ---------- 2 files changed, 204 insertions(+), 97 deletions(-) diff --git a/encodings/parquet-variant/src/arrow.rs b/encodings/parquet-variant/src/arrow.rs index cadc4715c61..492fb4a0589 100644 --- a/encodings/parquet-variant/src/arrow.rs +++ b/encodings/parquet-variant/src/arrow.rs @@ -8,6 +8,7 @@ use arrow_array::ArrayRef as ArrowArrayRef; use arrow_array::cast::AsArray; use arrow_schema::DataType; use arrow_schema::Field; +use arrow_schema::Fields; use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; use parquet_variant_compute::VariantArray as ArrowVariantArray; use vortex_array::ArrayRef; @@ -29,8 +30,7 @@ use crate::ParquetVariant; use crate::ParquetVariantArrayExt; /// Arrow canonical extension name for Parquet Variant storage. -pub const PARQUET_VARIANT_ARROW_EXTENSION_NAME: &str = "arrow.parquet.variant"; - +const PARQUET_VARIANT_ARROW_EXTENSION_NAME: &str = "arrow.parquet.variant"; static ARROW_PARQUET_VARIANT: CachedId = CachedId::new(PARQUET_VARIANT_ARROW_EXTENSION_NAME); impl ArrowExportVTable for ParquetVariant { @@ -42,13 +42,36 @@ impl ArrowExportVTable for ParquetVariant { ParquetVariant.id() } + // The current API doesn't see the array at this point. + // which is what we actually need to know exactly what the arrow + // storage type is. fn to_arrow_field( &self, - _name: &str, - _dtype: &DType, + name: &str, + dtype: &DType, _session: &ArrowSession, ) -> VortexResult> { - Ok(None) + if !dtype.is_variant() { + return Ok(None); + } + + Ok(Some( + Field::new( + name, + DataType::Struct(Fields::from(vec![ + Arc::new(Field::new("metadata", DataType::BinaryView, false)), + Arc::new(Field::new("value", DataType::BinaryView, false)), + ])), + dtype.is_nullable(), + ) + .with_metadata( + [( + EXTENSION_TYPE_NAME_KEY.to_string(), + PARQUET_VARIANT_ARROW_EXTENSION_NAME.to_string(), + )] + .into(), + ), + )) } fn execute_arrow( @@ -60,8 +83,7 @@ impl ArrowExportVTable for ParquetVariant { if target .metadata() .get(EXTENSION_TYPE_NAME_KEY) - .map(String::as_str) - != Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME) + .is_some_and(|ext| ext != PARQUET_VARIANT_ARROW_EXTENSION_NAME) || !array.dtype().is_variant() { return Ok(ArrowExport::Unsupported(array)); @@ -85,8 +107,7 @@ impl ArrowImportVTable for ParquetVariant { if field .metadata() .get(EXTENSION_TYPE_NAME_KEY) - .map(String::as_str) - != Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME) + .is_some_and(|ext| ext != PARQUET_VARIANT_ARROW_EXTENSION_NAME) { return Ok(None); } @@ -104,8 +125,7 @@ impl ArrowImportVTable for ParquetVariant { || field .metadata() .get(EXTENSION_TYPE_NAME_KEY) - .map(String::as_str) - != Some(PARQUET_VARIANT_ARROW_EXTENSION_NAME) + .is_some_and(|ext| ext != PARQUET_VARIANT_ARROW_EXTENSION_NAME) || !matches!(array.data_type(), DataType::Struct(_)) { return Ok(ArrowImport::Unsupported(array)); @@ -120,3 +140,176 @@ impl ArrowImportVTable for ParquetVariant { Ok(ArrowImport::Imported(imported.into_array())) } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::Array as _; + use arrow_array::ArrayRef as ArrowArrayRef; + use arrow_array::StructArray; + use arrow_array::cast::AsArray; + use arrow_schema::Field; + use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; + use parquet_variant::Variant as PqVariant; + use parquet_variant::VariantBuilder; + use parquet_variant_compute::VariantArrayBuilder; + use rstest::fixture; + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::VortexSessionExecute; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::arrow::ArrowExportVTable; + use vortex_array::arrow::ArrowSessionExt; + use vortex_array::assert_arrays_eq; + use vortex_array::dtype::DType; + use vortex_array::dtype::Nullability; + use vortex_array::session::ArraySession; + use vortex_array::validity::Validity; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_error::vortex_err; + use vortex_session::VortexSession; + + use super::PARQUET_VARIANT_ARROW_EXTENSION_NAME; + use crate::ParquetVariant; + + #[fixture] + fn session() -> VortexSession { + let session = VortexSession::empty().with::(); + crate::initialize(&session); + session + } + + fn arrow_variant_storage() -> StructArray { + let mut builder = VariantArrayBuilder::new(3); + builder.append_variant(PqVariant::from(42i8)); + builder.append_variant(PqVariant::from(true)); + builder.append_variant(PqVariant::from("vortex")); + builder.build().into_inner() + } + + fn arrow_variant_field(storage: &StructArray) -> Field { + Field::new("variant", storage.data_type().clone(), false).with_metadata( + [( + EXTENSION_TYPE_NAME_KEY.to_string(), + PARQUET_VARIANT_ARROW_EXTENSION_NAME.to_string(), + )] + .into(), + ) + } + + fn assert_struct_arrays_eq(actual: &StructArray, expected: &StructArray) { + assert_eq!(actual.len(), expected.len()); + assert_eq!(actual.column_names(), expected.column_names()); + assert_eq!(actual.fields(), expected.fields()); + assert_eq!(actual.nulls(), expected.nulls()); + for (actual, expected) in actual.columns().iter().zip(expected.columns()) { + assert_eq!(actual.to_data(), expected.to_data()); + } + } + + #[rstest] + fn import_parquet_variant_extension_array(session: VortexSession) -> VortexResult<()> { + let storage = arrow_variant_storage(); + let field = arrow_variant_field(&storage); + let imported = session + .arrow() + .from_arrow_array(Arc::new(storage) as ArrowArrayRef, &field)?; + + assert_eq!(imported.dtype(), &DType::Variant(Nullability::NonNullable)); + assert!(imported.as_opt::().is_some()); + Ok(()) + } + + #[rstest] + fn roundtrip_parquet_variant_extension_array_from_arrow( + session: VortexSession, + ) -> VortexResult<()> { + let storage = arrow_variant_storage(); + let field = arrow_variant_field(&storage); + let imported = session + .arrow() + .from_arrow_array(Arc::new(storage.clone()) as ArrowArrayRef, &field)?; + + let mut ctx = session.create_execution_ctx(); + let exported = session + .arrow() + .execute_arrow(imported, Some(&field), &mut ctx)?; + let exported = exported.as_struct(); + + assert_struct_arrays_eq(exported, &storage); + Ok(()) + } + + #[rstest] + fn roundtrip_parquet_variant_extension_array_from_vortex( + session: VortexSession, + ) -> VortexResult<()> { + let rows = [ + VariantBuilder::new().with_value(42i32).finish(), + VariantBuilder::new().with_value(true).finish(), + VariantBuilder::new().with_value("vortex").finish(), + ]; + let metadata = + VarBinViewArray::from_iter_bin(rows.iter().map(|(metadata, _)| metadata.as_slice())) + .into_array(); + let value = VarBinViewArray::from_iter_bin(rows.iter().map(|(_, value)| value.as_slice())) + .into_array(); + let array = ParquetVariant::try_new(Validity::NonNullable, metadata, Some(value), None)? + .into_array(); + let expected = array.clone(); + + let field = ParquetVariant + .to_arrow_field("variant", array.dtype(), &session.arrow())? + .ok_or_else(|| vortex_err!("expected ParquetVariant Arrow field"))?; + let mut ctx = session.create_execution_ctx(); + let exported = session + .arrow() + .execute_arrow(array, Some(&field), &mut ctx)?; + let actual = session + .arrow() + .from_arrow_array(Arc::clone(&exported), &field)?; + + assert_arrays_eq!(actual, expected); + Ok(()) + } + + #[rstest] + fn roundtrip_shredded_parquet_variant_extension_array_from_vortex( + session: VortexSession, + ) -> VortexResult<()> { + let rows = [ + VariantBuilder::new().with_value(10i32).finish(), + VariantBuilder::new().with_value(20i32).finish(), + VariantBuilder::new().with_value(30i32).finish(), + ]; + let metadata = + VarBinViewArray::from_iter_bin(rows.iter().map(|(metadata, _)| metadata.as_slice())) + .into_array(); + + let typed_value = buffer![10i32, 20, 30].into_array(); + let array = + ParquetVariant::try_new(Validity::NonNullable, metadata, None, Some(typed_value))? + .into_array(); + let expected = array.clone(); + + let field = ParquetVariant + .to_arrow_field("variant", array.dtype(), &session.arrow())? + .ok_or_else(|| vortex_err!("expected ParquetVariant Arrow field"))?; + let mut ctx = session.create_execution_ctx(); + let exported = session + .arrow() + .execute_arrow(array, Some(&field), &mut ctx)?; + assert_ne!( + exported.data_type(), + field.data_type(), + "The current arrow field isn't fully validated to the full storage type" + ); + + let actual = session.arrow().from_arrow_array(exported, &field)?; + + assert_arrays_eq!(actual, expected); + Ok(()) + } +} diff --git a/encodings/parquet-variant/src/lib.rs b/encodings/parquet-variant/src/lib.rs index 2f87fffb374..03d2a046442 100644 --- a/encodings/parquet-variant/src/lib.rs +++ b/encodings/parquet-variant/src/lib.rs @@ -34,7 +34,6 @@ mod vtable; use std::sync::Arc; pub use array::ParquetVariantArrayExt; -pub use arrow::PARQUET_VARIANT_ARROW_EXTENSION_NAME; use vortex_array::arrow::ArrowSessionExt; use vortex_array::session::ArraySessionExt; use vortex_session::VortexSession; @@ -47,88 +46,3 @@ pub fn initialize(session: &VortexSession) { session.arrow().register_exporter(Arc::new(ParquetVariant)); session.arrow().register_importer(Arc::new(ParquetVariant)); } - -#[cfg(test)] -mod arrow_session_tests { - use std::sync::Arc; - - use arrow_array::Array as _; - use arrow_array::ArrayRef as ArrowArrayRef; - use arrow_array::StructArray; - use arrow_array::cast::AsArray; - use arrow_schema::Field; - use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; - use parquet_variant::Variant as PqVariant; - use parquet_variant_compute::VariantArrayBuilder; - use vortex_array::VortexSessionExecute; - use vortex_array::arrow::ArrowSessionExt; - use vortex_array::dtype::DType; - use vortex_array::dtype::Nullability; - use vortex_array::session::ArraySession; - use vortex_error::VortexResult; - use vortex_session::VortexSession; - - use crate::ParquetVariant; - - fn session() -> VortexSession { - let session = VortexSession::empty().with::(); - crate::initialize(&session); - session - } - - fn arrow_variant_storage() -> StructArray { - let mut builder = VariantArrayBuilder::new(3); - builder.append_variant(PqVariant::from(42i8)); - builder.append_variant(PqVariant::from(true)); - builder.append_variant(PqVariant::from("vortex")); - builder.build().into_inner() - } - - fn arrow_variant_field(storage: &StructArray) -> Field { - Field::new("variant", storage.data_type().clone(), false).with_metadata( - [( - EXTENSION_TYPE_NAME_KEY.to_string(), - "arrow.parquet.variant".to_string(), - )] - .into(), - ) - } - - #[test] - fn arrow_session_imports_parquet_variant_extension_array() -> VortexResult<()> { - let session = session(); - let storage = arrow_variant_storage(); - let field = arrow_variant_field(&storage); - let imported = session - .arrow() - .from_arrow_array(Arc::new(storage) as ArrowArrayRef, &field)?; - - assert_eq!(imported.dtype(), &DType::Variant(Nullability::NonNullable)); - assert!(imported.as_opt::().is_some()); - Ok(()) - } - - #[test] - fn arrow_session_exports_parquet_variant_extension_array() -> VortexResult<()> { - let session = session(); - let storage = arrow_variant_storage(); - let field = arrow_variant_field(&storage); - let imported = session - .arrow() - .from_arrow_array(Arc::new(storage.clone()) as ArrowArrayRef, &field)?; - - let mut ctx = session.create_execution_ctx(); - let exported = session - .arrow() - .execute_arrow(imported, Some(&field), &mut ctx)?; - let exported = exported.as_struct(); - - assert_eq!(exported.len(), storage.len()); - assert_eq!(exported.column_names(), storage.column_names()); - assert_eq!(exported.fields(), storage.fields()); - for (actual, expected) in exported.columns().iter().zip(storage.columns()) { - assert_eq!(actual.to_data(), expected.to_data()); - } - Ok(()) - } -} From 03beac4b419e24fee5999295af64e6137338a44f Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 28 May 2026 11:51:41 +0100 Subject: [PATCH 4/5] Error when trying to infer field Signed-off-by: Adam Gutglick --- encodings/parquet-variant/src/arrow.rs | 70 +++++++++++++++----------- 1 file changed, 40 insertions(+), 30 deletions(-) diff --git a/encodings/parquet-variant/src/arrow.rs b/encodings/parquet-variant/src/arrow.rs index 492fb4a0589..a1ed8f50901 100644 --- a/encodings/parquet-variant/src/arrow.rs +++ b/encodings/parquet-variant/src/arrow.rs @@ -8,7 +8,6 @@ use arrow_array::ArrayRef as ArrowArrayRef; use arrow_array::cast::AsArray; use arrow_schema::DataType; use arrow_schema::Field; -use arrow_schema::Fields; use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; use parquet_variant_compute::VariantArray as ArrowVariantArray; use vortex_array::ArrayRef; @@ -22,6 +21,7 @@ use vortex_array::arrow::ArrowImportVTable; use vortex_array::arrow::ArrowSession; use vortex_array::dtype::DType; use vortex_error::VortexResult; +use vortex_error::vortex_bail; use vortex_error::vortex_err; use vortex_session::registry::CachedId; use vortex_session::registry::Id; @@ -42,12 +42,9 @@ impl ArrowExportVTable for ParquetVariant { ParquetVariant.id() } - // The current API doesn't see the array at this point. - // which is what we actually need to know exactly what the arrow - // storage type is. fn to_arrow_field( &self, - name: &str, + _name: &str, dtype: &DType, _session: &ArrowSession, ) -> VortexResult> { @@ -55,23 +52,7 @@ impl ArrowExportVTable for ParquetVariant { return Ok(None); } - Ok(Some( - Field::new( - name, - DataType::Struct(Fields::from(vec![ - Arc::new(Field::new("metadata", DataType::BinaryView, false)), - Arc::new(Field::new("value", DataType::BinaryView, false)), - ])), - dtype.is_nullable(), - ) - .with_metadata( - [( - EXTENSION_TYPE_NAME_KEY.to_string(), - PARQUET_VARIANT_ARROW_EXTENSION_NAME.to_string(), - )] - .into(), - ), - )) + vortex_bail!(InvalidArgument: "ParquetVariant array can't infer its Arrow storage schema from dtype"); } fn execute_arrow( @@ -149,6 +130,7 @@ mod tests { use arrow_array::ArrayRef as ArrowArrayRef; use arrow_array::StructArray; use arrow_array::cast::AsArray; + use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; use parquet_variant::Variant as PqVariant; @@ -159,7 +141,6 @@ mod tests { use vortex_array::IntoArray; use vortex_array::VortexSessionExecute; use vortex_array::arrays::VarBinViewArray; - use vortex_array::arrow::ArrowExportVTable; use vortex_array::arrow::ArrowSessionExt; use vortex_array::assert_arrays_eq; use vortex_array::dtype::DType; @@ -168,7 +149,6 @@ mod tests { use vortex_array::validity::Validity; use vortex_buffer::buffer; use vortex_error::VortexResult; - use vortex_error::vortex_err; use vortex_session::VortexSession; use super::PARQUET_VARIANT_ARROW_EXTENSION_NAME; @@ -260,9 +240,24 @@ mod tests { .into_array(); let expected = array.clone(); - let field = ParquetVariant - .to_arrow_field("variant", array.dtype(), &session.arrow())? - .ok_or_else(|| vortex_err!("expected ParquetVariant Arrow field"))?; + let field = Field::new( + "variant", + DataType::Struct( + vec![ + Field::new("metadata", DataType::Binary, false), + Field::new("value", DataType::Binary, false), + ] + .into(), + ), + false, + ) + .with_metadata( + [( + EXTENSION_TYPE_NAME_KEY.to_string(), + PARQUET_VARIANT_ARROW_EXTENSION_NAME.to_string(), + )] + .into(), + ); let mut ctx = session.create_execution_ctx(); let exported = session .arrow() @@ -294,9 +289,24 @@ mod tests { .into_array(); let expected = array.clone(); - let field = ParquetVariant - .to_arrow_field("variant", array.dtype(), &session.arrow())? - .ok_or_else(|| vortex_err!("expected ParquetVariant Arrow field"))?; + let field = Field::new( + "variant", + DataType::Struct( + vec![ + Field::new("metadata", DataType::Binary, false), + Field::new("typed_value", DataType::Int32, false), + ] + .into(), + ), + false, + ) + .with_metadata( + [( + EXTENSION_TYPE_NAME_KEY.to_string(), + PARQUET_VARIANT_ARROW_EXTENSION_NAME.to_string(), + )] + .into(), + ); let mut ctx = session.create_execution_ctx(); let exported = session .arrow() From 0dd96bdeca45970da5367bf9ee98fe839c9a44bd Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 28 May 2026 14:02:15 +0100 Subject: [PATCH 5/5] Add comment Signed-off-by: Adam Gutglick --- encodings/parquet-variant/src/arrow.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/encodings/parquet-variant/src/arrow.rs b/encodings/parquet-variant/src/arrow.rs index a1ed8f50901..5124158c5bd 100644 --- a/encodings/parquet-variant/src/arrow.rs +++ b/encodings/parquet-variant/src/arrow.rs @@ -48,6 +48,8 @@ impl ArrowExportVTable for ParquetVariant { dtype: &DType, _session: &ArrowSession, ) -> VortexResult> { + // TODO(#8135): This is wrong and won't work for shredded arrays. + // We need to be able to access array metadata to accurately provide Arrow schemas. if !dtype.is_variant() { return Ok(None); }