Source Acker Node #483

Merged 34 commits on Jul 11, 2022.

Commits (34)
1c57687  implement ticket queue (lovromazgon, Jun 13, 2022)
8bf3f66  experiment with ordered semaphore (lovromazgon, Jun 14, 2022)
424d889  ticketqueue benchmarks (lovromazgon, Jun 14, 2022)
8a852db  reduce allocations (lovromazgon, Jun 17, 2022)
ec7249e  remove ticketqueue (semaphore implementation is more performant) (lovromazgon, Jun 21, 2022)
b288e21  optimize semaphore for our use case (lovromazgon, Jun 21, 2022)
0471fbe  fix linter warnings, better benchmarks (lovromazgon, Jun 21, 2022)
83f8184  better docs (lovromazgon, Jun 21, 2022)
f4cfe81  Merge branch 'main' into lovro/ticketqueue (lovromazgon, Jun 21, 2022)
83c97e0  go mod tidy (lovromazgon, Jun 21, 2022)
c0ded08  rename AckerNode to DestinationAckerNode (lovromazgon, Jun 10, 2022)
c66c273  remove message status change middleware to ensure all message handler… (lovromazgon, Jun 22, 2022)
68cd7c7  implement SourceAckerNode (lovromazgon, Jun 22, 2022)
6bdc893  add todo note about possible deadlock (lovromazgon, Jun 23, 2022)
8b6dc73  source acker node test (lovromazgon, Jun 23, 2022)
c6be9e8  Merge branch 'main' into lovro/ticketqueue (lovromazgon, Jun 28, 2022)
94b4f43  don't forward acks after a failed ack/nack (lovromazgon, Jun 28, 2022)
7d3749b  Merge branch 'lovro/ticketqueue' into lovro/source-acker-node (lovromazgon, Jun 28, 2022)
3e283dd  use cerrors (lovromazgon, Jun 28, 2022)
aa5f17b  Merge branch 'main' into lovro/ticketqueue (lovromazgon, Jul 5, 2022)
1bf9a7b  Merge branch 'main' into lovro/ticketqueue (lovromazgon, Jul 6, 2022)
f43fe35  use cerrors.New (lovromazgon, Jul 7, 2022)
19d6123  Merge branch 'lovro/stability' into lovro/ticketqueue (lovromazgon, Jul 7, 2022)
c36b19a  Merge branch 'lovro/ticketqueue' into lovro/source-acker-node (lovromazgon, Jul 7, 2022)
befaf44  use LogOrReplace (lovromazgon, Jul 7, 2022)
64b7d25  improve benchmarks (lovromazgon, Jul 11, 2022)
24c3386  fix linter error (lovromazgon, Jul 11, 2022)
d4dd111  add comments (lovromazgon, Jul 11, 2022)
3ebb744  simplify implementation (lovromazgon, Jul 11, 2022)
b628a7b  Merge branch 'lovro/ticketqueue' into lovro/source-acker-node (lovromazgon, Jul 11, 2022)
dc31319  update semaphore (lovromazgon, Jul 11, 2022)
5a6e8a3  update param name (lovromazgon, Jul 11, 2022)
54a65d1  remove redundant if clause (lovromazgon, Jul 11, 2022)
a668a02  Merge branch 'lovro/stability' into lovro/source-acker-node (lovromazgon, Jul 11, 2022)
Files changed
4 changes: 1 addition & 3 deletions pkg/foundation/semaphore/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ func (s *Simple) Release(t Ticket) error {
default:
}

if t.next != nil {
close(t.next)
}
close(t.next)
return nil
}
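The removed nil check was dead code: every Ticket is created with a non-nil next channel, so Release can close it unconditionally. Since several commits above revolve around the ordered semaphore, here is a minimal, hypothetical sketch of the ticket-chaining idea they allude to — orderedSem and its methods are illustrative stand-ins, not the actual pkg/foundation/semaphore API:

package main

import "fmt"

// ticket is one link in a chain: wait is closed when it becomes this
// ticket's turn, next is closed on release to pass the turn onward.
type ticket struct {
	wait chan struct{}
	next chan struct{}
}

// orderedSem hands out tickets that can only be acquired in the order
// they were enqueued. Enqueue is not safe for concurrent use in this
// sketch; a real implementation would guard its state.
type orderedSem struct {
	last chan struct{} // next channel of the most recently enqueued ticket
}

func newOrderedSem() *orderedSem {
	head := make(chan struct{})
	close(head) // the first ticket may proceed immediately
	return &orderedSem{last: head}
}

// Enqueue reserves a place in line without blocking.
func (s *orderedSem) Enqueue() ticket {
	t := ticket{wait: s.last, next: make(chan struct{})}
	s.last = t.next
	return t
}

// Acquire blocks until every earlier ticket has been released.
func (s *orderedSem) Acquire(t ticket) {
	<-t.wait
}

// Release hands the turn to the next ticket. next is never nil by
// construction — the same invariant the diff above relies on.
func (s *orderedSem) Release(t ticket) {
	close(t.next)
}

func main() {
	s := newOrderedSem()
	t1, t2 := s.Enqueue(), s.Enqueue()

	done := make(chan struct{})
	go func() {
		s.Acquire(t2) // blocks until t1 is released
		fmt.Println("second")
		close(done)
	}()

	s.Acquire(t1)
	fmt.Println("first")
	s.Release(t1)
	<-done
}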
27 changes: 19 additions & 8 deletions pkg/pipeline/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,15 @@ func (s *Service) buildProcessorNodes(
return nodes, nil
}

func (s *Service) buildSourceAckerNode(
src connector.Source,
) *stream.SourceAckerNode {
return &stream.SourceAckerNode{
Name: src.ID() + "-acker",
Source: src,
}
}

func (s *Service) buildSourceNodes(
ctx context.Context,
connFetcher ConnectorFetcher,
Expand All @@ -259,15 +268,17 @@ func (s *Service) buildSourceNodes(
pl.Config.Name,
),
}
ackerNode := s.buildSourceAckerNode(instance.(connector.Source))
ackerNode.Sub(sourceNode.Pub())
metricsNode := s.buildMetricsNode(pl, instance)
metricsNode.Sub(sourceNode.Pub())
metricsNode.Sub(ackerNode.Pub())

procNodes, err := s.buildProcessorNodes(ctx, procFetcher, pl, instance.Config().ProcessorIDs, metricsNode, next)
if err != nil {
return nil, cerrors.Errorf("could not build processor nodes for connector %s: %w", instance.ID(), err)
}

nodes = append(nodes, &sourceNode, metricsNode)
nodes = append(nodes, &sourceNode, ackerNode, metricsNode)
nodes = append(nodes, procNodes...)
}

Expand All @@ -288,10 +299,10 @@ func (s *Service) buildMetricsNode(
}
}

func (s *Service) buildAckerNode(
func (s *Service) buildDestinationAckerNode(
dest connector.Destination,
) *stream.AckerNode {
return &stream.AckerNode{
) *stream.DestinationAckerNode {
return &stream.DestinationAckerNode{
Name: dest.ID() + "-acker",
Destination: dest,
}
Expand All @@ -316,7 +327,7 @@ func (s *Service) buildDestinationNodes(
continue // skip any connector that's not a destination
}

ackerNode := s.buildAckerNode(instance.(connector.Destination))
ackerNode := s.buildDestinationAckerNode(instance.(connector.Destination))
destinationNode := stream.DestinationNode{
Name: instance.ID(),
Destination: instance.(connector.Destination),
Expand Down Expand Up @@ -358,7 +369,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
// If any of the nodes stops, the nodesTomb will be put into a dying state
// and ctx will be cancelled.
// This way, the other nodes will be notified that they need to stop too.
//nolint: staticcheck // nil used to use the default (parent provided via WithContext)
// nolint: staticcheck // nil used to use the default (parent provided via WithContext)
ctx := nodesTomb.Context(nil)
s.logger.Trace(ctx).Str(log.NodeIDField, node.ID()).Msg("running node")
defer func() {
Expand Down Expand Up @@ -406,7 +417,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
// before declaring the pipeline as stopped.
pl.t = &tomb.Tomb{}
pl.t.Go(func() error {
//nolint: staticcheck // nil used to use the default (parent provided via WithContext)
// nolint: staticcheck // nil used to use the default (parent provided via WithContext)
ctx := pl.t.Context(nil)
err := nodesTomb.Wait()

Expand Down
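With this wiring, the source path runs source → acker → metrics instead of source → metrics, so every message passes through the new SourceAckerNode before it is counted. The SourceAckerNode implementation itself is not shown in this excerpt; building on the orderedSem sketch above (and assuming an import of "context"), the core idea is plausibly along these lines — all names and signatures here are illustrative, not Conduit's actual types:

// message and ackSource stand in for Conduit's *Message and
// connector.Source; only what the sketch needs is included.
type message struct {
	position []byte
	onAck    func() error
}

type ackSource interface {
	Ack(ctx context.Context, position []byte) error
}

// sourceAcker reserves a semaphore ticket per message in receive order
// and makes each ack handler wait for its turn, so acks reach the source
// in read order even when downstream acks out of order.
type sourceAcker struct {
	source ackSource
	sem    *orderedSem // from the sketch above
}

func (n *sourceAcker) process(ctx context.Context, msg *message) {
	t := n.sem.Enqueue() // non-blocking reservation, in receive order
	msg.onAck = func() error {
		n.sem.Acquire(t) // wait until all earlier messages were handled
		defer n.sem.Release(t)
		return n.source.Ack(ctx, msg.position)
	}
}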
pkg/pipeline/lifecycle_test.go (1 addition)

@@ -111,6 +111,7 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
 	// wait for pipeline to finish
 	err = pl.Wait()
 	assert.Error(t, err)
+	t.Log(err)
 
 	assert.Equal(t, StatusDegraded, pl.Status)
 	// pipeline errors contain only string messages, so we can only compare the errors by the messages
2 changes: 1 addition & 1 deletion pkg/pipeline/stream/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type DestinationNode struct {
Destination connector.Destination
ConnectorTimer metrics.Timer
// AckerNode is responsible for handling acks
AckerNode *AckerNode
AckerNode *DestinationAckerNode

base subNodeBase
logger log.CtxLogger
Expand Down
(file header not shown in source)

@@ -27,9 +27,9 @@ import (
 	"github.com/conduitio/conduit/pkg/record"
 )
 
-// AckerNode is responsible for handling acknowledgments received from the
-// destination and forwarding them to the correct message.
-type AckerNode struct {
+// DestinationAckerNode is responsible for handling acknowledgments received
+// from the destination and forwarding them to the correct message.
+type DestinationAckerNode struct {
 	Name        string
 	Destination connector.Destination
 
@@ -49,22 +49,22 @@ type AckerNode struct {
 	stopOnce sync.Once
 }
 
-// init initializes AckerNode internal fields.
-func (n *AckerNode) init() {
+// init initializes DestinationAckerNode internal fields.
+func (n *DestinationAckerNode) init() {
 	n.initOnce.Do(func() {
 		n.cache = &positionMessageMap{}
 		n.start = make(chan struct{})
 		n.stop = make(chan struct{})
 	})
 }
 
-func (n *AckerNode) ID() string {
+func (n *DestinationAckerNode) ID() string {
 	return n.Name
 }
 
 // Run continuously fetches acks from the destination and forwards them to the
 // correct message by calling Ack or Nack on that message.
-func (n *AckerNode) Run(ctx context.Context) (err error) {
+func (n *DestinationAckerNode) Run(ctx context.Context) (err error) {
 	n.logger.Trace(ctx).Msg("starting acker node")
 	defer n.logger.Trace(ctx).Msg("acker node stopped")
 
@@ -113,6 +113,11 @@ func (n *AckerNode) Run(ctx context.Context) (err error) {
 			continue
 		}
 
+		// TODO make sure acks are called in the right order or this will block
+		// forever. Right now we rely on connectors sending acks back in the
+		// correct order and this should generally be true, but we can't be
+		// completely sure and a badly written connector shouldn't provoke a
+		// deadlock.
 		err = n.handleAck(msg, err)
 		if err != nil {
 			return err
@@ -122,7 +127,7 @@
 
 // teardown will drop all messages still in the cache and return an error in
 // case there were still unprocessed messages in the cache.
-func (n *AckerNode) teardown() error {
+func (n *DestinationAckerNode) teardown() error {
 	var dropped int
 	n.cache.Range(func(pos record.Position, msg *Message) bool {
 		msg.Drop()
@@ -138,7 +143,7 @@
 // handleAck either acks or nacks the message, depending on the supplied error.
 // If the nacking or acking fails, the message is dropped and the error is
 // returned.
-func (n *AckerNode) handleAck(msg *Message, err error) error {
+func (n *DestinationAckerNode) handleAck(msg *Message, err error) error {
 	switch {
 	case err != nil:
 		n.logger.Trace(msg.Ctx).Err(err).Msg("nacking message")
@@ -160,7 +165,7 @@
 
 // ExpectAck makes the handler aware of the message and signals to it that an
 // ack for this message might be received at some point.
-func (n *AckerNode) ExpectAck(msg *Message) error {
+func (n *DestinationAckerNode) ExpectAck(msg *Message) error {
 	// happens only once to signal Run that the destination is ready to be used.
 	n.startOnce.Do(func() {
 		n.init()
@@ -185,7 +190,7 @@
 // ForgetAndDrop signals the handler that an ack for this message won't be
 // received, and it should remove it from its cache. In case an ack for this
 // message wasn't yet received it drops the message, otherwise it does nothing.
-func (n *AckerNode) ForgetAndDrop(msg *Message) {
+func (n *DestinationAckerNode) ForgetAndDrop(msg *Message) {
 	_, ok := n.cache.LoadAndDelete(msg.Record.Position)
 	if !ok {
 		// message wasn't found in the cache, looks like the message was already
@@ -197,8 +202,9 @@
 
 // Wait can be used to wait for the count of outstanding acks to drop to 0 or
 // the context gets canceled. Wait is expected to be the last function called on
-// AckerNode, after Wait returns AckerNode will soon stop running.
-func (n *AckerNode) Wait(ctx context.Context) {
+// DestinationAckerNode, after Wait returns DestinationAckerNode will soon stop
+// running.
+func (n *DestinationAckerNode) Wait(ctx context.Context) {
 	// happens only once to signal that the destination is stopping
 	n.stopOnce.Do(func() {
 		n.init()
@@ -227,7 +233,7 @@
 }
 
 // SetLogger sets the logger.
-func (n *AckerNode) SetLogger(logger log.CtxLogger) {
+func (n *DestinationAckerNode) SetLogger(logger log.CtxLogger) {
 	n.logger = logger
 }
 
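The new TODO in Run documents the ordering hazard: an out-of-order ack makes the node call a later message's ack handler first, and if that handler waits on earlier messages (as in an ordered-ticket scheme), the loop blocks forever. The failure mode can be reproduced with the orderedSem sketch from earlier (illustrative only, assuming imports of "fmt" and "time"):

// demoOutOfOrderDeadlock shows why ack order matters: t2's wait channel
// is t1's next channel, and nothing ever closes it here.
func demoOutOfOrderDeadlock() {
	s := newOrderedSem()
	t1, t2 := s.Enqueue(), s.Enqueue()

	acquired := make(chan struct{})
	go func() {
		s.Acquire(t2) // returns only after Release(t1)
		close(acquired)
	}()

	select {
	case <-acquired:
		fmt.Println("t2 acquired") // unreachable while t1 is unreleased
	case <-time.After(100 * time.Millisecond):
		fmt.Println("blocked: acking out of order would deadlock here")
	}
	_ = t1 // never acquired or released in this scenario
}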
(file header not shown in source)

@@ -32,7 +32,7 @@ func TestAckerNode_Run_StopAfterWait(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	dest := mock.NewDestination(ctrl)
 
-	node := &AckerNode{
+	node := &DestinationAckerNode{
 		Name:        "acker-node",
 		Destination: dest,
 	}
@@ -44,9 +44,6 @@
 		is.NoErr(err)
 	}()
 
-	// give Go a chance to run the node
-	time.Sleep(time.Millisecond)
-
 	// note that there should be no calls to the destination at all if we didn't
 	// receive any ExpectedAck call
 
@@ -69,7 +66,7 @@ func TestAckerNode_Run_StopAfterExpectAck(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	dest := mock.NewDestination(ctrl)
 
-	node := &AckerNode{
+	node := &DestinationAckerNode{
 		Name:        "acker-node",
 		Destination: dest,
 	}
@@ -81,9 +78,6 @@
 		is.NoErr(err)
 	}()
 
-	// give Go a chance to run the node
-	time.Sleep(time.Millisecond)
-
 	// up to this point there should have been no calls to the destination
 	// only after the call to ExpectAck should the node try to fetch any acks
 	msg := &Message{
27 changes: 8 additions & 19 deletions pkg/pipeline/stream/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,38 +146,27 @@ func (n *FanoutNode) Run(ctx context.Context) error {
// wrapAckHandler modifies the ack handler, so it's called with the original
// message received by FanoutNode instead of the new message created by
// FanoutNode.
func (n *FanoutNode) wrapAckHandler(origMsg *Message, f AckHandler) AckMiddleware {
return func(newMsg *Message, next AckHandler) error {
err := f(origMsg)
if err != nil {
return err
}
// next handler is called again with new message
return next(newMsg)
func (n *FanoutNode) wrapAckHandler(origMsg *Message, f AckHandler) AckHandler {
return func(_ *Message) error {
return f(origMsg)
}
}

// wrapNackHandler modifies the nack handler, so it's called with the original
// message received by FanoutNode instead of the new message created by
// FanoutNode.
func (n *FanoutNode) wrapNackHandler(origMsg *Message, f NackHandler) NackMiddleware {
return func(newMsg *Message, reason error, next NackHandler) error {
err := f(origMsg, reason)
if err != nil {
return err
}
// next handler is called again with new message
return next(newMsg, err)
func (n *FanoutNode) wrapNackHandler(origMsg *Message, f NackHandler) NackHandler {
return func(_ *Message, reason error) error {
return f(origMsg, reason)
}
}

// wrapDropHandler modifies the drop handler, so it's called with the original
// message received by FanoutNode instead of the new message created by
// FanoutNode.
func (n *FanoutNode) wrapDropHandler(origMsg *Message, f DropHandler) DropMiddleware {
return func(newMsg *Message, reason error, next DropHandler) {
func (n *FanoutNode) wrapDropHandler(origMsg *Message, f DropHandler) DropHandler {
return func(_ *Message, reason error) {
f(origMsg, reason)
next(newMsg, reason)
}
}

Expand Down
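These wrappers became possible once the message status change middleware was removed (commit c66c273): each wrapper is now a plain closure that swaps in the original message, with no next handler to call. As a side effect, the rewrite retires a latent bug in the old nack wrapper, which forwarded err (always nil at that point) instead of reason to next. A tiny self-contained illustration of the closure pattern, with simplified stand-in types rather than the actual stream package:

package main

import "fmt"

type msg struct{ id string }

type ackHandler func(*msg) error

// wrapAckHandler returns a handler that ignores the message it is called
// with and invokes f with the original message instead — the same shape
// as FanoutNode's wrapper after this change.
func wrapAckHandler(orig *msg, f ackHandler) ackHandler {
	return func(_ *msg) error {
		return f(orig)
	}
}

func main() {
	orig := &msg{id: "original"}
	clone := &msg{id: "clone"}

	h := wrapAckHandler(orig, func(m *msg) error {
		fmt.Println("acking", m.id)
		return nil
	})
	_ = h(clone) // prints "acking original"
}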