Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/iceberg/avro/avro_data_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,19 @@ Status AppendPrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
return {};
}

case TypeId::kTimestampNs:
case TypeId::kTimestampTzNs: {
if (avro_node->type() != ::avro::AVRO_LONG ||
avro_node->logicalType().type() != ::avro::LogicalType::TIMESTAMP_NANOS) {
return InvalidArgument(
"Expected Avro long with TIMESTAMP_NANOS for timestamp field, got: {}",
ToString(avro_node));
}
auto* builder = internal::checked_cast<::arrow::TimestampBuilder*>(array_builder);
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(avro_datum.value<int64_t>()));
return {};
}

default:
return InvalidArgument("Unsupported primitive type {} to append avro node {}",
projected_field.type()->ToString(), ToString(avro_node));
Expand Down
14 changes: 14 additions & 0 deletions src/iceberg/avro/avro_direct_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,20 @@ Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
return {};
}

case TypeId::kTimestampNs:
case TypeId::kTimestampTzNs: {
if (avro_node->type() != ::avro::AVRO_LONG ||
avro_node->logicalType().type() != ::avro::LogicalType::TIMESTAMP_NANOS) {
return InvalidArgument(
"Expected Avro long with TIMESTAMP_NANOS for timestamp field, got: {}",
ToString(avro_node));
}
auto* builder = internal::checked_cast<::arrow::TimestampBuilder*>(array_builder);
int64_t value = decoder.decodeLong();
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
return {};
}

default:
return InvalidArgument("Unsupported primitive type {} to decode from avro node {}",
projected_field.type()->ToString(), ToString(avro_node));
Expand Down
32 changes: 32 additions & 0 deletions src/iceberg/avro/avro_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,24 @@ Status ToAvroNodeVisitor::Visit(const TimestampTzType& type, ::avro::NodePtr* no
return {};
}

// Produces the Avro node for a TimestampNsType: an AVRO_LONG primitive tagged
// with the TIMESTAMP_NANOS logical type and carrying the kAdjustToUtcProp custom
// attribute set to the unquoted value "false".
//
// `type` is not inspected; `node` receives the newly created node.
// Always returns a default-constructed Status.
Status ToAvroNodeVisitor::Visit(const TimestampNsType& type, ::avro::NodePtr* node) {
  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
  auto& long_node = **node;

  const ::avro::LogicalType nanos_logical_type{::avro::LogicalType::TIMESTAMP_NANOS};
  long_node.setLogicalType(nanos_logical_type);

  // The attribute value is written without quotes (addQuotes=false).
  ::avro::CustomAttributes adjust_to_utc;
  adjust_to_utc.addAttribute(std::string(kAdjustToUtcProp), "false", /*addQuotes=*/false);
  long_node.addCustomAttributesForField(adjust_to_utc);

  return {};
}

// Produces the Avro node for a TimestampTzNsType: an AVRO_LONG primitive tagged
// with the TIMESTAMP_NANOS logical type and carrying the kAdjustToUtcProp custom
// attribute set to the unquoted value "true".
//
// `type` is not inspected; `node` receives the newly created node.
// Always returns a default-constructed Status.
Status ToAvroNodeVisitor::Visit(const TimestampTzNsType& type, ::avro::NodePtr* node) {
  *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_LONG);
  auto& long_node = **node;

  const ::avro::LogicalType nanos_logical_type{::avro::LogicalType::TIMESTAMP_NANOS};
  long_node.setLogicalType(nanos_logical_type);

  // The attribute value is written without quotes (addQuotes=false).
  ::avro::CustomAttributes adjust_to_utc;
  adjust_to_utc.addAttribute(std::string(kAdjustToUtcProp), "true", /*addQuotes=*/false);
  long_node.addCustomAttributesForField(adjust_to_utc);

  return {};
}

