diff --git a/pkg/foundation/semaphore/semaphore.go b/pkg/foundation/semaphore/semaphore.go index b950dcd44..e51cfd6b4 100644 --- a/pkg/foundation/semaphore/semaphore.go +++ b/pkg/foundation/semaphore/semaphore.go @@ -81,8 +81,6 @@ func (s *Simple) Release(t Ticket) error { default: } - if t.next != nil { - close(t.next) - } + close(t.next) return nil } diff --git a/pkg/pipeline/lifecycle.go b/pkg/pipeline/lifecycle.go index e7b8789c0..3c2e6f17f 100644 --- a/pkg/pipeline/lifecycle.go +++ b/pkg/pipeline/lifecycle.go @@ -233,6 +233,15 @@ func (s *Service) buildProcessorNodes( return nodes, nil } +func (s *Service) buildSourceAckerNode( + src connector.Source, +) *stream.SourceAckerNode { + return &stream.SourceAckerNode{ + Name: src.ID() + "-acker", + Source: src, + } +} + func (s *Service) buildSourceNodes( ctx context.Context, connFetcher ConnectorFetcher, @@ -259,15 +268,17 @@ func (s *Service) buildSourceNodes( pl.Config.Name, ), } + ackerNode := s.buildSourceAckerNode(instance.(connector.Source)) + ackerNode.Sub(sourceNode.Pub()) metricsNode := s.buildMetricsNode(pl, instance) - metricsNode.Sub(sourceNode.Pub()) + metricsNode.Sub(ackerNode.Pub()) procNodes, err := s.buildProcessorNodes(ctx, procFetcher, pl, instance.Config().ProcessorIDs, metricsNode, next) if err != nil { return nil, cerrors.Errorf("could not build processor nodes for connector %s: %w", instance.ID(), err) } - nodes = append(nodes, &sourceNode, metricsNode) + nodes = append(nodes, &sourceNode, ackerNode, metricsNode) nodes = append(nodes, procNodes...) } @@ -288,10 +299,10 @@ func (s *Service) buildMetricsNode( } } -func (s *Service) buildAckerNode( +func (s *Service) buildDestinationAckerNode( dest connector.Destination, -) *stream.AckerNode { - return &stream.AckerNode{ +) *stream.DestinationAckerNode { + return &stream.DestinationAckerNode{ Name: dest.ID() + "-acker", Destination: dest, } @@ -316,7 +327,7 @@ func (s *Service) buildDestinationNodes( continue // skip any connector that's not a destination } - ackerNode := s.buildAckerNode(instance.(connector.Destination)) + ackerNode := s.buildDestinationAckerNode(instance.(connector.Destination)) destinationNode := stream.DestinationNode{ Name: instance.ID(), Destination: instance.(connector.Destination), @@ -358,7 +369,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { // If any of the nodes stops, the nodesTomb will be put into a dying state // and ctx will be cancelled. // This way, the other nodes will be notified that they need to stop too. - //nolint: staticcheck // nil used to use the default (parent provided via WithContext) + // nolint: staticcheck // nil used to use the default (parent provided via WithContext) ctx := nodesTomb.Context(nil) s.logger.Trace(ctx).Str(log.NodeIDField, node.ID()).Msg("running node") defer func() { @@ -406,7 +417,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { // before declaring the pipeline as stopped. 
pl.t = &tomb.Tomb{} pl.t.Go(func() error { - //nolint: staticcheck // nil used to use the default (parent provided via WithContext) + // nolint: staticcheck // nil used to use the default (parent provided via WithContext) ctx := pl.t.Context(nil) err := nodesTomb.Wait() diff --git a/pkg/pipeline/lifecycle_test.go b/pkg/pipeline/lifecycle_test.go index c15b783fa..2424c3aa4 100644 --- a/pkg/pipeline/lifecycle_test.go +++ b/pkg/pipeline/lifecycle_test.go @@ -111,6 +111,7 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { // wait for pipeline to finish err = pl.Wait() assert.Error(t, err) + t.Log(err) assert.Equal(t, StatusDegraded, pl.Status) // pipeline errors contain only string messages, so we can only compare the errors by the messages diff --git a/pkg/pipeline/stream/destination.go b/pkg/pipeline/stream/destination.go index 467b97f7e..0c75f2179 100644 --- a/pkg/pipeline/stream/destination.go +++ b/pkg/pipeline/stream/destination.go @@ -30,7 +30,7 @@ type DestinationNode struct { Destination connector.Destination ConnectorTimer metrics.Timer // AckerNode is responsible for handling acks - AckerNode *AckerNode + AckerNode *DestinationAckerNode base subNodeBase logger log.CtxLogger diff --git a/pkg/pipeline/stream/acker.go b/pkg/pipeline/stream/destination_acker.go similarity index 87% rename from pkg/pipeline/stream/acker.go rename to pkg/pipeline/stream/destination_acker.go index 3f819605b..2e437d89c 100644 --- a/pkg/pipeline/stream/acker.go +++ b/pkg/pipeline/stream/destination_acker.go @@ -27,9 +27,9 @@ import ( "github.com/conduitio/conduit/pkg/record" ) -// AckerNode is responsible for handling acknowledgments received from the -// destination and forwarding them to the correct message. -type AckerNode struct { +// DestinationAckerNode is responsible for handling acknowledgments received +// from the destination and forwarding them to the correct message. +type DestinationAckerNode struct { Name string Destination connector.Destination @@ -49,8 +49,8 @@ type AckerNode struct { stopOnce sync.Once } -// init initializes AckerNode internal fields. -func (n *AckerNode) init() { +// init initializes DestinationAckerNode internal fields. +func (n *DestinationAckerNode) init() { n.initOnce.Do(func() { n.cache = &positionMessageMap{} n.start = make(chan struct{}) @@ -58,13 +58,13 @@ func (n *AckerNode) init() { }) } -func (n *AckerNode) ID() string { +func (n *DestinationAckerNode) ID() string { return n.Name } // Run continuously fetches acks from the destination and forwards them to the // correct message by calling Ack or Nack on that message. -func (n *AckerNode) Run(ctx context.Context) (err error) { +func (n *DestinationAckerNode) Run(ctx context.Context) (err error) { n.logger.Trace(ctx).Msg("starting acker node") defer n.logger.Trace(ctx).Msg("acker node stopped") @@ -113,6 +113,11 @@ func (n *AckerNode) Run(ctx context.Context) (err error) { continue } + // TODO make sure acks are called in the right order or this will block + // forever. Right now we rely on connectors sending acks back in the + // correct order and this should generally be true, but we can't be + // completely sure and a badly written connector shouldn't provoke a + // deadlock. err = n.handleAck(msg, err) if err != nil { return err @@ -122,7 +127,7 @@ func (n *AckerNode) Run(ctx context.Context) (err error) { // teardown will drop all messages still in the cache and return an error in // case there were still unprocessed messages in the cache. 
-func (n *AckerNode) teardown() error { +func (n *DestinationAckerNode) teardown() error { var dropped int n.cache.Range(func(pos record.Position, msg *Message) bool { msg.Drop() @@ -138,7 +143,7 @@ func (n *AckerNode) teardown() error { // handleAck either acks or nacks the message, depending on the supplied error. // If the nacking or acking fails, the message is dropped and the error is // returned. -func (n *AckerNode) handleAck(msg *Message, err error) error { +func (n *DestinationAckerNode) handleAck(msg *Message, err error) error { switch { case err != nil: n.logger.Trace(msg.Ctx).Err(err).Msg("nacking message") @@ -160,7 +165,7 @@ func (n *AckerNode) handleAck(msg *Message, err error) error { // ExpectAck makes the handler aware of the message and signals to it that an // ack for this message might be received at some point. -func (n *AckerNode) ExpectAck(msg *Message) error { +func (n *DestinationAckerNode) ExpectAck(msg *Message) error { // happens only once to signal Run that the destination is ready to be used. n.startOnce.Do(func() { n.init() @@ -185,7 +190,7 @@ func (n *AckerNode) ExpectAck(msg *Message) error { // ForgetAndDrop signals the handler that an ack for this message won't be // received, and it should remove it from its cache. In case an ack for this // message wasn't yet received it drops the message, otherwise it does nothing. -func (n *AckerNode) ForgetAndDrop(msg *Message) { +func (n *DestinationAckerNode) ForgetAndDrop(msg *Message) { _, ok := n.cache.LoadAndDelete(msg.Record.Position) if !ok { // message wasn't found in the cache, looks like the message was already @@ -197,8 +202,9 @@ func (n *AckerNode) ForgetAndDrop(msg *Message) { // Wait can be used to wait for the count of outstanding acks to drop to 0 or // the context gets canceled. Wait is expected to be the last function called on -// AckerNode, after Wait returns AckerNode will soon stop running. -func (n *AckerNode) Wait(ctx context.Context) { +// DestinationAckerNode, after Wait returns DestinationAckerNode will soon stop +// running. +func (n *DestinationAckerNode) Wait(ctx context.Context) { // happens only once to signal that the destination is stopping n.stopOnce.Do(func() { n.init() @@ -227,7 +233,7 @@ func (n *AckerNode) Wait(ctx context.Context) { } // SetLogger sets the logger. 
-func (n *AckerNode) SetLogger(logger log.CtxLogger) { +func (n *DestinationAckerNode) SetLogger(logger log.CtxLogger) { n.logger = logger } diff --git a/pkg/pipeline/stream/acker_test.go b/pkg/pipeline/stream/destination_acker_test.go similarity index 94% rename from pkg/pipeline/stream/acker_test.go rename to pkg/pipeline/stream/destination_acker_test.go index cc33e04c9..66dfe759e 100644 --- a/pkg/pipeline/stream/acker_test.go +++ b/pkg/pipeline/stream/destination_acker_test.go @@ -32,7 +32,7 @@ func TestAckerNode_Run_StopAfterWait(t *testing.T) { ctrl := gomock.NewController(t) dest := mock.NewDestination(ctrl) - node := &AckerNode{ + node := &DestinationAckerNode{ Name: "acker-node", Destination: dest, } @@ -44,9 +44,6 @@ func TestAckerNode_Run_StopAfterWait(t *testing.T) { is.NoErr(err) }() - // give Go a chance to run the node - time.Sleep(time.Millisecond) - // note that there should be no calls to the destination at all if we didn't // receive any ExpectedAck call @@ -69,7 +66,7 @@ func TestAckerNode_Run_StopAfterExpectAck(t *testing.T) { ctrl := gomock.NewController(t) dest := mock.NewDestination(ctrl) - node := &AckerNode{ + node := &DestinationAckerNode{ Name: "acker-node", Destination: dest, } @@ -81,9 +78,6 @@ func TestAckerNode_Run_StopAfterExpectAck(t *testing.T) { is.NoErr(err) }() - // give Go a chance to run the node - time.Sleep(time.Millisecond) - // up to this point there should have been no calls to the destination // only after the call to ExpectAck should the node try to fetch any acks msg := &Message{ diff --git a/pkg/pipeline/stream/fanout.go b/pkg/pipeline/stream/fanout.go index 6e71aed26..c6c2aa9ec 100644 --- a/pkg/pipeline/stream/fanout.go +++ b/pkg/pipeline/stream/fanout.go @@ -146,38 +146,27 @@ func (n *FanoutNode) Run(ctx context.Context) error { // wrapAckHandler modifies the ack handler, so it's called with the original // message received by FanoutNode instead of the new message created by // FanoutNode. -func (n *FanoutNode) wrapAckHandler(origMsg *Message, f AckHandler) AckMiddleware { - return func(newMsg *Message, next AckHandler) error { - err := f(origMsg) - if err != nil { - return err - } - // next handler is called again with new message - return next(newMsg) +func (n *FanoutNode) wrapAckHandler(origMsg *Message, f AckHandler) AckHandler { + return func(_ *Message) error { + return f(origMsg) } } // wrapNackHandler modifies the nack handler, so it's called with the original // message received by FanoutNode instead of the new message created by // FanoutNode. -func (n *FanoutNode) wrapNackHandler(origMsg *Message, f NackHandler) NackMiddleware { - return func(newMsg *Message, reason error, next NackHandler) error { - err := f(origMsg, reason) - if err != nil { - return err - } - // next handler is called again with new message - return next(newMsg, err) +func (n *FanoutNode) wrapNackHandler(origMsg *Message, f NackHandler) NackHandler { + return func(_ *Message, reason error) error { + return f(origMsg, reason) } } // wrapDropHandler modifies the drop handler, so it's called with the original // message received by FanoutNode instead of the new message created by // FanoutNode. 
-func (n *FanoutNode) wrapDropHandler(origMsg *Message, f DropHandler) DropMiddleware { - return func(newMsg *Message, reason error, next DropHandler) { +func (n *FanoutNode) wrapDropHandler(origMsg *Message, f DropHandler) DropHandler { + return func(_ *Message, reason error) { f(origMsg, reason) - next(newMsg, reason) } } diff --git a/pkg/pipeline/stream/message.go b/pkg/pipeline/stream/message.go index f8d833297..69be73212 100644 --- a/pkg/pipeline/stream/message.go +++ b/pkg/pipeline/stream/message.go @@ -22,6 +22,7 @@ import ( "sync" "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/multierror" "github.com/conduitio/conduit/pkg/record" ) @@ -84,45 +85,21 @@ type ( // a nack or drop. StatusChangeHandler func(*Message, StatusChange) error - // StatusChangeMiddleware can be registered on a message and will be executed in - // case of a status change (see StatusChangeHandler). Middlewares are called in - // the reverse order of how they were registered. - // The middleware has two options when processing a message status change: - // - If it successfully processed the status change it should call the next - // handler and return its error. The handler may inspect the error and act - // accordingly, but it must return that error (or another error that - // contains it). It must not return an error if the next handler was called - // and it returned nil. - // - If it failed to process the status change successfully it must not call - // the next handler but instead return an error right away. - // Applying these rules means each middleware can be sure that all middlewares - // before it processed the status change successfully. - StatusChangeMiddleware func(*Message, StatusChange, StatusChangeHandler) error - // AckHandler is a variation of the StatusChangeHandler that is only called // when a message is acked. For more info see StatusChangeHandler. AckHandler func(*Message) error - // AckMiddleware is a variation of the StatusChangeMiddleware that is only - // called when a message is acked. For more info see StatusChangeMiddleware. - AckMiddleware func(*Message, AckHandler) error // NackHandler is a variation of the StatusChangeHandler that is only called // when a message is nacked. For more info see StatusChangeHandler. NackHandler func(*Message, error) error - // NackMiddleware is a variation of the StatusChangeMiddleware that is only - // called when a message is nacked. For more info see StatusChangeMiddleware. - NackMiddleware func(*Message, error, NackHandler) error // DropHandler is a variation of the StatusChangeHandler that is only called // when a message is dropped. For more info see StatusChangeHandler. DropHandler func(*Message, error) - // DropMiddleware is a variation of the StatusChangeMiddleware that is only - // called when a message is dropped. For more info see StatusChangeMiddleware. - DropMiddleware func(*Message, error, DropHandler) ) -// StatusChange is passed to StatusChangeMiddleware and StatusChangeHandler when -// the status of a message changes. +// StatusChange is passed to StatusChangeHandler when the status of a message +// changes. type StatusChange struct { Old MessageStatus New MessageStatus @@ -150,9 +127,9 @@ func (m *Message) ID() string { // RegisterStatusHandler is used to register a function that will be called on // any status change of the message. This function can only be called if the -// message status is open, otherwise it panics. 
Middlewares are called in the +// message status is open, otherwise it panics. Handlers are called in the // reverse order of how they were registered. -func (m *Message) RegisterStatusHandler(mw StatusChangeMiddleware) { +func (m *Message) RegisterStatusHandler(h StatusChangeHandler) { m.init() m.handlerGuard.Lock() defer m.handlerGuard.Unlock() @@ -163,35 +140,34 @@ func (m *Message) RegisterStatusHandler(mw StatusChangeMiddleware) { next := m.handler m.handler = func(msg *Message, change StatusChange) error { - return mw(msg, change, next) + // all handlers are called and errors collected + err1 := h(msg, change) + err2 := next(msg, change) + return multierror.Append(err1, err2) } } // RegisterAckHandler is used to register a function that will be called when // the message is acked. This function can only be called if the message status // is open, otherwise it panics. -func (m *Message) RegisterAckHandler(mw AckMiddleware) { - m.RegisterStatusHandler(func(msg *Message, change StatusChange, next StatusChangeHandler) error { +func (m *Message) RegisterAckHandler(h AckHandler) { + m.RegisterStatusHandler(func(msg *Message, change StatusChange) error { if change.New != MessageStatusAcked { - return next(msg, change) + return nil // skip } - return mw(msg, func(msg *Message) error { - return next(msg, change) - }) + return h(msg) }) } // RegisterNackHandler is used to register a function that will be called when // the message is nacked. This function can only be called if the message status // is open, otherwise it panics. -func (m *Message) RegisterNackHandler(mw NackMiddleware) { - m.RegisterStatusHandler(func(msg *Message, change StatusChange, next StatusChangeHandler) error { +func (m *Message) RegisterNackHandler(h NackHandler) { + m.RegisterStatusHandler(func(msg *Message, change StatusChange) error { if change.New != MessageStatusNacked { - return next(msg, change) + return nil // skip } - return mw(msg, change.Reason, func(msg *Message, reason error) error { - return next(msg, change) - }) + return h(msg, change.Reason) }) m.hasNackHandler = true } @@ -199,17 +175,12 @@ func (m *Message) RegisterNackHandler(mw NackMiddleware) { // RegisterDropHandler is used to register a function that will be called when // the message is dropped. This function can only be called if the message // status is open, otherwise it panics. 
-func (m *Message) RegisterDropHandler(mw DropMiddleware) { - m.RegisterStatusHandler(func(msg *Message, change StatusChange, next StatusChangeHandler) error { +func (m *Message) RegisterDropHandler(h DropHandler) { + m.RegisterStatusHandler(func(msg *Message, change StatusChange) error { if change.New != MessageStatusDropped { - return next(msg, change) + return nil } - mw(msg, change.Reason, func(msg *Message, reason error) { - err := next(msg, change) - if err != nil { - panic(cerrors.Errorf("BUG: drop handlers should never return an error (message %s): %w", msg.ID(), err)) - } - }) + h(msg, change.Reason) return nil }) } diff --git a/pkg/pipeline/stream/message_test.go b/pkg/pipeline/stream/message_test.go index 322009c9e..12c35b2f5 100644 --- a/pkg/pipeline/stream/message_test.go +++ b/pkg/pipeline/stream/message_test.go @@ -47,7 +47,7 @@ func TestMessage_Ack_WithHandler(t *testing.T) { ackedMessageHandlerCallCount int ) - msg.RegisterAckHandler(func(*Message, AckHandler) error { + msg.RegisterAckHandler(func(*Message) error { ackedMessageHandlerCallCount++ return nil }) @@ -90,35 +90,34 @@ func TestMessage_Ack_WithFailingHandler(t *testing.T) { ) { - // first handler should never be called - msg.RegisterAckHandler(func(*Message, AckHandler) error { - t.Fatalf("did not expect first handler to be called") + // first handler should still be called + msg.RegisterAckHandler(func(*Message) error { + ackedMessageHandlerCallCount++ return nil }) // second handler fails - msg.RegisterAckHandler(func(*Message, AckHandler) error { + msg.RegisterAckHandler(func(*Message) error { return wantErr }) // third handler should work as expected - msg.RegisterAckHandler(func(msg *Message, next AckHandler) error { + msg.RegisterAckHandler(func(msg *Message) error { ackedMessageHandlerCallCount++ - return next(msg) + return nil }) // fourth handler should be called twice, once for ack, once for drop - msg.RegisterStatusHandler(func(msg *Message, change StatusChange, next StatusChangeHandler) error { + msg.RegisterStatusHandler(func(msg *Message, change StatusChange) error { statusMessageHandlerCallCount++ - return next(msg, change) + return nil }) // drop handler should be called after the ack fails - msg.RegisterDropHandler(func(msg *Message, reason error, next DropHandler) { - if ackedMessageHandlerCallCount != 1 { - t.Fatal("expected acked message handler to already be called") + msg.RegisterDropHandler(func(msg *Message, reason error) { + if ackedMessageHandlerCallCount != 2 { + t.Fatal("expected acked message handlers to already be called") } droppedMessageHandlerCallCount++ - next(msg, reason) }) // nack handler should not be called - msg.RegisterNackHandler(func(*Message, error, NackHandler) error { + msg.RegisterNackHandler(func(*Message, error) error { t.Fatalf("did not expect nack handler to be called") return nil }) @@ -131,8 +130,8 @@ func TestMessage_Ack_WithFailingHandler(t *testing.T) { t.Fatalf("ack expected error %v, got: %v", wantErr, err) } assertMessageIsDropped(t, &msg) - if ackedMessageHandlerCallCount != 1 { - t.Fatalf("expected acked message handler to be called once, got %d calls", ackedMessageHandlerCallCount) + if ackedMessageHandlerCallCount != 2 { + t.Fatalf("expected acked message handler to be called twice, got %d calls", ackedMessageHandlerCallCount) } if droppedMessageHandlerCallCount != 1 { t.Fatalf("expected dropped message handler to be called once, got %d calls", droppedMessageHandlerCallCount) @@ -186,12 +185,12 @@ func TestMessage_Nack_WithHandler(t *testing.T) { 
nackedMessageHandlerCallCount int ) - msg.RegisterNackHandler(func(msg *Message, err error, next NackHandler) error { + msg.RegisterNackHandler(func(msg *Message, err error) error { nackedMessageHandlerCallCount++ if err != wantErr { t.Fatalf("nacked message handler, expected err %v, got %v", wantErr, err) } - return next(msg, err) + return nil }) err := msg.Nack(wantErr) @@ -225,35 +224,34 @@ func TestMessage_Nack_WithFailingHandler(t *testing.T) { ) { - // first handler should never be called - msg.RegisterNackHandler(func(*Message, error, NackHandler) error { - t.Fatalf("did not expect first handler to be called") + // first handler should still be called + msg.RegisterNackHandler(func(*Message, error) error { + nackedMessageHandlerCallCount++ return nil }) // second handler fails - msg.RegisterNackHandler(func(*Message, error, NackHandler) error { + msg.RegisterNackHandler(func(*Message, error) error { return wantErr }) // third handler should work as expected - msg.RegisterNackHandler(func(msg *Message, reason error, next NackHandler) error { + msg.RegisterNackHandler(func(msg *Message, reason error) error { nackedMessageHandlerCallCount++ - return next(msg, reason) + return nil }) // fourth handler should be called twice, once for ack, once for drop - msg.RegisterStatusHandler(func(msg *Message, change StatusChange, next StatusChangeHandler) error { + msg.RegisterStatusHandler(func(msg *Message, change StatusChange) error { statusMessageHandlerCallCount++ - return next(msg, change) + return nil }) // drop handler should be called after the nack fails - msg.RegisterDropHandler(func(msg *Message, reason error, next DropHandler) { - if nackedMessageHandlerCallCount != 1 { - t.Fatal("expected nacked message handler to already be called") + msg.RegisterDropHandler(func(msg *Message, reason error) { + if nackedMessageHandlerCallCount != 2 { + t.Fatal("expected nacked message handlers to already be called") } droppedMessageHandlerCallCount++ - next(msg, reason) }) // ack handler should not be called - msg.RegisterAckHandler(func(*Message, AckHandler) error { + msg.RegisterAckHandler(func(*Message) error { t.Fatalf("did not expect ack handler to be called") return nil }) @@ -266,8 +264,8 @@ func TestMessage_Nack_WithFailingHandler(t *testing.T) { t.Fatalf("nack expected error %v, got: %v", wantErr, err) } assertMessageIsDropped(t, &msg) - if nackedMessageHandlerCallCount != 1 { - t.Fatalf("expected nacked message handler to be called once, got %d calls", nackedMessageHandlerCallCount) + if nackedMessageHandlerCallCount != 2 { + t.Fatalf("expected nacked message handler to be called twice, got %d calls", nackedMessageHandlerCallCount) } if droppedMessageHandlerCallCount != 1 { t.Fatalf("expected dropped message handler to be called once, got %d calls", droppedMessageHandlerCallCount) @@ -309,14 +307,13 @@ func TestMessage_Drop_WithHandler(t *testing.T) { ) { - msg.RegisterDropHandler(func(msg *Message, reason error, next DropHandler) { + msg.RegisterDropHandler(func(msg *Message, reason error) { droppedMessageHandlerCallCount++ - next(msg, reason) }) // second handler should be called once for drop - msg.RegisterStatusHandler(func(msg *Message, change StatusChange, next StatusChangeHandler) error { + msg.RegisterStatusHandler(func(msg *Message, change StatusChange) error { statusMessageHandlerCallCount++ - return next(msg, change) + return nil }) } @@ -337,7 +334,7 @@ func TestMessage_Drop_WithFailingHandler(t *testing.T) { var msg Message // handler return error for drop - 
msg.RegisterStatusHandler(func(msg *Message, change StatusChange, next StatusChangeHandler) error { + msg.RegisterStatusHandler(func(msg *Message, change StatusChange) error { return cerrors.New("oops") }) @@ -391,7 +388,7 @@ func TestMessage_StatusChangeTwice(t *testing.T) { t.Run("nacked message", func(t *testing.T) { var msg Message // need to register a nack handler for message to be nacked - msg.RegisterNackHandler(func(*Message, error, NackHandler) error { return nil }) + msg.RegisterNackHandler(func(*Message, error) error { return nil }) err := msg.Nack(nil) if err != nil { t.Fatalf("ack did not expect error, got %v", err) @@ -424,7 +421,7 @@ func TestMessage_RegisterHandlerFail(t *testing.T) { t.Fatalf("expected msg.RegisterAckHandler to panic") } }() - msg.RegisterAckHandler(func(*Message, AckHandler) error { return nil }) + msg.RegisterAckHandler(func(*Message) error { return nil }) } assertRegisterNackHandlerPanics := func(msg *Message) { defer func() { @@ -432,7 +429,7 @@ func TestMessage_RegisterHandlerFail(t *testing.T) { t.Fatalf("expected msg.RegisterNackHandler to panic") } }() - msg.RegisterNackHandler(func(*Message, error, NackHandler) error { return nil }) + msg.RegisterNackHandler(func(*Message, error) error { return nil }) } assertRegisterDropHandlerPanics := func(msg *Message) { defer func() { @@ -440,7 +437,7 @@ func TestMessage_RegisterHandlerFail(t *testing.T) { t.Fatalf("expected msg.RegisterDropHandler to panic") } }() - msg.RegisterDropHandler(func(*Message, error, DropHandler) {}) + msg.RegisterDropHandler(func(*Message, error) {}) } // registering a handler after the message is acked should panic @@ -459,7 +456,7 @@ func TestMessage_RegisterHandlerFail(t *testing.T) { t.Run("nacked message", func(t *testing.T) { var msg Message // need to register a nack handler for message to be nacked - msg.RegisterNackHandler(func(*Message, error, NackHandler) error { return nil }) + msg.RegisterNackHandler(func(*Message, error) error { return nil }) err := msg.Nack(nil) if err != nil { t.Fatalf("ack did not expect error, got %v", err) diff --git a/pkg/pipeline/stream/metrics.go b/pkg/pipeline/stream/metrics.go index 406c5801d..b6dd3bb13 100644 --- a/pkg/pipeline/stream/metrics.go +++ b/pkg/pipeline/stream/metrics.go @@ -46,7 +46,7 @@ func (n *MetricsNode) Run(ctx context.Context) error { return err } - msg.RegisterAckHandler(func(msg *Message, next AckHandler) error { + msg.RegisterAckHandler(func(msg *Message) error { // TODO for now we call method Bytes() on key and payload to get the // bytes representation. 
In case of a structured payload or key it // is marshaled into JSON, which might not be the correct way to @@ -60,7 +60,7 @@ func (n *MetricsNode) Run(ctx context.Context) error { bytes += len(msg.Record.Payload.Bytes()) } n.BytesHistogram.Observe(float64(bytes)) - return next(msg) + return nil }) err = n.base.Send(ctx, n.logger, msg) diff --git a/pkg/pipeline/stream/processor_test.go b/pkg/pipeline/stream/processor_test.go index 845c5dea7..cbe479981 100644 --- a/pkg/pipeline/stream/processor_test.go +++ b/pkg/pipeline/stream/processor_test.go @@ -143,9 +143,9 @@ func TestProcessorNode_ErrorWithNackHandler(t *testing.T) { out := n.Pub() msg := &Message{Ctx: ctx} - msg.RegisterNackHandler(func(msg *Message, err error, next NackHandler) error { + msg.RegisterNackHandler(func(msg *Message, err error) error { assert.True(t, cerrors.Is(err, wantErr), "expected underlying error to be the processor error") - return next(msg, err) // the error should be regarded as handled + return nil // the error should be regarded as handled }) go func() { // publisher @@ -186,11 +186,11 @@ func TestProcessorNode_Skip(t *testing.T) { // register a dummy AckHandler and NackHandler for tests. counter := 0 - msg.RegisterAckHandler(func(msg *Message, next AckHandler) error { + msg.RegisterAckHandler(func(msg *Message) error { counter++ return nil }) - msg.RegisterNackHandler(func(msg *Message, err error, next NackHandler) error { + msg.RegisterNackHandler(func(msg *Message, err error) error { // Our NackHandler shouldn't ever be hit if we're correctly skipping // so fail the test if we get here at all. t.Fail() diff --git a/pkg/pipeline/stream/source.go b/pkg/pipeline/stream/source.go index 86bd8a788..9c827c5fc 100644 --- a/pkg/pipeline/stream/source.go +++ b/pkg/pipeline/stream/source.go @@ -104,24 +104,13 @@ func (n *SourceNode) Run(ctx context.Context) (err error) { // register another open message wgOpenMessages.Add(1) msg.RegisterStatusHandler( - func(msg *Message, change StatusChange, next StatusChangeHandler) error { + func(msg *Message, change StatusChange) error { // this is the last handler to be executed, once this handler is // reached we know either the message was successfully acked, nacked // or dropped defer n.PipelineTimer.Update(time.Since(msg.Record.ReadAt)) defer wgOpenMessages.Done() - return next(msg, change) - }, - ) - - msg.RegisterAckHandler( - func(msg *Message, next AckHandler) error { - n.logger.Trace(msg.Ctx).Msg("forwarding ack to source connector") - err := n.Source.Ack(msg.Ctx, msg.Record.Position) - if err != nil { - return err - } - return next(msg) + return nil }, ) diff --git a/pkg/pipeline/stream/source_acker.go b/pkg/pipeline/stream/source_acker.go new file mode 100644 index 000000000..682151cf0 --- /dev/null +++ b/pkg/pipeline/stream/source_acker.go @@ -0,0 +1,140 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package stream + +import ( + "context" + + "github.com/conduitio/conduit/pkg/connector" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/foundation/log" + "github.com/conduitio/conduit/pkg/foundation/semaphore" +) + +// SourceAckerNode is responsible for handling acknowledgments for messages of +// a specific source and forwarding them to the source in the correct order. +type SourceAckerNode struct { + Name string + Source connector.Source + + base pubSubNodeBase + logger log.CtxLogger + + // sem ensures acks are sent to the source in the correct order and only one + // at a time + sem semaphore.Simple + // fail is set to true once the first ack/nack fails and we can't guarantee + // that acks will be delivered in the correct order to the source anymore, + // at that point we completely stop processing acks/nacks + fail bool +} + +func (n *SourceAckerNode) ID() string { + return n.Name +} + +func (n *SourceAckerNode) Run(ctx context.Context) error { + trigger, cleanup, err := n.base.Trigger(ctx, n.logger) + if err != nil { + return err + } + + defer cleanup() + for { + msg, err := trigger() + if err != nil || msg == nil { + return err + } + + // enqueue message in semaphore + ticket := n.sem.Enqueue() + n.registerAckHandler(msg, ticket) + n.registerNackHandler(msg, ticket) + + err = n.base.Send(ctx, n.logger, msg) + if err != nil { + msg.Drop() + return err + } + } +} + +func (n *SourceAckerNode) registerAckHandler(msg *Message, ticket semaphore.Ticket) { + msg.RegisterAckHandler( + func(msg *Message) (err error) { + defer func() { + if err != nil { + n.fail = true + } + tmpErr := n.sem.Release(ticket) + err = cerrors.LogOrReplace(err, tmpErr, func() { + n.logger.Err(msg.Ctx, tmpErr).Msg("error releasing semaphore ticket for ack") + }) + }() + n.logger.Trace(msg.Ctx).Msg("acquiring semaphore for ack") + n.sem.Acquire(ticket) + + if n.fail { + n.logger.Trace(msg.Ctx).Msg("blocking forwarding of ack to source connector, because another message failed to be acked/nacked") + return cerrors.Errorf("another message failed to be acked/nacked") + } + + n.logger.Trace(msg.Ctx).Msg("forwarding ack to source connector") + return n.Source.Ack(msg.Ctx, msg.Record.Position) + }, + ) +} + +func (n *SourceAckerNode) registerNackHandler(msg *Message, ticket semaphore.Ticket) { + msg.RegisterNackHandler( + func(msg *Message, reason error) (err error) { + defer func() { + if err != nil { + n.fail = true + } + tmpErr := n.sem.Release(ticket) + err = cerrors.LogOrReplace(err, tmpErr, func() { + n.logger.Err(msg.Ctx, tmpErr).Msg("error releasing semaphore ticket for nack") + }) + }() + n.logger.Trace(msg.Ctx).Msg("acquiring semaphore for nack") + n.sem.Acquire(ticket) + + if n.fail { + n.logger.Trace(msg.Ctx).Msg("blocking forwarding of nack to DLQ handler, because another message failed to be acked/nacked") + return cerrors.Errorf("another message failed to be acked/nacked") + } + + n.logger.Trace(msg.Ctx).Msg("forwarding nack to DLQ handler") + // TODO implement DLQ and call it here, right now any nacked message + // will just stop the pipeline because we don't support DLQs, + // don't forget to forward ack to source if the DLQ call succeeds + // https://github.com/ConduitIO/conduit/issues/306 + return cerrors.New("no DLQ handler configured") + }, + ) +} + +func (n *SourceAckerNode) Sub(in <-chan *Message) { + n.base.Sub(in) +} + +func (n *SourceAckerNode) Pub() <-chan *Message { + return n.base.Pub() +} + +func (n *SourceAckerNode) 
SetLogger(logger log.CtxLogger) { + n.logger = logger +} diff --git a/pkg/pipeline/stream/source_acker_test.go b/pkg/pipeline/stream/source_acker_test.go new file mode 100644 index 000000000..ad3d53385 --- /dev/null +++ b/pkg/pipeline/stream/source_acker_test.go @@ -0,0 +1,285 @@ +// Copyright © 2022 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stream + +import ( + "context" + "math/rand" + "strconv" + "sync" + "testing" + "time" + + "github.com/conduitio/conduit/pkg/connector" + "github.com/conduitio/conduit/pkg/connector/mock" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/conduit/pkg/record" + "github.com/golang/mock/gomock" + "github.com/matryer/is" +) + +func TestSourceAckerNode_ForwardAck(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + src := mock.NewSource(ctrl) + helper := sourceAckerNodeTestHelper{} + + _, in, out := helper.newSourceAckerNode(ctx, is, src) + + want := &Message{Ctx: ctx, Record: record.Record{Position: []byte("foo")}} + // expect to receive an ack in the source after the message is acked + src.EXPECT().Ack(want.Ctx, want.Record.Position).Return(nil) + + in <- want + got := <-out + is.Equal(got, want) + + // ack should be propagated to the source, the mock will do the assertion + err := got.Ack() + is.NoErr(err) + + // gracefully stop node and give the test 1 second to finish + close(in) + + waitCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + select { + case <-waitCtx.Done(): + is.Fail() // expected node to stop running + case <-out: + // all good + } +} + +func TestSourceAckerNode_AckOrder(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + src := mock.NewSource(ctrl) + helper := sourceAckerNodeTestHelper{} + + _, in, out := helper.newSourceAckerNode(ctx, is, src) + // send 1000 messages through the node + messages := helper.sendMessages(ctx, 1000, in, out) + // expect all messages to be acked + expectedCalls := helper.expectAcks(ctx, messages, src) + gomock.InOrder(expectedCalls...) 
// enforce order of acks + + // ack messages concurrently in random order, expect no errors + var wg sync.WaitGroup + helper.ackMessagesConcurrently( + &wg, + messages, + func(msg *Message, err error) { + is.NoErr(err) + }, + ) + + // gracefully stop node and give the test 1 second to finish + close(in) + + err := helper.wait(ctx, &wg, time.Second) + is.NoErr(err) // expected to receive acks in time +} + +func TestSourceAckerNode_FailedAck(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + src := mock.NewSource(ctrl) + helper := sourceAckerNodeTestHelper{} + + _, in, out := helper.newSourceAckerNode(ctx, is, src) + // send 1000 messages through the node + messages := helper.sendMessages(ctx, 1000, in, out) + // expect first 500 to be acked successfully + expectedCalls := helper.expectAcks(ctx, messages[:500], src) + gomock.InOrder(expectedCalls...) // enforce order of acks + // the 500th message should be acked unsuccessfully + wantErr := cerrors.New("test error") + src.EXPECT(). + Ack(ctx, messages[500].Record.Position). + Return(wantErr). + After(expectedCalls[len(expectedCalls)-1]) // should happen after last acked call + + // ack messages concurrently in random order, expect errors for second half + var wg sync.WaitGroup + helper.ackMessagesConcurrently(&wg, messages[:500], + func(msg *Message, err error) { + is.NoErr(err) // expected messages from the first half to be acked successfully + }, + ) + helper.ackMessagesConcurrently(&wg, messages[500:501], + func(msg *Message, err error) { + is.Equal(err, wantErr) // expected the middle message ack to fail with specific error + }, + ) + helper.ackMessagesConcurrently(&wg, messages[501:], + func(msg *Message, err error) { + is.True(err != nil) // expected messages from the second half to be acked unsuccessfully + is.True(err != wantErr) + }, + ) + + // gracefully stop node and give the test 1 second to finish + close(in) + + err := helper.wait(ctx, &wg, time.Second) + is.NoErr(err) // expected to receive acks in time +} + +func TestSourceAckerNode_FailedNack(t *testing.T) { + is := is.New(t) + ctx := context.Background() + ctrl := gomock.NewController(t) + src := mock.NewSource(ctrl) + helper := sourceAckerNodeTestHelper{} + + _, in, out := helper.newSourceAckerNode(ctx, is, src) + // send 1000 messages through the node + messages := helper.sendMessages(ctx, 1000, in, out) + // expect first 500 to be acked successfully + expectedCalls := helper.expectAcks(ctx, messages[:500], src) + gomock.InOrder(expectedCalls...) 
// enforce order of acks + // the 500th message will be nacked unsuccessfully, no more acks should be received after that + + // ack messages concurrently in random order + var wg sync.WaitGroup + helper.ackMessagesConcurrently(&wg, messages[:500], + func(msg *Message, err error) { + is.NoErr(err) // expected messages from the first half to be acked successfully + }, + ) + helper.ackMessagesConcurrently(&wg, messages[501:], + func(msg *Message, err error) { + is.True(err != nil) // expected messages from the second half to be acked unsuccessfully + }, + ) + + wantErr := cerrors.New("test error") + err := messages[500].Nack(wantErr) + is.True(err != nil) // expected the 500th message nack to fail with specific error + + // gracefully stop node and give the test 1 second to finish + close(in) + + err = helper.wait(ctx, &wg, time.Second) + is.NoErr(err) // expected to receive acks in time +} + +// sourceAckerNodeTestHelper groups together helper functions for tests related +// to SourceAckerNode. +type sourceAckerNodeTestHelper struct{} + +func (sourceAckerNodeTestHelper) newSourceAckerNode( + ctx context.Context, + is *is.I, + src connector.Source, +) (*SourceAckerNode, chan<- *Message, <-chan *Message) { + node := &SourceAckerNode{ + Name: "acker-node", + Source: src, + } + in := make(chan *Message) + out := node.Pub() + node.Sub(in) + + go func() { + err := node.Run(ctx) + is.NoErr(err) + }() + + return node, in, out +} + +func (sourceAckerNodeTestHelper) sendMessages( + ctx context.Context, + count int, + in chan<- *Message, + out <-chan *Message, +) []*Message { + messages := make([]*Message, count) + for i := 0; i < count; i++ { + m := &Message{ + Ctx: ctx, + Record: record.Record{ + Position: []byte(strconv.Itoa(i)), // position is monotonically increasing + }, + } + in <- m + <-out + messages[i] = m + } + return messages +} + +func (sourceAckerNodeTestHelper) expectAcks( + ctx context.Context, + messages []*Message, + src *mock.Source, +) []*gomock.Call { + count := len(messages) + + // expect to receive acks successfully + expectedCalls := make([]*gomock.Call, count) + for i := 0; i < count; i++ { + expectedCalls[i] = src.EXPECT(). + Ack(ctx, messages[i].Record.Position). 
+ Return(nil) + } + + return expectedCalls +} + +func (sourceAckerNodeTestHelper) ackMessagesConcurrently( + wg *sync.WaitGroup, + messages []*Message, + assertAckErr func(*Message, error), +) { + const maxSleep = time.Millisecond + count := len(messages) + + wg.Add(count) + for i := 0; i < count; i++ { + go func(msg *Message) { + defer wg.Done() + // sleep for a random amount of time and ack the message + //nolint:gosec // math/rand is good enough for a test + time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond) + err := msg.Ack() + assertAckErr(msg, err) + }(messages[i]) + } +} + +func (sourceAckerNodeTestHelper) wait(ctx context.Context, wg *sync.WaitGroup, timeout time.Duration) error { + wgDone := make(chan struct{}) + go func() { + defer close(wgDone) + wg.Wait() + }() + + waitCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + select { + case <-waitCtx.Done(): + return waitCtx.Err() + case <-wgDone: + return nil + } +} diff --git a/pkg/pipeline/stream/stream_test.go b/pkg/pipeline/stream/stream_test.go index b5f0b6d44..1acc9c814 100644 --- a/pkg/pipeline/stream/stream_test.go +++ b/pkg/pipeline/stream/stream_test.go @@ -47,27 +47,33 @@ func Example_simpleStream() { Source: generatorSource(ctrl, logger, "generator", 10, time.Millisecond*10), PipelineTimer: noop.Timer{}, } - node2 := &stream.DestinationNode{ + node2 := &stream.SourceAckerNode{ + Name: "generator-acker", + Source: node1.Source, + } + node3 := &stream.DestinationNode{ Name: "printer", Destination: printerDestination(ctrl, logger, "printer"), ConnectorTimer: noop.Timer{}, } - node3 := &stream.AckerNode{ + node4 := &stream.DestinationAckerNode{ Name: "printer-acker", - Destination: node2.Destination, + Destination: node3.Destination, } - node2.AckerNode = node3 + node3.AckerNode = node4 stream.SetLogger(node1, logger) stream.SetLogger(node2, logger) stream.SetLogger(node3, logger) + stream.SetLogger(node4, logger) // put everything together - out := node1.Pub() - node2.Sub(out) + node2.Sub(node1.Pub()) + node3.Sub(node2.Pub()) var wg sync.WaitGroup - wg.Add(3) + wg.Add(4) + go runNode(ctx, &wg, node4) go runNode(ctx, &wg, node3) go runNode(ctx, &wg, node2) go runNode(ctx, &wg, node1) @@ -104,6 +110,7 @@ func Example_simpleStream() { // DBG received ack message_id=p/generator-10 node_id=generator // INF stopping source connector component=SourceNode node_id=generator // DBG received error on error channel error="error reading from source: stream not open" component=SourceNode node_id=generator + // DBG incoming messages channel closed component=SourceAckerNode node_id=generator-acker // DBG incoming messages channel closed component=DestinationNode node_id=printer // INF finished successfully } @@ -122,57 +129,62 @@ func Example_complexStream() { Source: generatorSource(ctrl, logger, "generator1", 10, time.Millisecond*10), PipelineTimer: noop.Timer{}, } - node2 := &stream.SourceNode{ + node2 := &stream.SourceAckerNode{ + Name: "generator1-acker", + Source: node1.Source, + } + node3 := &stream.SourceNode{ Name: "generator2", Source: generatorSource(ctrl, logger, "generator2", 10, time.Millisecond*10), PipelineTimer: noop.Timer{}, } - node3 := &stream.FaninNode{Name: "fanin"} - node4 := &stream.ProcessorNode{ + node4 := &stream.SourceAckerNode{ + Name: "generator2-acker", + Source: node3.Source, + } + node5 := &stream.FaninNode{Name: "fanin"} + node6 := &stream.ProcessorNode{ Name: "counter", Processor: counterProcessor(ctrl, &count), ProcessorTimer: noop.Timer{}, } - 
node5 := &stream.FanoutNode{Name: "fanout"} - node6 := &stream.DestinationNode{ + node7 := &stream.FanoutNode{Name: "fanout"} + node8 := &stream.DestinationNode{ Name: "printer1", Destination: printerDestination(ctrl, logger, "printer1"), ConnectorTimer: noop.Timer{}, } - node7 := &stream.DestinationNode{ + node9 := &stream.DestinationNode{ Name: "printer2", Destination: printerDestination(ctrl, logger, "printer2"), ConnectorTimer: noop.Timer{}, } - node8 := &stream.AckerNode{ + node10 := &stream.DestinationAckerNode{ Name: "printer1-acker", - Destination: node6.Destination, + Destination: node8.Destination, } - node6.AckerNode = node8 - node9 := &stream.AckerNode{ + node8.AckerNode = node10 + node11 := &stream.DestinationAckerNode{ Name: "printer2-acker", - Destination: node7.Destination, + Destination: node9.Destination, } - node7.AckerNode = node9 + node9.AckerNode = node11 // put everything together - out := node1.Pub() - node3.Sub(out) - out = node2.Pub() - node3.Sub(out) + node2.Sub(node1.Pub()) + node4.Sub(node3.Pub()) + + node5.Sub(node2.Pub()) + node5.Sub(node4.Pub()) - out = node3.Pub() - node4.Sub(out) - out = node4.Pub() - node5.Sub(out) + node6.Sub(node5.Pub()) + node7.Sub(node6.Pub()) - out = node5.Pub() - node6.Sub(out) - out = node5.Pub() - node7.Sub(out) + node8.Sub(node7.Pub()) + node9.Sub(node7.Pub()) // run nodes - nodes := []stream.Node{node1, node2, node3, node4, node5, node6, node7, node8, node9} + nodes := []stream.Node{node1, node2, node3, node4, node5, node6, node7, node8, node9, node10, node11} var wg sync.WaitGroup wg.Add(len(nodes)) @@ -186,7 +198,7 @@ func Example_complexStream() { 250*time.Millisecond, func() { node1.Stop(nil) - node2.Stop(nil) + node3.Stop(nil) }, ) // give the nodes some time to process the records, plus a bit of time to stop @@ -260,6 +272,8 @@ func Example_complexStream() { // DBG received ack message_id=p/generator1-10 node_id=generator1 // INF stopping source connector component=SourceNode node_id=generator1 // INF stopping source connector component=SourceNode node_id=generator2 + // DBG incoming messages channel closed component=SourceAckerNode node_id=generator1-acker + // DBG incoming messages channel closed component=SourceAckerNode node_id=generator2-acker // DBG received error on error channel error="error reading from source: stream not open" component=SourceNode node_id=generator1 // DBG received error on error channel error="error reading from source: stream not open" component=SourceNode node_id=generator2 // DBG incoming messages channel closed component=ProcessorNode node_id=counter
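
Note on the ack-ordering mechanism introduced above: SourceAckerNode enqueues one semaphore ticket per message in read order, and each ack/nack handler acquires its ticket before talking to the source, releasing it afterwards. Because Release closes the next ticket's channel (the semaphore.go hunk at the top of this diff), acks reach the source connector in message order even when downstream nodes ack concurrently. The following is a minimal standalone sketch of that usage pattern, not part of the change set; it assumes only the Enqueue/Acquire/Release API visible in this diff.

package main

import (
	"fmt"
	"sync"

	"github.com/conduitio/conduit/pkg/foundation/semaphore"
)

func main() {
	var sem semaphore.Simple // zero value is usable, as in SourceAckerNode

	// enqueue one ticket per message, in the order the messages were read
	tickets := make([]semaphore.Ticket, 3)
	for i := range tickets {
		tickets[i] = sem.Enqueue()
	}

	var wg sync.WaitGroup
	for i := len(tickets) - 1; i >= 0; i-- { // ack in reverse order on purpose
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sem.Acquire(tickets[i]) // blocks until the previous ticket is released
			fmt.Println("forwarding ack", i) // always prints 0, 1, 2 in order
			if err := sem.Release(tickets[i]); err != nil {
				fmt.Println("release failed:", err) // sketch only; the node logs this
			}
		}(i)
	}
	wg.Wait()
}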
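
Note on the message.go change: the middleware types (StatusChangeMiddleware, AckMiddleware, NackMiddleware, DropMiddleware) are gone, and handlers no longer call next themselves. RegisterStatusHandler wraps the previously registered handler internally, so all handlers run in reverse registration order and their errors are collected with multierror.Append instead of short-circuiting the chain; if an ack or nack handler fails, the message is dropped, as exercised in TestMessage_Ack_WithFailingHandler above. A small sketch of the resulting behavior follows (hypothetical usage, assuming the stream package as modified in this diff).

package main

import (
	"fmt"

	"github.com/conduitio/conduit/pkg/pipeline/stream"
)

func main() {
	var msg stream.Message // zero value is usable, as in the tests above

	msg.RegisterAckHandler(func(*stream.Message) error {
		fmt.Println("registered first, runs second")
		return nil
	})
	msg.RegisterAckHandler(func(*stream.Message) error {
		fmt.Println("registered second, runs first")
		return nil
	})

	// Ack runs both handlers even if one fails; errors would be collected
	// via multierror.Append and returned here.
	if err := msg.Ack(); err != nil {
		fmt.Println("ack failed:", err)
	}
}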