block sent listener is not called for every block sent #126

Closed
dirkmc opened this issue Dec 8, 2020 · 3 comments · Fixed by #128
dirkmc commented Dec 8, 2020

I added TestDataSentEvents as an integration test in go-data-transfer to watch for events emitted. It seems that gsBlockSentHook is not called every time that a block is sent (whereas gsIncomingBlockHook is called every time a block is received). gsBlockSentHook is registered with graphsync using RegisterBlockSentListener.

The full output from the test is below. Note the last two lines of the event log:

prov [Completed] CleanupComplete: qd 0, rcvd 20439, sent 0, total: 0
clnt [Completed] CleanupComplete: qd 20439, rcvd 0, sent 14295, total: 0

On the provider, graphsync emitted "receive" events for 20439 bytes.
On the client, graphsync emitted "queue" events for 20439 bytes, but only emitted "sent" events for 14295 bytes.
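
As a quick sanity check on those totals, the gap between queued and sent bytes can be expressed in blocks. This is only a sketch: the 1024-byte block size is an assumption inferred from the step between successive DataQueued totals in the log (966, 1990, 3014, ...), and `unreportedBlocks` is an illustrative name, not part of go-data-transfer or go-graphsync.

```go
package main

import "fmt"

// unreportedBlocks returns how many whole blocks of blockSize bytes fit in the
// gap between queued and sent bytes, plus any leftover bytes.
// Illustrative helper only; blockSize is an assumption read off the log.
func unreportedBlocks(queued, sent, blockSize uint64) (blocks, remainder uint64) {
	gap := queued - sent
	return gap / blockSize, gap % blockSize
}

func main() {
	// Totals taken from the final CleanupComplete line on the client.
	blocks, rem := unreportedBlocks(20439, 14295, 1024)
	fmt.Printf("gap: %d bytes = %d blocks + %d bytes\n", 20439-14295, blocks, rem)
}
```

Under that assumption the gap is exactly six blocks, which lines up with the test failure reporting 21 expected entries but only 15 actual ones.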

Full output:

clnt [Requested] Open: qd 0, rcvd 0, sent 0, total: 0
prov [Requested] Open: qd 0, rcvd 0, sent 0, total: 0
prov [Ongoing] Accept: qd 0, rcvd 0, sent 0, total: 0
clnt [Ongoing] Accept: qd 0, rcvd 0, sent 0, total: 0
clnt [Ongoing] ResumeResponder: qd 0, rcvd 0, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 966, rcvd 0, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 1990, rcvd 0, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 3014, rcvd 0, sent 0, total: 0
clnt [Ongoing] DataSent: qd 3014, rcvd 0, sent 966, total: 0
clnt [Ongoing] DataQueued: qd 4038, rcvd 0, sent 966, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 966, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 5062, rcvd 0, sent 966, total: 0
clnt [Ongoing] DataSent: qd 5062, rcvd 0, sent 1990, total: 0
clnt [Ongoing] DataQueued: qd 6086, rcvd 0, sent 1990, total: 0
clnt [Ongoing] DataQueued: qd 7110, rcvd 0, sent 1990, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 1990, sent 0, total: 0
clnt [Ongoing] DataSent: qd 7110, rcvd 0, sent 3014, total: 0
clnt [Ongoing] DataQueued: qd 8134, rcvd 0, sent 3014, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 3014, sent 0, total: 0
clnt [Ongoing] DataSent: qd 8134, rcvd 0, sent 4038, total: 0
clnt [Ongoing] DataQueued: qd 9158, rcvd 0, sent 4038, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 4038, sent 0, total: 0
clnt [Ongoing] DataSent: qd 9158, rcvd 0, sent 5062, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 5062, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 10182, rcvd 0, sent 5062, total: 0
clnt [Ongoing] DataSent: qd 10182, rcvd 0, sent 6086, total: 0
clnt [Ongoing] DataQueued: qd 11206, rcvd 0, sent 6086, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 6086, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 12230, rcvd 0, sent 6086, total: 0
clnt [Ongoing] DataSent: qd 12230, rcvd 0, sent 7110, total: 0
clnt [Ongoing] DataQueued: qd 13254, rcvd 0, sent 7110, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 7110, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 14278, rcvd 0, sent 7110, total: 0
clnt [Ongoing] DataSent: qd 14278, rcvd 0, sent 8134, total: 0
clnt [Ongoing] DataQueued: qd 15302, rcvd 0, sent 8134, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 8134, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 16326, rcvd 0, sent 8134, total: 0
clnt [Ongoing] DataSent: qd 16326, rcvd 0, sent 9158, total: 0
clnt [Ongoing] DataQueued: qd 17350, rcvd 0, sent 9158, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 9158, sent 0, total: 0
clnt [Ongoing] DataSent: qd 17350, rcvd 0, sent 10182, total: 0
clnt [Ongoing] DataQueued: qd 18374, rcvd 0, sent 10182, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 10182, sent 0, total: 0
clnt [Ongoing] DataQueued: qd 19398, rcvd 0, sent 10182, total: 0
clnt [Ongoing] DataSent: qd 19398, rcvd 0, sent 11206, total: 0
clnt [Ongoing] DataQueued: qd 20422, rcvd 0, sent 11206, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 11206, sent 0, total: 0
clnt [Ongoing] DataSent: qd 20422, rcvd 0, sent 12230, total: 0
clnt [Ongoing] DataQueued: qd 20439, rcvd 0, sent 12230, total: 0
clnt [Ongoing] DataSent: qd 20439, rcvd 0, sent 13254, total: 0
clnt [Ongoing] DataSent: qd 20439, rcvd 0, sent 14278, total: 0
clnt [Ongoing] DataSent: qd 20439, rcvd 0, sent 14295, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 12230, sent 0, total: 0
clnt [TransferFinished] FinishTransfer: qd 20439, rcvd 0, sent 14295, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 13254, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 14278, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 15302, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 16326, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 17350, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 18374, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 19398, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 20422, sent 0, total: 0
prov [Ongoing] DataReceived: qd 0, rcvd 20439, sent 0, total: 0
clnt [Completing] ResponderCompletes: qd 20439, rcvd 0, sent 14295, total: 0
prov [Completing] Complete: qd 0, rcvd 20439, sent 0, total: 0
prov [Completed] CleanupComplete: qd 0, rcvd 20439, sent 0, total: 0
clnt [Completed] CleanupComplete: qd 20439, rcvd 0, sent 14295, total: 0

    integration_test.go:342: 
        	Error Trace:	integration_test.go:342
        	Error:      	Not equal: 
        	            	expected: []uint64{0x3c6, 0x7c6, 0xbc6, 0xfc6, 0x13c6, 0x17c6, 0x1bc6, 0x1fc6, 0x23c6, 0x27c6, 0x2bc6, 0x2fc6, 0x33c6, 0x37c6, 0x3bc6, 0x3fc6, 0x43c6, 0x47c6, 0x4bc6, 0x4fc6, 0x4fd7}
        	            	actual  : []uint64{0xbc6, 0x13c6, 0x1bc6, 0x1fc6, 0x23c6, 0x27c6, 0x2fc6, 0x37c6, 0x3fc6, 0x43c6, 0x4bc6, 0x4fc6, 0x4fd7, 0x4fd7, 0x4fd7}
        	            	
        	            	Diff:
        	            	--- Expected
        	            	+++ Actual
        	            	@@ -1,8 +1,4 @@
        	            	-([]uint64) (len=21) {
        	            	- (uint64) 966,
        	            	- (uint64) 1990,
        	            	+([]uint64) (len=15) {
        	            	  (uint64) 3014,
        	            	- (uint64) 4038,
        	            	  (uint64) 5062,
        	            	- (uint64) 6086,
        	            	  (uint64) 7110,
        	            	@@ -11,12 +7,10 @@
        	            	  (uint64) 10182,
        	            	- (uint64) 11206,
        	            	  (uint64) 12230,
        	            	- (uint64) 13254,
        	            	  (uint64) 14278,
        	            	- (uint64) 15302,
        	            	  (uint64) 16326,
        	            	  (uint64) 17350,
        	            	- (uint64) 18374,
        	            	  (uint64) 19398,
        	            	  (uint64) 20422,
        	            	+ (uint64) 20439,
        	            	+ (uint64) 20439,
        	            	  (uint64) 20439
        	Test:       	TestDataSentEvents
--- FAIL: TestDataSentEvents (0.01s)
FAIL

Debugger finished with exit code 0
dirkmc commented Dec 9, 2020

It seems that the root cause of the issue is that when the messagequeue sends a message, the message may contain several blocks, but only one "Sent" event is emitted.

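func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage, topic Topic) bool {
	err := mq.sender.SendMsg(mq.ctx, message)
	if err == nil {
		// A single Sent event is published per message, however many blocks it carries.
		mq.eventPublisher.Publish(topic, Event{Name: Sent})
		return true
	}
	// ... (rest of the function elided in this excerpt)
}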

When this event bubbles up to the responsemanager subscriber, it only includes the block data for the first block (not all blocks in the message):
https://github.com/ipfs/go-graphsync/blob/master/responsemanager/subscriber.go#L42
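
To make the effect concrete, here is a minimal model of the accounting described above. It does not use the real go-graphsync types: `block`, `message`, and both functions are hypothetical stand-ins, and the block sizes are borrowed from the log only for illustration.

```go
package main

import "fmt"

// block is a hypothetical stand-in for the per-block data attached to a message.
type block struct{ size uint64 }

// message is a hypothetical stand-in for a graphsync message carrying several blocks.
type message struct{ blocks []block }

// sentBytesFirstBlockOnly models the reported behaviour: one Sent event per
// message, credited with the first block only.
func sentBytesFirstBlockOnly(msgs []message) uint64 {
	var total uint64
	for _, m := range msgs {
		if len(m.blocks) > 0 {
			total += m.blocks[0].size
		}
	}
	return total
}

// sentBytesAllBlocks models the expected behaviour: every block in a sent
// message is counted.
func sentBytesAllBlocks(msgs []message) uint64 {
	var total uint64
	for _, m := range msgs {
		for _, b := range m.blocks {
			total += b.size
		}
	}
	return total
}

func main() {
	msgs := []message{
		{blocks: []block{{966}}},          // one block in the message
		{blocks: []block{{1024}, {1024}}}, // two blocks batched into one message
	}
	fmt.Println("first block only:", sentBytesFirstBlockOnly(msgs))
	fmt.Println("all blocks:      ", sentBytesAllBlocks(msgs))
}
```

Whenever the queue batches more than one block into a message, the first total falls behind the second, which is the shape of the discrepancy in the test output.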

hannahhoward commented

Hey, so this exposes a flaw in the mappable subscriber system, and provides a pretext for refactoring the interface.


hannahhoward commented Dec 9, 2020

@acruikshank I am recommending a fix to the way MappableSubscriber currently works, as well as some interface changes that will make things more clear:

These are my recommended new interfaces -- please don't take these as set in stone, and please feel free to improve the naming. (Adapted from: https://github.com/ipfs/go-graphsync/blob/master/notifications/types.go)


// Topic is a topic that events appear on
type Topic interface{}

// Event is a publishable event
type Event interface{}

// TopicData is data added to every message broadcast on a topic
type TopicData interface{}

// Subscriber is a subscriber that can receive events
type Subscriber interface {
	OnNext(Topic, Event, ...TopicData)
	OnClose(Topic, ...TopicData)
}

// TopicDataSubscriber is a subscriber that can inject data into subscriber callbacks when events happen for a given topic
type TopicDataSubscriber interface {
	Subscriber
	AddTopicData(topic Topic, data TopicData)
}

// Subscribable is a stream that can be subscribed to
type Subscribable interface {
	Subscribe(topic Topic, sub Subscriber) bool
	Unsubscribe(sub Subscriber) bool
}

// Publisher is a publisher of events that can be subscribed to
type Publisher interface {
	Close(Topic)
	Publish(Topic, Event)
	Shutdown()
	Startup()
	Subscribable
}

// EventTransform is a function that transforms one kind of event into another
type EventTransform func(Event) Event

// Notifee is a topic data subscriber plus a set of data you want to add to any topics subscribed to
// (used to call SubscribeWithData to inject data when events for a given topic emit)
type Notifee struct {
	Data       TopicData
	Subscriber TopicDataSubscriber
}

// SubscribeWithData subscribes the notifee's subscriber to the given topic, and adds the notifee's
// custom data to the list of data injected into callbacks when events occur on that topic
func SubscribeWithData(p Subscribable, topic Topic, notifee Notifee) {
	notifee.Subscriber.AddTopicData(topic, notifee.Data)
	p.Subscribe(topic, notifee.Subscriber)
}

--

I also recommend:

  • Rename NewMappableSubscriber -> NewTopicDataSubscriber
  • Lose the event transform parameter to NewTopicDataSubscriber -- we never use this other than to pass a function that does no transformation
  • Change the internal map from map[Topic]Topic to map[Topic][]TopicData -- note that the current entry being overwritten is actually the root cause of the issue
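
The difference between the two map shapes can be sketched in isolation. This is a toy, not the actual MappableSubscriber code: `overwritingMap` and `appendingMap` are made-up names, the `Topic` and `TopicData` aliases copy the proposed definitions above, and the string keys and values are placeholders.

```go
package main

import "fmt"

// Topic and TopicData copy the proposed definitions from this thread.
type Topic interface{}
type TopicData interface{}

// overwritingMap mimics a map[Topic]Topic: registering a second value for the
// same topic replaces the first.
type overwritingMap map[Topic]Topic

func (m overwritingMap) add(topic Topic, data Topic) { m[topic] = data }

// appendingMap mimics the proposed map[Topic][]TopicData: every registration
// for a topic is kept.
type appendingMap map[Topic][]TopicData

func (m appendingMap) add(topic Topic, data TopicData) {
	m[topic] = append(m[topic], data)
}

func main() {
	old := overwritingMap{}
	proposed := appendingMap{}

	// Three blocks end up in the same outgoing message, so they share a topic.
	for _, blk := range []string{"block-1", "block-2", "block-3"} {
		old.add("msg-1", blk)
		proposed.add("msg-1", blk)
	}

	fmt.Println("overwriting map keeps:", old["msg-1"])
	fmt.Println("appending map keeps:  ", proposed["msg-1"])
}
```

With the slice-valued map, a single Sent event on the message topic still has every block's data available to hand to the subscriber.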

--

All of this will necessitate changes to all of the subscribers. We will need to dispatch multiple BlockSent notifications in the subscriber for the responsemanager, which will ultimately be the solution to the root issue.
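
A rough sketch of what dispatching multiple BlockSent notifications could look like against the proposed Subscriber interface. Everything beyond the interface itself is assumed for illustration: `blockData`, `blockSentSubscriber`, the `onBlockSent` callback, and the use of the string "Sent" as the event value are hypothetical and not the responsemanager's real types.

```go
package main

import "fmt"

// Topic, Event, TopicData and Subscriber copy the proposed definitions from this thread.
type Topic interface{}
type Event interface{}
type TopicData interface{}

type Subscriber interface {
	OnNext(Topic, Event, ...TopicData)
	OnClose(Topic, ...TopicData)
}

// blockData is a hypothetical payload recorded for each block in a message.
type blockData struct{ size uint64 }

// blockSentSubscriber is a hypothetical response-side subscriber that fires
// one callback per block attached to the topic.
type blockSentSubscriber struct {
	onBlockSent func(size uint64)
}

// Compile-time check that the sketch satisfies the proposed interface.
var _ Subscriber = (*blockSentSubscriber)(nil)

func (s *blockSentSubscriber) OnNext(_ Topic, ev Event, data ...TopicData) {
	if ev != "Sent" { // assumed event value, for illustration only
		return
	}
	for _, d := range data {
		if bd, ok := d.(blockData); ok {
			s.onBlockSent(bd.size) // one notification per block, not per message
		}
	}
}

func (s *blockSentSubscriber) OnClose(Topic, ...TopicData) {}

func main() {
	var notifications int
	var total uint64
	sub := &blockSentSubscriber{onBlockSent: func(size uint64) {
		notifications++
		total += size
	}}

	// One Sent event for a message that carried three blocks.
	sub.OnNext("msg-1", "Sent", blockData{1024}, blockData{1024}, blockData{17})
	fmt.Printf("%d notifications, %d bytes\n", notifications, total)
}
```

The point of the variadic TopicData parameter is that the publisher can keep emitting one event per message while the subscriber fans it out per block.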
