concurrency_utils
Useful components related to asyncio, multithreading or multiprocessing.
Module
Functions
async_lock
async def async_lock( lock: threading.Lock, lock_name: str, polling_timeout: Union[float, Literal[_NoPolling.NO_POLLING]] = 5,) ‑> collections.abc.AsyncGenerator[None, None]:
Context manager to acquire a threading.Lock from the async event loop.
Avoids blocking the async event loop.
Avoids saturation of the ThreadPoolExecutor by limiting thread run times (via
polling_timeout
) to enable to control to return to this function and to
reschedule the wait so other thread-based tasks get a chance to run. Control
is returned to here every polling_timeout
seconds and allows asyncio cancellation
to avoid hanging threads.
If polling_timeout
is NO_POLLING
, then the thread will wait indefinitely.
Handles releasing the lock at the end of the context.
Arguments
lock
: The threading.Lock to acquire.lock_name
: The name of the lock to acquire, used for debugging.polling_timeout
: How frequently to return control to this function from the thread execution. If the lock is still not acquired, the thread execution is rescheduled. Useful for handling cancellation events as it places a bound on the time a thread can run.
asyncnullcontext
async def asyncnullcontext() ‑> collections.abc.AsyncGenerator[None, None]:
Async version of contextlib.nullcontext().
await_event_with_stop
async def await_event_with_stop( wait_event: threading.Event, stop_event: threading.Event, wait_event_name: str, polling_timeout: float = 5,) ‑> bool:
Helper function that waits on a Threading Event but allows exiting early.
Avoids blocking the async event loop.
Monitors the wait_event but with a polling timeout, so it can periodically check if it should stop early.
Arguments
wait_event
: The event to wait to be set.stop_event
: An event indicating we should stop early.wait_event_name
: The name of the wait event, used for debugging.polling_timeout
: The amount of time in seconds between checks for whether the stop_event has been set.
Returns
True
if the wait_event
has been set, False
if the stop_event
is
used to cancel waiting before the wait_event
is set.
await_threading_event
async def await_threading_event( event: threading.Event, event_name: str, timeout: Optional[float] = None, polling_timeout: Union[float, Literal[_NoPolling.NO_POLLING]] = 5,) ‑> bool:
Event.wait() that doesn't block the event loop.
Avoids saturation of the ThreadPoolExecutor by either limiting run time directly
(via timeout
) or by occassionally rescheduling the wait so other thread-based
tasks get a chance to run (via polling_timeout
).
If both timeout
and polling_timeout
are supplied, timeout
takes precedence
and no polling/rescheduling will be performed.
If polling_timeout
is set to NO_POLLING
then the thread will wait indefinitely.
This should be used rarely and with caution as it may leave either hanging threads
or saturate all threads of the ThreadPoolExecutor.
Arguments
event
: The event to wait on.event_name
: The name of the event, used in debug logging.timeout
: The timeout to wait for the event to be set.polling_timeout
: How frequently to return control to this function from the thread execution. If the event is still not set, the thread execution is rescheduled. Useful for handling cancellation events as it places a bound on the time a thread can run.
Returns
True
if the event is set within any timeout constraints specified,
False
otherwise.
Classes
ThreadWithException
class ThreadWithException( group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None,):
A thread subclass that captures exceptions to be reraised in the main thread.
This constructor should always be called with keyword arguments. Arguments are:
group should be None; reserved for future extension when a ThreadGroup class is implemented.
target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.
name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.
args is the argument tuple for the target invocation. Defaults to ().
kwargs is a dictionary of keyword arguments for the target invocation. Defaults to .
If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.init()) before doing anything else to the thread.
Ancestors
Methods
join
def join(self, timeout: Optional[float] = None) ‑> None:
See parent method for documentation.
If an exception occurs in the joined thread it will be reraised in the calling thread.
run
def run(self) ‑> None:
See parent method for documentation.
Captures exceptions raised during the run call and stores them as an attribute.