Skip to content

Commit c0d280e

Browse files
authored
Merge pull request #1161 from Altinity/export_replicated_mt_partition_v2
Fix export part crash and add docs for export partition
2 parents fcb6ded + 6b07680 commit c0d280e

File tree

8 files changed

+200
-10
lines changed

8 files changed

+200
-10
lines changed
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
# ALTER TABLE EXPORT PARTITION
2+
3+
## Overview
4+
5+
The `ALTER TABLE EXPORT PARTITION` command exports entire partitions from Replicated*MergeTree tables to object storage (S3, Azure Blob Storage, etc.), typically in Parquet format. This feature coordinates export part operations across all replicas using ZooKeeper.
6+
7+
Each MergeTree part will become a separate file with the following name convention: `<table_directory>/<partitioning>/<data_part_name>_<merge_tree_part_checksum>.<format>`. To ensure atomicity, a commit file containing the relative paths of all exported parts is also shipped. A data file should only be considered part of the dataset if a commit file references it. The commit file will be named using the following convention: `<table_directory>/commit_<partition_id>_<transaction_id>`.
8+
9+
The set of parts that are exported is based on the list of parts the replica that received the export command sees. The other replicas will assist in the export process if they have those parts locally. Otherwise they will ignore it.
10+
11+
The partition export tasks can be observed through `system.replicated_partition_exports`. Querying this table results in a query to ZooKeeper, so it must be used with care. Individual part export progress can be observed as usual through `system.exports`.
12+
13+
The same partition can not be exported to the same destination more than once. There are two ways to override this behavior: either by setting the `export_merge_tree_partition_force_export` setting or waiting for the task to expire.
14+
15+
The export task can be killed by issuing the kill command: `KILL EXPORT PARTITION <where predicate for system.replicated_partition_exports>`.
16+
17+
The task is persistent - it should be resumed after crashes, failures and etc.
18+
19+
## Syntax
20+
21+
```sql
22+
ALTER TABLE [database.]table_name
23+
EXPORT PARTITION ID 'partition_id'
24+
TO TABLE [destination_database.]destination_table
25+
[SETTINGS setting_name = value, ...]
26+
```
27+
28+
### Parameters
29+
30+
- **`table_name`**: The source Replicated*MergeTree table containing the partition to export
31+
- **`partition_id`**: The partition identifier to export (e.g., `'2020'`, `'2021'`)
32+
- **`destination_table`**: The target table for the export (typically an S3, Azure, or other object storage table)
33+
34+
## Settings
35+
36+
### Server Settings
37+
38+
#### `enable_experimental_export_merge_tree_partition_feature` (Required)
39+
40+
- **Type**: `Bool`
41+
- **Default**: `false`
42+
- **Description**: Enable export replicated merge tree partition feature. It is experimental and not yet ready for production use.
43+
44+
### Query Settings
45+
46+
#### `export_merge_tree_partition_force_export` (Optional)
47+
48+
- **Type**: `Bool`
49+
- **Default**: `false`
50+
- **Description**: Ignore existing partition export and overwrite the ZooKeeper entry. Allows re-exporting a partition to the same destination before the manifest expires.
51+
52+
#### `export_merge_tree_partition_max_retries` (Optional)
53+
54+
- **Type**: `UInt64`
55+
- **Default**: `3`
56+
- **Description**: Maximum number of retries for exporting a merge tree part in an export partition task. If it exceeds, the entire task fails.
57+
58+
#### `export_merge_tree_partition_manifest_ttl` (Optional)
59+
60+
- **Type**: `UInt64`
61+
- **Default**: `180` (seconds)
62+
- **Description**: Determines how long the manifest will live in ZooKeeper. It prevents the same partition from being exported twice to the same destination. This setting does not affect or delete in-progress tasks; it only cleans up completed ones.
63+
64+
#### `export_merge_tree_part_file_already_exists_policy` (Optional)
65+
66+
- **Type**: `MergeTreePartExportFileAlreadyExistsPolicy`
67+
- **Default**: `skip`
68+
- **Description**: Policy for handling files that already exist during export. Possible values:
69+
- `skip` - Skip the file if it already exists
70+
- `error` - Throw an error if the file already exists
71+
- `overwrite` - Overwrite the file
72+
73+
## Examples
74+
75+
### Basic Export to S3
76+
77+
```sql
78+
CREATE TABLE rmt_table (id UInt64, year UInt16)
79+
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/rmt_table', 'replica1')
80+
PARTITION BY year ORDER BY tuple();
81+
82+
CREATE TABLE s3_table (id UInt64, year UInt16)
83+
ENGINE = S3(s3_conn, filename='data', format=Parquet, partition_strategy='hive')
84+
PARTITION BY year;
85+
86+
INSERT INTO rmt_table VALUES (1, 2020), (2, 2020), (3, 2020), (4, 2021);
87+
88+
ALTER TABLE rmt_table EXPORT PARTITION ID '2020' TO TABLE s3_table;
89+
90+
## Killing Exports
91+
92+
You can cancel in-progress partition exports using the `KILL EXPORT PARTITION` command:
93+
94+
```sql
95+
KILL EXPORT PARTITION
96+
WHERE partition_id = '2020'
97+
AND source_table = 'rmt_table'
98+
AND destination_table = 's3_table'
99+
```
100+
101+
The `WHERE` clause filters exports from the `system.replicated_partition_exports` table. You can use any columns from that table in the filter.
102+
103+
## Monitoring
104+
105+
### Active and Completed Exports
106+
107+
Monitor partition exports using the `system.replicated_partition_exports` table:
108+
109+
```sql
110+
arthur :) select * from system.replicated_partition_exports Format Vertical;
111+
112+
SELECT *
113+
FROM system.replicated_partition_exports
114+
FORMAT Vertical
115+
116+
Query id: 9efc271a-a501-44d1-834f-bc4d20156164
117+
118+
Row 1:
119+
──────
120+
source_database: default
121+
source_table: replicated_source
122+
destination_database: default
123+
destination_table: replicated_destination
124+
create_time: 2025-11-21 18:21:51
125+
partition_id: 2022
126+
transaction_id: 7397746091717128192
127+
source_replica: r1
128+
parts: ['2022_0_0_0','2022_1_1_0','2022_2_2_0']
129+
parts_count: 3
130+
parts_to_do: 0
131+
status: COMPLETED
132+
exception_replica:
133+
last_exception:
134+
exception_part:
135+
exception_count: 0
136+
137+
Row 2:
138+
──────
139+
source_database: default
140+
source_table: replicated_source
141+
destination_database: default
142+
destination_table: replicated_destination
143+
create_time: 2025-11-21 18:20:35
144+
partition_id: 2021
145+
transaction_id: 7397745772618674176
146+
source_replica: r1
147+
parts: ['2021_0_0_0']
148+
parts_count: 1
149+
parts_to_do: 0
150+
status: COMPLETED
151+
exception_replica:
152+
last_exception:
153+
exception_part:
154+
exception_count: 0
155+
156+
2 rows in set. Elapsed: 0.019 sec.
157+
158+
arthur :)
159+
```
160+
161+
Status values include:
162+
- `PENDING` - Export is queued / in progress
163+
- `COMPLETED` - Export finished successfully
164+
- `FAILED` - Export failed
165+
- `KILLED` - Export was cancelled
166+
167+
## Related Features
168+
169+
- [ALTER TABLE EXPORT PART](/docs/en/engines/table-engines/mergetree-family/part_export.md) - Export individual parts (non-replicated)
170+

src/Interpreters/ClientInfo.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ class ClientInfo
139139
NOT_A_BACKGROUND_OPERATION = 0,
140140
MERGE = 1,
141141
MUTATION = 2,
142+
EXPORT_PART = 3,
142143
};
143144

144145
/// It's ClientInfo and context created for background operation (not real query)

src/Interpreters/Context.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3136,6 +3136,13 @@ void Context::makeQueryContextForMutate(const MergeTreeSettings & merge_tree_set
31363136
= merge_tree_settings[MergeTreeSetting::mutation_workload].value.empty() ? getMutationWorkload() : merge_tree_settings[MergeTreeSetting::mutation_workload];
31373137
}
31383138

3139+
void Context::makeQueryContextForExportPart()
3140+
{
3141+
makeQueryContext();
3142+
classifier.reset(); // It is assumed that there are no active queries running using this classifier, otherwise this will lead to crashes
3143+
// Export part operations don't have a specific workload setting, so we leave the default workload
3144+
}
3145+
31393146
void Context::makeSessionContext()
31403147
{
31413148
session_context = shared_from_this();

src/Interpreters/Context.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1135,6 +1135,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
11351135
void makeQueryContext();
11361136
void makeQueryContextForMerge(const MergeTreeSettings & merge_tree_settings);
11371137
void makeQueryContextForMutate(const MergeTreeSettings & merge_tree_settings);
1138+
void makeQueryContextForExportPart();
11381139
void makeSessionContext();
11391140
void makeGlobalContext();
11401141

src/Storages/MergeTree/ExportPartTask.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ ExportPartTask::ExportPartTask(MergeTreeData & storage_, const MergeTreePartExpo
4747

4848
bool ExportPartTask::executeStep()
4949
{
50-
const auto & metadata_snapshot = manifest.storage_snapshot->metadata;
50+
const auto & metadata_snapshot = manifest.metadata_snapshot;
5151

5252
Names columns_to_read = metadata_snapshot->getColumns().getNamesOfPhysical();
5353

@@ -142,9 +142,13 @@ bool ExportPartTask::executeStep()
142142
bool read_with_direct_io = local_context->getSettingsRef()[Setting::min_bytes_to_use_direct_io] > manifest.data_part->getBytesOnDisk();
143143
bool prefetch = false;
144144

145-
const auto & snapshot_data = assert_cast<const MergeTreeData::SnapshotData &>(*manifest.storage_snapshot->data);
146-
auto mutations_snapshot = snapshot_data.mutations_snapshot;
145+
MergeTreeData::IMutationsSnapshot::Params mutations_snapshot_params
146+
{
147+
.metadata_version = metadata_snapshot->getMetadataVersion(),
148+
.min_part_metadata_version = manifest.data_part->getMetadataVersion()
149+
};
147150

151+
auto mutations_snapshot = storage.getMutationsSnapshot(mutations_snapshot_params);
148152
auto alter_conversions = MergeTreeData::getAlterConversionsForPart(
149153
manifest.data_part,
150154
mutations_snapshot,
@@ -156,7 +160,7 @@ bool ExportPartTask::executeStep()
156160
read_type,
157161
plan_for_part,
158162
storage,
159-
manifest.storage_snapshot,
163+
storage.getStorageSnapshot(metadata_snapshot, local_context),
160164
RangesInDataPart(manifest.data_part),
161165
alter_conversions,
162166
nullptr,

src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ namespace
2222
ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest)
2323
{
2424
auto context_copy = Context::createCopy(context);
25+
context_copy->makeQueryContextForExportPart();
26+
context_copy->setCurrentQueryId(manifest.transaction_id);
2527
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
2628
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
2729
context_copy->setSetting("max_threads", manifest.max_threads);

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6262,7 +6262,7 @@ void MergeTreeData::exportPartToTable(
62626262
transaction_id,
62636263
query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
62646264
format_settings,
6265-
getStorageSnapshot(source_metadata_ptr, query_context),
6265+
source_metadata_ptr,
62666266
completion_callback);
62676267

62686268
std::lock_guard lock(export_manifests_mutex);
@@ -9132,7 +9132,12 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
91329132
continue;
91339133
}
91349134

9135-
auto task = std::make_shared<ExportPartTask>(*this, manifest, getContext());
9135+
auto context_copy = Context::createCopy(getContext());
9136+
context_copy->makeQueryContextForExportPart();
9137+
context_copy->setCurrentQueryId(manifest.transaction_id);
9138+
context_copy->setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType::EXPORT_PART);
9139+
9140+
auto task = std::make_shared<ExportPartTask>(*this, manifest, context_copy);
91369141

91379142
manifest.in_progress = assignee.scheduleMoveTask(task);
91389143

src/Storages/MergeTree/MergeTreePartExportManifest.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ struct MergeTreePartExportManifest
4747
const String & transaction_id_,
4848
FileAlreadyExistsPolicy file_already_exists_policy_,
4949
const FormatSettings & format_settings_,
50-
const StorageSnapshotPtr & storage_snapshot_,
50+
const StorageMetadataPtr & metadata_snapshot_,
5151
std::function<void(CompletionCallbackResult)> completion_callback_ = {})
5252
: destination_storage_id(destination_storage_id_),
5353
data_part(data_part_),
5454
transaction_id(transaction_id_),
5555
file_already_exists_policy(file_already_exists_policy_),
5656
format_settings(format_settings_),
57-
storage_snapshot(storage_snapshot_),
57+
metadata_snapshot(metadata_snapshot_),
5858
completion_callback(completion_callback_),
5959
create_time(time(nullptr)) {}
6060

@@ -65,9 +65,9 @@ struct MergeTreePartExportManifest
6565
FileAlreadyExistsPolicy file_already_exists_policy;
6666
FormatSettings format_settings;
6767

68-
/// Storage snapshot captured at the time of query validation to prevent race conditions with mutations
68+
/// Metadata snapshot captured at the time of query validation to prevent race conditions with mutations
6969
/// Otherwise the export could fail if the schema changes between validation and execution
70-
StorageSnapshotPtr storage_snapshot;
70+
StorageMetadataPtr metadata_snapshot;
7171

7272
std::function<void(CompletionCallbackResult)> completion_callback;
7373

0 commit comments

Comments
 (0)