Skip to content

Commit

Permalink
Backend-Api: send average value for number Channels instead of latest…
Browse files Browse the repository at this point in the history
… value
  • Loading branch information
sfeilmeier committed Feb 27, 2019
1 parent a2bf09c commit 92cd3f4
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 63 deletions.
3 changes: 2 additions & 1 deletion io.openems.edge.controller.api.backend/bnd.bnd
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ Bundle-Vendor: FENECON GmbH
Bundle-License: https://opensource.org/licenses/EPL-2.0
Bundle-Version: 1.0.0.${tstamp}
Private-Package: \
io.openems.edge.controller.api.backend
io.openems.edge.controller.api.backend,\
io.openems.edge.controller.api.backend.slidingvalue

-includeresource: {readme.md}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,6 +34,7 @@
import io.openems.edge.common.component.AbstractOpenemsComponent;
import io.openems.edge.common.component.ComponentManager;
import io.openems.edge.common.component.OpenemsComponent;
import io.openems.edge.common.event.EdgeEventConstants;
import io.openems.edge.controller.api.Controller;
import io.openems.edge.controller.api.core.ApiWorker;
import io.openems.edge.timedata.api.Timedata;
Expand All @@ -39,19 +43,24 @@
@Component(name = "Controller.Api.Backend", //
immediate = true, //
configurationPolicy = ConfigurationPolicy.REQUIRE, //
property = "org.ops4j.pax.logging.appender.name=Controller.Api.Backend")
public class BackendApi extends AbstractOpenemsComponent implements Controller, OpenemsComponent, PaxAppender {

protected static final int DEFAULT_CYCLE_TIME = 10000;
property = { //
"org.ops4j.pax.logging.appender.name=Controller.Api.Backend", //
EventConstants.EVENT_TOPIC + "=" + EdgeEventConstants.TOPIC_CYCLE_AFTER_PROCESS_IMAGE //
} //
)
public class BackendApi extends AbstractOpenemsComponent
implements Controller, OpenemsComponent, PaxAppender, EventHandler {

protected static final int DEFAULT_NO_OF_CYCLES = 10;
protected static final String COMPONENT_NAME = "Controller.Api.Backend";

protected final BackendWorker backendWorker = new BackendWorker(this);
protected final BackendWorker worker = new BackendWorker(this);

private final Logger log = LoggerFactory.getLogger(BackendApi.class);
private final ApiWorker apiWorker = new ApiWorker();

protected WebsocketClient websocket = null;
protected int cycleTime = DEFAULT_CYCLE_TIME; // default, is going to be overwritten by config
protected int noOfCycles = DEFAULT_NO_OF_CYCLES; // default, is going to be overwritten by config
protected boolean debug = false;

// Used for SubscribeSystemLogRequests
Expand Down Expand Up @@ -79,7 +88,7 @@ public BackendApi() {
@Activate
void activate(ComponentContext context, Config config) {
super.activate(context, config.id(), config.enabled());
this.cycleTime = config.cycleTime();
this.noOfCycles = config.noOfCycles();
this.debug = config.debug();

if (!this.isEnabled()) {
Expand Down Expand Up @@ -115,13 +124,13 @@ void activate(ComponentContext context, Config config) {
this.websocket.start();

// Activate worker
this.backendWorker.activate(config.id());
this.worker.activate(config.id());
}

@Deactivate
protected void deactivate() {
super.deactivate();
this.backendWorker.deactivate();
this.worker.deactivate();
if (this.websocket != null) {
this.websocket.stop();
}
Expand Down Expand Up @@ -171,4 +180,13 @@ public void doAppend(PaxLoggingEvent event) {
SystemLogNotification notification = SystemLogNotification.fromPaxLoggingEvent(event);
ws.sendMessage(notification);
}

@Override
public void handleEvent(Event event) {
switch (event.getTopic()) {
case EdgeEventConstants.TOPIC_CYCLE_AFTER_PROCESS_IMAGE:
this.worker.triggerNextRun();
break;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,48 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.EvictingQueue;
import com.google.gson.JsonElement;

import io.openems.common.jsonrpc.base.JsonrpcMessage;
import io.openems.common.jsonrpc.notification.TimestampedDataNotification;
import io.openems.common.types.ChannelAddress;
import io.openems.common.worker.AbstractWorker;
import io.openems.common.types.OpenemsType;
import io.openems.common.worker.AbstractCycleWorker;
import io.openems.edge.common.channel.doc.AccessMode;
import io.openems.edge.controller.api.backend.slidingvalue.DoubleSlidingValue;
import io.openems.edge.controller.api.backend.slidingvalue.FloatSlidingValue;
import io.openems.edge.controller.api.backend.slidingvalue.IntegerSlidingValue;
import io.openems.edge.controller.api.backend.slidingvalue.LatestSlidingValue;
import io.openems.edge.controller.api.backend.slidingvalue.LongSlidingValue;
import io.openems.edge.controller.api.backend.slidingvalue.ShortSlidingValue;
import io.openems.edge.controller.api.backend.slidingvalue.SlidingValue;

class BackendWorker extends AbstractWorker {
class BackendWorker extends AbstractCycleWorker {

private static final int MAX_CACHED_MESSAGES = 1000;

// private final Logger log = LoggerFactory.getLogger(BackendWorker.class);
private final Logger log = LoggerFactory.getLogger(BackendWorker.class);

private final BackendApi parent;

private Optional<Integer> increasedCycleTime = Optional.empty();
// Counts the number of Cycles till data is sent to Backend.
private int cycleCount = 0;

// Holds an current NoOfCycles
private Optional<Integer> increasedNoOfCycles = Optional.empty();

// Current values
private final ConcurrentHashMap<ChannelAddress, SlidingValue<?>> data = new ConcurrentHashMap<>();

// Last cached values
private final HashMap<ChannelAddress, JsonElement> last = new HashMap<>();
// Unsent queue (FIFO)
private EvictingQueue<JsonrpcMessage> unsent = EvictingQueue.create(MAX_CACHED_MESSAGES);

Expand Down Expand Up @@ -59,30 +77,63 @@ public void sendValuesOfAllChannelsOnce() {

@Override
protected void forever() {
Map<ChannelAddress, JsonElement> values = this.getValues(//
this.sendChangedValuesOnly.getAndSet(true) // resets the mode to 'send changed values only'
);
// Update the data from ChannelValues
this.updateData();

// Increase CycleCount
if (++this.cycleCount < this.parent.noOfCycles) {
// Stop here if not reached CycleCount
return;
}

/*
* Reached CycleCount -> Send data
*/
// Reset CycleCount
this.cycleCount = 0;

// resets the mode to 'send changed values only'
boolean sendChangedValuesOnly = this.sendChangedValuesOnly.getAndSet(true);

// Prepare message values
Map<ChannelAddress, JsonElement> sendValues = new HashMap<>();

if (sendChangedValuesOnly) {
// Only Changed Values
for (Entry<ChannelAddress, SlidingValue<?>> entry : this.data.entrySet()) {
JsonElement changedValueOrNull = entry.getValue().getChangedValueOrNull();
if (changedValueOrNull != null) {
sendValues.put(entry.getKey(), changedValueOrNull);
}
}
} else {
// All Values
for (Entry<ChannelAddress, SlidingValue<?>> entry : this.data.entrySet()) {
sendValues.put(entry.getKey(), entry.getValue().getValue());
}
}

boolean canSendFromCache;

/*
* send, if list is not empty
*/
if (!values.isEmpty()) {
if (!sendValues.isEmpty()) {
// Get timestamp and round to Cycle-Time
int cycleTime = this.getCycleTime();
long timestamp = System.currentTimeMillis() / cycleTime * cycleTime;

// create JSON-RPC notification
TimestampedDataNotification message = new TimestampedDataNotification();
message.add(timestamp, values);
message.add(timestamp, sendValues);

// reset cycleTime to default
resetCycleTime();
resetNoOfCycles();

boolean wasSent = this.parent.websocket.sendMessage(message);
if (!wasSent) {
// increase cycleTime
increaseCycleTime();
increaseNoOfCycles();

// cache data for later
this.unsent.add(message);
Expand All @@ -106,21 +157,10 @@ protected void forever() {
}
}

@Override
protected int getCycleTime() {
return this.increasedCycleTime.orElse(this.parent.cycleTime);
}

/**
* Cycles through all Channels and gets the value. If 'changedValuesOnly' is
* set, then only values that changed since last check are added to the queue.
*
* @param changedValuesOnly take changed values only or take all channels
* values?
* @return a map of channels who's value changed since last check
* Cycles through all Channels and updates the value.
*/
private Map<ChannelAddress, JsonElement> getValues(boolean changedValuesOnly) {
final ConcurrentHashMap<ChannelAddress, JsonElement> values = new ConcurrentHashMap<>();
private void updateData() {
this.parent.componentManager.getComponents().parallelStream() //
.filter(c -> c.isEnabled()) //
.flatMap(component -> component.channels().parallelStream()) //
Expand All @@ -129,44 +169,94 @@ private Map<ChannelAddress, JsonElement> getValues(boolean changedValuesOnly) {
|| channel.channelDoc().getAccessMode() == AccessMode.READ_WRITE)
.forEach(channel -> {
ChannelAddress address = channel.address();
JsonElement jValue = channel.value().asJson();

if (changedValuesOnly) {
JsonElement jLastValue = this.last.get(address);
if (jLastValue == null || !jLastValue.equals(jValue)) {
// this value differs from the last sent value
this.last.put(address, jValue);
values.put(address, jValue);
}
Object value = channel.value().get();
boolean isEnum = channel.channelDoc().hasOptions();
if (isEnum && !channel.getType().equals(OpenemsType.ENUM)) {
this.log.warn(
"Channel [" + address.toString() + "] should have been defined as an EnumReadChannel");
}

} else {
this.last.put(address, jValue);
values.put(address, jValue);
// Get existing SlidingValue object or add new one
SlidingValue<?> slidingValue = this.data.get(address);
if (slidingValue == null) {
switch (channel.getType()) {
case INTEGER:
slidingValue = new IntegerSlidingValue();
break;
case BOOLEAN:
slidingValue = new LatestSlidingValue(OpenemsType.BOOLEAN);
break;
case DOUBLE:
slidingValue = new DoubleSlidingValue();
break;
case FLOAT:
slidingValue = new FloatSlidingValue();
break;
case LONG:
slidingValue = new LongSlidingValue();
break;
case SHORT:
slidingValue = new ShortSlidingValue();
break;
case STRING:
slidingValue = new LatestSlidingValue(OpenemsType.STRING);
break;
case ENUM:
slidingValue = new LatestSlidingValue(OpenemsType.ENUM);
break;
}
this.data.put(address, slidingValue);
}

// Add Value to SlidingValue object
switch (channel.getType()) {
case INTEGER:
((IntegerSlidingValue) slidingValue).addValue((Integer) value);
break;
case DOUBLE:
((DoubleSlidingValue) slidingValue).addValue((Double) value);
break;
case FLOAT:
((FloatSlidingValue) slidingValue).addValue((Float) value);
break;
case LONG:
((LongSlidingValue) slidingValue).addValue((Long) value);
break;
case SHORT:
((ShortSlidingValue) slidingValue).addValue((Short) value);
break;
case BOOLEAN:
case STRING:
case ENUM:
((LatestSlidingValue) slidingValue).addValue(value);
break;
}
});
return values;
}

private void increaseCycleTime() {
int currentCycleTime = this.getCycleTime();
int newCycleTime;
if (currentCycleTime < 30000 /* 30 seconds */) {
newCycleTime = currentCycleTime * 2;
/**
* NoOfCycles is adjusted if connection to Backend fails. This method increases
* the NoOfCycles.
*/
private void increaseNoOfCycles() {
int increasedNoOfCycles;
if (this.increasedNoOfCycles.isPresent()) {
increasedNoOfCycles = this.increasedNoOfCycles.get();
} else {
newCycleTime = currentCycleTime;
increasedNoOfCycles = this.parent.noOfCycles;
}
if (currentCycleTime != newCycleTime) {
this.increasedCycleTime = Optional.of(newCycleTime);
if (increasedNoOfCycles < 60) {
increasedNoOfCycles++;
}
this.increasedNoOfCycles = Optional.of(increasedNoOfCycles);
}

/**
* Cycletime is adjusted if connection to Backend fails. This method resets it
* NoOfCycles is adjusted if connection to Backend fails. This method resets it
* to configured or default value.
*/
private void resetCycleTime() {
this.increasedCycleTime = Optional.empty();
private void resetNoOfCycles() {
this.increasedNoOfCycles = Optional.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
@AttributeDefinition(name = "Uri", description = "The connection Uri to OpenEMS Backend.")
String uri() default "ws://localhost:8081";

@AttributeDefinition(name = "Cycle Time", description = "The time between sending data to Backend.")
int cycleTime() default BackendApi.DEFAULT_CYCLE_TIME;
@AttributeDefinition(name = "No. of Cycles", description = "How many Cycles till data is sent to OpenEMS Backend.")
int noOfCycles() default BackendApi.DEFAULT_NO_OF_CYCLES;

@AttributeDefinition(name = "Proxy Address", description = "The IP address or hostname of the proxy server.")
String proxyAddress() default "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void run(WebSocket ws, JsonObject handshake) {
this.parent.websocket.sendMessage(message);

// Send all Channel values
this.parent.backendWorker.sendValuesOfAllChannelsOnce();
this.parent.worker.sendValuesOfAllChannelsOnce();
}

}
Loading

0 comments on commit 92cd3f4

Please sign in to comment.