Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,4 @@ compile_commands.json
.github

.worktrees/
.worktree_initialized
1 change: 1 addition & 0 deletions be/cmake/thirdparty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ add_thirdparty(gmock)
add_thirdparty(snappy)
add_thirdparty(curl)
add_thirdparty(lz4)
add_thirdparty(lzo2)
add_thirdparty(thrift)
add_thirdparty(thriftnb)
add_thirdparty(crc32c)
Expand Down
4 changes: 4 additions & 0 deletions be/src/core/data_type/data_type_timestamptz.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class DataTypeTimeStampTz final : public DataTypeNumberBase<PrimitiveType::TYPE_
return "TimeStampTz(" + std::to_string(_scale) + ")";
}

void to_protobuf(PTypeDesc* ptype, PTypeNode* node, PScalarType* scalar_type) const override {
scalar_type->set_scale(_scale);
}

void to_pb_column_meta(PColumnMeta* col_meta) const override {
DataTypeNumberBase<PrimitiveType::TYPE_TIMESTAMPTZ>::to_pb_column_meta(col_meta);
col_meta->mutable_decimal_param()->set_scale(_scale);
Expand Down
143 changes: 143 additions & 0 deletions be/src/core/data_type_serde/data_type_datetimev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datetimev2_impl.hpp"
Expand All @@ -43,6 +44,95 @@ enum {
namespace doris {
static const int64_t micro_to_nano_second = 1000;

namespace {

#pragma pack(1)
struct DecodedInt96Timestamp {
int64_t nanos_of_day;
int32_t julian_day;

int64_t to_timestamp_micros() const {
static constexpr int32_t JULIAN_EPOCH_OFFSET_DAYS = 2440588;
static constexpr int64_t MICROS_IN_DAY = 86400000000;
static constexpr int64_t NANOS_PER_MICROSECOND = 1000;
return (julian_day - JULIAN_EPOCH_OFFSET_DAYS) * MICROS_IN_DAY +
nanos_of_day / NANOS_PER_MICROSECOND;
}
};
#pragma pack()
static_assert(sizeof(DecodedInt96Timestamp) == 12);

Status append_datetimev2_from_epoch_micros(ColumnDateTimeV2::Container& data,
int64_t timestamp_micros) {
static constexpr int64_t MICROS_PER_SECOND = 1000000;
static constexpr int64_t MICROS_PER_MINUTE = MICROS_PER_SECOND * 60;
static constexpr int64_t MICROS_PER_HOUR = MICROS_PER_MINUTE * 60;
static constexpr int64_t MICROS_PER_DAY = MICROS_PER_HOUR * 24;
static const int64_t EPOCH_DAYNR = calc_daynr(1970, 1, 1);

int64_t days_since_epoch = timestamp_micros / MICROS_PER_DAY;
int64_t micros_of_day = timestamp_micros % MICROS_PER_DAY;
if (micros_of_day < 0) {
micros_of_day += MICROS_PER_DAY;
--days_since_epoch;
}

const int64_t daynr = EPOCH_DAYNR + days_since_epoch;
if (daynr <= 0) {
return Status::DataQualityError(
"Decoded DATETIMEV2 timestamp is out of range: micros={}, daynr={}",
timestamp_micros, daynr);
}

DateV2Value<DateTimeV2ValueType> datetime_value;
if (!datetime_value.get_date_from_daynr(static_cast<uint64_t>(daynr))) {
return Status::DataQualityError(
"Decoded DATETIMEV2 timestamp is out of range: micros={}, daynr={}",
timestamp_micros, daynr);
}

const auto hour = static_cast<uint8_t>(micros_of_day / MICROS_PER_HOUR);
micros_of_day %= MICROS_PER_HOUR;
const auto minute = static_cast<uint8_t>(micros_of_day / MICROS_PER_MINUTE);
micros_of_day %= MICROS_PER_MINUTE;
const auto second = static_cast<uint16_t>(micros_of_day / MICROS_PER_SECOND);
const auto microsecond = static_cast<uint32_t>(micros_of_day % MICROS_PER_SECOND);
datetime_value.unchecked_set_time(datetime_value.year(), datetime_value.month(),
datetime_value.day(), hour, minute, second, microsecond);
data.push_back(datetime_value);
return Status::OK();
}

void append_datetimev2_from_utc_epoch_micros(ColumnDateTimeV2::Container& data,
int64_t timestamp_micros,
const cctz::time_zone& timezone) {
static constexpr int64_t MICROS_PER_SECOND = 1000000;

int64_t epoch_seconds = timestamp_micros / MICROS_PER_SECOND;
int64_t micros_of_second = timestamp_micros % MICROS_PER_SECOND;
if (micros_of_second < 0) {
micros_of_second += MICROS_PER_SECOND;
--epoch_seconds;
}

DateV2Value<DateTimeV2ValueType> datetime_value;
datetime_value.from_unixtime(epoch_seconds, timezone);
datetime_value.set_microsecond(static_cast<uint32_t>(micros_of_second));
data.push_back(datetime_value);
}

int64_t decoded_timestamp_micros(const DecodedColumnView& view, int64_t value) {
if (view.time_unit == DecodedTimeUnit::MILLIS) {
return value * 1000;
}
if (view.time_unit == DecodedTimeUnit::NANOS) {
return value / 1000;
}
return value;
}

} // namespace

// NOLINTBEGIN(readability-function-size)
// NOLINTBEGIN(readability-function-cognitive-complexity)
Status DataTypeDateTimeV2SerDe::from_string_batch(const ColumnString& col_str,
Expand Down Expand Up @@ -451,6 +541,59 @@ Status DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
return Status::OK();
}

Status DataTypeDateTimeV2SerDe::read_column_from_decoded_values(
IColumn& column, const DecodedColumnView& view) const {
if (view.value_kind != DecodedValueKind::INT64 && view.value_kind != DecodedValueKind::INT96) {
return decoded_column_view_handle_conversion_failure(
column, view,
Status::NotSupported("DATETIMEV2 decoded reader expects INT64 or INT96 source"));
}
if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data = assert_cast<ColumnDateTimeV2&>(column).get_data();
const auto old_size = data.size();
if (view.value_kind == DecodedValueKind::INT96) {
const auto* values = reinterpret_cast<const DecodedInt96Timestamp*>(view.values);
static const auto utc_timezone = cctz::utc_time_zone();
const auto& timezone = view.timezone == nullptr ? utc_timezone : *view.timezone;
for (int64_t row = 0; row < view.row_count; ++row) {
if (decoded_column_view_row_is_null(view, row)) {
data.push_back(DateV2Value<DateTimeV2ValueType>());
continue;
}
append_datetimev2_from_utc_epoch_micros(data, values[row].to_timestamp_micros(),
timezone);
}
return Status::OK();
}

const auto* values = reinterpret_cast<const int64_t*>(view.values);
static const auto utc_timezone = cctz::utc_time_zone();
const auto& timezone = view.timezone == nullptr ? utc_timezone : *view.timezone;
for (int64_t row = 0; row < view.row_count; ++row) {
if (decoded_column_view_row_is_null(view, row)) {
data.push_back(DateV2Value<DateTimeV2ValueType>());
continue;
}
const int64_t timestamp_micros = decoded_timestamp_micros(view, values[row]);
if (view.timestamp_is_adjusted_to_utc) {
append_datetimev2_from_utc_epoch_micros(data, timestamp_micros, timezone);
} else {
auto st = append_datetimev2_from_epoch_micros(data, timestamp_micros);
if (!st.ok()) {
if (decoded_column_view_can_null_on_conversion_failure(view)) {
decoded_column_view_insert_null_on_conversion_failure(column, view, row);
continue;
}
data.resize(old_size);
return st;
}
}
}
return Status::OK();
}

Status DataTypeDateTimeV2SerDe::write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer& result,
int64_t row_idx, bool col_const,
Expand Down
2 changes: 2 additions & 0 deletions be/src/core/data_type_serde/data_type_datetimev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<PrimitiveType::TYPE_D
const cctz::time_zone& ctz) const override;
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
int64_t end, const cctz::time_zone& ctz) const override;
Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const override;

Status write_column_to_mysql_binary(const IColumn& column, MysqlRowBinaryBuffer& row_buffer,
int64_t row_idx, bool col_const,
Expand Down
24 changes: 24 additions & 0 deletions be/src/core/data_type_serde/data_type_datev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/define_primitive_type.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "core/types.h"
#include "core/value/vdatetime_value.h"
#include "exprs/function/cast/cast_to_datev2_impl.hpp"
Expand Down Expand Up @@ -124,6 +125,29 @@ Status DataTypeDateV2SerDe::read_column_from_arrow(IColumn& column, const arrow:
return Status::OK();
}

Status DataTypeDateV2SerDe::read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const {
if (view.value_kind != DecodedValueKind::INT32) {
return decoded_column_view_handle_conversion_failure(
column, view, Status::NotSupported("DATEV2 decoded reader expects INT32 source"));
}
if (view.values == nullptr && decoded_column_view_has_non_null_value(view)) {
return Status::Corruption("Decoded value buffer is null for {}", column.get_name());
}
auto& data = assert_cast<ColumnDateV2&>(column).get_data();
const auto* values = reinterpret_cast<const int32_t*>(view.values);
for (int64_t row = 0; row < view.row_count; ++row) {
if (decoded_column_view_row_is_null(view, row)) {
data.push_back(DateV2Value<DateV2ValueType>());
continue;
}
DateV2Value<DateV2ValueType> date_v2;
date_v2.get_date_from_daynr(values[row] + date_threshold);
data.push_back(date_v2);
}
return Status::OK();
}

Status DataTypeDateV2SerDe::write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer& result,
int64_t row_idx, bool col_const,
Expand Down
2 changes: 2 additions & 0 deletions be/src/core/data_type_serde/data_type_datev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class DataTypeDateV2SerDe : public DataTypeNumberSerDe<PrimitiveType::TYPE_DATEV
const cctz::time_zone& ctz) const override;
Status read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int64_t start,
int64_t end, const cctz::time_zone& ctz) const override;
Status read_column_from_decoded_values(IColumn& column,
const DecodedColumnView& view) const override;
Status write_column_to_mysql_binary(const IColumn& column, MysqlRowBinaryBuffer& row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const override;
Expand Down
Loading
Loading