Skip to content

Commit

Permalink
[refactor] Remove archive reader and writer from remote storage grpc …
Browse files Browse the repository at this point in the history
…handler (#6611)

## Which problem is this PR solving?
- Towards #6065

## Description of the changes
- 🛑 This PR deprecates two fields in the `CapabilitiesResponse`;
`ArchiveSpanReader` and `ArchiveSpanWriter` as these capabilities do not
exist within grpc storage anymore and are configured externally. 🛑
- 🛑 This PR deprecates the ArchiveSpanReaderPlugin and
ArchvieSpanWriterPlugin 🛑

## How was this change tested?
- CI

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Jan 26, 2025
1 parent 7f72d96 commit 5136f34
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 288 deletions.
9 changes: 2 additions & 7 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,11 @@ type storageFactory interface {
CreateSpanReader() (spanstore.Reader, error)
CreateSpanWriter() (spanstore.Writer, error)
CreateDependencyReader() (dependencystore.Reader, error)
InitArchiveStorage(logger *zap.Logger) (spanstore.Reader, spanstore.Writer)
}

// NewServer creates and initializes Server.
func NewServer(options *Options, storageFactory storageFactory, tm *tenancy.Manager, telset telemetry.Settings) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, telset.Logger)
handler, err := createGRPCHandler(storageFactory)
if err != nil {
return nil, err
}
Expand All @@ -60,7 +59,7 @@ func NewServer(options *Options, storageFactory storageFactory, tm *tenancy.Mana
}, nil
}

