From a6d62033848a6fc836312a56237852f86ffb5c44 Mon Sep 17 00:00:00 2001
From: Vlad Scherbich
Date: Thu, 13 Nov 2025 14:30:51 -0500
Subject: [PATCH] fix(profiling): fix race condition in upload file sequence
 numbering

---
 .../profiling/dd_wrapper/include/uploader.hpp |   1 +
 .../profiling/dd_wrapper/src/uploader.cpp     |  10 ++++++----
 tests/profiling_v2/exporter/test_ddup.py      | 113 ++++++++++++++++++
 3 files changed, 120 insertions(+), 4 deletions(-)

diff --git a/ddtrace/internal/datadog/profiling/dd_wrapper/include/uploader.hpp b/ddtrace/internal/datadog/profiling/dd_wrapper/include/uploader.hpp
index 3ffd613aae4..f234b8925fb 100644
--- a/ddtrace/internal/datadog/profiling/dd_wrapper/include/uploader.hpp
+++ b/ddtrace/internal/datadog/profiling/dd_wrapper/include/uploader.hpp
@@ -22,6 +22,7 @@ class Uploader
     static inline ddog_CancellationToken cancel{ .inner = nullptr };
     static inline std::atomic<uint64_t> upload_seq{ 0 };
     std::string output_filename;
+    uint64_t instance_upload_seq; // Captured sequence number for this instance
     ddog_prof_ProfileExporter ddog_exporter{ .inner = nullptr };
 
     bool export_to_file(ddog_prof_EncodedProfile* encoded);
diff --git a/ddtrace/internal/datadog/profiling/dd_wrapper/src/uploader.cpp b/ddtrace/internal/datadog/profiling/dd_wrapper/src/uploader.cpp
index 63b5877d71c..7ccc4868ef8 100644
--- a/ddtrace/internal/datadog/profiling/dd_wrapper/src/uploader.cpp
+++ b/ddtrace/internal/datadog/profiling/dd_wrapper/src/uploader.cpp
@@ -17,9 +17,11 @@ Datadog::Uploader::Uploader(std::string_view _output_filename, ddog_prof_Profile
   : output_filename{ _output_filename }
   , ddog_exporter{ _ddog_exporter }
 {
-    // Increment the upload sequence number every time we build an uploader.
-    // Upoloaders are use-once-and-destroy.
-    upload_seq++;
+    // Atomically increment and capture the upload sequence number for this instance.
+    // This prevents race conditions where multiple threads creating Uploaders concurrently
+    // could result in duplicate sequence numbers in filenames.
+    // Uploaders are use-once-and-destroy.
+    instance_upload_seq = ++upload_seq;
 }
 
 bool
@@ -28,6 +30,6 @@ Datadog::Uploader::export_to_file(ddog_prof_EncodedProfile* encoded)
     // Write the profile to a file using the following format for filename:
     // <output_filename>.<pid>.<upload_seq>
     std::ostringstream oss;
-    oss << output_filename << "." << getpid() << "." << upload_seq;
+    oss << output_filename << "." << getpid() << "." << instance_upload_seq;
     std::string filename = oss.str();
     std::ofstream out(filename, std::ios::binary);
diff --git a/tests/profiling_v2/exporter/test_ddup.py b/tests/profiling_v2/exporter/test_ddup.py
index f799bfe0e28..f6c69ac4df3 100644
--- a/tests/profiling_v2/exporter/test_ddup.py
+++ b/tests/profiling_v2/exporter/test_ddup.py
@@ -62,3 +62,116 @@ def test_tags_propagated():
     # Profiler could add tags, so check that tags is a superset of config.tags
     for k, v in config.tags.items():
         assert tags[k] == v
+
+
+@pytest.mark.skipif(not ddup.is_available, reason="ddup not available")
+def test_concurrent_upload_sequence_numbers(tmp_path):
+    """Test that concurrent uploads produce unique, sequential file sequence numbers.
+
+    This is a regression test for a race condition in the C++ Uploader class where
+    multiple threads creating Uploader objects concurrently could result in files
+    with duplicate or non-sequential sequence numbers.
+
+    The bug occurred because:
+    1. Thread A creates Uploader -> upload_seq becomes 1
+    2. Thread B creates Uploader -> upload_seq becomes 2
+    3. Thread B exports file -> uses current upload_seq value (2)
+    4. Thread A exports file -> uses current upload_seq value (2, not 1!)
+
+    This resulted in duplicate sequence numbers or missing sequences.
+
+    The fix captures the sequence number atomically in the Uploader constructor
+    (instance_upload_seq = ++upload_seq) so each instance has its own unique number.
+    """
+    import glob
+    import os
+    import threading
+    import time
+
+    from ddtrace.profiling.collector.threading import ThreadingLockCollector
+
+    test_name = "test_concurrent_upload"
+    pprof_prefix = str(tmp_path / test_name)
+    output_filename = pprof_prefix + "." + str(os.getpid())
+
+    # Configure ddup
+    ddup.config(
+        env="test",
+        service=test_name,
+        version="test_version",
+        output_filename=pprof_prefix,
+    )
+    ddup.start()
+
+    num_threads = 10
+    num_uploads_per_thread = 5
+    barrier = threading.Barrier(num_threads)  # Synchronize thread start for maximum contention
+
+    def upload_worker():
+        """Each thread collects samples and uploads multiple times."""
+        # Wait for all threads to be ready
+        barrier.wait()
+
+        for _ in range(num_uploads_per_thread):
+            # Collect some samples (using lock collector to generate profile data)
+            with ThreadingLockCollector(capture_pct=100):
+                lock = threading.Lock()
+                with lock:
+                    time.sleep(0.001)  # Small amount of work
+
+            # Upload immediately to create race condition
+            ddup.upload()
+
+    # Start threads
+    threads = []
+    for _ in range(num_threads):
+        t = threading.Thread(target=upload_worker)
+        threads.append(t)
+        t.start()
+
+    # Wait for all threads to complete
+    for t in threads:
+        t.join()
+
+    # Analyze the created files
+    files = glob.glob(output_filename + ".*")
+
+    # Extract sequence numbers from filenames
+    # Format: <output_filename>.<pid>.<seq>
+    sequence_numbers = []
+    for f in files:
+        seq = int(f.rsplit(".", 1)[-1])
+        sequence_numbers.append(seq)
+
+    sequence_numbers.sort()
+
+    print(f"\nCreated {len(files)} files")
+    print(f"Sequence numbers: {sequence_numbers}")
+
+    # Check for issues
+    expected_count = num_threads * num_uploads_per_thread
+
+    # Issue 1: Missing files (duplicates overwrite each other)
+    assert len(files) == expected_count, (
+        f"Expected {expected_count} files, but found {len(files)}. "
+        f"This suggests duplicate sequence numbers caused file overwrites!"
+    )
+
+    # Issue 2: Duplicate sequence numbers
+    assert len(sequence_numbers) == len(
+        set(sequence_numbers)
+    ), f"Duplicate sequence numbers found! Sequences: {sequence_numbers}"
+
+    # Issue 3: Gaps in sequence numbers
+    # Sequences should be continuous (no gaps)
+    for i in range(len(sequence_numbers) - 1):
+        assert (
+            sequence_numbers[i + 1] == sequence_numbers[i] + 1
+        ), f"Gap in sequence numbers: {sequence_numbers[i]} -> {sequence_numbers[i + 1]}"
+
+    # Cleanup
+    for f in files:
+        try:
+            os.remove(f)
+        except Exception as e:
+            print(f"Error removing file {f}: {e}")
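
Note on the pattern: the essential change is that each Uploader captures its own
sequence number in a single atomic read-modify-write (instance_upload_seq =
++upload_seq) at construction, instead of re-reading the shared counter at export
time. The standalone sketch below illustrates that pattern in isolation. It reuses
the Uploader / upload_seq / instance_upload_seq names from this diff, but the
surrounding scaffolding (main, the std::set bookkeeping) is illustrative only and
is not part of the dd_wrapper code.

    // Standalone sketch (illustrative only) of the increment-and-capture pattern.
    // Build with: g++ -std=c++17 -pthread seq_demo.cpp
    #include <atomic>
    #include <cassert>
    #include <cstdint>
    #include <mutex>
    #include <set>
    #include <thread>
    #include <vector>

    struct Uploader
    {
        // Shared counter, as in uploader.hpp; C++17 inline static needs no
        // out-of-line definition.
        static inline std::atomic<uint64_t> upload_seq{ 0 };
        uint64_t instance_upload_seq;

        Uploader()
          // ++upload_seq is one atomic read-modify-write, so two threads can
          // never capture the same value, unlike re-reading upload_seq later.
          : instance_upload_seq{ ++upload_seq }
        {
        }
    };

    int
    main()
    {
        std::set<uint64_t> seen;
        std::mutex seen_mtx;
        std::vector<std::thread> threads;

        for (int t = 0; t < 10; ++t) {
            threads.emplace_back([&] {
                for (int i = 0; i < 5; ++i) {
                    Uploader u; // capture happens here, not at "export" time
                    std::lock_guard<std::mutex> lock(seen_mtx);
                    // set::insert returns { iterator, false } on a duplicate,
                    // so this assert fires if any sequence number repeats.
                    assert(seen.insert(u.instance_upload_seq).second);
                }
            });
        }
        for (auto& th : threads) {
            th.join();
        }

        // 10 threads x 5 uploads -> 50 unique, gap-free sequence numbers.
        assert(seen.size() == 50);
        assert(*seen.begin() == 1 && *seen.rbegin() == 50);
        return 0;
    }

Writing instance_upload_seq = upload_seq.fetch_add(1) + 1 would be equivalent;
operator++ on std::atomic is already a single atomic RMW, so no mutex is needed
around the counter itself.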