diff --git a/agent/consul/fsm_data_store.go b/agent/consul/fsm_data_store.go new file mode 100644 index 000000000000..4bbdf6f24d7d --- /dev/null +++ b/agent/consul/fsm_data_store.go @@ -0,0 +1,77 @@ +package consul + +import ( + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/fsm" + "github.com/hashicorp/consul/agent/structs" +) + +// FSMDataStore implements the DataStore interface using the Consul server and finite state manager. +type FSMDataStore struct { + server *Server + fsm *fsm.FSM +} + +func NewFSMDataStore(server *Server, fsm *fsm.FSM) *FSMDataStore { + return &FSMDataStore{ + server: server, + fsm: fsm, + } +} + +// GetConfigEntry takes in a kind, name, and meta and returns a configentry and an error from the FSM state +func (f *FSMDataStore) GetConfigEntry(kind string, name string, meta *acl.EnterpriseMeta) (structs.ConfigEntry, error) { + store := f.fsm.State() + + _, entry, err := store.ConfigEntry(nil, kind, name, meta) + if err != nil { + return nil, err + } + return entry, nil +} + +// GetConfigEntriesByKind takes in a kind and returns all instances of that kind of config entry from the FSM state +func (f *FSMDataStore) GetConfigEntriesByKind(kind string) ([]structs.ConfigEntry, error) { + store := f.fsm.State() + + _, entries, err := store.ConfigEntriesByKind(nil, kind, acl.WildcardEnterpriseMeta()) + if err != nil { + return nil, err + } + return entries, nil +} + +// Update takes a config entry and upserts it in the FSM state +func (f *FSMDataStore) Update(entry structs.ConfigEntry) error { + _, err := f.server.leaderRaftApply("ConfigEntry.Apply", structs.ConfigEntryRequestType, &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsertCAS, + Entry: entry, + }) + return err +} + +// UpdateStatus takes a config entry, an error, and updates the status field as needed in the FSM state +func (f *FSMDataStore) UpdateStatus(entry structs.ControlledConfigEntry, err error) error { + if err == nil { + //TODO additional status messages for success? + return nil + } + status := structs.Status{ + Conditions: []structs.Condition{{ + + Status: err.Error() + ": Accepted == false", + }, + }, + } + entry.SetStatus(status) + return f.Update(entry) +} + +// Delete takes a config entry and deletes it from the FSM state +func (f *FSMDataStore) Delete(entry structs.ConfigEntry) error { + _, err := f.server.leaderRaftApply("ConfigEntry.Delete", structs.ConfigEntryRequestType, &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryDelete, + Entry: entry, + }) + return err +} diff --git a/agent/consul/gateways/controller_gateways.go b/agent/consul/gateways/controller_gateways.go index fc2ae3ed0d75..b60372f1da57 100644 --- a/agent/consul/gateways/controller_gateways.go +++ b/agent/consul/gateways/controller_gateways.go @@ -2,33 +2,201 @@ package gateways import ( "context" - - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/consul/agent/consul/controller" - "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-hclog" + "github.com/pkg/errors" ) type apiGatewayReconciler struct { - fsm *fsm.FSM logger hclog.Logger + store DataStore } -func (r apiGatewayReconciler) Reconcile(ctx context.Context, req controller.Request) error { - return nil -} - -func NewAPIGatewayController(fsm *fsm.FSM, publisher state.EventPublisher, logger hclog.Logger) controller.Controller { +// NewAPIGatewayController returns a new APIGateway controller +func NewAPIGatewayController(store DataStore, publisher state.EventPublisher, logger hclog.Logger) controller.Controller { reconciler := apiGatewayReconciler{ - fsm: fsm, logger: logger, + store: store, } - return controller.New(publisher, reconciler).Subscribe( + return controller.New(publisher, &reconciler).Subscribe( &stream.SubscribeRequest{ Topic: state.EventTopicAPIGateway, Subject: stream.SubjectWildcard, }, ) } + +// Reconcile takes in a controller request and ensures this api gateways corresponding BoundAPIGateway exists and is +// up to date +func (r *apiGatewayReconciler) Reconcile(ctx context.Context, req controller.Request) error { + + r.logger.Debug("started reconciling gateway", "gateway", req.Name) + + metaGateway, err := r.initGatewayMeta(req) + if err != nil { + return err + } else if metaGateway == nil { + //delete meta gateway + r.logger.Info("cleaning up deleted gateway object", "request", req) + if err := r.store.Delete(&structs.BoundAPIGatewayConfigEntry{ + Kind: structs.BoundAPIGateway, + Name: req.Name, + EnterpriseMeta: *req.Meta, + }); err != nil { + msg := "error cleaning up deleted gateway object" + r.logger.Error(msg, err) + return errors.Wrap(err, msg) + } + return nil + } + + r.ensureBoundGateway(metaGateway) + + routes, err := r.retrieveAllRoutesFromStore() + if err != nil { + return err + } + + boundGateways, routeErrors := BindRoutesToGateways([]*gatewayMeta{metaGateway}, routes...) + + //In this loop there should only be 1 bound gateway returned, but looping over all returned gateways + //to make sure nothing gets dropped and handle case where 0 gateways are returned + for _, boundGateway := range boundGateways { + // now update the gateway state + r.logger.Debug("persisting gateway state", "state", boundGateway) + if err := r.store.Update(boundGateway); err != nil { + msg := "error persisting state" + r.logger.Error(msg, "error", err) + return errors.Wrap(err, msg) + } + + // then update the gateway status + r.logger.Debug("persisting gateway status", "gateway", metaGateway.Gateway) + if err := r.store.UpdateStatus(metaGateway.Gateway, err); err != nil { + return err + } + } + + // and update the route statuses + for route, routeError := range routeErrors { + configEntry := r.resourceReferenceToBoundRoute(route) + r.logger.Error("route binding error:", routeError) + if err := r.store.UpdateStatus(configEntry, routeError); err != nil { + return err + } + } + + return nil +} + +func (r *apiGatewayReconciler) retrieveAllRoutesFromStore() ([]structs.BoundRoute, error) { + tcpRoutes, err := r.store.GetConfigEntriesByKind(structs.TCPRoute) + if err != nil { + return nil, err + } + + //TODO not implemented + //httpRoutes, err := r.store.GetConfigEntriesByKind(structs.HTTPRoute) + //if err != nil { + // return nil, err + //} + + routes := []structs.BoundRoute{} + for _, r := range tcpRoutes { + if r == nil { + continue + } + routes = append(routes, r.(*structs.TCPRouteConfigEntry)) + } + //TODO not implemented + //for _, r := range httpRoutes { + // routes = append(routes, r.(*structs.HTTPRouteConfigEntry)) + //} + return routes, nil +} + +func (r *apiGatewayReconciler) initGatewayMeta(req controller.Request) (*gatewayMeta, error) { + metaGateway := &gatewayMeta{} + + apiGateway, err := r.store.GetConfigEntry(req.Kind, req.Name, req.Meta) + if err != nil { + return nil, err + } + + if apiGateway == nil { + //gateway doesn't exist + return nil, nil + } + + metaGateway.Gateway = apiGateway.(*structs.APIGatewayConfigEntry) + + boundGateway, err := r.store.GetConfigEntry(structs.BoundAPIGateway, req.Name, req.Meta) + if err != nil { + return nil, err + } + + //initialize object, values get copied over in ensureBoundGateway if they don't exist + metaGateway.BoundGateway = boundGateway.(*structs.BoundAPIGatewayConfigEntry) + return metaGateway, nil +} + +func (r *apiGatewayReconciler) resourceReferenceToBoundRoute(ref structs.ResourceReference) structs.ControlledConfigEntry { + //TODO currently have to retrieve from the store to persist parent field on update call, is there a better way to do this? + boundRoute, err := r.store.GetConfigEntry(ref.Kind, ref.Name, &ref.EnterpriseMeta) + if err != nil { + return nil + } + + switch ref.Kind { + case structs.TCPRoute: + return boundRoute.(*structs.TCPRouteConfigEntry) + case structs.HTTPRoute: + return boundRoute.(*structs.HTTPRouteConfigEntry) + } + + return nil +} + +// ensureBoundGateway copies all relevant data from a gatewayMeta's APIGateway to BoundAPIGateway +func (r *apiGatewayReconciler) ensureBoundGateway(gw *gatewayMeta) { + if gw.BoundGateway == nil { + gw.BoundGateway = &structs.BoundAPIGatewayConfigEntry{ + Kind: structs.BoundAPIGateway, + Name: gw.Gateway.Name, + EnterpriseMeta: gw.Gateway.EnterpriseMeta, + } + } + + r.ensureListeners(gw) +} + +func (r *apiGatewayReconciler) ensureListeners(gw *gatewayMeta) { + + //rebuild the list from scratch, just copying over the ones that already exist + listeners := []structs.BoundAPIGatewayListener{} + for _, l := range gw.Gateway.Listeners { + boundListener := getBoundGatewayListener(l, gw.BoundGateway.Listeners) + if boundListener != nil { + //listener is already on gateway, copy onto our new list + listeners = append(listeners, *boundListener) + continue + } + //create new listener to add to our gateway + listeners = append(listeners, structs.BoundAPIGatewayListener{ + Name: l.Name, + }) + } + gw.BoundGateway.Listeners = listeners +} + +func getBoundGatewayListener(listener structs.APIGatewayListener, boundListeners []structs.BoundAPIGatewayListener) *structs.BoundAPIGatewayListener { + for _, bl := range boundListeners { + if bl.Name == listener.Name { + return &bl + } + } + return nil +} diff --git a/agent/consul/gateways/controller_gateways_test.go b/agent/consul/gateways/controller_gateways_test.go new file mode 100644 index 000000000000..06fa44d92681 --- /dev/null +++ b/agent/consul/gateways/controller_gateways_test.go @@ -0,0 +1,122 @@ +package gateways + +import ( + "context" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/controller" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/mock" + "testing" +) + +func Test_apiGatewayReconciler_Reconcile(t *testing.T) { + type fields struct { + logger hclog.Logger + store DataStore + } + type args struct { + ctx context.Context + req controller.Request + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "happy path - update available", + fields: fields{ + store: datastoreWithUpdate(t), + logger: hclog.Default(), + }, + args: args{ + ctx: context.Background(), + req: controller.Request{ + Kind: structs.APIGateway, + Name: "test-gateway", + Meta: acl.DefaultEnterpriseMeta(), + }, + }, + wantErr: false, + }, + { + name: "delete happy path", + fields: fields{ + store: datastoreWithDelete(t), + logger: hclog.Default(), + }, + args: args{ + ctx: context.Background(), + req: controller.Request{ + Kind: structs.APIGateway, + Name: "test-gateway", + Meta: acl.DefaultEnterpriseMeta(), + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := apiGatewayReconciler{ + logger: tt.fields.logger, + store: tt.fields.store, + } + if err := r.Reconcile(tt.args.ctx, tt.args.req); (err != nil) != tt.wantErr { + t.Errorf("Reconcile() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func datastoreWithUpdate(t *testing.T) *MockDataStore { + ds := NewMockDataStore(t) + ds.On("GetConfigEntry", structs.APIGateway, mock.Anything, mock.Anything).Return(&structs.APIGatewayConfigEntry{ + Kind: structs.APIGateway, + Name: "test-gateway", + Listeners: []structs.APIGatewayListener{ + { + Name: "test-listener", + Protocol: "tcp", + Port: 8080, + }, + }, + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + }, nil) + ds.On("GetConfigEntry", structs.BoundAPIGateway, mock.Anything, mock.Anything).Return( + &structs.BoundAPIGatewayConfigEntry{ + Kind: structs.BoundAPIGateway, + Name: "test-gateway", + Listeners: []structs.BoundAPIGatewayListener{}, + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + }, nil) + + ds.On("GetConfigEntriesByKind", structs.TCPRoute).Return([]structs.ConfigEntry{ + &structs.TCPRouteConfigEntry{ + Kind: structs.TCPRoute, + Name: "test-route", + Parents: []structs.ResourceReference{ + { + Kind: structs.APIGateway, + Name: "test-gateway", + SectionName: "test-listener", + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + }, + }, + EnterpriseMeta: *acl.DefaultEnterpriseMeta(), + }, + }, nil) + + ds.On("Update", mock.Anything).Return(nil) + ds.On("UpdateStatus", mock.Anything, mock.Anything).Return(nil) + return ds +} + +func datastoreWithDelete(t *testing.T) *MockDataStore { + ds := NewMockDataStore(t) + ds.On("GetConfigEntry", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + ds.On("Delete", mock.Anything).Return(nil) + return ds +} diff --git a/agent/consul/gateways/datastore.go b/agent/consul/gateways/datastore.go new file mode 100644 index 000000000000..a05f03e577f6 --- /dev/null +++ b/agent/consul/gateways/datastore.go @@ -0,0 +1,15 @@ +package gateways + +import ( + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/structs" +) + +//go:generate mockery --name DataStore --inpackage +type DataStore interface { + GetConfigEntry(kind string, name string, meta *acl.EnterpriseMeta) (structs.ConfigEntry, error) + GetConfigEntriesByKind(kind string) ([]structs.ConfigEntry, error) + Update(entry structs.ConfigEntry) error + UpdateStatus(entry structs.ControlledConfigEntry, err error) error + Delete(entry structs.ConfigEntry) error +} diff --git a/agent/consul/gateways/mock_DataStore.go b/agent/consul/gateways/mock_DataStore.go new file mode 100644 index 000000000000..ccc57cfbe306 --- /dev/null +++ b/agent/consul/gateways/mock_DataStore.go @@ -0,0 +1,115 @@ +// Code generated by mockery v2.12.2. DO NOT EDIT. + +package gateways + +import ( + acl "github.com/hashicorp/consul/acl" + mock "github.com/stretchr/testify/mock" + + structs "github.com/hashicorp/consul/agent/structs" + + testing "testing" +) + +// MockDataStore is an autogenerated mock type for the DataStore type +type MockDataStore struct { + mock.Mock +} + +// Delete provides a mock function with given fields: entry +func (_m *MockDataStore) Delete(entry structs.ConfigEntry) error { + ret := _m.Called(entry) + + var r0 error + if rf, ok := ret.Get(0).(func(structs.ConfigEntry) error); ok { + r0 = rf(entry) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetConfigEntriesByKind provides a mock function with given fields: kind +func (_m *MockDataStore) GetConfigEntriesByKind(kind string) ([]structs.ConfigEntry, error) { + ret := _m.Called(kind) + + var r0 []structs.ConfigEntry + if rf, ok := ret.Get(0).(func(string) []structs.ConfigEntry); ok { + r0 = rf(kind) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]structs.ConfigEntry) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(kind) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetConfigEntry provides a mock function with given fields: kind, name, meta +func (_m *MockDataStore) GetConfigEntry(kind string, name string, meta *acl.EnterpriseMeta) (structs.ConfigEntry, error) { + ret := _m.Called(kind, name, meta) + + var r0 structs.ConfigEntry + if rf, ok := ret.Get(0).(func(string, string, *acl.EnterpriseMeta) structs.ConfigEntry); ok { + r0 = rf(kind, name, meta) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(structs.ConfigEntry) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, string, *acl.EnterpriseMeta) error); ok { + r1 = rf(kind, name, meta) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Update provides a mock function with given fields: entry +func (_m *MockDataStore) Update(entry structs.ConfigEntry) error { + ret := _m.Called(entry) + + var r0 error + if rf, ok := ret.Get(0).(func(structs.ConfigEntry) error); ok { + r0 = rf(entry) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateStatus provides a mock function with given fields: entry, err +func (_m *MockDataStore) UpdateStatus(entry structs.ControlledConfigEntry, err error) error { + ret := _m.Called(entry, err) + + var r0 error + if rf, ok := ret.Get(0).(func(structs.ControlledConfigEntry, error) error); ok { + r0 = rf(entry, err) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewMockDataStore creates a new instance of MockDataStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockDataStore(t testing.TB) *MockDataStore { + mock := &MockDataStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/consul/leader_connect.go b/agent/consul/leader_connect.go index 42bdd4313ba3..c6eea4fb3e4b 100644 --- a/agent/consul/leader_connect.go +++ b/agent/consul/leader_connect.go @@ -76,7 +76,8 @@ func (s *Server) runConfigEntryControllers(ctx context.Context) error { group.Go(func() error { logger := s.logger.Named(logging.APIGatewayController) - return gateways.NewAPIGatewayController(s.fsm, s.publisher, logger).Run(ctx) + datastore := NewFSMDataStore(s, s.fsm) + return gateways.NewAPIGatewayController(datastore, s.publisher, logger).Run(ctx) }) group.Go(func() error {