Skip to content

Commit

Permalink
Buffering for COHDA message implemented. Support for multiple container.
Browse files Browse the repository at this point in the history
fixes #4 #5, #8,
  • Loading branch information
mnebel-wenner committed Jun 13, 2022
1 parent a2a48d5 commit d02ffaa
Show file tree
Hide file tree
Showing 11 changed files with 636 additions and 265 deletions.
156 changes: 107 additions & 49 deletions mango_library/negotiation/cohda/cohda.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
"""Module for distributed real power planning with COHDA. Contains roles, which
integrate COHDA in the negotiation system and the core COHDA-decider together with its model.
"""
from typing import List, Dict, Any, Tuple, Optional
import copy
from typing import List, Dict, Any, Tuple
import asyncio
import numpy as np

from mango.messages.codecs import json_serializable
from mango_library.negotiation.core import NegotiationParticipant, NegotiationStarterRole, Negotiation
from mango_library.coalition.core import CoalitionAssignment
from mango_library.negotiation.cohda.data_classes import \
SolutionCandidate, SystemConfig, WorkingMemory, ScheduleSelection


@json_serializable
class CohdaMessage:
"""
Message for a COHDa negotiation.
Expand All @@ -33,8 +35,15 @@ class CohdaNegotiationStarterRole(NegotiationStarterRole):
"""Convenience role for starting a COHDA negotiation with simply providing a target schedule
"""

# create an empyt Working memory and send
# create an empyt Working memory and send it together with the target params
def __init__(self, target_params, coalition_model_matcher=None, coalition_uuid=None) -> None:
"""
:param target_params: Parameter that are necessary for the agents to calculate the performance.
Could be e.g. the target schedule.
:param coalition_model_matcher:
:param coalition_uuid:
"""
super().__init__(
lambda assignment:
CohdaMessage(WorkingMemory(target_params=target_params, system_config=SystemConfig({}),
Expand All @@ -49,7 +58,7 @@ class COHDA:
"""COHDA-decider
"""

def __init__(self, schedule_provider, is_local_acceptable, part_id, perf_func=None):
def __init__(self, schedule_provider, is_local_acceptable, part_id: str, perf_func=None):
self._schedule_provider = schedule_provider
self._is_local_acceptable = is_local_acceptable
self._memory = WorkingMemory(None, SystemConfig({}), SolutionCandidate(part_id, {}, float('-inf')))
Expand All @@ -64,16 +73,16 @@ def deviation_to_target_schedule(cluster_schedule: np.array, target_parameters):
diff = np.abs(np.array(target_schedule) - sum_cs) # deviation to the target schedule
w_diff = diff * np.array(weights) # multiply with weight vector
result = -np.sum(w_diff)
return result
return float(result)
self._perf_func = deviation_to_target_schedule
else:
self._perf_func = perf_func

def perceive(self, messages: List[CohdaMessage]) -> Tuple[SystemConfig, SolutionCandidate]:
"""
:param content:
:return: a tuple of
:param messages: The List of received CohdaMessages
:return: a tuple of SystemConfig, Candidate as a result of perceive
"""
current_sysconfig = None
current_candidate = None
Expand Down Expand Up @@ -101,20 +110,24 @@ def perceive(self, messages: List[CohdaMessage]) -> Tuple[SystemConfig, Solution
schedules[self._part_id] = self._schedule_provider()[0]
# we need to create a new class of SolutionCandidate so the COHDARole recognizes the update
current_candidate = SolutionCandidate(agent_id=self._part_id, schedules=schedules, perf=None)
current_candidate.perf = self._perf_func(current_candidate.cluster_schedule, self._memory.target_params)
current_candidate.perf = self._perf_func(current_candidate.cluster_schedule,
self._memory.target_params)
else:
current_candidate = self._memory.solution_candidate

new_sysconf = message.working_memory.system_config
new_candidate = message.working_memory.solution_candidate

current_sysconfig = SystemConfig.merge(sysconfig_i=current_sysconfig, sysconfig_j=new_sysconf)
current_candidate = SolutionCandidate.merge(candidate_i=current_candidate, candidate_j=new_candidate, agent_id=self._part_id,
perf_func=self._perf_func, target_params=self._memory.target_params)
current_candidate = SolutionCandidate.merge(candidate_i=current_candidate,
candidate_j=new_candidate,
agent_id=self._part_id,
perf_func=self._perf_func,
target_params=self._memory.target_params)

return current_sysconfig, current_candidate

def _decide(self, sysconfig: SystemConfig, candidate: SolutionCandidate) -> Tuple[SystemConfig, SolutionCandidate]:
def decide(self, sysconfig: SystemConfig, candidate: SolutionCandidate) -> Tuple[SystemConfig, SolutionCandidate]:
"""
:param sysconfig:
Expand Down Expand Up @@ -147,66 +160,111 @@ def _decide(self, sysconfig: SystemConfig, candidate: SolutionCandidate) -> Tupl

return sysconfig, current_best_candidate

