Merged
18 changes: 12 additions & 6 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/GeneratedColumnPlaceholderBlockInputStream.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/MutableSupport.h>
@@ -94,14 +95,23 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef
return genNamesAndTypes(table_scan.getColumns(), column_prefix);
}

std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan)
std::tuple<DM::ColumnDefinesPtr, int, std::vector<std::tuple<UInt64, String, DataTypePtr>>> genColumnDefinesForDisaggregatedRead(
const TiDBTableScan & table_scan)
{
auto column_defines = std::make_shared<DM::ColumnDefines>();
int extra_table_id_index = MutSup::invalid_col_id;
column_defines->reserve(table_scan.getColumnSize());
std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
{
const auto & column_info = table_scan.getColumns()[i];
if (column_info.hasGeneratedColumnFlag())
{
const auto & data_type = getDataTypeByColumnInfoForComputingLayer(column_info);
const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
continue;
}
// Now the upper level seems to treat disagg read as an ExchangeReceiver output, so
// use this as output column prefix.
// Even if the id is pk_column or extra_table_id, we still output it as
@@ -117,10 +127,6 @@ std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const
break;
case MutSup::extra_table_id_col_id:
{
column_defines->emplace_back(DM::ColumnDefine{
MutSup::extra_table_id_col_id,
output_name, // MutSup::extra_table_id_column_name
MutSup::getExtraTableIdColumnType()});
extra_table_id_index = i;
break;
}
@@ -133,7 +139,7 @@ std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const
break;
}
}
return {std::move(column_defines), extra_table_id_index};
return {std::move(column_defines), extra_table_id_index, std::move(generated_column_infos)};
}

ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types)
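For context, the heart of this change is the skip-and-record step above: a column flagged as generated never becomes a storage column define; instead its (index, placeholder name, type) tuple is recorded so a placeholder column can be injected into the output later. Below is a minimal self-contained sketch of that loop, with simplified stand-ins for the TiFlash types and an assumed "generated_column_<i>" naming convention for the placeholder helper; it is an illustration, not the actual implementation.

#include <cstdint>
#include <memory>
#include <string>
#include <tuple>
#include <vector>

struct DataType {};
using DataTypePtr = std::shared_ptr<DataType>;

struct ColumnInfo
{
    bool generated_flag = false;
    bool hasGeneratedColumnFlag() const { return generated_flag; }
};

// Assumed naming convention of GeneratedColumnPlaceholderBlockInputStream::getColumnName.
std::string placeholderName(size_t i)
{
    return "generated_column_" + std::to_string(i);
}

using GeneratedColumnInfos = std::vector<std::tuple<uint64_t, std::string, DataTypePtr>>;

GeneratedColumnInfos collectGeneratedColumns(const std::vector<ColumnInfo> & columns)
{
    GeneratedColumnInfos infos;
    for (size_t i = 0; i < columns.size(); ++i)
    {
        // Generated columns are filled by TiDB, so they must not be read
        // from storage: record a placeholder and skip the column define,
        // mirroring the `continue` in the diff above. The real code derives
        // the data type from the column info via
        // getDataTypeByColumnInfoForComputingLayer.
        if (columns[i].hasGeneratedColumnFlag())
            infos.emplace_back(i, placeholderName(i), std::make_shared<DataType>());
    }
    return infos;
}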
5 changes: 3 additions & 2 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
@@ -34,7 +34,8 @@ NamesAndTypes genNamesAndTypes(const TiDB::ColumnInfos & column_infos, const Str
ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types);
NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema);

// The column defines and `extra table id index`
std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan);
// The column defines, `extra table id index`, and `generated column infos` for disaggregated read.
std::tuple<DM::ColumnDefinesPtr, int, std::vector<std::tuple<UInt64, String, DataTypePtr>>> genColumnDefinesForDisaggregatedRead(
const TiDBTableScan & table_scan);

} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Storages/StorageDisaggregated.h
@@ -108,7 +108,7 @@ class StorageDisaggregated : public IStorage
std::tuple<DM::RSOperatorPtr, DM::ColumnRangePtr> buildRSOperatorAndColumnRange(
const Context & db_context,
const DM::ColumnDefinesPtr & columns_to_read);
std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> packSegmentReadTasks(
std::tuple<std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr>, DM::ColumnDefinesPtr> packSegmentReadTasks(
const Context & db_context,
DM::SegmentReadTasks && read_tasks,
const DM::ColumnDefinesPtr & column_defines,
@@ -161,5 +161,7 @@ class StorageDisaggregated : public IStorage
std::unique_ptr<DAGExpressionAnalyzer> analyzer;
static constexpr auto ZONE_LABEL_KEY = "zone";
std::optional<String> zone_label;
// For generated columns, we only need a placeholder column; TiDB fills in the actual values.
std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
};
} // namespace DB
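The new generated_column_infos member feeds executeGeneratedColumnPlaceholder, which is called from StorageDisaggregatedRemote.cpp below but is not itself part of this diff. The following self-contained sketch therefore only illustrates the assumed behavior: for each recorded (index, name, type), a default-valued column is inserted into every output block at that index, so the block matches the schema TiDB expects while TiDB computes the real values on its side.

#include <cstddef>
#include <cstdint>
#include <string>
#include <tuple>
#include <vector>

// Stand-ins for Block/Column; real TiFlash blocks carry typed IColumn data.
struct Column
{
    std::string name;
    std::vector<int64_t> values;
};
using Block = std::vector<Column>;

using GeneratedColumnInfos = std::vector<std::tuple<uint64_t, std::string, std::string /*type name*/>>;

// Sketch: insert one placeholder column per generated column. Assumes the
// infos are ordered by index, which holds because they are collected in a
// single forward scan over the table scan's columns.
void insertPlaceholders(Block & block, const GeneratedColumnInfos & infos, size_t rows)
{
    for (const auto & [index, name, type_name] : infos)
    {
        // The values are never inspected by TiDB; only the position, name,
        // and type of the column matter.
        Column placeholder{name, std::vector<int64_t>(rows, 0)};
        block.insert(block.begin() + static_cast<std::ptrdiff_t>(index), std::move(placeholder));
    }
}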
101 changes: 60 additions & 41 deletions dbms/src/Storages/StorageDisaggregatedRemote.cpp
@@ -115,6 +115,8 @@ BlockInputStreams StorageDisaggregated::readThroughS3(const Context & db_context
num_streams,
pipeline,
scan_context);
// Handle generated columns if necessary.
executeGeneratedColumnPlaceholder(generated_column_infos, log, pipeline);

NamesAndTypes source_columns;
source_columns.reserve(table_scan.getColumnSize());
@@ -149,6 +151,8 @@ void StorageDisaggregated::readThroughS3(
buildReadTaskWithBackoff(db_context, scan_context),
num_streams,
scan_context);
// Handle generated columns if necessary.
executeGeneratedColumnPlaceholder(exec_context, group_builder, generated_column_infos, log);

NamesAndTypes source_columns;
auto header = group_builder.getCurrentHeader();
@@ -609,13 +613,14 @@ std::tuple<DM::RSOperatorPtr, DM::ColumnRangePtr> StorageDisaggregated::buildRSO
return {rs_operator, column_range};
}

std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> StorageDisaggregated::packSegmentReadTasks(
const Context & db_context,
DM::SegmentReadTasks && read_tasks,
const DM::ColumnDefinesPtr & column_defines,
const DM::ScanContextPtr & scan_context,
size_t num_streams,
int extra_table_id_index)
std::tuple<std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr>, DM::ColumnDefinesPtr> StorageDisaggregated::
packSegmentReadTasks(
const Context & db_context,
DM::SegmentReadTasks && read_tasks,
const DM::ColumnDefinesPtr & column_defines,
const DM::ScanContextPtr & scan_context,
size_t num_streams,
int extra_table_id_index)
{
const auto & executor_id = table_scan.getTableScanExecutorID();

@@ -651,53 +656,61 @@ std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> StorageDisagg
scan_context->read_mode = read_mode;
const UInt64 start_ts = sender_target_mpp_task_id.gather_id.query_id.start_ts;
const auto enable_read_thread = db_context.getSettingsRef().dt_enable_read_thread;
const auto & final_columns_defines = push_down_executor && push_down_executor->extra_cast
? push_down_executor->columns_after_cast
: column_defines;
RUNTIME_CHECK(num_streams > 0, num_streams);
LOG_INFO(
log,
"packSegmentReadTasks: enable_read_thread={} read_mode={} is_fast_scan={} keep_order={} task_count={} "
"num_streams={} column_defines={}",
"num_streams={} column_defines={} final_columns_defines={}",
enable_read_thread,
magic_enum::enum_name(read_mode),
table_scan.isFastScan(),
table_scan.keepOrder(),
read_tasks.size(),
num_streams,
*column_defines);
*column_defines,
*final_columns_defines);

if (enable_read_thread)
{
// Under the disagg arch, we now use blocking IO to read data from cloud storage, so it requires more active
// segments to fully utilize the read threads.
const size_t read_thread_num_active_seg = 10 * num_streams;
return std::make_shared<DM::SegmentReadTaskPool>(
extra_table_id_index,
*column_defines,
push_down_executor,
start_ts,
db_context.getSettingsRef().max_block_size,
read_mode,
std::move(read_tasks),
/*after_segment_read*/ [](const DM::DMContextPtr &, const DM::SegmentPtr &) {},
log->identifier(),
/*enable_read_thread*/ true,
num_streams,
read_thread_num_active_seg,
context.getDAGContext()->getKeyspaceID(),
context.getDAGContext()->getResourceGroupName());
return {
std::make_shared<DM::SegmentReadTaskPool>(
extra_table_id_index,
*final_columns_defines,
push_down_executor,
start_ts,
db_context.getSettingsRef().max_block_size,
read_mode,
std::move(read_tasks),
/*after_segment_read*/ [](const DM::DMContextPtr &, const DM::SegmentPtr &) {},
log->identifier(),
/*enable_read_thread*/ true,
num_streams,
read_thread_num_active_seg,
context.getDAGContext()->getKeyspaceID(),
context.getDAGContext()->getResourceGroupName()),
final_columns_defines};
}
else
{
return DM::Remote::RNWorkers::create(
db_context,
std::move(read_tasks),
{
.log = log->getChild(executor_id),
.columns_to_read = column_defines,
.start_ts = start_ts,
.push_down_executor = push_down_executor,
.read_mode = read_mode,
},
num_streams);
return {
DM::Remote::RNWorkers::create(
db_context,
std::move(read_tasks),
{
.log = log->getChild(executor_id),
.columns_to_read = final_columns_defines,
.start_ts = start_ts,
.push_down_executor = push_down_executor,
.read_mode = read_mode,
},
num_streams),
final_columns_defines};
}
}

@@ -739,8 +752,11 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
const DM::ScanContextPtr & scan_context)
{
// Build the input streams to read blocks from remote segments
auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan);
auto packed_read_tasks = packSegmentReadTasks(
DM::ColumnDefinesPtr column_defines;
int extra_table_id_index;
std::tie(column_defines, extra_table_id_index, generated_column_infos)
= genColumnDefinesForDisaggregatedRead(table_scan);
auto [packed_read_tasks, final_column_defines] = packSegmentReadTasks(
db_context,
std::move(read_tasks),
column_defines,
@@ -751,7 +767,7 @@

InputStreamBuilder builder{
.tracing_id = log->identifier(),
.columns_to_read = column_defines,
.columns_to_read = final_column_defines,
.extra_table_id_index = extra_table_id_index,
};
for (size_t stream_idx = 0; stream_idx < num_streams; ++stream_idx)
@@ -810,8 +826,11 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps(
const DM::ScanContextPtr & scan_context)
{
// Build the input streams to read blocks from remote segments
auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead(table_scan);
auto packed_read_tasks = packSegmentReadTasks(
DM::ColumnDefinesPtr column_defines;
int extra_table_id_index;
std::tie(column_defines, extra_table_id_index, generated_column_infos)
= genColumnDefinesForDisaggregatedRead(table_scan);
auto [packed_read_tasks, final_column_defines] = packSegmentReadTasks(
db_context,
std::move(read_tasks),
column_defines,
Expand All @@ -821,7 +840,7 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps(

SourceOpBuilder builder{
.tracing_id = log->identifier(),
.column_defines = column_defines,
.column_defines = final_column_defines,
.extra_table_id_index = extra_table_id_index,
.exec_context = exec_context,
};
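To summarize the schema hand-off introduced in this file: when the pushed-down executor carries an extra cast, the blocks produced by the read path follow push_down_executor->columns_after_cast rather than the original column defines, so packSegmentReadTasks now returns whichever set it actually used and both stream builders consume that same set. A condensed sketch with simplified stand-in types (the real return also carries the RNWorkers/SegmentReadTaskPool variant):

#include <memory>
#include <tuple>
#include <vector>

struct ColumnDefine {};
using ColumnDefines = std::vector<ColumnDefine>;
using ColumnDefinesPtr = std::shared_ptr<ColumnDefines>;

struct PushDownExecutor
{
    bool extra_cast = false; // e.g. a cast applied inside the read path
    ColumnDefinesPtr columns_after_cast;
};
using PushDownExecutorPtr = std::shared_ptr<PushDownExecutor>;

struct PackedTasks {}; // stand-in for the RNWorkersPtr/SegmentReadTaskPoolPtr variant

std::tuple<PackedTasks, ColumnDefinesPtr> packSegmentReadTasksSketch(
    const PushDownExecutorPtr & push_down_executor,
    const ColumnDefinesPtr & column_defines)
{
    // Pick the schema the read path really outputs: post-cast defines when an
    // extra cast exists, the originally generated defines otherwise.
    const auto & final_defines = (push_down_executor && push_down_executor->extra_cast)
        ? push_down_executor->columns_after_cast
        : column_defines;
    return {PackedTasks{}, final_defines};
}

void buildStreamsSketch(const PushDownExecutorPtr & executor, const ColumnDefinesPtr & defines)
{
    // Caller side, mirroring buildRemoteSegmentInputStreams/SourceOps: the
    // stream builders must read with final_defines, not the original defines,
    // or the block schema would mismatch after a push-down cast.
    auto [packed_tasks, final_defines] = packSegmentReadTasksSketch(executor, defines);
    (void)packed_tasks;
    (void)final_defines;
}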
2 changes: 2 additions & 0 deletions tests/docker/next-gen-config/tidb.toml
@@ -19,6 +19,8 @@ disaggregated-tiflash = true
# Now tests are run on the SYSTEM keyspace tidb.
keyspace-name = "SYSTEM"

tikv-worker-url = "tikv-worker0:19000"

enable-telemetry = false
temp-dir = "/data/tmp"
[performance]
28 changes: 28 additions & 0 deletions tests/docker/next-gen-config/tikv-worker.toml
@@ -0,0 +1,28 @@
# Copyright 2025 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# object storage for next-gen
[dfs]
prefix = "tikv"
s3-endpoint = "http://minio0:9000"
s3-key-id = "minioadmin"
s3-secret-key = "minioadmin"
s3-bucket = "tiflash-test"
s3-region = "local"

[schema-manager]
dir = "/data/schemas"
enabled = true
keyspace-refresh-interval = "10s"
schema-refresh-threshold = 1
1 change: 1 addition & 0 deletions tests/docker/next-gen-config/tikv.toml
@@ -18,6 +18,7 @@ reserve-space = "0"
# Enable keyspace and ttl for next-gen
api-version = 2
enable-ttl = true
low-space-threshold = 0

[raftstore]
capacity = "100GB"
13 changes: 13 additions & 0 deletions tests/docker/next-gen-yaml/cluster.yaml
@@ -49,6 +49,19 @@ services:
- "pd0"
- "minio0"
restart: on-failure
tikv-worker0:
image: ${TIKV_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/hub/tikv/tikv/image:dedicated-next-gen}
security_opt:
- seccomp:unconfined
volumes:
- ./next-gen-config/tikv-worker.toml:/tikv-worker.toml:ro
- ./data/tikv-worker0:/data
- ./log/tikv-worker0:/log
entrypoint: /tikv-worker
command: --addr=0.0.0.0:19000 --pd-endpoints=pd0:2379 --config=/tikv-worker.toml --data-dir=/data --log-file=/log/tikv-worker.log
depends_on:
- "pd0"
restart: on-failure
tidb0:
image: ${TIDB_IMAGE:-us-docker.pkg.dev/pingcap-testing-account/hub/pingcap/tidb/images/tidb-server:master-next-gen}
security_opt:
8 changes: 4 additions & 4 deletions tests/fullstack-test-next-gen/run.sh
@@ -69,15 +69,15 @@ if [[ -n "$ENABLE_NEXT_GEN" && "$ENABLE_NEXT_GEN" != "false" && "$ENABLE_NEXT_GE
ENV_ARGS="ENABLE_NEXT_GEN=true verbose=${verbose} "
# Most failpoints are expected to be set on the compute layer, so use tiflash-cn0 to run tests
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/sample.test"
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test-index/vector"
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test-index"
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test-next-gen/placement"
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test2/clustered_index"
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test2/dml"
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test2/variables"
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test2/mpp"
# TODO: enable the following tests after they are fixed. And maybe we need to split them into parallel pipelines because they take too long to run.
#${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/expr"
#${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/mpp"
# Maybe we need to split them into parallel pipelines because they take too long to run.
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/expr"
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" exec -T tiflash-cn0 bash -c "cd /tests ; ${ENV_ARGS} ./run-test.sh fullstack-test/mpp"
${COMPOSE} -f next-gen-cluster.yaml -f "${DISAGG_TIFLASH_YAML}" down
clean_data_log

3 changes: 0 additions & 3 deletions tests/fullstack-test/mpp/extra_physical_table_column.test
@@ -12,9 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# line 27: Pessimistic lock response corrupted
#SKIP_FOR_NEXT_GEN

# Preparation.
=> DBGInvoke __init_fail_point()
