Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make reader and writer buffers configurable #2014

Merged
merged 2 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions base.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ func (db *baseDB) ExecContext(c context.Context, query interface{}, params ...in
}

func (db *baseDB) exec(ctx context.Context, query interface{}, params ...interface{}) (Result, error) {
wb := pool.GetWriteBuffer()
defer pool.PutWriteBuffer(wb)
wb := db.pool.GetWriteBuffer()
defer db.pool.PutWriteBuffer(wb)

if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
return nil, err
Expand Down Expand Up @@ -297,8 +297,8 @@ func (db *baseDB) QueryContext(c context.Context, model, query interface{}, para
}

func (db *baseDB) query(ctx context.Context, model, query interface{}, params ...interface{}) (Result, error) {
wb := pool.GetWriteBuffer()
defer pool.PutWriteBuffer(wb)
wb := db.pool.GetWriteBuffer()
defer db.pool.PutWriteBuffer(wb)

if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
return nil, err
Expand Down Expand Up @@ -374,8 +374,8 @@ func (db *baseDB) copyFrom(
) (res Result, err error) {
var evt *QueryEvent

wb := pool.GetWriteBuffer()
defer pool.PutWriteBuffer(wb)
wb := db.pool.GetWriteBuffer()
defer db.pool.PutWriteBuffer(wb)

if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
return nil, err
Expand Down Expand Up @@ -456,8 +456,8 @@ func (db *baseDB) copyTo(
) (res Result, err error) {
var evt *QueryEvent

wb := pool.GetWriteBuffer()
defer pool.PutWriteBuffer(wb)
wb := db.pool.GetWriteBuffer()
defer db.pool.PutWriteBuffer(wb)

if err := writeQueryMsg(wb, db.fmter, query, params...); err != nil {
return nil, err
Expand Down
21 changes: 18 additions & 3 deletions base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

/*
The test is for testing the case that sending a cancel request when the timeout from connection comes earlier than ctx.Done().
The test is for testing the case that sending a cancel request when the timeout from connection comes earlier than ctx.Done().
*/
func Test_baseDB_withConn(t *testing.T) {
b := mockBaseDB{}
Expand Down Expand Up @@ -44,9 +44,10 @@ type mockPooler struct {
}

func (m *mockPooler) NewConn(ctx context.Context) (*pool.Conn, error) {
m.conn = &pool.Conn{ProcessID: 123, SecretKey: 234, Inited: true}
m.mockConn = mockConn{}
m.conn.SetNetConn(&m.mockConn)
m.conn = pool.NewConn(&m.mockConn, pool.NewConnPool(&pool.Options{}))
m.conn.ProcessID = 123
m.conn.SecretKey = 234
return m.conn, nil
}

Expand Down Expand Up @@ -83,6 +84,20 @@ func (m *mockPooler) Close() error {
return nil
}

func (m *mockPooler) GetWriteBuffer() *pool.WriteBuffer {
return pool.NewWriteBuffer(1024)
}

func (m *mockPooler) PutWriteBuffer(_ *pool.WriteBuffer) {
}

func (m *mockPooler) GetReaderContext() *pool.ReaderContext {
return pool.NewReaderContext(1024)
}

func (m *mockPooler) PutReaderContext(_ *pool.ReaderContext) {
}

type mockPGError struct {
M map[byte]string
}
Expand Down
14 changes: 8 additions & 6 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var noDeadline = time.Time{}
type Conn struct {
netConn net.Conn
rd *ReaderContext
pool *ConnPool

ProcessID int32
SecretKey int32
Expand All @@ -24,9 +25,10 @@ type Conn struct {
Inited bool
}

func NewConn(netConn net.Conn) *Conn {
func NewConn(netConn net.Conn, pool *ConnPool) *Conn {
cn := &Conn{
createdAt: time.Now(),
pool: pool,
}
cn.SetNetConn(netConn)
cn.SetUsedAt(time.Now())
Expand Down Expand Up @@ -57,7 +59,7 @@ func (cn *Conn) LockReader() {
if cn.rd != nil {
panic("not reached")
}
cn.rd = NewReaderContext()
cn.rd = NewReaderContext(cn.pool.opt.ReadBufferInitialSize)
cn.rd.Reset(cn.netConn)
}

Expand All @@ -79,8 +81,8 @@ func (cn *Conn) WithReader(

rd := cn.rd
if rd == nil {
rd = GetReaderContext()
defer PutReaderContext(rd)
rd = cn.pool.GetReaderContext()
defer cn.pool.PutReaderContext(rd)

rd.Reset(cn.netConn)
}
Expand All @@ -97,8 +99,8 @@ func (cn *Conn) WithReader(
func (cn *Conn) WithWriter(
ctx context.Context, timeout time.Duration, fn func(wb *WriteBuffer) error,
) error {
wb := GetWriteBuffer()
defer PutWriteBuffer(wb)
wb := cn.pool.GetWriteBuffer()
defer cn.pool.PutWriteBuffer(wb)

if err := fn(wb); err != nil {
return err
Expand Down
53 changes: 46 additions & 7 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Pooler interface {
Get(context.Context) (*Conn, error)
Put(context.Context, *Conn)
Remove(context.Context, *Conn, error)
GetWriteBuffer() *WriteBuffer
PutWriteBuffer(*WriteBuffer)
GetReaderContext() *ReaderContext
PutReaderContext(*ReaderContext)

Len() int
IdleLen() int
Expand All @@ -54,12 +58,14 @@ type Options struct {
Dialer func(context.Context) (net.Conn, error)
OnClose func(*Conn) error

PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
PoolSize int
MinIdleConns int
ReadBufferInitialSize int
WriteBufferInitialSize int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}

type ConnPool struct {
Expand All @@ -82,6 +88,9 @@ type ConnPool struct {

poolSize int
idleConnsLen int

wbPool sync.Pool
rbPool sync.Pool
}

var _ Pooler = (*ConnPool)(nil)
Expand All @@ -93,6 +102,16 @@ func NewConnPool(opt *Options) *ConnPool {
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
idleConns: make([]*Conn, 0, opt.PoolSize),
wbPool: sync.Pool{
New: func() interface{} {
return NewWriteBuffer(opt.WriteBufferInitialSize)
},
},
rbPool: sync.Pool{
New: func() interface{} {
return NewReaderContext(opt.ReadBufferInitialSize)
},
},
}

p.connsMu.Lock()
Expand Down Expand Up @@ -182,7 +201,7 @@ func (p *ConnPool) dialConn(c context.Context, pooled bool) (*Conn, error) {
return nil, err
}

cn := NewConn(netConn)
cn := NewConn(netConn, p)
cn.pooled = pooled
return cn, nil
}
Expand Down Expand Up @@ -504,3 +523,23 @@ func (p *ConnPool) isStaleConn(cn *Conn) bool {

return false
}

func (p *ConnPool) GetWriteBuffer() *WriteBuffer {
wb := p.wbPool.Get().(*WriteBuffer)
return wb
}

func (p *ConnPool) PutWriteBuffer(wb *WriteBuffer) {
wb.Reset()
p.wbPool.Put(wb)
}

func (p *ConnPool) GetReaderContext() *ReaderContext {
rd := p.rbPool.Get().(*ReaderContext)
return rd
}

func (p *ConnPool) PutReaderContext(rd *ReaderContext) {
rd.ColumnAlloc.Reset()
p.rbPool.Put(rd)
}
16 changes: 16 additions & 0 deletions internal/pool/pool_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,19 @@ func (p *SingleConnPool) IdleLen() int {
func (p *SingleConnPool) Stats() *Stats {
return &Stats{}
}

func (p *SingleConnPool) GetWriteBuffer() *WriteBuffer {
return p.pool.GetWriteBuffer()
}

func (p *SingleConnPool) PutWriteBuffer(wb *WriteBuffer) {
p.pool.PutWriteBuffer(wb)
}

func (p *SingleConnPool) GetReaderContext() *ReaderContext {
return p.pool.GetReaderContext()
}

func (p *SingleConnPool) PutReaderContext(rd *ReaderContext) {
p.pool.PutReaderContext(rd)
}
16 changes: 16 additions & 0 deletions internal/pool/pool_sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,19 @@ func (p *StickyConnPool) IdleLen() int {
func (p *StickyConnPool) Stats() *Stats {
return &Stats{}
}

func (p *StickyConnPool) GetWriteBuffer() *WriteBuffer {
return p.pool.GetWriteBuffer()
}

func (p *StickyConnPool) PutWriteBuffer(wb *WriteBuffer) {
p.pool.PutWriteBuffer(wb)
}

func (p *StickyConnPool) GetReaderContext() *ReaderContext {
return p.pool.GetReaderContext()
}

func (p *StickyConnPool) PutReaderContext(rd *ReaderContext) {
p.pool.PutReaderContext(rd)
}
23 changes: 1 addition & 22 deletions internal/pool/reader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package pool

import (
"sync"
)

type Reader interface {
Buffered() int

Expand Down Expand Up @@ -55,26 +51,9 @@ type ReaderContext struct {
ColumnAlloc *ColumnAlloc
}

func NewReaderContext() *ReaderContext {
const bufSize = 1 << 20 // 1mb
func NewReaderContext(bufSize int) *ReaderContext {
return &ReaderContext{
BufReader: NewBufReader(bufSize),
ColumnAlloc: NewColumnAlloc(),
}
}

var readerPool = sync.Pool{
New: func() interface{} {
return NewReaderContext()
},
}

func GetReaderContext() *ReaderContext {
rd := readerPool.Get().(*ReaderContext)
return rd
}

func PutReaderContext(rd *ReaderContext) {
rd.ColumnAlloc.Reset()
readerPool.Put(rd)
}
23 changes: 2 additions & 21 deletions internal/pool/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,18 @@ package pool
import (
"encoding/binary"
"io"
"sync"
)

const defaultBufSize = 65 << 10 // 65kb

var wbPool = sync.Pool{
New: func() interface{} {
return NewWriteBuffer()
},
}

func GetWriteBuffer() *WriteBuffer {
wb := wbPool.Get().(*WriteBuffer)
return wb
}

func PutWriteBuffer(wb *WriteBuffer) {
wb.Reset()
wbPool.Put(wb)
}

type WriteBuffer struct {
Bytes []byte

msgStart int
paramStart int
}

func NewWriteBuffer() *WriteBuffer {
func NewWriteBuffer(bufSize int) *WriteBuffer {
return &WriteBuffer{
Bytes: make([]byte, 0, defaultBufSize),
Bytes: make([]byte, 0, bufSize),
}
}

Expand Down
30 changes: 24 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ type Options struct {
// but idle connections are still discarded by the client
// if IdleTimeout is set.
IdleCheckFrequency time.Duration
// Connections read buffers stored in a sync.Pool to reduce allocations.
// Using this option you can adjust the initial size of the buffer.
// Default is 1 Mb.
ReadBufferInitialSize int
// Connections write buffers stored in a sync.Pool to reduce allocations.
// Using this option you can adjust the initial size of the buffer.
// Default is 64 Kb.
WriteBufferInitialSize int
}

func (opt *Options) init() {
Expand Down Expand Up @@ -164,6 +172,14 @@ func (opt *Options) init() {
case 0:
opt.MaxRetryBackoff = 4 * time.Second
}

if opt.ReadBufferInitialSize == 0 {
opt.ReadBufferInitialSize = 1048576 // 1Mb
}

if opt.WriteBufferInitialSize == 0 {
opt.WriteBufferInitialSize = 65536 // 64Kb
}
}

func env(key, defValue string) string {
Expand Down Expand Up @@ -318,11 +334,13 @@ func newConnPool(opt *Options) *pool.ConnPool {
Dialer: opt.getDialer(),
OnClose: terminateConn,

PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
ReadBufferInitialSize: opt.ReadBufferInitialSize,
WriteBufferInitialSize: opt.WriteBufferInitialSize,
})
}
Loading
Loading