Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jhead committed Mar 23, 2016
0 parents commit b8c9d77
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
build/
node_modules/
11 changes: 11 additions & 0 deletions binding.gyp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"targets": [
{
"target_name": "msg",
"sources": [ "src/msg.cc" ],
"include_dirs" : [
"<!(node -e \"require('nan')\")"
]
}
]
}
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require('./lib/queue');
166 changes: 166 additions & 0 deletions lib/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
'use strict';

let EventEmitter = require('events').EventEmitter;
let inherits = require('util').inherits;
let msg = require('../build/Release/msg');

let MessageQueue = function(key, perms, options) {
if (typeof key !== 'number') {
throw new Error('Key must be a number. If you\'re providing a path, use ftok first.')
}

if (typeof perms === 'undefined') {
perms = 0x666;
} else if (typeof perms !== 'number') {
throw new Error('Permissions must be in number form, i.e. 0x666');
}

this.key = key;
this.perms = perms;
this.flags = formatFlags(perms);
this.options = options || { };

this.receiveBuffer = [];

this.on('newListener', (event) => {
if (event === 'data') {
setImmediate(() => this._beginReceive());
}
});

this._open();
};

inherits(MessageQueue, EventEmitter);

MessageQueue.open = (key, perms) => new MessageQueue(key, perms);

MessageQueue.prototype._open = function() {
if (this.id >= 0) throw new Error('Queue already open');

this.id = msg.get(this.key, this.flags);
};

MessageQueue.prototype.push = function(data, options, callback) {
if (!Buffer.isBuffer(data)) {
data = new Buffer(data);
}

if (typeof options == 'function') {
callback = options;
options = { };
}

if (typeof callback === 'undefined') {
callback = Function.prototype;
}

options = options || { };
if (typeof options.type === 'undefined') {
options.type = 1;
} else if (typeof options.type !== 'number' || options.type <= 0) {
return callback(new TypeError('Message type must be a positive nonzero integer or undefined'));
}

if (typeof options.flags === 'undefined') {
options.flags = 0;
} else if (typeof options.flags !== 'number') {
return callback(new Error('Message flags must be a number or undefined'));
}

msg.snd(this.id, data, options.type, options.flags, callback);
};

MessageQueue.prototype._beginReceive = function() {
if (this.listenerCount('data') === 0) return;

// TODO
let options = { };

this._receive(options, (err, data) => {
if (err) {
this.emit('error', err);
return;
}

if (this.listenerCount('data') > 0) {
this.emit('data', data);
} else {
this.receiveBuffer.push(data);
}

this._beginReceive();
});
};

MessageQueue.prototype._receive = function(options, callback) {
options = options || { };

if (typeof options.type === 'undefined') {
options.type = 0; // pop
} else if (typeof options.type !== 'number') {
return callback(new TypeError('Message type must be a positive nonzero integer or undefined'));
}

if (typeof options.flags === 'undefined') {
options.flags = 0;
}

if (this.receiveBuffer.length > 0) {
return callback(null, this.receiveBuffer.shift());
}
// TODO: handle buffered messages with different types

msg.rcv(this.id, MSGMAX, options.type, options.flags, callback);
};

MessageQueue.prototype.pop = function(options, callback) {
if (this.listenerCount('data') > 0) {
return callback(new Error('Do not use \'data\' event with pop()'));
}

if (typeof options === 'function') {
callback = options;
options = { };
}

if (typeof callback === 'undefined') {
callback = (err) => {
if (err) throw err;
};
}

options = options || { };
this._receive(options, callback);
};

MessageQueue.prototype.close = function() {
if (this.closed) throw new Error('Queue already closed');

return (this.closed = msg.close(this.id));
};

function formatFlags(perms) {
let uPerms = (perms & 0x006);
let gPerms = (perms & 0x060) >> 1;
let oPerms = (perms & 0x600) >> 2;

perms = uPerms | gPerms | oPerms;

let flags = 0;
flags |= MessageQueue.IPC_CREATE;
flags |= perms;

return flags;
}

// Static members and constants
const IPC_CREATE = MessageQueue.IPC_CREATE = 512;
const MSGMAX = MessageQueue.MSGMAX = 4052;
// MessageQueue.IPC_RMID =
MessageQueue.formatFlags = formatFlags;

// Export native bindings
MessageQueue.msg = msg;

module.exports = MessageQueue;
14 changes: 14 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "node-sysv-msg",
"version": "1.0.0",
"description": "Native System V (svipc) message queues in Node.js",
"main": "index.js",
"scripts": {
"install": "node-gyp rebuild"
},
"author": "",
"license": "ISC",
"dependencies": {
"nan": "^2.2.0"
}
}
190 changes: 190 additions & 0 deletions src/functions.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
#include <nan.h>
#include <iostream>
#include <sys/msg.h>

