Skip to content

Commit

Permalink
Optimize OStream implementation (#1399)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardartoul authored Feb 25, 2019
1 parent 0647a12 commit 985eb5f
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 30 deletions.
120 changes: 95 additions & 25 deletions src/dbnode/encoding/ostream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,24 @@ const (

// Ostream encapsulates a writable stream.
type ostream struct {
rawBuffer checked.Bytes // raw bytes
pos int // how many bits have been used in the last byte
// We want to use a checked.Bytes when exposing or returning the buffer
// of the ostream. Unfortunately, the accounting overhead of going through
// the checked.Bytes for every write is massive. As a result, we store both
// the rawBuffer that backs the checked.Bytes AND the checked.Bytes themselves
// in this struct.
//
// That way, whenever we're writing to the buffer we can avoid the cost accounting
// overhead entirely, but when the data needs to be exposed outside of this datastructure
// or ownership of the data needs to be transferred, then we use the checked.Bytes, which
// is when the accounting really matters anyways.
//
// The rawBuffer and checked.Bytes may get out of sync as the rawBuffer is written to,
// but thats fine because we perform a "repair" by resetting the checked.Bytes underlying
// byte slice to be the rawBuffer whenever we expose a checked.Bytes to an external caller.
rawBuffer []byte
checked checked.Bytes

pos int // how many bits have been used in the last byte
bytesPool pool.CheckedBytesPool
}

Expand All @@ -45,17 +61,15 @@ func NewOStream(
if bytes == nil && initAllocIfEmpty {
bytes = checked.NewBytes(make([]byte, 0, initAllocSize), nil)
}

stream := &ostream{bytesPool: bytesPool}
stream.Reset(bytes)
return stream
}

// Len returns the length of the Ostream
func (os *ostream) Len() int {
if os.rawBuffer == nil {
return 0
}
return os.rawBuffer.Len()
return len(os.rawBuffer)
}

// Empty returns whether the Ostream is empty
Expand All @@ -73,22 +87,50 @@ func (os *ostream) hasUnusedBits() bool {

// grow appends the last byte of v to rawBuffer and sets pos to np.
func (os *ostream) grow(v byte, np int) {
os.ensureCapacityFor(1)
os.rawBuffer = append(os.rawBuffer, v)

os.pos = np
}

// ensureCapacity ensures that there is at least capacity for n more bytes.
func (os *ostream) ensureCapacityFor(n int) {
var (
currCap = cap(os.rawBuffer)
currLen = len(os.rawBuffer)
availableCap = currCap - currLen
missingCap = n - availableCap
)
if missingCap <= 0 {
// Already have enough capacity.
return
}

newCap := max(cap(os.rawBuffer)*2, currCap+missingCap)
if p := os.bytesPool; p != nil {
if b, swapped := pool.AppendByteChecked(os.rawBuffer, v, p); swapped {
os.rawBuffer.DecRef()
os.rawBuffer.Finalize()
os.rawBuffer = b
os.rawBuffer.IncRef()
newChecked := p.Get(newCap)
newChecked.IncRef()
newChecked.AppendAll(os.rawBuffer)

if os.checked != nil {
os.checked.DecRef()
os.checked.Finalize()
}

os.checked = newChecked
os.rawBuffer = os.checked.Bytes()
} else {
os.rawBuffer.Append(v)
}
newRawBuffer := make([]byte, 0, newCap)
newRawBuffer = append(newRawBuffer, os.rawBuffer...)
os.rawBuffer = newRawBuffer

os.pos = np
os.checked = checked.NewBytes(os.rawBuffer, nil)
os.checked.IncRef()
}
}

func (os *ostream) fillUnused(v byte) {
os.rawBuffer.Bytes()[os.lastIndex()] |= v >> uint(os.pos)
os.rawBuffer[os.lastIndex()] |= v >> uint(os.pos)
}

// WriteBit writes the last bit of v.
Expand All @@ -114,6 +156,10 @@ func (os *ostream) WriteByte(v byte) {

// WriteBytes writes a byte slice.
func (os *ostream) WriteBytes(bytes []byte) {
// Make sure we only have to grow the underlying buffer at most
// one time before we write one byte at a time in a loop.
os.ensureCapacityFor(len(bytes))

for i := 0; i < len(bytes); i++ {
os.WriteByte(bytes[i])
}
Expand Down Expand Up @@ -147,27 +193,35 @@ func (os *ostream) WriteBits(v uint64, numBits int) {

// Discard takes the ref to the checked bytes from the ostream
func (os *ostream) Discard() checked.Bytes {
buffer := os.rawBuffer
os.repairCheckedBytes()

buffer := os.checked
buffer.DecRef()
os.pos = 0

os.rawBuffer = nil
os.pos = 0
os.checked = nil

return buffer
}

// Reset resets the ostream
func (os *ostream) Reset(buffer checked.Bytes) {
if os.rawBuffer != nil {
if os.checked != nil {
// Release ref of the current raw buffer
os.rawBuffer.DecRef()
os.rawBuffer.Finalize()
os.checked.DecRef()
os.checked.Finalize()

os.rawBuffer = nil
os.checked = nil
}

os.rawBuffer = buffer

if os.rawBuffer != nil {
if buffer != nil {
// Track ref to the new raw buffer
os.rawBuffer.IncRef()
buffer.IncRef()

os.checked = buffer
os.rawBuffer = os.checked.Bytes()
}

os.pos = 0
Expand All @@ -180,5 +234,21 @@ func (os *ostream) Reset(buffer checked.Bytes) {

// Rawbytes returns the Osteam's raw bytes
func (os *ostream) Rawbytes() (checked.Bytes, int) {
return os.rawBuffer, os.pos
os.repairCheckedBytes()
return os.checked, os.pos
}

// repairCheckedBytes makes sure that the checked.Bytes wraps the rawBuffer as
// they may have fallen out of sync during the writing process.
func (os *ostream) repairCheckedBytes() {
if os.checked != nil {
os.checked.Reset(os.rawBuffer)
}
}

func max(x, y int) int {
if x > y {
return x
}
return y
}
103 changes: 98 additions & 5 deletions src/dbnode/encoding/ostream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,24 @@ package encoding
import (
"testing"

"github.com/m3db/m3x/pool"

"github.com/stretchr/testify/require"
)

var (
testBytesPool = newTestCheckedBytesPool()
)

func TestWriteBits(t *testing.T) {
testWriteBits(t, NewOStream(nil, true, nil))
}

func TestWriteBitsWithPooling(t *testing.T) {
testWriteBits(t, NewOStream(nil, true, testBytesPool))
}

func testWriteBits(t *testing.T, o OStream) {
inputs := []struct {
value uint64
numBits int
Expand All @@ -44,32 +58,111 @@ func TestWriteBits(t *testing.T) {
{0x1, 65, []byte{0xca, 0xfe, 0xfd, 0x89, 0x1a, 0x2b, 0x3c, 0x48, 0x55, 0xe6, 0xf7, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80}, 1},
}

o := NewOStream(nil, true, nil)
os := o.(*ostream)
require.True(t, os.Empty())
for _, input := range inputs {
os.WriteBits(input.value, input.numBits)
require.Equal(t, input.expectedBytes, os.rawBuffer.Bytes())
require.Equal(t, input.expectedBytes, os.rawBuffer)
checked, _ := os.Rawbytes()
require.Equal(t, input.expectedBytes, checked.Bytes())
require.Equal(t, input.expectedPos, os.pos)
}
require.False(t, os.Empty())
}

func TestWriteBytes(t *testing.T) {
o := NewOStream(nil, true, nil)
testWriteBytes(t, NewOStream(nil, true, nil))
}

func TestWriteBytesWithPooling(t *testing.T) {
testWriteBytes(t, NewOStream(nil, true, testBytesPool))
}

func testWriteBytes(t *testing.T, o OStream) {
os := o.(*ostream)
rawBytes := []byte{0x1, 0x2}
os.WriteBytes(rawBytes)
require.Equal(t, rawBytes, os.rawBuffer.Bytes())

require.Equal(t, rawBytes, os.rawBuffer)

checked, _ := os.Rawbytes()
require.Equal(t, rawBytes, checked.Bytes())

require.Equal(t, 8, os.pos)
}

func TestResetOStream(t *testing.T) {
o := NewOStream(nil, true, nil)
testResetOStream(t, NewOStream(nil, true, testBytesPool))
}

func TestResetOStreamWithPooling(t *testing.T) {
testResetOStream(t, NewOStream(nil, true, testBytesPool))
}

func testResetOStream(t *testing.T, o OStream) {
os := o.(*ostream)
os.WriteByte(0xfe)
os.Reset(nil)

require.True(t, os.Empty())
require.Equal(t, 0, os.Len())
require.Equal(t, 0, os.pos)

checked, _ := os.Rawbytes()
require.Equal(t, nil, checked)
}

func BenchmarkWriteBytes(b *testing.B) {
var (
bytes = make([]byte, 298)
bytesPool = testBytesPool
o = NewOStream(nil, false, bytesPool)
)
for n := 0; n < b.N; n++ {
o.Reset(nil)
o.WriteBytes(bytes)
}
}

func newTestCheckedBytesPool() pool.CheckedBytesPool {
bytesPoolOpts := pool.NewObjectPoolOptions()

bytesPool := pool.NewCheckedBytesPool([]pool.Bucket{
pool.Bucket{
Capacity: 16,
Count: 1,
},
pool.Bucket{
Capacity: 32,
Count: 1,
},
pool.Bucket{
Capacity: 64,
Count: 1,
},
pool.Bucket{
Capacity: 1,
Count: 1,
},
pool.Bucket{
Capacity: 256,
Count: 1,
},
pool.Bucket{
Capacity: 1440,
Count: 1,
},
pool.Bucket{
Capacity: 4096,
Count: 1,
},
pool.Bucket{
Capacity: 8192,
Count: 1,
},
}, bytesPoolOpts, func(s []pool.Bucket) pool.BytesPool {
return pool.NewBytesPool(s, bytesPoolOpts)
})
bytesPool.Init()
return bytesPool
}

0 comments on commit 985eb5f

Please sign in to comment.