def _act(self, new_sysconfig, new_candidate) -> Optional[CohdaMessage]:
if new_sysconfig != self._memory.system_config or new_candidate != self._memory.solution_candidate:
# update memory
self._memory.system_config = new_sysconfig
self._memory.solution_candidate = new_candidate

# send message
return CohdaMessage(working_memory=self._memory)

# don't send message
return None
def act(self, new_sysconfig: SystemConfig, new_candidate: SolutionCandidate) -> CohdaMessage:
"""
Stores the new SystemCondig and SolutionCandidate in Memory and returns the COHDA message that should be sent
:param new_sysconfig: The SystemConfig as a result from perceive and decide
:param new_candidate: The SolutionCandidate as a result from perceive and decide
:return: The COHDA message that should be sent
"""
# update memory
self._memory.system_config = new_sysconfig
self._memory.solution_candidate = new_candidate
# return COHDA message
return CohdaMessage(working_memory=self._memory)


class COHDARole(NegotiationParticipant):
"""Negotiation role for COHDA.
"""

def __init__(self, schedules_provider, local_acceptable_func):
def __init__(self, schedules_provider, local_acceptable_func=None, check_inbox_interval: float = 0.1):
"""
Init of COHDARole
:param schedules_provider: Function that takes not arguments and returns a list of schedules
:param local_acceptable_func: Function that takes a schedule as input and returns a boolean indicating,
if the schedule is locally acceptable or not. Defaults to lambda x: True
:param check_inbox_interval: Duration of buffering the cohda messages [s]
"""
super().__init__()

self._schedules_provider = schedules_provider
self._is_local_acceptable = local_acceptable_func
if local_acceptable_func is None:
self._is_local_acceptable = lambda x: True
else:
self._is_local_acceptable = local_acceptable_func
self._cohda = {}
self._cohda_msg_queues = {}
self._cohda_tasks = []
self.check_inbox_interval = check_inbox_interval

def create_cohda(self, part_id: int):
"""Create an instance of the COHDA-decider.
def create_cohda(self, part_id: str):
"""
Create an instance of COHDA.
:param part_id: participant id
:return: COHDA
:return: COHDA object
"""
return COHDA(schedule_provider=self._schedules_provider,
is_local_acceptable=self._is_local_acceptable,
part_id=part_id)

async def on_stop(self) -> None:
"""
Will be called once the agent is shutdown
"""
# cancel all cohda tasks
for task in self._cohda_tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

def handle(self,
message,
assignment: CoalitionAssignment,
negotiation: Negotiation,
meta: Dict[str, Any]):

if negotiation.coalition_id not in self._cohda:
if negotiation.coalition_id in self._cohda:
negotiation.active = True
self._cohda_msg_queues[negotiation.coalition_id].append(message)
else:
self._cohda[negotiation.coalition_id] = self.create_cohda(assignment.part_id)

# (old, new) = self._cohda[negotiation.coalition_id].decide(message)

this_cohda: COHDA = self._cohda[negotiation.coalition_id]
old_sysconf = this_cohda._memory.system_config
old_candidate = this_cohda._memory.solution_candidate


sysconf, candidate = this_cohda.perceive(messages=[message])

if sysconf is not old_sysconf or candidate is not old_candidate:
sysconf, candidate = this_cohda._decide(sysconfig=sysconf, candidate=candidate)
message_to_send = this_cohda._act(new_sysconfig=sysconf, new_candidate=candidate)

if message_to_send is not None:
self.send_to_neighbors(assignment, negotiation, message_to_send)

# set agent as idle
if self.context.inbox_length() == 0:
self._cohda_msg_queues[negotiation.coalition_id] = [message]

async def process_msg_queue():
"""
Method to evaluate all incoming message of a cohda_message_queue for a certain negotiation
"""

if len(self._cohda_msg_queues[negotiation.coalition_id]) > 0:
# copy queue
cohda_message_queue, self._cohda_msg_queues[negotiation.coalition_id] = \
self._cohda_msg_queues[negotiation.coalition_id], []
# get cohda object
current_cohda = self._cohda[negotiation.coalition_id]
# copy old memory
old_sysconf = current_cohda._memory.system_config
old_candidate = current_cohda._memory.solution_candidate

# perceive
sysconf, candidate = current_cohda.perceive(cohda_message_queue)

# decide
if sysconf is not old_sysconf or candidate is not old_candidate:
sysconf, candidate = current_cohda.decide(sysconfig=sysconf, candidate=candidate)
# act
message_to_send = current_cohda.act(new_sysconfig=sysconf, new_candidate=candidate)
if message_to_send is not None:
await self.send_to_neighbors(assignment, negotiation,
message_to_send)

else:
# set the negotiation as inactive as the incoming information was known already
negotiation.active = False
else:
negotiation.active = False

self._cohda_tasks.append(self.context.schedule_periodic_task(process_msg_queue,
delay=self.check_inbox_interval))
30 changes: 18 additions & 12 deletions mango_library/negotiation/cohda/data_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
from typing import Dict, Callable, Optional
import numpy as np

from mango.messages.codecs import json_serializable


@json_serializable
class SolutionCandidate:
"""
Model for a solution candidate in COHDA.
"""

def __init__(self, agent_id: int, schedules: Dict[int, np.array], perf: Optional[float]) -> None:
def __init__(self, agent_id: str, schedules: Dict[str, np.array], perf: Optional[float]) -> None:
self._agent_id = agent_id
self._schedules = schedules
self._perf = perf
Expand All @@ -29,23 +32,23 @@ def __eq__(self, o: object) -> bool:
return self.agent_id == o.agent_id and self.perf == o.perf and schedules_equal

@property
def agent_id(self) -> int:
def agent_id(self) -> str:
"""Return the agent id
:return: agent id
"""
return self._agent_id

@agent_id.setter
def agent_id(self, new_id: int):
def agent_id(self, new_id: str):
"""Set the agent id
:param new_id: agent id
"""
self._agent_id = new_id

@property
def schedules(self) -> Dict[int, np.array]:
def schedules(self) -> Dict[str, np.array]:
"""Return the candidate schedule map (part_id -> schedule)
:return: map part_id -> schedule
Expand Down Expand Up @@ -77,7 +80,7 @@ def cluster_schedule(self) -> np.array:
return np.array(list(self.schedules.values()))

