53 changes: 53 additions & 0 deletions langfuse/_task_manager/task_manager.py
@@ -2,7 +2,9 @@

import atexit
import logging
import os
import queue
import weakref
from queue import Queue
from typing import List, Optional

@@ -35,6 +37,7 @@
    _sdk_integration: str
    _sample_rate: float
    _mask: Optional[MaskFunction]
    _shutdown: bool

    def __init__(
        self,
@@ -77,12 +80,60 @@
        self._enabled = enabled
        self._sample_rate = sample_rate
        self._mask = mask
        self._shutdown = False

        self.init_resources()

        # cleans up when the python interpreter closes
        atexit.register(self.shutdown)

        # Register fork handler to reinitialize consumer threads in child process.
        # When using Gunicorn with --preload, os.fork() copies memory but not threads.
        # Without this, worker processes have no consumer threads and all events are lost.
        if hasattr(os, "register_at_fork"):
            weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
            os.register_at_fork(
                after_in_child=lambda: weak_reinit()() if weak_reinit() is not None else None
            )

Check warning on line 97 in langfuse/_task_manager/task_manager.py

Claude / Claude Code Review

At-fork callbacks accumulate per TaskManager (residual leak)

Comment on lines +93 to +97
🟡 os.register_at_fork has no public unregister API in CPython, so each TaskManager.__init__ permanently appends a fresh lambda (closing over a new WeakMethod) to the process-global at-fork list — and the entries are never reclaimed even after the TaskManager is GC'd. The WeakMethod correctly prevents the manager itself from being pinned, but the lambda+WeakMethod wrapper pair is not collectible, so in long-lived processes that build/tear down many Langfuse clients (e.g. pytest-xdist with per-test setup, notebook kernels that re-instantiate, multi-tenant handlers that recycle per tenant) the list grows without bound and each subsequent fork() walks O(N) dead no-op callbacks. Nit / follow-up only — the primary --preload bug is correctly fixed; a cleaner pattern is a module-level WeakSet of TaskManager instances with a single process-global at-fork callback that iterates live members.

Extended reasoning...

What this is

os.register_at_fork in CPython has no public unregister API: once an after_in_child callback is appended to the interpreter's after-fork callback list (maintained in Modules/posixmodule.c), it stays there for the lifetime of the interpreter. This PR calls os.register_at_fork(...) unconditionally on every __init__, registering a fresh lambda that closes over a new WeakMethod:

weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
os.register_at_fork(
    after_in_child=lambda: weak_reinit()() if weak_reinit() is not None else None
)

The WeakMethod was added (correctly) in response to the Greptile P1 review comment to prevent the TaskManager itself — along with its threads, queues, MediaManager, and FernLangfuse client — from being pinned in memory. That part is fine. However, the lambda object and the WeakMethod it captures in its closure are themselves strong-referenced from the at-fork registry, and they are not collectible.
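
To illustrate the distinction above, here is a minimal sketch that uses plain Python lists as stand-ins for the at-fork registry, so nothing is actually registered with the interpreter. `Manager`, `reinit`, and `make_wrapper` are hypothetical names, not part of the Langfuse code. It shows that storing the bound method pins the instance, while storing a wrapper around a WeakMethod does not:

```python
import gc
import weakref


class Manager:
    """Hypothetical stand-in for TaskManager."""

    def reinit(self):
        return "reinitialized"


def make_wrapper(bound_method):
    """Wrap a bound method so the registry never holds the instance strongly."""
    weak = weakref.WeakMethod(bound_method)

    def wrapper():
        method = weak()  # dereference once; None if the instance is gone
        return method() if method is not None else None

    return wrapper


# Case 1: the registry holds the bound method, which holds the instance.
strong_registry = []
pinned = Manager()
pinned_ref = weakref.ref(pinned)
strong_registry.append(pinned.reinit)
del pinned
gc.collect()
pinned_still_alive = pinned_ref() is not None

# Case 2: the registry holds only a wrapper around a WeakMethod.
weak_registry = []
free = Manager()
free_ref = weakref.ref(free)
weak_registry.append(make_wrapper(free.reinit))
result_while_alive = weak_registry[0]()
del free
gc.collect()
freed = free_ref() is None
result_after_gc = weak_registry[0]()
```

In the second case the instance is collected, but the wrapper itself is still sitting in `weak_registry`, which is the residual entry this comment is about.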

Step-by-step proof

  1. App constructs Langfuse() → TaskManager.__init__ runs → entry #1 (lambda+WeakMethod) appended to the global list.
  2. App drops the reference; TaskManager is GC'd. The WeakMethod resolves to None. ✅ Heavy resources are freed.
  3. App constructs Langfuse() again → entry #2 appended. Entry #1 is still there, now dead.
  4. After N construction/drop cycles, the list has N entries. (N-1) of them dereference to None and do nothing; one is live.
  5. Every fork() after this iterates all N entries. Memory cost: N × (lambda + WeakMethod + closure cell) ≈ a few hundred bytes per dead entry, never reclaimed.
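
The steps above can be reproduced in a small sketch. It assumes a plain list (`fake_registry`, a hypothetical name) in place of the real at-fork list, because the real list cannot be inspected or cleared from Python; `Manager` is likewise a stand-in for TaskManager and mirrors the PR's per-instance registration:

```python
import gc
import weakref

# Stand-in for the interpreter's at-fork callback list (assumption: the
# real list behaves the same way, i.e. append-only with strong references).
fake_registry = []


class Manager:
    """Hypothetical stand-in for TaskManager."""

    def __init__(self):
        weak = weakref.WeakMethod(self._reinit)
        # Same shape as the PR's lambda: a fresh wrapper per instance.
        fake_registry.append(lambda: weak()() if weak() is not None else None)

    def _reinit(self):
        return True


# Construct and immediately drop five managers, then keep one alive.
for _ in range(5):
    Manager()
keeper = Manager()
gc.collect()

# Simulate what a fork would do: walk every registered callback.
results = [callback() for callback in fake_registry]
live = sum(1 for r in results if r is True)
dead = sum(1 for r in results if r is None)
```

After six constructions the list holds six wrappers, of which only the one belonging to `keeper` does any work.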

Why the existing fix doesn't prevent it

The WeakMethod correctly addresses Greptile's P1 (the bound method had pinned self). But the wrapper — the lambda plus its captured WeakMethod — is what register_at_fork actually holds, and that wrapper has no weakref-aware path. CPython exposes no API to remove a registered callback. So the residual leak is structural: the WeakMethod choice only weakens what the wrapper points at, not the wrapper itself.

Practical impact (and why this is a nit, not a blocker)

I want to explicitly address the refutation: the practical impact is small in the typical deployment.

  • The PR's target use case (Gunicorn --preload) creates one client in the master and forks workers once. The accumulation here is zero.
  • pytest does not fork between tests; Jupyter kernels do not fork; multi-tenant request handlers typically do not fork per tenant. So the combination of "create many TaskManagers + fork repeatedly" is uncommon.
  • Per-entry overhead is small (hundreds of bytes), and the dead callbacks are no-ops.

Where this does matter is the pathological-but-real cases (e.g. pytest-xdist runners that re-instantiate clients in workers that themselves fork, long-running task workers using Celery's prefork pool with per-job client setup as the user @georgiee mentioned). In those, the leak is bounded only by process lifetime and fork() cost grows linearly in N.

Suggested follow-up (not blocking)

Move from per-instance registration to a module-level singleton:

_MANAGERS: "weakref.WeakSet[TaskManager]" = weakref.WeakSet()
_FORK_HANDLER_REGISTERED = False

def _global_after_in_child():
    for mgr in list(_MANAGERS):
        try:
            mgr._at_fork_reinit()
        except Exception:
            pass

# In __init__:
_MANAGERS.add(self)
global _FORK_HANDLER_REGISTERED
if hasattr(os, "register_at_fork") and not _FORK_HANDLER_REGISTERED:
    os.register_at_fork(after_in_child=_global_after_in_child)
    _FORK_HANDLER_REGISTERED = True

That bounds the at-fork list at exactly one entry, regardless of how many clients are created — and the WeakSet keeps the rest of the GC story correct. This is a non-trivial refactor (touches state that has to be coordinated across all instances) and is reasonable to defer; the current PR's primary fix is solid.


    def _at_fork_reinit(self):
        """Reinitialize consumer threads after fork in child process.

        Called automatically by os.register_at_fork() after fork().
        Necessary for Gunicorn --preload deployments where os.fork() is used:
        threads are not copied to child processes (POSIX standard), so without
        reinitialization, the child process has no consumer threads and all
        ingestion events are silently lost.
        """
        if self._shutdown:
            return

        self._log.debug(
            f"[PID {os.getpid()}] Fork detected: reinitializing Langfuse consumer threads"
        )

        # Clear existing consumer references (threads are dead in child process)
        self._ingestion_consumers = []
        self._media_upload_consumers = []

        # Recreate queues (old queues may have shared state with parent process)
        self._ingestion_queue = queue.Queue(self._max_task_queue_size)
        self._media_upload_queue = Queue(self._max_task_queue_size)

        # Recreate MediaManager with new queue
        self._media_manager = MediaManager(
            api_client=self._api_client,
            media_upload_queue=self._media_upload_queue,
            max_retries=self._max_retries,
        )

        # Start fresh consumer threads in child process
        self.init_resources()

        self._log.debug(
            f"[PID {os.getpid()}] Langfuse consumer threads reinitialized after fork"
        )
Comment thread
pyg410 marked this conversation as resolved.

    def init_resources(self):
        for i in range(self._threads):
            ingestion_consumer = IngestionConsumer(
@@ -191,6 +242,8 @@
        """Flush all messages and cleanly shutdown the client."""
        self._log.debug("shutdown initiated")

        self._shutdown = True

        # Unregister the atexit handler first
        atexit.unregister(self.shutdown)
