Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add feature to send a event to a specific actor #251

Merged
merged 2 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public List<PersonId> getUnvaccinatedFamilyMembers(PersonId personId) {
List<PersonId> familyMembers = familyDataManager.getFamilyMembers(familyId);
for (PersonId familyMemeberId : familyMembers) {
if (!isPersonVaccinated(familyMemeberId)) {
result.add(personId);
result.add(familyMemeberId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ protected DataManagerContext(DataManagerId dataManagerId, Simulation simulation)
* executed at the given time.
*
* @throws ContractException
* <ul>
* <ul>
* <li>{@link NucleusError#NULL_PLAN_CONSUMER} if the
* consumer is null</li>
* <li>{@link NucleusError#PAST_PLANNING_TIME} if the
Expand All @@ -36,7 +36,6 @@ protected DataManagerContext(DataManagerId dataManagerId, Simulation simulation)
* the plan is added to the simulation after event
* processing is finished</li>
* </ul>
*
*/
public void addPlan(final Consumer<DataManagerContext> consumer, final double planTime) {
simulation.addDataManagerPlan(dataManagerId, new ConsumerDataManagerPlan(planTime, consumer));
Expand All @@ -50,21 +49,20 @@ public void addPlan(final Consumer<DataManagerContext> consumer, final double pl
* plans (having arrival id = -1) are scheduled in the planning queue with
* higher arrival ids than all the serialized plans.
*
*
* @throws ContractException
* <ul>
* <li>{@link NucleusError#NULL_PLAN} if the plan is
* null</li>
* <li>{@link NucleusError#INVALID_PLAN_ARRIVAL_ID} if the
* arrival id is less than -1</li>
* <li>{@link NucleusError#INVALID_PLAN_ARRIVAL_ID} if
* the arrival id is less than -1</li>
* <li>{@link NucleusError#PAST_PLANNING_TIME} if the
* plan is scheduled for a time in the past *</li>
* <li>{@link NucleusError#PLANNING_QUEUE_CLOSED} if
* the plan is added to the simulation after event
* processing is finished</li>
* </ul>
*/
public void addPlan(DataManagerPlan plan) {
public void addPlan(DataManagerPlan plan) {
simulation.addDataManagerPlan(dataManagerId, plan);
}

Expand Down Expand Up @@ -111,6 +109,24 @@ public void releaseObservationEvent(final Event event) {
simulation.releaseObservationEventForDataManager(event);
}

/**
* Broadcasts the given event to a single actor. This is used
* for OBSERVATION events that are generated by the data managers. MUTATION
* events that are generated by the data managers as a proxy for actors and data
* managers should use releaseMutationEvent() instead.
*
* @throws ContractException
* <ul>
* <li>{@link NucleusError#NULL_EVENT} if the event is
* null</li>
* <li>{@link NucleusError#NULL_ACTOR_ID} if the
* actorId is null</li>
* </ul>
*/
public void releaseObservationEventToActor(final Event event, final ActorId actorId) {
simulation.releaseObservationEventForDataManagerToActor(event, actorId);
}

/**
* Starts the event handling process for the given event This is used for
* MUTATION events.
Expand Down Expand Up @@ -243,7 +259,6 @@ public LocalDateTime getLocalDateTime(double simulationTime) {
* if this method is invoked before the termination of
* the simulation</li>
* </ul>
*
*/
public List<DataManagerPlan> retrievePlans() {
return simulation.retrievePlansForDataManager(dataManagerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,23 @@ protected void unsubscribeReportFromEvent(Class<? extends Event> eventClass) {

}

protected void releaseObservationEventForDataManagerToActor(final Event event, final ActorId actorId) {
if (event == null) {
throw new ContractException(NucleusError.NULL_EVENT);
}

if (actorId == null) {
throw new ContractException(NucleusError.NULL_ACTOR_ID);
}

if (!dataManagerQueueActive) {
throw new ContractException(NucleusError.OBSERVATION_EVENT_IMPROPER_RELEASE);
}

// queue the event handling for actors
broadcastEventToFilterNodeAndActor(event, rootNode, actorId);
}

protected void releaseObservationEventForDataManager(final Event event) {

if (event == null) {
Expand Down Expand Up @@ -1474,6 +1491,43 @@ protected boolean subscribersExistForEvent(Class<? extends Event> eventClass) {
|| rootNode.children.containsKey(eventClass) || rootNode.consumers.containsKey(eventClass));
}

/*
* Recursively processes the event through the filter node to the given actor. Events should be
* processed through the root filter node. Each node's consumers have each such
* consumer scheduled onto the actor queue for delayed execution of the
* consumer.
*/
private void broadcastEventToFilterNodeAndActor(final Event event, FilterNode filterNode, ActorId actorId) {
// determine the value of the function for the given event
Object value = filterNode.function.apply(event);

// use that value to place any consumers that are matched to that value
// on the actor queue
Map<ActorId, Consumer<Event>> consumerMap = filterNode.consumers.get(value);
if (consumerMap != null) {
if (consumerMap.containsKey(actorId)) {
Consumer<Event> consumer = consumerMap.get(actorId);
final ActorContentRec actorContentRec = new ActorContentRec();
actorContentRec.event = event;
actorContentRec.actorId = actorId;
actorContentRec.eventConsumer = consumer;
actorQueue.add(actorContentRec);
}
}

// match the value to any child nodes and recursively call this method
// on that node
Map<IdentifiableFunction<?>, FilterNode> childMap = filterNode.children.get(value);
if (childMap != null) {
for (Object id : childMap.keySet()) {
FilterNode childNode = childMap.get(id);
if (childNode != null) {
broadcastEventToFilterNodeAndActor(event, childNode, actorId);
}
}
}
}

/*
* Recursively processes the event through the filter node . Events should be
* processed through the root filter node. Each node's consumers have each such
Expand Down