Skip to content

Commit

Permalink
x/ref/lib/aws/vxray: work around goroutine leak in xray.BeginSegmentW…
Browse files Browse the repository at this point in the history
…ithSampling (#219)

aws/aws-xray-sdk-go#51 describes the goroutine leak in xray.BeginSegmentWithSampling and an appropriate fix (Cyberax/aws-xray-sdk-go@033bad5). However, a different, incomplete fix was adopted: https://github.com/aws/aws-xray-sdk-go/pull/156/files.

This PR works around this bug by artificially creating a context and then canceling that new context when the xray segment is closed.
  • Loading branch information
cosnicolaou authored Sep 25, 2021
1 parent 22827c5 commit a0f4bd1
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
- run:
name: downloads
command: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.42.0
go get github.com/matthewloring/validjson/cmd/[email protected]
go install -x github.com/matthewloring/validjson/cmd/validjson
- go/save-cache
Expand Down
87 changes: 67 additions & 20 deletions x/ref/lib/aws/vxray/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@ type xrayspan struct {
subseg bool
mgr *manager
seg *xray.Segment

// The xray SDK holds onto goroutines until the ctx it is called with is
// canceled: see https://github.com/aws/aws-xray-sdk-go/issues/51 for how
// this leads to goroutine leaks. This forces client code of this SDK to
// ensure that the ctx passed in to it is canceled in a timely manner.
// For the manager implementation here, a new context is created purely
// for passing in to the xray SDK and this context is canceled when
// its Finish method is called. The cancel method is stored in field below.
// The affected code uses 'xraybugctx' as the variable name of the specifically
// created context. This is clearly a bug in the xray SDK and if that ever
// gets fixed then this code can be removed.
cancel func()
}

func newXraySpan(ctx *context.T, cancel func(), vspan vtrace.Span, mgr *manager, seg *xray.Segment, subseg bool) *xrayspan {
return &xrayspan{
Span: vspan,
cancel: cancel,
ctx: ctx,
mgr: mgr,
seg: seg,
subseg: subseg,
}
}

func (xs *xrayspan) Annotate(msg string) {
Expand Down Expand Up @@ -92,6 +115,10 @@ func (xs *xrayspan) AnnotateMetadata(key string, value interface{}, indexed bool
}

func segJSON(seg *xray.Segment) string {
// Need to look the segment to prevent concurrent read/writes to the
// annotations etc.
seg.RLock()
defer seg.RUnlock()
out := strings.Builder{}
enc := json.NewEncoder(&out)
enc.SetIndent(" ", " ")
Expand Down Expand Up @@ -122,49 +149,56 @@ func (xs *xrayspan) Finish(err error) {
}
xseg := xs.seg
xs.mgr.annotateSegment(xseg, xs.subseg)
/* if an := xseg.Annotations; !xs.subseg && xs.mapToHTTP && an != nil {
if xseg.HTTP == nil || xseg.HTTP.Request == nil {
// Only create fake http fields if they have not already been set.
hd := xseg.GetHTTP()
req := hd.GetRequest()
mapAnnotation(an, "name", &req.URL)
mapAnnotation(an, "method", &req.Method)
mapAnnotation(an, "clientAddr", &req.ClientIP)
req.UserAgent = "vanadium"
}
}*/
if xs.subseg {
xseg.CloseAndStream(err)
} else {
xseg.Close(err)
}
xs.ctx.VI(1).Infof("Finish: sampled %v, %v", xseg.Sampled, segJSON(xseg))
if xs.ctx.V(1) {
xs.ctx.Infof("Finish: sampled %v, %v", xseg.Sampled, segJSON(xseg))
}
xs.Span.Finish(err)
if xs.cancel != nil {
xs.cancel()
}
}

// WithNewTrace creates a new vtrace context that is not the child of any
// other span. This is useful when starting operations that are
// disconnected from the activity ctx is performing. For example
// this might be used to start background tasks.
func (m *manager) WithNewTrace(ctx *context.T, name string, sr *vtrace.SamplingRequest) (*context.T, vtrace.Span) {
st := vtrace.GetStore(ctx)
if st == nil {
panic("nil store")
}

id, err := uniqueid.Random()
if err != nil {
ctx.Errorf("vtrace: couldn't generate Trace Id, debug data may be lost: %v", err)
}
newSpan, err := libvtrace.NewSpan(id, id, name, vtrace.GetStore(ctx))

newSpan, err := libvtrace.NewSpan(id, id, name, st)
if err != nil {
ctx.Error(err)
}

if st.Flags(uniqueid.Id{})&vtrace.AWSXRay == 0 {
// The underlying store is not configured to use xray.
return vtrace.WithSpan(ctx, newSpan), newSpan
}

tid := xray.NewTraceID()
hdr := &header.Header{
TraceID: tid,
}
ctx = WithTraceHeader(ctx, hdr)
name = sanitizeName(name)
sampling := translateSamplingHeader(sr)
_, seg := xray.NewSegmentFromHeader(ctx, name, sampling, hdr)
xraybugctx, cancel := context.WithCancel(ctx) // Workaround xray SDK leak.
_, seg := xray.NewSegmentFromHeader(xraybugctx, name, sampling, hdr)
ctx = WithSegment(ctx, seg)
xs := &xrayspan{Span: newSpan, ctx: ctx, mgr: m, seg: seg}
xs := newXraySpan(ctx, cancel, newSpan, m, seg, false)
return vtrace.WithSpan(ctx, xs), xs
}

Expand Down Expand Up @@ -218,6 +252,9 @@ func getTraceID(seg *xray.Segment) string {
return seg.TraceID
}

// newSegment assumes that ctx is created specifically for passing to the
// xray functions and that this ctx will be be canceled when the span is
// finished with.
func newSegment(ctx *context.T, name string, sampling *http.Request) (seg *xray.Segment, sub bool) {
sanitized := sanitizeName(name)
seg = GetSegment(ctx)
Expand Down Expand Up @@ -261,24 +298,33 @@ func (m *manager) WithContinuedTrace(ctx *context.T, name string, sr *vtrace.Sam
ctx.Error(err)
}

if req.Flags&vtrace.AWSXRay == 0 || st.Flags(uniqueid.Id{})&vtrace.AWSXRay == 0 {
// The request or the underlying store is not configured to use xray.
return vtrace.WithSpan(ctx, newSpan), newSpan
}

sampling := translateSamplingHeader(sr)

name = sanitizeName(name)
var seg *xray.Segment
var sub bool
var cancel func()
var xrtaybugctx *context.T
if req.Flags&vtrace.AWSXRay != 0 && len(req.RequestMetadata) > 0 {
reqHdr := string(req.RequestMetadata)
hdr := header.FromString(reqHdr)
ctx = WithTraceHeader(ctx, hdr)
_, seg = xray.NewSegmentFromHeader(ctx, name, sampling, hdr)
xrtaybugctx, cancel = context.WithCancel(ctx)
_, seg = xray.NewSegmentFromHeader(xrtaybugctx, name, sampling, hdr)
ctx.VI(1).Infof("WithContinuedTrace: new seg from header %v / %v: %v", hdr.TraceID, hdr.ParentID, segStr(seg))
} else {
seg, sub = newSegment(ctx, name, sampling)
xrtaybugctx, cancel = context.WithCancel(ctx)
seg, sub = newSegment(xrtaybugctx, name, sampling)
ctx.VI(1).Infof("WithContinuedTrace: new seg: %v", segStr(seg))
}

ctx = WithSegment(ctx, seg)
xs := &xrayspan{Span: newSpan, ctx: ctx, mgr: m, seg: seg, subseg: sub}
xs := newXraySpan(ctx, cancel, newSpan, m, seg, sub)
return vtrace.WithSpan(ctx, xs), xs
}

Expand All @@ -293,10 +339,11 @@ func (m *manager) WithNewSpan(ctx *context.T, name string) (*context.T, vtrace.S
if err != nil {
ctx.Error(err)
}
seg, sub := newSegment(ctx, name, nil)
xraybugctx, cancel := context.WithCancel(ctx)
seg, sub := newSegment(xraybugctx, name, nil)
ctx.VI(1).Infof("WithNewSpan: new seg: %v", segStr(seg))
ctx = WithSegment(ctx, seg)
xs := &xrayspan{Span: newSpan, ctx: ctx, mgr: m, seg: seg, subseg: sub}
xs := newXraySpan(ctx, cancel, newSpan, m, seg, sub)
return vtrace.WithSpan(ctx, xs), xs
}
ctx.Error("vtrace: creating a new child span from context with no existing span.")
Expand Down
49 changes: 45 additions & 4 deletions x/ref/lib/aws/vxray/vxray_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"net/http"
"net/http/httptest"
"runtime"
"testing"

"github.com/aws/aws-xray-sdk-go/strategy/sampling"
Expand Down Expand Up @@ -145,7 +146,7 @@ func TestSimple(t *testing.T) {

type simple struct{}

func (s *simple) Ping(_ *context.T, _ rpc.ServerCall) (string, error) {
func (s *simple) Ping(ctx *context.T, _ rpc.ServerCall) (string, error) {
return "pong", nil
}

Expand Down Expand Up @@ -250,11 +251,14 @@ func TestWithRPC(t *testing.T) {

// Run without vxray.
captured.segs = nil

store, _ := libvtrace.NewStore(flags.VtraceFlags{
SampleRate: 1,
DumpOnShutdown: false,
CacheSize: 1024,
EnableAWSXRay: false,
})

clientCtx = vtrace.WithStore(ctx, store)

clientCtx, span, err := issueCall(clientCtx, name)
Expand All @@ -263,10 +267,17 @@ func TestWithRPC(t *testing.T) {
}

if got, want := len(captured.segs), 0; got != want {
t.Fatalf("# segments: got %v, want %v", got, want)
for i, s := range captured.segs {
t.Logf("segments: %v: %v", i, s)
}
t.Errorf("# segments: got %v, want %v", got, want)
}

trace := vtrace.GetStore(clientCtx).TraceRecord(span.Trace())
if got, want := len(trace.Spans), 4; got != want {
for i, s := range trace.Spans {
t.Logf("spans: %v: %v", i, s)
}
t.Fatalf("# spans: got %v, want %v", got, want)
}

Expand Down Expand Up @@ -305,8 +316,11 @@ func TestWithXRayHTTPHandler(t *testing.T) {
defer ts.Close()

res, err := http.Get(ts.URL)
if err != nil || res.StatusCode != 200 {
t.Fatalf("get failed: %v: code %v", err, res.StatusCode)
if err != nil {
t.Fatalf("get failed: %v", err)
}
if res.StatusCode != 200 {
t.Fatalf("get failed: http status code %v", res.StatusCode)
}

if got, want := len(captured.segs), 5; got != want {
Expand All @@ -329,3 +343,30 @@ func TestWithXRayHTTPHandler(t *testing.T) {
checkSeg(t, captured.segs[4], "http.echo.client")

}

func TestGoRoutineLeak(t *testing.T) {
// https://github.com/aws/aws-xray-sdk-go/issues/51 outlines how
// the xray SDK can leak goroutines. The vxray package works around
// this leak and this test is intended to verify that it does so.
ctx, shutdown := v23.Init()
defer shutdown()

captured := &emitter{}

initialGoRoutines := runtime.NumGoroutine()
xrayctx := initXRay(t, ctx, captured)
iterations := 1000
for i := 0; i < iterations; i++ {
sr := &vtrace.SamplingRequest{
Name: fmt.Sprintf("%v", i),
}
_, span := vtrace.WithNewTrace(xrayctx, "test", sr)
span.Finish(nil)
}
runtime.Gosched()
currentGoRoutines := runtime.NumGoroutine() - initialGoRoutines

if currentGoRoutines > iterations/2 {
t.Fatalf("%v running goroutines seems too high", currentGoRoutines)
}
}

0 comments on commit a0f4bd1

Please sign in to comment.