Skip to content

Commit

Permalink
replace tracker announce status updates with callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
marcovidonis committed Jan 23, 2025
1 parent 5fd510c commit 19bbecf
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 42 deletions.
46 changes: 8 additions & 38 deletions webtorrent/tracker-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ import (
"sync"
"time"

"github.com/anacrolix/torrent/types/infohash"

g "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v4"
"go.opentelemetry.io/otel/trace"

"github.com/anacrolix/torrent/tracker"
"github.com/anacrolix/torrent/types/infohash"
)

type TrackerStatus struct {
Expand Down Expand Up @@ -67,8 +66,10 @@ type TrackerClient struct {
rtcPeerConns map[string]*wrappedPeerConnection

// callbacks
OnConnected func(error)
OnDisconnected func(error)
OnConnected func(error)
OnDisconnected func(error)
OnAnnounceSuccessful func(ih string)
OnAnnounceError func(ih string, err error)
}

func (me *TrackerClient) Stats() TrackerClientStats {
Expand Down Expand Up @@ -159,15 +160,6 @@ func (tc *TrackerClient) doWebsocket() error {
return err
}

func (tc *TrackerClient) updateTrackerAnnounceStatus(status AnnounceStatus) {
if tc.Observers != nil {
select {
case tc.Observers.AnnounceStatus <- status:
default:
}
}
}

// Finishes initialization and spawns the run routine, calling onStop when it completes with the
// result. We don't let the caller just spawn the runner directly, since then we can race against
// .Close to finish initialization.
Expand Down Expand Up @@ -297,17 +289,7 @@ func (tc *TrackerClient) Announce(event tracker.AnnounceEvent, infoHash [20]byte
func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte, offers []outboundOffer) error {
request, err := tc.GetAnnounceRequest(event, infoHash)
if err != nil {
tc.updateTrackerAnnounceStatus(AnnounceStatus{
TrackerStatus: TrackerStatus{
Url: tc.Url,
Ok: false,
Err: err,
},
Event: "",
InfoHash: infohash.T(infoHash).HexString(),
})
}
if err != nil {
tc.OnAnnounceError(infohash.T(infoHash).HexString(), err)
return fmt.Errorf("getting announce parameters: %w", err)
}

Expand All @@ -328,16 +310,6 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte
})
}

announceStatus := AnnounceStatus{
TrackerStatus: TrackerStatus{
Url: tc.Url,
Ok: true,
Err: nil,
},
Event: req.Event,
InfoHash: infohash.T(infoHash).HexString(),
}

data, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("marshalling request: %w", err)
Expand All @@ -347,12 +319,10 @@ func (tc *TrackerClient) announce(event tracker.AnnounceEvent, infoHash [20]byte
defer tc.mu.Unlock()
err = tc.writeMessage(data)
if err != nil {
announceStatus.Ok = false
announceStatus.Err = err
tc.updateTrackerAnnounceStatus(announceStatus)
tc.OnAnnounceError(infohash.T(infoHash).HexString(), err)
return fmt.Errorf("write AnnounceRequest: %w", err)
}
tc.updateTrackerAnnounceStatus(announceStatus)
tc.OnAnnounceSuccessful(infohash.T(infoHash).HexString())
g.MakeMapIfNil(&tc.outboundOffers)
for _, offer := range offers {
g.MapInsert(tc.outboundOffers, offer.offerId, offer.outboundOfferValue)
Expand Down
27 changes: 23 additions & 4 deletions wstracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,40 @@ func (me *websocketTrackers) Get(url string, infoHash [20]byte) (*webtorrent.Tra
),
WebsocketTrackerHttpHeader: me.WebsocketTrackerHttpHeader,
ICEServers: me.ICEServers,
OnConnected: func(Error error) {
OnConnected: func(err error) {
for _, cb := range me.callbacks.StatusUpdated {
cb(StatusUpdatedEvent{
Event: TrackerConnected,
Url: url,
Error: Error,
Error: err,
})
}
},
OnDisconnected: func(Error error) {
OnDisconnected: func(err error) {
for _, cb := range me.callbacks.StatusUpdated {
cb(StatusUpdatedEvent{
Event: TrackerDisconnected,
Url: url,
Error: Error,
Error: err,
})
}
},
OnAnnounceSuccessful: func(ih string) {
for _, cb := range me.callbacks.StatusUpdated {
cb(StatusUpdatedEvent{
Event: TrackerAnnounceSuccessful,
Url: url,
InfoHash: ih,
})
}
},
OnAnnounceError: func(ih string, err error) {
for _, cb := range me.callbacks.StatusUpdated {
cb(StatusUpdatedEvent{
Event: TrackerAnnounceError,
Url: url,
Error: err,
InfoHash: ih,
})
}
},
Expand Down

0 comments on commit 19bbecf

Please sign in to comment.