22import types
33import grpc
44from google .protobuf .json_format import ParseDict
5- from google .protobuf .message_factory import MessageFactory
5+ from google .protobuf .message_factory import MessageFactory , GetMessageClass
66from google .protobuf .descriptor_pool import DescriptorPool
77from google .protobuf .descriptor import ServiceDescriptor , MethodDescriptor
8- from grpc_reflection .v1alpha .proto_reflection_descriptor_database import ProtoReflectionDescriptorDatabase
8+ from grpc_reflection .v1alpha .proto_reflection_descriptor_database import (
9+ ProtoReflectionDescriptorDatabase ,
10+ )
911from spaceone .core .error import *
1012
1113_MAX_RETRIES = 2
1416
1517
1618class _ClientInterceptor (
17- grpc .UnaryUnaryClientInterceptor , grpc .UnaryStreamClientInterceptor ,
18- grpc .StreamUnaryClientInterceptor , grpc .StreamStreamClientInterceptor ):
19-
19+ grpc .UnaryUnaryClientInterceptor ,
20+ grpc .UnaryStreamClientInterceptor ,
21+ grpc .StreamUnaryClientInterceptor ,
22+ grpc .StreamStreamClientInterceptor ,
23+ ):
2024 def __init__ (self , options : dict , channel_key : str , request_map : dict ):
2125 self ._request_map = request_map
2226 self ._channel_key = channel_key
23- self .metadata = options .get (' metadata' , {})
27+ self .metadata = options .get (" metadata" , {})
2428
2529 def _check_message (self , client_call_details , request_or_iterator , is_stream ):
2630 if client_call_details .method in self ._request_map :
2731 if is_stream :
2832 if not isinstance (request_or_iterator , types .GeneratorType ):
29- raise Exception ("Stream method must be specified as a generator type." )
33+ raise Exception (
34+ "Stream method must be specified as a generator type."
35+ )
3036
31- return self ._generate_message (request_or_iterator , client_call_details .method )
37+ return self ._generate_message (
38+ request_or_iterator , client_call_details .method
39+ )
3240
3341 else :
34- return self ._make_message (request_or_iterator , client_call_details .method )
42+ return self ._make_message (
43+ request_or_iterator , client_call_details .method
44+ )
3545
3646 return request_or_iterator
3747
@@ -50,17 +60,17 @@ def _check_error(self, response):
5060 if isinstance (response , Exception ):
5161 details = response .details ()
5262 status_code = response .code ().name
53- if details .startswith (' ERROR_' ):
54- details_split = details .split (':' , 1 )
63+ if details .startswith (" ERROR_" ):
64+ details_split = details .split (":" , 1 )
5565 if len (details_split ) == 2 :
5666 error_code , error_message = details_split
5767 else :
5868 error_code = details_split [0 ]
5969 error_message = details
6070
61- if status_code == ' PERMISSION_DENIED' :
71+ if status_code == " PERMISSION_DENIED" :
6272 raise ERROR_PERMISSION_DENIED ()
63- elif status_code == ' UNAUTHENTICATED' :
73+ elif status_code == " UNAUTHENTICATED" :
6474 raise ERROR_AUTHENTICATE_FAILURE (message = error_message )
6575 else :
6676 e = ERROR_INTERNAL_API (message = error_message )
@@ -70,13 +80,15 @@ def _check_error(self, response):
7080
7181 else :
7282 error_message = response .details ()
73- if status_code == ' PERMISSION_DENIED' :
83+ if status_code == " PERMISSION_DENIED" :
7484 raise ERROR_PERMISSION_DENIED ()
75- elif status_code == ' PERMISSION_DENIED' :
85+ elif status_code == " PERMISSION_DENIED" :
7686 raise ERROR_AUTHENTICATE_FAILURE (message = error_message )
77- elif status_code == 'UNAVAILABLE' :
78- e = ERROR_GRPC_CONNECTION (channel = self ._channel_key , message = error_message )
79- e .meta ['channel' ] = self ._channel_key
87+ elif status_code == "UNAVAILABLE" :
88+ e = ERROR_GRPC_CONNECTION (
89+ channel = self ._channel_key , message = error_message
90+ )
91+ e .meta ["channel" ] = self ._channel_key
8092 raise e
8193 else :
8294 e = ERROR_INTERNAL_API (message = error_message )
@@ -92,12 +104,16 @@ def _generate_response(self, response_iterator):
92104 except Exception as e :
93105 self ._check_error (e )
94106
95- def _retry_call (self , continuation , client_call_details , request_or_iterator , is_stream ):
107+ def _retry_call (
108+ self , continuation , client_call_details , request_or_iterator , is_stream
109+ ):
96110 retries = 0
97111
98112 while True :
99113 try :
100- response_or_iterator = continuation (client_call_details , request_or_iterator )
114+ response_or_iterator = continuation (
115+ client_call_details , request_or_iterator
116+ )
101117
102118 if is_stream :
103119 response_or_iterator = self ._generate_response (response_or_iterator )
@@ -107,84 +123,142 @@ def _retry_call(self, continuation, client_call_details, request_or_iterator, is
107123 return response_or_iterator
108124
109125 except Exception as e :
110- if e .error_code == ' ERROR_GRPC_CONNECTION' :
126+ if e .error_code == " ERROR_GRPC_CONNECTION" :
111127 if retries >= _MAX_RETRIES :
112- channel = e .meta .get (' channel' )
128+ channel = e .meta .get (" channel" )
113129 if channel in _GRPC_CHANNEL :
114- _LOGGER .error (f'Disconnect gRPC Endpoint. (channel = { channel } )' )
130+ _LOGGER .error (
131+ f"Disconnect gRPC Endpoint. (channel = { channel } )"
132+ )
115133 del _GRPC_CHANNEL [channel ]
116134 raise e
117135 else :
118- _LOGGER .debug (f'Retry gRPC Call: reason = { e .message } , retry = { retries + 1 } ' )
136+ _LOGGER .debug (
137+ f"Retry gRPC Call: reason = { e .message } , retry = { retries + 1 } "
138+ )
119139 else :
120140 raise e
121141
122142 retries += 1
123143
124- def _intercept_call (self , continuation , client_call_details ,
125- request_or_iterator , is_request_stream , is_response_stream ):
126- new_request_or_iterator = self . _check_message (
127- client_call_details , request_or_iterator , is_request_stream )
128-
129- return self ._retry_call (continuation , client_call_details ,
130- new_request_or_iterator , is_response_stream )
144+ def _intercept_call (
145+ self ,
146+ continuation ,
147+ client_call_details ,
148+ request_or_iterator ,
149+ is_request_stream ,
150+ is_response_stream ,
151+ ):
152+ new_request_or_iterator = self ._check_message (
153+ client_call_details , request_or_iterator , is_request_stream
154+ )
155+
156+ return self ._retry_call (
157+ continuation ,
158+ client_call_details ,
159+ new_request_or_iterator ,
160+ is_response_stream ,
161+ )
131162
132163 def intercept_unary_unary (self , continuation , client_call_details , request ):
133- return self ._intercept_call (continuation , client_call_details , request , False , False )
164+ return self ._intercept_call (
165+ continuation , client_call_details , request , False , False
166+ )
134167
135168 def intercept_unary_stream (self , continuation , client_call_details , request ):
136- return self ._intercept_call (continuation , client_call_details , request , False , True )
169+ return self ._intercept_call (
170+ continuation , client_call_details , request , False , True
171+ )
137172
138- def intercept_stream_unary (self , continuation , client_call_details , request_iterator ):
139- return self ._intercept_call (continuation , client_call_details , request_iterator , True , False )
173+ def intercept_stream_unary (
174+ self , continuation , client_call_details , request_iterator
175+ ):
176+ return self ._intercept_call (
177+ continuation , client_call_details , request_iterator , True , False
178+ )
140179
141- def intercept_stream_stream (self , continuation , client_call_details , request_iterator ):
142- return self ._intercept_call (continuation , client_call_details , request_iterator , True , True )
180+ def intercept_stream_stream (
181+ self , continuation , client_call_details , request_iterator
182+ ):
183+ return self ._intercept_call (
184+ continuation , client_call_details , request_iterator , True , True
185+ )
143186
144187
145188class _GRPCStub (object ):
146-
147- def __init__ (self , desc_pool : DescriptorPool , service_desc : ServiceDescriptor , channel : grpc .Channel ):
189+ def __init__ (
190+ self ,
191+ desc_pool : DescriptorPool ,
192+ service_desc : ServiceDescriptor ,
193+ channel : grpc .Channel ,
194+ ):
148195 self ._desc_pool = desc_pool
149196 for method_desc in service_desc .methods :
150197 self ._bind_grpc_method (service_desc , method_desc , channel )
151198
152- def _bind_grpc_method (self , service_desc : ServiceDescriptor , method_desc : MethodDescriptor , channel : grpc .Channel ):
199+ def _bind_grpc_method (
200+ self ,
201+ service_desc : ServiceDescriptor ,
202+ method_desc : MethodDescriptor ,
203+ channel : grpc .Channel ,
204+ ):
153205 method_name = method_desc .name
154- method_key = f'/{ service_desc .full_name } /{ method_name } '
155- request_desc = self ._desc_pool .FindMessageTypeByName (method_desc .input_type .full_name )
156- request_message_desc = MessageFactory (self ._desc_pool ).GetPrototype (request_desc )
157- response_desc = self ._desc_pool .FindMessageTypeByName (method_desc .output_type .full_name )
158- response_message_desc = MessageFactory (self ._desc_pool ).GetPrototype (response_desc )
206+ method_key = f"/{ service_desc .full_name } /{ method_name } "
207+ request_desc = self ._desc_pool .FindMessageTypeByName (
208+ method_desc .input_type .full_name
209+ )
210+ # request_message_desc = MessageFactory(self._desc_pool).GetPrototype(request_desc)
211+ request_message_desc = GetMessageClass (request_desc )
212+
213+ response_desc = self ._desc_pool .FindMessageTypeByName (
214+ method_desc .output_type .full_name
215+ )
216+ # response_message_desc = MessageFactory(self._desc_pool).GetPrototype(response_desc)
217+ response_message_desc = GetMessageClass (response_desc )
159218
160219 if method_desc .client_streaming and method_desc .server_streaming :
161- setattr (self , method_name , channel .stream_stream (
162- method_key ,
163- request_serializer = request_message_desc .SerializeToString ,
164- response_deserializer = response_message_desc .FromString
165- ))
220+ setattr (
221+ self ,
222+ method_name ,
223+ channel .stream_stream (
224+ method_key ,
225+ request_serializer = request_message_desc .SerializeToString ,
226+ response_deserializer = response_message_desc .FromString ,
227+ ),
228+ )
166229 elif method_desc .client_streaming and not method_desc .server_streaming :
167- setattr (self , method_name , channel .stream_unary (
168- method_key ,
169- request_serializer = request_message_desc .SerializeToString ,
170- response_deserializer = response_message_desc .FromString
171- ))
230+ setattr (
231+ self ,
232+ method_name ,
233+ channel .stream_unary (
234+ method_key ,
235+ request_serializer = request_message_desc .SerializeToString ,
236+ response_deserializer = response_message_desc .FromString ,
237+ ),
238+ )
172239 elif not method_desc .client_streaming and method_desc .server_streaming :
173- setattr (self , method_name , channel .unary_stream (
174- method_key ,
175- request_serializer = request_message_desc .SerializeToString ,
176- response_deserializer = response_message_desc .FromString
177- ))
240+ setattr (
241+ self ,
242+ method_name ,
243+ channel .unary_stream (
244+ method_key ,
245+ request_serializer = request_message_desc .SerializeToString ,
246+ response_deserializer = response_message_desc .FromString ,
247+ ),
248+ )
178249 else :
179- setattr (self , method_name , channel .unary_unary (
180- method_key ,
181- request_serializer = request_message_desc .SerializeToString ,
182- response_deserializer = response_message_desc .FromString
183- ))
250+ setattr (
251+ self ,
252+ method_name ,
253+ channel .unary_unary (
254+ method_key ,
255+ request_serializer = request_message_desc .SerializeToString ,
256+ response_deserializer = response_message_desc .FromString ,
257+ ),
258+ )
184259
185260
186261class GRPCClient (object ):
187-
188262 def __init__ (self , channel , options , channel_key ):
189263 self ._request_map = {}
190264 self ._api_resources = {}
@@ -193,7 +267,9 @@ def __init__(self, channel, options, channel_key):
193267 self ._desc_pool = DescriptorPool (self ._reflection_db )
194268 self ._init_grpc_reflection ()
195269
196- _client_interceptor = _ClientInterceptor (options , channel_key , self ._request_map )
270+ _client_interceptor = _ClientInterceptor (
271+ options , channel_key , self ._request_map
272+ )
197273 _intercept_channel = grpc .intercept_channel (channel , _client_interceptor )
198274 self ._bind_grpc_stub (_intercept_channel )
199275
@@ -206,9 +282,12 @@ def _init_grpc_reflection(self):
206282 service_desc : ServiceDescriptor = self ._desc_pool .FindServiceByName (service )
207283 service_name = service_desc .name
208284 for method_desc in service_desc .methods :
209- method_key = f'/{ service } /{ method_desc .name } '
210- request_desc = self ._desc_pool .FindMessageTypeByName (method_desc .input_type .full_name )
211- self ._request_map [method_key ] = MessageFactory (self ._desc_pool ).GetPrototype (request_desc )
285+ method_key = f"/{ service } /{ method_desc .name } "
286+ request_desc = self ._desc_pool .FindMessageTypeByName (
287+ method_desc .input_type .full_name
288+ )
289+ # self._request_map[method_key] = MessageFactory(self._desc_pool).GetPrototype(request_desc)
290+ self ._request_map [method_key ] = GetMessageClass (request_desc )
212291
213292 if service_desc .name not in self ._api_resources :
214293 self ._api_resources [service_name ] = []
@@ -219,7 +298,11 @@ def _bind_grpc_stub(self, intercept_channel: grpc.Channel):
219298 for service in self ._reflection_db .get_services ():
220299 service_desc : ServiceDescriptor = self ._desc_pool .FindServiceByName (service )
221300
222- setattr (self , service_desc .name , _GRPCStub (self ._desc_pool , service_desc , intercept_channel ))
301+ setattr (
302+ self ,
303+ service_desc .name ,
304+ _GRPCStub (self ._desc_pool , service_desc , intercept_channel ),
305+ )
223306
224307
225308def _create_secure_channel (endpoint , options ):
@@ -245,8 +328,8 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o
245328 options = []
246329
247330 if max_message_length :
248- options .append ((' grpc.max_send_message_length' , max_message_length ))
249- options .append ((' grpc.max_receive_message_length' , max_message_length ))
331+ options .append ((" grpc.max_send_message_length" , max_message_length ))
332+ options .append ((" grpc.max_receive_message_length" , max_message_length ))
250333
251334 if ssl_enabled :
252335 channel = _create_secure_channel (endpoint , options )
@@ -256,12 +339,14 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o
256339 try :
257340 grpc .channel_ready_future (channel ).result (timeout = 3 )
258341 except Exception as e :
259- raise ERROR_GRPC_CONNECTION (channel = endpoint , message = 'Channel is not ready.' )
342+ raise ERROR_GRPC_CONNECTION (
343+ channel = endpoint , message = "Channel is not ready."
344+ )
260345
261346 try :
262347 _GRPC_CHANNEL [endpoint ] = GRPCClient (channel , client_opts , endpoint )
263348 except Exception as e :
264- if hasattr (e , ' details' ):
349+ if hasattr (e , " details" ):
265350 raise ERROR_GRPC_CONNECTION (channel = endpoint , message = e .details ())
266351 else :
267352 raise ERROR_GRPC_CONNECTION (channel = endpoint , message = str (e ))
@@ -271,12 +356,16 @@ def client(endpoint=None, ssl_enabled=False, max_message_length=None, **client_o
271356
272357def get_grpc_method (uri_info ):
273358 try :
274- conn = client (endpoint = uri_info ['endpoint' ], ssl_enabled = uri_info ['ssl_enabled' ])
275- return getattr (getattr (conn , uri_info ['service' ]), uri_info ['method' ])
359+ conn = client (
360+ endpoint = uri_info ["endpoint" ], ssl_enabled = uri_info ["ssl_enabled" ]
361+ )
362+ return getattr (getattr (conn , uri_info ["service" ]), uri_info ["method" ])
276363
277364 except ERROR_BASE as e :
278365 raise e
279366 except Exception as e :
280- raise ERROR_GRPC_CONFIGURATION (endpoint = uri_info .get ('endpoint' ),
281- service = uri_info .get ('service' ),
282- method = uri_info .get ('method' ))
367+ raise ERROR_GRPC_CONFIGURATION (
368+ endpoint = uri_info .get ("endpoint" ),
369+ service = uri_info .get ("service" ),
370+ method = uri_info .get ("method" ),
371+ )
0 commit comments