base
Pod communication protocols.
These classes take an algorithm and are responsible for organising the communication between Pods and Modeller.
Attributes: registry: A read-only dictionary of protocol factory names to their implementation classes.
Classes
BaseCompatibleAlgoFactory
class BaseCompatibleAlgoFactory(*args, **kwargs):
Protocol defining base algorithm factory compatibility.
Ancestors
Subclasses
- bitfount.federated.protocols.conversation._ConversationCompatibleAlgoFactory
- bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleAlgoFactory
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleAlgoFactory
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleAlgoFactory
Variables
- static
class_name : str
- static
fields_dict : ClassVar[dict[str, marshmallow.fields.Field]]
- static
nested_fields : ClassVar[dict[str, collections.abc.Mapping[str, Any]]]
BaseCompatibleModellerAlgorithm
class BaseCompatibleModellerAlgorithm(*args, **kwargs):
Protocol defining base modeller-side algorithm compatibility.
Ancestors
Subclasses
- bitfount.federated.protocols.conversation._ConversationCompatibleModellerAlgorithm
- bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleModeller
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleModellerAlgorithm
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleModellerAlgorithm
Methods
initialise
def initialise(self, task_id: Optional[str] = None, **kwargs: Any) ‑> None:
Initialises the algorithm.
BaseCompatibleWorkerAlgorithm
class BaseCompatibleWorkerAlgorithm(*args, **kwargs):
Protocol defining base worker-side algorithm compatibility.
Ancestors
Subclasses
- bitfount.federated.protocols.conversation._ConversationCompatibleWorkerAlgorithm
- bitfount.federated.protocols.model_protocols.federated_averaging._FederatedAveragingCompatibleWorker
- bitfount.federated.protocols.model_protocols.inference_csv_report._InferenceAndCSVReportCompatibleWorkerAlgorithm
- bitfount.federated.protocols.results_only._ResultsOnlyCompatibleWorkerAlgorithm
Methods
initialise
def initialise( self, datasource: BaseSource, pod_dp: Optional[DPPodConfig] = None, pod_identifier: Optional[str] = None, **kwargs: Any,) ‑> None:
Initialises the algorithm.
initialise_data
def initialise_data(self, datasource: BaseSource) ‑> None:
Initialises the data for the algorithm.
BaseModellerProtocol
class BaseModellerProtocol( *, algorithm: Union[BaseCompatibleModellerAlgorithm, Sequence[BaseCompatibleModellerAlgorithm]], mailbox: _ModellerMailbox, **kwargs: Any,):
Modeller side of the protocol.
Calls the modeller side of the algorithm.
Ancestors
- bitfount.federated.protocols.base._BaseProtocol
- typing.Generic
- abc.ABC
Subclasses
- bitfount.federated.protocols.conversation._ModellerSide
- bitfount.federated.protocols.model_protocols.federated_averaging._ModellerSide
- bitfount.federated.protocols.model_protocols.inference_csv_report._ModellerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._ModellerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._ModellerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._ModellerSide
- bitfount.federated.protocols.results_only._ModellerSide
Methods
initialise
def initialise(self, task_id: Optional[str] = None, **kwargs: Any) ‑> None:
Initialises the component algorithms.
run
async def run(self, **kwargs: Any) ‑> Any:
Runs Modeller side of the protocol.
BaseProtocolFactory
class BaseProtocolFactory( *, algorithm: Union[BaseCompatibleAlgoFactory, Sequence[BaseCompatibleAlgoFactory]], **kwargs: Any,):
Base Protocol from which all other protocols must inherit.
Subclasses
Variables
- static
fields_dict : ClassVar[dict[str, marshmallow.fields.Field]]
- static
nested_fields : ClassVar[dict[str, collections.abc.Mapping[str, Any]]]
algorithms : list
- Returns the algorithms in the protocol.
Methods
dump
def dump(self) ‑> SerializedProtocol:
Returns the JSON-serializable representation of the protocol.
modeller
def modeller( self, mailbox: _ModellerMailbox, **kwargs: Any,) ‑> BaseModellerProtocol:
Creates an instance of the modeller-side for this protocol.
run
def run( self, pod_identifiers: Collection[str], session: Optional[BitfountSession] = None, username: Optional[str] = None, hub: Optional[BitfountHub] = None, ms_config: Optional[MessageServiceConfig] = None, message_service: Optional[_MessageService] = None, pod_public_key_paths: Optional[Mapping[str, Path]] = None, identity_verification_method: IdentityVerificationMethod = IdentityVerificationMethod.OIDC_DEVICE_CODE, private_key_or_file: Optional[Union[RSAPrivateKey, Path]] = None, idp_url: Optional[str] = None, require_all_pods: bool = False, run_on_new_data_only: bool = False, model_out: Optional[Union[Path, str]] = None, project_id: Optional[str] = None, batched_execution: Optional[bool] = None,) ‑> Optional[Any]:
Sets up a local Modeller instance and runs the protocol.
Arguments
pod_identifiers
: The BitfountHub pod identifiers to run against.session
: Optional. Session to use for authenticated requests. Created if needed.username
: Username to run as. Defaults to logged in user.hub
: BitfountHub instance. Default: hub.bitfount.com.ms_config
: Message service config. Default: messaging.bitfount.com.message_service
: Message service instance, created from ms_config if not provided. Defaults to "messaging.bitfount.com".pod_public_key_paths
: Public keys of pods to be checked against.identity_verification_method
: The identity verification method to use.private_key_or_file
: Private key (to be removed).idp_url
: The IDP URL.require_all_pods
: If true raise PodResponseError if at least one pod identifier specified rejects or fails to respond to a task request.run_on_new_data_only
: Whether to run the task on new datapoints only. Defaults to False.model_out
: The path to save the model to.project_id
: The project ID to run the task under.batched_execution
: Whether to run the task in batched mode. Defaults to False.
Returns Results of the protocol.
Raises
PodResponseError
: If require_all_pods is true and at least one pod identifier specified rejects or fails to respond to a task request.ValueError
: If attempting to train on multiple pods, and theDataStructure
table name is given as a string.
worker
def worker( self, mailbox: _WorkerMailbox, hub: BitfountHub, **kwargs: Any,) ‑> BaseWorkerProtocol:
Creates an instance of the worker-side for this protocol.
BaseWorkerProtocol
class BaseWorkerProtocol( *, algorithm: Union[BaseCompatibleWorkerAlgorithm, Sequence[BaseCompatibleWorkerAlgorithm]], mailbox: _WorkerMailbox, **kwargs: Any,):
Worker side of the protocol.
Calls the worker side of the algorithm.
Ancestors
- bitfount.federated.protocols.base._BaseProtocol
- typing.Generic
- abc.ABC
Subclasses
- bitfount.federated.protocols.conversation._WorkerSide
- bitfount.federated.protocols.model_protocols.federated_averaging._WorkerSide
- bitfount.federated.protocols.model_protocols.inference_csv_report._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_amethyst._WorkerSide
- bitfount.federated.protocols.ophthalmology.ga_screening_protocol_jade._WorkerSide
- bitfount.federated.protocols.ophthalmology.retinal_disease_protocol_cobalt._WorkerSide
- bitfount.federated.protocols.results_only._WorkerSide
Variables
- static
datasource : BaseSource
- static
mailbox : bitfount.federated.transport.worker_transport._WorkerMailbox
- static
project_id : Optional[str]
Methods
initialise
def initialise( self, datasource: BaseSource, pod_dp: Optional[DPPodConfig] = None, pod_identifier: Optional[str] = None, project_id: Optional[str] = None, **kwargs: Any,) ‑> None:
Initialises the component algorithms.
run
async def run(self, pod_vitals: Optional[_PodVitals] = None, **kwargs: Any) ‑> Any:
Runs the worker-side of the algorithm.
ProtocolDecoratorMetaClass
class ProtocolDecoratorMetaClass(*args, **kwargs):
Decorates the __init__
and run
protocol methods.
Ancestors
- bitfount.hooks.BaseDecoratorMetaClass
- builtins.type
Subclasses
- types.AbstractProtocolDecoratorMetaClass
Static methods
decorator
def decorator(f: Callable) ‑> collections.abc.Callable:
Hook decorator which logs before and after the hook it decorates.
do_decorate
def do_decorate(attr: str, value: Any) ‑> bool:
Checks if an object should be decorated.
Only the init and run methods should be decorated.