Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add OTLP log endpoints (gRPC & HTTP) #1187

Merged
merged 22 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
70a67e9
add otlp logs router
puckpuck Feb 16, 2024
0272231
add otlp common functions
puckpuck Feb 16, 2024
db26a0e
add otlp common functions
puckpuck Feb 16, 2024
41b61a9
add otlp common router functionality
puckpuck Feb 16, 2024
a7e7dc2
clean-up after chery picking
MikeGoldsmith May 23, 2024
e091a6a
add TODO for skipping root span check
MikeGoldsmith May 23, 2024
05bc6ac
fix: Clean up all the things, even if redis doesn't know about it (#1…
kentquirk May 23, 2024
7c9800a
add logs HTTP routes to muxxer
MikeGoldsmith May 23, 2024
398ac8b
make logs handler mirror traces, move shared to common file
MikeGoldsmith May 23, 2024
cb4025f
move processOTLPRequest to router
MikeGoldsmith May 23, 2024
010f89d
add otlp logs tests
MikeGoldsmith May 23, 2024
62ee1f9
clean up tests
MikeGoldsmith May 23, 2024
b084664
clean-up after cherry-pick
MikeGoldsmith Jun 5, 2024
be73e96
add missing API validation check trace & logs OTLP handler
MikeGoldsmith Jun 5, 2024
f5d37dc
add tests for gRPC invalid keys
MikeGoldsmith Jun 5, 2024
4810a00
update invalid API key to return grpc error with msg
MikeGoldsmith Jun 5, 2024
d75a377
Merge branch '2.X-work-branch' into mike/2.x-otlp-logs
MikeGoldsmith Jun 11, 2024
62bd990
move logserveer outside of tests
MikeGoldsmith Jun 11, 2024
6d8c7b6
add test to verify logs with trace ID are added to collector
MikeGoldsmith Jun 11, 2024
c2fc208
add test for log without trace ID
MikeGoldsmith Jun 11, 2024
03ed976
flush mock collector after each test
MikeGoldsmith Jun 11, 2024
2a6803f
use different names for otlp traces and logs mux route names
MikeGoldsmith Jun 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions collect/mockCollector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package collect

import (
"github.com/honeycombio/refinery/types"
)

type MockCollector struct {
Spans chan *types.Span
}

func NewMockCollector() *MockCollector {
return &MockCollector{
Spans: make(chan *types.Span, 100),
}
}

func (m *MockCollector) AddSpan(span *types.Span) error {
m.Spans <- span
return nil
}

func (m *MockCollector) AddSpanFromPeer(span *types.Span) error {
m.Spans <- span
return nil
}

func (m *MockCollector) GetStressedSampleRate(traceID string) (rate uint, keep bool, reason string) {
return 0, false, ""
}

func (m *MockCollector) ProcessSpanImmediately(sp *types.Span, keep bool, sampleRate uint, reason string) {
m.Spans <- sp
}

func (m *MockCollector) Stressed() bool {
return false
}

func (m *MockCollector) Flush() {
for {
select {
case <-m.Spans:
default:
return
}
}
}

var _ Collector = (*MockCollector)(nil)
77 changes: 77 additions & 0 deletions route/otlp_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package route

import (
"context"
"errors"
"fmt"
"net/http"

huskyotlp "github.com/honeycombio/husky/otlp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

collectorlogs "go.opentelemetry.io/proto/otlp/collector/logs/v1"
)

func (r *Router) postOTLPLogs(w http.ResponseWriter, req *http.Request) {
ri := huskyotlp.GetRequestInfoFromHttpHeaders(req.Header)

if err := ri.ValidateLogsHeaders(); err != nil {
if errors.Is(err, huskyotlp.ErrInvalidContentType) {
r.handlerReturnWithError(w, ErrInvalidContentType, err)
} else {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: err.Error(), HTTPStatusCode: http.StatusUnauthorized})
}
return
}

if !r.Config.IsAPIKeyValid(ri.ApiKey) {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: fmt.Sprintf("api key %s not found in list of authorized keys", ri.ApiKey), HTTPStatusCode: http.StatusUnauthorized})
return
}

result, err := huskyotlp.TranslateLogsRequestFromReader(req.Context(), req.Body, ri)
if err != nil {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: err.Error(), HTTPStatusCode: http.StatusInternalServerError})
return
}

if err := r.processOTLPRequest(req.Context(), result.Batches, ri.ApiKey); err != nil {
r.handleOTLPFailureResponse(w, req, huskyotlp.OTLPError{Message: err.Error(), HTTPStatusCode: http.StatusInternalServerError})
return
}

_ = huskyotlp.WriteOtlpHttpTraceSuccessResponse(w, req)
}

type LogsServer struct {
router *Router
collectorlogs.UnimplementedLogsServiceServer
}

func NewLogsServer(router *Router) *LogsServer {
logsServer := LogsServer{router: router}
return &logsServer
}

func (l *LogsServer) Export(ctx context.Context, req *collectorlogs.ExportLogsServiceRequest) (*collectorlogs.ExportLogsServiceResponse, error) {
ri := huskyotlp.GetRequestInfoFromGrpcMetadata(ctx)
if err := ri.ValidateLogsHeaders(); err != nil {
return nil, huskyotlp.AsGRPCError(err)
}

if !l.router.Config.IsAPIKeyValid(ri.ApiKey) {
return nil, status.Error(codes.Unauthenticated, fmt.Sprintf("api key %s not found in list of authorized keys", ri.ApiKey))
}

result, err := huskyotlp.TranslateLogsRequest(ctx, req, ri)
if err != nil {
return nil, huskyotlp.AsGRPCError(err)
}

if err := l.router.processOTLPRequest(ctx, result.Batches, ri.ApiKey); err != nil {
return nil, huskyotlp.AsGRPCError(err)
}

return &collectorlogs.ExportLogsServiceResponse{}, nil
}
Loading
Loading