Skip to content

Commit

Permalink
Avoid blocking child type updates on parent ack
Browse files Browse the repository at this point in the history
When providing updates to Envoy it is safest to follow the update order
of: Clusters, Endpoints, Listeners, Routes. This order is encoded in our
delta xDS response generation code.

Another noteworthy attribute of these resources is that there are
parent->child relationships between: Cluster->Endpoints and
Listener->Routes. Envoy couples the storage of a parent and a child type
such that whenever a control plane sends an update for a parent type, it
MUST also send fresh data for the child type, even if Envoy previously
had the latest data for the child type.

A wrinkle that isn't accounted for currently is that Envoy does not
actually ACK parent types until after attempting to receive data for the
child types. This behavior also actually differs between Clusters and
Listeners:
- When a cluster is sent, Envoy will wait up to a 15s timeout for
  endpoints to arrive before sending the cluster ACK.
- When a listener is sent, and it specifies RDS routes, Envoy will wait
  until those routes arrive before ACKing the listener. Though it's not
  clear to me what the timeout is for this, it exceeds a minute from my
  tests.

However, a behavior encoded in our xDS update order is that we avoid
sending ANY data if we are waiting on ACKs for ANY resource. Meaning
that when we first send a cluster to Envoy and it requests endpoints for
that cluster, the endpoints do not actually get sent for at least 15.
This is because the endpoint update is paused by Consul until the
cluster ACK, which Envoy pauses until it gets endpoints or times out.

This commit changes the xDS order gating such that we only block
listener/route updates if there are cluster/endpoin updates pending.
This is to avoid routing to an invalid destination.
  • Loading branch information
freddygv committed Nov 7, 2022
1 parent c064ddf commit e43e898
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 119 deletions.
150 changes: 92 additions & 58 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -158,8 +157,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// representation of envoy state to force an update.
//
// see: https://github.com/envoyproxy/envoy/issues/13009
handlers[xdscommon.ListenerType].childType = handlers[xdscommon.RouteType]
handlers[xdscommon.ClusterType].childType = handlers[xdscommon.EndpointType]
handlers[xdscommon.ListenerType].deltaChild = &xDSDeltaChild{
childType: handlers[xdscommon.RouteType],
childrenNames: make(map[string][]string),
}
handlers[xdscommon.ClusterType].deltaChild = &xDSDeltaChild{
childType: handlers[xdscommon.EndpointType],
childrenNames: make(map[string][]string),
}

var authTimer <-chan time.Time
extendAuthTimer := func() {
Expand Down Expand Up @@ -346,30 +351,26 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
continue
}

var pendingTypes []string
for typeUrl, handler := range handlers {
if !handler.registered {
continue
}
if len(handler.pendingUpdates) > 0 {
pendingTypes = append(pendingTypes, typeUrl)
}
}
if len(pendingTypes) > 0 {
sort.Strings(pendingTypes)
generator.Logger.Trace("Skipping delta computation because there are responses in flight",
"pendingTypeUrls", pendingTypes)
continue
}

generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")

streamStartOnce.Do(func() {
metrics.MeasureSince([]string{"xds", "server", "streamStart"}, streamStartTime)
})

for _, op := range xDSUpdateOrder {
err, sent := handlers[op.TypeUrl].SendIfNew(
if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType {
if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 {
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
"typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType)
break
}
if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 {
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
"typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType)
break
}
}
err, _ := handlers[op.TypeUrl].SendIfNew(
cfgSnap.Kind,
currentVersions[op.TypeUrl],
resourceMap,
Expand All @@ -383,9 +384,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
op.errorLogNameReplyPrefix(),
op.TypeUrl, err)
}
if sent {
break // wait until we get an ACK to do more
}
}
}
}
Expand Down Expand Up @@ -435,16 +433,30 @@ func (op *xDSUpdateOperation) errorLogNameReplyPrefix() string {
}
}

type xDSDeltaChild struct {
// childType is a type that in Envoy is actually stored within this type.
// Upserts of THIS type should potentially trigger dependent named
// resources within the child to be re-configured.
childType *xDSDeltaType

// childrenNames is map of parent resource names to a list of associated child resource
// names.
childrenNames map[string][]string
}

func (c *xDSDeltaChild) forceResubscribe(logger *hclog.Logger, parent string, child string) {

}

