Skip to content

Commit

Permalink
Logically Split Router and Ingestor Mode
Browse files Browse the repository at this point in the history
Made changes where the if conditions are reduced.

Signed-off-by: Kartik-Garg <[email protected]>
  • Loading branch information
Kartik-Garg committed Jan 16, 2023
1 parent a19ad80 commit daff214
Showing 1 changed file with 45 additions and 39 deletions.
84 changes: 45 additions & 39 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,51 @@ func runReceive(
enableIngestion := receiveMode == receive.IngestorOnly || receiveMode == receive.RouterIngestor

upload := len(confContentYaml) > 0
// first change
grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)
var (
// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.

// reloadGRPCServer signals when - (1)TSDB is ready and the Store gRPC server can start.
// (2) The Hashring files have changed if tsdb ingestion is disabled.
// reloadGRPCServer = make(chan struct{}, 1)
// hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change.
hashringChangedChan = make(chan struct{}, 1)

dbs *receive.MultiTSDB
writer *receive.Writer
)
if enableIngestion {
dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
reg,
tsdbOpts,
lset,
conf.tenantLabelName,
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
)
writer = receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)
// uploadC signals when new blocks should be uploaded.
uploadC := make(chan struct{}, 1)
// uploadDone signals when uploading has finished.
uploadDone := make(chan struct{}, 1)

level.Debug(logger).Log("msg", "setting up TSDB")
{
if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil {
return err
}
}
if upload {
if tsdbOpts.MinBlockDuration != tsdbOpts.MaxBlockDuration {
if !conf.ignoreBlockSize {
Expand Down Expand Up @@ -195,19 +239,6 @@ func runReceive(
return errors.Wrap(err, "parse relabel configuration")
}

dbs := receive.NewMultiTSDB(
conf.dataDir,
logger,
reg,
tsdbOpts,
lset,
conf.tenantLabelName,
bkt,
conf.allowOutOfOrderUpload,
hashFunc,
)
writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs)

var limitsConfig *receive.RootLimitsConfig
if conf.limitsConfig != nil {
limitsContentYaml, err := conf.limitsConfig.Content()
Expand Down Expand Up @@ -244,33 +275,8 @@ func runReceive(
Limiter: limiter,
})

grpcProbe := prober.NewGRPC()
httpProbe := prober.NewHTTP()
statusProber := prober.Combine(
httpProbe,
grpcProbe,
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completes.

// hashringChangedChan signals when TSDB needs to be flushed and updated due to hashring config change.
hashringChangedChan := make(chan struct{}, 1)

if enableIngestion {
// uploadC signals when new blocks should be uploaded.
uploadC := make(chan struct{}, 1)
// uploadDone signals when uploading has finished.
uploadDone := make(chan struct{}, 1)

level.Debug(logger).Log("msg", "setting up TSDB")
{
if err := startTSDBAndUpload(g, logger, reg, dbs, uploadC, hashringChangedChan, upload, uploadDone, statusProber, bkt, receive.HashringAlgorithm(conf.hashringsAlgorithm)); err != nil {
return err
}
}
}
//hashringChangedChan := make(chan struct{}, 1)

level.Debug(logger).Log("msg", "setting up hashring")
{
Expand Down

0 comments on commit daff214

Please sign in to comment.