From 96f654a99a6a037ca647b8bb4b8bd5c4b87355ad Mon Sep 17 00:00:00 2001 From: leonzchang Date: Sat, 29 Nov 2025 11:43:44 +0800 Subject: [PATCH 1/5] avoid consuming self in writer builders --- .../src/writer/base_writer/data_file_writer.rs | 8 ++++---- .../src/writer/base_writer/equality_delete_writer.rs | 12 ++++++------ crates/iceberg/src/writer/file_writer/mod.rs | 2 +- .../iceberg/src/writer/file_writer/parquet_writer.rs | 4 ++-- .../iceberg/src/writer/file_writer/rolling_writer.rs | 10 +++++----- crates/iceberg/src/writer/mod.rs | 2 +- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index dcaa56cc97..a6777500bf 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -47,13 +47,13 @@ where #[async_trait::async_trait] impl IcebergWriterBuilder for DataFileWriterBuilder where - B: FileWriterBuilder, - L: LocationGenerator, - F: FileNameGenerator, + B: FileWriterBuilder + Sync, + L: LocationGenerator + Sync, + F: FileNameGenerator + Sync, { type R = DataFileWriter; - async fn build(self, partition_key: Option) -> Result { + async fn build(&self, partition_key: Option) -> Result { Ok(DataFileWriter { inner: Some(self.inner.clone().build()), partition_key, diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 664ea84334..a3357835d9 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -117,17 +117,17 @@ impl EqualityDeleteWriterConfig { #[async_trait::async_trait] impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder where - B: FileWriterBuilder, - L: LocationGenerator, - F: FileNameGenerator, + B: FileWriterBuilder + Sync, + L: LocationGenerator + Sync, + F: FileNameGenerator + Sync, { type R = EqualityDeleteFileWriter; - async fn build(self, partition_key: Option) -> Result { + async fn build(&self, partition_key: Option) -> Result { Ok(EqualityDeleteFileWriter { inner: Some(self.inner.clone().build()), - projector: self.config.projector, - equality_ids: self.config.equality_ids, + projector: self.config.projector.clone(), + equality_ids: self.config.equality_ids.clone(), partition_key, }) } diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 2ed6414ce8..4c7466e4a4 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -40,7 +40,7 @@ pub trait FileWriterBuilder: Send + Clone + 'static { /// The associated file writer type. type R: FileWriter; /// Build file writer. - fn build(self, output_file: OutputFile) -> impl Future> + Send; + fn build(&self, output_file: OutputFile) -> impl Future> + Send; } /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 3e9d1715c9..5cf031a9fb 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -81,11 +81,11 @@ impl ParquetWriterBuilder { impl FileWriterBuilder for ParquetWriterBuilder { type R = ParquetWriter; - async fn build(self, output_file: OutputFile) -> Result { + async fn build(&self, output_file: OutputFile) -> Result { Ok(ParquetWriter { schema: self.schema.clone(), inner_writer: None, - writer_properties: self.props, + writer_properties: self.props.clone(), current_row_num: 0, output_file, nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode), diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 8f03654786..774a322c90 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -103,15 +103,15 @@ where } /// Build a new [`RollingFileWriter`]. - pub fn build(self) -> RollingFileWriter { + pub fn build(&self) -> RollingFileWriter { RollingFileWriter { inner: None, - inner_builder: self.inner_builder, + inner_builder: self.inner_builder.clone(), target_file_size: self.target_file_size, data_file_builders: vec![], - file_io: self.file_io, - location_generator: self.location_generator, - file_name_generator: self.file_name_generator, + file_io: self.file_io.clone(), + location_generator: self.location_generator.clone(), + file_name_generator: self.file_name_generator.clone(), } } } diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index a7892d49e1..d0d3bc3a0f 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -404,7 +404,7 @@ pub trait IcebergWriterBuilder: /// The associated writer type. type R: IcebergWriter; /// Build the iceberg writer with an optional partition key. - async fn build(self, partition_key: Option) -> Result; + async fn build(&self, partition_key: Option) -> Result; } /// The iceberg writer used to write data to iceberg table. From 029a384a81801740d92d48a59c0f482c4aa4ff64 Mon Sep 17 00:00:00 2001 From: leonzchang Date: Sat, 29 Nov 2025 12:23:05 +0800 Subject: [PATCH 2/5] fix doc --- crates/iceberg/src/writer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index d0d3bc3a0f..ba6fa5f048 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -148,7 +148,7 @@ //! impl IcebergWriterBuilder for LatencyRecordWriterBuilder { //! type R = LatencyRecordWriter; //! -//! async fn build(self, partition_key: Option) -> Result { +//! async fn build(&self, partition_key: Option) -> Result { //! Ok(LatencyRecordWriter { //! inner_writer: self.inner_writer_builder.build(partition_key).await?, //! }) From db891df42237c5d3a62fdc6a3af217459447b5f1 Mon Sep 17 00:00:00 2001 From: leonzchang Date: Sat, 29 Nov 2025 12:40:11 +0800 Subject: [PATCH 3/5] add Sync trait bound in writer doc --- crates/iceberg/src/writer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index ba6fa5f048..11dbf7a41a 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -145,7 +145,7 @@ //! } //! //! #[async_trait::async_trait] -//! impl IcebergWriterBuilder for LatencyRecordWriterBuilder { +//! impl IcebergWriterBuilder for LatencyRecordWriterBuilder { //! type R = LatencyRecordWriter; //! //! async fn build(&self, partition_key: Option) -> Result { From 967c1556af8b17d87f7f3df778d3015c69616f20 Mon Sep 17 00:00:00 2001 From: leonzchang Date: Tue, 2 Dec 2025 21:06:11 +0800 Subject: [PATCH 4/5] remove redundant clone & make sync as super trait --- .../src/writer/base_writer/data_file_writer.rs | 10 +++++----- .../src/writer/base_writer/equality_delete_writer.rs | 12 ++++++------ .../src/writer/file_writer/location_generator.rs | 4 ++-- crates/iceberg/src/writer/file_writer/mod.rs | 2 +- .../iceberg/src/writer/file_writer/rolling_writer.rs | 2 -- crates/iceberg/src/writer/mod.rs | 4 +--- .../src/writer/partitioning/clustered_writer.rs | 1 - .../iceberg/src/writer/partitioning/fanout_writer.rs | 1 - .../src/writer/partitioning/unpartitioned_writer.rs | 2 +- 9 files changed, 16 insertions(+), 22 deletions(-) diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index a6777500bf..cb7bd172ea 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -27,7 +27,7 @@ use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; /// Builder for `DataFileWriter`. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct DataFileWriterBuilder { inner: RollingFileWriterBuilder, } @@ -47,15 +47,15 @@ where #[async_trait::async_trait] impl IcebergWriterBuilder for DataFileWriterBuilder where - B: FileWriterBuilder + Sync, - L: LocationGenerator + Sync, - F: FileNameGenerator + Sync, + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, { type R = DataFileWriter; async fn build(&self, partition_key: Option) -> Result { Ok(DataFileWriter { - inner: Some(self.inner.clone().build()), + inner: Some(self.inner.build()), partition_key, }) } diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index a3357835d9..cd0b19148d 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -34,7 +34,7 @@ use crate::writer::{IcebergWriter, IcebergWriterBuilder}; use crate::{Error, ErrorKind, Result}; /// Builder for `EqualityDeleteWriter`. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct EqualityDeleteFileWriterBuilder< B: FileWriterBuilder, L: LocationGenerator, @@ -60,7 +60,7 @@ where } /// Config for `EqualityDeleteWriter`. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct EqualityDeleteWriterConfig { // Field ids used to determine row equality in equality delete files. equality_ids: Vec, @@ -117,15 +117,15 @@ impl EqualityDeleteWriterConfig { #[async_trait::async_trait] impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder where - B: FileWriterBuilder + Sync, - L: LocationGenerator + Sync, - F: FileNameGenerator + Sync, + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, { type R = EqualityDeleteFileWriter; async fn build(&self, partition_key: Option) -> Result { Ok(EqualityDeleteFileWriter { - inner: Some(self.inner.clone().build()), + inner: Some(self.inner.build()), projector: self.config.projector.clone(), equality_ids: self.config.equality_ids.clone(), partition_key, diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs index a5cfc28292..0ad4d91ac6 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -24,7 +24,7 @@ use crate::Result; use crate::spec::{DataFileFormat, PartitionKey, TableMetadata}; /// `LocationGenerator` used to generate the location of data file. -pub trait LocationGenerator: Clone + Send + 'static { +pub trait LocationGenerator: Clone + Send + Sync + 'static { /// Generate an absolute path for the given file name that includes the partition path. /// /// # Arguments @@ -94,7 +94,7 @@ impl LocationGenerator for DefaultLocationGenerator { } /// `FileNameGeneratorTrait` used to generate file name for data file. The file name can be passed to `LocationGenerator` to generate the location of the file. -pub trait FileNameGenerator: Clone + Send + 'static { +pub trait FileNameGenerator: Clone + Send + Sync + 'static { /// Generate a file name. fn generate_file_name(&self) -> String; } diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 4c7466e4a4..101919f5b3 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -36,7 +36,7 @@ pub mod rolling_writer; type DefaultOutput = Vec; /// File writer builder trait. -pub trait FileWriterBuilder: Send + Clone + 'static { +pub trait FileWriterBuilder: Clone + Send + Sync + 'static { /// The associated file writer type. type R: FileWriter; /// Build file writer. diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index 774a322c90..06246ab660 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -192,7 +192,6 @@ where // initialize inner writer self.inner = Some( self.inner_builder - .clone() .build(self.new_output_file(partition_key)?) .await?, ); @@ -206,7 +205,6 @@ where // start a new writer self.inner = Some( self.inner_builder - .clone() .build(self.new_output_file(partition_key)?) .await?, ); diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index 11dbf7a41a..d1a8d4fa93 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -398,9 +398,7 @@ type DefaultOutput = Vec; /// The builder for iceberg writer. #[async_trait::async_trait] -pub trait IcebergWriterBuilder: - Send + Clone + 'static -{ +pub trait IcebergWriterBuilder: Send + 'static { /// The associated writer type. type R: IcebergWriter; /// Build the iceberg writer with an optional partition key. diff --git a/crates/iceberg/src/writer/partitioning/clustered_writer.rs b/crates/iceberg/src/writer/partitioning/clustered_writer.rs index 3587723965..01eb452083 100644 --- a/crates/iceberg/src/writer/partitioning/clustered_writer.rs +++ b/crates/iceberg/src/writer/partitioning/clustered_writer.rs @@ -118,7 +118,6 @@ where // Create a new writer for the new partition self.current_writer = Some( self.inner_builder - .clone() .build(Some(partition_key.clone())) .await?, ); diff --git a/crates/iceberg/src/writer/partitioning/fanout_writer.rs b/crates/iceberg/src/writer/partitioning/fanout_writer.rs index 796c1a4888..21a174b0d0 100644 --- a/crates/iceberg/src/writer/partitioning/fanout_writer.rs +++ b/crates/iceberg/src/writer/partitioning/fanout_writer.rs @@ -73,7 +73,6 @@ where if !self.partition_writers.contains_key(partition_key.data()) { let writer = self .inner_builder - .clone() .build(Some(partition_key.clone())) .await?; self.partition_writers diff --git a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs index 0fb9cba3f1..29825a5416 100644 --- a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs +++ b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs @@ -75,7 +75,7 @@ where pub async fn write(&mut self, input: I) -> Result<()> { // Lazily create writer on first write if self.writer.is_none() { - self.writer = Some(self.inner_builder.clone().build(None).await?); + self.writer = Some(self.inner_builder.build(None).await?); } // Write directly to inner writer From b58c4636679dadd508db2dc3c62accea1cafe877 Mon Sep 17 00:00:00 2001 From: leonzchang Date: Wed, 3 Dec 2025 16:21:41 +0800 Subject: [PATCH 5/5] add Sync as super trait to IcebergWriterBuilder --- crates/iceberg/src/writer/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index d1a8d4fa93..d475230685 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -145,7 +145,7 @@ //! } //! //! #[async_trait::async_trait] -//! impl IcebergWriterBuilder for LatencyRecordWriterBuilder { +//! impl IcebergWriterBuilder for LatencyRecordWriterBuilder { //! type R = LatencyRecordWriter; //! //! async fn build(&self, partition_key: Option) -> Result { @@ -398,7 +398,7 @@ type DefaultOutput = Vec; /// The builder for iceberg writer. #[async_trait::async_trait] -pub trait IcebergWriterBuilder: Send + 'static { +pub trait IcebergWriterBuilder: Send + Sync + 'static { /// The associated writer type. type R: IcebergWriter; /// Build the iceberg writer with an optional partition key.