Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix previous download completion processing #11227

Merged
merged 45 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
7fead8b
added status logging
mh0lt Jul 16, 2024
f9e1352
only print lists on not complete
mh0lt Jul 16, 2024
ee3e937
add available list
mh0lt Jul 16, 2024
b8d919b
save
Giulio2002 Jul 16, 2024
8ba2cba
save
Giulio2002 Jul 16, 2024
0318a39
save
Giulio2002 Jul 16, 2024
cdc753d
save
Giulio2002 Jul 16, 2024
a372b4f
save
Giulio2002 Jul 16, 2024
d1f1e3d
save
Giulio2002 Jul 16, 2024
d0d21f0
save
Giulio2002 Jul 16, 2024
219020c
save
Giulio2002 Jul 16, 2024
cdd3392
save
Giulio2002 Jul 16, 2024
3ce0d86
save
Giulio2002 Jul 16, 2024
16df018
save
Giulio2002 Jul 16, 2024
9f6b6b2
save
Giulio2002 Jul 16, 2024
930fc93
save
Giulio2002 Jul 16, 2024
6f311b4
save
Giulio2002 Jul 16, 2024
5cd4e19
save
Giulio2002 Jul 16, 2024
0094f56
save
Giulio2002 Jul 16, 2024
ed8e38a
save
Giulio2002 Jul 16, 2024
3ac67d6
save
Giulio2002 Jul 16, 2024
c64bc9e
save
Giulio2002 Jul 16, 2024
4e22685
save
Giulio2002 Jul 16, 2024
23d3c45
save
Giulio2002 Jul 16, 2024
302b04b
save
Giulio2002 Jul 16, 2024
b5ca96c
save
Giulio2002 Jul 17, 2024
5f1fcf4
save
Giulio2002 Jul 17, 2024
5f53f7f
Merge branch 'main' into dl_incomplete_downloads
mh0lt Jul 17, 2024
0461d30
Merge branch 'good-prune' into dl_incomplete_downloads
mh0lt Jul 17, 2024
1fdd7ed
loop print
mh0lt Jul 17, 2024
7e9fd9c
loop print
mh0lt Jul 17, 2024
88ed5f1
loop print
mh0lt Jul 17, 2024
0982b17
update test
mh0lt Jul 18, 2024
6c5d67f
update test
mh0lt Jul 18, 2024
4918fe1
remove printers
mh0lt Jul 18, 2024
d8ac494
Updated slots comment
mh0lt Jul 18, 2024
84d1347
use batch unless awaiting flush
mh0lt Jul 18, 2024
385d7d0
don't lock on persist
mh0lt Jul 18, 2024
034e99f
updated mods to v1.54.2-alpha-32
mh0lt Jul 18, 2024
fd2aa8b
Merge branch 'main' into dl_incomplete_downloads
mh0lt Jul 18, 2024
ec10b95
tidy go sum
mh0lt Jul 18, 2024
ef2476f
save
AskAlexSharov Jul 19, 2024
6718b1a
Merge branch 'main' into dl_incomplete_downloads
AskAlexSharov Jul 19, 2024
27d739a
save
AskAlexSharov Jul 19, 2024
c8db4f7
save
AskAlexSharov Jul 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ var (
TorrentDownloadSlotsFlag = cli.IntFlag{
Name: "torrent.download.slots",
Value: 128,
Usage: "Amount of files to download in parallel. If network has enough seeders 1-3 slot enough, if network has lack of seeders increase to 5-7 (too big value will slow down everything).",
Usage: "Amount of files to download in parallel.",
}
TorrentStaticPeersFlag = cli.StringFlag{
Name: "torrent.staticpeers",
Expand Down
33 changes: 29 additions & 4 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,8 +891,14 @@ func (d *Downloader) mainLoop(silent bool) error {

var pending []*torrent.Torrent

var plist []string
var clist []string
var flist []string
var dlist []string

for _, t := range torrents {
if _, ok := complete[t.Name()]; ok {
clist = append(clist, t.Name())
continue
}

Expand Down Expand Up @@ -944,6 +950,7 @@ func (d *Downloader) mainLoop(silent bool) error {
}(fileInfo, t.InfoHash(), length, *completionTime)

} else {
clist = append(clist, t.Name())
complete[t.Name()] = struct{}{}
continue
}
Expand All @@ -954,6 +961,7 @@ func (d *Downloader) mainLoop(silent bool) error {
}

if _, ok := failed[t.Name()]; ok {
flist = append(flist, t.Name())
continue
}

Expand Down Expand Up @@ -1009,14 +1017,17 @@ func (d *Downloader) mainLoop(silent bool) error {
delete(d.downloading, t.Name())
d.lock.Unlock()
complete[t.Name()] = struct{}{}
clist = append(clist, t.Name())
continue
}

if downloading {
dlist = append(dlist, t.Name())
continue
}

pending = append(pending, t)
plist = append(plist, t.Name())
}

select {
Expand Down Expand Up @@ -1150,6 +1161,20 @@ func (d *Downloader) mainLoop(silent bool) error {
}
d.lock.RUnlock()

d.lock.RLock()
completed := d.stats.Completed
d.lock.RUnlock()

if !completed {
var alist []string

for _, t := range available {
alist = append(alist, t.Name())
}

d.logger.Debug("[snapshot] download status", "pending", plist, "availible", alist, "downloading", dlist, "complete", clist, "failed", flist)
}

for _, t := range available {

torrentInfo, err := d.torrentInfo(t.Name())
Expand Down Expand Up @@ -1217,7 +1242,7 @@ func (d *Downloader) mainLoop(silent bool) error {

switch {
case len(t.PeerConns()) > 0:
d.logger.Debug("[snapshots] Downloading from BitTorrent", "file", t.Name(), "peers", len(t.PeerConns()), "webpeers", len(t.WebseedPeerConns()))
d.logger.Debug("[snapshots] Downloading from torrent", "file", t.Name(), "peers", len(t.PeerConns()), "webpeers", len(t.WebseedPeerConns()))
delete(waiting, t.Name())
d.torrentDownload(t, downloadComplete)
case len(t.WebseedPeerConns()) > 0:
Expand Down Expand Up @@ -2759,10 +2784,10 @@ func openClient(ctx context.Context, dbDir, snapDir string, cfg *torrent.ClientC
dnsResolver := &downloadercfg.DnsCacheResolver{RefreshTimeout: 24 * time.Hour}
cfg.TrackerDialContext = dnsResolver.DialContext

err = func() error {
err = func() (err error) {
defer func() {
if err := recover(); err != nil {
fmt.Printf("openTorrentClient: %v\n", err)
if e := recover(); e != nil {
err = fmt.Errorf("openTorrentClient: %v", e)
}
}()

Expand Down
81 changes: 38 additions & 43 deletions erigon-lib/downloader/mdbx_piece_completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,59 +89,54 @@ func (m *mdbxPieceCompletion) Get(pk metainfo.PieceKey) (cn storage.Completion,
return
}

func (m *mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool) error {
func (m *mdbxPieceCompletion) Set(pk metainfo.PieceKey, b bool, awaitFlush bool) error {
if c, err := m.Get(pk); err == nil && c.Ok && c.Complete == b {
return nil
}

m.mu.Lock()
defer m.mu.Unlock()

var tx kv.RwTx
var err error
// On power-off recent "no-sync" txs may be lost.
// It will cause 2 cases of in-consistency between files on disk and db metadata:
// - Good piece on disk and recent "complete" db marker lost. Self-Heal by re-download.
// - Bad piece on disk and recent "incomplete" db marker lost. No Self-Heal. Means: can't afford loosing recent "incomplete" markers.
// FYI: Fsync of torrent pieces happenng before storing db markers: https://github.com/anacrolix/torrent/blob/master/torrent.go#L2026
//
// Mainnet stats:
// call amount 2 minutes complete=100K vs incomple=1K
// 1K fsyncs/2minutes it's quite expensive, but even on cloud (high latency) drive it allow download 100mb/s
// and Erigon doesn't do anything when downloading snapshots
if b {
completed, ok := m.completed[pk.InfoHash]

if !ok {
completed = &roaring.Bitmap{}
m.completed[pk.InfoHash] = completed
}

completed.Add(uint32(pk.Index))

if flushed, ok := m.flushed[pk.InfoHash]; !ok || !flushed.Contains(uint32(pk.Index)) {
return nil
persist := func() bool {
m.mu.Lock()
defer m.mu.Unlock()

if b {
completed, ok := m.completed[pk.InfoHash]

if !ok {
completed = &roaring.Bitmap{}
m.completed[pk.InfoHash] = completed
}

completed.Add(uint32(pk.Index))

// when files are being downloaded flushing is asynchronous so we want to wait for
// the confirm before committing to the DB. This means that if the program is
// abnormally terminated the piece will be re-downloaded
if awaitFlush {
if flushed, ok := m.flushed[pk.InfoHash]; !ok || !flushed.Contains(uint32(pk.Index)) {
return false
}
}
} else {
if completed, ok := m.completed[pk.InfoHash]; ok {
completed.Remove(uint32(pk.Index))
}
}
} else {
if completed, ok := m.completed[pk.InfoHash]; ok {
completed.Remove(uint32(pk.Index))
}
}
return true
}()

tx, err = m.db.BeginRw(context.Background())
if err != nil {
return err
if !persist {
return nil
}

defer tx.Rollback()

err = putCompletion(tx, pk.InfoHash, uint32(pk.Index), b)

if err != nil {
return err
if awaitFlush {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only comment: I don't understand why awaitFlush == true uses Update while the other case uses Batch. Why not the opposite?

Copy link
Collaborator

@AskAlexSharov AskAlexSharov Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more thing: if b == false, maybe you need to use inverted logic:

  • mark the piece as false in the DB immediately, without waiting for Flush
  • then on restart you will not lose the fact that the piece was marked as bad

In other words: save true in the DB only after the flush (once we are sure the data will be good after a restart), and save false in the DB ASAP (to be conservative: the piece is then re-validated after restart, whether it is good or bad on disk).

Keep in mind that when Set(false) happens, the DB may already have true (for example: 1. if, after the download has finished, we start with --downloader.verify and find a broken piece; 2. if a file was removed manually and we start with --downloader.verify).

return m.db.Update(context.Background(), func(tx kv.RwTx) error {
return putCompletion(tx, pk.InfoHash, uint32(pk.Index), b)
})
}

return tx.Commit()
return m.db.Batch(func(tx kv.RwTx) error {
return putCompletion(tx, pk.InfoHash, uint32(pk.Index), b)
})
}

func putCompletion(tx kv.RwTx, infoHash infohash.T, index uint32, c bool) error {
Expand Down
4 changes: 2 additions & 2 deletions erigon-lib/downloader/mdbx_piece_completion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func TestMdbxPieceCompletion(t *testing.T) {
require.NoError(t, err)
assert.False(t, b.Ok)

require.NoError(t, pc.Set(pk, false))
require.NoError(t, pc.Set(pk, false, false))

b, err = pc.Get(pk)
require.NoError(t, err)
assert.Equal(t, storage.Completion{Complete: false, Ok: true}, b)

require.NoError(t, pc.Set(pk, true))
require.NoError(t, pc.Set(pk, true, false))

b, err = pc.Get(pk)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ require (
)

replace (
github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-30
github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-32
github.com/holiman/bloomfilter/v2 => github.com/AskAlexSharov/bloomfilter/v2 v2.0.8
github.com/tidwall/btree => github.com/AskAlexSharov/btree v1.6.2
)
4 changes: 2 additions & 2 deletions erigon-lib/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ github.com/erigontech/secp256k1 v1.1.0 h1:mO3YJMUSoASE15Ya//SoHiisptUhdXExuMUN1M
github.com/erigontech/secp256k1 v1.1.0/go.mod h1:GokhPepsMB+EYDs7I5JZCprxHW6+yfOcJKaKtoZ+Fls=
github.com/erigontech/speedtest v0.0.2 h1:W9Cvky/8AMUtUONwkLA/dZjeQ2XfkBdYfJzvhMZUO+U=
github.com/erigontech/speedtest v0.0.2/go.mod h1:vulsRNiM51BmSTbVtch4FWxKxx53pS2D35lZTtao0bw=
github.com/erigontech/torrent v1.54.2-alpha-30 h1:LUk4fTwx4FAmH3Jf5+hQ48CmAHVNKu5DXnGsMZsiIHw=
github.com/erigontech/torrent v1.54.2-alpha-30/go.mod h1:QtK2WLdEz1Iy1Dh/325UltdHU0nA1xujh2rN6aov6y0=
github.com/erigontech/torrent v1.54.2-alpha-32 h1:Ly8W2JvD7r1o5TklXxKEV9D9Tr664tSrgj5OPpOrlWg=
github.com/erigontech/torrent v1.54.2-alpha-32/go.mod h1:QtK2WLdEz1Iy1Dh/325UltdHU0nA1xujh2rN6aov6y0=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/frankban/quicktest v1.9.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,6 @@ require (
)

replace (
github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-30
github.com/anacrolix/torrent => github.com/erigontech/torrent v1.54.2-alpha-32
github.com/holiman/bloomfilter/v2 => github.com/AskAlexSharov/bloomfilter/v2 v2.0.8
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,10 @@ github.com/erigontech/secp256k1 v1.1.0 h1:mO3YJMUSoASE15Ya//SoHiisptUhdXExuMUN1M
github.com/erigontech/secp256k1 v1.1.0/go.mod h1:GokhPepsMB+EYDs7I5JZCprxHW6+yfOcJKaKtoZ+Fls=
github.com/erigontech/silkworm-go v0.18.0 h1:j56p61xZHBFhZGH1OixlGU8KcfjHzcw9pjAfjmVsOZA=
github.com/erigontech/silkworm-go v0.18.0/go.mod h1:O50ux0apICEVEGyRWiE488K8qz8lc3PA/SXbQQAc8SU=
github.com/erigontech/torrent v1.54.2-alpha-32 h1:Ly8W2JvD7r1o5TklXxKEV9D9Tr664tSrgj5OPpOrlWg=
github.com/erigontech/torrent v1.54.2-alpha-32/go.mod h1:QtK2WLdEz1Iy1Dh/325UltdHU0nA1xujh2rN6aov6y0=
github.com/erigontech/speedtest v0.0.2 h1:W9Cvky/8AMUtUONwkLA/dZjeQ2XfkBdYfJzvhMZUO+U=
github.com/erigontech/speedtest v0.0.2/go.mod h1:vulsRNiM51BmSTbVtch4FWxKxx53pS2D35lZTtao0bw=
github.com/erigontech/torrent v1.54.2-alpha-30 h1:LUk4fTwx4FAmH3Jf5+hQ48CmAHVNKu5DXnGsMZsiIHw=
github.com/erigontech/torrent v1.54.2-alpha-30/go.mod h1:QtK2WLdEz1Iy1Dh/325UltdHU0nA1xujh2rN6aov6y0=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/fjl/gencodec v0.0.0-20220412091415-8bb9e558978c h1:CndMRAH4JIwxbW8KYq6Q+cGWcGHz0FjGR3QqcInWcW0=
Expand Down
Loading