Adding a Shutdown Handler (#526)
* Checkpoint: improve flush handler

Signed-off-by: Annanay <[email protected]>

* Checkpoint: fleshing out basic structure

Signed-off-by: Annanay <[email protected]>

* Actually return list of blocks from GetBlocksToBeFlushed

Signed-off-by: Annanay <[email protected]>

* Checkpoint: Address review feedback

Signed-off-by: Annanay <[email protected]>

* Checkpoint: flush immediate in sweepAllInstances and wait for all work in ShutdownHandler

Signed-off-by: Annanay <[email protected]>

* Add shutdown endpoint

Signed-off-by: Annanay <[email protected]>

* Finish implementation, add tests

Signed-off-by: Annanay <[email protected]>

* Checkpoint: Add blockID to flushOp

Signed-off-by: Annanay <[email protected]>

* Retry on failure in instance.CompleteBlock()

Signed-off-by: Annanay <[email protected]>

* Lintin

Signed-off-by: Annanay <[email protected]>

* Metric every err

Signed-off-by: Annanay <[email protected]>

* Fix tests

Signed-off-by: Annanay <[email protected]>

* Add CHANGELOG entry

Signed-off-by: Annanay <[email protected]>

* Increase test sleep

Signed-off-by: Annanay <[email protected]>
annanay25 authored Feb 23, 2021
1 parent f87489c commit 6456fb9
Showing 9 changed files with 257 additions and 111 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,7 @@
## master / unreleased

* [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526)

## v0.6.0

* [CHANGE] Fixed ingester latency spikes on read [#461](https://github.com/grafana/tempo/pull/461)
1 change: 1 addition & 0 deletions cmd/tempo/app/modules.go
@@ -125,6 +125,7 @@ func (t *App) initIngester() (services.Service, error) {
tempopb.RegisterPusherServer(t.server.GRPC, t.ingester)
tempopb.RegisterQuerierServer(t.server.GRPC, t.ingester)
t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
t.server.HTTP.Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
return t.ingester, nil
}

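With the new route registered alongside /flush, an operator can trigger a graceful flush-and-exit over plain HTTP. Below is a minimal client sketch; the host and port are assumptions (use whatever HTTP listen address your ingester is configured with), not something defined by this PR:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Assumption: the ingester's HTTP server listens on localhost:3200.
	resp, err := http.Get("http://localhost:3200/shutdown")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)
	// The handler acknowledges immediately and performs the actual flushing
	// in a background goroutine.
	fmt.Println(string(body)) // expected: "shutdown job acknowledged"
}
```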
1 change: 0 additions & 1 deletion integration/microservices/docker-compose.yaml
Expand Up @@ -51,7 +51,6 @@ services:

synthetic-load-generator:
image: omnition/synthetic-load-generator:1.0.25
scale: 4 # every container = 1000 spans/s
volumes:
- ./load-generator.json:/etc/load-generator.json
environment:
124 changes: 94 additions & 30 deletions modules/ingester/flush.go
@@ -3,10 +3,15 @@ package ingester
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"time"

"github.com/google/uuid"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -38,6 +43,11 @@ const (
flushBackoff = 1 * time.Second
)

const (
opKindComplete = iota
opKindFlush
)

// Flush triggers a flush of all in memory traces to disk. This is called
// by the lifecycler on shutdown and will put our traces in the WAL to be
// replayed.
@@ -52,28 +62,59 @@ func (i *Ingester) Flush() {
}
}

// FlushHandler calls sweepUsers(true) which will force push all traces into the WAL and force
// ShutdownHandler handles a graceful shutdown for an ingester. It does the following things in order
// * Stop incoming writes by exiting from the ring
// * Flush all blocks to backend
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) {
go func() {
level.Info(log.Logger).Log("msg", "shutdown handler called")

// lifecycler should exit the ring on shutdown
i.lifecycler.SetUnregisterOnShutdown(true)

// stop accepting new writes
i.markUnavailable()

// move all data into flushQueue
i.sweepAllInstances(true)

for !i.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
}

// stop ingester service
_ = services.StopAndAwaitTerminated(context.Background(), i)

level.Info(log.Logger).Log("msg", "shutdown handler complete")
}()

_, _ = w.Write([]byte("shutdown job acknowledged"))
}

// FlushHandler calls sweepAllInstances(true) which will force push all traces into the WAL and force
// mark all head blocks as ready to flush.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
i.sweepUsers(true)
i.sweepAllInstances(true)
w.WriteHeader(http.StatusNoContent)
}

type flushOp struct {
from int64
userID string
kind int
from int64
userID string
blockID uuid.UUID
}

func (o *flushOp) Key() string {
return o.userID
return o.userID + "/" + strconv.Itoa(o.kind) + "/" + o.blockID.String()
}

func (o *flushOp) Priority() int64 {
return -o.from
}

// sweepUsers periodically schedules series for flushing and garbage collects users with no series
func (i *Ingester) sweepUsers(immediate bool) {
// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
func (i *Ingester) sweepAllInstances(immediate bool) {
instances := i.getInstances()

for _, instance := range instances {
@@ -89,26 +130,27 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
return
}

// see if it's ready to cut a block?
err = instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
// see if it's ready to cut a block
blockID, err := instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to cut block", "err", err)
return
}

if blockID != uuid.Nil {
i.flushQueues.Enqueue(&flushOp{
kind: opKindComplete,
from: math.MaxInt64,
userID: instance.instanceID,
blockID: blockID,
})
}

// dump any blocks that have been flushed for awhile
err = instance.ClearFlushedBlocks(i.cfg.CompleteBlockTimeout)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err)
}

// see if any complete blocks are ready to be flushed
if instance.GetBlockToBeFlushed() != nil {
i.flushQueues.Enqueue(&flushOp{
time.Now().Unix(),
instance.instanceID,
})
}
}

func (i *Ingester) flushLoop(j int) {
@@ -124,13 +166,39 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")
var completeBlockID uuid.UUID
var err error
if op.kind == opKindComplete {
level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
instance, exists := i.getInstanceByID(op.userID)
if !exists {
// instance no longer exists? that's bad, log and continue
level.Error(log.Logger).Log("msg", "instance not found", "tenantID", op.userID)
continue
}

completeBlockID, err = instance.CompleteBlock(op.blockID)
if completeBlockID != uuid.Nil {
// add a flushOp for the block we just completed
i.flushQueues.Enqueue(&flushOp{
kind: opKindFlush,
from: time.Now().Unix(),
userID: instance.instanceID,
blockID: completeBlockID,
})
}

} else {
level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")

err = i.flushBlock(op.userID, op.blockID)
}

err := i.flushUserTraces(op.userID)
if err != nil {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "failed to flush user", "err", err)

// re-queue failed flush
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "error performing op in flushQueue",
"op", op.kind, "block", op.blockID.String(), "err", err)
metricFailedFlushes.Inc()
// re-queue op with backoff
op.from += int64(flushBackoff)
i.flushQueues.Requeue(op)
continue
@@ -140,7 +208,7 @@
}
}

func (i *Ingester) flushUserTraces(userID string) error {
func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
instance, err := i.getOrCreateInstance(userID)
if err != nil {
return err
@@ -150,12 +218,7 @@ func (i *Ingester) flushUserTraces(userID string) error {
return fmt.Errorf("instance id %s not found", userID)
}

for {
block := instance.GetBlockToBeFlushed()
if block == nil {
break
}

if block := instance.GetBlockToBeFlushed(blockID); block != nil {
ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
@@ -164,10 +227,11 @@ err = i.store.WriteBlock(ctx, block)
err = i.store.WriteBlock(ctx, block)
metricFlushDuration.Observe(time.Since(start).Seconds())
if err != nil {
metricFailedFlushes.Inc()
return err
}
metricBlocksFlushed.Inc()
} else {
return fmt.Errorf("error getting block to flush")
}

return nil
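The handler's contract (acknowledge immediately, then complete and flush blocks in a background goroutine before stopping the service) can be exercised with net/http/httptest. A minimal sketch, assuming a test helper that builds a runnable *Ingester; the helper name here is hypothetical, and this PR's own tests use the package's existing helpers:

```go
package ingester

import (
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestShutdownHandlerSketch(t *testing.T) {
	// Assumption: newTestIngester stands in for whatever helper the package
	// provides to construct an ingester suitable for tests.
	i := newTestIngester(t)

	req := httptest.NewRequest(http.MethodPost, "/shutdown", nil)
	rec := httptest.NewRecorder()
	i.ShutdownHandler(rec, req)

	// The response is written before the flush work finishes; a real test
	// would then wait for the ingester service to reach a terminal state.
	require.Equal(t, "shutdown job acknowledged", rec.Body.String())
}
```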
24 changes: 15 additions & 9 deletions modules/ingester/ingester.go
@@ -112,7 +112,7 @@ func (i *Ingester) loop(ctx context.Context) error {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false)
i.sweepAllInstances(false)

case <-ctx.Done():
return nil
@@ -125,14 +125,7 @@

// stopping is run when ingester is asked to stop
func (i *Ingester) stopping(_ error) error {
// This will prevent us accepting any more samples
i.stopIncomingRequests()

// Lifecycler can be nil if the ingester is for a flusher.
if i.lifecycler != nil {
// Next initiate our graceful exit from the ring.
return services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
}
i.markUnavailable()

if i.flushQueues != nil {
i.flushQueues.Stop()
@@ -142,6 +135,19 @@
return nil
}

func (i *Ingester) markUnavailable() {
// Lifecycler can be nil if the ingester is for a flusher.
if i.lifecycler != nil {
// Next initiate our graceful exit from the ring.
if err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler); err != nil {
level.Warn(log.Logger).Log("msg", "failed to stop ingester lifecycler", "err", err)
}
}

// This will prevent us accepting any more samples
i.stopIncomingRequests()
}

// Push implements tempopb.Pusher.Push
func (i *Ingester) Push(ctx context.Context, req *tempopb.PushRequest) (*tempopb.PushResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
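Taken together, the flush.go and ingester.go changes above give the /shutdown path the following order of operations. This is a comment-form summary of the code in this commit, not new behavior:

```go
// 1. lifecycler.SetUnregisterOnShutdown(true): leave the ring permanently on exit.
// 2. markUnavailable(): stop the lifecycler and reject new writes.
// 3. sweepAllInstances(true): force-cut head blocks and enqueue opKindComplete ops.
// 4. wait until flushQueues.IsEmpty(): flushLoop completes each block, enqueues an
//    opKindFlush op for it, and writes it to the backend, requeueing failed ops
//    with a backoff.
// 5. services.StopAndAwaitTerminated(context.Background(), i): stop the ingester
//    service once the queues have drained.
```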
