Skip to content

Commit

Permalink
Merge a bunch of stuff into ConnStats and refactor connection.upload
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Feb 2, 2018
1 parent c950778 commit b0c1f99
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 48 deletions.
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
return fmt.Errorf("data has bad offset in payload: %d", begin)
}
t.saveMetadataPiece(piece, payload[begin:])
c.UsefulChunksReceived++
c.stats.ChunksReadUseful++
c.lastUsefulChunkReceived = time.Now()
return t.maybeCompleteMetadata()
case pp.RequestMetadataExtensionMsgType:
Expand Down
30 changes: 24 additions & 6 deletions conn_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,34 @@ import (
pp "github.com/anacrolix/torrent/peer_protocol"
)

// Various connection-level metrics. At the Torrent level these are
// aggregates. Chunks are messages with data payloads. Data is actual torrent
// content without any overhead. Useful is something we needed locally.
// Unwanted is something we didn't ask for (but may still be useful). Written
// is things sent to the peer, and Read is stuff received from them.
type ConnStats struct {
// Torrent "piece" messages, or data chunks.
ChunksWritten int64 // Num piece messages sent.
ChunksRead int64
// Total bytes on the wire. Includes handshakes and encryption.
BytesWritten int64
BytesRead int64
// Data bytes, actual torrent data.
DataBytesWritten int64
DataBytesRead int64

// The rest of the stats only occur on connections after handshakes.

ChunksWritten int64

ChunksRead int64
ChunksReadUseful int64
ChunksReadUnwanted int64

DataBytesWritten int64
DataBytesRead int64
UsefulDataBytesRead int64

// Number of pieces data was written to, that subsequently passed verification.
GoodPiecesDirtied int64
// Number of pieces data was written to, that subsequently failed
// verification. Note that a connection may not have been the sole dirtier
// of a piece.
BadPiecesDirtied int64
}

func (cs *ConnStats) wroteMsg(msg *pp.Message) {
Expand Down
72 changes: 37 additions & 35 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@ type connection struct {
uTP bool
closed missinggo.Event

stats ConnStats
UnwantedChunksReceived int
UsefulChunksReceived int
chunksSent int
goodPiecesDirtied int
badPiecesDirtied int
stats ConnStats

lastMessageReceived time.Time
completedHandshake time.Time
Expand Down Expand Up @@ -213,9 +208,9 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
" %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
cn.UsefulChunksReceived,
cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
cn.chunksSent,
cn.stats.ChunksReadUseful,
cn.stats.ChunksReadUnwanted+cn.stats.ChunksReadUseful,
cn.stats.ChunksWritten,
cn.numLocalRequests(),
len(cn.PeerRequests),
cn.statusFlags(),
Expand Down Expand Up @@ -1079,14 +1074,14 @@ func (c *connection) receiveChunk(msg *pp.Message) {
// Do we actually want this chunk?
if !t.wantPiece(req) {
unwantedChunksReceived.Add(1)
c.UnwantedChunksReceived++
c.stats.ChunksReadUnwanted++
return
}

index := int(req.Index)
piece := &t.pieces[index]

c.UsefulChunksReceived++
c.stats.ChunksReadUseful++
c.lastUsefulChunkReceived = time.Now()
// if t.fastestConn != c {
// log.Printf("setting fastest connection %p", c)
Expand Down Expand Up @@ -1142,51 +1137,59 @@ func (c *connection) receiveChunk(msg *pp.Message) {
t.publishPieceChange(int(req.Index))
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
t := c.t
cl := t.cl
if cl.config.NoUpload {
return true
func (c *connection) uploadAllowed() bool {
if c.t.cl.config.NoUpload {
return false
}
if !c.PeerInterested {
if c.t.seeding() {
return true
}
seeding := t.seeding()
if !seeding && !c.peerHasWantedPieces() {
// There's no reason to upload to this peer.
return true
if !c.peerHasWantedPieces() {
return false
}
// Don't upload more than 100 KiB more than we download.
if c.stats.DataBytesWritten >= c.stats.DataBytesRead+100<<10 {
return false
}
return true
}

func (c *connection) setRetryUploadTimer(delay time.Duration) {
if c.uploadTimer == nil {
c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
} else {
c.uploadTimer.Reset(delay)
}
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
// Breaking or completing this loop means we don't want to upload to the
// peer anymore, and we choke them.
another:
for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
for c.uploadAllowed() {
// We want to upload to the peer.
if !c.Unchoke(msg) {
return false
}
for r := range c.PeerRequests {
res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
if !res.OK() {
panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
}
delay := res.Delay()
if delay > 0 {
res.Cancel()
if c.uploadTimer == nil {
c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
} else {
c.uploadTimer.Reset(delay)
}
c.setRetryUploadTimer(delay)
// Hard to say what to return here.
return true
}
more, err := c.sendChunk(r, msg)
if err != nil {
i := int(r.Index)
if t.pieceComplete(i) {
t.updatePieceCompletion(i)
if !t.pieceComplete(i) {
if c.t.pieceComplete(i) {
c.t.updatePieceCompletion(i)
if !c.t.pieceComplete(i) {
// We had the piece, but not anymore.
break another
}
Expand Down Expand Up @@ -1214,8 +1217,8 @@ func (cn *connection) Drop() {
cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int {
return cn.goodPiecesDirtied - cn.badPiecesDirtied
func (cn *connection) netGoodPiecesDirtied() int64 {
return cn.stats.GoodPiecesDirtied - cn.stats.BadPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
Expand Down Expand Up @@ -1275,7 +1278,6 @@ func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool,
Begin: r.Begin,
Piece: b,
})
c.chunksSent++
uploadChunksPosted.Add(1)
c.lastChunkSent = time.Now()
return
Expand Down
2 changes: 1 addition & 1 deletion connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
}
w.Close()
require.NoError(b, <-mrlErr)
require.EqualValues(b, b.N, cn.UsefulChunksReceived)
require.EqualValues(b, b.N, cn.stats.ChunksReadUseful)
}

func TestConnectionReceiveBadChunkIndex(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ func (t *Torrent) worstBadConn() *connection {
heap.Init(&wcs)
for wcs.Len() != 0 {
c := heap.Pop(&wcs).(*connection)
if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful {
return c
}
if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
Expand Down Expand Up @@ -1516,7 +1516,7 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
p.everHashed = true
if correct {
for _, c := range touchers {
c.goodPiecesDirtied++
c.stats.GoodPiecesDirtied++
}
err := p.Storage().MarkComplete()
if err != nil {
Expand All @@ -1526,11 +1526,11 @@ func (t *Torrent) pieceHashed(piece int, correct bool) {
if len(touchers) != 0 {
for _, c := range touchers {
// Y u do dis peer?!
c.badPiecesDirtied++
c.stats.BadPiecesDirtied++
}
slices.Sort(touchers, connLessTrusted)
if t.cl.config.Debug {
log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
for _, c := range touchers {
ret = append(ret, c.netGoodPiecesDirtied())
}
Expand Down
4 changes: 3 additions & 1 deletion torrent_stats.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package torrent

type TorrentStats struct {
ConnStats // Aggregates stats over all connections past and present.
// Aggregates stats over all connections past and present. Some values may
// not have much meaning in the aggregate context.
ConnStats

// Ordered by expected descending quantities (if all is well).
TotalPeers int
Expand Down

0 comments on commit b0c1f99

Please sign in to comment.