Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

read performance #42

Merged
merged 5 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func TestZeroDequeFrequency(t *testing.T) {
// 999 is evicted automatically, because tail entry in slru has frequency 1
// but 999 frequency is 0
// so increase 999 frequency
for i := 0; i < 128; i++ {
for i := 0; i < 1280; i++ {
_, ok := client.Get(999)
require.False(t, ok)
}
Expand Down
167 changes: 167 additions & 0 deletions internal/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (c) 2024 Yiling-J. All rights reserved.
// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"runtime"
"sync/atomic"
"unsafe"

"github.com/Yiling-J/theine-go/internal/xruntime"
)

const (
	// capacity is the maximum number of elements per buffer. It must stay a
	// power of two so that mask can be used to wrap counters into slot indexes.
	capacity = 16
	// mask maps a monotonically increasing head/tail count onto a slot index
	// in [0, capacity) via a bitwise AND.
	mask = uint64(capacity - 1)
)

// castToPointer reinterprets a raw slot pointer as a pointer to the
// ReadBufItem it was stored from. The caller must pass a pointer that really
// refers to a ReadBufItem[K, V]; no check is performed.
func castToPointer[K comparable, V any](ptr unsafe.Pointer) *ReadBufItem[K, V] {
	item := (*ReadBufItem[K, V])(ptr)
	return item
}

// PolicyBuffers is the set of buffers returned by the lossy buffer.
type PolicyBuffers[K comparable, V any] struct {
	// Returned holds the items drained from the ring buffer for the consumer
	// to process. Buffer.Free clears and truncates it for reuse.
	Returned []ReadBufItem[K, V]
}

// Buffer is a circular ring buffer that stores the elements being transferred
// by the producers to the consumer. The monotonically increasing counts of
// reads and writes allow indexing sequentially to the next element location
// based upon a power-of-two sizing.
//
// The producers race to read the counts, check if there is available capacity, and if so then try
// once to CAS to the next write count. If the increment is successful then the producer lazily
// publishes the element. The producer does not retry or block when unsuccessful due to a failed
// CAS or the buffer being full.
//
// The consumer reads the counts and takes the available elements. The clearing of the elements
// and the next read count are lazily set.
//
// This implementation is striped to further increase concurrency.
type Buffer[K comparable, V any] struct {
	// head is the read count: it is advanced past the slots that have been
	// drained into the policy buffers.
	head atomic.Uint64
	// headPadding: keeps head apart from the next field
	// (presumably on its own cache line — confirm xruntime.CacheLineSize).
	_ [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
	// tail is the write count: producers claim a slot by CAS-ing it forward.
	tail atomic.Uint64
	// tailPadding
	_ [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
	// returned equals policyBuffers while the policy buffers are available
	// and is nil while they are handed out to a consumer; it is only accessed
	// with atomic operations.
	returned unsafe.Pointer
	// returnedPadding (the 8 is presumably the pointer size on 64-bit targets)
	_ [xruntime.CacheLineSize - 8]byte
	// policyBuffers points at the single *PolicyBuffers owned by this buffer;
	// it is set once in NewBuffer.
	policyBuffers unsafe.Pointer
	// policyBuffersPadding
	_ [xruntime.CacheLineSize - 8]byte
	// buffer holds the slots. Each slot is either nil (empty or not yet
	// published) or a pointer to a ReadBufItem.
	buffer [capacity]unsafe.Pointer
}

// NewBuffer creates a new, empty lossy Buffer.
//
// The buffer owns a single PolicyBuffers whose Returned slice is pre-sized to
// capacity. Both policyBuffers and returned start out pointing at it, which
// marks the policy buffers as available for the first hand-off.
func NewBuffer[K comparable, V any]() *Buffer[K, V] {
	pb := unsafe.Pointer(&PolicyBuffers[K, V]{
		Returned: make([]ReadBufItem[K, V], 0, capacity),
	})
	return &Buffer[K, V]{
		policyBuffers: pb,
		returned:      pb,
	}
}

// Add lazily publishes the item to the consumer.
//
// The item may be lost due to contention: Add makes a single attempt and
// never blocks or retries.
//
// It returns nil when the buffer is full, when the slot could not be claimed,
// when the item was stored without filling the buffer, or when the buffer was
// filled but the policy buffers are currently handed out. It returns the
// buffer's PolicyBuffers, with the drained items in Returned, only when this
// call stored the item that filled the buffer and won the hand-off. The caller
// that receives a non-nil result is expected to call Free once it is done.
func (b *Buffer[K, V]) Add(n ReadBufItem[K, V]) *PolicyBuffers[K, V] {
	// Snapshot both counters; size may already be stale under contention,
	// which the CAS on tail below detects.
	head := b.head.Load()
	tail := b.tail.Load()
	size := tail - head
	if size >= capacity {
		// full buffer
		return nil
	}
	// Claim slot `tail` with a single CAS attempt; losing the race drops the item.
	if b.tail.CompareAndSwap(tail, tail+1) {
		// success
		index := int(tail & mask)
		// Publish a fresh copy of the item; until this store lands the slot
		// still reads as nil to a concurrent drain.
		atomic.StorePointer(&b.buffer[index], unsafe.Pointer(&ReadBufItem[K, V]{
			entry: n.entry,
			hash:  n.hash,
		}))
		// Only the producer that filled the last free slot attempts the drain.
		if size == capacity-1 {
			// try return new buffer
			// NOTE(review): if this CAS fails, tail-head stays at capacity and
			// nothing visible here except Clear advances head afterwards, so
			// later Adds would keep hitting the full-buffer branch — confirm
			// this cannot happen in practice or is handled by the caller.
			if !atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) {
				// somebody already get buffer
				return nil
			}

			pb := (*PolicyBuffers[K, V])(b.policyBuffers)
			// Walk all capacity slots starting at the snapshot head, copying
			// out whatever has been published and clearing those slots.
			for i := 0; i < capacity; i++ {
				index := int(head & mask)
				v := atomic.LoadPointer(&b.buffer[index])
				if v != nil {
					// published
					pb.Returned = append(pb.Returned, *castToPointer[K, V](v))
					// release
					atomic.StorePointer(&b.buffer[index], nil)
				}
				// Advance even past unpublished (nil) slots.
				head++
			}

			// Make the drained slots available to producers again.
			b.head.Store(head)
			return pb
		}
	}

	// failed
	return nil
}

// items returns a snapshot of all items currently published in the buffer.
// It is used in tests only, to update the policy proactively.
//
// Slots are visited in index order and nil (empty or not yet published) slots
// are skipped. The buffer is not modified: slots are not cleared and the
// head/tail counters are left untouched. The result is always non-nil.
func (b *Buffer[K, V]) items() []ReadBufItem[K, V] {
	returned := make([]ReadBufItem[K, V], 0, capacity)
	// Iterate by index so the atomic load targets the shared slot itself.
	// Ranging over the array by value would first copy the whole array with
	// plain (racy) reads and then load from the loop variable's copy.
	for i := range b.buffer {
		if v := atomic.LoadPointer(&b.buffer[i]); v != nil {
			returned = append(returned, *castToPointer[K, V](v))
		}
	}
	return returned
}

// Free clears the processed policy buffers and hands them back to the Buffer.
//
// Every item in Returned has its entry and hash reset, the slice is truncated
// to length zero (keeping its backing array), and finally returned is pointed
// at policyBuffers again so that a later Add can take them.
func (b *Buffer[K, V]) Free() {
	pb := (*PolicyBuffers[K, V])(b.policyBuffers)
	for i := range pb.Returned {
		item := &pb.Returned[i]
		item.entry = nil
		item.hash = 0
	}
	pb.Returned = pb.Returned[:0]
	// Publish availability last, after the contents have been wiped.
	atomic.StorePointer(&b.returned, b.policyBuffers)
}

// Clear empties the lossy Buffer and puts it back into its initial state.
//
// It first spins, yielding the processor between attempts, until it takes
// the policy buffers; then it wipes every slot, frees the policy buffers and
// resets the tail and head counters to zero, in that order.
func (b *Buffer[K, V]) Clear() {
	for {
		if atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) {
			break
		}
		// Somebody else holds the policy buffers; let them finish.
		runtime.Gosched()
	}
	for i := range b.buffer {
		atomic.StorePointer(&b.buffer[i], nil)
	}
	b.Free()
	b.tail.Store(0)
	b.head.Store(0)
}
6 changes: 4 additions & 2 deletions internal/clock/clock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package clock

import "time"
import (
"time"
)

type Clock struct {
Start time.Time
Expand All @@ -15,5 +17,5 @@ func (c *Clock) ExpireNano(ttl time.Duration) int64 {
}

// SetStart sets the clock's Start to the instant ts nanoseconds after the
// Unix epoch.
//
// NOTE(review): this span comes from a rendered diff and appears to show both
// the removed assignment (with .UTC()) and the added one (without); presumably
// only the second line exists in the real file — confirm against the repository.
func (c *Clock) SetStart(ts int64) {
	c.Start = time.Unix(0, ts).UTC()
	c.Start = time.Unix(0, ts)
}
116 changes: 116 additions & 0 deletions internal/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2024 Yiling-J
// Copyright 2024 Andrei Pechkurov

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"sync"
"sync/atomic"

"github.com/Yiling-J/theine-go/internal/xruntime"
)

// ptokenPool is the pool of P tokens. It has no New function, so Get returns
// nil when the pool is empty and Counter.Add allocates a fresh token itself.
var ptokenPool sync.Pool

// a P token is used to point at the current OS thread (P)
// on which the goroutine is run; exact identity of the thread,
// as well as P migration tolerance, is not important since
// it's used as a best-effort mechanism for assigning
// concurrent operations (goroutines) to different stripes of
// the counter
type ptoken struct {
	// idx selects the stripe: Counter.Add masks it with Counter.mask and
	// replaces it with a new random value after a failed CAS.
	idx uint32
	//lint:ignore U1000 prevents false sharing
	pad [xruntime.CacheLineSize - 4]byte
}

// A Counter is a striped int64 counter.
//
// Should be preferred over a single atomically updated int64
// counter in high contention scenarios.
//
// A Counter must not be copied after first use.
type Counter struct {
	// stripes holds the partial counts; the counter's value is their sum.
	stripes []cstripe
	// mask is len(stripes)-1 as set by NewCounter; since the stripe count is
	// a power of two, idx&mask is always a valid stripe index.
	mask uint32
}

// cstripe is one slot of a Counter: a partial count followed by padding.
type cstripe struct {
	// c is this stripe's partial count; it is only accessed through
	// sync/atomic operations.
	c int64
	//lint:ignore U1000 prevents false sharing
	pad [xruntime.CacheLineSize - 8]byte
}

// NewCounter creates a new Counter instance.
//
// The number of stripes is xruntime.Parallelism() rounded up to a power of
// two, which lets Add pick a stripe with a cheap bitwise AND against mask.
func NewCounter() *Counter {
	n := RoundUpPowerOf2(xruntime.Parallelism())
	return &Counter{
		stripes: make([]cstripe, n),
		mask:    n - 1,
	}
}

// Inc increments the counter by 1. It is shorthand for Add(1).
func (c *Counter) Inc() {
	const delta int64 = 1
	c.Add(delta)
}

// Dec decrements the counter by 1. It is shorthand for Add(-1).
func (c *Counter) Dec() {
	const delta int64 = -1
	c.Add(delta)
}

// Add adds the delta to the counter.
//
// A token taken from ptokenPool chooses which stripe to update. The update is
// a load followed by a compare-and-swap; whenever the CAS loses a race the
// token is moved to another randomly selected stripe and the update is tried
// again. The token goes back to the pool once the update has succeeded.
func (c *Counter) Add(delta int64) {
	tok, pooled := ptokenPool.Get().(*ptoken)
	if !pooled {
		// Nothing usable in the pool: start a fresh token on a random stripe.
		tok = &ptoken{idx: xruntime.Fastrand()}
	}
	for {
		s := &c.stripes[tok.idx&c.mask]
		old := atomic.LoadInt64(&s.c)
		if !atomic.CompareAndSwapInt64(&s.c, old, old+delta) {
			// Contended stripe; hop to a different one before retrying.
			tok.idx = xruntime.Fastrand()
			continue
		}
		break
	}
	ptokenPool.Put(tok)
}

// Value returns the current counter value, computed as the sum of all
// stripes. Each stripe is read atomically, but the sum as a whole is not a
// single atomic snapshot, so it may not include all of the latest operations
// when the counter is being modified concurrently.
func (c *Counter) Value() int64 {
	var total int64
	for i := range c.stripes {
		total += atomic.LoadInt64(&c.stripes[i].c)
	}
	return total
}

// Reset sets every stripe, and therefore the counter, back to zero.
// It should only be called when it is known that nothing is modifying the
// counter concurrently.
func (c *Counter) Reset() {
	for i := range c.stripes {
		atomic.StoreInt64(&c.stripes[i].c, 0)
	}
}
74 changes: 0 additions & 74 deletions internal/mpsc.go

This file was deleted.

Loading
Loading