Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added client package to help identify RPC/HTTP clients #326

Merged
merged 1 commit into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package client contains generic representations of clients connecting to different receivers
package client

import (
"context"
"net"
"net/http"

"google.golang.org/grpc/peer"
)

type ctxKey struct{}

// Client represents a generic client that sends data to any receiver supported by the OT receiver
type Client struct {
IP string
}

// NewContext takes an existing context and derives a new context with the client value stored on it
func NewContext(ctx context.Context, c *Client) context.Context {
return context.WithValue(ctx, ctxKey{}, c)
}

// FromContext takes a context and returns a Client value from it, if present.
func FromContext(ctx context.Context) (*Client, bool) {
c, ok := ctx.Value(ctxKey{}).(*Client)
return c, ok
}

// FromGRPC takes a GRPC context and tries to extract client information from it
func FromGRPC(ctx context.Context) (*Client, bool) {
if p, ok := peer.FromContext(ctx); ok {
ip := parseIP(p.Addr.String())
if ip != "" {
return &Client{ip}, true
}
}
return nil, false
}

// FromHTTP takes a net/http Request object and tries to extract client information from it
func FromHTTP(r *http.Request) (*Client, bool) {
ip := parseIP(r.RemoteAddr)
if ip == "" {
return nil, false
}
return &Client{ip}, true
}

func parseIP(source string) string {
ipstr, _, err := net.SplitHostPort(source)
if err == nil {
return ipstr
}
ip := net.ParseIP(source)
if ip != nil {
return ip.String()
}
return ""
}
60 changes: 60 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package client contains generic representations of clients connecting to different receivers
package client

import (
"context"
"net"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/peer"
)

func TestClientContext(t *testing.T) {
ips := []string{
"1.1.1.1", "127.0.0.1", "1111", "ip",
}
for _, ip := range ips {
ctx := NewContext(context.Background(), &Client{ip})
c, ok := FromContext(ctx)
assert.True(t, ok)
assert.NotNil(t, c)
assert.Equal(t, c.IP, ip)
}
}

func TestParsingGRPC(t *testing.T) {
grpcCtx := peer.NewContext(context.Background(), &peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP("192.168.1.1"),
Port: 80,
},
})

client, ok := FromGRPC(grpcCtx)
assert.True(t, ok)
assert.NotNil(t, client)
assert.Equal(t, client.IP, "192.168.1.1")
}

func TestParsingHTTP(t *testing.T) {
client, ok := FromHTTP(&http.Request{RemoteAddr: "192.168.1.2"})
assert.True(t, ok)
assert.NotNil(t, client)
assert.Equal(t, client.IP, "192.168.1.2")
}
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/open-telemetry/opentelemetry-service v0.2.0 h1:sesxaZ+IuDVCOOORGykV+xkzZwkoluJxXSqdxvo252E=
github.com/opentracing-contrib/go-stdlib v0.0.0-20190519235532-cf7a6c988dc9/go.mod h1:PLldrQSroqzH70Xl+1DQcGnefIbqsKR7UDaiux3zV+w=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
Expand Down
5 changes: 5 additions & 0 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector/client"
"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/observability"
Expand Down Expand Up @@ -353,6 +354,10 @@ func (jr *jReceiver) GetBaggageRestrictions(serviceName string) ([]*baggage.Bagg
}

func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
if c, ok := client.FromGRPC(ctx); ok {
ctx = client.NewContext(ctx, c)
}

ctxWithReceiverName := observability.ContextWithReceiverName(ctx, collectorReceiverTagValue)

td, err := jaegertranslator.ProtoBatchToOCProto(r.Batch)
Expand Down
8 changes: 7 additions & 1 deletion receiver/opencensusreceiver/octrace/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-collector/client"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
"github.com/open-telemetry/opentelemetry-collector/observability"
Expand Down Expand Up @@ -141,8 +142,13 @@ func (ocr *Receiver) sendToNextConsumer(longLivedCtx context.Context, tracedata
return nil
}

ctx := context.Background()
if c, ok := client.FromGRPC(longLivedCtx); ok {
ctx = client.NewContext(ctx, c)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't the long live ctx be used as an input here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

longLivedCtx is bound to the stream handler loop and remains active for the lifetime of the connection. The new context is used per request (incoming batch) to trace handling of each batch as a single independent request. This is not a new change and continues to work as before.

}

// Trace this method
ctx, span := trace.StartSpan(context.Background(), "OpenCensusTraceReceiver.Export")
ctx, span := trace.StartSpan(ctx, "OpenCensusTraceReceiver.Export")
defer span.End()

// If the starting RPC has a parent span, then add it as a parent link.
Expand Down
7 changes: 6 additions & 1 deletion receiver/zipkinreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
zipkinproto "github.com/openzipkin/zipkin-go/proto/v2"
"go.opencensus.io/trace"

"github.com/open-telemetry/opentelemetry-collector/client"
"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
Expand Down Expand Up @@ -295,8 +296,12 @@ const (
// The ZipkinReceiver receives spans from endpoint /api/v2 as JSON,
// unmarshals them and sends them along to the nextConsumer.
func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Trace this method
parentCtx := r.Context()
if c, ok := client.FromHTTP(r); ok {
parentCtx = client.NewContext(parentCtx, c)
}

// Trace this method
ctx, span := trace.StartSpan(parentCtx, "ZipkinReceiver.Export")
defer span.End()

Expand Down