Skip to content

Commit a3cb89a

Browse files
authored
chore(query): Refresh virtual column support limit and selection (#19001)
1 parent 0c54db8 commit a3cb89a

File tree

13 files changed

+257
-13
lines changed

13 files changed

+257
-13
lines changed

โ€ŽCargo.lockโ€Ž

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

โ€Žsrc/query/ast/src/ast/statements/virtual_column.rsโ€Ž

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use derive_visitor::Drive;
1919
use derive_visitor::DriveMut;
2020

2121
use crate::ast::write_dot_separated_list;
22+
use crate::ast::Expr;
2223
use crate::ast::Identifier;
2324
use crate::ast::ShowLimit;
2425

@@ -27,19 +28,30 @@ pub struct RefreshVirtualColumnStmt {
2728
pub catalog: Option<Identifier>,
2829
pub database: Option<Identifier>,
2930
pub table: Identifier,
31+
pub selection: Option<Box<Expr>>,
32+
pub limit: Option<u64>,
33+
pub overwrite: bool,
3034
}
3135

3236
impl Display for RefreshVirtualColumnStmt {
3337
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
34-
write!(f, "REFRESH VIRTUAL COLUMN FOR ")?;
38+
write!(f, "REFRESH VIRTUAL COLUMN ON ")?;
3539
write_dot_separated_list(
3640
f,
3741
self.catalog
3842
.iter()
3943
.chain(&self.database)
4044
.chain(Some(&self.table)),
4145
)?;
42-
46+
if let Some(selection) = &self.selection {
47+
write!(f, " WHERE {selection}")?;
48+
}
49+
if let Some(limit) = self.limit {
50+
write!(f, " LIMIT {limit}")?;
51+
}
52+
if self.overwrite {
53+
write!(f, " OVERWRITE")?;
54+
}
4355
Ok(())
4456
}
4557
}

โ€Žsrc/query/ast/src/parser/statement.rsโ€Ž

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1554,13 +1554,16 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
15541554

