Skip to content

Commit

Permalink
Implementation of issue #24.
Browse files Browse the repository at this point in the history
  • Loading branch information
whizzosoftware committed Dec 28, 2017
1 parent 1abb775 commit fa8e683
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,44 @@ public class TransactionInboundHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(TransactionInboundHandler.class);

private ChannelHandlerContext handlerContext;
private DataFrameTransaction currentDataFrameTransaction;
private ScheduledFuture timeoutFuture;
private TransactionContext transactionContext;

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.handlerContext = ctx;
}

/**
* Called when data is read from the Z-Wave network.
* Called by Netty when data is read from the Z-Wave network.
*
* @param ctx the handler context
* @param msg the message that was read
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
processData(ctx, msg, System.currentTimeMillis());
}

/**
* Process data from the Z-Wave network.
*
* @param ctx the handler context
* @param msg the message to process
* @param time the time the data was read
*/
void processData(ChannelHandlerContext ctx, Object msg, long time) {
if (msg instanceof Frame) {
Frame frame = (Frame) msg;
if (hasCurrentTransaction()) {
String tid = currentDataFrameTransaction.getId();
String tid = transactionContext.getId();
logger.trace("Received frame within transaction ({}) context: {}", tid, frame);

// give new frame to current transaction
NettyZWaveChannelContext zctx = new NettyZWaveChannelContext();
if (currentDataFrameTransaction.addFrame(zctx, frame)) {
if (currentDataFrameTransaction.isComplete()) {
if (transactionContext.addFrame(zctx, frame)) {
if (transactionContext.isComplete()) {
logger.trace("*** Data frame transaction ({}) completed", tid);
logger.trace("");
cancelTimeoutCallback();
}
zctx.process(ctx);
// if transaction didn't consume frame, then pass it down the pipeline
Expand All @@ -72,7 +81,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
} else if (msg instanceof AddNodeToNetwork) {
logger.trace("Received ADD_NODE_STATUS_NODE_FOUND; starting transaction");
NettyZWaveChannelContext zctx = new NettyZWaveChannelContext();
currentDataFrameTransaction = new NodeInclusionTransaction(zctx, (DataFrame)msg);
transactionContext = new TransactionContext(new NodeInclusionTransaction(zctx, (DataFrame)msg), time);
zctx.process(ctx);
} else {
logger.trace("Received frame outside of transaction context so passing it along: {}", frame);
Expand All @@ -81,40 +90,59 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
}

DataFrameTransaction getCurrentTransaction() {
return currentDataFrameTransaction;
}

/**
* Called by Netty when an event is fired.
*
* @param ctx the handler context
* @param evt the event that was fired
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
processEvent(ctx, evt, System.currentTimeMillis());
}

/**
* Process an event propagated by Netty.
*
* @param ctx the handler context
* @param evt the event
* @param time the time the event was fired
*/
void processEvent(ChannelHandlerContext ctx, Object evt, long time) {
if (evt instanceof DataFrameSentEvent) {
DataFrameSentEvent dfse = (DataFrameSentEvent)evt;
logger.trace("Detected data frame write event: {}", dfse.getDataFrame());
if (!hasCurrentTransaction()) {
NettyZWaveChannelContext zctx = new NettyZWaveChannelContext();
currentDataFrameTransaction = dfse.getDataFrame().createWrapperTransaction(zctx, dfse.isListeningNode());
if (currentDataFrameTransaction != null) {
logger.trace("*** Data frame transaction started for {} with ID {}", dfse.getDataFrame(), currentDataFrameTransaction.getId());
startTimeoutCallback();
DataFrameTransaction t = dfse.getDataFrame().createWrapperTransaction(zctx, dfse.isListeningNode());
if (t != null) {
transactionContext = new TransactionContext(t, time);
logger.trace("*** Data frame transaction started for {} with ID {}", dfse.getDataFrame(), t.getId());
zctx.process(ctx);
}
} else if (currentDataFrameTransaction != null && currentDataFrameTransaction.getStartFrame() == dfse.getDataFrame()) {
} else if (transactionContext != null && transactionContext.getStartFrame() == dfse.getDataFrame()) {
logger.trace("Detected re-send of transaction start frame; starting timeout");
startTimeoutCallback();
transactionContext.resetTimeout(time);
} else {
logger.trace("Wrote a data frame with a current transaction: {}", dfse.getDataFrame());
}
} else if (evt instanceof TransactionTimeoutEvent) {
TransactionTimeoutEvent tte = (TransactionTimeoutEvent)evt;
if (tte.getId().equals(currentDataFrameTransaction.getId())) {
if (tte.getId().equals(transactionContext.getId())) {
logger.trace("Detected transaction timeout");
timeoutFuture = null;
NettyZWaveChannelContext zctx = new NettyZWaveChannelContext();
currentDataFrameTransaction.timeout(zctx);
transactionContext.processTimeoutEvent(zctx);
transactionContext = null;
zctx.process(ctx);
} else {
logger.error("Received timeout event for unknown transaction: {}", tte.getId());
}
} else if (evt instanceof IncompleteDataFrameEvent) {
if (transactionContext != null) {
logger.trace("Incomplete data received from network; extending transaction timeout for {}", transactionContext.getId());
transactionContext.resetTimeout(System.currentTimeMillis());
}
ctx.fireUserEventTriggered(evt);
} else {
ctx.fireUserEventTriggered(evt);
}
Expand All @@ -126,31 +154,16 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
* @return a boolean
*/
boolean hasCurrentTransaction() {
return (currentDataFrameTransaction != null && !currentDataFrameTransaction.isComplete());
}

private void startTimeoutCallback() {
cancelTimeoutCallback();
if (currentDataFrameTransaction.getTimeout() > 0 && handlerContext != null && handlerContext.executor() != null) {
timeoutFuture = handlerContext.executor().schedule(
new TransactionTimeoutHandler(
currentDataFrameTransaction.getId(),
handlerContext,
this
),
currentDataFrameTransaction.getTimeout(),
TimeUnit.MILLISECONDS
);
} else {
logger.warn("Unable to schedule transaction timeout callback");
}
return (transactionContext != null && !transactionContext.isComplete());
}

private void cancelTimeoutCallback() {
if (timeoutFuture != null) {
timeoutFuture.cancel(true);
timeoutFuture = null;
}
/**
* Returns the current transaction context.
*
* @return a TransactionContext instance or null if there is none
*/
TransactionContext getTransactionContext() {
return transactionContext;
}

private class NettyZWaveChannelContext implements ZWaveChannelContext {
Expand Down Expand Up @@ -186,4 +199,81 @@ public void writeFrame(OutboundDataFrame f) {
frames.add(f);
}
}

/**
* A class that wrappers a DataFrameTransaction and manages timeout logic.
*/
public class TransactionContext {
DataFrameTransaction transaction;
ScheduledFuture future;
long timeoutStartTime;
long timeoutDuration;

TransactionContext(DataFrameTransaction transaction, long startTime) {
this.transaction = transaction;
scheduleTimeout(startTime);
}

public DataFrameTransaction getTransaction() {
return transaction;
}

String getId() {
return transaction.getId();
}

DataFrame getStartFrame() {
return transaction.getStartFrame();
}

boolean addFrame(ZWaveChannelContext ctx, Frame frame) {
boolean b = transaction.addFrame(ctx, frame);
if (transaction.isComplete()) {
cancelTimeout();
}
return b;
}

boolean isComplete() {
return transaction.isComplete();
}

void processTimeoutEvent(ZWaveChannelContext ctx) {
transaction.timeout(ctx);
}

public long getTimeoutStartTime() {
return timeoutStartTime;
}

void resetTimeout(long currentTime) {
cancelTimeout();
scheduleTimeout(currentTime);
}

private void scheduleTimeout(long currentTime) {
if (transaction.getTimeout() > 0 && handlerContext != null && handlerContext.executor() != null) {
this.timeoutStartTime = currentTime;
this.timeoutDuration = transaction.getTimeout();
future = handlerContext.executor().schedule(
new TransactionTimeoutHandler(
transaction.getId(),
handlerContext,
TransactionInboundHandler.this
),
timeoutDuration,
TimeUnit.MILLISECONDS
);
} else {
logger.warn("Unable to schedule transaction timeout callback");
}
}

private void cancelTimeout() {
if (future != null) {
future.cancel(true);
future = null;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
*******************************************************************************
* Copyright (c) 2016 Whizzo Software, LLC.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*******************************************************************************
*/
package com.whizzosoftware.wzwave.channel.event;

/**
* An event fired when an incomplete data frame is received from the Z-Wave network.
*
* @author Dan Noguerol
*/
public class IncompleteDataFrameEvent {
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
/*
*******************************************************************************
* Copyright (c) 2016 Whizzo Software, LLC.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*******************************************************************************
*/
package com.whizzosoftware.wzwave.channel;

import io.netty.buffer.ByteBufAllocator;
Expand All @@ -13,6 +22,7 @@
public class MockChannelHandlerContext implements ChannelHandlerContext {
private final List<Object> writeQueue = new ArrayList<>();
private final List<Object> userEvents = new ArrayList<>();
private final MockEventExecutor executor = new MockEventExecutor();

public List<Object> getWriteQueue() {
return writeQueue;
Expand Down Expand Up @@ -228,7 +238,7 @@ public int compareTo(Channel o) {

@Override
public EventExecutor executor() {
return null;
return executor;
}

@Override
Expand Down
Loading

0 comments on commit fa8e683

Please sign in to comment.