Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for `DataFileWriter`.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
inner: RollingFileWriterBuilder<B, L, F>,
}
Expand All @@ -53,9 +53,9 @@ where
{
type R = DataFileWriter<B, L, F>;

async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
Ok(DataFileWriter {
inner: Some(self.inner.clone().build()),
inner: Some(self.inner.build()),
partition_key,
})
}
Expand Down
12 changes: 6 additions & 6 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for `EqualityDeleteWriter`.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct EqualityDeleteFileWriterBuilder<
B: FileWriterBuilder,
L: LocationGenerator,
Expand All @@ -60,7 +60,7 @@ where
}

/// Config for `EqualityDeleteWriter`.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct EqualityDeleteWriterConfig {
// Field ids used to determine row equality in equality delete files.
equality_ids: Vec<i32>,
Expand Down Expand Up @@ -123,11 +123,11 @@ where
{
type R = EqualityDeleteFileWriter<B, L, F>;

async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
Ok(EqualityDeleteFileWriter {
inner: Some(self.inner.clone().build()),
projector: self.config.projector,
equality_ids: self.config.equality_ids,
inner: Some(self.inner.build()),
projector: self.config.projector.clone(),
equality_ids: self.config.equality_ids.clone(),
partition_key,
})
}
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/writer/file_writer/location_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::Result;
use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};

/// `LocationGenerator` is used to generate the location of a data file.
pub trait LocationGenerator: Clone + Send + 'static {
pub trait LocationGenerator: Clone + Send + Sync + 'static {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above.

/// Generate an absolute path for the given file name that includes the partition path.
///
/// # Arguments
Expand Down Expand Up @@ -94,7 +94,7 @@ impl LocationGenerator for DefaultLocationGenerator {
}

/// `FileNameGenerator` is used to generate the file name for a data file. The file name can be passed to `LocationGenerator` to generate the location of the file.
pub trait FileNameGenerator: Clone + Send + 'static {
pub trait FileNameGenerator: Clone + Send + Sync + 'static {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned above.

/// Generate a file name.
fn generate_file_name(&self) -> String;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/writer/file_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ pub mod rolling_writer;
type DefaultOutput = Vec<DataFileBuilder>;

/// File writer builder trait.
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
pub trait FileWriterBuilder<O = DefaultOutput>: Clone + Send + Sync + 'static {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to keep this `Clone` bound?

Copy link
Author

@leonzchang leonzchang Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the `RollingFileWriterBuilder::build` function, even though the builder is no longer consumed by `build` (it now takes `&self`), its inner components still need to be cloned at the lowest level of the construction chain:

`IcebergWriterBuilder` → `RollingFileWriterBuilder` → `FileWriterBuilder` / `LocationGenerator` / `FileNameGenerator`.

For reference #1735 (comment).

pub fn build(&self) -> RollingFileWriter<B, L, F> {
RollingFileWriter {
inner: None,
inner_builder: self.inner_builder.clone(),
target_file_size: self.target_file_size,
data_file_builders: vec![],
file_io: self.file_io.clone(),
location_generator: self.location_generator.clone(),
file_name_generator: self.file_name_generator.clone(),
}
}

/// The associated file writer type.
type R: FileWriter<O>;
/// Build file writer.
fn build(self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
fn build(&self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
}

/// File writer focused on writing record batches to different physical file formats (such as Parquet or ORC).
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/writer/file_writer/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ impl ParquetWriterBuilder {
impl FileWriterBuilder for ParquetWriterBuilder {
type R = ParquetWriter;

async fn build(self, output_file: OutputFile) -> Result<Self::R> {
async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
Ok(ParquetWriter {
schema: self.schema.clone(),
inner_writer: None,
writer_properties: self.props,
writer_properties: self.props.clone(),
current_row_num: 0,
output_file,
nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
Expand Down
12 changes: 5 additions & 7 deletions crates/iceberg/src/writer/file_writer/rolling_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ where
}

/// Build a new [`RollingFileWriter`].
pub fn build(self) -> RollingFileWriter<B, L, F> {
pub fn build(&self) -> RollingFileWriter<B, L, F> {
RollingFileWriter {
inner: None,
inner_builder: self.inner_builder,
inner_builder: self.inner_builder.clone(),
target_file_size: self.target_file_size,
data_file_builders: vec![],
file_io: self.file_io,
location_generator: self.location_generator,
file_name_generator: self.file_name_generator,
file_io: self.file_io.clone(),
location_generator: self.location_generator.clone(),
file_name_generator: self.file_name_generator.clone(),
}
}
}
Expand Down Expand Up @@ -192,7 +192,6 @@ where
// initialize inner writer
self.inner = Some(
self.inner_builder
.clone()
.build(self.new_output_file(partition_key)?)
.await?,
);
Expand All @@ -206,7 +205,6 @@ where
// start a new writer
self.inner = Some(
self.inner_builder
.clone()
.build(self.new_output_file(partition_key)?)
.await?,
);
Expand Down
8 changes: 3 additions & 5 deletions crates/iceberg/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
//! type R = LatencyRecordWriter<B::R>;
//!
//! async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
//! async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
//! Ok(LatencyRecordWriter {
//! inner_writer: self.inner_writer_builder.build(partition_key).await?,
//! })
Expand Down Expand Up @@ -398,13 +398,11 @@ type DefaultOutput = Vec<DataFile>;

/// The builder for iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
Send + Clone + 'static
{
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>: Send + Sync + 'static {
/// The associated writer type.
type R: IcebergWriter<I, O>;
/// Build the iceberg writer with an optional partition key.
async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
}

/// The iceberg writer used to write data to iceberg table.
Expand Down
1 change: 0 additions & 1 deletion crates/iceberg/src/writer/partitioning/clustered_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ where
// Create a new writer for the new partition
self.current_writer = Some(
self.inner_builder
.clone()
.build(Some(partition_key.clone()))
.await?,
);
Expand Down
1 change: 0 additions & 1 deletion crates/iceberg/src/writer/partitioning/fanout_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ where
if !self.partition_writers.contains_key(partition_key.data()) {
let writer = self
.inner_builder
.clone()
.build(Some(partition_key.clone()))
.await?;
self.partition_writers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
pub async fn write(&mut self, input: I) -> Result<()> {
// Lazily create writer on first write
if self.writer.is_none() {
self.writer = Some(self.inner_builder.clone().build(None).await?);
self.writer = Some(self.inner_builder.build(None).await?);
}

// Write directly to inner writer
Expand Down
Loading