Add webhook circuit breaker #124
base: main
Walkthrough
Adds a Redis-backed webhook circuit breaker with local in-process checks integrated into outgoing HTTP sends; shortens consumer and webhook/fetch timeouts; introduces WEBHOOK_CB_* env vars and many webhook-related flags; adjusts AMQP retry delay for circuit-open errors; updates README docs and examples.
Sequence Diagram
sequenceDiagram
participant Producer as Client/Producer
participant AMQP as AMQP Consumer
participant API as OutgoingCloudApi
participant Redis as Redis
participant Webhook as Webhook Endpoint
Producer->>AMQP: publish webhook message
AMQP->>API: process message -> sendHttp()
API->>Redis: isWebhookCircuitOpen(session, webhookId)?
Redis-->>API: open / closed
alt Circuit Open
API-->>AMQP: throw WebhookCircuitOpenError (with delayMs)
AMQP->>AMQP: schedule retry using error.delayMs or WEBHOOK_CB_REQUEUE_DELAY_MS
else Circuit Closed
API->>Webhook: HTTP POST (with WEBHOOK_TIMEOUT_MS / FETCH_TIMEOUT_MS)
alt Success (2xx)
Webhook-->>API: success
API->>Redis: closeWebhookCircuit(session, webhookId)
API-->>AMQP: ack
else Failure / timeout / non-2xx
Webhook-->>API: error
API->>Redis: bumpWebhookCircuitFailure(session, webhookId, ttl)
Redis-->>API: new failure count
alt Threshold reached
API->>Redis: openWebhookCircuit(session, webhookId, openMs)
API-->>AMQP: throw WebhookCircuitOpenError (with requeue delayMs)
else Below threshold
API-->>AMQP: nack / standard retry
end
end
end
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/services/outgoing_cloud_api.ts`:
- Around line 111-131: sendHttp currently swallows failures when the circuit
breaker is enabled by calling handleCircuitFailure(...) and returning, which
prevents the AMQP consumer from seeing errors and triggering
retries/dead-lettering; after calling handleCircuitFailure(phone, cbId, cbKey,
error) (on fetch/network error) and after handleCircuitFailure(phone, cbId,
cbKey, errText) (on non-OK response), rethrow the original error/errText so the
exception contract is preserved; keep the existing closeWebhookCircuit(phone,
cbId) try/catch and resetCircuitLocal(cbKey) behavior but ensure errors are not
silently returned in sendHttp so the amqpConsume framework can handle them.
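A minimal sketch of the contract this comment asks for, not the PR's actual code: `sendWithBreaker`, `send`, and `recordFailure` are hypothetical stand-ins for `sendHttp`, the fetch call, and `handleCircuitFailure`. The point is only that the failure is recorded and then rethrown, so the consumer still sees it.

```typescript
// Hypothetical stand-in for handleCircuitFailure; the real one talks to Redis.
type FailureRecorder = (key: string, error: unknown) => Promise<void>

// Sends once. On failure, records it for the circuit breaker and then
// rethrows, so the caller (e.g. the AMQP consumer) can retry or dead-letter.
async function sendWithBreaker(
  key: string,
  send: () => Promise<void>,
  recordFailure: FailureRecorder,
): Promise<void> {
  try {
    await send()
  } catch (error) {
    try {
      await recordFailure(key, error)
    } catch {
      // A failing recorder must not mask the original send error.
    }
    throw error
  }
}
```

Swallowing the recorder's own exception is a design choice in this sketch: the original send error is treated as the more useful one to surface.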
🧹 Nitpick comments (1)
src/services/outgoing_cloud_api.ts (1)
156-188: Consider adding cleanup for stale local circuit state entries.
The module-level `cbOpenUntil` and `cbFailState` Maps grow unbounded as new webhook URLs are encountered. While `isCircuitOpenLocal` cleans up expired entries on access, entries for webhooks that are no longer called will accumulate indefinitely.
For most deployments with stable webhook configurations this is negligible, but high-churn scenarios could see gradual memory growth.
♻️ Optional: Add periodic cleanup or size cap
+const MAX_LOCAL_CB_ENTRIES = 1000
+
 const resetCircuitLocal = (key: string) => {
   cbOpenUntil.delete(key)
   cbFailState.delete(key)
 }
+
+const cleanupStaleEntries = () => {
+  const now = Date.now()
+  for (const [key, until] of cbOpenUntil) {
+    if (now >= until) cbOpenUntil.delete(key)
+  }
+  for (const [key, state] of cbFailState) {
+    if (now >= state.exp) cbFailState.delete(key)
+  }
+}
Then call `cleanupStaleEntries()` periodically or when the maps exceed `MAX_LOCAL_CB_ENTRIES`.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/services/outgoing_cloud_api.ts`:
- Around line 47-58: The try/catch around the Redis circuit breaker check is too
broad and swallows the WebhookCircuitOpenError; narrow the try to only call
isWebhookCircuitOpen(phone, cbId) and handle its errors, then perform the throw
new WebhookCircuitOpenError(...) outside the catch so the exception is not
consumed. Specifically, in the cbEnabled branch ensure isWebhookCircuitOpen is
awaited inside a try that only catches/handles Redis/network errors, and if it
returns true then call logger.warn(...) and throw WebhookCircuitOpenError (using
this.cbRequeueDelayMs()) outside the catch; keep the local check using
isCircuitOpenLocal(cbKey, now) unchanged.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
README.md (1)
580-582: Duplicate key in JSON example.
Line 581 defines `"sendGroupMessages"` twice. Most JSON parsers accept this and keep the second value, but duplicate keys make the example ambiguous. This appears to be a pre-existing issue, but it should be corrected.
🐛 Proposed fix
   "token": "kslflkhlkwq",
   "header": "api_access_token",
   "sendGroupMessages": false,
-  "sendGroupMessages": false,
   "sendNewMessages": false,
♻️ Duplicate comments (1)
src/services/outgoing_cloud_api.ts (1)
47-59: Critical: `WebhookCircuitOpenError` is swallowed by the empty catch block.
The `throw new WebhookCircuitOpenError(...)` on line 52 is inside the try block, and the empty `catch {}` on line 54 swallows it. When Redis reports the circuit is open, the error is caught and discarded, and execution continues, completely bypassing the Redis-based circuit breaker check.
The try/catch should only wrap the `isWebhookCircuitOpen` call, not the throw statement.
🐛 Proposed fix
 if (cbEnabled) {
+  let redisOpen = false
   try {
-    const open = await isWebhookCircuitOpen(phone, cbId)
-    if (open) {
-      logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId)
-      throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs())
-    }
-  } catch {}
+    redisOpen = await isWebhookCircuitOpen(phone, cbId)
+  } catch {
+    // Redis unavailable - fall through to local check
+  }
+  if (redisOpen) {
+    logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId)
+    throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs())
+  }
   if (isCircuitOpenLocal(cbKey, now)) {
🧹 Nitpick comments (1)
src/services/outgoing_cloud_api.ts (1)
151-173: Consider logging the original error details when the circuit handler itself fails.
When the catch block at line 168 handles an exception from the circuit breaker logic, the original webhook error (`error` parameter) is logged at line 171, but the circuit handler exception (`e`) is logged with a generic message. This could make debugging difficult if both Redis and logging fail.
Also, the method returns `false` when an exception occurs (line 172), which means the original error will be thrown but the circuit won't open. This is reasonable as a fallback, but worth documenting.
♻️ Minor improvement for clarity
   } catch (e) {
-    logger.warn(e as any, 'WEBHOOK_CB failure handler error')
+    logger.warn(e as any, 'WEBHOOK_CB failure handler error (phone=%s webhook=%s)', phone, cbId)
   }
-  try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {}
+  try {
+    logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId)
+  } catch {
+    // Logging failed - nothing more we can do
+  }
   return false
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/services/outgoing_cloud_api.ts`:
- Around line 148-176: The trailing logging and return after the try/catch in
handleCircuitFailure are unreachable because both the try and catch blocks
always return; remove the final try { logger.warn(error...) } catch {} and final
return false, or move that log into the catch block before its return (and
ensure it references the correct error variable), so that there is no dead code
after the try/catch in the handleCircuitFailure method.
- Around line 131-139: The code currently throws a raw string (errText); instead
create and throw an Error object containing response metadata and body so
upstream retains stack/context. Build an Error (e.g., new Error(`HTTP
${response?.status}: ${response?.statusText}`)) and attach properties like
status = response?.status, statusText = response?.statusText, and body =
errText, then pass that Error object into handleCircuitFailure(phone, cbId,
cbKey, err) (matching the earlier catch usage) and, if opened, throw the
WebhookCircuitOpenError as before; otherwise throw the constructed Error.
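One way to follow this suggestion, sketched with assumptions: `HttpError` and `toHttpError` are hypothetical names, and the property names (`status`, `statusText`, `body`) are the ones proposed in the comment above. The helper builds a real `Error`, so the stack trace survives, and attaches the response metadata to it.

```typescript
// Hypothetical type: an Error carrying the response metadata named in the review.
type HttpError = Error & { status: number; statusText: string; body: string }

// Builds an Error from a non-OK response instead of throwing the raw body text.
function toHttpError(status: number, statusText: string, body: string): HttpError {
  const base = new Error(`HTTP ${status}: ${statusText}`)
  return Object.assign(base, { status, statusText, body })
}

// Usage sketch: the same object can be passed to the failure handler and thrown.
const err = toHttpError(503, 'Service Unavailable', '{"error":"overloaded"}')
console.log(err.message, err.status)
```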
How the circuit breaker (CB) works in the webhook flow
Main flow
Each webhook send first checks whether the circuit is open.
If it is open, it does not attempt to send the webhook and throws WebhookCircuitOpenError so the consumer requeues with delay (avoids backlog and retry loops).
How the circuit opens
When a send fails (timeout, network error, or non‑OK HTTP response), the CB records a failure.
If the number of failures within the window (WEBHOOK_CB_FAILURE_TTL_MS) reaches the threshold (WEBHOOK_CB_FAILURE_THRESHOLD), the circuit opens for WEBHOOK_CB_OPEN_MS.
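The opening rule can be sketched in memory as follows. This is an illustration rather than the PR's implementation: `LocalBreaker` and its option names are hypothetical, the real counter lives in Redis, and the sketch assumes a fixed window that starts at the first failure and expires after the failure TTL.

```typescript
interface BreakerOptions {
  failureThreshold: number // cf. WEBHOOK_CB_FAILURE_THRESHOLD
  failureTtlMs: number // cf. WEBHOOK_CB_FAILURE_TTL_MS
  openMs: number // cf. WEBHOOK_CB_OPEN_MS
}

class LocalBreaker {
  private failures = new Map<string, { count: number; exp: number }>()
  private openUntil = new Map<string, number>()
  private opts: BreakerOptions

  constructor(opts: BreakerOptions) {
    this.opts = opts
  }

  // True while the circuit for this key is open; expired entries are dropped.
  isOpen(key: string, now: number): boolean {
    const until = this.openUntil.get(key)
    if (until === undefined) return false
    if (now >= until) {
      this.openUntil.delete(key)
      return false
    }
    return true
  }

  // Records one failure; returns true if this failure opened the circuit.
  recordFailure(key: string, now: number): boolean {
    const prev = this.failures.get(key)
    const state =
      prev !== undefined && now < prev.exp
        ? { count: prev.count + 1, exp: prev.exp } // same window
        : { count: 1, exp: now + this.opts.failureTtlMs } // new window
    if (state.count >= this.opts.failureThreshold) {
      this.failures.delete(key)
      this.openUntil.set(key, now + this.opts.openMs)
      return true
    }
    this.failures.set(key, state)
    return false
  }

  // A successful send clears both the failure count and any open state.
  recordSuccess(key: string): void {
    this.failures.delete(key)
    this.openUntil.delete(key)
  }
}
```

Passing `now` in explicitly keeps the sketch deterministic and easy to test; production code would typically read the clock itself.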
While open
Messages are not sent to the webhook.
The consumer requeues with delay (WEBHOOK_CB_REQUEUE_DELAY_MS), reducing pressure on the queue.
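On the consumer side, choosing the delay might look like the sketch below. It assumes the error carries a `delayMs` field, as the sequence diagram indicates; the class here is a simplified mirror of the PR's `WebhookCircuitOpenError`, and `retryDelayMs` with its parameters is a hypothetical helper.

```typescript
// Simplified mirror of the PR's error type: carries a suggested requeue delay.
class WebhookCircuitOpenError extends Error {
  delayMs: number
  constructor(message: string, delayMs: number) {
    super(message)
    this.name = 'WebhookCircuitOpenError'
    this.delayMs = delayMs
  }
}

// Picks the requeue delay: the error's own delayMs for circuit-open errors,
// falling back to the configured CB delay (cf. WEBHOOK_CB_REQUEUE_DELAY_MS),
// and the consumer's normal retry delay for every other error.
function retryDelayMs(error: unknown, cbRequeueDelayMs: number, defaultDelayMs: number): number {
  if (error instanceof Error && error.name === 'WebhookCircuitOpenError') {
    const delayMs = (error as Error & { delayMs?: unknown }).delayMs
    return typeof delayMs === 'number' && delayMs > 0 ? delayMs : cbRequeueDelayMs
  }
  return defaultDelayMs
}
```

Checking `error.name` instead of `instanceof WebhookCircuitOpenError` is a deliberate choice in this sketch: it keeps working if the class is loaded twice or the build targets ES5, where subclassing `Error` loses the prototype.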
Where state is stored
Redis: primary, shared state across instances.
Local (in‑memory): fast fallback if Redis fails or as a cache, with periodic cleanup (WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS).
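A sketch of what the periodic local cleanup could look like. The map names follow the review comments (`cbOpenUntil`, `cbFailState`), while the shape of the stored values and the `startLocalCleanup` helper are assumptions made for illustration.

```typescript
// Assumed shapes: key -> "open until" timestamp, key -> failure counter with expiry.
const cbOpenUntil = new Map<string, number>()
const cbFailState = new Map<string, { count: number; exp: number }>()

// Removes entries whose deadline has passed; returns how many were removed.
function cleanupStaleEntries(now: number): number {
  let removed = 0
  cbOpenUntil.forEach((until, key) => {
    if (now >= until) {
      cbOpenUntil.delete(key)
      removed++
    }
  })
  cbFailState.forEach((state, key) => {
    if (now >= state.exp) {
      cbFailState.delete(key)
      removed++
    }
  })
  return removed
}

// Runs the sweep every intervalMs (cf. WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS).
// Returns a function that stops the timer.
function startLocalCleanup(intervalMs: number): () => void {
  const timer = setInterval(() => cleanupStaleEntries(Date.now()), intervalMs)
  return () => clearInterval(timer)
}
```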
Why a lower timeout helps
A smaller WEBHOOK_TIMEOUT_MS makes errors surface faster → CB opens sooner → fewer messages stuck.
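A back-of-the-envelope model makes the effect concrete. It rests on assumptions not stated in the PR: sends to a hanging endpoint happen one after another, every send fails exactly at the timeout, and the failure window starts at the first failure. The function name and the numbers are illustrative only.

```typescript
// Estimates how long a fully hanging endpoint takes to open the circuit,
// or returns null if the failure window expires before the threshold is hit.
function msUntilOpen(threshold: number, timeoutMs: number, failureTtlMs: number): number | null {
  const firstFailureAt = timeoutMs // first send times out
  const lastFailureAt = threshold * timeoutMs // threshold-th send times out
  const windowEndsAt = firstFailureAt + failureTtlMs
  return lastFailureAt < windowEndsAt ? lastFailureAt : null
}

// Illustrative values, not the project's defaults:
console.log(msUntilOpen(5, 60_000, 300_000)) // 300000: five minutes of stuck sends
console.log(msUntilOpen(5, 5_000, 300_000)) // 25000: opens in 25 seconds
console.log(msUntilOpen(5, 60_000, 120_000)) // null: window expires first
```

Under this model a timeout that is long relative to the failure window can keep the circuit from opening at all, which is another reason to keep `WEBHOOK_TIMEOUT_MS` small compared with `WEBHOOK_CB_FAILURE_TTL_MS`.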