Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements_dev.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
moto[all]
moto[all,server]
pytest
5 changes: 2 additions & 3 deletions taskqueue/taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import sys

import gevent.pool
import multiprocessing as mp
import multiprocess as mp # used by pathos.pools.ProcessPool
import numpy as np
import pathos.pools
from tqdm import tqdm
Expand Down Expand Up @@ -513,8 +513,6 @@ def soloprocess_upload(QueueClass, queue_name, tasks):
tq = QueueClass(queue_name, progress=False)
return tq.insert(tasks, skip_insert_counter=True)

error_queue = mp.Queue()

def multiprocess_upload(QueueClass, queue_name, tasks, parallel=True, total=None):
if parallel is True:
parallel = mp.cpu_count()
Expand Down Expand Up @@ -570,6 +568,7 @@ def capturing_soloprocess_upload(*args, **kwargs):
# Don't fork, spawn entirely new processes. This
# avoids accidental deadlocks.
mp.set_start_method("spawn", force=True)
error_queue = mp.Manager().Queue()

ct = 0
with tqdm(desc="Upload", total=total) as pbar:
Expand Down
16 changes: 13 additions & 3 deletions test/test_taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time

from moto import mock_aws

from six.moves import range
import pytest

Expand All @@ -18,7 +17,7 @@
from taskqueue.queueables import totask
from taskqueue.queueablefns import tofunc, UnregisteredFunctionError, func2task

@pytest.fixture(scope='function')
@pytest.fixture(scope='session')
def aws_credentials():
"""Mocked AWS Credentials for moto."""
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
Expand All @@ -35,6 +34,17 @@ def sqs(aws_credentials):
client.create_queue(QueueName='test-pull-queue')
yield client

@pytest.fixture(scope='session')
def sqs_server(aws_credentials):
from moto.server import ThreadedMotoServer
server = ThreadedMotoServer(port=0)
server.start()
host, port = server.get_host_and_port()
os.environ['AWS_ENDPOINT_URL'] = f"http://{host}:{port}"
yield
del os.environ['AWS_ENDPOINT_URL']
server.stop()

QURLS = {
'sqs': 'test-pull-queue',
'fq': '/tmp/removeme/taskqueue/fq',
Expand Down Expand Up @@ -269,7 +279,7 @@ def test_local_taskqueue():
assert tq.insert(epts) == 200

@pytest.mark.parametrize('protocol', PROTOCOL)
def test_parallel_insert_all(sqs, protocol):
def test_parallel_insert_all(sqs, sqs_server, protocol):
import pathos_issue

path = getpath(protocol)
Expand Down
Loading