diff --git a/requirements_dev.txt b/requirements_dev.txt index fdeef3c..56cecc2 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,2 +1,2 @@ -moto[all] +moto[all,server] pytest diff --git a/taskqueue/taskqueue.py b/taskqueue/taskqueue.py index c8b68b9..8df9542 100644 --- a/taskqueue/taskqueue.py +++ b/taskqueue/taskqueue.py @@ -14,7 +14,7 @@ import sys import gevent.pool -import multiprocessing as mp +import multiprocess as mp # used by pathos.pools.ProcessPool import numpy as np import pathos.pools from tqdm import tqdm @@ -513,8 +513,6 @@ def soloprocess_upload(QueueClass, queue_name, tasks): tq = QueueClass(queue_name, progress=False) return tq.insert(tasks, skip_insert_counter=True) -error_queue = mp.Queue() - def multiprocess_upload(QueueClass, queue_name, tasks, parallel=True, total=None): if parallel is True: parallel = mp.cpu_count() @@ -570,6 +568,7 @@ def capturing_soloprocess_upload(*args, **kwargs): # Don't fork, spawn entirely new processes. This # avoids accidental deadlocks. mp.set_start_method("spawn", force=True) + error_queue = mp.Manager().Queue() ct = 0 with tqdm(desc="Upload", total=total) as pbar: diff --git a/test/test_taskqueue.py b/test/test_taskqueue.py index 38e1080..203de1d 100644 --- a/test/test_taskqueue.py +++ b/test/test_taskqueue.py @@ -4,7 +4,6 @@ import time from moto import mock_aws - from six.moves import range import pytest @@ -18,7 +17,7 @@ from taskqueue.queueables import totask from taskqueue.queueablefns import tofunc, UnregisteredFunctionError, func2task -@pytest.fixture(scope='function') +@pytest.fixture(scope='session') def aws_credentials(): """Mocked AWS Credentials for moto.""" os.environ['AWS_ACCESS_KEY_ID'] = 'testing' @@ -35,6 +34,17 @@ def sqs(aws_credentials): client.create_queue(QueueName='test-pull-queue') yield client +@pytest.fixture(scope='session') +def sqs_server(aws_credentials): + from moto.server import ThreadedMotoServer + server = ThreadedMotoServer(port=0) + server.start() + host, port = server.get_host_and_port() + os.environ['AWS_ENDPOINT_URL'] = f"http://{host}:{port}" + yield + del os.environ['AWS_ENDPOINT_URL'] + server.stop() + QURLS = { 'sqs': 'test-pull-queue', 'fq': '/tmp/removeme/taskqueue/fq', @@ -269,7 +279,7 @@ def test_local_taskqueue(): assert tq.insert(epts) == 200 @pytest.mark.parametrize('protocol', PROTOCOL) -def test_parallel_insert_all(sqs, protocol): +def test_parallel_insert_all(sqs, sqs_server, protocol): import pathos_issue path = getpath(protocol)