Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Client does not receive pubsub after api node reboots #3465

Closed
v-stickykeys opened this issue Dec 29, 2020 · 29 comments · Fixed by #3468
Closed

Client does not receive pubsub after api node reboots #3465

v-stickykeys opened this issue Dec 29, 2020 · 29 comments · Fixed by #3468
Labels
status/ready Ready to be worked

Comments

@v-stickykeys
Copy link

v-stickykeys commented Dec 29, 2020

  • Version:

For the Client

"ipfs-http-client": "48.1.1",

For the API

"ipfs": "0.52.2",
"ipfs-http-gateway": "0.1.3",
"ipfs-http-server": "0.1.3",
  • Platform:

Darwin MacBook-Pro.local 19.6.0 Darwin Kernel Version 19.6.0: Mon Aug 31 22:12:52 PDT 2020; root:xnu-6153.141.2~1/RELEASE_X86_64 x86_64

  • Subsystem:

pubsub

Severity: Medium

Description:

I'm running an IPFS client C in docker connected to an IPFS API node A also in docker. I'm also running an additional node on my local machine L.

C receives the api url of A and subscribes to topic. When I send messages to this topic from L I can see them coming through my handler on C.

If A stops running, C no longer receives messages from the topic.
When A restarts, C still does not receive messages from the topic--I must resubscribe to topic from C in order to start receiving messages again.

It seems the handler to ipfs.pubsub.subscribe somehow gets broken.


Steps to reproduce the error:

1. Start jsipfs daemon

DEBUG=ipfs* jsipfs daemon

2. Subscribe to pubsub topic from client

node client.js
// client.js

const ipfsClient = require("ipfs-http-client")

const topic = process.env.TOPIC || '/ceramic/dev-unstable'
const ipfsApiUrl = process.env.IPFS_API_URL || 'http://localhost:5002'

const run = async () => {
  const ipfs = ipfsClient({ url: ipfsApiUrl })

  const receiveMsg = (msg) => { console.log('receiving', msg) }

  let opts = {}
  // someone suggested including this handler but it isn't getting triggered
  // const onError = (error) => { console.error('caught error', error) }
  // opts = { onError }

  await ipfs.pubsub.subscribe(topic, receiveMsg, opts)
    .then(() => {
      console.log('subscribe success')
    })
    .catch((err) => {
      console.error('subscribe failure', err.message)
    })

  setInterval(async () => {
    await ipfs.pubsub.publish(
      topic,
      'val was here',
      { timeout: 500 })
  }, 5000);
}

run()

3. Stop the jsipfs daemon

You will see errors from the client script but keep it running..

4. Start the jsipfs daemon again

Errors from the client script stop, it keeps sending messages to the API (you should see debug logs with 200 responses)

But it stops receiving messages (from itself and from others--you can test this by running another node that sends messages to the topic)

Also onError mentioned below never gets triggered here

@v-stickykeys v-stickykeys added the need/triage Needs initial labeling and prioritization label Dec 29, 2020
@welcome
Copy link

welcome bot commented Dec 29, 2020

Thank you for submitting your first issue to this repository! A maintainer will be here shortly to triage and review.
In the meantime, please double-check that you have provided all the necessary information to make this process easy! Any information that can help save additional round trips is useful! We currently aim to give initial feedback within two business days. If this does not happen, feel free to leave a comment.
Please keep an eye on how this issue will be labeled, as labels give an overview of priorities, assignments and additional actions requested by the maintainers:

  • "Priority" labels will show how urgent this is for the team.
  • "Status" labels will show if this is ready to be worked on, blocked, or in progress.
  • "Need" labels will indicate if additional input or analysis is required.

Finally, remember to use https://discuss.ipfs.io if you just need general support.

@Gozala
Copy link
Contributor

Gozala commented Dec 31, 2020

