base

Pod communication protocols.

These classes take an algorithm and are responsible for organising the communication between Pods and Modeller.

Attributes

  • registry: A read-only dictionary of protocol factory names to their implementation classes.

Current message flow between the Workers and the Modeller for streaming batched execution with 2 batches:

  1. Workers send NUM_BATCHES=-1 to Modeller.
  2. Workers send CURRENT_BATCH_ID=1 to Modeller.
  3. Modeller states it is processing batch 1; Workers send TASK_START to Modeller.
  4. Modeller waits for all Pods to be ready.
  5. Modeller sends TASK_START to Workers.
  6. Workers run batch 1.
  7. Workers send EVALUATION_RESULTS to Modeller.
  8. Workers send CURRENT_BATCH_ID=2 to Modeller.
  9. Workers run batch 2 (the final batch); Modeller states it is processing batch 2.
  10. Workers send EVALUATION_RESULTS to Modeller.
  11. Workers send BATCHES_COMPLETE("BATCHES_ONLY") to Modeller.
  12. Workers run the resilience phase (if enabled).
  13. Workers send BATCHES_COMPLETE("TASK_COMPLETE") to Modeller; Modeller exits the streaming loop.
  14. Modeller sends TASK_COMPLETE to Workers.

Message flow for single-batch execution:

  1. Workers send NUM_BATCHES=1 to Modeller.
  2. Modeller waits for all Pods to be ready.
  3. Modeller sends TASK_START to Workers.
  4. Workers run their protocol (including run_final_step if FinalStepProtocol).
  5. Workers send BATCHES_COMPLETE("TASK_COMPLETE") to Modeller.
  6. Modeller waits for all Workers to send completion signals.
  7. Modeller sends TASK_COMPLETE to Workers.

Batched execution is currently only supported for the single-worker case. For future reference, where multiple workers are expected to execute the same task, the Modeller will display to the user the current batch ID corresponding to the slowest worker.
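
For illustration, the following is a minimal sketch of the modeller-side streaming loop described above. The message constants mirror the flow listed here, but the Message class and mailbox object are hypothetical stand-ins, not the real bitfount transport API.

# Illustrative only: hypothetical message/mailbox stand-ins.
from dataclasses import dataclass
from typing import Any


@dataclass
class Message:
    type: str          # e.g. "NUM_BATCHES", "CURRENT_BATCH_ID", ...
    payload: Any = None


def modeller_streaming_loop(mailbox) -> None:
    """Consume worker messages until the task is complete (sketch only)."""
    while True:
        msg = mailbox.receive()  # hypothetical blocking receive
        if msg.type == "NUM_BATCHES":
            # -1 signals streaming mode: the batch count is unknown up front.
            print(f"Streaming mode: NUM_BATCHES={msg.payload}")
        elif msg.type == "CURRENT_BATCH_ID":
            print(f"Processing batch {msg.payload}")
        elif msg.type == "EVALUATION_RESULTS":
            print("Received evaluation results for the current batch")
        elif msg.type == "BATCHES_COMPLETE":
            if msg.payload == "TASK_COMPLETE":
                # All batches (and any resilience phase) are done.
                mailbox.send(Message("TASK_COMPLETE"))
                break
            # payload == "BATCHES_ONLY": batch steps are done, but the worker
            # may still run a resilience phase before sending TASK_COMPLETE.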

Classes

BaseCompatibleAlgoFactoryWorkerHubNeeded

class BaseCompatibleAlgoFactoryWorkerHubNeeded(*args, **kwargs):

Protocol defining base algorithm factory compatibility.

For the case where the worker() call explicitly needs a hub instance.

Ancestors

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleHuggingFaceAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModelAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleHuggingFaceAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleModelAlgoFactory
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModelAlgoFactory

Methods


worker

def worker(
    self, *, hub: BitfountHub, context: ProtocolContext, **kwargs: Any,
) -> T_WorkerSide:

Worker-side of the algorithm.

BaseCompatibleAlgoFactoryWorkerStandard

class BaseCompatibleAlgoFactoryWorkerStandard(*args, **kwargs):

Protocol defining base algorithm factory compatibility.

For the case where the worker() call has no explicit requirements.

Ancestors

Subclasses

  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleAlgoFactory
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleNonModelAlgoFactory

Methods


worker

def worker(self, *, context: ProtocolContext, **kwargs: Any) -> T_WorkerSide:

Worker-side of the algorithm.
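
For illustration, the sketch below contrasts the two factory shapes defined by these protocols. The placeholder classes stand in for BitfountHub, ProtocolContext and the concrete worker-side algorithm types; they are not part of the API.

# Sketch only: placeholder types, not the real bitfount classes.
from typing import Any


class FakeHub: ...
class FakeContext: ...
class FakeWorkerSide: ...


class HubNeededAlgoFactory:
    """Shape-compatible with BaseCompatibleAlgoFactoryWorkerHubNeeded: worker() requires a hub."""

    def worker(self, *, hub: FakeHub, context: FakeContext, **kwargs: Any) -> FakeWorkerSide:
        # The hub might be used, e.g., to fetch model weights before
        # constructing the worker-side algorithm.
        return FakeWorkerSide()


class StandardAlgoFactory:
    """Shape-compatible with BaseCompatibleAlgoFactoryWorkerStandard: no explicit requirements."""

    def worker(self, *, context: FakeContext, **kwargs: Any) -> FakeWorkerSide:
        return FakeWorkerSide()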

BaseCompatibleModellerAlgorithm

class BaseCompatibleModellerAlgorithm(*args, **kwargs):

Protocol defining base modeller-side algorithm compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleModeller
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModellerAlgorithm
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleModellerAlgorithm
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModellerAlgorithm

Methods


initialise

def initialise(self, *, task_id: str, **kwargs: Any) -> None:

Initialises the algorithm.

BaseCompatibleWorkerAlgorithm

class BaseCompatibleWorkerAlgorithm(*args, **kwargs):

Protocol defining base worker-side algorithm compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleWorker
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleWorkerAlgorithm
  • bitfount.federated.protocols.model_protocols.inference_image_output._InferenceAndImageOutputCompatibleWorkerAlgorithm
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleWorkerAlgorithm

Methods


initialise

def initialise(
    self,
    *,
    datasource: BaseSource,
    task_id: str,
    data_splitter: Optional[DatasetSplitter] = None,
    pod_dp: Optional[DPPodConfig] = None,
    pod_identifier: Optional[str] = None,
    **kwargs: Any,
) -> None:

Initialises the algorithm.

initialise_data

def initialise_data(
    self, datasource: BaseSource, data_splitter: Optional[DatasetSplitter] = None,
) -> None:

Initialises the data for the algorithm.
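
For illustration, a minimal worker-side algorithm matching this shape might look as follows. The placeholder annotations stand in for BaseSource and DatasetSplitter; the attribute handling is an assumption, not the library's implementation.

# Sketch only: placeholder annotations instead of the real bitfount types.
from typing import Any, Optional


class MinimalWorkerAlgorithm:
    """Shape-compatible with BaseCompatibleWorkerAlgorithm (illustrative)."""

    def initialise(
        self,
        *,
        datasource: Any,
        task_id: str,
        data_splitter: Optional[Any] = None,
        pod_dp: Optional[Any] = None,
        pod_identifier: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        # Record task-level details, then hand the datasource off to
        # initialise_data() so data setup stays in one place.
        self.task_id = task_id
        self.pod_identifier = pod_identifier
        self.initialise_data(datasource, data_splitter)

    def initialise_data(self, datasource: Any, data_splitter: Optional[Any] = None) -> None:
        self.datasource = datasource
        self.data_splitter = data_splitter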

BaseModellerProtocol

class BaseModellerProtocol(
    *,
    algorithm: Union[BaseCompatibleModellerAlgorithm, Sequence[BaseCompatibleModellerAlgorithm]],
    mailbox: _ModellerMailbox,
    **kwargs: Any,
):

Modeller side of the protocol.

Calls the modeller side of the algorithm.

Ancestors

Subclasses

  • bitfount.federated.protocols.ehr.data_extraction_protocol_charcoal._ModellerSide
  • bitfount.federated.protocols.ehr.nextgen_search_protocol._ModellerSide
  • bitfount.federated.protocols.model_protocols.federated_averaging._ModellerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._ModellerSide
  • bitfount.federated.protocols.model_protocols.inference_image_output._ModellerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._ModellerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._ModellerSide
  • GenericOphthalmologyModellerSide
  • bitfount.federated.protocols.results_only._ModellerSide

Variables

  • static mailbox : bitfount.federated.transport.modeller_transport._ModellerMailbox

Methods


initialise

def initialise(self, *, task_id: str, **kwargs: Any) -> None:

Initialises the component algorithms.

run

async def run(self, *, context: ProtocolContext, **kwargs: Any) -> Any:

Runs Modeller side of the protocol.

Arguments

  • context: Optional. Run-time context for the protocol.
  • **kwargs: Additional keyword arguments.

BaseProtocolFactory

class BaseProtocolFactory(
    *,
    algorithm: Union[BaseCompatibleAlgoFactory, Sequence[BaseCompatibleAlgoFactory]],
    primary_results_path: Optional[str] = None,
    **kwargs: Any,
):

Base Protocol from which all other protocols must inherit.

Ancestors

  • abc.ABC
  • bitfount.federated.roles._RolesMixIn
  • bitfount.types._BaseSerializableObjectMixIn

Variables

Methods


dump

def dump(self) -> SerializedProtocol:

Returns the JSON-serializable representation of the protocol.

modeller

def modeller(
    self, *, mailbox: _ModellerMailbox, context: ProtocolContext, **kwargs: Any,
) -> BaseModellerProtocol:

Creates an instance of the modeller-side for this protocol.

run

def run(
    self,
    pod_identifiers: Collection[str],
    session: Optional[BitfountSession] = None,
    username: Optional[str] = None,
    hub: Optional[BitfountHub] = None,
    ms_config: Optional[MessageServiceConfig] = None,
    message_service: Optional[_MessageService] = None,
    pod_public_key_paths: Optional[Mapping[str, Path]] = None,
    identity_verification_method: IdentityVerificationMethod = IdentityVerificationMethod.OIDC_DEVICE_CODE,
    private_key_or_file: Optional[Union[RSAPrivateKey, Path]] = None,
    idp_url: Optional[str] = None,
    require_all_pods: bool = False,
    run_on_new_data_only: bool = False,
    project_id: Optional[str] = None,
    batched_execution: Optional[bool] = None,
    test_run: bool = False,
    force_rerun_failed_files: bool = True,
) -> Optional[Any]:

Sets up a local Modeller instance and runs the protocol.

Arguments

  • pod_identifiers: The BitfountHub pod identifiers to run against.
  • session: Optional. Session to use for authenticated requests. Created if needed.
  • username: Username to run as. Defaults to logged in user.
  • hub: BitfountHub instance. Default: hub.bitfount.com.
  • ms_config: Message service config. Default: messaging.bitfount.com.
  • message_service: Message service instance, created from ms_config if not provided. Defaults to "messaging.bitfount.com".
  • pod_public_key_paths: Public keys of pods to be checked against.
  • identity_verification_method: The identity verification method to use.
  • private_key_or_file: Private key (to be removed).
  • idp_url: The IDP URL.
  • require_all_pods: If True, raise a PodResponseError if any of the specified pods rejects or fails to respond to the task request.
  • run_on_new_data_only: Whether to run the task on new datapoints only. Defaults to False.
  • project_id: The project ID to run the task under.
  • batched_execution: Whether to run the task in batched mode. Defaults to False.
  • test_run: If True, runs the task in test mode, on a limited number of datapoints. Defaults to False.
  • force_rerun_failed_files: If True, forces a rerun on files that the task previously failed on. If False, the task will skip files that have previously failed. Note: This option can only be enabled if both enable_batch_resilience and individual_file_retry_enabled are True. Defaults to True.

Returns Results of the protocol.

Raises

  • PodResponseError: If require_all_pods is True and any of the specified pods rejects or fails to respond to the task request.
  • ValueError: If attempting to train on multiple pods, and the DataStructure table name is given as a string.
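
For illustration, a hedged usage sketch of run() is shown below. It assumes `protocol` is an instance of a concrete BaseProtocolFactory subclass; the pod identifier and project ID are placeholders.

# Hedged usage sketch: `protocol` and the string values are placeholders.
def run_batched(protocol):
    return protocol.run(
        pod_identifiers=["some-username/some-pod"],
        project_id="some-project-id",        # optional project-level tracking
        batched_execution=True,              # run the task in batched mode
        require_all_pods=False,              # don't fail if a pod doesn't respond
    )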

worker

def worker(
    self,
    *,
    mailbox: _WorkerMailbox,
    hub: BitfountHub,
    context: ProtocolContext,
    **kwargs: Any,
) -> BaseWorkerProtocol:

Creates an instance of the worker-side for this protocol.

BaseWorkerProtocol

class BaseWorkerProtocol(
    *,
    algorithm: Union[BaseCompatibleWorkerAlgorithm, Sequence[BaseCompatibleWorkerAlgorithm]],
    mailbox: _WorkerMailbox,
    **kwargs: Any,
):

Worker side of the protocol.

Calls the worker side of the algorithm.

Ancestors

Subclasses

  • bitfount.federated.protocols.ehr.data_extraction_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ehr.nextgen_search_protocol._WorkerSide
  • bitfount.federated.protocols.model_protocols.federated_averaging._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_image_output._WorkerSide
  • bitfount.federated.protocols.ophthalmology.fluid_volume_screening_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze_base._WorkerSideBronzeBase
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
  • bitfount.federated.protocols.ophthalmology.wet_amd_screening_protocol_sapphire._WorkerSide
  • bitfount.federated.protocols.results_only._WorkerSide

Variables

  • static mailbox : bitfount.federated.transport.worker_transport._WorkerMailbox
  • static project_id : Optional[str]

Methods


initialise

def initialise(
    self,
    *,
    datasource: BaseSource,
    task_id: str,
    data_splitter: Optional[DatasetSplitter] = None,
    pod_dp: Optional[DPPodConfig] = None,
    pod_identifier: Optional[str] = None,
    project_id: Optional[str] = None,
    ehr_secrets: Optional[ExternallyManagedJWT] = None,
    ehr_config: Optional[EHRConfig] = None,
    parent_pod_identifier: Optional[str] = None,
    datasource_name: Optional[str] = None,
    data_identifier: Optional[str] = None,
    **kwargs: Any,
) -> None:

Initialises the component algorithms.

Sets up the protocol with the necessary data source, configuration, and context information, then initialises all component algorithms with these parameters. Called once per task, regardless of the number of batches.

Why initialise() instead of __init__()?

Protocols follow a two-phase initialisation pattern:

  1. __init__() phase: Protocols are created on the modeller side with only serializable configuration (algorithms, mailbox). They are then serialized and sent to worker pods over the network.

  2. initialise() phase: After deserialization on the worker, this method is called with task-specific and worker-specific data that:

    • Was not available at protocol creation time (e.g., task_id, datasource, project_id)
    • Cannot be serialized (e.g., BaseSource objects, JWT tokens in ehr_secrets)
    • Is specific to each worker pod (e.g., datasource, pod_identifier, parent_pod_identifier)

This separation allows the same protocol instance to be reused across multiple tasks (with different datasources, task IDs, etc.) and ensures that worker-specific resources are only created when and where they are needed.
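
For illustration, the two phases might look roughly as follows. The `deserialize_protocol` helper is hypothetical, standing in for the worker-side machinery that rebuilds the protocol from its serialized form; the argument values are placeholders.

# Sketch of the two-phase pattern; serialization details are simplified.
def modeller_phase(protocol_factory):
    # Phase 1: only serializable configuration is involved; the result is
    # sent to the worker pods over the network.
    return protocol_factory.dump()

def worker_phase(serialized, datasource, task_id, pod_identifier):
    # Phase 2: rebuild the protocol, then supply the non-serializable,
    # worker-specific pieces that were unknown at creation time.
    worker_protocol = deserialize_protocol(serialized)   # hypothetical helper
    worker_protocol.initialise(
        datasource=datasource,
        task_id=task_id,
        pod_identifier=pod_identifier,
    )
    return worker_protocol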

Arguments

  • datasource: Datasource object that provides access to the data. Used for streaming data in batches. Must be a concrete instance of BaseSource.
  • task_id: Unique identifier for the task being executed. Used for tracking, logging, and task management throughout the protocol execution.
  • data_splitter: Optional. Dataset splitter used to divide the dataset into train/validation/test splits. If provided, algorithms will use this to determine which data split to process. Defaults to None.
  • pod_dp: Optional. Differential privacy configuration for the pod. If provided, applies privacy constraints (epsilon, delta) to algorithms that support differential privacy. Defaults to None.
  • pod_identifier: Optional. Unique identifier for the pod. Used for identification and tracking purposes. Defaults to None.
  • project_id: Optional. Project identifier. Used for project-level tracking, organization, and database operations. Defaults to None.
  • ehr_secrets: Optional. JWT secrets for authenticating with Electronic Health Record (EHR) systems. Required when the protocol needs to access EHR data. Must be an ExternallyManagedJWT instance containing the JWT token and expiration information. Defaults to None.
  • ehr_config: Optional. Configuration for EHR system access. Contains provider information (e.g., "nextgen enterprise") and base URL for the EHR API. Required when accessing EHR systems. Defaults to None.
  • parent_pod_identifier: Optional. Identifier of the parent pod that the worker is running in. Format is typically <username>/<pod_name>. Defaults to None.
  • datasource_name: Optional. Name of the datasource. Used for identification and as a fallback for when data_identifier is not available. Defaults to None.
  • data_identifier: Optional. Logical pod/dataset identifier for the task. Format is <username>/<datasource_name>. May differ from the pod identifier for pods with multiple datasources. Defaults to None.
  • **kwargs: Additional keyword arguments that may be passed to the component algorithms during their initialisation. These are forwarded to each algorithm's initialise method.

run

async def run(
    self,
    *,
    pod_vitals: Optional[_PodVitals] = None,
    context: ProtocolContext,
    **kwargs: Any,
) -> Any:

Runs the worker-side of the algorithm.

Arguments

  • pod_vitals: Optional. Pod vitals instance for recording run-time details from the protocol run.
  • context: Optional. Run-time context for the protocol.
  • **kwargs: Additional keyword arguments.

FinalStepProtocol

class FinalStepProtocol():

Tagging class for protocols that contain a final step.

These protocols will have a number of steps that can be operated batch-wise (batch steps) followed by step(s) to be executed at the end.

Type Arguments

  • T_FinalStepAlgo: Type of the setup algorithm, must implement FinalStepAlgorithm

Subclasses

  • FinalStepReduceProtocol
  • bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
  • bitfount.federated.protocols.results_only._WorkerSide

Variables

  • algorithms : list[~T_FinalStepAlgo] - Get the algorithms for this protocol.

Methods


run_final_step

async def run_final_step(self, *, context: ProtocolContext, **kwargs: Any) -> Any:

Execute the final reduce step.
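
For illustration, a batch driver might call run() for each batch and then run the final step once, as described in the single-batch flow above. This is a sketch only, assuming FinalStepProtocol is importable from this module; the real execution machinery handles batching elsewhere.

# Illustrative driver; assumes:
#   from bitfount.federated.protocols.base import FinalStepProtocol
async def drive_batches(protocol, num_batches, context):
    outputs = []
    for _ in range(num_batches):
        outputs.append(await protocol.run(context=context))
    if isinstance(protocol, FinalStepProtocol):
        # The final step runs once, after all batch steps have completed.
        await protocol.run_final_step(context=context)
    return outputs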

FinalStepReduceProtocol

class FinalStepReduceProtocol():

Tagging class for protocols that contain a final "reduce" step.

These protocols will have a number of steps that can be operated batch-wise (batch steps) followed by step(s) at the end that cannot be executed batch-wise but instead require access to the outputs from all batch steps (reduce step(s)).

Subclasses

  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze_base._WorkerSideBronzeBase
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide

Variables

  • algorithms : list[~T_FinalStepAlgo] - Get the algorithms for this protocol.

Methods


run_final_step

async def run_final_step(self, *, context: ProtocolContext, **kwargs: Any) -> Any:

Inherited from: FinalStepProtocol.run_final_step

Execute the final reduce step.

InitialSetupModellerProtocol

class InitialSetupModellerProtocol(*args: Any, **kwargs: Any):

Tagging class for protocols that contain an initial setup step.

These protocols will have an initial step that must be executed before any batching, followed by steps that can be operated batch-wise.

Type Arguments

  • T_InitialSetupModellerAlgo: Type of the setup algorithm, must implement InitialSetupModellerAlgorithm

Ancestors

Subclasses

  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._ModellerSide

Variables

  • static mailbox : bitfount.federated.transport.modeller_transport._ModellerMailbox

Methods


run_initial_setup

async def run_initial_setup(self, **kwargs: Any) -> None:

Run the initial setup phase.

InitialSetupWorkerProtocol

class InitialSetupWorkerProtocol(*args: Any, **kwargs: Any):

Tagging class for protocols that contain an initial setup step.

These protocols will have an initial step that must be executed before any batching, followed by steps that can be operated batch-wise.

Type Arguments

  • T_InitialSetupAlgo: Type of the setup algorithm, must implement InitialSetupAlgorithm

Ancestors

Subclasses

  • bitfount.federated.protocols.ehr.nextgen_search_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide

Variables

  • static mailbox : bitfount.federated.transport.worker_transport._WorkerMailbox

Methods


run_initial_setup

async def run_initial_setup(self, **kwargs: Any) -> None:

Run the initial setup phase.
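
For illustration, the ordering implied by this class is sketched below: the setup phase runs once, before any batch processing. This assumes InitialSetupWorkerProtocol is importable from this module; the driver function itself is hypothetical.

# Illustrative ordering only; assumes:
#   from bitfount.federated.protocols.base import InitialSetupWorkerProtocol
async def run_with_initial_setup(protocol, num_batches, context):
    if isinstance(protocol, InitialSetupWorkerProtocol):
        await protocol.run_initial_setup()
    for _ in range(num_batches):
        await protocol.run(context=context)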

LimitsExceededInfo

class LimitsExceededInfo(overrun: int, allowed: int):

Named tuple describing an exceeded usage limit: the number of inferences by which usage overran the limit, and the number of inferences actually allowed.

Ancestors

  • builtins.tuple

Variables

  • allowed : int - The number of inferences actually allowed to be used.
  • overrun : int - The number of inferences by which usage exceeded the limit.

ModelInferenceProtocolMixin

class ModelInferenceProtocolMixin():

Mixin class for protocols that may contain one or more model inference steps.

These protocols will have to respect any model inference usage limits that are associated with the model(s) in use.

Subclasses

  • bitfount.federated.protocols.ehr.data_extraction_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_image_output._WorkerSide
  • bitfount.federated.protocols.ophthalmology.fluid_volume_screening_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze_base._WorkerSideBronzeBase
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_charcoal._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.insite_insights_protocol._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
  • bitfount.federated.protocols.ophthalmology.wet_amd_screening_protocol_sapphire._WorkerSide
  • bitfount.federated.protocols.results_only._WorkerSide

Static methods


apply_actual_usage_to_resources_consumed

def apply_actual_usage_to_resources_consumed(
    inference_algorithm: ModelInferenceWorkerSideAlgorithm,
    limits_exceeded_info: LimitsExceededInfo | None,
) -> list[ResourceConsumed]:

Generate a resources consumed list from an algorithm that respects limits.

Given information on the actual number of inferences that were allowed/used, updates resources consumed entries from the given algorithm to reflect this limit.

If limits were not exceeded, just returns the resources consumed information unchanged.

Arguments

  • inference_algorithm: The inference algorithm used for the inferences.
  • limits_exceeded_info: If not None, contains information on the actual number of inferences that were allowed/used.

Returns The list of resources consumed, as generated by the algorithm, with model inference resources consumed entries modified to reflect the actually used inferences. If limits were not exceeded, returns the list of resources consumed, unchanged.

check_usage_limits

def check_usage_limits(
    limits: dict[str, InferenceLimits],
    inference_algorithm: ModelInferenceWorkerSideAlgorithm,
) -> Optional[LimitsExceededInfo]:

Check if the most recent inference run has exceeded the usage limits.

Updates the total usage count associated with the model in question, regardless of whether the limits are exceeded.

Arguments

  • limits: The inference usage limits as a mapping of model_id to usage limits.
  • inference_algorithm: The inference algorithm instance that has just been run.

Returns None if limits were not exceeded. Otherwise, returns a container with .overrun and .allowed attributes indicating, respectively, the number of predictions by which usage exceeded the limit and the number of predictions actually allowed to be used. For example, for an initial total_usage of 10, a limit of 20, and an inference run that performed 14 more inferences, this returns (overrun=4, allowed=10).
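
The documented example expressed as plain arithmetic (illustration only, not the library implementation):

total_usage = 10          # inferences already recorded for this model
limit = 20                # usage limit for the model
run_inferences = 14       # inferences performed by the latest run

overrun = (total_usage + run_inferences) - limit   # 24 - 20 = 4
allowed = limit - total_usage                      # 20 - 10 = 10
# check_usage_limits() would return LimitsExceededInfo(overrun=4, allowed=10);
# if the limit had not been exceeded, it would return None.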

handle_limits_exceeded

async def handle_limits_exceeded(
    exceeded_inference_algo: ModelInferenceWorkerSideAlgorithm,
    limits_exceeded_info: LimitsExceededInfo,
    limits_info: dict[str, InferenceLimits],
    mailbox: _WorkerMailbox,
) -> NoReturn:

Handles when usage limits are exceeded within the protocol.

In particular, sends a TASK_ABORT message from Worker to Modeller, letting it know that the limits have been exceeded, and raises a TaskAbortError to signal the same on the Worker side.

Methods


apply_usage_limits

def apply_usage_limits(
    self,
    context: ProtocolContext,
    predictions_df: pd.DataFrame,
    inference_algo: ModelInferenceWorkerSideAlgorithm,
    final_batch: bool = False,
) -> tuple[pandas.core.frame.DataFrame, typing.Optional[LimitsExceededInfo], bool]:

Apply usage limits by reducing inference results.

Returns 3-tuple of the predictions dataframe (potentially reduced), limits exceeded info (or None if limits not exceeded), and a boolean indicating whether this should now be treated as the final batch.
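
For illustration, a hedged usage sketch of unpacking that 3-tuple is shown below; `protocol`, `context`, `predictions` and `inference_algo` are placeholders for the real worker-side objects.

# Hedged usage sketch with placeholder arguments.
def enforce_limits(protocol, context, predictions, inference_algo, is_last_batch):
    reduced_df, limits_exceeded, treat_as_final = protocol.apply_usage_limits(
        context, predictions, inference_algo, final_batch=is_last_batch,
    )
    if limits_exceeded is not None:
        # Only `limits_exceeded.allowed` predictions remain in reduced_df;
        # this batch should now be treated as the final one.
        pass
    return reduced_df, treat_as_final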

ProtocolDecoratorMetaClass

class ProtocolDecoratorMetaClass(*args, **kwargs):

Decorates the __init__ and run protocol methods.

Ancestors

  • bitfount.hooks.BaseDecoratorMetaClass
  • builtins.type

Subclasses

  • types.AbstractProtocolDecoratorMetaClass

Static methods


decorator

def decorator(f: Callable) -> collections.abc.Callable:

Hook decorator which logs before and after the hook it decorates.

do_decorate

def do_decorate(attr: str, value: Any) -> bool:

Checks if an object should be decorated.

Only the init and run methods should be decorated.

ProtocolExecution

class ProtocolExecution(
    protocol: _BaseProtocol,
    run_method: Callable,
    context: ProtocolContext,
    batched_execution: Optional[bool],
    hook_kwargs: Optional[_StrAnyDict],
    processed_files_cache: Optional[dict[str, datetime]] = None,
    failed_files_cache: Optional[dict[str, dict[str, str]]] = None,
    test_run: bool = False,
    cache_getter: Optional[Callable[[list[str]], pd.DataFrame]] = None,
    **kwargs: Any,
):

Handles protocol execution with proper context management.

Arguments

  • protocol: The protocol instance to run.
  • run_method: The method to execute on the protocol.
  • context: Optional. The context in which the protocol is being run.
  • batched_execution: Whether to run the protocol in batched mode.
  • hook_kwargs: Optional. Keyword arguments to pass to the hooks.
  • processed_files_cache: Optional. A dictionary of processed files with their last modified dates. Defaults to None.
  • failed_files_cache: Optional. A dictionary of previously failed files for the current task run. Defaults to None.
  • test_run: Whether this is a test run. Defaults to False.
  • **kwargs: Additional keyword arguments for the run method.

Methods


execute

async def execute(self) -> Union[Any, list[Any]]:

Main execution entry point.
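
For illustration, a hedged sketch of constructing and running a ProtocolExecution is shown below. The worker protocol and context are placeholders, hook and cache options are left at their defaults, and the import location of ProtocolExecution is assumed to be this module.

# Hypothetical driver; assumes ProtocolExecution is importable from this module.
async def execute_protocol(worker_protocol, context):
    execution = ProtocolExecution(
        protocol=worker_protocol,
        run_method=worker_protocol.run,
        context=context,
        batched_execution=True,
        hook_kwargs=None,
    )
    # Returns a single result, or a list of per-batch results when batched.
    return await execution.execute()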