Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/integrations/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { workspace = true, features = ["rt", "time", "fs"] }
lexical-write-float = "1.0.6"

[dev-dependencies]
arrow-array = { workspace = true }
Expand Down
162 changes: 158 additions & 4 deletions crates/integrations/datafusion/src/system_tables/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,23 @@ use std::any::Any;
use std::sync::{Arc, OnceLock};

use async_trait::async_trait;
use datafusion::arrow::array::{new_null_array, Int64Array, RecordBatch, StringArray};
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::catalog::Session;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result as DFResult;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use paimon::spec::{ManifestFileMeta, ManifestList};
use paimon::spec::{BinaryRow, DataField, ManifestFileMeta, ManifestList};
use paimon::table::{SnapshotManager, Table};

use super::row_string_cast::format_row_as_java_cast_string;
use crate::error::to_datafusion_error;

const MIN_PARTITION_STATS_INDEX: usize = 5;
const MAX_PARTITION_STATS_INDEX: usize = 6;

/// Builds the `manifests` system-table provider for the given Paimon table.
pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
    let provider = ManifestsTable { table };
    Ok(Arc::new(provider))
}
Expand Down Expand Up @@ -93,15 +97,42 @@ impl TableProvider for ManifestsTable {
let mut num_added = Vec::with_capacity(n);
let mut num_deleted = Vec::with_capacity(n);
let mut schema_ids = Vec::with_capacity(n);
let mut min_partition_stats: Vec<Option<String>> = Vec::with_capacity(n);
let mut max_partition_stats: Vec<Option<String>> = Vec::with_capacity(n);
let mut min_row_ids: Vec<Option<i64>> = Vec::with_capacity(n);
let mut max_row_ids: Vec<Option<i64>> = Vec::with_capacity(n);
let partition_fields = self.table.schema().partition_fields();
let projected_columns = projection.map(Vec::as_slice);
let materialize_min_partition_stats =
should_materialize_column(projected_columns, MIN_PARTITION_STATS_INDEX);
let materialize_max_partition_stats =
should_materialize_column(projected_columns, MAX_PARTITION_STATS_INDEX);

for meta in metas {
let stats = meta.partition_stats();
file_names.push(meta.file_name().to_string());
file_sizes.push(meta.file_size());
num_added.push(meta.num_added_files());
num_deleted.push(meta.num_deleted_files());
schema_ids.push(meta.schema_id());
min_partition_stats.push(
materialize_partition_stats_value(
materialize_min_partition_stats,
stats.min_values(),
stats.null_counts(),
&partition_fields,
)
.map_err(to_datafusion_error)?,
);
max_partition_stats.push(
materialize_partition_stats_value(
materialize_max_partition_stats,
stats.max_values(),
stats.null_counts(),
&partition_fields,
)
.map_err(to_datafusion_error)?,
);
min_row_ids.push(meta.min_row_id());
max_row_ids.push(meta.max_row_id());
}
Expand All @@ -115,8 +146,8 @@ impl TableProvider for ManifestsTable {
Arc::new(Int64Array::from(num_added)),
Arc::new(Int64Array::from(num_deleted)),
Arc::new(Int64Array::from(schema_ids)),
new_null_array(&DataType::Utf8, n),
new_null_array(&DataType::Utf8, n),
Arc::new(StringArray::from(min_partition_stats)),
Arc::new(StringArray::from(max_partition_stats)),
Arc::new(Int64Array::from(min_row_ids)),
Arc::new(Int64Array::from(max_row_ids)),
],
Expand Down Expand Up @@ -157,3 +188,126 @@ async fn collect_manifests(table: &Table) -> paimon::Result<Vec<ManifestFileMeta
metas.extend(changelog);
Ok(metas)
}

/// Returns `true` when the column at `column_index` must be materialized:
/// either no projection was supplied (all columns requested) or the
/// projection explicitly contains the index.
fn should_materialize_column(projection: Option<&[usize]>, column_index: usize) -> bool {
    projection.map_or(true, |cols| cols.contains(&column_index))
}

/// Formats a partition-stats value only when the corresponding column was
/// actually projected; otherwise skips the deserialization and formatting
/// work entirely and yields `None`.
fn materialize_partition_stats_value(
    materialize: bool,
    value_bytes: &[u8],
    null_counts: &[Option<i64>],
    partition_fields: &[DataField],
) -> paimon::Result<Option<String>> {
    if !materialize {
        return Ok(None);
    }
    format_partition_stats_value(value_bytes, null_counts, partition_fields)
}

