Skip to content

Commit 5e9c82a

Browse files
Process manager generate job ids (geopython#1209)
* Modified process manager to generate job id * Continued to refactor in order to let process manager generate job id * fix syntax error * reorder imports
1 parent 7c69937 commit 5e9c82a

File tree

3 files changed

+27
-20
lines changed

3 files changed

+27
-20
lines changed

pygeoapi/api.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import re
4949
from typing import Any, Tuple, Union, Optional
5050
import urllib.parse
51-
import uuid
5251

5352
from dateutil.parser import parse as dateparse
5453
from pygeofilter.parsers.ecql import parse as parse_ecql_text
@@ -3545,11 +3544,6 @@ def execute_process(self, request: Union[APIRequest, Any],
35453544
data_dict = data.get('inputs', {})
35463545
LOGGER.debug(data_dict)
35473546

3548-
job_id = data.get("job_id", str(uuid.uuid1()))
3549-
url = f"{self.base_url}/jobs/{job_id}"
3550-
3551-
headers['Location'] = url
3552-
35533547
is_async = data.get('mode', 'auto') == 'async'
35543548
if is_async:
35553549
LOGGER.debug('Asynchronous request mode detected')
@@ -3560,8 +3554,9 @@ def execute_process(self, request: Union[APIRequest, Any],
35603554

35613555
try:
35623556
LOGGER.debug('Executing process')
3563-
mime_type, outputs, status = self.manager.execute_process(
3564-
process, job_id, data_dict, is_async)
3557+
job_id, mime_type, outputs, status = self.manager.execute_process(
3558+
process, data_dict, is_async)
3559+
headers['Location'] = f"{self.base_url}/jobs/{job_id}"
35653560
except ProcessorExecuteError as err:
35663561
LOGGER.error(err)
35673562
msg = 'Processing error'

pygeoapi/process/manager/base.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@
3333
from multiprocessing import dummy
3434
from pathlib import Path
3535
from typing import Any, Tuple
36+
import uuid
3637

37-
from pygeoapi.util import DATETIME_FORMAT, JobStatus
3838
from pygeoapi.process.base import BaseProcessor
39+
from pygeoapi.util import DATETIME_FORMAT, JobStatus
3940

4041
LOGGER = logging.getLogger(__name__)
4142

@@ -129,7 +130,7 @@ def delete_job(self, job_id: str) -> bool:
129130
raise NotImplementedError()
130131

131132
def _execute_handler_async(self, p: BaseProcessor, job_id: str,
132-
data_dict: dict) -> Tuple[None, JobStatus]:
133+
data_dict: dict) -> Tuple[str, None, JobStatus]:
133134
"""
134135
This private execution handler executes a process in a background
135136
thread using `multiprocessing.dummy`
@@ -259,24 +260,30 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
259260

260261
return jfmt, outputs, current_status
261262

262-
def execute_process(self, p, job_id, data_dict, is_async=False):
263+
def execute_process(
264+
self,
265+
p,
266+
data_dict,
267+
is_async=False
268+
) -> Tuple[str, str, Any, JobStatus]:
263269
"""
264270
Default process execution handler
265271
266272
:param p: `pygeoapi.process` object
267-
:param job_id: job identifier
268273
:param data_dict: `dict` of data parameters
269274
:param is_async: `bool` specifying sync or async processing.
270275
271-
:returns: tuple of MIME type, response payload and status
276+
:returns: tuple of job_id, MIME type, response payload and status
272277
"""
273278

279+
job_id = str(uuid.uuid1())
274280
if not is_async:
275281
LOGGER.debug('Synchronous execution')
276-
return self._execute_handler_sync(p, job_id, data_dict)
282+
result = self._execute_handler_sync(p, job_id, data_dict)
277283
else:
278284
LOGGER.debug('Asynchronous execution')
279-
return self._execute_handler_async(p, job_id, data_dict)
285+
result = self._execute_handler_async(p, job_id, data_dict)
286+
return (job_id, *result)
280287

281288
def __repr__(self):
282289
return f'<BaseManager> {self.name}'

pygeoapi/process/manager/dummy.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import logging
3131
from typing import Any, Tuple
32+
import uuid
3233

3334
from pygeoapi.process.base import BaseProcessor
3435
from pygeoapi.process.manager.base import BaseManager
@@ -63,17 +64,20 @@ def get_jobs(self, status: JobStatus = None) -> list:
6364

6465
return []
6566

66-
def execute_process(self, p: BaseProcessor, job_id: str, data_dict: dict,
67-
is_async: bool = False) -> Tuple[str, Any, int]:
67+
def execute_process(
68+
self,
69+
p: BaseProcessor,
70+
data_dict: dict,
71+
is_async: bool = False
72+
) -> Tuple[str, str, Any, JobStatus]:
6873
"""
6974
Default process execution handler
7075
7176
:param p: `pygeoapi.process` object
72-
:param job_id: job identifier
7377
:param data_dict: `dict` of data parameters
7478
:param is_async: `bool` specifying sync or async processing.
7579
76-
:returns: tuple of MIME type, response payload and status
80+
:returns: tuple of job_id, MIME type, response payload and status
7781
"""
7882

7983
jfmt = 'application/json'
@@ -93,7 +97,8 @@ def execute_process(self, p: BaseProcessor, job_id: str, data_dict: dict,
9397
current_status = JobStatus.failed
9498
LOGGER.error(err)
9599

96-
return jfmt, outputs, current_status
100+
job_id = str(uuid.uuid1())
101+
return job_id, jfmt, outputs, current_status
97102

98103
def __repr__(self):
99104
return f'<DummyManager> {self.name}'

0 commit comments

Comments
 (0)