Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/iceberg/src/writer/base_writer/data_file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for `DataFileWriter`.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
inner: RollingFileWriterBuilder<B, L, F>,
}
Expand All @@ -53,9 +53,9 @@ where
{
type R = DataFileWriter<B, L, F>;

async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
Ok(DataFileWriter {
inner: Some(self.inner.clone().build()),
inner: Some(self.inner.build()),
partition_key,
})
}
Expand Down
12 changes: 6 additions & 6 deletions crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};

/// Builder for `EqualityDeleteWriter`.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct EqualityDeleteFileWriterBuilder<
B: FileWriterBuilder,
L: LocationGenerator,
Expand All @@ -60,7 +60,7 @@ where
}

/// Config for `EqualityDeleteWriter`.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct EqualityDeleteWriterConfig {
// Field ids used to determine row equality in equality delete files.
equality_ids: Vec<i32>,
Expand Down Expand Up @@ -123,11 +123,11 @@ where
{
type R = EqualityDeleteFileWriter<B, L, F>;

async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
Ok(EqualityDeleteFileWriter {
inner: Some(self.inner.clone().build()),
projector: self.config.projector,
equality_ids: self.config.equality_ids,
inner: Some(self.inner.build()),
projector: self.config.projector.clone(),
equality_ids: self.config.equality_ids.clone(),
partition_key,
})
}
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/writer/file_writer/location_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::Result;
use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};

/// `LocationGenerator` used to generate the location of data file.
pub trait LocationGenerator: Clone + Send + 'static {
pub trait LocationGenerator: Clone + Send + Sync + 'static {
/// Generate an absolute path for the given file name that includes the partition path.
///
/// # Arguments
Expand Down Expand Up @@ -94,7 +94,7 @@ impl LocationGenerator for DefaultLocationGenerator {
}

/// `FileNameGeneratorTrait` used to generate file name for data file. The file name can be passed to `LocationGenerator` to generate the location of the file.
pub trait FileNameGenerator: Clone + Send + 'static {
pub trait FileNameGenerator: Clone + Send + Sync + 'static {
/// Generate a file name.
fn generate_file_name(&self) -> String;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/writer/file_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ pub mod rolling_writer;
type DefaultOutput = Vec<DataFileBuilder>;

/// File writer builder trait.
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
pub trait FileWriterBuilder<O = DefaultOutput>: Clone + Send + Sync + 'static {
/// The associated file writer type.
type R: FileWriter<O>;
/// Build file writer.
fn build(self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
fn build(&self, output_file: OutputFile) -> impl Future<Output = Result<Self::R>> + Send;
}

/// File writer focus on writing record batch to different physical file format.(Such as parquet. orc)
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/writer/file_writer/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ impl ParquetWriterBuilder {
impl FileWriterBuilder for ParquetWriterBuilder {
type R = ParquetWriter;

async fn build(self, output_file: OutputFile) -> Result<Self::R> {
async fn build(&self, output_file: OutputFile) -> Result<Self::R> {
Ok(ParquetWriter {
schema: self.schema.clone(),
inner_writer: None,
writer_properties: self.props,
writer_properties: self.props.clone(),
current_row_num: 0,
output_file,
nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode),
Expand Down
12 changes: 5 additions & 7 deletions crates/iceberg/src/writer/file_writer/rolling_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,15 @@ where
}

/// Build a new [`RollingFileWriter`].
pub fn build(self) -> RollingFileWriter<B, L, F> {
pub fn build(&self) -> RollingFileWriter<B, L, F> {
RollingFileWriter {
inner: None,
inner_builder: self.inner_builder,
inner_builder: self.inner_builder.clone(),
target_file_size: self.target_file_size,
data_file_builders: vec![],
file_io: self.file_io,
location_generator: self.location_generator,
file_name_generator: self.file_name_generator,
file_io: self.file_io.clone(),
location_generator: self.location_generator.clone(),
file_name_generator: self.file_name_generator.clone(),
}
}
}
Expand Down Expand Up @@ -192,7 +192,6 @@ where
// initialize inner writer
self.inner = Some(
self.inner_builder
.clone()
.build(self.new_output_file(partition_key)?)
.await?,
);
Expand All @@ -206,7 +205,6 @@ where
// start a new writer
self.inner = Some(
self.inner_builder
.clone()
.build(self.new_output_file(partition_key)?)
.await?,
);
Expand Down
10 changes: 4 additions & 6 deletions crates/iceberg/src/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@
//! }
//!
//! #[async_trait::async_trait]
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
//! impl<B: IcebergWriterBuilder + Sync> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason why we don't add Sync as supertrait here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CTTY This was the missing part. Thank you!

//! type R = LatencyRecordWriter<B::R>;
//!
//! async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
//! async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
//! Ok(LatencyRecordWriter {
//! inner_writer: self.inner_writer_builder.build(partition_key).await?,
//! })
Expand Down Expand Up @@ -398,13 +398,11 @@ type DefaultOutput = Vec<DataFile>;

/// The builder for iceberg writer.
#[async_trait::async_trait]
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
Send + Clone + 'static
{
pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>: Send + 'static {
/// The associated writer type.
type R: IcebergWriter<I, O>;
/// Build the iceberg writer with an optional partition key.
async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
async fn build(&self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
}

/// The iceberg writer used to write data to iceberg table.
Expand Down
1 change: 0 additions & 1 deletion crates/iceberg/src/writer/partitioning/clustered_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ where
// Create a new writer for the new partition
self.current_writer = Some(
self.inner_builder
.clone()
.build(Some(partition_key.clone()))
.await?,
);
Expand Down
1 change: 0 additions & 1 deletion crates/iceberg/src/writer/partitioning/fanout_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ where
if !self.partition_writers.contains_key(partition_key.data()) {
let writer = self
.inner_builder
.clone()
.build(Some(partition_key.clone()))
.await?;
self.partition_writers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
pub async fn write(&mut self, input: I) -> Result<()> {
// Lazily create writer on first write
if self.writer.is_none() {
self.writer = Some(self.inner_builder.clone().build(None).await?);
self.writer = Some(self.inner_builder.build(None).await?);
}

// Write directly to inner writer
Expand Down
Loading