From 07e28f7106cfa64e6f7dc6bc0d76e9f5d1b5e633 Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 11:38:35 -0400 Subject: [PATCH 01/10] Add webhook circuit breaker --- README.md | 14 ++++- src/defaults.ts | 6 ++ src/services/outgoing_cloud_api.ts | 91 +++++++++++++++++++++++++++++- src/services/redis.ts | 61 ++++++++++++++++++++ 4 files changed, 170 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 198b174a..af252f74 100644 --- a/README.md +++ b/README.md @@ -488,6 +488,10 @@ WEBHOOK_URL=the webhook url, this config attribute put phone number on the end, WEBHOOK_TOKEN=the webhook header token WEBHOOK_HEADER=the webhook header name WEBHOOK_TIMEOUT_MS=webhook request timeout, default 5000 ms +WEBHOOK_CB_ENABLED=true enable webhook circuit breaker to avoid backlog when endpoint is offline, default true +WEBHOOK_CB_FAILURE_THRESHOLD=number of failures within window to open circuit, default 1 +WEBHOOK_CB_OPEN_MS=how long to keep the circuit open (skip sends), default 120000 +WEBHOOK_CB_FAILURE_TTL_MS=failure counter window in ms, default 300000 WEBHOOK_SEND_NEW_MESSAGES=true, send new messages to webhook, caution with this, messages will be duplicated, default is false WEBHOOK_SEND_GROUP_MESSAGES=true, send group messages to webhook, default is true WEBHOOK_SEND_OUTGOING_MESSAGES=true, send outgoing messages to webhook, default is true @@ -523,6 +527,13 @@ WEBHOOK_FORWARD_VERSION=the version of whatsapp cloud api, default is v17.0 WEBHOOK_FORWARD_URL=the url of whatsapp cloud api, default is https://graph.facebook.com WEBHOOK_FORWARD_TIMEOUT_MS=the timeout for request to whatsapp cloud api, default is 360000 ``` +Example (circuit breaker): +```env +WEBHOOK_CB_ENABLED=true +WEBHOOK_CB_FAILURE_THRESHOLD=1 +WEBHOOK_CB_FAILURE_TTL_MS=300000 +WEBHOOK_CB_OPEN_MS=120000 +``` ### Config session with redis @@ -808,4 +819,5 @@ Mail to sales@unoapi.cloud - Connect with pairing code: https://github.com/WhiskeySockets/Baileys#starting-socket-with-pairing-code - Counting connection retry attempts even when restarting to prevent looping messages - Message delete endpoint -- Send reply message with please to send again, when any error and message enqueue in .dead \ No newline at end of file +- Send reply message with please to send again, when any error and message enqueue in .dead + diff --git a/src/defaults.ts b/src/defaults.ts index 8e32a59a..73afd694 100644 --- a/src/defaults.ts +++ b/src/defaults.ts @@ -65,6 +65,12 @@ export const WEBHOOK_ADD_TO_BLACKLIST_ON_OUTGOING_MESSAGE_WITH_TTL = ? undefined : parseInt(process.env.WEBHOOK_ADD_TO_BLACKLIST_ON_OUTGOING_MESSAGE_WITH_TTL!) export const WEBHOOK_SESSION = process.env.WEBHOOK_SESSION || '' +// Webhook circuit breaker (fail fast when endpoints are offline) +export const WEBHOOK_CB_ENABLED = + process.env.WEBHOOK_CB_ENABLED == _undefined ? true : process.env.WEBHOOK_CB_ENABLED == 'true' +export const WEBHOOK_CB_FAILURE_THRESHOLD = parseInt(process.env.WEBHOOK_CB_FAILURE_THRESHOLD || '1') +export const WEBHOOK_CB_OPEN_MS = parseInt(process.env.WEBHOOK_CB_OPEN_MS || '120000') +export const WEBHOOK_CB_FAILURE_TTL_MS = parseInt(process.env.WEBHOOK_CB_FAILURE_TTL_MS || '300000') export const AMQP_URL = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672' export const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379' export const PROXY_URL = process.env.PROXY_URL diff --git a/src/services/outgoing_cloud_api.ts b/src/services/outgoing_cloud_api.ts index cdbff65e..8bb05222 100644 --- a/src/services/outgoing_cloud_api.ts +++ b/src/services/outgoing_cloud_api.ts @@ -5,6 +5,8 @@ import logger from './logger' import { completeCloudApiWebHook, isGroupMessage, isOutgoingMessage, isNewsletterMessage, isUpdateMessage, extractDestinyPhone, extractFromPhone } from './transformer' import { addToBlacklist, isInBlacklist } from './blacklist' import { PublishOption } from '../amqp' +import { WEBHOOK_CB_ENABLED, WEBHOOK_CB_FAILURE_THRESHOLD, WEBHOOK_CB_OPEN_MS, WEBHOOK_CB_FAILURE_TTL_MS } from '../defaults' +import { isWebhookCircuitOpen, openWebhookCircuit, closeWebhookCircuit, bumpWebhookCircuitFailure } from './redis' export class OutgoingCloudApi implements Outgoing { private getConfig: getConfig @@ -29,6 +31,23 @@ export class OutgoingCloudApi implements Outgoing { } public async sendHttp(phone: string, webhook: Webhook, message: object, _options: Partial = {}) { + const cbEnabled = !!WEBHOOK_CB_ENABLED && WEBHOOK_CB_FAILURE_THRESHOLD > 0 && WEBHOOK_CB_OPEN_MS > 0 + const cbId = (webhook && (webhook.id || webhook.url || webhook.urlAbsolute)) ? `${webhook.id || webhook.url || webhook.urlAbsolute}` : 'default' + const cbKey = `${phone}:${cbId}` + const now = Date.now() + if (cbEnabled) { + try { + const open = await isWebhookCircuitOpen(phone, cbId) + if (open) { + logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId) + return + } + } catch {} + if (isCircuitOpenLocal(cbKey, now)) { + logger.warn('WEBHOOK_CB open (local): skipping send (phone=%s webhook=%s)', phone, cbId) + return + } + } const destinyPhone = await this.isInBlacklist(phone, webhook.id, message) if (destinyPhone) { logger.info(`Session phone %s webhook %s and destiny phone %s are in blacklist`, phone, webhook.id, destinyPhone) @@ -89,11 +108,81 @@ export class OutgoingCloudApi implements Outgoing { } catch (error) { logger.error('Error on send to url %s with headers %s and body %s', url, JSON.stringify(headers), body) logger.error(error) + if (cbEnabled) { + await this.handleCircuitFailure(phone, cbId, cbKey, error as any) + return + } throw error } logger.debug('Response: %s', response?.status) if (!response?.ok) { - throw await response?.text() + const errText = await response?.text() + if (cbEnabled) { + await this.handleCircuitFailure(phone, cbId, cbKey, errText) + return + } + throw errText + } + if (cbEnabled) { + try { + await closeWebhookCircuit(phone, cbId) + } catch {} + resetCircuitLocal(cbKey) + } + } + + private async handleCircuitFailure(phone: string, cbId: string, cbKey: string, error: any) { + try { + const threshold = WEBHOOK_CB_FAILURE_THRESHOLD || 1 + const openMs = WEBHOOK_CB_OPEN_MS || 120000 + const ttlMs = WEBHOOK_CB_FAILURE_TTL_MS || openMs + const count = await bumpWebhookCircuitFailure(phone, cbId, ttlMs) + const localCount = bumpCircuitFailureLocal(cbKey, ttlMs) + const finalCount = Math.max(count || 0, localCount || 0) + if (finalCount >= threshold) { + await openWebhookCircuit(phone, cbId, openMs) + openCircuitLocal(cbKey, openMs) + logger.warn('WEBHOOK_CB opened (phone=%s webhook=%s count=%s openMs=%s)', phone, cbId, finalCount, openMs) + } else { + logger.warn('WEBHOOK_CB failure (phone=%s webhook=%s count=%s/%s)', phone, cbId, finalCount, threshold) + } + } catch (e) { + logger.warn(e as any, 'WEBHOOK_CB failure handler error') } + try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {} + } +} + +const cbOpenUntil: Map = new Map() +const cbFailState: Map = new Map() + +const isCircuitOpenLocal = (key: string, now: number) => { + const until = cbOpenUntil.get(key) + if (!until) return false + if (now >= until) { + cbOpenUntil.delete(key) + return false + } + return true +} + +const openCircuitLocal = (key: string, openMs: number) => { + cbOpenUntil.set(key, Date.now() + Math.max(1, openMs || 0)) +} + +const resetCircuitLocal = (key: string) => { + cbOpenUntil.delete(key) + cbFailState.delete(key) +} + +const bumpCircuitFailureLocal = (key: string, ttlMs: number): number => { + const now = Date.now() + const ttl = Math.max(1, ttlMs || 0) + const current = cbFailState.get(key) + if (!current || now >= current.exp) { + cbFailState.set(key, { count: 1, exp: now + ttl }) + return 1 } + current.count += 1 + return current.count } diff --git a/src/services/redis.ts b/src/services/redis.ts index 6dc14c9c..8827d8e3 100644 --- a/src/services/redis.ts +++ b/src/services/redis.ts @@ -131,6 +131,67 @@ const redisSetAndExpire = async function (key: string, value: any, ttl: number) } } +export const redisDelKey = async (key: string) => redisDel(key) + +// Atomic increment with TTL (seconds). Sets TTL on first increment. +export const redisIncrWithTtl = async (key: string, ttlSec: number): Promise => { + logger.trace(`INCR ${key} with ttl ${ttlSec}s`) + try { + const v = await client.incr(key) + if (v === 1 && ttlSec > 0) { + try { await client.expire(key, ttlSec) } catch {} + } + return v + } catch (error) { + if (!client) { + await getRedis() + const v = await client.incr(key) + if (v === 1 && ttlSec > 0) { + try { await client.expire(key, ttlSec) } catch {} + } + return v + } + throw error + } +} + +// Webhook circuit breaker keys +export const webhookCircuitOpenKey = (session: string, webhookId: string) => + `${BASE_KEY}webhook-cb:${session}:${webhookId}:open` +export const webhookCircuitFailKey = (session: string, webhookId: string) => + `${BASE_KEY}webhook-cb:${session}:${webhookId}:fail` + +export const isWebhookCircuitOpen = async (session: string, webhookId: string): Promise => { + const key = webhookCircuitOpenKey(session, webhookId) + try { + const v = await redisGet(key) + return !!v + } catch { + return false + } +} + +export const openWebhookCircuit = async (session: string, webhookId: string, openMs: number): Promise => { + const ttlSec = Math.max(1, Math.ceil((openMs || 0) / 1000)) + try { + await redisSetAndExpire(webhookCircuitOpenKey(session, webhookId), '1', ttlSec) + } catch {} +} + +export const closeWebhookCircuit = async (session: string, webhookId: string): Promise => { + try { await redisDel(webhookCircuitOpenKey(session, webhookId)) } catch {} + try { await redisDel(webhookCircuitFailKey(session, webhookId)) } catch {} +} + +export const bumpWebhookCircuitFailure = async (session: string, webhookId: string, ttlMs: number): Promise => { + const ttlSec = Math.max(1, Math.ceil((ttlMs || 0) / 1000)) + try { + return await redisIncrWithTtl(webhookCircuitFailKey(session, webhookId), ttlSec) + } catch { + return 0 + } +} + export const authKey = (phone: string) => { return `${BASE_KEY}auth:${phone}` } From 810878834109bff1dfba3025202f4b7854d5c273 Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 11:44:20 -0400 Subject: [PATCH 02/10] Document circuit breaker behavior and set timeout default to 60s --- README.md | 12 +++++++++++- src/defaults.ts | 6 +++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index af252f74..44db7476 100644 --- a/README.md +++ b/README.md @@ -487,7 +487,7 @@ WEBHOOK_URL_ABSOLUTE=the webhook absolute url, not use this if already use WEBHO WEBHOOK_URL=the webhook url, this config attribute put phone number on the end, no use if use WEBHOOK_URL_ABSOLUTE WEBHOOK_TOKEN=the webhook header token WEBHOOK_HEADER=the webhook header name -WEBHOOK_TIMEOUT_MS=webhook request timeout, default 5000 ms +WEBHOOK_TIMEOUT_MS=webhook request timeout, default 60000 ms WEBHOOK_CB_ENABLED=true enable webhook circuit breaker to avoid backlog when endpoint is offline, default true WEBHOOK_CB_FAILURE_THRESHOLD=number of failures within window to open circuit, default 1 WEBHOOK_CB_OPEN_MS=how long to keep the circuit open (skip sends), default 120000 @@ -527,12 +527,22 @@ WEBHOOK_FORWARD_VERSION=the version of whatsapp cloud api, default is v17.0 WEBHOOK_FORWARD_URL=the url of whatsapp cloud api, default is https://graph.facebook.com WEBHOOK_FORWARD_TIMEOUT_MS=the timeout for request to whatsapp cloud api, default is 360000 ``` +Circuit breaker behavior: +- Counts consecutive webhook failures within `WEBHOOK_CB_FAILURE_TTL_MS`. +- When the count reaches `WEBHOOK_CB_FAILURE_THRESHOLD`, the circuit opens for `WEBHOOK_CB_OPEN_MS` and sends are skipped. +- After the open window, delivery is attempted again automatically. + +Why keep `WEBHOOK_TIMEOUT_MS` low: +- A high timeout blocks the consumer for too long when the endpoint is offline. +- With lower timeout, failures are detected faster and the circuit opens sooner, reducing backlog. + Example (circuit breaker): ```env WEBHOOK_CB_ENABLED=true WEBHOOK_CB_FAILURE_THRESHOLD=1 WEBHOOK_CB_FAILURE_TTL_MS=300000 WEBHOOK_CB_OPEN_MS=120000 +WEBHOOK_TIMEOUT_MS=60000 ``` ### Config session with redis diff --git a/src/defaults.ts b/src/defaults.ts index 73afd694..4e84266c 100644 --- a/src/defaults.ts +++ b/src/defaults.ts @@ -34,7 +34,7 @@ export const WEBHOOK_FORWARD_BUSINESS_ACCOUNT_ID = process.env.WEBHOOK_FORWARD_B export const WEBHOOK_FORWARD_TOKEN = process.env.WEBHOOK_FORWARD_TOKEN || '' export const WEBHOOK_FORWARD_VERSION = process.env.WEBHOOK_FORWARD_VERSION || 'v17.0' export const WEBHOOK_FORWARD_URL = process.env.WEBHOOK_FORWARD_URL || 'https://graph.facebook.com' -export const WEBHOOK_FORWARD_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '360000') +export const WEBHOOK_FORWARD_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '60000') // comunication export const UNOAPI_URL = process.env.UNOAPI_URL || 'http://localhost:9876' @@ -42,8 +42,8 @@ export const WEBHOOK_URL_ABSOLUTE = process.env.WEBHOOK_URL_ABSOLUTE || '' export const WEBHOOK_URL = process.env.WEBHOOK_URL || 'http://localhost:9876/webhooks/fake' export const WEBHOOK_HEADER = process.env.WEBHOOK_HEADER || 'Authorization' export const WEBHOOK_TOKEN = process.env.WEBHOOK_TOKEN || UNOAPI_AUTH_TOKEN || '123abc' -export const WEBHOOK_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '360000') -export const FETCH_TIMEOUT_MS = parseInt(process.env.FETCH_TIMEOUT_MS || '360000') +export const WEBHOOK_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '60000') +export const FETCH_TIMEOUT_MS = parseInt(process.env.FETCH_TIMEOUT_MS || '60000') export const CONNECTION_TYPE = process.env.CONNECTION_TYPE || 'qrcode' export const CONSUMER_TIMEOUT_MS = parseInt(process.env.CONSUMER_TIMEOUT_MS || '360000') From 4f712f696d129e536c83ffad7b858a7a83e11857 Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 12:08:32 -0400 Subject: [PATCH 03/10] Requeue on webhook circuit breaker open --- README.md | 3 +++ src/amqp.ts | 10 ++++++++- src/defaults.ts | 1 + src/services/outgoing_cloud_api.ts | 36 +++++++++++++++++++++++------- 4 files changed, 41 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 44db7476..e2f4fb1f 100644 --- a/README.md +++ b/README.md @@ -492,6 +492,7 @@ WEBHOOK_CB_ENABLED=true enable webhook circuit breaker to avoid backlog when end WEBHOOK_CB_FAILURE_THRESHOLD=number of failures within window to open circuit, default 1 WEBHOOK_CB_OPEN_MS=how long to keep the circuit open (skip sends), default 120000 WEBHOOK_CB_FAILURE_TTL_MS=failure counter window in ms, default 300000 +WEBHOOK_CB_REQUEUE_DELAY_MS=delay (ms) used to requeue when circuit is open, default 300000 WEBHOOK_SEND_NEW_MESSAGES=true, send new messages to webhook, caution with this, messages will be duplicated, default is false WEBHOOK_SEND_GROUP_MESSAGES=true, send group messages to webhook, default is true WEBHOOK_SEND_OUTGOING_MESSAGES=true, send outgoing messages to webhook, default is true @@ -531,6 +532,7 @@ Circuit breaker behavior: - Counts consecutive webhook failures within `WEBHOOK_CB_FAILURE_TTL_MS`. - When the count reaches `WEBHOOK_CB_FAILURE_THRESHOLD`, the circuit opens for `WEBHOOK_CB_OPEN_MS` and sends are skipped. - After the open window, delivery is attempted again automatically. +- When the circuit is open, the message is requeued with a longer delay (`WEBHOOK_CB_REQUEUE_DELAY_MS`) to avoid retry storms. Why keep `WEBHOOK_TIMEOUT_MS` low: - A high timeout blocks the consumer for too long when the endpoint is offline. @@ -542,6 +544,7 @@ WEBHOOK_CB_ENABLED=true WEBHOOK_CB_FAILURE_THRESHOLD=1 WEBHOOK_CB_FAILURE_TTL_MS=300000 WEBHOOK_CB_OPEN_MS=120000 +WEBHOOK_CB_REQUEUE_DELAY_MS=300000 WEBHOOK_TIMEOUT_MS=60000 ``` diff --git a/src/amqp.ts b/src/amqp.ts index b954edb6..df540255 100644 --- a/src/amqp.ts +++ b/src/amqp.ts @@ -16,6 +16,7 @@ import { UNOAPI_EXCHANGE_BRIDGE_NAME, IGNORED_TO_NUMBERS, UNOAPI_QUEUE_LISTENER, + WEBHOOK_CB_REQUEUE_DELAY_MS, } from './defaults' import logger from './services/logger' import { version } from '../package.json' @@ -352,7 +353,14 @@ export const amqpConsume = async ( await amqpPublish(exchange, queue, routingKey, { ...data, traces }, { dead: true, type: options.type }) } else { logger.info('Publish retry %s of %s', countRetries, maxRetries) - const delay = (options.delay || UNOAPI_MESSAGE_RETRY_DELAY) * countRetries + let delay = (options.delay || UNOAPI_MESSAGE_RETRY_DELAY) * countRetries + try { + const err: any = error as any + if (err && (err.code === 'WEBHOOK_CB_OPEN' || err.name === 'WebhookCircuitOpenError')) { + delay = err.delayMs || WEBHOOK_CB_REQUEUE_DELAY_MS || delay + logger.info('WEBHOOK_CB requeue delay %s ms (queue=%s)', delay, queue) + } + } catch {} await amqpPublish(exchange, queue, routingKey, data, { delay, maxRetries, countRetries, type: options.type }) } await channel?.ack(payload) diff --git a/src/defaults.ts b/src/defaults.ts index 4e84266c..cdc0dbfb 100644 --- a/src/defaults.ts +++ b/src/defaults.ts @@ -71,6 +71,7 @@ export const WEBHOOK_CB_ENABLED = export const WEBHOOK_CB_FAILURE_THRESHOLD = parseInt(process.env.WEBHOOK_CB_FAILURE_THRESHOLD || '1') export const WEBHOOK_CB_OPEN_MS = parseInt(process.env.WEBHOOK_CB_OPEN_MS || '120000') export const WEBHOOK_CB_FAILURE_TTL_MS = parseInt(process.env.WEBHOOK_CB_FAILURE_TTL_MS || '300000') +export const WEBHOOK_CB_REQUEUE_DELAY_MS = parseInt(process.env.WEBHOOK_CB_REQUEUE_DELAY_MS || '300000') export const AMQP_URL = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672' export const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379' export const PROXY_URL = process.env.PROXY_URL diff --git a/src/services/outgoing_cloud_api.ts b/src/services/outgoing_cloud_api.ts index 8bb05222..54fe0fc4 100644 --- a/src/services/outgoing_cloud_api.ts +++ b/src/services/outgoing_cloud_api.ts @@ -5,9 +5,18 @@ import logger from './logger' import { completeCloudApiWebHook, isGroupMessage, isOutgoingMessage, isNewsletterMessage, isUpdateMessage, extractDestinyPhone, extractFromPhone } from './transformer' import { addToBlacklist, isInBlacklist } from './blacklist' import { PublishOption } from '../amqp' -import { WEBHOOK_CB_ENABLED, WEBHOOK_CB_FAILURE_THRESHOLD, WEBHOOK_CB_OPEN_MS, WEBHOOK_CB_FAILURE_TTL_MS } from '../defaults' +import { WEBHOOK_CB_ENABLED, WEBHOOK_CB_FAILURE_THRESHOLD, WEBHOOK_CB_OPEN_MS, WEBHOOK_CB_FAILURE_TTL_MS, WEBHOOK_CB_REQUEUE_DELAY_MS } from '../defaults' import { isWebhookCircuitOpen, openWebhookCircuit, closeWebhookCircuit, bumpWebhookCircuitFailure } from './redis' +class WebhookCircuitOpenError extends Error { + public code = 'WEBHOOK_CB_OPEN' + public delayMs: number + constructor(message: string, delayMs: number) { + super(message) + this.delayMs = delayMs + } +} + export class OutgoingCloudApi implements Outgoing { private getConfig: getConfig private isInBlacklist: isInBlacklist @@ -40,12 +49,12 @@ export class OutgoingCloudApi implements Outgoing { const open = await isWebhookCircuitOpen(phone, cbId) if (open) { logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId) - return + throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs()) } } catch {} if (isCircuitOpenLocal(cbKey, now)) { logger.warn('WEBHOOK_CB open (local): skipping send (phone=%s webhook=%s)', phone, cbId) - return + throw new WebhookCircuitOpenError(`WEBHOOK_CB open (local) for ${cbId}`, this.cbRequeueDelayMs()) } } const destinyPhone = await this.isInBlacklist(phone, webhook.id, message) @@ -109,8 +118,10 @@ export class OutgoingCloudApi implements Outgoing { logger.error('Error on send to url %s with headers %s and body %s', url, JSON.stringify(headers), body) logger.error(error) if (cbEnabled) { - await this.handleCircuitFailure(phone, cbId, cbKey, error as any) - return + const opened = await this.handleCircuitFailure(phone, cbId, cbKey, error as any) + if (opened) { + throw new WebhookCircuitOpenError(`WEBHOOK_CB opened for ${cbId}`, this.cbRequeueDelayMs()) + } } throw error } @@ -118,8 +129,10 @@ export class OutgoingCloudApi implements Outgoing { if (!response?.ok) { const errText = await response?.text() if (cbEnabled) { - await this.handleCircuitFailure(phone, cbId, cbKey, errText) - return + const opened = await this.handleCircuitFailure(phone, cbId, cbKey, errText) + if (opened) { + throw new WebhookCircuitOpenError(`WEBHOOK_CB opened for ${cbId}`, this.cbRequeueDelayMs()) + } } throw errText } @@ -131,7 +144,11 @@ export class OutgoingCloudApi implements Outgoing { } } - private async handleCircuitFailure(phone: string, cbId: string, cbKey: string, error: any) { + private cbRequeueDelayMs() { + return WEBHOOK_CB_REQUEUE_DELAY_MS || WEBHOOK_CB_OPEN_MS || 120000 + } + + private async handleCircuitFailure(phone: string, cbId: string, cbKey: string, error: any): Promise { try { const threshold = WEBHOOK_CB_FAILURE_THRESHOLD || 1 const openMs = WEBHOOK_CB_OPEN_MS || 120000 @@ -143,13 +160,16 @@ export class OutgoingCloudApi implements Outgoing { await openWebhookCircuit(phone, cbId, openMs) openCircuitLocal(cbKey, openMs) logger.warn('WEBHOOK_CB opened (phone=%s webhook=%s count=%s openMs=%s)', phone, cbId, finalCount, openMs) + return true } else { logger.warn('WEBHOOK_CB failure (phone=%s webhook=%s count=%s/%s)', phone, cbId, finalCount, threshold) + return false } } catch (e) { logger.warn(e as any, 'WEBHOOK_CB failure handler error') } try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {} + return false } } From 022e68c1fdcab98a515943ca238c9d6216427b4e Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 12:28:20 -0400 Subject: [PATCH 04/10] Lower consumer timeout default to 15s --- README.md | 4 ++-- src/defaults.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index e2f4fb1f..ccff1205 100644 --- a/README.md +++ b/README.md @@ -435,7 +435,7 @@ Create a `.env`file and put configuration if you need change default value: This a general env: ```env -CONSUMER_TIMEOUT_MS=miliseconds in timeout for consume job, default is 30000 +CONSUMER_TIMEOUT_MS=miliseconds in timeout for consume job, default is 15000 AVAILABLE_LOCALES=default is `["en", "pt_BR", "pt"]` DEFAULT_LOCALE=locale for notifications status, now possibile is en, pt_BR and pt, default is en, to add new, use docker volume for exempla `/app/dist/src/locales/custom.json` and add `custom` in `AVAILABLE_LOCALES` ONLY_HELLO_TEMPLATE=true sets hello template as the only default template, default false. @@ -526,7 +526,7 @@ WEBHOOK_FORWARD_BUSINESS_ACCOUNT_ID=the business account id of whatsapp cloud ap WEBHOOK_FORWARD_TOKEN=the token of whatsapp cloud api, default is empty WEBHOOK_FORWARD_VERSION=the version of whatsapp cloud api, default is v17.0 WEBHOOK_FORWARD_URL=the url of whatsapp cloud api, default is https://graph.facebook.com -WEBHOOK_FORWARD_TIMEOUT_MS=the timeout for request to whatsapp cloud api, default is 360000 +WEBHOOK_FORWARD_TIMEOUT_MS=the timeout for request to whatsapp cloud api, default is 60000 ``` Circuit breaker behavior: - Counts consecutive webhook failures within `WEBHOOK_CB_FAILURE_TTL_MS`. diff --git a/src/defaults.ts b/src/defaults.ts index cdc0dbfb..48179983 100644 --- a/src/defaults.ts +++ b/src/defaults.ts @@ -46,7 +46,7 @@ export const WEBHOOK_TIMEOUT_MS = parseInt(process.env.WEBHOOK_TIMEOUT_MS || '60 export const FETCH_TIMEOUT_MS = parseInt(process.env.FETCH_TIMEOUT_MS || '60000') export const CONNECTION_TYPE = process.env.CONNECTION_TYPE || 'qrcode' -export const CONSUMER_TIMEOUT_MS = parseInt(process.env.CONSUMER_TIMEOUT_MS || '360000') +export const CONSUMER_TIMEOUT_MS = parseInt(process.env.CONSUMER_TIMEOUT_MS || '15000') export const WEBHOOK_SEND_NEW_MESSAGES = process.env.WEBHOOK_SEND_NEW_MESSAGES == _undefined ? false : process.env.WEBHOOK_SEND_NEW_MESSAGES == 'true' export const WEBHOOK_SEND_INCOMING_MESSAGES = process.env.WEBHOOK_SEND_INCOMING_MESSAGES == _undefined ? true : process.env.WEBHOOK_SEND_INCOMING_MESSAGES == 'true' From df6957dec1e9228d0fae7a547ba888dbc8c06361 Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 13:24:12 -0400 Subject: [PATCH 05/10] Add periodic cleanup for local CB state --- src/services/outgoing_cloud_api.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/services/outgoing_cloud_api.ts b/src/services/outgoing_cloud_api.ts index 54fe0fc4..3a943e6f 100644 --- a/src/services/outgoing_cloud_api.ts +++ b/src/services/outgoing_cloud_api.ts @@ -175,8 +175,11 @@ export class OutgoingCloudApi implements Outgoing { const cbOpenUntil: Map = new Map() const cbFailState: Map = new Map() +let cbLastCleanup = 0 +const CB_CLEANUP_INTERVAL_MS = 60 * 60 * 1000 const isCircuitOpenLocal = (key: string, now: number) => { + maybeCleanupLocalCircuit(now) const until = cbOpenUntil.get(key) if (!until) return false if (now >= until) { @@ -187,16 +190,19 @@ const isCircuitOpenLocal = (key: string, now: number) => { } const openCircuitLocal = (key: string, openMs: number) => { + maybeCleanupLocalCircuit(Date.now()) cbOpenUntil.set(key, Date.now() + Math.max(1, openMs || 0)) } const resetCircuitLocal = (key: string) => { + maybeCleanupLocalCircuit(Date.now()) cbOpenUntil.delete(key) cbFailState.delete(key) } const bumpCircuitFailureLocal = (key: string, ttlMs: number): number => { const now = Date.now() + maybeCleanupLocalCircuit(now) const ttl = Math.max(1, ttlMs || 0) const current = cbFailState.get(key) if (!current || now >= current.exp) { @@ -206,3 +212,14 @@ const bumpCircuitFailureLocal = (key: string, ttlMs: number): number => { current.count += 1 return current.count } + +const maybeCleanupLocalCircuit = (now: number) => { + if (now - cbLastCleanup < CB_CLEANUP_INTERVAL_MS) return + cbLastCleanup = now + for (const [key, until] of cbOpenUntil) { + if (now >= until) cbOpenUntil.delete(key) + } + for (const [key, st] of cbFailState) { + if (now >= st.exp) cbFailState.delete(key) + } +} From 7602d5d5710cb2bc63d159a63f05383a5e81a55f Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 13:26:01 -0400 Subject: [PATCH 06/10] Add env for local CB cleanup interval --- README.md | 2 ++ src/defaults.ts | 1 + src/services/outgoing_cloud_api.ts | 4 ++-- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ccff1205..fe50aa8e 100644 --- a/README.md +++ b/README.md @@ -493,6 +493,7 @@ WEBHOOK_CB_FAILURE_THRESHOLD=number of failures within window to open circuit, d WEBHOOK_CB_OPEN_MS=how long to keep the circuit open (skip sends), default 120000 WEBHOOK_CB_FAILURE_TTL_MS=failure counter window in ms, default 300000 WEBHOOK_CB_REQUEUE_DELAY_MS=delay (ms) used to requeue when circuit is open, default 300000 +WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS=local CB map cleanup interval (ms), default 3600000 WEBHOOK_SEND_NEW_MESSAGES=true, send new messages to webhook, caution with this, messages will be duplicated, default is false WEBHOOK_SEND_GROUP_MESSAGES=true, send group messages to webhook, default is true WEBHOOK_SEND_OUTGOING_MESSAGES=true, send outgoing messages to webhook, default is true @@ -545,6 +546,7 @@ WEBHOOK_CB_FAILURE_THRESHOLD=1 WEBHOOK_CB_FAILURE_TTL_MS=300000 WEBHOOK_CB_OPEN_MS=120000 WEBHOOK_CB_REQUEUE_DELAY_MS=300000 +WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS=3600000 WEBHOOK_TIMEOUT_MS=60000 ``` diff --git a/src/defaults.ts b/src/defaults.ts index 48179983..2c0f0446 100644 --- a/src/defaults.ts +++ b/src/defaults.ts @@ -72,6 +72,7 @@ export const WEBHOOK_CB_FAILURE_THRESHOLD = parseInt(process.env.WEBHOOK_CB_FAIL export const WEBHOOK_CB_OPEN_MS = parseInt(process.env.WEBHOOK_CB_OPEN_MS || '120000') export const WEBHOOK_CB_FAILURE_TTL_MS = parseInt(process.env.WEBHOOK_CB_FAILURE_TTL_MS || '300000') export const WEBHOOK_CB_REQUEUE_DELAY_MS = parseInt(process.env.WEBHOOK_CB_REQUEUE_DELAY_MS || '300000') +export const WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS = parseInt(process.env.WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS || '3600000') export const AMQP_URL = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672' export const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379' export const PROXY_URL = process.env.PROXY_URL diff --git a/src/services/outgoing_cloud_api.ts b/src/services/outgoing_cloud_api.ts index 3a943e6f..f2896813 100644 --- a/src/services/outgoing_cloud_api.ts +++ b/src/services/outgoing_cloud_api.ts @@ -5,7 +5,7 @@ import logger from './logger' import { completeCloudApiWebHook, isGroupMessage, isOutgoingMessage, isNewsletterMessage, isUpdateMessage, extractDestinyPhone, extractFromPhone } from './transformer' import { addToBlacklist, isInBlacklist } from './blacklist' import { PublishOption } from '../amqp' -import { WEBHOOK_CB_ENABLED, WEBHOOK_CB_FAILURE_THRESHOLD, WEBHOOK_CB_OPEN_MS, WEBHOOK_CB_FAILURE_TTL_MS, WEBHOOK_CB_REQUEUE_DELAY_MS } from '../defaults' +import { WEBHOOK_CB_ENABLED, WEBHOOK_CB_FAILURE_THRESHOLD, WEBHOOK_CB_OPEN_MS, WEBHOOK_CB_FAILURE_TTL_MS, WEBHOOK_CB_REQUEUE_DELAY_MS, WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS } from '../defaults' import { isWebhookCircuitOpen, openWebhookCircuit, closeWebhookCircuit, bumpWebhookCircuitFailure } from './redis' class WebhookCircuitOpenError extends Error { @@ -176,7 +176,7 @@ export class OutgoingCloudApi implements Outgoing { const cbOpenUntil: Map = new Map() const cbFailState: Map = new Map() let cbLastCleanup = 0 -const CB_CLEANUP_INTERVAL_MS = 60 * 60 * 1000 +const CB_CLEANUP_INTERVAL_MS = WEBHOOK_CB_LOCAL_CLEANUP_INTERVAL_MS || 60 * 60 * 1000 const isCircuitOpenLocal = (key: string, now: number) => { maybeCleanupLocalCircuit(now) From f4edfc054fd59c544e35d3e02d129169149be318 Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 13:33:28 -0400 Subject: [PATCH 07/10] Fix CB open check and add cleanup env --- src/services/outgoing_cloud_api.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/services/outgoing_cloud_api.ts b/src/services/outgoing_cloud_api.ts index f2896813..60908477 100644 --- a/src/services/outgoing_cloud_api.ts +++ b/src/services/outgoing_cloud_api.ts @@ -45,13 +45,14 @@ export class OutgoingCloudApi implements Outgoing { const cbKey = `${phone}:${cbId}` const now = Date.now() if (cbEnabled) { + let open = false try { - const open = await isWebhookCircuitOpen(phone, cbId) - if (open) { - logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId) - throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs()) - } + open = await isWebhookCircuitOpen(phone, cbId) } catch {} + if (open) { + logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId) + throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs()) + } if (isCircuitOpenLocal(cbKey, now)) { logger.warn('WEBHOOK_CB open (local): skipping send (phone=%s webhook=%s)', phone, cbId) throw new WebhookCircuitOpenError(`WEBHOOK_CB open (local) for ${cbId}`, this.cbRequeueDelayMs()) From a6e13db9075626a81b71107b04379e6250e722a4 Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 13:39:48 -0400 Subject: [PATCH 08/10] Fix duplicate key in README example --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index fe50aa8e..9779ec68 100644 --- a/README.md +++ b/README.md @@ -578,7 +578,6 @@ The `.env` can be save one config, but on redis use different webhook by session "token": "kslflkhlkwq", "header": "api_access_token", "sendGroupMessages": false, - "sendGroupMessages": false, "sendNewMessages": false, } ], From 77b01ca85411cd8b24aa889d8e939fea554b4e20 Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 14:50:43 -0400 Subject: [PATCH 09/10] Log original error when CB handler fails --- src/services/outgoing_cloud_api.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/services/outgoing_cloud_api.ts b/src/services/outgoing_cloud_api.ts index 60908477..7d72fbd1 100644 --- a/src/services/outgoing_cloud_api.ts +++ b/src/services/outgoing_cloud_api.ts @@ -167,7 +167,10 @@ export class OutgoingCloudApi implements Outgoing { return false } } catch (e) { - logger.warn(e as any, 'WEBHOOK_CB failure handler error') + logger.warn(e as any, 'WEBHOOK_CB failure handler error (phone=%s webhook=%s)', phone, cbId) + try { logger.warn(error as any, 'WEBHOOK_CB original error (phone=%s webhook=%s)', phone, cbId) } catch {} + // If the CB handler fails, fall back to the original error path (no circuit open) + return false } try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {} return false @@ -224,3 +227,4 @@ const maybeCleanupLocalCircuit = (now: number) => { if (now >= st.exp) cbFailState.delete(key) } } + From 82a6f8c855379c7cbe6ff0c0028aa46bef840e2d Mon Sep 17 00:00:00 2001 From: caitano28 Date: Fri, 23 Jan 2026 15:20:57 -0400 Subject: [PATCH 10/10] Fix webhook CB error handling --- src/services/outgoing_cloud_api.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/services/outgoing_cloud_api.ts b/src/services/outgoing_cloud_api.ts index 7d72fbd1..f45b0d37 100644 --- a/src/services/outgoing_cloud_api.ts +++ b/src/services/outgoing_cloud_api.ts @@ -129,13 +129,14 @@ export class OutgoingCloudApi implements Outgoing { logger.debug('Response: %s', response?.status) if (!response?.ok) { const errText = await response?.text() + const err = new Error(`Webhook response ${response?.status} ${response?.statusText}: ${errText}`) if (cbEnabled) { - const opened = await this.handleCircuitFailure(phone, cbId, cbKey, errText) + const opened = await this.handleCircuitFailure(phone, cbId, cbKey, err) if (opened) { throw new WebhookCircuitOpenError(`WEBHOOK_CB opened for ${cbId}`, this.cbRequeueDelayMs()) } } - throw errText + throw err } if (cbEnabled) { try { @@ -164,6 +165,7 @@ export class OutgoingCloudApi implements Outgoing { return true } else { logger.warn('WEBHOOK_CB failure (phone=%s webhook=%s count=%s/%s)', phone, cbId, finalCount, threshold) + try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {} return false } } catch (e) { @@ -172,8 +174,6 @@ export class OutgoingCloudApi implements Outgoing { // If the CB handler fails, fall back to the original error path (no circuit open) return false } - try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {} - return false } }