Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 45 additions & 4 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow_ipc::CompressionType;
#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
use crate::format::{CatalogFormat, ExplainAnalyzeLevel, ExplainFormat};
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
Expand Down Expand Up @@ -241,7 +241,7 @@ config_namespace! {
pub location: Option<String>, default = None

/// Type of `TableProvider` to use when loading `default` schema
pub format: Option<String>, default = None
pub format: Option<CatalogFormat>, default = None

/// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
Expand Down Expand Up @@ -3080,8 +3080,8 @@ mod tests {
#[cfg(feature = "parquet")]
use crate::config::TableParquetOptions;
use crate::config::{
ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
Extensions, TableOptions,
ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ConfigOptions,
ExtensionOptions, Extensions, TableOptions,
};
use std::any::Any;
use std::collections::HashMap;
Expand Down Expand Up @@ -3455,4 +3455,45 @@ mod tests {
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}

#[test]
fn test_catalog_format_validation() {
let mut config = ConfigOptions::default();

// Test all valid formats
config.set("datafusion.catalog.format", "CSV").unwrap();
assert!(config.catalog.format.is_some());

config.set("datafusion.catalog.format", "JSON").unwrap();
assert!(config.catalog.format.is_some());

config.set("datafusion.catalog.format", "NDJSON").unwrap();
assert!(config.catalog.format.is_some());

config.set("datafusion.catalog.format", "ARROW").unwrap();
assert!(config.catalog.format.is_some());

#[cfg(feature = "parquet")]
{
config.set("datafusion.catalog.format", "PARQUET").unwrap();
assert!(config.catalog.format.is_some());
}

#[cfg(feature = "avro")]
{
config.set("datafusion.catalog.format", "AVRO").unwrap();
assert!(config.catalog.format.is_some());
}

// Case-insensitive
config.set("datafusion.catalog.format", "csv").unwrap();
assert!(config.catalog.format.is_some());

// Invalid format should error
let result = config.set("datafusion.catalog.format", "INVALID");
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("Invalid catalog format"));
assert!(error_msg.contains("INVALID")); // Error should mention the invalid value
}
}
86 changes: 86 additions & 0 deletions datafusion/common/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,89 @@ impl ConfigField for ExplainAnalyzeLevel {
Ok(())
}
}

/// File format types for catalog table providers
///
/// These correspond to the table provider factories registered in DataFusion.
/// The format is used when loading tables for the default schema via
/// `datafusion.catalog.format` configuration.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CatalogFormat {
/// CSV (Comma-Separated Values) formats
CSV,
#[cfg(feature = "parquet")]
/// Parquet format
PARQUET,
/// JSON (JavaScript Object Notation) format
JSON,
/// NDJSON (Newline Delimited JSON) format
NDJSON,
/// Avro format
#[cfg(feature = "avro")]
AVRO,
/// Arrow format
ARROW,
}

impl FromStr for CatalogFormat {
type Err = DataFusionError;

fn from_str(format: &str) -> std::result::Result<Self, Self::Err> {
match format.to_uppercase().as_str() {
"CSV" => Ok(CatalogFormat::CSV),
#[cfg(feature = "parquet")]
"PARQUET" => Ok(CatalogFormat::PARQUET),
"JSON" => Ok(CatalogFormat::JSON),
"NDJSON" => Ok(CatalogFormat::NDJSON),
#[cfg(feature = "avro")]
"AVRO" => Ok(CatalogFormat::AVRO),
"ARROW" => Ok(CatalogFormat::ARROW),
_ => {
// Build error message dynamically based on enabled features
#[allow(unused_mut)]
let mut valid = vec!["CSV", "JSON", "NDJSON", "ARROW"];
#[cfg(feature = "parquet")]
valid.push("PARQUET");
#[cfg(feature = "avro")]
valid.push("AVRO");
Err(DataFusionError::Configuration(format!(
"Invalid catalog format. Expected one of: {}. Got '{format}'",
valid.join(", ")
)))
}
}
}
}

impl Display for CatalogFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match self {
CatalogFormat::CSV => "CSV",
#[cfg(feature = "parquet")]
CatalogFormat::PARQUET => "PARQUET",
CatalogFormat::JSON => "JSON",
CatalogFormat::NDJSON => "NDJSON",
#[cfg(feature = "avro")]
CatalogFormat::AVRO => "AVRO",
CatalogFormat::ARROW => "ARROW",
};
write!(f, "{s}")
}
}

impl Default for CatalogFormat {
fn default() -> Self {
CatalogFormat::CSV
}
}

impl ConfigField for CatalogFormat {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}

fn set(&mut self, _: &str, value: &str) -> Result<()> {
*self = CatalogFormat::from_str(value)?;
Ok(())
}
}
Loading