Skip to content

Commit

Permalink
runtime: support streaming traces
Browse files Browse the repository at this point in the history
This adds a new experiment that enables streaming traces
to the platform as they're happening, as opposed to waiting
for the trace to complete.
  • Loading branch information
eandre committed Nov 21, 2023
1 parent 58765d3 commit a52d2b6
Show file tree
Hide file tree
Showing 14 changed files with 347 additions and 87 deletions.
4 changes: 2 additions & 2 deletions pkg/traceparser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,9 @@ func TestParse(t *testing.T) {

for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
log := &trace2.Log{}
log := trace2.NewLog()
tt.Emit(log)
data := log.GetAndClear()
data, _ := log.GetAndClear()
ta := trace2.NewTimeAnchor(0, now)
got, err := ParseEvent(bufio.NewReader(bytes.NewReader(data)), ta)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions runtimes/go/appruntime/apisdk/api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ func TestDescGeneratesTrace(t *testing.T) {
EXPECT().
RequestSpanEnd(gomock.Any()).MaxTimes(1)

traceMock.EXPECT().WaitAndClear().AnyTimes()
traceMock.EXPECT().WaitUntilDone().AnyTimes()
traceMock.EXPECT().MarkDone().MaxTimes(1)

handler.Handle(server.NewIncomingContext(w, req, ps, api.CallMeta{}))

if diff := cmp.Diff(test.want, beginReq, opts...); diff != "" {
Expand Down Expand Up @@ -363,6 +367,9 @@ func TestRawEndpointOverflow(t *testing.T) {
params = append(params, p)
}).AnyTimes()

traceMock.EXPECT().WaitAndClear().MinTimes(1)
traceMock.EXPECT().MarkDone().Times(1)

handler.Handle(server.NewIncomingContext(w, req, ps, api.CallMeta{}))

if len(params) != 2 {
Expand Down
7 changes: 6 additions & 1 deletion runtimes/go/appruntime/exported/experiments/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const (
// AuthDataRoundTrip forces auth data to be round-tripped through the wireformat
// when internal API calls are made.
AuthDataRoundTrip Name = "auth-data-round-trip"

// StreamTraces enables streaming traces to the Encore platform as they're happening,
// as opposed to waiting for the request to finish before starting the upload.
StreamTraces Name = "stream-traces"
)

// Valid reports whether the given name is a known experiment.
Expand All @@ -37,7 +41,8 @@ func (x Name) Valid() bool {
V2,
BetaRuntime,
LocalMultiProcess,
AuthDataRoundTrip:
AuthDataRoundTrip,
StreamTraces:
return true
default:
return false
Expand Down
62 changes: 58 additions & 4 deletions runtimes/go/appruntime/exported/trace2/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@ import (

type EventID = model.TraceEventID

// nextEventID is a atomic counter for event IDs.
// nextEventID is an atomic counter for event IDs.
var nextEventID atomic.Uint64

// NewLog returns an empty Log that is ready for use. The log's
// condition variable is built on top of the log's own mutex, so
// writers and waiters synchronize on the same lock.
func NewLog() *Log {
	log := new(Log)
	log.cond = sync.NewCond(&log.mu)
	return log
}

// Log is an in-memory trace log: events are appended to a byte buffer
// and later handed out (and cleared) by the Get/Wait methods.
//
// Construct it with NewLog: a zero-value Log has a nil cond, and Add,
// MarkDone and the Wait* methods all call methods on cond.
type Log struct {
// mu guards data and done.
mu sync.Mutex
// data holds the encoded events that have not yet been handed out.
data []byte
// done is set by MarkDone.
done bool
// cond is bound to mu (see NewLog); it is broadcast whenever data is
// appended or the log is marked done.
cond *sync.Cond
}

// Ensure Log implements Logger.
Expand Down Expand Up @@ -98,17 +106,63 @@ func (l *Log) Add(e Event) EventID {
l.mu.Lock()
l.data = append(l.data, append(header[:], eventData...)...)
l.mu.Unlock()
l.cond.Broadcast()

return EventID(eventID)
}

// WaitUntilDone blocks the caller until the log has been marked done.
// It returns immediately if the log is already done.
func (l *Log) WaitUntilDone() {
	l.mu.Lock()
	defer l.mu.Unlock()

	for {
		if l.done {
			return
		}
		// Wait releases mu while suspended and re-acquires it on wake-up.
		l.cond.Wait()
	}
}

// WaitAtLeast blocks until dur has elapsed or the log has been marked
// done, whichever happens first. It reports whether the log is done at
// the time of returning.
//
// Waiting on the condition variable alone only re-checks the deadline
// when Add or MarkDone broadcasts, so with an idle trace the call could
// block far longer than dur. A timer therefore wakes the waiter once
// dur has passed.
func (l *Log) WaitAtLeast(dur time.Duration) (done bool) {
	start := time.Now()

	// The callback takes mu before broadcasting. The waiter holds mu from
	// its deadline check until cond.Wait has registered it, so the
	// broadcast cannot slip in between and be missed.
	wake := time.AfterFunc(dur, func() {
		l.mu.Lock()
		l.cond.Broadcast()
		l.mu.Unlock()
	})
	defer wake.Stop()

	l.mu.Lock()
	defer l.mu.Unlock()
	for !l.done && time.Since(start) < dur {
		l.cond.Wait()
	}
	return l.done
}

// WaitAndClear blocks until the log holds data or has been marked done.
// It returns the buffered data together with the done flag, and removes
// the returned data from the log.
func (l *Log) WaitAndClear() (data []byte, done bool) {
	l.mu.Lock()
	defer l.mu.Unlock()

	for !l.done && len(l.data) == 0 {
		l.cond.Wait()
	}

	data = l.data
	// Keep the backing array but start the buffer after the returned
	// bytes, so later appends never overwrite what the caller received.
	l.data = data[len(data):]
	return data, l.done
}

// MarkDone flags the log as done and wakes every goroutine currently
// blocked in one of the Wait* methods.
func (l *Log) MarkDone() {
	// Flip the flag under the lock; the wake-up happens after the lock
	// has been released.
	func() {
		l.mu.Lock()
		defer l.mu.Unlock()
		l.done = true
	}()
	l.cond.Broadcast()
}

// GetAndClear returns the currently buffered data without waiting and
// clears the buffer. In the two-result form, done reports whether the
// log had been marked done at the time of the call.
//
// NOTE(review): this span shows two variants of the signature, of the
// data capture and of the return statement back to back (it looks like
// a rendered diff without +/- markers) — confirm which variant is live
// before editing the code.
func (l *Log) GetAndClear() []byte {
func (l *Log) GetAndClear() (data []byte, done bool) {
l.mu.Lock()
data := l.data
data, done = l.data, l.done
// Re-slice past the returned bytes rather than allocating a new buffer.
l.data = l.data[len(l.data):]
l.mu.Unlock()
return data
return data, done
}

// EventBuffer is a performant, low-overhead, growable buffer
Expand Down
9 changes: 8 additions & 1 deletion runtimes/go/appruntime/exported/trace2/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package trace2
import (
"context"
"net/http"
"time"

"encore.dev/appruntime/exported/model"
"encore.dev/appruntime/exported/stack"
Expand All @@ -11,8 +12,14 @@ import (
//go:generate mockgen -source=./logger.go -package=mock_trace -destination ../../shared/traceprovider/mock_trace/mock_trace.go Logger

type Logger interface {
MarkDone()
Add(Event) EventID
GetAndClear() []byte

WaitUntilDone()
WaitAtLeast(time.Duration) bool
GetAndClear() (data []byte, done bool)
WaitAndClear() (data []byte, done bool)

RequestSpanStart(req *model.Request, goid uint32)
RequestSpanEnd(params RequestSpanEndParams)
AuthSpanStart(req *model.Request, goid uint32)
Expand Down
40 changes: 4 additions & 36 deletions runtimes/go/appruntime/shared/platform/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,27 @@
package platform

import (
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
"net/http"
"strconv"
"time"

"encore.dev/appruntime/exported/config"
"encore.dev/appruntime/exported/trace2"
"encore.dev/appruntime/exported/experiments"
)

// NewClient creates a platform Client from the static and runtime
// configuration.
//
// NOTE(review): two alternative bodies are visible below — one builds a
// two-field Client, the other first derives the experiment set with
// experiments.FromConfig and stores it as a third field. This looks like
// a rendered diff (old line followed by its replacement) — confirm which
// one is live before editing the code.
func NewClient(static *config.Static, rt *config.Runtime) *Client {
return &Client{static, rt}
exp := experiments.FromConfig(static, rt)
return &Client{static, rt, exp}
}

// Client talks to the platform on behalf of the running app.
//
// NOTE(review): the struct definition and the SendTrace method below are
// interleaved (the exp field appears after SendTrace's body), which looks
// like a rendered diff without +/- markers — confirm the live layout
// before editing the code.
type Client struct {
// static is the static app configuration; SendTrace reads AppCommit from it.
static *config.Static
// runtime is the runtime configuration; SendTrace reads TraceEndpoint,
// AppID, EnvID and DeployID from it.
runtime *config.Runtime
}

// SendTrace POSTs the trace data read from data to the configured trace
// endpoint. It attaches the app/env/deploy/commit identifiers, the trace
// format version and a freshly created time anchor as headers, and adds
// the auth key via addAuthKey. Any response status other than 200 OK is
// returned as an error that includes the response body.
func (c *Client) SendTrace(ctx context.Context, data io.Reader) error {
req, err := http.NewRequestWithContext(ctx, "POST", c.runtime.TraceEndpoint, data)
if err != nil {
return err
}

ta, err := trace2.NewTimeAnchorNow().MarshalText()
if err != nil {
return err
}

req.Header.Set("X-Encore-App-ID", c.runtime.AppID)
req.Header.Set("X-Encore-Env-ID", c.runtime.EnvID)
req.Header.Set("X-Encore-Deploy-ID", c.runtime.DeployID)
req.Header.Set("X-Encore-App-Commit", c.static.AppCommit.AsRevisionString())
req.Header.Set("X-Encore-Trace-Version", strconv.Itoa(int(trace2.CurrentVersion)))
req.Header.Set("X-Encore-Trace-TimeAnchor", string(ta))
c.addAuthKey(req)

resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
// Close errors are deliberately ignored; there is nothing to recover.
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
// Best effort: a failure to read the body only shortens the message.
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("http %s: %s", resp.Status, body)
}
return nil
// exp is the experiment set stored by the three-field NewClient variant.
exp *experiments.Set
}

func (c *Client) addAuthKey(req *http.Request) {
Expand Down
131 changes: 131 additions & 0 deletions runtimes/go/appruntime/shared/platform/streaming_trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package platform

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"strconv"
"time"

"encore.dev/appruntime/exported/experiments"
"encore.dev/appruntime/exported/trace2"
)

// StreamTrace uploads the given trace log to the platform. With the
// StreamTraces experiment enabled the trace is sent via streamingTrace;
// otherwise it is sent via blockingTrace.
func (c *Client) StreamTrace(log trace2.Logger) error {
	if !experiments.StreamTraces.Enabled(c.exp) {
		return c.blockingTrace(log)
	}
	return c.streamingTrace(log)
}

// streamingTrace uploads a trace to the platform while it is still
// being produced.
func (c *Client) streamingTrace(log trace2.Logger) error {
	var payload io.Reader

	// Give the trace a moment before committing to a streamed upload:
	// a short trace is cheaper to send as a single sized request than
	// as a sequence of small chunks.
	if finished := log.WaitAtLeast(1 * time.Second); finished {
		buf, _ := log.GetAndClear()
		if len(buf) == 0 {
			return nil
		}
		// A bytes.Reader lets net/http determine the Content-Length.
		payload = bytes.NewReader(buf)
	} else {
		reader := &traceLogReader{log: log}
		if reader.IsDoneAndEmpty() {
			// The trace ended without producing any data; nothing to stream.
			return nil
		}
		payload = reader
	}

	// The upload lasts as long as the trace does, which is not known up
	// front, so it runs on a background context without a deadline.
	return c.sendTraceRequest(context.Background(), payload)
}

// blockingTrace waits until the trace is done and then uploads all of
// its data in a single request, allowing 15 seconds for the upload.
func (c *Client) blockingTrace(log trace2.Logger) error {
	log.WaitUntilDone()

	payload, _ := log.GetAndClear()
	if len(payload) == 0 {
		// Nothing was recorded; skip the request entirely.
		return nil
	}

	const uploadTimeout = 15 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), uploadTimeout)
	defer cancel()

	return c.sendTraceRequest(ctx, bytes.NewReader(payload))
}

// sendTraceRequest POSTs body to the configured trace endpoint.
//
// The request carries the app, environment, deploy and commit
// identifiers, the trace format version and a freshly created time
// anchor as headers, plus the auth key added by addAuthKey. It returns
// an error if the request cannot be built or sent, or if the response
// status is anything other than 200 OK; in the latter case the error
// includes (a bounded prefix of) the response body.
func (c *Client) sendTraceRequest(ctx context.Context, body io.Reader) error {
	const (
		// maxErrorBody caps how much of an error response ends up in the
		// returned error, so a misbehaving server cannot make us buffer
		// an arbitrarily large body.
		maxErrorBody = 64 << 10
		// maxDrain caps how much leftover response data is discarded
		// before closing the body.
		maxDrain = 256 << 10
	)

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.runtime.TraceEndpoint, body)
	if err != nil {
		return err
	}

	ta, err := trace2.NewTimeAnchorNow().MarshalText()
	if err != nil {
		return err
	}

	req.Header.Set("X-Encore-App-ID", c.runtime.AppID)
	req.Header.Set("X-Encore-Env-ID", c.runtime.EnvID)
	req.Header.Set("X-Encore-Deploy-ID", c.runtime.DeployID)
	req.Header.Set("X-Encore-App-Commit", c.static.AppCommit.AsRevisionString())
	req.Header.Set("X-Encore-Trace-Version", strconv.Itoa(int(trace2.CurrentVersion)))
	req.Header.Set("X-Encore-Trace-TimeAnchor", string(ta))
	c.addAuthKey(req)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		// Read the (bounded) remainder of the body before closing so the
		// transport can reuse the keep-alive connection. Both errors are
		// ignored on purpose: this is best-effort cleanup.
		_, _ = io.CopyN(io.Discard, resp.Body, maxDrain)
		_ = resp.Body.Close()
	}()

	if resp.StatusCode != http.StatusOK {
		// Best effort: a read failure only shortens the error message.
		msg, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		return fmt.Errorf("http %s: %s", resp.Status, msg)
	}
	return nil
}

// traceLogReader implements io.Reader by reading from a trace log.
// Read blocks until the log has data or reports done, and returns
// io.EOF once the log is done and all fetched data has been consumed.
type traceLogReader struct {
// log is the trace log that data is pulled from via WaitAndClear.
log trace2.Logger
data []byte // current buffer
// done records that WaitAndClear reported the log as done; no further
// data is fetched after that.
done bool
}

// Read copies pending trace data into b, blocking until the log has
// data or is done. It returns io.EOF once the log is done and the
// current buffer has been fully consumed.
func (r *traceLogReader) Read(b []byte) (int, error) {
	r.readMoreIfNeeded()

	// After the call above we either hold data or the log is done; an
	// empty buffer therefore means there is nothing left to read.
	if len(r.data) == 0 {
		return 0, io.EOF
	}

	copied := copy(b, r.data)
	r.data = r.data[copied:]
	return copied, nil
}

// IsDoneAndEmpty blocks until the log has produced data or is done,
// and then reports whether it finished without any data left to read.
func (r *traceLogReader) IsDoneAndEmpty() bool {
	r.readMoreIfNeeded()
	if len(r.data) > 0 {
		return false
	}
	return r.done
}

// readMoreIfNeeded fetches from the log until the reader either holds
// unread data or has seen the log report done (or both). It does
// nothing if one of those already holds.
func (r *traceLogReader) readMoreIfNeeded() {
	for {
		if r.done || len(r.data) > 0 {
			return
		}
		r.data, r.done = r.log.WaitAndClear()
	}
}
Loading

0 comments on commit a52d2b6

Please sign in to comment.