Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 86 additions & 46 deletions be/src/storage/segment/row_binlog_segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Status RowBinlogSegmentWriter::init() {
_binlog_col_start_id = static_cast<uint32_t>(lsn_col_id);
_normal_col_start_id = lsn_col_id == 0 ? BINLOG_COLNUM : 0;

uint32_t normal_col_num = cast_set<uint32_t>(source_schema->num_visible_columns());
uint32_t normal_col_num = cast_set<uint32_t>(_source_data_writer->normal_column_count());
_before_col_start_id = _normal_col_start_id + normal_col_num;

if (!_write_before && _tablet_schema->num_columns() > normal_col_num + BINLOG_COLNUM) {
Expand Down Expand Up @@ -111,6 +111,7 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos,
if (UNLIKELY(source_schema == nullptr)) {
return Status::InternalError("binlog<row> writer missing source_tablet_schema");
}
const size_t normal_column_count = _source_data_writer->normal_column_count();

bool is_partial_update = _binlog_opts.source.partial_update_info &&
_binlog_opts.source.partial_update_info->is_partial_update() &&
Expand All @@ -119,6 +120,7 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos,
std::vector<uint32_t> partial_cids =
is_partial_update ? _binlog_opts.source.partial_update_info->update_cids
: std::vector<uint32_t>();
std::vector<uint32_t> row_binlog_partial_cids = partial_cids;
if (is_partial_update) {
if (block->columns() <= source_schema->num_key_columns() ||
block->columns() >= source_schema->num_columns()) {
Expand All @@ -129,11 +131,12 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos,
_tablet_schema->num_columns()));
}

// binlog don't need invisible column
auto erase_invisible_col = std::remove_if(
partial_cids.begin(), partial_cids.end(),
[&](uint32_t cid) { return cid >= source_schema->num_visible_columns(); });
partial_cids.erase(erase_invisible_col, partial_cids.end());
// Partial update lists source cids. Row-binlog normal columns are a source-schema prefix;
// keep only normal columns and skip trailing hidden internal columns.
auto erase_non_normal_col =
std::remove_if(row_binlog_partial_cids.begin(), row_binlog_partial_cids.end(),
[&](uint32_t cid) { return cid >= normal_column_count; });
row_binlog_partial_cids.erase(erase_non_normal_col, row_binlog_partial_cids.end());
}

// get delete_sign_column from source block if has
Expand Down Expand Up @@ -175,9 +178,9 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos,
row_pos, num_rows));
}

size_t max_normal_col_id = _normal_col_start_id + source_schema->num_visible_columns();
RETURN_IF_ERROR(_source_data_writer->fill_normal_columns(_column_writers, _normal_col_start_id,
max_normal_col_id, partial_cids));
size_t max_normal_col_id = _normal_col_start_id + normal_column_count;
RETURN_IF_ERROR(_source_data_writer->fill_normal_columns(
_column_writers, _normal_col_start_id, max_normal_col_id, row_binlog_partial_cids));

// We read historical rows only when we really need them:
// 1. partial update: build the full AFTER row.
Expand All @@ -197,9 +200,14 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos,

