From 9cb8d0b997f78f06ebab348ddb48cb3ccb5ef6b3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Martin=20Gr=C3=BCning?=
<36659742+martingruening@users.noreply.github.com>
Date: Thu, 10 Sep 2020 18:27:38 +0200
Subject: [PATCH] Add files via upload
Initial Release on Github
---
configuration.json | 7 +
pom.xml | 71 +++++++
.../discofox/apiclient/DiscovergyApi.java | 62 ++++++
.../apiclient/DiscovergyApiClient.java | 138 +++++++++++++
.../apiclient/DiscovergyApiEngine.java | 90 +++++++++
.../discofox/apiclient/aWATTarApiEngine.java | 89 +++++++++
src/eu/gruning/discofox/apiobjects/Meter.java | 111 +++++++++++
.../gruning/discofox/apiobjects/Reading.java | 90 +++++++++
.../gruning/discofox/apiobjects/Readings.java | 52 +++++
.../discofox/apiobjects/aWATTarPricing.java | 86 ++++++++
.../discofox/apiobjects/aWATTarPricings.java | 77 ++++++++
.../discofox/internal/Configuration.java | 117 +++++++++++
.../gruning/discofox/internal/Discotime.java | 22 +++
src/eu/gruning/discofox/internal/Version.java | 7 +
src/eu/gruning/discofox/main/Discofox.java | 187 ++++++++++++++++++
src/eu/gruning/discofox/mqtt/MqttEngine.java | 67 +++++++
.../discofox/task/ReadingBatchTask.java | 85 ++++++++
src/eu/gruning/discofox/task/ReadingTask.java | 137 +++++++++++++
src/main/resources/log4j2.xml | 13 ++
19 files changed, 1508 insertions(+)
create mode 100644 configuration.json
create mode 100644 pom.xml
create mode 100644 src/eu/gruning/discofox/apiclient/DiscovergyApi.java
create mode 100644 src/eu/gruning/discofox/apiclient/DiscovergyApiClient.java
create mode 100644 src/eu/gruning/discofox/apiclient/DiscovergyApiEngine.java
create mode 100644 src/eu/gruning/discofox/apiclient/aWATTarApiEngine.java
create mode 100644 src/eu/gruning/discofox/apiobjects/Meter.java
create mode 100644 src/eu/gruning/discofox/apiobjects/Reading.java
create mode 100644 src/eu/gruning/discofox/apiobjects/Readings.java
create mode 100644 src/eu/gruning/discofox/apiobjects/aWATTarPricing.java
create mode 100644 src/eu/gruning/discofox/apiobjects/aWATTarPricings.java
create mode 100644 src/eu/gruning/discofox/internal/Configuration.java
create mode 100644 src/eu/gruning/discofox/internal/Discotime.java
create mode 100644 src/eu/gruning/discofox/internal/Version.java
create mode 100644 src/eu/gruning/discofox/main/Discofox.java
create mode 100644 src/eu/gruning/discofox/mqtt/MqttEngine.java
create mode 100644 src/eu/gruning/discofox/task/ReadingBatchTask.java
create mode 100644 src/eu/gruning/discofox/task/ReadingTask.java
create mode 100644 src/main/resources/log4j2.xml
diff --git a/configuration.json b/configuration.json
new file mode 100644
index 0000000..9cd4a85
--- /dev/null
+++ b/configuration.json
@@ -0,0 +1,7 @@
+{
+ "User": "martin@gruning.eu",
+ "Password": "InnCOboMJghBLWaL97uD",
+ "Broker": "tcp://kube.gruning.eu:1883",
+ "aWATTarPrices": true,
+ "Debug": true
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..d0af4b6
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,71 @@
+
+ 4.0.0
+ eu.gruning
+ discofox
+ 0.0.3-SNAPSHOT
+ Discofox
+ Tooling around the Discovergy Smartmeter API
+
+ src
+
+
+ maven-compiler-plugin
+ 3.8.0
+
+
+ 1.8
+
+
+
+ maven-assembly-plugin
+
+
+
+ eu.gruning.discofox.main.Discofox
+ true
+ true
+
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+
+
+ UTF-8
+
+
+
+ com.github.scribejava
+ scribejava-core
+ 6.9.0
+
+
+ com.google.code.gson
+ gson
+ 2.8.6
+
+
+ org.apache.logging.log4j
+ log4j-core
+ 2.13.0
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.2
+
+
+
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/apiclient/DiscovergyApi.java b/src/eu/gruning/discofox/apiclient/DiscovergyApi.java
new file mode 100644
index 0000000..9bffc19
--- /dev/null
+++ b/src/eu/gruning/discofox/apiclient/DiscovergyApi.java
@@ -0,0 +1,62 @@
+package eu.gruning.discofox.apiclient;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
+import com.github.scribejava.core.builder.api.DefaultApi10a;
+import com.github.scribejava.core.model.OAuth1RequestToken;
+
+// This code is based on sample code provided by Discovergy GmbH at https://api.discovergy.com/docs/
+// Thanks Discovergy for providing this including a well documented API
+
+public class DiscovergyApi extends DefaultApi10a {
+
+ private final String baseAddress;
+ private final String user;
+ private final String password;
+
+ public DiscovergyApi(String user, String password) {
+ this("https://api.discovergy.com/public/v1", user, password);
+ }
+
+ public DiscovergyApi(String baseAddress, String user, String password) {
+ this.baseAddress = baseAddress;
+ this.user = user;
+ this.password = password;
+ }
+
+ public String getBaseAddress() {
+ return baseAddress;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ @Override
+ public String getRequestTokenEndpoint() {
+ return baseAddress + "/oauth1/request_token";
+ }
+
+ @Override
+ public String getAccessTokenEndpoint() {
+ return baseAddress + "/oauth1/access_token";
+ }
+
+ @Override
+ public String getAuthorizationBaseUrl() {
+ return baseAddress + "/oauth1/authorize";
+ }
+
+ @Override
+ public String getAuthorizationUrl(OAuth1RequestToken requestToken) {
+ try {
+ return baseAddress + "/oauth1/authorize?oauth_token=" + requestToken.getToken() + "&email="
+ + URLEncoder.encode(user, UTF_8.name()) + "&password=" + URLEncoder.encode(password, UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/eu/gruning/discofox/apiclient/DiscovergyApiClient.java b/src/eu/gruning/discofox/apiclient/DiscovergyApiClient.java
new file mode 100644
index 0000000..75a448f
--- /dev/null
+++ b/src/eu/gruning/discofox/apiclient/DiscovergyApiClient.java
@@ -0,0 +1,138 @@
+package eu.gruning.discofox.apiclient;
+
+import static java.nio.charset.CodingErrorAction.REPORT;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import com.github.scribejava.core.builder.ServiceBuilder;
+import com.github.scribejava.core.model.OAuth1AccessToken;
+import com.github.scribejava.core.model.OAuth1RequestToken;
+import com.github.scribejava.core.model.OAuthRequest;
+import com.github.scribejava.core.model.Response;
+import com.github.scribejava.core.model.Verb;
+import com.github.scribejava.core.oauth.OAuth10aService;
+import com.github.scribejava.core.utils.StreamUtils;
+import com.google.gson.Gson;
+
+/**
+ * Client for the Discovergy API (https://api.discovergy.com/docs/)
+ */
+public class DiscovergyApiClient {
+
+ /**
+ * Unique client id
+ */
+ private final String clientId;
+
+ private final DiscovergyApi api;
+
+ private final OAuth10aService authenticationService;
+ private final OAuth1AccessToken accessToken;
+
+ public DiscovergyApiClient(String clientId) throws InterruptedException, ExecutionException, IOException {
+ this(createDiscovergyApi(), clientId);
+ }
+
+ public DiscovergyApiClient(DiscovergyApi api, String clientId)
+ throws InterruptedException, ExecutionException, IOException {
+ this.api = api;
+ this.clientId = clientId;
+ Map consumerTokenEntries = getConsumerToken();
+ authenticationService = new ServiceBuilder(consumerTokenEntries.get("key"))
+ .apiSecret(consumerTokenEntries.get("secret")).build(api);
+ OAuth1RequestToken requestToken = authenticationService.getRequestToken();
+ String authorizationURL = authenticationService.getAuthorizationUrl(requestToken);
+ String verifier = authorize(authorizationURL);
+ accessToken = authenticationService.getAccessToken(requestToken, verifier);
+ }
+
+ private static DiscovergyApi createDiscovergyApi() throws IOException {
+ File file = new File("credentials.properties").getAbsoluteFile();
+ Properties properties = new Properties();
+ try (Reader reader = new InputStreamReader(new FileInputStream(file),
+ UTF_8.newDecoder().onMalformedInput(REPORT).onUnmappableCharacter(REPORT))) {
+ properties.load(reader);
+ } catch (IOException e) {
+ throw new IOException("Failed to read credentials from file " + file, e);
+ }
+ String email = properties.getProperty("email");
+ String password = properties.getProperty("password");
+ if (email == null || email.isEmpty() || password == null || password.isEmpty()) {
+ throw new RuntimeException("The properties \"email\" and \"password\" must be set in file " + file);
+ }
+ return new DiscovergyApi(email, password);
+ }
+
+ public DiscovergyApi getApi() {
+ return api;
+ }
+
+ public OAuthRequest createRequest(Verb verb, String endpoint)
+ throws InterruptedException, ExecutionException, IOException {
+ return new OAuthRequest(verb, api.getBaseAddress() + endpoint);
+ }
+
+ public Response executeRequest(OAuthRequest request) throws InterruptedException, ExecutionException, IOException {
+ authenticationService.signRequest(accessToken, request);
+ return authenticationService.execute(request);
+ }
+
+ public Response executeRequest(OAuthRequest request, int expectedStatusCode)
+ throws InterruptedException, ExecutionException, IOException {
+ Response response = executeRequest(request);
+ if (response.getCode() != expectedStatusCode) {
+ response.getBody();
+ throw new RuntimeException("Status code is not " + expectedStatusCode + ": " + response);
+ }
+ return response;
+ }
+
+ private Map getConsumerToken() throws IOException {
+ byte[] rawRequest = ("client=" + clientId).getBytes(StandardCharsets.UTF_8);
+ HttpURLConnection connection = getConnection(api.getBaseAddress() + "/oauth1/consumer_token", "POST", true,
+ true);
+ connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
+ connection.setRequestProperty("Content-Length", Integer.toString(rawRequest.length));
+ connection.connect();
+ connection.getOutputStream().write(rawRequest);
+ connection.getOutputStream().flush();
+ String content = StreamUtils.getStreamContents(connection.getInputStream());
+ connection.disconnect();
+
+ return new Gson().fromJson(content, Map.class);
+ }
+
+ private static String authorize(String authorizationURL) throws IOException {
+ HttpURLConnection connection = getConnection(authorizationURL, "GET", true, false);
+ connection.connect();
+ String content = StreamUtils.getStreamContents(connection.getInputStream());
+ connection.disconnect();
+ return content.substring(content.indexOf('=') + 1);
+ }
+
+ private static HttpURLConnection getConnection(String rawURL, String method, boolean doInput, boolean doOutput)
+ throws IOException {
+ URL url = new URL(rawURL);
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setDoInput(doInput);
+ connection.setDoOutput(doOutput);
+ connection.setRequestMethod(method);
+ connection.setRequestProperty("Accept", "*");
+ connection.setInstanceFollowRedirects(false);
+ connection.setRequestProperty("charset", "utf-8");
+ connection.setUseCaches(false);
+ return connection;
+ }
+}
diff --git a/src/eu/gruning/discofox/apiclient/DiscovergyApiEngine.java b/src/eu/gruning/discofox/apiclient/DiscovergyApiEngine.java
new file mode 100644
index 0000000..45bc7ef
--- /dev/null
+++ b/src/eu/gruning/discofox/apiclient/DiscovergyApiEngine.java
@@ -0,0 +1,90 @@
+package eu.gruning.discofox.apiclient;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.github.scribejava.core.model.OAuthRequest;
+import com.github.scribejava.core.model.Verb;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.reflect.TypeToken;
+
+import eu.gruning.discofox.apiobjects.Meter;
+import eu.gruning.discofox.apiobjects.Reading;
+import eu.gruning.discofox.apiobjects.Readings;
+import eu.gruning.discofox.internal.Configuration;
+
+public class DiscovergyApiEngine {
+ private final DiscovergyApi api;
+ private final DiscovergyApiClient client;
+ private final Type listMeterType = new TypeToken>() {
+ }.getType();
+ private static final Logger logger = LogManager.getLogger(DiscovergyApiEngine.class);
+
+ public DiscovergyApiEngine(Configuration config) throws IOException, InterruptedException, ExecutionException {
+ this.api = new DiscovergyApi(config.getUser(), config.getPassword());
+ this.client = new DiscovergyApiClient(api, config.getClientId());
+ logger.info("Sucessfully connected to Discovergy API at " + config.getaWATTarBaseURL() + " with user " + config.getUser());
+ }
+
+ public List getMeters() throws IOException, InterruptedException, ExecutionException {
+ return new Gson().fromJson(client.executeRequest(client.createRequest(Verb.GET, "/meters"), 200).getBody(), listMeterType);
+ }
+
+ public Reading getLastReading(String meterId) throws IOException, InterruptedException, ExecutionException {
+ OAuthRequest request = client.createRequest(Verb.GET, "/last_reading");
+ request.addQuerystringParameter("meterId", meterId);
+ Reading reading;
+ try {
+ reading = new Gson().fromJson(client.executeRequest(request, 200).getBody(), Reading.class);
+ reading.setMeterId(meterId);
+ } catch (JsonSyntaxException e) {
+ // in case of syntax exception return empty object
+ reading = new Reading();
+ }
+ return reading;
+ }
+
+ private Readings getReadingsInternal(String meterId, long start, long end) throws IOException, InterruptedException, ExecutionException {
+ OAuthRequest request = client.createRequest(Verb.GET, "/readings");
+ request.addQuerystringParameter("meterId", meterId);
+ request.addQuerystringParameter("resolution", "raw");
+ request.addQuerystringParameter("from", String.valueOf(start));
+ request.addQuerystringParameter("to", String.valueOf(end));
+ logger.debug("Getting readings for meter " + meterId + ", resolution: raw, from: " + start + " to " + end);
+ String result = client.executeRequest(request, 200).getBody();
+ logger.debug("Result: " + result);
+ Readings readings = new Readings();
+ try {
+ Reading[] r = new Gson().fromJson(result, Reading[].class);
+ readings.setReadings(Arrays.asList(r));
+ readings.setMeterId(meterId);
+ } catch (JsonSyntaxException e) {
+ logger.info("JSON conversion of readings failed: " + e.getMessage());
+ // in case of syntax exception return empty object
+ }
+ return readings;
+ }
+
+ public Readings getReadings(String meterId, long start, long end) throws IOException, InterruptedException, ExecutionException {
+ // Handle situations where a day has 25 hours which can happen with switching to
+ // daylight saving time
+ // The Discovergy API will refuse any request with more than 24 hours between
+ // start and end
+ // In this special case we will first query 24 hours and then another time 1
+ // hour
+ if (end - start <= 86400000) {
+ return getReadingsInternal(meterId, start, end);
+ } else {
+ Readings readings = getReadingsInternal(meterId, start, start + 86399999);
+ readings.append(getReadingsInternal(meterId, start + 86400000, end));
+ return readings;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/apiclient/aWATTarApiEngine.java b/src/eu/gruning/discofox/apiclient/aWATTarApiEngine.java
new file mode 100644
index 0000000..898cc66
--- /dev/null
+++ b/src/eu/gruning/discofox/apiclient/aWATTarApiEngine.java
@@ -0,0 +1,89 @@
+package eu.gruning.discofox.apiclient;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodyHandlers;
+import java.time.Duration;
+import java.time.Instant;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
+import eu.gruning.discofox.apiobjects.aWATTarPricings;
+import eu.gruning.discofox.internal.Configuration;
+
+public class aWATTarApiEngine {
+
+ private HttpClient client = HttpClient.newHttpClient();
+ private HttpRequest request;
+ private String apiurl = "https://api.awattar.de/v1/marketdata";
+ private String agent = "discofox";
+ private static final Logger logger = LogManager.getLogger(aWATTarApiEngine.class);
+
+ private long callinterval = Instant.now().getEpochSecond();
+ private long callcounter = 0;
+
+ public aWATTarApiEngine(Configuration config) {
+ this.apiurl = config.getaWATTarBaseURL();
+ this.agent = config.getClientId();
+ }
+
+ public aWATTarPricings query(long start, long end) throws InterruptedException, IOException, URISyntaxException {
+ return this.query(new URI(apiurl + "?start=" + start + "&end=" + end));
+ }
+
+ public aWATTarPricings query(long start) throws InterruptedException, IOException, URISyntaxException {
+ return this.query(new URI(apiurl + "?start=" + start));
+ }
+
+ public aWATTarPricings query() throws InterruptedException, IOException, URISyntaxException {
+ return this.query(new URI(apiurl));
+ }
+
+ // Make sure we don't exceed the fair use of the aWATTar API
+ private void throttle() throws InterruptedException {
+ double speed = (double) callcounter / (double) (Instant.now().getEpochSecond() - callinterval);
+ // more than 1 API query per second? If yes, we will sleep for a second
+ if (speed > 1) {
+ logger.info("Throttling to not violate the aWATTar API Fair Use Policy (1 API call per second)");
+ Thread.sleep(1000);
+ }
+ }
+
+ private aWATTarPricings query(URI uri) throws InterruptedException, IOException {
+ request = HttpRequest.newBuilder().GET().uri(uri).timeout(Duration.ofSeconds(10)).setHeader("User-Agent", agent).build();
+ HttpResponse response = client.send(request, BodyHandlers.ofString());
+ callcounter++;
+ aWATTarPricings pricings = new aWATTarPricings();
+
+ if (response.statusCode() == 200) {
+ logger.debug("Response Body: " + response.body());
+ try {
+ pricings = new Gson().fromJson(response.body(), aWATTarPricings.class);
+ } catch (JsonSyntaxException e) {
+ logger.info("JSON conversion of readings failed: " + e.getMessage());
+ }
+ } else {
+ logger.error("aWATTar API returned status code " + response.statusCode());
+ if (response.statusCode() == 429) {
+ long exceededtime = Instant.now().getEpochSecond() - callinterval;
+ logger.info("Status code 429 means we have exceeded the fair use of the API (60 Calls/Minute), did " + callcounter + " in " + exceededtime + " seconds");
+ logger.info("Sleeping for " + (60 - exceededtime) + " seconds");
+ Thread.sleep((60 - exceededtime) * 1000);
+ callinterval = Instant.now().getEpochSecond();
+ callcounter = 0;
+ // Retry the query after sleeping
+ return this.query(uri);
+ }
+ }
+ throttle();
+ return pricings;
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/apiobjects/Meter.java b/src/eu/gruning/discofox/apiobjects/Meter.java
new file mode 100644
index 0000000..418ed5f
--- /dev/null
+++ b/src/eu/gruning/discofox/apiobjects/Meter.java
@@ -0,0 +1,111 @@
+package eu.gruning.discofox.apiobjects;
+
+import java.util.Map;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import com.google.gson.Gson;
+
+import eu.gruning.discofox.mqtt.MqttEngine;
+
+public class Meter {
+ public String getSerialNumber() {
+ return serialNumber;
+ }
+
+ public String getFullSerialNumber() {
+ return fullSerialNumber;
+ }
+
+ public Map getLocation() {
+ return location;
+ }
+
+ public String getAdministrationNumber() {
+ return administrationNumber;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public String getMeasurementType() {
+ return measurementType;
+ }
+
+ public String getLoadProfileType() {
+ return loadProfileType;
+ }
+
+ public int getScalingFactor() {
+ return scalingFactor;
+ }
+
+ public int getCurrentScalingFactor() {
+ return currentScalingFactor;
+ }
+
+ public int getVoltageScalingFactor() {
+ return voltageScalingFactor;
+ }
+
+ public int getInternalMeters() {
+ return internalMeters;
+ }
+
+ public long getFirstMeasurementTime() {
+ return firstMeasurementTime;
+ }
+
+ public long getLastMeasurementTime() {
+ return lastMeasurementTime;
+ }
+
+ public String getMeterId() {
+ return meterId;
+ }
+
+ public String getManufacturerId() {
+ return manufacturerId;
+ }
+
+ public String getMeasurement() {
+ return measurement;
+ }
+
+ @Override
+ public String toString() {
+ return ("ID " + meterId + ", Measurement " + measurementType + ", Serial " + serialNumber);
+
+ }
+
+ public String getTopic() {
+ return "discovergy/meters";
+ }
+
+ public void publish(MqttEngine mqtt) throws MqttException {
+ mqtt.publish(this.getTopic(), this.asJson());
+ }
+
+ public String asJson() {
+ return new Gson().toJson(this);
+ }
+
+ private String meterId;
+ private String manufacturerId;
+ private String serialNumber;
+ private String fullSerialNumber;
+ private Map location;
+ private String administrationNumber;
+ private String type;
+ private String measurementType;
+ private String loadProfileType;
+ private int scalingFactor;
+ private int currentScalingFactor;
+ private int voltageScalingFactor;
+ private int internalMeters;
+ private long firstMeasurementTime;
+ private long lastMeasurementTime;
+ private String measurement = "Meter";
+
+}
diff --git a/src/eu/gruning/discofox/apiobjects/Reading.java b/src/eu/gruning/discofox/apiobjects/Reading.java
new file mode 100644
index 0000000..be8975e
--- /dev/null
+++ b/src/eu/gruning/discofox/apiobjects/Reading.java
@@ -0,0 +1,90 @@
+package eu.gruning.discofox.apiobjects;
+
+import java.time.Instant;
+import java.util.Date;
+import java.util.Map;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import com.google.gson.Gson;
+
+import eu.gruning.discofox.mqtt.MqttEngine;
+
+public class Reading {
+ private String meterId;
+ private long time;
+ private Map values;
+ private double price;
+ private String unit;
+
+ public Reading() {
+ }
+
+ public long getTime() {
+ return time;
+ }
+
+ public Date getDate() {
+ return new Date(time);
+ }
+
+ public Map getValues() {
+ return values;
+ }
+
+ public String getMeterId() {
+ return meterId;
+ }
+
+ public void setMeterId(String meterId) {
+ this.meterId = meterId;
+ }
+
+ // Return age in Seconds
+ public long getAge() {
+ return Instant.now().getEpochSecond() - time / 1000;
+ }
+
+ public String getTopic() {
+ return "discovergy/" + meterId + "/readings";
+ }
+
+ public void publish(MqttEngine mqttengine) throws MqttException {
+ if (mqttengine.isConnected(MqttEngine.RECONNECT)) {
+ mqttengine.publish(this.getTopic(), this.asJson());
+ }
+ }
+
+ public String asJson() {
+ return new Gson().toJson(this);
+ }
+
+ public void addPricing(aWATTarPricings pricings) {
+ aWATTarPricing pricing = pricings.getPriceUnit(time);
+ if (pricing.isValid()) {
+ setUnit(pricing.getUnit());
+ setPrice(pricing.getMarketprice());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return meterId + "/" + time;
+ }
+
+ public double getPrice() {
+ return price;
+ }
+
+ public void setPrice(double price) {
+ this.price = price;
+ }
+
+ public String getUnit() {
+ return unit;
+ }
+
+ public void setUnit(String unit) {
+ this.unit = unit;
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/apiobjects/Readings.java b/src/eu/gruning/discofox/apiobjects/Readings.java
new file mode 100644
index 0000000..4a73daf
--- /dev/null
+++ b/src/eu/gruning/discofox/apiobjects/Readings.java
@@ -0,0 +1,52 @@
+package eu.gruning.discofox.apiobjects;
+
+import java.util.List;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import eu.gruning.discofox.mqtt.MqttEngine;
+
+public class Readings {
+ private List readings;
+
+ public Readings() {
+ }
+
+ public List getReadings() {
+ return readings;
+ }
+
+ public long size() {
+ return readings.size();
+ }
+
+ public void setReadings(List readings) {
+ this.readings = readings;
+ }
+
+ public void setMeterId(String meterId) {
+ for (Reading r : readings) {
+ r.setMeterId(meterId);
+ }
+ }
+
+ public String getTopic() {
+ return readings.get(0).getTopic();
+ }
+
+ public void publish(MqttEngine mqttengine) throws MqttException {
+ for (Reading r : readings) {
+ r.publish(mqttengine);
+ }
+ }
+
+ public void addPricing(aWATTarPricings pricings) {
+ for (Reading r : readings) {
+ r.addPricing(pricings);
+ }
+ }
+
+ public void append(Readings readings) {
+ this.readings.addAll(readings.readings);
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/apiobjects/aWATTarPricing.java b/src/eu/gruning/discofox/apiobjects/aWATTarPricing.java
new file mode 100644
index 0000000..13ac1d0
--- /dev/null
+++ b/src/eu/gruning/discofox/apiobjects/aWATTarPricing.java
@@ -0,0 +1,86 @@
+package eu.gruning.discofox.apiobjects;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import com.google.gson.Gson;
+import com.google.gson.annotations.Expose;
+
+import eu.gruning.discofox.mqtt.MqttEngine;
+
+public class aWATTarPricing {
+
+ public static aWATTarPricing NOT_FOUND = new aWATTarPricing(false);
+
+ private long start_timestamp;
+ private long end_timestamp;
+ private double marketprice;
+ private String unit;
+ private String measurement = "EnergyPricing";
+
+ @Expose(serialize = false, deserialize = false)
+ private Boolean isvalid = true;
+
+ public String getMeasurement() {
+ return measurement;
+ }
+
+ public void setMeasurement(String measurement) {
+ this.measurement = measurement;
+ }
+
+ public aWATTarPricing() {
+ }
+
+ public long getStartTimestamp() {
+ return start_timestamp;
+ }
+
+ public void setStartTimestamp(long startTimestamp) {
+ this.start_timestamp = startTimestamp;
+ }
+
+ public long getEndTimestamp() {
+ return end_timestamp;
+ }
+
+ public void setEndTimestamp(long endTimestamp) {
+ this.end_timestamp = endTimestamp;
+ }
+
+ public double getMarketprice() {
+ return marketprice;
+ }
+
+ public void setMarketprice(double marketprice) {
+ this.marketprice = marketprice;
+ }
+
+ public String getUnit() {
+ return unit;
+ }
+
+ public void setUnit(String unit) {
+ this.unit = unit;
+ }
+
+ public String getTopic() {
+ return "awattar/pricing";
+ }
+
+ public void publish(MqttEngine mqtt) throws MqttException {
+ mqtt.publish(this.getTopic(), this.asJson());
+ }
+
+ public String asJson() {
+ return new Gson().toJson(this);
+ }
+
+ public Boolean isValid() {
+ return isvalid;
+ }
+
+ // Creates an pricing object with non-default validity
+ public aWATTarPricing(Boolean valid) {
+ this.isvalid = valid;
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/apiobjects/aWATTarPricings.java b/src/eu/gruning/discofox/apiobjects/aWATTarPricings.java
new file mode 100644
index 0000000..708dd51
--- /dev/null
+++ b/src/eu/gruning/discofox/apiobjects/aWATTarPricings.java
@@ -0,0 +1,77 @@
+package eu.gruning.discofox.apiobjects;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import eu.gruning.discofox.mqtt.MqttEngine;
+
+public class aWATTarPricings {
+ private String object;
+ private String url;
+ private List data = new ArrayList<>();
+
+ public aWATTarPricings() {
+ }
+
+ public String getObject() {
+ return object;
+ }
+
+ public void setObject(String object) {
+ this.object = object;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public List getPricings() {
+ return data;
+ }
+
+ public void setPricings(List pricings) {
+ this.data = pricings;
+ }
+
+ public int size() {
+ return data.size();
+ }
+
+ public String getTopic() {
+ return data.get(0).getTopic();
+ }
+
+ public void publish(MqttEngine mqttengine) throws MqttException {
+ if (mqttengine.isConnected(MqttEngine.RECONNECT)) {
+ for (aWATTarPricing pricing : data) {
+ pricing.publish(mqttengine);
+ }
+ }
+ }
+
+ public long getStartTimestamp() {
+ return data.get(0).getStartTimestamp();
+ }
+
+ public long getEndTimestamp() {
+ return data.get(data.size() - 1).getEndTimestamp();
+ }
+
+ public aWATTarPricing getPriceUnit(long timestamp) {
+ aWATTarPricing match = aWATTarPricing.NOT_FOUND;
+ for (aWATTarPricing pricing : data) {
+ if (timestamp >= pricing.getStartTimestamp() && timestamp <= pricing.getEndTimestamp()) {
+ // Found a valid price period
+ match = pricing;
+ break;
+ }
+ }
+ return match;
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/internal/Configuration.java b/src/eu/gruning/discofox/internal/Configuration.java
new file mode 100644
index 0000000..d4400bf
--- /dev/null
+++ b/src/eu/gruning/discofox/internal/Configuration.java
@@ -0,0 +1,117 @@
+package eu.gruning.discofox.internal;
+
+import java.io.Reader;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+
+public class Configuration {
+ private String DiscovergyBaseURL = "https://api.discovergy.com/public/v1";
+ private String aWATTarBaseURL = "https://api.awattar.de/v1/marketdata";
+ private String clientId = "discofox";
+ private String User;
+ private String Password;
+ private Boolean Debug = false;
+ private String Broker = "tcp://localhost:1883";
+ @SerializedName("aWATTarPrices")
+ private Boolean aWATTarPrices = false;
+
+ public Boolean getaWATTarPrices() {
+ return aWATTarPrices;
+ }
+
+ public void setaWATTarPrices(Boolean aWATTarPrices) {
+ this.aWATTarPrices = aWATTarPrices;
+ }
+
+ public String getBroker() {
+ return Broker;
+ }
+
+ public void setBroker(String broker) {
+ Broker = broker;
+ }
+
+ public String getDiscovergyBaseURL() {
+ return DiscovergyBaseURL;
+ }
+
+ public String getaWATTarBaseURL() {
+ return aWATTarBaseURL;
+ }
+
+ public String getUser() {
+ return User;
+ }
+
+ public String getPassword() {
+ return Password;
+ }
+
+ public Boolean getDebug() {
+ return Debug;
+ }
+
+ public void setDiscovergyBaseURL(String baseURL) {
+ this.DiscovergyBaseURL = baseURL;
+ }
+
+ public void setaWATTarBaseURL(String baseURL) {
+ this.aWATTarBaseURL = baseURL;
+ }
+
+ public void setUser(String user) {
+ User = user;
+ }
+
+ public void setPassword(String password) {
+ Password = password;
+ }
+
+ public void setDebug(Boolean debug) {
+ Debug = debug;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public Configuration() {
+ }
+
+ public Configuration(Configuration config) {
+ this.DiscovergyBaseURL = config.DiscovergyBaseURL;
+ this.aWATTarBaseURL = config.aWATTarBaseURL;
+ this.clientId = config.clientId;
+ this.Debug = config.Debug;
+ this.Password = config.Password;
+ this.User = config.User;
+ this.Broker = config.Broker;
+ }
+
+ public Configuration(Reader reader) {
+ this(new Gson().fromJson(reader, Configuration.class));
+ }
+
+ public Configuration(String json) {
+ this(new Gson().fromJson(json, Configuration.class));
+ }
+
+ public String asJson() {
+ return new Gson().toJson(this);
+ }
+
+ public String asPrettyJson() {
+ Gson gson = new GsonBuilder().setPrettyPrinting().create();
+ return gson.toJson(this);
+ }
+
+ public boolean isSane() {
+ return (DiscovergyBaseURL != null && aWATTarBaseURL != null && clientId != null && Debug != null && Password != null && User != null && Broker != null);
+ }
+}
diff --git a/src/eu/gruning/discofox/internal/Discotime.java b/src/eu/gruning/discofox/internal/Discotime.java
new file mode 100644
index 0000000..5ee093c
--- /dev/null
+++ b/src/eu/gruning/discofox/internal/Discotime.java
@@ -0,0 +1,22 @@
+package eu.gruning.discofox.internal;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+
+public class Discotime {
+ private static final String timezone = "Europe/Berlin";
+
+ public static long beginOf(LocalDate date) {
+ return LocalDateTime.of(date, LocalTime.MIDNIGHT).atZone(ZoneId.of(timezone)).toInstant().toEpochMilli();
+ }
+
+ public static long endOf(LocalDate date) {
+ return LocalDateTime.of(date, LocalTime.MAX).atZone(ZoneId.of(timezone)).toInstant().toEpochMilli();
+ }
+
+ public static LocalTime LocalTimeGermany() {
+ return LocalTime.now(ZoneId.of(timezone));
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/internal/Version.java b/src/eu/gruning/discofox/internal/Version.java
new file mode 100644
index 0000000..fb5b7bc
--- /dev/null
+++ b/src/eu/gruning/discofox/internal/Version.java
@@ -0,0 +1,7 @@
+package eu.gruning.discofox.internal;
+
+public class Version {
+ public String getVersion() {
+ return this.getClass().getPackage().getImplementationVersion();
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/main/Discofox.java b/src/eu/gruning/discofox/main/Discofox.java
new file mode 100644
index 0000000..3396e55
--- /dev/null
+++ b/src/eu/gruning/discofox/main/Discofox.java
@@ -0,0 +1,187 @@
+package eu.gruning.discofox.main;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.time.DateTimeException;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import eu.gruning.discofox.apiclient.DiscovergyApiEngine;
+import eu.gruning.discofox.apiclient.aWATTarApiEngine;
+import eu.gruning.discofox.apiobjects.Meter;
+import eu.gruning.discofox.apiobjects.aWATTarPricings;
+import eu.gruning.discofox.internal.Configuration;
+import eu.gruning.discofox.internal.Discotime;
+import eu.gruning.discofox.internal.Version;
+import eu.gruning.discofox.mqtt.MqttEngine;
+import eu.gruning.discofox.task.ReadingBatchTask;
+import eu.gruning.discofox.task.ReadingTask;
+
+public class Discofox {
+
+ private static final String configfile = "configuration.json";
+ private static Configuration config;
+ private static final Logger logger = LogManager.getLogger(Discofox.class);
+ private static DiscovergyApiEngine meterapiengine;
+ private static aWATTarApiEngine priceapiengine;
+ private static MqttEngine mqttengine;
+ private static List meters;
+
+ public static void readConfiguration() {
+ logger.info("Startup Discofox " + new Version().getVersion());
+ logger.info("Reading configfile '" + configfile + "'");
+ // Try to read JSON-formatted configuration file
+ try {
+ Reader reader = Files.newBufferedReader(Paths.get(configfile));
+ config = new Configuration(reader);
+ reader.close();
+ logger.info("Sucessfully read configfile");
+ } catch (Exception e) {
+ logger.fatal("Fatal error reading configfile: " + e.getMessage());
+ System.exit(1);
+ }
+ // Check if all required configuration fields are populated
+ if (!config.isSane()) {
+ logger.fatal("Configuration malformed or incomplete");
+ System.exit(2);
+ }
+ }
+
+ private static void infiniteReadingLoop() throws InterruptedException {
+ // Loop over all meters and start a single infinite reading task for each
+ ExecutorService taskservice = Executors.newFixedThreadPool(meters.size());
+ for (Meter meter : meters) {
+ logger.info("Starting thread for meter " + meter.getMeterId());
+ taskservice.submit(new ReadingTask(meter, config, meterapiengine, priceapiengine, mqttengine));
+ }
+ // This will wait for the tasks to end
+ taskservice.shutdown();
+ taskservice.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+ }
+
+ private static void batchReadingLoop(LocalDate start, LocalDate end) throws InterruptedException, IOException, URISyntaxException {
+ for (LocalDate date = start; date.isBefore(end) || date.isEqual(end); date = date.plusDays(1)) {
+ long ts_midnight = Discotime.beginOf(date);
+ long ts_endofday = Discotime.endOf(date);
+ logger.info("Retrieving data for the full day " + date + " (timestamps " + ts_midnight + " to " + ts_endofday + ")");
+ aWATTarPricings pricings = new aWATTarPricings();
+ if (config.getaWATTarPrices()) {
+ pricings = priceapiengine.query(ts_midnight, ts_endofday);
+ logger.info("Got " + pricings.size() + " pricings from aWATTar");
+ try {
+ pricings.publish(mqttengine);
+ logger.info("Published to topic " + pricings.getTopic());
+ } catch (MqttException e) {
+ logger.error("Publishing pricings failed: " + e.getMessage());
+ }
+ }
+ ExecutorService taskservice = Executors.newFixedThreadPool(meters.size());
+ for (Meter meter : meters) {
+ logger.info("Publishing meter " + meter.getMeterId() + " to topic " + meter.getTopic());
+ try {
+ meter.publish(mqttengine);
+ } catch (MqttException e) {
+ logger.error("Publishing meter failed.");
+ }
+ if (config.getaWATTarPrices()) {
+ logger.info("Starting batch thread for meter " + meter.getMeterId() + " with merged pricing");
+ taskservice.submit(new ReadingBatchTask(meter, ts_midnight, ts_endofday, pricings, meterapiengine, mqttengine));
+ } else {
+ logger.info("Starting batch thread for meter " + meter.getMeterId());
+ taskservice.submit(new ReadingBatchTask(meter, ts_midnight, ts_endofday, meterapiengine, mqttengine));
+ }
+ }
+
+ // This will wait for the tasks to end
+ taskservice.shutdown();
+ taskservice.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
+ }
+ }
+
+ public static void main(String[] args) {
+ readConfiguration();
+ try {
+ // Kick off the connection to the API
+ logger.info("Connecting to to Discovergy API at " + config.getDiscovergyBaseURL() + " with user " + config.getUser());
+ meterapiengine = new DiscovergyApiEngine(config);
+ priceapiengine = new aWATTarApiEngine(config);
+ logger.info("ConfigJSON: " + config.asJson());
+ config.setaWATTarPrices(true);
+ if (config.getaWATTarPrices()) {
+ logger.info("aWATTar API is at " + config.getaWATTarBaseURL());
+ }
+
+ // Discover all the available meters and log some info
+ meters = meterapiengine.getMeters();
+ logger.info("Found meters: " + meters.size());
+ if (meters.size() == 0) {
+ logger.fatal("No meter found");
+ System.exit(3);
+ } else {
+ for (Meter m : meters) {
+ logger.info("Meter: " + m);
+ logger.debug("Meter asJSON: " + m.asJson());
+ }
+ }
+
+ // Build MQTT connection
+ logger.info("Connecting to MQTT broker at " + config.getBroker());
+ mqttengine = new MqttEngine(config.getBroker(), config.getClientId());
+ if (mqttengine.isConnected()) {
+ logger.info("Sucessfully connected to MQTT broker");
+ } else {
+ logger.fatal("Could not connect to MQTT broker. Exiting.");
+ System.exit(99);
+ }
+
+ // When no arguments are passed discofox will poll all meters infinitely
+ if (args.length == 0) {
+ // Make sure that we have all prior readings for today by default
+ logger.info("Step 1: Getting all prior readings for today to ensure completeness");
+ batchReadingLoop(LocalDate.now(), LocalDate.now());
+ // Kickoff the required infinite tasks
+ logger.info("Step 2: Starting infinite reading loops");
+ infiniteReadingLoop();
+ logger.info("Tasks have completed. Exiting.");
+ // Some cleanup to exit gracefully
+ mqttengine.disconnect();
+ System.exit(0);
+ } else {
+ try {
+ LocalDate date1 = LocalDate.parse(args[0], DateTimeFormatter.BASIC_ISO_DATE);
+ LocalDate date2 = date1;
+ if (args.length > 1) {
+ date2 = LocalDate.parse(args[1], DateTimeFormatter.BASIC_ISO_DATE);
+ }
+ // Kickoff the required tasks
+ batchReadingLoop(date1, date2);
+ logger.debug("Tasks have completed");
+ mqttengine.disconnect();
+ System.exit(0);
+ } catch (DateTimeException e) {
+ logger.error("Cannot parse date");
+ System.out.println("Usage: discofox [DATE1] [DATE2]");
+ System.out.println("When no arguments are passed discofox will read all meters infinitely (DEFAULT)");
+ System.out.println("DATE1 : get all readings for {date1}, then exit (Batch mode)");
+ System.out.println("DATE1 DATE2 : get all readings from {date1} to {date2}, then exit (Batch mode)");
+ System.out.println("Dates should be in BASIC_ISO_DATE (YYYYMMDD) format, e.g. 20200729");
+ System.exit(99);
+ }
+ }
+ } catch (Exception e) {
+ logger.fatal("Error: " + e.getMessage());
+ System.exit(99);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/mqtt/MqttEngine.java b/src/eu/gruning/discofox/mqtt/MqttEngine.java
new file mode 100644
index 0000000..02a6388
--- /dev/null
+++ b/src/eu/gruning/discofox/mqtt/MqttEngine.java
@@ -0,0 +1,67 @@
+package eu.gruning.discofox.mqtt;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+public class MqttEngine {
+ private final String broker;
+ private final String clientId;
+ private final MemoryPersistence persistence = new MemoryPersistence();
+ private final MqttClient client;
+ private final int qos = 2;
+ private final MqttConnectOptions connOpts;
+ public static final Boolean RECONNECT = true;
+
+ public MqttEngine(String broker, String clientId) throws MqttException {
+ this.broker = broker;
+ this.clientId = clientId;
+
+ // Setup MQTT Client connection
+ client = new MqttClient(this.broker, this.clientId, persistence);
+ connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(true);
+ client.connect(connOpts);
+ }
+
+ public void publish(String topic, String content) throws MqttException {
+ MqttMessage message = new MqttMessage(content.getBytes());
+ message.setQos(qos);
+ client.publish(topic, message);
+ }
+
+ public void disconnect() throws MqttException {
+ client.disconnect();
+ }
+
+ public void connect() {
+ if (!client.isConnected()) {
+ try {
+ client.connect(connOpts);
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ public boolean isConnected() {
+ return client.isConnected();
+ }
+
+ public boolean isConnected(Boolean reconnect) {
+ if (reconnect == RECONNECT) {
+ for (int i = 0; i < 3; i++) {
+ this.connect();
+ if (client.isConnected()) {
+ break;
+ }
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ return client.isConnected();
+ }
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/task/ReadingBatchTask.java b/src/eu/gruning/discofox/task/ReadingBatchTask.java
new file mode 100644
index 0000000..1e46bcd
--- /dev/null
+++ b/src/eu/gruning/discofox/task/ReadingBatchTask.java
@@ -0,0 +1,85 @@
+package eu.gruning.discofox.task;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import eu.gruning.discofox.apiclient.DiscovergyApiEngine;
+import eu.gruning.discofox.apiobjects.Meter;
+import eu.gruning.discofox.apiobjects.Readings;
+import eu.gruning.discofox.apiobjects.aWATTarPricings;
+import eu.gruning.discofox.mqtt.MqttEngine;
+
+public class ReadingBatchTask implements Runnable {
+
+ private long start, end;
+ private String taskname;
+ private Meter meter;
+ private DiscovergyApiEngine apiengine;
+ private MqttEngine mqttengine;
+ private Boolean dopricing = false;
+ private aWATTarPricings pricings;
+
+ private static final Logger logger = LogManager.getLogger(ReadingTask.class);
+
+ public ReadingBatchTask(Meter meter, long start, long end, DiscovergyApiEngine apiengine, MqttEngine mqttengine) {
+ this.meter = meter;
+ this.apiengine = apiengine;
+ this.mqttengine = mqttengine;
+ this.start = start;
+ this.end = end;
+ this.taskname = "batchtask-" + this.meter.getMeterId() + "-" + start + "-" + end;
+ }
+
+ public ReadingBatchTask(Meter meter, long start, long end, aWATTarPricings pricings, DiscovergyApiEngine apiengine, MqttEngine mqttengine) {
+ this(meter, start, end, apiengine, mqttengine);
+ dopricing = true;
+ this.pricings = pricings;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName(taskname);
+ logger.info("Task '" + taskname + "' started");
+ // Make sure the MQTT engine is connected, methods handles the necessary
+ // reconnect
+ if (!mqttengine.isConnected(MqttEngine.RECONNECT)) {
+ logger.error("MQTT connection is disconnected and cannot be reestablished. Exiting Task.");
+ return;
+ }
+
+ Readings readings = new Readings();
+ try {
+ readings = apiengine.getReadings(meter.getMeterId(), start, end);
+ logger.info("Got " + readings.size() + " readings");
+ if (readings.size() > 0) {
+ if (dopricing) {
+ logger.info("Merging prices with readings and publishing");
+ readings.addPricing(pricings);
+ } else {
+ logger.info("Publishing");
+ }
+ readings.publish(mqttengine);
+ logger.info("Published " + readings.size() + " readings to topic " + readings.getTopic());
+ }
+ } catch (Exception e) {
+ logger.error("Reading task '" + taskname + "' failed with exception: " + e.getMessage() + " (Class " + e.getClass().getName() + ")");
+ if (e.getClass() == MqttException.class) {
+ logger.error("MQTT connection has a problem. Trying to fix.");
+ // Try to fix MQTT connection
+ if (!mqttengine.isConnected(MqttEngine.RECONNECT)) {
+ logger.error("MQTT connection is disconnected and cannot be reestablished. Exiting Task.");
+ return;
+ } else {
+ logger.info("MQTT connection is reestablished. Republishing.");
+ try {
+ readings.publish(mqttengine);
+ } catch (MqttException f) {
+ }
+ logger.info("Published " + readings.size() + " readings to topic " + readings.getTopic());
+ }
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/eu/gruning/discofox/task/ReadingTask.java b/src/eu/gruning/discofox/task/ReadingTask.java
new file mode 100644
index 0000000..4feac68
--- /dev/null
+++ b/src/eu/gruning/discofox/task/ReadingTask.java
@@ -0,0 +1,137 @@
+package eu.gruning.discofox.task;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.time.LocalDate;
+import java.time.LocalTime;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import eu.gruning.discofox.apiclient.DiscovergyApiEngine;
+import eu.gruning.discofox.apiclient.aWATTarApiEngine;
+import eu.gruning.discofox.apiobjects.Meter;
+import eu.gruning.discofox.apiobjects.Reading;
+import eu.gruning.discofox.apiobjects.aWATTarPricings;
+import eu.gruning.discofox.internal.Configuration;
+import eu.gruning.discofox.internal.Discotime;
+import eu.gruning.discofox.mqtt.MqttEngine;
+
+public class ReadingTask implements Runnable {
+
+ private String taskname;
+ private Meter meter;
+ private DiscovergyApiEngine meterapiengine;
+ private aWATTarApiEngine priceapiengine;
+ private MqttEngine mqttengine;
+ private Configuration config;
+ private aWATTarPricings pricings = new aWATTarPricings();
+ private static final Logger logger = LogManager.getLogger(ReadingTask.class);
+ private int errorcounter = 0;
+ private static boolean SUCCESS = true;
+ private LocalTime aWATTarPricesForTomorrowAvailable = LocalTime.of(14, 0);
+
+ public ReadingTask(Meter meter, Configuration config, DiscovergyApiEngine meterapiengine, aWATTarApiEngine priceapiengine, MqttEngine mqttengine) {
+ this.meter = meter;
+ this.taskname = "task-" + this.meter.getMeterId();
+ this.meterapiengine = meterapiengine;
+ this.priceapiengine = priceapiengine;
+ this.mqttengine = mqttengine;
+ this.config = config;
+ }
+
+ public void getaWATTarPrices() throws InterruptedException, IOException, URISyntaxException {
+ long start = Discotime.beginOf(LocalDate.now());
+ long end;
+ // At 2 PM (14:00) aWATTar makes prices for the full next day available, adjust
+ // end if after this point in time
+ if (Discotime.LocalTimeGermany().isAfter(aWATTarPricesForTomorrowAvailable)) {
+ end = Discotime.endOf(LocalDate.now().plusDays(1));
+ } else {
+ end = Discotime.endOf(LocalDate.now());
+ }
+ if (pricings.size() < 23 || pricings.getStartTimestamp() > start || pricings.getEndTimestamp() < end) {
+ pricings = priceapiengine.query(start, end);
+ logger.info("Got " + pricings.size() + " pricings from aWATTar (start " + pricings.getStartTimestamp() + " end " + pricings.getEndTimestamp() + ")");
+ try {
+ pricings.publish(mqttengine);
+ logger.info("Published to topic " + pricings.getTopic());
+ } catch (MqttException e) {
+ logger.error("Publishing pricings failed: " + e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ long watermark = 0;
+ Thread.currentThread().setName(taskname);
+ logger.info("Task '" + taskname + "' started");
+
+ logger.info("Publishing meter " + meter.getMeterId() + " to topic " + meter.getTopic());
+ try {
+ meter.publish(mqttengine);
+ } catch (MqttException e) {
+ if (!errorHandler(e) == SUCCESS) {
+ logger.error("Too many errors or non-recoverable situation, Task is being stopped.");
+ return;
+ }
+ }
+ // Looping forever in this task until interrupted
+ while (true) {
+ try {
+ if (config.getaWATTarPrices()) {
+ getaWATTarPrices();
+ }
+ Reading reading = meterapiengine.getLastReading(meter.getMeterId());
+ String logline = "Got last reading for meter " + meter.getMeterId() + " (timestamp " + reading.getTime() + ", age " + reading.getAge() + "s";
+ if (watermark < reading.getTime()) {
+ // This is a new reading that needs to be published to MQTT
+ watermark = reading.getTime();
+ logline = logline.concat(", NEW)");
+ logger.info(logline);
+ if (config.getaWATTarPrices()) {
+ reading.addPricing(pricings);
+ }
+ reading.publish(mqttengine);
+ logger.info("Published " + reading + " to topic " + reading.getTopic());
+ } else {
+ // Got the same reading again from the API (not new, no need to publish)
+ logline = logline.concat(")");
+ logger.info(logline);
+ }
+ Thread.sleep(23000);
+ } catch (Exception e) {
+ if (!errorHandler(e) == SUCCESS) {
+ logger.error("Too many errors or non-recoverable situation, Task is being stopped.");
+ break;
+ }
+ }
+ }
+ }
+
+ private boolean errorHandler(Exception e) {
+ logger.error("Reading task '" + taskname + "' failed with exception: " + e.getMessage() + " (Class " + e.getClass().getName() + ")");
+ errorcounter++;
+ if (errorcounter >= 4) {
+ return false;
+ }
+
+ if (e.getClass() == MqttException.class) {
+ // Try to fix MQTT connection
+ logger.error("MQTT connection has a problem. Trying to fix.");
+ if (!mqttengine.isConnected(MqttEngine.RECONNECT)) {
+ logger.error("MQTT connection is disconnected and cannot be reestablished");
+ return false;
+ }
+ }
+ return SUCCESS;
+ }
+
+ @Override
+ public String toString() {
+ return taskname;
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..3778816
--- /dev/null
+++ b/src/main/resources/log4j2.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file