-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
API Gateway Controller Logic #16058
API Gateway Controller Logic #16058
Changes from all commits
33cec65
10c8eb8
b4ca767
c10faa7
644e666
dbea0a6
53822c5
f441909
f48376e
31e0015
5af8918
33156c8
805c69f
e2d9aaf
76861dd
179f576
37a51e6
8521fd8
38e9201
9499222
da01a33
6687a5a
d75af0b
a902d60
532c9b4
0b3f968
4476082
b1ea3fd
a44341e
ccaca56
37573de
1b4368a
2c77bc5
026e56b
c3ffcf2
550603e
d5f374d
0de5fd8
1af1d2d
6af984e
7bebafb
5b0163f
e3d5e09
e0803c8
e0ae7a0
73a7c21
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package consul | ||
|
||
import ( | ||
"github.com/hashicorp/consul/acl" | ||
"github.com/hashicorp/consul/agent/consul/fsm" | ||
"github.com/hashicorp/consul/agent/structs" | ||
) | ||
|
||
// FSMDataStore implements the DataStore interface using the Consul server and finite state manager. | ||
type FSMDataStore struct { | ||
server *Server | ||
fsm *fsm.FSM | ||
} | ||
|
||
func NewFSMDataStore(server *Server, fsm *fsm.FSM) *FSMDataStore { | ||
return &FSMDataStore{ | ||
server: server, | ||
fsm: fsm, | ||
} | ||
} | ||
|
||
// GetConfigEntry takes in a kind, name, and meta and returns a configentry and an error from the FSM state | ||
func (f *FSMDataStore) GetConfigEntry(kind string, name string, meta *acl.EnterpriseMeta) (structs.ConfigEntry, error) { | ||
store := f.fsm.State() | ||
|
||
_, entry, err := store.ConfigEntry(nil, kind, name, meta) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return entry, nil | ||
} | ||
|
||
// GetConfigEntriesByKind takes in a kind and returns all instances of that kind of config entry from the FSM state | ||
func (f *FSMDataStore) GetConfigEntriesByKind(kind string) ([]structs.ConfigEntry, error) { | ||
store := f.fsm.State() | ||
|
||
_, entries, err := store.ConfigEntriesByKind(nil, kind, acl.WildcardEnterpriseMeta()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return entries, nil | ||
} | ||
|
||
// Update takes a config entry and upserts it in the FSM state | ||
func (f *FSMDataStore) Update(entry structs.ConfigEntry) error { | ||
_, err := f.server.leaderRaftApply("ConfigEntry.Apply", structs.ConfigEntryRequestType, &structs.ConfigEntryRequest{ | ||
Op: structs.ConfigEntryUpsertCAS, | ||
Entry: entry, | ||
}) | ||
return err | ||
} | ||
|
||
// UpdateStatus takes a config entry, an error, and updates the status field as needed in the FSM state | ||
func (f *FSMDataStore) UpdateStatus(entry structs.ControlledConfigEntry, err error) error { | ||
if err == nil { | ||
//TODO additional status messages for success? | ||
return nil | ||
} | ||
status := structs.Status{ | ||
Conditions: []structs.Condition{{ | ||
|
||
Status: err.Error() + ": Accepted == false", | ||
}, | ||
}, | ||
} | ||
entry.SetStatus(status) | ||
return f.Update(entry) | ||
} | ||
|
||
// Delete takes a config entry and deletes it from the FSM state | ||
func (f *FSMDataStore) Delete(entry structs.ConfigEntry) error { | ||
_, err := f.server.leaderRaftApply("ConfigEntry.Delete", structs.ConfigEntryRequestType, &structs.ConfigEntryRequest{ | ||
Op: structs.ConfigEntryDelete, | ||
Entry: entry, | ||
}) | ||
return err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,33 +2,201 @@ package gateways | |
|
||
import ( | ||
"context" | ||
|
||
"github.com/hashicorp/go-hclog" | ||
|
||
"github.com/hashicorp/consul/agent/consul/controller" | ||
"github.com/hashicorp/consul/agent/consul/fsm" | ||
"github.com/hashicorp/consul/agent/consul/state" | ||
"github.com/hashicorp/consul/agent/consul/stream" | ||
"github.com/hashicorp/consul/agent/structs" | ||
"github.com/hashicorp/go-hclog" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
type apiGatewayReconciler struct { | ||
fsm *fsm.FSM | ||
logger hclog.Logger | ||
store DataStore | ||
} | ||
|
||
func (r apiGatewayReconciler) Reconcile(ctx context.Context, req controller.Request) error { | ||
return nil | ||
} | ||
|
||
func NewAPIGatewayController(fsm *fsm.FSM, publisher state.EventPublisher, logger hclog.Logger) controller.Controller { | ||
// NewAPIGatewayController returns a new APIGateway controller | ||
func NewAPIGatewayController(store DataStore, publisher state.EventPublisher, logger hclog.Logger) controller.Controller { | ||
reconciler := apiGatewayReconciler{ | ||
fsm: fsm, | ||
logger: logger, | ||
store: store, | ||
} | ||
return controller.New(publisher, reconciler).Subscribe( | ||
return controller.New(publisher, &reconciler).Subscribe( | ||
&stream.SubscribeRequest{ | ||
Topic: state.EventTopicAPIGateway, | ||
Subject: stream.SubjectWildcard, | ||
}, | ||
) | ||
} | ||
|
||
// Reconcile takes in a controller request and ensures this api gateways corresponding BoundAPIGateway exists and is | ||
// up to date | ||
func (r *apiGatewayReconciler) Reconcile(ctx context.Context, req controller.Request) error { | ||
|
||
r.logger.Debug("started reconciling gateway", "gateway", req.Name) | ||
|
||
metaGateway, err := r.initGatewayMeta(req) | ||
if err != nil { | ||
return err | ||
} else if metaGateway == nil { | ||
//delete meta gateway | ||
r.logger.Info("cleaning up deleted gateway object", "request", req) | ||
if err := r.store.Delete(&structs.BoundAPIGatewayConfigEntry{ | ||
Kind: structs.BoundAPIGateway, | ||
Name: req.Name, | ||
EnterpriseMeta: *req.Meta, | ||
}); err != nil { | ||
msg := "error cleaning up deleted gateway object" | ||
r.logger.Error(msg, err) | ||
return errors.Wrap(err, msg) | ||
} | ||
return nil | ||
} | ||
|
||
r.ensureBoundGateway(metaGateway) | ||
|
||
routes, err := r.retrieveAllRoutesFromStore() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
boundGateways, routeErrors := BindRoutesToGateways([]*gatewayMeta{metaGateway}, routes...) | ||
|
||
//In this loop there should only be 1 bound gateway returned, but looping over all returned gateways | ||
//to make sure nothing gets dropped and handle case where 0 gateways are returned | ||
for _, boundGateway := range boundGateways { | ||
// now update the gateway state | ||
r.logger.Debug("persisting gateway state", "state", boundGateway) | ||
if err := r.store.Update(boundGateway); err != nil { | ||
msg := "error persisting state" | ||
r.logger.Error(msg, "error", err) | ||
return errors.Wrap(err, msg) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! I always use |
||
} | ||
|
||
// then update the gateway status | ||
r.logger.Debug("persisting gateway status", "gateway", metaGateway.Gateway) | ||
if err := r.store.UpdateStatus(metaGateway.Gateway, err); err != nil { | ||
return err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this case worth logging? |
||
} | ||
} | ||
|
||
// and update the route statuses | ||
for route, routeError := range routeErrors { | ||
configEntry := r.resourceReferenceToBoundRoute(route) | ||
r.logger.Error("route binding error:", routeError) | ||
if err := r.store.UpdateStatus(configEntry, routeError); err != nil { | ||
return err | ||
nathancoleman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (r *apiGatewayReconciler) retrieveAllRoutesFromStore() ([]structs.BoundRoute, error) { | ||
tcpRoutes, err := r.store.GetConfigEntriesByKind(structs.TCPRoute) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
//TODO not implemented | ||
//httpRoutes, err := r.store.GetConfigEntriesByKind(structs.HTTPRoute) | ||
//if err != nil { | ||
// return nil, err | ||
//} | ||
|
||
routes := []structs.BoundRoute{} | ||
for _, r := range tcpRoutes { | ||
if r == nil { | ||
continue | ||
} | ||
routes = append(routes, r.(*structs.TCPRouteConfigEntry)) | ||
} | ||
//TODO not implemented | ||
//for _, r := range httpRoutes { | ||
// routes = append(routes, r.(*structs.HTTPRouteConfigEntry)) | ||
//} | ||
return routes, nil | ||
} | ||
|
||
func (r *apiGatewayReconciler) initGatewayMeta(req controller.Request) (*gatewayMeta, error) { | ||
metaGateway := &gatewayMeta{} | ||
|
||
apiGateway, err := r.store.GetConfigEntry(req.Kind, req.Name, req.Meta) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if apiGateway == nil { | ||
//gateway doesn't exist | ||
return nil, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: would this technically be an error condition because we're trying to init a gateway that should be deleted? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My logic was that it being nil is a valid state for it to be in the store, but if it would be clearer to return an error I'm happy to do it that way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think "return |
||
} | ||
|
||
metaGateway.Gateway = apiGateway.(*structs.APIGatewayConfigEntry) | ||
|
||
boundGateway, err := r.store.GetConfigEntry(structs.BoundAPIGateway, req.Name, req.Meta) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
//initialize object, values get copied over in ensureBoundGateway if they don't exist | ||
metaGateway.BoundGateway = boundGateway.(*structs.BoundAPIGatewayConfigEntry) | ||
return metaGateway, nil | ||
} | ||
|
||
func (r *apiGatewayReconciler) resourceReferenceToBoundRoute(ref structs.ResourceReference) structs.ControlledConfigEntry { | ||
//TODO currently have to retrieve from the store to persist parent field on update call, is there a better way to do this? | ||
nathancoleman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
boundRoute, err := r.store.GetConfigEntry(ref.Kind, ref.Name, &ref.EnterpriseMeta) | ||
if err != nil { | ||
return nil | ||
} | ||
|
||
switch ref.Kind { | ||
case structs.TCPRoute: | ||
return boundRoute.(*structs.TCPRouteConfigEntry) | ||
case structs.HTTPRoute: | ||
return boundRoute.(*structs.HTTPRouteConfigEntry) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ensureBoundGateway copies all relevant data from a gatewayMeta's APIGateway to BoundAPIGateway | ||
func (r *apiGatewayReconciler) ensureBoundGateway(gw *gatewayMeta) { | ||
if gw.BoundGateway == nil { | ||
gw.BoundGateway = &structs.BoundAPIGatewayConfigEntry{ | ||
Kind: structs.BoundAPIGateway, | ||
Name: gw.Gateway.Name, | ||
EnterpriseMeta: gw.Gateway.EnterpriseMeta, | ||
} | ||
} | ||
|
||
r.ensureListeners(gw) | ||
} | ||
|
||
func (r *apiGatewayReconciler) ensureListeners(gw *gatewayMeta) { | ||
|
||
//rebuild the list from scratch, just copying over the ones that already exist | ||
listeners := []structs.BoundAPIGatewayListener{} | ||
for _, l := range gw.Gateway.Listeners { | ||
boundListener := getBoundGatewayListener(l, gw.BoundGateway.Listeners) | ||
if boundListener != nil { | ||
//listener is already on gateway, copy onto our new list | ||
listeners = append(listeners, *boundListener) | ||
continue | ||
} | ||
//create new listener to add to our gateway | ||
listeners = append(listeners, structs.BoundAPIGatewayListener{ | ||
Name: l.Name, | ||
}) | ||
} | ||
gw.BoundGateway.Listeners = listeners | ||
} | ||
|
||
func getBoundGatewayListener(listener structs.APIGatewayListener, boundListeners []structs.BoundAPIGatewayListener) *structs.BoundAPIGatewayListener { | ||
for _, bl := range boundListeners { | ||
if bl.Name == listener.Name { | ||
return &bl | ||
} | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there ever a case where more than one could be returned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe there should be in this particular controller since only 1 gateway is being passed in. More likely what will happen is 0 boundGateways and multiple errors.