[WIP] Self dialing #638
Conversation
I think we should stick to a single transport for this; pipes look nice and simple.
I have not reviewed the memory transport; I think we should just remove it in favor of pipes.
Note: We probably want to avoid the encryption overhead in the pipe.
type PipeTransport struct {
	mlistenchans *sync.RWMutex
	listenchans  map[string]chan *PipeConn
do we really care to support multiple connections for this?
Even if that's not a thing here, I think we can expect people to dial multiple processes via shared memory, or to run multiple instances of libp2p in the same process to debug other layers of libp2p, in the future.
p2p/transport/pipe/pipetransport.go
Outdated
func (t *PipeTransport) CanDial(addr ma.Multiaddr) bool {
	protocols := addr.Protocols()
	return len(protocols) == 1 && protocols[0].Code == ma.P_P2P
This filter is too wide, I think: it will also catch arbitrary /p2p/QmTarget addresses, which almost certainly cannot be dialed with the pipe. Maybe we should check that the peer ID matches our own here?
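A minimal sketch of that check, assuming (hypothetically) that the transport keeps its host's peer ID in a self field set at construction — not what the PR currently does (peer is github.com/libp2p/go-libp2p-core/peer):

func (t *PipeTransport) CanDial(addr ma.Multiaddr) bool {
	// Only accept a bare /p2p/<id> address...
	protocols := addr.Protocols()
	if len(protocols) != 1 || protocols[0].Code != ma.P_P2P {
		return false
	}
	// ...and only if the ID names our own peer.
	s, err := addr.ValueForProtocol(ma.P_P2P)
	if err != nil {
		return false
	}
	id, err := peer.Decode(s)
	return err == nil && id == t.self
}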
We never pass the final /p2p/... part to transports, so I'm not sure how this works.
i wanted to do this, but we won't always know the peer ID at the time of creating the transport, so i left it blank. do we think we should lock it in?
Hrm, tricky. My worry is that we'll start trying to pipe-dial arbitrary addrs (I've seen naked /p2p addrs in the wild).
We should always know our own peer ID when creating the transport. You can define the constructor to accept it as a parameter (instead of accepting an upgrader).
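For instance, a sketch of such a constructor (field names are illustrative, not this PR's actual code):

func NewPipeTransport(self peer.ID) *PipeTransport {
	return &PipeTransport{
		self:         self, // our own peer ID, known at construction time
		mlistenchans: new(sync.RWMutex),
		listenchans:  make(map[string]chan *PipeConn),
	}
}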
p2p/transport/pipe/pipetransport.go
Outdated
manetConnA := WrapNetConn(connA, raddr)
manetConnB := WrapNetConn(connB, raddr)
ch <- manetConnA
return t.upgrader.UpgradeOutbound(ctx, t, manetConnB, p)
We probably want to disable encryption for this; it's a self-pipe, after all, and encryption only adds overhead.
the upgrader provided can be the "insecure" upgrader, which just adds muxing
How will this work in practice? Let's take care to avoid the encryption overhead.
p2p/transport/pipe/pipetransport.go
Outdated
t.listenchans[laddrStr] = ch

listener := NewPipeListener(laddr, ch, t)
upgradedListener := t.upgrader.UpgradeListener(t, listener)
same here.
thanks @vyzo! def agree, will remove the memorytransport.
To actually hook this in, I think we'll need a /memory multiaddr. We strip out the /p2p/Qm... part when looking up transports, as that isn't actually a part of the transport address.

We can even introduce a memory network concept so all peers within a process can communicate through this:

- Listening on /memory would cause this transport to register a listener (for its peer ID) with the "memory" network.
- Dialing a peer with /memory would cause this transport to try to connect to that peer using the memory network.

We could even name these networks /memory/SOME_NAME if we want to support multiple local networks, but we shouldn't get too fancy (we can also control this in the in-memory network itself). A sketch of what such a network could look like follows.
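One possible shape for that memory network, as a rough sketch (all names hypothetical): a process-local registry mapping peer IDs to listener channels, with net.Pipe supplying the actual connections.

import (
	"fmt"
	"net"
	"sync"

	"github.com/libp2p/go-libp2p-core/peer"
)

// MemoryNetwork is a process-local registry mapping peer IDs to listeners.
type MemoryNetwork struct {
	mu        sync.RWMutex
	listeners map[peer.ID]chan net.Conn
}

func NewMemoryNetwork() *MemoryNetwork {
	return &MemoryNetwork{listeners: make(map[peer.ID]chan net.Conn)}
}

// Listen registers a peer and returns the channel on which inbound
// connections will be delivered.
func (n *MemoryNetwork) Listen(p peer.ID) (<-chan net.Conn, error) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if _, ok := n.listeners[p]; ok {
		return nil, fmt.Errorf("%s is already listening", p)
	}
	ch := make(chan net.Conn)
	n.listeners[p] = ch
	return ch, nil
}

// Dial hands one end of a fresh pipe to the listening peer and returns
// the other end to the caller. The send blocks until the listener accepts.
func (n *MemoryNetwork) Dial(p peer.ID) (net.Conn, error) {
	n.mu.RLock()
	ch, ok := n.listeners[p]
	n.mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("no memory listener for %s", p)
	}
	local, remote := net.Pipe()
	ch <- remote
	return local, nil
}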
p2p/transport/mem/memorytransport.go
Outdated
mlistenchans *sync.RWMutex
listenchans  map[string]chan *MemoryConn

upgrader *tptu.Upgrader
We shouldn't need an upgrader at all for this transport. Each stream should just create a new in-memory connection (no need to bother with yamux, etc.).
p2p/transport/mem/memorytransport.go
Outdated
defer t.mlistenchans.RUnlock()
raddrStr := raddr.String()

ch, ok := t.listenchans[raddrStr]
Shouldn't we just check if the peer ID is ours?
p2p/transport/mem/memorytransport.go
Outdated
func (t *MemoryTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
	t.mlistenchans.RLock()
	defer t.mlistenchans.RUnlock()
	raddrStr := raddr.String()
No need to use String(); just use string(raddr.Bytes()) (avoids pretty printing).
p2p/transport/mem/memoryconn.go
Outdated
// GrowableRingBuffer wraps an rbuf.AtomicFixedSizeRingBuf with a mutex,
// allowing us to grow its capacity should it become filled.
type GrowableRingBuffer struct {
Do we need to grow? IMO, the buffer should have a fixed limit for back-pressure.
p2p/transport/mem/memoryconn.go
Outdated
	parent *MemoryConnPair
}

func NewMemoryConn(reader, writer *GrowableRingBuffer, parent *MemoryConnPair) *MemoryConn {
We may not need any buffering at all. In that case, we can implement this entirely with channels:
// Note: ErrTimeout and ErrClosed are assumed to be defined elsewhere;
// the closed-channel cases were elided ("...") in the original sketch.
func (c *ChanConn) Write(b []byte) (int, error) {
	n := 0
	for n < len(b) {
		select {
		case <-c.writeTimer.C:
			return n, ErrTimeout
		case <-c.closed:
			return n, ErrClosed
		case remote, ok := <-c.write:
			if !ok {
				return n, ErrClosed
			}
			// Copy the not-yet-written remainder into the reader's buffer.
			copied := copy(remote, b[n:])
			c.writeResult <- copied
			n += copied
		}
	}
	return n, nil
}

func (c *ChanConn) Read(b []byte) (int, error) {
	select {
	case <-c.readTimer.C:
		return 0, ErrTimeout
	case <-c.closed:
		return 0, ErrClosed
	case c.read <- b:
	}
	return <-c.readResult, nil
}

// ReadFrom avoids any intermediate buffers in io.Copy.
func (c *ChanConn) ReadFrom(r io.Reader) (int64, error) {
	var n int64
	for {
		select {
		case <-c.writeTimer.C:
			return n, ErrTimeout
		case <-c.closed:
			return n, ErrClosed
		case remote, ok := <-c.write:
			if !ok {
				return n, ErrClosed
			}
			// Read directly into the peer's buffer, skipping any copy.
			copied, err := r.Read(remote)
			c.writeResult <- copied
			n += int64(copied)
			if err != nil {
				return n, err
			}
		}
	}
}

// c.write is connected to c.read; c.writeResult is connected to c.readResult.
That should use very little memory and should avoid copying when at all possible.
Actually, it looks like net.Pipe does everything except ReadFrom (which isn't absolutely necessary). We should probably just use that (i.e., your pipe transport).
yeahhhhhh i was thinking about this too! i'm happy to just keep the pipe.

i wanted to avoid having to extend the multiformats repo, though it seems it's unavoidable. i'd advocate for both. should the pipe transport have a muxer and no security, or should i simply create a new pipe for every new stream? i think we should ditch the memory transport, since your proposed channel-based solution is almost exactly what net.Pipe does.
By "memory" I just mean in-memory, not necessarily a specific in-memory transport (the user can pick which transport they want to use for process-local connections). Maybe
I'd just create a new pipe for every stream. That'll be a lot less overhead in practice.
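A sketch of how per-stream pipes could look, assuming this PR's PipeStream with separate inbound/outbound conns (acceptStreams, closed, and ErrClosed are hypothetical names):

func (c *PipeConn) OpenStream() (*PipeStream, error) {
	// Two pipes, one per direction, so each side gets an independent
	// read half and write half.
	a1, a2 := net.Pipe() // local writes -> remote reads
	b1, b2 := net.Pipe() // remote writes -> local reads
	local := &PipeStream{inbound: b2, outbound: a1}
	remote := &PipeStream{inbound: a2, outbound: b1}
	select {
	case c.acceptStreams <- remote: // delivered to the AcceptStream side
		return local, nil
	case <-c.closed:
		return nil, ErrClosed
	}
}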
tomaka needs the same: multiformats/multiaddr#71
oh wow @lgierth thanks for the link!
should these slots be named, then? i.e.
I wouldn't make it a path multiaddr, but having arbitrary slots may make sense. However, I can't think of any reason I'd want to use slots where I wouldn't just want to set up a per-peer policy (i.e., tell the local network service "allow peer X to dial peer Y"). What I'm thinking of is basically @tomaka's approach.
@Stebalien i've fixed the implementation of the pipetransport to eschew upgraders and the like. i'm going to focus my attention on the multiformats side to get that set up, then i'll continue work on this change. remaining:
@bigs – how is this going? do you need a review from me or should I wait for the next iteration? In the latter case, when is it due, so I can plan it? Thanks!
With dialer v2, a memory transport should not be necessary, unless we actually want to support dialling /memory addresses explicitly, which I would discourage. That scenario might be useful for tests, but off the top of my head, I don't see production usage for that.

The way to do this with dialer v2 is to inject a Preparer that matches on dials where peer ID == us, and returns a singleton in-memory transport.CapableConn backed by a net.Pipe.

We don't need to use an upgrader; it seems you're on the money with the PipeStream here as a way to shim the multiplexing with no actual multiplexer. Encryption is not necessary, and we can shim the network.ConnSecurity accessors with our own identity on both ends (local, remote).
We should try to get a benchmark in for this.
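For the security shim, something like the following could work — a sketch, assuming the host's ID and private key are available at construction (not actual code from this PR):

// selfConnSecurity satisfies the network.ConnSecurity accessors for a
// connection where both ends are the local host.
type selfConnSecurity struct {
	id   peer.ID
	priv crypto.PrivKey
}

func (s *selfConnSecurity) LocalPeer() peer.ID              { return s.id }
func (s *selfConnSecurity) LocalPrivateKey() crypto.PrivKey { return s.priv }
func (s *selfConnSecurity) RemotePeer() peer.ID             { return s.id }
func (s *selfConnSecurity) RemotePublicKey() crypto.PubKey  { return s.priv.GetPublic() }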
@raulk the only blocker right now is consensus around /memory multiaddrs at pierre's linked issue. i've already implemented a version that uses opaque uint64 slots, should we thumbs-up that!
regarding the dialer work, i think the transport is still necessary (you still have to listen somewhere in your code) but it will be much cleaner determining where we want to use it, just as you've described. edit: the shimming is already there as well. down to benchmark, what's the goal or target? being slower than a tcp transport would be an absolute failure, should we just aim to be better than that?
Hm, I think listening is not necessary. I think of this as a persistent, in-memory bridge rather than a traditional transport. There's only ever a singleton virtual connection. We create it at system startup, and whenever the swarm processes a dial to ourselves, we return that connection (see the sketch below).

We probably don't even want to generate any connection Notifee events, nor trigger the connection handler. It doesn't need to be registered in the swarm either, and it shouldn't be subject to connection management. We'll need access to the swarm's

One detail here is that we should only consider handlers that have annotated themselves as "supporting self-dialling", as per the original design: #328 (comment)
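A pseudocode-level sketch of that short-circuit in the swarm's dial path (selfConn and the internal method names are illustrative, not the actual swarm API):

func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (network.Conn, error) {
	// Self-dial: hand back the persistent in-memory bridge created at
	// startup, skipping Notifee events and connection management.
	if p == s.LocalPeer() {
		return s.selfConn, nil
	}
	return s.dialPeer(ctx, p)
}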
i suppose so long as a stream handler exists, listening becomes irrelevant, though that does break pierre's testing use case, which could be quite nice. it allows us to allocate an address that doesn't consume any resources beyond cpu/memory, which could potentially enable large tests. i do see what you mean, though.
Also adds support for cross-host dialing in the pipe transport
doesn't look like my comment posted, but as of last friday this is ready for final review. only thing left after review is swapping out the go.mod replace directives and publishing.
Initial review.
	return s.outbound.Write(b)
}

func (s *PipeStream) SetDeadline(t time.Time) error {
When setting deadlines, we can probably improve perf by only setting deadlines on the side we care about (read from inbound, write from outbound).
sounds good!
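A quick sketch of that split, assuming the inbound/outbound net.Conn fields shown below:

// Reads only ever touch the inbound conn and writes the outbound one,
// so each deadline is applied to exactly one side.
func (s *PipeStream) SetReadDeadline(t time.Time) error {
	return s.inbound.SetReadDeadline(t)
}

func (s *PipeStream) SetWriteDeadline(t time.Time) error {
	return s.outbound.SetWriteDeadline(t)
}

func (s *PipeStream) SetDeadline(t time.Time) error {
	if err := s.SetReadDeadline(t); err != nil {
		return err
	}
	return s.SetWriteDeadline(t)
}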
type PipeStream struct {
	inbound  net.Conn
	outbound net.Conn
I wonder if go would accept a patch to expose a CloseWrite function...
	return s.outbound.Close()
}

func (s *PipeStream) Reset() error {
Hm. With a normal pipe conn, we'd be able to call CloseWithError. Unfortunately, that doesn't look possible here. We either need to:

- Re-implement this type.
- Set some atomic "reset" flag before closing, and check that flag after reading/writing.
what exactly do you mean here? the implementation as it stands seems reasonable to me
Resetting a stream needs to close it with an error. We don't want the other side to think we successfully finished writing; we want them to see that something went wrong. For example, ioutil.ReadAll(stream) should fail if the remote side calls Reset().
ahhh makes total sense. missed this comment. implemented a simple version of this with buffered channels.
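For reference, a minimal version of the atomic reset-flag option could look like this — a sketch, not the buffered-channel implementation actually used here (ErrReset is illustrative):

type PipeStream struct {
	inbound  net.Conn
	outbound net.Conn
	reset    int32 // set to 1 atomically by Reset
}

var ErrReset = errors.New("stream reset")

func (s *PipeStream) Reset() error {
	atomic.StoreInt32(&s.reset, 1)
	s.inbound.Close()
	return s.outbound.Close()
}

func (s *PipeStream) Read(b []byte) (int, error) {
	n, err := s.inbound.Read(b)
	if err != nil && atomic.LoadInt32(&s.reset) == 1 {
		// Surface the reset instead of a clean EOF, so readers like
		// ioutil.ReadAll fail as expected.
		return n, ErrReset
	}
	return n, err
}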
updated friday @Stebalien edit: sorry forgot to comment!
replace github.com/libp2p/go-libp2p-swarm => ../go-libp2p-swarm

replace github.com/multiformats/go-multiaddr => ../../workspace-multiformats/go-multiaddr
Nothing else to say :)
Hi, is there any progress on this effort? It seems to have been abandoned for more than a year now, and I'm facing a similar issue on my own project; it seems I will have to spend time on a feature which would definitely be great to have in libp2p... In my case, I would like to simulate several identities in a tiny environment, given the number of p2p node roles I have to manage. I'm currently starting many p2p nodes (currently 12), but that has its limits (many concurrent IOs leading to many timeouts; note that the libp2p nodes are not the only IO-consuming components in my test scenario). Let me know if you see some helpers... Thank you
If you're doing local testing, I recommend using a "mock network": https://pkg.go.dev/github.com/libp2p/[email protected]/p2p/net/mock#Mocknet.
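For example, a minimal Mocknet setup might look like this — a sketch; check the linked docs for the exact API of your go-libp2p version (older versions pass a context.Context to these calls):

package main

import (
	"fmt"

	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

func main() {
	mn := mocknet.New() // in-process network, no real sockets
	h1, _ := mn.GenPeer()
	h2, _ := mn.GenPeer()

	// Wire all peers together and open connections between them.
	mn.LinkAll()
	mn.ConnectAllButSelf()

	fmt.Println(h1.ID(), "sees peers:", h2.Network().Peers())
}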
Well, this is not only local testing. I would like some of my libp2p nodes to handle several identities/IPFS addresses, a little like nginx can handle several URLs. Currently I'm running one libp2p node for each identity. Thank you for this first answer, however; that may help even if it is not targeting my entire scenario ;)
You want multiple identities but one node? Every node will always have its own identity. Are you sure you need multiple identities and not just multiple protocols? Let's move this discussion over to https://discuss.libp2p.io, I think we can solve your issue without any new features.
Hey folks, this PR is really old. A lot has changed and interest here is gone. I'm closing this PR to tidy things up.
The beginning of self dialing work, eventually closing #328.

This work starts with the transports. I developed a ring buffer based MemoryTransport, but then, in documenting it, realized I really should just use pipes. I've implemented both and left them in for posterity. I'd be happy to delete the MemoryTransport if we decide there is no use for it. The memory transport expects to listen to /p2p or /ipfs multiaddrs populated with the peer's identity.