Output reloading: ensure all events get eventually published #17657

Closed
wants to merge 11 commits
27 changes: 26 additions & 1 deletion libbeat/publisher/pipeline/consumer.go
@@ -18,6 +18,9 @@
package pipeline

import (
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
@@ -43,6 +46,11 @@ type eventConsumer struct {
out *outputGroup
}

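// lf is a temporary debug helper: it prints a timestamped message to stdout
// so the output-reload flow can be traced.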
func lf(msg string, v ...interface{}) {
now := time.Now().Format("15:04:05.00000")
fmt.Printf(now+" "+msg+"\n", v...)
}

type consumerSignal struct {
tag consumerEventTag
consumer queue.Consumer
@@ -114,8 +122,11 @@ func (c *eventConsumer) sigHint() {
}

func (c *eventConsumer) updOutput(grp *outputGroup) {
lf("in eventConsumer: updOutput: new output group ID = %v", grp.id)
// close consumer to break consumer worker from pipeline
c.consumer.Close()
if err := c.consumer.Close(); err != nil {
lf("in updGroup: consumer close error: %v", err)
}

// update output
c.sig <- consumerSignal{
@@ -147,6 +158,7 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
case sigConsumerCheck:

case sigConsumerUpdateOutput:
lf("in eventConsumer: handling sigConsumerUpdateOutput: new output group ID = %v", sig.out.id)
c.out = sig.out

case sigConsumerUpdateInput:
@@ -156,12 +168,17 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
paused = c.paused()
if !paused && c.out != nil && batch != nil {
out = c.out.workQueue
} else if paused && c.out != nil && batch != nil {
lf("paused but have batch of %v event", len(batch.events))
//batch.Cancelled()
} else {
//lf("over here, batch empty? = %v", batch == nil)
out = nil
}
}

for {
lf("paused: %v, batch == nil?: %v", paused, batch == nil)
if !paused && c.out != nil && consumer != nil && batch == nil {
out = c.out.workQueue
queueBatch, err := consumer.Get(c.out.batchSize)
@@ -171,17 +188,20 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
continue
}
if queueBatch != nil {
lf("in event consumer: got batch of %v events", len(queueBatch.Events()))
batch = newBatch(c.ctx, queueBatch, c.out.timeToLive)
}

paused = c.paused()
if paused || batch == nil {
lf("in event consumer: paused: %v, batch size = %v", paused, len(batch.events))
out = nil
}
}

select {
case sig := <-c.sig:
lf("in event consumer: 1st select: handling signal %v", sig)
handleSignal(sig)
continue
default:
@@ -190,10 +210,15 @@ func (c *eventConsumer) loop(consumer queue.Consumer) {
select {
case <-c.done:
log.Debug("stop pipeline event consumer")
lf("stop pipeline event consumer")
return
case sig := <-c.sig:
lf("in event consumer: 2nd select: handling signal %v", sig)
handleSignal(sig)
case out <- batch:
if batch != nil {
lf("in event consumer: sent batch of %v events to output workQueue", len(batch.Events()))
}
batch = nil
}
}
31 changes: 30 additions & 1 deletion libbeat/publisher/pipeline/controller.go
@@ -18,13 +18,19 @@
package pipeline

import (
"sync"

"github.com/elastic/beats/v7/libbeat/common/atomic"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

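// _outputGroupID hands out a unique, increasing ID for each outputGroup
// created by Set; the ID is used in the debug log output.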
var _outputGroupID atomic.Uint

// outputController manages the pipelines output capabilities, like:
// - start
// - stop
@@ -39,10 +45,13 @@ type outputController struct {
retryer *retryer
consumer *eventConsumer
out *outputGroup

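// setInProgress serializes calls to Set so concurrent output reloads cannot interleave.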
setInProgress sync.Mutex
}

// outputGroup configures a group of load balanced outputs with shared work queue.
type outputGroup struct {
id uint
workQueue workQueue
outputs []outputWorker

@@ -77,8 +86,8 @@ func newOutputController(
ctx.observer = observer
ctx.retryer = c.retryer

lf("starting consumer...")
c.consumer.sigContinue()

return c
}

@@ -98,6 +107,11 @@ func (c *outputController) Close() error {
}

func (c *outputController) Set(outGrp outputs.Group) {
lf("set called...")
c.setInProgress.Lock()
defer c.setInProgress.Unlock()
lf("setting output...")

// create new outputGroup with shared work queue
clients := outGrp.Clients
queue := makeWorkQueue()
@@ -106,19 +120,22 @@ func (c *outputController) Set(outGrp outputs.Group) {
worker[i] = makeClientWorker(c.observer, queue, client)
}
grp := &outputGroup{
id: _outputGroupID.Inc(),
workQueue: queue,
outputs: worker,
timeToLive: outGrp.Retry + 1,
batchSize: outGrp.BatchSize,
}

// update consumer and retryer
lf("pausing consumer...")
c.consumer.sigPause()
if c.out != nil {
for range c.out.outputs {
c.retryer.sigOutputRemoved()
}
}

c.retryer.updOutput(queue)
for range clients {
c.retryer.sigOutputAdded()
@@ -130,11 +147,23 @@ func (c *outputController) Set(outGrp outputs.Group) {
for _, w := range c.out.outputs {
w.Close()
}
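// Drain any batches still sitting in the old work queue into the new one,
// so no in-flight batch is lost when the output group is swapped.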
for drained := false; !drained; {
select {
case b := <-c.out.workQueue:
lf("draining batches from old workqueue to new workqueue...")
queue <- b
default:
lf("workqueue is already drained")
drained = true
}
}
}

c.out = grp

// restart consumer (potentially blocked by retryer)
lf("continuing consumer...")
c.consumer.sigUnWait()
c.consumer.sigContinue()

c.observer.updateOutputGroup()
120 changes: 120 additions & 0 deletions libbeat/publisher/pipeline/controller_test.go
@@ -0,0 +1,120 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pipeline

import (
"fmt"
"sync"
"testing"
"testing/quick"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"

"github.com/stretchr/testify/require"
)

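// TestOutputReload publishes a stream of events while repeatedly swapping the
// output group and verifies that every published event is eventually counted
// by the active output.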
func TestOutputReload(t *testing.T) {
tests := map[string]func(mockPublishFn) outputs.Client{
"client": newMockClient,
"network_client": newMockNetworkClient,
}

for name, ctor := range tests {
t.Run(name, func(t *testing.T) {
seedPRNG(t)

err := quick.Check(func(q uint) bool {
numEventsToPublish := 15000 + (q % 500) // 15000 to 15499
numOutputReloads := 350 + (q % 150) // 350 to 499

queueFactory := func(ackListener queue.ACKListener) (queue.Queue, error) {
return memqueue.NewQueue(
logp.L(),
memqueue.Settings{
ACKListener: ackListener,
Events: int(numEventsToPublish),
}), nil
}

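// publishedCount counts every event the mock output reports as published,
// across all output reloads.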
var publishedCount atomic.Uint
countingPublishFn := func(batch publisher.Batch) error {
publishedCount.Add(uint(len(batch.Events())))
lf("in test: published now: %v, so far: %v", len(batch.Events()), publishedCount.Load())
return nil
}

pipeline, err := New(
beat.Info{},
Monitors{},
queueFactory,
outputs.Group{},
Settings{},
)
require.NoError(t, err)
defer pipeline.Close()

pipelineClient, err := pipeline.Connect()
require.NoError(t, err)
defer pipelineClient.Close()

var wg sync.WaitGroup
wg.Add(1)
go func() {
for i := uint(0); i < numEventsToPublish; i++ {
pipelineClient.Publish(beat.Event{})
}
wg.Done()
}()

for i := uint(0); i < numOutputReloads; i++ {
outputClient := ctor(countingPublishFn)
out := outputs.Group{
Clients: []outputs.Client{outputClient},
}
lf("in test code: reloading output...")
pipeline.output.Set(out)
}

wg.Wait()

timeout := 20 * time.Second
success := waitUntilTrue(timeout, func() bool {
return uint(numEventsToPublish) == publishedCount.Load()
})
if !success {
fmt.Printf(
"numOutputReloads = %v, numEventsToPublish = %v, publishedCounted = %v\n",
numOutputReloads, numEventsToPublish, publishedCount.Load(),
)
}
return success
}, &quick.Config{MaxCount: 25})

if err != nil {
t.Error(err)
}
})
}
}