
concurrency_utils

Useful components related to asyncio, multithreading or multiprocessing.

Module

Functions

async_lock

async def async_lock(lock: threading.Lock, lock_name: str, polling_timeout: Union[float, Literal[_NoPolling.NO_POLLING]] = 5) -> collections.abc.AsyncGenerator:

Context manager to acquire a threading.Lock from the async event loop.

Avoids blocking the async event loop.

Avoids saturating the ThreadPoolExecutor by limiting how long each thread runs (via polling_timeout). Control returns to this function every polling_timeout seconds; if the lock has not been acquired, the wait is rescheduled so that other thread-based tasks get a chance to run. Returning control regularly also lets asyncio cancellation take effect, which avoids hanging threads.

If polling_timeout is NO_POLLING, then the thread will wait indefinitely.

Handles releasing the lock at the end of the context.

Arguments

  • lock: The threading.Lock to acquire.
  • lock_name: The name of the lock to acquire, used for debugging.
  • polling_timeout: How frequently to return control to this function from the thread execution. If the lock is still not acquired, the thread execution is rescheduled. Useful for handling cancellation events as it places a bound on the time a thread can run.
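The polling behaviour described above can be illustrated with a minimal sketch. This is not the library's implementation: polling_lock_sketch and demo_lock are hypothetical names, the lock_name argument and the NO_POLLING option are omitted, and only standard-library calls are used.

```python
import asyncio
import threading
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


@asynccontextmanager
async def polling_lock_sketch(
    lock: threading.Lock, polling_timeout: float = 5
) -> AsyncIterator[None]:
    """Hypothetical stand-in for async_lock (assumed behaviour only)."""
    loop = asyncio.get_running_loop()
    acquired = False
    while not acquired:
        # Each executor job blocks for at most polling_timeout seconds, so the
        # worker thread is freed regularly and the coroutine gets a point at
        # which asyncio cancellation can be delivered.
        acquired = await loop.run_in_executor(
            None, lock.acquire, True, polling_timeout
        )
    try:
        yield
    finally:
        # A threading.Lock may be released from a different thread than the
        # one that acquired it, so releasing from the event loop thread is fine.
        lock.release()


async def demo_lock() -> List[str]:
    lock = threading.Lock()
    order: List[str] = []
    lock.acquire()  # simulate another thread currently holding the lock

    def release_later() -> None:
        order.append("released")
        lock.release()

    threading.Timer(0.2, release_later).start()
    async with polling_lock_sketch(lock, polling_timeout=0.05):
        order.append("acquired")
    return order


if __name__ == "__main__":
    print(asyncio.run(demo_lock()))
```

A production version would also need to handle cancellation arriving while a worker thread is mid-acquire, since that thread could still obtain the lock after the coroutine has been cancelled.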

asyncnullcontext

async def asyncnullcontext() -> collections.abc.AsyncGenerator:

Async version of contextlib.nullcontext().
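A do-nothing async context manager is typically useful when a code path only sometimes needs a real one. The sketch below shows that idea with hypothetical names (null_async_context_sketch, run_step, demo_null); it is an assumption about usage, not the library's code.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List, Optional


@asynccontextmanager
async def null_async_context_sketch() -> AsyncIterator[None]:
    """Hypothetical equivalent of asyncnullcontext: enters and exits, nothing else."""
    yield


async def run_step(lock: Optional[asyncio.Lock] = None) -> str:
    # Use the real lock when one is supplied, otherwise a no-op context, so
    # the body below is written only once.
    context = lock if lock is not None else null_async_context_sketch()
    async with context:
        return "done"


async def demo_null() -> List[str]:
    lock = asyncio.Lock()
    return [await run_step(), await run_step(lock)]


if __name__ == "__main__":
    print(asyncio.run(demo_null()))
```

From Python 3.10 onwards, contextlib.nullcontext() itself can be used with async with, so a helper like this mainly matters on older interpreters.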

await_event_with_stop

async def await_event_with_stop(wait_event: threading.Event, stop_event: threading.Event, wait_event_name: str, polling_timeout: float = 5) -> bool:

Helper function that waits on a threading.Event but allows exiting early.

Avoids blocking the async event loop.

Monitors the wait_event but with a polling timeout, so it can periodically check if it should stop early.

Arguments

  • wait_event: The event to wait to be set.
  • stop_event: An event indicating we should stop early.
  • wait_event_name: The name of the wait event, used for debugging.
  • polling_timeout: The amount of time in seconds between checks for whether the stop_event has been set.

Returns True if the wait_event has been set, False if the stop_event is used to cancel waiting before the wait_event is set.
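The wait-with-early-exit pattern and its return values can be sketched as follows. The names wait_with_stop_sketch and demo_stop are hypothetical, the wait_event_name argument is omitted, and the loop structure is an assumption rather than the library's actual code.

```python
import asyncio
import threading
from typing import Tuple


async def wait_with_stop_sketch(
    wait_event: threading.Event,
    stop_event: threading.Event,
    polling_timeout: float = 5,
) -> bool:
    """Hypothetical stand-in for await_event_with_stop (assumed behaviour only)."""
    loop = asyncio.get_running_loop()
    while not wait_event.is_set():
        if stop_event.is_set():
            return False  # asked to stop before wait_event was set
        # Block a worker thread for at most polling_timeout seconds, then come
        # back to the loop and re-check both events.
        await loop.run_in_executor(None, wait_event.wait, polling_timeout)
    return True


async def demo_stop() -> Tuple[bool, bool]:
    # Case 1: the stop event fires and the wait event never does.
    never_set, stop = threading.Event(), threading.Event()
    threading.Timer(0.1, stop.set).start()
    stopped_early = await wait_with_stop_sketch(never_set, stop, 0.02)

    # Case 2: the wait event fires and the stop event never does.
    ready = threading.Event()
    threading.Timer(0.1, ready.set).start()
    completed = await wait_with_stop_sketch(ready, threading.Event(), 0.02)
    return stopped_early, completed


if __name__ == "__main__":
    print(asyncio.run(demo_stop()))
```

A smaller polling_timeout makes the stop request take effect sooner, at the cost of scheduling more short-lived executor jobs.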

await_threading_event

async def await_threading_event(event: threading.Event, event_name: str, timeout: Optional[float] = None, polling_timeout: Union[float, Literal[_NoPolling.NO_POLLING]] = 5) -> bool:

Event.wait() that doesn't block the event loop.

Avoids saturating the ThreadPoolExecutor either by limiting run time directly (via timeout) or by occasionally rescheduling the wait so that other thread-based tasks get a chance to run (via polling_timeout).

If both timeout and polling_timeout are supplied, timeout takes precedence and no polling/rescheduling will be performed.

If polling_timeout is set to NO_POLLING, the thread will wait indefinitely. Use this rarely and with caution, as it may either leave threads hanging or saturate all threads of the ThreadPoolExecutor.

Arguments

  • event: The event to wait on.
  • event_name: The name of the event, used in debug logging.
  • timeout: The timeout to wait for the event to be set.
  • polling_timeout: How frequently to return control to this function from the thread execution. If the event is still not set, the thread execution is rescheduled. Useful for handling cancellation events as it places a bound on the time a thread can run.

Returns True if the event is set within any timeout constraints specified, False otherwise.
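The interaction between timeout and polling_timeout can be illustrated with a short sketch. It is an assumption about the behaviour described above, not the library's implementation: wait_for_event_sketch and demo_event are hypothetical names, and the event_name argument and the NO_POLLING option are left out.

```python
import asyncio
import threading
from typing import Optional, Tuple


async def wait_for_event_sketch(
    event: threading.Event,
    timeout: Optional[float] = None,
    polling_timeout: float = 5,
) -> bool:
    """Hypothetical stand-in for await_threading_event (assumed behaviour only)."""
    loop = asyncio.get_running_loop()
    if timeout is not None:
        # timeout takes precedence: one bounded wait, no rescheduling.
        return await loop.run_in_executor(None, event.wait, timeout)
    # Otherwise wait in slices of polling_timeout seconds, handing the worker
    # thread back to the pool between slices.
    while not await loop.run_in_executor(None, event.wait, polling_timeout):
        pass
    return True


async def demo_event() -> Tuple[bool, bool]:
    soon = threading.Event()
    threading.Timer(0.1, soon.set).start()
    polled = await wait_for_event_sketch(soon, polling_timeout=0.02)

    never = threading.Event()
    timed_out = await wait_for_event_sketch(never, timeout=0.05)
    return polled, timed_out


if __name__ == "__main__":
    print(asyncio.run(demo_event()))
```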

Classes

ThreadWithException

class ThreadWithException(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):

A thread subclass that captures exceptions to be reraised in the main thread.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form "Thread-N" where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

Methods


join

def join(self, timeout: Optional[float] = None) -> None:

See parent method for documentation.

If an exception occurs in the joined thread it will be reraised in the calling thread.

run

def run(self) -> None:

See parent method for documentation.

Captures exceptions raised during the run call and stores them as an attribute.
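The capture-in-run, reraise-in-join behaviour described for these two methods can be sketched with a plain threading.Thread subclass. The class name ExceptionCapturingThread and the attribute name captured are hypothetical; the real class may store and reraise the exception differently.

```python
import threading
from typing import Optional


class ExceptionCapturingThread(threading.Thread):
    """Hypothetical stand-in for ThreadWithException (assumed behaviour only)."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # Assumed attribute name; holds whatever the target raised, if anything.
        self.captured: Optional[Exception] = None

    def run(self) -> None:
        try:
            super().run()
        except Exception as exc:
            # Store instead of letting the thread die with only a traceback.
            self.captured = exc

    def join(self, timeout: Optional[float] = None) -> None:
        super().join(timeout)
        if self.captured is not None:
            # Surface the worker's failure in the thread that called join().
            raise self.captured


def failing_task() -> None:
    raise ValueError("boom")


def demo_thread() -> str:
    worker = ExceptionCapturingThread(target=failing_task)
    worker.start()
    try:
        worker.join()
    except ValueError as exc:
        return f"caught: {exc}"
    return "no error"


if __name__ == "__main__":
    print(demo_thread())
```

With a plain threading.Thread, the same failure would only be reported on the worker's own traceback and join() would return normally.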