22import time
33from contextlib import contextmanager
44from datetime import timedelta
5- from typing import ContextManager , Generator , Literal , Optional , Union
5+ from functools import wraps
6+ from typing import (
7+ Callable ,
8+ Generator ,
9+ Literal ,
10+ Optional ,
11+ ParamSpec ,
12+ TypeVar ,
13+ Union ,
14+ )
615
716import httpx
817from pydantic import BaseModel , Field , TypeAdapter , ValidationError
@@ -22,6 +31,10 @@ class BuildLogError(Exception):
2231 pass
2332
2433
34+ class TooManyRetriesError (Exception ):
35+ pass
36+
37+
2538class BuildLogLineGeneric (BaseModel ):
2639 type : Literal ["complete" , "failed" , "timeout" , "heartbeat" ]
2740 id : Optional [str ] = None
@@ -81,18 +94,39 @@ def _backoff() -> None:
8194 ) from error
8295
8396
97+ P = ParamSpec ("P" )
98+ T = TypeVar ("T" )
99+
100+
84101def attempts (
85102 total_attempts : int = 3 , timeout : timedelta = timedelta (minutes = 5 )
86- ) -> Generator [ContextManager [None ], None , None ]:
87- start = time .monotonic ()
103+ ) -> Callable [
104+ [Callable [P , Generator [T , None , None ]]], Callable [P , Generator [T , None , None ]]
105+ ]:
106+ def decorator (
107+ func : Callable [P , Generator [T , None , None ]],
108+ ) -> Callable [P , Generator [T , None , None ]]:
109+ @wraps (func )
110+ def wrapper (* args : P .args , ** kwargs : P .kwargs ) -> Generator [T , None , None ]:
111+ start = time .monotonic ()
88112
89- for attempt_number in range (total_attempts ):
90- if time .monotonic () - start > timeout .total_seconds ():
91- raise TimeoutError (
92- "Build log streaming timed out after %ds" , timeout .total_seconds ()
93- )
113+ for attempt_number in range (total_attempts ):
114+ if time .monotonic () - start > timeout .total_seconds ():
115+ raise TimeoutError (
116+ "Build log streaming timed out after %ds" ,
117+ timeout .total_seconds (),
118+ )
119+
120+ with attempt (attempt_number ):
121+ yield from func (* args , ** kwargs )
122+ # If we get here without exception, the generator completed successfully
123+ return
124+
125+ raise TooManyRetriesError (f"Failed after { total_attempts } attempts" )
94126
95- yield attempt (attempt_number )
127+ return wrapper
128+
129+ return decorator
96130
97131
98132class APIClient (httpx .Client ):
@@ -110,54 +144,47 @@ def __init__(self) -> None:
110144 },
111145 )
112146
147+ @attempts (BUILD_LOG_MAX_RETRIES , BUILD_LOG_TIMEOUT )
113148 def stream_build_logs (
114149 self , deployment_id : str
115150 ) -> Generator [BuildLogLine , None , None ]:
116151 last_id = None
117152
118- for attempt in attempts (BUILD_LOG_MAX_RETRIES , BUILD_LOG_TIMEOUT ):
119- with attempt :
120- while True :
121- params = {"last_id" : last_id } if last_id else None
122-
123- with self .stream (
124- "GET" ,
125- f"/deployments/{ deployment_id } /build-logs" ,
126- timeout = 60 ,
127- params = params ,
128- ) as response :
129- response .raise_for_status ()
130-
131- for line in response .iter_lines ():
132- if not line or not line .strip ():
133- continue
153+ while True :
154+ params = {"last_id" : last_id } if last_id else None
134155
135- if log_line := self ._parse_log_line (line ):
136- if log_line .id :
137- last_id = log_line .id
156+ with self .stream (
157+ "GET" ,
158+ f"/deployments/{ deployment_id } /build-logs" ,
159+ timeout = 60 ,
160+ params = params ,
161+ ) as response :
162+ response .raise_for_status ()
138163
139- if log_line .type == "message" :
140- yield log_line
164+ for line in response .iter_lines ():
165+ if not line or not line .strip ():
166+ continue
141167
142- if log_line .type in ("complete" , "failed" ):
143- yield log_line
168+ if log_line := self ._parse_log_line (line ):
169+ if log_line .id :
170+ last_id = log_line .id
144171
145- return
172+ if log_line .type == "message" :
173+ yield log_line
146174
147- if log_line .type == "timeout" :
148- logger . debug ( "Received timeout; reconnecting" )
149- break # Breaks for loop to reconnect
175+ if log_line .type in ( "complete" , "failed" ) :
176+ yield log_line
177+ return
150178
151- else : # Only triggered if the for loop is not broken
152- logger .debug (
153- "Connection closed by server unexpectedly; attempting to reconnect"
154- )
155- break
179+ if log_line . type == "timeout" :
180+ logger .debug ("Received timeout; reconnecting" )
181+ break # Breaks for loop to reconnect
182+ else :
183+ logger . debug ( "Connection closed by server unexpectedly; will retry" )
156184
157- time . sleep ( 0.5 )
185+ raise httpx . NetworkError ( "Connection closed without terminal state" )
158186
159- # Exhausted retries without getting any response
160- raise BuildLogError (f"Failed after { BUILD_LOG_MAX_RETRIES } attempts" )
187+ time .sleep (0.5 )
161188
162189 def _parse_log_line (self , line : str ) -> Optional [BuildLogLine ]:
163190 try :
0 commit comments