Skip to content

Commit

Permalink
fix: Fixed asset scale and offset not being applied in listen mode [b…
Browse files Browse the repository at this point in the history
…ackport release-5.3.0] (#4738)

* fix: Fixed asset scale and offset not being applied in listen mode (#4736)

* fix: Fixed asset scale and offset not being applied in listen mode

Signed-off-by: Nicola Timeus <[email protected]>

* Compute scale and offset only if needed

Signed-off-by: Nicola Timeus <[email protected]>

---------

Signed-off-by: Nicola Timeus <[email protected]>
(cherry picked from commit f1973d9)

* chore: Updated version in kura.build.properties

Signed-off-by: MMaiero <[email protected]>

* Updated wire.component.provider version in kura.properties

---------

Signed-off-by: MMaiero <[email protected]>
Co-authored-by: nicolatimeus <[email protected]>
Co-authored-by: MMaiero <[email protected]>
  • Loading branch information
3 people authored Jun 28, 2023
1 parent 39a2d18 commit 65d08f1
Show file tree
Hide file tree
Showing 9 changed files with 728 additions and 416 deletions.
4 changes: 2 additions & 2 deletions kura/distrib/config/kura.build.properties
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ org.eclipse.kura.example.wire.math.multiport.provider.version=1.3.0
org.eclipse.kura.example.tamper.detection.version=1.3.0
org.eclipse.kura.web2.version=2.3.0
org.eclipse.kura.wire.camel.version=1.3.0
org.eclipse.kura.wire.component.provider.version=2.0.0
org.eclipse.kura.wire.component.provider.version=2.0.1-SNAPSHOT
org.eclipse.kura.wire.h2db.component.provider.version=2.3.0
org.eclipse.kura.wire.db.component.provider.version=1.1.0
org.eclipse.kura.wire.script.filter.provider.version=1.3.0
org.eclipse.kura.wire.helper.provider.version=1.3.0
org.eclipse.kura.wire.provider.version=1.3.0
org.eclipse.kura.rest.provider.version=1.4.0
org.eclipse.kura.rest.asset.provider.version=1.3.0
org.eclipse.kura.rest.asset.provider.version=1.3.1-SNAPSHOT
org.eclipse.kura.hook.file.move.provider.version=1.3.0
org.eclipse.kura.misc.cloudcat.version=1.3.0
org.eclipse.kura.json.marshaller.unmarshaller.provider.version=1.3.0
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2018, 2020 Eurotech and/or its affiliates and others
* Copyright (c) 2018, 2023 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -19,7 +19,7 @@
import java.util.Map;
import java.util.Set;

import org.eclipse.kura.asset.provider.BaseAsset.ChannelListenerRegistration;
import org.eclipse.kura.asset.provider.BaseAsset.ChannelListenerHolder;
import org.eclipse.kura.channel.Channel;
import org.eclipse.kura.channel.ChannelRecord;
import org.eclipse.kura.driver.Driver;
Expand All @@ -32,7 +32,7 @@ public class DriverState {
private static final Logger logger = LoggerFactory.getLogger(DriverState.class);

private final Driver driver;
private final Set<ChannelListenerRegistration> attachedListeners;
private final Set<ChannelListenerHolder> attachedListeners;

private PreparedRead preparedRead;

Expand Down Expand Up @@ -76,53 +76,53 @@ private void closePreparedRead() {
}
}

public void syncChannelListeners(final Set<ChannelListenerRegistration> targetState,
public void syncChannelListeners(final Set<ChannelListenerHolder> targetState,
final Map<String, Channel> channels) {
setChannelListenersInternal(new HashSet<>(targetState), channels);
}

private void setChannelListenersInternal(final Set<ChannelListenerRegistration> targetState,
private void setChannelListenersInternal(final Set<ChannelListenerHolder> targetState,
final Map<String, Channel> channels) {

final Iterator<ChannelListenerRegistration> iter = this.attachedListeners.iterator();
final Iterator<ChannelListenerHolder> iter = this.attachedListeners.iterator();

while (iter.hasNext()) {
final ChannelListenerRegistration reg = iter.next();
final ChannelListenerHolder reg = iter.next();

if (!targetState.contains(reg)) {
detach(iter, reg);
}
}

for (final ChannelListenerRegistration reg : targetState) {
for (final ChannelListenerHolder holder : targetState) {

if (this.attachedListeners.contains(reg)) {
if (this.attachedListeners.contains(holder)) {
continue;
}

final Channel channel = channels.get(reg.getChannelName());
final Channel channel = channels.get(holder.getChannelName());

if (channel != null && channel.isEnabled()) {
attach(reg, channel);
attach(holder, channel);
}
}
}

private void attach(final ChannelListenerRegistration reg, final Channel channel) {
private void attach(final ChannelListenerHolder holder, final Channel channel) {
try {
logger.debug("Registering Channel Listener for monitoring...");
this.driver.registerChannelListener(channel.getConfiguration(), reg.getChannelListener());
this.attachedListeners.add(reg);
this.driver.registerChannelListener(channel.getConfiguration(), holder);
this.attachedListeners.add(holder);
logger.debug("Registering Channel Listener for monitoring...done");
} catch (Exception e) {
logger.warn("Failed to register channel listener", e);
}
}

private void detach(final Iterator<ChannelListenerRegistration> iter, final ChannelListenerRegistration reg) {
private void detach(final Iterator<ChannelListenerHolder> iter, final ChannelListenerHolder holder) {
try {
logger.debug("Unregistering Asset Listener...");
this.driver.unregisterChannelListener(reg.getChannelListener());
this.driver.unregisterChannelListener(holder);
iter.remove();
logger.debug("Unregistering Asset Listener...done");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Eurotech and/or its affiliates and others
* Copyright (c) 2016, 2023 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand Down Expand Up @@ -364,7 +364,7 @@ private boolean isListeningChannel(final Map<String, Object> properties) {
}

@Override
protected boolean isChannelListenerValid(final ChannelListenerRegistration reg, final Channel channel) {
protected boolean isChannelListenerValid(final ChannelListenerHolder reg, final Channel channel) {

if (!super.isChannelListenerValid(reg, channel)) {
return false;
Expand All @@ -380,13 +380,13 @@ protected boolean isChannelListenerValid(final ChannelListenerRegistration reg,
}

@Override
protected void updateChannelListenerRegistrations(final Set<ChannelListenerRegistration> listeners,
protected void updateChannelListenerRegistrations(final Set<ChannelListenerHolder> listeners,
final AssetConfiguration config) {

super.updateChannelListenerRegistrations(listeners, config);

config.getAssetChannels().entrySet().stream().filter(e -> isListeningChannel(e.getValue().getConfiguration()))
.map(e -> new ChannelListenerRegistration(e.getKey(), new EmitterChannelListener()))
.map(e -> new ChannelListenerHolder(e.getValue(), new EmitterChannelListener()))
.forEach(listeners::add);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Import-Package: org.apache.logging.log4j;version="2.8.2",
org.eclipse.kura.util.wire.test;version="[1.0,2.0)",
org.eclipse.kura.wire.graph;version="[1.0,2.0)",
org.junit;version="[4.12.0,5.0.0)",
org.junit.runner;version="4.12.0",
org.junit.runners;version="[4.12.0,5.0.0)",
org.mockito;version="[4.0.0,5.0.0)",
org.mockito.invocation;version="[4.0.0,5.0.0)",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*******************************************************************************
* Copyright (c) 2022, 2023 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kura.internal.wire.asset.test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.eclipse.kura.channel.ChannelFlag;
import org.eclipse.kura.channel.ChannelRecord;
import org.eclipse.kura.channel.ChannelStatus;
import org.eclipse.kura.channel.listener.ChannelEvent;
import org.eclipse.kura.channel.listener.ChannelListener;
import org.eclipse.kura.driver.ChannelDescriptor;
import org.eclipse.kura.driver.Driver;
import org.eclipse.kura.driver.PreparedRead;
import org.eclipse.kura.type.TypedValue;

class MockDriver implements Driver {

private final Map<String, List<TypedValue<?>>> values = new HashMap<>();
final Map<String, ChannelListener> listeners = new HashMap<>();
CompletableFuture<Void> preparedReadCalled = new CompletableFuture<>();

@Override
public void connect() throws ConnectionException {
}

@Override
public void disconnect() throws ConnectionException {
}

@Override
public ChannelDescriptor getChannelDescriptor() {

return new ChannelDescriptor() {

@Override
public Object getDescriptor() {
return Collections.emptyList();
}
};
}

@Override
public void read(List<ChannelRecord> records) throws ConnectionException {
for (final ChannelRecord record : records) {
final Optional<TypedValue<?>> value = Optional.ofNullable(values.get(record.getChannelName()))
.flatMap(l -> {
if (l.isEmpty()) {
return Optional.empty();
} else {
return Optional.of(l.remove(0));
}
});

if (value.isPresent()) {
record.setChannelStatus(new ChannelStatus(ChannelFlag.SUCCESS));
record.setValue(value.get());
} else {
record.setChannelStatus(new ChannelStatus(ChannelFlag.FAILURE));
}

record.setTimestamp(System.currentTimeMillis());
}

}

@Override
public synchronized void registerChannelListener(Map<String, Object> channelConfig, ChannelListener listener)
throws ConnectionException {
synchronized (listeners) {
listeners.put((String) channelConfig.get("+name"), listener);
listeners.notifyAll();
}
}

@Override
public void unregisterChannelListener(ChannelListener listener) throws ConnectionException {
synchronized (listeners) {
final Iterator<Entry<String, ChannelListener>> iter = listeners.entrySet().iterator();

while (iter.hasNext()) {
final Entry<String, ChannelListener> e = iter.next();

if (e.getValue() == listener) {
iter.remove();
}
}
}
}

@Override
public void write(List<ChannelRecord> records) throws ConnectionException {
}

synchronized void addReadResult(final String channelName, final TypedValue<?> value) {
this.values.computeIfAbsent(channelName, a -> new ArrayList<>()).add(value);
}

synchronized void emitChannelEvent(final String channelName, final TypedValue<?> value) {
for (final Entry<String, ChannelListener> e : listeners.entrySet()) {

if (!e.getKey().equals(channelName)) {
continue;
}

final ChannelRecord record = ChannelRecord.createReadRecord(channelName, value.getType());
record.setValue(value);
record.setChannelStatus(new ChannelStatus(ChannelFlag.SUCCESS));
record.setTimestamp(System.currentTimeMillis());

e.getValue().onChannelEvent(new ChannelEvent(record));
}
}

@Override
public PreparedRead prepareRead(List<ChannelRecord> records) {
preparedReadCalled.complete(null);

throw new UnsupportedOperationException();
}

}
Loading

0 comments on commit 65d08f1

Please sign in to comment.