feat!: nested tokio runtime support, async API surface, drain on shutdown, flush on drop#2005
feat!: nested tokio runtime support, async API surface, drain on shutdown, flush on drop#2005Aaalibaba42 wants to merge 8 commits into
Conversation
📚 Documentation Check Results📦
|
Clippy Allow Annotation ReportComparing clippy allow annotations between branches:
Summary by Rule
Annotation Counts by File
Annotation Stats by Crate
About This ReportThis report tracks Clippy allow annotations for specific rules, showing how they've changed in this PR. Decreasing the number of these annotations generally improves code quality. |
🔒 Cargo Deny Results📦
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #2005 +/- ##
==========================================
+ Coverage 72.83% 72.91% +0.07%
==========================================
Files 459 459
Lines 76134 76381 +247
==========================================
+ Hits 55455 55692 +237
- Misses 20679 20689 +10
🚀 New features to boost your workflow:
|
|
Artifact Size Benchmark Reportaarch64-alpine-linux-musl
aarch64-unknown-linux-gnu
libdatadog-x64-windows
libdatadog-x86-windows
x86_64-alpine-linux-musl
x86_64-unknown-linux-gnu
|
b5a716f to
d55d383
Compare
d4d669c to
463495c
Compare
yannham
left a comment
There was a problem hiding this comment.
We've discussed this offline, and the consensus seemed to be that the actual issue of the runtime-in-the-runtime problem is that some functions of our API pretend to be async but actually use block_on under the hood. It seems one possible solution is to actually make them async, and to offer a customary blocking API at the top-level only. Then, from Rust we could only use the purely async API and not care about the shared runtime at all.
On the other hand, the current solution secretly spawns a thread and run a second runtime on the side, while blocking the original executor's thread waiting for the answer. This is a bit fishy, and we never want to block an async executor thread, as this will block a whole bunch of unrelated async tasks queued on this thread.
3dae176 to
c8d5928
Compare
c8d5928 to
bddb672
Compare
…kio-inside-tokio problems
… have tokio-inside-tokio problems" This reverts commit a9fceca.
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
bddb672 to
a73d21d
Compare
| pub(super) enum RuntimeBacking { | ||
| Owned(Arc<Runtime>), | ||
| Borrowed(Handle), | ||
| } |
There was a problem hiding this comment.
I know you have added this for dd-trace-rs but in general we don't want to us the same runtime as the calling app and always want to create a new one
There was a problem hiding this comment.
Creating a new one quickly leads to problems whenever we want to block/shutdown etc. That's why we discussed changing to borrow the parents' runtime iiuc. It's not a problem unique to dd-trace-rs
There was a problem hiding this comment.
Still, in dd-trace-rs I would prefer if we used our own runtime, spawning a thread to block on said runtime.
We don't know what customer are doing with their runtime, what load they put and so on....
What?
Make
TraceBuffer,SharedRuntime, andTraceExportersafe to use from inside ahost tokio runtime, expose an async API surface alongside the existing sync facade,
add a synchronous
flush_and_wait, drain pending spans on shutdown, and fire abest-effort flush on
Drop.Why?
SharedRuntime::block_onandSharedRuntime::shutdownpanicked withCannot start a runtime from within a runtimewhen called from inside anexisting tokio context (e.g. the Rust tracer embedded in an async application).
The original "block_on on a scoped OS thread" workaround was reverted in favor
of a proper async API surface — the OS-thread trick still allocated a second
tokio runtime per call and could not be wired into a host runtime's lifecycle.
block_oncalls deep incheck_agent_info,stop_stats_computation, andTraceExporterBuilder::build,which propagated the nested-runtime panic to every async caller.
nothing drained the sender's buffer after the worker loop stopped.
TraceBufferwithout an explicit flush also lost any buffered spans.How?
SharedRuntime(libdd-shared-runtime)RuntimeBacking::{Owned(Arc<Runtime>), Borrowed(Handle)}enum and a new constructor
SharedRuntime::from_handle(Handle)so callersthat already own a tokio runtime can share it instead of spawning a second one.
Owned mode is unchanged; borrowed mode gives up fork-safety in exchange for
letting
Drop/shutdownwork cleanly from a host worker thread withoutblock_on.block_on, syncshutdown, andbefore_forkreturn a typederror in borrowed mode (
SharedRuntimeError::ForkUnsupportedInBorrowedMode,SyncShutdownNotSupportedInBorrowedMode, orio::Error(ErrorKind::Unsupported)forblock_on) instead of panicking.trigger_shutdown_signal()(snapshots theworker set, spawns shutdown tasks on the underlying runtime, bumps a tracked
expected count) and
wait_shutdown_done(timeout)(parks on aCondvaruntilthe tracked count is reached). Pairs with sync
Dropimpls that must waitfor worker completion without
block_on.Drop,trigger_shutdown_signal,wait_shutdown_done,and
is_borrowedwere converted to poison-tolerant lock acquisition — aDrop impl must never panic, and the public shutdown surface should not
either. Internal counter bumps in spawned shutdown tasks degrade gracefully
on
PoisonError(poison.into_inner()) so a single task panic cannotdeadlock
wait_shutdown_done.ddog_shared_runtime_before_forknow propagates the new errorvariants through
SharedRuntimeFFIError, mapped to a newSharedRuntimeErrorCode::NotSupportedInBorrowedModediscriminator.libdd-data-pipelineasync API surfaceTraceExporterBuilder::buildis now a thin wrapper around anew
build_async. The sync wrapper is gated#[cfg(not(target_arch = "wasm32"))]since it relies on
SharedRuntime::block_on.TraceExporter::send,send_trace_chunks, andshutdownarenow sync wrappers over new
send_async,send_trace_chunks_async, andshutdown_asynccounterparts; the sync wrappers are similarly gated forwasm32. The internal
check_agent_info,stop_stats_computation, andhandle_stats_enabledare nowasyncend-to-end — every internalblock_onwas deleted.
ArcSwap::loadwas changed toload_fullwhere needed so theresulting
Sendfuture survivesawaitpoints.TraceBuffer(libdd-data-pipeline)flush_and_wait(timeout). Triggers a flush, captures the batchgeneration, and parks on a
Condvaruntil the worker has processed it.Short-circuits on empty batches.
Receiver::drain(). Synchronously pulls remaining chunks withoutwaiting for a flush trigger;
TraceExporterWorker::shutdowncalls it toexport any leftover spans before tearing down.
Drop for TraceBuffer<T>. Fires a non-blocking flush-notify so theworker exports pending chunks instead of losing them. Errors are
intentionally swallowed — if the runtime is already gone there is nothing
useful to do.
Tests
block_on/shutdown/dropinvoked from inside#[tokio::test(flavor = "multi_thread")], asserting the borrowed-runtimepath.
SharedRuntimetests:test_from_handle_borrowed_shutdown_wait,test_borrowed_mode_unsupported_apis,test_wait_shutdown_done_no_workers.test_drop_inside_tokio_runtime,test_drop_triggers_flush,test_flush_and_wait,test_shutdown_drains_pending_batch, etc.).Breaking changes
SharedRuntime::before_fork,after_fork_parent, andafter_fork_childnow return
Result<(), SharedRuntimeError>instead of().SharedRuntime::block_onnow returnsResult<F::Output, io::Error>(wasinfallible).
ddog_shared_runtime_before_forknow returns an optionalboxed
SharedRuntimeFFIError.Additional notes
build,send,send_trace_chunks,shutdown) is preservedfor existing callers but now panics / returns
io::Error(ErrorKind::Unsupported)when invoked on a borrowedSharedRuntime.A
sync-apicargo feature gating these methods was prototyped and revertedas premature; see the explanatory comment block in
libdd-data-pipeline/Cargo.tomlfor the rationale and a pointer for futureconsumers who do want a build that statically forbids the sync facade.