Status ToAvroNodeVisitor::Visit(const StringType& type, ::avro::NodePtr* node) {
*node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_STRING);
return {};
Expand Down Expand Up @@ -548,6 +566,20 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type,
return {};
}
break;
case TypeId::kTimestampNs:
if (avro_node->type() == ::avro::AVRO_LONG &&
HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_NANOS) &&
GetAdjustToUtc(avro_node).value_or("false") == "false") {
return {};
}
break;
case TypeId::kTimestampTzNs:
if (avro_node->type() == ::avro::AVRO_LONG &&
HasLogicalType(avro_node, ::avro::LogicalType::TIMESTAMP_NANOS) &&
GetAdjustToUtc(avro_node).value_or("false") == "true") {
return {};
}
break;
case TypeId::kString:
if (avro_node->type() == ::avro::AVRO_STRING) {
return {};
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/avro/avro_schema_util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class ToAvroNodeVisitor {
Status Visit(const TimeType& type, ::avro::NodePtr* node);
Status Visit(const TimestampType& type, ::avro::NodePtr* node);
Status Visit(const TimestampTzType& type, ::avro::NodePtr* node);
Status Visit(const TimestampNsType& type, ::avro::NodePtr* node);
Status Visit(const TimestampTzNsType& type, ::avro::NodePtr* node);
Status Visit(const StringType& type, ::avro::NodePtr* node);
Status Visit(const UuidType& type, ::avro::NodePtr* node);
Status Visit(const FixedType& type, ::avro::NodePtr* node);
Expand Down
25 changes: 25 additions & 0 deletions src/iceberg/expression/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ Result<nlohmann::json> ToJson(const Literal& literal) {
case TypeId::kTimestampTz:
return nlohmann::json(
TransformUtil::HumanTimestampWithZone(std::get<int64_t>(value)));
case TypeId::kTimestampNs:
return nlohmann::json(TransformUtil::HumanTimestampNs(std::get<int64_t>(value)));
case TypeId::kTimestampTzNs:
return nlohmann::json(
TransformUtil::HumanTimestampNsWithZone(std::get<int64_t>(value)));
case TypeId::kFloat:
return nlohmann::json(std::get<float>(value));
case TypeId::kDouble:
Expand Down Expand Up @@ -390,6 +395,26 @@ Result<Literal> LiteralFromJson(const nlohmann::json& json, const Type* type) {
return Literal::TimestampTz(micros);
}

case TypeId::kTimestampNs: {
if (!json.is_string()) [[unlikely]] {
return JsonParseError("Cannot parse {} as a timestamp_ns value",
SafeDumpJson(json));
}
ICEBERG_ASSIGN_OR_RAISE(auto nanos,
TransformUtil::ParseTimestampNs(json.get<std::string>()));
return Literal::TimestampNs(nanos);
}

case TypeId::kTimestampTzNs: {
if (!json.is_string()) [[unlikely]] {
return JsonParseError("Cannot parse {} as a timestamptz_ns value",
SafeDumpJson(json));
}
ICEBERG_ASSIGN_OR_RAISE(
auto nanos, TransformUtil::ParseTimestampNsWithZone(json.get<std::string>()));
return Literal::TimestampTzNs(nanos);
}

case TypeId::kUuid: {
if (!json.is_string()) [[unlikely]] {
return JsonParseError("Cannot parse {} as a uuid value", SafeDumpJson(json));
Expand Down
66 changes: 60 additions & 6 deletions src/iceberg/expression/literal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ Result<Literal> LiteralCaster::CastFromLong(
return Literal::Timestamp(long_val);
case TypeId::kTimestampTz:
return Literal::TimestampTz(long_val);
case TypeId::kTimestampNs:
return Literal::TimestampNs(long_val);
case TypeId::kTimestampTzNs:
return Literal::TimestampTzNs(long_val);
default:
return NotSupported("Cast from Long to {} is not supported",
target_type->ToString());
Expand Down Expand Up @@ -215,6 +219,15 @@ Result<Literal> LiteralCaster::CastFromString(
TransformUtil::ParseTimestampWithZone(str_val));
return Literal::TimestampTz(micros);
}
case TypeId::kTimestampNs: {
ICEBERG_ASSIGN_OR_RAISE(auto nanos, TransformUtil::ParseTimestampNs(str_val));
return Literal::TimestampNs(nanos);
}
case TypeId::kTimestampTzNs: {
ICEBERG_ASSIGN_OR_RAISE(auto nanos,
TransformUtil::ParseTimestampNsWithZone(str_val));
return Literal::TimestampTzNs(nanos);
}
case TypeId::kBinary: {
ICEBERG_ASSIGN_OR_RAISE(auto bytes, StringUtils::HexStringToBytes(str_val));
return Literal::Binary(std::move(bytes));
Expand Down Expand Up @@ -250,14 +263,27 @@ Result<Literal> LiteralCaster::CastFromString(
Result<Literal> LiteralCaster::CastFromTimestamp(
const Literal& literal, const std::shared_ptr<PrimitiveType>& target_type) {
auto timestamp_val = std::get<int64_t>(literal.value_);
const auto& source_timestamp =
internal::checked_cast<const TimestampBase&>(*literal.type());
const bool source_is_nanos = source_timestamp.time_unit() == TimeUnit::kNanosecond;

switch (target_type->type_id()) {
case TypeId::kDate: {
ICEBERG_ASSIGN_OR_RAISE(auto days, TemporalUtils::ExtractDay(literal));
return Literal::Date(std::get<int32_t>(days.value()));
}
case TypeId::kTimestamp:
return source_is_nanos ? Literal::Timestamp(timestamp_val / 1000)
: Literal::Timestamp(timestamp_val);
case TypeId::kTimestampTz:
return Literal::TimestampTz(timestamp_val);
return source_is_nanos ? Literal::TimestampTz(timestamp_val / 1000)
: Literal::TimestampTz(timestamp_val);
case TypeId::kTimestampNs:
return source_is_nanos ? Literal::TimestampNs(timestamp_val)
: Literal::TimestampNs(timestamp_val * 1000);
case TypeId::kTimestampTzNs:
return source_is_nanos ? Literal::TimestampTzNs(timestamp_val)
: Literal::TimestampTzNs(timestamp_val * 1000);
default:
return NotSupported("Cast from Timestamp to {} is not supported",
target_type->ToString());
Expand All @@ -266,15 +292,28 @@ Result<Literal> LiteralCaster::CastFromTimestamp(

Result<Literal> LiteralCaster::CastFromTimestampTz(
const Literal& literal, const std::shared_ptr<PrimitiveType>& target_type) {
auto micros = std::get<int64_t>(literal.value_);
auto timestamp_val = std::get<int64_t>(literal.value_);
const auto& source_timestamp =
internal::checked_cast<const TimestampBase&>(*literal.type());
const bool source_is_nanos = source_timestamp.time_unit() == TimeUnit::kNanosecond;

switch (target_type->type_id()) {
case TypeId::kDate: {
ICEBERG_ASSIGN_OR_RAISE(auto days, TemporalUtils::ExtractDay(literal));
return Literal::Date(std::get<int32_t>(days.value()));
}
case TypeId::kTimestampTz:
return source_is_nanos ? Literal::TimestampTz(timestamp_val / 1000)
: Literal::TimestampTz(timestamp_val);
case TypeId::kTimestamp:
return Literal::Timestamp(micros);
return source_is_nanos ? Literal::Timestamp(timestamp_val / 1000)
: Literal::Timestamp(timestamp_val);
case TypeId::kTimestampNs:
return source_is_nanos ? Literal::TimestampNs(timestamp_val)
: Literal::TimestampNs(timestamp_val * 1000);
case TypeId::kTimestampTzNs:
return source_is_nanos ? Literal::TimestampTzNs(timestamp_val)
: Literal::TimestampTzNs(timestamp_val * 1000);
default:
return NotSupported("Cast from TimestampTz to {} is not supported",
target_type->ToString());
Expand Down Expand Up @@ -329,6 +368,10 @@ Literal Literal::Timestamp(int64_t value) { return {Value{value}, timestamp()};

// Builds a literal that stores `value` as a 64-bit integer paired with the
// type returned by timestamp_tz().
Literal Literal::TimestampTz(int64_t value) {
  return Literal(Value{value}, timestamp_tz());
}

// Builds a literal that stores `value` as a 64-bit integer paired with the
// type returned by timestamp_ns().
// NOTE(review): `value` is presumably nanoseconds since the epoch - confirm
// against the type definition.
Literal Literal::TimestampNs(int64_t value) {
  return Literal(Value{value}, timestamp_ns());
}

// Builds a literal that stores `value` as a 64-bit integer paired with the
// type returned by timestamptz_ns().
// NOTE(review): `value` is presumably nanoseconds since the epoch - confirm
// against the type definition.
Literal Literal::TimestampTzNs(int64_t value) {
  return Literal(Value{value}, timestamptz_ns());
}

// Builds a literal that stores `value` as a float paired with the type
// returned by float32().
Literal Literal::Float(float value) {
  return Literal(Value{value}, float32());
}

// Builds a literal that stores `value` as a double paired with the type
// returned by float64().
Literal Literal::Double(double value) {
  return Literal(Value{value}, float64());
}
Expand Down Expand Up @@ -395,8 +438,11 @@ bool Comparable(TypeId lhs, TypeId rhs) {
case TypeId::kLong:
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
case TypeId::kTimestampNs:
case TypeId::kTimestampTzNs:
return rhs == TypeId::kLong || rhs == TypeId::kTimestamp ||
rhs == TypeId::kTimestampTz;
rhs == TypeId::kTimestampTz || rhs == TypeId::kTimestampNs ||
rhs == TypeId::kTimestampTzNs;
default:
return lhs == rhs;
}
Expand Down Expand Up @@ -439,7 +485,9 @@ std::partial_ordering Literal::operator<=>(const Literal& other) const {
case TypeId::kLong:
case TypeId::kTime:
case TypeId::kTimestamp:
case TypeId::kTimestampTz: {
case TypeId::kTimestampTz:
case TypeId::kTimestampNs:
case TypeId::kTimestampTzNs: {
auto this_val = std::get<int64_t>(value_);
auto other_val = std::get<int64_t>(other.value_);
return this_val <=> other_val;
Expand Down Expand Up @@ -548,7 +596,9 @@ std::string Literal::ToString() const {
}
case TypeId::kTime:
case TypeId::kTimestamp:
case TypeId::kTimestampTz: {
case TypeId::kTimestampTz:
case TypeId::kTimestampNs:
case TypeId::kTimestampTzNs: {
return std::to_string(std::get<int64_t>(value_));
}
case TypeId::kDate: {
Expand Down Expand Up @@ -613,6 +663,10 @@ Result<Literal> LiteralCaster::CastTo(const Literal& literal,
return CastFromTimestamp(literal, target_type);
case TypeId::kTimestampTz:
return CastFromTimestampTz(literal, target_type);
case TypeId::kTimestampNs:
return CastFromTimestamp(literal, target_type);
case TypeId::kTimestampTzNs:
return CastFromTimestampTz(literal, target_type);
default:
break;
}
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/expression/literal.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class ICEBERG_EXPORT Literal : public util::Formattable {
static Literal Time(int64_t value);
static Literal Timestamp(int64_t value);
static Literal TimestampTz(int64_t value);
static Literal TimestampNs(int64_t value);
static Literal TimestampTzNs(int64_t value);
static Literal Float(float value);
static Literal Double(double value);
static Literal String(std::string value);
Expand Down Expand Up @@ -199,6 +201,8 @@ DEFINE_LITERAL_TRAIT(kLong, int64_t)
DEFINE_LITERAL_TRAIT(kTime, int64_t)
DEFINE_LITERAL_TRAIT(kTimestamp, int64_t)
DEFINE_LITERAL_TRAIT(kTimestampTz, int64_t)
DEFINE_LITERAL_TRAIT(kTimestampNs, int64_t)
DEFINE_LITERAL_TRAIT(kTimestampTzNs, int64_t)
DEFINE_LITERAL_TRAIT(kFloat, float)
DEFINE_LITERAL_TRAIT(kDouble, double)
DEFINE_LITERAL_TRAIT(kDecimal, Decimal)
Expand Down
6 changes: 3 additions & 3 deletions src/iceberg/expression/predicate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,10 @@ bool BoundLiteralPredicate::Equals(const Expression& other) const {
}
}

// TODO(gangwu): add TypeId::kTimestampNano
static const std::unordered_set<TypeId> kIntegralTypes = {
TypeId::kInt, TypeId::kLong, TypeId::kDate,
TypeId::kTime, TypeId::kTimestamp, TypeId::kTimestampTz};
TypeId::kInt, TypeId::kLong, TypeId::kDate,
TypeId::kTime, TypeId::kTimestamp, TypeId::kTimestampTz,
TypeId::kTimestampNs, TypeId::kTimestampTzNs};

if (kIntegralTypes.contains(term_->type()->type_id()) &&
term_->Equals(*other_pred->term())) {
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ nlohmann::json ToJson(const Type& type) {
return "timestamp";
case TypeId::kTimestampTz:
return "timestamptz";
case TypeId::kTimestampNs:
return "timestamp_ns";
case TypeId::kTimestampTzNs:
return "timestamptz_ns";
case TypeId::kString:
return "string";
case TypeId::kBinary:
Expand Down Expand Up @@ -488,6 +492,10 @@ Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json) {
return std::make_unique<TimestampType>();
} else if (type_str == "timestamptz") {
return std::make_unique<TimestampTzType>();
} else if (type_str == "timestamp_ns") {
return std::make_unique<TimestampNsType>();
} else if (type_str == "timestamptz_ns") {
return std::make_unique<TimestampTzNsType>();
} else if (type_str == "string") {
return std::make_unique<StringType>();
} else if (type_str == "binary") {
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/manifest/manifest_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ Status ManifestEntryAdapter::AppendPartitionValues(
case TypeId::kTime:
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
case TypeId::kTimestampNs:
case TypeId::kTimestampTzNs:
ICEBERG_RETURN_UNEXPECTED(
AppendField(child_array, std::get<int64_t>(partition_value.value())));
break;
Expand Down
20 changes: 20 additions & 0 deletions src/iceberg/parquet/parquet_schema_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,26 @@ Status ValidateParquetSchemaEvolution(
}
}
break;
case TypeId::kTimestampNs:
if (arrow_type->id() == ::arrow::Type::TIMESTAMP) {
const auto& timestamp_type =
internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type);
if (timestamp_type.unit() == ::arrow::TimeUnit::NANO &&
timestamp_type.timezone().empty()) {
return {};
}
}
break;
case TypeId::kTimestampTzNs:
if (arrow_type->id() == ::arrow::Type::TIMESTAMP) {
const auto& timestamp_type =
internal::checked_cast<const ::arrow::TimestampType&>(*arrow_type);
if (timestamp_type.unit() == ::arrow::TimeUnit::NANO &&
!timestamp_type.timezone().empty()) {
return {};
}
}
break;
case TypeId::kString:
if (arrow_type->id() == ::arrow::Type::STRING) {
return {};
Expand Down
2 changes: 2 additions & 0 deletions src/iceberg/row/partition_values.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ Result<Scalar> PartitionValues::GetField(size_t pos) const {
case TypeId::kTime:
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
case TypeId::kTimestampNs:
case TypeId::kTimestampTzNs:
return Scalar{std::get<int64_t>(literal.value())};
case TypeId::kFloat:
return Scalar{std::get<float>(literal.value())};
Expand Down
6 changes: 6 additions & 0 deletions src/iceberg/row/struct_like.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Result<Scalar> LiteralToScalar(const Literal& literal) {
case TypeId::kTime:
case TypeId::kTimestamp:
case TypeId::kTimestampTz:
case TypeId::kTimestampNs:
case TypeId::kTimestampTzNs:
return Scalar{std::get<int64_t>(literal.value())};
case TypeId::kFloat:
return Scalar{std::get<float>(literal.value())};
Expand Down Expand Up @@ -152,6 +154,10 @@ Result<Literal> StructLikeAccessor::GetLiteral(const StructLike& struct_like) co
return Literal::Timestamp(std::get<int64_t>(scalar));
case TypeId::kTimestampTz:
return Literal::TimestampTz(std::get<int64_t>(scalar));
case TypeId::kTimestampNs:
return Literal::TimestampNs(std::get<int64_t>(scalar));
case TypeId::kTimestampTzNs:
return Literal::TimestampTzNs(std::get<int64_t>(scalar));
case TypeId::kFixed: {
const auto& fixed_data = std::get<std::string_view>(scalar);
return Literal::Fixed(std::vector<uint8_t>(fixed_data.cbegin(), fixed_data.cend()));
Expand Down
Loading
Loading