Adding a Shutdown Handler #526

Merged: 15 commits, Feb 23, 2021
1 change: 1 addition & 0 deletions cmd/tempo/app/modules.go
@@ -125,6 +125,7 @@ func (t *App) initIngester() (services.Service, error) {
tempopb.RegisterPusherServer(t.server.GRPC, t.ingester)
tempopb.RegisterQuerierServer(t.server.GRPC, t.ingester)
t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
t.server.HTTP.Path("/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
return t.ingester, nil
}

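The new route above is what exposes the graceful-shutdown flow added in modules/ingester/flush.go further down. As a rough operational sketch (not part of this PR), hitting the endpoint and checking for the 204 No Content that ShutdownHandler writes might look like the following; the host and port are assumptions, and since the route is registered without a method restriction a GET would work just as well.

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Hypothetical address; use whatever the ingester's HTTP server listens on.
	resp, err := http.Post("http://ingester:3200/shutdown", "text/plain", nil)
	if err != nil {
		log.Fatalf("shutdown request failed: %v", err)
	}
	defer resp.Body.Close()

	// ShutdownHandler responds with 204 No Content once all blocks are flushed
	// and the ingester service has stopped.
	if resp.StatusCode != http.StatusNoContent {
		log.Fatalf("unexpected status: %s", resp.Status)
	}
	fmt.Println("ingester flushed and shut down")
}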
1 change: 0 additions & 1 deletion integration/microservices/docker-compose.yaml
@@ -51,7 +51,6 @@ services:

synthetic-load-generator:
image: omnition/synthetic-load-generator:1.0.25
scale: 4 # every container = 1000 spans/s
volumes:
- ./load-generator.json:/etc/load-generator.json
environment:
110 changes: 84 additions & 26 deletions modules/ingester/flush.go
@@ -3,10 +3,13 @@ package ingester
import (
"context"
"fmt"
"math"
"net/http"
"strconv"
"time"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -38,6 +41,11 @@ const (
flushBackoff = 1 * time.Second
)

const (
opKindComplete = iota
opKindFlush
)

// Flush triggers a flush of all in memory traces to disk. This is called
// by the lifecycler on shutdown and will put our traces in the WAL to be
// replayed.
@@ -52,28 +60,52 @@ func (i *Ingester) Flush() {
}
}

// FlushHandler calls sweepUsers(true) which will force push all traces into the WAL and force
// ShutdownHandler handles a graceful shutdown for an ingester. It does the following things in order
// * Stop incoming writes by exiting from the ring
// * Flush all blocks to backend
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, _ *http.Request) {
// lifecycler should exit the ring on shutdown
i.lifecycler.SetUnregisterOnShutdown(true)

// stop accepting new writes
i.markUnavailable()

// move all data into flushQueue
i.sweepAllInstances(true)

for !i.flushQueues.IsEmpty() {
time.Sleep(100 * time.Millisecond)
}

// stop ingester service
_ = services.StopAndAwaitTerminated(context.Background(), i)

w.WriteHeader(http.StatusNoContent)
}

// FlushHandler calls sweepAllInstances(true) which will force push all traces into the WAL and force
// mark all head blocks as ready to flush.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
i.sweepUsers(true)
i.sweepAllInstances(true)
w.WriteHeader(http.StatusNoContent)
}

type flushOp struct {
kind int
from int64
userID string
}

func (o *flushOp) Key() string {
return o.userID
return o.userID + "/" + strconv.Itoa(o.kind)
}

func (o *flushOp) Priority() int64 {
return -o.from
}

// sweepUsers periodically schedules series for flushing and garbage collects users with no series
func (i *Ingester) sweepUsers(immediate bool) {
// sweepAllInstances periodically schedules series for flushing and garbage collects instances with no series
func (i *Ingester) sweepAllInstances(immediate bool) {
instances := i.getInstances()

for _, instance := range instances {
@@ -89,24 +121,33 @@ func (i *Ingester) sweepInstance(instance *instance, immediate bool) {
return
}

// see if it's ready to cut a block?
err = instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
// see if it's ready to cut a block
isReady, err := instance.CutBlockIfReady(i.cfg.MaxBlockDuration, i.cfg.MaxBlockBytes, immediate)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to cut block", "err", err)
return
}

if isReady {
i.flushQueues.Enqueue(&flushOp{
kind: opKindComplete,
from: math.MaxInt64,
userID: instance.instanceID,
})
}

// dump any blocks that have been flushed for awhile
err = instance.ClearFlushedBlocks(i.cfg.CompleteBlockTimeout)
if err != nil {
level.Error(log.WithUserID(instance.instanceID, log.Logger)).Log("msg", "failed to complete block", "err", err)
}

// see if any complete blocks are ready to be flushed
if instance.GetBlockToBeFlushed() != nil {
if len(instance.GetBlocksToBeFlushed()) > 0 {
i.flushQueues.Enqueue(&flushOp{
time.Now().Unix(),
instance.instanceID,
kind: opKindFlush,
from: time.Now().Unix(),
userID: instance.instanceID,
})
}
}
@@ -124,16 +165,38 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")

err := i.flushUserTraces(op.userID)
if err != nil {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "failed to flush user", "err", err)

// re-queue failed flush
op.from += int64(flushBackoff)
i.flushQueues.Requeue(op)
continue
if op.kind == opKindComplete {
level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID)
instance, exists := i.getInstanceByID(op.userID)
if !exists {
// instance no longer exists? that's bad, log and continue
level.Error(log.Logger).Log("msg", "instance not found", "tenantID", op.userID)
continue
}

isComplete := instance.CompleteBlock()

if isComplete {
// add a flushOp for the block we just completed
i.flushQueues.Enqueue(&flushOp{
kind: opKindFlush,
from: time.Now().Unix(),
userID: instance.instanceID,
})
}

} else {
level.Debug(log.Logger).Log("msg", "flushing block", "userid", op.userID, "fp")

err := i.flushUserTraces(op.userID)
if err != nil {
level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "failed to flush user", "err", err)

// re-queue failed flush
op.from += int64(flushBackoff)
i.flushQueues.Requeue(op)
continue
}
}

i.flushQueues.Clear(op)
@@ -150,12 +213,7 @@ func (i *Ingester) flushUserTraces(userID string) error {
return fmt.Errorf("instance id %s not found", userID)
}

for {
block := instance.GetBlockToBeFlushed()
if block == nil {
break
}

for _, block := range instance.GetBlocksToBeFlushed() {
ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
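A small but important detail in the diff above is flushOp.Key(): it now appends the op kind to the tenant ID. Assuming the flush queue deduplicates pending work by Key() (which the Clear and Requeue calls suggest), this keeps a pending completion op and a pending flush op for the same tenant from collapsing into a single entry. A toy sketch of that effect, not the real queue implementation:

package main

import (
	"fmt"
	"strconv"
)

const (
	opKindComplete = iota
	opKindFlush
)

type flushOp struct {
	kind   int
	userID string
}

// oldKey is the pre-PR key (tenant only); newKey mirrors the change above.
func oldKey(o *flushOp) string { return o.userID }
func newKey(o *flushOp) string { return o.userID + "/" + strconv.Itoa(o.kind) }

// dedupe keeps one op per key, a simplified stand-in for a queue that drops
// duplicate keys (an assumption about the real queue's behavior).
func dedupe(ops []*flushOp, key func(*flushOp) string) int {
	seen := map[string]bool{}
	for _, op := range ops {
		seen[key(op)] = true
	}
	return len(seen)
}

func main() {
	ops := []*flushOp{
		{kind: opKindComplete, userID: "tenant-a"},
		{kind: opKindFlush, userID: "tenant-a"},
	}
	fmt.Println("old key keeps", dedupe(ops, oldKey), "op(s)") // 1
	fmt.Println("new key keeps", dedupe(ops, newKey), "op(s)") // 2
}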
24 changes: 15 additions & 9 deletions modules/ingester/ingester.go
@@ -112,7 +112,7 @@ func (i *Ingester) loop(ctx context.Context) error {
for {
select {
case <-flushTicker.C:
i.sweepUsers(false)
i.sweepAllInstances(false)

case <-ctx.Done():
return nil
@@ -125,14 +125,7 @@

// stopping is run when ingester is asked to stop
func (i *Ingester) stopping(_ error) error {
// This will prevent us accepting any more samples
i.stopIncomingRequests()

// Lifecycler can be nil if the ingester is for a flusher.
if i.lifecycler != nil {
// Next initiate our graceful exit from the ring.
return services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
}
i.markUnavailable()

if i.flushQueues != nil {
i.flushQueues.Stop()
Expand All @@ -142,6 +135,19 @@ func (i *Ingester) stopping(_ error) error {
return nil
}

func (i *Ingester) markUnavailable() {
// Lifecycler can be nil if the ingester is for a flusher.
if i.lifecycler != nil {
// Next initiate our graceful exit from the ring.
if err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler); err != nil {
level.Warn(log.Logger).Log("msg", "failed to stop ingester lifecycler", "err", err)
}
}

// This will prevent us accepting any more samples
i.stopIncomingRequests()
}

// Push implements tempopb.Pusher.Push
func (i *Ingester) Push(ctx context.Context, req *tempopb.PushRequest) (*tempopb.PushResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
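For context on where stopping() fits: the Ingester is a cortex services.Service, so stopping() runs when the service is asked to halt, which is exactly what ShutdownHandler triggers with services.StopAndAwaitTerminated. A minimal sketch of that lifecycle, using a toy service in place of the real ingester and assuming the usual NewBasicService and StartAndAwaitRunning helpers from the cortex services package:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cortexproject/cortex/pkg/util/services"
)

func main() {
	// run stands in for the ingester's loop(): block until the service
	// context is cancelled.
	run := func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	}
	// stop stands in for stopping(): leave the ring, stop the flush queues.
	stop := func(_ error) error {
		fmt.Println("stopping: leave ring, stop flush queues")
		return nil
	}

	svc := services.NewBasicService(nil, run, stop)
	_ = services.StartAndAwaitRunning(context.Background(), svc)
	time.Sleep(100 * time.Millisecond)

	// The same call ShutdownHandler makes against the ingester itself.
	_ = services.StopAndAwaitTerminated(context.Background(), svc)
}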
86 changes: 52 additions & 34 deletions modules/ingester/instance.go
@@ -54,10 +54,10 @@ type instance struct {
tracesMtx sync.Mutex
traces map[uint32]*trace

blocksMtx sync.RWMutex
headBlock *wal.AppendBlock
completingBlock *wal.AppendBlock
completeBlocks []*encoding.CompleteBlock
blocksMtx sync.RWMutex
headBlock *wal.AppendBlock
completingBlocks []*wal.AppendBlock
completeBlocks []*encoding.CompleteBlock

lastBlockCut time.Time

@@ -133,59 +133,77 @@ func (i *instance) CutCompleteTraces(cutoff time.Duration, immediate bool) error
return nil
}

func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) error {
// CutBlockIfReady cuts a completingBlock from the HeadBlock if ready
// Returns a bool indicating if a block was cut along with the error (if any).
func (i *instance) CutBlockIfReady(maxBlockLifetime time.Duration, maxBlockBytes uint64, immediate bool) (bool, error) {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

if i.headBlock == nil || i.headBlock.DataLength() == 0 {
return nil
return false, nil
}

now := time.Now()
if i.lastBlockCut.Add(maxBlockLifetime).Before(now) || i.headBlock.DataLength() >= maxBlockBytes || immediate {
if i.completingBlock != nil {
return fmt.Errorf("unable to complete head block for %s b/c there is already a completing block. Will try again next cycle", i.instanceID)
}
i.completingBlocks = append(i.completingBlocks, i.headBlock)

i.completingBlock = i.headBlock
err := i.resetHeadBlock()
if err != nil {
return fmt.Errorf("failed to resetHeadBlock: %w", err)
return true, fmt.Errorf("failed to resetHeadBlock: %w", err)
}

// todo : this should be a queue of blocks to complete with workers
go func() {
completeBlock, err := i.writer.CompleteBlock(i.completingBlock, i)
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

if err != nil {
// this is a really bad error that results in data loss. most likely due to disk full
_ = i.completingBlock.Clear()
metricFailedFlushes.Inc()
i.completingBlock = nil
level.Error(log.Logger).Log("msg", "unable to complete block. THIS BLOCK WAS LOST", "tenantID", i.instanceID, "err", err)
return
}
i.completingBlock = nil
i.completeBlocks = append(i.completeBlocks, completeBlock)
}()
return true, nil
}

return nil
return false, nil
}

// CompleteBlock() moves a completingBlock to a completeBlock
func (i *instance) CompleteBlock() bool {
i.blocksMtx.Lock()

var completingBlock *wal.AppendBlock
if len(i.completingBlocks) > 0 {
completingBlock = i.completingBlocks[0]
}
i.blocksMtx.Unlock()

if completingBlock == nil {
return false
}

// potentially long running operation placed outside blocksMtx
completeBlock, err := i.writer.CompleteBlock(completingBlock, i)

i.blocksMtx.Lock()
if err != nil {
// this is a really bad error that results in data loss. most likely due to disk full
_ = completingBlock.Clear()
metricFailedFlushes.Inc()
level.Error(log.Logger).Log("msg", "unable to complete block. THIS BLOCK WAS LOST", "tenantID", i.instanceID, "err", err)
i.blocksMtx.Unlock()
return false
}
i.completeBlocks = append(i.completeBlocks, completeBlock)
i.completingBlocks = i.completingBlocks[1:]
i.blocksMtx.Unlock()

return true
}

func (i *instance) GetBlockToBeFlushed() *encoding.CompleteBlock {
// GetBlocksToBeFlushed gets a list of blocks that can be flushed to the backend
func (i *instance) GetBlocksToBeFlushed() []*encoding.CompleteBlock {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

completeBlockList := []*encoding.CompleteBlock{}
for _, c := range i.completeBlocks {
if c.FlushedTime().IsZero() {
return c
completeBlockList = append(completeBlockList, c)
}
}

return nil
return completeBlockList
}

func (i *instance) ClearFlushedBlocks(completeBlockTimeout time.Duration) error {
@@ -240,8 +258,8 @@ func (i *instance) FindTraceByID(id []byte) (*tempopb.Trace, error) {
allBytes = i.Combine(foundBytes, allBytes)

// completingBlock
if i.completingBlock != nil {
foundBytes, err = i.completingBlock.Find(id, i)
for _, c := range i.completingBlocks {
foundBytes, err = c.Find(id, i)
if err != nil {
return nil, fmt.Errorf("completingBlock.Find failed: %w", err)
}
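The structural change in instance.go is that the single completingBlock becomes a completingBlocks slice: CutBlockIfReady appends the head block to it, CompleteBlock later moves the oldest entry into completeBlocks, and GetBlocksToBeFlushed returns every complete block that has not been flushed yet. A toy model of that pipeline, using strings instead of the real wal.AppendBlock and encoding.CompleteBlock types:

package main

import "fmt"

// toyInstance models only the block movement in instance.go; strings stand in
// for real blocks just to show how they move between the three fields.
type toyInstance struct {
	headBlock        string
	completingBlocks []string
	completeBlocks   []string
}

// cutBlock mirrors CutBlockIfReady: push the current head onto
// completingBlocks and start a new head. Several completing blocks can now be
// pending at once, where the old code allowed only one.
func (i *toyInstance) cutBlock(newHead string) {
	i.completingBlocks = append(i.completingBlocks, i.headBlock)
	i.headBlock = newHead
}

// completeBlock mirrors CompleteBlock: move the oldest completing block into
// completeBlocks and report whether there was anything to do.
func (i *toyInstance) completeBlock() bool {
	if len(i.completingBlocks) == 0 {
		return false
	}
	i.completeBlocks = append(i.completeBlocks, i.completingBlocks[0])
	i.completingBlocks = i.completingBlocks[1:]
	return true
}

func main() {
	inst := &toyInstance{headBlock: "block-1"}
	inst.cutBlock("block-2")
	inst.cutBlock("block-3") // two blocks waiting to be completed

	for inst.completeBlock() {
	}
	fmt.Println("complete blocks:", inst.completeBlocks) // [block-1 block-2]
	fmt.Println("head block:", inst.headBlock)           // block-3
}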