diff --git a/go.mod b/go.mod index e97c3b2088..b4ddbc3138 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 + github.com/ronanh/intcomp v1.1.0 github.com/rs/cors v1.8.0 github.com/segmentio/ksuid v1.0.2 github.com/shellyln/go-sql-like-expr v0.0.1 diff --git a/go.sum b/go.sum index 224abee32c..fffbd35809 100644 --- a/go.sum +++ b/go.sum @@ -307,6 +307,8 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY= github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/ronanh/intcomp v1.1.0 h1:i54kxmpmSoOZFcWPMWryuakN0vLxLswASsGa07zkvLU= +github.com/ronanh/intcomp v1.1.0/go.mod h1:7FOLy3P3Zj3er/kVrU/pl+Ql7JFZj7bwliMGketo0IU= github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so= github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM= github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= diff --git a/pkg/byteconv/byteconv.go b/pkg/byteconv/byteconv.go index 499b17529c..ac481bffed 100644 --- a/pkg/byteconv/byteconv.go +++ b/pkg/byteconv/byteconv.go @@ -64,3 +64,10 @@ func ParseUint64(b []byte) (uint64, error) { func ParseFloat64(b []byte) (float64, error) { return strconv.ParseFloat(UnsafeString(b), 64) } + +func ReinterpretSlice[Out, In any](in []In) []Out { + outData := (*Out)(unsafe.Pointer(unsafe.SliceData(in))) + outLen := len(in) * int(unsafe.Sizeof(in[0])) / int(unsafe.Sizeof(*outData)) + outCap := cap(in) * int(unsafe.Sizeof(in[0])) / int(unsafe.Sizeof(*outData)) + return unsafe.Slice(outData, outCap)[:outLen] +} diff --git a/vng/array.go b/vng/array.go index 2ad27b0fbd..416e629380 100644 --- a/vng/array.go +++ b/vng/array.go @@ -11,7 +11,7 @@ import ( type ArrayEncoder struct { typ super.Type values Encoder - lengths *Int64Encoder + lengths Uint32Encoder count uint32 } @@ -19,16 +19,15 @@ var _ Encoder = (*ArrayEncoder)(nil) func NewArrayEncoder(typ *super.TypeArray) *ArrayEncoder { return &ArrayEncoder{ - typ: typ.Type, - values: NewEncoder(typ.Type), - lengths: NewInt64Encoder(), + typ: typ.Type, + values: NewEncoder(typ.Type), } } func (a *ArrayEncoder) Write(body zcode.Bytes) { a.count++ it := body.Iter() - var len int64 + var len uint32 for !it.Done() { a.values.Write(it.Next()) len++ @@ -49,11 +48,11 @@ func (a *ArrayEncoder) Emit(w io.Writer) error { } func (a *ArrayEncoder) Metadata(off uint64) (uint64, Metadata) { - off, lens := a.lengths.Metadata(off) + off, lens := a.lengths.Segment(off) off, vals := a.values.Metadata(off) return off, &Array{ Length: a.count, - Lengths: lens.(*Primitive).Location, //XXX + Lengths: lens, Values: vals, } } @@ -65,9 +64,8 @@ type SetEncoder struct { func NewSetEncoder(typ *super.TypeSet) *SetEncoder { return &SetEncoder{ ArrayEncoder{ - typ: typ.Type, - values: NewEncoder(typ.Type), - lengths: NewInt64Encoder(), + typ: typ.Type, + values: NewEncoder(typ.Type), }, } } diff --git a/vng/dynamic.go b/vng/dynamic.go index 435464652f..bf9b296f28 100644 --- a/vng/dynamic.go +++ b/vng/dynamic.go @@ -9,9 +9,9 @@ import ( ) type DynamicEncoder struct { - tags *Int64Encoder + tags Uint32Encoder values []Encoder - which map[super.Type]int + which map[super.Type]uint32 len uint32 } @@ -19,8 +19,7 @@ var _ zio.Writer = (*DynamicEncoder)(nil) func NewDynamicEncoder() *DynamicEncoder { return &DynamicEncoder{ - tags: NewInt64Encoder(), - which: make(map[super.Type]int), + which: make(map[super.Type]uint32), } } @@ -32,11 +31,11 @@ func (d *DynamicEncoder) Write(val super.Value) error { typ := val.Type() tag, ok := d.which[typ] if !ok { - tag = len(d.values) + tag = uint32(len(d.values)) d.values = append(d.values, NewEncoder(typ)) d.which[typ] = tag } - d.tags.Write(int64(tag)) + d.tags.Write(tag) d.len++ d.values[tag].Write(val.Bytes()) return nil @@ -58,14 +57,14 @@ func (d *DynamicEncoder) Encode() (Metadata, uint64, error) { return meta, off, nil } values := make([]Metadata, 0, len(d.values)) - off, tags := d.tags.Metadata(0) + off, tags := d.tags.Segment(0) for _, val := range d.values { var meta Metadata off, meta = val.Metadata(off) values = append(values, meta) } return &Dynamic{ - Tags: tags.(*Primitive).Location, + Tags: tags, Values: values, Length: d.len, }, off, nil diff --git a/vng/header.go b/vng/header.go index 0123c18789..b637a92dd1 100644 --- a/vng/header.go +++ b/vng/header.go @@ -8,7 +8,7 @@ import ( ) const ( - Version = 5 + Version = 6 HeaderSize = 24 MaxMetaSize = 100 * 1024 * 1024 MaxDataSize = 2 * 1024 * 1024 * 1024 diff --git a/vng/int.go b/vng/int.go index 9258408553..13f8a3427c 100644 --- a/vng/int.go +++ b/vng/int.go @@ -3,20 +3,46 @@ package vng import ( "io" - "github.com/brimdata/super" - "github.com/brimdata/super/zcode" + "github.com/brimdata/super/pkg/byteconv" + "github.com/ronanh/intcomp" + "golang.org/x/sync/errgroup" ) -type Int64Encoder struct { - PrimitiveEncoder +type Uint32Encoder struct { + vals []uint32 + out []byte + bytesLen uint64 } -func NewInt64Encoder() *Int64Encoder { - return &Int64Encoder{*NewPrimitiveEncoder(super.TypeInt64, false)} +func (u *Uint32Encoder) Write(v uint32) { + u.vals = append(u.vals, v) } -func (p *Int64Encoder) Write(v int64) { - p.PrimitiveEncoder.Write(super.EncodeInt(v)) +func (u *Uint32Encoder) Encode(group *errgroup.Group) { + group.Go(func() error { + u.bytesLen = uint64(len(u.vals) * 4) + compressed := intcomp.CompressUint32(u.vals, nil) + u.out = byteconv.ReinterpretSlice[byte](compressed) + return nil + }) +} + +func (u *Uint32Encoder) Emit(w io.Writer) error { + var err error + if len(u.out) > 0 { + _, err = w.Write(u.out) + } + return err +} + +func (u *Uint32Encoder) Segment(off uint64) (uint64, Segment) { + len := uint64(len(u.out)) + return off + len, Segment{ + Offset: off, + MemLength: len, + Length: u.bytesLen, + CompressionFormat: CompressionFormatNone, + } } func ReadUint32s(loc Segment, r io.ReaderAt) ([]uint32, error) { @@ -27,9 +53,5 @@ func ReadUint32s(loc Segment, r io.ReaderAt) ([]uint32, error) { } return nil, err } - var vals []uint32 - for it := zcode.Iter(buf); !it.Done(); { - vals = append(vals, uint32(super.DecodeInt(it.Next()))) - } - return vals, nil + return intcomp.UncompressUint32(byteconv.ReinterpretSlice[uint32](buf), nil), nil } diff --git a/vng/map.go b/vng/map.go index bcc4df31f0..90b92a0fd3 100644 --- a/vng/map.go +++ b/vng/map.go @@ -11,28 +11,27 @@ import ( type MapEncoder struct { keys Encoder values Encoder - lengths *Int64Encoder + lengths Uint32Encoder count uint32 } func NewMapEncoder(typ *super.TypeMap) *MapEncoder { return &MapEncoder{ - keys: NewEncoder(typ.KeyType), - values: NewEncoder(typ.ValType), - lengths: NewInt64Encoder(), + keys: NewEncoder(typ.KeyType), + values: NewEncoder(typ.ValType), } } func (m *MapEncoder) Write(body zcode.Bytes) { m.count++ - var len int + var len uint32 it := body.Iter() for !it.Done() { m.keys.Write(it.Next()) m.values.Write(it.Next()) len++ } - m.lengths.Write(int64(len)) + m.lengths.Write(len) } func (m *MapEncoder) Emit(w io.Writer) error { @@ -46,11 +45,11 @@ func (m *MapEncoder) Emit(w io.Writer) error { } func (m *MapEncoder) Metadata(off uint64) (uint64, Metadata) { - off, lens := m.lengths.Metadata(off) + off, lens := m.lengths.Segment(off) off, keys := m.keys.Metadata(off) off, vals := m.values.Metadata(off) return off, &Map{ - Lengths: lens.(*Primitive).Location, + Lengths: lens, Keys: keys, Values: vals, Length: m.count, diff --git a/vng/nulls.go b/vng/nulls.go index 0e88695be7..14fc754eb5 100644 --- a/vng/nulls.go +++ b/vng/nulls.go @@ -12,8 +12,8 @@ import ( // the first, which may be zero when the first value is non-null. type NullsEncoder struct { values Encoder - runs Int64Encoder - run int64 + runs Uint32Encoder + run uint32 null bool count uint32 } @@ -21,7 +21,6 @@ type NullsEncoder struct { func NewNullsEncoder(values Encoder) *NullsEncoder { return &NullsEncoder{ values: values, - runs: *NewInt64Encoder(), } } @@ -70,9 +69,9 @@ func (n *NullsEncoder) Metadata(off uint64) (uint64, Metadata) { if n.count == 0 { return off, values } - off, runs := n.runs.Metadata(off) + off, runs := n.runs.Segment(off) return off, &Nulls{ - Runs: runs.(*Primitive).Location, + Runs: runs, Values: values, Count: n.count, } diff --git a/vng/union.go b/vng/union.go index 7bddf00b25..ad8263324f 100644 --- a/vng/union.go +++ b/vng/union.go @@ -11,7 +11,7 @@ import ( type UnionEncoder struct { typ *super.TypeUnion values []Encoder - tags *Int64Encoder + tags Uint32Encoder count uint32 } @@ -25,7 +25,6 @@ func NewUnionEncoder(typ *super.TypeUnion) *UnionEncoder { return &UnionEncoder{ typ: typ, values: values, - tags: NewInt64Encoder(), } } @@ -33,7 +32,7 @@ func (u *UnionEncoder) Write(body zcode.Bytes) { u.count++ typ, zv := u.typ.Untag(body) tag := u.typ.TagOf(typ) - u.tags.Write(int64(tag)) + u.tags.Write(uint32(tag)) u.values[tag].Write(zv) } @@ -57,7 +56,7 @@ func (u *UnionEncoder) Encode(group *errgroup.Group) { } func (u *UnionEncoder) Metadata(off uint64) (uint64, Metadata) { - off, tags := u.tags.Metadata(off) + off, tags := u.tags.Segment(off) values := make([]Metadata, 0, len(u.values)) for _, val := range u.values { var meta Metadata @@ -65,7 +64,7 @@ func (u *UnionEncoder) Metadata(off uint64) (uint64, Metadata) { values = append(values, meta) } return off, &Union{ - Tags: tags.(*Primitive).Location, + Tags: tags, Values: values, Length: u.count, } diff --git a/vng/ztests/const.yaml b/vng/ztests/const.yaml index fcf7f7e9ea..6750a95bdd 100644 --- a/vng/ztests/const.yaml +++ b/vng/ztests/const.yaml @@ -12,5 +12,5 @@ inputs: outputs: - name: stdout data: | - {Version:5(uint32),MetaSize:35(uint64),DataSize:0(uint64)} + {Version:6(uint32),MetaSize:35(uint64),DataSize:0(uint64)} {Value:1,Count:3(uint32)}(=Const)