Skip to content

Commit

Permalink
changes for v0.1.2
Browse files Browse the repository at this point in the history
* Fixed bug with mixing headers on map buckets rebalancing;
* Added internal option to disable write batching;
* Fixed bug in file requests decoding;
* Downgrade grpc library version;
* All http/2 setting frame options considiration;
* Fixed bug in GOAWAY frame processing;
* Decreased memory allocation for each connection;
* More stream lifecycles capturing;
* Fixed conflict with C# grpc server.
  • Loading branch information
rapthead committed Aug 16, 2024
1 parent e820110 commit 5898e1c
Show file tree
Hide file tree
Showing 50 changed files with 1,352 additions and 601 deletions.
1 change: 1 addition & 0 deletions benchmarks/dumb-server/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
debug
37 changes: 0 additions & 37 deletions benchmarks/dumb-server/debug/main.go

This file was deleted.

8 changes: 4 additions & 4 deletions benchmarks/dumb-server/pb/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions benchmarks/dumb-server/pb/api_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion benchmarks/jmeter-java-dsl/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ services:
resources:
limits:
cpus: '2'
memory: 3G
memory: 2G
2 changes: 1 addition & 1 deletion cmd/framer/cmd_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *ConvertCommand) Run(ctx context.Context) error {

var r8n reflection.DynamicMessagesStore
if c.ReflectionAddr != "" {
conn, err := grpc.NewClient(
conn, err := grpc.Dial(
c.ReflectionAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUserAgent("framer"),
Expand Down
7 changes: 4 additions & 3 deletions cmd/framer/cmd_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,18 +132,18 @@ func (c *LoadCommand) Run(

g, ctx := errgroup.WithContext(ctx)

timeout := consts.DefaultTimeout
var reporter types.Reporter = supersimpleReporter.New(timeout)
var reporter types.Reporter = supersimpleReporter.New()
if c.Phout != "" {
f, err := os.Create(c.Phout)
if err != nil {
return fmt.Errorf("creating phout file(%s): %w", c.Phout, err)
}
phoutReporter := phoutReporter.New(f, timeout)
phoutReporter := phoutReporter.New(f)
reporter = multi.NewMutli(phoutReporter, reporter)
}
g.Go(reporter.Run)

timeout := consts.DefaultTimeout
loaders := make([]*loader.Loader, clients)
for i := 0; i < clients; i++ {
conn, err := createConn(ctx, timeout, addr)
Expand All @@ -154,6 +154,7 @@ func (c *LoadCommand) Run(
conn,
reporter,
timeout,
false,
log,
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/framer/main_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func BenchmarkE2E(b *testing.B) {
conn,
reporter,
consts.DefaultTimeout,
false,
zaptest.NewLogger(b),
)
if err != nil {
Expand Down Expand Up @@ -129,6 +130,7 @@ func BenchmarkE2EInMemDatasource(b *testing.B) {
conn,
reporter,
consts.DefaultTimeout,
false,
zaptest.NewLogger(b),
)
a.NoError(err)
Expand Down
7 changes: 5 additions & 2 deletions consts/consts.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package consts

import "time"
import (
"math"
"time"
)

const (
ChunksBufferSize = 2048
RecieveBufferSize = 2048
SendBatchTimeout = time.Millisecond
RecieveBatchTimeout = time.Millisecond

DefaultInitialWindowSize = 65_535
DefaultTimeout = 11 * time.Second
DefaultMaxFrameSize = 16384 // Максимальная длина пейлоада фрейма в grpc. У http2 ограничение больше.
DefaultMaxHeaderListSize = math.MaxUint32
)
52 changes: 52 additions & 0 deletions datasource/decoder/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package decoder

import (
"bytes"
"fmt"

"github.com/ozontech/framer/utils/lru"
)

type Decoder struct {
metaUnmarshaler *SafeMultiValDecoder

tagsLRU *lru.LRU
methodsLRU *lru.LRU
}

func NewDecoder() *Decoder {
return &Decoder{
metaUnmarshaler: NewSafeMultiValDecoder(),

tagsLRU: lru.New(1024),
methodsLRU: lru.New(1024),
}
}

func (decoder *Decoder) Unmarshal(d *Data, b []byte) error {
d.Reset()

tagB, b := nextLine(b)
d.Tag = decoder.tagsLRU.GetOrAdd(tagB)

methodB, b := nextLine(b)
d.Method = decoder.methodsLRU.GetOrAdd(methodB)

metaBytes, b := nextLine(b)
var err error
d.Metadata, err = decoder.metaUnmarshaler.UnmarshalAppend(d.Metadata, metaBytes)
if err != nil {
return fmt.Errorf("meta unmarshal error: %w", err)
}

d.Message = b
return nil
}

func nextLine(in []byte) ([]byte, []byte) {
index := bytes.IndexByte(in, '\n')
if index == -1 {
return []byte{}, []byte{}
}
return in[:index], in[index+1:]
}
47 changes: 47 additions & 0 deletions datasource/decoder/jsonkv_safe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package decoder

import (
"github.com/mailru/easyjson/jlexer"
"github.com/ozontech/framer/utils/lru"
)

const (
kLRUSize = 1 << 10
vLRUSize = 1 << 16
)

type SafeMultiValDecoder struct {
kLRU *lru.LRU
vLRU *lru.LRU
}

func NewSafeMultiValDecoder() *SafeMultiValDecoder {
return &SafeMultiValDecoder{
lru.New(kLRUSize),
lru.New(vLRUSize),
}
}

func (d *SafeMultiValDecoder) UnmarshalAppend(buf []Meta, bytes []byte) ([]Meta, error) {
in := jlexer.Lexer{Data: bytes}

in.Delim('{')
for !in.IsDelim('}') {
key := d.kLRU.GetOrAdd(in.UnsafeBytes())
in.WantColon()

in.Delim('[')
for !in.IsDelim(']') {
val := d.kLRU.GetOrAdd(in.UnsafeBytes())
buf = append(buf, Meta{Name: key, Value: val})
in.WantComma()
}
in.Delim(']')

in.WantComma()
}
in.Delim('}')
in.Consumed()

return buf, in.Error()
}
20 changes: 20 additions & 0 deletions datasource/decoder/model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package decoder

type Meta struct {
Name string
Value string
}

type Data struct {
Tag string
Method string // "/" {service name} "/" {method name}
Metadata []Meta
Message []byte
}

func (d *Data) Reset() {
d.Tag = ""
d.Method = ""
d.Metadata = d.Metadata[:0]
d.Message = d.Message[:0]
}
17 changes: 11 additions & 6 deletions datasource/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,27 @@ import (
"io"
"sync"

"github.com/ozontech/framer/datasource/decoder"
"github.com/ozontech/framer/formats/grpc/ozon/binary"
"github.com/ozontech/framer/formats/model"
"github.com/ozontech/framer/loader/types"
"github.com/ozontech/framer/utils/pool"
)

type FileDataSource struct {
format *model.InputFormat
reader model.PooledRequestReader
decoder *decoder.Decoder

pool *pool.SlicePool[*fileRequest]
factory *RequestAdapterFactory
mu sync.Mutex
}

func NewFileDataSource(r io.Reader, factoryOptions ...Option) *FileDataSource {
ds := &FileDataSource{
binary.NewInput(r),
binary.NewInput(r).Reader,
decoder.NewDecoder(),

pool.NewSlicePoolSize[*fileRequest](100),
NewRequestAdapterFactory(factoryOptions...),
sync.Mutex{},
Expand All @@ -32,25 +37,25 @@ func (ds *FileDataSource) Fetch() (types.Req, error) {
r, ok := ds.pool.Acquire()
if !ok {
r = &fileRequest{
bytesPool: ds.format.Reader,
bytesPool: ds.reader,
pool: ds.pool,
RequestAdapter: ds.factory.Build(),
}
}

var err error
ds.mu.Lock()
r.bytes, err = ds.format.Reader.ReadNext()
r.bytes, err = ds.reader.ReadNext()
ds.mu.Unlock()
if err != nil {
return nil, fmt.Errorf("read next request: %w", err)
}

return r, ds.format.Decoder.Unmarshal(&r.data, r.bytes)
return r, ds.decoder.Unmarshal(&r.data, r.bytes)
}

type fileRequest struct {
bytesPool model.PooledRequestReder
bytesPool model.PooledRequestReader
bytes []byte
pool *pool.SlicePool[*fileRequest]
*RequestAdapter
Expand Down
Loading

0 comments on commit 5898e1c

Please sign in to comment.