@@ -8,22 +8,30 @@ import { type NextRequest, NextResponse } from 'next/server'
88import { verifyCronAuth } from '@/lib/auth/internal'
99import { acquireLock , releaseLock } from '@/lib/core/config/redis'
1010import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
11- import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
11+ import {
12+ computeEarliestResumeAt ,
13+ PauseResumeManager ,
14+ } from '@/lib/workflows/executor/human-in-the-loop-manager'
15+ import type { PausePoint } from '@/executor/types'
1216
// Route-level settings exported as static literals (this module imports from
// 'next/server'; presumably these are Next.js route segment config — confirm).
export const dynamic = 'force-dynamic'
export const maxDuration = 120

/** Logger shared by the poll handler and its row-dispatch helper. */
const logger = createLogger('TimePauseResumePoll')

/** Redis key of the lock that the handler releases when a poll run finishes. */
const LOCK_KEY = 'time-pause-resume-poll-lock'
/** Lock lifetime in seconds; larger than maxDuration (120). Presumably passed to acquireLock — confirm. */
const LOCK_TTL_SECONDS = 180
/** Upper bound on the number of due rows selected by one poll run. */
const POLL_BATCH_LIMIT = 200
2125
22- interface StoredPausePoint {
23- contextId ?: string
24- resumeStatus ?: string
25- pauseKind ?: string
26- resumeAt ?: string
/** Describes one resume dispatch that failed during a poll run. */
interface DispatchFailure {
  /** Execution id of the paused row the failure belongs to. */
  executionId: string
  /** Context id of the pause point the failure relates to. */
  contextId: string
  /** Message taken from the error that was thrown. */
  error: string
}
31+
/** Outcome of processing a single due paused-execution row. */
interface RowResult {
  /** Count of pause points whose resume was enqueued or started without throwing. */
  dispatched: number
  /** One entry per pause point whose dispatch threw. */
  failures: DispatchFailure[]
}
2836
2937export const GET = withRouteHandler ( async ( request : NextRequest ) => {
@@ -40,10 +48,6 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
4048 )
4149 }
4250
43- let claimedRows = 0
44- let dispatched = 0
45- const failures : { executionId : string ; contextId : string ; error : string } [ ] = [ ]
46-
4751 try {
4852 const now = new Date ( )
4953
@@ -65,89 +69,21 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
6569 )
6670 . limit ( POLL_BATCH_LIMIT )
6771
68- claimedRows = dueRows . length
69-
70- for ( const row of dueRows ) {
71- const points = ( row . pausePoints ?? { } ) as Record < string , StoredPausePoint >
72- const metadata = ( row . metadata ?? { } ) as Record < string , unknown >
73- const userId = typeof metadata . executorUserId === 'string' ? metadata . executorUserId : ''
74-
75- const duePoints : StoredPausePoint [ ] = [ ]
76- let nextRemaining : Date | null = null
77-
78- for ( const point of Object . values ( points ) ) {
79- if ( point . pauseKind !== 'time' || ! point . resumeAt ) continue
80- if ( point . resumeStatus && point . resumeStatus !== 'paused' ) continue
81-
82- const resumeAt = new Date ( point . resumeAt )
83- if ( Number . isNaN ( resumeAt . getTime ( ) ) ) continue
84-
85- if ( resumeAt <= now ) {
86- duePoints . push ( point )
87- } else if ( ! nextRemaining || resumeAt < nextRemaining ) {
88- nextRemaining = resumeAt
89- }
90- }
91-
92- for ( const point of duePoints ) {
93- const contextId = point . contextId
94- if ( ! contextId ) continue
95- try {
96- const enqueueResult = await PauseResumeManager . enqueueOrStartResume ( {
97- executionId : row . executionId ,
98- contextId,
99- resumeInput : { } ,
100- userId,
101- } )
102-
103- if ( enqueueResult . status === 'starting' ) {
104- PauseResumeManager . startResumeExecution ( {
105- resumeEntryId : enqueueResult . resumeEntryId ,
106- resumeExecutionId : enqueueResult . resumeExecutionId ,
107- pausedExecution : enqueueResult . pausedExecution ,
108- contextId : enqueueResult . contextId ,
109- resumeInput : enqueueResult . resumeInput ,
110- userId : enqueueResult . userId ,
111- } ) . catch ( ( error ) => {
112- logger . error ( 'Background time-pause resume failed' , {
113- executionId : row . executionId ,
114- contextId,
115- error : toError ( error ) . message ,
116- } )
117- } )
118- }
119- dispatched ++
120- } catch ( error ) {
121- const message = toError ( error ) . message
122- logger . warn ( 'Failed to dispatch time-pause resume' , {
123- executionId : row . executionId ,
124- contextId,
125- error : message ,
126- } )
127- failures . push ( { executionId : row . executionId , contextId, error : message } )
128- }
129- }
130-
131- // We never auto-retry a failed dispatch: workflow blocks aren't idempotent, and an
132- // operator must investigate stranded rows by hand. Setting nextResumeAt to the next
133- // future pause (or null) drops the row out of the poll, surfacing the failure.
134- await db
135- . update ( pausedExecutions )
136- . set ( { nextResumeAt : nextRemaining } )
137- . where ( eq ( pausedExecutions . id , row . id ) )
138- }
72+ const results = await Promise . all ( dueRows . map ( ( row ) => dispatchRow ( row , now ) ) )
73+ const dispatched = results . reduce ( ( sum , r ) => sum + r . dispatched , 0 )
74+ const failures = results . flatMap ( ( r ) => r . failures )
13975
14076 logger . info ( 'Time-pause resume poll completed' , {
14177 requestId,
142- claimedRows,
78+ claimedRows : dueRows . length ,
14379 dispatched,
14480 failureCount : failures . length ,
14581 } )
14682
14783 return NextResponse . json ( {
14884 success : true ,
14985 requestId,
150- claimedRows,
86+ claimedRows : dueRows . length ,
15187 dispatched,
15288 failures,
15389 } )
@@ -159,3 +95,79 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
15995 await releaseLock ( LOCK_KEY , requestId ) . catch ( ( ) => { } )
16096 }
16197} )
98+
/** Shape of a due paused-execution row as handed to dispatchRow. */
interface DueRow {
  /** Primary key of the paused-execution row; used when rescheduling it. */
  id: string
  /** Execution id forwarded to the resume manager and echoed in logs. */
  executionId: string
  /** Owning workflow id (not read by dispatchRow itself). */
  workflowId: string
  /** Raw stored pause points; dispatchRow casts this to a record of PausePoint. */
  pausePoints: unknown
  /** Raw stored metadata; dispatchRow reads `executorUserId` from it when it is a string. */
  metadata: unknown
}
106+
/**
 * Dispatches every due time-based pause point of one paused-execution row and
 * then reschedules the row.
 *
 * A pause point is eligible when its `pauseKind` is 'time' and its
 * `resumeStatus` is unset or 'paused'. An eligible point is due when its
 * `resumeAt` parses to a valid date that is not after `now`. Due points without
 * a `contextId` are skipped.
 *
 * Dispatch errors are collected per point instead of being thrown. A failure
 * while rescheduling the row is likewise captured (with an empty `contextId`,
 * because it is not tied to one pause point). The caller runs rows concurrently
 * with `Promise.all` and releases the poll lock in `finally`; a rejection here
 * would release that lock while sibling rows are still dispatching.
 *
 * @param row - The due row: ids plus raw `pausePoints` and `metadata` payloads.
 * @param now - Poll timestamp used for the due check and for rescheduling.
 * @returns The number of dispatched points and the collected failures.
 */