using v8::Function;
using v8::Local;
using v8::Number;
using v8::Value;
using Nan::AsyncQueueWorker;
using Nan::AsyncWorker;
using Nan::Callback;
using Nan::New;
using Nan::Null;

#ifndef __USE_GNU

#ifndef MSGMAX
#define MSGMAX 4056
#endif

struct msgbuf {
long mtype;
char mtext[MSGMAX - 4];
};
#endif

const char *ConcatString(std::string one, const char *two) {
return one.append(two).c_str();
}

Local<Value> CreateError(std::string message, int errcode) {
message.append(": ");

Nan::MaybeLocal<Value> val = Nan::Error(ConcatString(message, strerror(errcode)));

return val.ToLocalChecked();
}

class SendMessageWorker : public AsyncWorker {

public:
SendMessageWorker(Callback *callback, int id, char *data, size_t dataLength, long type, int flags)
: AsyncWorker(callback), id(id), data(data), dataLength(dataLength), type(type), flags(flags) { }
~SendMessageWorker() { }

void Execute() {
msgbuf *message = new msgbuf;
message->mtype = type;

memcpy(message->mtext, data, dataLength);

ret = msgsnd(id, message, dataLength, flags);
error = errno;
}

void HandleOKCallback () {
Local<Value> argv[] = { Null() };

if (ret == -1) {
argv[0] = CreateError("Failed to send message", error);
}

callback->Call(1, argv);
}

private:
int id;
char *data;
int dataLength;
long type;
int flags;
int ret;
int error;

};

class ReceiveMessageWorker : public AsyncWorker {

public:
ReceiveMessageWorker(Callback *callback, int id, size_t bufferLength, long type, int flags)
: AsyncWorker(callback), id(id), bufferLength(bufferLength), type(type), flags(flags) { }
~ReceiveMessageWorker() { }

void Execute() {
message = new msgbuf;

bufferLength = msgrcv(id, message, bufferLength, type, flags);
error = errno;
}

void HandleOKCallback () {
Local<Value> argv[] = {
Null(),
Null()
};

if (bufferLength == (size_t) -1) {
argv[0] = CreateError("Failed to receive message", error);
} else {
argv[1] = Nan::CopyBuffer(message->mtext, bufferLength).ToLocalChecked();
}

callback->Call(2, argv);
}

private:
int id;
msgbuf *message;
size_t bufferLength;
long type;
int flags;
int error;
};

NAN_METHOD(GetMessageQueue) {
key_t key = (key_t) info[0]->Int32Value();
int flags = info[1]->Int32Value();

int queue = msgget(key, flags);

if (queue == -1) {
Nan::ThrowError(CreateError("Failed to get queue", errno));
}

info.GetReturnValue().Set(New(queue));
}

NAN_METHOD(ControlMessageQueue) {
int id = info[0]->Int32Value();
int cmd = info[1]->Int32Value();

msqid_ds *buf;

if (cmd != IPC_STAT && cmd != IPC_SET) {
buf = nullptr;
}
#ifdef linux
else if (cmd != MSG_STAT) {
buf = nullptr;
}
#endif
else {
buf = new msqid_ds;
}

int ret = msgctl(id, cmd, buf);

size_t bufSize = sizeof(msqid_ds);
char rawBuf[bufSize];
memcpy(rawBuf, buf, bufSize);

if (ret == 0) {
info.GetReturnValue().Set(Nan::NewBuffer(rawBuf, bufSize).ToLocalChecked());
} else {
info.GetReturnValue().Set(New(ret));
}
}

NAN_METHOD(CloseMessageQueue) {
int id = info[0]->Int32Value();

int ret = msgctl(id, IPC_RMID, nullptr);

if (ret != 0) {
Nan::ThrowError(CreateError("Failed to close queue", errno));
}

info.GetReturnValue().Set(Nan::True());
}

NAN_METHOD(SendMessage) {
int id = info[0]->Int32Value();
char* bufferData = node::Buffer::Data(info[1]);
size_t bufferLength = (size_t) node::Buffer::Length(info[1]);
long type = info[2]->Int32Value();
int flags = info[3]->Int32Value();
Callback *callback = new Callback(info[4].As<Function>());

AsyncQueueWorker(new SendMessageWorker(callback, id, bufferData, bufferLength, type, flags));
}

NAN_METHOD(ReceiveMessage) {
int id = info[0]->Int32Value();
size_t bufferLength = (size_t) info[1]->Int32Value();
long type = info[2]->Int32Value();
int flags = info[3]->Int32Value();
Callback *callback = new Callback(info[4].As<Function>());

AsyncQueueWorker(new ReceiveMessageWorker(callback, id, bufferLength, type, flags));
}
Loading

0 comments on commit b8c9d77

Please sign in to comment.