Skip to content

Commit

Permalink
feature #183: .addEmitFilter and .addCallFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
justparking committed Jun 17, 2020
1 parent 22d8a54 commit a07a0c0
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public Map<String, Object> getFullSchema() {
* For other in-process 'call' handlers that may be interested in this action.
*/
private AtomicReference<Handler.H1<Object>[]> _callHandlers = new AtomicReference<Handler.H1<Object>[]>();

/**
* Allow a call filter which can trap and alter the value of the arg
*/
private Handler.F1<Object, Object> _callFilter = null;

private boolean _closed;

Expand Down Expand Up @@ -257,13 +262,34 @@ protected void handleActionRequest(Object arg) {

@Override
public void handleActionRequest(Object arg) {
if (_callFilter != null) {
// arg = _emitFilter.handle(arg);
if (_callbackQueue != null) {
try {
arg = _callbackQueue.handle(_callFilter, arg);
} catch (Exception e) {
// handle gracefully ...
_exceptionHandler.handle(e);
// ... and keep going with arg regardless
}
} else {
try {
arg = _callFilter.handle(arg);
} catch (Exception exc) {
throw new RuntimeException("Emit filter", exc);
}
}
}

final Object finalArg = arg;

_argValue.set(arg);
_timestamp.set(DateTime.now());

// seq must be set last
_seqNum = Nodel.getNextSeq();

_handler.handleActionRequest(arg);
_handler.handleActionRequest(finalArg);

// snap-shot of handlers
final H1<Object>[] handlers = _callHandlers.get();
Expand All @@ -283,10 +309,10 @@ public void run() {
// call handlers one after the other
for (Handler.H1<Object> handler : handlers) {
if (_callbackQueue != null)
_callbackQueue.handle(handler, arg, _exceptionHandler);
_callbackQueue.handle(handler, finalArg, _exceptionHandler);
else {
try {
Handler.tryHandle(handler, arg);
Handler.tryHandle(handler, finalArg);
} catch (Exception exc) {
lastExc = exc;
}
Expand Down Expand Up @@ -350,6 +376,13 @@ public void addCallHandler(Handler.H1<Object> handler) {

// otherwise keep trying
}
}

/**
* Filtering the arg emitted
*/
public void addCallFilter(Handler.F1<Object, Object> filter) {
_callFilter = filter;
}

/**
Expand Down
41 changes: 37 additions & 4 deletions nodel-framework/src/main/java/org/nodel/core/NodelServerEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public class NodelServerEvent implements Closeable {
* For other in-process 'emit' handlers that may be interested in this event.
*/
private LockFreeList<Handler.H1<Object>> _emitHandlers = new LockFreeList<>();

/**
* Allow an emit filter which can trap and alter the value of the arg
*/
private Handler.F1<Object, Object> _emitFilter = null;

private String _title;

Expand Down Expand Up @@ -300,11 +305,30 @@ public void emit(@Param(name="arg", title="Argument") Object arg) {
/**
* Fires the event.
*/
private void doEmit(final Object arg) {
private void doEmit(Object arg) {
DateTime now = DateTime.now();

if (_emitFilter != null) {
// arg = _emitFilter.handle(arg);
if (_callbackQueue != null) {
try {
arg = _callbackQueue.handle(_emitFilter, arg);
} catch (Exception e) {
// handle gracefully ...
_exceptionHandler.handle(e);
// ... and keep going with arg regardless
}
} else {
try {
arg = _emitFilter.handle(arg);
} catch (Exception exc) {
throw new RuntimeException("Emit filter", exc);
}
}
}

ArgInstance argInstance = new ArgInstance();
argInstance.timestamp = DateTime.now();
argInstance.timestamp = now;
argInstance.arg = arg;
argInstance.seqNum = Nodel.getNextSeq();

Expand All @@ -318,6 +342,8 @@ private void doEmit(final Object arg) {
// snap-shot of handlers
final List<Handler.H1<Object>> handlers = _emitHandlers.items();

final Object finalArg = arg;

// if there are some handlers, use the Channel Client thread-pool (treat as though remote events)
if (handlers.size() > 0) {
ChannelClient.getThreadPool().execute(new Runnable() {
Expand All @@ -333,10 +359,10 @@ public void run() {
// call handlers one after the other
for (Handler.H1<Object> handler : handlers) {
if (_callbackQueue != null)
_callbackQueue.handle(handler, arg, _exceptionHandler);
_callbackQueue.handle(handler, finalArg, _exceptionHandler);
else {
try {
Handler.handle(handler, arg);
Handler.handle(handler, finalArg);
} catch (Exception exc) {
lastExc = exc;
}
Expand All @@ -359,6 +385,13 @@ public void run() {
public void addEmitHandler(Handler.H1<Object> handler) {
_emitHandlers.add(handler);
}

/**
* Filtering the arg emitted
*/
public void addEmitFilter(Handler.F1<Object, Object> filter) {
_emitFilter = filter;
}

/**
* Attaches a monitor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ public <T> T handle(Callable<T> func) throws Exception {
_fairLock.unlock();
}
}

/**
* For synchronous functions.
*/
public <R, T> R handle(Handler.F1<R, T> func, T arg) throws Exception {
try {
_fairLock.lock();

return func.handle(arg);

} finally {
_fairLock.unlock();
}
}

/**
* Creates a callback instance.
Expand Down

0 comments on commit a07a0c0

Please sign in to comment.