async function dispatchRow(row: DueRow, now: Date): Promise<RowResult> {
  const points = (row.pausePoints ?? {}) as Record<string, PausePoint>
  const metadata = (row.metadata ?? {}) as Record<string, unknown>
  // Falls back to an empty user id when the stored metadata has no string executorUserId.
  const userId = typeof metadata.executorUserId === 'string' ? metadata.executorUserId : ''

  const eligiblePoints = Object.values(points).filter(
    (point) =>
      point.pauseKind === 'time' && (!point.resumeStatus || point.resumeStatus === 'paused')
  )
  const duePoints = eligiblePoints.filter((point) => {
    if (!point.resumeAt) return false
    const at = new Date(point.resumeAt)
    return !Number.isNaN(at.getTime()) && at <= now
  })

  const failures: DispatchFailure[] = []
  let dispatched = 0

  for (const point of duePoints) {
    const contextId = point.contextId
    if (!contextId) continue
    try {
      const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
        executionId: row.executionId,
        contextId,
        resumeInput: {},
        userId,
      })

      if (enqueueResult.status === 'starting') {
        // Fire-and-forget: the resumed execution runs in the background and its
        // failure is only logged, never added to this poll's failure list.
        PauseResumeManager.startResumeExecution({
          resumeEntryId: enqueueResult.resumeEntryId,
          resumeExecutionId: enqueueResult.resumeExecutionId,
          pausedExecution: enqueueResult.pausedExecution,
          contextId: enqueueResult.contextId,
          resumeInput: enqueueResult.resumeInput,
          userId: enqueueResult.userId,
        }).catch((error) => {
          logger.error('Background time-pause resume failed', {
            executionId: row.executionId,
            contextId,
            error: toError(error).message,
          })
        })
      }
      dispatched++
    } catch (error) {
      const message = toError(error).message
      logger.warn('Failed to dispatch time-pause resume', {
        executionId: row.executionId,
        contextId,
        error: message,
      })
      failures.push({ executionId: row.executionId, contextId, error: message })
    }
  }

  // We never auto-retry a failed dispatch: workflow blocks aren't idempotent, and
  // an operator must investigate stranded rows by hand. Per the original author's
  // note, setNextResumeAt applies a status='paused' guard so a concurrent manual
  // resume that already advanced the row is not clobbered (guard not visible here).
  try {
    await PauseResumeManager.setNextResumeAt({
      pausedExecutionId: row.id,
      // Presumably the earliest eligible resumeAt after `now`, or null — confirm in helper.
      nextResumeAt: computeEarliestResumeAt(eligiblePoints, { after: now }),
    })
  } catch (error) {
    // Capture instead of rethrowing so one row's rescheduling failure cannot
    // reject the caller's Promise.all while other rows are still in flight.
    const message = toError(error).message
    logger.error('Failed to reschedule paused execution after time-pause dispatch', {
      executionId: row.executionId,
      pausedExecutionId: row.id,
      error: message,
    })
    failures.push({ executionId: row.executionId, contextId: '', error: message })
  }

  return { dispatched, failures }
}
0 commit comments