Skip to content

Commit f4d8086

Browse files
committed
fix(logs): SQL fallback when Redis marker write fails, fold markers on force-fail, validate marker shape
Addresses review feedback on the redis-progress-markers PR: - persistLast* now falls back to the jsonb_set UPDATE when Redis is unavailable or the write fails (setLast* returns whether it persisted), so a marker is never dropped when the flag is on without a healthy Redis. - markExecutionAsFailed folds live Redis markers into execution_data before clearing, so the last-started/last-completed breadcrumb survives the force-fail path. - getProgressMarkers validates marker shape (rebuilds from typed fields), so a stale or wrong-shaped Redis value can never reach API consumers.
1 parent 70ffdd1 commit f4d8086

4 files changed

Lines changed: 159 additions & 34 deletions

File tree

apps/sim/lib/logs/execution/logging-session.test.ts

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,13 @@ const {
7070
isFeatureEnabledMock,
7171
setLastStartedBlockMock,
7272
setLastCompletedBlockMock,
73+
getProgressMarkersMock,
7374
clearProgressMarkersMock,
7475
} = vi.hoisted(() => ({
7576
isFeatureEnabledMock: vi.fn().mockResolvedValue(false),
76-
setLastStartedBlockMock: vi.fn().mockResolvedValue(undefined),
77-
setLastCompletedBlockMock: vi.fn().mockResolvedValue(undefined),
77+
setLastStartedBlockMock: vi.fn().mockResolvedValue(false),
78+
setLastCompletedBlockMock: vi.fn().mockResolvedValue(false),
79+
getProgressMarkersMock: vi.fn().mockResolvedValue({}),
7880
clearProgressMarkersMock: vi.fn().mockResolvedValue(undefined),
7981
}))
8082

@@ -85,6 +87,7 @@ vi.mock('@/lib/core/config/feature-flags', () => ({
8587
vi.mock('@/lib/logs/execution/progress-markers', () => ({
8688
setLastStartedBlock: setLastStartedBlockMock,
8789
setLastCompletedBlock: setLastCompletedBlockMock,
90+
getProgressMarkers: getProgressMarkersMock,
8891
clearProgressMarkers: clearProgressMarkersMock,
8992
}))
9093

@@ -674,6 +677,28 @@ describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => {
674677
await LoggingSession.markExecutionAsFailed('exec-3', 'boom', undefined, 'wf-3')
675678
expect(clearProgressMarkersMock).toHaveBeenCalledWith('exec-3')
676679
})
680+
681+
it('folds live Redis markers into the row before clearing on force-fail', async () => {
682+
getProgressMarkersMock.mockResolvedValueOnce({
683+
lastStartedBlock: { blockId: 'b1', blockName: 'Fetch', blockType: 'api', startedAt: 't1' },
684+
lastCompletedBlock: {
685+
blockId: 'b1',
686+
blockName: 'Fetch',
687+
blockType: 'api',
688+
endedAt: 't2',
689+
success: false,
690+
},
691+
})
692+
693+
await LoggingSession.markExecutionAsFailed('exec-9', 'boom', undefined, 'wf-9')
694+
695+
const folded = dbMocks.sql.mock.calls
696+
.map((c) => String(Array.from(c[0] as TemplateStringsArray)))
697+
.join(' ')
698+
expect(folded).toContain('lastStartedBlock')
699+
expect(folded).toContain('lastCompletedBlock')
700+
expect(clearProgressMarkersMock).toHaveBeenCalledWith('exec-9')
701+
})
677702
})
678703

