From bb428672a00ac36b3baff2c6fb695875e41846ac Mon Sep 17 00:00:00 2001 From: sergiu128 Date: Thu, 21 Sep 2023 18:51:34 +0200 Subject: [PATCH] MirroredBuffer: cleanup tests --- bytes/mirrored_buffer_test.go | 360 +++++++++++++++------------------- 1 file changed, 156 insertions(+), 204 deletions(-) diff --git a/bytes/mirrored_buffer_test.go b/bytes/mirrored_buffer_test.go index d313c86d..8ffcd34f 100644 --- a/bytes/mirrored_buffer_test.go +++ b/bytes/mirrored_buffer_test.go @@ -6,6 +6,7 @@ import ( "log" "math/rand" "runtime" + "sync" "syscall" "testing" "time" @@ -79,9 +80,9 @@ func TestMirroredBufferSize(t *testing.T) { } } -func TestMirroredBuffer1(t *testing.T) { - size := syscall.Getpagesize() - buf, err := NewMirroredBuffer(size, true) +func TestMirroredBufferEnsureMirroring(t *testing.T) { + pageSize := syscall.Getpagesize() + buf, err := NewMirroredBuffer(pageSize, true) if err != nil { t.Fatal(err) } @@ -91,133 +92,48 @@ func TestMirroredBuffer1(t *testing.T) { } }() - for i := 0; i < 2*size; i++ { + for i := 0; i < 2*pageSize; i++ { if buf.slice[i] != 0 { t.Fatal("buffer should be zeroed") } } - for i := 0; i < size; i++ { + // Set the left mapping to 42. This change should be reflected in the right + // mapping. + for i := 0; i < pageSize; i++ { buf.slice[i] = 42 } - if buf.slice[0] != buf.slice[size] { - t.Fatal("buffer should mirror the first byte") - } - - for i := 0; i < size; i++ { - if buf.slice[i] != buf.slice[size+i] { + for i := 0; i < pageSize; i++ { + if buf.slice[i] != buf.slice[pageSize+i] { t.Fatal("buffer is not mirrored") } } } -func TestMirroredBuffer2(t *testing.T) { - size := syscall.Getpagesize() - buf, err := NewMirroredBuffer(size, true) +func TestMirroredBufferClaim(t *testing.T) { + buf, err := NewMirroredBuffer(syscall.Getpagesize(), false) if err != nil { t.Fatal(err) } - - var ( - v byte = 0 - chunk = size / 64 - b []byte - ) - { - b = buf.Claim(1) - if len(b) != 1 { - t.Fatal("should have claimed 1") - } - b[0] = v - if buf.Commit(1) != 1 { - t.Fatal("should have committed 1") - } - v++ - } - { - for { - b = buf.Claim(chunk) - if b == nil { - t.Fatal("claimed slice should not be nil") - } - if len(b) != chunk { - break - } - - for i := range b { - b[i] = v - } - buf.Commit(chunk) - v++ - } - if buf.FreeSpace() != chunk-1 { - t.Fatal("wrong free space") - } - if buf.head > buf.tail { - t.Fatal("buffer should not wrap") - } - } - { - if buf.Consume(1) != 1 { - t.Fatal("should have consumed 1") - } - if buf.FreeSpace() != chunk { - t.Fatalf("should have %d bytes available", chunk) - } - } - { - b = buf.Claim(chunk) - if len(b) != chunk { - t.Fatalf("should have claimed %d", chunk) - } - if buf.FreeSpace() != chunk { - t.Fatalf("should have %d bytes available", chunk) - } - for i := range b { - b[i] = v - } - if buf.Commit(chunk) != chunk { - t.Fatalf("should have committed %d", chunk) - } - if buf.FreeSpace() != 0 { - t.Fatal("should have no free space") - } - if buf.head != buf.tail { - t.Fatal("buffer should be wrapped and full") - } - } - { - b = buf.slice[:buf.size] - if len(b) == 0 { - t.Fatal("should not be zero") + defer func() { + if err := buf.Destroy(); err != nil { + t.Fatal(err) } + }() - var ( - offset = buf.head - expectedValue byte = 1 - ) - for k := 0; k < size/chunk; k++ { - slice := b[offset+k*chunk:] - slice = slice[:chunk] - for i := range slice { - if slice[i] != expectedValue { - t.Fatal("buffer is in wrong state") - } - } - expectedValue++ - } - if b[0] != expectedValue-1 { - t.Fatal("invalid head") - } + b := buf.Claim(buf.Size() - 
1)
+	for i := range b {
+		b[i] = 1
 	}
+	buf.Commit(len(b))
 
-	if err := buf.Destroy(); err != nil {
-		t.Fatal(err)
+	if len(buf.Claim(2)) != 1 {
+		t.Fatal("invalid claim")
 	}
 }
 
-func TestMirroredBuffer3(t *testing.T) {
+func TestMirroredBufferWritesOnMirrorBoundary(t *testing.T) {
 	buf, err := NewMirroredBuffer(syscall.Getpagesize(), true)
 	if err != nil {
 		t.Fatal(err)
@@ -228,48 +144,52 @@ func TestMirroredBuffer3(t *testing.T) {
 	}
 	}()
 
-	n := buf.Size() - 1
-	b := buf.Claim(n)
-	if len(b) != n {
-		t.Fatal("invalid claim")
-	}
-	for i := range b {
-		b[i] = 1
-	}
-	if buf.Commit(n) != n {
-		t.Fatal("invalid commit")
-	}
+	// Claim and commit size-1 bytes (left zeroed), then consume 1, so the next claim crosses the mirror boundary.
+	b := buf.Claim(buf.Size() - 1)
+	buf.Commit(len(b))
+	buf.Consume(1)
 
-	if buf.Consume(10) != 10 {
-		t.Fatal("invalid consume")
+	if !(buf.head == 1 && buf.tail == buf.Size()-1) {
+		t.Fatal("invalid state")
 	}
 
-	// This write will cross the mirror: 1 byte taken from the end, 5 from the
-	// beginning.
-	b = buf.Claim(6)
-	if len(b) != 6 {
+	// This claim crosses the mirror boundary: 1 byte at the end, 1 at the start.
+	b = buf.Claim(2)
+	if len(b) != 2 {
 		t.Fatal("invalid claim")
 	}
-	for i := range b {
-		b[i] = 2
+	b[0], b[1] = 1, 1
+	buf.Commit(2)
+
+	// Assert that the buffer wrapped around.
+	if !(buf.head == 1 && buf.tail == 1) {
+		t.Fatal("invalid state")
 	}
-	if buf.Commit(6) != 6 {
-		t.Fatal("invalid commit")
+
+	// Assert the left mapping.
+	if !(buf.slice[0] == 1 && buf.slice[buf.size-1] == 1) {
+		t.Fatal("invalid state")
+	}
+	for i := 1; i < buf.size-1; i++ {
+		if buf.slice[i] != 0 {
+			t.Fatal("invalid state")
+		}
 	}
 
-	b = buf.slice[:buf.size]
-	if b[len(b)-1] != 2 {
-		t.Fatal("wrong tail")
+	// Assert the right mapping.
+	if !(buf.slice[buf.size] == 1 && buf.slice[2*buf.size-1] == 1) {
+		t.Fatal("invalid state")
 	}
-	for i := 0; i < 5; i++ {
-		if b[i] != 2 {
-			t.Fatal("wrong head")
+	for i := buf.size + 1; i < 2*buf.size-1; i++ {
+		if buf.slice[i] != 0 {
+			t.Fatal("invalid state")
		}
 	}
 }
 
-func TestMirroredBufferRandom(t *testing.T) {
-	buf, err := NewMirroredBuffer(syscall.Getpagesize(), true)
+func TestMirroredBufferRandomClaimCommitConsume(t *testing.T) {
+	pageSize := syscall.Getpagesize()
+	buf, err := NewMirroredBuffer(pageSize, true)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -280,50 +200,59 @@
 	}()
 
 	var (
-		wrapped = 0
-		rand    = rand.New(rand.NewSource(time.Now().UnixNano()))
-		k       = 0
+		rand      = rand.New(rand.NewSource(time.Now().UnixNano()))
+		wrapCount = 0
+		iteration = 0
 	)
-	for wrapped == 0 || k < 1024*128 {
-		n := rand.Intn(syscall.Getpagesize())
-		b := buf.Claim(n)
+	for wrapCount == 0 || iteration < pageSize*128 {
+		claim := rand.Intn(pageSize)
+		b := buf.Claim(claim)
 		for i := range b {
-			b[i] = byte(k % 127)
+			b[i] = byte(iteration % 127)
 		}
-		buf.Commit(n)
+		buf.Commit(claim)
 
-		if !buf.Full() && buf.head > buf.tail {
-			wrapped++
+		if buf.head >= buf.tail && buf.used > 0 {
+			wrapCount++
 		}
 
-		buf.Consume(n)
-		k++
+		buf.Consume(claim)
+		iteration++
 	}
-	if wrapped == 0 {
+	if wrapCount == 0 {
 		t.Fatal("should have wrapped")
 	}
 }
 
-func TestMirroredBufferGC(t *testing.T) {
-	size := syscall.Getpagesize()
-
-	buf, err := NewMirroredBuffer(size, false)
+func TestMirroredBufferDoesNotGetGarbageCollected(t *testing.T) {
+	pageSize := syscall.Getpagesize()
+	buf, err := NewMirroredBuffer(pageSize, false)
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer func() {
+		if err := buf.Destroy(); err != nil {
+			t.Fatalf("could not destroy buffer err=%v", err)
+		}
+	}()
 
-	for k := 0; k < size/7*4; k++ {
+	iterations := pageSize / 7 * 4
+	for k := 0; k < 
iterations; k++ {
 		runtime.GC()
-		buf.Claim(7)
+		b := buf.Claim(7)
+		for i := range b {
+			b[i] = 42
+		}
 		buf.Commit(7)
 		buf.Consume(7)
 	}
 
+	// Ensure the garbage collector ran on each iteration.
 	var memstats runtime.MemStats
 	runtime.ReadMemStats(&memstats)
-	if memstats.NumGC != uint32(size/7*4) {
+	if memstats.NumGC != uint32(iterations) {
 		t.Fatal("did not GC")
 	}
 
@@ -331,52 +260,53 @@
 		t.Fatal("buffer should be empty")
 	}
 
-	b := buf.Claim(128)
-	for i := 0; i < 128; i++ {
-		b[i] = 42
-	}
-	buf.Commit(128)
-
-	if err := buf.Destroy(); err != nil {
-		t.Fatal("buffer should be destroyed")
+	for i := range buf.slice {
+		if buf.slice[i] != 42 {
+			t.Fatal("invalid state")
+		}
 	}
 }
 
-func TestMirroredBufferSizes(t *testing.T) {
+func TestMirroredBufferAllocateDifferentSizes(t *testing.T) {
 	var (
 		pageSize = syscall.Getpagesize()
 		err      error
-		size     = pageSize
-		k        = 1
+		pages    = 1
 		buf      *MirroredBuffer
 	)
 
-	log.Printf("pagesize=%d", pageSize)
+	const (
+		MaxSize  = 1024 * 1024 * 1024 // 1GB
+		MaxPages = 1024 * 512
+	)
+
+	log.Printf("page_size=%d", pageSize)
 
-	for err == nil && k < 1024*512 && size*k < 1024*1024*1024 /* 1GB */ {
-		buf, err = NewMirroredBuffer(size*k, false)
+	for err == nil && pages < MaxPages && pageSize*pages < MaxSize {
+		buf, err = NewMirroredBuffer(pageSize*pages, false)
 		buf.Destroy()
-		k++
+		pages++
 	}
+
 	if err != nil {
 		log.Printf(
-			"allocated %d pages of size=%d (%s) then err=%v",
-			k,
-			size,
+			"could not allocate a buffer of size=%s page_size=%d pages=%d err=%v",
+			util.ByteCountSI(int64(pages*pageSize)),
+			pageSize,
+			pages,
 			err,
-			int64(k*size),
 		)
 	} else {
 		log.Printf(
-			"allocated %d pages of size=%d (%s)",
-			k,
-			size,
-			util.ByteCountSI(int64(k*size)),
+			"largest buffer allocated was of size=%s page_size=%d pages=%d",
+			util.ByteCountSI(int64(pages*pageSize)),
+			pageSize,
+			pages,
 		)
 	}
 }
 
-func TestMirroredBufferHugeSize(t *testing.T) {
+func TestMirroredBufferAllocateHugeSize(t *testing.T) {
 	// We should be good mapping huge amounts of memory due to on-demand paging
 	// on Linux/BSD. As long as we don't prefault or write to the entire
 	// buffer... 
Still you should not make a buffer this big in production - the
@@ -400,29 +330,50 @@ func TestMirroredBufferHugeSize(t *testing.T) {
 	b[0] = 128
 }
 
-func TestMirroredBufferMultiple(t *testing.T) {
-	var buffers []*MirroredBuffer
-	for i := 0; i < 8; i++ {
-		buf, err := NewMirroredBuffer(syscall.Getpagesize(), true)
-		if err != nil {
-			t.Fatal(err)
-		}
-		buffers = append(buffers, buf)
-	}
+func TestMirroredBufferAllocateMultipleBuffersConcurrently(t *testing.T) {
+	var (
+		buffers []*MirroredBuffer
+		values  []byte
+		lck     sync.Mutex
+		wg      sync.WaitGroup
+	)
 
-	for k, buf := range buffers {
-		b := buf.Claim(8)
-		for i := 0; i < 8; i++ {
-			b[i] = byte(k + 1)
-		}
-		buf.Commit(8)
+	const NBuffers = 64
+
+	for k := 0; k < NBuffers; k++ {
+		value := byte(k)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			// Increase the chance of concurrent allocation.
+			time.Sleep(time.Millisecond)
+
+			buf, err := NewMirroredBuffer(syscall.Getpagesize(), true)
+			if err != nil {
+				t.Error(err)
+				return // buf is nil on error; claiming from it would panic
+			}
+			b := buf.Claim(buf.Size())
+			for i := range b {
+				b[i] = value
+			}
+			buf.Commit(buf.Size())
+
+			lck.Lock()
+			defer lck.Unlock()
+			buffers = append(buffers, buf)
+			values = append(values, value)
+		}()
 	}
 
-	for k, buf := range buffers {
-		b := buf.slice[:buf.size]
-		for i := 0; i < 8; i++ {
-			if b[i] != byte(k+1) {
-				t.Fatal("invalid buffer")
+	wg.Wait()
+
+	for k := 0; k < NBuffers; k++ {
+		b := buffers[k].slice
+		for i := range b {
+			if b[i] != values[k] {
+				t.Fatalf("buffer %d is invalid", k)
 			}
 		}
 	}
@@ -561,7 +512,8 @@ func BenchmarkMirroredBuffer(b *testing.B) {
 	// type and then consume them.
 
 	var (
-		toCopy    = make([]byte, 1024*128) // what we copy in the claimed part
+		size   = syscall.Getpagesize() * 32
+		toCopy = make([]byte, size) // what we copy in the claimed part
 
 		// The number of messages to decode after a Claim+memcpy
 		nMessages = []int{1, 2, 4, 8, 16, 32, 64, 128, 256, 512}
@@ -573,7 +525,7 @@
 		b.Run(
 			fmt.Sprintf(
 				"byte_buffer_%s_%d",
-				util.ByteCountSI(int64(len(toCopy))),
+				util.ByteCountSI(int64(size)),
 				nMessage,
 			),
 			func(b *testing.B) {
@@ -609,7 +561,7 @@
 		b.Run(
 			fmt.Sprintf(
 				"mirrored_buffer_%s_%d",
-				util.ByteCountSI(int64(len(toCopy))),
+				util.ByteCountSI(int64(size)),
 				chunk,
 			),
 			func(b *testing.B) {
@@ -625,7 +577,7 @@
 
 				sum := 0
 				for i := 0; i < b.N; i++ {
-					b := buf.Claim(len(toCopy))
+					b := buf.Claim(size)
 					copy(b, toCopy)
 					for buf.UsedSpace() > 0 {
 						buf.Commit(consume
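
Reviewer note, not part of the patch: the tests above all lean on the same
invariant - because the second half of buf.slice mirrors the first, Claim
always hands out one contiguous slice, even when the claimed region wraps past
the end of the buffer. A minimal sketch of that round trip, assuming it lives
in the same package as these tests; mirroredBufferRoundTrip is a hypothetical
helper and only uses the NewMirroredBuffer, Claim, Commit, Consume, and
Destroy calls exercised above:

	// mirroredBufferRoundTrip illustrates the Claim/Commit/Consume cycle
	// that TestMirroredBufferWritesOnMirrorBoundary asserts on.
	func mirroredBufferRoundTrip() error {
		buf, err := NewMirroredBuffer(syscall.Getpagesize(), false)
		if err != nil {
			return err
		}
		defer buf.Destroy() // error ignored in this sketch

		// Fill all but one byte, then consume one: the two free bytes now
		// straddle the end of the buffer.
		b := buf.Claim(buf.Size() - 1)
		buf.Commit(len(b))
		buf.Consume(1)

		// This claim wraps around the end of the buffer, yet the mirrored
		// mapping hands it back as a single contiguous slice: no split
		// writes, no copying.
		b = buf.Claim(2)
		b[0], b[1] = 1, 1
		buf.Commit(2)

		return nil
	}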