Skip to content

Commit

Permalink
mongostatestore dependecy to control removed and repository dependecy…
Browse files Browse the repository at this point in the history
… injected into control
  • Loading branch information
niklastheman committed Dec 30, 2024
1 parent 839bf39 commit db14426
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 48 deletions.
5 changes: 2 additions & 3 deletions fedn/network/api/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ class Network:
Some methods has been moved to :class:`fedn.network.api.interface.API`.
"""

def __init__(self, control, statestore, combiner_store: CombinerStore, client_store: ClientStore, load_balancer=None):
def __init__(self, control, network_id: str, combiner_store: CombinerStore, client_store: ClientStore, load_balancer=None):
""" """
self.statestore = statestore
self.combiner_store = combiner_store
self.client_store = client_store
self.control = control
self.id = statestore.network_id
self.id = network_id

if not load_balancer:
self.load_balancer = LeastPacked(self)
Expand Down
16 changes: 10 additions & 6 deletions fedn/network/api/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
modelstorage_config = get_modelstorage_config()
network_id = get_network_config()
statestore = MongoStateStore(network_id, statestore_config["mongo_config"])
statestore.set_storage_backend(modelstorage_config)
# statestore.set_storage_backend(modelstorage_config)

mc = pymongo.MongoClient(**statestore_config["mongo_config"])
mc.server_info()
Expand All @@ -35,16 +35,20 @@
status_store = StatusStore(mdb, "control.status")
validation_store = ValidationStore(mdb, "control.validations")

repository = Repository(modelstorage_config["storage_config"])

control = Control(
statestore=statestore,
network_id=network_id,
session_store=session_store,
model_store=model_store,
round_store=round_store,
package_store=package_store,
combiner_store=combiner_store,
client_store=client_store,
model_repository=repository,
)

# TODO: use Repository
minio_repository: RepositoryBase = None

if modelstorage_config["storage_type"] == "S3":
Expand All @@ -53,9 +57,9 @@

storage_collection = mdb["network.storage"]

storage_config = storage_collection.find_one({"status": "enabled"}, projection={"_id": False})
# storage_config = storage_collection.find_one({"status": "enabled"}, projection={"_id": False})

repository: RepositoryBase = None
# repository: RepositoryBase = None

if storage_config["storage_type"] == "S3":
repository = Repository(storage_config["storage_config"])
# if storage_config["storage_type"] == "S3":
# repository = Repository(storage_config["storage_config"])
4 changes: 1 addition & 3 deletions fedn/network/combiner/combiner.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from fedn.common.certificate.certificate import Certificate
from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream
from fedn.network.combiner.roundhandler import RoundConfig, RoundHandler
from fedn.network.combiner.shared import client_store, combiner_store, prediction_store, repository, round_store, statestore, status_store, validation_store
from fedn.network.combiner.shared import client_store, combiner_store, prediction_store, repository, round_store, status_store, validation_store
from fedn.network.grpc.server import Server, ServerConfig
from fedn.network.storage.statestore.stores.shared import EntityNotFound

Expand Down Expand Up @@ -106,8 +106,6 @@ def __init__(self, config):
# Set up model repository
self.repository = repository

self.statestore = statestore

self.round_store = round_store

# Add combiner to statestore
Expand Down
3 changes: 0 additions & 3 deletions fedn/network/combiner/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from fedn.common.config import get_modelstorage_config, get_network_config, get_statestore_config
from fedn.network.combiner.modelservice import ModelService
from fedn.network.storage.s3.repository import Repository
from fedn.network.storage.statestore.mongostatestore import MongoStateStore
from fedn.network.storage.statestore.stores.client_store import ClientStore
from fedn.network.storage.statestore.stores.combiner_store import CombinerStore
from fedn.network.storage.statestore.stores.prediction_store import PredictionStore
Expand All @@ -16,8 +15,6 @@
modelstorage_config = get_modelstorage_config()
network_id = get_network_config()

statestore = MongoStateStore(network_id, statestore_config["mongo_config"])

if statestore_config["type"] == "MongoDB":
mc = pymongo.MongoClient(**statestore_config["mongo_config"])
mc.server_info()
Expand Down
6 changes: 4 additions & 2 deletions fedn/network/controller/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from fedn.network.combiner.roundhandler import RoundConfig
from fedn.network.controller.controlbase import ControlBase
from fedn.network.state import ReducerState
from fedn.network.storage.s3.repository import Repository
from fedn.network.storage.statestore.stores.client_store import ClientStore
from fedn.network.storage.statestore.stores.combiner_store import CombinerStore
from fedn.network.storage.statestore.stores.model_store import ModelStore
Expand Down Expand Up @@ -96,16 +97,17 @@ class Control(ControlBase):

def __init__(
self,
statestore,
network_id: str,
session_store: SessionStore,
model_store: ModelStore,
round_store: RoundStore,
package_store: PackageStore,
combiner_store: CombinerStore,
client_store: ClientStore,
model_repository: Repository,
):
"""Constructor method."""
super().__init__(statestore, session_store, model_store, round_store, package_store, combiner_store, client_store)
super().__init__(network_id, session_store, model_store, round_store, package_store, combiner_store, client_store, model_repository)
self.name = "DefaultControl"

def start_session(self, session_id: str, rounds: int, round_timeout: int) -> None:
Expand Down
65 changes: 34 additions & 31 deletions fedn/network/controller/controlbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ class ControlBase(ABC):
@abstractmethod
def __init__(
self,
statestore,
network_id: str,
session_store: SessionStore,
model_store: ModelStore,
round_store: RoundStore,
package_store: PackageStore,
combiner_store: CombinerStore,
client_store: ClientStore,
model_repository: Repository,
):
"""Constructor."""
self._state = ReducerState.setup
Expand All @@ -62,35 +63,38 @@ def __init__(
self.model_store = model_store
self.round_store = round_store
self.package_store = package_store
self.statestore = statestore
if self.statestore.is_inited():
self.network = Network(self, statestore, combiner_store, client_store)

try:
not_ready = True
tries = 0
while not_ready:
storage_config = self.statestore.get_storage_backend()
if storage_config:
not_ready = False
else:
logger.warning("Storage backend not configured, waiting...")
sleep(5)
tries += 1
if tries > MAX_TRIES_BACKEND:
raise Exception
except Exception:
logger.error("Failed to retrive storage configuration, exiting.")
raise MisconfiguredStorageBackend()

if storage_config["storage_type"] == "S3":
self.model_repository = Repository(storage_config["storage_config"])
else:
logger.error("Unsupported storage backend, exiting.")
raise UnsupportedStorageBackend()

if self.statestore.is_inited():
self._state = ReducerState.idle
self.network = Network(self, network_id, combiner_store, client_store)

# if self.statestore.is_inited():
# self.network = Network(self, statestore, combiner_store, client_store)

self.model_repository = model_repository

# try:
# not_ready = True
# tries = 0
# while not_ready:
# storage_config = self.statestore.get_storage_backend()
# if storage_config:
# not_ready = False
# else:
# logger.warning("Storage backend not configured, waiting...")
# sleep(5)
# tries += 1
# if tries > MAX_TRIES_BACKEND:
# raise Exception
# except Exception:
# logger.error("Failed to retrive storage configuration, exiting.")
# raise MisconfiguredStorageBackend()

# if storage_config["storage_type"] == "S3":
# self.model_repository = Repository(storage_config["storage_config"])
# else:
# logger.error("Unsupported storage backend, exiting.")
# raise UnsupportedStorageBackend()

# if self.statestore.is_inited():
# self._state = ReducerState.idle

@abstractmethod
def round(self, config, round_number):
Expand Down Expand Up @@ -402,4 +406,3 @@ def state(self):
:rype: str
"""
return self._state
return self._state

0 comments on commit db14426

Please sign in to comment.