From 49f220521c715d13a515b9a1b8f06cb4d05718d6 Mon Sep 17 00:00:00 2001 From: Aly Date: Fri, 26 Dec 2025 05:16:17 +0200 Subject: [PATCH] fix: validate catalog.format using CatalogFormat enum - Add CatalogFormat enum with valid format values (CSV, PARQUET, JSON, NDJSON, AVRO, ARROW) - Implement FromStr for CatalogFormat with validation and clear error messages - Implement Display and ConfigField traits for CatalogFormat - Update CatalogOptions.format from Option to Option - Update session_state_defaults.rs to work with enum - Add test for catalog format validation --- datafusion/common/src/config.rs | 49 +++++++++++++++++-- datafusion/common/src/format.rs | 86 +++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2bea2ec5a4526..304ad55ee6d44 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -22,7 +22,7 @@ use arrow_ipc::CompressionType; #[cfg(feature = "parquet_encryption")] use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties}; use crate::error::_config_err; -use crate::format::{ExplainAnalyzeLevel, ExplainFormat}; +use crate::format::{CatalogFormat, ExplainAnalyzeLevel, ExplainFormat}; use crate::parsers::CompressionTypeVariant; use crate::utils::get_available_parallelism; use crate::{DataFusionError, Result}; @@ -241,7 +241,7 @@ config_namespace! { pub location: Option, default = None /// Type of `TableProvider` to use when loading `default` schema - pub format: Option, default = None + pub format: Option, default = None /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE` /// if not specified explicitly in the statement. @@ -3080,8 +3080,8 @@ mod tests { #[cfg(feature = "parquet")] use crate::config::TableParquetOptions; use crate::config::{ - ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions, - Extensions, TableOptions, + ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ConfigOptions, + ExtensionOptions, Extensions, TableOptions, }; use std::any::Any; use std::collections::HashMap; @@ -3455,4 +3455,45 @@ mod tests { let parsed_metadata = table_config.parquet.key_value_metadata; assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into()))); } + + #[test] + fn test_catalog_format_validation() { + let mut config = ConfigOptions::default(); + + // Test all valid formats + config.set("datafusion.catalog.format", "CSV").unwrap(); + assert!(config.catalog.format.is_some()); + + config.set("datafusion.catalog.format", "JSON").unwrap(); + assert!(config.catalog.format.is_some()); + + config.set("datafusion.catalog.format", "NDJSON").unwrap(); + assert!(config.catalog.format.is_some()); + + config.set("datafusion.catalog.format", "ARROW").unwrap(); + assert!(config.catalog.format.is_some()); + + #[cfg(feature = "parquet")] + { + config.set("datafusion.catalog.format", "PARQUET").unwrap(); + assert!(config.catalog.format.is_some()); + } + + #[cfg(feature = "avro")] + { + config.set("datafusion.catalog.format", "AVRO").unwrap(); + assert!(config.catalog.format.is_some()); + } + + // Case-insensitive + config.set("datafusion.catalog.format", "csv").unwrap(); + assert!(config.catalog.format.is_some()); + + // Invalid format should error + let result = config.set("datafusion.catalog.format", "INVALID"); + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!(error_msg.contains("Invalid catalog format")); + assert!(error_msg.contains("INVALID")); // Error should mention the invalid value + } } diff --git a/datafusion/common/src/format.rs b/datafusion/common/src/format.rs index a505bd0e1c74e..712f58e1e09c5 100644 --- a/datafusion/common/src/format.rs +++ b/datafusion/common/src/format.rs @@ -250,3 +250,89 @@ impl ConfigField for ExplainAnalyzeLevel { Ok(()) } } + +/// File format types for catalog table providers +/// +/// These correspond to the table provider factories registered in DataFusion. +/// The format is used when loading tables for the default schema via +/// `datafusion.catalog.format` configuration. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum CatalogFormat { + /// CSV (Comma-Separated Values) formats + CSV, + #[cfg(feature = "parquet")] + /// Parquet format + PARQUET, + /// JSON (JavaScript Object Notation) format + JSON, + /// NDJSON (Newline Delimited JSON) format + NDJSON, + /// Avro format + #[cfg(feature = "avro")] + AVRO, + /// Arrow format + ARROW, +} + +impl FromStr for CatalogFormat { + type Err = DataFusionError; + + fn from_str(format: &str) -> std::result::Result { + match format.to_uppercase().as_str() { + "CSV" => Ok(CatalogFormat::CSV), + #[cfg(feature = "parquet")] + "PARQUET" => Ok(CatalogFormat::PARQUET), + "JSON" => Ok(CatalogFormat::JSON), + "NDJSON" => Ok(CatalogFormat::NDJSON), + #[cfg(feature = "avro")] + "AVRO" => Ok(CatalogFormat::AVRO), + "ARROW" => Ok(CatalogFormat::ARROW), + _ => { + // Build error message dynamically based on enabled features + #[allow(unused_mut)] + let mut valid = vec!["CSV", "JSON", "NDJSON", "ARROW"]; + #[cfg(feature = "parquet")] + valid.push("PARQUET"); + #[cfg(feature = "avro")] + valid.push("AVRO"); + Err(DataFusionError::Configuration(format!( + "Invalid catalog format. Expected one of: {}. Got '{format}'", + valid.join(", ") + ))) + } + } + } +} + +impl Display for CatalogFormat { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + CatalogFormat::CSV => "CSV", + #[cfg(feature = "parquet")] + CatalogFormat::PARQUET => "PARQUET", + CatalogFormat::JSON => "JSON", + CatalogFormat::NDJSON => "NDJSON", + #[cfg(feature = "avro")] + CatalogFormat::AVRO => "AVRO", + CatalogFormat::ARROW => "ARROW", + }; + write!(f, "{s}") + } +} + +impl Default for CatalogFormat { + fn default() -> Self { + CatalogFormat::CSV + } +} + +impl ConfigField for CatalogFormat { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, _: &str, value: &str) -> Result<()> { + *self = CatalogFormat::from_str(value)?; + Ok(()) + } +}