Skip to content

Commit

Permalink
use of channels
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 committed Dec 12, 2021
1 parent b0a74d5 commit 9af084a
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,22 +202,21 @@ func HeadersDownward(

stopped := false
prevProgress := header.Number.Uint64()
doneCh := make(chan bool)
go func() {
for {
timer := time.NewTimer(1 * time.Second)
select {
case <-ctx.Done():
stopped = true
doneCh <- true
return
case <-logEvery.C:
diff := prevProgress - cfg.hd.Progress()
if cfg.hd.Progress() <= prevProgress {
log.Info("Wrote Block Headers backwards", "from", header.Number.Uint64(),
"now", cfg.hd.Progress(), "blk/sec", float64(diff)/float64(logInterval/time.Second))
prevProgress = cfg.hd.Progress()
}
}
timer.Stop()
if stopped {
case <-doneCh:
return
}
}
Expand All @@ -239,7 +238,8 @@ func HeadersDownward(
req = cfg.hd.RequestMoreHeadersForPOS()
_, sentToPeer = cfg.headerReqSend(ctx, &req)
if !sentToPeer {
time.Sleep(30 * time.Millisecond)
timer := time.NewTimer(30 * time.Millisecond)
timer.Stop()
}
}
// Load headers into the database
Expand All @@ -250,11 +250,17 @@ func HeadersDownward(
if cfg.hd.Synced() { // We do not break unless there best header changed
stopped = true
}
select {
case <-doneCh:
stopped = true
default:
}
}
// If the user stopped it, we dont update anything
if !cfg.hd.Synced() {
return nil
}
doneCh <- true
if err := headerCollector.Load(tx, kv.Headers, etl.IdentityLoadFunc, etl.TransformArgs{
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
Expand Down

0 comments on commit 9af084a

Please sign in to comment.