From ec78941b0123ddc9880bedb4928532541aa1b10c Mon Sep 17 00:00:00 2001 From: Steve Sloka Date: Thu, 18 Jun 2020 00:01:21 -0400 Subject: [PATCH] cmd/contour: replace current xds server with go-control-plane impl Fixes #2134 by replacing the current XDS server implementation with the envoyproxy/go-control-plane impl. Signed-off-by: Steve Sloka --- cmd/contour/serve.go | 80 +++++---- go.mod | 1 - internal/contour/cachehandler.go | 4 + internal/contour/cluster.go | 28 +--- internal/contour/cond.go | 94 ----------- internal/contour/cond_test.go | 108 ------------ internal/contour/endpointstranslator.go | 32 +--- internal/contour/listener.go | 35 +--- internal/contour/route.go | 37 +---- internal/contour/secret.go | 42 +++-- internal/contour/snapshot.go | 32 ++++ internal/grpc/server.go | 90 +--------- internal/grpc/xds.go | 165 ------------------- internal/grpc/xds_test.go | 209 ------------------------ internal/protobuf/helpers.go | 8 +- 15 files changed, 128 insertions(+), 837 deletions(-) delete mode 100644 internal/contour/cond.go delete mode 100644 internal/contour/cond_test.go create mode 100644 internal/contour/snapshot.go delete mode 100644 internal/grpc/xds.go delete mode 100644 internal/grpc/xds_test.go diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index 1ed6b45dcbe..85e2cfc9814 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -14,6 +14,7 @@ package main import ( + "context" "fmt" "net" "net/http" @@ -23,6 +24,10 @@ import ( "syscall" "time" + api "github.com/envoyproxy/go-control-plane/envoy/api/v2" + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + "github.com/envoyproxy/go-control-plane/pkg/cache/v2" + xds "github.com/envoyproxy/go-control-plane/pkg/server/v2" "github.com/projectcontour/contour/internal/annotation" "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" @@ -169,26 +174,30 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { return err } + // Setup snapshot cache + snapshotCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil) + + ch := &contour.CacheHandler{ + ListenerVisitorConfig: contour.ListenerVisitorConfig{ + UseProxyProto: ctx.useProxyProto, + HTTPAddress: ctx.httpAddr, + HTTPPort: ctx.httpPort, + HTTPAccessLog: ctx.httpAccessLog, + HTTPSAddress: ctx.httpsAddr, + HTTPSPort: ctx.httpsPort, + HTTPSAccessLog: ctx.httpsAccessLog, + AccessLogType: ctx.AccessLogFormat, + AccessLogFields: ctx.AccessLogFields, + MinimumProtocolVersion: annotation.MinProtoVersion(ctx.TLSConfig.MinimumProtocolVersion), + RequestTimeout: ctx.RequestTimeout, + }, + ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort), + FieldLogger: log.WithField("context", "CacheHandler"), + Metrics: metrics.NewMetrics(registry), + } // step 3. build our mammoth Kubernetes event handler. eventHandler := &contour.EventHandler{ - CacheHandler: &contour.CacheHandler{ - ListenerVisitorConfig: contour.ListenerVisitorConfig{ - UseProxyProto: ctx.useProxyProto, - HTTPAddress: ctx.httpAddr, - HTTPPort: ctx.httpPort, - HTTPAccessLog: ctx.httpAccessLog, - HTTPSAddress: ctx.httpsAddr, - HTTPSPort: ctx.httpsPort, - HTTPSAccessLog: ctx.httpsAccessLog, - AccessLogType: ctx.AccessLogFormat, - AccessLogFields: ctx.AccessLogFields, - MinimumProtocolVersion: annotation.MinProtoVersion(ctx.TLSConfig.MinimumProtocolVersion), - RequestTimeout: ctx.RequestTimeout, - }, - ListenerCache: contour.NewListenerCache(ctx.statsAddr, ctx.statsPort), - FieldLogger: log.WithField("context", "CacheHandler"), - Metrics: metrics.NewMetrics(registry), - }, + CacheHandler: ch, HoldoffDelay: 100 * time.Millisecond, HoldoffMaxDelay: 500 * time.Millisecond, Builder: dag.Builder{ @@ -248,6 +257,15 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { FieldLogger: log.WithField("context", "endpointstranslator"), } + snapshotHandler := &contour.SnapshotHandler{ + CacheHandler: ch, + EndpointsTranslator: et, + SnapshotCache: snapshotCache, + } + + ch.SnapshotHandler = snapshotHandler + et.SnapshotHandler = snapshotHandler + informerSyncList.InformOnResources(clusterInformerFactory, &k8s.DynamicClientHandler{ Next: &contour.EventRecorder{ @@ -373,15 +391,17 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { } log.Printf("informer caches synced") - resources := map[string]cgrpc.Resource{ - eventHandler.CacheHandler.ClusterCache.TypeURL(): &eventHandler.CacheHandler.ClusterCache, - eventHandler.CacheHandler.RouteCache.TypeURL(): &eventHandler.CacheHandler.RouteCache, - eventHandler.CacheHandler.ListenerCache.TypeURL(): &eventHandler.CacheHandler.ListenerCache, - eventHandler.CacheHandler.SecretCache.TypeURL(): &eventHandler.CacheHandler.SecretCache, - et.TypeURL(): et, - } opts := ctx.grpcOptions() - s := cgrpc.NewAPI(log, resources, registry, opts...) + grpcServer := cgrpc.NewAPI(registry, opts...) + + xdsServer := xds.NewServer(context.Background(), snapshotCache, nil) + + discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, xdsServer) + api.RegisterEndpointDiscoveryServiceServer(grpcServer, xdsServer) + api.RegisterClusterDiscoveryServiceServer(grpcServer, xdsServer) + api.RegisterRouteDiscoveryServiceServer(grpcServer, xdsServer) + api.RegisterListenerDiscoveryServiceServer(grpcServer, xdsServer) + addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort)) l, err := net.Listen("tcp", addr) if err != nil { @@ -393,15 +413,15 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { log = log.WithField("insecure", true) } - log.Info("started xDS server") - defer log.Info("stopped xDS server") + log.Info("started xDS xdsServer") + defer log.Info("stopped xDS xdsServer") go func() { <-stop - s.Stop() + grpcServer.Stop() }() - return s.Serve(l) + return grpcServer.Serve(l) }) // step 14. Setup SIGTERM handler diff --git a/go.mod b/go.mod index ea65af07211..cb7419bb7a6 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 // indirect github.com/client9/misspell v0.3.4 github.com/envoyproxy/go-control-plane v0.9.5 - github.com/evanphx/json-patch v4.5.0+incompatible github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect github.com/golang/protobuf v1.3.2 github.com/google/go-cmp v0.4.0 diff --git a/internal/contour/cachehandler.go b/internal/contour/cachehandler.go index 7fe1e097a80..1149956e333 100644 --- a/internal/contour/cachehandler.go +++ b/internal/contour/cachehandler.go @@ -33,6 +33,8 @@ type CacheHandler struct { ClusterCache SecretCache + SnapshotHandler *SnapshotHandler + *metrics.Metrics logrus.FieldLogger @@ -47,6 +49,8 @@ func (ch *CacheHandler) OnChange(dag *dag.DAG) { ch.updateRoutes(dag) ch.updateClusters(dag) + ch.SnapshotHandler.UpdateSnapshot() + ch.SetDAGLastRebuilt(time.Now()) } diff --git a/internal/contour/cluster.go b/internal/contour/cluster.go index 1921e67de48..603f8b63938 100644 --- a/internal/contour/cluster.go +++ b/internal/contour/cluster.go @@ -17,10 +17,8 @@ import ( "sort" "sync" - resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" - v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/golang/protobuf/proto" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" "github.com/projectcontour/contour/internal/protobuf" @@ -31,7 +29,6 @@ import ( type ClusterCache struct { mu sync.Mutex values map[string]*v2.Cluster - Cond } // Update replaces the contents of the cache with the supplied map. @@ -40,11 +37,10 @@ func (c *ClusterCache) Update(v map[string]*v2.Cluster) { defer c.mu.Unlock() c.values = v - c.Cond.Notify() } // Contents returns a copy of the cache's contents. -func (c *ClusterCache) Contents() []proto.Message { +func (c *ClusterCache) Contents() []types.Resource { c.mu.Lock() defer c.mu.Unlock() var values []*v2.Cluster @@ -55,26 +51,6 @@ func (c *ClusterCache) Contents() []proto.Message { return protobuf.AsMessages(values) } -func (c *ClusterCache) Query(names []string) []proto.Message { - c.mu.Lock() - defer c.mu.Unlock() - var values []*v2.Cluster - for _, n := range names { - // if the cluster is not registered we cannot return - // a blank cluster because each cluster has a required - // discovery type; DNS, EDS, etc. We cannot determine the - // correct value for this property from the cluster's name - // provided by the query so we must not return a blank cluster. - if v, ok := c.values[n]; ok { - values = append(values, v) - } - } - sort.Stable(sorter.For(values)) - return protobuf.AsMessages(values) -} - -func (*ClusterCache) TypeURL() string { return resource.ClusterType } - type clusterVisitor struct { clusters map[string]*v2.Cluster } diff --git a/internal/contour/cond.go b/internal/contour/cond.go deleted file mode 100644 index 11d037d73b3..00000000000 --- a/internal/contour/cond.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright © 2019 VMware -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package contour - -import "sync" - -// Cond implements a condition variable, a rendezvous point for goroutines -// waiting for or announcing the ocurence of an event. -// -// Unlike sync.Cond, Cond communciates with waiters via channels registered by -// the waiters. This permits goroutines to wait on Cond events using select. -type Cond struct { - mu sync.Mutex - waiters []waiter - last int -} - -type waiter struct { - ch chan int - hints []string -} - -// Register registers ch to receive a value when Notify is called. -// The value of last is the count of the times Notify has been called on this Cond. -// It functions of a sequence counter, if the value of last supplied to Register -// is less than the Conds internal counter, then the caller has missed at least -// one notification and will fire immediately. -// -// Sends by the broadcaster to ch must not block, therefore ch must have a capacity -// of at least 1. -func (c *Cond) Register(ch chan int, last int, hints ...string) { - c.mu.Lock() - defer c.mu.Unlock() - - if last < c.last { - // notify this channel immediately - ch <- c.last - return - } - c.waiters = append(c.waiters, waiter{ - ch: ch, - hints: hints, - }) -} - -// Notify notifies all interested waiters that an event has ocured. -func (c *Cond) Notify(hints ...string) { - c.mu.Lock() - defer c.mu.Unlock() - c.last++ - - notify := c.waiters - c.waiters = nil - - for _, waiter := range notify { - if len(hints) == 0 { - // notify unconditionally - waiter.ch <- c.last - continue - } - if intersection(hints, waiter.hints) { - // one of the hints registered has been notified - waiter.ch <- c.last - continue - } - - // not notified this time, put back on the list - c.waiters = append(c.waiters, waiter) - } -} - -// intersection returns true if the set of elements in left -// intersects with the set in right. -func intersection(left, right []string) bool { - for _, l := range left { - for _, r := range right { - if l == r { - return true - } - } - } - return false -} diff --git a/internal/contour/cond_test.go b/internal/contour/cond_test.go deleted file mode 100644 index 823e8f2fd12..00000000000 --- a/internal/contour/cond_test.go +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright © 2019 VMware -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package contour - -import "testing" - -func TestCondRegisterBeforeNotifyShouldNotBroadcast(t *testing.T) { - var c Cond - ch := make(chan int, 1) - c.Register(ch, 0) - select { - case <-ch: - t.Fatal("ch was notified before broadcast") - default: - } -} - -func TestCondRegisterAfterNotifyShouldBroadcast(t *testing.T) { - var c Cond - ch := make(chan int, 1) - c.Notify() - c.Register(ch, 0) - select { - case v := <-ch: - if v != 1 { - t.Fatal("ch was notified with the wrong sequence number", v) - } - default: - t.Fatal("ch was not notified on registration") - } -} - -func TestCondRegisterAfterNotifyWithCorrectSequenceShouldNotBroadcast(t *testing.T) { - var c Cond - ch := make(chan int, 1) - c.Notify() - c.Register(ch, 0) - seq := <-ch - - c.Register(ch, seq) - select { - case v := <-ch: - t.Fatal("ch was notified immediately with seq", v) - default: - } -} - -func TestCondRegisterWithHintShouldNotifyWithoutHint(t *testing.T) { - var c Cond - ch := make(chan int, 1) - c.Register(ch, 1, "ingress_https") - c.Notify() - select { - case v := <-ch: - if v != 1 { - t.Fatal("ch was notified with the wrong sequence number", v) - } - default: - t.Fatal("ch was not notified") - } -} - -func TestCondRegisterWithHintShouldNotifyWithHint(t *testing.T) { - var c Cond - ch := make(chan int, 1) - c.Register(ch, 1, "ingress_https") - c.Notify("ingress_https") - select { - case v := <-ch: - if v != 1 { - t.Fatal("ch was notified with the wrong sequence number", v) - } - default: - t.Fatal("ch was not notified") - } -} - -func TestCondRegisterWithHintShouldNotNotifyWithWrongHint(t *testing.T) { - var c Cond - ch := make(chan int, 1) - c.Register(ch, 1, "ingress_https") - c.Notify("banana") - select { - case v := <-ch: - t.Fatal("ch was notified when it should not be", v) - default: - } - c.Notify("ingress_https") - select { - case v := <-ch: - if v != 2 { - t.Fatal("ch was notified with the wrong sequence number", v) - } - default: - t.Fatal("ch was not notified") - } -} diff --git a/internal/contour/endpointstranslator.go b/internal/contour/endpointstranslator.go index 3cf84ba2bdc..e35702f5b8d 100644 --- a/internal/contour/endpointstranslator.go +++ b/internal/contour/endpointstranslator.go @@ -18,10 +18,10 @@ import ( "strings" "sync" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy_api_v2_endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" - resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" - "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/envoy" "github.com/projectcontour/contour/internal/protobuf" "github.com/projectcontour/contour/internal/sorter" @@ -36,6 +36,8 @@ import ( type EndpointsTranslator struct { logrus.FieldLogger clusterLoadAssignmentCache + + SnapshotHandler *SnapshotHandler } func (e *EndpointsTranslator) OnAdd(obj interface{}) { @@ -72,32 +74,12 @@ func (e *EndpointsTranslator) OnDelete(obj interface{}) { } } -func (e *EndpointsTranslator) Contents() []proto.Message { +func (e *EndpointsTranslator) Contents() []types.Resource { values := e.clusterLoadAssignmentCache.Contents() sort.Stable(sorter.For(values)) return protobuf.AsMessages(values) } -func (e *EndpointsTranslator) Query(names []string) []proto.Message { - e.clusterLoadAssignmentCache.mu.Lock() - defer e.clusterLoadAssignmentCache.mu.Unlock() - values := make([]*v2.ClusterLoadAssignment, 0, len(names)) - for _, n := range names { - v, ok := e.entries[n] - if !ok { - v = &v2.ClusterLoadAssignment{ - ClusterName: n, - } - } - values = append(values, v) - } - - sort.Stable(sorter.For(values)) - return protobuf.AsMessages(values) -} - -func (*EndpointsTranslator) TypeURL() string { return resource.EndpointType } - func (e *EndpointsTranslator) addEndpoints(ep *v1.Endpoints) { e.recomputeClusterLoadAssignment(nil, ep) } @@ -182,12 +164,12 @@ func (e *EndpointsTranslator) recomputeClusterLoadAssignment(oldep, newep *v1.En } } + e.SnapshotHandler.UpdateSnapshot() } type clusterLoadAssignmentCache struct { mu sync.Mutex entries map[string]*v2.ClusterLoadAssignment - Cond } // Add adds an entry to the cache. If a ClusterLoadAssignment with the same @@ -199,7 +181,6 @@ func (c *clusterLoadAssignmentCache) Add(a *v2.ClusterLoadAssignment) { c.entries = make(map[string]*v2.ClusterLoadAssignment) } c.entries[a.ClusterName] = a - c.Notify(a.ClusterName) } // Remove removes the named entry from the cache. If the entry @@ -208,7 +189,6 @@ func (c *clusterLoadAssignmentCache) Remove(name string) { c.mu.Lock() defer c.mu.Unlock() delete(c.entries, name) - c.Notify(name) } // Contents returns a copy of the contents of the cache. diff --git a/internal/contour/listener.go b/internal/contour/listener.go index fa732c4ad89..0e14e1ee614 100644 --- a/internal/contour/listener.go +++ b/internal/contour/listener.go @@ -19,12 +19,12 @@ import ( "sync" "time" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy_api_v2_auth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" envoy_api_v2_listener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener" envoy_api_v2_accesslog "github.com/envoyproxy/go-control-plane/envoy/config/filter/accesslog/v2" - resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" - "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" "github.com/projectcontour/contour/internal/protobuf" @@ -209,7 +209,6 @@ type ListenerCache struct { mu sync.Mutex values map[string]*v2.Listener staticValues map[string]*v2.Listener - Cond } // NewListenerCache returns an instance of a ListenerCache @@ -228,11 +227,10 @@ func (c *ListenerCache) Update(v map[string]*v2.Listener) { defer c.mu.Unlock() c.values = v - c.Cond.Notify() } // Contents returns a copy of the cache's contents. -func (c *ListenerCache) Contents() []proto.Message { +func (c *ListenerCache) Contents() []types.Resource { c.mu.Lock() defer c.mu.Unlock() var values []*v2.Listener @@ -246,33 +244,6 @@ func (c *ListenerCache) Contents() []proto.Message { return protobuf.AsMessages(values) } -// Query returns the proto.Messages in the ListenerCache that match -// a slice of strings -func (c *ListenerCache) Query(names []string) []proto.Message { - c.mu.Lock() - defer c.mu.Unlock() - var values []*v2.Listener - for _, n := range names { - v, ok := c.values[n] - if !ok { - v, ok = c.staticValues[n] - if !ok { - // if the listener is not registered in - // dynamic or static values then skip it - // as there is no way to return a blank - // listener because the listener address - // field is required. - continue - } - } - values = append(values, v) - } - sort.Stable(sorter.For(values)) - return protobuf.AsMessages(values) -} - -func (*ListenerCache) TypeURL() string { return resource.ListenerType } - type listenerVisitor struct { *ListenerVisitorConfig diff --git a/internal/contour/route.go b/internal/contour/route.go index 1634010b766..dff91769fbc 100644 --- a/internal/contour/route.go +++ b/internal/contour/route.go @@ -18,10 +18,10 @@ import ( "sort" "sync" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy_api_v2_route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" - resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" - "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" "github.com/projectcontour/contour/internal/protobuf" @@ -32,7 +32,6 @@ import ( type RouteCache struct { mu sync.Mutex values map[string]*v2.RouteConfiguration - Cond } // Update replaces the contents of the cache with the supplied map. @@ -41,11 +40,10 @@ func (c *RouteCache) Update(v map[string]*v2.RouteConfiguration) { defer c.mu.Unlock() c.values = v - c.Cond.Notify() } // Contents returns a copy of the cache's contents. -func (c *RouteCache) Contents() []proto.Message { +func (c *RouteCache) Contents() []types.Resource { c.mu.Lock() defer c.mu.Unlock() @@ -58,35 +56,6 @@ func (c *RouteCache) Contents() []proto.Message { return protobuf.AsMessages(values) } -// Query searches the RouteCache for the named RouteConfiguration entries. -func (c *RouteCache) Query(names []string) []proto.Message { - c.mu.Lock() - defer c.mu.Unlock() - - var values []*v2.RouteConfiguration - for _, n := range names { - v, ok := c.values[n] - if !ok { - // if there is no route registered with the cache - // we return a blank route configuration. This is - // not the same as returning nil, we're choosing to - // say "the configuration you asked for _does exists_, - // but it contains no useful information. - v = &v2.RouteConfiguration{ - Name: n, - } - } - values = append(values, v) - } - - //sort.RouteConfigurations(values) - sort.Stable(sorter.For(values)) - return protobuf.AsMessages(values) -} - -// TypeURL returns the string type of RouteCache Resource. -func (*RouteCache) TypeURL() string { return resource.RouteType } - type routeVisitor struct { routes map[string]*v2.RouteConfiguration } diff --git a/internal/contour/secret.go b/internal/contour/secret.go index 9fae7153f88..63469a65011 100644 --- a/internal/contour/secret.go +++ b/internal/contour/secret.go @@ -17,9 +17,9 @@ import ( "sort" "sync" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + envoy_api_v2_auth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" - resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2" - "github.com/golang/protobuf/proto" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/envoy" "github.com/projectcontour/contour/internal/protobuf" @@ -30,7 +30,6 @@ import ( type SecretCache struct { mu sync.Mutex values map[string]*envoy_api_v2_auth.Secret - Cond } // Update replaces the contents of the cache with the supplied map. @@ -39,11 +38,10 @@ func (c *SecretCache) Update(v map[string]*envoy_api_v2_auth.Secret) { defer c.mu.Unlock() c.values = v - c.Cond.Notify() } // Contents returns a copy of the cache's contents. -func (c *SecretCache) Contents() []proto.Message { +func (c *SecretCache) Contents() []types.Resource { c.mu.Lock() defer c.mu.Unlock() var values []*envoy_api_v2_auth.Secret @@ -54,23 +52,23 @@ func (c *SecretCache) Contents() []proto.Message { return protobuf.AsMessages(values) } -func (c *SecretCache) Query(names []string) []proto.Message { - c.mu.Lock() - defer c.mu.Unlock() - var values []*envoy_api_v2_auth.Secret - for _, n := range names { - // we can only return secrets where their value is - // known. if the secret is not registered in the cache - // we return nothing. - if v, ok := c.values[n]; ok { - values = append(values, v) - } - } - sort.Stable(sorter.For(values)) - return protobuf.AsMessages(values) -} - -func (*SecretCache) TypeURL() string { return resource.SecretType } +//func (c *SecretCache) Query(names []string) []proto.Message { +// c.mu.Lock() +// defer c.mu.Unlock() +// var values []*envoy_api_v2_auth.Secret +// for _, n := range names { +// // we can only return secrets where their value is +// // known. if the secret is not registered in the cache +// // we return nothing. +// if v, ok := c.values[n]; ok { +// values = append(values, v) +// } +// } +// sort.Stable(sorter.For(values)) +// return protobuf.AsMessages(values) +//} +// +//func (*SecretCache) TypeURL() string { return resource.SecretType } type secretVisitor struct { secrets map[string]*envoy_api_v2_auth.Secret diff --git a/internal/contour/snapshot.go b/internal/contour/snapshot.go new file mode 100644 index 00000000000..c4cc7207a87 --- /dev/null +++ b/internal/contour/snapshot.go @@ -0,0 +1,32 @@ +package contour + +import ( + "strconv" + + "github.com/envoyproxy/go-control-plane/pkg/cache/v2" +) + +// SnapshotHandler implements xds snapshot cache +type SnapshotHandler struct { + CacheHandler *CacheHandler + EndpointsTranslator *EndpointsTranslator + + // snapshotVersion holds the current version of the snapshot + snapshotVersion int + + // snapshotCache holds + SnapshotCache cache.SnapshotCache +} + +func (s *SnapshotHandler) UpdateSnapshot() { + // Increment the snapshot version + s.snapshotVersion++ + + // Create xds snapshot + snapshot := cache.NewSnapshot(strconv.Itoa(s.snapshotVersion), + s.EndpointsTranslator.Contents(), + s.CacheHandler.ClusterCache.Contents(), + s.CacheHandler.RouteCache.Contents(), + s.CacheHandler.ListenerCache.Contents(), nil) + s.SnapshotCache.SetSnapshot("contour", snapshot) +} diff --git a/internal/grpc/server.go b/internal/grpc/server.go index 818dfa0b985..4469e3fdb87 100644 --- a/internal/grpc/server.go +++ b/internal/grpc/server.go @@ -15,109 +15,25 @@ package grpc import ( - "context" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/prometheus/client_golang/prometheus" - - v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - loadstats "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v2" - "github.com/sirupsen/logrus" + "google.golang.org/grpc" ) // NewAPI returns a *grpc.Server which responds to the Envoy v2 xDS gRPC API. -func NewAPI(log logrus.FieldLogger, resources map[string]Resource, registry *prometheus.Registry, opts ...grpc.ServerOption) *grpc.Server { +func NewAPI(registry *prometheus.Registry, opts ...grpc.ServerOption) *grpc.Server { s := &grpcServer{ - xdsHandler{ - FieldLogger: log, - resources: resources, - }, - grpc_prometheus.NewServerMetrics(), + metrics: grpc_prometheus.NewServerMetrics(), } registry.MustRegister(s.metrics) opts = append(opts, grpc.StreamInterceptor(s.metrics.StreamServerInterceptor()), grpc.UnaryInterceptor(s.metrics.UnaryServerInterceptor())) g := grpc.NewServer(opts...) - v2.RegisterClusterDiscoveryServiceServer(g, s) - v2.RegisterEndpointDiscoveryServiceServer(g, s) - v2.RegisterListenerDiscoveryServiceServer(g, s) - v2.RegisterRouteDiscoveryServiceServer(g, s) - discovery.RegisterSecretDiscoveryServiceServer(g, s) s.metrics.InitializeMetrics(g) return g } // grpcServer implements the LDS, RDS, CDS, and EDS, gRPC endpoints. type grpcServer struct { - xdsHandler metrics *grpc_prometheus.ServerMetrics } - -func (s *grpcServer) FetchClusters(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchClusters unimplemented") -} - -func (s *grpcServer) FetchEndpoints(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchEndpoints unimplemented") -} - -func (s *grpcServer) DeltaEndpoints(v2.EndpointDiscoveryService_DeltaEndpointsServer) error { - return status.Errorf(codes.Unimplemented, "DeltaEndpoints unimplemented") -} - -func (s *grpcServer) FetchListeners(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchListeners unimplemented") -} - -func (s *grpcServer) DeltaListeners(v2.ListenerDiscoveryService_DeltaListenersServer) error { - return status.Errorf(codes.Unimplemented, "DeltaListeners unimplemented") -} - -func (s *grpcServer) FetchRoutes(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchRoutes unimplemented") -} - -func (s *grpcServer) FetchSecrets(_ context.Context, req *v2.DiscoveryRequest) (*v2.DiscoveryResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "FetchSecrets unimplemented") -} - -func (s *grpcServer) DeltaSecrets(discovery.SecretDiscoveryService_DeltaSecretsServer) error { - return status.Errorf(codes.Unimplemented, "DeltaSecrets unimplemented") -} - -func (s *grpcServer) StreamClusters(srv v2.ClusterDiscoveryService_StreamClustersServer) error { - return s.stream(srv) -} - -func (s *grpcServer) StreamEndpoints(srv v2.EndpointDiscoveryService_StreamEndpointsServer) error { - return s.stream(srv) -} - -func (s *grpcServer) StreamLoadStats(srv loadstats.LoadReportingService_StreamLoadStatsServer) error { - return status.Errorf(codes.Unimplemented, "StreamLoadStats unimplemented") -} - -func (s *grpcServer) DeltaClusters(v2.ClusterDiscoveryService_DeltaClustersServer) error { - return status.Errorf(codes.Unimplemented, "IncrementalClusters unimplemented") -} - -func (s *grpcServer) DeltaRoutes(v2.RouteDiscoveryService_DeltaRoutesServer) error { - return status.Errorf(codes.Unimplemented, "IncrementalRoutes unimplemented") -} - -func (s *grpcServer) StreamListeners(srv v2.ListenerDiscoveryService_StreamListenersServer) error { - return s.stream(srv) -} - -func (s *grpcServer) StreamRoutes(srv v2.RouteDiscoveryService_StreamRoutesServer) error { - return s.stream(srv) -} - -func (s *grpcServer) StreamSecrets(srv discovery.SecretDiscoveryService_StreamSecretsServer) error { - return s.stream(srv) -} diff --git a/internal/grpc/xds.go b/internal/grpc/xds.go deleted file mode 100644 index ff4fcb52a67..00000000000 --- a/internal/grpc/xds.go +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright © 2019 VMware -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpc - -import ( - "context" - "fmt" - "strconv" - "sync/atomic" - - envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" - "github.com/golang/protobuf/ptypes/any" - "github.com/sirupsen/logrus" -) - -// Resource represents a source of proto.Messages that can be registered -// for interest. -type Resource interface { - // Contents returns the contents of this resource. - Contents() []proto.Message - - // Query returns an entry for each resource name supplied. - Query(names []string) []proto.Message - - // Register registers ch to receive a value when Notify is called. - Register(chan int, int, ...string) - - // TypeURL returns the typeURL of messages returned from Values. - TypeURL() string -} - -// xdsHandler implements the Envoy xDS gRPC protocol. -type xdsHandler struct { - logrus.FieldLogger - connections counter - resources map[string]Resource // registered resource types -} - -type grpcStream interface { - Context() context.Context - Send(*envoy_api_v2.DiscoveryResponse) error - Recv() (*envoy_api_v2.DiscoveryRequest, error) -} - -// stream processes a stream of DiscoveryRequests. -func (xh *xdsHandler) stream(st grpcStream) error { - // bump connection counter and set it as a field on the logger - log := xh.WithField("connection", xh.connections.next()) - - // Notify whether the stream terminated on error. - done := func(log *logrus.Entry, err error) error { - if err != nil { - log.WithError(err).Error("stream terminated") - } else { - log.Info("stream terminated") - } - - return err - } - - ch := make(chan int, 1) - - // internally all registration values start at zero so sending - // a last that is less than zero will guarantee that each stream - // will generate a response immediately, then wait. - last := -1 - ctx := st.Context() - - // now stick in this loop until the client disconnects. - for { - // first we wait for the request from Envoy, this is part of - // the xDS protocol. - req, err := st.Recv() - if err != nil { - return done(log, err) - } - - // note: redeclare log in this scope so the next time around the loop all is forgotten. - log := log.WithField("version_info", req.VersionInfo).WithField("response_nonce", req.ResponseNonce) - if req.Node != nil { - log = log.WithField("node_id", req.Node.Id).WithField("node_version", req.Node.BuildVersion) - } - - if status := req.ErrorDetail; status != nil { - // if Envoy rejected the last update log the details here. - // TODO(dfc) issue 1176: handle xDS ACK/NACK - log.WithField("code", status.Code).Error(status.Message) - } - - // from the request we derive the resource to stream which have - // been registered according to the typeURL. - r, ok := xh.resources[req.TypeUrl] - if !ok { - return done(log, fmt.Errorf("no resource registered for typeURL %q", req.TypeUrl)) - } - - log = log.WithField("resource_names", req.ResourceNames).WithField("type_url", req.TypeUrl) - log.Info("stream_wait") - - // now we wait for a notification, if this is the first request received on this - // connection last will be less than zero and that will trigger a response immediately. - r.Register(ch, last, req.ResourceNames...) - select { - case last = <-ch: - // boom, something in the cache has changed. - // TODO(dfc) the thing that has changed may not be in the scope of the filter - // so we're going to be sending an update that is a no-op. See #426 - - var resources []proto.Message - switch len(req.ResourceNames) { - case 0: - // no resource hints supplied, return the full - // contents of the resource - resources = r.Contents() - default: - // resource hints supplied, return exactly those - resources = r.Query(req.ResourceNames) - } - - any := make([]*any.Any, 0, len(resources)) - for _, r := range resources { - a, err := ptypes.MarshalAny(r) - if err != nil { - return done(log, err) - } - - any = append(any, a) - } - - resp := &envoy_api_v2.DiscoveryResponse{ - VersionInfo: strconv.Itoa(last), - Resources: any, - TypeUrl: r.TypeURL(), - Nonce: strconv.Itoa(last), - } - - if err := st.Send(resp); err != nil { - return done(log, err) - } - - case <-ctx.Done(): - return done(log, ctx.Err()) - } - } -} - -// counter holds an atomically incrementing counter. -type counter uint64 - -func (c *counter) next() uint64 { - return atomic.AddUint64((*uint64)(c), 1) -} diff --git a/internal/grpc/xds_test.go b/internal/grpc/xds_test.go deleted file mode 100644 index ac0f1857cb6..00000000000 --- a/internal/grpc/xds_test.go +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright © 2019 VMware -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package grpc - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "testing" - - v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/golang/protobuf/proto" - "github.com/sirupsen/logrus" -) - -func TestXDSHandlerStream(t *testing.T) { - log := logrus.New() - log.SetOutput(ioutil.Discard) - tests := map[string]struct { - xh xdsHandler - stream grpcStream - want error - }{ - "recv returns error immediately": { - xh: xdsHandler{FieldLogger: log}, - stream: &mockStream{ - context: context.Background, - recv: func() (*v2.DiscoveryRequest, error) { - return nil, io.EOF - }, - }, - want: io.EOF, - }, - "no registered typeURL": { - xh: xdsHandler{FieldLogger: log}, - stream: &mockStream{ - context: context.Background, - recv: func() (*v2.DiscoveryRequest, error) { - return &v2.DiscoveryRequest{ - TypeUrl: "com.heptio.potato", - }, nil - }, - }, - want: fmt.Errorf("no resource registered for typeURL %q", "com.heptio.potato"), - }, - "failed to convert values to any": { - xh: xdsHandler{ - FieldLogger: log, - resources: map[string]Resource{ - "com.heptio.potato": &mockResource{ - register: func(ch chan int, i int) { - ch <- i + 1 - }, - contents: func() []proto.Message { - return []proto.Message{nil} - }, - typeurl: func() string { return "com.heptio.potato" }, - }, - }, - }, - stream: &mockStream{ - context: context.Background, - recv: func() (*v2.DiscoveryRequest, error) { - return &v2.DiscoveryRequest{ - TypeUrl: "com.heptio.potato", - }, nil - }, - }, - want: fmt.Errorf("proto: Marshal called with nil"), - }, - "failed to send": { - xh: xdsHandler{ - FieldLogger: log, - resources: map[string]Resource{ - "com.heptio.potato": &mockResource{ - register: func(ch chan int, i int) { - ch <- i + 1 - }, - contents: func() []proto.Message { - return []proto.Message{new(v2.ClusterLoadAssignment)} - }, - typeurl: func() string { return "com.heptio.potato" }, - }, - }, - }, - stream: &mockStream{ - context: context.Background, - recv: func() (*v2.DiscoveryRequest, error) { - return &v2.DiscoveryRequest{ - TypeUrl: "com.heptio.potato", - }, nil - }, - send: func(resp *v2.DiscoveryResponse) error { - return io.EOF - }, - }, - want: io.EOF, - }, - "context canceled": { - xh: xdsHandler{ - FieldLogger: log, - resources: map[string]Resource{ - "com.heptio.potato": &mockResource{ - register: func(ch chan int, i int) { - // do nothing - }, - typeurl: func() string { return "com.heptio.potato" }, - }, - }, - }, - stream: &mockStream{ - context: func() context.Context { - ctx := context.Background() - ctx, cancel := context.WithCancel(ctx) - cancel() - return ctx - }, - recv: func() (*v2.DiscoveryRequest, error) { - return &v2.DiscoveryRequest{ - TypeUrl: "com.heptio.potato", - }, nil - }, - send: func(resp *v2.DiscoveryResponse) error { - return io.EOF - }, - }, - want: context.Canceled, - }, - } - - for name, tc := range tests { - t.Run(name, func(t *testing.T) { - got := tc.xh.stream(tc.stream) - if !equalError(tc.want, got) { - t.Fatalf("expected: %v, got: %v", tc.want, got) - } - }) - } -} - -type mockStream struct { - context func() context.Context - send func(*v2.DiscoveryResponse) error - recv func() (*v2.DiscoveryRequest, error) -} - -func (m *mockStream) Context() context.Context { return m.context() } -func (m *mockStream) Send(resp *v2.DiscoveryResponse) error { return m.send(resp) } -func (m *mockStream) Recv() (*v2.DiscoveryRequest, error) { return m.recv() } - -type mockResource struct { - contents func() []proto.Message - query func([]string) []proto.Message - register func(chan int, int) - typeurl func() string -} - -func (m *mockResource) Contents() []proto.Message { return m.contents() } -func (m *mockResource) Query(names []string) []proto.Message { return m.query(names) } -func (m *mockResource) Register(ch chan int, last int, hints ...string) { m.register(ch, last) } -func (m *mockResource) TypeURL() string { return m.typeurl() } - -func TestCounterNext(t *testing.T) { - var c counter - // not a map this time as we want tests to execute - // in sequence. - tests := []struct { - fn func() uint64 - want uint64 - }{{ - fn: c.next, - want: 1, - }, { - fn: c.next, - want: 2, - }, { - fn: c.next, - want: 3, - }} - - for _, tc := range tests { - got := tc.fn() - if tc.want != got { - t.Fatalf("expected %d, got %d", tc.want, got) - } - } -} - -func equalError(a, b error) bool { - if a == nil { - return b == nil - } - if b == nil { - return a == nil - } - return a.Error() == b.Error() -} diff --git a/internal/protobuf/helpers.go b/internal/protobuf/helpers.go index b21b6ab3bd2..03ea14145cd 100644 --- a/internal/protobuf/helpers.go +++ b/internal/protobuf/helpers.go @@ -18,6 +18,8 @@ import ( "reflect" "time" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" @@ -50,16 +52,16 @@ func Bool(val bool) *wrappers.BoolValue { // AsMessages casts the given slice of values (that implement the proto.Message // interface) to a slice of proto.Message. If the length of the slice is 0, it // returns nil. -func AsMessages(messages interface{}) []proto.Message { +func AsMessages(messages interface{}) []types.Resource { v := reflect.ValueOf(messages) if v.Len() == 0 { return nil } - protos := make([]proto.Message, v.Len()) + protos := make([]types.Resource, v.Len()) for i := range protos { - protos[i] = v.Index(i).Interface().(proto.Message) + protos[i] = v.Index(i).Interface().(types.Resource) } return protos