Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Acker Node #483

Merged
merged 34 commits into from
Jul 11, 2022
Merged

Source Acker Node #483

merged 34 commits into from
Jul 11, 2022

Conversation

lovromazgon
Copy link
Member

@lovromazgon lovromazgon commented Jun 22, 2022

Description

This PR adds a SourceAckerNode that is responsible for forwarding acks to the source connector. It will also be responsible for handling nacks in the future and forwarding them to the DLQ.

The PR also simplifies the message status change handler by removing middleware and using simple handlers instead. This ensures that all handlers are called and errors are collected (previously a middleware could stop the downstream middleware from being called).

Depends on #451.

Fixes #393

Quick checks:

  • I have followed the Code Guidelines.
  • There is no other pull request for the same update/change.
  • I have written unit tests.
  • I have made sure that the PR is of reasonable size and can be easily reviewed.

@lovromazgon lovromazgon changed the base branch from main to lovro/ticketqueue June 22, 2022 18:27
@lovromazgon lovromazgon marked this pull request as ready for review June 23, 2022 15:28
@lovromazgon lovromazgon requested a review from a team as a code owner June 23, 2022 15:28
@lovromazgon lovromazgon mentioned this pull request Jun 29, 2022
4 tasks
Copy link
Contributor

@hariso hariso left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work, Lovro! 🚀

// reverse order of how they were registered.
func (m *Message) RegisterStatusHandler(mw StatusChangeMiddleware) {
func (m *Message) RegisterStatusHandler(mw StatusChangeHandler) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: we can rename mw (middleware) to whatever sounds better for a handler.

Base automatically changed from lovro/ticketqueue to lovro/stability July 11, 2022 18:07
@lovromazgon lovromazgon merged commit 361343f into lovro/stability Jul 11, 2022
@lovromazgon lovromazgon deleted the lovro/source-acker-node branch July 11, 2022 18:16
lovromazgon added a commit that referenced this pull request Jul 26, 2022
* Semaphore (#451)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* use cerrors.New

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* Source Acker Node (#483)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* don't forward acks after a failed ack/nack

* use cerrors

* use cerrors.New

* use LogOrReplace

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* Remove message status dropped (#487)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* remove message status dropped

* document behavior, fanout node return nacked message error

* don't forward acks after a failed ack/nack

* use cerrors

* use cerrors.New

* use LogOrReplace

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* Last position handling (#504)

* implement ticket queue

* experiment with ordered semaphore

* ticketqueue benchmarks

* reduce allocations

* remove ticketqueue (semaphore implementation is more performant)

* optimize semaphore for our use case

* fix linter warnings, better benchmarks

* better docs

* go mod tidy

* rename AckerNode to DestinationAckerNode

* remove message status change middleware to ensure all message handlers are called

* implement SourceAckerNode

* add todo note about possible deadlock

* source acker node test

* remove message status dropped

* document behavior, fanout node return nacked message error

* don't forward acks after a failed ack/nack

* use cerrors

* update plugin interface

* update standalone plugin implementation

* update builtin plugin implementation

* update connector

* update nodes

* change plugin semantics, close stream on teardown

* refactor stream, reuse it in source and destination

* lock stream when stopping

* create control message for source stop

* forward last position to destination

* update connector SDK, fix race condition in source node

* make Conduit in charge of closing connector streams

* Change plugin semantics around teardown - internal connector entity is
  now in charge of closing the stream instead of plugin.
* Map known gRPC errors to internal type (context.Canceled).
* Rewrite DestinationAckerNode to be a regular node staning after
  DestinationNode, receiving messages and triggering ack receiving. This
  makes the structure simpler and in line with all other nodes.
* Create OpenMessagesTracker to simplify tracking open messages in
  SourceNode and DestinationNode.

* destination acker tests

* use cerrors.New

* use LogOrReplace

* use LogOrReplace

* make signal channel buffered

* improve benchmarks

* fix linter error

* add comments

* simplify implementation

* update semaphore

* update param name

* remove redundant if clause

* make it possible only to inject control messages

* improve destination acker caching test

* remove TODO comment

* update comment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature: Remove message status "dropped" Feature: Ack/nack handler node
2 participants