Skip to content

Commit f1d2eb2

Browse files
committed
init async work need cleaning
1 parent 501546d commit f1d2eb2

File tree

6 files changed

+255
-10
lines changed

6 files changed

+255
-10
lines changed

demo/python/async_ng/bo.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from iop import BusinessOperation
2+
from msg import MyMessage
3+
4+
import time
5+
6+
class MyAsyncNGBO(BusinessOperation):
7+
def on_message(self, request):
8+
print(f"Received message: {request.message}")
9+
time.sleep(1)
10+
return MyMessage(message=f"Hello, {request.message}")
11+
12+

demo/python/async_ng/bp.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import asyncio
2+
import sys,os
3+
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..', 'src')))
4+
from iop import BusinessProcess
5+
from msg import MyMessage
6+
7+
8+
class MyAsyncNGBP(BusinessProcess):
9+
10+
def on_message(self, request):
11+
import time
12+
start_time = time.time()
13+
results = asyncio.run(self.await_response(request))
14+
end_time = time.time()
15+
print(f"Time taken: {end_time - start_time} seconds")
16+
self.log_info(f"Time taken: {end_time - start_time} seconds")
17+
for result in results:
18+
print(f"Received response: {result.message}")
19+
20+
async def await_response(self, request):
21+
msg_one = MyMessage(message="Message1")
22+
msg_two = MyMessage(message="Message2")
23+
24+
# use asyncio.gather to send multiple requests asynchronously
25+
# using the send_request_async_ng method
26+
tasks = [self.send_request_async_ng("Python.MyAsyncNGBO", msg_one, timeout=5),
27+
self.send_request_async_ng("Python.MyAsyncNGBO", msg_two, timeout=-1)]
28+
29+
return await asyncio.gather(*tasks)
30+
31+
if __name__ == "__main__":
32+
bp = MyAsyncNGBP()
33+
bp.on_message(None)

demo/python/async_ng/msg.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from grongier.pex import Message
2+
from dataclasses import dataclass
3+
4+
@dataclass
5+
class MyMessage(Message):
6+
message : str = None

demo/python/async_ng/settings.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from bo import MyAsyncNGBO
2+
from bp import MyAsyncNGBP
3+
4+
CLASSES = {
5+
"Python.MyAsyncNGBO": MyAsyncNGBO,
6+
"Python.MyAsyncNGBP": MyAsyncNGBP,
7+
}

src/iop/_business_host.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import datetime
23
import pickle
34
import codecs
@@ -133,7 +134,6 @@ def send_request_sync(self, target, request, timeout=-1, description=None):
133134
Raises:
134135
TypeError: if request is not of type Message or IRISObject.
135136
"""
136-
137137
return self.iris_handle.dispatchSendRequestSync(target,request,timeout,description)
138138

139139
@input_serialzer_param(1,'request')
@@ -151,6 +151,17 @@ def send_request_async(self, target, request, description=None):
151151
"""
152152

153153
return self.iris_handle.dispatchSendRequestAsync(target,request,description)
154+
155+
async def send_request_async_ng(self, target, request, timeout=-1, description=None):
156+
""" Send the specified message to the target business process or business operation asynchronously.
157+
Parameters:
158+
target: a string that specifies the name of the business process or operation to receive the request.
159+
The target is the name of the component as specified in the Item Name property in the production definition, not the class name of the component.
160+
request: specifies the message to send to the target. The request is an instance of IRISObject or of a subclass of Message.
161+
If the target is a built-in ObjectScript component, you should use the IRISObject class. The IRISObject class enables the PEX framework to convert the message to a class supported by the target.
162+
description: an optional string parameter that sets a description property in the message header. The default is None.
163+
"""
164+
return await _send_request_async_ng(target, request, timeout ,description, self)
154165

