-
Notifications
You must be signed in to change notification settings - Fork 37
Thread limit introspection API, part 1: API scope #213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
28bd246
128c4ef
8637248
6946867
31bbcf5
f27e6f2
0d4dde5
60dd11c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| """ | ||
| Tests for get/set number of threads API introspection. | ||
| """ | ||
|
|
||
| from threading import local as threadlocal | ||
|
|
||
| import pytest | ||
|
|
||
| from threadpoolctl import _APIScope, _determine_api_scope | ||
|
|
||
|
|
||
| class FakeThreadLocalAPI(threadlocal): | ||
| """Thread-local num threads setting API.""" | ||
|
|
||
| def get(self) -> int: | ||
| return getattr(self, "num_threads", 17) | ||
|
|
||
| def set(self, n: int) -> None: | ||
| self.num_threads = n | ||
|
|
||
|
|
||
| class FakeProcesswideAPI: | ||
| """Process-wide num threads setting API.""" | ||
|
|
||
| def __init__(self, num_threads: int): | ||
| self.num_threads = num_threads | ||
|
|
||
| def get(self) -> int: | ||
| return self.num_threads | ||
|
|
||
| def set(self, n: int) -> None: | ||
| self.num_threads = n | ||
|
|
||
|
|
||
| def test_determine_api_scope_thread_local(): | ||
| """ | ||
| Check ``_determine_api_scope()`` can correctly diagnose a trivial | ||
| thread-local implementation. | ||
| """ | ||
| api = FakeThreadLocalAPI() | ||
| assert _determine_api_scope(api.get, api.set) == _APIScope.CURRENT_THREAD | ||
|
|
||
|
|
||
| @pytest.mark.parametrize("default", [1, 17]) | ||
| def test_determine_api_scope_processiwde(default: int): | ||
| """ | ||
| Check ``determine_api_scope()`` can correctly diagnose a trivial | ||
| process-wide implementation. | ||
| """ | ||
| api = FakeProcesswideAPI(default) | ||
| assert _determine_api_scope(api.get, api.set) == _APIScope.PROCESS |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,12 +17,14 @@ | |
| import ctypes | ||
| import itertools | ||
| import textwrap | ||
| from typing import final | ||
| from threading import Thread | ||
| from typing import Callable, final | ||
| import warnings | ||
| from ctypes.util import find_library | ||
| from abc import ABC, abstractmethod | ||
| from functools import lru_cache | ||
| from contextlib import ContextDecorator | ||
| from enum import Enum, auto | ||
|
|
||
| __version__ = "3.7.0.dev0" | ||
| __all__ = [ | ||
|
|
@@ -69,6 +71,89 @@ class _dl_phdr_info(ctypes.Structure): | |
| _RTLD_NOLOAD = ctypes.DEFAULT_MODE | ||
|
|
||
|
|
||
| class _APIScope(Enum): | ||
| """ | ||
| What scope does the API affect. | ||
| """ | ||
|
|
||
| # Using the API sets a limit only on the current thread. | ||
| CURRENT_THREAD = auto() | ||
| # Using the API sets a limit for every thread in the process; whether or | ||
| # not it's a shared process-wide pool or per-thread limit needs to be | ||
| # determined some other way. | ||
| PROCESS = auto() | ||
| # Something else, unexpected; perhaps another variant, perhaps information | ||
| # can't be determined under the current configuration. | ||
| UNKNOWN = auto() | ||
|
|
||
|
|
||
| def _determine_api_scope( | ||
| get_n_threads: Callable[[], int], set_n_threads: Callable[[int], None] | ||
| ) -> _APIScope: | ||
| """ | ||
| Run some experiments to determine the scope of the given get/set API. | ||
|
|
||
| An attempt will be made to restore all settings to their previous state. | ||
|
|
||
| This won't work if you only have one core available. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this function can temporarily mutate global process state, it is not thread safe and can have a side effect on concurrently running code that use the libraries being dynamically inspected. I think we should document those limitations explicitly. |
||
| """ | ||
| if os.cpu_count() == 1 or ( | ||
| hasattr(os, "process_cpu_count") and os.process_cpu_count() == 1 | ||
| ): | ||
| raise RuntimeError("Cannot determine API meaning if only one core is available") | ||
|
|
||
| previous = get_n_threads() | ||
|
|
||
| # Some plausible constraints we need to keep in mind: | ||
| # | ||
| # 1. The API might not allow setting more than the number of (available, or | ||
| # physical) cores. | ||
| # 2. ... | ||
| try: | ||
| # Choose a desired number of threads that is different than the current | ||
| # number, and hopefully achievable under the current configuration: | ||
| if previous < 2: | ||
| expected = 2 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we safely assume that all runtimes allow for growing the number of threads even if externally limited, e.g. by some environment variables? I tried with libomp and openblas + pthreads and it seems that I can use |
||
| else: | ||
| # It's 2 or more, so shrink it slightly: | ||
| expected = previous - 1 | ||
|
|
||
| thread_result = [] | ||
|
|
||
| def get_and_set() -> None: | ||
| set_n_threads(expected) | ||
| thread_result.append(get_n_threads()) | ||
|
|
||
| thread = Thread(target=get_and_set) | ||
| thread.start() | ||
| thread.join() | ||
|
|
||
| # First, getting in the same thread as a set should always give same | ||
| # number, if it's a number in a reasonable range. A possible exception | ||
| # is if the number of thread is limited by available CPU, and only one | ||
| # CPU is available. In that case we can't empirically determine how the | ||
| # API works. We try to not reach that point here, but you can imagine a | ||
| # thread pool implementation that is aware of cgroups, in which case a | ||
| # Docker container limited to one core will pass the safety check at | ||
| # the start of the function. Perhaps cpu_count() from loky should be | ||
| # moved into this package... | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree this will be the best solution, but we can wait until the need actually arises. |
||
| if thread_result != [expected]: | ||
| return _APIScope.UNKNOWN | ||
|
|
||
| # Now, check this thread: | ||
| if get_n_threads() == expected: | ||
| # Setting modified this thread's results too: | ||
| return _APIScope.PROCESS | ||
| elif get_n_threads() == previous: | ||
| # Setting modified the other thread, but not this one: | ||
| return _APIScope.CURRENT_THREAD | ||
| else: | ||
| # No idea what's going on: | ||
| return _APIScope.UNKNOWN | ||
| finally: | ||
| set_n_threads(previous) | ||
|
|
||
|
|
||
| class LibController(ABC): | ||
| """Abstract base class for the individual library controllers | ||
|
|
||
|
|
@@ -123,6 +208,11 @@ def info(self): | |
| "user_api": self.user_api, | ||
| "internal_api": self.internal_api, | ||
| "num_threads": self.num_threads, | ||
| "api_scope": ( | ||
| _determine_api_scope( | ||
| self.get_num_threads, self.set_num_threads | ||
| ).name.lower() | ||
| ), | ||
| **{k: v for k, v in vars(self).items() if k not in hidden_attrs}, | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the value of
expected_api_scopealso depends on the current threading layer of each BLAS runtime?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah this was just an initial sketch to see what happens when it runs in CI.