Skip to content

Commit

Permalink
better error recovery from loss of comms
Browse files Browse the repository at this point in the history
  • Loading branch information
gilesbradshaw committed Apr 18, 2016
1 parent b9ac26e commit 1a2c078
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 62 deletions.
93 changes: 52 additions & 41 deletions NodeSocket.js
Original file line number Diff line number Diff line change
@@ -1,67 +1,78 @@
// @flow

import {Observable, ReplaySubject} from 'rx-lite';
import {opcua, nextSession, handleError} from './data/opcua';
import {opcua, sessions, handleError} from './data/opcua';



var sub = new ReplaySubject(1);

nextSession().subscribe(session=>sub.onNext(
new opcua.ClientSubscription(session,{
requestedPublishingInterval: 1000,
requestedLifetimeCount: 10,
requestedMaxKeepAliveCount: 2,
maxNotificationsPerPublish: 10,
publishingEnabled: true,
priority: 10
})
));
sessions.subscribe(session=>{
console.log('session ...', session);
if(session) {
sub.onNext(
new opcua.ClientSubscription(session,{
requestedPublishingInterval: 1000,
requestedLifetimeCount: 10,
requestedMaxKeepAliveCount: 2,
maxNotificationsPerPublish: 10,
publishingEnabled: true,
priority: 10
})
);
} else {
sub.onNext(null);
}
});


class NodeSocket {
destroy: Function;
constructor(nodeId : string, io : any){
const _this=this;
const timer = sub.select(subscription => Observable.create((obs)=> {
console.log('erewego', nodeId, sub);
try {

// install monitored item
//console.log('monitoring', nodeId);
let monitoredItem = subscription.monitor({
nodeId: opcua.resolveNodeId(nodeId.split(':').slice(1).join()),
attributeId: opcua.AttributeIds[nodeId.split(':')[0]]
},
{
samplingInterval: 1000,
discardOldest: true,
queueSize: 10
},
opcua.read_service.TimestampsToReturn.Both
);
console.log("-monitoring", subscription.subscriptionId, nodeId);
const timer = sub.select(subscription => Observable.create(
obs=> {
if(subscription) {
try {

// install monitored item
//console.log('monitoring', nodeId);
let monitoredItem = subscription.monitor({
nodeId: opcua.resolveNodeId(nodeId.split(':').slice(1).join()),
attributeId: opcua.AttributeIds[nodeId.split(':')[0]]
},
{
samplingInterval: 1000,
discardOldest: true,
queueSize: 10
},
opcua.read_service.TimestampsToReturn.Both
);

monitoredItem.on("changed",function(dataValue){
obs.onNext(dataValue);
});
console.log("-monitoring", subscription.subscriptionId, nodeId);

monitoredItem.on("changed",function(dataValue){
obs.onNext(dataValue);
});
} catch(ex) {
console.log("caught error", ex);
obs.onError(ex);
}
return ()=>{}; //subscription.terminate();
} )).switch().subscribe(x => {
} catch(ex) {
console.log("caught error", ex);
obs.onError(ex);
}
} else {
obs.onNext(null);
}
return ()=>{};
}
)).switch().subscribe(x => {
_this.lastValue = {
nodeId: nodeId,
value: x
};

};
io.to(nodeId).emit('update', _this.lastValue );
});

this.destroy = ()=> timer.dispose();
}

}

export default NodeSocket;
8 changes: 3 additions & 5 deletions data/opcua.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ function logAllEmitterEvents(eventEmitter)
eventEmitter.emit = function () {
var event = arguments[0];
if(event!='receive_response' && event!='receive_chunk' && event!='send_request' && event!='send_chunk')
console.log("event emitted: " + event);//, JSON.stringify(arguments, null, '\t'));
emitToLog.apply(eventEmitter, arguments);
};
return eventEmitter;
Expand Down Expand Up @@ -45,11 +44,10 @@ class UASession {
var observable = new Rx.ReplaySubject(1);
var closeRequest = new Rx.Subject();
var client = logAllEmitterEvents(new opcua.OPCUAClient({applicationName: 'uaQL'}));
this.sessions = observable;
this.nextSession=()=> observable.where(s=>s).take(1)
this.handleError=(session, err)=>{
try{
console.log('ERROR to handle', JSON.stringify(err, null, '\t'));
console.log('eh?');
//if(err instanceof Error){
console.log('err details', err.name, err.message);
if(err.message==='Transaction has timed out'){
Expand Down Expand Up @@ -90,7 +88,6 @@ class UASession {
return err;
}
const go = ()=> {
console.log('go!');
console.log('ok connecting client..');
client.connect(endpointUrl, (err) => {
if(err) {
Expand Down Expand Up @@ -166,5 +163,6 @@ class UASession {
const connector = new UASession();
const nextSession = connector.nextSession;
const handleError = connector.handleError;
const sessions = connector.sessions;
//export default connector.session;
export {opcua as opcua, nextSession as nextSession, handleError as handleError};
export {opcua as opcua, sessions as sessions, nextSession as nextSession, handleError as handleError};
1 change: 0 additions & 1 deletion data/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,6 @@ const UANodeType = new GraphQLObjectType({
parent: {
type: ReferenceDescriptionType,
resolve: ({id}) => new Promise(function(resolve, reject){
console.log("resolving pRENT");
nextSession().take(1).timeout(3000, new Error('Timeout, try later...'))
.subscribe(session=> {
session.browse(id, function(err, browseResult){
Expand Down
8 changes: 0 additions & 8 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@ var io = socket(socketserver, {
path: '/napi'
});



setInterval(function(){
global.gc();
console.log('GC done')
}, 1000*30);


const rooms = {};
var latestConnection = 0;
const leaveRoom = (socket, nodeId, myRooms, myConnection)=>{
Expand Down
7 changes: 0 additions & 7 deletions server.production.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,6 @@ var io = socket(socketserver, {
});



setInterval(function(){
global.gc();
console.log('GC done')
}, 1000*30);


const rooms = {};
var latestConnection = 0;
const leaveRoom = (socket, nodeId, myRooms, myConnection)=>{
Expand Down

0 comments on commit 1a2c078

Please sign in to comment.