Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of Avoid blocking child type updates on parent ack into release/1.12.x #15501

Merged
merged 3 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/15083.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
connect: fixed bug where endpoint updates for new xDS clusters could block for 15s before being sent to Envoy.
```
154 changes: 96 additions & 58 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"sort"
"sync/atomic"
"time"

Expand Down Expand Up @@ -140,8 +139,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
// representation of envoy state to force an update.
//
// see: https://github.com/envoyproxy/envoy/issues/13009
handlers[xdscommon.ListenerType].childType = handlers[xdscommon.RouteType]
handlers[xdscommon.ClusterType].childType = handlers[xdscommon.EndpointType]
handlers[xdscommon.ListenerType].deltaChild = &xDSDeltaChild{
childType: handlers[xdscommon.RouteType],
childrenNames: make(map[string][]string),
}
handlers[xdscommon.ClusterType].deltaChild = &xDSDeltaChild{
childType: handlers[xdscommon.EndpointType],
childrenNames: make(map[string][]string),
}

var authTimer <-chan time.Time
extendAuthTimer := func() {
Expand Down Expand Up @@ -298,26 +303,28 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
continue
}

var pendingTypes []string
for typeUrl, handler := range handlers {
if !handler.registered {
continue
}
if len(handler.pendingUpdates) > 0 {
pendingTypes = append(pendingTypes, typeUrl)
}
}
if len(pendingTypes) > 0 {
sort.Strings(pendingTypes)
generator.Logger.Trace("Skipping delta computation because there are responses in flight",
"pendingTypeUrls", pendingTypes)
continue
}

generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")

for _, op := range xDSUpdateOrder {
err, sent := handlers[op.TypeUrl].SendIfNew(
if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType {
if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 {
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
"typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType)

// Receiving an ACK from Envoy will unblock the select statement above,
// and re-trigger an attempt to send these skipped updates.
break
}
if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 {
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
"typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType)

// Receiving an ACK from Envoy will unblock the select statement above,
// and re-trigger an attempt to send these skipped updates.
break
}
}
err, _ := handlers[op.TypeUrl].SendIfNew(
cfgSnap.Kind,
currentVersions[op.TypeUrl],
resourceMap,
Expand All @@ -331,9 +338,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
op.errorLogNameReplyPrefix(),
op.TypeUrl, err)
}
if sent {
break // wait until we get an ACK to do more
}
}
}
}
Expand Down Expand Up @@ -383,16 +387,26 @@ func (op *xDSUpdateOperation) errorLogNameReplyPrefix() string {
}
}

type xDSDeltaChild struct {
// childType is a type that in Envoy is actually stored within this type.
// Upserts of THIS type should potentially trigger dependent named
// resources within the child to be re-configured.
childType *xDSDeltaType

// childrenNames is map of parent resource names to a list of associated child resource
// names.
childrenNames map[string][]string
}

type xDSDeltaType struct {
generator *ResourceGenerator
stream ADSDeltaStream
typeURL string
allowEmptyFn func(kind structs.ServiceKind) bool

// childType is a type that in Envoy is actually stored within this type.
// Upserts of THIS type should potentially trigger dependent named
// resources within the child to be re-configured.
childType *xDSDeltaType
// deltaChild contains data for an xDS child type if there is one.
// For example, endpoints are a child type of clusters.
deltaChild *xDSDeltaChild

// registered indicates if this type has been requested at least once by
// the proxy
Expand Down Expand Up @@ -432,9 +446,8 @@ func (t *xDSDeltaType) subscribed(name string) bool {
}

type PendingUpdate struct {
Remove bool
Version string
ChildResources []string // optional
Remove bool
Version string
}

func newDeltaType(
Expand Down Expand Up @@ -558,6 +571,15 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
t.resourceVersions[name] = ""
}

// Certain xDS types are children of other types, meaning that if Envoy subscribes to a parent.
// We MUST assume that if Envoy ever had data for the children of this parent, then the child's
// data is gone.
if t.deltaChild != nil && t.deltaChild.childType.registered {
for _, childName := range t.deltaChild.childrenNames[name] {
t.ensureChildResend(name, childName)
}
}

if alreadySubscribed {
logger.Trace("re-subscribing resource for stream", "resource", name)
} else {
Expand Down Expand Up @@ -594,27 +616,6 @@ func (t *xDSDeltaType) ack(nonce string) {
}

t.resourceVersions[name] = obj.Version
if t.childType != nil {
// This branch only matters on UPDATE, since we already have
// mechanisms to clean up orphaned resources.
for _, childName := range obj.ChildResources {
if _, exist := t.childType.resourceVersions[childName]; !exist {
continue
}
if !t.subscribed(childName) {
continue
}
t.generator.Logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", name,
"childTypeUrl", t.childType.typeURL,
"childResource", childName,
)
// Basically manifest this as a re-subscribe/re-sync
t.childType.resourceVersions[childName] = ""
}
}
}
t.sentToEnvoyOnce = true
delete(t.pendingUpdates, nonce)
Expand All @@ -634,6 +635,12 @@ func (t *xDSDeltaType) SendIfNew(
if t == nil || !t.registered {
return nil, false
}

// Wait for Envoy to catch up with this delta type before sending something new.
if len(t.pendingUpdates) > 0 {
return nil, false
}

logger := t.generator.Logger.With("typeUrl", t.typeURL)

allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn(kind)
Expand Down Expand Up @@ -669,14 +676,23 @@ func (t *xDSDeltaType) SendIfNew(
}
logger.Trace("sent response", "nonce", resp.Nonce)

if t.childType != nil {
// Capture the relevant child resource names on this pending update so
// we can properly clean up the linked children when this change is
// ACKed.
for name, obj := range updates {
// Certain xDS types are children of other types, meaning that if an update is pushed for a parent,
// we MUST send new data for all its children. Envoy will NOT re-subscribe to the child data upon
// receiving updates for the parent, so we need to handle this ourselves.
//
// Note that we do not check whether the deltaChild.childType is registered here, since we send
// parent types before child types, meaning that it's expected on first send of a parent that
// there are no subscriptions for the child type.
if t.deltaChild != nil {
for name := range updates {
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
obj.ChildResources = children
updates[name] = obj
// Capture the relevant child resource names on this pending update so
// we can know the linked children if Envoy ever re-subscribes to the parent resource.
t.deltaChild.childrenNames[name] = children

for _, childName := range children {
t.ensureChildResend(name, childName)
}
}
}
}
Expand Down Expand Up @@ -796,6 +812,28 @@ func (t *xDSDeltaType) createDeltaResponse(
return resp, realUpdates, nil
}

func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
return
}
if !t.subscribed(childName) {
return
}

t.generator.Logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", parentName,
"childTypeUrl", t.deltaChild.childType.typeURL,
"childResource", childName,
)

// resourceVersions tracks the last known version for this childName that Envoy
// has ACKed. By setting this to empty it effectively tells us that Envoy does
// not have any data for that child, and we need to re-send.
t.deltaChild.childType.resourceVersions[childName] = ""
}

func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
out := make(map[string]map[string]string)
for typeUrl, resources := range resourceMap.Index {
Expand Down
Loading