Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal: Wire up new snapshots to DAG rebuilds #2850

Merged
merged 1 commit into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 43 additions & 26 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"syscall"
"time"

"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
"github.com/envoyproxy/go-control-plane/pkg/server/v2"
projectcontourv1alpha1 "github.com/projectcontour/contour/apis/projectcontour/v1alpha1"
"github.com/projectcontour/contour/internal/annotation"
"github.com/projectcontour/contour/internal/contour"
Expand All @@ -38,6 +40,7 @@ import (
"github.com/projectcontour/contour/internal/xds"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -216,11 +219,19 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
endpointHandler,
}

// snapshotCache is used to store the state of what all xDS services should
// contain at any given point in time.
snapshotCache := cache.NewSnapshotCache(false, xds.DefaultHash,
log.WithField("context", "xDS"))

// snapshotHandler is used to produce new snapshots when the internal state changes for any xDS resource.
snapshotHandler := contour.NewSnapshotHandler(snapshotCache, resources, log.WithField("context", "snapshotHandler"))

// Build the core Kubernetes event handler.
eventHandler := &contour.EventHandler{
HoldoffDelay: 100 * time.Millisecond,
HoldoffMaxDelay: 500 * time.Millisecond,
Observer: dag.ComposeObservers(contour.ObserversOf(resources)...),
Observer: dag.ComposeObservers(append(contour.ObserversOf(resources), snapshotHandler)...),
Builder: dag.Builder{
FieldLogger: log.WithField("context", "builder"),
Source: dag.KubernetesCache{
Expand Down Expand Up @@ -423,42 +434,48 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
log.Printf("informer caches synced")

var grpcServer *grpc.Server

switch ctx.XDSServerType {
case "contour":
rpcServer := xds.RegisterServer(
grpcServer = xds.RegisterServer(
xds.NewContourServer(log, contour.ResourcesOf(resources)...),
registry,
ctx.grpcOptions()...)
case "envoy":
grpcServer = xds.RegisterServer(
server.NewServer(context.Background(), snapshotCache, nil),
registry,
ctx.grpcOptions()...)
default:
log.Fatalf("invalid xdsServerType %q configured", ctx.XDSServerType)
}

addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}
addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}

log = log.WithField("address", addr)
if ctx.PermitInsecureGRPC {
log = log.WithField("insecure", true)
}
log = log.WithField("address", addr)
if ctx.PermitInsecureGRPC {
log = log.WithField("insecure", true)
}

log.Info("started xDS server")
defer log.Info("stopped xDS server")
log.Info("started xDS server")
defer log.Info("stopped xDS server")

go func() {
<-stop
go func() {
<-stop

// We don't use GracefulStop here because envoy
// has long-lived hanging xDS requests. There's no
// mechanism to make those pending requests fail,
// so we forcibly terminate the TCP sessions.
rpcServer.Stop()
}()
// We don't use GracefulStop here because envoy
// has long-lived hanging xDS requests. There's no
// mechanism to make those pending requests fail,
// so we forcibly terminate the TCP sessions.
grpcServer.Stop()
}()

return rpcServer.Serve(l)
default:
log.Fatalf("invalid xdsServerType %q configured", ctx.XDSServerType)
}
return nil
return grpcServer.Serve(l)
})

// Set up SIGTERM handler for graceful shutdown.
Expand Down
3 changes: 1 addition & 2 deletions internal/contour/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
"sort"
"sync"

resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
resource "github.com/envoyproxy/go-control-plane/pkg/resource/v2"
"github.com/golang/protobuf/proto"
"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/envoy"
Expand Down
46 changes: 46 additions & 0 deletions internal/contour/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright Project Contour Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package contour

import (
"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
"github.com/projectcontour/contour/internal/dag"
"github.com/sirupsen/logrus"
)

// SnapshotHandler implements the xDS snapshot cache
type SnapshotHandler struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that "handler" is the right naming here. The "Handler" name is used for types that deal with Kubernetes events directly, but this is a DAG observer which is a different role.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It "handles' snapshots? I dunno, please suggest I think the name is fine. =)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It "handles' snapshots? I dunno, please suggest I think the name is fine. =)

ROFL, it's just that it's a handler in a different sense than the other types that we name "*Handler". I will get by with "handler" if we can't think of a evocative alternative :)

// resources holds the cache of xDS contents.
resources []ResourceCache

// snapshotCache is a snapshot-based cache that maintains a single versioned
// snapshot of responses for xDS resources that Contour manages
snapshotCache cache.SnapshotCache

