Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Acker Node #483

Merged
merged 34 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1c57687
implement ticket queue
lovromazgon Jun 13, 2022
8bf3f66
experiment with ordered semaphore
lovromazgon Jun 14, 2022
424d889
ticketqueue benchmarks
lovromazgon Jun 14, 2022
8a852db
reduce allocations
lovromazgon Jun 17, 2022
ec7249e
remove ticketqueue (semaphore implementation is more performant)
lovromazgon Jun 21, 2022
b288e21
optimize semaphore for our use case
lovromazgon Jun 21, 2022
0471fbe
fix linter warnings, better benchmarks
lovromazgon Jun 21, 2022
83f8184
better docs
lovromazgon Jun 21, 2022
f4cfe81
Merge branch 'main' into lovro/ticketqueue
lovromazgon Jun 21, 2022
83c97e0
go mod tidy
lovromazgon Jun 21, 2022
c0ded08
rename AckerNode to DestinationAckerNode
lovromazgon Jun 10, 2022
c66c273
remove message status change middleware to ensure all message handler…
lovromazgon Jun 22, 2022
68cd7c7
implement SourceAckerNode
lovromazgon Jun 22, 2022
6bdc893
add todo note about possible deadlock
lovromazgon Jun 23, 2022
8b6dc73
source acker node test
lovromazgon Jun 23, 2022
c6be9e8
Merge branch 'main' into lovro/ticketqueue
lovromazgon Jun 28, 2022
94b4f43
don't forward acks after a failed ack/nack
lovromazgon Jun 28, 2022
7d3749b
Merge branch 'lovro/ticketqueue' into lovro/source-acker-node
lovromazgon Jun 28, 2022
3e283dd
use cerrors
lovromazgon Jun 28, 2022
aa5f17b
Merge branch 'main' into lovro/ticketqueue
lovromazgon Jul 5, 2022
1bf9a7b
Merge branch 'main' into lovro/ticketqueue
lovromazgon Jul 6, 2022
f43fe35
use cerrors.New
lovromazgon Jul 7, 2022
19d6123
Merge branch 'lovro/stability' into lovro/ticketqueue
lovromazgon Jul 7, 2022
c36b19a
Merge branch 'lovro/ticketqueue' into lovro/source-acker-node
lovromazgon Jul 7, 2022
befaf44
use LogOrReplace
lovromazgon Jul 7, 2022
64b7d25
improve benchmarks
lovromazgon Jul 11, 2022
24c3386
fix linter error
lovromazgon Jul 11, 2022
d4dd111
add comments
lovromazgon Jul 11, 2022
3ebb744
simplify implementation
lovromazgon Jul 11, 2022
b628a7b
Merge branch 'lovro/ticketqueue' into lovro/source-acker-node
lovromazgon Jul 11, 2022
dc31319
update semaphore
lovromazgon Jul 11, 2022
5a6e8a3
update param name
lovromazgon Jul 11, 2022
54a65d1
remove redundant if clause
lovromazgon Jul 11, 2022
a668a02
Merge branch 'lovro/stability' into lovro/source-acker-node
lovromazgon Jul 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions pkg/foundation/semaphore/semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package semaphore

import (
"sync"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
)

// Simple provides a way to bound concurrent access to a resource. It only
// allows one caller to gain access at a time.
type Simple struct {
waiters []waiter
front int
batch int64
acquired bool
released int
mu sync.Mutex
}

type waiter struct {
index int
ready chan struct{} // Closed when semaphore acquired.

released bool
acquired bool
}

// Ticket reserves a place in the queue and can be used to acquire access to a
// resource.
type Ticket struct {
index int
batch int64
}

// Enqueue reserves the next place in the queue and returns a Ticket used to
// acquire access to the resource when it's the callers turn. The Ticket has to
// be supplied to Release before discarding.
func (s *Simple) Enqueue() Ticket {
s.mu.Lock()
defer s.mu.Unlock()

index := len(s.waiters)
w := waiter{index: index, ready: make(chan struct{})}
s.waiters = append(s.waiters, w)

return Ticket{
index: index,
batch: s.batch,
}
}

// Acquire acquires the semaphore, blocking until resources are available. On
// success, returns nil. On failure, returns an error and leaves the semaphore
// unchanged.
func (s *Simple) Acquire(t Ticket) error {
s.mu.Lock()
if s.batch != t.batch {
s.mu.Unlock()
return cerrors.Errorf("semaphore: invalid batch")
}

w := s.waiters[t.index]
if w.acquired {
return cerrors.New("semaphore: can't acquire ticket that was already acquired")
}

w.acquired = true // mark that Acquire was already called for this Ticket
s.waiters[t.index] = w

if s.front == t.index && !s.acquired {
s.front++
s.acquired = true
s.mu.Unlock()
return nil
}
s.mu.Unlock()

<-w.ready
return nil
}

// Release releases the semaphore and notifies the next in line if any.
// If the ticket was already released the function returns an error. After the
// ticket is released it should be discarded.
func (s *Simple) Release(t Ticket) error {
s.mu.Lock()
defer s.mu.Unlock()

if s.batch != t.batch {
return cerrors.Errorf("semaphore: invalid batch")
}
w := s.waiters[t.index]
if !w.acquired {
return cerrors.New("semaphore: can't release ticket that was not acquired")
}
if w.released {
return cerrors.New("semaphore: ticket already released")
}

w.released = true
s.waiters[t.index] = w
s.acquired = false
s.released++
s.notifyWaiter()
if s.released == len(s.waiters) {
s.increaseBatch()
}
return nil
}

func (s *Simple) notifyWaiter() {
if len(s.waiters) > s.front {
w := s.waiters[s.front]
s.acquired = true
s.front++
close(w.ready)
}
}

func (s *Simple) increaseBatch() {
s.waiters = s.waiters[:0]
s.batch++
s.front = 0
s.released = 0
}
67 changes: 67 additions & 0 deletions pkg/foundation/semaphore/semaphore_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package semaphore_test

import (
"container/list"
"fmt"
"testing"

"github.com/conduitio/conduit/pkg/foundation/semaphore"
)

func BenchmarkNewSem(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = &semaphore.Simple{}
}
}

func BenchmarkAcquireSem(b *testing.B) {
for _, N := range []int{1, 2, 8, 64, 128} {
b.Run(fmt.Sprintf("acquire-%d", N), func(b *testing.B) {
b.ResetTimer()
sem := &semaphore.Simple{}
for i := 0; i < b.N; i++ {
for j := 0; j < N; j++ {
t := sem.Enqueue()
_ = sem.Acquire(t)
_ = sem.Release(t)
}
}
})
}
}

func BenchmarkEnqueueReleaseSem(b *testing.B) {
for _, N := range []int{1, 2, 8, 64, 128} {
b.Run(fmt.Sprintf("enqueue/release-%d", N), func(b *testing.B) {
b.ResetTimer()
sem := &semaphore.Simple{}
tickets := list.New()
for i := 0; i < b.N; i++ {
tickets.Init()
for j := 0; j < N; j++ {
t := sem.Enqueue()
tickets.PushBack(t)
}
ticket := tickets.Front()
for ticket != nil {
_ = sem.Release(ticket.Value.(semaphore.Ticket))
ticket = ticket.Next()
}
}
})
}
}
Loading