/// Renders serialized partition statistics as a Java-CAST-style string.
///
/// An empty `value_bytes` payload means no stats row was recorded: when there
/// are no partition columns, or every partition column is covered by a null
/// count, the row is known to be all-null and rendered as such; otherwise the
/// value cannot be reconstructed and `None` is returned.
fn format_partition_stats_value(
    value_bytes: &[u8],
    null_counts: &[Option<i64>],
    partition_fields: &[DataField],
) -> paimon::Result<Option<String>> {
    if !value_bytes.is_empty() {
        // Normal path: decode the binary row and format it field by field.
        let row = BinaryRow::from_serialized_bytes(value_bytes)?;
        return format_row_as_java_cast_string(&row, partition_fields).map(Some);
    }

    let arity = partition_fields.len();
    if arity == 0 || null_counts.len() == arity {
        Ok(Some(format_all_null_partition_row(arity)))
    } else {
        Ok(None)
    }
}

/// Builds the display form of a partition row whose every field is null:
/// `{}` for arity 0, `{null}` for 1, `{null, null}` for 2, and so on.
fn format_all_null_partition_row(arity: usize) -> String {
    // `join` on an empty Vec yields "", so arity == 0 naturally renders "{}".
    let body = vec!["null"; arity].join(", ");
    format!("{{{body}}}")
}

#[cfg(test)]
mod tests {
    use super::*;
    use paimon::spec::{DataType as PaimonDataType, Datum, FloatType, IntType, VarCharType};

    // Helper: builds a partition DataField. The field id (0) does not affect
    // formatting here — only the name and data type are used.
    fn field(name: &str, data_type: PaimonDataType) -> DataField {
        DataField::new(0, name.to_string(), data_type)
    }

    // Helper: serializes (datum, type) pairs into the binary-row layout that
    // format_partition_stats_value expects to decode.
    fn serialized_row(values: &[(Option<Datum>, PaimonDataType)]) -> Vec<u8> {
        let refs: Vec<_> = values
            .iter()
            .map(|(datum, data_type)| (datum.as_ref(), data_type))
            .collect();
        BinaryRow::from_datums(&refs).to_serialized_bytes()
    }

    #[test]
    fn test_should_materialize_column() {
        let projected_stats = vec![MIN_PARTITION_STATS_INDEX];
        let projected_without_stats = vec![0, 1, 2];

        // No projection means every column is requested.
        assert!(should_materialize_column(None, MIN_PARTITION_STATS_INDEX));
        // Projection containing the stats column -> materialize.
        assert!(should_materialize_column(
            Some(projected_stats.as_slice()),
            MIN_PARTITION_STATS_INDEX
        ));
        // Projection without the stats column -> skip formatting.
        assert!(!should_materialize_column(
            Some(projected_without_stats.as_slice()),
            MIN_PARTITION_STATS_INDEX
        ));
    }

    #[test]
    fn test_unprojected_partition_stats_are_not_formatted() {
        let data_type = PaimonDataType::Float(FloatType::new());
        let fields = vec![field("pt", data_type.clone())];
        let bytes = serialized_row(&[(Some(Datum::Float(1.0)), data_type.clone())]);

        // materialize == false short-circuits to None without decoding.
        assert_eq!(
            materialize_partition_stats_value(false, &bytes, &[Some(0)], &fields).unwrap(),
            None
        );
        // materialize == true decodes and renders the same payload.
        assert_eq!(
            materialize_partition_stats_value(true, &bytes, &[Some(0)], &fields).unwrap(),
            Some("{1.0}".to_string())
        );
    }

    #[test]
    fn test_format_empty_partition_row() {
        // Zero partition columns with an empty payload renders as "{}".
        assert_eq!(
            format_partition_stats_value(&[], &[], &[]).unwrap(),
            Some("{}".to_string())
        );
    }

    #[test]
    fn test_format_empty_bytes_with_matching_null_counts_as_all_null() {
        let fields = vec![
            field("pt1", PaimonDataType::Int(IntType::new())),
            field("pt2", PaimonDataType::VarChar(VarCharType::string_type())),
        ];
        // One null count per partition column proves the row is all-null.
        assert_eq!(
            format_partition_stats_value(&[], &[Some(2), Some(2)], &fields).unwrap(),
            Some("{null, null}".to_string())
        );
    }

    #[test]
    fn test_format_empty_bytes_with_mismatched_null_counts_as_unknown() {
        let fields = vec![field("pt", PaimonDataType::Int(IntType::new()))];
        // Null counts that don't cover every column leave the value unknown.
        assert_eq!(
            format_partition_stats_value(&[], &[], &fields).unwrap(),
            None
        );
    }
}
1 change: 1 addition & 0 deletions crates/integrations/datafusion/src/system_tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::error::to_datafusion_error;
mod branches;
mod manifests;
mod options;
mod row_string_cast;
mod schemas;
mod snapshots;
mod tags;
Expand Down
Loading
Loading