fix(workers): commit database writes that were silently rolled back #50
The photo worker opened its connection with engine.connect(), which does not autocommit, and never issued a commit. Every write in the message handler (processing job status, detected faces, photo status/visibility, and the single-face match) was rolled back when the connection closed, so photos were processed but their results never persisted. Wrap each message in a single transaction via self._conn.begin(). Messages are delivered one at a time, so the shared connection is only ever used by one handler at a time. SingleFaceMatchService previously managed its own transaction with self.conn.begin(). That nested begin() raised once create_processing_job had already opened the connection's transaction, and any writes it made after the block were left uncommitted. It now runs inside the worker's transaction instead.
The audit worker ran record_event on a connection opened with engine.connect() and never committed, so every audit row was discarded when the connection eventually closed. Commit after each event and roll back on failure. The worker processes events one at a time, so the shared connection is safe to commit per message.
DeviceInvalidationStore wrote through a connection opened with engine.connect() and never committed, so devices flagged for an invalid push token were never persisted. The worker also runs handlers concurrently against that single shared connection, which is not safe. Give each write its own transaction with engine.begin(). One transaction per token keeps a failure on one token from aborting the others, and removes the shared connection from the worker entrypoint.
ademboukabes
left a comment
The connection handling in photo-worker and audit-worker introduced two critical stability issues:
1. Database Disconnects
By initializing self._conn once in run_worker(), the connection is kept alive forever. If PostgreSQL restarts, the worker is stuck looping on OperationalError because the dead socket is never renewed.
2. Silent Crashes (PendingRollbackError)
In photo_worker, SQL errors (like ForeignKeyViolationError) were swallowed by try...except blocks in helpers like _create_job. Because the exception didn't bubble up, the async with self._conn.begin() block tried to COMMIT an aborted transaction, causing a fatal PendingRollbackError.
I suggest applying the following structural fixes:
- pool_pre_ping=True (database.py): Add this to the async engine creation. It auto-recovers dead connections upon network drops.
- Per-Message Transactions (photo_worker/audit_worker): Remove the global connection self._conn entirely. Use async with engine.begin() as conn: inside handle_message instead.
- Error Propagation (photo_worker/main.py): Remove try...except inside SQL helpers (like _create_job) so invalid transactions trigger an automatic ROLLBACK, avoiding PendingRollbackError. Wrap the main logic in handle_message in a try...except to catch and log the failure without crashing the worker.
Without pre-ping, a pooled connection that died while idle (for example after a Postgres restart or an idle-timeout cut by a proxy) is handed out as-is and the next query fails with a stale-socket error. pool_pre_ping validates the connection on checkout and transparently replaces a dead one, which the workers rely on now that they check out a fresh connection per message.
Follow-up to the commit fix, addressing review feedback. The worker held a single connection for its whole lifetime, so a Postgres restart left it looping on a dead socket. It now opens a fresh connection and transaction per message with engine.begin(), which commits on success and rolls back on failure; combined with pool_pre_ping the connection is revalidated on checkout. The SQL helpers (_create_job, _update_job) and the per-face insert used to catch and swallow DB errors. Inside one transaction that is unsafe: the first failed statement aborts the whole transaction, and continuing would either hit "current transaction is aborted" on the next statement or raise PendingRollbackError at commit. Those swallows are removed so a DB error propagates and the transaction rolls back cleanly. The per-message handler in run_worker wraps everything in try/except and logs, so one bad message no longer tears down the subscription.
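The per-message pattern described above can be sketched as follows. This is a minimal analogy using the standard-library sqlite3 module rather than the project's async SQLAlchemy engine: `with conn:` stands in for engine.begin() (commit on success, rollback on exception). The table names, the handle_message signature, and the message shape are assumptions made for illustration, not the worker's real code.

```python
import logging
import os
import sqlite3
import tempfile
from contextlib import closing

logger = logging.getLogger("photo_worker_sketch")


def handle_message(path, photo_id, faces):
    """Process one message on a fresh connection, inside one transaction."""
    with closing(sqlite3.connect(path)) as conn:
        # `with conn:` commits if the block succeeds and rolls back if it raises.
        with conn:
            conn.execute(
                "INSERT INTO job (photo_id, status) VALUES (?, 'done')", (photo_id,)
            )
            for label in faces:
                # Deliberately no try/except: a failed insert aborts the whole message.
                conn.execute(
                    "INSERT INTO face (photo_id, label) VALUES (?, ?)",
                    (photo_id, label),
                )


def run_worker(path, messages):
    """Catch and log per message so one bad message does not stop the loop."""
    for photo_id, faces in messages:
        try:
            handle_message(path, photo_id, faces)
        except sqlite3.Error:
            logger.exception("photo %s failed; its writes were rolled back", photo_id)


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "worker.db")
        with closing(sqlite3.connect(path)) as conn, conn:
            conn.execute("CREATE TABLE job (photo_id INTEGER PRIMARY KEY, status TEXT)")
            conn.execute("CREATE TABLE face (photo_id INTEGER, label TEXT NOT NULL)")
        # The second message violates NOT NULL on its second face insert.
        run_worker(path, [(1, ["a"]), (2, ["b", None]), (3, [])])
        with closing(sqlite3.connect(path)) as conn:
            jobs = [row[0] for row in conn.execute("SELECT photo_id FROM job ORDER BY photo_id")]
            faces = conn.execute("SELECT photo_id, label FROM face").fetchall()
        return jobs, faces
```

In this sketch the failing message leaves neither its job row nor its first face row behind, while the messages before and after it commit normally.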
Replace the long-lived connection with a fresh engine.begin() per event, for the same reasons as the photo worker: auto-commit/rollback per message and recovery from dropped connections via pool_pre_ping. Removes the manual start/stop connection lifecycle.
Good catches, both of these are real. Addressed in 3c12a6a, c0258aa, 33f6bd5.
1. Dead connections: agreed. The workers held one connection for the whole process, so a Postgres restart left them on a dead socket. Both photo and audit workers now open a fresh connection + transaction per message with
2. PendingRollbackError: agreed, and it's exactly the trap with the previous version: a swallowed DB error aborts the transaction, then the wrapping block tries to COMMIT and blows up. Removed the
Net effect per message: fresh connection →
Checks:
One thing I left as a follow-up rather than fold in here: the worker still publishes its NATS audit/cleanup events from inside the transaction, so a publish can precede a commit. Moving those to publish-after-commit (outbox) is worth doing but felt out of scope for this fix. Happy to do it in a separate PR if you'd prefer.
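To illustrate the follow-up mentioned above, here is a minimal sketch of publish-after-commit, again using standard-library sqlite3 as a stand-in for the real engine. The publish callback, the event name, and the photo table are hypothetical. This is the simple in-memory variant, not a durable outbox: a crash between the commit and the publish would still lose the event, which is the gap an outbox table is meant to close.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def handle_message(path, photo_id, fail, publish):
    """Write inside a transaction; publish events only once it has committed."""
    pending = []  # events produced by this message, held back until commit
    with closing(sqlite3.connect(path)) as conn:
        with conn:  # commits on success, rolls back if the block raises
            conn.execute("INSERT INTO photo (id) VALUES (?)", (photo_id,))
            pending.append(f"photo.processed.{photo_id}")
            if fail:
                raise RuntimeError("simulated failure after the write")
    # Reached only when the transaction committed.
    for event in pending:
        publish(event)


def demo():
    published = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "events.db")
        with closing(sqlite3.connect(path)) as conn, conn:
            conn.execute("CREATE TABLE photo (id INTEGER PRIMARY KEY)")
        handle_message(path, 1, fail=False, publish=published.append)
        try:
            handle_message(path, 2, fail=True, publish=published.append)
        except RuntimeError:
            pass  # the write was rolled back and nothing was published
        with closing(sqlite3.connect(path)) as conn:
            rows = [row[0] for row in conn.execute("SELECT id FROM photo")]
    return rows, published
```

With this ordering, subscribers never see an event for a row that was rolled back.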
Problem
The background workers open their database connection with engine.connect(), which in SQLAlchemy 2.0 async does not autocommit. None of them ever issued a commit, so every write they made was rolled back when the connection closed. Only the request path (get_db) and the upload-group worker used engine.begin(), which commits.
Confirmed against SQLAlchemy 2.0.47 with Postgres:
- A write on an engine.connect() connection does not persist without an explicit commit().
- Calling conn.begin() on a connection that already has a transaction open raises InvalidRequestError. This is why the single-face path silently failed: create_processing_job opened the transaction first, then SingleFaceMatchService tried conn.begin() again and the error was swallowed by a broad except.
- Writes made after the begin() block (the match status update and notification in SingleFaceMatchService) were left in a fresh, uncommitted transaction.
Impact: photos were processed but their status/visibility, detected faces, and
match records were never saved; audit events were dropped entirely; devices
flagged for invalid push tokens were never marked.
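The failure mode is easy to reproduce in miniature. The sketch below is an analogy using standard-library sqlite3, not SQLAlchemy or Postgres: a connection that writes and then closes without committing leaves nothing behind, which is the same shape as the engine.connect() behavior described above. The audit_event table and the helper names are made up for the example.

```python
import os
import sqlite3
import tempfile


def count_rows(path):
    """Open a fresh connection and count what actually reached the database."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute("SELECT COUNT(*) FROM audit_event").fetchone()[0]
    finally:
        conn.close()


def write_event(path, commit):
    """Insert one row, optionally committing before the connection closes."""
    conn = sqlite3.connect(path)
    try:
        # sqlite3 implicitly opens a transaction before the INSERT.
        conn.execute("INSERT INTO audit_event (name) VALUES ('login')")
        if commit:
            conn.commit()
    finally:
        # Closing discards any transaction that was never committed.
        conn.close()


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        setup = sqlite3.connect(path)
        setup.execute("CREATE TABLE audit_event (name TEXT)")
        setup.commit()
        setup.close()

        write_event(path, commit=False)
        after_uncommitted = count_rows(path)
        write_event(path, commit=True)
        after_committed = count_rows(path)
        return after_uncommitted, after_committed
```

Calling demo() returns the row count after the uncommitted write and after the committed one; only the second write is visible to a new connection.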
Changes
- The photo worker now wraps each message in a single self._conn.begin() transaction, so the processing job, detected faces, photo status/visibility, and the single-face match commit (or roll back) together. Messages are delivered one at a time, so the shared connection is only ever used by one handler.
- SingleFaceMatchService no longer opens its own transaction. It runs inside the worker's transaction, which removes the nested-begin() crash and the lost post-block writes.
- DeviceInvalidationStore now writes in its own engine.begin() transaction, one per token. This commits the write and stops sharing a single connection across the worker's concurrent handlers.
Testing
- make lint (ruff) - passes
- make check_type (mypy, 136 files) - passes
- pytest app/worker/photo_worker/tests - 7 passed, 5 deselected. The deselected tests reach a real NatsClient.connect() and need a live NATS broker, which isn't available in my sandbox; they don't touch the changed code. The 7 that ran cover the no-faces, single-face, and group paths through the new per-message transaction.