@@ -28,48 +28,95 @@ class ConnectionStringType(enum.Enum):
2828
2929
3030class ServerOperationExecutor :
31- def __init__ (self , store : DocumentStore ):
31+ def __init__ (
32+ self ,
33+ store : DocumentStore ,
34+ request_executor : ClusterRequestExecutor ,
35+ initial_request_executor : ClusterRequestExecutor = None ,
36+ cache : CaseInsensitiveDict = None ,
37+ node_tag : str = None ,
38+ ):
3239 if store is None :
3340 raise ValueError ("Store cannot be None" )
34- request_executor = self .create_request_executor (store )
35-
3641 if request_executor is None :
3742 raise ValueError ("Request Executor cannot be None" )
38- self .__store = store
39- self .__request_executor = request_executor
40- self .__initial_request_executor = None
41- self .__node_tag = None
42- self .__cache = CaseInsensitiveDict ()
43+ if cache is None :
44+ cache = CaseInsensitiveDict ()
45+
46+ self ._store = store
47+ self ._request_executor = request_executor
48+ self ._initial_request_executor = initial_request_executor
49+ self ._node_tag = node_tag
50+ self ._cache = cache
51+
52+ store .register_events_for_request_executor (self ._request_executor )
53+
54+ if self ._node_tag is None :
55+ self ._store .add_after_close (lambda : self ._request_executor .close ())
56+
57+ @classmethod
58+ def from_store (cls , store : DocumentStore ):
59+ return cls (store , cls .create_request_executor (store ), None , CaseInsensitiveDict (), None )
60+
61+ def for_node (self , node_tag : str ) -> ServerOperationExecutor :
62+ if not node_tag or node_tag .isspace ():
63+ raise ValueError ("Value cannot be None or whitespace" )
64+
65+ if self ._node_tag and self ._node_tag .lower () == node_tag .lower ():
66+ return self
4367
44- store .register_events_for_request_executor (self .__request_executor )
45- store .add_after_close (self .close )
68+ if self ._store .conventions .disable_topology_updates :
69+ raise RuntimeError (
70+ "Cannot switch server operation executor, because conventions.disable_topology_updates is set to 'True'"
71+ )
72+
73+ if node_tag in self ._cache :
74+ return self ._cache [node_tag ]
75+
76+ request_executor = self ._initial_request_executor or self ._request_executor
77+ topology = self ._get_topology (self ._request_executor )
78+
79+ node = next ((node for node in topology .nodes if node_tag .lower () == node .cluster_tag .lower ()), None )
80+
81+ if node is None :
82+ available_nodes = str .join (", " , [node .cluster_tag for node in topology .nodes ])
83+ raise RuntimeError (f"Could not find node '{ node_tag } ' in the topology. Available nodes: { available_nodes } " )
84+
85+ cluster_executor = ClusterRequestExecutor .create_for_single_node (
86+ node .url ,
87+ self ._store .thread_pool_executor ,
88+ self ._store .conventions ,
89+ self ._store .certificate_pem_path ,
90+ self ._store .trust_store_path ,
91+ )
92+ return ServerOperationExecutor (self ._store , cluster_executor , request_executor , self ._cache , node .cluster_tag )
4693
4794 def send (self , operation : ServerOperation [_T_OperationResult ]) -> Optional [_T_OperationResult ]:
48- command = operation .get_command (self .__request_executor .conventions )
49- self .__request_executor .execute_command (command )
95+ command = operation .get_command (self ._request_executor .conventions )
96+ self ._request_executor .execute_command (command )
5097
5198 if isinstance (operation , ServerOperation ):
5299 return command .result
53100
54101 def send_async (self , operation : ServerOperation [OperationIdResult ]) -> Operation :
55- command = operation .get_command (self .__request_executor .conventions )
102+ command = operation .get_command (self ._request_executor .conventions )
56103
57- self .__request_executor .execute_command (command )
104+ self ._request_executor .execute_command (command )
58105 return ServerWideOperation (
59- self .__request_executor ,
60- self .__request_executor .conventions ,
106+ self ._request_executor ,
107+ self ._request_executor .conventions ,
61108 command .result .operation_id ,
62109 command .selected_node_tag if command .selected_node_tag else command .result .operation_node_tag ,
63110 )
64111
65112 def close (self ) -> None :
66- if self .__node_tag is not None :
113+ if self ._node_tag is not None :
67114 return
68115
69- if self .__request_executor is not None :
70- self .__request_executor .close ()
116+ if self ._request_executor is not None :
117+ self ._request_executor .close ()
71118
72- cache = self .__cache
119+ cache = self ._cache
73120 if cache is not None :
74121 for key , value in cache .items ():
75122 request_executor = value ._request_executor
@@ -78,8 +125,8 @@ def close(self) -> None:
78125
79126 cache .clear ()
80127
81- def __get_topology (self , request_executor : ClusterRequestExecutor ) -> Topology :
82- topology : Topology = None
128+ def _get_topology (self , request_executor : ClusterRequestExecutor ) -> Topology :
129+ topology : Optional [ Topology ] = None
83130 try :
84131 topology = request_executor .topology
85132 if topology is None :
@@ -92,8 +139,8 @@ def __get_topology(self, request_executor: ClusterRequestExecutor) -> Topology:
92139
93140 topology = request_executor .topology
94141
95- except :
96- pass
142+ except Exception :
143+ ... # ignored
97144
98145 if topology is None :
99146 raise RuntimeError ("Could not fetch the topology" )
0 commit comments