From 67dc3d952d217b73488ee3c862d12bab72f6b0d2 Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Sun, 11 Jun 2023 22:07:37 +0200 Subject: [PATCH 1/4] Initial implementation --- .../{aggregator.py => aggregatorbase.py} | 18 +++++++++++++++++- .../network/combiner/aggregators/fedavg.py | 4 ++-- fedn/fedn/network/combiner/round.py | 11 ++++++++--- 3 files changed, 27 insertions(+), 6 deletions(-) rename fedn/fedn/network/combiner/aggregators/{aggregator.py => aggregatorbase.py} (88%) diff --git a/fedn/fedn/network/combiner/aggregators/aggregator.py b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py similarity index 88% rename from fedn/fedn/network/combiner/aggregators/aggregator.py rename to fedn/fedn/network/combiner/aggregators/aggregatorbase.py index 69e0c4fcf..f62d9476b 100644 --- a/fedn/fedn/network/combiner/aggregators/aggregator.py +++ b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py @@ -1,11 +1,14 @@ +import importlib import json import queue from abc import ABC, abstractmethod import fedn.common.net.grpc.fedn_pb2 as fedn +AGGREGATOR_PLUGIN_PATH = "fedn.network.combiner.aggregators.{}" -class Aggregator(ABC): + +class AggregatorBase(ABC): """ Abstract class defining an aggregator. """ @abstractmethod @@ -117,3 +120,16 @@ def next_model_update(self, helper): data['round_id'] = config['round_id'] return model_next, data, model_id + + +def get_aggregator(aggregator_module_name, id, storage, server, modelservice, control): + """ Return an instance of the helper class. + + :param helper_module_name: The name of the helper plugin module. + :type helper_module_name: str + :return: A helper instance. + :rtype: class: `fedn.utils.helpers.HelperBase` + """ + aggregator_plugin = AGGREGATOR_PLUGIN_PATH.format(aggregator_module_name) + aggregator = importlib.import_module(aggregator_plugin) + return aggregator.Aggregator(id, storage, server, modelservice, control) diff --git a/fedn/fedn/network/combiner/aggregators/fedavg.py b/fedn/fedn/network/combiner/aggregators/fedavg.py index 3c4d55b2d..2105f1cf6 100644 --- a/fedn/fedn/network/combiner/aggregators/fedavg.py +++ b/fedn/fedn/network/combiner/aggregators/fedavg.py @@ -1,8 +1,8 @@ import fedn.common.net.grpc.fedn_pb2 as fedn -from fedn.network.combiner.aggregators.aggregator import Aggregator +from fedn.network.combiner.aggregators.aggregatorbase import AggregatorBase -class FedAvg(Aggregator): +class Aggregator(AggregatorBase): """ Local SGD / Federated Averaging (FedAvg) aggregator. Computes a weighted mean of parameter updates. diff --git a/fedn/fedn/network/combiner/round.py b/fedn/fedn/network/combiner/round.py index a0c3a3b39..b23476039 100644 --- a/fedn/fedn/network/combiner/round.py +++ b/fedn/fedn/network/combiner/round.py @@ -4,9 +4,12 @@ import time import uuid -from fedn.network.combiner.aggregators.fedavg import FedAvg +from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator +#from fedn.network.combiner.aggregators.fedavg import FedAvg from fedn.utils.helpers import get_helper +AGGREGATOR_NAME = 'fedavg' + class ModelUpdateError(Exception): pass @@ -37,8 +40,10 @@ def __init__(self, id, storage, server, modelservice): self.modelservice = modelservice # TODO, make runtime configurable - self.aggregator = FedAvg( - self.id, self.storage, self.server, self.modelservice, self) + self.aggregator = get_aggregator(AGGREGATOR_NAME, self.id, self.storage, self.server, self.modelservice, self) + + # FedAvg( + # self.id, self.storage, self.server, self.modelservice, self) def push_round_config(self, round_config): """Add a round_config (job description) to the inbox. From 951c3e65057c31ba64ce8e322e6858567e1fe86d Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Tue, 13 Jun 2023 11:38:02 +0200 Subject: [PATCH 2/4] Aggregrator now configurable as cli option to combiner start --- fedn/cli/run_cmd.py | 6 ++- .../combiner/aggregators/aggregatorbase.py | 8 +-- .../network/combiner/aggregators/fedavg.py | 6 +-- fedn/fedn/network/combiner/round.py | 15 ++---- fedn/fedn/network/combiner/server.py | 54 +++++++++---------- fedn/fedn/network/controller/control.py | 2 - 6 files changed, 41 insertions(+), 50 deletions(-) diff --git a/fedn/cli/run_cmd.py b/fedn/cli/run_cmd.py index 84fc908f8..bf49cd1f0 100644 --- a/fedn/cli/run_cmd.py +++ b/fedn/cli/run_cmd.py @@ -245,8 +245,9 @@ def reducer_cmd(ctx, host, port, secret_key, local_package, name, init): @click.option('-c', '--max_clients', required=False, default=30, help='The maximal number of client connections allowed.') @click.option('-in', '--init', required=False, default=None, help='Path to configuration file to (re)init combiner.') +@click.option('-a', '--aggregator', required=False, default='fedavg', help='Filename of the aggregator module to use.') @click.pass_context -def combiner_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, secure, verify, max_clients, init): +def combiner_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, secure, verify, max_clients, init, aggregator): """ :param ctx: @@ -261,7 +262,8 @@ def combiner_cmd(ctx, discoverhost, discoverport, token, name, host, port, fqdn, :param init: """ config = {'discover_host': discoverhost, 'discover_port': discoverport, 'token': token, 'host': host, - 'port': port, 'fqdn': fqdn, 'name': name, 'secure': secure, 'verify': verify, 'max_clients': max_clients, 'init': init} + 'port': port, 'fqdn': fqdn, 'name': name, 'secure': secure, 'verify': verify, 'max_clients': max_clients, + 'init': init, 'aggregator': aggregator} if config['init']: apply_config(config) diff --git a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py index 970a93087..e3bad0ad5 100644 --- a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py +++ b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py @@ -12,7 +12,7 @@ class AggregatorBase(ABC): """ Abstract class defining an aggregator. """ @abstractmethod - def __init__(self, id, storage, server, modelservice, control): + def __init__(self, storage, server, modelservice, control): """ Initialize the aggregator. :param id: A reference to id of :class: `fedn.network.combiner.Combiner` @@ -28,7 +28,7 @@ def __init__(self, id, storage, server, modelservice, control): """ self.name = self.__class__.__name__ self.storage = storage - self.id = id + #self.id = id self.server = server self.modelservice = modelservice self.control = control @@ -110,7 +110,7 @@ def next_model_update(self, helper): return model_next, data, model_id -def get_aggregator(aggregator_module_name, id, storage, server, modelservice, control): +def get_aggregator(aggregator_module_name, storage, server, modelservice, control): """ Return an instance of the helper class. :param helper_module_name: The name of the helper plugin module. @@ -120,4 +120,4 @@ def get_aggregator(aggregator_module_name, id, storage, server, modelservice, co """ aggregator_plugin = AGGREGATOR_PLUGIN_PATH.format(aggregator_module_name) aggregator = importlib.import_module(aggregator_plugin) - return aggregator.Aggregator(id, storage, server, modelservice, control) + return aggregator.Aggregator(storage, server, modelservice, control) diff --git a/fedn/fedn/network/combiner/aggregators/fedavg.py b/fedn/fedn/network/combiner/aggregators/fedavg.py index 5349a8e8e..0cd15b66a 100644 --- a/fedn/fedn/network/combiner/aggregators/fedavg.py +++ b/fedn/fedn/network/combiner/aggregators/fedavg.py @@ -19,12 +19,12 @@ class Aggregator(AggregatorBase): """ - def __init__(self, id, storage, server, modelservice, control): + def __init__(self, storage, server, modelservice, control): """Constructor method""" - super().__init__(id, storage, server, modelservice, control) + super().__init__(storage, server, modelservice, control) - self.name = "FedAvg" + self.name = "fedavg" def combine_models(self, helper=None, time_window=180, max_nr_models=100, delete_models=True): """Aggregate model updates in the queue by computing an incremental diff --git a/fedn/fedn/network/combiner/round.py b/fedn/fedn/network/combiner/round.py index cd66cd4b3..3aad0e254 100644 --- a/fedn/fedn/network/combiner/round.py +++ b/fedn/fedn/network/combiner/round.py @@ -5,11 +5,8 @@ import uuid from fedn.network.combiner.aggregators.aggregatorbase import get_aggregator -#from fedn.network.combiner.aggregators.fedavg import FedAvg from fedn.utils.helpers import get_helper -AGGREGATOR_NAME = 'fedavg' - class ModelUpdateError(Exception): pass @@ -31,19 +28,13 @@ class RoundController: :type modelservice: class: `fedn.network.combiner.modelservice.ModelService` """ - def __init__(self, id, storage, server, modelservice): + def __init__(self, aggregator_name, storage, server, modelservice): - self.id = id self.round_configs = queue.Queue() self.storage = storage self.server = server self.modelservice = modelservice - - # TODO, make runtime configurable - self.aggregator = get_aggregator(AGGREGATOR_NAME, self.id, self.storage, self.server, self.modelservice, self) - - # FedAvg( - # self.id, self.storage, self.server, self.modelservice, self) + self.aggregator = get_aggregator(aggregator_name, self.storage, self.server, self.modelservice, self) def push_round_config(self, round_config): """Add a round_config (job description) to the inbox. @@ -371,7 +362,7 @@ def run(self, polling_interval=1.0): round_meta['time_exec_training'] = time.time() - \ tic round_meta['status'] = "Success" - round_meta['name'] = self.id + round_meta['name'] = self.server.id self.server.tracer.set_round_combiner_data(round_meta) if round_config['delete_models_storage'] == 'True': self.modelservice.models.delete(round_config['model_id']) diff --git a/fedn/fedn/network/combiner/server.py b/fedn/fedn/network/combiner/server.py index 625482913..05df34a43 100644 --- a/fedn/fedn/network/combiner/server.py +++ b/fedn/fedn/network/combiner/server.py @@ -51,11 +51,11 @@ def role_to_proto_role(role): class Combiner(rpc.CombinerServicer, rpc.ReducerServicer, rpc.ConnectorServicer, rpc.ControlServicer): """ Combiner gRPC server. """ - def __init__(self, connect_config): + def __init__(self, config): """ Initialize a Combiner. - :param connect_config: configuration for the combiner - :type connect_config: dict + :param config: configuration for the combiner + :type config: dict """ # Client queues @@ -64,24 +64,24 @@ def __init__(self, connect_config): self.modelservice = ModelService() # Validate combiner name - match = re.search(VALID_NAME_REGEX, connect_config['name']) + match = re.search(VALID_NAME_REGEX, config['name']) if not match: raise ValueError('Unallowed character in combiner name. Allowed characters: a-z, A-Z, 0-9, _, -.') - self.id = connect_config['name'] + self.id = config['name'] self.role = Role.COMBINER - self.max_clients = connect_config['max_clients'] + self.max_clients = config['max_clients'] # Connector to announce combiner to discover service (reducer) - announce_client = ConnectorCombiner(host=connect_config['discover_host'], - port=connect_config['discover_port'], - myhost=connect_config['host'], - fqdn=connect_config['fqdn'], - myport=connect_config['port'], - token=connect_config['token'], - name=connect_config['name'], - secure=connect_config['secure'], - verify=connect_config['verify']) + announce_client = ConnectorCombiner(host=config['discover_host'], + port=config['discover_port'], + myhost=config['host'], + fqdn=config['fqdn'], + myport=config['port'], + token=config['token'], + name=config['name'], + secure=config['secure'], + verify=config['verify']) response = None while True: @@ -92,7 +92,7 @@ def __init__(self, connect_config): time.sleep(5) continue if status == Status.Assigned: - config = response + announce_config = response print( "COMBINER {0}: Announced successfully".format(self.id), flush=True) break @@ -100,33 +100,33 @@ def __init__(self, connect_config): print(response, flush=True) sys.exit("Exiting: Unauthorized") - cert = config['certificate'] - key = config['key'] + cert = announce_config['certificate'] + key = announce_config['key'] - if config['certificate']: - cert = base64.b64decode(config['certificate']) # .decode('utf-8') - key = base64.b64decode(config['key']) # .decode('utf-8') + if announce_config['certificate']: + cert = base64.b64decode(announce_config['certificate']) # .decode('utf-8') + key = base64.b64decode(announce_config['key']) # .decode('utf-8') # Set up gRPC server configuration - grpc_config = {'port': connect_config['port'], - 'secure': connect_config['secure'], + grpc_config = {'port': config['port'], + 'secure': config['secure'], 'certificate': cert, 'key': key} # Set up model repository self.repository = S3ModelRepository( - config['storage']['storage_config']) + announce_config['storage']['storage_config']) # Create gRPC server self.server = Server(self, self.modelservice, grpc_config) # Set up tracer for statestore self.tracer = MongoTracer( - config['statestore']['mongo_config'], config['statestore']['network_id']) + announce_config['statestore']['mongo_config'], announce_config['statestore']['network_id']) # Set up round controller - self.control = RoundController( - self.id, self.repository, self, self.modelservice) + self.control = RoundController(config['aggregator'], self.repository, self, self.modelservice) + # Start thread for round controller threading.Thread(target=self.control.run, daemon=True).start() diff --git a/fedn/fedn/network/controller/control.py b/fedn/fedn/network/controller/control.py index 1f8093f86..7929f2892 100644 --- a/fedn/fedn/network/controller/control.py +++ b/fedn/fedn/network/controller/control.py @@ -34,8 +34,6 @@ def __init__(self, message): self.message = message super().__init__(self.message) -# Exception class for when model is None - class NoModelException(Exception): """ Exception class for when model is None """ From 8513b079c2a645846973910f3e2d6175abea94de Mon Sep 17 00:00:00 2001 From: Andreas Hellander Date: Tue, 13 Jun 2023 13:14:12 +0200 Subject: [PATCH 3/4] Fix code scanning --- fedn/fedn/network/combiner/aggregators/aggregatorbase.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py index e3bad0ad5..943bc0600 100644 --- a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py +++ b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py @@ -28,7 +28,6 @@ def __init__(self, storage, server, modelservice, control): """ self.name = self.__class__.__name__ self.storage = storage - #self.id = id self.server = server self.modelservice = modelservice self.control = control From 2b14210fa27ec5861ab25d8c581bb9e1f883d5f9 Mon Sep 17 00:00:00 2001 From: Fredrik Wrede Date: Tue, 13 Jun 2023 14:05:44 +0000 Subject: [PATCH 4/4] fix doc strings --- .../network/combiner/aggregators/aggregatorbase.py | 12 ++++++++++-- fedn/fedn/network/combiner/round.py | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py index 943bc0600..1a6da8d1a 100644 --- a/fedn/fedn/network/combiner/aggregators/aggregatorbase.py +++ b/fedn/fedn/network/combiner/aggregators/aggregatorbase.py @@ -114,8 +114,16 @@ def get_aggregator(aggregator_module_name, storage, server, modelservice, contro :param helper_module_name: The name of the helper plugin module. :type helper_module_name: str - :return: A helper instance. - :rtype: class: `fedn.utils.helpers.HelperBase` + :param storage: Model repository for :class: `fedn.network.combiner.Combiner` + :type storage: class: `fedn.common.storage.s3.s3repo.S3ModelRepository` + :param server: A handle to the Combiner class :class: `fedn.network.combiner.Combiner` + :type server: class: `fedn.network.combiner.Combiner` + :param modelservice: A handle to the model service :class: `fedn.network.combiner.modelservice.ModelService` + :type modelservice: class: `fedn.network.combiner.modelservice.ModelService` + :param control: A handle to the :class: `fedn.network.combiner.round.RoundController` + :type control: class: `fedn.network.combiner.round.RoundController` + :return: An aggregator instance. + :rtype: class: `fedn.combiner.aggregators.AggregatorBase` """ aggregator_plugin = AGGREGATOR_PLUGIN_PATH.format(aggregator_module_name) aggregator = importlib.import_module(aggregator_plugin) diff --git a/fedn/fedn/network/combiner/round.py b/fedn/fedn/network/combiner/round.py index 3aad0e254..266873608 100644 --- a/fedn/fedn/network/combiner/round.py +++ b/fedn/fedn/network/combiner/round.py @@ -18,8 +18,8 @@ class RoundController: The round controller recieves round configurations from the global controller and coordinates model updates and aggregation, and model validations. - :param id: A reference to id of :class: `fedn.network.combiner.Combiner` - :type id: str + :param aggregator_name: The name of the aggregator plugin module. + :type aggregator_name: str :param storage: Model repository for :class: `fedn.network.combiner.Combiner` :type storage: class: `fedn.common.storage.s3.s3repo.S3ModelRepository` :param server: A handle to the Combiner class :class: `fedn.network.combiner.Combiner`