Commit

update docs and comments as well as set some lower times on a few boltdb-shipper timers

Signed-off-by: Edward Welch <[email protected]>
slim-bean committed May 24, 2022
1 parent 9e15ea2 commit f6d8f09
Showing 2 changed files with 119 additions and 45 deletions.
57 changes: 54 additions & 3 deletions cmd/migrate/README.md
@@ -8,8 +8,6 @@ Two stores are created, a source store and destination (abbreviated dest) store.

Chunks are queried from the source store and written to the dest store, new index entries are created in the dest store as well.

This _should_ handle schema changes and different schemas on both the source and dest store.

You should be able to:

* Migrate between clusters
@@ -22,7 +20,7 @@ This is simple and because it uses the storage interfaces, should be complete an

There is however some parallelism built in and there are a few flags to tune this, `migrate -help` for more info

This does not remove any source data, it only reads existing source data and writes to the destination.
This does not remove or modify any source data, it only reads existing source data and writes to the destination.

## Usage

@@ -54,3 +52,56 @@ Migrate tenant ID within a cluster
```
migrate -source.config.file=/etc/loki-us-west1/config/config.yaml -dest.config.file=/etc/loki-us-west1/config/config.yaml -source.tenant=fake -dest.tenant=1 -from=2020-06-16T14:00:00-00:00 -to=2020-07-01T00:00:00-00:00
```

### Stopping and restarting

It's OK to process the same data multiple times; chunks are uniquely addressable, so re-processed chunks will simply replace each other.

For boltdb-shipper you will end up with multiple index files which contain duplicate entries.
Loki will handle this without issue, and the compactor will reduce the number of files if there are more than 3
(TODO: we should make a compactor mode which forces cleanup to a single file)

You can use the output of the processed sync ranges to restart from a point where data has already been processed.
Be aware, however, that sync ranges are dispatched to the worker threads in parallel, so the completion messages
will not always be in chronological order; you need to find the last finished time across *ALL* the threads
to determine where processing actually stopped.
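
For reference, each worker logs a message when it finishes a sync range; an illustrative example (the values here are made up) looks something like:

```
2022/05/24 16:21:05 3 Finished processing sync range 42 of 120 - Start: 2020-06-16 12:00:00 +0000 UTC, End: 2020-06-16 18:00:00 +0000 UTC, 1234 chunks, 1.2 GB in 45.0 seconds 26.7 MB/second
```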

Also be aware of the special considerations for a boltdb-shipper destination outlined below.

### batchLen, shardBy, and parallel flags

The defaults here are probably OK for normal-sized machines.

If sending data from something like a Raspberry Pi, you probably want to run something like `-batchLen=10 -parallel=4` or risk running out of memory.
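
For example, a complete invocation on a memory-constrained machine might look like this (the config paths and tenants are taken from the usage examples above and are purely illustrative):

```
migrate -source.config.file=/etc/loki-us-west1/config/config.yaml -dest.config.file=/etc/loki-us-west1/config/config.yaml -source.tenant=fake -dest.tenant=1 -from=2020-06-16T14:00:00-00:00 -to=2020-07-01T00:00:00-00:00 -batchLen=10 -parallel=4
```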

The transfer works by breaking up the time range into `shardBy`-sized windows; each window is called a sync range,
and each sync range is dispatched to one of up to `parallel` worker threads.
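
As a rough illustration of the windowing, here is a minimal Go sketch that splits a time range into `shardBy`-sized sync ranges; it is not the actual `calcSyncRanges` implementation from `main.go`, just the general idea:

```go
// Illustrative only: split [from, to) into windows of size shardBy (all values
// in nanoseconds). The real calcSyncRanges in main.go may differ in details.
package main

import (
	"fmt"
	"time"
)

type syncRange struct {
	number   int
	from, to int64
}

func splitIntoSyncRanges(from, to, shardBy int64) []syncRange {
	var ranges []syncRange
	for start := from; start < to; start += shardBy {
		end := start + shardBy
		if end > to {
			end = to
		}
		ranges = append(ranges, syncRange{number: len(ranges), from: start, to: end})
	}
	return ranges
}

func main() {
	from := time.Date(2020, 6, 16, 0, 0, 0, 0, time.UTC).UnixNano()
	to := time.Date(2020, 6, 17, 0, 0, 0, 0, time.UTC).UnixNano()
	for _, r := range splitIntoSyncRanges(from, to, (6 * time.Hour).Nanoseconds()) {
		fmt.Printf("range %d: %v -> %v\n", r.number, time.Unix(0, r.from).UTC(), time.Unix(0, r.to).UTC())
	}
}
```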

For each sync range, the source index is queried for all the chunks in that range; the list of chunks is then read from the source `batchLen` at a time,
re-encoded if necessary (such as when changing the tenant ID), and sent to the destination store. You need enough memory to hold `batchLen` chunks in memory
times the number of `parallel` threads; for example, with chunks averaging roughly 1 MB (purely illustrative), `-batchLen=100 -parallel=4` would keep on the order of 400 MB of chunk data in flight.

If you have a very large number of chunks, many tens or hundreds of thousands per day, you might want to decrease `shardBy` to a smaller window.
If you don't have many chunks and `shardBy` is too small, you will process the same chunks from multiple sync ranges.

`parallel` can likely be increased until you saturate your CPU or exhaust memory.

If you have a lot of really tiny chunks it may make sense to increase `batchLen`, but I'm not sure how much changing this affects the performance.

There is no exact science to tuning these params yet;
the output gives information on throughput, and you can play around with the values to maximize throughput for your data.


### boltdb-shipper

When the destination index type is boltdb-shipper, it's important to make sure index files are uploaded.
This happens automatically in the background with timers built into the boltdb-shipper code.
It also happens explicitly when all the sync ranges have been processed and the store is shut down.

However, if the process crashes while processing, there may be index files which were not uploaded.

If restarting after a crash, it's best to overlap the start time with previously processed sync ranges.
Exactly how much to overlap is hard to say; one approach is to look at the most recently uploaded index file in
the destination store, whose name is the number of days since the Unix epoch, and convert it to seconds to see what day it covers.

e.g. index_18262: 18262 * (60 * 60 * 24) = 1577836800 which is `Wednesday, January 1, 2020 12:00:00 AM`
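
For reference, here is a small standalone Go helper doing the same day-number-to-date conversion (illustrative, not part of the migrate tool):

```go
package main

import (
	"fmt"
	"time"
)

// indexDayToDate converts a boltdb-shipper index table number (days since the
// Unix epoch) to the UTC date it represents.
func indexDayToDate(day int64) time.Time {
	return time.Unix(day*24*60*60, 0).UTC()
}

func main() {
	fmt.Println(indexDayToDate(18262)) // 2020-01-01 00:00:00 +0000 UTC
}
```
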
107 changes: 65 additions & 42 deletions cmd/migrate/main.go
@@ -94,15 +94,12 @@ func main() {
destConfig.ChunkStoreConfig.WriteDedupeCacheConfig.MemcacheClient = defaultsConfig.ChunkStoreConfig.WriteDedupeCacheConfig.MemcacheClient
destConfig.ChunkStoreConfig.WriteDedupeCacheConfig.Redis = defaultsConfig.ChunkStoreConfig.WriteDedupeCacheConfig.Redis

//// Load each from provided files
//if err := cfg.YAML(*sf, true)(&sourceConfig); err != nil {
// log.Printf("Failed parsing source config file %v: %v\n", *sf, err)
// os.Exit(1)
//}
//if err := cfg.YAML(*df, true)(&destConfig); err != nil {
// log.Printf("Failed parsing dest config file %v: %v\n", *df, err)
// os.Exit(1)
//}
// Don't keep fetched index files for very long
sourceConfig.StorageConfig.BoltDBShipperConfig.CacheTTL = 30 * time.Minute

// Shorten these timers up so we resync a little faster and clear index files a little quicker
destConfig.StorageConfig.IndexCacheValidity = 1 * time.Minute
destConfig.StorageConfig.BoltDBShipperConfig.ResyncInterval = 1 * time.Minute

// The long nature of queries requires stretching out the cardinality limit some and removing the query length limit
sourceConfig.LimitsConfig.CardinalityLimit = 1e9
@@ -160,43 +157,51 @@ func main() {
ctx := context.Background()
// This is a little weird but it was the easiest way to guarantee the userID is in the right format
ctx = user.InjectOrgID(ctx, *source)
//userID, err := tenant.TenantID(ctx)
//if err != nil {
// panic(err)
//}

parsedFrom := mustParse(*from)
parsedTo := mustParse(*to)
//f, t := util.RoundToMilliseconds(parsedFrom, parsedTo)

//schemaGroups, fetchers, err := s.GetChunkRefs(ctx, userID, f, t, matchers...)
//if err != nil {
// log.Println("Error querying index for chunk refs:", err)
// os.Exit(1)
//}

//var totalChunks int
//for i := range schemaGroups {
// totalChunks += len(schemaGroups[i])
//}
//rdr := bufio.NewReader(os.Stdin)
//fmt.Printf("Timespan will sync %v chunks spanning %v schemas.\n", totalChunks, len(fetchers))
//fmt.Print("Proceed? (Y/n):")
//in, err := rdr.ReadString('\n')
//if err != nil {
// log.Fatalf("Error reading input: %v", err)
//}
//if strings.ToLower(strings.TrimSpace(in)) == "n" {
// log.Println("Exiting")
// os.Exit(0)
//}

// This code was helpful in scoping out the amount of data to be processed but it's slow to run over long time ranges.
// Leaving it here for now commented out
/*
userID, err := tenant.TenantID(ctx)
if err != nil {
panic(err)
}
f, t := util.RoundToMilliseconds(parsedFrom, parsedTo)
schemaGroups, fetchers, err := s.GetChunkRefs(ctx, userID, f, t, matchers...)
if err != nil {
log.Println("Error querying index for chunk refs:", err)
os.Exit(1)
}
var totalChunks int
for i := range schemaGroups {
totalChunks += len(schemaGroups[i])
}
rdr := bufio.NewReader(os.Stdin)
fmt.Printf("Timespan will sync %v chunks spanning %v schemas.\n", totalChunks, len(fetchers))
fmt.Print("Proceed? (Y/n):")
in, err := rdr.ReadString('\n')
if err != nil {
log.Fatalf("Error reading input: %v", err)
}
if strings.ToLower(strings.TrimSpace(in)) == "n" {
log.Println("Exiting")
os.Exit(0)
}
*/

start := time.Now()

shardByNs := *shardBy
syncRanges := calcSyncRanges(parsedFrom.UnixNano(), parsedTo.UnixNano(), shardByNs.Nanoseconds())
log.Printf("With a shard duration of %v, %v ranges have been calculated.\n", shardByNs, len(syncRanges))
log.Printf("With a shard duration of %v, %v ranges have been calculated.\n", shardByNs, len(syncRanges)-1)

// Pass the dest schema config; the destination determines the new chunk external keys, potentially using a different schema config.
cm := newChunkMover(ctx, destConfig.SchemaConfig, s, d, *source, *dest, matchers, *batch)
cm := newChunkMover(ctx, destConfig.SchemaConfig, s, d, *source, *dest, matchers, *batch, len(syncRanges)-1)
syncChan := make(chan *syncRange)
errorChan := make(chan error)
statsChan := make(chan stats)
@@ -234,7 +239,7 @@ func main() {
processedChunks += stat.totalChunks
processedBytes += stat.totalBytes
}
log.Printf("Transferring %v chunks totalling %v bytes in %v\n", processedChunks, processedBytes, time.Since(start))
log.Printf("Transferring %v chunks totalling %s in %v for an average throughput of %s/second\n", processedChunks, ByteCountDecimal(processedBytes), time.Since(start), ByteCountDecimal(uint64(float64(processedBytes)/time.Since(start).Seconds())))
log.Println("Exiting stats thread")
}()

@@ -249,7 +254,10 @@ func main() {
log.Println("Waiting for threads to exit")
wg.Wait()
close(statsChan)
log.Println("All threads finished")
log.Println("All threads finished, stopping destination store (uploading index files for boltdb-shipper)")

// For boltdb shipper this is important as it will upload all the index files.
d.Stop()

log.Println("Going to sleep....")
for {
@@ -299,9 +307,10 @@ type chunkMover struct {
destUser string
matchers []*labels.Matcher
batch int
syncRanges int
}

func newChunkMover(ctx context.Context, s config.SchemaConfig, source, dest storage.Store, sourceUser, destUser string, matchers []*labels.Matcher, batch int) *chunkMover {
func newChunkMover(ctx context.Context, s config.SchemaConfig, source, dest storage.Store, sourceUser, destUser string, matchers []*labels.Matcher, batch int, syncRanges int) *chunkMover {
cm := &chunkMover{
ctx: ctx,
schema: s,
@@ -311,6 +320,7 @@ func newChunkMover(ctx context.Context, s config.SchemaConfig, source, dest stor
destUser: destUser,
matchers: matchers,
batch: batch,
syncRanges: syncRanges,
}
return cm
}
@@ -416,7 +426,7 @@ func (m *chunkMover) moveChunks(ctx context.Context, threadID int, syncRangeCh <
//log.Println(threadID, "Batch sent successfully")
}
}
log.Printf("%d Finished processing sync range %d - Start: %v, End: %v, %v chunks, %v bytes in %v seconds %v bytes/second\n", threadID, sr.number, time.Unix(0, sr.from).UTC(), time.Unix(0, sr.to).UTC(), totalChunks, totalBytes, time.Since(start).Seconds(), float64(totalBytes)/time.Since(start).Seconds())
log.Printf("%d Finished processing sync range %d of %d - Start: %v, End: %v, %v chunks, %s in %.1f seconds %s/second\n", threadID, sr.number, m.syncRanges, time.Unix(0, sr.from).UTC(), time.Unix(0, sr.to).UTC(), totalChunks, ByteCountDecimal(totalBytes), time.Since(start).Seconds(), ByteCountDecimal(uint64(float64(totalBytes)/time.Since(start).Seconds())))
statsCh <- stats{
totalChunks: totalChunks,
totalBytes: totalBytes,
Expand All @@ -433,3 +443,16 @@ func mustParse(t string) time.Time {

return ret
}

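// ByteCountDecimal formats a byte count as a human-readable string using SI (decimal) units, e.g. 1500000 -> "1.5 MB".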
func ByteCountDecimal(b uint64) string {
const unit = 1000
if b < unit {
return fmt.Sprintf("%d B", b)
}
div, exp := uint64(unit), 0
for n := b / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(b)/float64(div), "kMGTPE"[exp])
}
