-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtimer.py
More file actions
275 lines (229 loc) · 8.1 KB
/
timer.py
File metadata and controls
275 lines (229 loc) · 8.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import logging
import time
import threading
from queue import Queue
from functools import wraps
from typing import Any, Callable
# Module-wide logger named 'Timer'; shared by the code in this module.
logger = logging.getLogger('Timer')
# Configure the root logger at import time: DEBUG level, "<LEVEL> │ <message>" lines.
logging.basicConfig(level=logging.DEBUG, format='%(levelname)s │ %(message)s')
class FunctionTimeoutError(Exception):
    """Signals that a function did not finish within its allowed time."""
def fib(n: int) -> int:
    """
    Compute the n-th Fibonacci number iteratively.

    Args:
        n (int): Index in the sequence (fib(0) == 0, fib(1) == 1).

    Returns:
        int: The n-th Fibonacci number. Any `n` <= 1 (including negative
            values) is returned unchanged.
    """
    if n <= 1:
        return n
    previous, current = 0, 1
    # Starting from (fib(0), fib(1)), n - 1 steps are needed to reach fib(n).
    remaining = n - 1
    while remaining > 0:
        previous, current = current, previous + current
        remaining -= 1
    return current
def get_func_path(func: Callable[..., Any]) -> str:
    """
    Get a callable's dotted path: module name plus qualified name.

    Dots inside the qualified name are replaced by '::' so that class /
    function nesting can be told apart from the module path.

    Args:
        func (Callable[..., Any]): Any callable object.

    Examples:
        >>> get_func_path(len)
        'builtins.len'
        >>> get_func_path([].append)
        'list::append'

        Illustrative output for project-level callables:
        >>> get_func_path(report)
        'module.device.platform.winapi.functions_windows.report'
        >>> get_func_path(FILETIME.__init_subclass__)
        'module.device.platform.winapi.structures_windows.Structure::__init_subclass__'

    Returns:
        str: '<module>.<qualname with :: separators>'. A part that is
            missing, None or empty is left out.

    Raises:
        TypeError: If `func` is not callable.
    """
    if not callable(func):
        raise TypeError(f"Expected a callable, but got {type(func).__name__}")
    # `__module__` is None for some callables (e.g. bound builtin methods such
    # as `[].append`); str.join() rejects None, so normalise it to ''.
    module = getattr(func, '__module__', None) or ''
    qualname = getattr(func, '__qualname__', None) or getattr(func, '__name__', None) or ''
    parts = (module, qualname.replace('.', '::'))
    return '.'.join(part for part in parts if part)
class Timer:
    """Elapsed-time check combined with an optional confirm count."""

    def __init__(self, limit, count=0):
        """
        Args:
            limit (int, float): Timer limit, compared against `time.time()` differences.
            count (int): Timer reach confirm count. Default to 0.
                `reached()` only reports True once it has been called more
                than `count` times since the last start/reset.

                A count must be set when the timer is used in a structure like
                the one below; otherwise it goes wrong if the screenshot time
                cost is greater than limit.

                if self.appear(MAIN_CHECK):
                    if confirm_timer.reached():
                        pass
                else:
                    confirm_timer.reset()

                Setting `count` is also a good idea to make alas run more
                stable on slow computers.
                Expected speed is 0.35 second / screenshot.
        """
        self.limit = limit
        self.count = count
        # 0 marks "not started"; the confirm counter starts out already satisfied.
        self._current = 0
        self._reach_count = count

    def start(self):
        """Start the timer unless it is already running.

        Returns:
            Timer: self
        """
        if self.started():
            return self
        self._current = time.time()
        self._reach_count = 0
        return self

    def started(self):
        """
        Returns:
            bool: True when a start timestamp has been recorded.
        """
        return True if self._current else False

    def current(self):
        """
        Returns:
            float: Time elapsed since the start timestamp, 0. if not started.
        """
        return time.time() - self._current if self.started() else 0.

    def reached(self):
        """
        Count this call, then check both the time limit and the confirm count.

        Returns:
            bool: True when more than `limit` has elapsed and this method has
                been called more than `count` times.
        """
        self._reach_count += 1
        # With `_current == 0` (not started) the elapsed value is time.time() itself.
        time_exceeded = time.time() - self._current > self.limit
        return time_exceeded and self._reach_count > self.count

    def reset(self):
        """Restart timing from now and zero the confirm counter.

        Returns:
            Timer: self
        """
        self._reach_count = 0
        self._current = time.time()
        return self

    def clear(self):
        """Return to the not-started state set up by `__init__`.

        Returns:
            Timer: self
        """
        self._reach_count = self.count
        self._current = 0
        return self

    def reached_and_reset(self):
        """
        Returns:
            bool: Result of `reached()`; the timer is reset when it is True.
        """
        if not self.reached():
            return False
        self.reset()
        return True

    def wait(self):
        """
        Wait until timer reached.
        """
        remaining = self._current + self.limit - time.time()
        if remaining > 0:
            time.sleep(remaining)

    def show(self):
        """Log the timer's string form at INFO level."""
        logger.info(str(self))

    def __str__(self):
        elapsed = round(self.current(), 3)
        return f'Timer(limit={elapsed}/{self.limit}, count={self._reach_count}/{self.count})'

    __repr__ = __str__