679704
describe('LoggingSession progress-marker write path', () => {
@@ -689,8 +714,10 @@ describe('LoggingSession progress-marker write path', () => {
689714
dbMocks.execute.mockResolvedValue(undefined)
690715
})
691716

692-
it('writes markers to Redis (not the row) when the flag is on', async () => {
717+
it('writes markers to Redis (not the row) when the flag is on and Redis accepts the write', async () => {
693718
isFeatureEnabledMock.mockResolvedValue(true)
719+
setLastStartedBlockMock.mockResolvedValue(true)
720+
setLastCompletedBlockMock.mockResolvedValue(true)
694721
const session = new LoggingSession('wf-1', 'exec-redis', 'manual', 'req-1')
695722
await session.start({ workspaceId: 'ws-1' })
696723

@@ -708,6 +735,18 @@ describe('LoggingSession progress-marker write path', () => {
708735
expect(dbMocks.execute).not.toHaveBeenCalled()
709736
})
710737

738+
it('falls back to the SQL UPDATE when the flag is on but the Redis write fails', async () => {
739+
isFeatureEnabledMock.mockResolvedValue(true)
740+
setLastStartedBlockMock.mockResolvedValue(false)
741+
const session = new LoggingSession('wf-1', 'exec-redis-down', 'manual', 'req-1')
742+
await session.start({ workspaceId: 'ws-1' })
743+
744+
await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z')
745+
746+
expect(setLastStartedBlockMock).toHaveBeenCalled()
747+
expect(dbMocks.execute).toHaveBeenCalledTimes(1)
748+
})
749+
711750
it('writes markers via jsonb_set UPDATE when the flag is off', async () => {
712751
isFeatureEnabledMock.mockResolvedValue(false)
713752
const session = new LoggingSession('wf-1', 'exec-sql', 'manual', 'req-1')

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
} from '@/lib/logs/execution/logging-factory'
1717
import {
1818
clearProgressMarkers,
19+
getProgressMarkers,
1920
setLastCompletedBlock,
2021
setLastStartedBlock,
2122
} from '@/lib/logs/execution/progress-markers'
@@ -193,8 +194,9 @@ export class LoggingSession {
193194
}
194195

195196
private async persistLastStartedBlock(marker: ExecutionLastStartedBlock): Promise<void> {
196-
if (this.useRedisMarkers) {
197-
await setLastStartedBlock(this.executionId, marker)
197+
// Redis is the primary path when enabled; fall back to the durable SQL write
198+
// when Redis is unavailable or the write fails, so a marker is never dropped.
199+
if (this.useRedisMarkers && (await setLastStartedBlock(this.executionId, marker))) {
198200
return
199201
}
200202
try {
@@ -215,8 +217,9 @@ export class LoggingSession {
215217
}
216218

217219
private async persistLastCompletedBlock(marker: ExecutionLastCompletedBlock): Promise<void> {
218-
if (this.useRedisMarkers) {
219-
await setLastCompletedBlock(this.executionId, marker)
220+
// Redis is the primary path when enabled; fall back to the durable SQL write
221+
// when Redis is unavailable or the write fails, so a marker is never dropped.
222+
if (this.useRedisMarkers && (await setLastCompletedBlock(this.executionId, marker))) {
220223
return
221224
}
222225
try {
@@ -1027,12 +1030,15 @@ export class LoggingSession {
10271030
): Promise<void> {
10281031
try {
10291032
const message = errorMessage || 'Run failed'
1030-
await db
1031-
.update(workflowExecutionLogs)
1032-
.set({
1033-
level: 'error',
1034-
status: 'failed',
1035-
executionData: sql`jsonb_set(
1033+
1034+
// This terminal boundary bypasses completeWorkflowExecution, so fold any
1035+
// live Redis markers into the row here too — otherwise a run whose markers
1036+
// only ever lived in Redis would be saved with error data but no
1037+
// last-started/last-completed breadcrumb. Empty when the standard
1038+
// completion path already folded and cleared them.
1039+
const markers = await getProgressMarkers(executionId)
1040+
1041+
let executionData = sql`jsonb_set(
10361042
jsonb_set(
10371043
jsonb_set(
10381044
COALESCE(execution_data, '{}'::jsonb),
@@ -1044,17 +1050,26 @@ export class LoggingSession {
10441050
),
10451051
ARRAY['finalizationPath'],
10461052
to_jsonb('force_failed'::text)
1047-
)`,
1048-
})
1053+
)`
1054+
if (markers.lastStartedBlock) {
1055+
executionData = sql`jsonb_set(${executionData}, ARRAY['lastStartedBlock'], ${JSON.stringify(markers.lastStartedBlock)}::jsonb)`
1056+
}
1057+
if (markers.lastCompletedBlock) {
1058+
executionData = sql`jsonb_set(${executionData}, ARRAY['lastCompletedBlock'], ${JSON.stringify(markers.lastCompletedBlock)}::jsonb)`
1059+
}
1060+
1061+
await db
1062+
.update(workflowExecutionLogs)
1063+
.set({ level: 'error', status: 'failed', executionData })
10491064
.where(
10501065
and(
10511066
eq(workflowExecutionLogs.executionId, executionId),
10521067
eq(workflowExecutionLogs.workflowId, workflowId)
10531068
)
10541069
)
10551070

1056-
// Terminal boundary that bypasses completeWorkflowExecution; clear markers
1057-
// so this path matches the standard clear-at-every-boundary invariant.
1071+
// Markers are now durable on the row; drop the Redis key to match the
1072+
// standard clear-at-every-boundary invariant.
10581073
void clearProgressMarkers(executionId)
10591074

10601075
logger.info(`[${requestId || 'unknown'}] Marked execution ${executionId} as failed`)

apps/sim/lib/logs/execution/progress-markers.test.ts

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,18 @@ describe('progress-markers', () => {
7272
expect(args[7]).toBe(EXPECTED_TTL_MS)
7373
})
7474

75-
it('swallows eval errors (fire-and-forget)', async () => {
75+
it('returns true when the Redis write succeeds', async () => {
76+
await expect(setLastStartedBlock(EXECUTION_ID, startedMarker)).resolves.toBe(true)
77+
})
78+
79+
it('returns false (caller falls back to SQL) when the eval fails', async () => {
7680
mockRedis.eval.mockRejectedValueOnce(new Error('redis down'))
77-
await expect(setLastStartedBlock(EXECUTION_ID, startedMarker)).resolves.toBeUndefined()
81+
await expect(setLastStartedBlock(EXECUTION_ID, startedMarker)).resolves.toBe(false)
7882
})
7983

80-
it('no-ops when Redis is unavailable', async () => {
84+
it('returns false and no-ops when Redis is unavailable', async () => {
8185
mockGetRedisClient.mockReturnValue(null)
82-
await setLastStartedBlock(EXECUTION_ID, startedMarker)
86+
await expect(setLastStartedBlock(EXECUTION_ID, startedMarker)).resolves.toBe(false)
8387
expect(mockRedis.eval).not.toHaveBeenCalled()
8488
})
8589
})
@@ -127,6 +131,21 @@ describe('progress-markers', () => {
127131
expect(await getProgressMarkers(EXECUTION_ID)).toEqual({})
128132
})
129133

134+
it('drops wrong-shaped JSON so malformed markers never reach clients', async () => {
135+
mockRedis.hgetall.mockResolvedValueOnce({
136+
started: JSON.stringify('just a string'),
137+
completed: JSON.stringify({ blockId: 123, blockName: 'x', blockType: 'api', endedAt: 'z' }),
138+
})
139+
expect(await getProgressMarkers(EXECUTION_ID)).toEqual({})
140+
})
141+
142+
it('strips extra fields, returning only the validated marker shape', async () => {
143+
mockRedis.hgetall.mockResolvedValueOnce({
144+
started: JSON.stringify({ ...startedMarker, secret: 'leak', extra: 1 }),
145+
})
146+
expect(await getProgressMarkers(EXECUTION_ID)).toEqual({ lastStartedBlock: startedMarker })
147+
})
148+
130149
it('returns {} when Redis is unavailable', async () => {
131150
mockGetRedisClient.mockReturnValue(null)
132151
expect(await getProgressMarkers(EXECUTION_ID)).toEqual({})

apps/sim/lib/logs/execution/progress-markers.ts

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,18 @@ return 1
6161
* TTL is a backstop for executions that die without a terminal/pause boundary
6262
* (deterministic cleanup is {@link clearProgressMarkers}); it mirrors the
6363
* admission-reservation TTL so a crashed run's marker key and slot expire
64-
* together.
64+
* together. Returns `true` only when the marker was durably written to Redis,
65+
* so callers can fall back to the SQL path on a missing client or a failure.
6566
*/
6667
async function setMarker(
6768
executionId: string,
6869
field: string,
6970
timestampField: 'startedAt' | 'endedAt',
7071
timestamp: string,
7172
marker: ExecutionLastStartedBlock | ExecutionLastCompletedBlock
72-
): Promise<void> {
73+
): Promise<boolean> {
7374
const redis = getMarkerClient()
74-
if (!redis) return
75+
if (!redis) return false
7576

7677
try {
7778
await redis.eval(
@@ -84,44 +85,95 @@ async function setMarker(
8485
JSON.stringify(marker),
8586
getExecutionReservationTtlMs().toString()
8687
)
88+
return true
8789
} catch (error) {
8890
logger.error(`Failed to persist progress marker for execution ${executionId}`, {
8991
field,
9092
error: toError(error).message,
9193
})
94+
return false
9295
}
9396
}
9497

95-
/** Persist the last-started-block marker. No-op when Redis is unavailable. */
98+
/**
99+
* Persist the last-started-block marker. Returns `false` (caller should fall
100+
* back to the durable SQL path) when Redis is unavailable or the write fails.
101+
*/
96102
export async function setLastStartedBlock(
97103
executionId: string,
98104
marker: ExecutionLastStartedBlock
99-
): Promise<void> {
100-
await setMarker(executionId, STARTED_FIELD, 'startedAt', marker.startedAt, marker)
105+
): Promise<boolean> {
106+
return setMarker(executionId, STARTED_FIELD, 'startedAt', marker.startedAt, marker)
101107
}
102108

103-
/** Persist the last-completed-block marker. No-op when Redis is unavailable. */
109+
/**
110+
* Persist the last-completed-block marker. Returns `false` (caller should fall
111+
* back to the durable SQL path) when Redis is unavailable or the write fails.
112+
*/
104113
export async function setLastCompletedBlock(
105114
executionId: string,
106115
marker: ExecutionLastCompletedBlock
107-
): Promise<void> {
108-
await setMarker(executionId, COMPLETED_FIELD, 'endedAt', marker.endedAt, marker)
116+
): Promise<boolean> {
117+
return setMarker(executionId, COMPLETED_FIELD, 'endedAt', marker.endedAt, marker)
109118
}
110119

111120
export interface ExecutionProgressMarkers {
112121
lastStartedBlock?: ExecutionLastStartedBlock
113122
lastCompletedBlock?: ExecutionLastCompletedBlock
114123
}
115124

116-
function parseMarker<T>(raw: string | undefined): T | undefined {
125+
function safeJsonParse(raw: string | undefined): unknown {
117126
if (!raw) return undefined
118127
try {
119-
return JSON.parse(raw) as T
128+
return JSON.parse(raw)
120129
} catch {
121130
return undefined
122131
}
123132
}
124133

134+
function isRecord(value: unknown): value is Record<string, unknown> {
135+
return typeof value === 'object' && value !== null && !Array.isArray(value)
136+
}
137+
138+
/**
139+
* Parse a stored last-started marker, rebuilding it from validated fields so a
140+
* stale or wrong-shaped Redis value can never reach API consumers.
141+
*/
142+
function parseStartedMarker(raw: string | undefined): ExecutionLastStartedBlock | undefined {
143+
const v = safeJsonParse(raw)
144+
if (!isRecord(v)) return undefined
145+
const { blockId, blockName, blockType, startedAt } = v
146+
if (
147+
typeof blockId === 'string' &&
148+
typeof blockName === 'string' &&
149+
typeof blockType === 'string' &&
150+
typeof startedAt === 'string'
151+
) {
152+
return { blockId, blockName, blockType, startedAt }
153+
}
154+
return undefined
155+
}
156+
157+
/**
158+
* Parse a stored last-completed marker, rebuilding it from validated fields so a
159+
* stale or wrong-shaped Redis value can never reach API consumers.
160+
*/
161+
function parseCompletedMarker(raw: string | undefined): ExecutionLastCompletedBlock | undefined {
162+
const v = safeJsonParse(raw)
163+
if (!isRecord(v)) return undefined
164+
const { blockId, blockName, blockType, endedAt, success } = v
165+
if (
166+
typeof blockId === 'string' &&
167+
typeof blockName === 'string' &&
168+
typeof blockType === 'string' &&
169+
typeof endedAt === 'string' &&
170+
typeof success === 'boolean'
171+
) {
172+
return { blockId, blockName, blockType, endedAt, success }
173+
}
174+
return undefined
175+
}
176+
125177
/**
126178
* Read both markers for an execution. Returns an empty object when Redis is
127179
* unavailable, the key has expired, or nothing has been written yet.
@@ -135,9 +187,9 @@ export async function getProgressMarkers(executionId: string): Promise<Execution
135187
if (!fields || Object.keys(fields).length === 0) return {}
136188

137189
const result: ExecutionProgressMarkers = {}
138-
const started = parseMarker<ExecutionLastStartedBlock>(fields[STARTED_FIELD])
190+
const started = parseStartedMarker(fields[STARTED_FIELD])
139191
if (started) result.lastStartedBlock = started
140-
const completed = parseMarker<ExecutionLastCompletedBlock>(fields[COMPLETED_FIELD])
192+
const completed = parseCompletedMarker(fields[COMPLETED_FIELD])
141193
if (completed) result.lastCompletedBlock = completed
142194
return result
143195
} catch (error) {

0 commit comments

Comments
 (0)