Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vng: Use intcomp for uint32 vectors #5562

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/ronanh/intcomp v1.1.0
github.com/rs/cors v1.8.0
github.com/segmentio/ksuid v1.0.2
github.com/shellyln/go-sql-like-expr v0.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
github.com/rivo/uniseg v0.1.0 h1:+2KBaVoUmb9XzDsrx/Ct0W/EYOSFf/nWTauy++DprtY=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/ronanh/intcomp v1.1.0 h1:i54kxmpmSoOZFcWPMWryuakN0vLxLswASsGa07zkvLU=
github.com/ronanh/intcomp v1.1.0/go.mod h1:7FOLy3P3Zj3er/kVrU/pl+Ql7JFZj7bwliMGketo0IU=
github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so=
github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
Expand Down
7 changes: 7 additions & 0 deletions pkg/byteconv/byteconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,10 @@ func ParseUint64(b []byte) (uint64, error) {
func ParseFloat64(b []byte) (float64, error) {
return strconv.ParseFloat(UnsafeString(b), 64)
}

func ReinterpretSlice[Out, In any](in []In) []Out {
outData := (*Out)(unsafe.Pointer(unsafe.SliceData(in)))
outLen := len(in) * int(unsafe.Sizeof(in[0])) / int(unsafe.Sizeof(*outData))
outCap := cap(in) * int(unsafe.Sizeof(in[0])) / int(unsafe.Sizeof(*outData))
return unsafe.Slice(outData, outCap)[:outLen]
}
18 changes: 8 additions & 10 deletions vng/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,23 @@ import (
type ArrayEncoder struct {
typ super.Type
values Encoder
lengths *Int64Encoder
lengths Uint32Encoder
count uint32
}

var _ Encoder = (*ArrayEncoder)(nil)

func NewArrayEncoder(typ *super.TypeArray) *ArrayEncoder {
return &ArrayEncoder{
typ: typ.Type,
values: NewEncoder(typ.Type),
lengths: NewInt64Encoder(),
typ: typ.Type,
values: NewEncoder(typ.Type),
}
}

func (a *ArrayEncoder) Write(body zcode.Bytes) {
a.count++
it := body.Iter()
var len int64
var len uint32
for !it.Done() {
a.values.Write(it.Next())
len++
Expand All @@ -49,11 +48,11 @@ func (a *ArrayEncoder) Emit(w io.Writer) error {
}

func (a *ArrayEncoder) Metadata(off uint64) (uint64, Metadata) {
off, lens := a.lengths.Metadata(off)
off, lens := a.lengths.Segment(off)
off, vals := a.values.Metadata(off)
return off, &Array{
Length: a.count,
Lengths: lens.(*Primitive).Location, //XXX
Lengths: lens,
Values: vals,
}
}
Expand All @@ -65,9 +64,8 @@ type SetEncoder struct {
func NewSetEncoder(typ *super.TypeSet) *SetEncoder {
return &SetEncoder{
ArrayEncoder{
typ: typ.Type,
values: NewEncoder(typ.Type),
lengths: NewInt64Encoder(),
typ: typ.Type,
values: NewEncoder(typ.Type),
},
}
}
Expand Down
15 changes: 7 additions & 8 deletions vng/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@ import (
)

type DynamicEncoder struct {
tags *Int64Encoder
tags Uint32Encoder
values []Encoder
which map[super.Type]int
which map[super.Type]uint32
len uint32
}

var _ zio.Writer = (*DynamicEncoder)(nil)

func NewDynamicEncoder() *DynamicEncoder {
return &DynamicEncoder{
tags: NewInt64Encoder(),
which: make(map[super.Type]int),
which: make(map[super.Type]uint32),
}
}

Expand All @@ -32,11 +31,11 @@ func (d *DynamicEncoder) Write(val super.Value) error {
typ := val.Type()
tag, ok := d.which[typ]
if !ok {
tag = len(d.values)
tag = uint32(len(d.values))
d.values = append(d.values, NewEncoder(typ))
d.which[typ] = tag
}
d.tags.Write(int64(tag))
d.tags.Write(tag)
d.len++
d.values[tag].Write(val.Bytes())
return nil
Expand All @@ -58,14 +57,14 @@ func (d *DynamicEncoder) Encode() (Metadata, uint64, error) {
return meta, off, nil
}
values := make([]Metadata, 0, len(d.values))
off, tags := d.tags.Metadata(0)
off, tags := d.tags.Segment(0)
for _, val := range d.values {
var meta Metadata
off, meta = val.Metadata(off)
values = append(values, meta)
}
return &Dynamic{
Tags: tags.(*Primitive).Location,
Tags: tags,
Values: values,
Length: d.len,
}, off, nil
Expand Down
2 changes: 1 addition & 1 deletion vng/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

const (
Version = 5
Version = 6
HeaderSize = 24
MaxMetaSize = 100 * 1024 * 1024
MaxDataSize = 2 * 1024 * 1024 * 1024
Expand Down
48 changes: 35 additions & 13 deletions vng/int.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,46 @@ package vng
import (
"io"

"github.com/brimdata/super"
"github.com/brimdata/super/zcode"
"github.com/brimdata/super/pkg/byteconv"
"github.com/ronanh/intcomp"
"golang.org/x/sync/errgroup"
)

type Int64Encoder struct {
PrimitiveEncoder
type Uint32Encoder struct {
vals []uint32
out []byte
bytesLen uint64
}

func NewInt64Encoder() *Int64Encoder {
return &Int64Encoder{*NewPrimitiveEncoder(super.TypeInt64, false)}
func (u *Uint32Encoder) Write(v uint32) {
u.vals = append(u.vals, v)
}

func (p *Int64Encoder) Write(v int64) {
p.PrimitiveEncoder.Write(super.EncodeInt(v))
func (u *Uint32Encoder) Encode(group *errgroup.Group) {
group.Go(func() error {
u.bytesLen = uint64(len(u.vals) * 4)
compressed := intcomp.CompressUint32(u.vals, nil)
u.out = byteconv.ReinterpretSlice[byte](compressed)
return nil
})
}

func (u *Uint32Encoder) Emit(w io.Writer) error {
var err error
if len(u.out) > 0 {
_, err = w.Write(u.out)
}
return err
}

func (u *Uint32Encoder) Segment(off uint64) (uint64, Segment) {
len := uint64(len(u.out))
return off + len, Segment{
Offset: off,
MemLength: len,
Length: u.bytesLen,
CompressionFormat: CompressionFormatNone,
}
}

func ReadUint32s(loc Segment, r io.ReaderAt) ([]uint32, error) {
Expand All @@ -27,9 +53,5 @@ func ReadUint32s(loc Segment, r io.ReaderAt) ([]uint32, error) {
}
return nil, err
}
var vals []uint32
for it := zcode.Iter(buf); !it.Done(); {
vals = append(vals, uint32(super.DecodeInt(it.Next())))
}
return vals, nil
return intcomp.UncompressUint32(byteconv.ReinterpretSlice[uint32](buf), nil), nil
}
15 changes: 7 additions & 8 deletions vng/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,27 @@ import (
type MapEncoder struct {
keys Encoder
values Encoder
lengths *Int64Encoder
lengths Uint32Encoder
count uint32
}

func NewMapEncoder(typ *super.TypeMap) *MapEncoder {
return &MapEncoder{
keys: NewEncoder(typ.KeyType),
values: NewEncoder(typ.ValType),
lengths: NewInt64Encoder(),
keys: NewEncoder(typ.KeyType),
values: NewEncoder(typ.ValType),
}
}

func (m *MapEncoder) Write(body zcode.Bytes) {
m.count++
var len int
var len uint32
it := body.Iter()
for !it.Done() {
m.keys.Write(it.Next())
m.values.Write(it.Next())
len++
}
m.lengths.Write(int64(len))
m.lengths.Write(len)
}

func (m *MapEncoder) Emit(w io.Writer) error {
Expand All @@ -46,11 +45,11 @@ func (m *MapEncoder) Emit(w io.Writer) error {
}

func (m *MapEncoder) Metadata(off uint64) (uint64, Metadata) {
off, lens := m.lengths.Metadata(off)
off, lens := m.lengths.Segment(off)
off, keys := m.keys.Metadata(off)
off, vals := m.values.Metadata(off)
return off, &Map{
Lengths: lens.(*Primitive).Location,
Lengths: lens,
Keys: keys,
Values: vals,
Length: m.count,
Expand Down
9 changes: 4 additions & 5 deletions vng/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
// the first, which may be zero when the first value is non-null.
type NullsEncoder struct {
values Encoder
runs Int64Encoder
run int64
runs Uint32Encoder
run uint32
null bool
count uint32
}

func NewNullsEncoder(values Encoder) *NullsEncoder {
return &NullsEncoder{
values: values,
runs: *NewInt64Encoder(),
}
}

Expand Down Expand Up @@ -70,9 +69,9 @@ func (n *NullsEncoder) Metadata(off uint64) (uint64, Metadata) {
if n.count == 0 {
return off, values
}
off, runs := n.runs.Metadata(off)
off, runs := n.runs.Segment(off)
return off, &Nulls{
Runs: runs.(*Primitive).Location,
Runs: runs,
Values: values,
Count: n.count,
}
Expand Down
9 changes: 4 additions & 5 deletions vng/union.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type UnionEncoder struct {
typ *super.TypeUnion
values []Encoder
tags *Int64Encoder
tags Uint32Encoder
count uint32
}

Expand All @@ -25,15 +25,14 @@ func NewUnionEncoder(typ *super.TypeUnion) *UnionEncoder {
return &UnionEncoder{
typ: typ,
values: values,
tags: NewInt64Encoder(),
}
}

func (u *UnionEncoder) Write(body zcode.Bytes) {
u.count++
typ, zv := u.typ.Untag(body)
tag := u.typ.TagOf(typ)
u.tags.Write(int64(tag))
u.tags.Write(uint32(tag))
u.values[tag].Write(zv)
}

Expand All @@ -57,15 +56,15 @@ func (u *UnionEncoder) Encode(group *errgroup.Group) {
}

func (u *UnionEncoder) Metadata(off uint64) (uint64, Metadata) {
off, tags := u.tags.Metadata(off)
off, tags := u.tags.Segment(off)
values := make([]Metadata, 0, len(u.values))
for _, val := range u.values {
var meta Metadata
off, meta = val.Metadata(off)
values = append(values, meta)
}
return off, &Union{
Tags: tags.(*Primitive).Location,
Tags: tags,
Values: values,
Length: u.count,
}
Expand Down
2 changes: 1 addition & 1 deletion vng/ztests/const.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ inputs:
outputs:
- name: stdout
data: |
{Version:5(uint32),MetaSize:35(uint64),DataSize:0(uint64)}
{Version:6(uint32),MetaSize:35(uint64),DataSize:0(uint64)}
{Value:1,Count:3(uint32)}(=Const)