class TimerLogger:
    """
    Context manager that logs entering and leaving a function, together with
    the wall-clock time spent inside the managed block.

    While the block is active the module logger is switched to DEBUG so the
    Enter/Leave lines are emitted; the level passed to the constructor is
    restored on exit.
    """

    def __init__(self, func: Callable[..., Any], level: int):
        """
        Args:
            func (Callable[..., Any]): Function being timed; only its path is used.
            level (int): Logger level to restore when the block is left.
        """
        self.path = get_func_path(func)
        self.original_level = level
        self.start_time = 0.0
        self.execution_time = 0.0

    def __enter__(self) -> str:
        """
        Returns:
            str: The dotted path of the timed function.
        """
        # Raise verbosity here rather than in __init__, so an instance that is
        # created but never entered cannot leave the logger stuck at DEBUG.
        logger.setLevel(logging.DEBUG)
        logger.debug(f"{self.path} | Enter")
        self.start_time = time.perf_counter()
        return self.path

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any,
    ) -> None:
        """
        Log the elapsed time in milliseconds and restore the logger level.

        Returns None, so an exception raised inside the block is not
        suppressed and keeps propagating to the caller.
        """
        self.execution_time = (time.perf_counter() - self.start_time) * 1e3
        logger.debug(f"{self.path} | Leave, {self.execution_time:.2f} ms")
        logger.setLevel(self.original_level)
F = Callable[..., Any]
def timer(timeout: int | float = 1):
"""
A decorator to measure the execution time of a function with timeout control.
Args:
timeout (int): The maximum allowed execution time (in seconds) for the function.
Notes:
- There's no way to kill a RUNNING thread! Please ensure that the decorated function doesn't get stuck in an infinite loop.
- threading.RLock is strongly recommended to ensure thread safety.
- This decorator is not intended for use in user environments. Please modify it if needed.
Examples:
>>> @timer(timeout=2)
>>> def fun():
>>> for i in range(5):
>>> logger.info("Function 'fun' running...")
>>> time.sleep(1)
>>>
>>> try:
>>> fun()
>>> except TimeoutError as e:
>>> logger.error(e)
DEBUG │ __main__.fun | Enter
INFO │ Function 'fun' running...
INFO │ Function 'fun' running...
INFO │ Function 'fun' running...
DEBUG │ __main__.fun | Leave, 2015.5029000000013 ms
ERROR │ TimeoutError: Function __main__.fun timedout after 2 seconds
INFO │ Function 'fun' running...
INFO │ Function 'fun' running...
Raises:
TypeError: If the function is not callable.
FunctionTimeoutError: If the function execution time exceeds the specified timeout.
Exception: Any exceptions that may be thrown by the decorated function.
"""
def decorator(func: F) -> F:
if not callable(func):
raise TypeError(f"Expected a callable, but got {type(func).__name__}")
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
result, exc = None, None
stop_event, _lock = threading.Event(), threading.RLock()
def target():
nonlocal result, exc
with _lock:
try:
result = func(*args, **kwargs)
except Exception as e:
exc = e
finally:
stop_event.set()
with TimerLogger(func, logger.level) as path:
target_thread = threading.Thread(target=target, name=f"Thread-{path}")
target_thread.start()
if not stop_event.wait(timeout=timeout):
raise FunctionTimeoutError(f"Function {path} timed out after {timeout} seconds")
if exc is not None:
raise exc
return result
return wrapper
return decorator
@timer(timeout=2)
def fun(t: int):
    """Demo workload: log the requested duration, then sleep for `t` seconds."""
    message = f"Function 'fun' running. Sleep for {t} seconds..."
    logger.info(message)
    time.sleep(t)
def test():
    """
    Run `fun(t)` for t in 0..4 concurrently, one thread per call, and log
    every call that exceeds the timeout configured on `fun`.
    """
    def run(seconds: int) -> None:
        # FunctionTimeoutError is raised inside the thread that calls `fun`,
        # not by Thread.start(), so it has to be caught here in the thread
        # target; otherwise it ends up as an unhandled thread exception.
        try:
            fun(seconds)
        except FunctionTimeoutError as e:
            logger.error(e)

    threads: list[threading.Thread] = [
        threading.Thread(target=run, args=(t,)) for t in range(5)
    ]
    for thread in threads:
        thread.start()
    # Wait for every call to either finish or time out before returning.
    for thread in threads:
        thread.join()
@timer(timeout=2)
def test2():
    """
    Compute fib(t) for t = 20, 22, ..., 38 in one thread each, then print
    the results in the order the workers queued them.
    """
    def worker(t: int, q: Queue[tuple[int, int]]) -> None:
        # queue.Queue does its own locking, so put() needs no extra Lock.
        q.put((t, fib(t)))

    results: Queue[tuple[int, int]] = Queue()
    threads: list[threading.Thread] = [
        threading.Thread(target=worker, args=(t, results))
        for t in range(20, 40, 2)
    ]
    # Thread.start() cannot raise FunctionTimeoutError (the workers only call
    # fib), so no try/except is needed around it.
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # All producers have finished, so empty() is reliable here.
    while not results.empty():
        t, result = results.get()
        print(f"fib({t}) = {result}")
# Script entry point: run the threaded Fibonacci demo and log a timeout
# instead of letting it terminate the script with a traceback.
if __name__ == '__main__':
    try:
        test2()
    except FunctionTimeoutError as err:
        logger.error(err)