@classmethod
def merge(cls, candidate_i, candidate_j, agent_id: int, perf_func: Callable, target_params):
def merge(cls, candidate_i, candidate_j, agent_id: str, perf_func: Callable, target_params):
"""
Returns a merged Candidate. If the candidate_i remains unchanged, the same instance of candidate_i is
returned, otherwise a new object is created with agent_id as candidate.agent_id
Expand Down Expand Up @@ -106,7 +109,7 @@ def merge(cls, candidate_i, candidate_j, agent_id: int, perf_func: Callable, tar
candidate = candidate_j
elif keyset_j - keyset_i:
# If *candidate_j* shares some entries with *candidate_i*, update *candidate_i*
new_schedules: Dict[int, np.array] = {}
new_schedules: Dict[str, np.array] = {}
for a in sorted(keyset_i | keyset_j):
if a in keyset_i:
schedule = candidate_i.schedules[a]
Expand All @@ -121,7 +124,7 @@ def merge(cls, candidate_i, candidate_j, agent_id: int, perf_func: Callable, tar
return candidate

@classmethod
def create_from_updated_sysconf(cls, sysconfig, agent_id: int, new_schedule: np.array):
def create_from_updated_sysconf(cls, sysconfig, agent_id: str, new_schedule: np.array):
"""
Creates a Candidate based on the cluster schedule of a SystemConfiguration,
which is changed only for *agent_id* towards *new_schedule*
Expand All @@ -136,6 +139,7 @@ def create_from_updated_sysconf(cls, sysconfig, agent_id: int, new_schedule: np.
return cls(agent_id=agent_id, schedules=schedule_dict, perf=None)


@json_serializable
class ScheduleSelection:
"""
A selection of a specific schedule
Expand Down Expand Up @@ -166,19 +170,20 @@ def schedule(self) -> np.array:
return self._schedule


@json_serializable
class SystemConfig:
"""
Model for a system configuration in COHDA
"""

def __init__(self, schedule_choices: Dict[int, ScheduleSelection]) -> None:
def __init__(self, schedule_choices: Dict[str, ScheduleSelection]) -> None:
self._schedule_choices = schedule_choices

def __eq__(self, o: object) -> bool:
return isinstance(o, SystemConfig) and self._schedule_choices == o._schedule_choices

@property
def schedule_choices(self) -> Dict[int, ScheduleSelection]:
def schedule_choices(self) -> Dict[str, ScheduleSelection]:
"""Return the schedule_choices map (part_id -> scheduleSelection)
:return: Dict with part_id -> ScheduleSelection
Expand All @@ -202,12 +207,12 @@ def merge(cls, sysconfig_i, sysconfig_j):
returned, otherwise a new object is created.
"""

sysconfig_i_schedules: Dict[int, ScheduleSelection] = sysconfig_i.schedule_choices
sysconfig_j_schedules: Dict[int, ScheduleSelection] = sysconfig_j.schedule_choices
sysconfig_i_schedules: Dict[str, ScheduleSelection] = sysconfig_i.schedule_choices
sysconfig_j_schedules: Dict[str, ScheduleSelection] = sysconfig_j.schedule_choices
key_set_i = set(sysconfig_i_schedules.keys())
key_set_j = set(sysconfig_j_schedules.keys())

new_sysconfig: Dict[int, ScheduleSelection] = {}
new_sysconfig: Dict[str, ScheduleSelection] = {}
modified = False

for i, a in enumerate(sorted(key_set_i | key_set_j)):
Expand All @@ -231,6 +236,7 @@ def merge(cls, sysconfig_i, sysconfig_j):
return sysconf


@json_serializable
class WorkingMemory:
"""Working memory of a COHDA agent
"""
Expand Down
Loading

0 comments on commit d02ffaa

Please sign in to comment.