if (is_partial_update) {
std::vector<uint32_t> row_binlog_missing_column_ids;
_source_data_writer->filter_source_ids(
_binlog_opts.source.partial_update_info->missing_cids,
row_binlog_missing_column_ids);
row_binlog_missing_column_ids.reserve(
_binlog_opts.source.partial_update_info->missing_cids.size());
// Missing cids are source cids too. Only normal columns have AFTER writers.
for (uint32_t cid : _binlog_opts.source.partial_update_info->missing_cids) {
if (cid < normal_column_count) {
row_binlog_missing_column_ids.emplace_back(cid);
}
}

// build AFTER block (fill missing columns in full_block)
RETURN_IF_ERROR(_historical_data_writer->build_after_block(&full_block, row_pos, num_rows));
Expand All @@ -224,9 +232,9 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos,
DCHECK(!_tablet_schema->has_sequence_col());
// _converted_key_columns must be resized before fill binlog columns
_converted_key_columns.resize(_tablet_schema->num_key_columns());
for (size_t i = _normal_col_start_id; i < _tablet_schema->num_key_columns(); i++) {
_converted_key_columns[i] = _source_data_writer->get_converted_column(
cast_set<uint32_t>(i - _normal_col_start_id));
const auto& source_key_columns = _source_data_writer->source_key_columns();
for (size_t i = 0; i < source_key_columns.size(); ++i) {
_converted_key_columns[_normal_col_start_id + i] = source_key_columns[i];
}

std::vector<int64_t> no_operators = std::vector<int64_t> {};
Expand Down Expand Up @@ -373,7 +381,14 @@ Status RowBinlogSegmentWriter::_fill_before_columns(size_t num_rows) {
if (UNLIKELY(source_schema == nullptr)) {
return Status::InternalError("row binlog writer missing source_tablet_schema");
}
size_t value_column_num = source_schema->num_visible_value_columns();
std::vector<uint32_t> value_cids;
for (uint32_t cid = 0; cid < source_schema->num_columns(); ++cid) {
const auto& column = source_schema->column(cid);
if (column.visible() && !column.is_key()) {
value_cids.emplace_back(cid);
}
}
size_t value_column_num = value_cids.size();
if (value_column_num == 0) {
// No BEFORE columns in row binlog schema.
return Status::OK();
Expand All @@ -400,13 +415,6 @@ Status RowBinlogSegmentWriter::_fill_before_columns(size_t num_rows) {
} else {
DCHECK(_historical_data_writer != nullptr);

std::vector<uint32_t> value_cids;
uint32_t value_start = cast_set<uint32_t>(source_schema->num_key_columns());
uint32_t value_end = cast_set<uint32_t>(source_schema->num_visible_columns());
for (uint32_t cid = value_start; cid < value_end; ++cid) {
value_cids.emplace_back(cid);
}

DCHECK_EQ(before_cids.size(), value_cids.size());
RETURN_IF_ERROR(_historical_data_writer->build_before_block(&before_block, value_cids, 0,
num_rows));
Expand All @@ -430,48 +438,76 @@ Status RowBinlogSegmentWriter::_fill_before_columns(size_t num_rows) {

Status RowBinlogSourceDataWriter::init() {
_olap_data_convertor = std::make_unique<OlapBlockDataConvertor>();
// _normal_column_ids: the columns which we need to write into binlog from source block
if (UNLIKELY(_opt.source.tablet_schema == nullptr)) {
return Status::InternalError("row binlog writer missing source_tablet_schema");
}
for (uint32_t i = 0; i < _opt.source.tablet_schema->num_visible_columns(); i++) {
_normal_column_ids.emplace_back(i);

const auto& source_schema = _opt.source.tablet_schema;
// Row-binlog normal columns are a source-schema prefix: visible columns plus hidden key columns.
// Hidden non-key columns are only allowed after this prefix and are not written to row-binlog.
_normal_column_count = 0;
bool seen_hidden_non_key_column = false;
for (uint32_t cid = 0; cid < source_schema->num_columns(); ++cid) {
const auto& column = source_schema->column(cid);
if (!column.visible() && !column.is_key()) {
seen_hidden_non_key_column = true;
continue;
}
if (seen_hidden_non_key_column) {
return Status::InternalError(
"row binlog source schema has visible/key column after hidden non-key column, "
"column={}",
column.name());
}
++_normal_column_count;
}
_olap_data_convertor->reserve(_opt.source.tablet_schema->num_columns());
for (size_t cid = 0; cid < _opt.source.tablet_schema->num_columns(); cid++) {
_olap_data_convertor->add_column_data_convertor(_opt.source.tablet_schema->column(cid));
_olap_data_convertor->reserve(source_schema->num_columns());
for (size_t cid = 0; cid < source_schema->num_columns(); cid++) {
_olap_data_convertor->add_column_data_convertor(source_schema->column(cid));
}
return Status::OK();
}

Status RowBinlogSourceDataWriter::prepare_by_source_block(
const Block* block, size_t row_pos, size_t num_rows,
std::vector<uint32_t>& partial_source_cids, Block* full_block) {
_converted_columns.resize(_normal_column_ids.size());
TabletSchemaSPtr tablet_schema = _opt.source.tablet_schema;
// OlapBlockDataConvertor and historical lookup are indexed by source cid, so keep
// converted columns sparse over the source schema instead of row-binlog normal order.
_converted_columns.assign(tablet_schema->num_columns(), nullptr);

// LOG(INFO) << block->dump_data(0, num_rows);

// convert column data from engine format to storage layer format
size_t col_pos_in_block = 0;
TabletSchemaSPtr tablet_schema = _opt.source.tablet_schema;
const auto& including_cids =
partial_source_cids.empty() ? _normal_column_ids : partial_source_cids;
for (auto& cid : including_cids) {
const ColumnWithTypeAndName& col = block->get_by_position(col_pos_in_block++);
const bool is_partial_update = !partial_source_cids.empty();
const size_t column_count =
is_partial_update ? partial_source_cids.size() : _normal_column_count;
for (size_t ordinal = 0; ordinal < column_count; ++ordinal) {
const uint32_t source_cid =
is_partial_update ? partial_source_cids[ordinal] : cast_set<uint32_t>(ordinal);
DCHECK_LT(source_cid, tablet_schema->num_columns());
const ColumnWithTypeAndName& col =
block->get_by_position(is_partial_update ? col_pos_in_block++ : source_cid);

RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
col, row_pos, num_rows, cid));
col, row_pos, num_rows, source_cid));
// olap data convertor always starts from id = 0
auto converted_result = _olap_data_convertor->convert_column_data(cid);
auto converted_result = _olap_data_convertor->convert_column_data(source_cid);
if (!converted_result.first.ok()) {
return converted_result.first;
}
_converted_columns[cid] = converted_result.second;
_converted_columns[source_cid] = converted_result.second;

if (cid < tablet_schema->num_key_columns()) {
_key_columns.push_back(converted_result.second);
full_block->replace_by_position(source_cid, col.column);
}
for (uint32_t cid = 0; cid < _normal_column_count; ++cid) {
if (!tablet_schema->column(cid).is_key()) {
continue;
}
full_block->replace_by_position(cid, col.column);
DCHECK_LT(cid, _converted_columns.size());
DCHECK(_converted_columns[cid] != nullptr);
_key_columns.push_back(_converted_columns[cid]);
}
_num_rows = num_rows;

Expand All @@ -494,14 +530,18 @@ Status RowBinlogSourceDataWriter::prepare_seq_column(const ColumnWithTypeAndName
Status RowBinlogSourceDataWriter::fill_normal_columns(
std::vector<std::unique_ptr<ColumnWriter>>& column_writers, size_t start, size_t end,
std::vector<uint32_t>& partial_source_cids) {
DCHECK_EQ(end - start, _normal_column_ids.size());

const auto& including_cids =
partial_source_cids.empty() ? _normal_column_ids : partial_source_cids;
for (size_t cid : including_cids) {
DCHECK_EQ(end - start, _normal_column_count);

const bool is_partial_update = !partial_source_cids.empty();
const size_t column_count =
is_partial_update ? partial_source_cids.size() : _normal_column_count;
for (size_t ordinal = 0; ordinal < column_count; ++ordinal) {
size_t cid = is_partial_update ? partial_source_cids[ordinal] : ordinal;
DCHECK_LT(cid, _normal_column_count);
DCHECK(column_writers[start + cid]->get_column()->type() ==
_opt.source.tablet_schema->columns()[cid]->type())
<< cid;
DCHECK(_converted_columns[cid] != nullptr);
RETURN_IF_ERROR(column_writers[start + cid]->append(_converted_columns[cid]->get_nullmap(),
_converted_columns[cid]->get_data(),
_num_rows));
Expand Down
10 changes: 2 additions & 8 deletions be/src/storage/segment/row_binlog_segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,19 @@ class RowBinlogSourceDataWriter {

void clear();

IOlapColumnDataAccessor* get_converted_column(uint32_t cid) { return _converted_columns[cid]; }

bool need_before() const { return _opt.write_before; }

const std::vector<IOlapColumnDataAccessor*>& source_key_columns() const { return _key_columns; }
const IOlapColumnDataAccessor* seq_column() const { return _seq_column; }

std::unique_ptr<OlapBlockDataConvertor>& olap_data_convertor() { return _olap_data_convertor; }

void filter_source_ids(std::vector<uint32_t>& full_cids, std::vector<uint32_t>& res_cids) {
res_cids.reserve(full_cids.size());
std::set_intersection(_normal_column_ids.begin(), _normal_column_ids.end(),
full_cids.begin(), full_cids.end(), std::back_inserter(res_cids));
}
size_t normal_column_count() const { return _normal_column_count; }

private:
const SegmentWriteBinlogOptions& _opt;
std::unique_ptr<OlapBlockDataConvertor> _olap_data_convertor;
std::vector<uint32_t> _normal_column_ids;
size_t _normal_column_count = 0;
std::vector<IOlapColumnDataAccessor*> _converted_columns;
size_t _num_rows = 0;

Expand Down
120 changes: 120 additions & 0 deletions be/test/storage/segment/row_binlog_source_data_writer_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include <memory>
#include <string>
#include <vector>

#include "core/field.h"
#include "core/value/decimalv2_value.h"
#include "storage/binlog.h"
#include "storage/iterator/olap_data_convertor.h"
#include "storage/segment/row_binlog_segment_writer.h"
#include "storage/tablet/tablet_schema.h"

namespace doris::segment_v2 {

namespace {

// Test helper: builds a TabletColumn carrying the given unique id, name, field type, key flag and
// visibility. LARGEINT columns get a length of 16 and every other type a length of 4; the index
// length is then copied from the column's own length.
TabletColumn create_column(int32_t unique_id, const std::string& name, FieldType type, bool is_key,
                           bool visible) {
    // The third constructor argument is true for value columns and false for key columns.
    TabletColumn result(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, type, !is_key);
    result.set_unique_id(unique_id);
    result.set_name(name);
    result.set_is_key(is_key);

    if (type == FieldType::OLAP_FIELD_TYPE_LARGEINT) {
        result.set_length(16);
    } else {
        result.set_length(4);
    }
    // Read the length back from the column rather than reusing the literal.
    result.set_index_length(result.length());

    // Visibility is assigned straight to the member; no setter is used here.
    result._visible = visible;
    return result;
}

// Test fixture: a UNIQUE_KEYS schema with four columns appended in this order:
//   position 0: k1                           (INT,      key,   visible)
//   position 1: __DORIS_TEST_HIDDEN_KEY__    (LARGEINT, key,   hidden)
//   position 2: v1                           (INT,      value, visible)
//   position 3: __DORIS_TEST_HIDDEN_VALUE__  (INT,      value, hidden)
TabletSchemaSPtr create_source_schema() {
    struct ColumnSpec {
        int32_t unique_id;
        const char* name;
        FieldType type;
        bool is_key;
        bool visible;
    };
    const ColumnSpec specs[] = {
            {0, "k1", FieldType::OLAP_FIELD_TYPE_INT, true, true},
            {1, "__DORIS_TEST_HIDDEN_KEY__", FieldType::OLAP_FIELD_TYPE_LARGEINT, true, false},
            {2, "v1", FieldType::OLAP_FIELD_TYPE_INT, false, true},
            {3, "__DORIS_TEST_HIDDEN_VALUE__", FieldType::OLAP_FIELD_TYPE_INT, false, false},
    };

    auto schema = std::make_shared<TabletSchema>();
    // Append in table order so each column's position matches its row in `specs`.
    for (const ColumnSpec& spec : specs) {
        schema->append_column(
                create_column(spec.unique_id, spec.name, spec.type, spec.is_key, spec.visible));
    }
    schema->_keys_type = UNIQUE_KEYS;
    return schema;
}

// Test fixture: creates a block from `schema` and inserts two rows into its first four columns.
// For row r (0 or 1) the inserted values are:
//   column 0 -> 10 + r      (INT)
//   column 1 -> 1000 + r    (LARGEINT)
//   column 2 -> 100 + r     (INT)
//   column 3 -> 10000 + r   (INT)
Block create_source_block(const TabletSchemaSPtr& schema) {
    constexpr int kRowCount = 2;

    Block block = schema->create_block();
    // NOTE(review): the guard is still alive when `block` is returned; presumably it hands the
    // columns back to the block when it goes out of scope - confirm against
    // Block::mutate_columns_scoped().
    auto columns_guard = block.mutate_columns_scoped();
    auto& cols = columns_guard.mutable_columns();

    for (int row = 0; row != kRowCount; ++row) {
        cols[0]->insert(Field::create_field<TYPE_INT>(10 + row));
        cols[1]->insert(Field::create_field<TYPE_LARGEINT>(static_cast<int128_t>(1000 + row)));
        cols[2]->insert(Field::create_field<TYPE_INT>(100 + row));
        cols[3]->insert(Field::create_field<TYPE_INT>(10000 + row));
    }
    return block;
}

} // namespace

// Verifies that a hidden key column is counted among the row-binlog normal columns and that
// prepare_by_source_block() exposes it through source_key_columns().
//
// The fixture schema is: visible key, hidden key, visible value, hidden value. The writer is
// expected to report three normal columns (the trailing hidden value column is excluded) and two
// key columns whose converted data matches what create_source_block() inserted.
TEST(RowBinlogSourceDataWriterTest, collectHiddenKeyInNormalPrefix) {
    auto source_schema = create_source_schema();
    SegmentWriteBinlogOptions options;
    options.source.tablet_schema = source_schema;

    RowBinlogSourceDataWriter writer(options);
    ASSERT_TRUE(writer.init().ok());
    // normal_column_count() returns size_t; use an unsigned literal so the comparison inside
    // EXPECT_EQ does not mix signed and unsigned operands.
    EXPECT_EQ(3U, writer.normal_column_count());

    Block block = create_source_block(source_schema);
    Block full_block = source_schema->create_block();
    // An empty cid list selects the non-partial-update path of prepare_by_source_block().
    std::vector<uint32_t> partial_source_cids;
    ASSERT_TRUE(
            writer.prepare_by_source_block(&block, 0, 2, partial_source_cids, &full_block).ok());

    const auto& key_columns = writer.source_key_columns();
    // Both the visible key and the hidden key must be collected (size() is unsigned as well).
    ASSERT_EQ(2U, key_columns.size());

    // Row 1 of the visible INT key was inserted as 10 + 1.
    ASSERT_NE(nullptr, key_columns[0]->get_data_at(1));
    EXPECT_EQ(11, *reinterpret_cast<const int32_t*>(key_columns[0]->get_data_at(1)));

    // Row 1 of the hidden LARGEINT key was inserted as 1000 + 1.
    ASSERT_NE(nullptr, key_columns[1]->get_data_at(1));
    EXPECT_EQ(static_cast<int128_t>(1001),
              *reinterpret_cast<const int128_t*>(key_columns[1]->get_data_at(1)));
}

// Verifies that RowBinlogSourceDataWriter::init() fails when a visible column follows a hidden
// non-key column in the source schema, and that the error text names that condition.
TEST(RowBinlogSourceDataWriterTest, rejectVisibleColumnAfterHiddenNonKeyColumn) {
    // Column order: visible key, hidden value, visible value.
    auto bad_schema = std::make_shared<TabletSchema>();
    bad_schema->append_column(create_column(0, "k1", FieldType::OLAP_FIELD_TYPE_INT, true, true));
    bad_schema->append_column(create_column(1, "__DORIS_TEST_HIDDEN_VALUE__",
                                            FieldType::OLAP_FIELD_TYPE_INT, false, false));
    bad_schema->append_column(create_column(2, "v1", FieldType::OLAP_FIELD_TYPE_INT, false, true));
    bad_schema->_keys_type = UNIQUE_KEYS;

    SegmentWriteBinlogOptions binlog_options;
    binlog_options.source.tablet_schema = bad_schema;

    RowBinlogSourceDataWriter writer(binlog_options);
    auto init_status = writer.init();
    EXPECT_FALSE(init_status.ok());

    // The failure message must contain the fragment below.
    const auto message = init_status.to_string();
    EXPECT_NE(std::string::npos, message.find("visible/key column after hidden non-key"));
}

} // namespace doris::segment_v2
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,9 @@ public boolean processAddColumns(AddColumnsOp addColumnsOp, OlapTable olapTable,
private void addColumnRowBinlog(List<Column> rowBinlogSchema, Column newColumn, ColumnPosition columnPos,
Set<String> newColNameSet, boolean needHistoricalValue,
IntSupplier columnUniqueIdSupplier) throws DdlException {
if (!newColumn.isVisible()) {
// row binlog schema is generated from visible columns only, so schema change must not
// sync hidden system columns such as sequence/delete/version/skip-bitmap columns.
if (!newColumn.isVisible() && !newColumn.isKey()) {
// Row-binlog writes visible columns plus hidden key columns. Skip hidden non-key
// system columns such as sequence/delete/version/skip-bitmap columns.
return;
}

Expand Down
Loading
Loading