From 31f75782c47675c2d01af84e3a88fa2e10169d9b Mon Sep 17 00:00:00 2001 From: "boyu.wjb" Date: Mon, 26 Jan 2026 19:18:22 +0800 Subject: [PATCH 1/5] [chore] Implement is_auto_partitioned method in table.rs --- crates/fluss/src/metadata/table.rs | 157 ++++++++++++++++++++++++++++- 1 file changed, 155 insertions(+), 2 deletions(-) diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 0c0cdf5..8fa0afa 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -19,7 +19,7 @@ use crate::compression::ArrowCompressionInfo; use crate::error::Error::{IllegalArgument, InvalidTableError}; use crate::error::{Error, Result}; use crate::metadata::DataLakeFormat; -use crate::metadata::datatype::{DataField, DataType, RowType}; +use crate::metadata::datatype::{DataField, DataType, DataTypes, RowType}; use crate::{BucketId, PartitionId, TableId}; use core::fmt; use serde::{Deserialize, Serialize}; @@ -834,6 +834,69 @@ impl TableInfo { } } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AutoPartitionStrategy { + auto_partition_enabled: bool, + auto_partition_key: Option, + auto_partition_time_unit: Option, + auto_partition_num_precreate: i32, + auto_partition_num_retention: i32, + auto_partition_timezone: String, +} + +impl AutoPartitionStrategy { + pub fn from(properties: &HashMap) -> Self { + Self { + auto_partition_enabled: properties + .get("table.auto.partition.enabled") + .and_then(|s| s.parse().ok()) + .unwrap_or(false), + auto_partition_key: properties + .get("table.auto.partition.key") + .map(|s| s.to_string()), + auto_partition_time_unit: properties + .get("table.auto.partition.time.unit") + .map(|s| s.to_string()), + auto_partition_num_precreate: properties + .get("table.auto.partition.num.precreate") + .and_then(|s| s.parse().ok()) + .unwrap_or(0), + auto_partition_num_retention: properties + .get("table.auto.partition.num.retention") + .and_then(|s| s.parse().ok()) + .unwrap_or(0), + auto_partition_timezone: properties + .get("table.auto.partition.timezone") + .map(|s| s.to_string()) + .unwrap_or_else(|| "UTC".to_string()), + } + } + + pub fn is_auto_partition_enabled(&self) -> bool { + self.auto_partition_enabled + } + + pub fn key(&self) -> Option<&str> { + self.auto_partition_key.as_deref() + } + + pub fn time_unit(&self) -> Option<&str> { + self.auto_partition_time_unit.as_deref() + } + + pub fn num_precreate(&self) -> i32 { + self.auto_partition_num_precreate + } + + pub fn num_retention(&self) -> i32 { + self.auto_partition_num_retention + } + + pub fn timezone(&self) -> &str { + &self.auto_partition_timezone + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct TableConfig { pub properties: HashMap, @@ -866,6 +929,10 @@ impl TableConfig { .unwrap_or(DEFAULT_KV_FORMAT); kv_format.parse().map_err(Into::into) } + + pub fn get_auto_partition_strategy(&self) -> AutoPartitionStrategy { + AutoPartitionStrategy::from(&self.properties) + } } impl TableInfo { @@ -1003,7 +1070,11 @@ impl TableInfo { } pub fn is_auto_partitioned(&self) -> bool { - self.is_partitioned() && todo!() + self.is_partitioned() + && self + .table_config + .get_auto_partition_strategy() + .is_auto_partition_enabled() } pub fn get_partition_keys(&self) -> &[String] { @@ -1214,5 +1285,87 @@ mod tests { expected_message, result.unwrap() ); + + fn test_is_auto_partitioned() { + let schema = Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .unwrap(); + + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + + // 1. Not partitioned, auto partition disabled + let mut properties = HashMap::new(); + let table_info = TableInfo::new( + table_path.clone(), + 1, + 1, + schema.clone(), + vec!["id".to_string()], + vec![], // No partition keys + 1, + properties.clone(), + HashMap::new(), + None, + 0, + 0, + ); + assert!(!table_info.is_auto_partitioned()); + + // 2. Not partitioned, auto partition enabled + properties.insert("table.auto.partition.enabled".to_string(), "true".to_string()); + let table_info = TableInfo::new( + table_path.clone(), + 1, + 1, + schema.clone(), + vec!["id".to_string()], + vec![], // No partition keys + 1, + properties.clone(), + HashMap::new(), + None, + 0, + 0, + ); + assert!(!table_info.is_auto_partitioned()); + + // 3. Partitioned, auto partition disabled + properties.insert("table.auto.partition.enabled".to_string(), "false".to_string()); + let table_info = TableInfo::new( + table_path.clone(), + 1, + 1, + schema.clone(), + vec!["id".to_string()], + vec!["name".to_string()], // Partition keys + 1, + properties.clone(), + HashMap::new(), + None, + 0, + 0, + ); + assert!(!table_info.is_auto_partitioned()); + + // 4. Partitioned, auto partition enabled + properties.insert("table.auto.partition.enabled".to_string(), "true".to_string()); + let table_info = TableInfo::new( + table_path.clone(), + 1, + 1, + schema.clone(), + vec!["id".to_string()], + vec!["name".to_string()], // Partition keys + 1, + properties.clone(), + HashMap::new(), + None, + 0, + 0, + ); + assert!(table_info.is_auto_partitioned()); } } From e9d8d39f3cd6ebfb7a28d0b38da213d2670a0538 Mon Sep 17 00:00:00 2001 From: "boyu.wjb" Date: Tue, 27 Jan 2026 09:23:21 +0800 Subject: [PATCH 2/5] code format --- crates/fluss/src/metadata/table.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 8fa0afa..f77efe7 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -1315,7 +1315,10 @@ mod tests { assert!(!table_info.is_auto_partitioned()); // 2. Not partitioned, auto partition enabled - properties.insert("table.auto.partition.enabled".to_string(), "true".to_string()); + properties.insert( + "table.auto.partition.enabled".to_string(), + "true".to_string(), + ); let table_info = TableInfo::new( table_path.clone(), 1, @@ -1333,7 +1336,10 @@ mod tests { assert!(!table_info.is_auto_partitioned()); // 3. Partitioned, auto partition disabled - properties.insert("table.auto.partition.enabled".to_string(), "false".to_string()); + properties.insert( + "table.auto.partition.enabled".to_string(), + "false".to_string(), + ); let table_info = TableInfo::new( table_path.clone(), 1, @@ -1351,7 +1357,10 @@ mod tests { assert!(!table_info.is_auto_partitioned()); // 4. Partitioned, auto partition enabled - properties.insert("table.auto.partition.enabled".to_string(), "true".to_string()); + properties.insert( + "table.auto.partition.enabled".to_string(), + "true".to_string(), + ); let table_info = TableInfo::new( table_path.clone(), 1, From 30988c10b6daa763ac461135cd12f18ef7e79e67 Mon Sep 17 00:00:00 2001 From: "boyu.wjb" Date: Tue, 27 Jan 2026 12:14:22 +0800 Subject: [PATCH 3/5] fix(table): correct auto-partitioning configuration properties and default values --- crates/fluss/src/metadata/table.rs | 38 +++++++++++++++++------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index f77efe7..7eb9cf1 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -838,7 +838,7 @@ impl TableInfo { pub struct AutoPartitionStrategy { auto_partition_enabled: bool, auto_partition_key: Option, - auto_partition_time_unit: Option, + auto_partition_time_unit: String, auto_partition_num_precreate: i32, auto_partition_num_retention: i32, auto_partition_timezone: String, @@ -848,27 +848,33 @@ impl AutoPartitionStrategy { pub fn from(properties: &HashMap) -> Self { Self { auto_partition_enabled: properties - .get("table.auto.partition.enabled") + .get("table.auto-partition.enabled") .and_then(|s| s.parse().ok()) .unwrap_or(false), auto_partition_key: properties - .get("table.auto.partition.key") + .get("table.auto-partition.key") .map(|s| s.to_string()), auto_partition_time_unit: properties - .get("table.auto.partition.time.unit") - .map(|s| s.to_string()), + .get("table.auto-partition.time-unit") + .map(|s| s.to_string()) + .unwrap_or_else(|| "DAY".to_string()), auto_partition_num_precreate: properties - .get("table.auto.partition.num.precreate") + .get("table.auto-partition.num-precreate") .and_then(|s| s.parse().ok()) - .unwrap_or(0), + .unwrap_or(2), auto_partition_num_retention: properties - .get("table.auto.partition.num.retention") + .get("table.auto-partition.num-retention") .and_then(|s| s.parse().ok()) - .unwrap_or(0), + .unwrap_or(7), auto_partition_timezone: properties - .get("table.auto.partition.timezone") + .get("table.auto-partition.time-zone") .map(|s| s.to_string()) - .unwrap_or_else(|| "UTC".to_string()), + .unwrap_or_else(|| { + jiff::tz::TimeZone::system() + .iana_name() + .unwrap_or("UTC") + .to_string() + }), } } @@ -880,8 +886,8 @@ impl AutoPartitionStrategy { self.auto_partition_key.as_deref() } - pub fn time_unit(&self) -> Option<&str> { - self.auto_partition_time_unit.as_deref() + pub fn time_unit(&self) -> &str { + &self.auto_partition_time_unit } pub fn num_precreate(&self) -> i32 { @@ -1316,7 +1322,7 @@ mod tests { // 2. Not partitioned, auto partition enabled properties.insert( - "table.auto.partition.enabled".to_string(), + "table.auto-partition.enabled".to_string(), "true".to_string(), ); let table_info = TableInfo::new( @@ -1337,7 +1343,7 @@ mod tests { // 3. Partitioned, auto partition disabled properties.insert( - "table.auto.partition.enabled".to_string(), + "table.auto-partition.enabled".to_string(), "false".to_string(), ); let table_info = TableInfo::new( @@ -1358,7 +1364,7 @@ mod tests { // 4. Partitioned, auto partition enabled properties.insert( - "table.auto.partition.enabled".to_string(), + "table.auto-partition.enabled".to_string(), "true".to_string(), ); let table_info = TableInfo::new( From 648f3a5cd3764a9eb14aa379bdbfce1e5ab7af91 Mon Sep 17 00:00:00 2001 From: "boyu.wjb" Date: Wed, 28 Jan 2026 11:18:37 +0800 Subject: [PATCH 4/5] fix test --- crates/fluss/src/metadata/table.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 7eb9cf1..8588ac4 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -1291,6 +1291,7 @@ mod tests { expected_message, result.unwrap() ); + } fn test_is_auto_partitioned() { let schema = Schema::builder() From d858991e48ae800150e94d24fb95c47edef5945e Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 28 Jan 2026 11:45:25 +0800 Subject: [PATCH 5/5] fix ci --- .../src/client/table/partition_getter.rs | 3 +-- crates/fluss/src/metadata/table.rs | 21 ++++++++----------- crates/fluss/src/util/partition.rs | 4 ++-- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index 887c0a4..1a76106 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -49,8 +49,7 @@ impl PartitionGetter { } else { return Err(IllegalArgument { message: format!( - "The partition column {} is not in the row {}.", - partition_key, row_type + "The partition column {partition_key} is not in the row {row_type}." ), }); }; diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 8588ac4..7b93aca 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -19,7 +19,7 @@ use crate::compression::ArrowCompressionInfo; use crate::error::Error::{IllegalArgument, InvalidTableError}; use crate::error::{Error, Result}; use crate::metadata::DataLakeFormat; -use crate::metadata::datatype::{DataField, DataType, DataTypes, RowType}; +use crate::metadata::datatype::{DataField, DataType, RowType}; use crate::{BucketId, PartitionId, TableId}; use core::fmt; use serde::{Deserialize, Serialize}; @@ -712,14 +712,12 @@ impl TablePath { } if identifier.len() > MAX_NAME_LENGTH { return Some(format!( - "the length of '{}' is longer than the max allowed length {}", - identifier, MAX_NAME_LENGTH + "the length of '{identifier}' is longer than the max allowed length {MAX_NAME_LENGTH}" )); } if Self::contains_invalid_pattern(identifier) { return Some(format!( - "'{}' contains one or more characters other than ASCII alphanumerics, '_' and '-'", - identifier + "'{identifier}' contains one or more characters other than ASCII alphanumerics, '_' and '-'" )); } None @@ -728,8 +726,7 @@ impl TablePath { pub fn validate_prefix(identifier: &str) -> Option { if identifier.starts_with(INTERNAL_NAME_PREFIX) { return Some(format!( - "'{}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server", - INTERNAL_NAME_PREFIX + "'{INTERNAL_NAME_PREFIX}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server" )); } None @@ -1238,6 +1235,7 @@ impl LakeSnapshot { #[cfg(test)] mod tests { use super::*; + use crate::metadata::DataTypes; #[test] fn test_validate() { @@ -1272,8 +1270,7 @@ mod tests { assert_invalid_name( &invalid_long_name, &format!( - "the length of '{}' is longer than the max allowed length {}", - invalid_long_name, MAX_NAME_LENGTH + "the length of '{invalid_long_name}' is longer than the max allowed length {MAX_NAME_LENGTH}" ), ); } @@ -1282,8 +1279,7 @@ mod tests { let result = TablePath::detect_invalid_name(name); assert!( result.is_some(), - "Expected '{}' to be invalid, but it was valid", - name + "Expected '{name}' to be invalid, but it was valid" ); assert!( result.as_ref().unwrap().contains(expected_message), @@ -1292,7 +1288,8 @@ mod tests { result.unwrap() ); } - + + #[test] fn test_is_auto_partitioned() { let schema = Schema::builder() .column("id", DataTypes::int()) diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs index 036cac4..ccc71a6 100644 --- a/crates/fluss/src/util/partition.rs +++ b/crates/fluss/src/util/partition.rs @@ -26,7 +26,7 @@ use std::fmt::Write; fn hex_string(bytes: &[u8]) -> String { let mut hex = String::with_capacity(bytes.len() * 2); for &b in bytes { - write!(hex, "{:02x}", b).unwrap(); + write!(hex, "{b:02x}").unwrap(); } hex } @@ -84,7 +84,7 @@ fn milli_to_string(milli: i32) -> String { .div_euclid(MILLIS_PER_SECOND as i32); let ms = milli.rem_euclid(MILLIS_PER_SECOND as i32); - format!("{:02}-{:02}-{:02}_{:03}", hour, min, sec, ms) + format!("{hour:02}-{min:02}-{sec:02}_{ms:03}") } fn time_to_string(time: Time) -> String {