base

Pod communication protocols.

These classes take an algorithm and are responsible for organising the communication between Pods and Modeller.

Attributes: registry: A read-only dictionary of protocol factory names to their implementation classes.
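
One common way to expose a read-only registry like the one described above is to keep a private dictionary and publish a `types.MappingProxyType` view of it. The sketch below is illustrative only: `ExampleProtocolFactory` and `_registry` are hypothetical names, and this page does not say how Bitfount implements its registry.

```python
from types import MappingProxyType
from typing import Dict, Mapping, Type


class ExampleProtocolFactory:
    """Hypothetical stand-in for a protocol factory class."""


# Private, mutable mapping that the owning module populates.
_registry: Dict[str, Type] = {}

# Public read-only view; it reflects later changes made to _registry.
registry: Mapping[str, Type] = MappingProxyType(_registry)

# Registration goes through the private mapping.
_registry["ExampleProtocolFactory"] = ExampleProtocolFactory

# Writing through the public view is rejected with a TypeError.
try:
    registry["Other"] = object  # type: ignore[index]
except TypeError:
    write_rejected = True
else:
    write_rejected = False
```

Looking up `registry["ExampleProtocolFactory"]` then returns the class, while callers cannot add or replace entries through the public name.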

Classes

BaseCompatibleAlgoFactory

class BaseCompatibleAlgoFactory(*args, **kwargs):

Protocol defining base algorithm factory compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleAlgoFactory
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleAlgoFactory
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleAlgoFactory

Variables

  • static class_name : str

BaseCompatibleModellerAlgorithm

class BaseCompatibleModellerAlgorithm(*args, **kwargs):

Protocol defining base modeller-side algorithm compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleModeller
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModellerAlgorithm
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModellerAlgorithm

Methods


initialise

def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None:

Initialises the algorithm.

BaseCompatibleWorkerAlgorithm

class BaseCompatibleWorkerAlgorithm(*args, **kwargs):

Protocol defining base worker-side algorithm compatibility.

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleWorker
  • bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleWorkerAlgorithm
  • bitfount.federated.protocols.results_only._ResultsOnlyCompatibleWorkerAlgorithm

Methods


initialise

def initialise(
    self,
    datasource: BaseSource,
    data_splitter: Optional[DatasetSplitter] = None,
    pod_dp: Optional[DPPodConfig] = None,
    pod_identifier: Optional[str] = None,
    **kwargs: Any,
) -> None:

Initialises the algorithm.

initialise_data

def initialise_data(
    self, datasource: BaseSource, data_splitter: Optional[DatasetSplitter] = None,
) -> None:

Initialises the data for the algorithm.
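
The compatibility classes above read like structural interfaces. Assuming they behave like `typing.Protocol` types (this page does not state that explicitly), an algorithm would be accepted for the methods it provides, not for what it inherits from. The sketch below uses hypothetical, simplified names and signatures and is not Bitfount's code.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class CompatibleWorkerAlgorithm(Protocol):
    """Hypothetical, simplified analogue of a worker-side compatibility protocol."""

    def initialise(self, datasource: Any, **kwargs: Any) -> None: ...

    def initialise_data(self, datasource: Any) -> None: ...


class MyWorkerAlgorithm:
    """Does not inherit from the protocol, but provides both methods."""

    def __init__(self) -> None:
        self.datasource: Optional[Any] = None

    def initialise(self, datasource: Any, **kwargs: Any) -> None:
        # Delegate data setup so both entry points stay consistent.
        self.initialise_data(datasource)

    def initialise_data(self, datasource: Any) -> None:
        self.datasource = datasource


class Incomplete:
    """Lacks initialise_data, so it does not satisfy the protocol."""

    def initialise(self, datasource: Any, **kwargs: Any) -> None:
        pass


algo = MyWorkerAlgorithm()
algo.initialise(datasource={"rows": 3})
```

With `runtime_checkable`, `isinstance(algo, CompatibleWorkerAlgorithm)` is true and `isinstance(Incomplete(), CompatibleWorkerAlgorithm)` is false. The runtime check only looks for method names; a static type checker also compares signatures.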

BaseModellerProtocol

class BaseModellerProtocol(
    *,
    algorithm: Union[BaseCompatibleModellerAlgorithm, Sequence[BaseCompatibleModellerAlgorithm]],
    mailbox: _ModellerMailbox,
    **kwargs: Any,
):

Modeller side of the protocol.

Calls the modeller side of the algorithm.

Ancestors

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._ModellerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._ModellerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._ModellerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._ModellerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._ModellerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._ModellerSide
  • bitfount.federated.protocols.results_only._ModellerSide

Methods


initialise

def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None:

Initialises the component algorithms.

run

async def run(self, **kwargs: Any) -> Any:

Runs Modeller side of the protocol.
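
The constructor accepts either a single algorithm or a sequence of them, and initialise is documented as initialising the component algorithms. A minimal sketch of that pattern follows, assuming the input is normalised to a list and initialise is forwarded to each entry. `ToyAlgorithm` and `ToyModellerProtocol` are hypothetical stand-ins, and the mailbox argument is left out.

```python
import asyncio
from typing import Any, List, Optional, Sequence, Union


class ToyAlgorithm:
    """Hypothetical modeller-side algorithm that records its task ID."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.task_id: Optional[str] = None

    def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None:
        self.task_id = task_id


class ToyModellerProtocol:
    """Hypothetical protocol that accepts one algorithm or several."""

    def __init__(self, *, algorithm: Union[ToyAlgorithm, Sequence[ToyAlgorithm]]) -> None:
        # Normalise the single-algorithm case to a one-element list.
        if isinstance(algorithm, ToyAlgorithm):
            self.algorithms: List[ToyAlgorithm] = [algorithm]
        else:
            self.algorithms = list(algorithm)

    def initialise(self, task_id: Optional[str] = None, **kwargs: Any) -> None:
        # Forward initialisation to every component algorithm.
        for algo in self.algorithms:
            algo.initialise(task_id=task_id, **kwargs)

    async def run(self, **kwargs: Any) -> Any:
        # Placeholder body: a real protocol would exchange messages here.
        return [algo.name for algo in self.algorithms]


protocol = ToyModellerProtocol(algorithm=[ToyAlgorithm("a"), ToyAlgorithm("b")])
protocol.initialise(task_id="task-1")
results = asyncio.run(protocol.run())
```

Because run is a coroutine, it has to be awaited or driven by an event loop, as `asyncio.run` does here.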

BaseProtocolFactory

class BaseProtocolFactory(
    *,
    algorithm: Union[BaseCompatibleAlgoFactory, Sequence[BaseCompatibleAlgoFactory]],
    **kwargs: Any,
):

Base Protocol from which all other protocols must inherit.

Ancestors

  • abc.ABC
  • bitfount.federated.roles._RolesMixIn
  • bitfount.types._BaseSerializableObjectMixIn

Variables

Methods


dump

def dump(self) -> SerializedProtocol:

Returns the JSON-serializable representation of the protocol.

modeller

def modeller(
    self, mailbox: _ModellerMailbox, **kwargs: Any,
) -> BaseModellerProtocol:

Creates an instance of the modeller-side for this protocol.

run

def run(
    self,
    pod_identifiers: Collection[str],
    session: Optional[BitfountSession] = None,
    username: Optional[str] = None,
    hub: Optional[BitfountHub] = None,
    ms_config: Optional[MessageServiceConfig] = None,
    message_service: Optional[_MessageService] = None,
    pod_public_key_paths: Optional[Mapping[str, Path]] = None,
    identity_verification_method: IdentityVerificationMethod = IdentityVerificationMethod.OIDC_DEVICE_CODE,
    private_key_or_file: Optional[Union[RSAPrivateKey, Path]] = None,
    idp_url: Optional[str] = None,
    require_all_pods: bool = False,
    run_on_new_data_only: bool = False,
    model_out: Optional[Union[Path, str]] = None,
    project_id: Optional[str] = None,
    batched_execution: Optional[bool] = None,
) -> Optional[Any]:

Sets up a local Modeller instance and runs the protocol.

Arguments

  • pod_identifiers: The BitfountHub pod identifiers to run against.
  • session: Optional. Session to use for authenticated requests. Created if needed.
  • username: Username to run as. Defaults to logged in user.
  • hub: BitfountHub instance. Default: hub.bitfount.com.
  • ms_config: Message service config. Default: messaging.bitfount.com.
  • message_service: Message service instance. If not provided, one is created from ms_config (which defaults to messaging.bitfount.com).
  • pod_public_key_paths: Public keys of pods to be checked against.
  • identity_verification_method: The identity verification method to use.
  • private_key_or_file: Private key (to be removed).
  • idp_url: The IDP URL.
  • require_all_pods: If true, raise PodResponseError if any of the specified pod identifiers rejects or fails to respond to a task request. Defaults to False.
  • run_on_new_data_only: Whether to run the task on new datapoints only. Defaults to False.
  • model_out: The path to save the model to.
  • project_id: The project ID to run the task under.
  • batched_execution: Whether to run the task in batched mode. Defaults to False.

Returns: Results of the protocol.

Raises

  • PodResponseError: If require_all_pods is true and any of the specified pod identifiers rejects or fails to respond to a task request.
  • ValueError: If attempting to train on multiple pods, and the DataStructure table name is given as a string.
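
The require_all_pods rule described above can be sketched as a simple set comparison. Everything in this block is hypothetical: `PodResponseError` is a local stand-in for the exception named above, `check_pod_responses` is not a Bitfount function, and continuing with only the accepting pods when the flag is false is an assumption that this page does not confirm.

```python
from typing import Collection, Set


class PodResponseError(Exception):
    """Local stand-in for the exception named in the documentation."""


def check_pod_responses(
    requested: Collection[str],
    accepted: Collection[str],
    require_all_pods: bool = False,
) -> Set[str]:
    """Return the pods to proceed with, or raise if any pod is missing."""
    # Pods that rejected the task or never responded.
    missing = set(requested) - set(accepted)
    if require_all_pods and missing:
        raise PodResponseError(f"No acceptance from: {sorted(missing)}")
    # Assumption: otherwise proceed with whichever pods accepted.
    return set(requested) & set(accepted)


usable = check_pod_responses(["user/pod-a", "user/pod-b"], ["user/pod-a"])

try:
    check_pod_responses(
        ["user/pod-a", "user/pod-b"], ["user/pod-a"], require_all_pods=True
    )
except PodResponseError:
    strict_mode_raised = True
else:
    strict_mode_raised = False
```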

worker

def worker(
    self, mailbox: _WorkerMailbox, hub: BitfountHub, **kwargs: Any,
) -> BaseWorkerProtocol:

Creates an instance of the worker-side for this protocol.

BaseWorkerProtocol

class BaseWorkerProtocol(
    *,
    algorithm: Union[BaseCompatibleWorkerAlgorithm, Sequence[BaseCompatibleWorkerAlgorithm]],
    mailbox: _WorkerMailbox,
    **kwargs: Any,
):

Worker side of the protocol.

Calls the worker side of the algorithm.

Ancestors

Subclasses

  • bitfount.federated.protocols.model_protocols.federated_averaging._WorkerSide
  • bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_bronze._WorkerSide
  • bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
  • bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
  • bitfount.federated.protocols.results_only._WorkerSide

Variables

  • static mailbox : bitfount.federated.transport.worker_transport._WorkerMailbox
  • static project_id : Optional[str]

Methods


initialise

def initialise(
    self,
    datasource: BaseSource,
    data_splitter: Optional[DatasetSplitter] = None,
    pod_dp: Optional[DPPodConfig] = None,
    pod_identifier: Optional[str] = None,
    project_id: Optional[str] = None,
    **kwargs: Any,
) -> None:

Initialises the component algorithms.

run

async def run(self, pod_vitals: Optional[_PodVitals] = None, **kwargs: Any) -> Any:

Runs the worker-side of the algorithm.

send_evaluation_results_with_resources_consumed

async def send_evaluation_results_with_resources_consumed(
    self,
    algorithm: BaseCompatibleWorkerAlgorithm,
    eval_results: Optional[Mapping[Hashable, Any]] = None,
    notification: Optional[TaskNotification] = None,
) -> None:

Wraps send_evaluation_results to inject resources_consumed.

ProtocolDecoratorMetaClass

class ProtocolDecoratorMetaClass(*args, **kwargs):

Decorates the __init__ and run protocol methods.

Ancestors

  • bitfount.hooks.BaseDecoratorMetaClass
  • builtins.type

Subclasses

  • types.AbstractProtocolDecoratorMetaClass

Static methods


decorator

def decorator(f: Callable) -> collections.abc.Callable:

Hook decorator which logs before and after the hook it decorates.

do_decorate

def do_decorate(attr: str, value: Any) -> bool:

Checks if an object should be decorated.

Only the __init__ and run methods should be decorated.
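
The general pattern of a metaclass that wraps selected methods at class-creation time can be sketched as follows. This is an illustration, not Bitfount's implementation: `LoggingMeta`, `ToyProtocol`, and the `calls` list are hypothetical, the wrapper records to a list where the real decorator logs around hooks, and run is synchronous here for brevity.

```python
import functools
from typing import Any, Callable, Dict, List, Tuple

# Records wrapper activity so the effect is easy to inspect.
calls: List[str] = []


class LoggingMeta(type):
    """Hypothetical metaclass that wraps __init__ and run on new classes."""

    @staticmethod
    def do_decorate(attr: str, value: Any) -> bool:
        # Only callables named __init__ or run are wrapped.
        return attr in ("__init__", "run") and callable(value)

    @staticmethod
    def decorator(f: Callable) -> Callable:
        @functools.wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            calls.append(f"before {f.__name__}")
            result = f(*args, **kwargs)
            calls.append(f"after {f.__name__}")
            return result

        return wrapper

    def __new__(
        mcs, name: str, bases: Tuple[type, ...], namespace: Dict[str, Any], **kwargs: Any
    ) -> "LoggingMeta":
        # Replace matching attributes before the class object is built.
        for attr, value in list(namespace.items()):
            if mcs.do_decorate(attr, value):
                namespace[attr] = mcs.decorator(value)
        return super().__new__(mcs, name, bases, namespace, **kwargs)


class ToyProtocol(metaclass=LoggingMeta):
    def __init__(self) -> None:
        self.ready = True

    def run(self) -> str:
        return "done"

    def helper(self) -> str:
        return "untouched"


proto = ToyProtocol()
run_result = proto.run()
helper_result = proto.helper()
```

After this runs, `calls` holds entries for `__init__` and `run` only; `helper` is left unwrapped because `do_decorate` returns false for it.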