Skip to content

Commit

Permalink
Merge pull request #5 from dignifiedquire/fixed
Browse files Browse the repository at this point in the history
feat: add fixed prefix length as an option
  • Loading branch information
dignifiedquire authored Sep 5, 2016
2 parents 2ea9d6c + e310e47 commit 934882c
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 27 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,38 @@ pull(
)
```

## API

### `encode([opts])`

- `opts: Object`, optional
- `fixed: false`:
- `bytes: 4`: If `fixed` is `true` this is the amount of bytes used for the prefix.

By default all messages will be prefixed with a varint. If you want to use a fixed length prefix you can specify this through the `opts`.

Returns a pull-stream through.

### `decode([opts])`

- `opts: Object`, optional
- `fixed: false`:
- `bytes: 4`: If `fixed` is `true` this is the amount of bytes used for the prefix.

By default all messages will be prefixed with a varint. If you want to use a fixed length prefix you can specify this through the `opts`.


Returns a pull-stream through.

### `decodeFromReader(reader, [opts], cb)`

- `reader: [pull-reader](https://github.com/dominictarr/pull-reader)`
- `opts: Object`, optional. Same as for `decode`.
- `cb: Function`: Callback called with `(err, message)`.

This uses a [pull-reader](https://github.com/dominictarr/pull-reader) instance to reade and decode a single message. Useful when using [pull-handshake](https://github.com/pull-stream/pull-handshake) with length prefixed messages.


## Contribute

PRs and issues gladly accepted! Check out the [issues](//github.com/dignifiedquire/pull-length-prefixed/issues).
Expand Down
66 changes: 51 additions & 15 deletions src/decode.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ exports.decodeFromReader = decodeFromReader
const MSB = 0x80
const isEndByte = (byte) => !(byte & MSB)

function decode () {
function decode (opts) {
let reader = new Reader()
let p = pushable((err) => {
reader.abort(err)
Expand All @@ -20,7 +20,7 @@ function decode () {
return (read) => {
reader(read)
function next () {
decodeFromReader(reader, (err, msg) => {
decodeFromReader(reader, opts, (err, msg) => {
if (err) return p.end(err)

p.push(msg)
Expand All @@ -33,7 +33,36 @@ function decode () {
}
}

function decodeFromReader (reader, cb) {
function decodeFromReader (reader, opts, cb) {
if (typeof opts === 'function') {
cb = opts
opts = {}
}

opts = Object.assign({
fixed: false,
bytes: 4
}, opts || {})

if (opts.fixed) {
readFixedMessage(reader, opts.bytes, cb)
} else {
readVarintMessage(reader, cb)
}
}

function readFixedMessage (reader, byteLength, cb) {
reader.read(byteLength, (err, bytes) => {
if (err) {
return cb(err)
}

const msgSize = bytes.readInt32BE(0)
readMessage(reader, msgSize, cb)
})
}

function readVarintMessage (reader, cb) {
let rawMsgSize = []
if (rawMsgSize.length === 0) readByte()

Expand All @@ -48,22 +77,29 @@ function decodeFromReader (reader, cb) {

if (byte && !isEndByte(byte[0])) {
readByte()
} else {
readMessage()
return
}
})
}

function readMessage () {
const msgSize = varint.decode(Buffer.concat(rawMsgSize))
reader.read(msgSize, (err, msg) => {
if (err) {
return cb(err)
}
const msgSize = varint.decode(Buffer.concat(rawMsgSize))
readMessage(reader, msgSize, (err, msg) => {
if (err) {
return cb(err)
}

rawMsgSize = []
rawMsgSize = []

cb(null, msg)
cb(null, msg)
})
})
}
}

function readMessage (reader, size, cb) {
reader.read(size, (err, msg) => {
if (err) {
return cb(err)
}

cb(null, msg)
})
}
43 changes: 31 additions & 12 deletions src/encode.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
'use strict'

const varint = require('varint')
const Buffer = require('safe-buffer').Buffer

module.exports = encode

function encode () {
const poolSize = 10 * 1024
let pool = Buffer.alloc(poolSize)
const poolSize = 10 * 1024

function encode (opts) {
opts = Object.assign({
fixed: false,
bytes: 4
}, opts || {})

// Only needed for varint
const varint = require('varint')
let pool = opts.fixed ? null : createPool()
let used = 0

let ended = false

return (read) => (end, cb) => {
Expand All @@ -24,18 +32,29 @@ function encode () {
return cb(ended)
}

varint.encode(data.length, pool, used)
used += varint.encode.bytes
let encodedLength
if (opts.fixed) {
encodedLength = Buffer.alloc(opts.bytes)
encodedLength.writeInt32BE(data.length, 0)
} else {
varint.encode(data.length, pool, used)
used += varint.encode.bytes
encodedLength = pool.slice(used - varint.encode.bytes, used)

if (pool.length - used < 100) {
pool = createPool()
used = 0
}
}

cb(null, Buffer.concat([
pool.slice(used - varint.encode.bytes, used),
encodedLength,
data
]))

if (pool.length - used < 100) {
pool = Buffer.alloc(poolSize)
used = 0
}
})
}
}

function createPool () {
return Buffer.alloc(poolSize)
}
51 changes: 51 additions & 0 deletions test/fixed.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/* eslint-env mocha */
'use strict'

const pull = require('pull-stream')
const expect = require('chai').expect

const lp = require('../src')

describe('pull-length-prefixed', () => {
it('basics', (done) => {
const input = [
new Buffer('hello '),
new Buffer('world')
]

pull(
pull.values(input),
lp.encode({fixed: true}),
pull.collect((err, encoded) => {
if (err) throw err

expect(
encoded
).to.be.eql([
Buffer.concat([
new Buffer('00000006', 'hex'),
new Buffer('hello ')
]),
Buffer.concat([
new Buffer('00000005', 'hex'),
new Buffer('world')
])
])

pull(
pull.values(encoded),
lp.decode({fixed: true}),
pull.collect((err, output) => {
if (err) throw err
expect(
input
).to.be.eql(
output
)
done()
})
)
})
)
})
})

0 comments on commit 934882c

Please sign in to comment.