From ba45a4dc9897aeaf33ce20837b48c8772703ad52 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Tue, 27 Jan 2026 21:28:16 +0800 Subject: [PATCH] feat: support scan from oss --- crates/fluss/Cargo.toml | 5 ++- crates/fluss/src/client/credentials.rs | 16 +++++++ .../src/client/table/partition_getter.rs | 3 +- crates/fluss/src/io/mod.rs | 5 +++ crates/fluss/src/io/storage.rs | 14 +++++- crates/fluss/src/io/storage_oss.rs | 45 +++++++++++++++++++ crates/fluss/src/metadata/table.rs | 15 +++---- crates/fluss/src/util/partition.rs | 4 +- 8 files changed, 90 insertions(+), 17 deletions(-) create mode 100644 crates/fluss/src/io/storage_oss.rs diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml index 9aeee72d..6b2707be 100644 --- a/crates/fluss/Cargo.toml +++ b/crates/fluss/Cargo.toml @@ -22,12 +22,13 @@ version = { workspace = true } name = "fluss" [features] -default = ["storage-memory", "storage-fs", "storage-s3"] -storage-all = ["storage-memory", "storage-fs", "storage-s3"] +default = ["storage-memory", "storage-fs"] +storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-oss"] storage-memory = ["opendal/services-memory"] storage-fs = ["opendal/services-fs"] storage-s3 = ["opendal/services-s3"] +storage-oss = ["opendal/services-oss"] integration_tests = [] [dependencies] diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs index 93a53669..29bd6025 100644 --- a/crates/fluss/src/client/credentials.rs +++ b/crates/fluss/src/client/credentials.rs @@ -55,10 +55,14 @@ struct Credentials { /// needs_inversion is true for path_style_access -> enable_virtual_host_style conversion fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> { match hadoop_key { + // S3 specific configurations "fs.s3a.endpoint" => Some(("endpoint".to_string(), false)), "fs.s3a.endpoint.region" => Some(("region".to_string(), false)), "fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)), "fs.s3a.connection.ssl.enabled" => None, + // OSS specific configurations + "fs.oss.endpoint" => Some(("endpoint".to_string(), false)), + "fs.oss.region" => Some(("region".to_string(), false)), _ => None, } } @@ -342,6 +346,7 @@ mod tests { #[test] fn convert_hadoop_key_to_opendal_maps_known_keys() { + // S3 keys let (key, invert) = convert_hadoop_key_to_opendal("fs.s3a.endpoint").expect("key"); assert_eq!(key, "endpoint"); assert!(!invert); @@ -351,6 +356,17 @@ mod tests { assert!(invert); assert!(convert_hadoop_key_to_opendal("fs.s3a.connection.ssl.enabled").is_none()); + + // OSS keys + let (key, invert) = convert_hadoop_key_to_opendal("fs.oss.endpoint").expect("key"); + assert_eq!(key, "endpoint"); + assert!(!invert); + + let (key, invert) = convert_hadoop_key_to_opendal("fs.oss.region").expect("key"); + assert_eq!(key, "region"); + assert!(!invert); + + // Unknown key assert!(convert_hadoop_key_to_opendal("unknown.key").is_none()); } diff --git a/crates/fluss/src/client/table/partition_getter.rs b/crates/fluss/src/client/table/partition_getter.rs index 887c0a4f..1a761068 100644 --- a/crates/fluss/src/client/table/partition_getter.rs +++ b/crates/fluss/src/client/table/partition_getter.rs @@ -49,8 +49,7 @@ impl PartitionGetter { } else { return Err(IllegalArgument { message: format!( - "The partition column {} is not in the row {}.", - partition_key, row_type + "The partition column {partition_key} is not in the row {row_type}." ), }); }; diff --git a/crates/fluss/src/io/mod.rs b/crates/fluss/src/io/mod.rs index a03a3945..74265017 100644 --- a/crates/fluss/src/io/mod.rs +++ b/crates/fluss/src/io/mod.rs @@ -37,3 +37,8 @@ use storage_memory::*; mod storage_s3; #[cfg(feature = "storage-s3")] use storage_s3::*; + +#[cfg(feature = "storage-oss")] +mod storage_oss; +#[cfg(feature = "storage-oss")] +use storage_oss::*; diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs index d90eaa57..1934a782 100644 --- a/crates/fluss/src/io/storage.rs +++ b/crates/fluss/src/io/storage.rs @@ -19,7 +19,6 @@ use crate::error; use crate::error::Result; use crate::io::FileIOBuilder; use opendal::{Operator, Scheme}; -use std::collections::HashMap; /// The storage carries all supported storage services in fluss #[derive(Debug)] @@ -30,6 +29,8 @@ pub enum Storage { LocalFs, #[cfg(feature = "storage-s3")] S3 { props: HashMap }, + #[cfg(feature = "storage-oss")] + Oss { props: HashMap }, } impl Storage { @@ -44,6 +45,8 @@ impl Storage { Scheme::Fs => Ok(Self::LocalFs), #[cfg(feature = "storage-s3")] Scheme::S3 => Ok(Self::S3 { props }), + #[cfg(feature = "storage-oss")] + Scheme::Oss => Ok(Self::Oss { props }), _ => Err(error::Error::IoUnsupported { message: format!("Unsupported storage feature {scheme_str}"), }), @@ -79,6 +82,14 @@ impl Storage { let op = super::s3_config_build(&s3_props)?; Ok((op, key)) } + #[cfg(feature = "storage-oss")] + Storage::Oss { props } => { + let (bucket, key) = super::parse_oss_path(path); + let mut oss_props = props.clone(); + oss_props.insert("bucket".to_string(), bucket.to_string()); + let op = super::oss_config_build(&oss_props)?; + Ok((op, key)) + } } } @@ -87,6 +98,7 @@ impl Storage { "memory" => Ok(Scheme::Memory), "file" | "" => Ok(Scheme::Fs), "s3" | "s3a" => Ok(Scheme::S3), + "oss" => Ok(Scheme::Oss), s => Ok(s.parse::()?), } } diff --git a/crates/fluss/src/io/storage_oss.rs b/crates/fluss/src/io/storage_oss.rs new file mode 100644 index 00000000..3d5d0549 --- /dev/null +++ b/crates/fluss/src/io/storage_oss.rs @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::Result; +use opendal::Configurator; +use opendal::Operator; +use opendal::layers::TimeoutLayer; +use opendal::services::OssConfig; +use std::collections::HashMap; +use std::time::Duration; + +pub(crate) fn oss_config_build(props: &HashMap) -> Result { + let config = OssConfig::from_iter(props.clone())?; + let op = Operator::from_config(config)?.finish(); + + // Add timeout layer to prevent hanging on OSS operations + let timeout_layer = TimeoutLayer::new() + .with_timeout(Duration::from_secs(10)) + .with_io_timeout(Duration::from_secs(30)); + + Ok(op.layer(timeout_layer)) +} + +pub(crate) fn parse_oss_path(path: &str) -> (&str, &str) { + let path = path.strip_prefix("oss://").unwrap_or(path); + + match path.find('/') { + Some(idx) => (&path[..idx], &path[idx + 1..]), + None => (path, ""), + } +} diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 0c0cdf51..66c0eb83 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -712,14 +712,12 @@ impl TablePath { } if identifier.len() > MAX_NAME_LENGTH { return Some(format!( - "the length of '{}' is longer than the max allowed length {}", - identifier, MAX_NAME_LENGTH + "the length of '{identifier}' is longer than the max allowed length {MAX_NAME_LENGTH}" )); } if Self::contains_invalid_pattern(identifier) { return Some(format!( - "'{}' contains one or more characters other than ASCII alphanumerics, '_' and '-'", - identifier + "'{identifier}' contains one or more characters other than ASCII alphanumerics, '_' and '-'" )); } None @@ -728,8 +726,7 @@ impl TablePath { pub fn validate_prefix(identifier: &str) -> Option { if identifier.starts_with(INTERNAL_NAME_PREFIX) { return Some(format!( - "'{}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server", - INTERNAL_NAME_PREFIX + "'{INTERNAL_NAME_PREFIX}' is not allowed as prefix, since it is reserved for internal databases/internal tables/internal partitions in Fluss server" )); } None @@ -1195,8 +1192,7 @@ mod tests { assert_invalid_name( &invalid_long_name, &format!( - "the length of '{}' is longer than the max allowed length {}", - invalid_long_name, MAX_NAME_LENGTH + "the length of '{invalid_long_name}' is longer than the max allowed length {MAX_NAME_LENGTH}" ), ); } @@ -1205,8 +1201,7 @@ mod tests { let result = TablePath::detect_invalid_name(name); assert!( result.is_some(), - "Expected '{}' to be invalid, but it was valid", - name + "Expected '{name}' to be invalid, but it was valid" ); assert!( result.as_ref().unwrap().contains(expected_message), diff --git a/crates/fluss/src/util/partition.rs b/crates/fluss/src/util/partition.rs index 036cac46..ccc71a6b 100644 --- a/crates/fluss/src/util/partition.rs +++ b/crates/fluss/src/util/partition.rs @@ -26,7 +26,7 @@ use std::fmt::Write; fn hex_string(bytes: &[u8]) -> String { let mut hex = String::with_capacity(bytes.len() * 2); for &b in bytes { - write!(hex, "{:02x}", b).unwrap(); + write!(hex, "{b:02x}").unwrap(); } hex } @@ -84,7 +84,7 @@ fn milli_to_string(milli: i32) -> String { .div_euclid(MILLIS_PER_SECOND as i32); let ms = milli.rem_euclid(MILLIS_PER_SECOND as i32); - format!("{:02}-{:02}-{:02}_{:03}", hour, min, sec, ms) + format!("{hour:02}-{min:02}-{sec:02}_{ms:03}") } fn time_to_string(time: Time) -> String {