Implement the ServerDiscovery.WatchServers gRPC endpoint #12819
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)

ctx = public.ContextWithToken(ctx, TestDefaultInitialManagementToken)
Adding a token to the context because the new testGRPCIntegrationServer turns ACLs on.
@@ -48,6 +48,7 @@ func (s *Server) Sign(ctx context.Context, req *pbconnectca.SignRequest) (*pbcon
	var rsp *pbconnectca.SignResponse
	handled, err := s.ForwardRPC(&rpcInfo, func(conn *grpc.ClientConn) error {
		logger.Trace("forwarding RPC")
		ctx := public.ForwardMetadataContext(ctx)
This is critical for forwarding the ACL token (and any other metadata from the incoming request). It is needed because the gRPC library stores incoming and outgoing request metadata under different context keys.
Wow, good catch! I wonder if there's a way to make this happen with an interceptor, that wouldn't risk leaking tokens to any other gRPC APIs we integrate with in the future 🤔
@@ -257,6 +257,26 @@ func testACLServerWithConfig(t *testing.T, cb func(*Config), initReplicationToke
	return dir, srv, codec
}

func testGRPCIntegrationServer(t *testing.T, cb func(*Config)) (*Server, *grpc.ClientConn) {
Kind of self-explanatory, but the purpose here is to create a test server with a listening gRPC server configured for it.
Nice! Thanks for pulling this out 🙇🏻♂️
	"github.com/hashicorp/consul/proto-public/pbconnectca"
)

func noopForwardRPC(structs.RPCInfo, func(*grpc.ClientConn) error) (bool, error) {
	return false, nil
}

func testStateStore(t *testing.T, publisher state.EventPublisher) *state.Store {
func setupFSMAndPublisher(t *testing.T) (*testutils.FakeFSM, state.EventPublisher) {
The bulk of what used to be here was moved to a shared location to reduce some of the boilerplate.
Looks great, nice job!
// closed due to an ACL change, we'll attempt to re-authorize and resume it to
// prevent unnecessarily terminating the stream.
var idx uint64
for {
Not for now (as I think it'd be premature), but if we end up writing this event-consumer loop a third time, it'd probably be worth extracting 😄
👍
🥳
Just waiting for green tests again (had to regen the protobuf files after adding comments to make CI happy).
🍒 If backport labels were added before merging, cherry-picking will start automatically. To retroactively trigger a backport after merging, add backport labels and re-run https://circleci.com/gh/hashicorp/consul/647772.
This PR adds a ServerDiscovery.WatchServers gRPC endpoint.
Additionally, I did a bit of refactoring of the gRPC test functionality to make sharing more of it possible.