-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add generics to subscriber and publisher and fix potential deadlock #602
Conversation
WalkthroughThe changes in this pull request involve significant updates to the Changes
Possibly related PRs
Suggested reviewers
📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🚧 Files skipped from review as they are similar to previous changes (1)
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
@@ -162,8 +147,7 @@ func (s *StreamAPI) newSubscription( | |||
|
|||
subs := models.NewSubscription(callback(notifier, rpcSub)) | |||
|
|||
rpcSub.ID = rpc.ID(subs.ID().String()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a bit hesitant about removing this assignment here. If I remember correctly, this was added by Gregor, due to some issue with the event streaming API. But on the other hand, the CI is passing, so maybe it's not needed after all 🤷♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the heads up, I'll take a look at the history, maybe there were some clues what specific problem this was solving.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you don't find anything, we can just merge it. It doesn't seem to break anything, and the E2E tests do exercise this part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@janezpodhostnik I did investigate this. What I found is:
rpcSub.ID
has the format: 0x8b841deff1dbca0881c995ad77574d17
.
I think Gregor changed it to uuid
, to match the format of the event streaming API,
e.g.: 69a20431-f601-43da-99f1-aa4bfd4d1bac
But we're using entirely the subscription functionality from Geth
, so there's no need to go with the uuid
format.
I think we're all good here. It would make sense though, to log the proper subscription-id
, e.g.
l := logger.With().Str("subscription-id", string(rpcSub.ID)).Logger()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed in 47ecbea. Please check if that is what you had in mind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, good idea adding both 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Just one small remark/concern, maybe it's nothing to worry about though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (2)
services/traces/engine.go (1)
Line range hint
1-150
: Summary: Successful implementation of generics and alignment with PR objectivesThe changes in this file successfully implement generics for the subscriber and publisher components, aligning well with the PR objectives. Key improvements include:
- Consistent use of generics for
blocksPublisher
in both theEngine
struct and theNewTracesIngestionEngine
function.- Refactoring of the
Notify
method to use a specific*models.Block
type instead ofany
.- Removal of the
ID
method, indicating a change in the subscription mechanism as mentioned in the PR objectives.These changes enhance code readability, improve type safety, and eliminate the need for certain checks, as intended. The modifications streamline the codebase and reduce the potential for runtime errors.
Consider documenting the rationale behind removing the
ID
method and any changes to the subscription mechanism in the code comments or PR description. This will help future maintainers understand the architectural decisions made in this PR.services/traces/engine_test.go (1)
30-30
: Summary: Successful implementation of generics with potential for refactoring.The changes consistently implement generics for the
NewPublisher
method across all test cases, improving type safety and readability as per the PR objectives. The existing test logic remains unchanged, indicating backward compatibility.Consider refactoring these test cases to reduce code duplication, possibly by creating a helper function for setting up the
NewPublisher[*models.Block]()
instance and other common setup steps.Would you like assistance in creating a helper function to reduce code duplication across these test cases?
Also applies to: 116-116, 233-233
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (9)
- api/stream.go (7 hunks)
- bootstrap/bootstrap.go (4 hunks)
- go.mod (1 hunks)
- models/stream.go (1 hunks)
- services/ingestion/engine.go (3 hunks)
- services/ingestion/engine_test.go (6 hunks)
- services/requester/pool.go (1 hunks)
- services/traces/engine.go (3 hunks)
- services/traces/engine_test.go (3 hunks)
🔇 Additional comments (32)
services/requester/pool.go (3)
38-38
: Approve the updated parameter type inNewTxPool
The modification of the
transactionsPublisher
parameter type to*models.Publisher[*gethTypes.Transaction]
in theNewTxPool
function is consistent with the change in theTxPool
struct. This ensures type consistency and improves the overall type safety of the code.To ensure this change is properly implemented across the codebase, please run the following script to check for any calls to
NewTxPool
that might need updating:#!/bin/bash # Description: Check for calls to NewTxPool that might need updating # Search for NewTxPool calls echo "Searching for NewTxPool calls:" rg --type go "NewTxPool\(" -C 3 # Search for NewTxPool function definition echo "Searching for NewTxPool function definition:" ast-grep --lang go --pattern 'func NewTxPool($_) $_'
32-38
: Overall impact: Improved type safety with genericsThe introduction of generics for the
Publisher
type in both theTxPool
struct and its constructorNewTxPool
aligns well with the PR objectives. These changes enhance code readability, improve type safety, and potentially simplify the codebase by reducing the need for type assertions.While the changes appear to be low-risk and localized, it's crucial to ensure consistency throughout the codebase.
To ensure a comprehensive implementation of this change:
- Review all usages of
TxPool
andNewTxPool
across the project.- Update any code that interacts with the
txPublisher
field to leverage the new generic type.- Verify that all tests related to
TxPool
are updated and passing.- Consider adding new tests that specifically verify the behavior with the generic
Publisher[*gethTypes.Transaction]
.Run the following script to get an overview of
TxPool
usage across the project:#!/bin/bash # Description: Overview of TxPool usage across the project # Search for TxPool struct usage echo "Searching for TxPool struct usage:" rg --type go "type.*TxPool.*struct" -C 3 # Search for NewTxPool function calls echo "Searching for NewTxPool function calls:" rg --type go "NewTxPool\(" -C 3 # Search for txPublisher field access echo "Searching for txPublisher field access:" rg --type go "\.txPublisher" -C 3 # Search for Publisher type usage echo "Searching for Publisher type usage:" rg --type go "Publisher(\[.*\])?" -C 3This script will help identify areas that might need attention due to the introduction of generics.
32-32
: Approve the generic type fortxPublisher
The introduction of the generic type
*gethTypes.Transaction
for thetxPublisher
field enhances type safety and aligns with the PR objective of incorporating generics. This change improves code readability and reduces the need for type assertions.To ensure this change doesn't introduce any issues, please run the following script to check for any other occurrences of
TxPool
that might need updating:✅ Verification successful
Verified:
txPublisher
Update is SafeThe change to the
txPublisher
field in theTxPool
struct is localized and does not affect other parts of the codebase. All usages oftxPublisher
are confined toservices/requester/pool.go
, ensuring that the introduction of the generic type*gethTypes.Transaction
does not introduce any issues.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for other occurrences of TxPool that might need updating # Search for TxPool usage echo "Searching for TxPool usage:" rg --type go "TxPool" -C 3 # Search for txPublisher usage within TxPool echo "Searching for txPublisher usage within TxPool:" ast-grep --lang go --pattern 'struct TxPool { $$$ txPublisher $_ $$$ }'Length of output: 17541
services/traces/engine.go (3)
32-32
: Excellent use of generics for type safetyThe change from
*models.Publisher
to*models.Publisher[*models.Block]
for theblocksPublisher
field is a great improvement. This use of generics enhances type safety and code readability by explicitly specifying that the publisher deals with*models.Block
types. It eliminates the need for type assertions when working with published data, reducing the potential for runtime errors.
41-41
: Consistent use of generics in constructorThe update to the
NewTracesIngestionEngine
function signature, changing theblocksPublisher
parameter type to*models.Publisher[*models.Block]
, is consistent with the earlier change to theEngine
struct. This ensures type consistency throughout the codebase and reinforces the benefits of using generics for improved type safety and readability.
72-72
: Improved type safety in Notify methodThe change to the
Notify
method signature fromfunc (e *Engine) Notify(data any)
tofunc (e *Engine) Notify(block *models.Block)
is a significant improvement. This change:
- Eliminates the need for type assertion within the method.
- Aligns with the use of generics in the
blocksPublisher
field.- Improves type safety and makes the code more straightforward.
These improvements contribute to better code readability and reduce the potential for runtime errors.
To ensure this change doesn't affect other parts of the codebase, please run the following verification script:
This script will help identify any other places in the codebase that might need to be updated to match this new signature.
✅ Verification successful
Further Verification Required for Notify Method Changes
The initial verification identified additional
Notify
method implementations and usages:
models/stream.go:func (b *Subscription[T]) Notify(data T)
cmd/main/main.go:signal.Notify(osSig, syscall.SIGINT, syscall.SIGTERM)
- Multiple calls in
api/stream.go
To ensure that the signature change in
services/traces/engine.go
does not adversely affect other parts of the codebase, please run the following additional verification scripts:These scripts will help isolate and verify the impact of the
Notify
method signature change across the codebase.
Notify Method Signature Change Verified Successfully
The modification to the
Notify
method inservices/traces/engine.go
has been thoroughly verified and confirmed to be safe. This change:
- Does not affect other
Notify
method implementations inmodels/stream.go
.- Does not interfere with
signal.Notify
or anynotifier.Notify
calls inapi/stream.go
.- Maintains consistent and improved type safety within the affected method.
No further actions are required.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for any other implementations or uses of the Notify method that might be affected by this change. # Test: Search for other Notify method implementations or interface definitions rg --type go -e 'func.*Notify\s*\([^)]*\)' -e 'type.*interface.*\{.*Notify.*\}' # Test: Search for calls to the Notify method rg --type go -e '\.Notify\s*\('Length of output: 597
Script:
#!/bin/bash # Description: Identify all Notify method implementations excluding services/traces/engine.go rg --type go 'func\s+\(.*\)\s+Notify\s*\(.*\)' | grep -v 'services/traces/engine.go' # Description: Check all Notify method calls excluding external signal.Notify rg --type go '\.Notify\s*\(' | grep -v 'signal.Notify'Length of output: 476
services/traces/engine_test.go (3)
116-116
: LGTM: Consistent use of generic type parameter.The change to use
models.NewPublisher[*models.Block]()
is consistent with the previous test case and correctly implements the generic type parameter for the blocks publisher. This maintains consistency across test cases and aligns with the PR objective.
233-233
: LGTM: Consistent implementation of generics across all test cases.The change to use
models.NewPublisher[*models.Block]()
maintains consistency with the previous test cases and correctly implements the generic type parameter for the block broadcaster. This ensures uniformity across all test scenarios in the file.
30-30
: LGTM: Generic type parameter correctly applied.The change to use
models.NewPublisher[*models.Block]()
correctly implements the generic type parameter for the block publisher. This aligns with the PR objective of adding generics to improve code readability and type safety.To ensure consistency across the codebase, let's verify the usage of
NewPublisher
:✅ Verification successful
Verified: All
NewPublisher
usages are now generic.The usage of
models.NewPublisher[*models.Block]()
has been correctly updated across the codebase. All instances utilize the generic type parameter, ensuring consistency and enhancing type safety.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of NewPublisher across the codebase # Test: Search for all occurrences of NewPublisher echo "Occurrences of NewPublisher:" rg --type go 'NewPublisher\[.*\]' # Test: Check for any remaining non-generic usage of NewPublisher echo "\nPotential non-generic usage of NewPublisher:" rg --type go 'NewPublisher\(\)' -g '!*_test.go'Length of output: 1590
services/ingestion/engine.go (5)
9-9
: LGTM: Import statement added for geth types.The addition of the
gethTypes
import is necessary for the updated type definitions in theEngine
struct. Good job on aliasing it to avoid potential naming conflicts.
45-46
: Excellent use of generics for improved type safety.The updates to
blocksPublisher
andlogsPublisher
field types in theEngine
struct are well-implemented:
blocksPublisher
is now*models.Publisher[*models.Block]
logsPublisher
is now*models.Publisher[[]*gethTypes.Log]
These changes enhance type safety and code clarity, aligning perfectly with the PR objectives. The specific types used for each publisher are appropriate and will help prevent type-related errors at compile-time.
57-58
: LGTM: Function signature updated consistently with struct changes.The
NewEventIngestionEngine
function signature has been correctly updated to match the changes in theEngine
struct:
blocksPublisher *models.Publisher[*models.Block]
logsPublisher *models.Publisher[[]*gethTypes.Log]
This update maintains type consistency throughout the codebase and ensures the function remains compatible with the updated
Engine
struct. Good job on keeping everything in sync!
Line range hint
1-309
: Summary: Excellent implementation of generics for improved type safety.Overall, the changes in this file successfully introduce generics to the
Publisher
types used in theEngine
struct and its associated methods. Key points:
- The use of generics enhances type safety and code clarity.
- Changes are consistent across struct fields and function signatures.
- The implementation maintains backward compatibility, minimizing the impact on the existing codebase.
These updates align well with the PR objectives of enhancing readability and simplifying the codebase. The introduction of generics should help prevent type-related errors at compile-time and make the code more self-documenting.
Great job on this implementation!
Line range hint
201-209
: LGTM: Unchanged method implementation maintains compatibility.The
processEvents
method implementation remains unchanged, which is good for maintaining backward compatibility. ThePublish
method calls for bothblocksPublisher
andlogsPublisher
are still valid with the new generic types.However, to ensure full type safety:
Could you please verify that the
Publish
method of the genericPublisher
type is correctly implemented to handle*models.Block
and[]*gethTypes.Log
types respectively? This will confirm that the type safety extends to the method calls as well.go.mod (1)
84-84
: Approve change and verify UUID usage removalThe change from a direct to an indirect dependency for
github.com/google/uuid
aligns with the PR objectives of modifying the handling of subscriber's UUID. This suggests that the project no longer directly uses the UUID package, which is consistent with the proposed switch to pointer comparison for subscribers.To ensure that all direct usage of the UUID package has been removed from the codebase, please run the following verification script:
If the script returns any results, please review those occurrences and ensure they have been properly addressed in this PR.
bootstrap/bootstrap.go (5)
13-13
: LGTM: Import addition aligns with generics implementation.The addition of the
gethTypes
import is consistent with the PR's objective of implementing generics and improving type safety. This change supports the use of Ethereum types in thePublishers
struct.
40-42
: Excellent use of generics in thePublishers
struct.The implementation of generics in the
Publishers
struct significantly enhances type safety and code clarity. Each field now has a specific type parameter:
Block
uses*models.Block
Transaction
uses*gethTypes.Transaction
Logs
uses[]*gethTypes.Log
This change aligns perfectly with the PR's objective of improving code readability and eliminating the need for certain type checks.
76-78
: LGTM:Publishers
initialization correctly uses generics.The initialization of the
Publishers
struct has been updated to use the new generic types, which is consistent with the struct definition. This change ensures type safety and clarity in the code.
213-217
: LGTM: Improved formatting fortxPool
initialization.The
txPool
initialization has been reformatted for better readability. This change improves code clarity without altering the functionality.
Line range hint
1-524
: Overall assessment: Excellent implementation of generics.The changes in this file successfully implement generics in the
Publishers
struct and its usage, aligning perfectly with the PR's objectives. Key improvements include:
- Enhanced type safety and clarity in the
Publishers
struct.- Consistent use of generics in the
Publishers
initialization.- Improved code readability through better formatting.
These changes will likely simplify the codebase and reduce the need for certain type checks, as intended. The implementation is clean and well-executed.
services/ingestion/engine_test.go (6)
71-72
: Excellent use of generics for type-specific publishersThe change from generic publishers to type-specific ones (
models.NewPublisher[*models.Block]()
andmodels.NewPublisher[[]*gethTypes.Log]()
) enhances type safety and code clarity. This aligns well with the PR objective of incorporating generics into the subscriber and publisher components.
151-152
: Consistent use of type-specific publishers across test casesThe changes here mirror those in the previous test case, maintaining consistency in the use of type-specific publishers. This uniformity is commendable and helps ensure that all test cases accurately reflect the updated implementation.
266-267
: Thorough application of type-specific publishers across all test casesThe consistent application of type-specific publishers (
models.NewPublisher[*models.Block]()
andmodels.NewPublisher[[]*gethTypes.Log]()
) across all test cases demonstrates a thorough and systematic update. This ensures that all scenarios are tested with the new generic implementation.
369-370
: Comprehensive coverage of test scenarios with type-specific publishersThe consistent use of type-specific publishers (
models.NewPublisher[*models.Block]()
andmodels.NewPublisher[[]*gethTypes.Log]()
) in this test case further demonstrates the comprehensive coverage of various scenarios. This thorough approach ensures that the new generic implementation is tested under different conditions.
418-419
: Improved code formattingThis minor formatting adjustment, adding a comma and moving the closing brace to a new line, enhances code readability. While not directly related to the generics implementation, it's a welcome improvement that aligns with good coding practices.
Line range hint
1-619
: Summary: Successful incorporation of generics in publisher componentsThe changes in this file consistently implement type-specific publishers across all test cases, aligning perfectly with the PR objective of incorporating generics into the subscriber and publisher components. The thorough and systematic update ensures that all test scenarios accurately reflect the new generic implementation.
Key points:
- Consistent use of
models.NewPublisher[*models.Block]()
andmodels.NewPublisher[[]*gethTypes.Log]()
across all test cases.- No modifications to the core test logic, maintaining the integrity of existing tests.
- Improved type safety and code clarity through the use of specific types.
These changes should contribute to a more robust and type-safe implementation in the main code. Great job on maintaining consistency and improving the overall quality of the codebase.
api/stream.go (6)
27-29
: Use of Generics in Publisher Fields Enhances Type SafetyUpdating the
blocksPublisher
,transactionsPublisher
, andlogsPublisher
fields to use generics improves type safety and code clarity by explicitly defining the types of data being published.
38-40
: Constructor Parameters Updated to Reflect Generic TypesThe parameters in
NewStreamAPI
are updated to use genericPublisher
types, ensuring consistency with the struct fields and enhancing type safety.
56-61
: Callback Function inNewHeads
Method Updated CorrectlyThe callback function now accepts
*models.Block
, aligning with the genericnewSubscription
function. This change ensures that the correct data type is handled in the subscription.
88-93
: Correct Update to Callback Function inNewPendingTransactions
The callback function in
NewPendingTransactions
now accepts*gethTypes.Transaction
, matching the generic type innewSubscription
. This ensures proper handling of transaction data.
111-116
: Updated Callback Function inLogs
Method Aligns with Generic SubscriptionThe callback function now accepts
[]*gethTypes.Log
, correctly reflecting the data type used in thelogsPublisher
. This change aligns with the genericnewSubscription
function and ensures type-safe handling of log data.
135-139
: GeneralizednewSubscription
Function with Generics and Logger ParameterThe
newSubscription
function is now generic over typeT
and includes alogger
parameter. This enhances flexibility, allowing for different data types in subscriptions, and provides better logging capabilities.
@m-Peter After you comment I went back and added some tests. Good thing I did. There was a bug in the subscription. The err channel was never initialized. so sending to it would just block forever! I also added a failsafe for sending to the err channel in case no one is listening. I added a panic, but I'm also thinking just logging it might be a better option. PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (1)
models/stream_test.go (1)
87-90
: Flaky test due to timing assumptionsThe test relies on reaching at least 50 calls within a fixed time frame, which can result in flaky tests depending on system load and scheduling.
Consider using synchronization primitives to control the flow of the test, ensuring that subscribers receive the expected number of calls without relying on timing.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- models/stream.go (1 hunks)
- models/stream_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- models/stream.go
🔇 Additional comments (2)
models/stream_test.go (2)
13-43
: Well-structured tests enhance readabilityThe use of subtests with
t.Run
improves the organization and readability of the test suite, making it easier to understand and maintain.
98-123
: Effective validation of error propagationThe error handling test correctly verifies that errors returned by subscription callbacks are properly received through the error channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
- models/stream_test.go (1 hunks)
🔇 Additional comments (1)
models/stream_test.go (1)
14-125
: Tests are comprehensive and well-structuredThe test suite effectively covers various scenarios, including edge cases and concurrent operations, ensuring the robustness of the publisher-subscriber model.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Just a small change on the subscription-id
value that is logged.
err := b.callback(data) | ||
if err != nil { | ||
b.err <- err | ||
select { | ||
case b.err <- err: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the subscription cares about callback returning error or not.
I think the subscription's responsibility is to deliver the data to the callback. If there is error, it's the callback's job to handle it, log it, or even crash.
The callback has all the context of why this would error, so it's better being handled there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed!
Right now I just added a band-aid, but I will definitely fix this further with a following PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (1)
models/stream_test.go (1)
6-6
: Consider usingsync/atomic
for all atomic operations.For consistency, consider using
sync/atomic
for all atomic operations in the file. You can replaceatomic.Uint64
withuint64
and useatomic.LoadUint64
andatomic.AddUint64
for atomic operations.Apply this diff to use
sync/atomic
consistently:type mockSubscription struct { *models.Subscription[mockData] - callCount atomic.Uint64 + callCount uint64 } func newMockSubscription() *mockSubscription { s := &mockSubscription{} s.Subscription = models.NewSubscription[mockData](func(data mockData) error { - s.callCount.Add(1) + atomic.AddUint64(&s.callCount, 1) return nil }) return s } func (s *mockSubscription) CallCount() uint64 { - return s.callCount.Load() + return atomic.LoadUint64(&s.callCount) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- models/stream_test.go (1 hunks)
- services/ingestion/engine.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- services/ingestion/engine.go
🔇 Additional comments (1)
models/stream_test.go (1)
1-165
: LGTM!The test suite is comprehensive and well-structured, covering various scenarios for the publisher-subscriber model. The use of sub-tests, mock objects, and synchronization primitives ensures thorough testing of the functionality.
t.Run("error handling", func(t *testing.T) { | ||
p := newMockPublisher() | ||
s := &mockSubscription{} | ||
errContent := fmt.Errorf("failed to process data") | ||
|
||
s.Subscription = models.NewSubscription[mockData](func(data mockData) error { | ||
s.callCount.Add(1) | ||
return errContent | ||
}) | ||
|
||
p.Subscribe(s) | ||
|
||
shouldReceiveError := make(chan struct{}) | ||
ready := make(chan struct{}) | ||
go func() { | ||
close(ready) | ||
select { | ||
case err := <-s.Error(): | ||
require.ErrorIs(t, err, errContent) | ||
case <-shouldReceiveError: | ||
require.Fail(t, "should have received error") | ||
} | ||
}() | ||
<-ready | ||
|
||
p.Publish(mockData{}) | ||
close(shouldReceiveError) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure proper cleanup of goroutines in the error handling test.
The error handling test spawns a goroutine to listen for errors, but it doesn't ensure proper cleanup of the goroutine. If the test fails or panics, the goroutine may continue running in the background.
To ensure proper cleanup, use a defer
statement to close the shouldReceiveError
channel and wait for the goroutine to exit. Apply this diff:
func Test_Stream(t *testing.T) {
// ...
t.Run("error handling", func(t *testing.T) {
// ...
shouldReceiveError := make(chan struct{})
+ defer close(shouldReceiveError)
+
ready := make(chan struct{})
+ done := make(chan struct{})
+ defer func() {
+ <-done
+ }()
+
go func() {
+ defer close(done)
close(ready)
select {
case err := <-s.Error():
require.ErrorIs(t, err, errContent)
case <-shouldReceiveError:
require.Fail(t, "should have received error")
}
}()
<-ready
p.Publish(mockData{})
- close(shouldReceiveError)
})
}
This ensures that the shouldReceiveError
channel is always closed, and the test waits for the goroutine to finish before proceeding.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
t.Run("error handling", func(t *testing.T) { | |
p := newMockPublisher() | |
s := &mockSubscription{} | |
errContent := fmt.Errorf("failed to process data") | |
s.Subscription = models.NewSubscription[mockData](func(data mockData) error { | |
s.callCount.Add(1) | |
return errContent | |
}) | |
p.Subscribe(s) | |
shouldReceiveError := make(chan struct{}) | |
ready := make(chan struct{}) | |
go func() { | |
close(ready) | |
select { | |
case err := <-s.Error(): | |
require.ErrorIs(t, err, errContent) | |
case <-shouldReceiveError: | |
require.Fail(t, "should have received error") | |
} | |
}() | |
<-ready | |
p.Publish(mockData{}) | |
close(shouldReceiveError) | |
}) | |
t.Run("error handling", func(t *testing.T) { | |
p := newMockPublisher() | |
s := &mockSubscription{} | |
errContent := fmt.Errorf("failed to process data") | |
s.Subscription = models.NewSubscription[mockData](func(data mockData) error { | |
s.callCount.Add(1) | |
return errContent | |
}) | |
p.Subscribe(s) | |
shouldReceiveError := make(chan struct{}) | |
defer close(shouldReceiveError) | |
ready := make(chan struct{}) | |
done := make(chan struct{}) | |
defer func() { | |
<-done | |
}() | |
go func() { | |
defer close(done) | |
close(ready) | |
select { | |
case err := <-s.Error(): | |
require.ErrorIs(t, err, errContent) | |
case <-shouldReceiveError: | |
require.Fail(t, "should have received error") | |
} | |
}() | |
<-ready | |
p.Publish(mockData{}) | |
}) |
t.Run("concurrent subscribe, publish, unsubscribe, publish", func(t *testing.T) { | ||
|
||
p := newMockPublisher() | ||
|
||
stopPublishing := make(chan struct{}) | ||
|
||
published := make(chan struct{}) | ||
|
||
// publishing | ||
go func() { | ||
for { | ||
select { | ||
case <-stopPublishing: | ||
return | ||
case <-time.After(time.Millisecond * 1): | ||
p.Publish(mockData{}) | ||
|
||
select { | ||
case published <- struct{}{}: | ||
default: | ||
} | ||
} | ||
} | ||
}() | ||
|
||
waitAllSubscribed := sync.WaitGroup{} | ||
waitAllUnsubscribed := sync.WaitGroup{} | ||
|
||
// 10 goroutines adding 10 subscribers each | ||
// and then unsubscribe all | ||
waitAllSubscribed.Add(10) | ||
waitAllUnsubscribed.Add(10) | ||
for i := 0; i < 10; i++ { | ||
go func() { | ||
subscriptions := make([]*mockSubscription, 10) | ||
|
||
for j := 0; j < 10; j++ { | ||
s := newMockSubscription() | ||
subscriptions[j] = s | ||
p.Subscribe(s) | ||
|
||
} | ||
waitAllSubscribed.Done() | ||
waitAllSubscribed.Wait() | ||
|
||
// wait for all subscribers to receive data | ||
for i := 0; i < 10; i++ { | ||
<-published | ||
} | ||
|
||
for _, s := range subscriptions { | ||
p.Unsubscribe(s) | ||
} | ||
|
||
// there should be at least 1 call | ||
for j := 0; j < 10; j++ { | ||
require.GreaterOrEqual(t, subscriptions[j].CallCount(), uint64(10)) | ||
} | ||
|
||
waitAllUnsubscribed.Done() | ||
}() | ||
} | ||
|
||
waitAllUnsubscribed.Wait() | ||
close(stopPublishing) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Insufficient Buffer Size for published
Channel
The published
channel is currently unbuffered, which may lead to blocking the publishing goroutine if subscribers are not ready to receive. This can cause the test to hang or not function as expected.
- Recommendation: Add a buffer size to the
published
channel to prevent blocking. For example:published := make(chan struct{}, 100) // Adjust the buffer size as needed
🔗 Analysis chain
Verify the correctness of the concurrent test.
The concurrent test is well-designed and uses appropriate synchronization primitives. However, there are a few points to verify:
- Ensure that the
published
channel has sufficient buffer size to avoid blocking the publishing goroutine. - Confirm that waiting for 10 published events per subscriber is sufficient to guarantee that all subscribers receive the expected number of calls.
- Verify that the
GreaterOrEqual
assertion with a threshold of 10 is appropriate and accounts for potential race conditions.
Run the following script to analyze the concurrent behavior:
Review the output to ensure that:
- The
published
channel has a sufficient buffer size. - The publishing goroutine sends to the
published
channel. - Each subscriber waits for 10 published events.
- The assertion checks for at least 10 calls per subscriber.
If any of these points are not met, consider adjusting the test accordingly.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Find the size of the `published` channel buffer.
rg --type go 'published\s*:=\s*make\(chan\s+struct\{\},\s*(\d+)\)'
# Check if the publishing goroutine sends to the `published` channel.
rg --type go 'case\s+published\s*<-\s*struct\{\}\{\}:'
# Analyze the waiting logic for published events.
rg --type go 'for\s+i\s*:=\s*0;\s*i\s*<\s*10;\s*i\+\+\s*\{[\s\S]*<-published[\s\S]*\}'
# Examine the assertion for the expected number of calls.
rg --type go 'require\.GreaterOrEqual\(t,\s*subscriptions\[j\]\.CallCount\(\),\s*uint64\(10\)\)'
Length of output: 465
Description
With generics the code is easier to read and some checks are no longer needed.
A change also include in this is the removal of the uuid of the subscriber. Pointer comparison can be used instead, making it a bit simpler.
I found a bug with the error channel on the subscriber. Sending an error to that channel would just block forever, since the channel was always nil.
For contributor use:
master
branchFiles changed
in the Github PR explorerSummary by CodeRabbit
Release Notes
New Features
Bug Fixes
Chores
github.com/google/uuid
, now managed indirectly.