Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
cmd/trace-agent: use http.Server.Shutdown and context.Context
Browse files Browse the repository at this point in the history
  • Loading branch information
gbbr committed Mar 5, 2018
1 parent 1c25633 commit 0c0754f
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 53 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ install:

ci:
# task used by CI
GOOS=windows go build ./cmd/trace-agent # ensure windows builds
go get -u github.com/golang/lint/golint/...
golint ./cmd/trace-agent ./filters ./fixtures ./info ./quantile ./quantizer ./sampler ./statsd ./watchdog ./writer
go test ./...
Expand Down
16 changes: 10 additions & 6 deletions cmd/trace-agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"sync/atomic"
"time"

Expand Down Expand Up @@ -57,13 +58,14 @@ type Agent struct {
dynConf *config.DynamicConfig

// Used to synchronize on a clean exit
exit chan struct{}
ctx context.Context

die func(format string, args ...interface{})
}

// NewAgent returns a new Agent object, ready to be started
func NewAgent(conf *config.AgentConfig, exit chan struct{}) *Agent {
// NewAgent returns a new Agent object, ready to be started. It takes a context
// which may be cancelled in order to gracefully stop the agent.
func NewAgent(ctx context.Context, conf *config.AgentConfig) *Agent {
dynConf := config.NewDynamicConfig()

// inter-component channels
Expand Down Expand Up @@ -107,7 +109,7 @@ func NewAgent(conf *config.AgentConfig, exit chan struct{}) *Agent {
sampledTraceChan: sampledTraceChan,
conf: conf,
dynConf: dynConf,
exit: exit,
ctx: ctx,
die: die,
}
}
Expand Down Expand Up @@ -140,9 +142,11 @@ func (a *Agent) Run() {
a.Process(t)
case <-watchdogTicker.C:
a.watchdog()
case <-a.exit:
case <-a.ctx.Done():
log.Info("exiting")
close(a.Receiver.exit)
if err := a.Receiver.Stop(); err != nil {
log.Error(err)
}
a.Concentrator.Stop()
a.TraceWriter.Stop()
a.StatsWriter.Stop()
Expand Down
20 changes: 12 additions & 8 deletions cmd/trace-agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"net/http"
"runtime"
Expand Down Expand Up @@ -31,14 +32,15 @@ func TestWatchdog(t *testing.T) {
defaultMux := http.DefaultServeMux
http.DefaultServeMux = http.NewServeMux()

exit := make(chan struct{})
agent := NewAgent(conf, exit)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
agent := NewAgent(ctx, conf)

defer func() {
close(agent.exit)
cancelFunc()
// We need to manually close the receiver as the Run() func
// should have been broken and interrupted by the watchdog panic
close(agent.Receiver.exit)
agent.Receiver.Stop()
// we need to wait more than on second (time for StoppableListener.Accept
// to acknowledge the connection has been closed)
time.Sleep(2 * time.Second)
Expand Down Expand Up @@ -129,8 +131,9 @@ func BenchmarkAgentTraceProcessingWithWorstCaseFiltering(b *testing.B) {
}

func runTraceProcessingBenchmark(b *testing.B, c *config.AgentConfig) {
exit := make(chan struct{})
agent := NewAgent(c, exit)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
agent := NewAgent(ctx, c)
log.UseLogger(log.Disabled)

b.ResetTimer()
Expand All @@ -143,8 +146,9 @@ func runTraceProcessingBenchmark(b *testing.B, c *config.AgentConfig) {
func BenchmarkWatchdog(b *testing.B) {
conf := config.NewDefaultAgentConfig()
conf.APIKey = "apikey_2"
exit := make(chan struct{})
agent := NewAgent(conf, exit)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
agent := NewAgent(ctx, conf)

b.ResetTimer()
b.ReportAllocs()
Expand Down
25 changes: 7 additions & 18 deletions cmd/trace-agent/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,27 @@ import (
log "github.com/cihub/seelog"
)

// StoppableListener wraps a regular TCPListener with an exit channel so we can exit cleanly from the Serve() loop of our HTTP server
type StoppableListener struct {
exit chan struct{}
// RateLimitedListener wraps a regular TCPListener with an exit channel so we can exit cleanly from the Serve() loop of our HTTP server
type RateLimitedListener struct {
connLease int32 // How many connections are available for this listener before rate-limiting kicks in
*net.TCPListener
}

// NewStoppableListener returns a new wrapped listener, which is non-initialized
func NewStoppableListener(l net.Listener, exit chan struct{}, conns int) (*StoppableListener, error) {
// NewRateLimitedListener returns a new wrapped listener, which is non-initialized
func NewRateLimitedListener(l net.Listener, conns int) (*RateLimitedListener, error) {
tcpL, ok := l.(*net.TCPListener)

if !ok {
return nil, errors.New("cannot wrap listener")
}

sl := &StoppableListener{exit: exit, connLease: int32(conns), TCPListener: tcpL}
sl := &RateLimitedListener{connLease: int32(conns), TCPListener: tcpL}

return sl, nil
}

// Refresh periodically refreshes the connection lease, and thus cancels any rate limits in place
func (sl *StoppableListener) Refresh(conns int) {
func (sl *RateLimitedListener) Refresh(conns int) {
for range time.Tick(30 * time.Second) {
atomic.StoreInt32(&sl.connLease, int32(conns))
log.Debugf("Refreshed the connection lease: %d conns available", conns)
Expand All @@ -51,7 +50,7 @@ func (e *RateLimitedError) Temporary() bool { return true }
func (e *RateLimitedError) Timeout() bool { return false }

// Accept reimplements the regular Accept but adds a check on the exit channel and returns if needed
func (sl *StoppableListener) Accept() (net.Conn, error) {
func (sl *RateLimitedListener) Accept() (net.Conn, error) {
if atomic.LoadInt32(&sl.connLease) <= 0 {
// we've reached our cap for this lease period, reject the request
return nil, &RateLimitedError{}
Expand All @@ -63,16 +62,6 @@ func (sl *StoppableListener) Accept() (net.Conn, error) {

newConn, err := sl.TCPListener.Accept()

//Check for the channel being closed
select {
case <-sl.exit:
log.Debug("stopping listener")
sl.TCPListener.Close()
return nil, errors.New("listener stopped")
default:
//If the channel is still open, continue as normal
}

if err != nil {
netErr, ok := err.(net.Error)

Expand Down
9 changes: 5 additions & 4 deletions cmd/trace-agent/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"math/rand"
"os"
Expand All @@ -24,14 +25,14 @@ import (
)

// handleSignal closes a channel to exit cleanly from routines
func handleSignal(exit chan struct{}) {
func handleSignal(onCancel func()) {
sigChan := make(chan os.Signal, 10)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
for signo := range sigChan {
switch signo {
case syscall.SIGINT, syscall.SIGTERM:
log.Infof("received signal %d (%v)", signo, signo)
close(exit)
onCancel()
return
default:
log.Warnf("unhandled signal %d (%v)", signo, signo)
Expand Down Expand Up @@ -71,7 +72,7 @@ to your datadog.conf file.
Exiting.`

// runAgent is the entrypoint of our code
func runAgent(exit chan struct{}) {
func runAgent(ctx context.Context) {
// configure a default logger before anything so we can observe initialization
if opts.info || opts.version {
log.UseLogger(log.Disabled)
Expand Down Expand Up @@ -205,7 +206,7 @@ func runAgent(exit chan struct{}) {
// Seed rand
rand.Seed(time.Now().UTC().UnixNano())

agent := NewAgent(agentConf, exit)
agent := NewAgent(ctx, agentConf)

log.Infof("trace-agent running on host %s", agentConf.HostName)
agent.Run()
Expand Down
7 changes: 4 additions & 3 deletions cmd/trace-agent/main_nix.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"flag"
_ "net/http/pprof"

Expand All @@ -28,13 +29,13 @@ func init() {

// main is the main application entry point
func main() {
exit := make(chan struct{})
ctx, cancelFunc := context.WithCancel(context.Background())

// Handle stops properly
go func() {
defer watchdog.LogOnPanic()
handleSignal(exit)
handleSignal(cancelFunc)
}()

runAgent(exit)
runAgent(ctx)
}
13 changes: 7 additions & 6 deletions cmd/trace-agent/main_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (m *myservice) Execute(args []string, r <-chan svc.ChangeRequest, changes c
changes <- svc.Status{State: svc.StartPending}
changes <- svc.Status{State: svc.Running, Accepts: cmdsAccepted}

exit := make(chan struct{})
ctx, cancelFunc := context.WithCancel(context.Background())

go func() {
for {
Expand All @@ -75,7 +76,7 @@ func (m *myservice) Execute(args []string, r <-chan svc.ChangeRequest, changes c
case svc.Stop, svc.Shutdown:
elog.Info(0x40000006, ServiceName)
changes <- svc.Status{State: svc.StopPending}
close(exit)
cancelFunc()
return
default:
elog.Warning(0xc000000A, string(c.Cmd))
Expand All @@ -84,7 +85,7 @@ func (m *myservice) Execute(args []string, r <-chan svc.ChangeRequest, changes c
}
}()
elog.Info(0x40000003, ServiceName)
runAgent(exit)
runAgent(ctx)

changes <- svc.Status{State: svc.Stopped}
return
Expand Down Expand Up @@ -173,15 +174,15 @@ func main() {

// if we are an interactive session, then just invoke the agent on the command line.

exit := make(chan struct{})
ctx, cancelFunc := context.WithCancel(context.Background())
// Handle stops properly
go func() {
defer watchdog.LogOnPanic()
handleSignal(exit)
handleSignal(cancelFunc)
}()

// Invoke the Agent
runAgent(exit)
runAgent(ctx)
}

func startService() error {
Expand Down
18 changes: 11 additions & 7 deletions cmd/trace-agent/receiver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -58,12 +59,11 @@ type HTTPReceiver struct {
services chan model.ServicesMetadata
conf *config.AgentConfig
dynConf *config.DynamicConfig
server *http.Server

stats *info.ReceiverStats
preSampler *sampler.PreSampler

exit chan struct{}

maxRequestBodyLength int64
debug bool
}
Expand All @@ -78,7 +78,6 @@ func NewHTTPReceiver(
dynConf: dynConf,
stats: info.NewReceiverStats(),
preSampler: sampler.NewPreSampler(conf.PreSampleRate),
exit: make(chan struct{}),

traces: traces,
services: services,
Expand Down Expand Up @@ -128,8 +127,7 @@ func (r *HTTPReceiver) Listen(addr, logExtra string) error {
return fmt.Errorf("cannot listen on %s: %v", addr, err)
}

stoppableListener, err := NewStoppableListener(listener, r.exit,
r.conf.ConnectionLimit)
stoppableListener, err := NewRateLimitedListener(listener, r.conf.ConnectionLimit)
if err != nil {
return fmt.Errorf("cannot create stoppable listener: %v", err)
}
Expand All @@ -139,7 +137,7 @@ func (r *HTTPReceiver) Listen(addr, logExtra string) error {
timeout = time.Duration(r.conf.ReceiverTimeout) * time.Second
}

server := http.Server{
r.server = &http.Server{
ReadTimeout: time.Second * time.Duration(timeout),
WriteTimeout: time.Second * time.Duration(timeout),
}
Expand All @@ -152,12 +150,18 @@ func (r *HTTPReceiver) Listen(addr, logExtra string) error {
}()
go func() {
defer watchdog.LogOnPanic()
server.Serve(stoppableListener)
r.server.Serve(stoppableListener)
}()

return nil
}

func (r *HTTPReceiver) Stop() error {
expiry := time.Now().Add(20 * time.Second) // give it 20 seconds
ctx, _ := context.WithDeadline(context.Background(), expiry)
return r.server.Shutdown(ctx)
}

func (r *HTTPReceiver) httpHandle(fn http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
req.Body = model.NewLimitedReader(req.Body, r.maxRequestBodyLength)
Expand Down
2 changes: 1 addition & 1 deletion cmd/trace-agent/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestReceiverRequestBodyLength(t *testing.T) {
go receiver.Run()

defer func() {
close(receiver.exit)
receiver.Stop()
// we need to wait more than on second (time for StoppableListener.Accept
// to acknowledge the connection has been closed)
time.Sleep(2 * time.Second)
Expand Down

0 comments on commit 0c0754f

Please sign in to comment.