Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Steve Sloka <[email protected]>
  • Loading branch information
stevesloka committed Aug 31, 2020
1 parent 0e8dcf7 commit 9b5519c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 60 deletions.
79 changes: 31 additions & 48 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"time"

xds "github.com/envoyproxy/go-control-plane/pkg/cache/types"

"google.golang.org/grpc"

"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
projectcontourv1alpha1 "github.com/projectcontour/contour/apis/projectcontour/v1alpha1"
"github.com/projectcontour/contour/internal/annotation"
Expand Down Expand Up @@ -426,65 +429,45 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
}
log.Printf("informer caches synced")

var grpcServer *grpc.Server

switch ctx.XDSServerType {
case "contour":
grpcServer := contourxds.RegisterServer(
grpcServer = contourxds.RegisterServer(
contourxds.NewContourServer(log, contour.ResourcesOf(resources)...),
registry,
ctx.grpcOptions()...)

addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}

log = log.WithField("address", addr)
if ctx.PermitInsecureGRPC {
log = log.WithField("insecure", true)
}

log.Info("started contour xDS server")
defer log.Info("stopped contour xDS server")

go func() {
<-stop

// We don't use GracefulStop here because envoy
// has long-lived hanging xDS requests. There's no
// mechanism to make those pending requests fail,
// so we forcibly terminate the TCP sessions.
grpcServer.Stop()
}()

return grpcServer.Serve(l)
case "envoy":
grpcServer := contourxds.RegisterEnvoyServer(registry, snapshotCache, ctx.grpcOptions()...)
grpcServer = contourxds.RegisterEnvoyServer(registry, snapshotCache, ctx.grpcOptions()...)
default:
log.Fatalf("invalid xdsServerType %q configured", ctx.XDSServerType)
}

addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}
addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
l, err := net.Listen("tcp", addr)
if err != nil {
return err
}

log = log.WithField("address", addr)
if ctx.PermitInsecureGRPC {
log = log.WithField("insecure", true)
}
log = log.WithField("address", addr)
if ctx.PermitInsecureGRPC {
log = log.WithField("insecure", true)
}

log.Info("started envoy xDS server")
defer log.Info("stopped envoy xDS server")
log.Info("started xDS server")
defer log.Info("stopped xDS server")

go func() {
<-stop
grpcServer.Stop()
}()
go func() {
<-stop

return grpcServer.Serve(l)
default:
log.Fatalf("invalid xdsServerType %q configured", ctx.XDSServerType)
}
return nil
// We don't use GracefulStop here because envoy
// has long-lived hanging xDS requests. There's no
// mechanism to make those pending requests fail,
// so we forcibly terminate the TCP sessions.
grpcServer.Stop()
}()

return grpcServer.Serve(l)
})

// Set up SIGTERM handler for graceful shutdown.
Expand Down
4 changes: 0 additions & 4 deletions internal/contour/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ import (
// SnapshotHandler implements the xDS snapshot cache
type SnapshotHandler struct {

// Observer allows for an event to propagate when
// the DAG is rebuilt and a new snapshot is needed.
dag.Observer

// resources holds a map of ResourceCaches which
// relate back to the xDS caches used to populate
// new snapshots.
Expand Down
12 changes: 4 additions & 8 deletions internal/xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ import (
"context"

api "github.com/envoyproxy/go-control-plane/envoy/api/v2"
clusterservice "github.com/envoyproxy/go-control-plane/envoy/api/v2"
endpointservice "github.com/envoyproxy/go-control-plane/envoy/api/v2"
listenerservice "github.com/envoyproxy/go-control-plane/envoy/api/v2"
routeservice "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
Expand All @@ -32,12 +28,12 @@ import (

// Server is a collection of handlers for streaming discovery requests.
type Server interface {
clusterservice.ClusterDiscoveryServiceServer
api.ClusterDiscoveryServiceServer
api.EndpointDiscoveryServiceServer
api.ListenerDiscoveryServiceServer
api.RouteDiscoveryServiceServer
discovery.AggregatedDiscoveryServiceServer
discovery.SecretDiscoveryServiceServer
endpointservice.EndpointDiscoveryServiceServer
listenerservice.ListenerDiscoveryServiceServer
routeservice.RouteDiscoveryServiceServer
}

// RegisterServer registers the given xDS protocol Server with the gRPC
Expand Down
3 changes: 3 additions & 0 deletions internal/xds/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package xds
import (
"strings"

xds "github.com/envoyproxy/go-control-plane/pkg/cache/types"
"k8s.io/apimachinery/pkg/types"
)

type ResourceType = xds.ResponseType

// ClusterLoadAssignmentName generates the name used for an EDS
// ClusterLoadAssignment, given a fully qualified Service name and
// port. This name is a contract between the producer of a cluster
Expand Down

0 comments on commit 9b5519c

Please sign in to comment.