Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions datafusion/execution/src/disk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ use datafusion_common::{
};
use log::debug;
use parking_lot::Mutex;
use rand::{Rng, rng};
use rand::{rng, Rng};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use tempfile::{Builder, NamedTempFile, TempDir};

use datafusion_common::human_readable_size;
Expand Down Expand Up @@ -77,6 +77,7 @@ impl DiskManagerBuilder {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: self.max_temp_directory_size,
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}),
DiskManagerMode::Directories(conf_dirs) => {
let local_dirs = create_local_dirs(&conf_dirs)?;
Expand All @@ -87,12 +88,14 @@ impl DiskManagerBuilder {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: self.max_temp_directory_size,
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})
}
DiskManagerMode::Disabled => Ok(DiskManager {
local_dirs: Mutex::new(None),
max_temp_directory_size: self.max_temp_directory_size,
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}),
}
}
Expand Down Expand Up @@ -169,6 +172,17 @@ pub struct DiskManager {
/// Used disk space in the temporary directories. Currently only spilled
/// data for external executors is counted.
used_disk_space: Arc<AtomicU64>,
/// Number of active temporary files created by this disk manager
active_files_count: Arc<AtomicUsize>,
}

/// Information about the current disk usage for spilling.
///
/// A point-in-time snapshot produced by `DiskManager::spilling_progress`;
/// the counters are read with relaxed atomics, so the two fields may be
/// momentarily inconsistent with each other under concurrent spilling.
#[derive(Debug, Clone, Copy)]
pub struct SpillingProgress {
    /// Total bytes currently used on disk for spilling
    pub current_bytes: u64,
    /// Total number of active spill files
    pub active_files_count: usize,
}

impl DiskManager {
Expand All @@ -187,6 +201,7 @@ impl DiskManager {
local_dirs: Mutex::new(Some(vec![])),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
DiskManagerConfig::NewSpecified(conf_dirs) => {
let local_dirs = create_local_dirs(&conf_dirs)?;
Expand All @@ -197,12 +212,14 @@ impl DiskManager {
local_dirs: Mutex::new(Some(local_dirs)),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
}))
}
DiskManagerConfig::Disabled => Ok(Arc::new(Self {
local_dirs: Mutex::new(None),
max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
used_disk_space: Arc::new(AtomicU64::new(0)),
active_files_count: Arc::new(AtomicUsize::new(0)),
})),
}
}
Expand Down Expand Up @@ -252,6 +269,14 @@ impl DiskManager {
self.max_temp_directory_size
}

/// Returns a snapshot of the current spilling activity.
///
/// Reads the disk-space and active-file counters maintained by this
/// manager. Both loads use `Ordering::Relaxed`, so the snapshot is
/// approximate under concurrent spill activity.
pub fn spilling_progress(&self) -> SpillingProgress {
    let current_bytes = self.used_disk_space.load(Ordering::Relaxed);
    let active_files_count = self.active_files_count.load(Ordering::Relaxed);
    SpillingProgress {
        current_bytes,
        active_files_count,
    }
}

