Skip to content

Commit

Permalink
Do torrent storage flush on piece completion (#755)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov authored Jul 7, 2022
1 parent b58aec9 commit 75cc4e9
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 1 deletion.
13 changes: 13 additions & 0 deletions mmap_span/mmap_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ func (ms *MMapSpan) Append(mMap mmap.MMap) {
ms.mMaps = append(ms.mMaps, mMap)
}

func (ms *MMapSpan) Flush() (errs []error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
for _, mMap := range ms.mMaps {
err := mMap.Flush()
if err != nil {
errs = append(errs, err)
}
}
return
}

func (ms *MMapSpan) Close() (errs []error) {
ms.mu.Lock()
defer ms.mu.Unlock()
Expand Down Expand Up @@ -69,6 +81,7 @@ func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte)
_n := copyBytes(copyArgs(p, mMapBytes))
p = p[_n:]
n += _n

if segments.Int(_n) != e.Length {
panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length))
}
Expand Down
6 changes: 6 additions & 0 deletions piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func (p *Piece) Storage() storage.Piece {
return p.t.storage.Piece(p.Info())
}

func (p *Piece) Flush() {
if p.t.storage.Flush != nil {
_ = p.t.storage.Flush()
}
}

func (p *Piece) pendingChunkIndex(chunkIndex chunkIndexType) bool {
return !p.chunkIndexDirty(chunkIndex)
}
Expand Down
1 change: 1 addition & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type TorrentCapacity *func() (cap int64, capped bool)
type TorrentImpl struct {
Piece func(p metainfo.Piece) PieceImpl
Close func() error
Flush func() error
// Storages that share the same space, will provide equal pointers. The function is called once
// to determine the storage for torrents sharing the same function pointer, and mutated in
// place.
Expand Down
9 changes: 8 additions & 1 deletion storage/mmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash
span: span,
pc: s.pc,
}
return TorrentImpl{Piece: t.Piece, Close: t.Close}, err
return TorrentImpl{Piece: t.Piece, Close: t.Close, Flush: t.Flush}, err
}

func (s *mmapClientImpl) Close() error {
Expand Down Expand Up @@ -71,6 +71,13 @@ func (ts *mmapTorrentStorage) Close() error {
}
return nil
}
func (ts *mmapTorrentStorage) Flush() error {
errs := ts.span.Flush()
if len(errs) > 0 {
return errs[0]
}
return nil
}

type mmapStoragePiece struct {
pc PieceCompletionGetSetter
Expand Down
4 changes: 4 additions & 0 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2007,7 +2007,11 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) {
c._stats.incrementPiecesDirtiedGood()
}
t.clearPieceTouchers(piece)
hasDirty := p.hasDirtyChunks()
t.cl.unlock()
if hasDirty {
p.Flush() // You can be synchronous here!
}
err := p.Storage().MarkComplete()
if err != nil {
t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
Expand Down

0 comments on commit 75cc4e9

Please sign in to comment.