func createGRPCHandler(f storageFactory, logger *zap.Logger) (*shared.GRPCHandler, error) {
func createGRPCHandler(f storageFactory) (*shared.GRPCHandler, error) {
reader, err := f.CreateSpanReader()
if err != nil {
return nil, err
Expand All @@ -81,10 +80,6 @@ func createGRPCHandler(f storageFactory, logger *zap.Logger) (*shared.GRPCHandle
StreamingSpanWriter: func() spanstore.Writer { return nil },
}

ar, aw := f.InitArchiveStorage(logger)
impl.ArchiveSpanReader = func() spanstore.Reader { return ar }
impl.ArchiveSpanWriter = func() spanstore.Writer { return aw }

handler := shared.NewGRPCHandler(impl)
return handler, nil
}
Expand Down
10 changes: 1 addition & 9 deletions cmd/remote-storage/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestCreateGRPCHandler(t *testing.T) {
depReader: depReader,
}

h, err := createGRPCHandler(f, zap.NewNop())
h, err := createGRPCHandler(f)
require.NoError(t, err)

writer.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("writer error"))
Expand All @@ -176,12 +176,6 @@ func TestCreateGRPCHandler(t *testing.T) {
_, err = h.GetDependencies(context.Background(), &storage_v1.GetDependenciesRequest{})
require.ErrorContains(t, err, "deps error")

err = h.GetArchiveTrace(nil, nil)
require.ErrorContains(t, err, "not implemented")

_, err = h.WriteArchiveSpan(context.Background(), nil)
require.ErrorContains(t, err, "not implemented")

err = h.WriteSpanStream(nil)
assert.ErrorContains(t, err, "not implemented")
}
Expand Down Expand Up @@ -449,8 +443,6 @@ func validateGRPCServer(t *testing.T, hostPort string) {
"jaeger.storage.v1.SpanWriterPlugin",
"jaeger.storage.v1.DependenciesReaderPlugin",
"jaeger.storage.v1.PluginCapabilities",
"jaeger.storage.v1.ArchiveSpanReaderPlugin",
"jaeger.storage.v1.ArchiveSpanWriterPlugin",
"jaeger.storage.v1.StreamingSpanWriterPlugin",
"grpc.health.v1.Health",
},
Expand Down
6 changes: 4 additions & 2 deletions plugin/storage/grpc/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,13 @@ service SpanReaderPlugin {
rpc FindTraceIDs(FindTraceIDsRequest) returns (FindTraceIDsResponse);
}

// This plugin is deprecated and is not used anymore.
service ArchiveSpanWriterPlugin {
// spanstore/Writer
rpc WriteArchiveSpan(WriteSpanRequest) returns (WriteSpanResponse);
}

// This plugin is deprecated and is not used anymore.
service ArchiveSpanReaderPlugin {
// spanstore/Reader
rpc GetArchiveTrace(GetTraceRequest) returns (stream SpansResponseChunk);
Expand All @@ -188,8 +190,8 @@ message CapabilitiesRequest {
}

message CapabilitiesResponse {
bool archiveSpanReader = 1;
bool archiveSpanWriter = 2;
bool archiveSpanReader = 1 [deprecated = true];
bool archiveSpanWriter = 2 [deprecated = true];
bool streamingSpanWriter = 3;
}

Expand Down
48 changes: 2 additions & 46 deletions plugin/storage/grpc/shared/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type GRPCHandlerStorageImpl struct {
SpanWriter func() spanstore.Writer
DependencyReader func() dependencystore.Reader

ArchiveSpanReader func() spanstore.Reader
ArchiveSpanWriter func() spanstore.Writer

StreamingSpanWriter func() spanstore.Writer
}

Expand All @@ -53,16 +50,12 @@ func NewGRPCHandler(impl *GRPCHandlerStorageImpl) *GRPCHandler {
func (s *GRPCHandler) Register(ss *grpc.Server, hs *health.Server) error {
storage_v1.RegisterSpanReaderPluginServer(ss, s)
storage_v1.RegisterSpanWriterPluginServer(ss, s)
storage_v1.RegisterArchiveSpanReaderPluginServer(ss, s)
storage_v1.RegisterArchiveSpanWriterPluginServer(ss, s)
storage_v1.RegisterPluginCapabilitiesServer(ss, s)
storage_v1.RegisterDependenciesReaderPluginServer(ss, s)
storage_v1.RegisterStreamingSpanWriterPluginServer(ss, s)

hs.SetServingStatus("jaeger.storage.v1.SpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.SpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.ArchiveSpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.ArchiveSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.PluginCapabilities", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.DependenciesReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
hs.SetServingStatus("jaeger.storage.v1.StreamingSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING)
Expand Down Expand Up @@ -244,45 +237,8 @@ func (*GRPCHandler) sendSpans(spans []*model.Span, sendFn func(*storage_v1.Spans

func (s *GRPCHandler) Capabilities(context.Context, *storage_v1.CapabilitiesRequest) (*storage_v1.CapabilitiesResponse, error) {
return &storage_v1.CapabilitiesResponse{
ArchiveSpanReader: s.impl.ArchiveSpanReader() != nil,
ArchiveSpanWriter: s.impl.ArchiveSpanWriter() != nil,
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
StreamingSpanWriter: s.impl.StreamingSpanWriter() != nil,
}, nil
}

func (s *GRPCHandler) GetArchiveTrace(r *storage_v1.GetTraceRequest, stream storage_v1.ArchiveSpanReaderPlugin_GetArchiveTraceServer) error {
reader := s.impl.ArchiveSpanReader()
if reader == nil {
return status.Error(codes.Unimplemented, "not implemented")
}
trace, err := reader.GetTrace(stream.Context(), spanstore.GetTraceParameters{
TraceID: r.TraceID,
StartTime: r.StartTime,
EndTime: r.EndTime,
})
if errors.Is(err, spanstore.ErrTraceNotFound) {
return status.Error(codes.NotFound, spanstore.ErrTraceNotFound.Error())
}
if err != nil {
return err
}

err = s.sendSpans(trace.Spans, stream.Send)
if err != nil {
return err
}

return nil
}

func (s *GRPCHandler) WriteArchiveSpan(ctx context.Context, r *storage_v1.WriteSpanRequest) (*storage_v1.WriteSpanResponse, error) {
writer := s.impl.ArchiveSpanWriter()
if writer == nil {
return nil, status.Error(codes.Unimplemented, "not implemented")
}
err := writer.WriteSpan(ctx, r.Span)
if err != nil {
return nil, err
}
return &storage_v1.WriteSpanResponse{}, nil
}
152 changes: 2 additions & 150 deletions plugin/storage/grpc/shared/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ func withGRPCServer(fn func(r *grpcServerTest)) {
DependencyReader: func() dependencystore.Reader {
return mockPlugin.depsReader
},
ArchiveSpanReader: func() spanstore.Reader {
return mockPlugin.spanReader
},
ArchiveSpanWriter: func() spanstore.Writer {
return mockPlugin.spanWriter
},
StreamingSpanWriter: func() spanstore.Writer {
return mockPlugin.streamWriter
},
Expand Down Expand Up @@ -280,150 +274,11 @@ func TestGRPCServerGetDependencies(t *testing.T) {
})
}

func TestGRPCServerGetArchiveTrace(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer)
traceSteam.On("Context").Return(context.Background())
traceSteam.On("Send", &storage_v1.SpansResponseChunk{Spans: mockTraceSpans}).
Return(nil)

var traceSpans []*model.Span
for i := range mockTraceSpans {
traceSpans = append(traceSpans, &mockTraceSpans[i])
}
r.impl.spanReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}).
Return(&model.Trace{Spans: traceSpans}, nil)

err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{
TraceID: mockTraceID,
}, traceSteam)
require.NoError(t, err)
})
}

func TestGRPCServerGetArchiveTrace_NotFound(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer)
traceSteam.On("Context").Return(context.Background())

r.impl.spanReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}).
Return(nil, spanstore.ErrTraceNotFound)

