diff --git a/.run/Test.run.xml b/.run/Test.run.xml
new file mode 100644
index 00000000..527e969c
--- /dev/null
+++ b/.run/Test.run.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/crates/fluss/src/client/table/lookup.rs b/crates/fluss/src/client/table/lookup.rs
index 51a0a071..dae08eef 100644
--- a/crates/fluss/src/client/table/lookup.rs
+++ b/crates/fluss/src/client/table/lookup.rs
@@ -303,14 +303,21 @@ impl TableLookup {
let lookup_row_type = row_type.project_with_field_names(primary_keys)?;
let physical_primary_keys = self.table_info.get_physical_primary_keys().to_vec();
- let primary_key_encoder =
- KeyEncoderFactory::of(&lookup_row_type, &physical_primary_keys, &data_lake_format)?;
-
- let bucket_key_encoder = if self.table_info.is_default_bucket_key() {
+ let kv_format_version = self.table_info.get_table_config().get_kv_format_version()?;
+ let is_default_bucket_key = self.table_info.is_default_bucket_key();
+ let primary_key_encoder = KeyEncoderFactory::of_primary_key_encoder(
+ &lookup_row_type,
+ &physical_primary_keys,
+ &data_lake_format,
+ kv_format_version,
+ is_default_bucket_key,
+ )?;
+
+ let bucket_key_encoder = if is_default_bucket_key {
None
} else {
let bucket_keys = self.table_info.get_bucket_keys().to_vec();
- Some(KeyEncoderFactory::of(
+ Some(KeyEncoderFactory::of_bucket_key_encoder(
&lookup_row_type,
&bucket_keys,
&data_lake_format,
@@ -451,8 +458,37 @@ impl TablePrefixLookup {
let lookup_row_type = row_type.project_with_field_names(&self.lookup_column_names)?;
let bucket_keys = self.table_info.get_bucket_keys().to_vec();
- let prefix_key_encoder =
- KeyEncoderFactory::of(&lookup_row_type, &bucket_keys, &data_lake_format)?;
+ let kv_format_version = self.table_info.get_table_config().get_kv_format_version()?;
+ let is_default_bucket_key = self.table_info.is_default_bucket_key();
+
+ // The bytes produced here are sent to the server to perform a
+ // lexicographic byte-prefix match against the stored primary keys, so
+ // the encoder must follow the primary key encoder rules (row-level
+ // byte-comparable). The encoded fields are the bucket key columns,
+ // which `validate_prefix_lookup` guarantees to be a strict prefix of
+ // the physical primary keys.
+ let prefix_lookup_key_encoder = KeyEncoderFactory::of_primary_key_encoder(
+ &lookup_row_type,
+ &bucket_keys,
+ &data_lake_format,
+ kv_format_version,
+ is_default_bucket_key,
+ )?;
+
+ // Bucket id must stay consistent with the bucket id computed by the
+ // downstream lake, so when the bucket key differs from the primary
+ // key we need a separate lake-aligned encoder. When they are the
+ // same, the prefix-lookup key encoder already produces the right
+ // bytes and we fall back to it (None).
+ let bucket_key_encoder = if is_default_bucket_key {
+ None
+ } else {
+ Some(KeyEncoderFactory::of_bucket_key_encoder(
+ &lookup_row_type,
+ &bucket_keys,
+ &data_lake_format,
+ )?)
+ };
let partition_getter = if self.table_info.is_partitioned() {
Some(PartitionGetter::new(
@@ -471,7 +507,8 @@ impl TablePrefixLookup {
metadata: self.metadata,
lookup_client: self.lookup_client,
bucketing_function,
- prefix_key_encoder,
+ prefix_lookup_key_encoder,
+ bucket_key_encoder,
partition_getter,
num_buckets,
schema_ctx,
@@ -574,7 +611,13 @@ pub struct PrefixKeyLookuper {
metadata: Arc,
lookup_client: Arc,
bucketing_function: Box,
- prefix_key_encoder: Box,
+ /// Encodes the lookup row into the prefix bytes sent to the server for
+ /// byte-prefix matching against stored primary keys.
+ prefix_lookup_key_encoder: Box,
+ /// Optional lake-aligned encoder used solely to compute the bucket id.
+ /// `None` when the bucket key equals the primary key, in which case the
+ /// prefix lookup key bytes are reused for bucketing.
+ bucket_key_encoder: Option>,
partition_getter: Option,
num_buckets: i32,
schema_ctx: LookupSchemaCtx,
@@ -582,7 +625,11 @@ pub struct PrefixKeyLookuper {
impl PrefixKeyLookuper {
pub async fn lookup(&mut self, row: &dyn InternalRow) -> Result {
- let prefix_bytes = self.prefix_key_encoder.encode_key(row)?;
+ let prefix_bytes = self.prefix_lookup_key_encoder.encode_key(row)?;
+ let bk_bytes = match &mut self.bucket_key_encoder {
+ Some(encoder) => encoder.encode_key(row)?,
+ None => prefix_bytes.clone(),
+ };
let partition_id = if let Some(ref partition_getter) = self.partition_getter {
let partition_name = partition_getter.get_partition(row)?;
@@ -604,7 +651,7 @@ impl PrefixKeyLookuper {
let bucket_id = self
.bucketing_function
- .bucketing(&prefix_bytes, self.num_buckets)?;
+ .bucketing(&bk_bytes, self.num_buckets)?;
let table_id = self.table_info.get_table_id();
let table_bucket = TableBucket::new_with_partition(table_id, partition_id, bucket_id);
diff --git a/crates/fluss/src/client/table/upsert.rs b/crates/fluss/src/client/table/upsert.rs
index 52ec37b3..8a5547a7 100644
--- a/crates/fluss/src/client/table/upsert.rs
+++ b/crates/fluss/src/client/table/upsert.rs
@@ -143,9 +143,18 @@ impl UpsertWriterFactory {
&partial_update_columns,
)?;
- let primary_key_encoder = KeyEncoderFactory::of(row_type, physical_pks, data_lake_format)?;
- let bucket_key_encoder = if !table_info.is_default_bucket_key() {
- Some(KeyEncoderFactory::of(
+ let kv_format_version = table_info.get_table_config().get_kv_format_version()?;
+ let is_default_bucket_key = table_info.is_default_bucket_key();
+
+ let primary_key_encoder = KeyEncoderFactory::of_primary_key_encoder(
+ row_type,
+ physical_pks,
+ data_lake_format,
+ kv_format_version,
+ is_default_bucket_key,
+ )?;
+ let bucket_key_encoder = if !is_default_bucket_key {
+ Some(KeyEncoderFactory::of_bucket_key_encoder(
row_type,
table_info.get_bucket_keys(),
data_lake_format,
diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs
index 390bdbfc..35d251d7 100644
--- a/crates/fluss/src/metadata/table.rs
+++ b/crates/fluss/src/metadata/table.rs
@@ -1172,6 +1172,19 @@ impl TableConfig {
kv_format.parse().map_err(Into::into)
}
+ /// Returns the KV-format version of the table, defaulting to `1` when the
+ /// `table.kv.format-version` property is absent. Matches Java's
+ /// `TableConfig.getKvFormatVersion().orElse(1)` semantics.
+ pub fn get_kv_format_version(&self) -> Result {
+ const DEFAULT: i32 = 1;
+ match self.properties.get("table.kv.format-version") {
+ Some(v) => v.parse::().map_err(|e| {
+ Error::invalid_table(format!("Invalid table.kv.format-version {v:?}: {e}"))
+ }),
+ None => Ok(DEFAULT),
+ }
+ }
+
pub fn get_log_format(&self) -> Result {
// TODO: Consolidate configurations logic, constants, defaults in a single place
const DEFAULT_LOG_FORMAT: &str = "ARROW";
diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs
index 16a540eb..ce96f0b8 100644
--- a/crates/fluss/src/row/encode/mod.rs
+++ b/crates/fluss/src/row/encode/mod.rs
@@ -17,11 +17,13 @@
mod compacted_key_encoder;
mod compacted_row_encoder;
+mod paimon_key_encoder;
use crate::error::{Error, Result};
use crate::metadata::{DataLakeFormat, KvFormat, RowType};
use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder;
use crate::row::encode::compacted_row_encoder::CompactedRowEncoder;
+use crate::row::encode::paimon_key_encoder::PaimonKeyEncoder;
use crate::row::{Datum, InternalRow};
use bytes::Bytes;
@@ -34,6 +36,61 @@ pub trait KeyEncoder: Send + Sync {
pub struct KeyEncoderFactory;
impl KeyEncoderFactory {
+ /// Create a key encoder for the primary key, mirroring Java's
+ /// `KeyEncoder.ofPrimaryKeyEncoder`.
+ ///
+ /// Use this encoder when encoding the primary key for KV writes/lookups,
+ /// or when encoding a prefix-lookup key (which is a strict prefix of the
+ /// primary key and must share its byte layout to support lexicographic
+ /// prefix matching).
+ ///
+ /// - `kv_format_version == 2` with `is_default_bucket_key == false`
+ /// (i.e. the primary key differs from the bucket key) →
+ /// [`CompactedKeyEncoder::create_key_encoder`] regardless of lake format
+ /// (primary key must be decodable by the server without lake-specific
+ /// format awareness).
+ /// - Otherwise (`kv_format_version == 1`, or `kv_format_version == 2`
+ /// with `is_default_bucket_key == true` so the primary key matches the
+ /// bucket key) → align with the lake format via [`Self::of`], producing
+ /// the same physical layout as the bucket key.
+ /// - Any other `kv_format_version` → [`Error::UnsupportedOperation`].
+ pub fn of_primary_key_encoder(
+ row_type: &RowType,
+ key_fields: &[String],
+ data_lake_format: &Option,
+ kv_format_version: i32,
+ is_default_bucket_key: bool,
+ ) -> Result> {
+ match kv_format_version {
+ 1 => Self::of(row_type, key_fields, data_lake_format),
+ 2 if is_default_bucket_key => Self::of(row_type, key_fields, data_lake_format),
+ 2 => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
+ row_type, key_fields,
+ )?)),
+ v => Err(Error::UnsupportedOperation {
+ message: format!("Unsupported kv format version: {v}"),
+ }),
+ }
+ }
+
+ /// Create a key encoder for the bucket key, mirroring Java's
+ /// `KeyEncoder.ofBucketKeyEncoder`.
+ ///
+ /// Use this encoder when computing the bucket id for a row. The bucket
+ /// key encoding is bound to the configured data lake format (delegating
+ /// to [`Self::of`]) because the resulting bucket id must stay consistent
+ /// with the bucket id computed by the downstream lake (e.g. Paimon /
+ /// Lance / Iceberg); otherwise the same row would land in different
+ /// buckets on the Fluss side and on the lake side, breaking lake-tiering
+ /// semantics.
+ pub fn of_bucket_key_encoder(
+ row_type: &RowType,
+ key_fields: &[String],
+ data_lake_format: &Option,
+ ) -> Result> {
+ Self::of(row_type, key_fields, data_lake_format)
+ }
+
/// Create a key encoder to encode the key bytes of the input row.
/// # Arguments
/// * `row_type` - the row type of the input row
@@ -42,15 +99,15 @@ impl KeyEncoderFactory {
///
/// # Returns
/// key encoder
- pub fn of(
+ fn of(
row_type: &RowType,
key_fields: &[String],
data_lake_format: &Option,
) -> Result> {
match data_lake_format {
- Some(DataLakeFormat::Paimon) => Err(Error::UnsupportedOperation {
- message: "KeyEncoder for Paimon format is not yet implemented".to_string(),
- }),
+ Some(DataLakeFormat::Paimon) => {
+ Ok(Box::new(PaimonKeyEncoder::new(row_type, key_fields)?))
+ }
Some(DataLakeFormat::Lance) => Ok(Box::new(CompactedKeyEncoder::create_key_encoder(
row_type, key_fields,
)?)),
diff --git a/crates/fluss/src/row/encode/paimon_key_encoder.rs b/crates/fluss/src/row/encode/paimon_key_encoder.rs
new file mode 100644
index 00000000..5eeb569d
--- /dev/null
+++ b/crates/fluss/src/row/encode/paimon_key_encoder.rs
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::RowType;
+use crate::row::binary::ValueWriter;
+use crate::row::encode::KeyEncoder;
+use crate::row::field_getter::FieldGetter;
+use crate::row::paimon::PaimonBinaryRowWriter;
+use crate::row::{Datum, InternalRow};
+
+/// Rust port of Java's `org.apache.fluss.row.encode.paimon.PaimonKeyEncoder`.
+///
+/// Encodes a set of key columns of a Fluss `InternalRow` into Paimon's
+/// BinaryRow layout via [`PaimonBinaryRowWriter`]. The key's row type is a
+/// projection of the original `row_type` to just the `keys` (same column order
+/// as `keys`), and each key column is written into its projection position.
+pub struct PaimonKeyEncoder {
+ field_getters: Vec,
+ field_encoders: Vec,
+ writer: PaimonBinaryRowWriter,
+}
+
+impl PaimonKeyEncoder {
+ /// Construct a Paimon key encoder for `keys` drawn from `row_type`.
+ pub fn new(row_type: &RowType, keys: &[String]) -> Result {
+ let mut field_getters: Vec = Vec::with_capacity(keys.len());
+ let mut field_encoders: Vec = Vec::with_capacity(keys.len());
+
+ for key in keys {
+ let idx = row_type
+ .get_field_index(key)
+ .ok_or_else(|| IllegalArgument {
+ message: format!("Field {key:?} not found in input row type {row_type:?}"),
+ })?;
+ let data_type = row_type.fields()[idx].data_type();
+ // Validate Paimon-supported field type (rejects ARRAY/MAP/ROW).
+ let value_writer = PaimonBinaryRowWriter::create_value_writer(data_type)?;
+ field_getters.push(FieldGetter::create(data_type, idx));
+ field_encoders.push(value_writer);
+ }
+
+ Ok(Self {
+ writer: PaimonBinaryRowWriter::new(keys.len()),
+ field_getters,
+ field_encoders,
+ })
+ }
+}
+
+impl KeyEncoder for PaimonKeyEncoder {
+ fn encode_key(&mut self, row: &dyn InternalRow) -> Result {
+ use crate::row::binary::BinaryWriter;
+
+ self.writer.reset();
+ self.writer.write_change_type_insert();
+
+ for (proj_pos, (getter, encoder)) in self
+ .field_getters
+ .iter()
+ .zip(self.field_encoders.iter())
+ .enumerate()
+ {
+ let datum: Datum = getter.get_field(row)?;
+ encoder.write_value(&mut self.writer, proj_pos, &datum)?;
+ }
+
+ Ok(self.writer.to_bytes())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::DataTypes;
+ use crate::row::{Datum, GenericRow};
+
+ #[test]
+ fn encode_single_int_key_has_insert_header() {
+ let row_type = RowType::with_data_types_and_field_names(
+ vec![DataTypes::int(), DataTypes::string()],
+ vec!["pk", "other"],
+ );
+ let mut encoder =
+ PaimonKeyEncoder::new(&row_type, &["pk".to_string()]).expect("construct encoder");
+
+ let row = GenericRow::from_data(vec![Datum::from(42i32), Datum::from("hi")]);
+ let bytes = encoder.encode_key(&row).unwrap();
+
+ // 8 bytes null-bits + 8 bytes field slot
+ assert_eq!(bytes.len(), 16);
+ // change-type INSERT = 0 at byte 0
+ assert_eq!(bytes[0], 0);
+ // int 42 in little-endian in the field slot
+ assert_eq!(&bytes[8..12], &42_i32.to_le_bytes());
+ }
+
+ #[test]
+ fn encode_multi_field_key() {
+ let row_type = RowType::with_data_types_and_field_names(
+ vec![DataTypes::string(), DataTypes::int(), DataTypes::string()],
+ vec!["other", "pk1", "pk2"],
+ );
+ let mut encoder = PaimonKeyEncoder::new(&row_type, &["pk1".to_string(), "pk2".to_string()])
+ .expect("construct encoder");
+
+ let row = GenericRow::from_data(vec![
+ Datum::from("ignored"),
+ Datum::from(7i32),
+ Datum::from("hi"),
+ ]);
+ let bytes = encoder.encode_key(&row).unwrap();
+
+ // 8 bytes null-bits + 2 * 8 bytes field slots = 24
+ assert_eq!(bytes.len(), 24);
+ // slot 0 -> int 7
+ assert_eq!(&bytes[8..12], &7_i32.to_le_bytes());
+ // slot 1 -> "hi" inlined; bytes[16..18] = "hi", bytes[23] = 0x82
+ assert_eq!(&bytes[16..18], b"hi");
+ assert_eq!(bytes[23], 0x82);
+ }
+
+    #[test]
+    fn encode_reuses_buffer_across_rows() {
+        let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::int()], vec!["pk"]);
+        let mut encoder = PaimonKeyEncoder::new(&row_type, &["pk".to_string()]).unwrap();
+        // Encode A, then a different key B, then A again on the same encoder:
+        let row_a = GenericRow::from_data(vec![Datum::from(1i32)]);
+        let row_b = GenericRow::from_data(vec![Datum::from(-1i32)]);
+        let a = encoder.encode_key(&row_a).unwrap().to_vec();
+        assert_ne!(a, encoder.encode_key(&row_b).unwrap().to_vec(), "B must differ");
+        assert_eq!(a, encoder.encode_key(&row_a).unwrap().to_vec(), "B leaked into A");
+    }
+
+ #[test]
+ fn encode_missing_key_field_errors() {
+ let row_type = RowType::with_data_types_and_field_names(vec![DataTypes::int()], vec!["pk"]);
+ let res = PaimonKeyEncoder::new(&row_type, &["missing".to_string()]);
+ let err = match res {
+ Ok(_) => panic!("expected IllegalArgument"),
+ Err(e) => e,
+ };
+ assert!(err.to_string().contains("not found in input row type"));
+ }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 1e045b2d..c2510a6c 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -29,6 +29,7 @@ pub mod encode;
pub mod field_getter;
mod fixed_schema_decoder;
mod lookup_row;
+pub mod paimon;
mod projected_row;
mod row_decoder;
diff --git a/crates/fluss/src/row/paimon/mod.rs b/crates/fluss/src/row/paimon/mod.rs
new file mode 100644
index 00000000..99981df2
--- /dev/null
+++ b/crates/fluss/src/row/paimon/mod.rs
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod paimon_binary_row_writer;
+
+pub use paimon_binary_row_writer::PaimonBinaryRowWriter;
diff --git a/crates/fluss/src/row/paimon/paimon_binary_row_writer.rs b/crates/fluss/src/row/paimon/paimon_binary_row_writer.rs
new file mode 100644
index 00000000..23ff4545
--- /dev/null
+++ b/crates/fluss/src/row/paimon/paimon_binary_row_writer.rs
@@ -0,0 +1,550 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Bytes;
+
+use crate::error::{Error, Result};
+use crate::metadata::DataType;
+use crate::row::binary::{BinaryWriter, ValueWriter};
+use crate::row::datum::{TimestampLtz, TimestampNtz};
+use crate::row::{Decimal, FlussArray, FlussMap};
+
+/// Header size in bits (for the ChangeType byte at the front of the null bitset).
+const HEADER_SIZE_IN_BITS: usize = 8;
+/// Maximum number of bytes that can be packed inline into a fixed 8-byte field slot
+/// (Paimon's variable-length inline-encoding optimisation).
+const MAX_FIX_PART_DATA_SIZE: usize = 7;
+
+/// A Rust port of Java's
+/// `org.apache.fluss.row.encode.paimon.PaimonBinaryRowWriter`, encoding a Fluss
+/// `InternalRow` using Paimon's BinaryRow layout.
+///
+/// Layout:
+/// - Header/null-bit region (`null_bits_size_in_bytes` bytes): byte 0 holds the
+///   ChangeType (always `INSERT = 0` for key encoding) and bit `pos + 8` is
+///   set when field `pos` is null.
+/// - Fixed-length region: one 8-byte slot per field right after that region.
+/// - Variable-length tail growing on demand.
+///
+/// This writer implements the [`BinaryWriter`] trait so it can plug into
+/// [`ValueWriter::write_value`]. Because the trait API does not pass the
+/// position to the type-specific write methods, the writer keeps an internal
+/// `current_pos` cursor that is advanced after every field write (including
+/// `set_null_at`). The encoder is required to write fields in field order,
+/// matching the iteration order used by [`crate::row::encode`].
+pub struct PaimonBinaryRowWriter {
+ null_bits_size_in_bytes: usize, // size of the header + null-bit region
+ fixed_size: usize, // header/null-bit region plus one 8-byte slot per field
+ buffer: Vec,
+ cursor: usize, // end of the written data; variable-length payloads go here
+ current_pos: usize, // index of the next field to be written
+}
+
+impl PaimonBinaryRowWriter {
+ pub fn new(arity: usize) -> Self {
+ let null_bits_size_in_bytes = calculate_bit_set_width_in_bytes(arity);
+ let fixed_size = get_fixed_length_part_size(null_bits_size_in_bytes, arity);
+ Self {
+ null_bits_size_in_bytes,
+ fixed_size,
+ buffer: vec![0u8; fixed_size],
+ cursor: fixed_size,
+ current_pos: 0,
+ }
+ }
+
+ /// Mirrors Java's `PaimonBinaryRowWriter.createFieldWriter`, returning the
+ /// Fluss [`ValueWriter`] for a Paimon-supported scalar field type.
+ /// ARRAY/MAP/ROW are explicitly rejected (Java's `default` branch throws).
+ pub fn create_value_writer(field_type: &DataType) -> Result {
+ match field_type {
+ DataType::Char(_)
+ | DataType::String(_)
+ | DataType::Boolean(_)
+ | DataType::Binary(_)
+ | DataType::Bytes(_)
+ | DataType::Decimal(_)
+ | DataType::TinyInt(_)
+ | DataType::SmallInt(_)
+ | DataType::Int(_)
+ | DataType::Date(_)
+ | DataType::Time(_)
+ | DataType::BigInt(_)
+ | DataType::Float(_)
+ | DataType::Double(_)
+ | DataType::Timestamp(_)
+ | DataType::TimestampLTz(_) => ValueWriter::create_value_writer(field_type, None),
+ _ => Err(Error::UnsupportedOperation {
+ message: format!("Unsupported type for Paimon BinaryRow writer: {field_type:?}"),
+ }),
+ }
+ }
+
+ /// Writes the Paimon ChangeType byte at offset 0 (always `INSERT = 0`
+ /// for key encoding). Must be called immediately after [`Self::reset`].
+ pub fn write_change_type_insert(&mut self) {
+ self.buffer[0] = 0;
+ }
+
+ pub fn to_bytes(&self) -> Bytes {
+ Bytes::copy_from_slice(&self.buffer[..self.cursor])
+ }
+
+ #[allow(dead_code)]
+ pub fn buffer(&self) -> &[u8] {
+ &self.buffer[..self.cursor]
+ }
+
+ #[allow(dead_code)]
+ pub fn cursor(&self) -> usize {
+ self.cursor
+ }
+
+ fn field_offset(&self, pos: usize) -> usize {
+ self.null_bits_size_in_bytes + 8 * pos
+ }
+
+ fn set_null_bit(&mut self, pos: usize) {
+ let bit = pos + HEADER_SIZE_IN_BITS;
+ let byte_index = bit / 8;
+ let bit_in_byte = bit % 8;
+ self.buffer[byte_index] |= 1u8 << bit_in_byte;
+ }
+
+ fn put_long_le(&mut self, offset: usize, value: i64) {
+ self.buffer[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
+ }
+
+ fn put_int_le(&mut self, offset: usize, value: i32) {
+ self.buffer[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
+ }
+
+ fn put_short_le(&mut self, offset: usize, value: i16) {
+ self.buffer[offset..offset + 2].copy_from_slice(&value.to_le_bytes());
+ }
+
+ /// Set `(offset << 32) | size` as a little-endian i64 at the field slot.
+ fn set_offset_and_size(&mut self, pos: usize, offset: usize, size: u64) {
+ let packed = ((offset as i64) << 32) | (size as i64);
+ let field_offset = self.field_offset(pos);
+ self.put_long_le(field_offset, packed);
+ }
+
+    /// Packs a payload of at most [`MAX_FIX_PART_DATA_SIZE`] bytes directly into
+    /// the 8-byte fixed slot of field `pos`: the payload fills the leading bytes
+    /// and the last byte of the slot stores `len | 0x80` as the inline marker.
+    fn write_bytes_to_fix_len_part(&mut self, pos: usize, bytes: &[u8]) {
+        let payload_len = bytes.len();
+        debug_assert!(payload_len <= MAX_FIX_PART_DATA_SIZE);
+        let start = self.field_offset(pos);
+        // Wipe the whole slot so nothing left behind by an earlier row (the
+        // buffer is reused across `reset`) trails a shorter payload.
+        self.buffer[start..start + 8].fill(0);
+        // Payload goes first; the marker lands in byte 7, the most significant
+        // byte when the slot is read back as a little-endian i64.
+        self.buffer[start..start + payload_len].copy_from_slice(bytes);
+        let marker = (payload_len as u8) | 0x80;
+        self.buffer[start + 7] = marker;
+    }
+
+ fn ensure_capacity(&mut self, needed_size: usize) {
+ let length = self.cursor + needed_size;
+ if self.buffer.len() < length {
+ self.grow(length);
+ }
+ }
+
+ fn grow(&mut self, min_capacity: usize) {
+ let old_capacity = self.buffer.len();
+ let mut new_capacity = old_capacity + (old_capacity >> 1);
+ if new_capacity < min_capacity {
+ new_capacity = min_capacity;
+ }
+ self.buffer.resize(new_capacity, 0);
+ }
+
+    /// Clears the final 8-byte word that a `num_bytes`-long payload written at
+    /// the current cursor will only partly cover, so its padding bytes are zero
+    /// (counterpart of Java's `zeroOutPaddingBytes`).
+    fn zero_out_padding_bytes(&mut self, num_bytes: usize) {
+        if num_bytes % 8 == 0 {
+            // Payload ends on a word boundary: there is no padding to clear.
+            return;
+        }
+        // Start of the last, partially filled word, measured from the cursor.
+        let word_start = self.cursor + (num_bytes & !0x07);
+        self.buffer[word_start..word_start + 8].fill(0);
+    }
+
+ fn write_bytes_to_var_len_part(&mut self, pos: usize, bytes: &[u8]) {
+ let len = bytes.len();
+ let rounded_size = round_number_of_bytes_to_nearest_word(len);
+
+ self.ensure_capacity(rounded_size);
+ self.zero_out_padding_bytes(len);
+
+ self.buffer[self.cursor..self.cursor + len].copy_from_slice(bytes);
+
+ self.set_offset_and_size(pos, self.cursor, len as u64);
+ self.cursor += rounded_size;
+ }
+
+ fn write_bytes_internal(&mut self, pos: usize, bytes: &[u8]) {
+ if bytes.len() <= MAX_FIX_PART_DATA_SIZE {
+ self.write_bytes_to_fix_len_part(pos, bytes);
+ } else {
+ self.write_bytes_to_var_len_part(pos, bytes);
+ }
+ }
+}
+
+/// Number of bytes occupied by Paimon's null bitset for the given arity,
+/// including the 1-byte (8-bit) ChangeType header.
+fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
+ ((arity + 63 + HEADER_SIZE_IN_BITS) / 64) * 8
+}
+
+fn get_fixed_length_part_size(null_bits_size_in_bytes: usize, arity: usize) -> usize {
+ null_bits_size_in_bytes + 8 * arity
+}
+
+fn round_number_of_bytes_to_nearest_word(num_bytes: usize) -> usize {
+ let remainder = num_bytes & 0x07;
+ if remainder == 0 {
+ num_bytes
+ } else {
+ num_bytes + (8 - remainder)
+ }
+}
+
+impl BinaryWriter for PaimonBinaryRowWriter {
+ fn reset(&mut self) {
+ self.cursor = self.fixed_size;
+ self.current_pos = 0;
+ // Zero the null-bits region only (Java semantics: field slots are not
+ // wiped because every field is overwritten in the next encode pass).
+ for b in &mut self.buffer[..self.null_bits_size_in_bytes] {
+ *b = 0;
+ }
+ }
+
+ fn set_null_at(&mut self, pos: usize) {
+ debug_assert_eq!(
+ pos, self.current_pos,
+ "Paimon writer expects in-order writes"
+ );
+ self.set_null_bit(pos);
+ let field_offset = self.field_offset(pos);
+ self.put_long_le(field_offset, 0);
+ self.current_pos = pos + 1;
+ }
+
+ fn write_boolean(&mut self, value: bool) {
+ let pos = self.current_pos;
+ let off = self.field_offset(pos);
+ self.buffer[off] = if value { 1 } else { 0 };
+ self.current_pos = pos + 1;
+ }
+
+ fn write_byte(&mut self, value: u8) {
+ let pos = self.current_pos;
+ let off = self.field_offset(pos);
+ self.buffer[off] = value;
+ self.current_pos = pos + 1;
+ }
+
+ fn write_bytes(&mut self, value: &[u8]) {
+ let pos = self.current_pos;
+ self.write_bytes_internal(pos, value);
+ self.current_pos = pos + 1;
+ }
+
+ fn write_char(&mut self, value: &str, _length: usize) {
+ // Paimon treats CHAR identically to STRING (BinaryString in Java).
+ self.write_string(value);
+ }
+
+ fn write_string(&mut self, value: &str) {
+ let pos = self.current_pos;
+ self.write_bytes_internal(pos, value.as_bytes());
+ self.current_pos = pos + 1;
+ }
+
+ fn write_short(&mut self, value: i16) {
+ let pos = self.current_pos;
+ let off = self.field_offset(pos);
+ // Zero the unused high bytes to keep the slot deterministic.
+ self.put_long_le(off, 0);
+ self.put_short_le(off, value);
+ self.current_pos = pos + 1;
+ }
+
+ fn write_int(&mut self, value: i32) {
+ let pos = self.current_pos;
+ let off = self.field_offset(pos);
+ self.put_long_le(off, 0);
+ self.put_int_le(off, value);
+ self.current_pos = pos + 1;
+ }
+
+ fn write_long(&mut self, value: i64) {
+ let pos = self.current_pos;
+ let off = self.field_offset(pos);
+ self.put_long_le(off, value);
+ self.current_pos = pos + 1;
+ }
+
+ fn write_float(&mut self, value: f32) {
+ let pos = self.current_pos;
+ let off = self.field_offset(pos);
+ self.put_long_le(off, 0);
+ self.buffer[off..off + 4].copy_from_slice(&value.to_le_bytes());
+ self.current_pos = pos + 1;
+ }
+
+ fn write_double(&mut self, value: f64) {
+ let pos = self.current_pos;
+ let off = self.field_offset(pos);
+ self.buffer[off..off + 8].copy_from_slice(&value.to_le_bytes());
+ self.current_pos = pos + 1;
+ }
+
+ fn write_binary(&mut self, bytes: &[u8], length: usize) {
+ let pos = self.current_pos;
+ let slice = &bytes[..length.min(bytes.len())];
+ self.write_bytes_internal(pos, slice);
+ self.current_pos = pos + 1;
+ }
+
+ fn write_decimal(&mut self, value: &Decimal, precision: u32) {
+ let pos = self.current_pos;
+ if Decimal::is_compact_precision(precision) {
+ // Compact: the unscaled long is stored directly in the 8-byte field slot.
+ // NOTE(review): if `to_unscaled_long` yields no value, 0 is encoded; confirm.
+ let unscaled = value.to_unscaled_long().unwrap_or(0);
+ let off = self.field_offset(pos);
+ self.put_long_le(off, unscaled);
+ } else {
+ // Non-compact: 16 bytes in the variable region; the slot records offset+size.
+ self.ensure_capacity(16);
+ // Zero all 16 bytes first: the unscaled bytes may not fill them.
+ for b in &mut self.buffer[self.cursor..self.cursor + 16] {
+ *b = 0;
+ }
+ let bytes = value.to_unscaled_bytes();
+ debug_assert!(bytes.len() <= 16, "decimal unscaled bytes exceed 16");
+ self.buffer[self.cursor..self.cursor + bytes.len()].copy_from_slice(&bytes);
+ self.set_offset_and_size(pos, self.cursor, bytes.len() as u64);
+ self.cursor += 16; // advance by the whole reservation, not `bytes.len()`
+ }
+ self.current_pos = pos + 1; // exactly one field consumed on either path
+ }
+
+ fn write_time(&mut self, value: i32, _precision: u32) {
+ // Java's Paimon writer uses writeInt for TIME.
+ self.write_int(value);
+ }
+
+ fn write_timestamp_ntz(&mut self, value: &TimestampNtz, precision: u32) {
+ let pos = self.current_pos;
+ if TimestampNtz::is_compact(precision) {
+ let off = self.field_offset(pos);
+ self.put_long_le(off, value.get_millisecond());
+ } else {
+ self.ensure_capacity(8);
+ self.put_long_le(self.cursor, value.get_millisecond());
+ self.set_offset_and_size(pos, self.cursor, value.get_nano_of_millisecond() as u64);
+ self.cursor += 8;
+ }
+ self.current_pos = pos + 1;
+ }
+
+ fn write_timestamp_ltz(&mut self, value: &TimestampLtz, precision: u32) {
+ let pos = self.current_pos;
+ if TimestampLtz::is_compact(precision) {
+ let off = self.field_offset(pos);
+ self.put_long_le(off, value.get_epoch_millisecond());
+ } else {
+ self.ensure_capacity(8);
+ self.put_long_le(self.cursor, value.get_epoch_millisecond());
+ self.set_offset_and_size(pos, self.cursor, value.get_nano_of_millisecond() as u64);
+ self.cursor += 8;
+ }
+ self.current_pos = pos + 1;
+ }
+
+ fn write_array(&mut self, _value: &FlussArray) {
+ // Java's PaimonBinaryRowWriter rejects ARRAY in its switch default.
+ // This should be unreachable because `create_value_writer` rejects
+ // ARRAY at encoder-construction time.
+ panic!("Paimon BinaryRow writer does not support ARRAY field types");
+ }
+
+ fn write_map(&mut self, _value: &FlussMap) {
+ panic!("Paimon BinaryRow writer does not support MAP field types");
+ }
+
+ fn complete(&mut self) {
+ // No-op: `to_bytes` already returns the trimmed-to-cursor buffer.
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::DataTypes;
+
+ fn writer_for_arity(arity: usize) -> PaimonBinaryRowWriter {
+ let mut w = PaimonBinaryRowWriter::new(arity);
+ w.write_change_type_insert();
+ w
+ }
+
+ #[test]
+ fn fixed_layout_sizes() {
+ // arity=4 -> nullBitsSizeInBytes = floor((4 + 63 + 8) / 64) * 8 = 8
+ // fixedSize = 8 + 8*4 = 40
+ let w = PaimonBinaryRowWriter::new(4);
+ assert_eq!(w.null_bits_size_in_bytes, 8);
+ assert_eq!(w.fixed_size, 40);
+ assert_eq!(w.cursor, 40);
+ }
+
+ #[test]
+ fn write_int_in_slot_le() {
+ let mut w = writer_for_arity(1);
+ w.write_int(0x01020304);
+ let bytes = w.to_bytes();
+ // header byte 0 (INSERT) + null bits (zeros) total 8, then slot 8 bytes.
+ assert_eq!(bytes.len(), 16);
+ assert_eq!(bytes[0], 0x00);
+ // INT is written little-endian into bytes 8..12.
+ assert_eq!(&bytes[8..12], &0x01020304_i32.to_le_bytes());
+ // Remaining 4 bytes of the slot are zero.
+ assert_eq!(&bytes[12..16], &[0u8; 4]);
+ }
+
+ #[test]
+ fn write_short_string_inlined() {
+ let mut w = writer_for_arity(1);
+ w.write_string("hi"); // 2 bytes -> inline
+ let bytes = w.to_bytes();
+ assert_eq!(bytes.len(), 16);
+ // bytes[8..10] = "hi", bytes[15] = 2 | 0x80
+ assert_eq!(&bytes[8..10], b"hi");
+ assert_eq!(bytes[15], 0x82);
+ }
+
+ #[test]
+ fn write_long_string_in_var_part() {
+ let mut w = writer_for_arity(1);
+ let s = "this_is_a_long_string"; // 21 bytes -> var part, rounded to 24
+ w.write_string(s);
+ let bytes = w.to_bytes();
+ assert_eq!(bytes.len(), 16 + 24);
+ // Field slot at 8..16 is (offset=16 << 32) | 21
+ let packed = i64::from_le_bytes(bytes[8..16].try_into().unwrap());
+ let offset = (packed >> 32) as usize;
+ let size = (packed & 0xFFFFFFFF) as usize;
+ assert_eq!(offset, 16);
+ assert_eq!(size, 21);
+ assert_eq!(&bytes[offset..offset + size], s.as_bytes());
+ }
+
+ #[test]
+ fn set_null_at_marks_bit_and_zeroes_slot() {
+ let mut w = writer_for_arity(2);
+ w.set_null_at(0);
+ w.write_int(7);
+ let bytes = w.to_bytes();
+ // bit 0+8 = 8 -> null bitset byte index 1, bit 0 -> 0x01
+ assert_eq!(bytes[1], 0x01);
+ assert_eq!(&bytes[8..16], &[0u8; 8]);
+ assert_eq!(&bytes[16..20], &7_i32.to_le_bytes());
+ }
+
+ #[test]
+ fn reset_clears_null_bits_and_reuses_buffer() {
+ let mut w = PaimonBinaryRowWriter::new(2);
+ w.write_change_type_insert();
+ w.set_null_at(0);
+ w.write_int(7);
+ let first = w.to_bytes().to_vec();
+
+ // Re-encode the same row after reset; bytes should match.
+ w.reset();
+ w.write_change_type_insert();
+ w.set_null_at(0);
+ w.write_int(7);
+ let second = w.to_bytes().to_vec();
+
+ assert_eq!(first, second);
+ }
+
+ #[test]
+ fn buffer_grows_for_large_var_len() {
+ let mut w = PaimonBinaryRowWriter::new(1);
+ w.write_change_type_insert();
+ let big: Vec = (0..200u8).collect();
+ w.write_bytes(&big);
+ let bytes = w.to_bytes();
+ // var part rounded up: 200 -> 200 (already multiple of 8)
+ assert_eq!(bytes.len(), 16 + 200);
+ let packed = i64::from_le_bytes(bytes[8..16].try_into().unwrap());
+ let off = (packed >> 32) as usize;
+ let size = (packed & 0xFFFFFFFF) as usize;
+ assert_eq!(off, 16);
+ assert_eq!(size, 200);
+ assert_eq!(&bytes[off..off + size], big.as_slice());
+ }
+
+ #[test]
+ fn create_value_writer_rejects_array() {
+ let dt = DataTypes::array(DataTypes::int());
+ let res = PaimonBinaryRowWriter::create_value_writer(&dt);
+ match res {
+ Err(Error::UnsupportedOperation { message }) => {
+ assert!(message.contains("Unsupported type for Paimon BinaryRow writer"));
+ }
+ Err(other) => panic!("expected UnsupportedOperation, got {other:?}"),
+ Ok(_) => panic!("expected error, got Ok"),
+ }
+ }
+
+ #[test]
+ fn create_value_writer_accepts_scalars() {
+ let cases = [
+ DataTypes::int(),
+ DataTypes::bigint(),
+ DataTypes::string(),
+ DataTypes::char(8),
+ DataTypes::boolean(),
+ DataTypes::float(),
+ DataTypes::double(),
+ DataTypes::binary(8),
+ DataTypes::bytes(),
+ DataTypes::date(),
+ DataTypes::time(),
+ DataTypes::decimal(10, 2),
+ DataTypes::timestamp(),
+ ];
+ for dt in cases {
+ PaimonBinaryRowWriter::create_value_writer(&dt)
+ .unwrap_or_else(|e| panic!("expected scalar {dt:?} accepted, got {e}"));
+ }
+ }
+}
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 977b69d1..aaeca70d 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -78,12 +78,12 @@ impl ApiKey {
| ApiKey::GetDatabaseInfo
| ApiKey::CreatePartition
| ApiKey::DropPartition
- | ApiKey::Authenticate
- // TODO(key-encoding-v1): The Java server supports v0..v1 for these
- // APIs, but the Rust client has not yet implemented the v1 key
- // encoding format. Pinned to v0 until that is done.
- | ApiKey::PutKv | ApiKey::Lookup | ApiKey::PrefixLookup => {
- Some(ApiVersionRange::new(ApiVersion(0), ApiVersion(0)))
+ | ApiKey::Authenticate => Some(ApiVersionRange::new(ApiVersion(0), ApiVersion(0))),
+ // PutKv / Lookup / PrefixLookup support v0 (legacy key encoding)
+ // and v1 (Paimon BinaryRow key encoding for kv_format_version=2
+ // non-default bucket keys). The Rust client encodes both.
+ ApiKey::PutKv | ApiKey::Lookup | ApiKey::PrefixLookup => {
+ Some(ApiVersionRange::new(ApiVersion(0), ApiVersion(1)))
}
Unknown(_) => None,
}
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
index 4da7c75d..4ea0c1ae 100644
--- a/crates/fluss/tests/integration/kv_table.rs
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -1252,6 +1252,270 @@ mod kv_table_test {
.expect("Failed to drop table");
}
+    /// Verifies upsert/lookup/update/delete on a KV table whose data lake format
+    /// is `paimon`, exercising every scalar `DataType` supported by Paimon's
+    /// BinaryRow encoder. With the default `kv_format_version=1`, the client
+    /// uses `PaimonKeyEncoder` for both the bucket key and the primary key,
+    /// so this also exercises the end-to-end Paimon key encoding path.
+    /// ARRAY/MAP/ROW are intentionally excluded because Paimon's BinaryRow
+    /// writer rejects them.
+    #[tokio::test]
+    async fn upsert_and_lookup_with_paimon_format() {
+        let cluster = get_shared_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().unwrap();
+
+        let table_path = TablePath::new("fluss", "test_kv_paimon_format");
+
+        // Schema covering every Paimon-supported scalar type.
+        // INT primary key keeps the bucket-key/primary-key encoding focused
+        // while the value columns sweep all remaining scalar types.
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("col_tinyint", DataTypes::tinyint())
+                    .column("col_smallint", DataTypes::smallint())
+                    .column("col_bigint", DataTypes::bigint())
+                    .column("col_float", DataTypes::float())
+                    .column("col_double", DataTypes::double())
+                    .column("col_boolean", DataTypes::boolean())
+                    .column("col_char", DataTypes::char(10))
+                    .column("col_string", DataTypes::string())
+                    .column("col_decimal", DataTypes::decimal(10, 2))
+                    .column("col_date", DataTypes::date())
+                    .column("col_time_s", DataTypes::time_with_precision(0))
+                    .column("col_time_ms", DataTypes::time_with_precision(3))
+                    .column("col_time_us", DataTypes::time_with_precision(6))
+                    .column("col_time_ns", DataTypes::time_with_precision(9))
+                    .column("col_timestamp", DataTypes::timestamp_with_precision(0))
+                    .column(
+                        "col_timestamp_ltz",
+                        DataTypes::timestamp_ltz_with_precision(6),
+                    )
+                    .column("col_bytes", DataTypes::bytes())
+                    .column("col_binary", DataTypes::binary(4))
+                    .primary_key(vec!["id"])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .property("table.datalake.format", "paimon")
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection.get_table(&table_path).await.unwrap();
+
+        let field_count = table.get_table_info().schema.columns().len();
+
+        let table_upsert = table.new_upsert().expect("Failed to create upsert");
+        let upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Test data covering every Paimon scalar type.
+        let id: i32 = 1;
+        let col_tinyint: i8 = 127;
+        let col_smallint: i16 = 32767;
+        let col_bigint: i64 = 9_223_372_036_854_775_807;
+        let col_float: f32 = std::f32::consts::PI;
+        let col_double: f64 = std::f64::consts::E;
+        let col_boolean: bool = true;
+        let col_char: &str = "hello";
+        let col_string: &str = "world of fluss rust client (paimon format)";
+        let col_decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap(); // 123.45
+        let col_date = Date::new(20476); // 2026-01-23
+        let col_time_s = Time::new(36_827_000); // 10:13:47
+        let col_time_ms = Time::new(36_827_123); // 10:13:47.123
+        let col_time_us = Time::new(86_399_999); // 23:59:59.999
+        let col_time_ns = Time::new(1); // 00:00:00.001
+        // col_timestamp uses precision 0 (second granularity); col_timestamp_ltz
+        // uses precision 6 (microsecond granularity) to also exercise the
+        // sub-millisecond nanos path.
+        let col_timestamp = TimestampNtz::new(1_769_163_227_000);
+        let col_timestamp_ltz =
+            TimestampLtz::from_millis_nanos(1_769_163_227_123, 456_000).unwrap();
+        let col_bytes: Vec<u8> = b"binary data".to_vec();
+        let col_binary: Vec<u8> = vec![0xDE, 0xAD, 0xBE, 0xEF];
+
+        let build_full_row = |id_val: i32| {
+            let mut row = GenericRow::new(field_count);
+            row.set_field(0, id_val);
+            row.set_field(1, col_tinyint);
+            row.set_field(2, col_smallint);
+            row.set_field(3, col_bigint);
+            row.set_field(4, col_float);
+            row.set_field(5, col_double);
+            row.set_field(6, col_boolean);
+            row.set_field(7, col_char);
+            row.set_field(8, col_string);
+            row.set_field(9, col_decimal.clone());
+            row.set_field(10, col_date);
+            row.set_field(11, col_time_s);
+            row.set_field(12, col_time_ms);
+            row.set_field(13, col_time_us);
+            row.set_field(14, col_time_ns);
+            row.set_field(15, col_timestamp);
+            row.set_field(16, col_timestamp_ltz);
+            row.set_field(17, col_bytes.as_slice());
+            row.set_field(18, col_binary.as_slice());
+            row
+        };
+
+        // Upsert two rows so we can also exercise delete on one and keep the
+        // other intact afterwards.
+        upsert_writer
+            .upsert(&build_full_row(id))
+            .expect("Failed to upsert paimon row");
+        upsert_writer
+            .upsert(&build_full_row(id + 1))
+            .expect("Failed to upsert second paimon row");
+        upsert_writer.flush().await.expect("Failed to flush");
+
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Verify every type round-trips through paimon-format upsert/lookup.
+        let assert_full_row = |row: &dyn InternalRow, expected_id: i32| {
+            assert_eq!(row.get_int(0).unwrap(), expected_id, "id mismatch");
+            assert_eq!(row.get_byte(1).unwrap(), col_tinyint, "tinyint mismatch");
+            assert_eq!(row.get_short(2).unwrap(), col_smallint, "smallint mismatch");
+            assert_eq!(row.get_long(3).unwrap(), col_bigint, "bigint mismatch");
+            assert!(
+                (row.get_float(4).unwrap() - col_float).abs() < f32::EPSILON,
+                "float mismatch",
+            );
+            assert!(
+                (row.get_double(5).unwrap() - col_double).abs() < f64::EPSILON,
+                "double mismatch",
+            );
+            assert_eq!(row.get_boolean(6).unwrap(), col_boolean, "boolean mismatch");
+            assert_eq!(row.get_char(7, 10).unwrap(), col_char, "char mismatch");
+            assert_eq!(row.get_string(8).unwrap(), col_string, "string mismatch");
+            assert_eq!(
+                row.get_decimal(9, 10, 2).unwrap(),
+                col_decimal,
+                "decimal mismatch",
+            );
+            assert_eq!(
+                row.get_date(10).unwrap().get_inner(),
+                col_date.get_inner(),
+                "date mismatch",
+            );
+            assert_eq!(
+                row.get_time(11).unwrap().get_inner(),
+                col_time_s.get_inner(),
+                "time(0) mismatch",
+            );
+            assert_eq!(
+                row.get_time(12).unwrap().get_inner(),
+                col_time_ms.get_inner(),
+                "time(3) mismatch",
+            );
+            assert_eq!(
+                row.get_time(13).unwrap().get_inner(),
+                col_time_us.get_inner(),
+                "time(6) mismatch",
+            );
+            assert_eq!(
+                row.get_time(14).unwrap().get_inner(),
+                col_time_ns.get_inner(),
+                "time(9) mismatch",
+            );
+            assert_eq!(
+                row.get_timestamp_ntz(15, 0).unwrap().get_millisecond(),
+                col_timestamp.get_millisecond(),
+                "timestamp(0) mismatch",
+            );
+            let ts_ltz = row.get_timestamp_ltz(16, 6).unwrap();
+            assert_eq!(
+                ts_ltz.get_epoch_millisecond(),
+                col_timestamp_ltz.get_epoch_millisecond(),
+                "timestamp_ltz(6) millis mismatch",
+            );
+            assert_eq!(
+                ts_ltz.get_nano_of_millisecond(),
+                col_timestamp_ltz.get_nano_of_millisecond(),
+                "timestamp_ltz(6) nanos mismatch",
+            );
+            assert_eq!(row.get_bytes(17).unwrap(), col_bytes, "bytes mismatch");
+            assert_eq!(
+                row.get_binary(18, 4).unwrap(),
+                col_binary,
+                "binary mismatch",
+            );
+        };
+
+        for expected_id in [id, id + 1] {
+            let result = lookuper
+                .lookup(&make_key_with_field_count(expected_id, 1))
+                .await
+                .expect("Failed to lookup");
+            let row = result.get_single_row().unwrap().expect("Row should exist");
+            assert_full_row(&row, expected_id);
+        }
+
+        // Update one row by upserting again with a different decimal value to
+        // verify update path under paimon format.
+        let updated_decimal = Decimal::from_unscaled_long(98765, 10, 2).unwrap(); // 987.65
+        let mut updated_row = build_full_row(id);
+        updated_row.set_field(9, updated_decimal.clone());
+        upsert_writer
+            .upsert(&updated_row)
+            .expect("Failed to upsert updated row")
+            .await
+            .expect("Failed to wait for upsert acknowledgment");
+
+        let result = lookuper
+            .lookup(&make_key_with_field_count(id, 1))
+            .await
+            .expect("Failed to lookup after update");
+        let found_row = result.get_single_row().unwrap().expect("Row should exist");
+        assert_eq!(
+            found_row.get_decimal(9, 10, 2).unwrap(),
+            updated_decimal,
+            "Decimal should be updated under paimon format",
+        );
+        // Other columns remain unchanged.
+        assert_eq!(found_row.get_string(8).unwrap(), col_string);
+
+        // Delete the updated record.
+        let mut delete_row = GenericRow::new(field_count);
+        delete_row.set_field(0, id);
+        upsert_writer
+            .delete(&delete_row)
+            .expect("Failed to delete")
+            .await
+            .expect("Failed to wait for delete acknowledgment");
+
+        let result = lookuper
+            .lookup(&make_key_with_field_count(id, 1))
+            .await
+            .expect("Failed to lookup deleted record");
+        assert!(
+            result.get_single_row().unwrap().is_none(),
+            "Record {id} should not exist after delete under paimon format",
+        );
+
+        // The other record remains intact and still round-trips correctly.
+        let result = lookuper
+            .lookup(&make_key_with_field_count(id + 1, 1))
+            .await
+            .expect("Failed to lookup remaining record");
+        let row = result.get_single_row().unwrap().expect("Row should exist");
+        assert_full_row(&row, id + 1);
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+
/// KV upsert + lookup against a schema covering every supported data type.
#[tokio::test]
async fn all_supported_datatypes() {