From 044dfffe9f09419efd2aa740ac535af41254da8a Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Wed, 1 Apr 2026 14:48:54 +0100 Subject: [PATCH] fix oneshot race in tsan with ffi --- vortex-io/src/runtime/handle.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/vortex-io/src/runtime/handle.rs b/vortex-io/src/runtime/handle.rs index 4893c2e57e5..6ebe956d5c6 100644 --- a/vortex-io/src/runtime/handle.rs +++ b/vortex-io/src/runtime/handle.rs @@ -8,6 +8,8 @@ use std::task::Context; use std::task::Poll; use std::task::ready; +use futures::channel::oneshot::channel; +use futures::channel::oneshot::Receiver; use futures::FutureExt; use tracing::Instrument; use vortex_error::vortex_panic; @@ -65,7 +67,7 @@ impl Handle { Fut: Future + Send + 'static, R: Send + 'static, { - let (send, recv) = oneshot::channel(); + let (send, recv) = channel(); let span = tracing::Span::current(); let abort_handle = self.runtime().spawn( async move { @@ -105,12 +107,12 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (send, recv) = oneshot::channel(); + let (send, recv) = channel(); let span = tracing::Span::current(); let abort_handle = self.runtime().spawn_cpu(Box::new(move || { let _guard = span.enter(); // Optimistically avoid the work if the result won't be used. - if !send.is_closed() { + if !send.is_canceled() { // Task::detach allows the receiver to be dropped, so we ignore send errors. drop(send.send(f())); } @@ -127,12 +129,12 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (send, recv) = oneshot::channel(); + let (send, recv) = channel(); let span = tracing::Span::current(); let abort_handle = self.runtime().spawn_blocking_io(Box::new(move || { let _guard = span.enter(); // Optimistically avoid the work if the result won't be used. - if !send.is_closed() { + if !send.is_canceled() { // Task::detach allows the receiver to be dropped, so we ignore send errors. drop(send.send(f())); } @@ -150,7 +152,7 @@ impl Handle { /// continue running in the background, call [`Task::detach`]. #[must_use = "When a Task is dropped without being awaited, it is cancelled"] pub struct Task { - recv: oneshot::AsyncReceiver, + recv: Receiver, abort_handle: Option, }