From ca42ed1b8a2166590adcdf758a8980f929188229 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 26 May 2022 14:39:59 -0400 Subject: [PATCH] Add package and API skeleton for internal queue (#40) --- monitoring/queuemon.go | 16 ++++++------ monitoring/queuemon_test.go | 22 +---------------- queue/queue.go | 49 +++++++++++++++++++++++++++++++++++++ server/run.go | 20 +++++++++------ server/server.go | 13 ++++++++++ 5 files changed, 84 insertions(+), 36 deletions(-) create mode 100644 queue/queue.go diff --git a/monitoring/queuemon.go b/monitoring/queuemon.go index 04b8295..91f6a99 100644 --- a/monitoring/queuemon.go +++ b/monitoring/queuemon.go @@ -8,11 +8,11 @@ import ( "fmt" "time" - outqueue "github.com/elastic/beats/v7/libbeat/publisher/queue" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/opt" "github.com/elastic/elastic-agent-shipper/monitoring/reporter" + "github.com/elastic/elastic-agent-shipper/queue" "github.com/elastic/elastic-agent-shipper/monitoring/reporter/expvar" "github.com/elastic/elastic-agent-shipper/monitoring/reporter/log" @@ -26,8 +26,8 @@ type QueueMonitor struct { interval time.Duration done chan struct{} // handler for the event queue - queue outqueue.Queue - log *logp.Logger + target queue.MetricsSource + log *logp.Logger // enabled is a awkward no-op if a user has disabled monitoring enabled bool @@ -59,16 +59,16 @@ func DefaultConfig() Config { } // NewFromConfig creates a new queue monitor from a pre-filled config struct. -func NewFromConfig(cfg Config, queue outqueue.Queue) (*QueueMonitor, error) { +func NewFromConfig(cfg Config, target queue.MetricsSource) (*QueueMonitor, error) { // the queue == nil is largely a shim to make things not panic while we wait for the queues to get hooked up. - if !cfg.Enabled || queue == nil { + if !cfg.Enabled || target == nil { return &QueueMonitor{enabled: true}, nil } //init reporters reporters := initReporters(cfg) return &QueueMonitor{ interval: cfg.Interval, - queue: queue, + target: target, done: make(chan struct{}), log: logp.L(), reporters: reporters, @@ -112,7 +112,7 @@ func (mon QueueMonitor) End() { // updateMetrics is responsible for fetching the metrics from the queue, calculating whatever it needs to, and sending the complete events to the output func (mon *QueueMonitor) updateMetrics() error { - raw, err := mon.queue.Metrics() + raw, err := mon.target.Metrics() if err != nil { return fmt.Errorf("error fetching queue Metrics: %w", err) } @@ -169,7 +169,7 @@ func initReporters(cfg Config) []reporter.Reporter { // This is a wrapper to deal with the multiple queue metric "types", // as we could either be dealing with event counts, or bytes. // The reporting interfaces assumes we only want one. -func getLimits(raw outqueue.Metrics) (uint64, uint64, bool, error) { +func getLimits(raw queue.Metrics) (uint64, uint64, bool, error) { //bias towards byte count, as it's a little more granular. if raw.ByteCount.Exists() && raw.ByteLimit.Exists() { diff --git a/monitoring/queuemon_test.go b/monitoring/queuemon_test.go index 3bfda00..dca7e62 100644 --- a/monitoring/queuemon_test.go +++ b/monitoring/queuemon_test.go @@ -20,8 +20,8 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/opt" - "github.com/elastic/beats/v7/libbeat/publisher/queue" expvarReport "github.com/elastic/elastic-agent-shipper/monitoring/reporter/expvar" + "github.com/elastic/elastic-agent-shipper/queue" ) func init() { @@ -56,26 +56,6 @@ func NewTestQueue(limit uint64) *TestMetricsQueue { } } -// BufferConfig doesn't do anything -func (tq TestMetricsQueue) BufferConfig() queue.BufferConfig { - return queue.BufferConfig{} -} - -// Producer doesn't do anything -func (tq TestMetricsQueue) Producer(_ queue.ProducerConfig) queue.Producer { - return nil -} - -// Consumer doesn't do anything -func (tq TestMetricsQueue) Consumer() queue.Consumer { - return nil -} - -// Close Doesn't do anything -func (tq TestMetricsQueue) Close() error { - return nil -} - // Metrics spoofs the metrics output func (tq *TestMetricsQueue) Metrics() (queue.Metrics, error) { tq.metricState.EventCount = opt.UintWith(tq.metricState.EventCount.ValueOr(0) + 1) diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000..b293ee3 --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,49 @@ +package queue + +import ( + "fmt" + + beatsqueue "github.com/elastic/beats/v7/libbeat/publisher/queue" + + "github.com/elastic/elastic-agent-shipper/api" +) + +// Queue is a shipper-specific wrapper around the bare libbeat queue. +// It accepts api.Event instead of bare interface pointers like the +// libbeat queue, and it sets opinionated defaults for the queue +// semantics. The intention is to keep the shipper from becoming too +// entangled with the legacy queue api, and to gradually expose more +// features as the libbeat queue evolves and we decide what we want +// to support in the shipper. +type Queue struct { + eventQueue beatsqueue.Queue + + //producer beatsqueue.Producer +} + +type Metrics beatsqueue.Metrics + +// metricsSource is a wrapper around the libbeat queue interface, exposing only +// the callback to query the current metrics. It is used to pass queue metrics +// to the monitoring package. +type MetricsSource interface { + Metrics() (Metrics, error) +} + +func New() (*Queue, error) { + return &Queue{}, nil +} + +func (queue *Queue) Publish(event *api.Event) error { + return fmt.Errorf("couldn't publish: Queue.Publish is not implemented") +} + +func (queue *Queue) Metrics() (Metrics, error) { + metrics, err := queue.eventQueue.Metrics() + // We need to do the explicit cast, otherwise this isn't recognized as the same type + return Metrics(metrics), err +} + +func (queue *Queue) Close() { + +} diff --git a/server/run.go b/server/run.go index 5ef65bd..14bb5fc 100644 --- a/server/run.go +++ b/server/run.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-shipper/config" "github.com/elastic/elastic-agent-shipper/monitoring" + "github.com/elastic/elastic-agent-shipper/queue" pb "github.com/elastic/elastic-agent-shipper/api" ) @@ -62,6 +63,14 @@ func handleShutdown(stopFunc func(), log *logp.Logger) { // Run starts the gRPC server func Run(cfg config.ShipperConfig) error { log := logp.L() + + // When there is queue-specific configuration in ShipperConfig, it should + // be passed in here. + queue, err := queue.New() + if err != nil { + return fmt.Errorf("couldn't create queue: %w", err) + } + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port)) if err != nil { return fmt.Errorf("failed to listen: %w", err) @@ -69,7 +78,7 @@ func Run(cfg config.ShipperConfig) error { // Beats won't call the "New*" functions of a queue directly, but instead fetch a queueFactory from the global registers. //However, that requires the publisher/pipeline code as well, and I'm not sure we want that. - monHandler, err := loadMonitoring(cfg) + monHandler, err := loadMonitoring(cfg, queue) if err != nil { return fmt.Errorf("error loading outputs: %w", err) } @@ -91,6 +100,7 @@ func Run(cfg config.ShipperConfig) error { shutdownFunc := func() { grpcServer.GracefulStop() monHandler.End() + queue.Close() } handleShutdown(shutdownFunc, log) log.Debugf("gRPC server is listening on port %d", cfg.Port) @@ -99,13 +109,9 @@ func Run(cfg config.ShipperConfig) error { } // Initialize metrics and outputs -func loadMonitoring(cfg config.ShipperConfig) (*monitoring.QueueMonitor, error) { - //If we had an actual queue hooked up, that would go here - //queue := NewTestQueue() - +func loadMonitoring(cfg config.ShipperConfig, queue *queue.Queue) (*monitoring.QueueMonitor, error) { //startup monitor - //remove the nil in the second argument here when we have an actual queue. - mon, err := monitoring.NewFromConfig(cfg.Monitor, nil) + mon, err := monitoring.NewFromConfig(cfg.Monitor, queue) if err != nil { return nil, fmt.Errorf("error initializing output monitor: %w", err) } diff --git a/server/server.go b/server/server.go index 6dd5ca6..6f9df19 100644 --- a/server/server.go +++ b/server/server.go @@ -12,10 +12,14 @@ import ( "github.com/elastic/elastic-agent-libs/logp" pb "github.com/elastic/elastic-agent-shipper/api" + "github.com/elastic/elastic-agent-shipper/queue" ) type shipperServer struct { logger *logp.Logger + + queue *queue.Queue + pb.UnimplementedProducerServer } @@ -24,6 +28,15 @@ func (serv shipperServer) PublishEvents(_ context.Context, req *pb.PublishReques results := []*pb.EventResult{} for _, evt := range req.Events { serv.logger.Infof("Got event %s: %#v", evt.EventId, evt.Fields.AsMap()) + err := serv.queue.Publish(evt) + if err != nil { + // If we couldn't accept any events, return the error directly. Otherwise, + // just return success on however many events we were able to handle. + if len(results) == 0 { + return nil, err + } + break + } res := pb.EventResult{EventId: evt.EventId, Timestamp: pbts.Now()} results = append(results, &res) }