Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#429 issue - OpenTrace zipKin Support #472

Merged
merged 14 commits into from
Oct 29, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Config struct {
Metrics Metrics
UI UI
Runtime Runtime
Tracing Tracing
ProfileMode string
ProfilePath string
Insecure bool
Expand Down Expand Up @@ -144,3 +145,13 @@ type Consul struct {
CheckDeregisterCriticalServiceAfter string
ChecksRequired string
}

type Tracing struct {
TracingEnabled bool
CollectorType string
ConnectString string
ServiceName string
Topic string
SamplerRate float64
SpanHost string
}
10 changes: 10 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,14 @@ var defaultConfig = &Config{
Color: "light-green",
Access: "rw",
},

Tracing: Tracing{
TracingEnabled: false,
CollectorType: "http",
ConnectString: "http://localhost:9411/api/v1/spans",
ServiceName: "Fabiolb",
Topic: "Fabiolb-Kafka-Topic",
SamplerRate: -1,
SpanHost: "localhost:9998",
},
}
7 changes: 7 additions & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.StringVar(&cfg.UI.Title, "ui.title", defaultConfig.UI.Title, "optional title for the UI")
f.StringVar(&cfg.ProfileMode, "profile.mode", defaultConfig.ProfileMode, "enable profiling mode, one of [cpu, mem, mutex, block]")
f.StringVar(&cfg.ProfilePath, "profile.path", defaultConfig.ProfilePath, "path to profile dump file")
f.BoolVar(&cfg.Tracing.TracingEnabled, "tracing.TracingEnabled", defaultConfig.Tracing.TracingEnabled, "Enable/Disable OpenTrace, one of [true, false]")
f.StringVar(&cfg.Tracing.CollectorType, "tracing.CollectorType", defaultConfig.Tracing.CollectorType, "OpenTrace Collector Type, one of [http, kafka]")
f.StringVar(&cfg.Tracing.ConnectString, "tracing.ConnectString", defaultConfig.Tracing.ConnectString, "OpenTrace Collector host:port")
f.StringVar(&cfg.Tracing.ServiceName, "tracing.ServiceName", defaultConfig.Tracing.ServiceName, "Service name to embed in OpenTrace span")
f.StringVar(&cfg.Tracing.Topic, "tracing.Topic", defaultConfig.Tracing.Topic, "OpenTrace Collector Kafka Topic")
f.Float64Var(&cfg.Tracing.SamplerRate, "tracing.SamplerRate", defaultConfig.Tracing.SamplerRate, "OpenTrace sample rate percentage in decimal form")
f.StringVar(&cfg.Tracing.SpanHost, "tracing.SpanHost", defaultConfig.Tracing.SpanHost, "Host:Port info to add to spans")

// deprecated flags
var proxyLogRoutes string
Expand Down
59 changes: 59 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -994,3 +994,62 @@
# The default is
#
# ui.title =


# Open Trace Configuration Currently supports ZipKin Collector
# tracing.TracingEnabled enables/disables Open Tracing in Fabio. Bool value true/false
#
# The default is
#
# tracing.TracingEnabled = false

# tracing.CollectorType sets what type of collector is used.
# Currently only two types are supported http and kafka
#
# http: sets collector type to http tracing.ConnectString must also be set
# kafka: sets collector type to emit via kafka. tracing.Topic must also be set
#
# The default is
#
# tracing.CollectorType = http

# tracing.ConnectString sets the connection string per connection type.
# If tracing.CollectorType = http tracing.ConnectString should be
# http://URL:PORT where URL is the URL of your collector and PORT is the TCP Port
# it is listening on
#
# If tracing.CollectorType = kafka tracing.ConnectString should be
# HOSTNAME:PORT of your kafka broker
# tracing.Topic must also be set
#
# The default is
#
# tracing.ConnectString = http://localhost:9411/api/v1/spans

# tracing.ServiceName sets the service name used in reporting span information
#
# The default is
#
# tracing.ServiceName = Fabiolb

