diff --git a/README.md b/README.md
index b634e6ab7..6539ac8aa 100644
--- a/README.md
+++ b/README.md
@@ -83,14 +83,17 @@ python demo.py
 4. Run via web browser
-   `http://localhost/pywps/?service=WPS&request=GetCapabilities&version=1.0.0`
+   `http://localhost/pywps/wps?service=WPS&request=GetCapabilities&version=1.0.0`
 5. Run in command line:
 ```bash
-curl 'http://localhost/pywps/?service=WPS&request=GetCapabilities&version=1.0.0'
+curl 'http://localhost/pywps/wps?service=WPS&request=GetCapabilities&version=1.0.0'
 ```
+# Notes
+
+PyWPS now adds `/wps` to the end of the server URL as its default WPS endpoint. It also uses several other endpoints, such as `/api`.
 # Issues
diff --git a/default-sample.cfg b/default-sample.cfg
index 6c4fb673d..bbe5a2542 100644
--- a/default-sample.cfg
+++ b/default-sample.cfg
@@ -25,7 +25,8 @@ contact_role=pointOfContact
 [server]
 maxsingleinputsize=1mb
 maxrequestsize=3mb
-url=http://localhost:5000/wps
+# Base URL; PyWPS now has several endpoints, such as /wps or /api, that are appended to this base URL
+url=http://localhost:5000/
 outputurl=http://localhost:5000/outputs/
 outputpath=outputs
 workdir=workdir
diff --git a/pywps/app/Process.py b/pywps/app/Process.py
index 2df9311ac..29078b7f7 100644
--- a/pywps/app/Process.py
+++ b/pywps/app/Process.py
@@ -8,13 +8,12 @@ from pywps.translations import lower_case_dict
 import sys
 import traceback
-import json
 import shutil
+import copy
+import tempfile
 from pywps import dblog
-from pywps.response import get_response
 from pywps.response.status import WPS_STATUS
-from pywps.response.execute import ExecuteResponse
 from pywps.app.WPSRequest import WPSRequest
 from pywps.inout.inputs import input_from_json
 from pywps.inout.outputs import output_from_json
@@ -23,10 +22,12 @@ ServerBusy, NoApplicableCode, InvalidParameterValue)
 from pywps.app.exceptions import ProcessError
-from pywps.inout.storage.builder import StorageBuilder
+from pywps.inout.storage import new_storage
 from pywps.inout.outputs import ComplexOutput
 import importlib
+from pywps import configuration
+
 LOGGER = logging.getLogger("PYWPS")
@@ -70,13 +71,9 @@ def __init__(self, handler, identifier, title, abstract='', keywords=None, profi
         self.inputs = inputs if inputs is not None else []
         self.outputs = outputs if outputs is not None else []
         self.uuid = None
-        self._status_store = None
-        # self.status_location = ''
-        # self.status_url = ''
         self.workdir = None
         self._grass_mapset = None
         self.grass_location = grass_location
-        self.service = None
         self.translations = lower_case_dict(translations)
         if store_supported:
@@ -123,32 +120,18 @@ def from_json(cls, value):
         new_process.set_workdir(value['workdir'])
         return new_process
-    def execute(self, wps_request, uuid):
-        self._set_uuid(uuid)
-        self._setup_status_storage()
-        self.async_ = False
-        response_cls = get_response("execute")
-        wps_response = response_cls(wps_request, process=self, uuid=self.uuid)
-
-        LOGGER.debug('Check if status storage and updating are supported by this process')
-        if wps_request.store_execute == 'true':
-            if self.store_supported != 'true':
-                raise StorageNotSupported('Process does not support the storing of the execute response')
-
-            if wps_request.status == 'true':
-                if self.status_supported != 'true':
-                    raise OperationNotSupported('Process does not support the updating of status')
-
-                wps_response.store_status_file = True
-                self.async_ = True
-            else:
-                wps_response.store_status_file = False
-
-        LOGGER.debug('Check if updating of status is not required then no need to spawn a process')
-
-        wps_response = 
self._execute_process(self.async_, wps_request, wps_response) - - return wps_response + def new_instance(self, wps_request: WPSRequest): + """Generate a new instance of that process with a new temporary directory""" + # make deep copy of the process instance + # so that processes are not overriding each other + # just for execute + process = copy.deepcopy(self) + process.setup_outputs_from_wps_request(wps_request) + workdir = os.path.abspath(config.get_config_value('server', 'workdir')) + tempdir = tempfile.mkdtemp(prefix='pywps_process_', dir=workdir) + process.set_workdir(tempdir) + process._set_uuid(wps_request.uuid) + return process def _set_uuid(self, uuid): """Set uuid and status location path and url @@ -161,18 +144,10 @@ def _set_uuid(self, uuid): for outpt in self.outputs: outpt.uuid = uuid - def _setup_status_storage(self): - self._status_store = StorageBuilder.buildStorage() - - @property - def status_store(self): - if self._status_store is None: - self._setup_status_storage() - return self._status_store - @property def status_location(self): - return self.status_store.location(self.status_filename) + base_url = configuration.get_config_value('server', 'url').rstrip('/') + return f'{base_url}/status?uuid={self.uuid}' @property def status_filename(self): @@ -180,153 +155,24 @@ def status_filename(self): @property def status_url(self): - return self.status_store.url(self.status_filename) - - def _execute_process(self, async_, wps_request, wps_response): - """Uses :module:`pywps.processing` module for sending process to - background BUT first, check for maxprocesses configuration value - - :param async_: run in asynchronous mode - :return: wps_response or None - """ - - maxparallel = int(config.get_config_value('server', 'parallelprocesses')) - - running, stored = dblog.get_process_counts() - - if maxparallel != -1 and running >= maxparallel: - # Try to check for crashed process - dblog.cleanup_crashed_process() - running, stored = dblog.get_process_counts() - - # async - if async_: - - # run immedietly - LOGGER.debug("Running processes: {} of {} allowed parallelprocesses".format(running, maxparallel)) - LOGGER.debug("Stored processes: {}".format(stored)) - - if running < maxparallel or maxparallel == -1: - wps_response._update_status(WPS_STATUS.ACCEPTED, "PyWPS Request accepted", 0) - LOGGER.debug("Accepted request {}".format(self.uuid)) - self._run_async(wps_request, wps_response) - - # try to store for later usage - else: - maxprocesses = int(config.get_config_value('server', 'maxprocesses')) - if stored >= maxprocesses and maxprocesses != -1: - raise ServerBusy('Maximum number of processes in queue reached. Please try later.') - LOGGER.debug("Store process in job queue, uuid={}".format(self.uuid)) - dblog.store_process(self.uuid, wps_request) - wps_response._update_status(WPS_STATUS.ACCEPTED, 'PyWPS Process stored in job queue', 0) - - # not async - else: - if running >= maxparallel and maxparallel != -1: - raise ServerBusy('Maximum number of parallel running processes reached. 
Please try later.') - wps_response._update_status(WPS_STATUS.ACCEPTED, "PyWPS Request accepted", 0) - wps_response = self._run_process(wps_request, wps_response) - - return wps_response - - # This function may not raise exception and must return a valid wps_response - # Failure must be reported as wps_response.status = WPS_STATUS.FAILED - def _run_async(self, wps_request, wps_response): - import pywps.processing - process = pywps.processing.Process( - process=self, - wps_request=wps_request, - wps_response=wps_response) - LOGGER.debug("Starting process for request: {}".format(self.uuid)) - process.start() - - # This function may not raise exception and must return a valid wps_response - # Failure must be reported as wps_response.status = WPS_STATUS.FAILED - def _run_process(self, wps_request, wps_response): - LOGGER.debug("Started processing request: {} with pid: {}".format(self.uuid, os.getpid())) - # Update the actual pid of current process to check if failed latter - dblog.update_pid(self.uuid, os.getpid()) - try: - self._set_grass(wps_request) - # if required set HOME to the current working directory. - if config.get_config_value('server', 'sethomedir') is True: - os.environ['HOME'] = self.workdir - LOGGER.info('Setting HOME to current working directory: {}'.format(os.environ['HOME'])) - LOGGER.debug('ProcessID={}, HOME={}'.format(self.uuid, os.environ.get('HOME'))) - wps_response._update_status(WPS_STATUS.STARTED, 'PyWPS Process started', 0) - self.handler(wps_request, wps_response) # the user must update the wps_response. - # Ensure process termination - if wps_response.status != WPS_STATUS.SUCCEEDED and wps_response.status != WPS_STATUS.FAILED: - # if (not wps_response.status_percentage) or (wps_response.status_percentage != 100): - LOGGER.debug('Updating process status to 100% if everything went correctly') - wps_response._update_status(WPS_STATUS.SUCCEEDED, f'PyWPS Process {self.title} finished', 100) - except Exception as e: - traceback.print_exc() - LOGGER.debug('Retrieving file and line number where exception occurred') - exc_type, exc_obj, exc_tb = sys.exc_info() - found = False - while not found: - # search for the _handler method - m_name = exc_tb.tb_frame.f_code.co_name - if m_name == '_handler': - found = True - else: - if exc_tb.tb_next is not None: - exc_tb = exc_tb.tb_next - else: - # if not found then take the first - exc_tb = sys.exc_info()[2] - break - fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] - method_name = exc_tb.tb_frame.f_code.co_name - - # update the process status to display process failed - - msg = 'Process error: method={}.{}, line={}, msg={}'.format(fname, method_name, exc_tb.tb_lineno, e) - LOGGER.error(msg) - # In case of a ProcessError use the validated exception message. - if isinstance(e, ProcessError): - msg = "Process error: {}".format(e) - # Only in debug mode we use the log message including the traceback ... - elif config.get_config_value("logging", "level") != "DEBUG": - # ... otherwise we use a sparse common error message. - msg = 'Process failed, please check server error log' - wps_response._update_status(WPS_STATUS.FAILED, msg, 100) - - finally: - # The run of the next pending request if finished here, weather or not it successful - self.launch_next_process() + return self.status_location + def run_process(self, wps_request, wps_response): + self._set_grass(wps_request) + # if required set HOME to the current working directory. 
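The `status_location` property above now points at the new `/status` endpoint instead of a file in the status storage directory. A minimal client-side sketch of polling it, assuming the server honours an `application/json` Accept header; the URL shape comes from `status_location`, while the helper name and the use of `urllib` are illustrative only:

```python
import json
import urllib.request


def poll_status(base_url: str, job_uuid: str) -> dict:
    """Fetch the status record of a job from the /status endpoint as JSON."""
    req = urllib.request.Request(
        "{}/status?uuid={}".format(base_url.rstrip('/'), job_uuid),
        headers={"Accept": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```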
+ if config.get_config_value('server', 'sethomedir') is True: + os.environ['HOME'] = self.workdir + LOGGER.info('Setting HOME to current working directory: {}'.format(os.environ['HOME'])) + LOGGER.debug('ProcessID={}, HOME={}'.format(self.uuid, os.environ.get('HOME'))) + wps_response._update_status(WPS_STATUS.STARTED, 'PyWPS Process started', 0) + self.handler(wps_request, wps_response) # the user must update the wps_response. + # Ensure process termination + if wps_response.status != WPS_STATUS.SUCCEEDED and wps_response.status != WPS_STATUS.FAILED: + # if (not wps_response.status_percentage) or (wps_response.status_percentage != 100): + LOGGER.debug('Updating process status to 100% if everything went correctly') + wps_response._update_status(WPS_STATUS.SUCCEEDED, f'PyWPS Process {self.title} finished', 100) return wps_response - def launch_next_process(self): - """Look at the queue of async process, if the queue is not empty launch the next pending request. - """ - try: - LOGGER.debug("Checking for stored requests") - - stored_request = dblog.pop_first_stored() - if not stored_request: - LOGGER.debug("No stored request found") - return - - (uuid, request_json) = (stored_request.uuid, stored_request.request) - request_json = request_json.decode('utf-8') - LOGGER.debug("Launching the stored request {}".format(str(uuid))) - new_wps_request = WPSRequest() - new_wps_request.json = json.loads(request_json) - process_identifier = new_wps_request.identifier - process = self.service.prepare_process_for_execution(process_identifier) - process._set_uuid(uuid) - process._setup_status_storage() - process.async_ = True - process.setup_outputs_from_wps_request(new_wps_request) - new_wps_response = ExecuteResponse(new_wps_request, process=process, uuid=uuid) - new_wps_response.store_status_file = True - process._run_async(new_wps_request, new_wps_response) - except Exception as e: - LOGGER.exception("Could not run stored process. 
{}".format(e)) - def clean(self): """Clean the process working dir and other temporary files """ @@ -356,6 +202,22 @@ def set_workdir(self, workdir): for outpt in self.outputs: outpt.workdir = workdir + def is_async(self, wps_request: WPSRequest): + """Check and return if the request is async + Raise Exception if the request is not compatible with the process + """ + wps_request.is_async = False + if wps_request.store_execute == 'true': + if self.store_supported != 'true': + raise StorageNotSupported('Process does not support the storing of the execute response') + + if wps_request.status == 'true': + if self.status_supported != 'true': + raise OperationNotSupported('Process does not support the updating of status') + + return True + return False + def _set_grass(self, wps_request): """Handle given grass_location parameter of the constructor diff --git a/pywps/app/Service.py b/pywps/app/Service.py index 4799de965..128e386df 100755 --- a/pywps/app/Service.py +++ b/pywps/app/Service.py @@ -7,29 +7,54 @@ import tempfile from typing import Sequence, Optional, Dict +from .basic import select_response_mimetype + from werkzeug.exceptions import HTTPException from werkzeug.wrappers import Request, Response from urllib.parse import urlparse + +from pywps.response.status import StatusResponse + +import pywps from pywps.app.WPSRequest import WPSRequest +from pywps.app.Process import Process import pywps.configuration as config -from pywps.exceptions import MissingParameterValue, NoApplicableCode, InvalidParameterValue, FileSizeExceeded, \ - StorageNotSupported, FileURLNotSupported + from pywps.inout.inputs import ComplexInput, LiteralInput, BoundingBoxInput from pywps.dblog import log_request, store_status -from pywps import response +from pywps.inout.storage import get_storage_instance from pywps.response.status import WPS_STATUS +from pywps.response.execute import ExecuteResponse +from pywps.response.describe import DescribeResponse +from pywps.response.capabilities import CapabilitiesResponse +from pywps import dblog +from pywps.exceptions import (StorageNotSupported, OperationNotSupported, MissingParameterValue, FileURLNotSupported, + ServerBusy, NoApplicableCode, + InvalidParameterValue) +from pywps.app.exceptions import ProcessError +import json from collections import deque, OrderedDict import os +import re import sys -import uuid import copy import shutil - +import traceback LOGGER = logging.getLogger("PYWPS") +# Handle one request +class ServiceInstance(object): + def __init__(self, service, process): + self.service = service + self.process = process + + def _run_process(self, wps_request, wps_response): + self.service._run_process(self.process, wps_request, wps_response) + + class Service(object): """ The top-level object that represents a WPS service. 
It's a WSGI @@ -49,6 +74,12 @@ def __init__(self, processes: Sequence = [], cfgfiles=None, preprocessors: Optio if cfgfiles: config.load_configuration(cfgfiles) + # Maximum running processes + self.maxparallel = int(config.get_config_value('server', 'parallelprocesses')) + + # Maximum queued processes + self.maxprocesses = int(config.get_config_value('server', 'maxprocesses')) + if config.get_config_value('logging', 'file') and config.get_config_value('logging', 'level'): LOGGER.setLevel(getattr(logging, config.get_config_value('logging', 'level'))) if not LOGGER.handlers: # hasHandlers in Python 3.x @@ -60,92 +91,203 @@ def __init__(self, processes: Sequence = [], cfgfiles=None, preprocessors: Optio LOGGER.addHandler(logging.NullHandler()) def get_capabilities(self, wps_request, uuid): - - response_cls = response.get_response("capabilities") - return response_cls(wps_request, uuid, version=wps_request.version, processes=self.processes) + return CapabilitiesResponse(wps_request, uuid, version=wps_request.version, processes=self.processes) def describe(self, wps_request, uuid, identifiers): - - response_cls = response.get_response("describe") - return response_cls(wps_request, uuid, processes=self.processes, - identifiers=identifiers) - - def execute(self, identifier, wps_request, uuid): + return DescribeResponse(wps_request, uuid, processes=self.processes, identifiers=identifiers) + + # Return more or less accurate counts, if no concurrency + def _get_accurate_process_counts(self): + running, stored = dblog.get_process_counts() + if self.maxparallel != -1 and running >= self.maxparallel: + # Try to check for crashed process + dblog.cleanup_crashed_process() + running, stored = dblog.get_process_counts() + return running, stored + + def _try_run_stored_processes(self): + while self.launch_next_process(): + pass + + def execute(self, wps_request: WPSRequest): """Parse and perform Execute WPS request call :param identifier: process identifier string :param wps_request: pywps.WPSRequest structure with parsed inputs, still in memory :param uuid: string identifier of the request """ - self._set_grass() - process = self.prepare_process_for_execution(identifier) - return self._parse_and_execute(process, wps_request, uuid) - def prepare_process_for_execution(self, identifier): + LOGGER.debug('Check if the requested process exist') + process = self.processes.get(wps_request.identifier, None) + if process is None: + raise InvalidParameterValue("Unknown process '{}'".format(wps_request.identifier), 'Identifier') + + LOGGER.debug('Check if status storage and updating are supported by this process') + wps_request.is_async = process.is_async(wps_request) + + running, stored = self._get_accurate_process_counts() + LOGGER.debug("Running processes: {} of {} allowed running processes".format(running, self.maxparallel)) + LOGGER.debug("Stored processes: {} of {} allowed stored process".format(stored, self.maxprocesses)) + + if wps_request.is_async: + if running < self.maxparallel or self.maxparallel == -1: + # Run immediately + process, wps_request, wps_response = self.prepare_process_for_execution(process, wps_request, True) + wps_response._update_status(WPS_STATUS.ACCEPTED, "PyWPS Request accepted", 0) + LOGGER.debug("Accepted request {}".format(process.uuid)) + self._run_async(process, wps_request, wps_response) + return wps_response + + # try to store for later usage + else: + if stored >= self.maxprocesses and self.maxprocesses != -1: + raise ServerBusy('Maximum number of processes in queue reached. 
Please try later.') + LOGGER.debug("Store process in job queue, uuid={}".format(process.uuid)) + process, wps_request, wps_response = self.prepare_process_for_execution(process, wps_request, True) + dblog.store_process(wps_request) + wps_response._update_status(WPS_STATUS.ACCEPTED, 'PyWPS Process stored in job queue', 0) + return wps_response + else: + if running >= self.maxparallel and self.maxparallel != -1: + raise ServerBusy('Maximum number of parallel running processes reached. Please try later.') + + process, wps_request, wps_response = self.prepare_process_for_execution(process, wps_request, True) + wps_response._update_status(WPS_STATUS.ACCEPTED, "PyWPS Request accepted", 0) + wps_response = process.run_process(wps_request, wps_response) + return wps_response + + def prepare_process_for_execution(self, process: Process, wps_request: WPSRequest, fetch_inputs=False): """Prepare the process identified by ``identifier`` for execution. """ - try: - process = self.processes[identifier] - except KeyError: - raise InvalidParameterValue("Unknown process '{}'".format(identifier), 'Identifier') - # make deep copy of the process instance - # so that processes are not overriding each other - # just for execute - process = copy.deepcopy(process) - process.service = self - workdir = os.path.abspath(config.get_config_value('server', 'workdir')) - tempdir = tempfile.mkdtemp(prefix='pywps_process_', dir=workdir) - process.set_workdir(tempdir) - return process - - def _parse_and_execute(self, process, wps_request, uuid): - """Parse and execute request + self._set_grass() + process = process.new_instance(wps_request) + if fetch_inputs: + wps_request = Service._parse_request_inputs(process, wps_request) + wps_response = ExecuteResponse(wps_request, process=process, uuid=process.uuid) + # Store status file if the process is asynchronous + wps_response.store_status_file = wps_request.is_async + return process, wps_request, wps_response + + def launch_next_process(self): + """Look at the queue of async process, if the queue is not empty launch the next pending request. """ + try: + LOGGER.debug("Checking for stored requests") + + stored_request = dblog.pop_first_stored_with_limit(self.maxparallel) + if not stored_request: + LOGGER.debug("No stored request found") + return False + + (uuid, request_json) = (stored_request.uuid, stored_request.request) + request_json = request_json.decode('utf-8') + LOGGER.debug("Launching the stored request {}".format(str(uuid))) + wps_request = WPSRequest(json=json.loads(request_json)) + process = self.processes.get(wps_request.identifier, None) + if process is None: + raise InvalidParameterValue("Unknown process '{}'".format(wps_request.identifier), 'Identifier') + process, wps_request, wps_response = self.prepare_process_for_execution(process, wps_request, False) + self._run_async(process, wps_request, wps_response) + except Exception as e: + LOGGER.exception("Could not run stored process. 
{}".format(e)) + return False + return True + + # This function may not raise exception and must return a valid wps_response + # Failure must be reported as wps_response.status = WPS_STATUS.FAILED + def _run_async(self, process, wps_request, wps_response): + import pywps.processing + xprocess = pywps.processing.Process( + process=ServiceInstance(self, process), + wps_request=wps_request, + wps_response=wps_response) + LOGGER.debug("Starting process for request: {}".format(process.uuid)) + xprocess.start() + + # This function may not raise exception and must return a valid wps_response + # Failure must be reported as wps_response.status = WPS_STATUS.FAILED + def _run_process(self, process, wps_request, wps_response): + LOGGER.debug("Started processing request: {} with pid: {}".format(process.uuid, os.getpid())) + # Update the actual pid of current process to check if failed latter + dblog.update_pid(process.uuid, os.getpid()) + try: + wps_response = process.run_process(wps_request, wps_response) + except Exception as e: + traceback.print_exc() + LOGGER.debug('Retrieving file and line number where exception occurred') + exc_type, exc_obj, exc_tb = sys.exc_info() + found = False + while not found: + # search for the _handler method + m_name = exc_tb.tb_frame.f_code.co_name + if m_name == '_handler': + found = True + else: + if exc_tb.tb_next is not None: + exc_tb = exc_tb.tb_next + else: + # if not found then take the first + exc_tb = sys.exc_info()[2] + break + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + method_name = exc_tb.tb_frame.f_code.co_name + + # update the process status to display process failed + + msg = 'Process error: method={}.{}, line={}, msg={}'.format(fname, method_name, exc_tb.tb_lineno, e) + LOGGER.error(msg) + # In case of a ProcessError use the validated exception message. + if isinstance(e, ProcessError): + msg = "Process error: {}".format(e) + # Only in debug mode we use the log message including the traceback ... + elif config.get_config_value("logging", "level") != "DEBUG": + # ... otherwise we use a sparse common error message. + msg = 'Process failed, please check server error log' + wps_response._update_status(WPS_STATUS.FAILED, msg, 100) + + finally: + # The run of the next pending request if finished here, weather or not it successful + self._try_run_stored_processes() + return wps_response + + @staticmethod + def _parse_request_inputs(process: Process, wps_request: WPSRequest): + """Parse input data for the given process and update wps_request accordingly + """ LOGGER.debug('Checking if all mandatory inputs have been passed') data_inputs = {} for inpt in process.inputs: # Replace the dicts with the dict of Literal/Complex inputs # set the input to the type defined in the process. 
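The exception handling in `_run_process` above only forwards the message of a `ProcessError` to the client; any other exception is reported with a generic "check server error log" message unless the logging level is DEBUG. A sketch of how a process handler is expected to surface a user-facing error under this scheme (the `delay` input and `sleep_output` output names are hypothetical):

```python
from pywps.app.exceptions import ProcessError


def _handler(request, response):
    # hypothetical literal input named 'delay'
    delay = float(request.inputs['delay'][0].data)
    if delay < 0:
        # A ProcessError message is passed through to the status document;
        # any other exception becomes "Process failed, please check server error log".
        raise ProcessError("delay must be a non-negative number of seconds")
    response.outputs['sleep_output'].data = 'slept for {} seconds'.format(delay)
    return response
```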
- request_inputs = None - if inpt.identifier in wps_request.inputs: - request_inputs = wps_request.inputs[inpt.identifier] + request_inputs = wps_request.inputs.get(inpt.identifier, None) - if not request_inputs: + if request_inputs is None: if inpt._default is not None: if not inpt.data_set and isinstance(inpt, ComplexInput): inpt._set_default_value() - data_inputs[inpt.identifier] = [inpt.clone()] else: - if isinstance(inpt, ComplexInput): - data_inputs[inpt.identifier] = self.create_complex_inputs( - inpt, request_inputs) + data_inputs[inpt.identifier] = Service.create_complex_inputs(inpt, request_inputs) elif isinstance(inpt, LiteralInput): - data_inputs[inpt.identifier] = self.create_literal_inputs( - inpt, request_inputs) + data_inputs[inpt.identifier] = Service.create_literal_inputs(inpt, request_inputs) elif isinstance(inpt, BoundingBoxInput): - data_inputs[inpt.identifier] = self.create_bbox_inputs( - inpt, request_inputs) + data_inputs[inpt.identifier] = Service.create_bbox_inputs(inpt, request_inputs) + # Check for missing inputs for inpt in process.inputs: - - if inpt.identifier not in data_inputs: - if inpt.min_occurs > 0: - LOGGER.error('Missing parameter value: {}'.format(inpt.identifier)) - raise MissingParameterValue( - inpt.identifier, inpt.identifier) + if inpt.min_occurs > 0 and inpt.identifier not in data_inputs: + LOGGER.error('Missing parameter value: {}'.format(inpt.identifier)) + raise MissingParameterValue(inpt.identifier, inpt.identifier) wps_request.inputs = data_inputs - process.setup_outputs_from_wps_request(wps_request) - - wps_response = process.execute(wps_request, uuid) - return wps_response + return wps_request - def create_complex_inputs(self, source, inputs): + @staticmethod + def create_complex_inputs(source, inputs): """Create new ComplexInput as clone of original ComplexInput because of inputs can be more than one, take it just as Prototype. 
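The rewritten loop above resolves each request input against the process prototypes and falls back to the declared default when the client omits an optional input, while the `min_occurs` check rejects anything still missing. For reference, an optional input is declared with `min_occurs=0` plus a default; the identifier and values below are illustrative:

```python
from pywps.inout.inputs import LiteralInput

# Optional input: min_occurs=0 keeps the MissingParameterValue check quiet
# and the default gives the handler a value when the client omits it.
delay = LiteralInput('delay', 'Delay between status updates',
                     data_type='float', default=2.0, min_occurs=0)
```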
@@ -184,7 +326,8 @@ def create_complex_inputs(self, source, inputs): raise MissingParameterValue(description=description, locator=source.identifier) return outinputs - def create_literal_inputs(self, source, inputs): + @staticmethod + def create_literal_inputs(source, inputs): """ Takes the http_request and parses the input to objects :return collections.deque: """ @@ -239,7 +382,8 @@ def _set_grass(self): os.putenv('PYTHONPATH', os.environ.get('PYTHONPATH')) sys.path.insert(0, python_path) - def create_bbox_inputs(self, source, inputs): + @staticmethod + def create_bbox_inputs(source, inputs): """ Takes the http_request and parses the input to objects :return collections.deque: """ @@ -263,9 +407,79 @@ def create_bbox_inputs(self, source, inputs): return outinputs + @staticmethod + def _process_files(http_request): + if http_request.method != "GET": + return Response("Method Not Allowed", status=405) + file_uuid = http_request.args.get('uuid', None) + if file_uuid is None: + raise NoApplicableCode("Invalid uuid for files request", code=500) + store = get_storage_instance(file_uuid) + if store is None: + raise NoApplicableCode("Invalid uuid for files request", code=500) + if store.mimetype is None: + raise NoApplicableCode("Invalid uuid for files request", code=500) + return Response(store.open("r"), + mimetype=store.mimetype, + headers={'Content-Disposition': f'attachment; filename="{store.pretty_filename}"'}) + + @staticmethod + def _process_status(http_request): + if http_request.method != "GET": + return Response("Method Not Allowed", status=405) + dblog.cleanup_crashed_process() + process_uuid = http_request.args.get('uuid', None) + if process_uuid is None: + raise NoApplicableCode("Invalid uuid for status request", code=500) + status = dblog.get_status_record(process_uuid) + if status is None: + raise NoApplicableCode("Invalid uuid for status request", code=500) + mimetype = select_response_mimetype(http_request.accept_mimetypes, None) + return StatusResponse(status.data, mimetype) + + def _process_wps(self, http_request): + """ + Process WPS request + Note: the WPS request may use non standard REST api, see pywps.app.basic.parse_http_url + """ + environ_cfg = http_request.environ.get('PYWPS_CFG') + if 'PYWPS_CFG' not in os.environ and environ_cfg: + LOGGER.debug('Setting PYWPS_CFG to {}'.format(environ_cfg)) + os.environ['PYWPS_CFG'] = environ_cfg + + wps_request = WPSRequest(http_request=http_request, preprocessors=self.preprocessors) + LOGGER.info('Request: {}'.format(wps_request.operation)) + if wps_request.operation in ['getcapabilities', + 'describeprocess', + 'execute']: + log_request(wps_request.uuid, wps_request) + try: + response = None + if wps_request.operation == 'getcapabilities': + response = self.get_capabilities(wps_request, wps_request.uuid) + response._update_status(WPS_STATUS.SUCCEEDED, '', 100) + + elif wps_request.operation == 'describeprocess': + response = self.describe(wps_request, wps_request.uuid, wps_request.identifiers) + response._update_status(WPS_STATUS.SUCCEEDED, '', 100) + + elif wps_request.operation == 'execute': + response = self.execute(wps_request) + return response + except Exception as e: + # This ensure that logged request get terminated in case of exception while the request is not + # accepted + store_status(wps_request.uuid, WPS_STATUS.FAILED, 'Request rejected due to exception', 100) + raise e + else: + raise RuntimeError("Unknown operation {}".format(wps_request.operation)) + # May not raise exceptions, this function must return a valid 
werkzeug.wrappers.Response. def call(self, http_request): + # Before running the current request try to run older async request + self._try_run_stored_processes() + try: # This try block handle Exception generated before the request is accepted. Once the request is accepted # a valid wps_reponse must exist. To report error use the wps_response using @@ -277,43 +491,21 @@ def call(self, http_request): # Exeception from CapabilityResponse and DescribeResponse are always catched by this try ... except close # because they never have status. - request_uuid = uuid.uuid1() - - environ_cfg = http_request.environ.get('PYWPS_CFG') - if 'PYWPS_CFG' not in os.environ and environ_cfg: - LOGGER.debug('Setting PYWPS_CFG to {}'.format(environ_cfg)) - os.environ['PYWPS_CFG'] = environ_cfg - - wps_request = WPSRequest(http_request, self.preprocessors) - LOGGER.info('Request: {}'.format(wps_request.operation)) - if wps_request.operation in ['getcapabilities', - 'describeprocess', - 'execute']: - log_request(request_uuid, wps_request) - try: - response = None - if wps_request.operation == 'getcapabilities': - response = self.get_capabilities(wps_request, request_uuid) - response._update_status(WPS_STATUS.SUCCEEDED, '', 100) - - elif wps_request.operation == 'describeprocess': - response = self.describe(wps_request, request_uuid, wps_request.identifiers) - response._update_status(WPS_STATUS.SUCCEEDED, '', 100) - - elif wps_request.operation == 'execute': - response = self.execute( - wps_request.identifier, - wps_request, - request_uuid - ) - return response - except Exception as e: - # This ensure that logged request get terminated in case of exception while the request is not - # accepted - store_status(request_uuid, WPS_STATUS.FAILED, 'Request rejected due to exception', 100) - raise e + p = re.compile("^/(wps|api|processes|jobs|files|status)(/.+)?$") + + m = p.match(http_request.path) + if m is None: + return Response("Not Found", status=404) + + # TODO: make sane dispatch + if m.group(1) in ['wps', 'api', 'processes', 'jobs']: + return self._process_wps(http_request) + elif m.group(1) == 'files': + return Service._process_files(http_request) + elif m.group(1) == 'status': + return Service._process_status(http_request) else: - raise RuntimeError("Unknown operation {}".format(wps_request.operation)) + return Response("Not Found", status=404) except NoApplicableCode as e: return e diff --git a/pywps/app/WPSRequest.py b/pywps/app/WPSRequest.py index 186e55408..aacb334ac 100644 --- a/pywps/app/WPSRequest.py +++ b/pywps/app/WPSRequest.py @@ -20,6 +20,7 @@ from pywps import get_version_from_ns import json +import uuid from urllib.parse import unquote LOGGER = logging.getLogger("PYWPS") @@ -28,9 +29,26 @@ class WPSRequest(object): - def __init__(self, http_request=None, preprocessors=None): - self.http_request = http_request + def __init__(self, **kwargs): + """Create a new WPS request, valid kwargs are: + - http_request: The http request used + - preprocessors: preprocessors list + - json: a json string + """ + + # TODO: Remove self.http_request because it usage is insane + self.http_request = kwargs.get("http_request", None) + + self.preprocessors = kwargs.get("preprocessors", {}) + if "json" in kwargs: + self.json = kwargs["json"] + return + + # Generate uuid if not loaded from json + self.uuid = uuid.uuid1() + + self.is_async = False self.operation = None self.version = None self.api = None @@ -48,11 +66,11 @@ def __init__(self, http_request=None, preprocessors=None): self.WPS = None self.OWS = None 
self.xpath_ns = None - self.preprocessors = preprocessors or dict() self.preprocess_request = None self.preprocess_response = None - if http_request: + if "http_request" in kwargs: + http_request = kwargs["http_request"] d = parse_http_url(http_request) self.operation = d.get('operation') self.identifier = d.get('identifier') @@ -60,7 +78,7 @@ def __init__(self, http_request=None, preprocessors=None): self.api = d.get('api') self.default_mimetype = d.get('default_mimetype') request_parser = self._get_request_parser_method(http_request.method) - request_parser() + request_parser(http_request) def _get_request_parser_method(self, method): @@ -71,12 +89,12 @@ def _get_request_parser_method(self, method): else: raise MethodNotAllowed() - def _get_request(self): + def _get_request(self, http_request): """HTTP GET request parser """ # service shall be WPS - service = _get_get_param(self.http_request, 'service', None if wps_strict else 'wps') + service = _get_get_param(http_request, 'service', None if wps_strict else 'wps') if service: if str(service).lower() != 'wps': raise InvalidParameterValue( @@ -84,29 +102,29 @@ def _get_request(self): else: raise MissingParameterValue('service', 'service') - self.operation = _get_get_param(self.http_request, 'request', self.operation) + self.operation = _get_get_param(http_request, 'request', self.operation) - language = _get_get_param(self.http_request, 'language') + language = _get_get_param(http_request, 'language') self.check_and_set_language(language) request_parser = self._get_request_parser(self.operation) - request_parser(self.http_request) + request_parser(http_request) - def _post_request(self): + def _post_request(self, http_request): """HTTP GET request parser """ # check if input file size was not exceeded maxsize = configuration.get_config_value('server', 'maxrequestsize') maxsize = configuration.get_size_mb(maxsize) * 1024 * 1024 - if self.http_request.content_length > maxsize: + if http_request.content_length > maxsize: raise FileSizeExceeded('File size for input exceeded.' 
' Maximum request size allowed: {} megabytes'.format(maxsize / 1024 / 1024)) - content_type = self.http_request.content_type or [] # or self.http_request.mimetype + content_type = http_request.content_type or [] # or http_request.mimetype json_input = 'json' in content_type if not json_input: try: - doc = etree.fromstring(self.http_request.get_data()) + doc = etree.fromstring(http_request.get_data()) except Exception as e: raise NoApplicableCode(str(e)) operation = doc.tag @@ -120,7 +138,7 @@ def _post_request(self): request_parser(doc) else: try: - jdoc = json.loads(self.http_request.get_data()) + jdoc = json.loads(http_request.get_data()) except Exception as e: raise NoApplicableCode(str(e)) if self.identifier is not None: @@ -142,8 +160,8 @@ def _post_request(self): jdoc['default_mimetype'] = self.default_mimetype if self.preprocess_request is not None: - jdoc = self.preprocess_request(jdoc, http_request=self.http_request) - self.json = jdoc + jdoc = self.preprocess_request(jdoc, http_request=http_request) + self._from_json_request(jdoc) version = jdoc.get('version') self.set_version(version) @@ -443,6 +461,7 @@ def default(self, obj): return encoded_object obj = { + 'uuid': str(self.uuid), 'operation': self.operation, 'version': self.version, 'api': self.api, @@ -455,7 +474,8 @@ def default(self, obj): 'lineage': self.lineage, 'inputs': dict((i, [inpt.json for inpt in self.inputs[i]]) for i in self.inputs), 'outputs': self.outputs, - 'raw': self.raw + 'raw': self.raw, + 'is_async': self.is_async } return json.dumps(obj, allow_nan=False, cls=ExtendedJSONEncoder) @@ -467,6 +487,11 @@ def json(self, value): :param value: the json (not string) representation """ + self.uuid = uuid.UUID(value.get('uuid')) + self.is_async = value.get('is_async') + self._from_json_request(value) + + def _from_json_request(self, value): self.operation = value.get('operation') self.version = value.get('version') self.api = value.get('api') diff --git a/pywps/app/basic.py b/pywps/app/basic.py index f27175145..7606705ff 100644 --- a/pywps/app/basic.py +++ b/pywps/app/basic.py @@ -6,6 +6,7 @@ XML tools """ +import re import logging from typing import Optional, Tuple @@ -54,7 +55,7 @@ def get_json_indent(): return json_ident if json_ident >= 0 else None -def get_response_type(accept_mimetypes, default_mimetype) -> Tuple[bool, str]: +def select_response_mimetype(accept_mimetypes, default_mimetype) -> str: """ This function determinate if the response should be JSON or XML based on the accepted mimetypes of the request and the default mimetype provided, @@ -62,25 +63,28 @@ def get_response_type(accept_mimetypes, default_mimetype) -> Tuple[bool, str]: :param accept_mimetypes: determinate which mimetypes are accepted :param default_mimetype: "text/xml", "application/json" - :return: Tuple[bool, str] - - bool - True: The response type is JSON, False: Otherwise - XML - str - The output mimetype + :return: The selected mimetype """ - accept_json = \ - accept_mimetypes.accept_json or \ - accept_mimetypes.best is None or \ - 'json' in accept_mimetypes.best.lower() - accept_xhtml = \ - accept_mimetypes.accept_xhtml or \ - accept_mimetypes.best is None or \ - 'xml' in accept_mimetypes.best.lower() + if accept_mimetypes is not None: + accept_json = \ + accept_mimetypes.accept_json or \ + accept_mimetypes.best is None or \ + 'json' in accept_mimetypes.best.lower() + accept_xhtml = \ + accept_mimetypes.accept_xhtml or \ + accept_mimetypes.best is None or \ + 'xml' in accept_mimetypes.best.lower() + else: + # If accept_mimetype 
is not define, accept all response + accept_json = True + accept_xhtml = True if not default_mimetype: default_mimetype = get_default_response_mimetype() json_is_default = 'json' in default_mimetype or '*' in default_mimetype json_response = (accept_json and (not accept_xhtml or json_is_default)) or \ (json_is_default and accept_json == accept_xhtml) mimetype = 'application/json' if json_response else 'text/xml' if accept_xhtml else '' - return json_response, mimetype + return mimetype def parse_http_url(http_request) -> dict: @@ -106,42 +110,46 @@ def parse_http_url(http_request) -> dict: identifier - the process identifier output_ids - if exist then it selects raw output with the name output_ids """ - operation = api = identifier = output_ids = default_mimetype = base_url = None - if http_request: - parts = str(http_request.path[1:]).split('/') - i = 0 - if len(parts) > i: - base_url = parts[i].lower() - if base_url == 'wps': - default_mimetype = 'xml' - elif base_url in ['api', 'processes', 'jobs']: - default_mimetype = 'json' - i += 1 - if base_url == 'api': - api = parts[i] - i += 1 - if len(parts) > i: - identifier = parts[i] - i += 1 - if len(parts) > i: - output_ids = parts[i] - if not output_ids: - output_ids = None - if base_url in ['jobs', 'api']: - operation = 'execute' - elif base_url == 'processes': - operation = 'describeprocess' if identifier else 'getcapabilities' + d = {} - if operation: - d['operation'] = operation - if identifier: - d['identifier'] = identifier - if output_ids: - d['output_ids'] = output_ids - if default_mimetype: - d['default_mimetype'] = default_mimetype - if api: - d['api'] = api - if base_url: - d['base_url'] = base_url - return d + + if http_request is None: + return d + + p = re.compile("^/(wps|api|processes|jobs)(/.+)?$") + m = p.match(http_request.path) + + if m is None: + return d + + base_url = m.group(1) + if m.group(2) is not None: + args = re.findall("/([^/]+)", m.groups(2)) + else: + args = [] + + d['base_url'] = base_url + + if base_url == 'wps': + d['default_mimetype'] = 'application/xml' + return d + + if base_url == 'api': + d['operation'] = 'execute' + d['default_mimetype'] = 'application/json' + d.update(dict(zip(['api', 'identifier', 'output_ids'], args))) + return d + + if base_url == 'jobs': + d['operation'] = 'execute' + d['default_mimetype'] = 'application/json' + d.update(dict(zip(['identifier', 'output_ids'], args))) + return d + + if base_url == 'processes': + d['operation'] = 'describeprocess' if len(args) == 0 else 'getcapabilities' + d['default_mimetype'] = 'json' + d.update(dict(zip(['identifier', 'output_ids'], args))) + return d + + return dict() diff --git a/pywps/configuration.py b/pywps/configuration.py index 48ba7ccb2..af0c4315c 100755 --- a/pywps/configuration.py +++ b/pywps/configuration.py @@ -92,7 +92,7 @@ def load_configuration(cfgfiles=None): # If this flag is enabled PyWPS will remove the process temporary workdir # after process has finished. CONFIG.set('server', 'cleantempdir', 'true') - CONFIG.set('server', 'storagetype', 'file') + CONFIG.set('server', 'storagetype', 'FileStorage') # File storage outputs can be copied, moved or linked # from the workdir to the output folder. 
# Allowed functions: "copy", "move", "link" (default "copy") @@ -115,6 +115,8 @@ def load_configuration(cfgfiles=None): CONFIG.set('logging', 'file', '') CONFIG.set('logging', 'level', 'DEBUG') CONFIG.set('logging', 'database', 'sqlite:///:memory:') + CONFIG.set('logging', 'database_echo', 'false') + CONFIG.set('logging', 'database_filelock', 'none') CONFIG.set('logging', 'prefix', 'pywps_') CONFIG.set('logging', 'format', '%(asctime)s] [%(levelname)s] file=%(pathname)s line=%(lineno)s module=%(module)s function=%(funcName)s %(message)s') # noqa diff --git a/pywps/dblog.py b/pywps/dblog.py index 9f57503dc..f5b5fb253 100644 --- a/pywps/dblog.py +++ b/pywps/dblog.py @@ -4,7 +4,7 @@ ################################################################## """ -Implementation of logging for PyWPS-4 +Database interface for PyWPS-4 """ import logging @@ -12,6 +12,7 @@ from pywps import configuration from pywps.exceptions import NoApplicableCode + import sqlite3 import datetime import pickle @@ -27,6 +28,10 @@ from pywps.response.status import WPS_STATUS +from types import SimpleNamespace as ns + +from .util import FileLock + LOGGER = logging.getLogger('PYWPS') _SESSION_MAKER = None @@ -35,7 +40,20 @@ Base = declarative_base() -lock = Lock() +# Use custom lock scheme for the database because there is no unified database lock mechanism +_db_lock = None + + +class guard_session: + def __init__(self, func): + self.unsafe = func + + def __call__(self, *args, **kwargs): + with _get_lock(): + session = _get_session() + r = self.unsafe(session, *args, **kwargs) + session.close() + return r class ProcessInstance(Base): @@ -57,10 +75,34 @@ class RequestInstance(Base): __tablename__ = '{}stored_requests'.format(_tableprefix) uuid = Column(VARCHAR(255), primary_key=True, nullable=False) + timestamp = Column(DateTime(), nullable=False) request = Column(LargeBinary, nullable=False) -def log_request(uuid, request): +class StorageRecord(Base): + __tablename__ = '{}storage_records'.format(_tableprefix) + + uuid = Column(VARCHAR(255), primary_key=True, nullable=False) + type = Column(VARCHAR(255), nullable=False) + pretty_filename = Column(VARCHAR(255), nullable=True) + mimetype = Column(VARCHAR(255), nullable=True) + timestamp = Column(DateTime(), nullable=False) + data = Column(LargeBinary, nullable=False) + + +class StatusRecord(Base): + __tablename__ = '{}status_records'.format(_tableprefix) + + # Process uuid + uuid = Column(VARCHAR(255), primary_key=True, nullable=False) + # Time stamp for creation time + timestamp = Column(DateTime(), nullable=False) + # json data used in template + data = Column(LargeBinary, nullable=False) + + +@guard_session +def log_request(session, uuid, request): """Write OGC WPS request (only the necessary parts) to database logging system """ @@ -71,39 +113,33 @@ def log_request(uuid, request): time_start = datetime.datetime.now() identifier = _get_identifier(request) - session = get_session() request = ProcessInstance( uuid=str(uuid), pid=pid, operation=operation, version=version, time_start=time_start, identifier=identifier) session.add(request) session.commit() - session.close() # NoApplicableCode("Could commit to database: {}".format(e.message)) -def get_process_counts(): +@guard_session +def get_process_counts(session): """Returns running and stored process counts and """ - - session = get_session() stored_query = session.query(RequestInstance.uuid) running_count = ( session.query(ProcessInstance) - .filter(ProcessInstance.percent_done < 100) - .filter(ProcessInstance.percent_done > 
-1) - .filter(~ProcessInstance.uuid.in_(stored_query)) + .filter(ProcessInstance.status.in_([WPS_STATUS.STARTED, WPS_STATUS.PAUSED])) .count() ) stored_count = stored_query.count() - session.close() return running_count, stored_count -def pop_first_stored(): +@guard_session +def pop_first_stored(session): """Gets the first stored process and delete it from the stored_requests table """ - session = get_session() request = session.query(RequestInstance).first() if request: @@ -116,11 +152,64 @@ def pop_first_stored(): return request -def store_status(uuid, wps_status, message=None, status_percentage=None): - """Writes response to database +@guard_session +def pop_first_stored_with_limit(session, target_limit): + """Gets n first stored process to reach target_count """ - session = get_session() + # Cleanup crashed request + if sys.platform == "linux": + running = session.query(ProcessInstance) \ + .filter(ProcessInstance.status.in_([WPS_STATUS.STARTED, WPS_STATUS.PAUSED])) + + failed = [] + for uuid, pid in ((p.uuid, p.pid) for p in running): + # No process with this pid, the process has crashed + if not os.path.exists(os.path.join("/proc", str(pid))): + failed.append(uuid) + continue + + # If we can't read the environ, that mean the process belong another user + # which mean that this is not our process, thus our process has crashed + # this not work because root is the user for the apache + # if not os.access(os.path.join("/proc", str(pid), "environ"), os.R_OK): + # failed.append(uuid) + # continue + + for uuid in failed: + set_process_failed.unsafe(session, uuid) + + running = session.query(ProcessInstance) \ + .filter(ProcessInstance.status.in_([WPS_STATUS.STARTED, WPS_STATUS.PAUSED])) + + if running.count() >= target_limit: + return None + + request = session.query(RequestInstance) \ + .order_by(RequestInstance.timestamp.asc()) \ + .first() + + if request: + delete_count = session.query(RequestInstance).filter_by(uuid=request.uuid).delete() + if delete_count == 0: + LOGGER.debug("WARNING should not happen: Another thread or process took the same stored request") + request = None + + # Ensure the process is marked as started to be included in running_count + process_instance = session.query(ProcessInstance).filter_by(uuid=str(request.uuid)).one() + if process_instance: + process_instance.pid = os.getpid() + process_instance.time_end = datetime.datetime.now() + process_instance.message = 'PyWPS Process started' + process_instance.status = WPS_STATUS.STARTED + + session.commit() + return request + +@guard_session +def store_status(session, uuid, wps_status, message=None, status_percentage=None): + """Writes response to database + """ requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) if requests.count(): request = requests.one() @@ -129,28 +218,115 @@ def store_status(uuid, wps_status, message=None, status_percentage=None): request.percent_done = status_percentage request.status = wps_status session.commit() - session.close() -def update_pid(uuid, pid): +# Update or create a store instance +@guard_session +def update_storage_record(session, store_instance): + r = session.query(StorageRecord).filter_by(uuid=str(store_instance.uuid)) + if r.count(): + store_instance_record = r.one() + store_instance_record.type = store_instance.__class__.__name__ + store_instance_record.pretty_filename = store_instance.pretty_filename + store_instance_record.mimetype = store_instance.mimetype + store_instance_record.data = store_instance.dump() + else: + store_instance_record = StorageRecord( 
+ uuid=str(store_instance.uuid), + type=store_instance.__class__.__name__, + timestamp=datetime.datetime.now(), + pretty_filename=store_instance.pretty_filename, + mimetype=store_instance.mimetype, + data=store_instance.dump() + ) + session.add(store_instance_record) + session.commit() + + +# Get store instance data from uuid +@guard_session +def get_storage_record(session, uuid): + r = session.query(StorageRecord).filter_by(uuid=str(uuid)) + if r.count(): + store_instance_record = r.one() + # Copy store_instance_record content to unlink data from session + # TODO: get dynamic list of attributes + attrs = ["uuid", "type", "timestamp", "pretty_filename", "mimetype", "data"] + store_instance_record = ns(**{k: getattr(store_instance_record, k) for k in attrs}) + store_instance_record.data = store_instance_record.data + return store_instance_record + return None + + +# Update or create a store instance +@guard_session +def update_status_record(session, uuid, data): + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) + if r.count(): + status_record = r.one() + status_record.timestamp = datetime.datetime.now() + status_record.data = json.dumps(data).encode("utf-8") + else: + status_record = StatusRecord( + uuid=str(uuid), + timestamp=datetime.datetime.now(), + data=json.dumps(data).encode("utf-8") + ) + session.add(status_record) + session.commit() + + +# Get store instance data from uuid +@guard_session +def get_status_record(session, uuid): + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) + if r.count(): + status_record = r.one() + # Ensure new item + # TODO: get dynamic list of attributes + attrs = ["uuid", "timestamp", "data"] + status_record = ns(**{k: getattr(status_record, k) for k in attrs}) + status_record.data = json.loads(status_record.data.decode("utf-8")) + return status_record + return None + + +@guard_session +def update_pid(session, uuid, pid): """Update actual pid for the uuid processing """ - session = get_session() - requests = session.query(ProcessInstance).filter_by(uuid=str(uuid)) if requests.count(): request = requests.one() request.pid = pid session.commit() - session.close() -def cleanup_crashed_process(): +@guard_session +def set_process_failed(session, uuid): + store_status.unsafe(session, uuid, WPS_STATUS.FAILED, "Process crashed", 100) + # Update status record + r = session.query(StatusRecord).filter_by(uuid=str(uuid)) + if r.count(): + status_record = r.one() + data = json.loads(status_record.data.decode("utf-8")) + data["status"].update({ + "status": "failed", + "code": "ProcessCrashed", + "locator": "None", + "message": "Process crashed" + }) + LOGGER.debug(str(data)) + status_record.data = json.dumps(data).encode("utf-8") + session.commit() + + +@guard_session +def cleanup_crashed_process(session): # TODO: implement other platform if sys.platform != "linux": return - session = get_session() stored_query = session.query(RequestInstance.uuid) running_cur = ( session.query(ProcessInstance) @@ -160,6 +336,7 @@ def cleanup_crashed_process(): failed = [] running = [(p.uuid, p.pid) for p in running_cur] + for uuid, pid in running: # No process with this pid, the process has crashed if not os.path.exists(os.path.join("/proc", str(pid))): @@ -175,11 +352,10 @@ def cleanup_crashed_process(): pass for uuid in failed: - store_status(uuid, WPS_STATUS.FAILED, "Process crashed", 100) - - session.close() + set_process_failed.unsafe(session, uuid) +# TODO: move this to request object. 
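All of the accessors above follow the pattern introduced by the `guard_session` wrapper: the decorated function receives a freshly opened session as its first argument and runs under the module-level database lock, callers invoke it without passing a session, and `.unsafe(session, ...)` reuses an already guarded session (as `set_process_failed.unsafe` does above). A sketch of how an additional query helper would be written inside `pywps/dblog.py`; the `count_failed` helper itself is hypothetical:

```python
@guard_session
def count_failed(session):
    """Number of processes currently marked as failed."""
    return (session.query(ProcessInstance)
            .filter(ProcessInstance.status == WPS_STATUS.FAILED)
            .count())

# external callers:           count_failed()               -> own session, under the lock
# inside another guarded fn:  count_failed.unsafe(session) -> reuses the caller's session
```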
 def _get_identifier(request):
     """Get operation identifier
     """
@@ -195,7 +371,19 @@ def _get_identifier(request):
     return None
 
 
-def get_session():
+def _get_lock():
+    global _db_lock
+    if _db_lock is None:
+        lock_filename = configuration.get_config_value('logging', 'database_filelock')
+        if lock_filename == 'none':
+            # The default lock works across forked processes, but not across separately started processes
+            _db_lock = Lock()
+        else:
+            _db_lock = FileLock(lock_filename)
+    return _db_lock
+
+
+def _get_session():
     """Get Connection for database
     """
     LOGGER.debug('Initializing database connection')
@@ -204,47 +392,40 @@ def get_session():
     if _SESSION_MAKER:
         return _SESSION_MAKER()
 
-    with lock:
-        database = configuration.get_config_value('logging', 'database')
-        echo = True
-        level = configuration.get_config_value('logging', 'level')
-        level_name = logging.getLevelName(level)
-        if isinstance(level_name, int) and level_name >= logging.INFO:
-            echo = False
-        try:
-            if ":memory:" in database:
-                engine = sqlalchemy.create_engine(database,
-                                                  echo=echo,
-                                                  connect_args={'check_same_thread': False},
-                                                  poolclass=StaticPool)
-            elif database.startswith("sqlite"):
-                engine = sqlalchemy.create_engine(database,
-                                                  echo=echo,
-                                                  connect_args={'check_same_thread': False},
-                                                  poolclass=NullPool)
-            else:
-                engine = sqlalchemy.create_engine(database, echo=echo, poolclass=NullPool)
-        except sqlalchemy.exc.SQLAlchemyError as e:
-            raise NoApplicableCode("Could not connect to database: {}".format(e.message))
-
-        Session = sessionmaker(bind=engine)
-        ProcessInstance.metadata.create_all(engine)
-        RequestInstance.metadata.create_all(engine)
-
-        _SESSION_MAKER = Session
+    database = configuration.get_config_value('logging', 'database')
+    echo = configuration.get_config_value('logging', 'database_echo') == 'true'
+    try:
+        if ":memory:" in database:
+            engine = sqlalchemy.create_engine(database,
+                                              echo=echo,
+                                              connect_args={'check_same_thread': False},
+                                              poolclass=StaticPool)
+        elif database.startswith("sqlite"):
+            engine = sqlalchemy.create_engine(database,
+                                              echo=echo,
+                                              connect_args={'check_same_thread': False},
+                                              poolclass=NullPool)
+        else:
+            engine = sqlalchemy.create_engine(database, echo=echo, poolclass=NullPool)
+    except sqlalchemy.exc.SQLAlchemyError as e:
+        raise NoApplicableCode("Could not connect to database: {}".format(e.message))
+
+    Session = sessionmaker(bind=engine)
+    ProcessInstance.metadata.create_all(engine)
+    RequestInstance.metadata.create_all(engine)
+
+    _SESSION_MAKER = Session
 
     return _SESSION_MAKER()
 
 
-def store_process(uuid, request):
+@guard_session
+def store_process(session, request):
     """Save given request under given UUID for later usage
     """
-
-    session = get_session()
     request_json = request.json
     # the BLOB type requires bytes on Python 3
     request_json = request_json.encode('utf-8')
-    request = RequestInstance(uuid=str(uuid), request=request_json)
+    request = RequestInstance(uuid=str(request.uuid), request=request_json, timestamp=datetime.datetime.now())
     session.add(request)
     session.commit()
-    session.close()
diff --git a/pywps/exceptions.py b/pywps/exceptions.py
index bc6d54d6f..dc1f633e7 100644
--- a/pywps/exceptions.py
+++ b/pywps/exceptions.py
@@ -22,7 +22,7 @@
 import logging
 
 from pywps import __version__
-from pywps.app.basic import get_json_indent, get_response_type, parse_http_url
+from pywps.app.basic import get_json_indent, select_response_mimetype, parse_http_url
 
 __author__ = "Alex Morega & Calin Ciociu"
 
@@ -74,8 +74,8 @@ def get_response(self, environ=None):
         default_mimetype = None if not request else request.args.get('f', None)
         if default_mimetype is None:
             default_mimetype = parse_http_url(request).get('default_mimetype')
-        json_response, mimetype = get_response_type(accept_mimetypes, default_mimetype)
-        if json_response:
+        mimetype = select_response_mimetype(accept_mimetypes, default_mimetype)
+        if mimetype == 'application/json':
             doc = json.dumps(args, indent=get_json_indent())
         else:
             doc = str((
diff --git a/pywps/inout/basic.py b/pywps/inout/basic.py
index ef5e240eb..df2139d96 100644
--- a/pywps/inout/basic.py
+++ b/pywps/inout/basic.py
@@ -2,6 +2,7 @@
 # Copyright 2018 Open Source Geospatial Foundation and others #
 # licensed under MIT, Please consult LICENSE.txt for details #
 ##################################################################
+import io
 import json
 
 from pathlib import PurePath
@@ -1111,11 +1112,19 @@ def storage(self, storage):
         if self._storage is None:
             self._storage = storage
 
-    # TODO: refactor ?
-    def get_url(self):
-        """Return URL pointing to data
-        """
-        # TODO: it is not obvious that storing happens here
-        (_, _, url) = self.storage.store(self)
-        # url = self.storage.url(self)
-        return url
+    # Ensure the handled data has been copied to the storage so that it can be exposed via a URL;
+    # returns the corresponding URL.
+    def ensure_storage(self):
+        # Copy data to the storage
+        with self.stream as fi:
+            # TODO: sanitise self.stream to always provide a binary stream.
+            encoding = None
+            if isinstance(fi, io.StringIO):
+                encoding = "utf-8"
+            with self._storage.open("w", encoding) as fo:
+                s = fi.read(4096)
+                while len(s) > 0:
+                    fo.write(s)
+                    s = fi.read(4096)
+        # TODO: For now the __exit__ is not implemented properly
+        return self._storage.export(self.identifier + self.data_format.extension, self.data_format.mime_type)
diff --git a/pywps/inout/outputs.py b/pywps/inout/outputs.py
index 9b8b18ada..197abadca 100644
--- a/pywps/inout/outputs.py
+++ b/pywps/inout/outputs.py
@@ -13,7 +13,7 @@ from pywps.app.Common import Metadata
 from pywps.exceptions import InvalidParameterValue
 from pywps.inout import basic
-from pywps.inout.storage.file import FileStorageBuilder
+from pywps.inout.storage import new_storage
 from pywps.inout.types import Translations
 from pywps.validator.mode import MODE
 from pywps import configuration as config
@@ -205,8 +205,8 @@ def _json_reference(self, data):
         if self.prop == 'url':
             data["href"] = self.url
         elif self.prop is not None:
-            self.storage = FileStorageBuilder().build()
-            data["href"] = self.get_url()
+            self.storage = new_storage()
+            data["href"] = self.ensure_storage()
 
         return data
diff --git a/pywps/inout/storage/__init__.py b/pywps/inout/storage/__init__.py
index 2931a83f7..77fe195f2 100644
--- a/pywps/inout/storage/__init__.py
+++ b/pywps/inout/storage/__init__.py
@@ -1,97 +1,3 @@
-##################################################################
-# Copyright 2018 Open Source Geospatial Foundation and others #
-# licensed under MIT, Please consult LICENSE.txt for details #
-##################################################################
-
-import logging
-import os
-from abc import ABCMeta, abstractmethod
-
-LOGGER = logging.getLogger('PYWPS')
-
-
-class STORE_TYPE:
-    PATH = 0
-    S3 = 1
-# TODO: cover with tests
-
-
-class StorageAbstract(object, metaclass=ABCMeta):
-    """Data storage abstract class
-    """
-
-    @abstractmethod
-    def store(self, output):
-        """
-        :param output: of type IOHandler
-        :returns: (type, store, url) where
-            type - is type of STORE_TYPE - number
-            store - string describing storage - file name, database connection
-            url - url, where the data can be downloaded
-        """
-        raise NotImplementedError
-
-    @abstractmethod
-    def write(self, data, destination, data_format=None):
-        """
-        :param data: data to write to storage
-        :param destination: identifies the destination to write to storage
-            generally a file name which can be interpreted
-            by the implemented Storage class in a manner of
-            its choosing
-        :param data_format: Optional parameter of type pywps.inout.formats.FORMAT
-            describing the format of the data to write.
-        :returns: url where the data can be downloaded
-        """
-        raise NotImplementedError
-
-    @abstractmethod
-    def url(self, destination):
-        """
-        :param destination: the name of the output to calculate
-            the url for
-        :returns: URL where file_name can be reached
-        """
-        raise NotImplementedError
-
-    @abstractmethod
-    def location(self, destination):
-        """
-        Provides a location for the specified destination.
-        This may be any path, pathlike object, db connection string, URL, etc
-        and it is not guaranteed to be accessible on the local file system
-        :param destination: the name of the output to calculate
-            the location for
-        :returns: location where file_name can be found
-        """
-        raise NotImplementedError
-
-
-class CachedStorage(StorageAbstract):
-    def __init__(self):
-        self._cache = {}
-
-    def store(self, output):
-        if output.identifier not in self._cache:
-            self._cache[output.identifier] = self._do_store(output)
-        return self._cache[output.identifier]
-
-    def _do_store(self, output):
-        raise NotImplementedError
-
-
-class DummyStorage(StorageAbstract):
-    """Dummy empty storage implementation, does nothing
-
-    Default instance, for non-reference output request
-
-    >>> store = DummyStorage()
-    >>> assert store.store
-    """
-
-    def __init__(self):
-        """
-        """
-
-    def store(self, output):
-        pass
+from .basic import new_storage, get_storage_instance
+from .database import DatabaseStorage
+from .file import FileStorage
diff --git a/pywps/inout/storage/basic.py b/pywps/inout/storage/basic.py
new file mode 100644
index 000000000..ebf162172
--- /dev/null
+++ b/pywps/inout/storage/basic.py
@@ -0,0 +1,105 @@
+##################################################################
+# Copyright 2018 Open Source Geospatial Foundation and others #
+# licensed under MIT, Please consult LICENSE.txt for details #
+##################################################################
+import uuid
+
+import pywps.configuration
+import pywps.dblog
+
+# Registry of storage types, used to create new storage instances and to restore stored ones
+_storage_type_registry = {}
+
+
+# Register the given class as a storage type
+def register_storage_type(cls):
+    global _storage_type_registry
+    _storage_type_registry[cls.__name__] = cls
+    return cls
+
+
+# Return a fresh storage instance of the given type name, or of the configured default type
+def new_storage(type=None):
+    global _storage_type_registry
+    if type is None:
+        type = pywps.configuration.get_config_value('server', 'storagetype', "DatabaseStorage")
+    return _storage_type_registry[type]()
+
+
+# Return a Storage that handles the given uuid, or None if not found.
+def get_storage_instance(uuid):
+    global _storage_type_registry
+    store_instance_record = pywps.dblog.get_storage_record(uuid)
+    if store_instance_record:
+        return _storage_type_registry[store_instance_record.type](
+            uuid=store_instance_record.uuid,
+            pretty_filename=store_instance_record.pretty_filename,
+            mimetype=store_instance_record.mimetype,
+            data=store_instance_record.data
+        )
+    return None
+
+
+class StorageAbstract(object):
+    """Data storage abstract class
+    """
+
+    def __init__(self, **kwargs):
+
+        if "uuid" in kwargs:
+            self._uuid = uuid.UUID(kwargs["uuid"])
+            self._pretty_filename = kwargs.get("pretty_filename", None)
+            self._mimetype = kwargs.get("mimetype", None)
+            self.load(kwargs.get("data", b''))
+        else:
+            # Given uuid to the store
+            self._uuid = uuid.uuid1()
+
+            # Will be set only when export is made
+            self._pretty_filename = None
+            self._mimetype = None
+
+    def open(self, mode="r", encoding=None):
+        """
+        Return file object like handler
+        """
+        raise NotImplementedError
+
+    def export(self, pretty_filename, mimetype):
+        """
+        Export this file to be available from web
+        """
+        self._pretty_filename = pretty_filename
+        self._mimetype = mimetype
+
+        pywps.dblog.update_storage_record(self)
+        return self.url
+
+    def unexport(self):
+        self.export(None, None)
+
+    @property
+    def uuid(self):
+        return self._uuid
+
+    @property
+    def pretty_filename(self):
+        return self._pretty_filename
+
+    @property
+    def mimetype(self):
+        return self._mimetype
+
+    @property
+    def url(self):
+        """Build and return the exported url"""
+        base_url = pywps.configuration.get_config_value('server', 'url').rstrip('/')
+        return f"{base_url}/files?uuid={self.uuid}"
+
+    def dump(self):
+        """Dump data into bytes array"""
+        raise NotImplementedError
+
+    def load(self, bytes):
+        """Load bytes array"""
+        raise NotImplementedError
diff --git a/pywps/inout/storage/builder.py b/pywps/inout/storage/builder.py
deleted file mode 100644
index 8dba60657..000000000
--- a/pywps/inout/storage/builder.py
+++ /dev/null
@@ -1,34 +0,0 @@
-##################################################################
-# Copyright 2018 Open Source Geospatial Foundation and others #
-# licensed under MIT, Please consult LICENSE.txt for details #
-##################################################################
-
-from .s3 import S3StorageBuilder
-from .file import FileStorageBuilder
-import pywps.configuration as wpsConfig
-
-STORAGE_MAP = {
-    's3': S3StorageBuilder,
-    'file': FileStorageBuilder
-}
-
-
-class StorageBuilder:
-    """
-    Class to construct other storage classes using
-    the server configuration to determine the appropriate type.
- Will default to using FileStorage if the specified type - cannot be found - """ - @staticmethod - def buildStorage(): - """ - :returns: A StorageAbstract conforming object for storing - outputs that has been configured using the server - configuration - """ - storage_type = wpsConfig.get_config_value('server', 'storagetype').lower() - if storage_type not in STORAGE_MAP: - return FileStorageBuilder().build() - else: - return STORAGE_MAP[storage_type]().build() diff --git a/pywps/inout/storage/database.py b/pywps/inout/storage/database.py new file mode 100644 index 000000000..60a300e70 --- /dev/null +++ b/pywps/inout/storage/database.py @@ -0,0 +1,82 @@ +################################################################## +# Copyright 2018 Open Source Geospatial Foundation and others # +# licensed under MIT, Please consult LICENSE.txt for details # +################################################################## + +from .basic import StorageAbstract, register_storage_type + +import base64 +import io + +import pywps.dblog + + +class DatabaseStorageByteHandler(io.BytesIO): + def __init__(self, ds, initial_bytes=b''): + super(DatabaseStorageByteHandler, self).__init__(initial_bytes) + self._ds = ds + + def flush(self): + super(DatabaseStorageByteHandler, self).flush() + self._ds.store(self.getvalue()) + + # Override close + def close(self): + self._ds.store(self.getvalue()) + super(DatabaseStorageByteHandler, self).close() + + def __exit__(self, *args, **kwargs): + self._ds.store(self.getvalue()) + super(DatabaseStorageByteHandler, self).__exit__(*args, **kwargs) + + +class DatabaseStorageStringHandler(io.StringIO): + def __init__(self, ds, encoding, initial_value=""): + super(DatabaseStorageStringHandler, self).__init__(initial_value) + self._ds = ds + self._encoding = encoding + + def flush(self): + super(DatabaseStorageStringHandler, self).flush() + self._ds.store(self.getvalue().encode(self._encoding)) + + # Override close + def close(self): + self._ds.store(self.getvalue().encode(self._encoding)) + super(DatabaseStorageStringHandler, self).close() + + def __exit__(self, *args, **kwargs): + self._ds.store(self.getvalue().encode(self._encoding)) + super(DatabaseStorageStringHandler, self).__exit__(*args, **kwargs) + + +@register_storage_type +class DatabaseStorage(StorageAbstract): + """Database storage will put data in base64 within the database + """ + + def __init__(self, **kwargs): + self._data = b'' + super().__init__(**kwargs) + + def open(self, mode, encoding=None): + if encoding is None: + if mode == "w": + return DatabaseStorageByteHandler(self) + else: + return DatabaseStorageByteHandler(self, self._data) + else: + if mode == "w": + return DatabaseStorageStringHandler(self, encoding) + else: + return DatabaseStorageStringHandler(self, encoding, self._data.decode(encoding)) + + def store(self, bytes): + self._data = bytes + pywps.dblog.update_storage_record(self) + + def dump(self): + return self._data + + def load(self, bytes): + self._data = bytes diff --git a/pywps/inout/storage/file.py b/pywps/inout/storage/file.py index e8a467b44..d1a8f5532 100644 --- a/pywps/inout/storage/file.py +++ b/pywps/inout/storage/file.py @@ -3,194 +3,45 @@ # licensed under MIT, Please consult LICENSE.txt for details # ################################################################## -import logging -import os -from urllib.parse import urljoin -from pywps.exceptions import NotEnoughStorage, FileStorageError -from pywps import configuration as config -from pywps.inout.basic import IOHandler - -from 
. import CachedStorage -from .implementationbuilder import StorageImplementationBuilder -from . import STORE_TYPE - -LOGGER = logging.getLogger('PYWPS') - - -class FileStorageBuilder(StorageImplementationBuilder): - - def build(self): - file_path = config.get_config_value('server', 'outputpath') - base_url = config.get_config_value('server', 'outputurl') - copy_function = config.get_config_value('server', 'storage_copy_function') - return FileStorage(file_path, base_url, copy_function=copy_function) - - -def _build_output_name(output): - (prefix, suffix) = os.path.splitext(output.file) - if not suffix: - suffix = output.data_format.extension - _, file_name = os.path.split(prefix) - output_name = file_name + suffix - return (output_name, suffix) +from .basic import StorageAbstract, register_storage_type +import pywps.configuration +from pywps.exceptions import StorageNotSupported, NoApplicableCode +import os +import pywps.dblog -class FileStorage(CachedStorage): - """File storage implementation, stores data to file system - >>> import ConfigParser - >>> config = ConfigParser.RawConfigParser() - >>> config.add_section('FileStorage') - >>> config.set('FileStorage', 'target', './') - >>> config.add_section('server') - >>> config.set('server', 'outputurl', 'http://foo/bar/filestorage') - >>> - >>> store = FileStorage() - >>> - >>> class FakeOutput(object): - ... def __init__(self): - ... self.file = self._get_file() - ... def _get_file(self): - ... tiff_file = open('file.tiff', 'w') - ... tiff_file.close() - ... return 'file.tiff' - >>> fake_out = FakeOutput() - >>> (type, path, url) = store.store(fake_out) - >>> type == STORE_TYPE.PATH - True +@register_storage_type +class FileStorage(StorageAbstract): + """Store data into file in the directory 'server' 'outputpath' """ - def __init__(self, output_path, output_url, copy_function=None): - """ - """ - CachedStorage.__init__(self) - self.target = output_path - self.output_url = output_url - self.copy_function = copy_function - - def _do_store(self, output): - """Copy output to final storage location. - - - Create output directory - - Check available file space - - Create output file name, taking care of possible duplicates - - Copy / link output in work directory to output directory - - Return store type, output path and output URL - """ - import platform - import math - import tempfile - import uuid - - file_name = output.file - - request_uuid = output.uuid or uuid.uuid1() - - # Create a target folder for each request - target = os.path.join(self.target, str(request_uuid)) - if not os.path.exists(target): - os.makedirs(target) - - # st.blksize is not available in windows, skips the validation on windows - if platform.system() != 'Windows': - file_block_size = os.stat(file_name).st_blksize - # get_free_space delivers the numer of free blocks, not the available size! 
- avail_size = get_free_space(self.target) * file_block_size - file_size = os.stat(file_name).st_size - - # calculate space used according to block size - actual_file_size = math.ceil(file_size / float(file_block_size)) * file_block_size - - if avail_size < actual_file_size: - raise NotEnoughStorage('Not enough space in {} to store {}'.format(self.target, file_name)) - - # build output name - output_name, suffix = _build_output_name(output) - # build tempfile in case of duplicates - if os.path.exists(os.path.join(target, output_name)): - output_name = tempfile.mkstemp(suffix=suffix, prefix=file_name + '_', - dir=target)[1] - - full_output_name = os.path.join(target, output_name) - LOGGER.info(f'Storing file output to {full_output_name} ({self.copy_function}).') - try: - self.copy(output.file, full_output_name, self.copy_function) - except Exception: - LOGGER.exception(f"Could not copy {output_name}.") - raise FileStorageError("Could not copy output file.") - - just_file_name = os.path.basename(output_name) - - url = self.url("{}/{}".format(request_uuid, just_file_name)) - LOGGER.info('File output URI: {}'.format(url)) - - return STORE_TYPE.PATH, output_name, url - - @staticmethod - def copy(src, dst, copy_function=None): - """Copy file from source to destination using `copy_function`. - - Values of `copy_function` (default=`copy`): - * copy: using `shutil.copy2` - * move: using `shutil.move` - * link: using `os.link` (hardlink) - """ - import shutil - if copy_function == 'move': - shutil.move(src, dst) - elif copy_function == 'link': - try: - os.link(src, dst) - except Exception: - LOGGER.warning("Could not create hardlink. Fallback to copy.") - FileStorage.copy(src, dst) - else: - shutil.copy2(src, dst) - - def write(self, data, destination, data_format=None): - """ - Write data to self.target - """ - if not os.path.exists(os.path.dirname(self.target)): - os.makedirs(self.target) - - full_output_name = os.path.join(self.target, destination) - - with open(full_output_name, "w") as file: - file.write(data) - - return self.url(destination) - - def url(self, destination): - if isinstance(destination, IOHandler): - output_name, _ = _build_output_name(destination) - just_file_name = os.path.basename(output_name) - dst = f"{destination.uuid}/{just_file_name}" + def __init__(self, **kwargs): + self._path = None + super().__init__(**kwargs) + + if self._path is None: + outputpath = pywps.configuration.get_config_value('server', 'outputpath', None) + if outputpath is None: + raise NoApplicableCode("Configuration [server] outputpath is missing", code=500) + self._path = os.path.join(outputpath, str(self.uuid)) + + def open(self, mode, encoding=None): + if encoding is None: + if mode == "w": + pywps.dblog.update_storage_record(self) + return open(self._path, "wb") + else: + return open(self._path, "rb") else: - dst = destination - - # make sure base url ends with '/' - baseurl = self.output_url.rstrip('/') + '/' - url = urljoin(baseurl, dst) - return url - - def location(self, destination): - return os.path.join(self.target, destination) - - -def get_free_space(folder): - """ Return folder/drive free space (in bytes) - """ - import platform - - if platform.system() == 'Windows': - import ctypes + if mode == "w": + pywps.dblog.update_storage_record(self) + return open(self._path, "w", encoding=encoding) + else: + return open(self._path, "r", encoding=encoding) - free_bytes = ctypes.c_ulonglong(0) - ctypes.windll.kernel32.GetDiskFreeSpaceExW(ctypes.c_wchar_p(folder), None, None, 
ctypes.pointer(free_bytes)) - free_space = free_bytes.value - else: - free_space = os.statvfs(folder).f_bfree + def dump(self): + return self._path.encode("utf-8") - LOGGER.debug('Free space: {}'.format(free_space)) - return free_space + def load(self, bytes): + self._path = bytes.decode("utf-8") diff --git a/pywps/inout/storage/implementationbuilder.py b/pywps/inout/storage/implementationbuilder.py deleted file mode 100644 index 271858830..000000000 --- a/pywps/inout/storage/implementationbuilder.py +++ /dev/null @@ -1,22 +0,0 @@ -################################################################## -# Copyright 2019 Open Source Geospatial Foundation and others # -# licensed under MIT, Please consult LICENSE.txt for details # -################################################################## - -from abc import ABCMeta, abstractmethod - - -class StorageImplementationBuilder(object, metaclass=ABCMeta): - """ - Storage implementations should implement - this class and build method then import and register - the build class into the StorageBuilder. - """ - - @abstractmethod - def build(self): - """ - :returns: An object which implements the - StorageAbstract class - """ - raise NotImplementedError diff --git a/pywps/inout/storage/s3.py b/pywps/inout/storage/s3.py deleted file mode 100644 index 8b0db7ae9..000000000 --- a/pywps/inout/storage/s3.py +++ /dev/null @@ -1,162 +0,0 @@ -################################################################## -# Copyright 2019 Open Source Geospatial Foundation and others # -# licensed under MIT, Please consult LICENSE.txt for details # -################################################################## - -import pywps.configuration as wpsConfig -from . import StorageAbstract -from .implementationbuilder import StorageImplementationBuilder -from . 
import STORE_TYPE - -import os -import logging - -LOGGER = logging.getLogger('PYWPS') - - -class S3StorageBuilder(StorageImplementationBuilder): - - def build(self): - bucket = wpsConfig.get_config_value('s3', 'bucket') - prefix = wpsConfig.get_config_value('s3', 'prefix') - public_access = wpsConfig.get_config_value('s3', 'public') - encrypt = wpsConfig.get_config_value('s3', 'encrypt') - region = wpsConfig.get_config_value('s3', 'region') - - return S3Storage(bucket, prefix, public_access, encrypt, region) - - -def _build_s3_file_path(prefix, filename): - if prefix: - path = prefix.rstrip('/') + '/' + filename.lstrip('/') - else: - path = filename.lstrip('/') - return path - - -def _build_extra_args(public=False, encrypt=False, mime_type=''): - extraArgs = dict() - - if public: - extraArgs['ACL'] = 'public-read' - if encrypt: - extraArgs['ServerSideEncryption'] = 'AES256' - - extraArgs['ContentType'] = mime_type - - return extraArgs - - -class S3Storage(StorageAbstract): - """ - Implements a simple class to store files on AWS S3 - Can optionally set the outputs to be publically readable - and can also encrypt files at rest - """ - def __init__(self, bucket, prefix, public_access, encrypt, region): - self.bucket = bucket - self.public = public_access - self.encrypt = encrypt - self.prefix = prefix - self.region = region - - def _wait_for(self, filename): - import boto3 - client = boto3.client('s3', region_name=self.region) - waiter = client.get_waiter('object_exists') - waiter.wait(Bucket=self.bucket, Key=filename) - - def uploadData(self, data, filename, extraArgs): - """ - :param data: Data to upload to S3 - :param filename: name of the file to upload to s3 - will be appened to the configured prefix - :returns: url to access the uploaded file - Creates or updates a file on S3 in the bucket specified in the server - configuration. The key of the created object will be equal to the - configured prefix with the destination parameter appended. - """ - import boto3 - - s3 = boto3.resource('s3', region_name=self.region) - s3.Object(self.bucket, filename).put(Body=data, **extraArgs) - LOGGER.debug('S3 Put: {} into bucket {}'.format(self.bucket, filename)) - # Ensure object is available before returning URL - self._wait_for(filename) - - # Create s3 URL - url = self.url(filename) - return url - - def uploadFileToS3(self, filename, extraArgs): - """ - :param filename: Path to file on local filesystem - :returns: url to access the uploaded file - Uploads a file from the local filesystem to AWS S3 - """ - url = '' - with open(filename, "rb") as data: - s3_path = _build_s3_file_path(self.prefix, os.path.basename(filename)) - url = self.uploadData(data, s3_path, extraArgs) - return url - - def store(self, output): - """ - :param output: Of type IOHandler - :returns: tuple(STORE_TYPE.S3, uploaded filename, url to access the uploaded file) - Stores an IOHandler object to AWS S3 and returns the storage type, string and a URL - to access the uploaded object - """ - filename = output.file - s3_path = _build_s3_file_path(self.prefix, os.path.basename(filename)) - extraArgs = _build_extra_args( - public=self.public, - encrypt=self.encrypt, - mime_type=output.data_format.mime_type) - url = self.uploadFileToS3(filename, extraArgs) - return (STORE_TYPE.S3, s3_path, url) - - def write(self, data, destination, data_format=None): - """ - :param data: Data that will be written to S3. Can be binary or text - :param destination: Filename of object that will be created / updated - on S3. 
- :param data_format: Format of the data. Will set the mime_type - of the file on S3. If not set, no mime_type will - be set. - Creates or updates a file on S3 in the bucket specified in the server - configuration. The key of the created object will be equal to the - configured prefix with the destination parameter appended. - """ - # Get MimeType from format if it exists - mime_type = data_format.mime_type if data_format is not None else '' - s3_path = _build_s3_file_path(self.prefix, destination) - extraArgs = _build_extra_args( - public=self.public, - encrypt=self.encrypt, - mime_type=mime_type) - return self.uploadData(data, s3_path, extraArgs) - - def url(self, destination): - """ - :param destination: File of object to create a URL for. This should - not include any prefix configured in the server - configuration. - :returns: URL for accessing an object in S3 using a HTTPS GET - request - """ - import boto3 - client = boto3.client('s3', region_name=self.region) - url = '{}/{}/{}'.format(client.meta.endpoint_url, self.bucket, destination) - LOGGER.debug('S3 URL calculated as: {}'.format(url)) - return url - - def location(self, destination): - """ - :param destination: File of object to create a location for. This should - not include any prefix configured in the server - configuration. - :returns: URL for accessing an object in S3 using a HTTPS GET - request - """ - return self.url(destination) diff --git a/pywps/response/capabilities.py b/pywps/response/capabilities.py index 56ba4aa1f..369abce8e 100644 --- a/pywps/response/capabilities.py +++ b/pywps/response/capabilities.py @@ -2,7 +2,7 @@ from werkzeug.wrappers import Request import pywps.configuration as config -from pywps.app.basic import make_response, get_response_type, get_json_indent +from pywps.app.basic import make_response, select_response_mimetype, get_json_indent from .basic import WPSResponse from pywps import __version__ from pywps.exceptions import NoApplicableCode @@ -56,7 +56,7 @@ def json(self): 'instructions': config.get_config_value('metadata:main', 'contact_instructions'), 'role': config.get_config_value('metadata:main', 'contact_role') }, - 'serviceurl': config.get_config_value('server', 'url'), + 'serviceurl': config.get_config_value('server', 'url').rstrip('/') + '/wps', 'languages': config.get_config_value('server', 'language').split(','), 'language': self.wps_request.language, 'processes': processes @@ -68,9 +68,9 @@ def _render_json_response(jdoc): def _construct_doc(self): doc = self.json - json_response, mimetype = get_response_type( + mimetype = select_response_mimetype( self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) - if json_response: + if mimetype == 'application/json': doc = json.dumps(self._render_json_response(doc), indent=get_json_indent()) else: template = self.template_env.get_template(self.version + '/capabilities/main.xml') diff --git a/pywps/response/describe.py b/pywps/response/describe.py index c4dec0b26..1fb965f93 100644 --- a/pywps/response/describe.py +++ b/pywps/response/describe.py @@ -2,7 +2,7 @@ from werkzeug.wrappers import Request import pywps.configuration as config -from pywps.app.basic import make_response, get_response_type, get_json_indent +from pywps.app.basic import make_response, select_response_mimetype, get_json_indent from pywps.exceptions import NoApplicableCode from pywps.exceptions import MissingParameterValue from pywps.exceptions import InvalidParameterValue @@ -52,9 +52,9 @@ def _construct_doc(self): raise 
MissingParameterValue('Missing parameter value "identifier"', 'identifier') doc = self.json - json_response, mimetype = get_response_type( + mimetype = select_response_mimetype( self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) - if json_response: + if mimetype == 'application/json': doc = json.dumps(self._render_json_response(doc), indent=get_json_indent()) else: template = self.template_env.get_template(self.version + '/describe/main.xml') diff --git a/pywps/response/execute.py b/pywps/response/execute.py index 4989ab5f7..ace21ed91 100755 --- a/pywps/response/execute.py +++ b/pywps/response/execute.py @@ -7,14 +7,16 @@ import logging import time from werkzeug.wrappers import Request + +import pywps.dblog from pywps import get_ElementMakerForVersion -from pywps.app.basic import get_response_type, get_json_indent, get_default_response_mimetype +from pywps.app.basic import select_response_mimetype, get_json_indent, get_default_response_mimetype from pywps.exceptions import NoApplicableCode import pywps.configuration as config from werkzeug.wrappers import Response from pywps.inout.array_encode import ArrayEncoder -from pywps.response.status import WPS_STATUS +from pywps.response.status import WPS_STATUS, StatusResponse from .basic import WPSResponse from pywps.inout.formats import FORMATS from pywps.inout.outputs import ComplexOutput @@ -43,6 +45,10 @@ def __init__(self, wps_request, uuid, **kwargs): self.outputs = {o.identifier: o for o in self.process.outputs} self.store_status_file = False + # select the output mimetype + accept_mimetypes = getattr(self.wps_request.http_request, 'accept_mimetypes', None) + self.mimetype = select_response_mimetype(accept_mimetypes, self.wps_request.default_mimetype) + # override WPSResponse._update_status def _update_status(self, status, message, status_percentage, clean=True): """ @@ -56,9 +62,8 @@ def _update_status(self, status, message, status_percentage, clean=True): """ super(ExecuteResponse, self)._update_status(status, message, status_percentage) LOGGER.debug("_update_status: status={}, clean={}".format(status, clean)) - self._update_status_doc() if self.store_status_file: - self._update_status_file() + pywps.dblog.update_status_record(self.uuid, self.json) if clean: if self.status == WPS_STATUS.SUCCEEDED or self.status == WPS_STATUS.FAILED: LOGGER.debug("clean workdir: status={}".format(status)) @@ -78,24 +83,6 @@ def update_status(self, message, status_percentage=None): status_percentage = self.status_percentage self._update_status(self.status, message, status_percentage, False) - def _update_status_doc(self): - try: - # rebuild the doc - self.doc, self.content_type = self._construct_doc() - except Exception as e: - raise NoApplicableCode('Building Response Document failed with : {}'.format(e)) - - def _update_status_file(self): - # TODO: check if file/directory is still present, maybe deleted in mean time - try: - # update the status xml file - self.process.status_store.write( - self.doc, - self.process.status_filename, - data_format=FORMATS.XML) - except Exception as e: - raise NoApplicableCode('Writing Response Document failed with : {}'.format(e)) - def _process_accepted(self): percent = int(self.status_percentage) if percent > 99: @@ -152,14 +139,23 @@ def _get_serviceinstance(self): @property def json(self): + if self.status == WPS_STATUS.SUCCEEDED and \ + hasattr(self.wps_request, 'preprocess_response') and \ + self.wps_request.preprocess_response: + self.outputs = 
self.wps_request.preprocess_response(self.outputs, + request=self.wps_request, + http_request=self.wps_request.http_request) + self.preprocess_response = None + data = {} data["language"] = self.wps_request.language data["service_instance"] = self._get_serviceinstance() data["process"] = self.process.json + data["version"] = self.version if self.store_status_file: if self.process.status_location: - data["status_location"] = self.process.status_url + data["status_location"] = self.process.status_location if self.status == WPS_STATUS.ACCEPTED: self.message = 'PyWPS Process {} accepted'.format(self.process.identifier) @@ -191,48 +187,8 @@ def json(self): data["output_definitions"] = [self.outputs[o].json for o in self.outputs] return data - @staticmethod - def _render_json_response(jdoc): - response = dict() - response['status'] = jdoc['status'] - out = jdoc['process']['outputs'] - d = {} - for val in out: - id = val.get('identifier') - if id is None: - continue - type = val.get('type') - key = 'bbox' if type == 'bbox' else 'data' - if key in val: - d[id] = val[key] - response['outputs'] = d - return response - - def _construct_doc(self): - if self.status == WPS_STATUS.SUCCEEDED and \ - hasattr(self.wps_request, 'preprocess_response') and \ - self.wps_request.preprocess_response: - self.outputs = self.wps_request.preprocess_response(self.outputs, - request=self.wps_request, - http_request=self.wps_request.http_request) - doc = self.json - try: - json_response, mimetype = get_response_type( - self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) - except Exception: - mimetype = get_default_response_mimetype() - json_response = 'json' in mimetype - if json_response: - doc = json.dumps(self._render_json_response(doc), cls=ArrayEncoder, indent=get_json_indent()) - else: - template = self.template_env.get_template(self.version + '/execute/main.xml') - doc = template.render(**doc) - return doc, mimetype - @Request.application def __call__(self, request): - accept_json_response, accepted_mimetype = get_response_type( - self.wps_request.http_request.accept_mimetypes, self.wps_request.default_mimetype) if self.wps_request.raw: if self.status == WPS_STATUS.FAILED: return NoApplicableCode(self.message) @@ -260,7 +216,7 @@ def __call__(self, request): mimetype = self.wps_request.outputs[wps_output_identifier].get('mimetype', None) if not isinstance(response, (str, bytes, bytearray)): if not mimetype: - mimetype = accepted_mimetype + mimetype = self.mimetype json_response = mimetype and 'json' in mimetype if json_response: mimetype = 'application/json' @@ -274,6 +230,4 @@ def __call__(self, request): headers={'Content-Disposition': 'attachment; filename="{}"' .format(wps_output_identifier + suffix)}) else: - if not self.doc: - return NoApplicableCode("Output was not generated") - return Response(self.doc, mimetype=accepted_mimetype) + return StatusResponse(self.json, self.mimetype) diff --git a/pywps/response/status.py b/pywps/response/status.py index 4a3606e04..4db7fc9f4 100644 --- a/pywps/response/status.py +++ b/pywps/response/status.py @@ -1,4 +1,48 @@ from collections import namedtuple +from werkzeug.wrappers import Response +import json +from pywps.inout.array_encode import ArrayEncoder +from pywps.app.basic import get_json_indent +from jinja2 import Environment, PackageLoader +import os +from pywps.translations import get_translation + +from . 
import RelEnvironment _WPS_STATUS = namedtuple('WPSStatus', ['UNKNOWN', 'ACCEPTED', 'STARTED', 'PAUSED', 'SUCCEEDED', 'FAILED']) WPS_STATUS = _WPS_STATUS(0, 1, 2, 3, 4, 5) + + +class StatusResponse(Response): + def __init__(self, json_data, mimetype): + + template_env = RelEnvironment( + loader=PackageLoader('pywps', 'templates'), + trim_blocks=True, lstrip_blocks=True, + autoescape=True, + ) + template_env.globals.update(get_translation=get_translation) + + if mimetype == 'application/json': + doc = json.dumps(self._render_json_response(json_data), cls=ArrayEncoder, indent=get_json_indent()) + else: + template = template_env.get_template(json_data["version"] + '/execute/main.xml') + doc = template.render(**json_data) + super(StatusResponse, self).__init__(response=doc, mimetype=mimetype) + + @staticmethod + def _render_json_response(jdoc): + response = dict() + response['status'] = jdoc['status'] + out = jdoc['process']['outputs'] + d = {} + for val in out: + id = val.get('identifier') + if id is None: + continue + type = val.get('type') + key = 'bbox' if type == 'bbox' else 'data' + if key in val: + d[id] = val[key] + response['outputs'] = d + return response diff --git a/pywps/util.py b/pywps/util.py index bc466d6b0..516b48601 100644 --- a/pywps/util.py +++ b/pywps/util.py @@ -8,9 +8,13 @@ from pathlib import Path from urllib.parse import urlparse +import os is_windows = platform.system() == 'Windows' +import struct +import fcntl + def file_uri(path: Union[str, Path]) -> str: path = Path(path) @@ -24,3 +28,31 @@ def uri_to_path(uri) -> str: if is_windows: path = str(Path(path)).lstrip('\\') return path + + +class FileLock(): + """Implement a file based lock""" + + def __init__(self, filename): + self._fd = os.open(filename, os.O_RDWR | os.O_CREAT) + + def acquire(self): + # Wait to lock the whole file + fcntl.fcntl(self._fd, fcntl.F_SETLKW, struct.pack("hhlll", fcntl.F_WRLCK, os.SEEK_SET, 0, 0, 0)) + pass + + def release(self): + # Unlock the file + fcntl.fcntl(self._fd, fcntl.F_SETLKW, struct.pack("hhlll", fcntl.F_UNLCK, os.SEEK_SET, 0, 0, 0)) + pass + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, *args, **kwargs): + self.release() + + def __del__(self): + self.release() + os.close(self._fd) diff --git a/tests/__init__.py b/tests/__init__.py index 8ea1a2b2e..48e6126c1 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -28,9 +28,8 @@ from tests import test_processing from tests import test_assync from tests import test_grass_location -from tests import test_storage -from tests import test_filestorage -from tests import test_s3storage +from tests import test_storage_database +from tests import test_storage_file from tests.validator import test_complexvalidators from tests.validator import test_literalvalidators @@ -92,9 +91,8 @@ def load_tests(loader=None, tests=None, pattern=None): test_processing.load_tests(), test_assync.load_tests(), test_grass_location.load_tests(), - test_storage.load_tests(), - test_filestorage.load_tests(), - test_s3storage.load_tests(), + test_storage_database.load_tests(), + test_storage_file.load_tests(), ]) diff --git a/tests/test_assync.py b/tests/test_assync.py index 2db2eb074..45b4d7644 100644 --- a/tests/test_assync.py +++ b/tests/test_assync.py @@ -59,7 +59,7 @@ def test_assync(self): ), version="1.0.0" ) - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert_response_accepted(resp) # TODO: diff --git a/tests/test_assync_inout.py b/tests/test_assync_inout.py index 
496fcd714..7e4218fb6 100644 --- a/tests/test_assync_inout.py +++ b/tests/test_assync_inout.py @@ -60,7 +60,7 @@ def test_assync_inout(): ), version="1.0.0" ) - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert resp.status_code == 200 # TODO: diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index 8e3e3851b..1ccaa4b33 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -17,17 +17,17 @@ class BadRequestTest(unittest.TestCase): def test_bad_http_verb(self): client = client_for(Service()) - resp = client.put('') - assert resp.status_code == 405 # method not allowed + resp = client.put('/wps') + assert resp.status_code == 405 # Not found def test_bad_request_type_with_get(self): client = client_for(Service()) - resp = client.get('?Request=foo') + resp = client.get('/wps?Request=foo') assert resp.status_code == 400 def test_bad_service_type_with_get(self): client = client_for(Service()) - resp = client.get('?service=foo') + resp = client.get('/wps?service=foo') exception = resp.xpath('/ows:ExceptionReport' '/ows:Exception') @@ -38,7 +38,7 @@ def test_bad_service_type_with_get(self): def test_bad_request_type_with_post(self): client = client_for(Service()) request_doc = WPS.Foo() - resp = client.post_xml('', doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert resp.status_code == 400 @@ -97,20 +97,20 @@ def check_capabilities_response(self, resp): assert len(metadatas) == 2 def test_get_request(self): - resp = self.client.get('?Request=GetCapabilities&service=WpS') + resp = self.client.get('/wps?Request=GetCapabilities&service=WpS') self.check_capabilities_response(resp) # case insesitive check - resp = self.client.get('?request=getcapabilities&service=wps') + resp = self.client.get('/wps?request=getcapabilities&service=wps') self.check_capabilities_response(resp) def test_post_request(self): request_doc = WPS.GetCapabilities() - resp = self.client.post_xml(doc=request_doc) + resp = self.client.post_xml('/wps', doc=request_doc) self.check_capabilities_response(resp) def test_get_bad_version(self): - resp = self.client.get('?request=getcapabilities&service=wps&acceptversions=2001-123') + resp = self.client.get('/wps?request=getcapabilities&service=wps&acceptversions=2001-123') exception = resp.xpath('/ows:ExceptionReport' '/ows:Exception') assert resp.status_code == 400 @@ -119,7 +119,7 @@ def test_get_bad_version(self): def test_post_bad_version(self): acceptedVersions_doc = OWS.AcceptVersions(OWS.Version('2001-123')) request_doc = WPS.GetCapabilities(acceptedVersions_doc) - resp = self.client.post_xml(doc=request_doc) + resp = self.client.post_xml('/wps', doc=request_doc) exception = resp.xpath('/ows:ExceptionReport' '/ows:Exception') @@ -127,11 +127,11 @@ def test_post_bad_version(self): assert exception[0].attrib['exceptionCode'] == 'VersionNegotiationFailed' def test_version(self): - resp = self.client.get('?service=WPS&request=GetCapabilities&version=1.0.0') + resp = self.client.get('/wps?service=WPS&request=GetCapabilities&version=1.0.0') assert_wps_version(resp) def test_version2(self): - resp = self.client.get('?service=WPS&request=GetCapabilities&acceptversions=2.0.0') + resp = self.client.get('/wps?service=WPS&request=GetCapabilities&acceptversions=2.0.0') assert_wps_version(resp, version="2.0.0") @@ -164,7 +164,7 @@ def tearDown(self): configuration.CONFIG.set('server', 'language', 'en-US') def test_get_translated(self): - resp = 
self.client.get('?Request=GetCapabilities&service=wps&language=fr-CA') + resp = self.client.get('/wps?Request=GetCapabilities&service=wps&language=fr-CA') assert resp.xpath('/wps:Capabilities/@xml:lang')[0] == "fr-CA" diff --git a/tests/test_complexdata_io.py b/tests/test_complexdata_io.py index 84ab9a46b..a1d3f8c6b 100644 --- a/tests/test_complexdata_io.py +++ b/tests/test_complexdata_io.py @@ -96,7 +96,7 @@ def compare_io(self, name, fn, fmt): inputs=[('complex', ComplexDataInput(data, mimeType=fmt.mime_type, encoding=fmt.encoding))], mode='sync') - resp = client.post_xml(doc=doc) + resp = client.post_xml('/wps', doc=doc) assert_response_success(resp) wps.parseResponse(resp.xml) out = wps.processOutputs[0].data[0] diff --git a/tests/test_dblog.py b/tests/test_dblog.py index 7e26a685c..84645b7a5 100644 --- a/tests/test_dblog.py +++ b/tests/test_dblog.py @@ -9,9 +9,21 @@ import unittest from pywps import configuration -from pywps.dblog import get_session -from pywps.dblog import ProcessInstance +import pywps.dblog as dblog +from types import SimpleNamespace as ns +import json + +fake_request = ns( + version = '1.0.0', + operation = 'execute', + identifier = 'dummy_identifier' +) + +fake_process = ns( + uuid="0bf3cd00-0102-11ed-8421-e4b97ac7e08e", + json=json.dumps({"identifier": "something"}) +) class DBLogTest(unittest.TestCase): """DBGLog test cases""" @@ -20,29 +32,91 @@ def setUp(self): self.database = configuration.get_config_value('logging', 'database') - def test_0_dblog(self): - """Test pywps.formats.Format class - """ - session = get_session() - self.assertTrue(session) - - def test_db_content(self): - session = get_session() - null_time_end = session.query(ProcessInstance).filter(ProcessInstance.time_end == None) - self.assertEqual(null_time_end.count(), 0, - 'There are no unfinished processes loged') - - null_status = session.query(ProcessInstance).filter(ProcessInstance.status == None) - self.assertEqual(null_status.count(), 0, - 'There are no processes without status loged') - - null_percent = session.query(ProcessInstance).filter(ProcessInstance.percent_done == None) - self.assertEqual(null_percent.count(), 0, - 'There are no processes without percent loged') - - null_percent = session.query(ProcessInstance).filter(ProcessInstance.percent_done < 100) - self.assertEqual(null_percent.count(), 0, - 'There are no unfinished processes') + def test_log_request(self): + dblog.log_request("0bf3cd00-0102-11ed-8421-e4b97ac7e02e", fake_request) + dblog.log_request("0bf3cd00-0102-11ed-8421-e4b97ac7e03e", fake_request) + dblog.log_request("0bf3cd00-0102-11ed-8421-e4b97ac7e04e", fake_request) + + running, stored = dblog.get_process_counts() + assert running == 0 + assert stored == 0 + + dblog.store_status("0bf3cd00-0102-11ed-8421-e4b97ac7e03e", dblog.WPS_STATUS.ACCEPTED, "accepted", 10) + + running, stored = dblog.get_process_counts() + assert running == 0 + assert stored == 0 + + dblog.store_status("0bf3cd00-0102-11ed-8421-e4b97ac7e04e", dblog.WPS_STATUS.STARTED, "started", 10) + dblog.update_pid("0bf3cd00-0102-11ed-8421-e4b97ac7e04e", 10) + + running, stored = dblog.get_process_counts() + assert running == 1 + assert stored == 0 + + dblog.store_status(fake_process.uuid, dblog.WPS_STATUS.ACCEPTED, "accepted", 10) + dblog.store_process(fake_process) + + running, stored = dblog.get_process_counts() + assert running == 1 + assert stored == 1 + + p = dblog.pop_first_stored() + assert p.uuid == fake_process.uuid + + running, stored = dblog.get_process_counts() + assert running == 1 + assert 
stored == 0 + + def test_storage(self): + fake_storage = ns( + uuid="ebf3cd00-0102-11ed-8421-e4b97ac7e02e", + pretty_filename = "pretty_filename.txt", + mimetype="text/plain", + dump=lambda: b'somedata' + ) + + dblog.update_storage_record(fake_storage) + + s = dblog.get_storage_record(fake_storage.uuid) + + assert s.uuid == fake_storage.uuid + assert s.pretty_filename == fake_storage.pretty_filename + assert s.mimetype == fake_storage.mimetype + assert s.data == fake_storage.dump() + + def test_status(self): + dblog.update_status_record("fbf3cd00-0102-11ed-8421-e4b97ac7e02e", "somedata") + s = dblog.get_status_record("fbf3cd00-0102-11ed-8421-e4b97ac7e02e") + assert s.uuid == "fbf3cd00-0102-11ed-8421-e4b97ac7e02e" + assert s.data == "somedata" + + def test_crashed_process(self): + fake_process_status_data = { + "process": {"uuid": "0cf3cd00-0102-11ed-8421-e4b97ac7e02e"}, + "status": { + "status": "started", + "time": "2022-07-11T17:07:18Z", + "percent_done": "10", + "message": "PyWPS Process Started" + } + } + + dblog.log_request(fake_process_status_data["process"]["uuid"], fake_request) + dblog.store_status(fake_process_status_data["process"]["uuid"], dblog.WPS_STATUS.STARTED, "accepted", 10) + dblog.update_pid(fake_process_status_data["process"]["uuid"], -1) # some invalid pid + dblog.update_status_record(fake_process_status_data["process"]["uuid"], fake_process_status_data) + + s = dblog.get_status_record(fake_process_status_data["process"]["uuid"]) + assert s.uuid == fake_process_status_data["process"]["uuid"] + assert s.data['status']['status'] == 'started' + + dblog.cleanup_crashed_process() + + s = dblog.get_status_record(fake_process_status_data["process"]["uuid"]) + assert s.uuid == fake_process_status_data["process"]["uuid"] + assert s.data['status']['status'] == 'failed' + def load_tests(loader=None, tests=None, pattern=None): """Load local tests diff --git a/tests/test_describe.py b/tests/test_describe.py index 17b6d9a91..212e4733c 100644 --- a/tests/test_describe.py +++ b/tests/test_describe.py @@ -93,7 +93,7 @@ def ping(request): self.client = client_for(Service(processes=processes)) def test_get_request_all_args(self): - resp = self.client.get('?Request=DescribeProcess&service=wps&version=1.0.0&identifier=all') + resp = self.client.get('/wps?Request=DescribeProcess&service=wps&version=1.0.0&identifier=all') identifiers = [desc.identifier for desc in get_describe_result(resp)] metadata = [desc.metadata for desc in get_describe_result(resp)] @@ -103,20 +103,20 @@ def test_get_request_all_args(self): assert 'hello metadata' in [item for sublist in metadata for item in sublist] def test_get_request_zero_args(self): - resp = self.client.get('?Request=DescribeProcess&version=1.0.0&service=wps') + resp = self.client.get('/wps?Request=DescribeProcess&version=1.0.0&service=wps') assert resp.status_code == 400 def test_get_request_nonexisting_process_args(self): - resp = self.client.get('?Request=DescribeProcess&version=1.0.0&service=wps&identifier=NONEXISTINGPROCESS') + resp = self.client.get('/wps?Request=DescribeProcess&version=1.0.0&service=wps&identifier=NONEXISTINGPROCESS') assert resp.status_code == 400 def test_post_request_zero_args(self): request_doc = WPS.DescribeProcess() - resp = self.client.post_xml(doc=request_doc) + resp = self.client.post_xml('/wps', doc=request_doc) assert resp.status_code == 400 def test_get_one_arg(self): - resp = self.client.get('?service=wps&version=1.0.0&Request=DescribeProcess&identifier=hello') + resp = 
self.client.get('/wps?service=wps&version=1.0.0&Request=DescribeProcess&identifier=hello') assert [pr.identifier for pr in get_describe_result(resp)] == ['hello'] def test_post_one_arg(self): @@ -124,7 +124,7 @@ def test_post_one_arg(self): OWS.Identifier('hello'), version='1.0.0' ) - resp = self.client.post_xml(doc=request_doc) + resp = self.client.post_xml('/wps', doc=request_doc) assert resp.status_code == 200 def test_post_two_args(self): @@ -133,7 +133,7 @@ def test_post_two_args(self): OWS.Identifier('ping'), version='1.0.0' ) - resp = self.client.post_xml(doc=request_doc) + resp = self.client.post_xml('/wps', doc=request_doc) result = get_describe_result(resp) # print(b"\n".join(resp.response).decode("utf-8")) assert [pr.identifier for pr in result] == ['hello', 'ping'] @@ -201,7 +201,7 @@ def tearDown(self): configuration.CONFIG.set('server', 'language', 'en-US') def test_get_describe_translations(self): - resp = self.client.get('?Request=DescribeProcess&service=wps&version=1.0.0&identifier=all&language=fr-CA') + resp = self.client.get('/wps?Request=DescribeProcess&service=wps&version=1.0.0&identifier=all&language=fr-CA') assert resp.xpath('/wps:ProcessDescriptions/@xml:lang')[0] == "fr-CA" @@ -227,7 +227,7 @@ class DescribeProcessInputTest(unittest.TestCase): def describe_process(self, process): client = client_for(Service(processes=[process])) - resp = client.get('?service=wps&version=1.0.0&Request=DescribeProcess&identifier={}'.format(process.identifier)) + resp = client.get('/wps?service=wps&version=1.0.0&Request=DescribeProcess&identifier={}'.format(process.identifier)) [result] = get_describe_result(resp) return result diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py index 578568d8e..262a2350e 100644 --- a/tests/test_exceptions.py +++ b/tests/test_exceptions.py @@ -21,7 +21,7 @@ def setUp(self): self.client = client_for(Service(processes=[])) def test_invalid_parameter_value(self): - resp = self.client.get('?service=wms') + resp = self.client.get('/wps?service=wms') exception_el = resp.xpath('/ows:ExceptionReport/ows:Exception')[0] assert exception_el.attrib['exceptionCode'] == 'InvalidParameterValue' assert resp.status_code == 400 @@ -29,21 +29,21 @@ def test_invalid_parameter_value(self): assert_pywps_version(resp) def test_missing_parameter_value(self): - resp = self.client.get() + resp = self.client.get('/wps') exception_el = resp.xpath('/ows:ExceptionReport/ows:Exception')[0] assert exception_el.attrib['exceptionCode'] == 'MissingParameterValue' assert resp.status_code == 400 assert re.match(r'text/xml(;\s*charset=.*)?', resp.headers['Content-Type']) def test_missing_request(self): - resp = self.client.get("?service=wps") + resp = self.client.get("/wps?service=wps") exception_el = resp.xpath('/ows:ExceptionReport/ows:Exception/ows:ExceptionText')[0] # should mention something about a request assert 'request' in exception_el.text assert re.match(r'text/xml(;\s*charset=.*)?', resp.headers['Content-Type']) def test_bad_request(self): - resp = self.client.get("?service=wps&request=xyz") + resp = self.client.get("/wps?service=wps&request=xyz") exception_el = resp.xpath('/ows:ExceptionReport/ows:Exception')[0] assert exception_el.attrib['exceptionCode'] == 'OperationNotSupported' assert re.match(r'text/xml(;\s*charset=.*)?', resp.headers['Content-Type']) diff --git a/tests/test_execute.py b/tests/test_execute.py index d3432c05d..c8f71874d 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -261,12 +261,15 @@ def test_dods(self): ), version='1.0.0' 
) - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert_response_success(resp) """ class FakeRequest(): + http_request = None + default_mimetype = None identifier = 'my_opendap_process' + uuid = 'fakeuuid' service = 'wps' operation = 'execute' version = '1.0.0' @@ -282,7 +285,7 @@ class FakeRequest(): request = FakeRequest() - resp = service.execute('my_opendap_process', request, 'fakeuuid') + resp = service.execute(request) self.assertEqual(resp.outputs['conventions'].data, 'CF-1.0') self.assertEqual(resp.outputs['outdods'].url, href) self.assertTrue(resp.outputs['outdods'].as_reference) @@ -300,7 +303,10 @@ def test_input_parser(self): self.assertTrue(service.processes['my_complex_process']) class FakeRequest(): - identifier = 'complex_process' + http_request = None + default_mimetype = None + identifier = 'my_complex_process' + uuid = 'fakeuuid' service = 'wps' operation = 'execute' version = '1.0.0' @@ -309,12 +315,15 @@ class FakeRequest(): 'mimeType': 'text/gml', 'data': 'the data' }]} + raw = False + outputs = {} + store_execute = False language = "en-US" request = FakeRequest() try: - service.execute('my_complex_process', request, 'fakeuuid') + service.execute(request) except InvalidParameterValue as e: self.assertEqual(e.locator, 'mimeType') @@ -357,7 +366,10 @@ def test_input_default(self): self.assertTrue(service.processes['my_complex_process']) class FakeRequest(): - identifier = 'complex_process' + http_request = None + default_mimetype = None + identifier = 'my_complex_process' + uuid = 'fakeuuid' service = 'wps' operation = 'execute' version = '1.0.0' @@ -369,7 +381,7 @@ class FakeRequest(): language = "en-US" request = FakeRequest() - response = service.execute('my_complex_process', request, 'fakeuuid') + response = service.execute(request) self.assertEqual(response.outputs['complex'].data, 'DEFAULT COMPLEX DATA') def test_output_mimetype(self): @@ -388,7 +400,10 @@ def __init__(self, mimetype): 'data': 'the data' }} + http_request = None + default_mimetype = None identifier = 'get_mimetype_process' + uuid = 'fakeuuid' service = 'wps' operation = 'execute' version = '1.0.0' @@ -400,27 +415,27 @@ def __init__(self, mimetype): # valid mimetype request = FakeRequest('text/plain+test') - response = service.execute('get_mimetype_process', request, 'fakeuuid') + response = service.execute(request) self.assertEqual(response.outputs['mimetype'].data, 'text/plain+test') # non valid mimetype request = FakeRequest('text/xml') with self.assertRaises(InvalidParameterValue): - response = service.execute('get_mimetype_process', request, 'fakeuuid') + response = service.execute(request) def test_metalink(self): client = client_for(Service(processes=[create_metalink_process()])) - resp = client.get('?Request=Execute&identifier=multiple-outputs') + resp = client.get('/wps?Request=Execute&identifier=multiple-outputs') assert resp.status_code == 400 def test_missing_process_error(self): client = client_for(Service(processes=[create_ultimate_question()])) - resp = client.get('?Request=Execute&identifier=foo') + resp = client.get('/wps?Request=Execute&identifier=foo') assert resp.status_code == 400 def test_get_with_no_inputs(self): client = client_for(Service(processes=[create_ultimate_question()])) - resp = client.get('?service=wps&version=1.0.0&Request=Execute&identifier=ultimate_question') + resp = client.get('/wps?service=wps&version=1.0.0&Request=Execute&identifier=ultimate_question') assert_response_success(resp) assert get_output(resp.xml) == 
{'outvalue': '42'} @@ -438,11 +453,11 @@ def test_post_with_no_inputs(self): version=request['version'] ) - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert_response_success(resp) assert get_output(resp.xml) == result - resp = client.post_json(doc=request) + resp = client.post_json('/wps', doc=request) assert_response_success_json(resp, result) def test_post_with_string_input(self): @@ -466,11 +481,11 @@ def test_post_with_string_input(self): ), version=request['version'] ) - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert_response_success(resp) assert get_output(resp.xml) == result - resp = client.post_json(doc=request) + resp = client.post_json('/wps', doc=request) assert_response_success_json(resp, result) def test_bbox(self): @@ -488,7 +503,7 @@ def test_bbox(self): ), version='1.0.0' ) - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert_response_success(resp) [output] = xpath_ns(resp.xml, '/wps:ExecuteResponse' @@ -520,7 +535,7 @@ def test_bbox_rest(self): ) result = {'outbbox': bbox} - resp = client.post_json(doc=request) + resp = client.post_json('/wps', doc=request) assert_response_success_json(resp, result) def test_geojson_input_rest(self): @@ -543,7 +558,7 @@ def test_geojson_input_rest(self): ) result = {'complex': p} - resp = client.post_json(doc=request) + resp = client.post_json('/wps', doc=request) assert_response_success_json(resp, result) def test_output_response_dataType(self): @@ -558,7 +573,7 @@ def test_output_response_dataType(self): ), version='1.0.0' ) - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) el = next(resp.xml.iter('{http://www.opengis.net/wps/1.0.0}LiteralData')) assert el.attrib['dataType'] == 'string' @@ -592,7 +607,7 @@ def test_geojson_input_reference_rest(self): ) result = {'complex': p} - resp = client.post_json(doc=request) + resp = client.post_json('/wps', doc=request) assert_response_success_json(resp, result) @@ -624,7 +639,7 @@ def test_translations(self): version='1.0.0', language='fr-CA', ) - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert resp.xpath('/wps:ExecuteResponse/@xml:lang')[0] == "fr-CA" diff --git a/tests/test_filestorage.py b/tests/test_filestorage.py deleted file mode 100644 index 14e699812..000000000 --- a/tests/test_filestorage.py +++ /dev/null @@ -1,95 +0,0 @@ -################################################################## -# Copyright 2018 Open Source Geospatial Foundation and others # -# licensed under MIT, Please consult LICENSE.txt for details # -################################################################## -from pathlib import Path - -from pywps.inout.storage.file import FileStorageBuilder, FileStorage, _build_output_name -from pywps.inout.storage import STORE_TYPE -from pywps.inout.basic import ComplexOutput -from pywps.util import file_uri - -from pywps import configuration, FORMATS -from urllib.parse import urlparse - -import tempfile -import os - -import unittest - - -class FileStorageTests(unittest.TestCase): - - def setUp(self): - self.tmp_dir = tempfile.mkdtemp() - - def test_build_output_name(self): - storage = FileStorageBuilder().build() - output = ComplexOutput('testme', 'Test', supported_formats=[FORMATS.TEXT], workdir=self.tmp_dir) - output.data = "Hello World!" 
- output_name, suffix = _build_output_name(output) - self.assertEqual(output.file, str(Path(self.tmp_dir) / 'input.txt')) - self.assertEqual(output_name, 'input.txt') - self.assertEqual(suffix, '.txt') - - def test_store(self): - configuration.CONFIG.set('server', 'outputpath', self.tmp_dir) - storage = FileStorageBuilder().build() - output = ComplexOutput('testme', 'Test', supported_formats=[FORMATS.TEXT], workdir=self.tmp_dir) - output.data = "Hello World!" - store_type, store_str, url = storage.store(output) - - self.assertEqual(store_type, STORE_TYPE.PATH) - self.assertEqual(store_str, 'input.txt') - - with open(Path(self.tmp_dir) / store_str) as f: - self.assertEqual(f.read(), "Hello World!") - - def test_write(self): - configuration.CONFIG.set('server', 'outputpath', self.tmp_dir) - configuration.CONFIG.set('server', 'outputurl', file_uri(self.tmp_dir)) - storage = FileStorageBuilder().build() - output = ComplexOutput('testme', 'Test', supported_formats=[FORMATS.TEXT], workdir=self.tmp_dir) - output.data = "Hello World!" - url = storage.write(output.data, 'foo.txt') - - fname = Path(self.tmp_dir) / 'foo.txt' - self.assertEqual(url, file_uri(fname)) - with open(fname) as f: - self.assertEqual(f.read(), "Hello World!") - - def test_url(self): - configuration.CONFIG.set('server', 'outputpath', self.tmp_dir) - configuration.CONFIG.set('server', 'outputurl', file_uri(self.tmp_dir)) - storage = FileStorageBuilder().build() - output = ComplexOutput('testme', 'Test', supported_formats=[FORMATS.TEXT], workdir=self.tmp_dir) - output.data = "Hello World!" - output.uuid = '595129f0-1a6c-11ea-a30c-acde48001122' - url = storage.url(output) - - fname = Path(self.tmp_dir) / '595129f0-1a6c-11ea-a30c-acde48001122' / 'input.txt' - self.assertEqual(file_uri(fname), url) - - file_name = 'test.txt' - url = storage.url(file_name) - fname = Path(self.tmp_dir) / 'test.txt' - self.assertEqual(file_uri(fname), url) - - def test_location(self): - configuration.CONFIG.set('server', 'outputpath', self.tmp_dir) - storage = FileStorageBuilder().build() - file_name = 'test.txt' - loc = storage.location(file_name) - fname = Path(self.tmp_dir) / 'test.txt' - self.assertEqual(str(fname), loc) - - -def load_tests(loader=None, tests=None, pattern=None): - """Load local tests - """ - if not loader: - loader = unittest.TestLoader() - suite_list = [ - loader.loadTestsFromTestCase(FileStorageTests) - ] - return unittest.TestSuite(suite_list) diff --git a/tests/test_grass_location.py b/tests/test_grass_location.py index 2afb8c8e0..79f3fd36e 100644 --- a/tests/test_grass_location.py +++ b/tests/test_grass_location.py @@ -73,7 +73,7 @@ def test_epsg_based_location(self): version='1.0.0' ) - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert_response_success(resp) def test_file_based_location(self): @@ -94,7 +94,7 @@ def test_file_based_location(self): {'{http://www.w3.org/1999/xlink}href': href}))), version='1.0.0') - resp = client.post_xml(doc=request_doc) + resp = client.post_xml('/wps', doc=request_doc) assert_response_success(resp) diff --git a/tests/test_inout.py b/tests/test_inout.py index 7755e5c72..de14540c4 100644 --- a/tests/test_inout.py +++ b/tests/test_inout.py @@ -30,7 +30,7 @@ from pywps.exceptions import InvalidParameterValue from pywps.validator.mode import MODE from pywps.inout.basic import UOM -from pywps.inout.storage.file import FileStorageBuilder +from pywps.inout.storage.database import DatabaseStorage from pywps.tests import service_ok from pywps.translations 
diff --git a/tests/test_inout.py b/tests/test_inout.py
index 7755e5c72..de14540c4 100644
--- a/tests/test_inout.py
+++ b/tests/test_inout.py
@@ -30,7 +30,7 @@
 from pywps.exceptions import InvalidParameterValue
 from pywps.validator.mode import MODE
 from pywps.inout.basic import UOM
-from pywps.inout.storage.file import FileStorageBuilder
+from pywps.inout.storage.database import DatabaseStorage
 from pywps.tests import service_ok
 from pywps.translations import get_translation
@@ -554,9 +554,9 @@ def test_url_handler(self):
            'request=GetFeature&' \
            'typename=continents&maxfeatures=2'
         self.complex_out.url = wfsResource
-        self.complex_out.storage = FileStorageBuilder().build()
-        url = self.complex_out.get_url()
-        self.assertEqual('file', urlparse(url).scheme)
+        self.complex_out.storage = DatabaseStorage()
+        url = self.complex_out.ensure_storage()
+        self.assertEqual('http', urlparse(url).scheme)

     def test_json(self):
         new_output = inout.outputs.ComplexOutput.from_json(self.complex_out.json)
diff --git a/tests/test_ows.py b/tests/test_ows.py
index cdd796ac5..914a1ba2e 100644
--- a/tests/test_ows.py
+++ b/tests/test_ows.py
@@ -111,7 +111,7 @@ def test_wfs(self):
                     OWS.Identifier('output'))),
             version='1.0.0'
         )
-        resp = client.post_xml(doc=request_doc)
+        resp = client.post_xml('/wps', doc=request_doc)
         assert_response_success(resp)

         # Other things to assert:
@@ -136,7 +136,7 @@ def test_wcs(self):
                 WPS.Output(
                     OWS.Identifier('output'))),
             version='1.0.0')
-        resp = client.post_xml(doc=request_doc)
+        resp = client.post_xml('/wps', doc=request_doc)
         assert_response_success(resp)
         # Other things to assert:
         # . the inclusion of output
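The `test_inout.py` hunk above replaces the file storage builder with `DatabaseStorage` and obtains the output URL through `ensure_storage()` instead of `get_url()`, now expecting an `http` URL rather than a `file://` one; the `test_ows.py` hunks, like the GRASS ones, simply target the `/wps` path. A rough sketch of how the storage change reads in user code, assuming `ensure_storage()` persists the output through its assigned storage and returns the URL the server will expose it under:

```python
# Assumptions: DatabaseStorage() is constructible with no arguments in a configured
# PyWPS environment, and ComplexOutput.ensure_storage() stores the data and returns
# the URL from which the server serves it (as the test above expects).
from urllib.parse import urlparse

from pywps import FORMATS
from pywps.inout.outputs import ComplexOutput
from pywps.inout.storage.database import DatabaseStorage

output = ComplexOutput('result', 'Result', supported_formats=[FORMATS.TEXT])
output.data = 'Hello World!'
output.storage = DatabaseStorage()

url = output.ensure_storage()          # persist the data, get the public URL back
assert urlparse(url).scheme == 'http'  # served over HTTP, no more file:// links
```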
diff --git a/tests/test_s3storage.py b/tests/test_s3storage.py
deleted file mode 100644
index e2708c9b3..000000000
--- a/tests/test_s3storage.py
+++ /dev/null
@@ -1,68 +0,0 @@
-##################################################################
-# Copyright 2018 Open Source Geospatial Foundation and others #
-# licensed under MIT, Please consult LICENSE.txt for details #
-##################################################################
-
-from pywps.inout.storage.s3 import S3StorageBuilder, S3Storage
-from pywps.inout.storage import STORE_TYPE
-from pywps.inout.basic import ComplexOutput
-
-from pywps import configuration, FORMATS
-from urllib.parse import urlparse
-
-import tempfile
-import os
-
-import unittest
-from unittest.mock import patch
-
-
-class S3StorageTests(unittest.TestCase):
-
-    def setUp(self):
-        self.tmp_dir = tempfile.mkdtemp()
-
-    @patch('pywps.inout.storage.s3.S3Storage.uploadData')
-    def test_store(self, uploadData):
-        configuration.CONFIG.set('s3', 'bucket', 'notrealbucket')
-        configuration.CONFIG.set('s3', 'prefix', 'wps')
-        storage = S3StorageBuilder().build()
-        output = ComplexOutput('testme', 'Test', supported_formats=[FORMATS.TEXT], workdir=self.tmp_dir)
-        output.data = "Hello World!"
-
-        store_type, filename, url = storage.store(output)
-
-        called_args = uploadData.call_args[0]
-
-        self.assertEqual(store_type, STORE_TYPE.S3)
-        self.assertEqual(filename, 'wps/input.txt')
-
-        self.assertEqual(uploadData.call_count, 1)
-        self.assertEqual(called_args[1], 'wps/input.txt')
-        self.assertEqual(called_args[2], {'ContentType': 'text/plain'})
-
-    @patch('pywps.inout.storage.s3.S3Storage.uploadData')
-    def test_write(self, uploadData):
-        configuration.CONFIG.set('s3', 'bucket', 'notrealbucket')
-        configuration.CONFIG.set('s3', 'prefix', 'wps')
-        storage = S3StorageBuilder().build()
-
-        url = storage.write('Bar Baz', 'out.txt', data_format=FORMATS.TEXT)
-
-        called_args = uploadData.call_args[0]
-
-        self.assertEqual(uploadData.call_count, 1)
-        self.assertEqual(called_args[0], 'Bar Baz')
-        self.assertEqual(called_args[1], 'wps/out.txt')
-        self.assertEqual(called_args[2], {'ContentType': 'text/plain'})
-
-
-def load_tests(loader=None, tests=None, pattern=None):
-    """Load local tests
-    """
-    if not loader:
-        loader = unittest.TestLoader()
-    suite_list = [
-        loader.loadTestsFromTestCase(S3StorageTests)
-    ]
-    return unittest.TestSuite(suite_list)
diff --git a/tests/test_storage.py b/tests/test_storage.py
deleted file mode 100644
index 309f65755..000000000
--- a/tests/test_storage.py
+++ /dev/null
@@ -1,54 +0,0 @@
-##################################################################
-# Copyright 2018 Open Source Geospatial Foundation and others #
-# licensed under MIT, Please consult LICENSE.txt for details #
-##################################################################
-import pytest
-
-from pywps.inout.storage.builder import StorageBuilder
-from pywps.inout.storage.file import FileStorage
-from pywps.inout.storage.s3 import S3Storage
-
-from pywps import configuration
-
-from pathlib import Path
-import unittest
-import tempfile
-
-
-@pytest.fixture
-def fake_output(tmp_path):
-    class FakeOutput(object):
-        """Fake output object for testing."""
-
-        def __init__(self):
-            self.identifier = "fake_output"
-            self.file = self._get_file()
-            self.uuid = None
-
-        def _get_file(self):
-            fn = tmp_path / 'file.tiff'
-            fn.touch()
-            return str(fn.absolute())
-
-    return FakeOutput()
-
-
-class TestStorageBuilder():
-
-    def test_default_storage(self):
-        storage = StorageBuilder.buildStorage()
-        assert isinstance(storage, FileStorage)
-
-    def test_s3_storage(self):
-        configuration.CONFIG.set('server', 'storagetype', 's3')
-        storage = StorageBuilder.buildStorage()
-        assert isinstance(storage, S3Storage)
-
-    def test_recursive_directory_creation(self, fake_output):
-        """Test that outputpath is created."""
-        configuration.CONFIG.set('server', 'storagetype', 'file')
-        outputpath = Path(tempfile.gettempdir()) / "a" / "b" / "c"
-        configuration.CONFIG.set('server', 'outputpath', str(outputpath))
-        storage = StorageBuilder.buildStorage()
-
-        storage.store(fake_output)
-        assert outputpath.exists()
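The builder-based suites above are removed because storage is no longer a single object built from the server configuration; the new suites below work with explicit storage instances that can later be recovered from their uuid. As a migration note, here is a sketch of old versus new usage, based on the API exercised by the tests that follow:

```python
# Old style (deleted above): one storage built from the server configuration.
#     from pywps.inout.storage.builder import StorageBuilder
#     storage = StorageBuilder.buildStorage()
#
# New style (tests below): create a backend by name, address it later by uuid.
from pywps import configuration  # needed to load the default configuration, as in the tests
from pywps.inout.storage import new_storage, get_storage_instance

storage = new_storage("FileStorage")  # or "DatabaseStorage"
with storage.open("w", "utf-8") as f:
    f.write("some output payload")

# Later -- possibly in another request -- the same data is found again by uuid.
same_storage = get_storage_instance(storage.uuid)
with same_storage.open("r", "utf-8") as f:
    assert f.read() == "some output payload"
```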
diff --git a/tests/test_storage_database.py b/tests/test_storage_database.py
new file mode 100644
index 000000000..f802206a9
--- /dev/null
+++ b/tests/test_storage_database.py
@@ -0,0 +1,59 @@
+##################################################################
+# Copyright 2018 Open Source Geospatial Foundation and others #
+# licensed under MIT, Please consult LICENSE.txt for details #
+##################################################################
+import pytest
+
+# Needed to create database in memory
+from pywps import configuration
+from pywps.inout.storage import new_storage, get_storage_instance
+from pywps.inout.storage.database import DatabaseStorage
+
+
+class TestStorageBuilder():
+
+    def test_default_storage(self):
+        storage = new_storage("DatabaseStorage")
+        assert isinstance(storage, DatabaseStorage)
+
+    def test_store_database_storage_binary(self):
+        storage = new_storage("DatabaseStorage")
+
+        # Write some data
+        with storage.open("w") as f:
+            f.write(b'somedata')
+
+        # Read data from storage
+        with storage.open("r") as f:
+            data = f.read()
+            assert data == b'somedata'
+
+        # Retrieve data from database
+        xstorage = get_storage_instance(storage.uuid)
+        assert isinstance(xstorage, DatabaseStorage)
+
+        # Read data from storage
+        with xstorage.open("r") as f:
+            data = f.read()
+            assert data == b'somedata'
+
+    def test_store_database_storage_string(self):
+        storage = new_storage("DatabaseStorage")
+
+        # Write some data
+        with storage.open("w", "utf-8") as f:
+            f.write('somedata')
+
+        # Read data from storage
+        with storage.open("r", "utf-8") as f:
+            data = f.read()
+            assert data == 'somedata'
+
+        # Retrieve data from database
+        xstorage = get_storage_instance(storage.uuid)
+        assert isinstance(xstorage, DatabaseStorage)
+
+        # Read data from storage
+        with xstorage.open("r", "utf-8") as f:
+            data = f.read()
+            assert data == 'somedata'
diff --git a/tests/test_storage_file.py b/tests/test_storage_file.py
new file mode 100644
index 000000000..88146204b
--- /dev/null
+++ b/tests/test_storage_file.py
@@ -0,0 +1,59 @@
+##################################################################
+# Copyright 2018 Open Source Geospatial Foundation and others #
+# licensed under MIT, Please consult LICENSE.txt for details #
+##################################################################
+import pytest
+
+# Needed to load default configuration
+from pywps import configuration
+from pywps.inout.storage import new_storage, get_storage_instance
+from pywps.inout.storage.file import FileStorage
+
+
+class TestFileStorageBuilder():
+
+    def test_default_storage(self):
+        storage = new_storage("FileStorage")
+        assert isinstance(storage, FileStorage)
+
+    def test_store_file_storage_binary(self):
+        storage = new_storage("FileStorage")
+
+        # Write some data
+        with storage.open("w") as f:
+            f.write(b'somedata')
+
+        # Read data from storage
+        with storage.open("r") as f:
+            data = f.read()
+            assert data == b'somedata'
+
+        # Retrieve data from storage
+        xstorage = get_storage_instance(storage.uuid)
+        assert isinstance(xstorage, FileStorage)
+
+        # Read data from storage
+        with xstorage.open("r") as f:
+            data = f.read()
+            assert data == b'somedata'
+
+    def test_store_file_storage_string(self):
+        storage = new_storage("FileStorage")
+
+        # Write some data
+        with storage.open("w", "utf-8") as f:
+            f.write('somedata')
+
+        # Read data from storage
+        with storage.open("r", "utf-8") as f:
+            data = f.read()
+            assert data == 'somedata'
+
+        # Retrieve data from storage
+        xstorage = get_storage_instance(storage.uuid)
+        assert isinstance(xstorage, FileStorage)
+
+        # Read data from storage
+        with xstorage.open("r", "utf-8") as f:
+            data = f.read()
+            assert data == 'somedata'
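Both new suites exercise the same `open()` contract: with no encoding the storage behaves like a binary file object, and with an encoding it round-trips text. A compact restatement of that contract, assuming it holds for any backend name accepted by `new_storage()`:

```python
# Illustrative only: mirrors the read/write pattern used by the two test modules above.
from pywps import configuration  # needed to load the default configuration
from pywps.inout.storage import new_storage


def roundtrip(backend_name):
    # Binary mode: no encoding argument, bytes in and bytes out.
    storage = new_storage(backend_name)
    with storage.open("w") as f:
        f.write(b"payload")
    with storage.open("r") as f:
        assert f.read() == b"payload"

    # Text mode: pass an encoding, str in and str out.
    storage = new_storage(backend_name)
    with storage.open("w", "utf-8") as f:
        f.write("payload")
    with storage.open("r", "utf-8") as f:
        assert f.read() == "payload"


for name in ("FileStorage", "DatabaseStorage"):
    roundtrip(name)
```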
diff --git a/tests/test_wpsrequest.py b/tests/test_wpsrequest.py
index 2adc00a35..73e25e65a 100644
--- a/tests/test_wpsrequest.py
+++ b/tests/test_wpsrequest.py
@@ -8,6 +8,7 @@
 import tempfile
 import datetime
 import json
+import uuid

 from owslib.crs import Crs
 from pywps.inout.literaltypes import AnyValue
@@ -27,6 +28,8 @@ def setUp(self):

     def test_json_in(self):
         obj = {
+            'uuid': str(uuid.uuid1()),
+            'is_async': False,
             'operation': 'getcapabilities',
             'version': '1.0.0',
             'language': 'eng',
@@ -57,8 +60,7 @@
             'raw': False
         }

-        self.request = WPSRequest()
-        self.request.json = obj
+        self.request = WPSRequest(json=obj)

         self.assertEqual(self.request.inputs['myliteral'][0].data, 1, 'Data are in the file')
         self.assertEqual(self.request.inputs['myin'][0].data, 'ahoj', 'Data are in the file')
@@ -67,6 +69,8 @@

     def test_json_inout_datetime(self):
         obj = {
+            'uuid': str(uuid.uuid1()),
+            'is_async': False,
             'operation': 'getcapabilities',
             'version': '1.0.0',
             'language': 'eng',
@@ -102,8 +106,7 @@
             'raw': False
         }

-        self.request = WPSRequest()
-        self.request.json = obj
+        self.request = WPSRequest(json=obj)

         self.assertEqual(self.request.inputs['datetime'][0].data, datetime.datetime(2017, 4, 20, 12), 'Datatime set')
         self.assertEqual(self.request.inputs['date'][0].data, datetime.date(2017, 4, 20), 'Data set')
@@ -119,6 +122,8 @@

     def test_json_inout_bbox(self):
         obj = {
+            'uuid': str(uuid.uuid1()),
+            'is_async': False,
             'operation': 'getcapabilities',
             'version': '1.0.0',
             'language': 'eng',
@@ -142,8 +147,7 @@
             'raw': False
         }

-        self.request = WPSRequest()
-        self.request.json = obj
+        self.request = WPSRequest(json=obj)

         self.assertEqual(self.request.inputs['bbox'][0].data, [6.117602, 46.176194, 6.22283, 46.275832], 'BBox data set')
         self.assertTrue(isinstance(self.request.inputs['bbox'][0].crs, str), 'CRS is a string')
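These hunks show that the JSON form of a request now carries `uuid` and `is_async` members, and that `WPSRequest` accepts the JSON document directly in its constructor rather than through the `json` property. Below is a trimmed sketch of rebuilding a request from JSON under those assumptions; the full dictionaries in the tests above carry more members (inputs, outputs, `raw`, and so on), so treat this as the shape of the payload rather than the minimal required set of keys.

```python
# Sketch only: key names are taken from the test fixtures above; whether this exact
# subset is sufficient to construct a WPSRequest is an assumption.
import uuid

from pywps.app.WPSRequest import WPSRequest

obj = {
    'uuid': str(uuid.uuid1()),   # now part of the request document
    'is_async': False,           # likewise new in the JSON payload
    'operation': 'getcapabilities',
    'version': '1.0.0',
    'language': 'eng',
    'identifiers': None,
    'inputs': {},
    'outputs': {},
    'raw': False,
}

request = WPSRequest(json=obj)   # parsing happens in the constructor now
assert request.operation == 'getcapabilities'
```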