155166
def send_multi_request_sync(self, target_request:list, timeout=-1, description=None)->list:
156167
""" Send the specified list of tuple (target,request) to business process or business operation synchronously.
@@ -550,3 +561,62 @@ def object_hook(self, obj):
550561
else:
551562
ret[key] = value
552563
return ret
564+
565+
class _send_request_async_ng(asyncio.Future):
566+
567+
_message_header_id = 0
568+
_queue_name = ""
569+
_end_time = 0
570+
_response = None
571+
_done = False
572+
573+
def __init__(self, target, request, timeout=-1, description=None, host=None):
574+
super().__init__()
575+
self.target = target
576+
self.request = request
577+
self.timeout = timeout
578+
self.description = description
579+
self.host = host
580+
self._iris_handle = host.iris_handle
581+
asyncio.create_task(self.send())
582+
583+
async def send_async(self):
584+
# Call the synchronous function
585+
self._iris_handle.dispatchSendRequestAsyncNG(self.target, self.host._dispatch_serializer(self.request), self.timeout, self.description)
586+
587+
# Periodically check if the request is done
588+
while not self._iris_handle.dispatchIsRequestDone():
589+
await asyncio.sleep(0.1) # Adjust the sleep duration as needed
590+
591+
# Set the result of the Future
592+
self.set_result("Request sent and completed")
593+
594+
async def send(self):
595+
message_header_id = iris.ref()
596+
queue_name = iris.ref()
597+
end_time = iris.ref()
598+
request = self.host._dispatch_serializer(self.request)
599+
self._iris_handle.dispatchSendRequestAsyncNG(self.target, request, self.timeout, self.description, message_header_id, queue_name, end_time)
600+
self._message_header_id = message_header_id.value
601+
self._queue_name = queue_name.value
602+
self._end_time = end_time.value
603+
604+
while not self._done:
605+
await asyncio.sleep(0.1)
606+
self.done()
607+
608+
self.set_result(self._response)
609+
610+
def done(self):
611+
response = iris.ref()
612+
status = self._iris_handle.dispatchIsRequestDone(self.timeout, self._end_time, self._queue_name, self._message_header_id, response)
613+
self._response = self.host._dispatch_deserializer(response.value)
614+
if status == 1 and self._response is not None:
615+
self._done = True
616+
617+
# def __await__(self):
618+
# if not self._done:
619+
# self.done()
620+
# yield self
621+
# else:
622+
# return self._response

src/iop/cls/IOP/Common.cls

Lines changed: 126 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,12 @@ Method SetPropertyValues()
139139
}
140140

141141
Method dispatchSendRequestSync(
142-
target,
143-
request,
142+
pTarget,
143+
pRequest,
144144
timeout,
145-
description) As %String
145+
pDescription) As %String
146146
{
147-
set tSC = ..SendRequestSync(target,request,.objResponse,timeout,description)
147+
set tSC = ..SendRequestSync(pTarget,pRequest,.objResponse,timeout,pDescription)
148148
if $$$ISERR(tSC) throw ##class(%Exception.StatusException).CreateFromStatus(tSC)
149149
quit $g(objResponse)
150150
}
@@ -162,7 +162,7 @@ Method dispatchSendRequestSyncMultiple(
162162

163163
set tSC = ..SendRequestSyncMultiple(.tCallStructList,pTimeout)
164164
if $$$ISERR(tSC) throw ##class(%Exception.StatusException).CreateFromStatus(tSC)
165-
165+
166166
// Convert multidimensional array to Python list
167167
set tResponseList = builtins.list()
168168

@@ -173,11 +173,11 @@ Method dispatchSendRequestSyncMultiple(
173173
}
174174

175175
Method dispatchSendRequestAsync(
176-
target,
177-
request,
178-
description)
176+
pTarget,
177+
pRequest,
178+
pDescription)
179179
{
180-
set tSC = ..SendRequestAsync(target,request,description)
180+
set tSC = ..SendRequestAsync(pTarget,pRequest,pDescription)
181181
if $$$ISERR(tSC) throw ##class(%Exception.StatusException).CreateFromStatus(tSC)
182182
quit
183183
}
@@ -223,4 +223,121 @@ ClassMethod OnGetConnections(
223223
quit
224224
}
225225

226+
Method dispatchSendRequestAsyncNGTest(
227+
pTarget,
228+
pRequest,
229+
pTimeout,
230+
pDescription,
231+
ByRef pMessageHeaderId,
232+
ByRef pQueueName,
233+
ByRef pEndTime) As %String
234+
{
235+
Set tTargetConfigName = $get($$$DispatchNameToConfigName(pTarget)) Quit:""=tTargetConfigName $$$EnsError($$$EnsErrBusinessDispatchNameNotRegistered,pTarget)
236+
Set tTargetBusinessClass=$$$ConfigClassName(tTargetConfigName)
237+
Set tPriority=$$$eMessagePriorityAsync
238+
239+
Set tSC=$classmethod(tTargetBusinessClass,"acceptRequestAsync",..%ConfigName,tTargetConfigName,pRequest,tPriority,$$$queueSyncCallQueueName,..%SessionId,"",.tRequestHeader,pDescription,..%SuperSession)
240+
241+
Quit tSC
242+
}
243+
244+
Method dispatchSendRequestAsyncNG(
245+
pTarget,
246+
pRequest,
247+
pTimeout,
248+
pDescription,
249+
ByRef pMessageHeaderId,
250+
ByRef pQueueName,
251+
ByRef pEndTime) As %String
252+
{
253+
set tSC=$$$OK, tResponse=$$$NULLOREF
254+
try {
255+
256+
set tTargetDispatchName=pTarget
257+
set tTargetConfigName=$get($$$DispatchNameToConfigName(pTarget))
258+
if tTargetConfigName="" set tSC=$$$EnsError($$$EnsErrBusinessDispatchNameNotRegistered,tTargetDispatchName) quit
259+
set tTargetBusinessClass = $$$ConfigClassName(tTargetConfigName)
260+
set tINVOCATION=$classmethod(tTargetBusinessClass,"%GetParameter","INVOCATION")
261+
if (tINVOCATION'="Queue")&&(tINVOCATION'="InProc") set tSC=$$$ERROR($$$EnsErrParameterInvocationInvalid,tTargetBusinessClass) quit
262+
263+
quit:$$$ISERR(tSC)
264+
;
265+
set tStartTime=$zh
266+
set:pTimeout'=-1 tEndTime=$zh+pTimeout
267+
268+
if tINVOCATION="InProc" {
269+
set tTimeout=$s(pTimeout=-1:-1,1:tEndTime-$zh)
270+
if (pTimeout'=-1)&&(tTimeout<0) quit
271+
set tSC=..SendRequestSync(tTargetConfigName,pRequest,.tResponse,tTimeout,pDescription)
272+
return tResponse
273+
} elseif tINVOCATION="Queue" {
274+
Set tSessionId=..%SessionId
275+
Set tSuperSession = ..%SuperSession
276+
Set tSC = ##class(Ens.MessageHeader).NewRequestMessage(.tRequestHeader,pRequest,.tSessionId,.tSuperSession) quit:$$$ISERR(tSC)
277+
Set ..%SessionId=tSessionId
278+
Set ..%SuperSession=tSuperSession
279+
Set tRequestHeader.SourceConfigName = ..%ConfigName
280+
Set tRequestHeader.TargetConfigName = tTargetConfigName
281+
Set tRequestHeader.SourceBusinessType = $$$ConfigBusinessType($$$DispatchNameToConfigName(..%ConfigName))
282+
Set tRequestHeader.TargetBusinessType = $$$ConfigBusinessType($$$DispatchNameToConfigName(tTargetConfigName))
283+
Set tRequestHeader.TargetQueueName = $$$getConfigQueueName($$$DispatchNameToConfigName(tTargetConfigName),..%SessionId)
284+
Set tRequestHeader.ReturnQueueName = $$$queueSyncCallQueueName
285+
Set tRequestHeader.BusinessProcessId = ""
286+
Set tRequestHeader.Priority = $$$eMessagePriorityAsync
287+
Set tRequestHeader.Description = pDescription
288+
Set tSC = ##class(Ens.Queue).Create($$$queueSyncCallQueueName) quit:$$$ISERR(tSC)
289+
Set tSC = ##class(Ens.Queue).EnQueue(tRequestHeader) quit:$$$ISERR(tSC)
290+
Set pMessageHeaderId = tRequestHeader.MessageId()
291+
Set pQueueName = $$$queueSyncCallQueueName
292+
Set:(pTimeout'=-1) pEndTime = tEndTime
293+
}
294+
}
295+
catch {
296+
set tSC = $$$EnsSystemError
297+
}
298+
quit tSC
299+
}
300+
301+
Method dispatchIsRequestDone(
302+
pTimeout,
303+
pEndTime,
304+
pQueueName,
305+
pMessageHeaderId,
306+
ByRef pResponse) As %Status
307+
{
308+
set tSC=$$$OK
309+
try {
310+
set tTimeout=$s(pTimeout=-1:-1,1:pEndTime-$zh) if (pTimeout'=-1)&&(tTimeout<0) quit
311+
set tSC = ##class(Ens.Queue).DeQueue($$$queueSyncCallQueueName,.tResponseHeader,tTimeout,.tIsTimedOut,0) Quit:$$$ISERR(tSC)
312+
quit:tIsTimedOut
313+
314+
set tFound = $select(tResponseHeader.CorrespondingMessageId: pMessageHeaderId=tResponseHeader.CorrespondingMessageId, 1: 0)
315+
if tFound=0 {
316+
$$$sysTRACE("Out-of-band message '"_tResponseHeader.%Id()_"' discarded")
317+
do tResponseHeader.SetStatus($$$eMessageStatusDiscarded)
318+
quit
319+
}
320+
if tResponseHeader.IsError {
321+
do tResponseHeader.SetStatus($$$eMessageStatusCompleted)
322+
set tSC = $$$EnsError($$$EnsErrGeneral,"Error message received: "_tResponseHeader.ErrorText)
323+
quit
324+
}
325+
if tResponseHeader.MessageBodyClassName'="" {
326+
set tResponse = $classmethod(tResponseHeader.MessageBodyClassName,"%OpenId",tResponseHeader.MessageBodyId,,.tSC)
327+
if '$IsObject(tResponse) Set tSC=$$$EnsError($$$EnsErrGeneral,"Could not open MessageBody "_tResponseHeader.MessageBodyId_" for MessageHeader #"_tResponseHeader.%Id()_" with body class "_tResponseHeader.MessageBodyClassName_":"_$$$StatusDisplayString(tSC)) Quit
328+
} else {
329+
set tResponse=$$$NULLOREF
330+
}
331+
set pResponse=tResponse
332+
do tResponseHeader.SetStatus($$$eMessageStatusCompleted)
333+
334+
335+
Set tSC2 = ##class(Ens.Queue).Delete($$$queueSyncCallQueueName,"*") quit:$$$ISERR(tSC2)
336+
}
337+
catch {
338+
set tSC = $$$EnsSystemError
339+
}
340+
quit tSC
341+
}
342+
226343
}

0 commit comments

Comments
 (0)