# tracing.Topic sets the Topic String used if tracing.CollectorType is kafka and
# tracing.ConnectSting is set to a kafka broker
#
# The default is
#
# tracing.Topic = Fabiolb-Kafka-Topic

# tracing.SamplerRate is the rate at which opentrace span data will be collected and sent
# If SamplerRate is <= 0 Never sample
# If SamplerRate is >= 1.0 always sample
# Values between 0 and 1 will be the percentage in decimal form
# Example a value of .50 will be 50% sample rate
#
# The default is
# tracing.SamplerRate = -1

# tracing.SpanHost sets host information.
# This is used to specify additional information when sending spans to a collector
#
# The default is
# tracing.SpanHost = localhost:9998
21 changes: 18 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/fabiolb/fabio/registry/file"
"github.com/fabiolb/fabio/registry/static"
"github.com/fabiolb/fabio/route"
"github.com/fabiolb/fabio/trace"
"github.com/pkg/profile"
dmp "github.com/sergi/go-diff/diffmatchpatch"
)
Expand Down Expand Up @@ -118,6 +119,9 @@ func main() {
initMetrics(cfg)
initRuntime(cfg)
initBackend(cfg)
//Init OpenTracing if Enabled in the Properties File Tracing.TracingEnabled
initOpenTracing(cfg)

startAdmin(cfg)

go watchNoRouteHTML(cfg)
Expand Down Expand Up @@ -192,9 +196,10 @@ func newHTTPProxy(cfg *config.Config) http.Handler {
}
return t
},
Requests: metrics.DefaultRegistry.GetTimer("requests"),
Noroute: metrics.DefaultRegistry.GetCounter("notfound"),
Logger: l,
Requests: metrics.DefaultRegistry.GetTimer("requests"),
Noroute: metrics.DefaultRegistry.GetCounter("notfound"),
Logger: l,
TracerCfg: cfg.Tracing,
}
}

Expand Down Expand Up @@ -379,6 +384,16 @@ func initBackend(cfg *config.Config) {
}
}

// OpenTracing Init
func initOpenTracing(cfg *config.Config) {
// If fabio.properties file has tracing.TracingEnabled set to true the init tracing
if cfg.Tracing.TracingEnabled {
trace.InitializeTracer(cfg.Tracing.CollectorType, cfg.Tracing.ConnectString, cfg.Tracing.ServiceName, cfg.Tracing.Topic, cfg.Tracing.SamplerRate, cfg.Tracing.SpanHost)

}

}

