From 70ffdd19cafa1478998d0b0c64fd353a550999c3 Mon Sep 17 00:00:00 2001 From: waleed Date: Sat, 27 Jun 2026 14:51:26 -0700 Subject: [PATCH 1/7] improvement(logs): move per-block progress markers to Redis to cut write amplification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-block lastStartedBlock/lastCompletedBlock markers were persisted via a jsonb_set UPDATE on workflow_execution_logs on every block start and complete (~2N UPDATEs per run) — the heaviest write query in the DB. These are live progress breadcrumbs with no DB-polling consumer (live progress comes from the executor over WebSocket); their only durable value is a breadcrumb folded into the final record. Behind the redis-progress-markers flag, markers now live in Redis during the run and are folded into the single terminal UPDATE at completion, dropping per-run row UPDATEs from ~2N+1 to 1. - New progress-markers module: HASH execution:progress:{id}, atomic Lua monotonic-guard writes preserving the existing <= ordering, reservation-aligned TTL backstop, graceful no-op when Redis is unavailable - Deterministic GC: cleared at every terminal/pause boundary; TTL covers crashes - Flag resolved once per logging session so a run never mixes write paths - Fold markers into the completion record (Redis wins, falls back to row markers) - Merge live markers for in-flight detail reads - Extract shared getExecutionReservationTtlMs so marker and admission-slot TTLs share one source of truth --- .../billing/calculations/usage-reservation.ts | 7 +- apps/sim/lib/core/config/env.ts | 1 + apps/sim/lib/core/config/feature-flags.ts | 8 + apps/sim/lib/core/execution-limits/types.ts | 13 ++ apps/sim/lib/logs/execution/logger.ts | 31 +++- .../logs/execution/logging-session.test.ts | 82 +++++++++ .../sim/lib/logs/execution/logging-session.ts | 39 ++++ .../logs/execution/progress-markers.test.ts | 154 ++++++++++++++++ .../lib/logs/execution/progress-markers.ts | 167 
++++++++++++++++++ apps/sim/lib/logs/fetch-log-detail.ts | 8 + 10 files changed, 499 insertions(+), 11 deletions(-) create mode 100644 apps/sim/lib/logs/execution/progress-markers.test.ts create mode 100644 apps/sim/lib/logs/execution/progress-markers.ts diff --git a/apps/sim/lib/billing/calculations/usage-reservation.ts b/apps/sim/lib/billing/calculations/usage-reservation.ts index a66e9e43f7f..9cb85a5b68e 100644 --- a/apps/sim/lib/billing/calculations/usage-reservation.ts +++ b/apps/sim/lib/billing/calculations/usage-reservation.ts @@ -5,7 +5,7 @@ import { getPlanTypeForLimits } from '@/lib/billing/plan-helpers' import { isOrgScopedSubscription } from '@/lib/billing/subscriptions/utils' import { isBillingEnabled } from '@/lib/core/config/env-flags' import { getRedisClient } from '@/lib/core/config/redis' -import { getMaxExecutionTimeout } from '@/lib/core/execution-limits' +import { getExecutionReservationTtlMs } from '@/lib/core/execution-limits' import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types' const logger = createLogger('UsageReservation') @@ -41,9 +41,6 @@ const MAX_CONCURRENT_EXECUTIONS: Record = { */ const SLOT_COST_ESTIMATE = BASE_EXECUTION_CHARGE -/** Safety buffer added to the reservation TTL beyond the max execution timeout. 
*/ -const RESERVATION_TTL_BUFFER_MS = 60_000 - const INFLIGHT_KEY_PREFIX = 'usage:inflight:' const POINTER_KEY_PREFIX = 'usage:reservation:' @@ -135,7 +132,7 @@ export async function reserveExecutionSlot( const maxConcurrency = getMaxConcurrentExecutions(subscription?.plan) const headroom = Math.max(0, limit - currentUsage) const headroomSlots = Math.floor(headroom / SLOT_COST_ESTIMATE) - const ttlMs = getMaxExecutionTimeout() + RESERVATION_TTL_BUFFER_MS + const ttlMs = getExecutionReservationTtlMs() const now = Date.now() const expiryScore = now + ttlMs diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 68371cfccb5..ace19c05b10 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -77,6 +77,7 @@ export const env = createEnv({ TABLE_SNAPSHOT_CACHE: z.boolean().optional(), // Mount tables into sandboxes by reference via a version-keyed CSV snapshot in object storage instead of draining the whole table into web-process heap PII_REDACTION: z.boolean().optional(), // Redact PII from workflow logs via configurable Data Retention rules (Presidio at the logger persist choke point) and expose the Data Retention config UI TRIGGER_EU_REGION: z.boolean().optional(), // Route Trigger.dev runs to eu-central-1 instead of the default us-east-1 (fallback for the trigger-eu-region flag when AppConfig is not the source of truth) + REDIS_PROGRESS_MARKERS: z.boolean().optional(), // Write per-block live progress markers to Redis instead of jsonb_set UPDATEs on workflow_execution_logs (fallback for the redis-progress-markers flag when AppConfig is not the source of truth) // Table feature limits (per plan). Apply when billing is disabled (free tier defaults) or for billed plans. 
FREE_TABLES_LIMIT: z.number().optional(), // Max user tables per workspace on free tier (default: 5) diff --git a/apps/sim/lib/core/config/feature-flags.ts b/apps/sim/lib/core/config/feature-flags.ts index 658b57105e8..9d0847a7152 100644 --- a/apps/sim/lib/core/config/feature-flags.ts +++ b/apps/sim/lib/core/config/feature-flags.ts @@ -97,6 +97,14 @@ const FEATURE_FLAGS = { 'resolveTriggerRegion, so the whole deployment switches regions together.', fallback: 'TRIGGER_EU_REGION', }, + 'redis-progress-markers': { + description: + 'Write per-block live progress markers (lastStartedBlock/lastCompletedBlock) to Redis ' + + 'instead of jsonb_set UPDATEs on workflow_execution_logs, folding them into the single ' + + 'terminal UPDATE at completion. Eliminates the heaviest write query. Resolved once per ' + + 'logging session (no user/org context) so an execution never mixes write paths.', + fallback: 'REDIS_PROGRESS_MARKERS', + }, } satisfies Record /** diff --git a/apps/sim/lib/core/execution-limits/types.ts b/apps/sim/lib/core/execution-limits/types.ts index ae1174aadb3..a76461e54d5 100644 --- a/apps/sim/lib/core/execution-limits/types.ts +++ b/apps/sim/lib/core/execution-limits/types.ts @@ -75,6 +75,19 @@ export function getMaxExecutionTimeout(): number { return EXECUTION_TIMEOUTS.enterprise.async } +/** Safety buffer added beyond the max execution timeout for execution-lifetime TTLs. */ +export const RESERVATION_TTL_BUFFER_MS = 60_000 + +/** + * TTL (ms) bounding how long a single execution can remain in flight: the max + * execution timeout plus a safety buffer. Shared source of truth for the + * admission-reservation key and the live progress-marker key so they expire on + * the same timeline. 
+ */ +export function getExecutionReservationTtlMs(): number { + return getMaxExecutionTimeout() + RESERVATION_TTL_BUFFER_MS +} + export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync export function isTimeoutError(error: unknown): boolean { diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index ae99addff41..9c7e72f7b5a 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -37,6 +37,11 @@ import { replaceLargeValueReferenceKeysWithClient, } from '@/lib/execution/payloads/large-value-metadata' import { type RedactablePayload, redactPIIFromExecution } from '@/lib/logs/execution/pii-redaction' +import { + clearProgressMarkers, + type ExecutionProgressMarkers, + getProgressMarkers, +} from '@/lib/logs/execution/progress-markers' import { snapshotService } from '@/lib/logs/execution/snapshot/service' import { externalizeExecutionData, @@ -417,8 +422,15 @@ export class ExecutionLogger implements IExecutionLoggerService { return minimalWithSize.executionData } + /** + * Assemble the final `execution_data` for the terminal UPDATE. Live progress + * markers are sourced from `progressMarkers` (Redis, current run) and fall back + * to markers already on the row — the legacy SQL path and resumed rows that + * folded markers in at their prior pause boundary. 
+ */ private buildCompletedExecutionData(params: { existingExecutionData?: WorkflowExecutionLog['executionData'] + progressMarkers?: ExecutionProgressMarkers traceSpans?: TraceSpan[] finalOutput: BlockOutputData finalizationPath?: ExecutionFinalizationPath @@ -436,6 +448,7 @@ export class ExecutionLogger implements IExecutionLoggerService { }): WorkflowExecutionLog['executionData'] { const { existingExecutionData, + progressMarkers, traceSpans, finalOutput, finalizationPath, @@ -446,6 +459,11 @@ export class ExecutionLogger implements IExecutionLoggerService { } = params const traceSpanCount = countTraceSpans(traceSpans) + const lastStartedBlock = + progressMarkers?.lastStartedBlock ?? existingExecutionData?.lastStartedBlock + const lastCompletedBlock = + progressMarkers?.lastCompletedBlock ?? existingExecutionData?.lastCompletedBlock + return { ...(existingExecutionData?.environment ? { environment: existingExecutionData.environment } @@ -459,12 +477,8 @@ export class ExecutionLogger implements IExecutionLoggerService { } : {}), ...(existingExecutionData?.error ? { error: existingExecutionData.error } : {}), - ...(existingExecutionData?.lastStartedBlock - ? { lastStartedBlock: existingExecutionData.lastStartedBlock } - : {}), - ...(existingExecutionData?.lastCompletedBlock - ? { lastCompletedBlock: existingExecutionData.lastCompletedBlock } - : {}), + ...(lastStartedBlock ? { lastStartedBlock } : {}), + ...(lastCompletedBlock ? { lastCompletedBlock } : {}), ...(completionFailure ? { completionFailure } : {}), ...(finalizationPath ? 
{ finalizationPath } : {}), hasTraceSpans: traceSpanCount > 0, @@ -731,8 +745,11 @@ export class ExecutionLogger implements IExecutionLoggerService { models: costSummary.models, } + const progressMarkers = await getProgressMarkers(executionId) + const builtExecutionData = this.buildCompletedExecutionData({ existingExecutionData, + progressMarkers, traceSpans: mergedTraceSpans, finalOutput, finalizationPath, @@ -893,6 +910,8 @@ export class ExecutionLogger implements IExecutionLoggerService { return log }) + void clearProgressMarkers(executionId) + try { // Skip workflow lookup if workflow was deleted. const wf = updatedLog.workflowId diff --git a/apps/sim/lib/logs/execution/logging-session.test.ts b/apps/sim/lib/logs/execution/logging-session.test.ts index 7ad138d0f93..6d5f056f432 100644 --- a/apps/sim/lib/logs/execution/logging-session.test.ts +++ b/apps/sim/lib/logs/execution/logging-session.test.ts @@ -66,6 +66,28 @@ vi.mock('@/lib/logs/execution/logger', () => ({ }, })) +const { + isFeatureEnabledMock, + setLastStartedBlockMock, + setLastCompletedBlockMock, + clearProgressMarkersMock, +} = vi.hoisted(() => ({ + isFeatureEnabledMock: vi.fn().mockResolvedValue(false), + setLastStartedBlockMock: vi.fn().mockResolvedValue(undefined), + setLastCompletedBlockMock: vi.fn().mockResolvedValue(undefined), + clearProgressMarkersMock: vi.fn().mockResolvedValue(undefined), +})) + +vi.mock('@/lib/core/config/feature-flags', () => ({ + isFeatureEnabled: isFeatureEnabledMock, +})) + +vi.mock('@/lib/logs/execution/progress-markers', () => ({ + setLastStartedBlock: setLastStartedBlockMock, + setLastCompletedBlock: setLastCompletedBlockMock, + clearProgressMarkers: clearProgressMarkersMock, +})) + vi.mock('@/lib/logs/execution/logging-factory', () => ({ calculateCostSummary: vi.fn().mockReturnValue({ totalCost: 0, @@ -647,4 +669,64 @@ describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => { const combined = String(Array.from(strings)).toLowerCase() + 
values.join(' ').toLowerCase() expect(combined).toContain('force_failed') }) + + it('clears Redis markers when marking failed (terminal boundary outside completeWorkflowExecution)', async () => { + await LoggingSession.markExecutionAsFailed('exec-3', 'boom', undefined, 'wf-3') + expect(clearProgressMarkersMock).toHaveBeenCalledWith('exec-3') + }) +}) + +describe('LoggingSession progress-marker write path', () => { + beforeEach(() => { + vi.clearAllMocks() + startWorkflowExecutionMock.mockResolvedValue({}) + loadWorkflowStateForExecutionMock.mockResolvedValue({ + blocks: {}, + edges: [], + loops: {}, + parallels: {}, + }) + dbMocks.execute.mockResolvedValue(undefined) + }) + + it('writes markers to Redis (not the row) when the flag is on', async () => { + isFeatureEnabledMock.mockResolvedValue(true) + const session = new LoggingSession('wf-1', 'exec-redis', 'manual', 'req-1') + await session.start({ workspaceId: 'ws-1' }) + + await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z') + await session.onBlockComplete('b1', 'Fetch', 'api', { endedAt: '2026-06-27T10:00:01.000Z' }) + + expect(setLastStartedBlockMock).toHaveBeenCalledWith( + 'exec-redis', + expect.objectContaining({ blockId: 'b1', startedAt: '2026-06-27T10:00:00.000Z' }) + ) + expect(setLastCompletedBlockMock).toHaveBeenCalledWith( + 'exec-redis', + expect.objectContaining({ blockId: 'b1', success: true }) + ) + expect(dbMocks.execute).not.toHaveBeenCalled() + }) + + it('writes markers via jsonb_set UPDATE when the flag is off', async () => { + isFeatureEnabledMock.mockResolvedValue(false) + const session = new LoggingSession('wf-1', 'exec-sql', 'manual', 'req-1') + await session.start({ workspaceId: 'ws-1' }) + + await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z') + + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + expect(setLastStartedBlockMock).not.toHaveBeenCalled() + }) + + it('falls back to the SQL path when flag resolution throws', async () => { + 
isFeatureEnabledMock.mockRejectedValue(new Error('appconfig unavailable')) + const session = new LoggingSession('wf-1', 'exec-fallback', 'manual', 'req-1') + await session.start({ workspaceId: 'ws-1' }) + + await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z') + + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + expect(setLastStartedBlockMock).not.toHaveBeenCalled() + }) }) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index a63aa3fb309..9152a9a71e5 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { describeError, toError } from '@sim/utils/errors' import { and, eq, sql } from 'drizzle-orm' import { releaseExecutionSlot } from '@/lib/billing/calculations/usage-reservation' +import { isFeatureEnabled } from '@/lib/core/config/feature-flags' import { isRetryableInfrastructureError } from '@/lib/core/errors/retryable-infrastructure' import { executionLogger } from '@/lib/logs/execution/logger' import { @@ -13,6 +14,11 @@ import { loadDeployedWorkflowStateForLogging, loadWorkflowStateForExecution, } from '@/lib/logs/execution/logging-factory' +import { + clearProgressMarkers, + setLastCompletedBlock, + setLastStartedBlock, +} from '@/lib/logs/execution/progress-markers' import type { ExecutionEnvironment, ExecutionFinalizationPath, @@ -129,6 +135,13 @@ export class LoggingSession { private workflowState?: WorkflowState private correlation?: NonNullable['correlation'] private isResume = false + /** + * Whether per-block progress markers go to Redis (vs jsonb_set UPDATEs on the + * log row). Resolved once in {@link start} and cached so an execution never + * mixes write paths across its block callbacks. Defaults to the legacy SQL + * path until resolved. 
+ */ + private useRedisMarkers = false private completed = false /** Synchronous flag to prevent concurrent completion attempts (race condition guard) */ private completing = false @@ -167,7 +180,23 @@ export class LoggingSession { ) } + /** + * Resolve the per-block marker write path (Redis vs jsonb_set UPDATE) for this + * session. Defaults to the legacy SQL path if flag resolution fails. + */ + private async resolveRedisMarkerMode(): Promise { + try { + return await isFeatureEnabled('redis-progress-markers') + } catch { + return false + } + } + private async persistLastStartedBlock(marker: ExecutionLastStartedBlock): Promise { + if (this.useRedisMarkers) { + await setLastStartedBlock(this.executionId, marker) + return + } try { await db.execute( buildStartedMarkerPersistenceQuery({ @@ -186,6 +215,10 @@ export class LoggingSession { } private async persistLastCompletedBlock(marker: ExecutionLastCompletedBlock): Promise { + if (this.useRedisMarkers) { + await setLastCompletedBlock(this.executionId, marker) + return + } try { await db.execute( buildCompletedMarkerPersistenceQuery({ @@ -313,6 +346,8 @@ export class LoggingSession { } = params try { + this.useRedisMarkers = await this.resolveRedisMarkerMode() + this.trigger = createTriggerObject(this.triggerType, triggerData) this.correlation = triggerData?.correlation this.environment = createEnvironmentObject( @@ -1018,6 +1053,10 @@ export class LoggingSession { ) ) + // Terminal boundary that bypasses completeWorkflowExecution; clear markers + // so this path matches the standard clear-at-every-boundary invariant. 
+ void clearProgressMarkers(executionId) + logger.info(`[${requestId || 'unknown'}] Marked execution ${executionId} as failed`) } catch (error) { logger.error(`Failed to mark execution ${executionId} as failed:`, { diff --git a/apps/sim/lib/logs/execution/progress-markers.test.ts b/apps/sim/lib/logs/execution/progress-markers.test.ts new file mode 100644 index 00000000000..dd380801c09 --- /dev/null +++ b/apps/sim/lib/logs/execution/progress-markers.test.ts @@ -0,0 +1,154 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import type { ExecutionLastCompletedBlock, ExecutionLastStartedBlock } from '@/lib/logs/types' + +const { mockGetRedisClient, mockRedis } = vi.hoisted(() => { + const mockRedis = { + eval: vi.fn(), + hgetall: vi.fn(), + del: vi.fn(), + } + return { mockGetRedisClient: vi.fn<[], typeof mockRedis | null>(() => mockRedis), mockRedis } +}) + +vi.mock('@/lib/core/config/redis', () => ({ + getRedisClient: mockGetRedisClient, +})) + +vi.mock('@/lib/core/execution-limits', () => ({ + getExecutionReservationTtlMs: () => 5_460_000, +})) + +import { + clearProgressMarkers, + getProgressMarkers, + setLastCompletedBlock, + setLastStartedBlock, +} from '@/lib/logs/execution/progress-markers' + +const EXECUTION_ID = 'exec-1' +const KEY = `execution:progress:${EXECUTION_ID}` +const EXPECTED_TTL_MS = '5460000' // getExecutionReservationTtlMs() mock value + +const startedMarker: ExecutionLastStartedBlock = { + blockId: 'b1', + blockName: 'Fetch', + blockType: 'api', + startedAt: '2026-06-27T10:00:00.000Z', +} + +const completedMarker: ExecutionLastCompletedBlock = { + blockId: 'b1', + blockName: 'Fetch', + blockType: 'api', + endedAt: '2026-06-27T10:00:01.000Z', + success: true, +} + +describe('progress-markers', () => { + beforeEach(() => { + vi.clearAllMocks() + mockGetRedisClient.mockReturnValue(mockRedis) + mockRedis.eval.mockResolvedValue(1) + mockRedis.hgetall.mockResolvedValue({}) + 
mockRedis.del.mockResolvedValue(1) + }) + + describe('setLastStartedBlock', () => { + it('evals the monotonic-guard script with key, started field, timestamp, json, and TTL', async () => { + await setLastStartedBlock(EXECUTION_ID, startedMarker) + + expect(mockRedis.eval).toHaveBeenCalledTimes(1) + const args = mockRedis.eval.mock.calls[0] + // [script, numKeys, key, field, tsField, timestamp, json, ttl] + expect(args[1]).toBe(1) + expect(args[2]).toBe(KEY) + expect(args[3]).toBe('started') + expect(args[4]).toBe('startedAt') + expect(args[5]).toBe(startedMarker.startedAt) + expect(JSON.parse(args[6] as string)).toEqual(startedMarker) + expect(args[7]).toBe(EXPECTED_TTL_MS) + }) + + it('swallows eval errors (fire-and-forget)', async () => { + mockRedis.eval.mockRejectedValueOnce(new Error('redis down')) + await expect(setLastStartedBlock(EXECUTION_ID, startedMarker)).resolves.toBeUndefined() + }) + + it('no-ops when Redis is unavailable', async () => { + mockGetRedisClient.mockReturnValue(null) + await setLastStartedBlock(EXECUTION_ID, startedMarker) + expect(mockRedis.eval).not.toHaveBeenCalled() + }) + }) + + describe('setLastCompletedBlock', () => { + it('evals with the completed field and endedAt timestamp', async () => { + await setLastCompletedBlock(EXECUTION_ID, completedMarker) + + const args = mockRedis.eval.mock.calls[0] + expect(args[3]).toBe('completed') + expect(args[4]).toBe('endedAt') + expect(args[5]).toBe(completedMarker.endedAt) + expect(JSON.parse(args[6] as string)).toEqual(completedMarker) + }) + }) + + describe('getProgressMarkers', () => { + it('parses both markers from the hash', async () => { + mockRedis.hgetall.mockResolvedValueOnce({ + started: JSON.stringify(startedMarker), + completed: JSON.stringify(completedMarker), + }) + + const result = await getProgressMarkers(EXECUTION_ID) + expect(mockRedis.hgetall).toHaveBeenCalledWith(KEY) + expect(result).toEqual({ + lastStartedBlock: startedMarker, + lastCompletedBlock: completedMarker, + }) 
+ }) + + it('returns only the present field', async () => { + mockRedis.hgetall.mockResolvedValueOnce({ started: JSON.stringify(startedMarker) }) + const result = await getProgressMarkers(EXECUTION_ID) + expect(result).toEqual({ lastStartedBlock: startedMarker }) + }) + + it('returns {} for an empty / missing key', async () => { + mockRedis.hgetall.mockResolvedValueOnce({}) + expect(await getProgressMarkers(EXECUTION_ID)).toEqual({}) + }) + + it('returns {} and does not throw on malformed JSON', async () => { + mockRedis.hgetall.mockResolvedValueOnce({ started: '{not json' }) + expect(await getProgressMarkers(EXECUTION_ID)).toEqual({}) + }) + + it('returns {} when Redis is unavailable', async () => { + mockGetRedisClient.mockReturnValue(null) + expect(await getProgressMarkers(EXECUTION_ID)).toEqual({}) + expect(mockRedis.hgetall).not.toHaveBeenCalled() + }) + }) + + describe('clearProgressMarkers', () => { + it('deletes the key', async () => { + await clearProgressMarkers(EXECUTION_ID) + expect(mockRedis.del).toHaveBeenCalledWith(KEY) + }) + + it('swallows del errors', async () => { + mockRedis.del.mockRejectedValueOnce(new Error('redis down')) + await expect(clearProgressMarkers(EXECUTION_ID)).resolves.toBeUndefined() + }) + + it('no-ops when Redis is unavailable', async () => { + mockGetRedisClient.mockReturnValue(null) + await clearProgressMarkers(EXECUTION_ID) + expect(mockRedis.del).not.toHaveBeenCalled() + }) + }) +}) diff --git a/apps/sim/lib/logs/execution/progress-markers.ts b/apps/sim/lib/logs/execution/progress-markers.ts new file mode 100644 index 00000000000..34f609646f2 --- /dev/null +++ b/apps/sim/lib/logs/execution/progress-markers.ts @@ -0,0 +1,167 @@ +import { createLogger } from '@sim/logger' +import { toError } from '@sim/utils/errors' +import { getRedisClient } from '@/lib/core/config/redis' +import { getExecutionReservationTtlMs } from '@/lib/core/execution-limits' +import type { ExecutionLastCompletedBlock, ExecutionLastStartedBlock } from 
'@/lib/logs/types' + +const logger = createLogger('ExecutionProgressMarkers') + +/** + * Live per-block progress markers (`lastStartedBlock` / `lastCompletedBlock`) + * used to be written to `workflow_execution_logs.execution_data` via a + * `jsonb_set` UPDATE on every block start and complete — ~2N row UPDATEs per + * run and the single heaviest write query in the database. They carry no + * value beyond a breadcrumb folded into the final record (no client polls + * them; live progress comes from the executor over WebSocket), so they live + * in Redis during the run and are folded into the single terminal UPDATE at + * completion. See the logs-contention plan for the full rationale. + */ + +/** Redis key namespace — matches the `execution:*` family (stream, cancel, budget). */ +const PROGRESS_KEY_PREFIX = 'execution:progress:' + +const STARTED_FIELD = 'started' +const COMPLETED_FIELD = 'completed' + +/** + * Single source of the client used for progress markers. Indirection kept so a + * future split to a dedicated `LOGS_REDIS_URL` is a one-line change here. + */ +function getMarkerClient() { + return getRedisClient() +} + +function markerKey(executionId: string): string { + return `${PROGRESS_KEY_PREFIX}${executionId}` +} + +/** + * Atomic monotonic write: set the field only when the incoming marker's embedded + * timestamp is >= the stored one, then refresh the TTL — all in one script so + * concurrent block callbacks can't race a read-modify-write. Preserves the exact + * `<=` ordering the legacy SQL used (`COALESCE(stored, '') <= incoming`). ISO + * UTC timestamps compare correctly lexicographically. 
+ */ +const SET_MARKER_SCRIPT = ` +local existing = redis.call('HGET', KEYS[1], ARGV[1]) +if existing then + local ok, decoded = pcall(cjson.decode, existing) + if ok and decoded[ARGV[2]] and tostring(decoded[ARGV[2]]) > ARGV[3] then + redis.call('PEXPIRE', KEYS[1], ARGV[5]) + return 0 + end +end +redis.call('HSET', KEYS[1], ARGV[1], ARGV[4]) +redis.call('PEXPIRE', KEYS[1], ARGV[5]) +return 1 +` + +/** + * Write a marker field under the monotonic guard, refreshing the key TTL. The + * TTL is a backstop for executions that die without a terminal/pause boundary + * (deterministic cleanup is {@link clearProgressMarkers}); it mirrors the + * admission-reservation TTL so a crashed run's marker key and slot expire + * together. + */ +async function setMarker( + executionId: string, + field: string, + timestampField: 'startedAt' | 'endedAt', + timestamp: string, + marker: ExecutionLastStartedBlock | ExecutionLastCompletedBlock +): Promise { + const redis = getMarkerClient() + if (!redis) return + + try { + await redis.eval( + SET_MARKER_SCRIPT, + 1, + markerKey(executionId), + field, + timestampField, + timestamp, + JSON.stringify(marker), + getExecutionReservationTtlMs().toString() + ) + } catch (error) { + logger.error(`Failed to persist progress marker for execution ${executionId}`, { + field, + error: toError(error).message, + }) + } +} + +/** Persist the last-started-block marker. No-op when Redis is unavailable. */ +export async function setLastStartedBlock( + executionId: string, + marker: ExecutionLastStartedBlock +): Promise { + await setMarker(executionId, STARTED_FIELD, 'startedAt', marker.startedAt, marker) +} + +/** Persist the last-completed-block marker. No-op when Redis is unavailable. 
*/ +export async function setLastCompletedBlock( + executionId: string, + marker: ExecutionLastCompletedBlock +): Promise { + await setMarker(executionId, COMPLETED_FIELD, 'endedAt', marker.endedAt, marker) +} + +export interface ExecutionProgressMarkers { + lastStartedBlock?: ExecutionLastStartedBlock + lastCompletedBlock?: ExecutionLastCompletedBlock +} + +function parseMarker(raw: string | undefined): T | undefined { + if (!raw) return undefined + try { + return JSON.parse(raw) as T + } catch { + return undefined + } +} + +/** + * Read both markers for an execution. Returns an empty object when Redis is + * unavailable, the key has expired, or nothing has been written yet. + */ +export async function getProgressMarkers(executionId: string): Promise { + const redis = getMarkerClient() + if (!redis) return {} + + try { + const fields = await redis.hgetall(markerKey(executionId)) + if (!fields || Object.keys(fields).length === 0) return {} + + const result: ExecutionProgressMarkers = {} + const started = parseMarker(fields[STARTED_FIELD]) + if (started) result.lastStartedBlock = started + const completed = parseMarker(fields[COMPLETED_FIELD]) + if (completed) result.lastCompletedBlock = completed + return result + } catch (error) { + logger.error(`Failed to read progress markers for execution ${executionId}`, { + error: toError(error).message, + }) + return {} + } +} + +/** + * Delete the markers for an execution. Called at every terminal/pause boundary + * after the durable record has been written, so paused executions (which can + * live indefinitely) hold no Redis keys. Fire-and-forget; no-op without Redis. 
+ */ +export async function clearProgressMarkers(executionId: string): Promise { + const redis = getMarkerClient() + if (!redis) return + + try { + await redis.del(markerKey(executionId)) + } catch (error) { + logger.error(`Failed to clear progress markers for execution ${executionId}`, { + error: toError(error).message, + }) + } +} diff --git a/apps/sim/lib/logs/fetch-log-detail.ts b/apps/sim/lib/logs/fetch-log-detail.ts index dd90e80180f..4b57d8eeeb8 100644 --- a/apps/sim/lib/logs/fetch-log-detail.ts +++ b/apps/sim/lib/logs/fetch-log-detail.ts @@ -9,6 +9,7 @@ import { } from '@sim/db/schema' import { and, eq, type SQL } from 'drizzle-orm' import type { CostLedger } from '@/lib/api/contracts/logs' +import { getProgressMarkers } from '@/lib/logs/execution/progress-markers' import { materializeExecutionData } from '@/lib/logs/execution/trace-store' import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' @@ -165,6 +166,12 @@ export async function fetchLogDetail({ { workspaceId, workflowId: log.workflowId, executionId: log.executionId } ) + // In-flight markers live in Redis until folded into the row at a terminal/pause boundary. + const liveMarkers = + log.status === 'running' || log.status === 'pending' + ? await getProgressMarkers(log.executionId) + : {} + return { id: log.id, workflowId: log.workflowId, @@ -190,6 +197,7 @@ export async function fetchLogDetail({ executionData: { totalDuration: log.totalDurationMs, ...executionData, + ...liveMarkers, enhanced: true as const, }, files: log.files ?? 
null, From f4d8086790d8526c33d628c2a9ece416dfd803eb Mon Sep 17 00:00:00 2001 From: waleed Date: Sat, 27 Jun 2026 15:48:55 -0700 Subject: [PATCH 2/7] fix(logs): SQL fallback when Redis marker write fails, fold markers on force-fail, validate marker shape Addresses review feedback on the redis-progress-markers PR: - persistLast* now falls back to the jsonb_set UPDATE when Redis is unavailable or the write fails (setLast* returns whether it persisted), so a marker is never dropped when the flag is on without a healthy Redis. - markExecutionAsFailed folds live Redis markers into execution_data before clearing, so the last-started/last-completed breadcrumb survives the force-fail path. - getProgressMarkers validates marker shape (rebuilds from typed fields), so a stale or wrong-shaped Redis value can never reach API consumers. --- .../logs/execution/logging-session.test.ts | 45 ++++++++++- .../sim/lib/logs/execution/logging-session.ts | 43 ++++++---- .../logs/execution/progress-markers.test.ts | 27 ++++++- .../lib/logs/execution/progress-markers.ts | 78 +++++++++++++++---- 4 files changed, 159 insertions(+), 34 deletions(-) diff --git a/apps/sim/lib/logs/execution/logging-session.test.ts b/apps/sim/lib/logs/execution/logging-session.test.ts index 6d5f056f432..cd99c3e77ca 100644 --- a/apps/sim/lib/logs/execution/logging-session.test.ts +++ b/apps/sim/lib/logs/execution/logging-session.test.ts @@ -70,11 +70,13 @@ const { isFeatureEnabledMock, setLastStartedBlockMock, setLastCompletedBlockMock, + getProgressMarkersMock, clearProgressMarkersMock, } = vi.hoisted(() => ({ isFeatureEnabledMock: vi.fn().mockResolvedValue(false), - setLastStartedBlockMock: vi.fn().mockResolvedValue(undefined), - setLastCompletedBlockMock: vi.fn().mockResolvedValue(undefined), + setLastStartedBlockMock: vi.fn().mockResolvedValue(false), + setLastCompletedBlockMock: vi.fn().mockResolvedValue(false), + getProgressMarkersMock: vi.fn().mockResolvedValue({}), clearProgressMarkersMock: 
vi.fn().mockResolvedValue(undefined), })) @@ -85,6 +87,7 @@ vi.mock('@/lib/core/config/feature-flags', () => ({ vi.mock('@/lib/logs/execution/progress-markers', () => ({ setLastStartedBlock: setLastStartedBlockMock, setLastCompletedBlock: setLastCompletedBlockMock, + getProgressMarkers: getProgressMarkersMock, clearProgressMarkers: clearProgressMarkersMock, })) @@ -674,6 +677,28 @@ describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => { await LoggingSession.markExecutionAsFailed('exec-3', 'boom', undefined, 'wf-3') expect(clearProgressMarkersMock).toHaveBeenCalledWith('exec-3') }) + + it('folds live Redis markers into the row before clearing on force-fail', async () => { + getProgressMarkersMock.mockResolvedValueOnce({ + lastStartedBlock: { blockId: 'b1', blockName: 'Fetch', blockType: 'api', startedAt: 't1' }, + lastCompletedBlock: { + blockId: 'b1', + blockName: 'Fetch', + blockType: 'api', + endedAt: 't2', + success: false, + }, + }) + + await LoggingSession.markExecutionAsFailed('exec-9', 'boom', undefined, 'wf-9') + + const folded = dbMocks.sql.mock.calls + .map((c) => String(Array.from(c[0] as TemplateStringsArray))) + .join(' ') + expect(folded).toContain('lastStartedBlock') + expect(folded).toContain('lastCompletedBlock') + expect(clearProgressMarkersMock).toHaveBeenCalledWith('exec-9') + }) }) describe('LoggingSession progress-marker write path', () => { @@ -689,8 +714,10 @@ describe('LoggingSession progress-marker write path', () => { dbMocks.execute.mockResolvedValue(undefined) }) - it('writes markers to Redis (not the row) when the flag is on', async () => { + it('writes markers to Redis (not the row) when the flag is on and Redis accepts the write', async () => { isFeatureEnabledMock.mockResolvedValue(true) + setLastStartedBlockMock.mockResolvedValue(true) + setLastCompletedBlockMock.mockResolvedValue(true) const session = new LoggingSession('wf-1', 'exec-redis', 'manual', 'req-1') await session.start({ workspaceId: 'ws-1' }) @@ 
-708,6 +735,18 @@ describe('LoggingSession progress-marker write path', () => { expect(dbMocks.execute).not.toHaveBeenCalled() }) + it('falls back to the SQL UPDATE when the flag is on but the Redis write fails', async () => { + isFeatureEnabledMock.mockResolvedValue(true) + setLastStartedBlockMock.mockResolvedValue(false) + const session = new LoggingSession('wf-1', 'exec-redis-down', 'manual', 'req-1') + await session.start({ workspaceId: 'ws-1' }) + + await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z') + + expect(setLastStartedBlockMock).toHaveBeenCalled() + expect(dbMocks.execute).toHaveBeenCalledTimes(1) + }) + it('writes markers via jsonb_set UPDATE when the flag is off', async () => { isFeatureEnabledMock.mockResolvedValue(false) const session = new LoggingSession('wf-1', 'exec-sql', 'manual', 'req-1') diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index 9152a9a71e5..8242d792647 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -16,6 +16,7 @@ import { } from '@/lib/logs/execution/logging-factory' import { clearProgressMarkers, + getProgressMarkers, setLastCompletedBlock, setLastStartedBlock, } from '@/lib/logs/execution/progress-markers' @@ -193,8 +194,9 @@ export class LoggingSession { } private async persistLastStartedBlock(marker: ExecutionLastStartedBlock): Promise { - if (this.useRedisMarkers) { - await setLastStartedBlock(this.executionId, marker) + // Redis is the primary path when enabled; fall back to the durable SQL write + // when Redis is unavailable or the write fails, so a marker is never dropped. 
+ if (this.useRedisMarkers && (await setLastStartedBlock(this.executionId, marker))) { return } try { @@ -215,8 +217,9 @@ export class LoggingSession { } private async persistLastCompletedBlock(marker: ExecutionLastCompletedBlock): Promise { - if (this.useRedisMarkers) { - await setLastCompletedBlock(this.executionId, marker) + // Redis is the primary path when enabled; fall back to the durable SQL write + // when Redis is unavailable or the write fails, so a marker is never dropped. + if (this.useRedisMarkers && (await setLastCompletedBlock(this.executionId, marker))) { return } try { @@ -1027,12 +1030,15 @@ export class LoggingSession { ): Promise { try { const message = errorMessage || 'Run failed' - await db - .update(workflowExecutionLogs) - .set({ - level: 'error', - status: 'failed', - executionData: sql`jsonb_set( + + // This terminal boundary bypasses completeWorkflowExecution, so fold any + // live Redis markers into the row here too — otherwise a run whose markers + // only ever lived in Redis would be saved with error data but no + // last-started/last-completed breadcrumb. Empty when the standard + // completion path already folded and cleared them. 
+ const markers = await getProgressMarkers(executionId) + + let executionData = sql`jsonb_set( jsonb_set( jsonb_set( COALESCE(execution_data, '{}'::jsonb), @@ -1044,8 +1050,17 @@ export class LoggingSession { ), ARRAY['finalizationPath'], to_jsonb('force_failed'::text) - )`, - }) + )` + if (markers.lastStartedBlock) { + executionData = sql`jsonb_set(${executionData}, ARRAY['lastStartedBlock'], ${JSON.stringify(markers.lastStartedBlock)}::jsonb)` + } + if (markers.lastCompletedBlock) { + executionData = sql`jsonb_set(${executionData}, ARRAY['lastCompletedBlock'], ${JSON.stringify(markers.lastCompletedBlock)}::jsonb)` + } + + await db + .update(workflowExecutionLogs) + .set({ level: 'error', status: 'failed', executionData }) .where( and( eq(workflowExecutionLogs.executionId, executionId), @@ -1053,8 +1068,8 @@ export class LoggingSession { ) ) - // Terminal boundary that bypasses completeWorkflowExecution; clear markers - // so this path matches the standard clear-at-every-boundary invariant. + // Markers are now durable on the row; drop the Redis key to match the + // standard clear-at-every-boundary invariant. 
void clearProgressMarkers(executionId) logger.info(`[${requestId || 'unknown'}] Marked execution ${executionId} as failed`) diff --git a/apps/sim/lib/logs/execution/progress-markers.test.ts b/apps/sim/lib/logs/execution/progress-markers.test.ts index dd380801c09..9cf42005d97 100644 --- a/apps/sim/lib/logs/execution/progress-markers.test.ts +++ b/apps/sim/lib/logs/execution/progress-markers.test.ts @@ -72,14 +72,18 @@ describe('progress-markers', () => { expect(args[7]).toBe(EXPECTED_TTL_MS) }) - it('swallows eval errors (fire-and-forget)', async () => { + it('returns true when the Redis write succeeds', async () => { + await expect(setLastStartedBlock(EXECUTION_ID, startedMarker)).resolves.toBe(true) + }) + + it('returns false (caller falls back to SQL) when the eval fails', async () => { mockRedis.eval.mockRejectedValueOnce(new Error('redis down')) - await expect(setLastStartedBlock(EXECUTION_ID, startedMarker)).resolves.toBeUndefined() + await expect(setLastStartedBlock(EXECUTION_ID, startedMarker)).resolves.toBe(false) }) - it('no-ops when Redis is unavailable', async () => { + it('returns false and no-ops when Redis is unavailable', async () => { mockGetRedisClient.mockReturnValue(null) - await setLastStartedBlock(EXECUTION_ID, startedMarker) + await expect(setLastStartedBlock(EXECUTION_ID, startedMarker)).resolves.toBe(false) expect(mockRedis.eval).not.toHaveBeenCalled() }) }) @@ -127,6 +131,21 @@ describe('progress-markers', () => { expect(await getProgressMarkers(EXECUTION_ID)).toEqual({}) }) + it('drops wrong-shaped JSON so malformed markers never reach clients', async () => { + mockRedis.hgetall.mockResolvedValueOnce({ + started: JSON.stringify('just a string'), + completed: JSON.stringify({ blockId: 123, blockName: 'x', blockType: 'api', endedAt: 'z' }), + }) + expect(await getProgressMarkers(EXECUTION_ID)).toEqual({}) + }) + + it('strips extra fields, returning only the validated marker shape', async () => { + mockRedis.hgetall.mockResolvedValueOnce({ + 
started: JSON.stringify({ ...startedMarker, secret: 'leak', extra: 1 }), + }) + expect(await getProgressMarkers(EXECUTION_ID)).toEqual({ lastStartedBlock: startedMarker }) + }) + it('returns {} when Redis is unavailable', async () => { mockGetRedisClient.mockReturnValue(null) expect(await getProgressMarkers(EXECUTION_ID)).toEqual({}) diff --git a/apps/sim/lib/logs/execution/progress-markers.ts b/apps/sim/lib/logs/execution/progress-markers.ts index 34f609646f2..262e854bbbb 100644 --- a/apps/sim/lib/logs/execution/progress-markers.ts +++ b/apps/sim/lib/logs/execution/progress-markers.ts @@ -61,7 +61,8 @@ return 1 * TTL is a backstop for executions that die without a terminal/pause boundary * (deterministic cleanup is {@link clearProgressMarkers}); it mirrors the * admission-reservation TTL so a crashed run's marker key and slot expire - * together. + * together. Returns `true` whenever the Redis script ran without error (even if its monotonic guard kept a newer marker), + * so callers fall back to the SQL path only on a missing client or a failure. */ async function setMarker( executionId: string, @@ -69,9 +70,9 @@ async function setMarker( timestampField: 'startedAt' | 'endedAt', timestamp: string, marker: ExecutionLastStartedBlock | ExecutionLastCompletedBlock -): Promise { +): Promise { const redis = getMarkerClient() - if (!redis) return + if (!redis) return false try { await redis.eval( @@ -84,28 +85,36 @@ async function setMarker( JSON.stringify(marker), getExecutionReservationTtlMs().toString() ) + return true } catch (error) { logger.error(`Failed to persist progress marker for execution ${executionId}`, { field, error: toError(error).message, }) + return false } } -/** Persist the last-started-block marker. No-op when Redis is unavailable. */ +/** + * Persist the last-started-block marker. Returns `false` (caller should fall + * back to the durable SQL path) when Redis is unavailable or the write fails.
+ */ export async function setLastStartedBlock( executionId: string, marker: ExecutionLastStartedBlock -): Promise { - await setMarker(executionId, STARTED_FIELD, 'startedAt', marker.startedAt, marker) +): Promise { + return setMarker(executionId, STARTED_FIELD, 'startedAt', marker.startedAt, marker) } -/** Persist the last-completed-block marker. No-op when Redis is unavailable. */ +/** + * Persist the last-completed-block marker. Returns `false` (caller should fall + * back to the durable SQL path) when Redis is unavailable or the write fails. + */ export async function setLastCompletedBlock( executionId: string, marker: ExecutionLastCompletedBlock -): Promise { - await setMarker(executionId, COMPLETED_FIELD, 'endedAt', marker.endedAt, marker) +): Promise { + return setMarker(executionId, COMPLETED_FIELD, 'endedAt', marker.endedAt, marker) } export interface ExecutionProgressMarkers { @@ -113,15 +122,58 @@ export interface ExecutionProgressMarkers { lastCompletedBlock?: ExecutionLastCompletedBlock } -function parseMarker(raw: string | undefined): T | undefined { +function safeJsonParse(raw: string | undefined): unknown { if (!raw) return undefined try { - return JSON.parse(raw) as T + return JSON.parse(raw) } catch { return undefined } } +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value) +} + +/** + * Parse a stored last-started marker, rebuilding it from validated fields so a + * stale or wrong-shaped Redis value can never reach API consumers. 
+ */ +function parseStartedMarker(raw: string | undefined): ExecutionLastStartedBlock | undefined { + const v = safeJsonParse(raw) + if (!isRecord(v)) return undefined + const { blockId, blockName, blockType, startedAt } = v + if ( + typeof blockId === 'string' && + typeof blockName === 'string' && + typeof blockType === 'string' && + typeof startedAt === 'string' + ) { + return { blockId, blockName, blockType, startedAt } + } + return undefined +} + +/** + * Parse a stored last-completed marker, rebuilding it from validated fields so a + * stale or wrong-shaped Redis value can never reach API consumers. + */ +function parseCompletedMarker(raw: string | undefined): ExecutionLastCompletedBlock | undefined { + const v = safeJsonParse(raw) + if (!isRecord(v)) return undefined + const { blockId, blockName, blockType, endedAt, success } = v + if ( + typeof blockId === 'string' && + typeof blockName === 'string' && + typeof blockType === 'string' && + typeof endedAt === 'string' && + typeof success === 'boolean' + ) { + return { blockId, blockName, blockType, endedAt, success } + } + return undefined +} + /** * Read both markers for an execution. Returns an empty object when Redis is * unavailable, the key has expired, or nothing has been written yet. 
@@ -135,9 +187,9 @@ export async function getProgressMarkers(executionId: string): Promise(fields[STARTED_FIELD]) + const started = parseStartedMarker(fields[STARTED_FIELD]) if (started) result.lastStartedBlock = started - const completed = parseMarker(fields[COMPLETED_FIELD]) + const completed = parseCompletedMarker(fields[COMPLETED_FIELD]) if (completed) result.lastCompletedBlock = completed return result } catch (error) { From 27afbce9cbc8823b74703bd239d86555dc36c471 Mon Sep 17 00:00:00 2001 From: waleed Date: Sat, 27 Jun 2026 15:53:37 -0700 Subject: [PATCH 3/7] chore(logs): convert inline marker comments to TSDoc --- .../sim/lib/logs/execution/logging-session.ts | 28 +++++++++++-------- .../logs/execution/progress-markers.test.ts | 18 ++++++------ apps/sim/lib/logs/fetch-log-detail.ts | 5 +++- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index 8242d792647..ae0dd4cc71d 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -193,9 +193,12 @@ export class LoggingSession { } } + /** + * Persist the last-started-block marker. Redis is the primary path when the + * flag is on; falls back to the durable jsonb_set UPDATE when Redis is + * unavailable or the write fails, so a marker is never dropped. + */ private async persistLastStartedBlock(marker: ExecutionLastStartedBlock): Promise { - // Redis is the primary path when enabled; fall back to the durable SQL write - // when Redis is unavailable or the write fails, so a marker is never dropped. if (this.useRedisMarkers && (await setLastStartedBlock(this.executionId, marker))) { return } @@ -216,9 +219,12 @@ export class LoggingSession { } } + /** + * Persist the last-completed-block marker. 
Redis is the primary path when the + * flag is on; falls back to the durable jsonb_set UPDATE when Redis is + * unavailable or the write fails, so a marker is never dropped. + */ private async persistLastCompletedBlock(marker: ExecutionLastCompletedBlock): Promise { - // Redis is the primary path when enabled; fall back to the durable SQL write - // when Redis is unavailable or the write fails, so a marker is never dropped. if (this.useRedisMarkers && (await setLastCompletedBlock(this.executionId, marker))) { return } @@ -1022,6 +1028,13 @@ export class LoggingSession { ) } + /** + * Force-fail terminal boundary that bypasses completeWorkflowExecution. Folds + * any live Redis progress markers into execution_data before clearing the key, + * so a run whose markers only ever lived in Redis still keeps its + * last-started/last-completed breadcrumb. Both the fold and clear are no-ops + * when the standard completion path already persisted and cleared them. + */ static async markExecutionAsFailed( executionId: string, errorMessage: string | undefined, @@ -1031,11 +1044,6 @@ export class LoggingSession { try { const message = errorMessage || 'Run failed' - // This terminal boundary bypasses completeWorkflowExecution, so fold any - // live Redis markers into the row here too — otherwise a run whose markers - // only ever lived in Redis would be saved with error data but no - // last-started/last-completed breadcrumb. Empty when the standard - // completion path already folded and cleared them. const markers = await getProgressMarkers(executionId) let executionData = sql`jsonb_set( @@ -1068,8 +1076,6 @@ export class LoggingSession { ) ) - // Markers are now durable on the row; drop the Redis key to match the - // standard clear-at-every-boundary invariant. 
void clearProgressMarkers(executionId) logger.info(`[${requestId || 'unknown'}] Marked execution ${executionId} as failed`) diff --git a/apps/sim/lib/logs/execution/progress-markers.test.ts b/apps/sim/lib/logs/execution/progress-markers.test.ts index 9cf42005d97..a65fe5f19a6 100644 --- a/apps/sim/lib/logs/execution/progress-markers.test.ts +++ b/apps/sim/lib/logs/execution/progress-markers.test.ts @@ -61,15 +61,15 @@ describe('progress-markers', () => { await setLastStartedBlock(EXECUTION_ID, startedMarker) expect(mockRedis.eval).toHaveBeenCalledTimes(1) - const args = mockRedis.eval.mock.calls[0] - // [script, numKeys, key, field, tsField, timestamp, json, ttl] - expect(args[1]).toBe(1) - expect(args[2]).toBe(KEY) - expect(args[3]).toBe('started') - expect(args[4]).toBe('startedAt') - expect(args[5]).toBe(startedMarker.startedAt) - expect(JSON.parse(args[6] as string)).toEqual(startedMarker) - expect(args[7]).toBe(EXPECTED_TTL_MS) + const [, numKeys, key, field, timestampField, timestamp, json, ttl] = + mockRedis.eval.mock.calls[0] + expect(numKeys).toBe(1) + expect(key).toBe(KEY) + expect(field).toBe('started') + expect(timestampField).toBe('startedAt') + expect(timestamp).toBe(startedMarker.startedAt) + expect(JSON.parse(json as string)).toEqual(startedMarker) + expect(ttl).toBe(EXPECTED_TTL_MS) }) it('returns true when the Redis write succeeds', async () => { diff --git a/apps/sim/lib/logs/fetch-log-detail.ts b/apps/sim/lib/logs/fetch-log-detail.ts index 4b57d8eeeb8..145fa10c701 100644 --- a/apps/sim/lib/logs/fetch-log-detail.ts +++ b/apps/sim/lib/logs/fetch-log-detail.ts @@ -78,6 +78,10 @@ interface FetchLogDetailArgs { * Shared loader for the workflow-log detail shape returned by the by-id and * by-execution routes. Returns `null` when no matching row exists in either * the workflow-execution or job-execution tables for this user + workspace. 
+ * + * For in-flight (running/pending) executions, live progress markers are merged + * from Redis, since they are only folded into the row at a terminal/pause + * boundary. */ export async function fetchLogDetail({ userId, @@ -166,7 +170,6 @@ export async function fetchLogDetail({ { workspaceId, workflowId: log.workflowId, executionId: log.executionId } ) - // In-flight markers live in Redis until folded into the row at a terminal/pause boundary. const liveMarkers = log.status === 'running' || log.status === 'pending' ? await getProgressMarkers(log.executionId) From 08f54efffb6f281a36e5b09dc63af08ed8dfb719 Mon Sep 17 00:00:00 2001 From: waleed Date: Sat, 27 Jun 2026 16:03:21 -0700 Subject: [PATCH 4/7] fix(logs): preserve markers when the completion read fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit getProgressMarkers now returns null on a Redis read error (vs {} for genuinely empty). completeWorkflowExecution and markExecutionAsFailed skip clearProgressMarkers when the read returns null, so a transient read error at completion no longer wipes markers that are still durably in Redis — the TTL reclaims them instead. 
--- apps/sim/lib/logs/execution/logger.ts | 4 ++-- apps/sim/lib/logs/execution/logging-session.test.ts | 6 ++++++ apps/sim/lib/logs/execution/logging-session.ts | 6 +++--- apps/sim/lib/logs/execution/progress-markers.test.ts | 5 +++++ apps/sim/lib/logs/execution/progress-markers.ts | 11 ++++++++--- apps/sim/lib/logs/fetch-log-detail.ts | 2 +- 6 files changed, 25 insertions(+), 9 deletions(-) diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 9c7e72f7b5a..fc3f30746c0 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -749,7 +749,7 @@ export class ExecutionLogger implements IExecutionLoggerService { const builtExecutionData = this.buildCompletedExecutionData({ existingExecutionData, - progressMarkers, + progressMarkers: progressMarkers ?? undefined, traceSpans: mergedTraceSpans, finalOutput, finalizationPath, @@ -910,7 +910,7 @@ export class ExecutionLogger implements IExecutionLoggerService { return log }) - void clearProgressMarkers(executionId) + if (progressMarkers !== null) void clearProgressMarkers(executionId) try { // Skip workflow lookup if workflow was deleted. 
diff --git a/apps/sim/lib/logs/execution/logging-session.test.ts b/apps/sim/lib/logs/execution/logging-session.test.ts index cd99c3e77ca..bdf90f58f03 100644 --- a/apps/sim/lib/logs/execution/logging-session.test.ts +++ b/apps/sim/lib/logs/execution/logging-session.test.ts @@ -699,6 +699,12 @@ describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => { expect(folded).toContain('lastCompletedBlock') expect(clearProgressMarkersMock).toHaveBeenCalledWith('exec-9') }) + + it('does not clear markers when the Redis read fails (avoids wiping the only copy)', async () => { + getProgressMarkersMock.mockResolvedValueOnce(null) + await LoggingSession.markExecutionAsFailed('exec-readfail', 'boom', undefined, 'wf-x') + expect(clearProgressMarkersMock).not.toHaveBeenCalled() + }) }) describe('LoggingSession progress-marker write path', () => { diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index ae0dd4cc71d..a373a340e4f 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -1059,10 +1059,10 @@ export class LoggingSession { ARRAY['finalizationPath'], to_jsonb('force_failed'::text) )` - if (markers.lastStartedBlock) { + if (markers?.lastStartedBlock) { executionData = sql`jsonb_set(${executionData}, ARRAY['lastStartedBlock'], ${JSON.stringify(markers.lastStartedBlock)}::jsonb)` } - if (markers.lastCompletedBlock) { + if (markers?.lastCompletedBlock) { executionData = sql`jsonb_set(${executionData}, ARRAY['lastCompletedBlock'], ${JSON.stringify(markers.lastCompletedBlock)}::jsonb)` } @@ -1076,7 +1076,7 @@ export class LoggingSession { ) ) - void clearProgressMarkers(executionId) + if (markers !== null) void clearProgressMarkers(executionId) logger.info(`[${requestId || 'unknown'}] Marked execution ${executionId} as failed`) } catch (error) { diff --git a/apps/sim/lib/logs/execution/progress-markers.test.ts 
b/apps/sim/lib/logs/execution/progress-markers.test.ts index a65fe5f19a6..b565c97b091 100644 --- a/apps/sim/lib/logs/execution/progress-markers.test.ts +++ b/apps/sim/lib/logs/execution/progress-markers.test.ts @@ -151,6 +151,11 @@ describe('progress-markers', () => { expect(await getProgressMarkers(EXECUTION_ID)).toEqual({}) expect(mockRedis.hgetall).not.toHaveBeenCalled() }) + + it('returns null when the Redis read fails so callers do not clear the only copy', async () => { + mockRedis.hgetall.mockRejectedValueOnce(new Error('redis down')) + expect(await getProgressMarkers(EXECUTION_ID)).toBeNull() + }) }) describe('clearProgressMarkers', () => { diff --git a/apps/sim/lib/logs/execution/progress-markers.ts b/apps/sim/lib/logs/execution/progress-markers.ts index 262e854bbbb..7d0ac91389b 100644 --- a/apps/sim/lib/logs/execution/progress-markers.ts +++ b/apps/sim/lib/logs/execution/progress-markers.ts @@ -176,9 +176,14 @@ function parseCompletedMarker(raw: string | undefined): ExecutionLastCompletedBl /** * Read both markers for an execution. Returns an empty object when Redis is - * unavailable, the key has expired, or nothing has been written yet. + * unavailable (markers were never stored here) or the key holds nothing, and + * `null` when the Redis read itself failed — callers must treat `null` as + * "unknown" and skip {@link clearProgressMarkers}, so a transient read error + * never wipes the only copy of markers that are still in Redis. 
*/ -export async function getProgressMarkers(executionId: string): Promise { +export async function getProgressMarkers( + executionId: string +): Promise { const redis = getMarkerClient() if (!redis) return {} @@ -196,7 +201,7 @@ export async function getProgressMarkers(executionId: string): Promise Date: Sat, 27 Jun 2026 16:20:01 -0700 Subject: [PATCH 5/7] fix(logs): resolve marker store split-brain by latest-timestamp-wins + drain on force-fail - When a Redis marker write falls back to SQL, Redis and the row can each hold a marker for a different block; reads/folds previously preferred Redis unconditionally and could pick a stale value. Now the completion fold, the in-flight detail read, and the force-fail fold all pick the marker with the later timestamp (pickLatestStartedMarker/pickLatestCompletedMarker; markExecutionAsFailed uses a monotonic SQL guard). - markAsFailed now drains pending per-block marker writes (not just the completion promise) before folding, so a force-fail racing onBlockStart/onBlockComplete still captures the latest breadcrumb. 
--- apps/sim/lib/logs/execution/logger.ts | 14 +++++++--- .../sim/lib/logs/execution/logging-session.ts | 18 +++++++++++-- .../logs/execution/progress-markers.test.ts | 25 ++++++++++++++++++ .../lib/logs/execution/progress-markers.ts | 26 +++++++++++++++++++ apps/sim/lib/logs/fetch-log-detail.ts | 19 ++++++++++++-- 5 files changed, 94 insertions(+), 8 deletions(-) diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index fc3f30746c0..539b87174e6 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -41,6 +41,8 @@ import { clearProgressMarkers, type ExecutionProgressMarkers, getProgressMarkers, + pickLatestCompletedMarker, + pickLatestStartedMarker, } from '@/lib/logs/execution/progress-markers' import { snapshotService } from '@/lib/logs/execution/snapshot/service' import { @@ -459,10 +461,14 @@ export class ExecutionLogger implements IExecutionLoggerService { } = params const traceSpanCount = countTraceSpans(traceSpans) - const lastStartedBlock = - progressMarkers?.lastStartedBlock ?? existingExecutionData?.lastStartedBlock - const lastCompletedBlock = - progressMarkers?.lastCompletedBlock ?? existingExecutionData?.lastCompletedBlock + const lastStartedBlock = pickLatestStartedMarker( + progressMarkers?.lastStartedBlock, + existingExecutionData?.lastStartedBlock + ) + const lastCompletedBlock = pickLatestCompletedMarker( + progressMarkers?.lastCompletedBlock, + existingExecutionData?.lastCompletedBlock + ) return { ...(existingExecutionData?.environment diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index a373a340e4f..e5d1d6f1d1a 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -1018,8 +1018,14 @@ export class LoggingSession { } } + /** + * Force-fail the execution. 
Waits for any in-flight completion and drains + * pending per-block marker writes first, so a force-fail racing + * onBlockStart/onBlockComplete still captures the latest breadcrumb in the fold. + */ async markAsFailed(errorMessage?: string): Promise { await this.waitForCompletion() + await this.drainPendingProgressWrites() await LoggingSession.markExecutionAsFailed( this.executionId, errorMessage, @@ -1060,10 +1066,18 @@ export class LoggingSession { to_jsonb('force_failed'::text) )` if (markers?.lastStartedBlock) { - executionData = sql`jsonb_set(${executionData}, ARRAY['lastStartedBlock'], ${JSON.stringify(markers.lastStartedBlock)}::jsonb)` + const startedAt = markers.lastStartedBlock.startedAt + const startedJson = JSON.stringify(markers.lastStartedBlock) + executionData = sql`CASE WHEN COALESCE(jsonb_extract_path_text(execution_data, 'lastStartedBlock', 'startedAt'), '') <= ${startedAt} + THEN jsonb_set(${executionData}, ARRAY['lastStartedBlock'], ${startedJson}::jsonb) + ELSE ${executionData} END` } if (markers?.lastCompletedBlock) { - executionData = sql`jsonb_set(${executionData}, ARRAY['lastCompletedBlock'], ${JSON.stringify(markers.lastCompletedBlock)}::jsonb)` + const endedAt = markers.lastCompletedBlock.endedAt + const completedJson = JSON.stringify(markers.lastCompletedBlock) + executionData = sql`CASE WHEN COALESCE(jsonb_extract_path_text(execution_data, 'lastCompletedBlock', 'endedAt'), '') <= ${endedAt} + THEN jsonb_set(${executionData}, ARRAY['lastCompletedBlock'], ${completedJson}::jsonb) + ELSE ${executionData} END` } await db diff --git a/apps/sim/lib/logs/execution/progress-markers.test.ts b/apps/sim/lib/logs/execution/progress-markers.test.ts index b565c97b091..2fc17b45d93 100644 --- a/apps/sim/lib/logs/execution/progress-markers.test.ts +++ b/apps/sim/lib/logs/execution/progress-markers.test.ts @@ -24,6 +24,8 @@ vi.mock('@/lib/core/execution-limits', () => ({ import { clearProgressMarkers, getProgressMarkers, + pickLatestCompletedMarker, + 
pickLatestStartedMarker, setLastCompletedBlock, setLastStartedBlock, } from '@/lib/logs/execution/progress-markers' @@ -175,4 +177,27 @@ describe('progress-markers', () => { expect(mockRedis.del).not.toHaveBeenCalled() }) }) + + describe('latest-wins pickers (stale-store safety)', () => { + const older = { ...startedMarker, blockId: 'old', startedAt: '2026-06-27T10:00:00.000Z' } + const newer = { ...startedMarker, blockId: 'new', startedAt: '2026-06-27T10:00:05.000Z' } + + it('returns the defined side when the other is undefined', () => { + expect(pickLatestStartedMarker(older, undefined)).toBe(older) + expect(pickLatestStartedMarker(undefined, newer)).toBe(newer) + expect(pickLatestStartedMarker(undefined, undefined)).toBeUndefined() + }) + + it('picks the later startedAt regardless of argument order (row newer than Redis still wins)', () => { + expect(pickLatestStartedMarker(older, newer)).toBe(newer) + expect(pickLatestStartedMarker(newer, older)).toBe(newer) + }) + + it('picks the later endedAt for completed markers', () => { + const c1 = { ...completedMarker, endedAt: '2026-06-27T10:00:01.000Z' } + const c2 = { ...completedMarker, endedAt: '2026-06-27T10:00:09.000Z' } + expect(pickLatestCompletedMarker(c1, c2)).toBe(c2) + expect(pickLatestCompletedMarker(c2, c1)).toBe(c2) + }) + }) }) diff --git a/apps/sim/lib/logs/execution/progress-markers.ts b/apps/sim/lib/logs/execution/progress-markers.ts index 7d0ac91389b..7fe57eb020e 100644 --- a/apps/sim/lib/logs/execution/progress-markers.ts +++ b/apps/sim/lib/logs/execution/progress-markers.ts @@ -122,6 +122,32 @@ export interface ExecutionProgressMarkers { lastCompletedBlock?: ExecutionLastCompletedBlock } +/** + * Pick the later of two last-started markers by `startedAt`. Markers can split + * across stores — a failed Redis write falls back to the row, so an earlier + * successful Redis write may coexist with a newer row marker (or vice versa). 
+ * Choosing by timestamp keeps the freshest breadcrumb regardless of which store + * holds it. ISO UTC timestamps compare correctly lexicographically. + */ +export function pickLatestStartedMarker( + a: ExecutionLastStartedBlock | undefined, + b: ExecutionLastStartedBlock | undefined +): ExecutionLastStartedBlock | undefined { + if (!a) return b + if (!b) return a + return a.startedAt >= b.startedAt ? a : b +} + +/** Pick the later of two last-completed markers by `endedAt`. See {@link pickLatestStartedMarker}. */ +export function pickLatestCompletedMarker( + a: ExecutionLastCompletedBlock | undefined, + b: ExecutionLastCompletedBlock | undefined +): ExecutionLastCompletedBlock | undefined { + if (!a) return b + if (!b) return a + return a.endedAt >= b.endedAt ? a : b +} + function safeJsonParse(raw: string | undefined): unknown { if (!raw) return undefined try { diff --git a/apps/sim/lib/logs/fetch-log-detail.ts b/apps/sim/lib/logs/fetch-log-detail.ts index 72b29a25c86..5b38171d6b6 100644 --- a/apps/sim/lib/logs/fetch-log-detail.ts +++ b/apps/sim/lib/logs/fetch-log-detail.ts @@ -9,7 +9,12 @@ import { } from '@sim/db/schema' import { and, eq, type SQL } from 'drizzle-orm' import type { CostLedger } from '@/lib/api/contracts/logs' -import { getProgressMarkers } from '@/lib/logs/execution/progress-markers' +import { + type ExecutionProgressMarkers, + getProgressMarkers, + pickLatestCompletedMarker, + pickLatestStartedMarker, +} from '@/lib/logs/execution/progress-markers' import { materializeExecutionData } from '@/lib/logs/execution/trace-store' import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils' @@ -174,6 +179,15 @@ export async function fetchLogDetail({ log.status === 'running' || log.status === 'pending' ? ((await getProgressMarkers(log.executionId)) ?? {}) : {} + const rowMarkers = (executionData ?? 
{}) as ExecutionProgressMarkers + const mergedStartedBlock = pickLatestStartedMarker( + liveMarkers.lastStartedBlock, + rowMarkers.lastStartedBlock + ) + const mergedCompletedBlock = pickLatestCompletedMarker( + liveMarkers.lastCompletedBlock, + rowMarkers.lastCompletedBlock + ) return { id: log.id, @@ -200,7 +214,8 @@ export async function fetchLogDetail({ executionData: { totalDuration: log.totalDurationMs, ...executionData, - ...liveMarkers, + ...(mergedStartedBlock ? { lastStartedBlock: mergedStartedBlock } : {}), + ...(mergedCompletedBlock ? { lastCompletedBlock: mergedCompletedBlock } : {}), enhanced: true as const, }, files: log.files ?? null, From 811846eca65f8241e733093b60feeb9dc595fbc4 Mon Sep 17 00:00:00 2001 From: waleed Date: Sat, 27 Jun 2026 16:42:37 -0700 Subject: [PATCH 6/7] fix(logs): harden Lua marker guard against non-table decoded values Guard the monotonic-check index with type(decoded) == 'table' so a corrupted Redis field that decodes to a non-table (e.g. a number) can't error the eval; our write path only ever stores JSON objects, so this is defense-in-depth. 
--- apps/sim/lib/logs/execution/progress-markers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/sim/lib/logs/execution/progress-markers.ts b/apps/sim/lib/logs/execution/progress-markers.ts index 7fe57eb020e..352196cebca 100644 --- a/apps/sim/lib/logs/execution/progress-markers.ts +++ b/apps/sim/lib/logs/execution/progress-markers.ts @@ -46,7 +46,7 @@ const SET_MARKER_SCRIPT = ` local existing = redis.call('HGET', KEYS[1], ARGV[1]) if existing then local ok, decoded = pcall(cjson.decode, existing) - if ok and decoded[ARGV[2]] and tostring(decoded[ARGV[2]]) > ARGV[3] then + if ok and type(decoded) == 'table' and decoded[ARGV[2]] and tostring(decoded[ARGV[2]]) > ARGV[3] then redis.call('PEXPIRE', KEYS[1], ARGV[5]) return 0 end From 66691709e2090a564889a3c04a3fe4873e60a868 Mon Sep 17 00:00:00 2001 From: waleed Date: Sat, 27 Jun 2026 16:52:35 -0700 Subject: [PATCH 7/7] perf(logs): skip completion Redis read/clear when markers went to SQL completeWorkflowExecution now takes readProgressMarkers (the session's resolved marker mode); when the flag is off it skips the per-completion HGETALL+DEL entirely instead of probing a key that was never written. Sticky to the session so it stays flip-safe (an execution that wrote to Redis always folds+clears Redis). Non-session callers default to true (safe read-and-fold). Also hardened the Lua guard with type(decoded)=='table'. 
--- apps/sim/lib/logs/execution/logger.ts | 4 +++- .../logs/execution/logging-session.test.ts | 20 +++++++++++++++++++ .../sim/lib/logs/execution/logging-session.ts | 1 + apps/sim/lib/logs/types.ts | 7 +++++++ 4 files changed, 31 insertions(+), 1 deletion(-) diff --git a/apps/sim/lib/logs/execution/logger.ts b/apps/sim/lib/logs/execution/logger.ts index 539b87174e6..a78225d42f5 100644 --- a/apps/sim/lib/logs/execution/logger.ts +++ b/apps/sim/lib/logs/execution/logger.ts @@ -679,6 +679,7 @@ export class ExecutionLogger implements IExecutionLoggerService { isResume?: boolean level?: 'info' | 'error' status?: 'completed' | 'failed' | 'cancelled' | 'pending' + readProgressMarkers?: boolean }): Promise { const { executionId, @@ -694,6 +695,7 @@ export class ExecutionLogger implements IExecutionLoggerService { isResume, level: levelOverride, status: statusOverride, + readProgressMarkers = true, } = params let execLog = logger.withMetadata({ executionId }) @@ -751,7 +753,7 @@ export class ExecutionLogger implements IExecutionLoggerService { models: costSummary.models, } - const progressMarkers = await getProgressMarkers(executionId) + const progressMarkers = readProgressMarkers ? 
await getProgressMarkers(executionId) : null const builtExecutionData = this.buildCompletedExecutionData({ existingExecutionData, diff --git a/apps/sim/lib/logs/execution/logging-session.test.ts b/apps/sim/lib/logs/execution/logging-session.test.ts index bdf90f58f03..cf978fccd5f 100644 --- a/apps/sim/lib/logs/execution/logging-session.test.ts +++ b/apps/sim/lib/logs/execution/logging-session.test.ts @@ -774,4 +774,24 @@ describe('LoggingSession progress-marker write path', () => { expect(dbMocks.execute).toHaveBeenCalledTimes(1) expect(setLastStartedBlockMock).not.toHaveBeenCalled() }) + + it('tells completion to read Redis markers only when the flag is on (no wasted ops when off)', async () => { + completeWorkflowExecutionMock.mockResolvedValue({}) + + isFeatureEnabledMock.mockResolvedValue(true) + const onSession = new LoggingSession('wf-1', 'exec-on', 'manual', 'req-1') + await onSession.start({ workspaceId: 'ws-1' }) + await onSession.safeComplete({ finalOutput: { ok: true } }) + expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith( + expect.objectContaining({ executionId: 'exec-on', readProgressMarkers: true }) + ) + + isFeatureEnabledMock.mockResolvedValue(false) + const offSession = new LoggingSession('wf-1', 'exec-off', 'manual', 'req-1') + await offSession.start({ workspaceId: 'ws-1' }) + await offSession.safeComplete({ finalOutput: { ok: true } }) + expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith( + expect.objectContaining({ executionId: 'exec-off', readProgressMarkers: false }) + ) + }) }) diff --git a/apps/sim/lib/logs/execution/logging-session.ts b/apps/sim/lib/logs/execution/logging-session.ts index e5d1d6f1d1a..d6d7c271fff 100644 --- a/apps/sim/lib/logs/execution/logging-session.ts +++ b/apps/sim/lib/logs/execution/logging-session.ts @@ -308,6 +308,7 @@ export class LoggingSession { isResume: this.isResume, level: params.level, status: params.status, + readProgressMarkers: this.useRedisMarkers, }) // Release the admission 
reservation from preprocessing. Skipped on pause: a diff --git a/apps/sim/lib/logs/types.ts b/apps/sim/lib/logs/types.ts index f4021439a35..f146c27699b 100644 --- a/apps/sim/lib/logs/types.ts +++ b/apps/sim/lib/logs/types.ts @@ -463,5 +463,12 @@ export interface ExecutionLoggerService { isResume?: boolean level?: 'info' | 'error' status?: 'completed' | 'failed' | 'cancelled' | 'pending' + /** + * Whether this session wrote live progress markers to Redis. When false, the + * completion fold skips the Redis read/clear entirely (markers are already on + * the row via the SQL path). Defaults to true so non-session callers keep the + * safe read-and-fold behavior. + */ + readProgressMarkers?: boolean }): Promise }