Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chunk schema v3 #2934

Merged
merged 1 commit into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import (
const (
blocksPerChunk = 10
maxLineLength = 1024 * 1024 * 1024

_ byte = iota
chunkFormatV1
chunkFormatV2
chunkFormatV3
)

var (
magicNumber = uint32(0x12EE56A)

chunkFormatV1 = byte(1)
chunkFormatV2 = byte(2)
)

// The table gets initialized with sync.Once but may still cause a race
Expand Down Expand Up @@ -95,6 +97,15 @@ func (hb *headBlock) isEmpty() bool {
return len(hb.entries) == 0
}

// clear resets the head block to its empty state so it can be reused
// after its contents have been cut into a compressed block. The entries
// slice is truncated in place — keeping its backing array for reuse, and
// deliberately staying nil if it was never allocated (so a cleared,
// never-used head block still compares equal to a fresh zero value) —
// and the accumulated size and time bounds are zeroed.
func (hb *headBlock) clear() {
	if hb.entries != nil {
		hb.entries = hb.entries[:0]
	}
	hb.size, hb.mint, hb.maxt = 0, 0, 0
}

func (hb *headBlock) append(ts int64, line string) error {
if !hb.isEmpty() && hb.maxt > ts {
return ErrOutOfOrder
Expand Down Expand Up @@ -154,7 +165,7 @@ func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk {
blocks: []block{},

head: &headBlock{},
format: chunkFormatV2,
format: chunkFormatV3,

encoding: enc,
}
Expand Down Expand Up @@ -183,8 +194,8 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
switch version {
case chunkFormatV1:
bc.encoding = EncGZIP
case chunkFormatV2:
// format v2 has a byte for block encoding.
case chunkFormatV2, chunkFormatV3:
// format v2+ has a byte for block encoding.
enc := Encoding(db.byte())
if db.err() != nil {
return nil, errors.Wrap(db.err(), "verifying encoding")
Expand Down Expand Up @@ -218,6 +229,9 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {

// Read offset and length.
blk.offset = db.uvarint()
if version == chunkFormatV3 {
blk.uncompressedSize = db.uvarint()
}
l := db.uvarint()
blk.b = b[blk.offset : blk.offset+l]

Expand Down Expand Up @@ -259,8 +273,8 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
// Write the header (magicNum + version).
eb.putBE32(magicNumber)
eb.putByte(c.format)
if c.format == chunkFormatV2 {
// chunk format v2 has a byte for encoding.
if c.format > chunkFormatV1 {
// chunk format v2+ has a byte for encoding.
eb.putByte(byte(c.encoding))
}

Expand Down Expand Up @@ -296,6 +310,9 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
eb.putVarint64(b.mint)
eb.putVarint64(b.maxt)
eb.putUvarint(b.offset)
if c.format == chunkFormatV3 {
eb.putUvarint(b.uncompressedSize)
}
eb.putUvarint(len(b.b))
}
eb.putHash(crc32Hash)
Expand Down Expand Up @@ -441,10 +458,7 @@ func (c *MemChunk) cut() error {

c.cutBlockSize += len(b)

c.head.entries = c.head.entries[:0]
c.head.mint = 0 // Will be set on first append.
c.head.size = 0

c.head.clear()
return nil
}

Expand Down
90 changes: 58 additions & 32 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,53 +207,79 @@ func TestReadFormatV1(t *testing.T) {
// 2) []byte loaded chunks <-> []byte loaded chunks
func TestRoundtripV2(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
c := NewMemChunk(enc, testBlockSize, testTargetSize)
populated := fillChunk(c)
for _, version := range []byte{chunkFormatV2, chunkFormatV3} {
t.Run(enc.String(), func(t *testing.T) {
c := NewMemChunk(enc, testBlockSize, testTargetSize)
c.format = version
populated := fillChunk(c)

assertLines := func(c *MemChunk) {
require.Equal(t, enc, c.Encoding())
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
assertLines := func(c *MemChunk) {
require.Equal(t, enc, c.Encoding())
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
if err != nil {
t.Fatal(err)
}

i := int64(0)
var data int64
for it.Next() {
require.Equal(t, i, it.Entry().Timestamp.UnixNano())
require.Equal(t, testdata.LogString(i), it.Entry().Line)

data += int64(len(it.Entry().Line))
i++
}
require.Equal(t, populated, data)
}

assertLines(c)

// test MemChunk -> NewByteChunk loading
b, err := c.Bytes()
if err != nil {
t.Fatal(err)
}

i := int64(0)
var data int64
for it.Next() {
require.Equal(t, i, it.Entry().Timestamp.UnixNano())
require.Equal(t, testdata.LogString(i), it.Entry().Line)

data += int64(len(it.Entry().Line))
i++
r, err := NewByteChunk(b, testBlockSize, testTargetSize)
if err != nil {
t.Fatal(err)
}
require.Equal(t, populated, data)
}
assertLines(r)

assertLines(c)
// test NewByteChunk -> NewByteChunk loading
rOut, err := r.Bytes()
require.Nil(t, err)

// test MemChunk -> NewByteChunk loading
b, err := c.Bytes()
if err != nil {
t.Fatal(err)
}
loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize)
require.Nil(t, err)

r, err := NewByteChunk(b, testBlockSize, testTargetSize)
if err != nil {
t.Fatal(err)
}
assertLines(r)
assertLines(loaded)
})
}

}
}

// test NewByteChunk -> NewByteChunk loading
rOut, err := r.Bytes()
func TestRoundtripV3(t *testing.T) {

for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
c := NewMemChunk(enc, testBlockSize, testTargetSize)
_ = fillChunk(c)

b, err := c.Bytes()
require.Nil(t, err)
r, err := NewByteChunk(b, testBlockSize, testTargetSize)
require.Nil(t, err)

loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize)
// have to populate then clear the head block or fail comparing against nil vs zero values
err = r.head.append(1, "1")
require.Nil(t, err)
r.head.clear()

assertLines(loaded)
})
require.Equal(t, c, r)

})
}

}
Expand Down