
feat(kad): Implement ability to acknowledge AddProvider before reading further messages from substream #3468

Conversation

nazar-pc
Contributor

@nazar-pc nazar-pc commented Feb 15, 2023

Description

The goal is to make it possible to process AddProvider asynchronously. AddProvider is a special case in Kademlia because no response is expected.

While synchronous processing works fine with in-memory storage, disk-based and other kinds of storage present challenges here, and it might be desirable to handle the record asynchronously. That is a challenge because, before this PR, the Kademlia implementation simply moves on and keeps accepting ever-growing announcements, which may result in DoS.

The approach here is to process AddProvider with events (KademliaStoreInserts::FilterBoth) and to keep the newly introduced event guard alive for the corresponding event as long as necessary. As long as the guard is not dropped, the corresponding incoming stream stays in a pending state and does not read new messages from the sender (see the sketch below). Once the guard is dropped, the incoming stream continues working normally.
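To illustrate the intended shape, here is a toy sketch using only standard-library types (not the PR's actual code): the reader side refuses to read the next message until the guard handed to the processing task has been dropped.

```rust
// Toy illustration of guard-based backpressure, not the PR's actual code:
// the "stream" loop hands out a guard per AddProvider and does not read the
// next message until that guard has been dropped.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

struct EventGuard {
    // Dropping the sender disconnects the channel, which wakes the reader.
    _resume: mpsc::Sender<()>,
}

fn main() {
    let (resume_tx, resume_rx) = mpsc::channel::<()>();
    let guard = EventGuard { _resume: resume_tx };

    // Simulated asynchronous processing of an AddProvider message.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100)); // e.g. a slow disk write
        drop(guard); // acknowledgement: the stream may continue
    });

    // Simulated inbound-stream loop: blocked until the guard is gone.
    assert!(resume_rx.recv().is_err()); // Err(RecvError) means all guards were dropped
    println!("guard dropped, reading next message from the substream");
}
```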

Notes

This could have been implemented with additional message passing, but I find that more cumbersome to implement and to use, so I prefer RAII whenever possible for these things.

Opening as a draft for now: due to the lack of stream reuse, this resurrects the following error in my environment:

New inbound substream to PeerId("X") exceeds inbound substream limit. No older substream waiting to be reused. Dropping new substream.

It was partially mitigated in #3287, but never fully resolved, and we acknowledged back then that there is a small chance it might still happen. Here the chance is 100% under the right conditions, because incoming streams will stay in the pending state indefinitely while the sender assumes the stream is already closed, since they have already closed their side.

This is fixed by stream reuse implemented in #3474.

Note that we don't HAVE TO expose Arc<InboundStreamEventGuard> publicly; we could make it anonymous with Arc<dyn Any + Send + Sync + 'static>, but that looked less nice to me, so I decided to leave it exposed.
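For illustration, the anonymous variant mentioned above would look roughly like the following; the struct body is omitted and this is a sketch of the alternative, not code from the PR.

```rust
// Sketch of the anonymous alternative: consumers only need to keep the token
// alive and eventually drop it, so `dyn Any` is enough, at the cost of a less
// descriptive type in the public event. `InboundStreamEventGuard` stands in
// for the PR's type; its fields are omitted here.
use std::any::Any;
use std::sync::Arc;

struct InboundStreamEventGuard {}

fn anonymize(guard: Arc<InboundStreamEventGuard>) -> Arc<dyn Any + Send + Sync + 'static> {
    guard // unsized coercion to the trait object
}

fn main() {
    let anon = anonymize(Arc::new(InboundStreamEventGuard {}));
    drop(anon); // dropping the token is the acknowledgement
}
```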

Links to any relevant issues

Primarily #3411

Open Questions

Change checklist

  • I have performed a self-review of my own code
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • A changelog entry has been made in the appropriate crates

@nazar-pc nazar-pc changed the title Implement ability to acknowledge AddProvider before reading further messages from substream feat(kad): Implement ability to acknowledge AddProvider before reading further messages from substream Feb 15, 2023
@nazar-pc nazar-pc marked this pull request as draft February 15, 2023 15:52
@thomaseizinger
Contributor

thomaseizinger commented Feb 15, 2023

Thank you for submitting this. I feel uneasy about introducing yet another pattern. The solution is novel, but I don't think we have any RAII guards so far?

@mxinden What are your thoughts on this vs an Arc<Mutex<dyn RecordStore>> that is shared between all connections?

The latter would also have the benefit of greatly simplifying libp2p-kad as it would allow us to process the messages right in the connection task.

It comes at the expense of removing KademliaStoreInserts as we'd no longer pass the message up to the NetworkBehaviour.

@melekes How do you use RecordStore? Would you benefit from a design with async & fallible functions?

@divagant-martian Do you use libp2p-kad? Do you have an opinion on the design of RecordStore?

Further reading: #3411

@nazar-pc
Contributor Author

The solution is novel, but I don't think we have any RAII guards so far?

I believe all of the streams have RAII guards, as discussed on the Rust maintainers call recently, so it is not entirely new to the codebase. Though I can certainly refactor it to use something else if that is desirable.

Arc<Mutex<dyn RecordStore>>

I'm not sure how that would help: we still want to be able to read and write multiple things to/from the record store, and being behind a mutex means no concurrency. In this case it should just be Arc<dyn RecordStore>, letting the record store manage concurrency internally if necessary.

Though this is a departure from the current design, where boxed values are rare. Also, while at it, I think RecordStore itself could be made async with async_trait and errors, but that is quite a large refactoring.
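For the sake of discussion, here is a very rough sketch of what an async, fallible store interface could look like with the async_trait crate. Method names loosely follow the existing RecordStore, but the AsyncRecordStore name and the exact signatures are made up for illustration; it compiles only as a library sketch given the async-trait and libp2p-kad dependencies.

```rust
// Illustrative only: an async, fallible store interface. Not a concrete API
// proposal for libp2p-kad.
use async_trait::async_trait;
use libp2p::kad::record::{Key, ProviderRecord, Record};

#[async_trait]
pub trait AsyncRecordStore: Send + Sync {
    type Error: std::error::Error + Send + Sync + 'static;

    // `&self` instead of `&mut self`: the store manages its own concurrency
    // (e.g. a database handle shared across connection tasks).
    async fn get(&self, key: &Key) -> Result<Option<Record>, Self::Error>;
    async fn put(&self, record: Record) -> Result<(), Self::Error>;
    async fn add_provider(&self, record: ProviderRecord) -> Result<(), Self::Error>;
    async fn providers(&self, key: &Key) -> Result<Vec<ProviderRecord>, Self::Error>;
}
```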

Alternatively, I recall there was a thought somewhere to remove RecordStore entirely and let users handle events instead as the only canonical way to do this. I'd actually be in favor of such a design: it makes things a bit more complex in the trivial case for users, but at the same time simplifies other cases and simplifies the libp2p codebase a bit. Then we could even use RAII instead of message IDs there and provide objects that are capable of sending responses, unifying the handling of both values and providers.

@thomaseizinger
Contributor

thomaseizinger commented Feb 15, 2023

The solution is novel, but I don't think we have any RAII guards so far?

I believe all of the streams have RAII guards, as discussed on the Rust maintainers call recently, so it is not entirely new to the codebase.

I am not sure the comparison is 100% on point. Here the "guard" is a token that is passed along but otherwise not in use, whereas a stream frees resources in Drop.

Arc<Mutex<dyn RecordStore>>

I'm not sure how that would help: we still want to be able to read and write multiple things to/from the record store, and being behind a mutex means no concurrency. In this case it should just be Arc<dyn RecordStore>, letting the record store manage concurrency internally if necessary.

If your DB is faster than the network, the mutex doesn't harm. If the DB is slower than the network, we will backpressure to the remote. Isn't that what we want?

Though this is a departure from the current design, where boxed values are rare. Also, while at it, I think RecordStore itself could be made async with async_trait and errors, but that is quite a large refactoring.

Minor correction: everything regarding streams uses boxing and Arc<Mutex> internally, so it wouldn't be much of a departure.

Alternatively, I recall there was a thought somewhere to remove RecordStore entirely and let users handle events instead as the only canonical way to do this. I'd actually be in favor of such a design: it makes things a bit more complex in the trivial case for users, but at the same time simplifies other cases and simplifies the libp2p codebase a bit. Then we could even use RAII instead of message IDs there and provide objects that are capable of sending responses, unifying the handling of both values and providers.

That is another idea, yes. That is what I am currently prototyping in mxinden/asynchronous-codec#5. I hadn't considered exposing the ResponsePlaceholder all the way to the user, but I guess if we can remove RecordStore as a result, it may actually be worthwhile. That solution would have the same back-pressure properties as long as we send () back as the response to the handler, without actually sending a response on the wire. I'd have to check how that pans out.

@nazar-pc nazar-pc force-pushed the support-acknowledgement-for-kademlia-events branch from e9340b9 to 1f73380 on February 15, 2023 22:01
@nazar-pc
Contributor Author

Found some issues with the older revision; replaced it with a slightly better one.

If your DB is faster than the network, the mutex doesn't harm. If the DB is slower than the network, we will backpressure to the remote. Isn't that what we want?

The issue here is that all accesses to the record store would be sequential, while SSDs are optimized for parallel work. Not to mention that "just a DB" is an oversimplification in our protocol: we also do other things that can take seconds, and we certainly don't want to block all access to the record store for that duration. SSDs are built for concurrent access; it would be a shame not to take advantage of that.

@thomaseizinger
Contributor

If your DB is faster than the network, the mutex doesn't harm. If the DB is slower than the network, we will backpressure to the remote. Isn't that what we want?

The issue here is that all accesses to the record store would be sequential, while SSDs are optimized for parallel work. Not to mention that "just a DB" is an oversimplification in our protocol: we also do other things that can take seconds, and we certainly don't want to block all access to the record store for that duration. SSDs are built for concurrent access; it would be a shame not to take advantage of that.

Are you saying that some RecordStore implementations would be able to operate in a lock-less fashion without running into race conditions?

Wouldn't it be better to limit the scope of RecordStore to just the critical section of updating the provided record? If you want to do additional work as part of this, you can always spawn more tasks from a call to RecordStore::put_value, right? Unless I am misunderstanding something, it makes sense for the interface to take &mut self to ensure consistency of the RecordStore through the type system.

@nazar-pc
Contributor Author

Are you saying that some RecordStore implementations would be able to operate in a lock-less fashion without running into race conditions?

Absolutely! RocksDB, ParityDB, and probably most other databases use &self for read/write operations. But in my case the pipeline is much larger: it involves not only the database but also networking requests and various other operations I'd like to do, and if I do all of that inside the RecordStore implementation, everything else will be blocked in the meantime.

I could spawn tasks, but the API would have to be async anyway, because creating an unbounded number of tasks is not an option. Either way, a single read or write to the database at a time is a no-go for our use case.
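As a toy illustration of the &self point (an assumed in-process store, not RocksDB's or ParityDB's actual API): a store whose operations take &self can be shared across tasks behind a plain Arc, with any locking kept internal to the store.

```rust
// Illustration only: `&self` operations allow sharing behind a plain `Arc`;
// synchronization is an implementation detail of the store itself.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

#[derive(Default)]
struct Store {
    // Internal synchronization; could just as well be a concurrent database handle.
    inner: RwLock<HashMap<Vec<u8>, Vec<u8>>>,
}

impl Store {
    fn put(&self, key: Vec<u8>, value: Vec<u8>) {
        self.inner.write().unwrap().insert(key, value);
    }

    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.inner.read().unwrap().get(key).cloned()
    }
}

fn main() {
    let store = Arc::new(Store::default());
    let handles: Vec<_> = (0..4u8)
        .map(|i| {
            let store = Arc::clone(&store);
            thread::spawn(move || store.put(vec![i], b"value".to_vec()))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    assert!(store.get(&[0]).is_some());
}
```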

@thomaseizinger
Contributor

thomaseizinger commented Feb 16, 2023

Are you saying that some RecordStore implementations would be able to operate in a lock-less fashion without running into race conditions?

Absolutely! RocksDB, ParityDB, and probably most other databases use &self for read/write operations. But in my case the pipeline is much larger: it involves not only the database but also networking requests and various other operations I'd like to do, and if I do all of that inside the RecordStore implementation, everything else will be blocked in the meantime.

I could spawn tasks, but the API would have to be async anyway, because creating an unbounded number of tasks is not an option. Either way, a single read or write to the database at a time is a no-go for our use case.

I am not suggesting you should do all your networking etc. in the RecordStore. On the contrary, I am suggesting a minimal RecordStore that operates as the source of truth and from which, once it has been updated, more follow-up work can be started.

Can you expand why that is not possible in your situation?

Without knowing all the requirements of your use case, my first idea for a system that needs to be optimised for throughput would be to have mutations of the RecordStore go through an append-only log and asynchronously update a read model that is used for serving requests. That introduces eventual consistency, but that might be an okay trade-off. In exchange, updates to the store will be about as fast as they can get.

Other work, like network requests, can be propagated asynchronously from this event log.
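A very rough sketch of that shape, with a channel standing in for the append-only log and a background thread maintaining the read model; all names and types are illustrative and not tied to libp2p-kad.

```rust
// Sketch of the log + read-model idea: writers append mutations to a channel
// (standing in for the append-only log) and return immediately; a single
// background thread folds them into the read model (eventual consistency).
use std::collections::HashMap;
use std::sync::{mpsc, Arc, RwLock};
use std::thread;

enum Mutation {
    AddProvider { key: Vec<u8>, provider: Vec<u8> },
}

type ReadModel = Arc<RwLock<HashMap<Vec<u8>, Vec<Vec<u8>>>>>;

fn main() {
    let (log_tx, log_rx) = mpsc::channel::<Mutation>();
    let read_model: ReadModel = Arc::new(RwLock::new(HashMap::new()));

    // Background task applying the log to the read model.
    let model = Arc::clone(&read_model);
    let applier = thread::spawn(move || {
        for mutation in log_rx {
            match mutation {
                Mutation::AddProvider { key, provider } => {
                    model.write().unwrap().entry(key).or_default().push(provider);
                }
            }
        }
    });

    // "Connection handler" side: appending is cheap and never blocks on readers.
    log_tx
        .send(Mutation::AddProvider { key: b"k".to_vec(), provider: b"p".to_vec() })
        .unwrap();
    drop(log_tx); // close the log so the applier thread finishes

    applier.join().unwrap();
    let providers = read_model.read().unwrap().get(b"k".as_slice()).map(Vec::len);
    assert_eq!(providers, Some(1));
}
```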

@melekes
Contributor

melekes commented Feb 16, 2023

@melekes How do you use RecordStore?

we're just reading / writing records

Would you benefit from a design with async & fallible functions?

probably not so much

@nazar-pc
Contributor Author

Can you expand why that is not possible in your situation?

The record store, in my opinion, must be generic and multi-threaded. It can be in-memory, a disk-based database, a remote database with considerable latency, some RPC API, anything. Serializing access to the record store multiplies any small delay by a big factor and can eventually result in DoS. It will work great for the narrow use case of an in-memory Arc<Mutex<HashMap>>, but anything else will suffer, and I don't see why it should (even the current API doesn't have this problem, except that it blocks the corresponding handler, which would be resolved by introducing an async API).

Even if there is a mutex, it needs to be an implementation detail of a specific record store, not a wrapper on top of the record store.

In our protocol we're talking about thousands of requests per minute, in some cases tens of thousands of requests for records/providers. There is no way we can handle them sequentially, and no way they fit into memory either.

@thomaseizinger
Contributor

Right, if the records don't fit in memory, it is indeed hard to design an API that is directly called from within the ConnectionHandler.

I am currently travelling, so I don't have much capacity for coding. What do you think of mxinden/asynchronous-codec#5? Would you be interested in prototyping its usage in libp2p-kad and providing some feedback? It would only be for inbound requests to start with and would completely remove RecordStore in favor of handling events returned from the Kademlia behaviour. The ResponsePlaceholder is the key design element, which allows responding to requests without having to pass the response back to the behaviour.

@nazar-pc
Contributor Author

Fixed compatibility with #3474; works for me now.

I'll take a look at mxinden/asynchronous-codec#5 likely some time tomorrow, thanks!

@mergify
Contributor

mergify bot commented Feb 23, 2023

This pull request has merge conflicts. Could you please resolve them @nazar-pc? 🙏

@thomaseizinger
Contributor

Closed because stale.

@nazar-pc nazar-pc deleted the support-acknowledgement-for-kademlia-events branch September 13, 2023 10:29