-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Source Acker Node #483
Merged
Merged
Source Acker Node #483
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
4 tasks
lovromazgon
commented
Jul 7, 2022
hariso
approved these changes
Jul 11, 2022
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work, Lovro! 🚀
pkg/pipeline/stream/message.go
Outdated
// reverse order of how they were registered. | ||
func (m *Message) RegisterStatusHandler(mw StatusChangeMiddleware) { | ||
func (m *Message) RegisterStatusHandler(mw StatusChangeHandler) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: we can rename mw (middleware) to whatever sounds better for a handler.
lovromazgon
added a commit
that referenced
this pull request
Jul 26, 2022
* Semaphore (#451) * implement ticket queue * experiment with ordered semaphore * ticketqueue benchmarks * reduce allocations * remove ticketqueue (semaphore implementation is more performant) * optimize semaphore for our use case * fix linter warnings, better benchmarks * better docs * go mod tidy * use cerrors.New * improve benchmarks * fix linter error * add comments * simplify implementation * Source Acker Node (#483) * implement ticket queue * experiment with ordered semaphore * ticketqueue benchmarks * reduce allocations * remove ticketqueue (semaphore implementation is more performant) * optimize semaphore for our use case * fix linter warnings, better benchmarks * better docs * go mod tidy * rename AckerNode to DestinationAckerNode * remove message status change middleware to ensure all message handlers are called * implement SourceAckerNode * add todo note about possible deadlock * source acker node test * don't forward acks after a failed ack/nack * use cerrors * use cerrors.New * use LogOrReplace * improve benchmarks * fix linter error * add comments * simplify implementation * update semaphore * update param name * remove redundant if clause * Remove message status dropped (#487) * implement ticket queue * experiment with ordered semaphore * ticketqueue benchmarks * reduce allocations * remove ticketqueue (semaphore implementation is more performant) * optimize semaphore for our use case * fix linter warnings, better benchmarks * better docs * go mod tidy * rename AckerNode to DestinationAckerNode * remove message status change middleware to ensure all message handlers are called * implement SourceAckerNode * add todo note about possible deadlock * source acker node test * remove message status dropped * document behavior, fanout node return nacked message error * don't forward acks after a failed ack/nack * use cerrors * use cerrors.New * use LogOrReplace * improve benchmarks * fix linter error * add comments * simplify implementation * update semaphore * update param name * remove redundant if clause * Last position handling (#504) * implement ticket queue * experiment with ordered semaphore * ticketqueue benchmarks * reduce allocations * remove ticketqueue (semaphore implementation is more performant) * optimize semaphore for our use case * fix linter warnings, better benchmarks * better docs * go mod tidy * rename AckerNode to DestinationAckerNode * remove message status change middleware to ensure all message handlers are called * implement SourceAckerNode * add todo note about possible deadlock * source acker node test * remove message status dropped * document behavior, fanout node return nacked message error * don't forward acks after a failed ack/nack * use cerrors * update plugin interface * update standalone plugin implementation * update builtin plugin implementation * update connector * update nodes * change plugin semantics, close stream on teardown * refactor stream, reuse it in source and destination * lock stream when stopping * create control message for source stop * forward last position to destination * update connector SDK, fix race condition in source node * make Conduit in charge of closing connector streams * Change plugin semantics around teardown - internal connector entity is now in charge of closing the stream instead of plugin. * Map known gRPC errors to internal type (context.Canceled). * Rewrite DestinationAckerNode to be a regular node staning after DestinationNode, receiving messages and triggering ack receiving. This makes the structure simpler and in line with all other nodes. * Create OpenMessagesTracker to simplify tracking open messages in SourceNode and DestinationNode. * destination acker tests * use cerrors.New * use LogOrReplace * use LogOrReplace * make signal channel buffered * improve benchmarks * fix linter error * add comments * simplify implementation * update semaphore * update param name * remove redundant if clause * make it possible only to inject control messages * improve destination acker caching test * remove TODO comment * update comment
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
This PR adds a
SourceAckerNode
that is responsible for forwarding acks to the source connector. It will also be responsible for handling nacks in the future and forwarding them to the DLQ.The PR also simplifies the message status change handler by removing middleware and using simple handlers instead. This ensures that all handlers are called and errors are collected (previously a middleware could stop the downstream middleware from being called).
Depends on #451.
Fixes #393
Quick checks: