diff --git a/be/src/storage/segment/row_binlog_segment_writer.cpp b/be/src/storage/segment/row_binlog_segment_writer.cpp index 575eb10821c204..6f9edfc0c1236a 100644 --- a/be/src/storage/segment/row_binlog_segment_writer.cpp +++ b/be/src/storage/segment/row_binlog_segment_writer.cpp @@ -70,7 +70,7 @@ Status RowBinlogSegmentWriter::init() { _binlog_col_start_id = static_cast(lsn_col_id); _normal_col_start_id = lsn_col_id == 0 ? BINLOG_COLNUM : 0; - uint32_t normal_col_num = cast_set(source_schema->num_visible_columns()); + uint32_t normal_col_num = cast_set(_source_data_writer->normal_column_count()); _before_col_start_id = _normal_col_start_id + normal_col_num; if (!_write_before && _tablet_schema->num_columns() > normal_col_num + BINLOG_COLNUM) { @@ -111,6 +111,7 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos, if (UNLIKELY(source_schema == nullptr)) { return Status::InternalError("binlog writer missing source_tablet_schema"); } + const size_t normal_column_count = _source_data_writer->normal_column_count(); bool is_partial_update = _binlog_opts.source.partial_update_info && _binlog_opts.source.partial_update_info->is_partial_update() && @@ -119,6 +120,7 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos, std::vector partial_cids = is_partial_update ? _binlog_opts.source.partial_update_info->update_cids : std::vector(); + std::vector row_binlog_partial_cids = partial_cids; if (is_partial_update) { if (block->columns() <= source_schema->num_key_columns() || block->columns() >= source_schema->num_columns()) { @@ -129,11 +131,12 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos, _tablet_schema->num_columns())); } - // binlog don't need invisible column - auto erase_invisible_col = std::remove_if( - partial_cids.begin(), partial_cids.end(), - [&](uint32_t cid) { return cid >= source_schema->num_visible_columns(); }); - partial_cids.erase(erase_invisible_col, partial_cids.end()); + // Partial update lists source cids. Row-binlog normal columns are a source-schema prefix; + // keep only normal columns and skip trailing hidden internal columns. + auto erase_non_normal_col = + std::remove_if(row_binlog_partial_cids.begin(), row_binlog_partial_cids.end(), + [&](uint32_t cid) { return cid >= normal_column_count; }); + row_binlog_partial_cids.erase(erase_non_normal_col, row_binlog_partial_cids.end()); } // get delete_sign_column from source block if has @@ -175,9 +178,9 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos, row_pos, num_rows)); } - size_t max_normal_col_id = _normal_col_start_id + source_schema->num_visible_columns(); - RETURN_IF_ERROR(_source_data_writer->fill_normal_columns(_column_writers, _normal_col_start_id, - max_normal_col_id, partial_cids)); + size_t max_normal_col_id = _normal_col_start_id + normal_column_count; + RETURN_IF_ERROR(_source_data_writer->fill_normal_columns( + _column_writers, _normal_col_start_id, max_normal_col_id, row_binlog_partial_cids)); // We read historical rows only when we really need them: // 1. partial update: build the full AFTER row. @@ -197,9 +200,14 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos, if (is_partial_update) { std::vector row_binlog_missing_column_ids; - _source_data_writer->filter_source_ids( - _binlog_opts.source.partial_update_info->missing_cids, - row_binlog_missing_column_ids); + row_binlog_missing_column_ids.reserve( + _binlog_opts.source.partial_update_info->missing_cids.size()); + // Missing cids are source cids too. Only normal columns have AFTER writers. + for (uint32_t cid : _binlog_opts.source.partial_update_info->missing_cids) { + if (cid < normal_column_count) { + row_binlog_missing_column_ids.emplace_back(cid); + } + } // build AFTER block (fill missing columns in full_block) RETURN_IF_ERROR(_historical_data_writer->build_after_block(&full_block, row_pos, num_rows)); @@ -224,9 +232,9 @@ Status RowBinlogSegmentWriter::append_block(const Block* block, size_t row_pos, DCHECK(!_tablet_schema->has_sequence_col()); // _converted_key_columns must be resized before fill binlog columns _converted_key_columns.resize(_tablet_schema->num_key_columns()); - for (size_t i = _normal_col_start_id; i < _tablet_schema->num_key_columns(); i++) { - _converted_key_columns[i] = _source_data_writer->get_converted_column( - cast_set(i - _normal_col_start_id)); + const auto& source_key_columns = _source_data_writer->source_key_columns(); + for (size_t i = 0; i < source_key_columns.size(); ++i) { + _converted_key_columns[_normal_col_start_id + i] = source_key_columns[i]; } std::vector no_operators = std::vector {}; @@ -373,7 +381,14 @@ Status RowBinlogSegmentWriter::_fill_before_columns(size_t num_rows) { if (UNLIKELY(source_schema == nullptr)) { return Status::InternalError("row binlog writer missing source_tablet_schema"); } - size_t value_column_num = source_schema->num_visible_value_columns(); + std::vector value_cids; + for (uint32_t cid = 0; cid < source_schema->num_columns(); ++cid) { + const auto& column = source_schema->column(cid); + if (column.visible() && !column.is_key()) { + value_cids.emplace_back(cid); + } + } + size_t value_column_num = value_cids.size(); if (value_column_num == 0) { // No BEFORE columns in row binlog schema. return Status::OK(); @@ -400,13 +415,6 @@ Status RowBinlogSegmentWriter::_fill_before_columns(size_t num_rows) { } else { DCHECK(_historical_data_writer != nullptr); - std::vector value_cids; - uint32_t value_start = cast_set(source_schema->num_key_columns()); - uint32_t value_end = cast_set(source_schema->num_visible_columns()); - for (uint32_t cid = value_start; cid < value_end; ++cid) { - value_cids.emplace_back(cid); - } - DCHECK_EQ(before_cids.size(), value_cids.size()); RETURN_IF_ERROR(_historical_data_writer->build_before_block(&before_block, value_cids, 0, num_rows)); @@ -430,16 +438,32 @@ Status RowBinlogSegmentWriter::_fill_before_columns(size_t num_rows) { Status RowBinlogSourceDataWriter::init() { _olap_data_convertor = std::make_unique(); - // _normal_column_ids: the columns which we need to write into binlog from source block if (UNLIKELY(_opt.source.tablet_schema == nullptr)) { return Status::InternalError("row binlog writer missing source_tablet_schema"); } - for (uint32_t i = 0; i < _opt.source.tablet_schema->num_visible_columns(); i++) { - _normal_column_ids.emplace_back(i); + + const auto& source_schema = _opt.source.tablet_schema; + // Row-binlog normal columns are a source-schema prefix: visible columns plus hidden key columns. + // Hidden non-key columns are only allowed after this prefix and are not written to row-binlog. + _normal_column_count = 0; + bool seen_hidden_non_key_column = false; + for (uint32_t cid = 0; cid < source_schema->num_columns(); ++cid) { + const auto& column = source_schema->column(cid); + if (!column.visible() && !column.is_key()) { + seen_hidden_non_key_column = true; + continue; + } + if (seen_hidden_non_key_column) { + return Status::InternalError( + "row binlog source schema has visible/key column after hidden non-key column, " + "column={}", + column.name()); + } + ++_normal_column_count; } - _olap_data_convertor->reserve(_opt.source.tablet_schema->num_columns()); - for (size_t cid = 0; cid < _opt.source.tablet_schema->num_columns(); cid++) { - _olap_data_convertor->add_column_data_convertor(_opt.source.tablet_schema->column(cid)); + _olap_data_convertor->reserve(source_schema->num_columns()); + for (size_t cid = 0; cid < source_schema->num_columns(); cid++) { + _olap_data_convertor->add_column_data_convertor(source_schema->column(cid)); } return Status::OK(); } @@ -447,31 +471,43 @@ Status RowBinlogSourceDataWriter::init() { Status RowBinlogSourceDataWriter::prepare_by_source_block( const Block* block, size_t row_pos, size_t num_rows, std::vector& partial_source_cids, Block* full_block) { - _converted_columns.resize(_normal_column_ids.size()); + TabletSchemaSPtr tablet_schema = _opt.source.tablet_schema; + // OlapBlockDataConvertor and historical lookup are indexed by source cid, so keep + // converted columns sparse over the source schema instead of row-binlog normal order. + _converted_columns.assign(tablet_schema->num_columns(), nullptr); // LOG(INFO) << block->dump_data(0, num_rows); // convert column data from engine format to storage layer format size_t col_pos_in_block = 0; - TabletSchemaSPtr tablet_schema = _opt.source.tablet_schema; - const auto& including_cids = - partial_source_cids.empty() ? _normal_column_ids : partial_source_cids; - for (auto& cid : including_cids) { - const ColumnWithTypeAndName& col = block->get_by_position(col_pos_in_block++); + const bool is_partial_update = !partial_source_cids.empty(); + const size_t column_count = + is_partial_update ? partial_source_cids.size() : _normal_column_count; + for (size_t ordinal = 0; ordinal < column_count; ++ordinal) { + const uint32_t source_cid = + is_partial_update ? partial_source_cids[ordinal] : cast_set(ordinal); + DCHECK_LT(source_cid, tablet_schema->num_columns()); + const ColumnWithTypeAndName& col = + block->get_by_position(is_partial_update ? col_pos_in_block++ : source_cid); RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column( - col, row_pos, num_rows, cid)); + col, row_pos, num_rows, source_cid)); // olap data convertor alway start from id = 0 - auto converted_result = _olap_data_convertor->convert_column_data(cid); + auto converted_result = _olap_data_convertor->convert_column_data(source_cid); if (!converted_result.first.ok()) { return converted_result.first; } - _converted_columns[cid] = converted_result.second; + _converted_columns[source_cid] = converted_result.second; - if (cid < tablet_schema->num_key_columns()) { - _key_columns.push_back(converted_result.second); + full_block->replace_by_position(source_cid, col.column); + } + for (uint32_t cid = 0; cid < _normal_column_count; ++cid) { + if (!tablet_schema->column(cid).is_key()) { + continue; } - full_block->replace_by_position(cid, col.column); + DCHECK_LT(cid, _converted_columns.size()); + DCHECK(_converted_columns[cid] != nullptr); + _key_columns.push_back(_converted_columns[cid]); } _num_rows = num_rows; @@ -494,14 +530,18 @@ Status RowBinlogSourceDataWriter::prepare_seq_column(const ColumnWithTypeAndName Status RowBinlogSourceDataWriter::fill_normal_columns( std::vector>& column_writers, size_t start, size_t end, std::vector& partial_source_cids) { - DCHECK_EQ(end - start, _normal_column_ids.size()); - - const auto& including_cids = - partial_source_cids.empty() ? _normal_column_ids : partial_source_cids; - for (size_t cid : including_cids) { + DCHECK_EQ(end - start, _normal_column_count); + + const bool is_partial_update = !partial_source_cids.empty(); + const size_t column_count = + is_partial_update ? partial_source_cids.size() : _normal_column_count; + for (size_t ordinal = 0; ordinal < column_count; ++ordinal) { + size_t cid = is_partial_update ? partial_source_cids[ordinal] : ordinal; + DCHECK_LT(cid, _normal_column_count); DCHECK(column_writers[start + cid]->get_column()->type() == _opt.source.tablet_schema->columns()[cid]->type()) << cid; + DCHECK(_converted_columns[cid] != nullptr); RETURN_IF_ERROR(column_writers[start + cid]->append(_converted_columns[cid]->get_nullmap(), _converted_columns[cid]->get_data(), _num_rows)); diff --git a/be/src/storage/segment/row_binlog_segment_writer.h b/be/src/storage/segment/row_binlog_segment_writer.h index bbd5e18cb1320c..370c8fc8c40681 100644 --- a/be/src/storage/segment/row_binlog_segment_writer.h +++ b/be/src/storage/segment/row_binlog_segment_writer.h @@ -44,8 +44,6 @@ class RowBinlogSourceDataWriter { void clear(); - IOlapColumnDataAccessor* get_converted_column(uint32_t cid) { return _converted_columns[cid]; } - bool need_before() const { return _opt.write_before; } const std::vector& source_key_columns() const { return _key_columns; } @@ -53,16 +51,12 @@ class RowBinlogSourceDataWriter { std::unique_ptr& olap_data_convertor() { return _olap_data_convertor; } - void filter_source_ids(std::vector& full_cids, std::vector& res_cids) { - res_cids.reserve(full_cids.size()); - std::set_intersection(_normal_column_ids.begin(), _normal_column_ids.end(), - full_cids.begin(), full_cids.end(), std::back_inserter(res_cids)); - } + size_t normal_column_count() const { return _normal_column_count; } private: const SegmentWriteBinlogOptions& _opt; std::unique_ptr _olap_data_convertor; - std::vector _normal_column_ids; + size_t _normal_column_count = 0; std::vector _converted_columns; size_t _num_rows = 0; diff --git a/be/test/storage/segment/row_binlog_source_data_writer_test.cpp b/be/test/storage/segment/row_binlog_source_data_writer_test.cpp new file mode 100644 index 00000000000000..a71d862f4967b4 --- /dev/null +++ b/be/test/storage/segment/row_binlog_source_data_writer_test.cpp @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include + +#include "core/field.h" +#include "core/value/decimalv2_value.h" +#include "storage/binlog.h" +#include "storage/iterator/olap_data_convertor.h" +#include "storage/segment/row_binlog_segment_writer.h" +#include "storage/tablet/tablet_schema.h" + +namespace doris::segment_v2 { + +namespace { + +TabletColumn create_column(int32_t unique_id, const std::string& name, FieldType type, bool is_key, + bool visible) { + TabletColumn column(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, type, !is_key); + column.set_unique_id(unique_id); + column.set_name(name); + column.set_is_key(is_key); + column.set_length(type == FieldType::OLAP_FIELD_TYPE_LARGEINT ? 16 : 4); + column.set_index_length(column.length()); + column._visible = visible; + return column; +} + +TabletSchemaSPtr create_source_schema() { + auto schema = std::make_shared(); + schema->append_column(create_column(0, "k1", FieldType::OLAP_FIELD_TYPE_INT, true, true)); + schema->append_column(create_column(1, "__DORIS_TEST_HIDDEN_KEY__", + FieldType::OLAP_FIELD_TYPE_LARGEINT, true, false)); + schema->append_column(create_column(2, "v1", FieldType::OLAP_FIELD_TYPE_INT, false, true)); + schema->append_column(create_column(3, "__DORIS_TEST_HIDDEN_VALUE__", + FieldType::OLAP_FIELD_TYPE_INT, false, false)); + schema->_keys_type = UNIQUE_KEYS; + return schema; +} + +Block create_source_block(const TabletSchemaSPtr& schema) { + Block block = schema->create_block(); + auto columns_guard = block.mutate_columns_scoped(); + auto& columns = columns_guard.mutable_columns(); + for (int i = 0; i < 2; ++i) { + columns[0]->insert(Field::create_field(10 + i)); + columns[1]->insert(Field::create_field(static_cast(1000 + i))); + columns[2]->insert(Field::create_field(100 + i)); + columns[3]->insert(Field::create_field(10000 + i)); + } + return block; +} + +} // namespace + +TEST(RowBinlogSourceDataWriterTest, collectHiddenKeyInNormalPrefix) { + auto source_schema = create_source_schema(); + SegmentWriteBinlogOptions options; + options.source.tablet_schema = source_schema; + + RowBinlogSourceDataWriter writer(options); + ASSERT_TRUE(writer.init().ok()); + EXPECT_EQ(3, writer.normal_column_count()); + + Block block = create_source_block(source_schema); + Block full_block = source_schema->create_block(); + std::vector partial_source_cids; + ASSERT_TRUE( + writer.prepare_by_source_block(&block, 0, 2, partial_source_cids, &full_block).ok()); + + const auto& key_columns = writer.source_key_columns(); + ASSERT_EQ(2, key_columns.size()); + + ASSERT_NE(nullptr, key_columns[0]->get_data_at(1)); + EXPECT_EQ(11, *reinterpret_cast(key_columns[0]->get_data_at(1))); + + ASSERT_NE(nullptr, key_columns[1]->get_data_at(1)); + EXPECT_EQ(static_cast(1001), + *reinterpret_cast(key_columns[1]->get_data_at(1))); +} + +TEST(RowBinlogSourceDataWriterTest, rejectVisibleColumnAfterHiddenNonKeyColumn) { + auto source_schema = std::make_shared(); + source_schema->append_column( + create_column(0, "k1", FieldType::OLAP_FIELD_TYPE_INT, true, true)); + source_schema->append_column(create_column(1, "__DORIS_TEST_HIDDEN_VALUE__", + FieldType::OLAP_FIELD_TYPE_INT, false, false)); + source_schema->append_column( + create_column(2, "v1", FieldType::OLAP_FIELD_TYPE_INT, false, true)); + source_schema->_keys_type = UNIQUE_KEYS; + + SegmentWriteBinlogOptions options; + options.source.tablet_schema = source_schema; + + RowBinlogSourceDataWriter writer(options); + auto status = writer.init(); + EXPECT_FALSE(status.ok()); + EXPECT_NE(std::string::npos, + status.to_string().find("visible/key column after hidden non-key")); +} + +} // namespace doris::segment_v2 diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 1aea8b9e074a4e..04682441cba3b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -322,9 +322,9 @@ public boolean processAddColumns(AddColumnsOp addColumnsOp, OlapTable olapTable, private void addColumnRowBinlog(List rowBinlogSchema, Column newColumn, ColumnPosition columnPos, Set newColNameSet, boolean needHistoricalValue, IntSupplier columnUniqueIdSupplier) throws DdlException { - if (!newColumn.isVisible()) { - // row binlog schema is generated from visible columns only, so schema change must not - // sync hidden system columns such as sequence/delete/version/skip-bitmap columns. + if (!newColumn.isVisible() && !newColumn.isKey()) { + // Row-binlog writes visible columns plus hidden key columns. Skip hidden non-key + // system columns such as sequence/delete/version/skip-bitmap columns. return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 970999ef80f50e..71124ca18ffc93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2425,7 +2425,15 @@ public List generateTableRowBinlogSchema() { boolean needHistoricalValue = getBinlogConfig().getNeedHistoricalValue(); List beforeColumns = new ArrayList<>(); - for (Column column : getBaseSchema(false)) { + boolean seenHiddenNonKeyColumn = false; + for (Column column : getBaseSchema(true)) { + if (!column.isVisible() && !column.isKey()) { + seenHiddenNonKeyColumn = true; + continue; + } + Preconditions.checkState(!seenHiddenNonKeyColumn, + "binlog does not support visible/key column after hidden non-key column: " + + column.getName()); Preconditions.checkState(!column.getType().isVariantType(), "binlog does not support VARIANT column: " + column.getName()); Preconditions.checkState(!column.isAutoInc(), diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeHandlerTest.java index 95b4cf9145f1b5..330c01a8696706 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeHandlerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/SchemaChangeHandlerTest.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndexMeta; import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.info.ColumnPosition; import org.apache.doris.catalog.info.IndexType; @@ -37,6 +38,7 @@ import org.apache.doris.qe.StmtExecutor; import org.apache.doris.utframe.TestWithFeService; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; @@ -46,8 +48,11 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import java.lang.reflect.Method; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntSupplier; import java.util.stream.Collectors; public class SchemaChangeHandlerTest extends TestWithFeService { @@ -884,6 +889,42 @@ public void testAddValueColumnOnAggMV() { } + @Test + public void testRowBinlogSchemaChangeKeepsHiddenKeyColumn() throws Exception { + SchemaChangeHandler schemaChangeHandler = new SchemaChangeHandler(); + List rowBinlogSchema = Lists.newArrayList(); + Column key = new Column("k1", PrimitiveType.INT); + key.setIsKey(true); + rowBinlogSchema.add(Column.generateRowBinlogKeyColumn(key)); + rowBinlogSchema.add(new Column(Column.BINLOG_LSN_COL, PrimitiveType.LARGEINT)); + rowBinlogSchema.add(new Column(Column.BINLOG_OPERATION_COL, PrimitiveType.BIGINT)); + rowBinlogSchema.add(new Column(Column.BINLOG_TIMESTAMP_COL, PrimitiveType.BIGINT)); + + Column hiddenKey = new Column("__DORIS_TEST_HIDDEN_KEY__", PrimitiveType.BIGINT); + hiddenKey.setIsKey(true); + hiddenKey.setIsVisible(false); + AtomicInteger uniqueId = new AtomicInteger(10); + IntSupplier uniqueIdSupplier = uniqueId::getAndIncrement; + Method addColumnRowBinlog = SchemaChangeHandler.class.getDeclaredMethod( + "addColumnRowBinlog", List.class, Column.class, ColumnPosition.class, + java.util.Set.class, boolean.class, IntSupplier.class); + addColumnRowBinlog.setAccessible(true); + addColumnRowBinlog.invoke(schemaChangeHandler, rowBinlogSchema, hiddenKey, null, + Sets.newHashSet(hiddenKey.getName()), false, uniqueIdSupplier); + + List columnNames = rowBinlogSchema.stream().map(Column::getName).collect(Collectors.toList()); + Assertions.assertEquals(1, columnNames.indexOf("__DORIS_TEST_HIDDEN_KEY__")); + Assertions.assertEquals(2, columnNames.indexOf(Column.BINLOG_LSN_COL)); + + Column hiddenValue = new Column("__DORIS_TEST_HIDDEN_VALUE__", PrimitiveType.INT); + hiddenValue.setIsKey(false); + hiddenValue.setIsVisible(false); + addColumnRowBinlog.invoke(schemaChangeHandler, rowBinlogSchema, hiddenValue, null, + Sets.newHashSet(hiddenValue.getName()), false, uniqueIdSupplier); + columnNames = rowBinlogSchema.stream().map(Column::getName).collect(Collectors.toList()); + Assertions.assertFalse(columnNames.contains("__DORIS_TEST_HIDDEN_VALUE__")); + } + @Test public void testAggAddOrDropInvertedIndex() throws Exception { LOG.info("dbName: {}", Env.getCurrentInternalCatalog().getDbNames()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableRowBinlogSchemaTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableRowBinlogSchemaTest.java index f6130782d73b19..36414380d59167 100755 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableRowBinlogSchemaTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableRowBinlogSchemaTest.java @@ -30,12 +30,15 @@ public class OlapTableRowBinlogSchemaTest { private static OlapTable newTestTable(BinlogConfig binlogConfig) { - long baseIndexId = 1L; Column key = new Column("k1", PrimitiveType.INT); key.setIsKey(true); Column value = new Column("v1", PrimitiveType.INT); value.setIsKey(false); - List baseSchema = Lists.newArrayList(key, value); + return newTestTable(binlogConfig, Lists.newArrayList(key, value)); + } + + private static OlapTable newTestTable(BinlogConfig binlogConfig, List baseSchema) { + long baseIndexId = 1L; // Construct a minimal olap table for row binlog schema generation. OlapTable table = new OlapTable(1L, "tbl", baseSchema, KeysType.PRIMARY_KEYS, null, null); @@ -89,4 +92,48 @@ public void testRowBinlogSchemaOnDisable() { Assertions.assertFalse(table.needRowBinlog()); Assertions.assertTrue(table.getBaseIndexMeta().getRowBinlogIndexId() <= 0); } + + @Test + public void testRowBinlogSchemaIncludesHiddenKeyColumns() { + Column key = new Column("k1", PrimitiveType.INT); + key.setIsKey(true); + Column hiddenKey = new Column("__DORIS_TEST_HIDDEN_KEY__", PrimitiveType.BIGINT); + hiddenKey.setIsKey(true); + hiddenKey.setIsVisible(false); + Column value = new Column("v1", PrimitiveType.INT); + value.setIsKey(false); + Column hiddenValue = new Column("__DORIS_TEST_HIDDEN_VALUE__", PrimitiveType.INT); + hiddenValue.setIsKey(false); + hiddenValue.setIsVisible(false); + + OlapTable table = newTestTable(BinlogTestUtils.newTestRowBinlogConfig(true, true), + Lists.newArrayList(key, hiddenKey, value, hiddenValue)); + + List columnNames = table.getRowBinlogMeta().getSchema(true).stream().map(Column::getName) + .collect(Collectors.toList()); + Assertions.assertEquals("k1", columnNames.get(0)); + Assertions.assertEquals("__DORIS_TEST_HIDDEN_KEY__", columnNames.get(1)); + Assertions.assertEquals("v1", columnNames.get(2)); + Assertions.assertFalse(columnNames.contains("__DORIS_TEST_HIDDEN_VALUE__")); + Assertions.assertTrue(columnNames.contains(Column.generateBeforeColName("v1"))); + Assertions.assertFalse(columnNames.contains(Column.generateBeforeColName("__DORIS_TEST_HIDDEN_KEY__"))); + Assertions.assertEquals(4, columnNames.indexOf(Column.BINLOG_LSN_COL)); + } + + @Test + public void testRowBinlogSchemaRejectsVisibleColumnAfterHiddenNonKeyColumn() { + Column key = new Column("k1", PrimitiveType.INT); + key.setIsKey(true); + Column hiddenValue = new Column("__DORIS_TEST_HIDDEN_VALUE__", PrimitiveType.INT); + hiddenValue.setIsKey(false); + hiddenValue.setIsVisible(false); + Column value = new Column("v1", PrimitiveType.INT); + value.setIsKey(false); + + OlapTable table = newTestTable(BinlogTestUtils.newTestRowBinlogConfig(false, false), + Lists.newArrayList(key, hiddenValue, value)); + IllegalStateException exception = Assertions.assertThrows(IllegalStateException.class, + table::generateTableRowBinlogSchema); + Assertions.assertTrue(exception.getMessage().contains("visible/key column after hidden non-key column")); + } } diff --git a/regression-test/data/row_binlog_p0/test_row_binlog_hidden_column_schema.out b/regression-test/data/row_binlog_p0/test_row_binlog_hidden_column_schema.out new file mode 100644 index 00000000000000..dfaf78a63cf8df --- /dev/null +++ b/regression-test/data/row_binlog_p0/test_row_binlog_hidden_column_schema.out @@ -0,0 +1,5 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !row_binlog_hidden_non_key_schema -- +0 1 10 \N +1 1 20 10 + diff --git a/regression-test/suites/row_binlog_p0/test_row_binlog_hidden_column_schema.groovy b/regression-test/suites/row_binlog_p0/test_row_binlog_hidden_column_schema.groovy new file mode 100644 index 00000000000000..ab655f72b59c77 --- /dev/null +++ b/regression-test/suites/row_binlog_p0/test_row_binlog_hidden_column_schema.groovy @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_row_binlog_hidden_column_schema", "nonConcurrent") { + if (isCloudMode()) { + return + } + + sql "DROP TABLE IF EXISTS test_mow_seq_hidden_column_row_binlog FORCE" + + sql """ + CREATE TABLE test_mow_seq_hidden_column_row_binlog ( + k1 INT, + v1 INT + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "binlog.enable" = "true", + "binlog.format" = "ROW", + "binlog.need_historical_value" = "true" + ) + """ + + sql """ALTER TABLE test_mow_seq_hidden_column_row_binlog + ENABLE FEATURE "SEQUENCE_LOAD" + WITH PROPERTIES ("function_column.sequence_type" = "int")""" + + sql """ + INSERT INTO test_mow_seq_hidden_column_row_binlog(k1, v1, __DORIS_SEQUENCE_COL__) + VALUES (1, 10, 1) + """ + sql """ + INSERT INTO test_mow_seq_hidden_column_row_binlog(k1, v1, __DORIS_SEQUENCE_COL__) + VALUES (1, 20, 2) + """ + sql "sync" + + qt_row_binlog_hidden_non_key_schema """ + SELECT __DORIS_BINLOG_OP__ AS op, + k1, + v1, + __BEFORE__v1__ + FROM binlog("table" = "test_mow_seq_hidden_column_row_binlog") + ORDER BY __DORIS_BINLOG_LSN__ + """ + + test { + sql """ + SELECT __DORIS_SEQUENCE_COL__ + FROM binlog("table" = "test_mow_seq_hidden_column_row_binlog") + """ + exception "Unknown column" + } +}