Skip to content

Commit d744a92

Browse files
authored
chore: limit symdb uploaders under spawn (#15140)
## Description We use file-based IPC to ensure that Symbol DB has at most 2 active uploader processes under more general circumstances than fork, such as spawn.
1 parent e9582f2 commit d744a92

File tree

3 files changed

+126
-29
lines changed

3 files changed

+126
-29
lines changed

ddtrace/internal/ipc.py

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from contextlib import contextmanager
12
import os
23
import secrets
34
import tempfile
@@ -99,32 +100,42 @@ def open_file(path, mode): # type: ignore
99100
class SharedStringFile:
100101
"""A simple shared-file implementation for multiprocess communication."""
101102

102-
def __init__(self) -> None:
103-
self.filename: typing.Optional[str] = str(TMPDIR / secrets.token_hex(8)) if TMPDIR is not None else None
103+
def __init__(self, name: typing.Optional[str] = None) -> None:
104+
self.filename: typing.Optional[str] = (
105+
str(TMPDIR / (name or secrets.token_hex(8))) if TMPDIR is not None else None
106+
)
107+
if self.filename is not None:
108+
Path(self.filename).touch(exist_ok=True)
109+
110+
def put_unlocked(self, f: typing.BinaryIO, data: str) -> None:
111+
f.seek(0, os.SEEK_END)
112+
dt = (data + "\x00").encode()
113+
if f.tell() + len(dt) <= MAX_FILE_SIZE:
114+
f.write(dt)
104115

105116
def put(self, data: str) -> None:
106117
"""Put a string into the file."""
107118
if self.filename is None:
108119
return
109120

110121
try:
111-
with open_file(self.filename, "ab") as f, WriteLock(f):
112-
f.seek(0, os.SEEK_END)
113-
dt = (data + "\x00").encode()
114-
if f.tell() + len(dt) <= MAX_FILE_SIZE:
115-
f.write(dt)
122+
with self.lock_exclusive() as f:
123+
self.put_unlocked(f, data)
116124
except Exception: # nosec
117125
pass
118126

127+
def peekall_unlocked(self, f: typing.BinaryIO) -> typing.List[str]:
128+
f.seek(0)
129+
return data.decode().split("\x00") if (data := f.read().strip(b"\x00")) else []
130+
119131
def peekall(self) -> typing.List[str]:
120132
"""Peek at all strings from the file."""
121133
if self.filename is None:
122134
return []
123135

124136
try:
125-
with open_file(self.filename, "r+b") as f, ReadLock(f):
126-
f.seek(0)
127-
return f.read().strip(b"\x00").decode().split("\x00")
137+
with self.lock_shared() as f:
138+
return self.peekall_unlocked(f)
128139
except Exception: # nosec
129140
return []
130141

@@ -134,13 +145,39 @@ def snatchall(self) -> typing.List[str]:
134145
return []
135146

136147
try:
137-
with open_file(self.filename, "r+b") as f, WriteLock(f):
138-
f.seek(0)
139-
strings = f.read().strip(b"\x00").decode().split("\x00")
148+
with self.lock_exclusive() as f:
149+
try:
150+
return self.peekall_unlocked(f)
151+
finally:
152+
self.clear_unlocked(f)
153+
except Exception: # nosec
154+
return []
140155

141-
f.seek(0)
142-
f.truncate()
156+
def clear_unlocked(self, f: typing.BinaryIO) -> None:
157+
f.seek(0)
158+
f.truncate()
159+
160+
def clear(self) -> None:
161+
"""Clear all strings from the file."""
162+
if self.filename is None:
163+
return
143164

144-
return strings
165+
try:
166+
with self.lock_exclusive() as f:
167+
self.clear_unlocked(f)
145168
except Exception: # nosec
146-
return []
169+
pass
170+
171+
@contextmanager
172+
def lock_shared(self):
173+
"""Context manager to acquire a shared/read lock on the file."""
174+
with open_file(self.filename, "rb") as f, ReadLock(f):
175+
yield f
176+
177+
@contextmanager
178+
def lock_exclusive(self):
179+
"""Context manager to acquire an exclusive/write lock on the file."""
180+
if self.filename is None:
181+
return
182+
with open_file(self.filename, "r+b") as f, WriteLock(f):
183+
yield f

ddtrace/internal/symbol_db/remoteconfig.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import typing as t
33

44
from ddtrace.internal.forksafe import has_forked
5+
from ddtrace.internal.ipc import SharedStringFile
56
from ddtrace.internal.logger import get_logger
67
from ddtrace.internal.products import manager as product_manager
78
from ddtrace.internal.remoteconfig import Payload
@@ -18,20 +19,34 @@
1819

1920
log = get_logger(__name__)
2021

22+
# File-backed registry of the PIDs that currently have Symbol DB enabled.
# Keying the file name on the parent PID lets all of a process's children
# (under fork and spawn alike) rendezvous on the same file, so we can cap
# the number of concurrently uploading processes.
shared_pid_file = SharedStringFile(f"{os.getppid()}-symdb-pids")

# Besides the parent, at most this many child processes may upload symbols.
MAX_CHILD_UPLOADERS = 1
28+
2129

2230
def _rc_callback(data: t.Sequence[Payload]):
23-
if get_ancestor_runtime_id() is not None and has_forked():
24-
log.debug("[PID %d] SymDB: Disabling Symbol DB in forked process", os.getpid())
25-
# We assume that forking is being used for spawning child worker
26-
# processes. Therefore, we avoid uploading the same symbols from each
27-
# child process. We restrict the enablement of Symbol DB to just the
28-
# parent process and the first fork child.
29-
remoteconfig_poller.unregister("LIVE_DEBUGGING_SYMBOL_DB")
30-
31-
if SymbolDatabaseUploader.is_installed():
32-
SymbolDatabaseUploader.uninstall()
33-
34-
return
31+
with shared_pid_file.lock_exclusive() as f:
32+
if (get_ancestor_runtime_id() is not None and has_forked()) or len(
33+
set(shared_pid_file.peekall_unlocked(f))
34+
) >= MAX_CHILD_UPLOADERS:
35+
log.debug("[PID %d] SymDB: Disabling Symbol DB in child process", os.getpid())
36+
# We assume that forking is being used for spawning child worker
37+
# processes. Therefore, we avoid uploading the same symbols from each
38+
# child process. We restrict the enablement of Symbol DB to just the
39+
# parent process and the first fork child.
40+
remoteconfig_poller.unregister("LIVE_DEBUGGING_SYMBOL_DB")
41+
42+
if SymbolDatabaseUploader.is_installed():
43+
SymbolDatabaseUploader.uninstall()
44+
45+
return
46+
47+
# Store the PID of the current process so that we know which processes
48+
# have Symbol DB enabled.
49+
shared_pid_file.put_unlocked(f, str(os.getpid()))
3550

3651
for payload in data:
3752
if payload.metadata is None:

tests/internal/symbol_db/test_symbols.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@
1515
from ddtrace.internal.symbol_db.symbols import SymbolType
1616

1717

18+
@pytest.fixture(autouse=True, scope="function")
def pid_file_teardown():
    # Wipe the shared Symbol DB PID registry after every test in this module.
    # Imported lazily so the module's import side effects happen inside the
    # test, not at collection time.
    from ddtrace.internal.symbol_db.remoteconfig import shared_pid_file

    yield

    # Leftover PIDs would make later tests believe an uploader is running.
    shared_pid_file.clear()
25+
26+
1827
def test_symbol_from_code():
1928
def foo(a, b, c=None):
2029
loc = 42
@@ -320,3 +329,39 @@ def test_symbols_fork_uploads():
320329

321330
for pid in pids:
322331
os.waitpid(pid, 0)
332+
333+
334+
@pytest.mark.subprocess(run_module=True, err=None)
def test_symbols_spawn_uploads():
    # Verify that with the "spawn" start method at most ONE of the spawned
    # children keeps an active Symbol DB uploader: the RC callback should
    # uninstall the uploader in every other child.
    def spawn_target(results):
        from ddtrace.internal.remoteconfig import ConfigMetadata
        from ddtrace.internal.remoteconfig import Payload
        from ddtrace.internal.symbol_db.remoteconfig import _rc_callback
        from ddtrace.internal.symbol_db.symbols import SymbolDatabaseUploader

        SymbolDatabaseUploader.install()

        rc_data = [Payload(ConfigMetadata("test", "symdb", "hash", 0, 0), "test", None)]
        _rc_callback(rc_data)
        # Record whether this child kept its uploader installed.
        results.append(SymbolDatabaseUploader.is_installed())

    if __name__ == "__main__":
        import multiprocessing

        # Required on platforms that re-import the main module on spawn.
        multiprocessing.freeze_support()

        multiprocessing.set_start_method("spawn", force=True)
        mc_context = multiprocessing.get_context("spawn")
        # Manager-backed list so children can report results to the parent.
        manager = multiprocessing.Manager()
        returns = manager.list()
        jobs = []

        for _ in range(10):
            p = mc_context.Process(target=spawn_target, args=(returns,))
            p.start()
            jobs.append(p)

        for p in jobs:
            p.join()

        # Exactly one of the ten children should still be an uploader.
        assert sum(returns) == 1, returns

0 commit comments

Comments
 (0)