
Commit d24a071

chore: add more logs to cover aggregate spill (#18980)

* chore: add more logs to cover aggregate spill
* chore: eliminate warnings from sqllogictests for tpch
* chore: adjust log style
* chore: adjust log level in cluster mode

1 parent ff62551

10 files changed: +186 -95 lines changed

scripts/ci/deploy/config/databend-query-node-2.toml

Lines changed: 1 addition & 1 deletion

@@ -56,7 +56,7 @@ definition = "CREATE FUNCTION ping(STRING) RETURNS STRING LANGUAGE python HANDLE
 [log]

 [log.file]
-level = "INFO"
+level = "DEBUG"
 format = "text"
 dir = "./.databend/logs_2"
 limit = 12 # 12 files, 1 file per hour

scripts/ci/deploy/config/databend-query-node-3.toml

Lines changed: 1 addition & 1 deletion

@@ -57,7 +57,7 @@ definition = "CREATE FUNCTION ping(STRING) RETURNS STRING LANGUAGE python HANDLE
 [log]

 [log.file]
-level = "INFO"
+level = "DEBUG"
 format = "text"
 limit = 12 # 12 files, 1 file per hour
 dir = "./.databend/logs_3"

src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs

Lines changed: 1 addition & 0 deletions

@@ -84,6 +84,7 @@ fn build_partition_bucket_experimental(
         ctx.clone(),
         output_num,
         shared_partition_stream.clone(),
+        true,
     )?;
     let input_port = InputPort::create();
     let output_port = OutputPort::create();
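The bare `true` threaded into the call above is, per the spiller changes below, the new `is_local` flag. A tiny illustrative sketch of what the flag ultimately selects; it mirrors the `if self.is_local { "local" } else { "exchange" }` expression in the spiller's log line, and the helper name is hypothetical:

// Hypothetical helper, not in the codebase: shows how `is_local`
// picks the tag in "Write aggregate spill finished(...)" messages.
fn spill_tag(is_local: bool) -> &'static str {
    if is_local { "local" } else { "exchange" }
}

fn main() {
    assert_eq!(spill_tag(true), "local");
    assert_eq!(spill_tag(false), "exchange");
}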

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_aggregate_spiller.rs

Lines changed: 40 additions & 7 deletions

@@ -31,6 +31,7 @@ use databend_common_pipeline_transforms::MemorySettings;
 use databend_common_storage::DataOperator;
 use databend_common_storages_parquet::ReadSettings;
 use log::debug;
+use log::info;
 use parking_lot::Mutex;
 use parquet::file::metadata::RowGroupMetaData;

@@ -46,6 +47,8 @@ use crate::spillers::SpillsDataWriter;

 struct PayloadWriter {
     path: String,
+    // TODO: this may change to lazy init, for now it will create 128*thread_num files at most even
+    // if the writer not used to write.
     writer: SpillsDataWriter,
 }

@@ -110,16 +113,23 @@ struct AggregatePayloadWriters {
     writers: Vec<PayloadWriter>,
     write_stats: WriteStats,
     ctx: Arc<QueryContext>,
+    is_local: bool,
 }

 impl AggregatePayloadWriters {
-    pub fn create(prefix: &str, partition_count: usize, ctx: Arc<QueryContext>) -> Self {
+    pub fn create(
+        prefix: &str,
+        partition_count: usize,
+        ctx: Arc<QueryContext>,
+        is_local: bool,
+    ) -> Self {
         AggregatePayloadWriters {
             spill_prefix: prefix.to_string(),
             partition_count,
             writers: vec![],
             write_stats: WriteStats::default(),
             ctx,
+            is_local,
         }
     }

@@ -166,6 +176,18 @@ impl AggregatePayloadWriters {
         for (partition_id, writer) in writers.into_iter().enumerate() {
             let (path, written_size, row_groups) = writer.close()?;

+            if written_size != 0 {
+                info!(
+                    "Write aggregate spill finished({}): (bucket: {}, location: {}, bytes: {}, rows: {}, batch_count: {})",
+                    if self.is_local { "local" } else { "exchange" },
+                    partition_id,
+                    path,
+                    written_size,
+                    row_groups.iter().map(|rg| rg.num_rows()).sum::<i64>(),
+                    row_groups.len()
+                );
+            }
+
             self.ctx.add_spill_file(
                 Location::Remote(path.clone()),
                 Layout::Aggregate,
@@ -276,13 +298,15 @@ impl NewAggregateSpiller {
         ctx: Arc<QueryContext>,
         partition_count: usize,
         partition_stream: SharedPartitionStream,
+        is_local: bool,
     ) -> Result<Self> {
         let memory_settings = MemorySettings::from_aggregate_settings(&ctx)?;
         let table_ctx: Arc<dyn TableContext> = ctx.clone();
         let read_setting = ReadSettings::from_settings(&table_ctx.get_settings())?;
         let spill_prefix = ctx.query_id_spill_prefix();

-        let payload_writers = AggregatePayloadWriters::create(&spill_prefix, partition_count, ctx);
+        let payload_writers =
+            AggregatePayloadWriters::create(&spill_prefix, partition_count, ctx, is_local);

         Ok(Self {
             memory_settings,
@@ -320,12 +344,21 @@

         let operator = DataOperator::instance().spill_operator();
         let buffer_pool = SpillsBufferPool::instance();
-        let mut reader = buffer_pool.reader(operator.clone(), location, vec![row_group.clone()])?;

-        let read_bytes = row_group.total_byte_size() as usize;
+        let mut reader =
+            buffer_pool.reader(operator.clone(), location.clone(), vec![row_group.clone()])?;
+
         let instant = Instant::now();
         let data_block = reader.read(self.read_setting)?;
-        flush_read_profile(&instant, read_bytes);
+        let elapsed = instant.elapsed();
+
+        let read_size = reader.read_bytes();
+        flush_read_profile(&elapsed, read_size);
+
+        info!(
+            "Read aggregate spill finished: (bucket: {}, location: {}, bytes: {}, rows: {}, elapsed: {:?})",
+            bucket, location, read_size, row_group.num_rows(), elapsed
+        );

         if let Some(block) = data_block {
             Ok(AggregateMeta::Serialized(SerializedPayload {
@@ -339,12 +372,12 @@
     }
 }

-fn flush_read_profile(instant: &Instant, read_bytes: usize) {
+fn flush_read_profile(elapsed: &Duration, read_bytes: usize) {
     Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1);
     Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes);
     Profile::record_usize_profile(
         ProfileStatisticsName::RemoteSpillReadTime,
-        instant.elapsed().as_millis() as usize,
+        elapsed.as_millis() as usize,
     );
 }
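A self-contained sketch of the timing pattern this file standardizes on: capture `elapsed` exactly once after the read, then hand the same value to both the profile counter and the log line, so the two can never report different durations. Names below are stand-ins for the real spiller API, not the actual implementation:

use std::time::{Duration, Instant};

// Stand-in for flush_read_profile: takes the already-captured Duration
// instead of an Instant, matching the signature change above.
fn record_read(elapsed: &Duration, read_bytes: usize) {
    println!("profile: {} bytes in {} ms", read_bytes, elapsed.as_millis());
}

fn main() {
    let instant = Instant::now();
    let read_bytes = 4096; // stand-in for `reader.read(...)` / `reader.read_bytes()`
    let elapsed = instant.elapsed();

    record_read(&elapsed, read_bytes);
    println!("Read aggregate spill finished: (bytes: {read_bytes}, elapsed: {elapsed:?})");
}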

src/query/service/src/pipelines/processors/transforms/aggregator/new_aggregate/new_transform_aggregate_spill_writer.rs

Lines changed: 1 addition & 1 deletion

@@ -43,7 +43,7 @@ impl NewTransformAggregateSpillWriter {
     ) -> Result<Box<dyn Processor>> {
         let partition_count = MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize;
         let spiller =
-            NewAggregateSpiller::try_create(ctx, partition_count, shared_partition_stream)?;
+            NewAggregateSpiller::try_create(ctx, partition_count, shared_partition_stream, true)?;

         Ok(AccumulatingTransformer::create(
             input,

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_aggregate_spill_writer.rs

Lines changed: 6 additions & 4 deletions

@@ -192,6 +192,7 @@ pub fn agg_spilling_aggregate_payload(
     let mut spilled_buckets_payloads = Vec::with_capacity(partition_count);
     // Record how many rows are spilled.
     let mut rows = 0;
+    let mut buckets_count = 0;
     let location = spiller.create_unique_location();
     for (bucket, payload) in partitioned_payload.payloads.into_iter().enumerate() {
         if payload.len() == 0 {
@@ -200,6 +201,7 @@

         let data_block = payload.aggregate_flush_all()?.consume_convert_to_full();
         rows += data_block.num_rows();
+        buckets_count += 1;

         let begin = write_size;
         let mut columns_data = Vec::with_capacity(data_block.num_columns());
@@ -227,6 +229,7 @@
         let (location, write_bytes) = spiller
             .spill_stream_aggregate_buffer(Some(location), write_data)
             .await?;
+        let elapsed = instant.elapsed();
         // perf
         {
             Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1);
@@ -236,7 +239,7 @@
             );
             Profile::record_usize_profile(
                 ProfileStatisticsName::RemoteSpillWriteTime,
-                instant.elapsed().as_millis() as usize,
+                elapsed.as_millis() as usize,
             );
         }

@@ -249,9 +252,8 @@
         }

         info!(
-            "Write aggregate spill {} successfully, elapsed: {:?}",
-            location,
-            instant.elapsed()
+            "Write aggregate spill finished(local): (location: {}, bytes: {}, rows: {}, buckets_count: {}, elapsed: {:?})",
+            location, write_bytes, rows, buckets_count, elapsed
         );
     }

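A minimal sketch of the bookkeeping this diff adds, with hypothetical row counts (the real code iterates `partitioned_payload.payloads`): empty buckets are skipped, so `buckets_count` tallies only buckets that actually contribute rows to the spill file:

fn main() {
    // Hypothetical per-bucket row counts; zero-row payloads are skipped.
    let payload_rows = [0usize, 120, 0, 75];

    let mut rows = 0;
    let mut buckets_count = 0;
    for n in payload_rows {
        if n == 0 {
            continue; // mirrors the `payload.len() == 0` skip in the loop above
        }
        rows += n;
        buckets_count += 1;
    }

    // rows: 195, buckets_count: 2
    println!("rows: {rows}, buckets_count: {buckets_count}");
}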

src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs

Lines changed: 9 additions & 5 deletions

@@ -107,11 +107,13 @@ impl TransformExchangeAggregateSerializer {
         let spiller = if params.enable_experiment_aggregate {
             let spillers = partition_streams
                 .into_iter()
-                .map(|stream| {
+                .enumerate()
+                .map(|(pos, stream)| {
                     NewAggregateSpiller::try_create(
                         ctx.clone(),
                         MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
                         stream.clone(),
+                        pos == local_pos,
                     )
                 })
                 .collect::<Result<Vec<NewAggregateSpiller>>>()?;
@@ -356,6 +358,7 @@ fn exchange_agg_spilling_aggregate_payload(
 ) -> Result<BoxFuture<'static, Result<DataBlock>>> {
     let partition_count = partitioned_payload.partition_count();
     let mut write_size = 0;
+    let mut buckets_count = 0;
     let mut write_data = Vec::with_capacity(partition_count);
     let mut buckets_column_data = Vec::with_capacity(partition_count);
     let mut data_range_start_column_data = Vec::with_capacity(partition_count);
@@ -371,6 +374,7 @@

         let data_block = payload.aggregate_flush_all()?;
         rows += data_block.num_rows();
+        buckets_count += 1;

         let old_write_size = write_size;
         let columns = data_block.columns().to_vec();
@@ -398,6 +402,7 @@
         let (location, write_bytes) = spiller
             .spill_stream_aggregate_buffer(None, write_data)
             .await?;
+        let elapsed = instant.elapsed();
         // perf
         {
             Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1);
@@ -407,7 +412,7 @@
             );
             Profile::record_usize_profile(
                 ProfileStatisticsName::RemoteSpillWriteTime,
-                instant.elapsed().as_millis() as usize,
+                elapsed.as_millis() as usize,
             );
         }

@@ -422,9 +427,8 @@
         }

         info!(
-            "Write aggregate spill {} successfully, elapsed: {:?}",
-            location,
-            instant.elapsed()
+            "Write aggregate spill finished(exchange): (location: {}, bytes: {}, rows: {}, buckets_count: {}, elapsed: {:?})",
+            location, write_bytes, rows, buckets_count, elapsed
         );

         let data_block = DataBlock::new_from_columns(vec![
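A reduced sketch of the `enumerate` pattern from the first hunk in this file: one spiller per partition stream, and only the stream at the local node's position gets `is_local = true`. The stream names and `local_pos` value below are illustrative stand-ins; in the real code `local_pos` comes from the surrounding exchange setup:

fn main() {
    let partition_streams = vec!["stream-0", "stream-1", "stream-2"];
    let local_pos = 1; // in the real code, this node's position in the exchange

    // Stand-in for NewAggregateSpiller::try_create(ctx, ..., pos == local_pos).
    let spillers: Vec<(&str, bool)> = partition_streams
        .into_iter()
        .enumerate()
        .map(|(pos, stream)| (stream, pos == local_pos))
        .collect();

    assert_eq!(spillers[1], ("stream-1", true));
    assert!(!spillers[0].1 && !spillers[2].1);
}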
