
[WIP] Self dialing #638

Closed · wants to merge 8 commits

Conversation

@bigs (Contributor) commented May 17, 2019

The beginning of self dialing work, eventually closing #328.

This work starts with the transports. I developed a ring buffer based MemoryTransport, but then, in documenting it, realized I really should just use pipes. I've implemented both and left them in for posterity. I'd be happy to delete the MemoryTransport if we decide there is no use for it.

The memory transport expects to listen to /p2p or /ipfs multiaddrs populated with the peer's identity.

@bigs requested review from vyzo, Stebalien and raulk on May 17, 2019, 19:21
@vyzo (Contributor) commented May 17, 2019

I think we should stick to a single transport for this, pipes look nice and simple.

@vyzo (Contributor) left a review

I have not reviewed the memory transport, I think we should just remove it in favor of pipes.

Note: We probably want to avoid the encryption overhead in the pipe.


type PipeTransport struct {
mlistenchans *sync.RWMutex
listenchans map[string]chan *PipeConn

Contributor:
do we really care to support multiple connections for this?

Contributor:
Even if that's not a thing here, I think we can expect some people to dial multiple processes via shared memory, or multiple instances of libp2p in the same process for debugging other layers of libp2p, in the future.


func (t *PipeTransport) CanDial(addr ma.Multiaddr) bool {
protocols := addr.Protocols()
return len(protocols) == 1 && protocols[0].Code == ma.P_P2P

Contributor:
This filter is too wide, I think; it will also catch arbitrary /p2p/QmTarget addresses, which almost certainly cannot be dialed with the pipe.
Maybe we should check that the peer ID matches our own here?
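
A rough sketch of what such a check could look like, assuming the transport carries its host's own peer ID in a hypothetical selfID field (not the PR's actual code):

// Sketch only. Assumes ma = github.com/multiformats/go-multiaddr,
// peer = github.com/libp2p/go-libp2p-core/peer, and a hypothetical
// selfID peer.ID field on PipeTransport.
func (t *PipeTransport) CanDial(addr ma.Multiaddr) bool {
  id, err := addr.ValueForProtocol(ma.P_P2P)
  if err != nil {
    return false // not a bare /p2p/<id> address
  }
  pid, err := peer.IDB58Decode(id)
  if err != nil {
    return false
  }
  return pid == t.selfID // only self-dials go over the pipe
}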

Member:
We never pass the final /p2p/... part to transports so I'm not sure how this works.

Contributor Author:
i wanted to do this, but we won't always know the peer ID at the time of creating the transport, so i left it blank. do we think we should lock it in?

Contributor:
Hrm, tricky. My worry is that we'll start trying to pipe-dial arbitrary addrs (I've seen naked /p2p addrs in the wild).

Member:
We should always know our own peer ID when creating the transport. You can define the constructor to accept it as a parameter (instead of accepting an upgrader).
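
For illustration only, a constructor along those lines might look like this (field names beyond those shown in the diff are hypothetical):

// Hypothetical sketch: the transport is told its own peer ID at construction
// time instead of receiving an upgrader.
func NewPipeTransport(self peer.ID) *PipeTransport {
  return &PipeTransport{
    selfID:       self,
    mlistenchans: new(sync.RWMutex),
    listenchans:  make(map[string]chan *PipeConn),
  }
}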

manetConnA := WrapNetConn(connA, raddr)
manetConnB := WrapNetConn(connB, raddr)
ch <- manetConnA
return t.upgrader.UpgradeOutbound(ctx, t, manetConnB, p)

Contributor:
We probably want to disable encryption for this, it's a self-pipe after all and it only adds overhead.

Contributor Author:
the upgrader provided can be the "insecure" upgrader which just adds muxing

Contributor:
How will this work in practice? Let's take care to avoid the encryption overhead.

t.listenchans[laddrStr] = ch

listener := NewPipeListener(laddr, ch, t)
upgradedListener := t.upgrader.UpgradeListener(t, listener)

Contributor:
same here.

@bigs (Contributor Author) commented May 17, 2019

thanks @vyzo! def agree, will remove the memorytransport.

@Stebalien (Member) left a review

To actually hook this in, I think we'll need a /memory multiaddr. We strip out the /p2p/Qm... part when looking up transports as that isn't actually a part of the transport address.

We can even introduce a memory network concept so all peers within a process can communicate through this.

  • Listening on /memory would cause this transport to register a listener (for its peer ID) with the "memory" network.
  • Dialing a peer with /memory would cause this transport to try to connect to that peer using the memory network.

We could even name these networks /memory/SOME_NAME if we want to support multiple local networks but we shouldn't get too fancy (we can also control this in the in-memory network itself).
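
To make the idea concrete, such a memory network could be little more than a process-wide registry keyed by peer ID. A rough sketch, with all names hypothetical:

// Sketch of an in-process "memory network": listeners register under their
// peer ID, dialers look the target up and hand over one end of a net.Pipe.
type memNetwork struct {
  mu        sync.RWMutex
  listeners map[peer.ID]chan net.Conn
}

func (n *memNetwork) Listen(p peer.ID) (<-chan net.Conn, error) {
  n.mu.Lock()
  defer n.mu.Unlock()
  if _, ok := n.listeners[p]; ok {
    return nil, errors.New("already listening on memory network")
  }
  ch := make(chan net.Conn)
  n.listeners[p] = ch
  return ch, nil
}

func (n *memNetwork) Dial(p peer.ID) (net.Conn, error) {
  n.mu.RLock()
  ch, ok := n.listeners[p]
  n.mu.RUnlock()
  if !ok {
    return nil, errors.New("peer not listening on memory network")
  }
  local, remote := net.Pipe()
  ch <- remote // delivered to the listener's accept loop
  return local, nil
}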



mlistenchans *sync.RWMutex
listenchans map[string]chan *MemoryConn

upgrader *tptu.Upgrader

Member:
We shouldn't need an upgrader at all for this transport. Each stream should just create a new in-memory connection (no need to bother with yamux, etc.).

defer t.mlistenchans.RUnlock()
raddrStr := raddr.String()

ch, ok := t.listenchans[raddrStr]

Member:
Shouldn't we just check if the peer ID is ours?

func (t *MemoryTransport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (tpt.Conn, error) {
t.mlistenchans.RLock()
defer t.mlistenchans.RUnlock()
raddrStr := raddr.String()

Member:
No need to use String(), just use string(raddr.Bytes()) (avoids pretty printing).


// GrowableRingBuffer wraps an rbuf.AtomicFixedSizeRingBuf with a mutex,
// allowing us to grow its capacity should it become filled.
type GrowableRingBuffer struct {

Member:
Do we need to grow? IMO, the buffer should have a fixed limit for back-pressure.

parent *MemoryConnPair
}

func NewMemoryConn(reader, writer *GrowableRingBuffer, parent *MemoryConnPair) *MemoryConn {

Member:
We may not need any buffering at all. In that case, we can implement this entirely with channels:

func (c *ChanConn) Write(b []byte) (int, error) {
  n := 0
  for n < len(b) {
    select {
    case <-c.writeTimer.C:
      return n, ErrTimeout
    case <-c.closed:
      ...
    case remote, ok := <-c.write:
      if !ok { ... }
      copied := copy(remote, b[n:]) // copy the remaining bytes, not the already-written prefix
      c.writeResult <- copied
      n += copied
    }
  }
  return n, nil
}

func (c *ChanConn) Read(b []byte) (int, error) {
  select {
    case <-c.readTimer.C:
      return 0, ErrTimeout
    case <-c.closed:
      ...
    case c.read <- b:
  }
  return <-c.readResult, nil
}


// avoids any intermediate buffers in `io.Copy`.
func (c *ChanConn) ReadFrom(r io.Reader) (int64, error) {
  var n int64 // int64 to satisfy io.ReaderFrom
  for {
    select {
    case <-c.writeTimer.C:
      return n, ErrTimeout
    case <-c.closed:
      ...
    case remote, ok := <-c.write:
      if !ok { ... }
      copied, err := r.Read(remote)
      c.writeResult <- copied
      n += int64(copied)
      if err != nil {
        return n, err
      }
    }
  }
}


// c.write is connected to c.read, c.writeResult is connected to c.readResult.

That should use very little memory and should avoid copying when at all possible.

@Stebalien (Member), May 17, 2019:

Actually, it looks like net.Pipe does everything except ReadFrom (which isn't absolutely necessary). We should probably just use that (i.e., your pipe transport).
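
For reference, net.Pipe already provides a synchronous in-memory net.Conn pair with deadline support, which is essentially what the channel-based sketch above reimplements; a minimal demonstration:

// Minimal demonstration of net.Pipe semantics: writes block until the other
// end reads, and read/write deadlines are supported.
func pipeDemo() error {
  a, b := net.Pipe()
  defer a.Close()
  defer b.Close()

  go func() {
    a.Write([]byte("hello")) // blocks until b has consumed the bytes
  }()

  b.SetReadDeadline(time.Now().Add(time.Second))
  buf := make([]byte, 5)
  _, err := io.ReadFull(b, buf)
  return err
}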

Contributor Author:
yeahhhhhh i was thinking about this too! i'm happy to just keep the pipe.

@bigs (Contributor Author) commented May 17, 2019

> To actually hook this in, I think we'll need a /memory multiaddr. We strip out the /p2p/Qm... part when looking up transports as that isn't actually a part of the transport address.
>
> We can even introduce a memory network concept so all peers within a process can communicate through this.
>
> * Listening on `/memory` would cause this transport to register a listener (for its peer ID) with the "memory" network.
>
> * Dialing a peer with `/memory` would cause this transport to try to connect to that peer using the memory network.
>
> We could even name these networks /memory/SOME_NAME if we want to support multiple local networks but we shouldn't get too fancy (we can also control this in the in-memory network itself).

i wanted to avoid having to extend the multiformats repo, though it seems it's unavoidable. i'd advocate for both /memory and /pipe, even if we don't choose to implement memory at the moment (or reconfigure to use channels).

should the pipe transport have a muxer and no security or should i simply create a new pipe for every new stream?

i think we should ditch the memory transport, since your proposed channel based solution is almost exactly what net.Pipe() is doing

@Stebalien (Member):

> i wanted to avoid having to extend the multiformats repo, though it seems it's unavoidable. i'd advocate for both /memory and /pipe, even if we don't choose to implement memory at the moment (or reconfigure to use channels).

By "memory" I just mean in-memory, not necessarily a specific in-memory transport (the user can pick which transport they want to use for process-local connections). Maybe /local is better? The idea is that any application would get to define what the "local" protocol is and how it should work.

> should the pipe transport have a muxer and no security or should i simply create a new pipe for every new stream?

I'd just create a new pipe for every stream. That'll be a lot less overhead in practice.
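
A sketch of the "new pipe per stream" idea, mirroring the PipeStream{inbound, outbound} shape that appears later in this PR (the incoming and closed fields are hypothetical):

// Sketch: every OpenStream creates two net.Pipe pairs so each direction can be
// closed independently; the remote half is handed to the accept loop.
func (c *PipeConn) OpenStream() (*PipeStream, error) {
  localIn, remoteOut := net.Pipe() // remote writes, local reads
  remoteIn, localOut := net.Pipe() // local writes, remote reads
  remote := &PipeStream{inbound: remoteIn, outbound: remoteOut}
  select {
  case c.incoming <- remote: // picked up by AcceptStream on the other side
    return &PipeStream{inbound: localIn, outbound: localOut}, nil
  case <-c.closed:
    return nil, errors.New("connection closed")
  }
}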

@ghost commented May 17, 2019

> By "memory" I just mean in-memory, not necessarily a specific in-memory transport

tomaka needs the same: multiformats/multiaddr#71

@bigs (Contributor Author) commented May 17, 2019

oh wow @lgierth, thanks for linking that in

@bigs (Contributor Author) commented May 18, 2019

> i wanted to avoid having to extend the multiformats repo, though it seems it's unavoidable. i'd advocate for both /memory and /pipe, even if we don't choose to implement memory at the moment (or reconfigure to use channels).
>
> By "memory" I just mean in-memory, not necessarily a specific in-memory transport (the user can pick which transport they want to use for process-local connections). Maybe /local is better? The idea is that any application would get to define what the "local" protocol is and how it should work.

should these slots be named, then? i.e. /memory/foo? should /memory be a path multiaddr, so we sandbox by peer ID but allow arbitrary named ports?

@Stebalien (Member):

> should these slots be named, then? i.e. /memory/foo? should /memory be a path multiaddr, so we sandbox by peer ID but allow arbitrary named ports?

I wouldn't make it a path multiaddr but having arbitrary slots may make sense. However, I can't think of any reason I'd want to use slots where I wouldn't just want to setup a per-peer policy (i.e., tell the local network service "allow peer X to dial peer Y").

What I'm thinking of is basically @tomaka's approach.

@bigs (Contributor Author) commented May 21, 2019

@Stebalien i've fixed the implementation of the pipetransport to eschew upgraders and the like. i'm going to focus my attention on the multiformats side to get that set up, then i'll continue work on this change. remaining:

  • update multiaddr handling to use /memory
  • add options and new transport to libp2p hosts
  • implement self dialing via pipetransport when enabled on a host

@raulk (Member) commented May 28, 2019

@bigs – how is this going? do you need a review from me or should I wait for the next iteration? In the latter case, when is it due, so I can plan it? Thanks!

@raulk (Member) left a review

With dialer v2, a memory transport should not be necessary, unless we actually want to support dialling /memory addresses explicitly, which I would discourage. That scenario might be useful for tests, but off the top of my head, I don't see production usage for that.

The way to do this with dialer v2 is to inject a Preparer that matches on dials where peer ID == us and returns a singleton in-memory transport.CapableConn backed by a net.Pipe.

We don't need to use an upgrader; it seems you're on the money with the PipeStream here as a way to shim the multiplexing with no actual multiplexer. Encryption is not necessary and we can shim the network.ConnSecurity accessors with our own identity on both ends (local, remote).

We should try to get a benchmark in for this.
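
For illustration, the network.ConnSecurity shim mentioned above could be as small as this, assuming a hypothetical pipeConn type that carries the host's identity and private key:

// Sketch: both ends of the self-connection report the same peer as local and
// remote, backed by the host's own key pair. pipeConn, self and privKey are
// hypothetical; crypto is github.com/libp2p/go-libp2p-core/crypto.
func (c *pipeConn) LocalPeer() peer.ID              { return c.self }
func (c *pipeConn) RemotePeer() peer.ID             { return c.self }
func (c *pipeConn) LocalPrivateKey() crypto.PrivKey { return c.privKey }
func (c *pipeConn) RemotePublicKey() crypto.PubKey  { return c.privKey.GetPublic() }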

@bigs (Contributor Author) commented May 28, 2019 via email

@bigs (Contributor Author) commented May 28, 2019

regarding the dialer work, i think the transport is still necessary (you still have to listen somewhere in your code) but it will be much cleaner determining where we want to use it, just as you’ve described.

edit: the shimming is already there as well. down to benchmark, what’s the goal or target? being slower than a tcp transport would be an absolute failure, should we just aim to be better than that?
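
As a baseline for such a benchmark, something like the following measures raw net.Pipe throughput; the self-dial path (pipe plus stream shims) and a TCP loopback transport could then be compared against it (sketch only):

// Baseline sketch: raw net.Pipe throughput with 64 KiB writes.
func BenchmarkRawNetPipe(b *testing.B) {
  src, dst := net.Pipe()
  go func() {
    buf := make([]byte, 64<<10)
    for {
      if _, err := dst.Read(buf); err != nil {
        return // pipe closed
      }
    }
  }()
  buf := make([]byte, 64<<10)
  b.SetBytes(int64(len(buf)))
  b.ResetTimer()
  for i := 0; i < b.N; i++ {
    if _, err := src.Write(buf); err != nil {
      b.Fatal(err)
    }
  }
  src.Close()
}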

@raulk (Member) commented May 28, 2019

@bigs

> i think the transport is still necessary (you still have to listen somewhere in your code)

Hm, I think listening is not necessary. I think of this as a persistent, in-memory bridge rather than a traditional transport.

There's only ever a singleton virtual connection. We create it at system startup, and whenever the swarm processes a dial to ourselves, we return that connection.

We probably don't even want to generate any connection Notifee events, nor trigger the connection handler. It doesn't need to be registered in the swarm either, and it shouldn't be subject to connection management.

We'll need access to the swarm's StreamHandler, so we can invoke it when a new stream is opened. That should give way to multistream-select negotiation, and eventually a callback to the appropriate stream handler.

One detail here is that we should only consider handlers that have annotated themselves as "supporting self-dialling", as per the original design: #328 (comment)

@bigs (Contributor Author) commented May 28, 2019

i suppose so long as a stream handler exists, listening becomes irrelevant, though that does break pierre's testing use case, which could be quite nice. it allows us to allocate an address that doesn't consume any resources beyond cpu/memory, which could potentially enable large tests. i do see what you mean, though.

@bigs (Contributor Author) commented Jun 3, 2019

doesn't look like my comment posted but as of last friday this is ready for final review. only thing left after review is go.mod directive replacement and publish.

@raulk @Stebalien @vyzo

@Stebalien (Member) left a review

Initial review.

return s.outbound.Write(b)
}

func (s *PipeStream) SetDeadline(t time.Time) error {

Member:

When setting deadlines, we can probably improve perf by only setting deadlines on the side we care about (read from inbound, write from outbound).
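
Concretely, that could look like the following, given the PipeStream{inbound, outbound} fields shown in this diff (sketch only):

// Sketch: arm the deadline only on the conn that performs the operation.
func (s *PipeStream) SetReadDeadline(t time.Time) error {
  return s.inbound.SetReadDeadline(t)
}

func (s *PipeStream) SetWriteDeadline(t time.Time) error {
  return s.outbound.SetWriteDeadline(t)
}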

Contributor Author:
sounds good!


type PipeStream struct {
inbound net.Conn
outbound net.Conn

Member:
I wonder if go would accept a patch to expose a CloseWrite function...

return s.outbound.Close()
}

func (s *PipeStream) Reset() error {

Member:
Hm. With a normal pipe conn, we'd be able to call CloseWithError. Unfortunately, that doesn't look possible here.

We either need to:

  1. Re-implement this type.
  2. Set some atomic "reset" flag before closing, and check that flag after reading/writing (see the sketch below).
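
A rough sketch of option 2, assuming a hypothetical reset flag shared by both halves of the stream (each half holds the same *int32) so the remote side's reads also report the reset instead of a clean EOF:

// Sketch only: ErrReset and the shared reset field are hypothetical additions.
var ErrReset = errors.New("stream reset")

func (s *PipeStream) Reset() error {
  atomic.StoreInt32(s.reset, 1) // reset *int32, shared with the other half
  s.inbound.Close()
  return s.outbound.Close()
}

func (s *PipeStream) Read(b []byte) (int, error) {
  n, err := s.inbound.Read(b)
  if err != nil && atomic.LoadInt32(s.reset) == 1 {
    return n, ErrReset // surface the reset rather than a clean close error
  }
  return n, err
}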

Contributor Author:
what exactly do you mean here? the implementation as it stands seems reasonable to me

Member:
Resetting a stream needs to close it with an error. We don't want the other side to think we successfully finished writing, we want them to see that something went wrong. For example, ioutil.ReadAll(stream) should fail if the remote side calls Reset().

Contributor Author:
ahhh makes total sense. missed this comment. implemented a simple version of this with buffered channels.

@bigs (Contributor Author), Jun 17, 2019:
updated friday @Stebalien edit: sorry forgot to comment!

Comment on lines +43 to +45
replace github.com/libp2p/go-libp2p-swarm => ../go-libp2p-swarm

replace github.com/multiformats/go-multiaddr => ../../workspace-multiformats/go-multiaddr

Contributor:
Nothing else to say :)



@mffrench commented Dec 7, 2020

Hi,

is there any progress on this effort? it seems to me it has been abandoned for more than a year now, and I'm facing a similar issue in my own project, so it seems I will have to spend time on a feature which would definitely be great to have in libp2p ...

In my case I would like to simulate several identities in a tiny environment, given the number of p2p node roles I have to manage. I'm currently starting many p2p nodes (currently 12) but that has its limits (many concurrent IOs leading to many timeouts ... note: libp2p nodes are not the only IO-consuming components in my test scenario). Let me know if you see some helpers...

Thank you

@Stebalien (Member):

If you're doing local testing, I recommend using a "mock network": https://pkg.go.dev/github.com/libp2p/[email protected]/p2p/net/mock#Mocknet.
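
A minimal sketch of Mocknet usage (constructor signature as of the go-libp2p version linked above; newer releases dropped the context argument from mocknet.New):

// Sketch: two in-process hosts with distinct identities, connected over an
// in-memory mock network. Import paths match the go-libp2p-core era.
import (
  "context"

  "github.com/libp2p/go-libp2p-core/host"
  mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

func twoMockHosts() (host.Host, host.Host, error) {
  mn := mocknet.New(context.Background())
  h1, err := mn.GenPeer() // fresh identity per host
  if err != nil {
    return nil, nil, err
  }
  h2, err := mn.GenPeer()
  if err != nil {
    return nil, nil, err
  }
  if err := mn.LinkAll(); err != nil { // create in-memory links
    return nil, nil, err
  }
  if err := mn.ConnectAllButSelf(); err != nil { // dial every linked peer
    return nil, nil, err
  }
  return h1, h2, nil
}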

@mffrench commented Dec 7, 2020

Well, this is not only local testing; I would like some of my libp2p nodes to handle several identities/IPFS addresses, a little like nginx can handle several URLs.

Currently, for each identity I'm running one libp2p node because of the "dial to self attempted" issue. But in my business case, most of these identities should be managed by a super libp2p node - or let's say an nginx-like libp2p node - according to their roles on the network...

Thank you for this first answer, though; it may help even if it does not cover my entire scenario ;)

@Stebalien (Member):

You want multiple identities but one node? Every node will always have its own identity. Are you sure you need multiple identities and not just multiple protocols? Let's move this discussion over to https://discuss.libp2p.io; I think we can solve your issue without any new features.

@MarcoPolo (Collaborator):

Hey folks, this PR is really old. A lot has changed and interest here is gone. I'm closing this PR to tidy things up.

@MarcoPolo closed this Jun 3, 2024