err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{
TraceID: mockTraceID,
}, traceSteam)
assert.Equal(t, codes.NotFound, status.Code(err))
})
}

func TestGRPCServerGetArchiveTrace_Error(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer)
traceSteam.On("Context").Return(context.Background())

r.impl.spanReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}).
Return(nil, errors.New("some error"))

err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{
TraceID: mockTraceID,
}, traceSteam)
require.Error(t, err)
})
}

func TestGRPCServerGetArchiveTrace_NoImpl(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
r.server.impl.ArchiveSpanReader = func() spanstore.Reader { return nil }
traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer)

r.impl.spanReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}).
Return(nil, errors.New("some error"))

err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{
TraceID: mockTraceID,
}, traceSteam)
assert.Equal(t, codes.Unimplemented, status.Code(err))
})
}

func TestGRPCServerGetArchiveTrace_StreamError(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer)
traceSteam.On("Context").Return(context.Background())
traceSteam.On("Send", &storage_v1.SpansResponseChunk{Spans: mockTraceSpans}).
Return(errors.New("some error"))

var traceSpans []*model.Span
for i := range mockTraceSpans {
traceSpans = append(traceSpans, &mockTraceSpans[i])
}
r.impl.spanReader.On("GetTrace", mock.Anything, spanstore.GetTraceParameters{TraceID: mockTraceID}).
Return(&model.Trace{Spans: traceSpans}, nil)

err := r.server.GetArchiveTrace(&storage_v1.GetTraceRequest{
TraceID: mockTraceID,
}, traceSteam)
require.Error(t, err)
})
}

func TestGRPCServerWriteArchiveSpan_NoImpl(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
r.server.impl.ArchiveSpanWriter = func() spanstore.Writer { return nil }

_, err := r.server.WriteArchiveSpan(context.Background(), &storage_v1.WriteSpanRequest{
Span: &mockTraceSpans[0],
})
assert.Equal(t, codes.Unimplemented, status.Code(err))
})
}

func TestGRPCServerWriteArchiveSpan(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
r.impl.spanWriter.On("WriteSpan", mock.Anything, &mockTraceSpans[0]).
Return(nil)

s, err := r.server.WriteArchiveSpan(context.Background(), &storage_v1.WriteSpanRequest{
Span: &mockTraceSpans[0],
})
require.NoError(t, err)
assert.Equal(t, &storage_v1.WriteSpanResponse{}, s)
})
}

func TestGRPCServerWriteArchiveSpan_Error(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
r.impl.spanWriter.On("WriteSpan", mock.Anything, &mockTraceSpans[0]).
Return(errors.New("some error"))

_, err := r.server.WriteArchiveSpan(context.Background(), &storage_v1.WriteSpanRequest{
Span: &mockTraceSpans[0],
})
require.Error(t, err)
})
}

func TestGRPCServerCapabilities(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
capabilities, err := r.server.Capabilities(context.Background(), &storage_v1.CapabilitiesRequest{})
require.NoError(t, err)
assert.Equal(t, &storage_v1.CapabilitiesResponse{ArchiveSpanReader: true, ArchiveSpanWriter: true, StreamingSpanWriter: true}, capabilities)
})
}

func TestGRPCServerCapabilities_NoArchive(t *testing.T) {
withGRPCServer(func(r *grpcServerTest) {
r.server.impl.ArchiveSpanReader = func() spanstore.Reader { return nil }
r.server.impl.ArchiveSpanWriter = func() spanstore.Writer { return nil }

capabilities, err := r.server.Capabilities(context.Background(), &storage_v1.CapabilitiesRequest{})
require.NoError(t, err)
expected := &storage_v1.CapabilitiesResponse{
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
StreamingSpanWriter: true,
}
assert.Equal(t, expected, capabilities)
assert.Equal(t, &storage_v1.CapabilitiesResponse{ArchiveSpanReader: false, ArchiveSpanWriter: false, StreamingSpanWriter: true}, capabilities)
})
}

Expand All @@ -433,10 +288,7 @@ func TestGRPCServerCapabilities_NoStreamWriter(t *testing.T) {

capabilities, err := r.server.Capabilities(context.Background(), &storage_v1.CapabilitiesRequest{})
require.NoError(t, err)
expected := &storage_v1.CapabilitiesResponse{
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
}
expected := &storage_v1.CapabilitiesResponse{}
assert.Equal(t, expected, capabilities)
})
}
Loading

0 comments on commit 5136f34

Please sign in to comment.