Skip to content

Commit

Permalink
feat(sparkplug): implementation of CloudPublisher as Sparkplug Device (
Browse files Browse the repository at this point in the history
…#5107)

* feat(sparkplug): implementation of CloudPublisher as Sparkplug Device

Signed-off-by: Marcello Martina <[email protected]>

* feat: using KuraPayload.getTimestamp as timestamp in Sparkplug payload

Signed-off-by: Marcello Martina <[email protected]>

* refactor: moved Sparkplug payload construction in one point

Signed-off-by: Marcello Martina <[email protected]>

* fix: missing payload injection

Signed-off-by: Marcello Martina <[email protected]>

* test: added test cases and refactor of some tests

Signed-off-by: Marcello Martina <[email protected]>

* test: removed unused field

Signed-off-by: Marcello Martina <[email protected]>

* test: refactor of SparkplugIntegrationTest

Signed-off-by: Marcello Martina <[email protected]>

* fix: copying set of entries since original is immutable

Signed-off-by: Marcello Martina <[email protected]>

* test: added SparkplugDevice default initialization to SparkplugIntegrationTest

Signed-off-by: Marcello Martina <[email protected]>

* fix: seqCounter is reset when updating component

Signed-off-by: Marcello Martina <[email protected]>

* test: added test cases for SparkplugDevice

Signed-off-by: Marcello Martina <[email protected]>

* chore: added debug logger to SparkplugBProtobufPayloadBuilder

Signed-off-by: Marcello Martina <[email protected]>

* fix: position metrics are added only if values non null

Signed-off-by: Marcello Martina <[email protected]>

* test: renamed TypeMapperTestCase to TypeMapperCase to avoid Sonar

Signed-off-by: Marcello Martina <[email protected]>

* refactor: removed unused properties from endpoint update method

Signed-off-by: Marcello Martina <[email protected]>

* refactor: applied review suggestions

Signed-off-by: Marcello Martina <[email protected]>

* refactor: open service tracker from a different thread

Signed-off-by: Marcello Martina <[email protected]>

---------

Signed-off-by: Marcello Martina <[email protected]>
  • Loading branch information
marcellorinaldo authored Jan 31, 2024
1 parent 588c0a2 commit d5c541b
Show file tree
Hide file tree
Showing 19 changed files with 1,214 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ Bundle-Name: Sparkplug MQTT Cloud Connection Provider
Bundle-SymbolicName: org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider
Bundle-Version: 1.0.0.qualifier
Require-Capability: osgi.ee;filter:="(&(osgi.ee=JavaSE)(version=1.8))"
Import-Package: com.google.protobuf;version="[3.0,4.0]",
com.google.gson;version="[2.7,3.0)",
Import-Package: com.google.gson;version="[2.7,3.0)",
com.google.protobuf;version="[3.0,4.0]",
org.eclipse.kura;version="[1.0,2.0)",
org.eclipse.kura.cloud;version="[1.1,2.0)",
org.eclipse.kura.cloudconnection;version="[1.0,1.1)",
Expand All @@ -19,10 +19,12 @@ Import-Package: com.google.protobuf;version="[3.0,4.0]",
org.eclipse.kura.data;version="[1.0,2.0)",
org.eclipse.kura.data.listener;version="[1.0,2.0)",
org.eclipse.kura.data.transport.listener;version="[1.0,2.0)",
org.eclipse.kura.message;version="[1.5,2.0)",
org.eclipse.kura.type;version="[1.1,2.0)",
org.osgi.framework;version="1.8.0",
org.osgi.service.component;version="1.2.0",
org.osgi.service.event;version="1.3.1",
org.osgi.util.tracker;version="[1.5,2.0)",
org.slf4j;version="[1.7,2.0]"
Bundle-ActivationPolicy: lazy
Service-Component: OSGI-INF/*.xml
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2024 Eurotech and/or its affiliates and others
This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
which is available at https://www.eclipse.org/legal/epl-2.0/
SPDX-License-Identifier: EPL-2.0
Contributors:
Eurotech
-->
<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0"
name="org.eclipse.kura.cloudconnection.sparkplug.mqtt.device.SparkplugDevice"
activate="activate"
modified="update"
deactivate="deactivate"
configuration-policy="require"
enabled="true"
immediate="true">

<implementation class="org.eclipse.kura.cloudconnection.sparkplug.mqtt.device.SparkplugDevice"/>

<service>
<provide interface="org.eclipse.kura.cloudconnection.publisher.CloudPublisher"/>
<provide interface="org.eclipse.kura.configuration.ConfigurableComponent"/>
</service>

<property name="service.pid" type="String" value="org.eclipse.kura.cloudconnection.sparkplug.mqtt.device.SparkplugDevice"/>
<property name="cloud.connection.factory.pid" type="String" value="org.eclipse.kura.cloudconnection.sparkplug.mqtt.endpoint.SparkplugCloudEndpoint"/>
<property name="kura.ui.service.hide" type="Boolean" value="true"/>
<property name="kura.ui.factory.hide" type="String" value="true"/>

</scr:component>
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2024 Eurotech and/or its affiliates and others
This program and the accompanying materials are made
available under the terms of the Eclipse Public License 2.0
which is available at https://www.eclipse.org/legal/epl-2.0/
SPDX-License-Identifier: EPL-2.0
Contributors:
Eurotech
-->
<MetaData xmlns="http://www.osgi.org/xmlns/metatype/v1.2.0" localization="en_us">

<OCD id="org.eclipse.kura.cloudconnection.sparkplug.mqtt.device.SparkplugDevice"
name="SparkplugDevice"
description="Sparkplug Device configuration. This Cloud Publisher sends a device birth message (DBIRTH message type)
when the first publish occurs or when the set of published metrics is changed.
After a DBIRTH message, this Cloud Publisher will send device data messages (DDATA message type).">

<AD id="device.id"
name="Sparkplug Device ID"
type="String"
cardinality="0"
required="true"
default="device"
description="Sparkplug Device identifier, needs to be unique under the same Sparkplug Edge Node ID."/>

</OCD>

<Designate pid="org.eclipse.kura.cloudconnection.sparkplug.mqtt.device.SparkplugDevice"
factoryPid="org.eclipse.kura.cloudconnection.sparkplug.mqtt.device.SparkplugDevice">
<Object ocdref="org.eclipse.kura.cloudconnection.sparkplug.mqtt.device.SparkplugDevice"/>
</Designate>
</MetaData>
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*******************************************************************************
* Copyright (c) 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kura.cloudconnection.sparkplug.mqtt.device;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.eclipse.kura.KuraErrorCode;
import org.eclipse.kura.KuraException;
import org.eclipse.kura.cloudconnection.CloudConnectionConstants;
import org.eclipse.kura.cloudconnection.CloudConnectionManager;
import org.eclipse.kura.cloudconnection.listener.CloudConnectionListener;
import org.eclipse.kura.cloudconnection.listener.CloudDeliveryListener;
import org.eclipse.kura.cloudconnection.message.KuraMessage;
import org.eclipse.kura.cloudconnection.publisher.CloudPublisher;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.endpoint.SparkplugCloudEndpoint;
import org.eclipse.kura.cloudconnection.sparkplug.mqtt.message.SparkplugMessageType;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.Filter;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
import org.osgi.service.component.ComponentContext;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkplugDevice
implements CloudPublisher, ConfigurableComponent, CloudConnectionListener, CloudDeliveryListener {

private static final Logger logger = LoggerFactory.getLogger(SparkplugDevice.class);

public static final String KEY_MESSAGE_TYPE = "message.type";
public static final String KEY_DEVICE_ID = "device.id";

private String deviceId;
private ServiceTracker<CloudConnectionManager, CloudConnectionManager> cloudConnectionManagerTracker;
private Optional<SparkplugCloudEndpoint> sparkplugCloudEndpoint = Optional.empty();
private final Set<CloudConnectionListener> cloudConnectionListeners = new CopyOnWriteArraySet<>();
private final Set<CloudDeliveryListener> cloudDeliveryListeners = new CopyOnWriteArraySet<>();
private final ExecutorService executorService = Executors.newCachedThreadPool();
private Set<String> deviceMetrics = new HashSet<>();

/*
* ConfigurableComponent APIs
*/

public void activate(final ComponentContext componentContext, final Map<String, Object> properties)
throws InvalidSyntaxException {
String selectedCloudEndpointPid = (String) properties
.get(CloudConnectionConstants.CLOUD_ENDPOINT_SERVICE_PID_PROP_NAME.value());

String filterString = String.format("(&(%s=%s)(kura.service.pid=%s))", Constants.OBJECTCLASS,
CloudConnectionManager.class.getName(), selectedCloudEndpointPid);

final BundleContext context = componentContext.getBundleContext();
final Filter filter = context.createFilter(filterString);
this.cloudConnectionManagerTracker = new ServiceTracker<>(context, filter,
new CloudConnectionManagerTrackerCustomizer(context));

this.executorService.submit(() -> this.cloudConnectionManagerTracker.open());

update(properties);
}

public void update(final Map<String, Object> properties) {
this.deviceId = (String) properties.get(KEY_DEVICE_ID);
if (Objects.isNull(this.deviceId) || this.deviceId.trim().isEmpty()) {
throw new IllegalArgumentException("Property '" + KEY_DEVICE_ID + "' cannot be null or empty");
}

this.deviceMetrics.clear();

logger.info("Sparkplug Device {} - Updated device ID", this.deviceId);
}

public void deactivate() {
logger.info("Sparkplug Device {} - Deactivating", this.deviceId);

if (Objects.nonNull(this.cloudConnectionManagerTracker)) {
this.cloudConnectionManagerTracker.close();
}

logger.debug("Sparkplug Device {} - Shutting down executor service", this.deviceId);
this.executorService.shutdownNow();

logger.info("Sparkplug Device {} - Deactivated", this.deviceId);
}

/*
* CloudConnectionListener APIs
*/

@Override
public void onDisconnected() {
this.deviceMetrics.clear();
this.cloudConnectionListeners.forEach(listener -> this.executorService.execute(listener::onDisconnected));
}

@Override
public void onConnectionLost() {
this.deviceMetrics.clear();
this.cloudConnectionListeners.forEach(listener -> this.executorService.execute(listener::onConnectionLost));
}

@Override
public void onConnectionEstablished() {
this.deviceMetrics.clear();
this.cloudConnectionListeners
.forEach(listener -> this.executorService.execute(listener::onConnectionEstablished));
}

/*
* CloudPublisher APIs
*/

@Override
public synchronized String publish(final KuraMessage message) throws KuraException {
if (!this.sparkplugCloudEndpoint.isPresent()) {
throw new KuraException(KuraErrorCode.SERVICE_UNAVAILABLE, "Missing SparkplugCloudEndpoint reference");
}

final Map<String, Object> newMessageProperties = new HashMap<>();
newMessageProperties.put(KEY_DEVICE_ID, this.deviceId);

if (this.deviceMetrics.isEmpty() || !this.deviceMetrics.equals(message.getPayload().metricNames())) {
this.deviceMetrics.clear();
this.deviceMetrics.addAll(message.getPayload().metricNames());
newMessageProperties.put(KEY_MESSAGE_TYPE, SparkplugMessageType.DBIRTH);
logger.info("Sparkplug Device {} - Metrics set changed, publishing DBIRTH", this.deviceId);
} else {
newMessageProperties.put(KEY_MESSAGE_TYPE, SparkplugMessageType.DDATA);
}

return this.sparkplugCloudEndpoint.get().publish(new KuraMessage(message.getPayload(), newMessageProperties));
}

@Override
public void registerCloudConnectionListener(final CloudConnectionListener cloudConnectionListener) {
this.cloudConnectionListeners.add(cloudConnectionListener);
}

@Override
public void unregisterCloudConnectionListener(final CloudConnectionListener cloudConnectionListener) {
this.cloudConnectionListeners.remove(cloudConnectionListener);
}

@Override
public void registerCloudDeliveryListener(final CloudDeliveryListener cloudDeliveryListener) {
this.cloudDeliveryListeners.add(cloudDeliveryListener);
}

@Override
public void unregisterCloudDeliveryListener(final CloudDeliveryListener cloudDeliveryListener) {
this.cloudDeliveryListeners.remove(cloudDeliveryListener);
}

/*
* CloudDeliveryListener APIs
*/

@Override
public void onMessageConfirmed(final String messageId) {
this.cloudDeliveryListeners
.forEach(listener -> this.executorService.execute(() -> listener.onMessageConfirmed(messageId)));
}

/*
* Utils
*/

synchronized void setSparkplugCloudEndpoint(SparkplugCloudEndpoint endpoint) {
this.sparkplugCloudEndpoint = Optional.of(endpoint);
this.sparkplugCloudEndpoint.get().registerCloudConnectionListener(this);
this.sparkplugCloudEndpoint.get().registerCloudDeliveryListener(this);
}

synchronized void unsetSparkplugCloudEndpoint(SparkplugCloudEndpoint endpoint) {
if (this.sparkplugCloudEndpoint.isPresent() && this.sparkplugCloudEndpoint.get() == endpoint) {
this.sparkplugCloudEndpoint.get().unregisterCloudConnectionListener(this);
this.sparkplugCloudEndpoint.get().unregisterCloudDeliveryListener(this);
this.sparkplugCloudEndpoint = Optional.empty();
}
}

private class CloudConnectionManagerTrackerCustomizer
implements ServiceTrackerCustomizer<CloudConnectionManager, CloudConnectionManager> {

private final BundleContext context;

public CloudConnectionManagerTrackerCustomizer(BundleContext context) {
this.context = context;
}

@Override
public synchronized CloudConnectionManager addingService(
final ServiceReference<CloudConnectionManager> reference) {
CloudConnectionManager cloudConnectionManager = this.context.getService(reference);

if (cloudConnectionManager instanceof SparkplugCloudEndpoint) {
setSparkplugCloudEndpoint((SparkplugCloudEndpoint) cloudConnectionManager);
return cloudConnectionManager;
} else {
this.context.ungetService(reference);
}

return null;
}

@Override
public synchronized void removedService(final ServiceReference<CloudConnectionManager> reference,
final CloudConnectionManager service) {
unsetSparkplugCloudEndpoint((SparkplugCloudEndpoint) service);
}

@Override
public synchronized void modifiedService(final ServiceReference<CloudConnectionManager> reference,
final CloudConnectionManager service) {
// Not needed
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*******************************************************************************
* Copyright (c) 2024 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kura.cloudconnection.sparkplug.mqtt.endpoint;


public class SeqCounter {

private int seq = 1; // start from 1 since 0 is reserved for BIRTH messages

public synchronized void next() {
if (this.seq == 255) {
this.seq = 1;
} else {
this.seq = this.seq + 1;
}
}

public synchronized int getCurrent() {
return this.seq;
}

}
Loading

0 comments on commit d5c541b

Please sign in to comment.