diff --git a/st2common/st2common/util/concurrency.py b/st2common/st2common/util/concurrency.py index 150d088036..f064be4292 100644 --- a/st2common/st2common/util/concurrency.py +++ b/st2common/st2common/util/concurrency.py @@ -140,6 +140,16 @@ def get_greenlet_exit_exception_class(): raise ValueError("Unsupported concurrency library") +def get_default_green_pool_size(): + if CONCURRENCY_LIBRARY == "eventlet": + return eventlet.wsgi.DEFAULT_MAX_SIMULTANEOUS_REQUESTS + elif CONCURRENCY_LIBRARY == "gevent": + # matches what DEFAULT_MAX_SIMULTANEOUS_REQUESTS is for eventlet + return 1024 + else: + raise ValueError("Unsupported concurrency library") + + def get_green_pool_class(): if CONCURRENCY_LIBRARY == "eventlet": return eventlet.GreenPool @@ -173,3 +183,15 @@ def green_pool_wait_all(pool): return all(gl.ready() for gl in pool.greenlets) else: raise ValueError("Unsupported concurrency library") + +def listen_server(host, port): + if CONCURRENCY_LIBRARY == "eventlet": + return eventlet.listen((host, port)) + elif CONCURRENCY_LIBRARY == "gevent": + import socket + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind((host, port)) + return sock.listen(5) + else: + raise ValueError("Unsupported concurrency library") + diff --git a/st2common/st2common/util/monkey_patch.py b/st2common/st2common/util/monkey_patch.py index 2694d3d785..c852a48b2c 100644 --- a/st2common/st2common/util/monkey_patch.py +++ b/st2common/st2common/util/monkey_patch.py @@ -57,8 +57,11 @@ def monkey_patch(patch_thread=None): os=True, select=True, socket=True, thread=patch_thread, time=True ) elif concurrency_library == "gevent": - # TODO: support gevent.patch_all if .concurrency.CONCURRENCY_LIBRARY = "gevent" - raise NotImplementedError + # Match what eventlet was enabling, fallback on gevent defaults + import gevent + gevent.monkey.patch_all( + os=True, select=True, thread=True, time=True, socket=True + ) else: raise RuntimeError(f"Unsupported concurrency library {concurrency_library}") diff --git a/st2stream/st2stream/cmd/api.py b/st2stream/st2stream/cmd/api.py index 3de0c89f03..08caa0a2bd 100644 --- a/st2stream/st2stream/cmd/api.py +++ b/st2stream/st2stream/cmd/api.py @@ -21,6 +21,7 @@ import sys import eventlet + from oslo_config import cfg from eventlet import wsgi @@ -29,6 +30,7 @@ from st2common.service_setup import teardown as common_teardown from st2common.service_setup import deregister_service from st2common.stream.listener import get_listener_if_set +from st2common.util import concurrency from st2common.util.wsgi import shutdown_server_kill_pending_requests from st2stream.signal_handlers import register_stream_signal_handlers from st2stream import config @@ -40,13 +42,13 @@ __all__ = ["main"] -eventlet.monkey_patch( - os=True, - select=True, - socket=True, - thread=False if "--use-debugger" in sys.argv else True, - time=True, -) +#eventlet.monkey_patch( +# os=True, +# select=True, +# socket=True, +# thread=False if "--use-debugger" in sys.argv else True, +# time=True, +#) LOG = logging.getLogger(__name__) STREAM = "stream" @@ -83,9 +85,9 @@ def _run_server(): "(PID=%s) ST2 Stream API is serving on http://%s:%s.", os.getpid(), host, port ) - max_pool_size = eventlet.wsgi.DEFAULT_MAX_SIMULTANEOUS_REQUESTS - worker_pool = eventlet.GreenPool(max_pool_size) - sock = eventlet.listen((host, port)) + max_pool_size = concurrency.get_default_green_pool_size() + worker_pool = concurrency.get_green_pool_class()(max_pool_size) + sock = concurrency.listen_server(host, port) def queue_shutdown(signal_number, stack_frame): deregister_service(STREAM)