@@ -103,6 +103,7 @@ export function useRealtimeRun<TTask extends AnyTask>(
103103 runId ,
104104 apiClient ,
105105 mutateRun ,
106+ setError ,
106107 abortControllerRef ,
107108 typeof options ?. stopOnCompletion === "boolean" ? options . stopOnCompletion : true
108109 ) ;
@@ -150,6 +151,12 @@ export function useRealtimeRun<TTask extends AnyTask>(
150151 } ;
151152 } , [ runId , stop , options ?. enabled ] ) ;
152153
154+ useEffect ( ( ) => {
155+ if ( run ?. finishedAt ) {
156+ setIsComplete ( true ) ;
157+ }
158+ } , [ run ] ) ;
159+
153160 return { run, error, stop } ;
154161}
155162
@@ -258,6 +265,7 @@ export function useRealtimeRunWithStreams<
258265 mutateRun ,
259266 mutateStreams ,
260267 streamsRef ,
268+ setError ,
261269 abortControllerRef ,
262270 typeof options ?. stopOnCompletion === "boolean" ? options . stopOnCompletion : true ,
263271 options ?. experimental_throttleInMs
@@ -306,6 +314,12 @@ export function useRealtimeRunWithStreams<
306314 } ;
307315 } , [ runId , stop , options ?. enabled ] ) ;
308316
317+ useEffect ( ( ) => {
318+ if ( run ?. finishedAt ) {
319+ setIsComplete ( true ) ;
320+ }
321+ } , [ run ] ) ;
322+
309323 return { run, streams : streams ?? initialStreamsFallback , error, stop } ;
310324}
311325
@@ -380,7 +394,14 @@ export function useRealtimeRunsWithTag<TTask extends AnyTask>(
380394 const abortController = new AbortController ( ) ;
381395 abortControllerRef . current = abortController ;
382396
383- await processRealtimeRunsWithTag ( tag , apiClient , mutateRuns , runsRef , abortControllerRef ) ;
397+ await processRealtimeRunsWithTag (
398+ tag ,
399+ apiClient ,
400+ mutateRuns ,
401+ runsRef ,
402+ setError ,
403+ abortControllerRef
404+ ) ;
384405 } catch ( err ) {
385406 // Ignore abort errors as they are expected.
386407 if ( ( err as any ) . name === "AbortError" ) {
@@ -470,7 +491,14 @@ export function useRealtimeBatch<TTask extends AnyTask>(
470491 const abortController = new AbortController ( ) ;
471492 abortControllerRef . current = abortController ;
472493
473- await processRealtimeBatch ( batchId , apiClient , mutateRuns , runsRef , abortControllerRef ) ;
494+ await processRealtimeBatch (
495+ batchId ,
496+ apiClient ,
497+ mutateRuns ,
498+ runsRef ,
499+ setError ,
500+ abortControllerRef
501+ ) ;
474502 } catch ( err ) {
475503 // Ignore abort errors as they are expected.
476504 if ( ( err as any ) . name === "AbortError" ) {
@@ -506,10 +534,12 @@ async function processRealtimeBatch<TTask extends AnyTask = AnyTask>(
506534 apiClient : ApiClient ,
507535 mutateRunsData : KeyedMutator < RealtimeRun < TTask > [ ] > ,
508536 existingRunsRef : React . MutableRefObject < RealtimeRun < TTask > [ ] > ,
537+ onError : ( e : Error ) => void ,
509538 abortControllerRef : React . MutableRefObject < AbortController | null >
510539) {
511540 const subscription = apiClient . subscribeToBatch < InferRunTypes < TTask > > ( batchId , {
512541 signal : abortControllerRef . current ?. signal ,
542+ onFetchError : onError ,
513543 } ) ;
514544
515545 for await ( const part of subscription ) {
@@ -541,10 +571,12 @@ async function processRealtimeRunsWithTag<TTask extends AnyTask = AnyTask>(
541571 apiClient : ApiClient ,
542572 mutateRunsData : KeyedMutator < RealtimeRun < TTask > [ ] > ,
543573 existingRunsRef : React . MutableRefObject < RealtimeRun < TTask > [ ] > ,
574+ onError : ( e : Error ) => void ,
544575 abortControllerRef : React . MutableRefObject < AbortController | null >
545576) {
546577 const subscription = apiClient . subscribeToRunsWithTag < InferRunTypes < TTask > > ( tag , {
547578 signal : abortControllerRef . current ?. signal ,
579+ onFetchError : onError ,
548580 } ) ;
549581
550582 for await ( const part of subscription ) {
@@ -582,13 +614,15 @@ async function processRealtimeRunWithStreams<
582614 mutateRunData : KeyedMutator < RealtimeRun < TTask > > ,
583615 mutateStreamData : KeyedMutator < StreamResults < TStreams > > ,
584616 existingDataRef : React . MutableRefObject < StreamResults < TStreams > > ,
617+ onError : ( e : Error ) => void ,
585618 abortControllerRef : React . MutableRefObject < AbortController | null > ,
586619 stopOnCompletion : boolean = true ,
587620 throttleInMs ?: number
588621) {
589622 const subscription = apiClient . subscribeToRun < InferRunTypes < TTask > > ( runId , {
590623 signal : abortControllerRef . current ?. signal ,
591624 closeOnComplete : stopOnCompletion ,
625+ onFetchError : onError ,
592626 } ) ;
593627
594628 type StreamUpdate = {
@@ -637,12 +671,14 @@ async function processRealtimeRun<TTask extends AnyTask = AnyTask>(
637671 runId : string ,
638672 apiClient : ApiClient ,
639673 mutateRunData : KeyedMutator < RealtimeRun < TTask > > ,
674+ onError : ( e : Error ) => void ,
640675 abortControllerRef : React . MutableRefObject < AbortController | null > ,
641676 stopOnCompletion : boolean = true
642677) {
643678 const subscription = apiClient . subscribeToRun < InferRunTypes < TTask > > ( runId , {
644679 signal : abortControllerRef . current ?. signal ,
645680 closeOnComplete : stopOnCompletion ,
681+ onFetchError : onError ,
646682 } ) ;
647683
648684 for await ( const part of subscription ) {
0 commit comments