Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sinks/topsql_data_deltalake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ impl GenerateConfig for DeltaLakeConfig {
#[typetag::serde(name = "topsql_data_deltalake")]
impl SinkConfig for DeltaLakeConfig {
async fn build(&self, cx: SinkContext) -> vector::Result<(VectorSink, Healthcheck)> {
error!(
info!(
"DEBUG: Building Delta Lake sink with bucket: {:?}",
self.bucket
);

// Create S3 service if bucket is configured
let s3_service = if self.bucket.is_some() {
error!("DEBUG: Bucket configured, creating S3 service");
info!("DEBUG: Bucket configured, creating S3 service");
match self.create_service(&cx.proxy).await {
Ok(service) => {
info!("S3 service created successfully");
Expand Down
31 changes: 26 additions & 5 deletions src/sinks/topsql_data_deltalake/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,16 +357,37 @@ impl TopSQLDeltaLakeSink {
// Get or create writer for this table
let mut writers = self.writers.lock().await;
let writer = writers.entry(table_name.to_string()).or_insert_with(|| {
let (table_type, table_instance) = match table_name
.strip_prefix("topsql_")
.and_then(|rest| rest.split_once('_'))
{
Some((t, inst)) if !t.is_empty() && !inst.is_empty() => (t, inst),
_ => {
error!(
"Unexpected table_name format (expected `topsql_{{type}}_{{instance}}`): {}",
table_name
);
("unknown", "unknown")
}
};

let type_dir = format!("type=topsql_{}", table_type);
let instance_dir = format!("instance={}", table_instance);

let table_path = if self.base_path.to_string_lossy().starts_with("s3://") {
// For S3 paths, append the table name to the S3 path
// For S3 paths, build a partition-like directory structure
// <base>/topsql/data/type=.../instance=.../<table_name>
let base = self.base_path.to_string_lossy();
let base = base.trim_end_matches('/');
PathBuf::from(format!(
"{}/{}",
self.base_path.to_string_lossy(),
table_name
"{}/{}/{}",
base, type_dir, instance_dir
))
} else {
// For local paths, use join as before
self.base_path.join(table_name)
self.base_path
.join(&type_dir)
.join(&instance_dir)
};

let table_config = self
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/topsql_meta_deltalake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ impl GenerateConfig for DeltaLakeConfig {
#[typetag::serde(name = "topsql_meta_deltalake")]
impl SinkConfig for DeltaLakeConfig {
async fn build(&self, cx: SinkContext) -> vector::Result<(VectorSink, Healthcheck)> {
error!(
info!(
"DEBUG: Building Delta Lake sink with bucket: {:?}",
self.bucket
);

// Create S3 service if bucket is configured
let s3_service = if self.bucket.is_some() {
error!("DEBUG: Bucket configured, creating S3 service");
info!("DEBUG: Bucket configured, creating S3 service");
match self.create_service(&cx.proxy).await {
Ok(service) => {
info!("S3 service created successfully");
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/topsql_meta_deltalake/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,13 @@ impl TopSQLDeltaLakeSink {
let table_path = if self.base_path.to_string_lossy().starts_with("s3://") {
// For S3 paths, append the table name to the S3 path
PathBuf::from(format!(
"{}/{}",
"{}/type={}",
self.base_path.to_string_lossy(),
table_name
))
} else {
// For local paths, use join as before
self.base_path.join(table_name)
self.base_path.join(format!("type={}", table_name))
};

let table_config = self
Expand Down