logrus.FieldLogger
}

// NewSnapshotHandler returns an instance of SnapShotHandler
func NewSnapshotHandler(c cache.SnapshotCache, resources []ResourceCache, logger logrus.FieldLogger) *SnapshotHandler {
return &SnapshotHandler{
snapshotCache: c,
resources: resources,
FieldLogger: logger,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

What I had in mind was something along the lines of the pseudocode below. We can treat the resources generically, and control how we forward the DAG changes.

type SnapshotHandler struct {
    snapshot types.Snapshot
    resources []ResourceCache
}

func NewSnapshotHandler(c cache.SnapshotCache, resources []ResourceCache, logger logrus.FieldLogger) *SnapshotHandler {
}

func (s *SnapshotHandler) OnChange(root *dag.DAG) {
    // Forward the DAG change to each resource cache.
    for _, r := range s.resources {
	r.OnChange(root)
    }

    for _, r := range s.resources {
	contents := r.Contents()
	responseType := responseTypeOf(r.TypeURL())
	version := nextVersionForType()

	s.snapshot.Resources[responseType] = cache.NewResources(version, responseSlice(contents)
    }
}

}

// OnChange is called when the DAG is rebuilt
// and a new snapshot is needed.
func (s *SnapshotHandler) OnChange(root *dag.DAG) {
}
36 changes: 36 additions & 0 deletions internal/xds/hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright Project Contour Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package xds

import (
envoy_api_v2_core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
cache "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
)

// ConstantHash is a specialized node ID hasher used to allow
// any instance of Envoy to connect to Contour regardless of the
// service-node flag configured on Envoy.
type ConstantHash string

func (c ConstantHash) ID(*envoy_api_v2_core.Node) string {
return string(c)
}

func (c ConstantHash) String() string {
return string(c)
}

var _ cache.NodeHash = ConstantHash("")

var DefaultHash = ConstantHash("contour")
24 changes: 10 additions & 14 deletions internal/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@
package xds

import (
clusterservice "github.com/envoyproxy/go-control-plane/envoy/api/v2"
endpointservice "github.com/envoyproxy/go-control-plane/envoy/api/v2"
listenerservice "github.com/envoyproxy/go-control-plane/envoy/api/v2"
routeservice "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
api "github.com/envoyproxy/go-control-plane/envoy/api/v2"
stevesloka marked this conversation as resolved.
Show resolved Hide resolved
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -27,12 +23,12 @@ import (

// Server is a collection of handlers for streaming discovery requests.
type Server interface {
clusterservice.ClusterDiscoveryServiceServer
api.ClusterDiscoveryServiceServer
api.EndpointDiscoveryServiceServer
api.ListenerDiscoveryServiceServer
api.RouteDiscoveryServiceServer
discovery.AggregatedDiscoveryServiceServer
discovery.SecretDiscoveryServiceServer
endpointservice.EndpointDiscoveryServiceServer
listenerservice.ListenerDiscoveryServiceServer
routeservice.RouteDiscoveryServiceServer
}

// RegisterServer registers the given xDS protocol Server with the gRPC
Expand All @@ -41,7 +37,7 @@ type Server interface {
func RegisterServer(srv Server, registry *prometheus.Registry, opts ...grpc.ServerOption) *grpc.Server {
var metrics *grpc_prometheus.ServerMetrics

// TODO(jpeach) Figure out how to decouple this.
// TODO: Decouple registry from this.
if registry != nil {
metrics = grpc_prometheus.NewServerMetrics()
registry.MustRegister(metrics)
Expand All @@ -57,10 +53,10 @@ func RegisterServer(srv Server, registry *prometheus.Registry, opts ...grpc.Serv

discovery.RegisterAggregatedDiscoveryServiceServer(g, srv)
discovery.RegisterSecretDiscoveryServiceServer(g, srv)
v2.RegisterClusterDiscoveryServiceServer(g, srv)
v2.RegisterEndpointDiscoveryServiceServer(g, srv)
v2.RegisterListenerDiscoveryServiceServer(g, srv)
v2.RegisterRouteDiscoveryServiceServer(g, srv)
api.RegisterClusterDiscoveryServiceServer(g, srv)
api.RegisterEndpointDiscoveryServiceServer(g, srv)
api.RegisterListenerDiscoveryServiceServer(g, srv)
api.RegisterRouteDiscoveryServiceServer(g, srv)

if metrics != nil {
metrics.InitializeMetrics(g)
Expand Down