diff --git a/src/sinks/topsql_data_deltalake/mod.rs b/src/sinks/topsql_data_deltalake/mod.rs index b81dd2f..21a65b8 100644 --- a/src/sinks/topsql_data_deltalake/mod.rs +++ b/src/sinks/topsql_data_deltalake/mod.rs @@ -117,14 +117,14 @@ impl GenerateConfig for DeltaLakeConfig { #[typetag::serde(name = "topsql_data_deltalake")] impl SinkConfig for DeltaLakeConfig { async fn build(&self, cx: SinkContext) -> vector::Result<(VectorSink, Healthcheck)> { - error!( + info!( "DEBUG: Building Delta Lake sink with bucket: {:?}", self.bucket ); // Create S3 service if bucket is configured let s3_service = if self.bucket.is_some() { - error!("DEBUG: Bucket configured, creating S3 service"); + info!("DEBUG: Bucket configured, creating S3 service"); match self.create_service(&cx.proxy).await { Ok(service) => { info!("S3 service created successfully"); diff --git a/src/sinks/topsql_data_deltalake/processor.rs b/src/sinks/topsql_data_deltalake/processor.rs index b24daa8..6f1b15f 100644 --- a/src/sinks/topsql_data_deltalake/processor.rs +++ b/src/sinks/topsql_data_deltalake/processor.rs @@ -357,16 +357,37 @@ impl TopSQLDeltaLakeSink { // Get or create writer for this table let mut writers = self.writers.lock().await; let writer = writers.entry(table_name.to_string()).or_insert_with(|| { + let (table_type, table_instance) = match table_name + .strip_prefix("topsql_") + .and_then(|rest| rest.split_once('_')) + { + Some((t, inst)) if !t.is_empty() && !inst.is_empty() => (t, inst), + _ => { + error!( + "Unexpected table_name format (expected `topsql_{{type}}_{{instance}}`): {}", + table_name + ); + ("unknown", "unknown") + } + }; + + let type_dir = format!("type=topsql_{}", table_type); + let instance_dir = format!("instance={}", table_instance); + let table_path = if self.base_path.to_string_lossy().starts_with("s3://") { - // For S3 paths, append the table name to the S3 path + // For S3 paths, build a partition-like directory structure + // /topsql/data/type=.../instance=.../ + let base = self.base_path.to_string_lossy(); + let base = base.trim_end_matches('/'); PathBuf::from(format!( - "{}/{}", - self.base_path.to_string_lossy(), - table_name + "{}/{}/{}", + base, type_dir, instance_dir )) } else { // For local paths, use join as before - self.base_path.join(table_name) + self.base_path + .join(&type_dir) + .join(&instance_dir) }; let table_config = self diff --git a/src/sinks/topsql_meta_deltalake/mod.rs b/src/sinks/topsql_meta_deltalake/mod.rs index 3a8ab85..6b332a3 100644 --- a/src/sinks/topsql_meta_deltalake/mod.rs +++ b/src/sinks/topsql_meta_deltalake/mod.rs @@ -126,14 +126,14 @@ impl GenerateConfig for DeltaLakeConfig { #[typetag::serde(name = "topsql_meta_deltalake")] impl SinkConfig for DeltaLakeConfig { async fn build(&self, cx: SinkContext) -> vector::Result<(VectorSink, Healthcheck)> { - error!( + info!( "DEBUG: Building Delta Lake sink with bucket: {:?}", self.bucket ); // Create S3 service if bucket is configured let s3_service = if self.bucket.is_some() { - error!("DEBUG: Bucket configured, creating S3 service"); + info!("DEBUG: Bucket configured, creating S3 service"); match self.create_service(&cx.proxy).await { Ok(service) => { info!("S3 service created successfully"); diff --git a/src/sinks/topsql_meta_deltalake/processor.rs b/src/sinks/topsql_meta_deltalake/processor.rs index 0bb26a2..beae8dc 100644 --- a/src/sinks/topsql_meta_deltalake/processor.rs +++ b/src/sinks/topsql_meta_deltalake/processor.rs @@ -410,13 +410,13 @@ impl TopSQLDeltaLakeSink { let table_path = if self.base_path.to_string_lossy().starts_with("s3://") { // For S3 paths, append the table name to the S3 path PathBuf::from(format!( - "{}/{}", + "{}/type={}", self.base_path.to_string_lossy(), table_name )) } else { // For local paths, use join as before - self.base_path.join(table_name) + self.base_path.join(format!("type={}", table_name)) }; let table_config = self