Skip to content

Commit 8eef79f

Browse files
committed
refactor: strike events in rpc-client request handling, get result from queue
1 parent 6077499 commit 8eef79f

File tree

1 file changed

+13
-31
lines changed
  • deltachat-rpc-client/src/deltachat_rpc_client

1 file changed

+13
-31
lines changed

deltachat-rpc-client/src/deltachat_rpc_client/rpc.py

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,14 @@
99
import subprocess
1010
import sys
1111
from queue import Empty, Queue
12-
from threading import Event, Thread
12+
from threading import Thread
1313
from typing import Any, Iterator, Optional
1414

1515

1616
class JsonRpcError(Exception):
1717
"""JSON-RPC error."""
1818

1919

20-
class RpcFuture:
21-
"""RPC future waiting for RPC call result."""
22-
23-
def __init__(self, rpc: "Rpc", request_id: int, event: Event):
24-
self.rpc = rpc
25-
self.request_id = request_id
26-
self.event = event
27-
28-
def __call__(self):
29-
"""Wait for the future to return the result."""
30-
self.event.wait()
31-
response = self.rpc.request_results.pop(self.request_id)
32-
if "error" in response:
33-
raise JsonRpcError(response["error"])
34-
if "result" in response:
35-
return response["result"]
36-
return None
37-
38-
3920
class RpcMethod:
4021
"""RPC method."""
4122

@@ -57,11 +38,17 @@ def future(self, *args) -> Any:
5738
"params": args,
5839
"id": request_id,
5940
}
60-
event = Event()
61-
self.rpc.request_events[request_id] = event
41+
self.rpc.request_results[request_id] = queue = Queue()
6242
self.rpc.request_queue.put(request)
6343

64-
return RpcFuture(self.rpc, request_id, event)
44+
def rpc_future():
45+
"""Wait for the request to receive a result."""
46+
response = queue.get()
47+
if "error" in response:
48+
raise JsonRpcError(response["error"])
49+
return response.get("result", None)
50+
51+
return rpc_future
6552

6653

6754
class Rpc:
@@ -82,10 +69,8 @@ def __init__(self, accounts_dir: Optional[str] = None, **kwargs):
8269
self.process: subprocess.Popen
8370
self.id_iterator: Iterator[int]
8471
self.event_queues: dict[int, Queue]
85-
# Map from request ID to `threading.Event`.
86-
self.request_events: dict[int, Event]
87-
# Map from request ID to the result.
88-
self.request_results: dict[int, Any]
72+
# Map from request ID to a Queue which provides a single result
73+
self.request_results: dict[int, Queue]
8974
self.request_queue: Queue[Any]
9075
self.closing: bool
9176
self.reader_thread: Thread
@@ -114,7 +99,6 @@ def start(self) -> None:
11499
)
115100
self.id_iterator = itertools.count(start=1)
116101
self.event_queues = {}
117-
self.request_events = {}
118102
self.request_results = {}
119103
self.request_queue = Queue()
120104
self.closing = False
@@ -149,9 +133,7 @@ def reader_loop(self) -> None:
149133
response = json.loads(line)
150134
if "id" in response:
151135
response_id = response["id"]
152-
event = self.request_events.pop(response_id)
153-
self.request_results[response_id] = response
154-
event.set()
136+
self.request_results.pop(response_id).put(response)
155137
else:
156138
logging.warning("Got a response without ID: %s", response)
157139
except Exception:

0 commit comments

Comments
 (0)