type xDSDeltaType struct {
generator *ResourceGenerator
stream ADSDeltaStream
typeURL string
allowEmptyFn func(kind structs.ServiceKind) bool

// childType is a type that in Envoy is actually stored within this type.
// Upserts of THIS type should potentially trigger dependent named
// resources within the child to be re-configured.
childType *xDSDeltaType
// deltaChild contains data for an xDS child type if there is one.
// For example, endpoints are a child type of clusters.
deltaChild *xDSDeltaChild

// registered indicates if this type has been requested at least once by
// the proxy
Expand Down Expand Up @@ -484,9 +496,8 @@ func (t *xDSDeltaType) subscribed(name string) bool {
}

type PendingUpdate struct {
Remove bool
Version string
ChildResources []string // optional
Remove bool
Version string
}

func newDeltaType(
Expand Down Expand Up @@ -610,6 +621,15 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
t.resourceVersions[name] = ""
}

// Certain xDS types are children of other types, meaning that if Envoy subscribes to a parent.
// We MUST assume that if Envoy ever had data for the children of this parent, then the child's
// data is gone.
if t.deltaChild != nil && t.deltaChild.childType.registered {
for _, childName := range t.deltaChild.childrenNames[name] {
t.ensureChildResend(name, childName)
}
}

if alreadySubscribed {
logger.Trace("re-subscribing resource for stream", "resource", name)
} else {
Expand Down Expand Up @@ -646,27 +666,6 @@ func (t *xDSDeltaType) ack(nonce string) {
}

t.resourceVersions[name] = obj.Version
if t.childType != nil {
// This branch only matters on UPDATE, since we already have
// mechanisms to clean up orphaned resources.
for _, childName := range obj.ChildResources {
if _, exist := t.childType.resourceVersions[childName]; !exist {
continue
}
if !t.subscribed(childName) {
continue
}
t.generator.Logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", name,
"childTypeUrl", t.childType.typeURL,
"childResource", childName,
)
// Basically manifest this as a re-subscribe/re-sync
t.childType.resourceVersions[childName] = ""
}
}
}
t.sentToEnvoyOnce = true
delete(t.pendingUpdates, nonce)
Expand All @@ -686,6 +685,12 @@ func (t *xDSDeltaType) SendIfNew(
if t == nil || !t.registered {
return nil, false
}

// Wait for Envoy to catch up with this delta type before sending something new.
if len(t.pendingUpdates) > 0 {
return nil, false
}

logger := t.generator.Logger.With("typeUrl", t.typeURL)

allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn(kind)
Expand Down Expand Up @@ -721,14 +726,21 @@ func (t *xDSDeltaType) SendIfNew(
}
logger.Trace("sent response", "nonce", resp.Nonce)

if t.childType != nil {
// Capture the relevant child resource names on this pending update so
// we can properly clean up the linked children when this change is
// ACKed.
for name, obj := range updates {
// Certain xDS types are children of other types, meaning that if an update is pushed for a parent,
// we MUST send new data for all its children. Envoy will NOT re-subscribe to the child data upon
// receiving updates for the parent, so we need to handle this ourselves.
//
// Note that we do not check whether the deltaChild.childType is registered here, since we
if t.deltaChild != nil {
for name := range updates {
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
obj.ChildResources = children
updates[name] = obj
// Capture the relevant child resource names on this pending update so
// we can know the linked children if Envoy ever re-subscribes to the parent resource.
t.deltaChild.childrenNames[name] = children

for _, childName := range children {
t.ensureChildResend(name, childName)
}
}
}
}
Expand Down Expand Up @@ -848,6 +860,28 @@ func (t *xDSDeltaType) createDeltaResponse(
return resp, realUpdates, nil
}

func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
return
}
if !t.subscribed(childName) {
return
}

t.generator.Logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", parentName,
"childTypeUrl", t.deltaChild.childType.typeURL,
"childResource", childName,
)

// resourceVersions tracks the last known version for this childName that Envoy
// has ACKed. By setting this to empty it effectively tells us that Envoy does
// not have any data for that child, and we need to re-send.
t.deltaChild.childType.resourceVersions[childName] = ""
}

func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
out := make(map[string]map[string]string)
for typeUrl, resources := range resourceMap.Index {
Expand Down
Loading

0 comments on commit e43e898

Please sign in to comment.