Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .run/Test.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Test" type="CargoCommandRunConfiguration" factoryName="Cargo Command" nameIsGenerated="true">
<option name="buildProfileId" value="dev" />
<option name="command" value="test --features integration_tests --test test_fluss -- --test-threads=1" />
<option name="workingDirectory" value="file://$PROJECT_DIR$" />
<envs />
<option name="emulateTerminal" value="true" />
<option name="channel" value="DEFAULT" />
<option name="requiredFeatures" value="true" />
<option name="allFeatures" value="true" />
<option name="withSudo" value="false" />
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />
<method v="2">
<option name="CARGO.BUILD_TASK_PROVIDER" enabled="true" />
</method>
</configuration>
</component>
69 changes: 58 additions & 11 deletions crates/fluss/src/client/table/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,14 +303,21 @@ impl TableLookup {
let lookup_row_type = row_type.project_with_field_names(primary_keys)?;

let physical_primary_keys = self.table_info.get_physical_primary_keys().to_vec();
let primary_key_encoder =
KeyEncoderFactory::of(&lookup_row_type, &physical_primary_keys, &data_lake_format)?;

let bucket_key_encoder = if self.table_info.is_default_bucket_key() {
let kv_format_version = self.table_info.get_table_config().get_kv_format_version()?;
let is_default_bucket_key = self.table_info.is_default_bucket_key();
let primary_key_encoder = KeyEncoderFactory::of_primary_key_encoder(
&lookup_row_type,
&physical_primary_keys,
&data_lake_format,
kv_format_version,
is_default_bucket_key,
)?;

let bucket_key_encoder = if is_default_bucket_key {
None
} else {
let bucket_keys = self.table_info.get_bucket_keys().to_vec();
Some(KeyEncoderFactory::of(
Some(KeyEncoderFactory::of_bucket_key_encoder(
&lookup_row_type,
&bucket_keys,
&data_lake_format,
Expand Down Expand Up @@ -451,8 +458,37 @@ impl TablePrefixLookup {
let lookup_row_type = row_type.project_with_field_names(&self.lookup_column_names)?;

let bucket_keys = self.table_info.get_bucket_keys().to_vec();
let prefix_key_encoder =
KeyEncoderFactory::of(&lookup_row_type, &bucket_keys, &data_lake_format)?;
let kv_format_version = self.table_info.get_table_config().get_kv_format_version()?;
let is_default_bucket_key = self.table_info.is_default_bucket_key();

// The bytes produced here are sent to the server to perform a
// lexicographic byte-prefix match against the stored primary keys, so
// the encoder must follow the primary key encoder rules (row-level
// byte-comparable). The encoded fields are the bucket key columns,
// which `validate_prefix_lookup` guarantees to be a strict prefix of
// the physical primary keys.
let prefix_lookup_key_encoder = KeyEncoderFactory::of_primary_key_encoder(
&lookup_row_type,
&bucket_keys,
&data_lake_format,
kv_format_version,
is_default_bucket_key,
)?;

// Bucket id must stay consistent with the bucket id computed by the
// downstream lake, so when the bucket key differs from the primary
// key we need a separate lake-aligned encoder. When they are the
// same, the prefix-lookup key encoder already produces the right
// bytes and we fall back to it (None).
let bucket_key_encoder = if is_default_bucket_key {
None
} else {
Some(KeyEncoderFactory::of_bucket_key_encoder(
&lookup_row_type,
&bucket_keys,
&data_lake_format,
)?)
};

let partition_getter = if self.table_info.is_partitioned() {
Some(PartitionGetter::new(
Expand All @@ -471,7 +507,8 @@ impl TablePrefixLookup {
metadata: self.metadata,
lookup_client: self.lookup_client,
bucketing_function,
prefix_key_encoder,
prefix_lookup_key_encoder,
bucket_key_encoder,
partition_getter,
num_buckets,
schema_ctx,
Expand Down Expand Up @@ -574,15 +611,25 @@ pub struct PrefixKeyLookuper {
metadata: Arc<Metadata>,
lookup_client: Arc<LookupClient>,
bucketing_function: Box<dyn BucketingFunction>,
prefix_key_encoder: Box<dyn KeyEncoder>,
/// Encodes the lookup row into the prefix bytes sent to the server for
/// byte-prefix matching against stored primary keys.
prefix_lookup_key_encoder: Box<dyn KeyEncoder>,
/// Optional lake-aligned encoder used solely to compute the bucket id.
/// `None` when the bucket key equals the primary key, in which case the
/// prefix lookup key bytes are reused for bucketing.
bucket_key_encoder: Option<Box<dyn KeyEncoder>>,
partition_getter: Option<PartitionGetter>,
num_buckets: i32,
schema_ctx: LookupSchemaCtx,
}

impl PrefixKeyLookuper {
pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result<LookupResult> {
let prefix_bytes = self.prefix_key_encoder.encode_key(row)?;
let prefix_bytes = self.prefix_lookup_key_encoder.encode_key(row)?;
let bk_bytes = match &mut self.bucket_key_encoder {
Some(encoder) => encoder.encode_key(row)?,
None => prefix_bytes.clone(),
};

let partition_id = if let Some(ref partition_getter) = self.partition_getter {
let partition_name = partition_getter.get_partition(row)?;
Expand All @@ -604,7 +651,7 @@ impl PrefixKeyLookuper {

let bucket_id = self
.bucketing_function
.bucketing(&prefix_bytes, self.num_buckets)?;
.bucketing(&bk_bytes, self.num_buckets)?;

let table_id = self.table_info.get_table_id();
let table_bucket = TableBucket::new_with_partition(table_id, partition_id, bucket_id);
Expand Down
15 changes: 12 additions & 3 deletions crates/fluss/src/client/table/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,18 @@ impl UpsertWriterFactory {
&partial_update_columns,
)?;

let primary_key_encoder = KeyEncoderFactory::of(row_type, physical_pks, data_lake_format)?;
let bucket_key_encoder = if !table_info.is_default_bucket_key() {
Some(KeyEncoderFactory::of(
let kv_format_version = table_info.get_table_config().get_kv_format_version()?;
let is_default_bucket_key = table_info.is_default_bucket_key();

let primary_key_encoder = KeyEncoderFactory::of_primary_key_encoder(
row_type,
physical_pks,
data_lake_format,
kv_format_version,
is_default_bucket_key,
)?;
let bucket_key_encoder = if !is_default_bucket_key {
Some(KeyEncoderFactory::of_bucket_key_encoder(
row_type,
table_info.get_bucket_keys(),
data_lake_format,
Expand Down
13 changes: 13 additions & 0 deletions crates/fluss/src/metadata/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,19 @@ impl TableConfig {
kv_format.parse().map_err(Into::into)
}

/// Reads the table's KV-format version from the `table.kv.format-version`
/// property.
///
/// Falls back to `1` when the property is not set, matching Java's
/// `TableConfig.getKvFormatVersion().orElse(1)` semantics.
///
/// # Errors
/// Returns an error built with `Error::invalid_table` when the property is
/// present but its value cannot be parsed as an `i32`.
pub fn get_kv_format_version(&self) -> Result<i32> {
    const DEFAULT: i32 = 1;
    // Parse only when the property exists; `transpose` lets `?` surface a
    // parse failure while keeping "absent" as `None`.
    let configured = self
        .properties
        .get("table.kv.format-version")
        .map(|v| {
            v.parse::<i32>().map_err(|e| {
                Error::invalid_table(format!("Invalid table.kv.format-version {v:?}: {e}"))
            })
        })
        .transpose()?;
    Ok(configured.unwrap_or(DEFAULT))
}

pub fn get_log_format(&self) -> Result<LogFormat> {
// TODO: Consolidate configurations logic, constants, defaults in a single place
const DEFAULT_LOG_FORMAT: &str = "ARROW";
Expand Down
65 changes: 61 additions & 4 deletions crates/fluss/src/row/encode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

mod compacted_key_encoder;
mod compacted_row_encoder;
mod paimon_key_encoder;

use crate::error::{Error, Result};
use crate::metadata::{DataLakeFormat, KvFormat, RowType};
use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
use crate::row::encode::paimon_key_encoder::PaimonKeyEncoder;
use crate::row::{Datum, InternalRow};
use bytes::Bytes;

Expand All @@ -34,6 +36,61 @@ pub trait KeyEncoder: Send + Sync {
pub struct KeyEncoderFactory;

impl KeyEncoderFactory {
/// Create a key encoder for the primary key, mirroring Java's
/// `KeyEncoder.ofPrimaryKeyEncoder`.
///
/// Use this encoder when encoding the primary key for KV writes/lookups,
/// or when encoding a prefix-lookup key (which is a strict prefix of the
/// primary key and must share its byte layout to support lexicographic
/// prefix matching).
///
/// - `kv_format_version == 2` with `is_default_bucket_key == false`
/// (i.e. the primary key differs from the bucket key) →
/// [`CompactedKeyEncoder::create_key_encoder`] regardless of lake format
/// (primary key must be decodable by the server without lake-specific
/// format awareness).
/// - Otherwise (`kv_format_version == 1`, or `kv_format_version == 2`
/// with `is_default_bucket_key == true` so the primary key matches the
/// bucket key) → align with the lake format via [`Self::of`], producing
/// the same physical layout as the bucket key.
/// - Any other `kv_format_version` → [`Error::UnsupportedOperation`].
pub fn of_primary_key_encoder(
row_type: &RowType,
key_fields: &[String],
data_lake_format: &Option<DataLakeFormat>,
kv_format_version: i32,
is_default_bucket_key: bool,
) -> Result<Box<dyn KeyEncoder>> {
match kv_format_version {
1 => Self::of(row_type, key_fields, data_lake_format),
2 if is_default_bucket_key => Self::of(row_type, key_fields, data_lake_format),
2 => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
row_type, key_fields,
)?)),
v => Err(Error::UnsupportedOperation {
message: format!("Unsupported kv format version: {v}"),
}),
}
}

/// Create a key encoder for the bucket key, mirroring Java's
/// `KeyEncoder.ofBucketKeyEncoder`.
///
/// Use this encoder when computing the bucket id for a row. The bucket
/// key encoding is bound to the configured data lake format (delegating
/// to [`Self::of`]) because the resulting bucket id must stay consistent
/// with the bucket id computed by the downstream lake (e.g. Paimon /
/// Lance / Iceberg); otherwise the same row would land in different
/// buckets on the Fluss side and on the lake side, breaking lake-tiering
/// semantics.
pub fn of_bucket_key_encoder(
row_type: &RowType,
key_fields: &[String],
data_lake_format: &Option<DataLakeFormat>,
) -> Result<Box<dyn KeyEncoder>> {
Self::of(row_type, key_fields, data_lake_format)
}

/// Create a key encoder to encode the key bytes of the input row.
/// # Arguments
/// * `row_type` - the row type of the input row
Expand All @@ -42,15 +99,15 @@ impl KeyEncoderFactory {
///
/// # Returns
/// key encoder
pub fn of(
fn of(
row_type: &RowType,
key_fields: &[String],
data_lake_format: &Option<DataLakeFormat>,
) -> Result<Box<dyn KeyEncoder>> {
match data_lake_format {
Some(DataLakeFormat::Paimon) => Err(Error::UnsupportedOperation {
message: "KeyEncoder for Paimon format is not yet implemented".to_string(),
}),
Some(DataLakeFormat::Paimon) => {
Ok(Box::new(PaimonKeyEncoder::new(row_type, key_fields)?))
}
Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
row_type, key_fields,
)?)),
Expand Down
Loading
Loading