Skip to content

Commit

Permalink
EventPublisher subscriptions for Consul Native API Gateway (#15757)
Browse files Browse the repository at this point in the history
* Create new event topics in subscribe proto
* Add tests for PBSubscribe func
* Make configs singular, add all configs to PBToStreamSubscribeRequest
* Add snapshot methods
* Add config_entry_events tests
* Add config entry kind to topic for new configs
* Add unit tests for snapshot methods
* Start adding integration test
* Test using the new controller code
* Update agent/consul/state/config_entry_events.go
Co-authored-by: Nathan Coleman <[email protected]>
* Check value of error
Co-authored-by: Nathan Coleman <[email protected]>
  • Loading branch information
Thomas Eckert authored Dec 20, 2022
1 parent cf33cd8 commit 4889a0e
Show file tree
Hide file tree
Showing 9 changed files with 797 additions and 32 deletions.
146 changes: 145 additions & 1 deletion agent/consul/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ LOOP:
}
}

// since we only modified each entry once, we should have exactly 200 reconcliation calls
// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
Expand Down Expand Up @@ -271,3 +271,147 @@ func TestBasicController_RunPanicAssertions(t *testing.T) {
controller.WithQueueFactory(RunWorkQueue)
})
}

func TestConfigEntrySubscriptions(t *testing.T) {
t.Parallel()

cases := map[string]struct {
configEntry func(string) structs.ConfigEntry
topic stream.Topic
kind string
}{
"Subscribe to Service Resolver Config Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: name,
}
},
topic: state.EventTopicServiceResolver,
kind: structs.ServiceResolver,
},
"Subscribe to Ingress Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: name,
}
},
topic: state.EventTopicIngressGateway,
kind: structs.IngressGateway,
},
"Subscribe to Service Intentions Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.ServiceIntentionsConfigEntry{
Kind: structs.ServiceIntentions,
Name: name,
}
},
topic: state.EventTopicServiceIntentions,
kind: structs.ServiceIntentions,
},
"Subscribe to API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.APIGatewayConfigEntry{
Kind: structs.APIGateway,
Name: name,
}
},
topic: state.EventTopicAPIGateway,
kind: structs.APIGateway,
},
"Subscribe to Inline Certificate Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.InlineCertificateConfigEntry{
Kind: structs.InlineCertificate,
Name: name,
}
},
topic: state.EventTopicInlineCertificate,
kind: structs.InlineCertificate,
},
"Subscribe to HTTP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.HTTPRouteConfigEntry{
Kind: structs.HTTPRoute,
Name: name,
}
},
topic: state.EventTopicHTTPRoute,
kind: structs.HTTPRoute,
},
"Subscribe to TCP Route Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: name,
}
},
topic: state.EventTopicTCPRoute,
kind: structs.TCPRoute,
},
"Subscribe to Bound API Gateway Changes": {
configEntry: func(name string) structs.ConfigEntry {
return &structs.BoundAPIGatewayConfigEntry{
Kind: structs.BoundAPIGateway,
Name: name,
}
},
topic: state.EventTopicBoundAPIGateway,
kind: structs.BoundAPIGateway,
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

reconciler := newTestReconciler(false)

publisher := stream.NewEventPublisher(1 * time.Millisecond)
go publisher.Run(ctx)

// get the store through the FSM since the publisher handlers get registered through it
store := fsm.NewFromDeps(fsm.Deps{
Logger: hclog.New(nil),
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
}).State()

for i := 0; i < 200; i++ {
entryIndex := uint64(i + 1)
name := fmt.Sprintf("foo-%d", i)
require.NoError(t, store.EnsureConfigEntry(entryIndex, tc.configEntry(name)))
}

go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
Topic: tc.topic,
Subject: stream.SubjectWildcard,
}).WithWorkers(10).Run(ctx)

received := []string{}
LOOP:
for {
select {
case request := <-reconciler.received:
require.Equal(t, tc.kind, request.Kind)
received = append(received, request.Name)
if len(received) == 200 {
break LOOP
}
case <-ctx.Done():
break LOOP
}
}

// since we only modified each entry once, we should have exactly 200 reconciliation calls
require.Len(t, received, 200)
for i := 0; i < 200; i++ {
require.Contains(t, received, fmt.Sprintf("foo-%d", i))
}
})
}
}
35 changes: 35 additions & 0 deletions agent/consul/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,4 +342,39 @@ func (c *FSM) registerStreamSnapshotHandlers() {
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().APIGatewaySnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicInlineCertificate, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().InlineCertificateSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicHTTPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().HTTPRouteSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicTCPRoute, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().TCPRouteSnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}

err = c.deps.Publisher.RegisterHandler(state.EventTopicBoundAPIGateway, func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return c.State().BoundAPIGatewaySnapshot(req, buf)
}, true)
if err != nil {
panic(fmt.Errorf("fatal error encountered registering streaming snapshot handlers: %w", err))
}
}
35 changes: 35 additions & 0 deletions agent/consul/state/config_entry_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ var configEntryKindToTopic = map[string]stream.Topic{
structs.IngressGateway: EventTopicIngressGateway,
structs.ServiceIntentions: EventTopicServiceIntentions,
structs.ServiceDefaults: EventTopicServiceDefaults,
structs.APIGateway: EventTopicAPIGateway,
structs.TCPRoute: EventTopicTCPRoute,
structs.HTTPRoute: EventTopicHTTPRoute,
structs.InlineCertificate: EventTopicInlineCertificate,
structs.BoundAPIGateway: EventTopicBoundAPIGateway,
}

// EventSubjectConfigEntry is a stream.Subject used to route and receive events
Expand Down Expand Up @@ -117,6 +122,36 @@ func (s *Store) ServiceDefaultsSnapshot(req stream.SubscribeRequest, buf stream.
return s.configEntrySnapshot(structs.ServiceDefaults, req, buf)
}

// APIGatewaySnapshot is a stream.SnapshotFunc that returns a snapshot of
// api-gateway config entries.
func (s *Store) APIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.APIGateway, req, buf)
}

// TCPRouteSnapshot is a stream.SnapshotFunc that returns a snapshot of
// tcp-route config entries.
func (s *Store) TCPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.TCPRoute, req, buf)
}

// HTTPRouteSnapshot is a stream.SnapshotFunc that retuns a snapshot of
// http-route config entries.
func (s *Store) HTTPRouteSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.HTTPRoute, req, buf)
}

// InlineCertificateSnapshot is a stream.SnapshotFunc that returns a snapshot of
// inline-certificate config entries.
func (s *Store) InlineCertificateSnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.InlineCertificate, req, buf)
}

// BoundAPIGatewaySnapshot is a stream.SnapshotFunc that returns a snapshot of
// bound-api-gateway config entries.
func (s *Store) BoundAPIGatewaySnapshot(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
return s.configEntrySnapshot(structs.BoundAPIGateway, req, buf)
}

func (s *Store) configEntrySnapshot(kind string, req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
var (
idx uint64
Expand Down
Loading

0 comments on commit 4889a0e

Please sign in to comment.