Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions st2common/st2common/util/concurrency.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ def get_greenlet_exit_exception_class():
raise ValueError("Unsupported concurrency library")


def get_default_green_pool_size():
if CONCURRENCY_LIBRARY == "eventlet":
return eventlet.wsgi.DEFAULT_MAX_SIMULTANEOUS_REQUESTS
elif CONCURRENCY_LIBRARY == "gevent":
# matches what DEFAULT_MAX_SIMULTANEOUS_REQUESTS is for eventlet
return 1024
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably avoid hard coded magic numbers and put this in a constants file. Perhaps https://github.com/StackStorm/st2/blob/master/st2common/st2common/constants/system.py or start a concurrency.py file?

else:
raise ValueError("Unsupported concurrency library")


def get_green_pool_class():
if CONCURRENCY_LIBRARY == "eventlet":
return eventlet.GreenPool
Expand Down Expand Up @@ -173,3 +183,15 @@ def green_pool_wait_all(pool):
return all(gl.ready() for gl in pool.greenlets)
else:
raise ValueError("Unsupported concurrency library")

def listen_server(host, port):
if CONCURRENCY_LIBRARY == "eventlet":
return eventlet.listen((host, port))
elif CONCURRENCY_LIBRARY == "gevent":
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((host, port))
return sock.listen(5)
else:
raise ValueError("Unsupported concurrency library")

7 changes: 5 additions & 2 deletions st2common/st2common/util/monkey_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,11 @@ def monkey_patch(patch_thread=None):
os=True, select=True, socket=True, thread=patch_thread, time=True
)
elif concurrency_library == "gevent":
# TODO: support gevent.patch_all if .concurrency.CONCURRENCY_LIBRARY = "gevent"
raise NotImplementedError
# Match what eventlet was enabling, fallback on gevent defaults
import gevent
gevent.monkey.patch_all(
os=True, select=True, thread=True, time=True, socket=True
)
else:
raise RuntimeError(f"Unsupported concurrency library {concurrency_library}")

Expand Down
22 changes: 12 additions & 10 deletions st2stream/st2stream/cmd/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import sys

import eventlet

from oslo_config import cfg
from eventlet import wsgi

Expand All @@ -29,6 +30,7 @@
from st2common.service_setup import teardown as common_teardown
from st2common.service_setup import deregister_service
from st2common.stream.listener import get_listener_if_set
from st2common.util import concurrency
from st2common.util.wsgi import shutdown_server_kill_pending_requests
from st2stream.signal_handlers import register_stream_signal_handlers
from st2stream import config
Expand All @@ -40,13 +42,13 @@
__all__ = ["main"]


eventlet.monkey_patch(
os=True,
select=True,
socket=True,
thread=False if "--use-debugger" in sys.argv else True,
time=True,
)
#eventlet.monkey_patch(
# os=True,
# select=True,
# socket=True,
# thread=False if "--use-debugger" in sys.argv else True,
# time=True,
#)

LOG = logging.getLogger(__name__)
STREAM = "stream"
Expand Down Expand Up @@ -83,9 +85,9 @@ def _run_server():
"(PID=%s) ST2 Stream API is serving on http://%s:%s.", os.getpid(), host, port
)

max_pool_size = eventlet.wsgi.DEFAULT_MAX_SIMULTANEOUS_REQUESTS
worker_pool = eventlet.GreenPool(max_pool_size)
sock = eventlet.listen((host, port))
max_pool_size = concurrency.get_default_green_pool_size()
worker_pool = concurrency.get_green_pool_class()(max_pool_size)
sock = concurrency.listen_server(host, port)

def queue_shutdown(signal_number, stack_frame):
deregister_service(STREAM)
Expand Down