/// Returns the temporary directory paths
pub fn temp_dir_paths(&self) -> Vec<PathBuf> {
self.local_dirs
Expand Down Expand Up @@ -301,6 +326,7 @@ impl DiskManager {
}

let dir_index = rng().random_range(0..local_dirs.len());
self.active_files_count.fetch_add(1, Ordering::Relaxed);
Ok(RefCountedTempFile {
parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
tempfile: Arc::new(
Expand Down Expand Up @@ -422,6 +448,9 @@ impl Drop for RefCountedTempFile {
self.disk_manager
.used_disk_space
.fetch_sub(current_usage, Ordering::Relaxed);
self.disk_manager
.active_files_count
.fetch_sub(1, Ordering::Relaxed);
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! store, memory manager, disk manager.

#[expect(deprecated)]
use crate::disk_manager::DiskManagerConfig;
use crate::disk_manager::{DiskManagerConfig, SpillingProgress};
use crate::{
disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
memory_pool::{
Expand Down Expand Up @@ -199,6 +199,11 @@ impl RuntimeEnv {
self.object_store_registry.get_store(url.as_ref())
}

/// Returns the current spilling progress
///
/// Delegates to the underlying [`DiskManager`], which tracks the bytes
/// currently on disk and the number of active temporary spill files.
pub fn spilling_progress(&self) -> SpillingProgress {
    self.disk_manager.spilling_progress()
}

/// Register an [`EncryptionFactory`] with an associated identifier that can be later
/// used to configure encryption when reading or writing Parquet.
/// If an encryption factory with the same identifier was already registered, it is replaced and returned.
Expand Down
29 changes: 23 additions & 6 deletions datafusion/physical-plan/src/spill/in_progress_spill_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl InProgressSpillFile {
}
if self.writer.is_none() {
let schema = batch.schema();
if let Some(ref in_progress_file) = self.in_progress_file {
if let Some(in_progress_file) = &mut self.in_progress_file {
self.writer = Some(IPCStreamWriter::new(
in_progress_file.path(),
schema.as_ref(),
Expand All @@ -72,18 +72,31 @@ impl InProgressSpillFile {

// Update metrics
self.spill_writer.metrics.spill_file_count.add(1);

// Update initial size (schema/header)
in_progress_file.update_disk_usage()?;
let initial_size = in_progress_file.current_disk_usage();
self.spill_writer
.metrics
.spilled_bytes
.add(initial_size as usize);
}
}
if let Some(writer) = &mut self.writer {
let (spilled_rows, _) = writer.write(batch)?;
if let Some(in_progress_file) = &mut self.in_progress_file {
let pre_size = in_progress_file.current_disk_usage();
in_progress_file.update_disk_usage()?;
let post_size = in_progress_file.current_disk_usage();

self.spill_writer.metrics.spilled_rows.add(spilled_rows);
self.spill_writer
.metrics
.spilled_bytes
.add((post_size - pre_size) as usize);
} else {
unreachable!() // Already checked inside current function
}

// Update metrics
self.spill_writer.metrics.spilled_rows.add(spilled_rows);
}
Ok(())
}
Expand All @@ -106,9 +119,13 @@ impl InProgressSpillFile {
// Since spill files are append-only, add the file size to spilled_bytes
if let Some(in_progress_file) = &mut self.in_progress_file {
// Since writer.finish() writes continuation marker and message length at the end
let pre_size = in_progress_file.current_disk_usage();
in_progress_file.update_disk_usage()?;
let size = in_progress_file.current_disk_usage();
self.spill_writer.metrics.spilled_bytes.add(size as usize);
let post_size = in_progress_file.current_disk_usage();
self.spill_writer
.metrics
.spilled_bytes
.add((post_size - pre_size) as usize);
}

Ok(self.in_progress_file.take())
Expand Down
66 changes: 66 additions & 0 deletions datafusion/physical-plan/src/spill/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,4 +799,70 @@ mod tests {
assert_eq!(alignment, 8);
Ok(())
}
/// Verifies that spill metrics (`spilled_bytes`, `spill_file_count`) are
/// updated eagerly on every `append_batch` call, and that the
/// `RuntimeEnv`-level `spilling_progress()` snapshot stays consistent
/// with them across the whole lifecycle of a spill file
/// (create -> append -> finish -> drop).
#[tokio::test]
async fn test_real_time_spill_metrics() -> Result<()> {
    let env = Arc::new(RuntimeEnv::default());
    let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]));

    let spill_manager = Arc::new(SpillManager::new(
        Arc::clone(&env),
        metrics.clone(),
        Arc::clone(&schema),
    ));
    let mut in_progress_file = spill_manager.create_in_progress_file("Test")?;

    let batch1 = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )?;

    // Before any batch, metrics should be 0
    assert_eq!(metrics.spilled_bytes.value(), 0);
    assert_eq!(metrics.spill_file_count.value(), 0);

    // Append first batch
    in_progress_file.append_batch(&batch1)?;

    // Metrics should be updated immediately (at least the IPC stream
    // header/schema plus the first batch). Deliberately avoid asserting
    // an exact byte count: the on-disk size depends on the arrow-ipc
    // encoding and would make this test brittle across arrow upgrades.
    let bytes_after_batch1 = metrics.spilled_bytes.value();
    assert!(bytes_after_batch1 > 0);
    assert_eq!(metrics.spill_file_count.value(), 1);

    // Check global progress
    let progress = env.spilling_progress();
    assert_eq!(progress.current_bytes, bytes_after_batch1 as u64);
    assert_eq!(progress.active_files_count, 1);

    // Append another batch; the byte counter must grow monotonically
    in_progress_file.append_batch(&batch1)?;
    let bytes_after_batch2 = metrics.spilled_bytes.value();
    assert!(bytes_after_batch2 > bytes_after_batch1);

    // Check global progress again
    let progress = env.spilling_progress();
    assert_eq!(progress.current_bytes, bytes_after_batch2 as u64);

    // Finish the file: writer.finish() appends the IPC stream trailer,
    // so the byte count grows once more
    let spilled_file = in_progress_file.finish()?;
    let final_bytes = metrics.spilled_bytes.value();
    assert!(final_bytes > bytes_after_batch2);

    // Even after finish, the file is still "active" until dropped
    let progress = env.spilling_progress();
    assert!(progress.current_bytes > 0);
    assert_eq!(progress.active_files_count, 1);

    // Dropping the spill file releases both the disk space and the
    // active-file slot in the global accounting
    drop(spilled_file);
    assert_eq!(env.spilling_progress().active_files_count, 0);
    assert_eq!(env.spilling_progress().current_bytes, 0);

    Ok(())
}
}
Loading