diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 284af908f92..72391d51dbc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -627,7 +627,7 @@ jobs: --exclude vortex-cub --exclude vortex-test-e2e-cuda --exclude duckdb-bench ` --exclude lance-bench --exclude datafusion-bench --exclude random-access-bench ` --exclude compress-bench --exclude xtask --exclude vortex-datafusion ` - --exclude vortex-sqllogictest + --exclude gpu-scan-cli --exclude vortex-sqllogictest - name: Rust Tests (Other) if: matrix.os != 'windows-x64' run: | diff --git a/Cargo.lock b/Cargo.lock index 59bfa30932b..ea88c2c94a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4118,6 +4118,19 @@ dependencies = [ "yansi", ] +[[package]] +name = "gpu-scan-cli" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", + "tracing", + "tracing-subscriber", + "vortex", + "vortex-cuda", + "vortex-cuda-macros", +] + [[package]] name = "grid" version = "1.0.0" @@ -9582,6 +9595,16 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.22" @@ -9592,12 +9615,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index bd82b6bac7f..387a1ec7733 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "vortex-duckdb", "vortex-cuda", "vortex-cuda/cub", + "vortex-cuda/gpu-scan-cli", "vortex-cuda/macros", "vortex-cuda/nvcomp", "vortex-cxx", diff --git a/_typos.toml b/_typos.toml index b2af33e423b..5d601d1036f 100644 --- a/_typos.toml +++ b/_typos.toml @@ -1,5 +1,5 @@ [default] -extend-ignore-identifiers-re = ["FoR", "typ"] +extend-ignore-identifiers-re = ["ffor", "FFOR", "FoR", "typ"] # We support a few common special comments to tell the checker to ignore sections of code extend-ignore-re = [ "(#|//)\\s*spellchecker:ignore-next-line\\n.*", # Ignore the next line diff --git a/vortex-array/src/arrays/primitive/vtable/mod.rs b/vortex-array/src/arrays/primitive/vtable/mod.rs index d316e6e93cd..450f9faf99e 100644 --- a/vortex-array/src/arrays/primitive/vtable/mod.rs +++ b/vortex-array/src/arrays/primitive/vtable/mod.rs @@ -89,6 +89,11 @@ impl VTable for PrimitiveVTable { let ptype = PType::try_from(dtype)?; + vortex_ensure!( + buffer.is_aligned_to(Alignment::new(ptype.byte_width())), + "Misaligned buffer cannot be used to build PrimitiveArray of {ptype}" + ); + if buffer.len() != ptype.byte_width() * len { vortex_bail!( "Buffer length {} does not match expected length {} for {}, {}", diff --git a/vortex-btrblocks/public-api.lock b/vortex-btrblocks/public-api.lock index 30aa578343e..bc5af615759 100644 --- a/vortex-btrblocks/public-api.lock +++ b/vortex-btrblocks/public-api.lock @@ -222,6 +222,8 @@ impl vortex_btrblocks::BtrBlocksCompressorBuilder pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::build(self) -> vortex_btrblocks::BtrBlocksCompressor +pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::empty() -> Self + pub fn vortex_btrblocks::BtrBlocksCompressorBuilder::exclude_float(self, codes: impl core::iter::traits::collect::IntoIterator) -> Self pub fn 
vortex_btrblocks::BtrBlocksCompressorBuilder::exclude_int(self, codes: impl core::iter::traits::collect::IntoIterator) -> Self diff --git a/vortex-btrblocks/src/builder.rs b/vortex-btrblocks/src/builder.rs index b71ca7f9caf..d329ec8c139 100644 --- a/vortex-btrblocks/src/builder.rs +++ b/vortex-btrblocks/src/builder.rs @@ -71,6 +71,15 @@ impl Default for BtrBlocksCompressorBuilder { } impl BtrBlocksCompressorBuilder { + /// Create a new builder with no encodings enabled. + pub fn empty() -> Self { + Self { + int_schemes: Default::default(), + float_schemes: Default::default(), + string_schemes: Default::default(), + } + } + /// Excludes the specified integer compression schemes. pub fn exclude_int(mut self, codes: impl IntoIterator) -> Self { let codes: HashSet<_> = codes.into_iter().collect(); diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index 0636fc3a91d..5d042de0a56 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -34,7 +34,8 @@ futures = { workspace = true, features = ["executor"] } kanal = { workspace = true } paste = { workspace = true } prost = { workspace = true } -tracing = { workspace = true } +tokio = { workspace = true, features = ["fs"] } +tracing = { workspace = true, features = ["std", "attributes"] } vortex-alp = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } diff --git a/vortex-cuda/benches/bitpacked_cuda.rs b/vortex-cuda/benches/bitpacked_cuda.rs index b551c91c9e1..880ce673e6d 100644 --- a/vortex-cuda/benches/bitpacked_cuda.rs +++ b/vortex-cuda/benches/bitpacked_cuda.rs @@ -6,27 +6,24 @@ #![allow(clippy::unwrap_used)] #![allow(clippy::cast_possible_truncation)] +mod common; + use std::mem::size_of; use std::ops::Add; +use std::sync::Arc; +use std::sync::atomic::Ordering; use std::time::Duration; use criterion::BenchmarkId; use criterion::Criterion; use criterion::Throughput; use cudarc::driver::DeviceRepr; -use cudarc::driver::PushKernelArg; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_BLOCKING_SYNC; use futures::executor::block_on; use vortex_array::arrays::PrimitiveArray; use vortex_array::validity::Validity::NonNullable; use vortex_buffer::Buffer; -use vortex_cuda::CudaBufferExt; -use vortex_cuda::CudaDeviceBuffer; -use vortex_cuda::CudaExecutionCtx; use vortex_cuda::CudaSession; -use vortex_cuda::bitpacked_cuda_kernel; -use vortex_cuda::bitpacked_cuda_launch_config; -use vortex_cuda::launch_cuda_kernel_with_config; +use vortex_cuda::executor::CudaArrayExt; use vortex_cuda_macros::cuda_available; use vortex_cuda_macros::cuda_not_available; use vortex_dtype::NativePType; @@ -35,6 +32,8 @@ use vortex_fastlanes::BitPackedArray; use vortex_fastlanes::unpack_iter::BitPacked; use vortex_session::VortexSession; +use crate::common::TimedLaunchStrategy; + const N_ROWS: usize = 100_000_000; /// Create a bit-packed array with the given bit width @@ -56,54 +55,6 @@ where .vortex_expect("failed to create BitPacked array") } -/// Launch the bit unpacking kernel and return elapsed GPU time -fn launch_bitunpack_kernel_timed_typed( - bitpacked_array: &BitPackedArray, - cuda_ctx: &mut CudaExecutionCtx, -) -> vortex_error::VortexResult -where - T: BitPacked + DeviceRepr, - T::Physical: DeviceRepr, -{ - let packed = bitpacked_array.packed().clone(); - let bit_width = bitpacked_array.bit_width(); - let len = bitpacked_array.len(); - - // Move packed data to device if not already there - let device_input = if packed.is_on_device() { - packed - } else { - 
block_on(cuda_ctx.move_to_device(packed)?).vortex_expect("failed to move to device") - }; - - // Allocate output buffer - let output_slice = cuda_ctx - .device_alloc::(len.next_multiple_of(1024)) - .vortex_expect("failed to allocate output"); - let output_buf = CudaDeviceBuffer::new(output_slice); - - // Get device views - let input_view = device_input - .cuda_view::() - .vortex_expect("failed to get input view"); - let output_view = output_buf.as_view::(); - - let output_width = size_of::() * 8; - let cuda_function = bitpacked_cuda_kernel(bit_width, output_width, cuda_ctx)?; - let mut launch_builder = cuda_ctx.launch_builder(&cuda_function); - - launch_builder.arg(&input_view); - launch_builder.arg(&output_view); - - let config = bitpacked_cuda_launch_config(output_width, len)?; - - // Launch kernel - let events = - launch_cuda_kernel_with_config(&mut launch_builder, config, CU_EVENT_BLOCKING_SYNC)?; - - events.duration() -} - /// Generic benchmark function for a specific type and bit width fn benchmark_bitunpack_typed(c: &mut Criterion, bit_width: u8, type_name: &str) where @@ -123,19 +74,18 @@ where &array, |b, array| { b.iter_custom(|iters| { - let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create execution context"); + let timed = TimedLaunchStrategy::default(); + let timer = Arc::clone(&timed.total_time_ns); - let mut total_time = Duration::ZERO; + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); for _ in 0..iters { - let kernel_time = - launch_bitunpack_kernel_timed_typed::(array, &mut cuda_ctx) - .vortex_expect("kernel launch failed"); - total_time += kernel_time; + block_on(array.to_array().execute_cuda(&mut cuda_ctx)).unwrap(); } - total_time + Duration::from_nanos(timer.load(Ordering::Relaxed)) }); }, ); diff --git a/vortex-cuda/benches/common/mod.rs b/vortex-cuda/benches/common/mod.rs new file mode 100644 index 00000000000..578cd398c79 --- /dev/null +++ b/vortex-cuda/benches/common/mod.rs @@ -0,0 +1,33 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use cudarc::driver::sys::CUevent_flags; +use cudarc::driver::sys::CUevent_flags::CU_EVENT_BLOCKING_SYNC; +use vortex_cuda::CudaKernelEvents; +use vortex_cuda::LaunchStrategy; +use vortex_error::VortexResult; + +#[derive(Debug, Default)] +pub struct TimedLaunchStrategy { + pub total_time_ns: Arc, +} + +impl LaunchStrategy for TimedLaunchStrategy { + fn event_flags(&self) -> CUevent_flags { + // using blocking_sync to make sure all events flush before we complete. + CU_EVENT_BLOCKING_SYNC + } + + fn on_complete(&self, events: &CudaKernelEvents, _len: usize) -> VortexResult<()> { + // NOTE: as long as the duration < 584 years this cast is safe. 
+ let elapsed_nanos = events.duration()?.as_nanos() as u64; + self.total_time_ns + .fetch_add(elapsed_nanos, Ordering::Relaxed); + + Ok(()) + } +} diff --git a/vortex-cuda/benches/date_time_parts_cuda.rs b/vortex-cuda/benches/date_time_parts_cuda.rs index df38a563363..2f01fd8f3b8 100644 --- a/vortex-cuda/benches/date_time_parts_cuda.rs +++ b/vortex-cuda/benches/date_time_parts_cuda.rs @@ -6,34 +6,36 @@ #![allow(clippy::unwrap_used)] #![allow(clippy::cast_possible_truncation)] +mod common; + use std::mem::size_of; +use std::sync::Arc; +use std::sync::atomic::Ordering; use std::time::Duration; use criterion::BenchmarkId; use criterion::Criterion; use criterion::Throughput; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_BLOCKING_SYNC; use futures::executor::block_on; use vortex_array::IntoArray; -use vortex_array::ToCanonical; use vortex_array::arrays::ConstantArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::validity::Validity; use vortex_buffer::Buffer; -use vortex_cuda::CudaBufferExt; -use vortex_cuda::CudaExecutionCtx; use vortex_cuda::CudaSession; +use vortex_cuda::executor::CudaArrayExt; use vortex_cuda_macros::cuda_available; use vortex_cuda_macros::cuda_not_available; use vortex_datetime_parts::DateTimePartsArray; use vortex_dtype::DType; use vortex_dtype::Nullability; -use vortex_dtype::PType; use vortex_dtype::datetime::TimeUnit; use vortex_dtype::datetime::Timestamp; use vortex_error::VortexExpect; use vortex_session::VortexSession; +use crate::common::TimedLaunchStrategy; + fn make_datetimeparts_array(len: usize, time_unit: TimeUnit) -> DateTimePartsArray { let days: Vec = (0..len).map(|i| (i / 1000) as i16).collect(); let days_arr = PrimitiveArray::new(Buffer::from(days), Validity::NonNullable).into_array(); @@ -46,80 +48,6 @@ fn make_datetimeparts_array(len: usize, time_unit: TimeUnit) -> DateTimePartsArr .vortex_expect("Failed to create DateTimePartsArray") } -/// Launches DateTimeParts decode kernel and returns elapsed GPU time. 
-fn launch_datetimeparts_kernel_timed( - dtp_array: &DateTimePartsArray, - time_unit: TimeUnit, - cuda_ctx: &mut CudaExecutionCtx, -) -> vortex_error::VortexResult<Duration> { - let days_prim = dtp_array.days().to_primitive(); - - // TODO(0ax1): figure out how to represent constant array in CUDA kernels - let seconds_prim = dtp_array.seconds().to_primitive(); - let subseconds_prim = dtp_array.subseconds().to_primitive(); - - let output_len = dtp_array.len(); - - let divisor: i64 = match time_unit { - TimeUnit::Nanoseconds => 1_000_000_000, - TimeUnit::Microseconds => 1_000_000, - TimeUnit::Milliseconds => 1_000, - TimeUnit::Seconds => 1, - TimeUnit::Days => unreachable!("Days not supported for DateTimeParts"), - }; - - let days_device = block_on( - cuda_ctx - .copy_to_device(days_prim.as_slice::<i16>().to_vec()) - .unwrap(), - ) - .vortex_expect("failed to copy days to device"); - - let seconds_device = block_on( - cuda_ctx - .copy_to_device(seconds_prim.as_slice::<i8>().to_vec()) - .unwrap(), - ) - .vortex_expect("failed to copy seconds to device"); - - let subseconds_device = block_on( - cuda_ctx - .copy_to_device(subseconds_prim.as_slice::<i8>().to_vec()) - .unwrap(), - ) - .vortex_expect("failed to copy subseconds to device"); - - // Allocate output buffer - let output_device = block_on(cuda_ctx.copy_to_device(vec![0i64; output_len]).unwrap()) - .vortex_expect("failed to allocate output buffer"); - - let days_view = days_device - .cuda_view::<i16>() - .vortex_expect("failed to get days view"); - let seconds_view = seconds_device - .cuda_view::<i8>() - .vortex_expect("failed to get seconds view"); - let subseconds_view = subseconds_device - .cuda_view::<i8>() - .vortex_expect("failed to get subseconds view"); - let output_view = output_device - .cuda_view::<i64>() - .vortex_expect("failed to get output view"); - - let array_len_u64 = output_len as u64; - - let events = vortex_cuda::launch_cuda_kernel!( - execution_ctx: cuda_ctx, - module: "date_time_parts", - ptypes: &[PType::I16, PType::I8, PType::I8], - launch_args: [days_view, seconds_view, subseconds_view, divisor, output_view, array_len_u64], - event_recording: CU_EVENT_BLOCKING_SYNC, - array_len: output_len - ); - - events.duration() -} - fn benchmark_datetimeparts(c: &mut Criterion) { let mut group = c.benchmark_group("datetimeparts_cuda"); group.sample_size(10); @@ -139,19 +67,19 @@ fn benchmark_datetimeparts(c: &mut Criterion) { &dtp_array, |b, dtp_array| { b.iter_custom(|iters| { - let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create execution context"); + let timed = TimedLaunchStrategy::default(); + let timer = Arc::clone(&timed.total_time_ns); - let mut total_time = Duration::ZERO; + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); for _ in 0..iters { - let kernel_time = - launch_datetimeparts_kernel_timed(dtp_array, time_unit, &mut cuda_ctx) - .vortex_expect("kernel launch failed"); - total_time += kernel_time; + // Block on each execution immediately; elapsed kernel time is accumulated by the launch strategy. + block_on(dtp_array.to_array().execute_cuda(&mut cuda_ctx)).unwrap(); } - total_time + Duration::from_nanos(timer.load(Ordering::Relaxed)) }); }, ); diff --git a/vortex-cuda/benches/dict_cuda.rs index c555d799a30..f71867dcd5b 100644 --- a/vortex-cuda/benches/dict_cuda.rs +++ b/vortex-cuda/benches/dict_cuda.rs @@ -6,30 +6,33 @@ #![allow(clippy::unwrap_used)] #![allow(clippy::cast_possible_truncation)] +mod 
common; + use std::mem::size_of; +use std::sync::Arc; +use std::sync::atomic::Ordering; use std::time::Duration; use criterion::BenchmarkId; use criterion::Criterion; use criterion::Throughput; use cudarc::driver::DeviceRepr; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_BLOCKING_SYNC; use futures::executor::block_on; use vortex_array::IntoArray; use vortex_array::arrays::DictArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::validity::Validity::NonNullable; use vortex_buffer::Buffer; -use vortex_cuda::CudaBufferExt; -use vortex_cuda::CudaDeviceBuffer; -use vortex_cuda::CudaExecutionCtx; use vortex_cuda::CudaSession; +use vortex_cuda::executor::CudaArrayExt; use vortex_cuda_macros::cuda_available; use vortex_cuda_macros::cuda_not_available; use vortex_dtype::NativePType; use vortex_error::VortexExpect; use vortex_session::VortexSession; +use crate::common::TimedLaunchStrategy; + const BENCH_ARGS: &[(usize, &str)] = &[(10_000_000, "10M")]; /// Configuration for a dictionary benchmark specifying value and code types along with dictionary size. @@ -40,7 +43,7 @@ struct DictBenchConfig { } /// Creates a Dict array with parameterized value type V and code type C. -fn make_dict_array_typed(len: usize, dict_size: usize) -> (DictArray, Vec, Vec) +fn make_dict_array_typed(len: usize, dict_size: usize) -> DictArray where V: NativePType + From, C: NativePType + TryFrom, @@ -50,62 +53,16 @@ where let values: Vec = (0..dict_size) .map(|i| >::from((i * 1000) as u32)) .collect(); - let values_array = PrimitiveArray::new(Buffer::from(values.clone()), NonNullable); + let values_array = PrimitiveArray::new(Buffer::from(values), NonNullable); // Codes cycling through all dictionary values let codes: Vec = (0..len) .map(|i| C::try_from(i % dict_size).unwrap()) .collect(); - let codes_array = PrimitiveArray::new(Buffer::from(codes.clone()), NonNullable); - - let dict_array = DictArray::try_new(codes_array.into_array(), values_array.into_array()) - .vortex_expect("failed to create Dict array"); + let codes_array = PrimitiveArray::new(Buffer::from(codes), NonNullable); - (dict_array, values, codes) -} - -/// Launches Dict decompression kernel and returns elapsed GPU time. 
-fn launch_dict_kernel_timed_typed( - values: &[V], - codes: &[C], - output_len: usize, - cuda_ctx: &mut CudaExecutionCtx, -) -> vortex_error::VortexResult -where - V: NativePType + DeviceRepr, - C: NativePType + DeviceRepr, -{ - let values_device = block_on(cuda_ctx.copy_to_device(values.to_vec()).unwrap()) - .vortex_expect("failed to copy values to device"); - - let codes_device = block_on(cuda_ctx.copy_to_device(codes.to_vec()).unwrap()) - .vortex_expect("failed to copy codes to device"); - - let output_slice = cuda_ctx - .device_alloc::(output_len) - .vortex_expect("failed to allocate output"); - let output_device = CudaDeviceBuffer::new(output_slice); - - let codes_view = codes_device - .cuda_view::() - .vortex_expect("failed to get codes view"); - let values_view = values_device - .cuda_view::() - .vortex_expect("failed to get values view"); - let output_view = output_device.as_view::(); - - let codes_len_u64 = output_len as u64; - - let events = vortex_cuda::launch_cuda_kernel!( - execution_ctx: cuda_ctx, - module: "dict", - ptypes: &[V::PTYPE, C::PTYPE], - launch_args: [codes_view, codes_len_u64, values_view, output_view], - event_recording: CU_EVENT_BLOCKING_SYNC, - array_len: output_len - ); - - events.duration() + DictArray::try_new(codes_array.into_array(), values_array.into_array()) + .vortex_expect("failed to create Dict array") } /// Benchmark Dict decompression for specific value and code types. @@ -122,7 +79,7 @@ where // Throughput is based on output size (values read from dictionary) group.throughput(Throughput::Bytes((len * size_of::()) as u64)); - let (dict_array, values, codes) = make_dict_array_typed::(*len, config.dict_size); + let dict_array = make_dict_array_typed::(*len, config.dict_size); group.bench_with_input( BenchmarkId::new( @@ -132,26 +89,22 @@ where config.value_type_name, config.code_type_name ), ), - &(dict_array, values, codes), - |b, (dict_array, values, codes)| { + &dict_array, + |b, dict_array| { b.iter_custom(|iters| { - let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create execution context"); + let timed = TimedLaunchStrategy::default(); + let timer = Arc::clone(&timed.total_time_ns); - let mut total_time = Duration::ZERO; + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); for _ in 0..iters { - let kernel_time = launch_dict_kernel_timed_typed::( - values, - codes, - dict_array.len(), - &mut cuda_ctx, - ) - .vortex_expect("kernel launch failed"); - total_time += kernel_time; + block_on(dict_array.to_array().execute_cuda(&mut cuda_ctx)) + .vortex_expect("execute"); } - total_time + Duration::from_nanos(timer.load(Ordering::Relaxed)) }); }, ); diff --git a/vortex-cuda/benches/for_cuda.rs b/vortex-cuda/benches/for_cuda.rs index dcd76d9ea11..6de94eae8c7 100644 --- a/vortex-cuda/benches/for_cuda.rs +++ b/vortex-cuda/benches/for_cuda.rs @@ -6,118 +6,134 @@ #![allow(clippy::unwrap_used)] #![allow(clippy::cast_possible_truncation)] +mod common; + use std::mem::size_of; use std::ops::Add; +use std::sync::Arc; +use std::sync::atomic::Ordering; use std::time::Duration; use criterion::BenchmarkId; use criterion::Criterion; use criterion::Throughput; use cudarc::driver::DeviceRepr; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_BLOCKING_SYNC; use futures::executor::block_on; use vortex_array::IntoArray; -use vortex_array::ToCanonical; use vortex_array::arrays::PrimitiveArray; 
use vortex_array::validity::Validity; use vortex_buffer::Buffer; -use vortex_cuda::CudaBufferExt; -use vortex_cuda::CudaExecutionCtx; use vortex_cuda::CudaSession; +use vortex_cuda::executor::CudaArrayExt; use vortex_cuda_macros::cuda_available; use vortex_cuda_macros::cuda_not_available; use vortex_dtype::NativePType; +use vortex_dtype::PType; use vortex_error::VortexExpect; +use vortex_fastlanes::BitPackedArray; use vortex_fastlanes::FoRArray; use vortex_scalar::Scalar; use vortex_session::VortexSession; +use crate::common::TimedLaunchStrategy; + const BENCH_ARGS: &[(usize, &str)] = &[(10_000_000, "10M")]; const REFERENCE_VALUE: u8 = 10; /// Creates a FoR array with the specified type and length. -fn make_for_array_typed(len: usize) -> FoRArray +fn make_for_array_typed(len: usize, bp: bool) -> FoRArray where T: NativePType + From + Add, Scalar: From, { let reference = >::from(REFERENCE_VALUE); let data: Vec = (0..len) - .map(|i| >::from((i % 256) as u8) + reference) + .map(|i| >::from((i % 256) as u8)) .collect(); let primitive_array = PrimitiveArray::new(Buffer::from(data), Validity::NonNullable).into_array(); - FoRArray::try_new(primitive_array, reference.into()).vortex_expect("failed to create FoR array") + if bp && T::PTYPE != PType::U8 { + let child = + BitPackedArray::encode(primitive_array.as_ref(), 8).vortex_expect("failed to bitpack"); + FoRArray::try_new(child.into_array(), reference.into()) + .vortex_expect("failed to create FoR array") + } else { + FoRArray::try_new(primitive_array, reference.into()) + .vortex_expect("failed to create FoR array") + } } -/// Launches FoR decompression kernel and returns elapsed GPU time. -fn launch_for_kernel_timed_typed( - for_array: &FoRArray, - cuda_ctx: &mut CudaExecutionCtx, -) -> vortex_error::VortexResult +/// Benchmark FoR decompression for a specific type. 
+fn benchmark_for_typed(c: &mut Criterion, type_name: &str) where - T: NativePType + DeviceRepr + From, + T: NativePType + DeviceRepr + From + Add, + Scalar: From, { - let encoded = for_array.encoded(); - let unpacked_array = encoded.to_primitive(); - let unpacked_slice = unpacked_array.as_slice::(); + let mut group = c.benchmark_group("for_cuda"); + group.sample_size(10); - let device_data = block_on(cuda_ctx.copy_to_device(unpacked_slice.to_vec()).unwrap()) - .vortex_expect("failed to copy to device"); + for &(len, len_str) in BENCH_ARGS { + group.throughput(Throughput::Bytes((len * size_of::()) as u64)); - let reference = >::from(REFERENCE_VALUE); - let array_len_u64 = for_array.len() as u64; - - let device_view = device_data - .cuda_view::() - .vortex_expect("failed to get device view"); - - let events = vortex_cuda::launch_cuda_kernel!( - execution_ctx: cuda_ctx, - module: "for", - ptypes: &[for_array.ptype()], - launch_args: [device_view, reference, array_len_u64], - event_recording: CU_EVENT_BLOCKING_SYNC, - array_len: for_array.len() - ); - - events.duration() + let for_array = make_for_array_typed::(len, false); + + group.bench_with_input( + BenchmarkId::new("for", format!("{len_str}_{type_name}")), + &for_array, + |b, for_array| { + b.iter_custom(|iters| { + let timed = TimedLaunchStrategy::default(); + let timer = Arc::clone(&timed.total_time_ns); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); + + for _ in 0..iters { + block_on(for_array.to_array().execute_cuda(&mut cuda_ctx)).unwrap(); + } + + Duration::from_nanos(timer.load(Ordering::Relaxed)) + }); + }, + ); + } + + group.finish(); } -/// Benchmark FoR decompression for a specific type. -fn benchmark_for_typed(c: &mut Criterion, type_name: &str) +fn benchmark_ffor_typed(c: &mut Criterion, type_name: &str) where T: NativePType + DeviceRepr + From + Add, Scalar: From, { - let mut group = c.benchmark_group("for_cuda"); + let mut group = c.benchmark_group("ffor_cuda"); group.sample_size(10); - for (len, len_str) in BENCH_ARGS { + for &(len, len_str) in BENCH_ARGS { group.throughput(Throughput::Bytes((len * size_of::()) as u64)); - let for_array = make_for_array_typed::(*len); + let for_array = make_for_array_typed::(len, true); group.bench_with_input( BenchmarkId::new("for", format!("{len_str}_{type_name}")), &for_array, |b, for_array| { b.iter_custom(|iters| { - let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create execution context"); + let timed = TimedLaunchStrategy::default(); + let timer = Arc::clone(&timed.total_time_ns); - let mut total_time = Duration::ZERO; + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); for _ in 0..iters { - let kernel_time = - launch_for_kernel_timed_typed::(for_array, &mut cuda_ctx) - .vortex_expect("kernel launch failed"); - total_time += kernel_time; + block_on(for_array.to_array().execute_cuda(&mut cuda_ctx)).unwrap(); } - total_time + Duration::from_nanos(timer.load(Ordering::Relaxed)) }); }, ); @@ -134,7 +150,15 @@ fn benchmark_for(c: &mut Criterion) { benchmark_for_typed::(c, "u64"); } -criterion::criterion_group!(benches, benchmark_for); +/// Benchmark FOR+BP decompression for all types. 
+fn benchmark_ffor(c: &mut Criterion) { + benchmark_ffor_typed::(c, "u8"); + benchmark_ffor_typed::(c, "u16"); + benchmark_ffor_typed::(c, "u32"); + benchmark_ffor_typed::(c, "u64"); +} + +criterion::criterion_group!(benches, benchmark_for, benchmark_ffor); #[cuda_available] criterion::criterion_main!(benches); diff --git a/vortex-cuda/benches/runend_cuda.rs b/vortex-cuda/benches/runend_cuda.rs index cb7b1effbb5..633d44a3d88 100644 --- a/vortex-cuda/benches/runend_cuda.rs +++ b/vortex-cuda/benches/runend_cuda.rs @@ -6,31 +6,32 @@ #![allow(clippy::unwrap_used)] #![allow(clippy::cast_possible_truncation)] +mod common; + use std::mem::size_of; +use std::sync::Arc; +use std::sync::atomic::Ordering; use std::time::Duration; use criterion::BenchmarkId; use criterion::Criterion; use criterion::Throughput; use cudarc::driver::DeviceRepr; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_BLOCKING_SYNC; use futures::executor::block_on; use vortex_array::IntoArray; -use vortex_array::ToCanonical; use vortex_array::arrays::PrimitiveArray; use vortex_array::validity::Validity; use vortex_buffer::Buffer; -use vortex_cuda::CudaBufferExt; -use vortex_cuda::CudaExecutionCtx; use vortex_cuda::CudaSession; +use vortex_cuda::executor::CudaArrayExt; use vortex_cuda_macros::cuda_available; use vortex_cuda_macros::cuda_not_available; use vortex_dtype::NativePType; -use vortex_dtype::PType; -use vortex_error::VortexExpect; use vortex_runend::RunEndArray; use vortex_session::VortexSession; +use crate::common::TimedLaunchStrategy; + /// Creates a run-end encoded array with the specified output length and average run length. fn make_runend_array_typed(output_len: usize, avg_run_len: usize) -> RunEndArray where @@ -56,64 +57,6 @@ where RunEndArray::new(ends_array, values_array) } -/// Launches runend decode kernel and returns elapsed GPU time. 
-fn launch_runend_kernel_timed_typed( - runend_array: &RunEndArray, - cuda_ctx: &mut CudaExecutionCtx, -) -> vortex_error::VortexResult -where - T: NativePType + DeviceRepr, -{ - let ends_prim = runend_array.ends().to_primitive(); - let values_prim = runend_array.values().to_primitive(); - - let output_len = runend_array.len(); - let num_runs = ends_prim.len(); - let offset = runend_array.offset(); - - let ends_device = block_on( - cuda_ctx - .copy_to_device(ends_prim.as_slice::().to_vec()) - .unwrap(), - ) - .vortex_expect("failed to copy ends to device"); - - let values_device = block_on( - cuda_ctx - .copy_to_device(values_prim.as_slice::().to_vec()) - .unwrap(), - ) - .vortex_expect("failed to copy values to device"); - - let output_device = block_on( - cuda_ctx - .copy_to_device(vec![T::default(); output_len]) - .unwrap(), - ) - .vortex_expect("failed to allocate output buffer"); - - let ends_view = ends_device - .cuda_view::() - .vortex_expect("failed to get ends view"); - let values_view = values_device - .cuda_view::() - .vortex_expect("failed to get values view"); - let output_view = output_device - .cuda_view::() - .vortex_expect("failed to get output view"); - - let events = vortex_cuda::launch_cuda_kernel!( - execution_ctx: cuda_ctx, - module: "runend", - ptypes: &[T::PTYPE, PType::U64], - launch_args: [ends_view, num_runs, values_view, offset, output_len, output_view], - event_recording: CU_EVENT_BLOCKING_SYNC, - array_len: output_len - ); - - events.duration() -} - /// Benchmark run-end decoding for a specific type with varying run lengths fn benchmark_runend_typed(c: &mut Criterion, type_name: &str) where @@ -137,20 +80,19 @@ where &runend_array, |b, runend_array| { b.iter_custom(|iters| { + let timed = TimedLaunchStrategy::default(); + let timer = Arc::clone(&timed.total_time_ns); + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create execution context"); - - let mut total_time = Duration::ZERO; + .unwrap() + .with_launch_strategy(Arc::new(timed)); for _ in 0..iters { - let kernel_time = - launch_runend_kernel_timed_typed::(runend_array, &mut cuda_ctx) - .vortex_expect("kernel launch failed"); - total_time += kernel_time; + block_on(runend_array.to_array().execute_cuda(&mut cuda_ctx)).unwrap(); } - total_time + Duration::from_nanos(timer.load(Ordering::Relaxed)) }); }, ); diff --git a/vortex-cuda/gpu-scan-cli/Cargo.toml b/vortex-cuda/gpu-scan-cli/Cargo.toml new file mode 100644 index 00000000000..0acd85438da --- /dev/null +++ b/vortex-cuda/gpu-scan-cli/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "gpu-scan-cli" +authors = { workspace = true } +description = "CUDA scan testing" +edition = { workspace = true } +homepage = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +publish = false +repository = { workspace = true } +rust-version = { workspace = true } +version = { workspace = true } + +[lints] +workspace = true + +[dependencies] +futures = { workspace = true, features = ["executor"] } +tokio = { workspace = true, features = ["macros", "full"] } +tracing = { workspace = true, features = ["std", "attributes"] } +tracing-subscriber = { workspace = true, features = ["env-filter", "json"] } +vortex = { workspace = true } +vortex-cuda = { workspace = true, features = ["_test-harness"] } +vortex-cuda-macros = { workspace = true } diff --git a/vortex-cuda/gpu-scan-cli/README.md b/vortex-cuda/gpu-scan-cli/README.md new file mode 100644 index 
00000000000..90210dc3ed8 --- /dev/null +++ b/vortex-cuda/gpu-scan-cli/README.md @@ -0,0 +1,19 @@ +# gpu-scan-cli + +A CLI tool for benchmarking CUDA-accelerated scans of Vortex files. + +## What it does + +1. Reads a Vortex file from disk +2. Recompresses it using only GPU-compatible encodings +3. Executes a full scan on the GPU via CUDA +4. Outputs tracing information about kernel execution times + +## Usage + +```bash +FLAT_LAYOUT_INLINE_ARRAY_NODE=true RUST_LOG=vortex_cuda=trace,info \ + cargo run --release --bin gpu-scan-cli -- ./path/to/file.vortex +``` + +Use `--json` for JSON-formatted trace output. diff --git a/vortex-cuda/gpu-scan-cli/src/main.rs new file mode 100644 index 00000000000..87bbbe69f46 --- /dev/null +++ b/vortex-cuda/gpu-scan-cli/src/main.rs @@ -0,0 +1,176 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![allow(unused_imports)] + +use std::env::args; +use std::path::PathBuf; +use std::sync::Arc; + +use futures::StreamExt; +use tracing::Instrument; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::fmt::format::FmtSpan; +use vortex::VortexSessionDefault; +use vortex::array::ToCanonical; +use vortex::array::arrays::DictVTable; +use vortex::error::VortexResult; +use vortex::file::OpenOptionsSessionExt; +use vortex::session::VortexSession; +use vortex_cuda::CopyDeviceReadAt; +use vortex_cuda::CudaSession; +use vortex_cuda::TracingLaunchStrategy; +use vortex_cuda::VortexCudaStreamPool; +use vortex_cuda::executor::CudaArrayExt; +use vortex_cuda_macros::cuda_available; +use vortex_cuda_macros::cuda_not_available; + +#[cuda_not_available] +fn main() {} + +#[cuda_available] +#[tokio::main] +async fn main() -> VortexResult<()> { + let args: Vec<String> = args().collect(); + let json_output = args.iter().any(|arg| arg == "--json"); + + if json_output { + tracing_subscriber::fmt() + .json() + .with_env_filter(EnvFilter::from_default_env()) + .with_span_events(FmtSpan::NONE) + .with_ansi(false) + .init(); + } else { + tracing_subscriber::fmt() + .pretty() + .with_env_filter(EnvFilter::from_default_env()) + .with_span_events(FmtSpan::NONE) + .with_ansi(false) + .event_format(tracing_subscriber::fmt::format().with_target(true)) + .init(); + } + + let session = VortexSession::default(); + let mut cuda_ctx = CudaSession::create_execution_ctx(&session)? + .with_launch_strategy(Arc::new(TracingLaunchStrategy)); + + #[allow(clippy::expect_used, clippy::unwrap_in_result)] + let input_path = args + .iter() + .skip(1) + .find(|arg| !arg.starts_with("--")) + .expect("must provide path to .vortex file"); + let input_path = PathBuf::from(input_path); + + assert!(input_path.exists(), "input path does not exist"); + + let (recompressed, footer) = recompress_for_gpu(input_path, &session).await?; + + // Create a full scan that executes on the GPU + let cuda_stream = + VortexCudaStreamPool::new(Arc::clone(cuda_ctx.stream().context()), 1).get_stream()?; + let gpu_reader = CopyDeviceReadAt::new(recompressed, cuda_stream); + + let gpu_file = session + .open_options() + .with_footer(footer) + .open(Arc::new(gpu_reader)) + .await?; + + // The `execute_micros` tracing field reports each kernel's execution time in microseconds. + let mut batches = gpu_file.scan()?.into_array_stream()?; + + let mut chunk = 0; + while let Some(next) = batches.next().await.transpose()? 
{ let record = next.to_struct(); + + for (field, field_name) in record + .unmasked_fields() + .iter() + .zip(record.struct_fields().names().iter()) + { + let field_name = field_name.to_string(); + // Skip Dict fields; VarBin isn't properly implemented yet. + if field.is::<DictVTable>() { + continue; + } + + let span = + tracing::info_span!("array execution", chunk = chunk, field_name = field_name); + + async { + if field.clone().execute_cuda(&mut cuda_ctx).await.is_err() { + tracing::error!("failed to execute_cuda on column"); + } + } + .instrument(span) + .await; + } + + chunk += 1; + } + + Ok(()) +} + +// Dump the values out as a new Vortex file for analysis. + +/// Recompress the input file using only GPU-executable encodings, returning the file as an +/// in-memory byte array. +#[cuda_available] +async fn recompress_for_gpu( + input_path: impl AsRef<Path>, + session: &VortexSession, +) -> VortexResult<(ByteBuffer, Footer)> { + // Setup the reader + let input = session.open_options().open_path(input_path).await?; + + // Build a scan to read all columns from the input, and recompress them using only GPU-compatible + // encodings. + let scan = input.scan()?.into_array_stream()?; + + // Rebuild a copy of the file that only uses GPU-compatible compression algorithms. + let compressor = BtrBlocksCompressorBuilder::empty() + .include_int([ + IntCode::Uncompressed, + IntCode::Constant, + IntCode::BitPacking, + IntCode::For, + IntCode::Sequence, + IntCode::ZigZag, + IntCode::Dict, + ]) + .include_float([ + FloatCode::Uncompressed, + FloatCode::Constant, + FloatCode::Alp, + FloatCode::AlpRd, + FloatCode::RunEnd, + ]) + // Don't compress strings, since we don't have any BtrBlocks encodings that support + // strings. + .include_string([ + StringCode::Uncompressed, + StringCode::Constant, + StringCode::Dict, + StringCode::Zstd, + StringCode::ZstdBuffers, + ]) + .build(); + + // Build a write strategy that applies the GPU-compatible compressor. + let writer = WriteStrategyBuilder::default() + .with_compressor(compressor) + .build(); + + // Collect the recompressed file into an in-memory buffer. + let mut out = ByteBufferMut::empty(); + let result = session + .write_options() + .with_strategy(writer) + .write(&mut out, scan) + .await?; + + Ok((out.freeze(), result.footer().clone())) +} diff --git a/vortex-cuda/src/executor.rs index 31f66f3ecb8..42475945fc3 100644 --- a/vortex-cuda/src/executor.rs +++ b/vortex-cuda/src/executor.rs @@ -12,7 +12,10 @@ use cudarc::driver::CudaSlice; use cudarc::driver::CudaStream; use cudarc::driver::DeviceRepr; use cudarc::driver::LaunchArgs; +use cudarc::driver::LaunchConfig; use futures::future::BoxFuture; +use tracing::debug; +use tracing::trace; use vortex_array::Array; use vortex_array::ArrayRef; use vortex_array::Canonical; @@ -28,6 +31,11 @@ use vortex_error::vortex_err; use crate::CudaSession; use crate::ExportDeviceArray; +use crate::kernel::DefaultLaunchStrategy; +use crate::kernel::LaunchStrategy; +use crate::kernel::LaunchStrategyExt; +use crate::kernel::launch_cuda_kernel_impl; +use crate::kernel::launch_cuda_kernel_with_config; use crate::session::CudaSessionExt; use crate::stream::VortexCudaStream; @@ -57,6 +65,7 @@ pub struct CudaExecutionCtx { stream: VortexCudaStream, ctx: ExecutionCtx, cuda_session: CudaSession, + strategy: Arc<dyn LaunchStrategy>, } impl CudaExecutionCtx { @@ -67,9 +76,83 @@ impl CudaExecutionCtx { stream, ctx, cuda_session, + strategy: Arc::new(DefaultLaunchStrategy), } } + /// Set the launch strategy for the execution context. 
+ /// + /// This can only be set on setup (an "owned" context) and not from within + /// a kernel execution. + pub fn with_launch_strategy(mut self, launch_strategy: Arc) -> Self { + self.strategy = launch_strategy; + self + } + + /// Perform an external kernel launch, with events created and logged via the configured + /// [`LaunchStrategy`]. + /// + /// We use CUB and NVCOMP routines, and those don't match the normal `cudarc` entrypoints, so + /// to inject the configured launch strategy we need to bracket it ourselves. + pub fn launch_external VortexResult<()>>( + &self, + len: usize, + function: F, + ) -> VortexResult<()> { + self.strategy + .as_ref() + .with_strategy(&self.stream.0, len, function) + } + + /// Launch a Kernel function with args setup done by the provided `build_args` closure. + /// + /// Kernels launched this way will use the default launch configuration, which provides no + /// shared memory bytes, and uses grid parameters based on the ideal thread block size for + /// the given `len`. + pub fn launch_kernel<'a, F>( + &'a mut self, + function: &'a CudaFunction, + len: usize, + build_args: F, + ) -> VortexResult<()> + where + F: FnOnce(&mut LaunchArgs<'a>), + { + let mut launcher = self.launch_builder(function); + build_args(&mut launcher); + + let events = launch_cuda_kernel_impl(&mut launcher, self.strategy.event_flags(), len)?; + self.strategy.on_complete(&events, len)?; + + drop(events); + + Ok(()) + } + + /// Launch a function with args provided by the `build_args` closure, with an explicit + /// [`LaunchConfig`], for kernels which need specific grid and shared memory configuration. + pub fn launch_kernel_config<'a, F>( + &'a mut self, + function: &'a CudaFunction, + cfg: LaunchConfig, + len: usize, + build_args: F, + ) -> VortexResult<()> + where + F: FnOnce(&mut LaunchArgs<'a>), + { + let mut launcher = self.launch_builder(function); + build_args(&mut launcher); + + let events = + launch_cuda_kernel_with_config(&mut launcher, cfg, self.strategy.event_flags())?; + self.strategy.on_complete(&events, len)?; + + drop(events); + + Ok(()) + } + /// Loads a CUDA kernel function by module name and ptype(s). /// /// # Arguments @@ -235,18 +318,19 @@ impl CudaArrayExt for ArrayRef { } if self.is_canonical() || self.is_empty() { + trace!(encoding = ?self.encoding_id(), "skipping canonical"); return self.execute(&mut ctx.ctx); } let Some(support) = ctx.cuda_session.kernel(&self.encoding_id()) else { - tracing::debug!( + debug!( encoding = %self.encoding_id(), "No CUDA support registered for encoding, falling back to CPU execution" ); return self.execute(&mut ctx.ctx); }; - tracing::debug!( + debug!( encoding = %self.encoding_id(), "Executing array on CUDA device" ); diff --git a/vortex-cuda/src/kernel/arrays/constant.rs b/vortex-cuda/src/kernel/arrays/constant.rs index f38784e3af7..9c6a43525d6 100644 --- a/vortex-cuda/src/kernel/arrays/constant.rs +++ b/vortex-cuda/src/kernel/arrays/constant.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use async_trait::async_trait; use cudarc::driver::DeviceRepr; use cudarc::driver::PushKernelArg; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING; +use tracing::instrument; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::arrays::ConstantArray; @@ -31,14 +31,13 @@ use vortex_error::vortex_err; use crate::CudaDeviceBuffer; use crate::executor::CudaExecute; use crate::executor::CudaExecutionCtx; -use crate::launch_cuda_kernel_impl; /// CUDA executor for constant arrays with numeric types. 
/// /// Materializes a constant array by filling a device buffer with the scalar value. /// Supports primitive types (integers, floats) and decimal types (i128, i256). #[derive(Debug)] -pub struct ConstantNumericExecutor; +pub(crate) struct ConstantNumericExecutor; impl ConstantNumericExecutor { fn try_specialize(array: ArrayRef) -> Option { @@ -48,6 +47,7 @@ impl ConstantNumericExecutor { #[async_trait] impl CudaExecute for ConstantNumericExecutor { + #[instrument(level = "trace", skip_all, fields(executor = ?self))] async fn execute( &self, array: ArrayRef, @@ -114,16 +114,12 @@ where // Load kernel function let kernel_ptypes = [P::PTYPE]; let cuda_function = ctx.load_function_ptype("constant_numeric", &kernel_ptypes)?; - let mut launch_builder = ctx.launch_builder(&cuda_function); - // Build launch args: output, value, length - launch_builder.arg(&output_view); - launch_builder.arg(&value); - launch_builder.arg(&array_len_u64); - - // Launch kernel - let _cuda_events = - launch_cuda_kernel_impl(&mut launch_builder, CU_EVENT_DISABLE_TIMING, array_len)?; + ctx.launch_kernel(&cuda_function, array_len, |args| { + args.arg(&output_view); + args.arg(&value); + args.arg(&array_len_u64); + })?; // Wrap the CudaSlice in a CudaDeviceBuffer and then BufferHandle let device_buffer = CudaDeviceBuffer::new(output_buffer); @@ -174,16 +170,12 @@ where // Load kernel function let cuda_function = ctx.load_function("constant_numeric", &[&D::DECIMAL_TYPE.to_string()])?; - let mut launch_builder = ctx.launch_builder(&cuda_function); - - // Build launch args: output, value, length - launch_builder.arg(&output_view); - launch_builder.arg(&value); - launch_builder.arg(&array_len_u64); - // Launch kernel - let _cuda_events = - launch_cuda_kernel_impl(&mut launch_builder, CU_EVENT_DISABLE_TIMING, array_len)?; + ctx.launch_kernel(&cuda_function, array_len, |args| { + args.arg(&output_view); + args.arg(&value); + args.arg(&array_len_u64); + })?; // Wrap the CudaSlice in a CudaDeviceBuffer and then BufferHandle let device_buffer = CudaDeviceBuffer::new(output_buffer); diff --git a/vortex-cuda/src/kernel/arrays/dict.rs b/vortex-cuda/src/kernel/arrays/dict.rs index 4a9f2e07a8b..26ecb17efcc 100644 --- a/vortex-cuda/src/kernel/arrays/dict.rs +++ b/vortex-cuda/src/kernel/arrays/dict.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use async_trait::async_trait; use cudarc::driver::DeviceRepr; use cudarc::driver::PushKernelArg; +use tracing::instrument; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::arrays::DecimalArray; @@ -34,14 +35,14 @@ use crate::CudaDeviceBuffer; use crate::executor::CudaArrayExt; use crate::executor::CudaExecute; use crate::executor::CudaExecutionCtx; -use crate::launch_cuda_kernel_impl; /// CUDA executor for dictionary-encoded arrays. 
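+///
+/// Decoding is a gather: each output element looks up its code in the values buffer. A
+/// hypothetical CPU equivalent of what the `dict` kernel computes per thread (illustrative
+/// sketch only, with concrete types standing in for the generic ones):
+///
+/// ```ignore
+/// // output[i] = values[codes[i]]
+/// fn decode_dict(codes: &[u32], values: &[i64]) -> Vec<i64> {
+///     codes.iter().map(|&c| values[c as usize]).collect()
+/// }
+/// ```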
#[derive(Debug)] -pub struct DictExecutor; +pub(crate) struct DictExecutor; #[async_trait] impl CudaExecute for DictExecutor { + #[instrument(level = "trace", skip_all, fields(executor = ?self))] async fn execute( &self, array: ArrayRef, @@ -118,15 +119,14 @@ async fn execute_dict_prim_typed(); let codes_len_u64 = codes_len as u64; - // Launch the dict kernel - let _cuda_events = crate::launch_cuda_kernel!( - execution_ctx: ctx, - module: "dict", - ptypes: &[value_ptype, I::PTYPE], - launch_args: [codes_view, codes_len_u64, values_view, output_view], - event_recording: cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING, - array_len: codes_len - ); + + let kernel_function = ctx.load_function_ptype("dict", &[value_ptype, I::PTYPE])?; + ctx.launch_kernel(&kernel_function, codes_len, |args| { + args.arg(&codes_view) + .arg(&codes_len_u64) + .arg(&values_view) + .arg(&output_view); + })?; Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle( BufferHandle::new_device(Arc::new(output_device)), @@ -177,9 +177,7 @@ async fn execute_dict_decimal_typed< ) -> VortexResult { assert!(!codes.is_empty()); let codes_len = codes.len(); - if codes_len == 0 { - vortex_bail!("Cannot execute dict on empty codes array"); - } + let codes_len_u64 = codes_len as u64; let DecimalArrayParts { values: values_buffer, @@ -211,18 +209,13 @@ async fn execute_dict_decimal_typed< "dict", &[&V::DECIMAL_TYPE.to_string(), &C::PTYPE.to_string()], )?; - let mut launch_builder = ctx.launch_builder(&cuda_function); - launch_builder.arg(&codes_view); - launch_builder.arg(&codes_len); - launch_builder.arg(&values_view); - launch_builder.arg(&output_view); - - let _cuda_events = launch_cuda_kernel_impl( - &mut launch_builder, - cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING, - codes_len, - )?; + ctx.launch_kernel(&cuda_function, codes_len, |args| { + args.arg(&codes_view) + .arg(&codes_len_u64) + .arg(&values_view) + .arg(&output_view); + })?; Ok(Canonical::Decimal(DecimalArray::new_handle( BufferHandle::new_device(Arc::new(output_device)), @@ -283,19 +276,15 @@ async fn execute_dict_varbinview( let codes_ptype_str = C::PTYPE.to_string(); let cuda_function = ctx.load_function("dict", &["i128", &codes_ptype_str])?; - let mut launch_builder = ctx.launch_builder(&cuda_function); let codes_len_u64 = codes_len as u64; - launch_builder.arg(&codes_view); - launch_builder.arg(&codes_len_u64); - launch_builder.arg(&values_view); - launch_builder.arg(&output_view); - - let _cuda_events = launch_cuda_kernel_impl( - &mut launch_builder, - cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING, - codes_len, - )?; + + ctx.launch_kernel(&cuda_function, codes_len, |args| { + args.arg(&codes_view); + args.arg(&codes_len_u64); + args.arg(&values_view); + args.arg(&output_view); + })?; }); // Output views gathered by the kernel share the values' data buffers. 
diff --git a/vortex-cuda/src/kernel/arrays/mod.rs b/vortex-cuda/src/kernel/arrays/mod.rs index dc3a9a80fba..ab81934bb27 100644 --- a/vortex-cuda/src/kernel/arrays/mod.rs +++ b/vortex-cuda/src/kernel/arrays/mod.rs @@ -5,6 +5,6 @@ mod constant; mod dict; mod shared; -pub use constant::ConstantNumericExecutor; -pub use dict::DictExecutor; -pub use shared::SharedExecutor; +pub(crate) use constant::ConstantNumericExecutor; +pub(crate) use dict::DictExecutor; +pub(crate) use shared::SharedExecutor; diff --git a/vortex-cuda/src/kernel/arrays/shared.rs b/vortex-cuda/src/kernel/arrays/shared.rs index 8bd4e0e3f98..914f5edbd32 100644 --- a/vortex-cuda/src/kernel/arrays/shared.rs +++ b/vortex-cuda/src/kernel/arrays/shared.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use async_trait::async_trait; +use tracing::instrument; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::arrays::SharedVTable; @@ -14,10 +15,11 @@ use crate::executor::CudaExecutionCtx; /// CUDA executor for SharedArray. #[derive(Debug)] -pub struct SharedExecutor; +pub(crate) struct SharedExecutor; #[async_trait] impl CudaExecute for SharedExecutor { + #[instrument(level = "trace", skip_all, fields(executor = ?self))] async fn execute( &self, array: ArrayRef, diff --git a/vortex-cuda/src/kernel/encodings/alp.rs b/vortex-cuda/src/kernel/encodings/alp.rs index bf5753a47cb..65192f7445d 100644 --- a/vortex-cuda/src/kernel/encodings/alp.rs +++ b/vortex-cuda/src/kernel/encodings/alp.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use async_trait::async_trait; use cudarc::driver::DeviceRepr; use cudarc::driver::PushKernelArg; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING; +use tracing::instrument; use vortex_alp::ALPArray; use vortex_alp::ALPFloat; use vortex_alp::ALPVTable; @@ -31,14 +31,14 @@ use crate::executor::CudaArrayExt; use crate::executor::CudaExecute; use crate::executor::CudaExecutionCtx; use crate::kernel::patches::execute_patches; -use crate::launch_cuda_kernel_impl; /// CUDA decoder for ALP (Adaptive Lossless floating-Point) decompression. 
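+///
+/// ALP stores floats as scaled integers plus per-array exponents `(e, f)`. Assuming the
+/// standard ALP convention (decoded = encoded × 10^f × 10^-e), a hypothetical scalar
+/// sketch of the rescaling the kernel performs:
+///
+/// ```ignore
+/// // Illustrative only: with e = 2, f = 0 the stored integer 123 decodes to 1.23.
+/// fn decode_alp(encoded: i64, e: i32, f: i32) -> f64 {
+///     encoded as f64 * 10f64.powi(f) * 10f64.powi(-e)
+/// }
+/// ```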
#[derive(Debug)] -pub struct ALPExecutor; +pub(crate) struct ALPExecutor; #[async_trait] impl CudaExecute for ALPExecutor { + #[instrument(level = "trace", skip_all, fields(executor = ?self))] async fn execute( &self, array: ArrayRef, @@ -87,20 +87,14 @@ where // Load kernel function let kernel_ptypes = [A::ALPInt::PTYPE, A::PTYPE]; let cuda_function = ctx.load_function_ptype("alp", &kernel_ptypes)?; - { - let mut launch_builder = ctx.launch_builder(&cuda_function); - - // Build launch args: input, output, f, e, length - launch_builder.arg(&input_view); - launch_builder.arg(&output_view); - launch_builder.arg(&f); - launch_builder.arg(&e); - launch_builder.arg(&array_len_u64); - - // Launch kernel - let _cuda_events = - launch_cuda_kernel_impl(&mut launch_builder, CU_EVENT_DISABLE_TIMING, array_len)?; - } + + ctx.launch_kernel(&cuda_function, array_len, |args| { + args.arg(&input_view) + .arg(&output_view) + .arg(&f) + .arg(&e) + .arg(&array_len_u64); + })?; // Check if there are any patches to decode here let output_buf = if let Some(patches) = array.patches() { diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs index 9f1d54e7209..775e9ea665b 100644 --- a/vortex-cuda/src/kernel/encodings/bitpacked.rs +++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs @@ -9,7 +9,7 @@ use cudarc::driver::CudaFunction; use cudarc::driver::DeviceRepr; use cudarc::driver::LaunchConfig; use cudarc::driver::PushKernelArg; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING; +use tracing::instrument; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::arrays::PrimitiveArray; @@ -32,12 +32,11 @@ use crate::CudaBufferExt; use crate::CudaDeviceBuffer; use crate::executor::CudaExecute; use crate::executor::CudaExecutionCtx; -use crate::kernel::launch_cuda_kernel_with_config; use crate::kernel::patches::execute_patches; /// CUDA decoder for bit-packed arrays. 
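+///
+/// Each value occupies `bit_width` bits of the packed input. The FastLanes layout actually
+/// interleaves values in 1024-element blocks, so the contiguous LSB-first sketch below
+/// (assuming `bit_width <= 8` and a `u8` output) illustrates only the extraction step:
+///
+/// ```ignore
+/// fn unpack(packed: &[u8], bit_width: usize, len: usize) -> Vec<u8> {
+///     (0..len)
+///         .map(|i| {
+///             let bit = i * bit_width;
+///             // Low bits from the current byte, high bits from the next on overflow.
+///             let lo = (packed[bit / 8] >> (bit % 8)) as u16;
+///             let hi = if bit % 8 + bit_width > 8 {
+///                 (packed[bit / 8 + 1] as u16) << (8 - bit % 8)
+///             } else {
+///                 0
+///             };
+///             ((lo | hi) & ((1u16 << bit_width) - 1)) as u8
+///         })
+///         .collect()
+/// }
+/// ```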
#[derive(Debug)] -pub struct BitPackedExecutor; +pub(crate) struct BitPackedExecutor; impl BitPackedExecutor { fn try_specialize(array: ArrayRef) -> Option { @@ -47,6 +46,7 @@ impl BitPackedExecutor { #[async_trait] impl CudaExecute for BitPackedExecutor { + #[instrument(level = "trace", skip_all, fields(executor = ?self))] async fn execute( &self, array: ArrayRef, @@ -87,7 +87,7 @@ pub fn bitpacked_cuda_launch_config(output_width: usize, len: usize) -> VortexRe }) } -async fn decode_bitpacked( +pub(crate) async fn decode_bitpacked( array: BitPackedArray, ctx: &mut CudaExecutionCtx, ) -> VortexResult @@ -119,18 +119,11 @@ where let output_width = size_of::() * 8; let cuda_function = bitpacked_cuda_kernel(bit_width, output_width, ctx)?; + let config = bitpacked_cuda_launch_config(output_width, len)?; - { - let mut launch_builder = ctx.launch_builder(&cuda_function); - - launch_builder.arg(&input_view); - launch_builder.arg(&output_view); - - let config = bitpacked_cuda_launch_config(output_width, len)?; - - let _cuda_events = - launch_cuda_kernel_with_config(&mut launch_builder, config, CU_EVENT_DISABLE_TIMING)?; - } + ctx.launch_kernel_config(&cuda_function, config, len, |args| { + args.arg(&input_view).arg(&output_view); + })?; let output_handle = match patches { None => BufferHandle::new_device(output_buf.slice_typed::(offset..(offset + len))), diff --git a/vortex-cuda/src/kernel/encodings/date_time_parts.rs b/vortex-cuda/src/kernel/encodings/date_time_parts.rs index 6bad3309301..ce0589eda76 100644 --- a/vortex-cuda/src/kernel/encodings/date_time_parts.rs +++ b/vortex-cuda/src/kernel/encodings/date_time_parts.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use async_trait::async_trait; use cudarc::driver::DeviceRepr; use cudarc::driver::PushKernelArg; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING; +use tracing::instrument; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::IntoArray; @@ -35,16 +35,16 @@ use crate::CudaDeviceBuffer; use crate::executor::CudaArrayExt; use crate::executor::CudaExecute; use crate::executor::CudaExecutionCtx; -use crate::launch_cuda_kernel_impl; /// CUDA executor for DateTimeParts arrays. /// /// Combines the days, seconds, and subseconds components into a single i64 timestamp array. 
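+///
+/// Assuming `divisor` is the number of ticks per second for the target `TimeUnit` (e.g.
+/// 1_000 for milliseconds, matching the mapping in the removed bench helper), each output
+/// element combines the three components; a hypothetical sketch of the arithmetic:
+///
+/// ```ignore
+/// // Illustrative arithmetic only, not the kernel implementation.
+/// fn combine(days: i64, seconds: i64, subseconds: i64, divisor: i64) -> i64 {
+///     days * 86_400 * divisor + seconds * divisor + subseconds
+/// }
+/// ```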
#[derive(Debug)] -pub struct DateTimePartsExecutor; +pub(crate) struct DateTimePartsExecutor; #[async_trait] impl CudaExecute for DateTimePartsExecutor { + #[instrument(level = "trace", skip_all, fields(executor = ?self))] async fn execute( &self, array: ArrayRef, @@ -182,18 +182,17 @@ where ]; let kernel_suffix_strs: Vec<&str> = kernel_suffixes.iter().map(|s| s.as_str()).collect(); let cuda_function = ctx.load_function("date_time_parts", &kernel_suffix_strs)?; - let mut launch_builder = ctx.launch_builder(&cuda_function); - launch_builder.arg(&days_view); - launch_builder.arg(&seconds_view); - launch_builder.arg(&subseconds_view); - launch_builder.arg(&divisor); - launch_builder.arg(&output_view); let array_len_u64 = output_len as u64; - launch_builder.arg(&array_len_u64); - let _cuda_events = - launch_cuda_kernel_impl(&mut launch_builder, CU_EVENT_DISABLE_TIMING, output_len)?; + ctx.launch_kernel(&cuda_function, output_len, |args| { + args.arg(&days_view) + .arg(&seconds_view) + .arg(&subseconds_view) + .arg(&divisor) + .arg(&output_view) + .arg(&array_len_u64); + })?; let output_buffer = BufferHandle::new_device(Arc::new(output_device)); let output_primitive = PrimitiveArray::from_buffer_handle(output_buffer, PType::I64, validity); diff --git a/vortex-cuda/src/kernel/encodings/decimal_byte_parts.rs b/vortex-cuda/src/kernel/encodings/decimal_byte_parts.rs index 57472a1eb5e..9a50d30aec7 100644 --- a/vortex-cuda/src/kernel/encodings/decimal_byte_parts.rs +++ b/vortex-cuda/src/kernel/encodings/decimal_byte_parts.rs @@ -4,6 +4,7 @@ use std::fmt::Debug; use async_trait::async_trait; +use tracing::instrument; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::arrays::DecimalArray; @@ -20,10 +21,11 @@ use crate::executor::CudaExecute; // See `DecimalBytePartsArray` #[derive(Debug)] -pub struct DecimalBytePartsExecutor; +pub(crate) struct DecimalBytePartsExecutor; #[async_trait] impl CudaExecute for DecimalBytePartsExecutor { + #[instrument(level = "trace", skip_all, fields(executor = ?self))] async fn execute( &self, array: ArrayRef, diff --git a/vortex-cuda/src/kernel/encodings/for_.rs b/vortex-cuda/src/kernel/encodings/for_.rs index 9eab2c53de4..30001941863 100644 --- a/vortex-cuda/src/kernel/encodings/for_.rs +++ b/vortex-cuda/src/kernel/encodings/for_.rs @@ -6,7 +6,8 @@ use std::fmt::Debug; use async_trait::async_trait; use cudarc::driver::DeviceRepr; use cudarc::driver::PushKernelArg; -use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING; +use tracing::instrument; +use vortex_array::Array; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::arrays::PrimitiveArray; @@ -25,11 +26,10 @@ use crate::CudaBufferExt; use crate::executor::CudaArrayExt; use crate::executor::CudaExecute; use crate::executor::CudaExecutionCtx; -use crate::launch_cuda_kernel_impl; /// CUDA decoder for frame-of-reference. 
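+///
+/// Decoding adds the frame-of-reference value back to every element, in place. A
+/// hypothetical CPU equivalent of the kernel (concrete `u64` standing in for the generic
+/// element type):
+///
+/// ```ignore
+/// // output[i] = encoded[i] + reference
+/// fn decode_for(encoded: &mut [u64], reference: u64) {
+///     for v in encoded.iter_mut() {
+///         *v = v.wrapping_add(reference);
+///     }
+/// }
+/// ```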
diff --git a/vortex-cuda/src/kernel/encodings/decimal_byte_parts.rs b/vortex-cuda/src/kernel/encodings/decimal_byte_parts.rs
index 57472a1eb5e..9a50d30aec7 100644
--- a/vortex-cuda/src/kernel/encodings/decimal_byte_parts.rs
+++ b/vortex-cuda/src/kernel/encodings/decimal_byte_parts.rs
@@ -4,6 +4,7 @@ use std::fmt::Debug;

 use async_trait::async_trait;
+use tracing::instrument;
 use vortex_array::ArrayRef;
 use vortex_array::Canonical;
 use vortex_array::arrays::DecimalArray;
@@ -20,10 +21,11 @@ use crate::executor::CudaExecute;

 // See `DecimalBytePartsArray`
 #[derive(Debug)]
-pub struct DecimalBytePartsExecutor;
+pub(crate) struct DecimalBytePartsExecutor;

 #[async_trait]
 impl CudaExecute for DecimalBytePartsExecutor {
+    #[instrument(level = "trace", skip_all, fields(executor = ?self))]
     async fn execute(
         &self,
         array: ArrayRef,
diff --git a/vortex-cuda/src/kernel/encodings/for_.rs b/vortex-cuda/src/kernel/encodings/for_.rs
index 9eab2c53de4..30001941863 100644
--- a/vortex-cuda/src/kernel/encodings/for_.rs
+++ b/vortex-cuda/src/kernel/encodings/for_.rs
@@ -6,7 +6,8 @@ use std::fmt::Debug;
 use async_trait::async_trait;
 use cudarc::driver::DeviceRepr;
 use cudarc::driver::PushKernelArg;
-use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING;
+use tracing::instrument;
+use vortex_array::Array;
 use vortex_array::ArrayRef;
 use vortex_array::Canonical;
 use vortex_array::arrays::PrimitiveArray;
@@ -25,11 +26,10 @@ use crate::CudaBufferExt;
 use crate::executor::CudaArrayExt;
 use crate::executor::CudaExecute;
 use crate::executor::CudaExecutionCtx;
-use crate::launch_cuda_kernel_impl;

 /// CUDA decoder for frame-of-reference.
 #[derive(Debug)]
-pub struct FoRExecutor;
+pub(crate) struct FoRExecutor;

 impl FoRExecutor {
     fn try_specialize(array: ArrayRef) -> Option<FoRArray> {
@@ -39,6 +39,7 @@ impl FoRExecutor {

 #[async_trait]
 impl CudaExecute for FoRExecutor {
+    #[instrument(level = "trace", skip_all, fields(executor = ?self))]
     async fn execute(
         &self,
         array: ArrayRef,
@@ -79,16 +80,10 @@ where
     // Load kernel function
     let kernel_ptypes = [P::PTYPE];
     let cuda_function = ctx.load_function_ptype("for", &kernel_ptypes)?;
-    let mut launch_builder = ctx.launch_builder(&cuda_function);

-    // Build launch args: buffer, reference, length
-    launch_builder.arg(&cuda_view);
-    launch_builder.arg(&reference);
-    launch_builder.arg(&array_len_u64);
-
-    // Launch kernel
-    let _cuda_events =
-        launch_cuda_kernel_impl(&mut launch_builder, CU_EVENT_DISABLE_TIMING, array_len)?;
+    ctx.launch_kernel(&cuda_function, array_len, |args| {
+        args.arg(&cuda_view).arg(&reference).arg(&array_len_u64);
+    })?;

     // Build result - in-place reuses the same buffer
     Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle(
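Frame-of-reference decoding is the simplest of these kernels: it adds the frame's `reference` back onto every element of the buffer, in place. A scalar CPU-side sketch of the same operation (wrapping addition is chosen defensively here; whether the kernel wraps is an assumption):

```rust
/// Hedged CPU-side equivalent of the in-place "for" kernel shown above.
fn for_decode_in_place(values: &mut [u64], reference: u64) {
    for v in values.iter_mut() {
        *v = v.wrapping_add(reference);
    }
}
```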
diff --git a/vortex-cuda/src/kernel/encodings/mod.rs b/vortex-cuda/src/kernel/encodings/mod.rs
index b26ca50e1cc..62a8d9f606d 100644
--- a/vortex-cuda/src/kernel/encodings/mod.rs
+++ b/vortex-cuda/src/kernel/encodings/mod.rs
@@ -13,18 +13,16 @@ mod zstd;
 #[cfg(feature = "unstable_encodings")]
 mod zstd_buffers;

-pub use alp::ALPExecutor;
-pub use bitpacked::BitPackedExecutor;
-pub use bitpacked::bitpacked_cuda_kernel;
-pub use bitpacked::bitpacked_cuda_launch_config;
-pub use date_time_parts::DateTimePartsExecutor;
-pub use decimal_byte_parts::DecimalBytePartsExecutor;
-pub use for_::FoRExecutor;
-pub use runend::RunEndExecutor;
-pub use sequence::SequenceExecutor;
-pub use zigzag::ZigZagExecutor;
-pub use zstd::ZstdExecutor;
+pub(crate) use alp::ALPExecutor;
+pub(crate) use bitpacked::BitPackedExecutor;
+pub(crate) use date_time_parts::DateTimePartsExecutor;
+pub(crate) use decimal_byte_parts::DecimalBytePartsExecutor;
+pub(crate) use for_::FoRExecutor;
+pub(crate) use runend::RunEndExecutor;
+pub(crate) use sequence::SequenceExecutor;
+pub(crate) use zigzag::ZigZagExecutor;
+pub(crate) use zstd::ZstdExecutor;
 pub use zstd::ZstdKernelPrep;
 pub use zstd::zstd_kernel_prepare;
 #[cfg(feature = "unstable_encodings")]
-pub use zstd_buffers::ZstdBuffersExecutor;
+pub(crate) use zstd_buffers::ZstdBuffersExecutor;
diff --git a/vortex-cuda/src/kernel/encodings/runend.rs b/vortex-cuda/src/kernel/encodings/runend.rs
index 846131e3861..aed2fb226a7 100644
--- a/vortex-cuda/src/kernel/encodings/runend.rs
+++ b/vortex-cuda/src/kernel/encodings/runend.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use cudarc::driver::DeviceRepr;
 use cudarc::driver::PushKernelArg;
-use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING;
+use tracing::instrument;
 use vortex_array::ArrayRef;
 use vortex_array::Canonical;
 use vortex_array::arrays::ConstantArray;
@@ -33,11 +33,10 @@ use crate::CudaDeviceBuffer;
 use crate::executor::CudaArrayExt;
 use crate::executor::CudaExecute;
 use crate::executor::CudaExecutionCtx;
-use crate::launch_cuda_kernel_impl;

 /// CUDA executor for run-end encoded arrays.
 #[derive(Debug)]
-pub struct RunEndExecutor;
+pub(crate) struct RunEndExecutor;

 impl RunEndExecutor {
     fn try_specialize(array: ArrayRef) -> Option<RunEndArray> {
@@ -47,6 +46,7 @@ impl RunEndExecutor {

 #[async_trait]
 impl CudaExecute for RunEndExecutor {
+    #[instrument(level = "trace", skip_all, fields(executor = ?self))]
     async fn execute(
         &self,
         array: ArrayRef,
@@ -131,18 +131,15 @@ async fn decode_runend_typed
     let kernel_ptype_strs: Vec<&str> = kernel_ptypes.iter().map(|s| s.as_str()).collect();
     let cuda_function = ctx.load_function("runend", &kernel_ptype_strs)?;
-    let mut launch_builder = ctx.launch_builder(&cuda_function);
-
-    launch_builder.arg(&ends_view);
-    launch_builder.arg(&num_runs);
-    launch_builder.arg(&values_view);
-    launch_builder.arg(&offset);
-    launch_builder.arg(&output_len);
-    launch_builder.arg(&output_view);
-
-    // Launch kernel
-    let _cuda_events =
-        launch_cuda_kernel_impl(&mut launch_builder, CU_EVENT_DISABLE_TIMING, output_len)?;
+
+    ctx.launch_kernel(&cuda_function, output_len, |args| {
+        args.arg(&ends_view)
+            .arg(&num_runs)
+            .arg(&values_view)
+            .arg(&offset)
+            .arg(&output_len)
+            .arg(&output_view);
+    })?;

     let output_validity = match values_validity {
         Validity::NonNullable => Validity::NonNullable,
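The runend kernel's argument list (run ends, run count, values, logical offset, output length, output) maps onto standard run-end decoding: output position `i` takes the value of the first run whose end exceeds `i + offset`. A sequential CPU-side sketch; how the kernel parallelizes the per-element run lookup is an assumption, not stated in this diff:

```rust
/// Hedged CPU-side sketch of run-end decoding with the same inputs as the kernel.
fn runend_decode(ends: &[u64], values: &[i64], offset: u64, output_len: usize) -> Vec<i64> {
    let mut out = Vec::with_capacity(output_len);
    let mut run = 0usize;
    for i in 0..output_len as u64 {
        // Advance to the first run that still covers logical index i + offset.
        while ends[run] <= i + offset {
            run += 1;
        }
        out.push(values[run]);
    }
    out
}
```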
diff --git a/vortex-cuda/src/kernel/encodings/sequence.rs b/vortex-cuda/src/kernel/encodings/sequence.rs
index daf08bb9654..0556aea2a49 100644
--- a/vortex-cuda/src/kernel/encodings/sequence.rs
+++ b/vortex-cuda/src/kernel/encodings/sequence.rs
@@ -5,7 +5,8 @@ use std::sync::Arc;

 use async_trait::async_trait;
 use cudarc::driver::DeviceRepr;
-use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING;
+use cudarc::driver::PushKernelArg;
+use tracing::instrument;
 use vortex_array::ArrayRef;
 use vortex_array::Canonical;
 use vortex_array::arrays::PrimitiveArray;
@@ -22,14 +23,14 @@ use vortex_sequence::SequenceVTable;
 use crate::CudaDeviceBuffer;
 use crate::CudaExecutionCtx;
 use crate::executor::CudaExecute;
-use crate::launch_cuda_kernel;

 /// CUDA execution for `SequenceArray`.
 #[derive(Debug)]
-pub struct SequenceExecutor;
+pub(crate) struct SequenceExecutor;

 #[async_trait]
 impl CudaExecute for SequenceExecutor {
+    #[instrument(level = "trace", skip_all, fields(executor = ?self))]
     async fn execute(
         &self,
         array: ArrayRef,
@@ -67,14 +68,11 @@ async fn execute_typed(

     let len_u64 = len as u64;

-    let _events = launch_cuda_kernel!(
-        execution_ctx: ctx,
-        module: "sequence",
-        ptypes: &[T::PTYPE],
-        launch_args: [buffer, base, multiplier, len_u64],
-        event_recording: CU_EVENT_DISABLE_TIMING,
-        array_len: len
-    );
+    let kernel_func = ctx.load_function_ptype("sequence", &[T::PTYPE])?;
+
+    ctx.launch_kernel(&kernel_func, len, |args| {
+        args.arg(&buffer).arg(&base).arg(&multiplier).arg(&len_u64);
+    })?;

     let output_buf = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(buffer)));
diff --git a/vortex-cuda/src/kernel/encodings/zigzag.rs b/vortex-cuda/src/kernel/encodings/zigzag.rs
index 4b265b66a5b..393a87eed43 100644
--- a/vortex-cuda/src/kernel/encodings/zigzag.rs
+++ b/vortex-cuda/src/kernel/encodings/zigzag.rs
@@ -6,7 +6,7 @@ use std::fmt::Debug;
 use async_trait::async_trait;
 use cudarc::driver::DeviceRepr;
 use cudarc::driver::PushKernelArg;
-use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING;
+use tracing::instrument;
 use vortex_array::ArrayRef;
 use vortex_array::Canonical;
 use vortex_array::arrays::PrimitiveArray;
@@ -25,11 +25,10 @@ use crate::CudaBufferExt;
 use crate::executor::CudaArrayExt;
 use crate::executor::CudaExecute;
 use crate::executor::CudaExecutionCtx;
-use crate::launch_cuda_kernel_impl;

 /// CUDA decoder for ZigZag decoding.
 #[derive(Debug)]
-pub struct ZigZagExecutor;
+pub(crate) struct ZigZagExecutor;

 impl ZigZagExecutor {
     fn try_specialize(array: ArrayRef) -> Option<ZigZagArray> {
@@ -39,6 +38,7 @@ impl ZigZagExecutor {

 #[async_trait]
 impl CudaExecute for ZigZagExecutor {
+    #[instrument(level = "trace", skip_all, fields(executor = ?self))]
     async fn execute(
         &self,
         array: ArrayRef,
@@ -82,17 +82,11 @@ where
     let array_len_u64 = array_len as u64;

     // Load kernel function
-    let kernel_ptypes = [U::PTYPE];
-    let cuda_function = ctx.load_function_ptype("zigzag", &kernel_ptypes)?;
-    let mut launch_builder = ctx.launch_builder(&cuda_function);
+    let cuda_function = ctx.load_function_ptype("zigzag", &[U::PTYPE])?;

-    // Build launch args: buffer, length
-    launch_builder.arg(&cuda_view);
-    launch_builder.arg(&array_len_u64);
-
-    // Launch kernel
-    let _cuda_events =
-        launch_cuda_kernel_impl(&mut launch_builder, CU_EVENT_DISABLE_TIMING, array_len)?;
+    ctx.launch_kernel(&cuda_function, array_len, |args| {
+        args.arg(&cuda_view).arg(&array_len_u64);
+    })?;

     // Build result - in-place, reinterpret as signed
     Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle(
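Both kernels above are element-wise: sequence appears to materialize `base + i * multiplier` for each index (inferred from its argument list), and zigzag applies the standard decode that maps the unsigned wire value back to a signed one, in place:

```rust
/// Standard zigzag decode for one element; the kernel applies this in place
/// and the result buffer is then reinterpreted as signed.
fn zigzag_decode(n: u64) -> i64 {
    ((n >> 1) as i64) ^ -((n & 1) as i64)
}
```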
diff --git a/vortex-cuda/src/kernel/encodings/zstd.rs b/vortex-cuda/src/kernel/encodings/zstd.rs
index d4b68937047..ed11bab81a3 100644
--- a/vortex-cuda/src/kernel/encodings/zstd.rs
+++ b/vortex-cuda/src/kernel/encodings/zstd.rs
@@ -11,6 +11,8 @@ use cudarc::driver::CudaSlice;
 use cudarc::driver::DevicePtr;
 use cudarc::driver::DevicePtrMut;
 use futures::future::try_join_all;
+use tracing::debug;
+use tracing::instrument;
 use vortex_array::ArrayRef;
 use vortex_array::Canonical;
 use vortex_array::arrays::BinaryView;
@@ -186,7 +188,7 @@ pub async fn zstd_kernel_prepare(

 /// CUDA executor for ZSTD decompression using nvCOMP.
 #[derive(Debug)]
-pub struct ZstdExecutor;
+pub(crate) struct ZstdExecutor;

 impl ZstdExecutor {
     fn try_specialize(array: ArrayRef) -> Option<ZstdArray> {
@@ -196,6 +198,7 @@ impl ZstdExecutor {

 #[async_trait]
 impl CudaExecute for ZstdExecutor {
+    #[instrument(level = "trace", skip_all, fields(executor = ?self))]
     async fn execute(
         &self,
         array: ArrayRef,
@@ -205,9 +208,9 @@ impl CudaExecute for ZstdExecutor {

         match zstd.as_ref().dtype() {
             DType::Binary(_) | DType::Utf8(_) => decode_zstd(zstd, ctx).await,
-            other => {
-                tracing::debug!(
-                    dtype = %other,
+            _other => {
+                debug!(
+                    dtype = %_other,
                     "Only Binary/Utf8 ZSTD arrays supported on GPU, falling back to CPU"
                 );
                 zstd.decompress()?.to_canonical()
@@ -250,21 +253,24 @@ async fn decode_zstd(array: ZstdArray, ctx: &mut CudaExecutionCtx) -> VortexResu

     let stream = ctx.stream();

-    unsafe {
-        nvcomp_zstd::decompress_async(
-            exec.frame_ptrs_ptr as _,
-            exec.frame_sizes_ptr as _,
-            exec.output_sizes_ptr as _,
-            exec.device_actual_sizes.device_ptr_mut(stream).0 as _,
-            exec.num_frames,
-            exec.nvcomp_temp_buffer.device_ptr_mut(stream).0 as _,
-            exec.nvcomp_temp_buffer_size,
-            exec.output_ptrs_ptr as _,
-            exec.device_statuses.device_ptr_mut(stream).0 as _,
-            stream.cu_stream().cast(),
-        )
-        .map_err(|e| vortex_err!("nvcomp decompress_async failed: {}", e))?;
-    }
+    ctx.launch_external(n_rows, || {
+        // SAFETY: zstd_kernel_prepare makes sure to return valid kernel params.
+        unsafe {
+            nvcomp_zstd::decompress_async(
+                exec.frame_ptrs_ptr as _,
+                exec.frame_sizes_ptr as _,
+                exec.output_sizes_ptr as _,
+                exec.device_actual_sizes.device_ptr_mut(stream).0 as _,
+                exec.num_frames,
+                exec.nvcomp_temp_buffer.device_ptr_mut(stream).0 as _,
+                exec.nvcomp_temp_buffer_size,
+                exec.output_ptrs_ptr as _,
+                exec.device_statuses.device_ptr_mut(stream).0 as _,
+                stream.cu_stream().cast(),
+            )
+            .map_err(|e| vortex_err!("nvcomp decompress_async failed: {}", e))
+        }
+    })?;

     // Unconditionally copy back to the host as Zstd arrays are fully
     // self-contained. They neither have any parent or child encodings.
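`launch_external` gives stream-ordered work that is not a plain kernel launch (here nvCOMP's batched decompression) the same event bracketing as ordinary launches. A hedged sketch of the pattern; `enqueue_device_op` is a hypothetical stand-in for any external library call that enqueues work onto the context's stream:

```rust
use vortex_cuda::CudaExecutionCtx;
use vortex_error::{vortex_err, VortexResult};

// Hypothetical external call that enqueues device work on the current stream.
fn enqueue_device_op() -> Result<(), String> {
    Ok(())
}

/// Hedged sketch: the closure's device work is bracketed with the same CUDA
/// events as a regular kernel launch, attributed to `n_rows` elements.
fn run_external(ctx: &mut CudaExecutionCtx, n_rows: usize) -> VortexResult<()> {
    ctx.launch_external(n_rows, || {
        enqueue_device_op().map_err(|e| vortex_err!("external device op failed: {e}"))
    })
}
```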
diff --git a/vortex-cuda/src/kernel/encodings/zstd_buffers.rs b/vortex-cuda/src/kernel/encodings/zstd_buffers.rs
index 56207741cbf..509ed41f121 100644
--- a/vortex-cuda/src/kernel/encodings/zstd_buffers.rs
+++ b/vortex-cuda/src/kernel/encodings/zstd_buffers.rs
@@ -34,7 +34,7 @@ use crate::executor::CudaExecute;
 use crate::executor::CudaExecutionCtx;

 #[derive(Debug)]
-pub struct ZstdBuffersExecutor;
+pub(crate) struct ZstdBuffersExecutor;

 #[async_trait]
 impl CudaExecute for ZstdBuffersExecutor {
diff --git a/vortex-cuda/src/kernel/filter/mod.rs b/vortex-cuda/src/kernel/filter/mod.rs
index f76c8f8126c..962007eafad 100644
--- a/vortex-cuda/src/kernel/filter/mod.rs
+++ b/vortex-cuda/src/kernel/filter/mod.rs
@@ -15,6 +15,7 @@ use async_trait::async_trait;
 use cudarc::driver::DevicePtr;
 use cudarc::driver::DevicePtrMut;
 use cudarc::driver::DeviceRepr;
+use tracing::instrument;
 use vortex_array::ArrayRef;
 use vortex_array::Canonical;
 use vortex_array::arrays::FilterArrayParts;
@@ -42,6 +43,7 @@ pub struct FilterExecutor;

 #[async_trait]
 impl CudaExecute for FilterExecutor {
+    #[instrument(level = "trace", skip_all, fields(executor = ?self))]
     async fn execute(
         &self,
         array: ArrayRef,
@@ -134,7 +136,7 @@ async fn filter_sized
diff --git a/vortex-cuda/src/kernel/mod.rs b/vortex-cuda/src/kernel/mod.rs
--- a/vortex-cuda/src/kernel/mod.rs
+++ b/vortex-cuda/src/kernel/mod.rs
-    ) => {{
-        use ::cudarc::driver::PushKernelArg as _;
-        let cuda_function = $ctx.load_function_ptype($module, $ptypes)?;
-        let mut launch_builder = $ctx.launch_builder(&cuda_function);
-
-        $(
-            launch_builder.arg(&$arg);
-        )*
-
-        $crate::launch_cuda_kernel_impl(&mut launch_builder, $event_recording, $len)?
-    }};
+/// Implementations can add tracing, async callbacks, or other behavior
+/// around kernel launches.
+pub trait LaunchStrategy: Debug + Send + Sync + 'static {
+    /// Returns the event flags to use for this launch.
+    fn event_flags(&self) -> CUevent_flags;
+
+    /// Called after the kernel launch completes with the recorded events.
+    fn on_complete(&self, events: &CudaKernelEvents, len: usize) -> VortexResult<()>;
+}
+
+/// Extension trait for executing a function which may generate CUDA operations, bracketing them
+/// with CUDA events created using the launch strategy system.
+pub trait LaunchStrategyExt: LaunchStrategy {
+    fn with_strategy<F>(&self, stream: &CudaStream, len: usize, func: F) -> VortexResult<()>
+    where
+        F: FnMut() -> VortexResult<()>;
+}
+
+impl<S: LaunchStrategy> LaunchStrategyExt for S {
+    fn with_strategy<F>(&self, stream: &CudaStream, len: usize, mut func: F) -> VortexResult<()>
+    where
+        F: FnMut() -> VortexResult<()>,
+    {
+        let flags = self.event_flags();
+
+        let before = stream
+            .record_event(Some(flags))
+            .map_err(|e| vortex_err!("record_event: {e}"))?;
+
+        func()?;
+
+        let after = stream
+            .record_event(Some(flags))
+            .map_err(|e| vortex_err!("record_event: {e}"))?;
+
+        self.on_complete(
+            &CudaKernelEvents {
+                before_launch: before,
+                after_launch: after,
+            },
+            len,
+        )?;
+
+        Ok(())
+    }
+}
+
+/// Default launch strategy with no tracing overhead.
+#[derive(Debug)]
+pub struct DefaultLaunchStrategy;
+
+impl LaunchStrategy for DefaultLaunchStrategy {
+    fn event_flags(&self) -> CUevent_flags {
+        CUevent_flags::CU_EVENT_DISABLE_TIMING
+    }
+
+    fn on_complete(&self, _events: &CudaKernelEvents, _len: usize) -> VortexResult<()> {
+        Ok(())
+    }
+}
+
+/// Launch strategy that records timing and emits trace events.
+#[derive(Debug)]
+pub struct TracingLaunchStrategy;
+
+impl LaunchStrategy for TracingLaunchStrategy {
+    fn event_flags(&self) -> CUevent_flags {
+        CUevent_flags::CU_EVENT_DEFAULT
+    }
+
+    fn on_complete(&self, events: &CudaKernelEvents, len: usize) -> VortexResult<()> {
+        let duration = events.duration()?;
+        trace!(
+            execution_nanos = duration.as_nanos(),
+            len, "execution completed"
+        );
+        Ok(())
+    }
 }

 /// Launches a CUDA kernel with the passed launch builder.
@@ -92,7 +134,7 @@ macro_rules! launch_cuda_kernel {
 /// Depending on `CUevent_flags` these events can contain timestamps. Use
 /// `CU_EVENT_DISABLE_TIMING` for minimal overhead and `CU_EVENT_DEFAULT` to
 /// enable timestamps.
-pub fn launch_cuda_kernel_impl(
+pub(crate) fn launch_cuda_kernel_impl(
     launch_builder: &mut LaunchArgs,
     event_flags: CUevent_flags,
     array_len: usize,
@@ -127,7 +169,7 @@ pub fn launch_cuda_kernel_impl(
 /// Depending on `CUevent_flags` these events can contain timestamps. Use
 /// `CU_EVENT_DISABLE_TIMING` for minimal overhead and `CU_EVENT_DEFAULT` to
 /// enable timestamps.
-pub fn launch_cuda_kernel_with_config(
+pub(crate) fn launch_cuda_kernel_with_config(
     launch_builder: &mut LaunchArgs,
     config: LaunchConfig,
     event_flags: CUevent_flags,
@@ -153,7 +195,7 @@ pub fn launch_cuda_kernel_with_config(
 ///
 /// Handles loading PTX files, compiling modules, and loading functions.
 #[derive(Debug)]
-pub struct KernelLoader {
+pub(crate) struct KernelLoader {
     /// Cache of loaded CUDA modules, keyed by module name
     modules: DashMap<String, Arc<CudaModule>>,
 }
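Any type implementing these two methods can slot into the launch machinery via the blanket `LaunchStrategyExt` impl above. A hedged sketch of a custom strategy that accumulates total GPU time across launches; it assumes `CudaKernelEvents` and its `duration()` accessor are reachable, as used by `TracingLaunchStrategy`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

use cudarc::driver::sys::CUevent_flags;
use vortex_error::VortexResult;

/// Hedged sketch: accumulates elapsed GPU time across all launches it observes.
/// Relies only on the trait surface shown above.
#[derive(Debug, Default)]
pub struct AccumulatingLaunchStrategy {
    total_nanos: AtomicU64,
}

impl LaunchStrategy for AccumulatingLaunchStrategy {
    fn event_flags(&self) -> CUevent_flags {
        // Timestamps must be enabled for `duration()` to be meaningful.
        CUevent_flags::CU_EVENT_DEFAULT
    }

    fn on_complete(&self, events: &CudaKernelEvents, _len: usize) -> VortexResult<()> {
        let duration = events.duration()?;
        self.total_nanos
            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
        Ok(())
    }
}
```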
diff --git a/vortex-cuda/src/kernel/patches/mod.rs b/vortex-cuda/src/kernel/patches/mod.rs
index bff666ab66a..ced8d822bb5 100644
--- a/vortex-cuda/src/kernel/patches/mod.rs
+++ b/vortex-cuda/src/kernel/patches/mod.rs
@@ -2,7 +2,7 @@
 // SPDX-FileCopyrightText: Copyright the Vortex contributors

 use cudarc::driver::DeviceRepr;
-use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING;
+use cudarc::driver::PushKernelArg;
 use vortex_array::arrays::PrimitiveArrayParts;
 use vortex_array::patches::Patches;
 use vortex_array::validity::Validity;
@@ -16,7 +16,6 @@ use crate::CudaBufferExt;
 use crate::CudaDeviceBuffer;
 use crate::CudaExecutionCtx;
 use crate::executor::CudaArrayExt;
-use crate::launch_cuda_kernel;

 /// Apply a set of patches in-place onto a [`CudaDeviceBuffer`] holding `ValuesT`.
 pub(crate) async fn execute_patches<
@@ -77,20 +76,14 @@ pub(crate) async fn execute_patches<
     let d_patch_indices_view = d_patch_indices.cuda_view::<IndicesT>()?;
     let d_patch_values_view = d_patch_values.cuda_view::<ValuesT>()?;

-    // kernel arg order for patches is values, patchIndices, patchValues, patchesLen
-    let _events = launch_cuda_kernel!(
-        execution_ctx: ctx,
-        module: "patches",
-        ptypes: &[ValuesT::PTYPE, IndicesT::PTYPE],
-        launch_args: [
-            d_target_view,
-            d_patch_indices_view,
-            d_patch_values_view,
-            patches_len_u64,
-        ],
-        event_recording: CU_EVENT_DISABLE_TIMING,
-        array_len: patches_len
-    );
+    let kernel_func = ctx.load_function_ptype("patches", &[ValuesT::PTYPE, IndicesT::PTYPE])?;
+
+    ctx.launch_kernel(&kernel_func, patches_len, |args| {
+        args.arg(&d_target_view)
+            .arg(&d_patch_indices_view)
+            .arg(&d_patch_values_view)
+            .arg(&patches_len_u64);
+    })?;

     Ok(target)
 }
diff --git a/vortex-cuda/src/kernel/slice/mod.rs b/vortex-cuda/src/kernel/slice/mod.rs
index 4b19dfc746e..74f95bc525c 100644
--- a/vortex-cuda/src/kernel/slice/mod.rs
+++ b/vortex-cuda/src/kernel/slice/mod.rs
@@ -2,6 +2,7 @@
 // SPDX-FileCopyrightText: Copyright the Vortex contributors

 use async_trait::async_trait;
+use tracing::instrument;
 use vortex_array::Array;
 use vortex_array::ArrayRef;
 use vortex_array::Canonical;
@@ -19,6 +20,7 @@ pub struct SliceExecutor;

 #[async_trait]
 impl CudaExecute for SliceExecutor {
+    #[instrument(level = "trace", skip_all, fields(executor = ?self))]
     async fn execute(
         &self,
         array: ArrayRef,
diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs
index adf66bcf8e2..ebdf225c36a 100644
--- a/vortex-cuda/src/lib.rs
+++ b/vortex-cuda/src/lib.rs
@@ -5,6 +5,8 @@

 use std::process::Command;

+use tracing::info;
+
 pub mod arrow;
 mod canonical;
 mod device_buffer;
@@ -29,20 +31,19 @@ use kernel::BitPackedExecutor;
 use kernel::ConstantNumericExecutor;
 use kernel::DateTimePartsExecutor;
 use kernel::DecimalBytePartsExecutor;
+pub use kernel::DefaultLaunchStrategy;
 use kernel::DictExecutor;
 use kernel::FilterExecutor;
 use kernel::FoRExecutor;
+pub use kernel::LaunchStrategy;
 use kernel::RunEndExecutor;
 use kernel::SharedExecutor;
+pub use kernel::TracingLaunchStrategy;
 use kernel::ZigZagExecutor;
 #[cfg(feature = "unstable_encodings")]
 use kernel::ZstdBuffersExecutor;
 use kernel::ZstdExecutor;
 pub use kernel::ZstdKernelPrep;
-pub use kernel::bitpacked_cuda_kernel;
-pub use kernel::bitpacked_cuda_launch_config;
-pub use kernel::launch_cuda_kernel_impl;
-pub use kernel::launch_cuda_kernel_with_config;
 pub use kernel::zstd_kernel_prepare;
 pub use session::CudaSession;
 pub use session::CudaSessionExt;
@@ -78,7 +79,7 @@ pub fn cuda_available() -> bool {

 /// Registers CUDA kernels.
 pub fn initialize_cuda(session: &CudaSession) {
-    tracing::info!("Registering CUDA kernels");
+    info!("Registering CUDA kernels");
     session.register_kernel(ALPVTable::ID, &ALPExecutor);
     session.register_kernel(BitPackedVTable::ID, &BitPackedExecutor);
     session.register_kernel(ConstantVTable::ID, &ConstantNumericExecutor);
diff --git a/vortex-cuda/src/session.rs b/vortex-cuda/src/session.rs
index 33233582116..14e42078f38 100644
--- a/vortex-cuda/src/session.rs
+++ b/vortex-cuda/src/session.rs
@@ -16,6 +16,7 @@ use crate::ExportDeviceArray;
 use crate::arrow::CanonicalDeviceArrayExport;
 use crate::executor::CudaExecute;
 pub use crate::executor::CudaExecutionCtx;
+use crate::initialize_cuda;
 use crate::kernel::KernelLoader;
 use crate::stream::VortexCudaStream;
 use crate::stream_pool::VortexCudaStreamPool;
@@ -128,7 +129,7 @@ impl CudaSession {
 }

 impl Default for CudaSession {
-    /// Creates a default CUDA session using device 0.
+    /// Creates a default CUDA session using device 0, with all GPU array kernels preloaded.
     ///
     /// # Panics
     ///
@@ -136,7 +137,9 @@ impl Default for CudaSession {
     fn default() -> Self {
         #[expect(clippy::expect_used)]
         let context = CudaContext::new(0).expect("Failed to initialize CUDA device 0");
-        Self::new(context)
+        let this = Self::new(context);
+        initialize_cuda(&this);
+        this
     }
 }
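With this change a bare `Default` is enough to get a fully usable session; callers no longer need to pair construction with a separate `initialize_cuda` call:

```rust
use vortex_cuda::CudaSession;

fn main() {
    // Creates the device-0 context and registers all GPU array kernels.
    // Panics if CUDA device 0 cannot be initialized.
    let session = CudaSession::default();
    // `session` can now resolve executors (BitPacked, FoR, RunEnd, ...) by
    // encoding vtable ID with no further setup.
    let _ = session;
}
```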
diff --git a/vortex-cuda/src/stream.rs b/vortex-cuda/src/stream.rs
index fad54b36ab4..305408c4f6f 100644
--- a/vortex-cuda/src/stream.rs
+++ b/vortex-cuda/src/stream.rs
@@ -14,6 +14,7 @@ use cudarc::driver::result::memcpy_htod_async;
 use cudarc::driver::result::stream;
 use futures::future::BoxFuture;
 use kanal::Sender;
+use tracing::warn;
 use vortex_array::buffer::BufferHandle;
 use vortex_error::VortexResult;
 use vortex_error::vortex_err;
@@ -155,10 +156,11 @@ fn register_stream_callback(stream: &CudaStream) -> VortexResult
     ) };

     // Blocking send as we're in a callback invoked by the CUDA driver.
-    #[expect(clippy::expect_used)]
-    tx.send(())
-        // A send should never fail. Panic otherwise.
-        .expect("CUDA callback receiver dropped unexpectedly");
+    // NOTE: send can fail if the CudaEvent is dropped by the caller, in which case the receiver
+    // is closed and sends will fail.
+    if let Err(_e) = tx.send(()) {
+        warn!(error = ?_e, "register_stream_callback send failed; receiver closed");
+    }
 }

 // SAFETY:
diff --git a/vortex-python/src/arrow.rs b/vortex-python/src/arrow.rs
index ece662ee80e..861602f08c9 100644
--- a/vortex-python/src/arrow.rs
+++ b/vortex-python/src/arrow.rs
@@ -1,7 +1,7 @@
 // SPDX-FileCopyrightText: 2016-2025 Copyright The Apache Software Foundation
 // SPDX-FileCopyrightText: 2025 Copyright the Vortex contributors
 // SPDX-License-Identifier: Apache-2.0
-// SPDX-FileComment: Derived from upstream file arrow-pyarrow/src/lib.rs at commit 549709fb at https://github.com/apache/arrow-rs
+// SPDX-FileComment: Derived from upstream file arrow-pyarrow/src/lib.rs at commit 549709fb at https://github.com/apache/arrow-rs
 // SPDX-FileNotice: https://github.com/apache/arrow-rs/blob/549709fbdf91cd1f6c263a7e4540c542b6fecf6b/NOTICE.txt

 #![allow(clippy::same_name_method)]
diff --git a/vortex/public-api.lock b/vortex/public-api.lock
index 6a7a137a0be..cba20f93640 100644
--- a/vortex/public-api.lock
+++ b/vortex/public-api.lock
@@ -18,6 +18,12 @@ pub use vortex::compressor::BtrBlocksCompressor

 pub use vortex::compressor::BtrBlocksCompressorBuilder

+pub use vortex::compressor::FloatCode
+
+pub use vortex::compressor::IntCode
+
+pub use vortex::compressor::StringCode
+
 pub mod vortex::compute2

 pub use vortex::compute2::<>
diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs
index 23d12726e87..c989d634cb2 100644
--- a/vortex/src/lib.rs
+++ b/vortex/src/lib.rs
@@ -33,6 +33,9 @@ pub mod compute2 {
 pub mod compressor {
     pub use vortex_btrblocks::BtrBlocksCompressor;
     pub use vortex_btrblocks::BtrBlocksCompressorBuilder;
+    pub use vortex_btrblocks::FloatCode;
+    pub use vortex_btrblocks::IntCode;
+    pub use vortex_btrblocks::StringCode;
 }

 pub mod dtype {
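These re-exports let downstream code configure the compressor entirely through the `vortex` facade instead of depending on `vortex-btrblocks` directly. A hedged sketch, assuming `exclude_int` accepts `IntCode` items (its item type is not shown in this diff):

```rust
use vortex::compressor::{BtrBlocksCompressor, BtrBlocksCompressorBuilder, IntCode};

/// Hypothetical helper: build a compressor with caller-chosen integer schemes
/// excluded, naming every type through the `vortex` facade paths added above.
fn compressor_without(codes: Vec<IntCode>) -> BtrBlocksCompressor {
    BtrBlocksCompressorBuilder::default()
        .exclude_int(codes)
        .build()
}
```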