func watchBackend(cfg *config.Config, first chan bool) {
var (
last string
Expand Down
11 changes: 11 additions & 0 deletions proxy/http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/fabiolb/fabio/noroute"
"github.com/fabiolb/fabio/proxy/gzip"
"github.com/fabiolb/fabio/route"
"github.com/fabiolb/fabio/trace"
"github.com/fabiolb/fabio/uuid"
)

Expand Down Expand Up @@ -53,6 +54,9 @@ type HTTPProxy struct {
// Logger is the access logger for the requests.
Logger logger.Logger

// TracerCfg is the Open Tracing configuration as provided during startup
TracerCfg config.Tracing

// UUID returns a unique id in uuid format.
// If UUID is nil, uuid.NewUUID() is used.
UUID func() string
Expand All @@ -71,6 +75,10 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Header.Set(p.Config.RequestID, id())
}

//Create Span
span := trace.CreateSpan(r, p.TracerCfg.ServiceName)
defer span.Finish()

t := p.Lookup(r)
if t == nil {
status := p.Config.NoRouteStatus
Expand Down Expand Up @@ -144,6 +152,9 @@ func (p *HTTPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

//Add OpenTrace Headers to response
trace.InjectHeaders(span, r)

upgrade, accept := r.Header.Get("Upgrade"), r.Header.Get("Accept")

tr := p.Transport
Expand Down
93 changes: 93 additions & 0 deletions trace/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package trace

import (
"log"
"net/http"
"os"
"strings"

opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
zipkin "github.com/openzipkin/zipkin-go-opentracing"
)

func InjectHeaders(span opentracing.Span, req *http.Request) {
// Inject span data into the request headers
opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header),
)
}

func CreateCollector(collectorType string, connectString string, topic string) zipkin.Collector {
var collector zipkin.Collector
var err error

if collectorType == "http" {
collector, err = zipkin.NewHTTPCollector(connectString)
} else if collectorType == "kafka" {
// TODO set logger?
kafkaHosts := strings.Split(connectString, ",")
collector, err = zipkin.NewKafkaCollector(
kafkaHosts,
zipkin.KafkaTopic(topic),
)
}

if err != nil {
log.Printf("Unable to create Zipkin %s collector: %+v", collectorType, err)
os.Exit(-1)
}

return collector
}

func CreateTracer(recorder zipkin.SpanRecorder, samplerRate float64) opentracing.Tracer {
tracer, err := zipkin.NewTracer(
recorder,
zipkin.WithSampler(zipkin.NewBoundarySampler(samplerRate, 1)),
zipkin.ClientServerSameSpan(false),
zipkin.TraceID128Bit(true),
)

if err != nil {
log.Printf("Unable to create Zipkin tracer: %+v", err)
os.Exit(-1)
}

return tracer
}

func CreateSpan(r *http.Request, serviceName string) opentracing.Span {
globalTracer := opentracing.GlobalTracer()

// If headers contain trace data, create child span from parent; else, create root span
var span opentracing.Span
if globalTracer != nil {
spanCtx, err := globalTracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
if err != nil {
span = globalTracer.StartSpan(serviceName)
} else {
span = globalTracer.StartSpan(serviceName, ext.RPCServerOption(spanCtx))
}
}

return span // caller must defer span.finish()
}

func InitializeTracer(collectorType string, connectString string, serviceName string, topic string, samplerRate float64, addressPort string) {
log.Printf("Tracing initializing - type: %s, connection string: %s, service name: %s, topic: %s, samplerRate: %v", collectorType, connectString, serviceName, topic, samplerRate)

// Create a new Zipkin Collector, Recorder, and Tracer
collector := CreateCollector(collectorType, connectString, topic)
recorder := zipkin.NewRecorder(collector, false, addressPort, serviceName)
tracer := CreateTracer(recorder, samplerRate)

// Set the Zipkin Tracer created above to the GlobalTracer
opentracing.SetGlobalTracer(tracer)

log.Printf("\n\nTRACER: %v\n\n", tracer)
log.Printf("\n\nCOLLECTOR: %v\n\n", collector)
log.Printf("\n\nRECORDER: %v\n\n", recorder)
}
54 changes: 54 additions & 0 deletions trace/trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package trace

import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
mocktracer "github.com/opentracing/opentracing-go/mocktracer"
"net/http"
"testing"
)

func mimicTracerInject(req *http.Request) {
// TODO maybe replace this will a call to opentracing.GlobalTracer().Inject()
req.Header.Add("X-B3-TraceId", "1234562345678")
req.Header.Add("X-B3-SpanId", "123456789")
req.Header.Add("X-B3-ParentSpanId", "123456789")
req.Header.Add("X-B3-Flags", "1")
}

// go test -v ./trace
func TestInjectHeaders(t *testing.T) {
serviceName := "TESTING"

req, err := http.NewRequest("GET", "http://example.com", nil)
if err != nil {
t.Error("Error when creating new request.")
t.Fail()
}
mimicTracerInject(req)

mt := mocktracer.New()
opentracing.SetGlobalTracer(mt)
globalTracer := opentracing.GlobalTracer()

var span opentracing.Span
if globalTracer != nil {
spanCtx, err := globalTracer.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header))
if err != nil {
span = globalTracer.StartSpan(serviceName)
} else {
span = globalTracer.StartSpan(serviceName, ext.RPCServerOption(spanCtx))
}
}

InjectHeaders(span, req)

if req.Header.Get("X-B3-Traceid") == "" {
t.Error("Zipkin headers not set in request.")
t.Fail()
}
if req.Header.Get("X-B3-Traceid") != "1234562345678" {
t.Error("Zipkin headers do not match the values set.")
t.Fail()
}
}
Loading