-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Return mesh gateway addrs if peering through mgw #14694
Conversation
@@ -150,7 +150,6 @@ func (m *CertManager) watchServerToken(ctx context.Context) { | |||
|
|||
// Cancel existing the leaf cert watch and spin up new one any time the server token changes. | |||
// The watch needs the current token as set by the leader since certificate signing requests go to the leader. | |||
fmt.Println("canceling and resetting") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stray print from a previous PR
|
||
if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collapsed this error handling with the ctx.Done() check below, since waiter.Wait
can let us know if the context was canceled
} else { | ||
// Block for any changes to the state store. | ||
updateCh <- cache.UpdateEvent{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seemed like it could block forever if the peering is deleted, so now there's a check for context cancelation.
if state.exportList != nil { | ||
// Trigger public events for all synthetic discovery chain replies. | ||
for chainName, info := range state.connectServices { | ||
m.emitEventForDiscoveryChain(ctx, state, pending, chainName, info) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed for clarity since this method doesn't actually emit the events
@@ -49,18 +50,10 @@ func TestSubscriptionManager_RegisterDeregister(t *testing.T) { | |||
subCh := mgr.subscribe(ctx, id, "my-peering", partition) | |||
|
|||
var ( | |||
gatewayCorrID = subMeshGateway + partition |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of these test changes follow from not directly exporting mesh gateway instances
if len(serverAddrs) == 0 { | ||
m.logger.Warn("did not find any server addresses with external gRPC ports to publish") | ||
continue | ||
} | ||
|
||
updateCh <- cache.UpdateEvent{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again changed to consider context cancellation to avoid potentially blocking forever
@@ -239,16 +243,10 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti | |||
|
|||
pending := &pendingPayload{} | |||
|
|||
// Directly replicate information about our mesh gateways to the consuming side. | |||
// TODO(peering): should we scrub anything before replicating this? | |||
if err := pending.Add(meshGatewayPayloadID, u.CorrelationID, csn); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We were never actually sent these to the peer, the handler for these is a no-op in stream_resources.go
. That case statement has been removed now as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor comments but LGTM!
agent/grpc-external/services/peerstream/subscription_manager.go
Outdated
Show resolved
Hide resolved
} else if err != nil { | ||
logger.Error("failed to wait before re-trying sync", "error", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waiter.Wait
only returns context errors; we could remove this branch since I don't think there's any reason for a waiter function to return anything else
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a comment to make that contract explicit, though I don't love that callers rely on that behavior
This commit adds handling so that the replication stream considers whether the user intends to peer through mesh gateways. The subscription will return server or mesh gateway addresses depending on the mesh configuration setting. These watches can be updated at runtime by modifying the mesh config entry.
a9d9080
to
a8c4d6b
Compare
Description
Exposing servers through mesh gateways will involve sharing mesh gateway addresses with peers instead of true server addresses.
There are two flows where these addresses are shared:
Each flow was addressed in a separate commit.
Testing & Reproduction steps
Links
PR Checklist