15551555
let refresh_virtual_column = map(
15561556
rule! {
1557-
REFRESH ~ VIRTUAL ~ COLUMN ~ FOR ~ #dot_separated_idents_1_to_3
1557+
REFRESH ~ VIRTUAL ~ ^COLUMN ~ ^( FOR | ON ) ~ ^#dot_separated_idents_1_to_3 ~ ( WHERE ~ ^#expr )? ~ ( LIMIT ~ ^#literal_u64 )? ~ OVERWRITE?
15581558
},
1559-
|(_, _, _, _, (catalog, database, table))| {
1559+
|(_, _, _, _, (catalog, database, table), opt_selection, opt_limit, opt_overwrite)| {
15601560
Statement::RefreshVirtualColumn(RefreshVirtualColumnStmt {
15611561
catalog,
15621562
database,
15631563
table,
1564+
selection: opt_selection.map(|(_, selection)| Box::new(selection)),
1565+
limit: opt_limit.map(|(_, limit)| limit),
1566+
overwrite: opt_overwrite.is_some(),
15641567
})
15651568
},
15661569
);

โ€Žsrc/query/ast/tests/it/testdata/stmt.txtโ€Ž

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22511,7 +22511,7 @@ DropDatamaskPolicy(
2251122511
---------- Input ----------
2251222512
REFRESH VIRTUAL COLUMN FOR t
2251322513
---------- Output ---------
22514-
REFRESH VIRTUAL COLUMN FOR t
22514+
REFRESH VIRTUAL COLUMN ON t
2251522515
---------- AST ------------
2251622516
RefreshVirtualColumn(
2251722517
RefreshVirtualColumnStmt {
@@ -22525,6 +22525,9 @@ RefreshVirtualColumn(
2252522525
quote: None,
2252622526
ident_type: None,
2252722527
},
22528+
selection: None,
22529+
limit: None,
22530+
overwrite: false,
2252822531
},
2252922532
)
2253022533

โ€Žsrc/query/ee/src/storages/fuse/operations/virtual_columns.rsโ€Ž

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ use databend_common_pipeline::sources::AsyncSourcer;
3939
use databend_common_pipeline_transforms::processors::AsyncTransform;
4040
use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
4141
use databend_common_sql::executor::physical_plans::MutationKind;
42+
use databend_common_sql::plans::RefreshSelection;
4243
use databend_common_storages_fuse::io::read::read_segment_stats;
4344
use databend_common_storages_fuse::io::write_data;
4445
use databend_common_storages_fuse::io::BlockReader;
@@ -60,6 +61,8 @@ use databend_storages_common_table_meta::meta::BlockMeta;
6061
use databend_storages_common_table_meta::meta::ExtendedBlockMeta;
6162
use databend_storages_common_table_meta::meta::RawBlockHLL;
6263
use databend_storages_common_table_meta::meta::Statistics;
64+
use log::debug;
65+
use log::info;
6366
use opendal::Operator;
6467

6568
// The big picture of refresh virtual column into pipeline:
@@ -81,6 +84,9 @@ pub async fn do_refresh_virtual_column(
8184
ctx: Arc<dyn TableContext>,
8285
fuse_table: &FuseTable,
8386
pipeline: &mut Pipeline,
87+
limit: Option<u64>,
88+
overwrite: bool,
89+
selection: Option<RefreshSelection>,
8490
) -> Result<()> {
8591
let Some(snapshot) = fuse_table.read_table_snapshot().await? else {
8692
// no snapshot
@@ -127,9 +133,29 @@ pub async fn do_refresh_virtual_column(
127133

128134
let operator = fuse_table.get_operator_ref();
129135

136+
let limit = limit.unwrap_or_default() as usize;
137+
let segment_filter = selection.as_ref().and_then(|sel| match sel {
138+
RefreshSelection::SegmentLocation(loc) => Some(loc.clone()),
139+
_ => None,
140+
});
141+
let block_filter = selection.as_ref().and_then(|sel| match sel {
142+
RefreshSelection::BlockLocation(loc) => Some(loc.clone()),
143+
_ => None,
144+
});
145+
let mut matched_selection = false;
146+
let mut reached_limit = false;
130147
// Iterates through all segments and collect blocks don't have virtual block meta.
131148
let mut virtual_column_metas = VecDeque::new();
132149
for (segment_idx, (location, ver)) in snapshot.segments.iter().enumerate() {
150+
if reached_limit {
151+
break;
152+
}
153+
if let Some(target) = segment_filter.as_ref() {
154+
if location != target {
155+
continue;
156+
}
157+
matched_selection = true;
158+
}
133159
let segment_info = segment_reader
134160
.read(&LoadParams {
135161
location: location.to_string(),
@@ -144,9 +170,23 @@ pub async fn do_refresh_virtual_column(
144170
};
145171

146172
for (block_idx, block_meta) in segment_info.block_metas()?.into_iter().enumerate() {
147-
if block_meta.virtual_block_meta.is_some() {
173+
let mut matched_block_filter = false;
174+
if let Some(target) = block_filter.as_ref() {
175+
if &block_meta.location.0 != target {
176+
continue;
177+
}
178+
matched_selection = true;
179+
matched_block_filter = true;
180+
}
181+
182+
if !overwrite && block_meta.virtual_block_meta.is_some() {
183+
if matched_block_filter {
184+
reached_limit = true;
185+
break;
186+
}
148187
continue;
149188
}
189+
150190
virtual_column_metas.push_back(VirtualColumnMeta {
151191
index: BlockMetaIndex {
152192
segment_idx,
@@ -158,13 +198,40 @@ pub async fn do_refresh_virtual_column(
158198
.and_then(|v| v.block_hlls.get(block_idx))
159199
.cloned(),
160200
});
201+
202+
if limit > 0 && virtual_column_metas.len() >= limit {
203+
reached_limit = true;
204+
break;
205+
}
206+
if matched_block_filter {
207+
reached_limit = true;
208+
break;
209+
}
161210
}
162211
}
163212

213+
if let (Some(sel), false) = (selection.as_ref(), matched_selection) {
214+
let message = match sel {
215+
RefreshSelection::SegmentLocation(loc) => {
216+
format!("segment_location '{loc}' not found")
217+
}
218+
RefreshSelection::BlockLocation(loc) => {
219+
format!("block_location '{loc}' not found")
220+
}
221+
};
222+
return Err(ErrorCode::VirtualColumnError(message));
223+
}
224+
164225
if virtual_column_metas.is_empty() {
165226
return Ok(());
166227
}
167228

229+
let block_nums = virtual_column_metas.len();
230+
info!(
231+
"Prepared {} blocks for virtual column refresh (limit={}, overwrite={})",
232+
block_nums, limit, overwrite
233+
);
234+
168235
// Read source blocks.
169236
let settings = ReadSettings::from_ctx(&ctx)?;
170237
pipeline.add_source(
@@ -181,9 +248,12 @@ pub async fn do_refresh_virtual_column(
181248
)?;
182249

183250
// Extract inner fields as virtual columns and write virtual block data.
184-
let block_nums = virtual_column_metas.len();
185251
let max_threads = ctx.get_settings().get_max_threads()? as usize;
186252
let max_threads = std::cmp::min(block_nums, max_threads);
253+
info!(
254+
"Virtual column pipeline will process {} blocks with {} async workers",
255+
block_nums, max_threads
256+
);
187257
pipeline.try_resize(max_threads)?;
188258
pipeline.add_async_transformer(|| {
189259
VirtualColumnTransform::new(
@@ -229,12 +299,16 @@ pub async fn do_refresh_virtual_column(
229299
Ok(())
230300
}
231301

302+
const VIRTUAL_COLUMN_PROGRESS_LOG_STEP: usize = 10;
303+
232304
/// `VirtualColumnSource` is used to read data blocks that need generate virtual columns.
233305
pub struct VirtualColumnSource {
234306
settings: ReadSettings,
235307
storage_format: FuseStorageFormat,
236308
block_reader: Arc<BlockReader>,
237309
virtual_column_metas: VecDeque<VirtualColumnMeta>,
310+
total_blocks: usize,
311+
processed_blocks: usize,
238312
is_finished: bool,
239313
}
240314

@@ -245,11 +319,14 @@ impl VirtualColumnSource {
245319
block_reader: Arc<BlockReader>,
246320
virtual_column_metas: VecDeque<VirtualColumnMeta>,
247321
) -> Self {
322+
let total_blocks = virtual_column_metas.len();
248323
Self {
249324
settings,
250325
storage_format,
251326
block_reader,
252327
virtual_column_metas,
328+
total_blocks,
329+
processed_blocks: 0,
253330
is_finished: false,
254331
}
255332
}
@@ -267,6 +344,21 @@ impl AsyncSource for VirtualColumnSource {
267344

268345
match self.virtual_column_metas.pop_front() {
269346
Some(meta) => {
347+
self.processed_blocks += 1;
348+
if self.processed_blocks == 1
349+
|| self.processed_blocks == self.total_blocks
350+
|| self.processed_blocks % VIRTUAL_COLUMN_PROGRESS_LOG_STEP == 0
351+
{
352+
info!(
353+
"Virtual column source progress: {}/{}",
354+
self.processed_blocks, self.total_blocks
355+
);
356+
} else {
357+
debug!(
358+
"Virtual column source progress: {}/{}",
359+
self.processed_blocks, self.total_blocks
360+
);
361+
}
270362
let block = self
271363
.block_reader
272364
.read_by_meta(&self.settings, &meta.block_meta, &self.storage_format)
@@ -275,6 +367,12 @@ impl AsyncSource for VirtualColumnSource {
275367
Ok(Some(block))
276368
}
277369
None => {
370+
if !self.is_finished {
371+
info!(
372+
"Virtual column source finished reading {} blocks",
373+
self.processed_blocks
374+
);
375+
}
278376
self.is_finished = true;
279377
Ok(None)
280378
}
@@ -348,6 +446,15 @@ impl AsyncTransform for VirtualColumnTransform {
348446
start.elapsed().as_millis() as u64
349447
);
350448
}
449+
info!(
450+
"Virtual column written for segment {} block {} at {} ({} bytes)",
451+
index.segment_idx, index.block_idx, location, virtual_column_size
452+
);
453+
} else {
454+
info!(
455+
"No virtual column data produced for segment {} block {}",
456+
index.segment_idx, index.block_idx
457+
);
351458
}
352459

353460
let extended_block_meta = ExtendedBlockMeta {

โ€Žsrc/query/ee/src/virtual_column/virtual_column_handler.rsโ€Ž

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use databend_common_base::base::GlobalInstance;
1818
use databend_common_catalog::table_context::TableContext;
1919
use databend_common_exception::Result;
2020
use databend_common_pipeline::core::Pipeline;
21+
use databend_common_sql::plans::RefreshSelection;
2122
use databend_common_storages_fuse::FuseTable;
2223
use databend_enterprise_virtual_column::VirtualColumnHandler;
2324
use databend_enterprise_virtual_column::VirtualColumnHandlerWrapper;
@@ -33,8 +34,11 @@ impl VirtualColumnHandler for RealVirtualColumnHandler {
3334
ctx: Arc<dyn TableContext>,
3435
fuse_table: &FuseTable,
3536
pipeline: &mut Pipeline,
37+
limit: Option<u64>,
38+
overwrite: bool,
39+
selection: Option<RefreshSelection>,
3640
) -> Result<()> {
37-
do_refresh_virtual_column(ctx, fuse_table, pipeline).await
41+
do_refresh_virtual_column(ctx, fuse_table, pipeline, limit, overwrite, selection).await
3842
}
3943
}
4044

โ€Žsrc/query/ee/tests/it/storages/fuse/operations/virtual_columns.rsโ€Ž

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,15 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
5151
let snapshot = snapshot_opt.unwrap();
5252

5353
let mut build_res = PipelineBuildResult::create();
54-
do_refresh_virtual_column(table_ctx.clone(), fuse_table, &mut build_res.main_pipeline).await?;
54+
do_refresh_virtual_column(
55+
table_ctx.clone(),
56+
fuse_table,
57+
&mut build_res.main_pipeline,
58+
None,
59+
false,
60+
None,
61+
)
62+
.await?;
5563

5664
let settings = table_ctx.get_settings();
5765
build_res.set_max_threads(settings.get_max_threads()? as usize);

โ€Žsrc/query/ee_features/virtual_column/Cargo.tomlโ€Ž

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ databend-common-base = { workspace = true }
1515
databend-common-catalog = { workspace = true }
1616
databend-common-exception = { workspace = true }
1717
databend-common-pipeline = { workspace = true }
18+
databend-common-sql = { workspace = true }
1819
databend-common-storages-fuse = { workspace = true }
1920

2021
[build-dependencies]

โ€Žsrc/query/ee_features/virtual_column/src/virtual_column.rsโ€Ž

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use databend_common_base::base::GlobalInstance;
1818
use databend_common_catalog::table_context::TableContext;
1919
use databend_common_exception::Result;
2020
use databend_common_pipeline::core::Pipeline;
21+
use databend_common_sql::plans::RefreshSelection;
2122
use databend_common_storages_fuse::FuseTable;
2223

2324
#[async_trait::async_trait]
@@ -27,6 +28,9 @@ pub trait VirtualColumnHandler: Sync + Send {
2728
ctx: Arc<dyn TableContext>,
2829
fuse_table: &FuseTable,
2930
pipeline: &mut Pipeline,
31+
limit: Option<u64>,
32+
overwrite: bool,
33+
selection: Option<RefreshSelection>,
3034
) -> Result<()>;
3135
}
3236

@@ -45,9 +49,12 @@ impl VirtualColumnHandlerWrapper {
4549
ctx: Arc<dyn TableContext>,
4650
fuse_table: &FuseTable,
4751
pipeline: &mut Pipeline,
52+
limit: Option<u64>,
53+
overwrite: bool,
54+
selection: Option<RefreshSelection>,
4855
) -> Result<()> {
4956
self.handler
50-
.do_refresh_virtual_column(ctx, fuse_table, pipeline)
57+
.do_refresh_virtual_column(ctx, fuse_table, pipeline, limit, overwrite, selection)
5158
.await
5259
}
5360
}

โ€Žsrc/query/expression/src/schema.rsโ€Ž

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ pub const SEARCH_SCORE_COLUMN_ID: u32 = u32::MAX - 8;
5757
pub const VECTOR_SCORE_COLUMN_ID: u32 = u32::MAX - 9;
5858

5959
pub const VIRTUAL_COLUMN_ID_START: u32 = 3_000_000_000;
60-
pub const VIRTUAL_COLUMNS_ID_UPPER: u32 = 3_000_001_000;
61-
pub const VIRTUAL_COLUMNS_LIMIT: usize = 1000;
60+
pub const VIRTUAL_COLUMNS_ID_UPPER: u32 = 3_000_002_000;
61+
pub const VIRTUAL_COLUMNS_LIMIT: usize = 2000;
6262

6363
// internal column name.
6464
pub const ROW_ID_COL_NAME: &str = "_row_id";

0 commit comments

Comments
ย (0)