Skip to content

Commit

Permalink
[dbnode] Decode ReadBits improvements (#2197)
Browse files Browse the repository at this point in the history
  • Loading branch information
rallen090 authored Mar 11, 2020
1 parent 8f6d216 commit 07e7df8
Show file tree
Hide file tree
Showing 12 changed files with 101 additions and 91 deletions.
4 changes: 2 additions & 2 deletions src/dbnode/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func LeadingAndTrailingZeros(v uint64) (int, int) {
}

// SignExtend sign extends the highest bit of v which has numBits (<=64)
func SignExtend(v uint64, numBits int) int64 {
shift := uint(64 - numBits)
func SignExtend(v uint64, numBits uint) int64 {
shift := 64 - numBits
return (int64(v) << shift) >> shift
}
8 changes: 4 additions & 4 deletions src/dbnode/encoding/encoding_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 11 additions & 15 deletions src/dbnode/encoding/istream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type istream struct {
r *bufio.Reader // encoded stream
err error // error encountered
current byte // current byte we are working off of
remaining int // bits remaining in current to be read
remaining uint // bits remaining in current to be read
}

// NewIStream creates a new Istream
Expand Down Expand Up @@ -93,11 +93,10 @@ func (is *istream) ReadByte() (byte, error) {
}

// ReadBits reads the next Bits
func (is *istream) ReadBits(numBits int) (uint64, error) {
func (is *istream) ReadBits(numBits uint) (uint64, error) {
if is.err != nil {
return 0, is.err
}

var res uint64
for numBits >= 8 {
byteRead, err := is.ReadByte()
Expand All @@ -121,8 +120,8 @@ func (is *istream) ReadBits(numBits int) (uint64, error) {
if is.remaining < numToRead {
numToRead = is.remaining
}
bits := is.current >> uint(8-numToRead)
is.current <<= uint(numToRead)
bits := is.current >> (8 - numToRead)
is.current <<= numToRead
is.remaining -= numToRead
res = (res << uint64(numToRead)) | uint64(bits)
numBits -= numToRead
Expand All @@ -131,10 +130,7 @@ func (is *istream) ReadBits(numBits int) (uint64, error) {
}

// PeekBits looks at the next Bits, but doesn't move the pos
func (is *istream) PeekBits(numBits int) (uint64, error) {
if is.err != nil {
return 0, is.err
}
func (is *istream) PeekBits(numBits uint) (uint64, error) {
// check the last byte first
if numBits <= is.remaining {
return uint64(readBitsInByte(is.current, numBits)), nil
Expand All @@ -152,25 +148,25 @@ func (is *istream) PeekBits(numBits int) (uint64, error) {
numBitsRead += 8
}
remainder := readBitsInByte(bytesRead[numBytesToRead-1], numBits-numBitsRead)
res = (res << uint(numBits-numBitsRead)) | uint64(remainder)
res = (res << (numBits - numBitsRead)) | uint64(remainder)
return res, nil
}

// RemainingBitsInCurrentByte returns the number of bits remaining to be read in
// the current byte.
func (is *istream) RemainingBitsInCurrentByte() int {
func (is *istream) RemainingBitsInCurrentByte() uint {
return is.remaining
}

// readBitsInByte reads numBits in byte b.
func readBitsInByte(b byte, numBits int) byte {
return b >> uint(8-numBits)
func readBitsInByte(b byte, numBits uint) byte {
return b >> (8 - numBits)
}

// consumeBuffer consumes numBits in is.current.
func (is *istream) consumeBuffer(numBits int) byte {
func (is *istream) consumeBuffer(numBits uint) byte {
res := readBitsInByte(is.current, numBits)
is.current <<= uint(numBits)
is.current <<= numBits
is.remaining -= numBits
return res
}
Expand Down
15 changes: 5 additions & 10 deletions src/dbnode/encoding/istream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestReadBits(t *testing.T) {

o := NewIStream(bytes.NewReader(byteStream), 16)
is := o.(*istream)
numBits := []int{1, 3, 4, 8, 7, 2, 64, 64}
numBits := []uint{1, 3, 4, 8, 7, 2, 64, 64}
var res []uint64
for _, v := range numBits {
read, err := is.ReadBits(v)
Expand All @@ -44,19 +44,17 @@ func TestReadBits(t *testing.T) {
}
expected := []uint64{0x1, 0x4, 0xa, 0xfe, 0x7e, 0x3, 0x1234567890abcdef, 0x1}
require.Equal(t, expected, res)
require.NoError(t, is.err)

_, err := is.ReadBits(8)
require.Error(t, err)
require.Error(t, is.err)
}

func TestPeekBitsSuccess(t *testing.T) {
byteStream := []byte{0xa9, 0xfe, 0xfe, 0xdf, 0x9b, 0x57, 0x21, 0xf1}
o := NewIStream(bytes.NewReader(byteStream), 16)
is := o.(*istream)
inputs := []struct {
numBits int
numBits uint
expected uint64
}{
{0, 0},
Expand All @@ -73,9 +71,8 @@ func TestPeekBitsSuccess(t *testing.T) {
require.NoError(t, err)
require.Equal(t, input.expected, res)
}
require.NoError(t, is.err)
require.Equal(t, byte(0), is.current)
require.Equal(t, 0, is.remaining)
require.Equal(t, 0, int(is.remaining))
}

func TestPeekBitsError(t *testing.T) {
Expand All @@ -98,7 +95,7 @@ func TestReadAfterPeekBits(t *testing.T) {
require.Error(t, err)

inputs := []struct {
numBits int
numBits uint
expected uint64
}{
{2, 0x2},
Expand All @@ -117,9 +114,7 @@ func TestResetIStream(t *testing.T) {
o := NewIStream(bytes.NewReader(nil), 16)
is := o.(*istream)
is.ReadBits(1)
require.Error(t, is.err)
is.Reset(bytes.NewReader(nil))
require.NoError(t, is.err)
require.Equal(t, byte(0), is.current)
require.Equal(t, 0, is.remaining)
require.Equal(t, 0, int(is.remaining))
}
4 changes: 2 additions & 2 deletions src/dbnode/encoding/m3tsz/float_encoder_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error
cb = (cb << 1) | nextCB
if cb == opcodeContainedValueXOR {
previousLeading, previousTrailing := encoding.LeadingAndTrailingZeros(eit.PrevXOR)
numMeaningfulBits := 64 - previousLeading - previousTrailing
numMeaningfulBits := uint(64 - previousLeading - previousTrailing)
meaningfulBits, err := stream.ReadBits(numMeaningfulBits)
if err != nil {
return err
Expand All @@ -153,7 +153,7 @@ func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error
numLeadingZeros := (numLeadingZeroesAndNumMeaningfulBits & bits12To6Mask) >> 6
numMeaningfulBits := (numLeadingZeroesAndNumMeaningfulBits & bits6To0Mask) + 1

meaningfulBits, err := stream.ReadBits(int(numMeaningfulBits))
meaningfulBits, err := stream.ReadBits(uint(numMeaningfulBits))
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/encoding/m3tsz/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ func (it *readerIterator) readIntValDiff() {
sign = 1.0
}

it.intVal += sign * float64(it.readBits(int(it.sig)))
it.intVal += sign * float64(it.readBits(uint(it.sig)))
}

func (it *readerIterator) readBits(numBits int) uint64 {
func (it *readerIterator) readBits(numBits uint) uint64 {
if !it.hasNext() {
return 0
}
Expand Down
42 changes: 24 additions & 18 deletions src/dbnode/encoding/m3tsz/timestamp_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,21 @@ type TimestampIterator struct {
// schemes. Setting SkipMarkers to true disables the look ahead behavior
// for situations where looking ahead is not safe.
SkipMarkers bool

numValueBits uint
numBits uint
markerEncodingScheme encoding.MarkerEncodingScheme
}

// NewTimestampIterator creates a new TimestampIterator.
func NewTimestampIterator(opts encoding.Options, skipMarkers bool) TimestampIterator {
mes := opts.MarkerEncodingScheme()
return TimestampIterator{
Opts: opts,
SkipMarkers: skipMarkers,
Opts: opts,
SkipMarkers: skipMarkers,
numValueBits: uint(mes.NumValueBits()),
numBits: uint(mes.NumOpcodeBits() + mes.NumValueBits()),
markerEncodingScheme: mes,
}
}

Expand Down Expand Up @@ -137,32 +145,30 @@ func (it *TimestampIterator) readNextTimestamp(stream encoding.IStream) error {
}

func (it *TimestampIterator) tryReadMarker(stream encoding.IStream) (time.Duration, bool, error) {
mes := it.Opts.MarkerEncodingScheme()
numBits := mes.NumOpcodeBits() + mes.NumValueBits()
opcodeAndValue, success := it.tryPeekBits(stream, numBits)
opcodeAndValue, success := it.tryPeekBits(stream, it.numBits)
if !success {
return 0, false, nil
}

opcode := opcodeAndValue >> uint(mes.NumValueBits())
if opcode != mes.Opcode() {
opcode := opcodeAndValue >> it.numValueBits
if opcode != it.markerEncodingScheme.Opcode() {
return 0, false, nil
}

var (
valueMask = (1 << uint(mes.NumValueBits())) - 1
valueMask = (1 << it.numValueBits) - 1
markerValue = int64(opcodeAndValue & uint64(valueMask))
)
switch encoding.Marker(markerValue) {
case mes.EndOfStream():
_, err := stream.ReadBits(numBits)
case it.markerEncodingScheme.EndOfStream():
_, err := stream.ReadBits(it.numBits)
if err != nil {
return 0, false, err
}
it.Done = true
return 0, true, nil
case mes.Annotation():
_, err := stream.ReadBits(numBits)
case it.markerEncodingScheme.Annotation():
_, err := stream.ReadBits(it.numBits)
if err != nil {
return 0, false, err
}
Expand All @@ -175,8 +181,8 @@ func (it *TimestampIterator) tryReadMarker(stream encoding.IStream) (time.Durati
return 0, false, err
}
return markerOrDOD, true, nil
case mes.TimeUnit():
_, err := stream.ReadBits(numBits)
case it.markerEncodingScheme.TimeUnit():
_, err := stream.ReadBits(it.numBits)
if err != nil {
return 0, false, err
}
Expand Down Expand Up @@ -248,12 +254,12 @@ func (it *TimestampIterator) readDeltaOfDelta(

cb = (cb << 1) | nextCB
if cb == buckets[i].Opcode() {
dodBits, err := stream.ReadBits(buckets[i].NumValueBits())
dodBits, err := stream.ReadBits(uint(buckets[i].NumValueBits()))
if err != nil {
return 0, err
}

dod := encoding.SignExtend(dodBits, buckets[i].NumValueBits())
dod := encoding.SignExtend(dodBits, uint(buckets[i].NumValueBits()))
timeUnit, err := it.TimeUnit.Value()
if err != nil {
return 0, nil
Expand All @@ -263,7 +269,7 @@ func (it *TimestampIterator) readDeltaOfDelta(
}
}

numValueBits := tes.DefaultBucket().NumValueBits()
numValueBits := uint(tes.DefaultBucket().NumValueBits())
dodBits, err := stream.ReadBits(numValueBits)
if err != nil {
return 0, err
Expand Down Expand Up @@ -310,7 +316,7 @@ func (it *TimestampIterator) readVarint(stream encoding.IStream) (int, error) {
return int(res), err
}

func (it *TimestampIterator) tryPeekBits(stream encoding.IStream, numBits int) (uint64, bool) {
func (it *TimestampIterator) tryPeekBits(stream encoding.IStream, numBits uint) (uint64, bool) {
res, err := stream.PeekBits(numBits)
if err != nil {
return 0, false
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
const (
defaultDefaultTimeUnit = xtime.Second
defaultByteFieldDictLRUSize = 4
defaultIStreamReaderSizeM3TSZ = 16
defaultIStreamReaderSizeM3TSZ = 8 * 2
defaultIStreamReaderSizeProto = 128
)

Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/proto/int_encoder_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (eit *intEncoderAndIterator) readIntValDiff(stream encoding.IStream) error
itErrPrefix, err)
}

numSig := int(eit.intSigBitsTracker.NumSig)
numSig := uint(eit.intSigBitsTracker.NumSig)
diffSigBits, err := stream.ReadBits(numSig)
if err != nil {
return fmt.Errorf(
Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/encoding/proto/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (it *iterator) readCustomFieldsSchema() error {
}

for i := 1; i <= int(numCustomFields); i++ {
fieldTypeBits, err := it.stream.ReadBits(numBitsToEncodeCustomType)
fieldTypeBits, err := it.stream.ReadBits(uint(numBitsToEncodeCustomType))
if err != nil {
return err
}
Expand Down Expand Up @@ -546,7 +546,7 @@ func (it *iterator) readBytesValue(i int, customField customFieldState) error {

if valueInDictControlBit == opCodeInterpretSubsequentBitsAsLRUIndex {
dictIdxBits, err := it.stream.ReadBits(
numBitsRequiredForNumUpToN(it.byteFieldDictLRUSize))
uint(numBitsRequiredForNumUpToN(it.byteFieldDictLRUSize)))
if err != nil {
return fmt.Errorf(
"%s error trying to read bytes dict idx: %v",
Expand Down Expand Up @@ -861,7 +861,7 @@ func (it *iterator) nextToBeEvicted(fieldIdx int) []byte {
return dict[0]
}

func (it *iterator) readBits(numBits int) (uint64, error) {
func (it *iterator) readBits(numBits uint) (uint64, error) {
res, err := it.stream.ReadBits(numBits)
if err != nil {
return 0, err
Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,9 @@ type IStream interface {
Read([]byte) (int, error)
ReadBit() (Bit, error)
ReadByte() (byte, error)
ReadBits(numBits int) (uint64, error)
PeekBits(numBits int) (uint64, error)
RemainingBitsInCurrentByte() int
ReadBits(numBits uint) (uint64, error)
PeekBits(numBits uint) (uint64, error)
RemainingBitsInCurrentByte() uint
Reset(r io.Reader)
}

Expand Down
Loading

0 comments on commit 07e7df8

Please sign in to comment.