I have no knowledge of history on this part of the code base, someone with more knowledge should chime. With that said, I did some investigation and as far as I can tell, it is combination of API design problem combined with lack of it's documentation. What follows is a breakdown

  • When client (C) creates subscription, it creates a request to the http endpoint on IPFS node (A).
    • Returned promise is resolved or failed when connection is established
  • Client (C) maintains long living connection and reading messages for a subscription as they arrive.
  • When server (A) is rebooted, connection from (A) is terminated which (I'm assuming, have not verified that) triggers undocumented options.onError handler.
  • When server (A) starts back up again client (C) no longer has connection to it so no messages are received.

@valmack from provided example I can see no onError is passed (which is not surprising given it's not documented). So I think fix here would be to have it in place and try to re-subscribe once server (A) is back online. I would also expect to see some errors in the console if onError isn't passed because as far as I can tell code just calls it assuming it's there.

What follows is API design critique and a proposal to improve it:

  1. It is just too easy to miss errors (if onError handler is missed)
  2. Successful promise creates false positive.
  3. No way of knowing when the subscription is done. In fact it could be that onError here is not even called because server just ends the connection & caller has no way of knowing.

I would like to propose API address those issues:

interface PubSub {
  /**
   * Creates a subscribtion for the given topic. You can provide your own custom queuingStrategy
   * if no queuingStrategy is supplied, the default used which just holds a single message in the
   * queue. Subscription will error if internal quee overflows.
   */
  subscribe <Topic extends string>(topic:Topic, options?:AbortOptions & QueuingStrategy):SubscriptionResult<Topic>
  /**
   * Returns all active subscribtions (excluding cancelled and not yet establed subscribtions, in other words `sub`
   * from `sub = pubsub.subscribe(subject)` is not included in `pubsub.ls()` until `await sub` resolves).
   */
  ls():Subscription<string>[]
  /**
   * Publish a data message to a pubsub topic.
   */
  publish(topic:string, data:Uint8Array, options?:AbortOptions):Promise<void>
}


/**
 * `SubscriptionResult` represents a `Subscription` that may fail (e.g. if http-client can't connect
 * to the API endpoint). For convenince it implemets `Subscription` intreface so user can start
 * consuming messages without having to await for `pubsub.subscribe`:
 * 
 * @example
 * ```js
 * try {
 *   for await (const msg of pubsub.subscribe(topic)) {
 *   }
 * } catch(error) {
 *   // handle failure
 * }
 * ```
 * 
 * However in some cases it is desired to tell if subscribtion was succesful without starting to consume it's messages. For
 * this reason `SubscriptionResult` also implemets promise interface:
 * @example
 * ```js
 * const subscribtion = await pubsub.subscribe(topic)
 * ```
 */
interface SubscriptionResult<Topic> extends Subscription<Topic> {
  then(succeed:(messages:Subscription<Topic>) => void, fail:(error:Error) => void): Promise<void>
  catch(fail:(error:Error) => void): Promise<void>
  finally <T>(then:() => void):Promise<void>
}


interface Subscription<Topic> {
  /**
   * Topic of this subscribtion, useful for bookeeping.
   */
  readonly topic: Topic
  
  /**
   * Whether or not subscribtion is locked by the reader (as one reader is allowed at a time)
   */
  readonly locked: boolean

  /**
   * Provides async iteration interface (via `for await`) over the messages of this subscribtion.
   * Obtains an exclusive lock preventing more then one consumer at a time.
   * Cancels subscribtion when async iteration is complete (e.g. via breaking a for await loop
   * or calling `return()` on the returned `SubscriptionIterator`).
   */
  [Symbol.asyncIterator](): SubscriptionIterator<Topic>

  /**
   * Just like  method above except it can be passed `{ preventCancel: true }`
   * in order to iterate over messages without cancelling this subscribtion.
   */
  values(options?:ReadableStreamIteratorOptions): SubscriptionIterator<Topic>

  /**
   * Just convenince function to cancel subscribtion.
   */
  cancel():void
}

interface SubscriptionIterator<Topic> {
  /**
   * Topic of this subscribtion.
   */
  readonly topic: Topic

  /**
   * Reads next message from the subscribtion.
   */
  next():Promise<IteratorResult<Message, void>>

  /**
   * Cancelles subscribtion unless `preventCancel` was used. Either way cancells itreation meaning
   * calling `next()` will return `{done: true}`.
   */
  return():Promise<IteratorReturnResult<void>>

  /**
   * Provides [Symbol.asyncIterator]()
   */
  [Symbol.asyncIterator](): this
}



interface Message {
  from: string
  topicIDs: string[]
  data: Uint8Array
  seqno: Uint8Array
}

// Same as in ReadableStream's
// See: https://streams.spec.whatwg.org/
interface ReadableStreamIteratorOptions {
  preventCancel: boolean
}


interface AbortOptions {
  timeout?: number
  signal?: AbortSignal
}

Such API would avoid all the above pitfalls:

  1. With such an API it is a lot more difficult to miss an error because they propagate into for await loops which are used for reading messages.
    try {
      for await (const message of pubsub.subscribe(topic)) {
        console.log(message)
      }
    } catch(error) {
      // Error has occurred, subscription is cancelled
    }
  2. There are no false positives here, because unlike combination of message & error handlers single return value encompasses all.
  • Not that it does not require double await
    for await (const message of ipfs.pubsub.subsrcibe(topic)) {
      console.log(message)
    }
  • It is however possible to await for subscription to be complete without having consuming any messages or having to unwind them
    const messages = await ipfs.pubsub.subcribe(topic)
  1. It is straight forward to know when subscription is over:
    console.log(`start ${topic}`)
    for await (const message of ipfs.pubsub.subsrcibe(topic)) {
      console.log(message)
    }
    console.log(`end ${topic}`)
    • And handle ended subscribtions whether due to error with finally (see examples below)

Here are couple more examples illustrating various points

// Example which consumes message for the pubsub topic recovering from errors by
// resubscribing
const forEach = async (pubsub:PubSub, topic:string, fn:(message:Message) => void) => {
  try {
    for await (const message of pubsub.subscribe("hello")) {
      fn(message)
    }
  } finally {
    // Restart subscsribtion if it finished for whatever reason. You should probably use
    // something like exponential beckoff strategy here instead of just doing it right away.
    forEach(pubsub, topic, fn)
  }
}

// Example that drops `n` messages from the subscribtion
const dropN = async (pubsub:PubSub, topic:string, n:number) => {
  const subscribtion = pubsub.subscribe(topic)
  // Use preventCancel to prevent subscribtion from been cancelled
  for await (const _ of subscribtion.values({ preventCancel: true })) {
    if (--n == 0) {
      break
    }
  }

  // return subscribtion with rest of the messages
  return subscribtion
}

// Exmaple which creates multiple subscribtion but only if all are succesful and
// without consuming ony of the messages.
const subs = async function * (pubsub:PubSub, topics:string[]) {
  const subscribtions = []
  try {
    for (const topic of topics) {
      // use await here to wait for subscsribtion to be succesful
      subscribtions.push(await pubsub.subscribe(topic))
    }
  } catch (error) {
    // If one of the subscribtions failed cancel ones that were succesful
    for (const subscribtion of subscribtions) {
      subscribtion.cancel()
    }
    throw error
  }
  
  return subscribtions
}

@Gozala
Copy link
Contributor

Gozala commented Dec 31, 2020

Just confirmed that onError is triggered when connection node is stopped, here is an example
https://observablehq.com/@gozala/ipfs-pubsub-example

and what I see after publishing two messages and then stopping deamon

Screen Shot 2020-12-30 at 4 38 59 PM

@Gozala
Copy link
Contributor

Gozala commented Dec 31, 2020

In the process I also have noticed that onError function is serilaized and passed as query param to the endpoint, which is a bug to be fixed

Screen Shot 2020-12-30 at 4 46 00 PM

@v-stickykeys
Copy link
Author

v-stickykeys commented Dec 31, 2020

@Gozala thank you for your investigation into this! Can you tell me what package and version you are using for the ipfs client in your test?

I am still having the problem with my code below and onError is not being called--I also am yet to find a reference to onError in the source code. Can you point me to it? or perhaps it does not exist on ipfs-http-client

const ipfsClient = require("ipfs-http-client")

const topic = process.env.TOPIC || '/ceramic/dev-unstable'
const ipfsApiUrl = process.env.IPFS_API_URL || 'http://localhost:5002'

const run = async () => {
  const ipfs = ipfsClient({ url: ipfsApiUrl })

  const receiveMsg = (msg) => { console.log('receiving', msg) }
  const onError = (error) => { console.error('caught error', error) }

  await ipfs.pubsub.subscribe(topic, receiveMsg, { onError })
    .then(() => {
      console.log('subscribe success')
    })
    .catch((err) => {
      console.error('subscribe failure', err.message)
    })

  setInterval(async () => {
    await ipfs.pubsub.publish(
      topic,
      'val was here',
      { timeout: 500 })
  }, 5000);
}

run()

@oed
Copy link
Contributor

oed commented Dec 31, 2020

@Gozala I'm unable to replicate the behavior that you are describing with the onError callback. Tested with running both ipfs daemon and jsipfs daemon.

in nodejs:

> const ipfs = require('ipfs-http-client')('http://localhost:5002')
undefined
> await ipfs.pubsub.subscribe('a', console.log, { onError: console.log })
undefined
> {
  from: 'QmXoQz228wVdWxCgxZnKUWLFZX34yg4crpKRipXba8A2dB',
  data: Uint8Array(4) [ 97, 115, 100, 102 ],
  seqno: Uint8Array(8) [
    212, 241,  33,  75,
    148,  75, 180, 232
  ],
  topicIDs: [ 'a' ]
}
$ jsipfs --version
0.52.3
$ ipfs version
ipfs version 0.6.0
$ npm list ipfs-http-client
[email protected] /Users/oed/3box/nodelek
└── [email protected]

@Gozala
Copy link
Contributor

Gozala commented Dec 31, 2020

@oed @valmack that linked example in observablehq loads http client from https://unpkg.com/[email protected]/dist/index.min.js and I was running [email protected] daemon.

  1. As per screenshot I was able to get two published messages.
  2. Then I stopped daemon.
  3. And got the onError also visible in the screenshot.

I could also see in the network panel when the connection is dropped.

I am still having the problem with my code below and onError is not being called--I also am yet to find a reference to onError in the source code. Can you point me to it?

readMessages(response.ndjson(), {
onMessage: handler,
onEnd: () => subsTracker.unsubscribe(topic, handler),
onError: options.onError

} catch (err) {
err.message = `Failed to parse pubsub message: ${err.message}`
onError(err, false, msg) // Not fatal
}
}
} catch (err) {
// FIXME: In testing with Chrome, err.type is undefined (should not be!)
// Temporarily use the name property instead.
if (err.type !== 'aborted' && err.name !== 'AbortError') {
onError(err, true) // Fatal
}

Are you running client in nodejs ? I have not tried that yet & there is a chance that node-fetch has bug and unlike browser it does not produce an error (I’ll check, both Firefox & Chrome reported errors as expected). I also have not tried with js-ipfs daemon and there is a chance it behaves different there, so I’ll check that as well.

How are you two going about restarting a (CA) node ? Maybe that causes some behavioral difference. If you want to try and debug locally on your end I’d suggest looking at what is happening with the http request for the pubsub/sub endpoint as it is where network error should occur and propagate all the way to the onError handler.

@v-stickykeys
Copy link
Author

v-stickykeys commented Dec 31, 2020

@Gozala Coincidentally I just found the onError you are talking about.

I am running client in Node.js and have used jsipfs deamon cli and from dockerized node app. onError is not getting called in both cases likely because I'm restarting the API (A) / daemon not the client.

@Gozala
Copy link
Contributor

Gozala commented Dec 31, 2020

both cases likely because I'm restarting the API (A) / daemon not the client.

I meant to say (A) in my previous comment as well. Working on code sandbox example for this.

@Gozala
Copy link
Contributor

Gozala commented Dec 31, 2020

Looks like http-client in node, unlike in browsers, is not triggering onError. Here is the reproducible example
https://codesandbox.io/s/nifty-heyrovsky-w5t8g?file=/src/main.js

I suspect an issue in node-fetch, I’ll investigate this further & report updates here. My guess is it’s treated as clean connection close that calls onEnd:

Which just causes an unsubscribe

onEnd: () => subsTracker.unsubscribe(topic, handler),

@Gozala
Copy link
Contributor

Gozala commented Dec 31, 2020

Created a reproducible test case for this issues, which as expected fails on node but passes in browsers. Expect fix in that pull #3468

@Gozala
Copy link
Contributor

Gozala commented Jan 1, 2021

As it was expected problem is coming from node-fetch, which does not seem to check if server fully transmitted a message before a connection was terminated
completed https://nodejs.org/dist/latest-v15.x/docs/api/http.html#http_message_complete

https://github.com/node-fetch/node-fetch/blob/b5e2e41b2b50bf2997720d6125accaf0dd68c0ab/src/index.js#L181-L184

However upstream node-fetch (we seem to be on 2.6.1) seems to have switched to stream pipeline API, which I do not believe would handle this either, but all this makes fixing this quite a bit more challenging.

That said we could possibly avoid all this by either:

  1. Switching to proposed API or alike (which clearly signals when thing is ended even if without error).
  2. Instead of just ending and unsubscribing when request trigger onError instead. I think it is reasonable to say that closing subscription connection is kind of an error (unless intentionally closed from client).
  3. Provide something like onEnd option to let user know when subscription is complete.

That is not to say let's ignore the node-fetch problem, on the contrary we should try to get it fixed, but it seems reasonable to pursue fixing issue here without blocking all node-fetch fix, that is because even with node-fetch fixed connection still may be closed on the server cleanly (I would actually argue that if node is stopped as opposed to crash it should probably close connections gracefully) in which case onError will not occur and user code will not be able to know that subscription is finished.

https://github.com/node-fetch/node-fetch/blob/d016690cd3ff3cfe48e87ff522379b4c85e402a9/src/index.js#L188

@Gozala
Copy link
Contributor

Gozala commented Jan 1, 2021

Reported issue to node-fetch node-fetch/node-fetch#1055 with reproducible test case attached node-fetch/node-fetch#1056

It also appears that node-fetch internally does some additional checks when response.body is read through other methods e.g. response.text(). We probably do a something thing

https://github.com/node-fetch/node-fetch/blob/d016690cd3ff3cfe48e87ff522379b4c85e402a9/src/body.js#L218-L230

In http module of ipfs-utils when end of the response.body is reached we could check if body.readableEnded === true || body._readableState.ended === true and if so thrown an error instead of passing {done: true}.

https://github.com/ipfs/js-ipfs-utils/blob/382f79b8d261d2d2205b05565232e7d1364917bc/src/http.js#L265

@Gozala
Copy link
Contributor

Gozala commented Jan 7, 2021

@vasco-santos do you mind sharing your feedback regarding proposed API #3465 (comment) for addressing usability issues.

The reason I am asking is, having started with implementation, I keep feeling that maybe all this should go into libp2p instead and just be exposed directly rather than just wrapping current API with proposed one.

@vasco-santos
Copy link
Member

@Gozala Just read through your proposal, thanks for pinging me.

I think that this approach makes sense taking into account all the pitfalls when forwarding libp2p pubsub messages from the node to the IPFS client. The reason for this is that in the IPFS client an HTTP Req happens to subscribe and the client will need to wait for messages to be received under a specific form, which is not valid as typical events. However, this approach takes some assumptions that will make more complex other pubsub simple use cases.

Having an API where subscribe returns an AsyncIterator is something that we thought in the past. However, this will mean that subscribe usage will be less flexible compared to the current API. Libp2p pubsub interface has the subscribe returning a void, which means that the subscribe messages were sent to the connected peers. With this, subscribed pubsub messages in a given topic will be consumed via EventEmitter events. Typically, libp2p dapps and browser apps leverage this feature by having the subscription trigger and subscription listener in different components of their application. If we move into returning the subscription messages via the AsyncIterator, this easy user land flow will be broken and users will need to move messages around according to their needs. Moreover, what you expect to happen once multiple subscriptions happen to the same topic at the same time, as a way to have "listeners" in multiple places? We would probably need to have an abstraction layer using the same EventEmitter approach that we currently do.

It is straight forward to know when subscription is over:

Subscriptions should be long lived and valid until the unsubscribe(topic) is called. This is also a reason that makes me feel that the usability of blocking a code flow in a for await (const msg of libp2p.pubsub.subscribe(topic) is strange and users will basically need to isolate that in a function.

The errors concerns are a good point. From the libp2p side of things, errors can occur when sending messages with the subscriptions and on receiving/parsing received messages. While the first might be worth throwing an error (specially if we cannot send a subscription message to any peer -- we currently don't do this!), the second should just be catched and logged for possible inspection. We will not throw on errors when handling messages received.

@Gozala
Copy link
Contributor

Gozala commented Jan 11, 2021

client will need to wait for messages to be received under a specific form, which is not valid as typical events. However, this approach takes some assumptions that will make more complex other pubsub simple use cases.

Could you please ellaborate on this a bit I'm not sure what you mean by non valid typical events or what are those simple use cases.

Having an API where subscribe returns an AsyncIterator is something that we thought in the past. However, this will mean that subscribe usage will be less flexible compared to the current API. Libp2p pubsub interface has the subscribe returning a void, which means that the subscribe messages were sent to the connected peers. With this, subscribed pubsub messages in a given topic will be consumed via EventEmitter events. Typically, libp2p dapps and browser apps leverage this feature by having the subscription trigger and subscription listener in different components of their application.

That is still possible it's just an actor doing subscription will have to pass "subscription" object to an actor doing listening. Which as I understand is other way round now, where listener needs to pass a handler that then subscriber will use to subscribe with, but still separation of concerns is there.

If we move into returning the subscription messages via the AsyncIterator, this easy user land flow will be broken and users will need to move messages around according to their needs. Moreover, what you expect to happen once multiple subscriptions happen to the same topic at the same time, as a way to have "listeners" in multiple places? We would probably need to have an abstraction layer using the same EventEmitter approach that we currently do.

It is true that it is different from EventEmitter but I do not think it is different in terms of capabilities. For the sake of discussion let's assume that subscribe under the hood creates a channel, that is a pair of {readable, writable} streams (a.k.a transform streams in whatwg speak). What subscribe could do is following:

  1. Check if topic already has an associated channel.
  • If channel exists, take available read stream and it tee it replacing available stream with on stream and handing back the other.
  • send sub message to connected peers
  • otherwise create a new channel
    • tee readable
    • store one into available stream slot
    • store writable end to the writable slot
    • return the other
  • write received messages to the writable end of the channel

Additional benefit is that once all readers are done, writer end gets aborted and do necessary cleanup.

Subscriptions should be long lived and valid until the unsubscribe(topic) is called. This is also a reason that makes me feel that the usability of blocking a code flow in a for await (const msg of libp2p.pubsub.subscribe(topic) is strange and users will basically need to isolate that in a function.

They might indeed, but I do not think that is inherently bad, as that forces error handling.

While the first might be worth throwing an error (specially if we cannot send a subscription message to any peer -- we currently don't do this!), the second should just be catched and logged for possible inspection. We will not throw on errors when handling messages received.

Honestly that makes me think something along these lines of the example below might be even better, because there is nothing worth than errors that you can not act upon

try {
  for await (const message of libp2p.pubsub.subscribe(topic)) {
     try {
       const data = message.decode()
       // use data
     } catch(parseError) {
       console.error(parseError)
     }
  }
} catch(networkError) {
  // either recover or loose susbscribtion
}

P.S: I mentioned readable and writable streams, not implying using them but rather to convey idea of channel / pipe where one end reads what the other writes into. Which is conceptually not that different from event emitters except they decouple reader and writer ends and tend to encourage channel per "event type" as opposed to multiplex channels, which is also possible.
P.P.S: It is also worth considering that streams are standard as opposed to event emitters that are widely used but are non-standard and have an incompatible DOM Event system in browsers.

@vasco-santos
Copy link
Member

Could you please ellaborate on this a bit I'm not sure what you mean by non valid typical events or what are those simple use cases.

What I mean is that you can do something as follows:

cons topic = 'example-topic'
await libp2p.pubsub.subscribe(topic)

// In any other places in the codebase with the libp2p reference you can simply do multiple times:
libp2p.pubsub.on(topic, (msg) => {})

You can add event listeners for the same topic in multiple places of the codebase and they can be setup even before the subscribe happens (I know, this might also be problematic if the subscribe does not happen and create false expectations).

I see quite often in dapps using pubsub approaches where there are multiple topic handlers for the same subscription.

That is still possible it's just an actor doing subscription will have to pass "subscription" object to an actor doing listening. Which as I understand is other way round now, where listener needs to pass a handler that then subscriber will use to subscribe with, but still separation of concerns is there.

Yes of course this can be done in the user layer. The biggest concern I have with this change is the big breaking change that will happen in projects using libp2p pubsub that might need to completely change their component orchestrating, in order to move receive pubsub messages around. I can see this as a daunting task that might get projects to delay updating libp2p.

In my opinion, this is an easy to use and flexible API compared to returning an async iterator that will require users to propagate data around their codebase. I can see advantages in returning async iterators as you suggest.

Additional benefit is that once all readers are done, writer end gets aborted and do necessary cleanup.

This is a + compared to requiring users to remove event handlers when they unsubscribe/stop.

Honestly that makes me think something along these lines of the example below might be even better, because there is nothing worth than errors that you can not act upon

I totally agree with this in the context of the IPFS client. There might be network errors when dealing with connections between the IPFS client and an IPFS daemon. However, what types of errors might exist from libp2p sending subscribe messages to all connected peers? For instance, if a given libp2p node is not connected to any other node running pubsub and a libp2p.pubsub.subscribe() happens, no error will happen even with any subscribption message being sent. The reason for this is that the subscriptions are kept locally and once new connections happen, the subscribtion messages are exchanged. Moreover, if any pubsub message fails to be sent for some reason, it should not break a pubsub flow and the message is just discarded.

Also FYI, taking into account this change in the context of the client, we currently have the API proposed here in the libp2p-daemon-client. This might have network errors related to the libp2p-daemon <-> libp2p-daemon-client communication that are included. A good reason to have libp2p.pubsub.subscribe to return an async iterator would be have a consistent API.

Overall, I am not totally against returning an Async Iterator. Probably I would prefer that approach if we were creating this now. However, taking into consideration the pros and cons including the migration that will need to happen, I think we should not do this now. We can keep it in mind as an improvement.

@Gozala
Copy link
Contributor

Gozala commented Jan 12, 2021

Yes of course this can be done in the user layer. The biggest concern I have with this change is the big breaking change that will happen in projects using libp2p pubsub that might need to completely change their component orchestrating, in order to move receive pubsub messages around. I can see this as a daunting task that might get projects to delay updating libp2p.

That is a fare point. I think there are couple of options available however:

  1. Just provide an API adapter so that existing users can continue using API, and update at their leisure.
  2. Try this with ipfs first, if successful migrate new API into libp2p in the future (still providing backwards compatibility adapter to users)

I totally agree with this in the context of the IPFS client. There might be network errors when dealing with connections between the IPFS client and an IPFS daemon. However, what types of errors might exist from libp2p sending subscribe messages to all connected peers? For instance, if a given libp2p node is not connected to any other node running pubsub and a libp2p.pubsub.subscribe() happens, no error will happen even with any subscribption message being sent. The reason for this is that the subscriptions are kept locally and once new connections happen, the subscribtion messages are exchanged. Moreover, if any pubsub message fails to be sent for some reason, it should not break a pubsub flow and the message is just discarded.

What about actual platform errors, e.g. dropped connections with inability to reopen them. While it sounds fairly unlikely and argument could be made to just try creating connections in the future, it also might be warranted to produce a subscription error allowing user space code to act (e.g. suggest user to troubleshoot network connectivity).

On the flip side we can also make HTTP client, try to reconnect instead of failing, in fact me and @hugomrdias were considering to do just that. However I can shake a feeling that it's best to let user space code act upon errors even if they just retry subscription.

Overall, I am not totally against returning an Async Iterator. Probably I would prefer that approach if we were creating this now. However, taking into consideration the pros and cons including the migration that will need to happen, I think we should not do this now. We can keep it in mind as an improvement.

Makes sense, thanks for providing insightful feedback.

@vasco-santos
Copy link
Member

Try this with ipfs first, if successful migrate new API into libp2p in the future (still providing backwards compatibility adapter to users)

Let's try this with IPFS first then. I think we can have the backwards compatibility adapter as you suggest later on.

What about actual platform errors, e.g. dropped connections with inability to reopen them. While it sounds fairly unlikely and argument could be made to just try creating connections in the future, it also might be warranted to produce a subscription error allowing user space code to act (e.g. suggest user to troubleshoot network connectivity).

That should probably be part of the subsystem itself (combined with the connection Manager), not a subscription. Managing the open connections of a subsystem, deciding if proactively we should try to connect to more pubsub peers, trim connections, retry connections should not affect the API consumption. We should have a more clear picture of how to handle possible errors on this regards once I start to work on the connection manager overhaul that will look into this specificities.

On the flip side we can also make HTTP client, try to reconnect instead of failing, in fact me and @hugomrdias were considering to do just that. However I can shake a feeling that it's best to let user space code act upon errors even if they just retry subscription.

I think that we should also do that, but we should also change the pubsub API in this context, even if the client iself can recover.

@achingbrain
Copy link
Member

Pubsub is a weird bit of the API, in the long term I'd like to remove it from the top-level ipfs object and have people use the .libp2p property directly. I think we should reduce the scope of the IPFS Core-API to just files (e.g. add, cat, ls, files.*) wherever possible.

I like @Gozala's suggestion, this seems natural to me:

try {
  for await (const message of ipfs.pubsub.subscribe('a-topic')) {
    // ...
  }
} catch (err) {
  // ...
}

As an addition, I like the idea of breaking out of the loop to unsubscribe if the subscription is an async generator:

const subscription = {
  [Symbol.asyncIterator]: () => subscription,

  async next () {
    //...
  },

  return () {
    // do unsubscribe here
  }
}

for await (const message of subscription) {
  break // or throw or what have you
}

// no longer subscribed

Though we'd have to think about what it means to be unsubscribed if there are multiple subscribers within an app context.

I have questions about this:

const subscription = await ipfs.pubsub.subscribe('a-topic')

for await (const message of subscription) {
  // ...
}

What happens when messages arrive between the subscribe and the for await..of? Do they get cached? How many get cached? What happens to messages that exceed the cache size? What happens if the server goes away during this time?

This could be exacerbated by the user doing async work between the pubsub.subscribe and the for await..of.

what you expect to happen once multiple subscriptions happen to the same topic at the same time

I think we can handle this in an efficient way internally as @Gozala suggests.

Provide something like onEnd option to let user know when subscription is complete

How do you define 'complete' here? Topics are global within your peers or overlay, what's to stop another peer sending messages? Or do you just mean it gets invoked when pubsub.unsubscribe gets called elsewhere?


That said, the issue here is that the ipfs-http-client user does not receive an error message when the node it's connected to goes away.

On the flip side we can also make HTTP client, try to reconnect instead of failing

I don't think we should do this, at least not by default - there are no guarantees reconnecting will be possible, there are loads of different strategies to use etc, I think it'd be better if we fail fast and leave what to do next up to the user.

To move this forward, I think we should:

  1. Fix up the onError callback so it's useful and can be used to know when to attempt to reconnect to the remote server
  2. In a future release, convert the return value of pubsub.subscribe as @Gozala suggests
  3. In a future, future release, move that API into libp2p itself and remove the pubsub API from ipfs-core

@Gozala Gozala added status/ready Ready to be worked and removed need/triage Needs initial labeling and prioritization labels Jan 14, 2021
@Gozala
Copy link
Contributor

Gozala commented Jan 15, 2021

How do you define 'complete' here? Topics are global within your peers or overlay, what's to stop another peer sending messages? Or do you just mean it gets invoked when pubsub.unsubscribe gets called elsewhere?

Basically it means you will no longer get messages on this subscription handler, it could be due to unsubscribe or due to some error. Either way it's a signal that if you want to get more messages on this topic with this handler you've got to resubscribe.

@Gozala
Copy link
Contributor

Gozala commented Jan 15, 2021

What happens when messages arrive between the subscribe and the for await..of? Do they get cached? How many get cached? What happens to messages that exceed the cache size? What happens if the server goes away during this time?

I think communicating sequential processes (CSP) is a perfect solution here. Essentially you want to put a some queue between producer/writer and consumer/reader so they can act on their own schedule. In other words incoming messages get queued until buffer is filled and then subscription could be terminated. This would allow consumer to either:

  1. Increase queue capacity as necessary.
  2. Process incoming messages asynchronously.
  3. Reestablish subscription when they are dropped due to filled up queue.
    • In practice I expect combination of 3 & 1 where queue capacity is increased if it proves inadequate for the load.

@oed
Copy link
Contributor

oed commented Mar 25, 2021

Any update here @Gozala? Looks like the PR with a fix was merged, although I'm not sure if it has been released yet: node-fetch/node-fetch#1064

@Gozala
Copy link
Contributor

Gozala commented Mar 25, 2021

@oed thanks for the reminder, it fall of my radar as I got busy with other things. Picking it up to see what's needed to get it finished. #3468 is PR to watch.

@oed
Copy link
Contributor

oed commented May 8, 2021

We should try to figure out what the right approach here is. Seems like the fix to node-fetch won't be backported to 2.x.x (node-fetch/node-fetch#1064 (comment)) which is very unfortunate. It also looks like v3 will only happen later this year (node-fetch/node-fetch#668 (comment)).

What are the options we have here @Gozala? This issue is causing a bunch of trouble for us on our infrastructure.

@Gozala
Copy link
Contributor

Gozala commented May 12, 2021

@oed I apologize for not getting around to finishing this up, there had been a reorg and I find myself in a position where I have almost no time to work on js-ipfs.

In terms of what’s remaining this PR #3468 just needs to be updated and merged. I will do it as soon as I’ll get a chance.

@BigLep can we put this on the project board to make sure this gets enough attention.

@oed
Copy link
Contributor

oed commented May 12, 2021

So #3468 can be fixed without the fix in node-fetch? That's great if true! 🙏

@Gozala
Copy link
Contributor

Gozala commented May 12, 2021

So #3468 can be fixed without the fix in node-fetch? That's great if true! 🙏

It is kind of workaround, but I think it is reasonable one, it will make sure that onError fires when subscription will no longer produce any output.

@stbrody
Copy link

stbrody commented May 26, 2021

From @Gozala on discord:
Turns out I still need that fix from node-fetch to make it all work

So it sounds like getting node-fetch/node-fetch#1064 backported to node-fetch v2 might be our only hope here?

achingbrain added a commit that referenced this issue Jun 1, 2021
…ed (#3468)

This change fixes #3465 by upgrading to a temporary fork of node-fetch with
node-fetch/node-fetch#1172 applied.

Co-authored-by: achingbrain <[email protected]>
SgtPooki referenced this issue in ipfs/js-kubo-rpc-client Aug 18, 2022
…ed (#3468)

This change fixes #3465 by upgrading to a temporary fork of node-fetch with
node-fetch/node-fetch#1172 applied.

Co-authored-by: achingbrain <[email protected]>
@tinytb tinytb moved this to Done in IP JS (PL EngRes) v2 Oct 14, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
status/ready Ready to be worked
Projects
No open projects
Archived in project
Development

Successfully merging a pull request may close this issue.

6 participants