Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autonat protocol #739

Merged
merged 22 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion libp2p/builders.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import
switch, peerid, peerinfo, stream/connection, multiaddress,
crypto/crypto, transports/[transport, tcptransport],
muxers/[muxer, mplex/mplex, yamux/yamux],
protocols/[identify, secure/secure, secure/noise, relay],
protocols/[identify, secure/secure, secure/noise, relay, autonat],
connmanager, upgrademngrs/muxedupgrade,
nameresolving/nameresolver,
errors, utility
Expand Down Expand Up @@ -58,6 +58,7 @@ type
agentVersion: string
nameResolver: NameResolver
peerStoreCapacity: Option[int]
autonat: bool
isCircuitRelay: bool
circuitRelayCanHop: bool

Expand Down Expand Up @@ -185,6 +186,10 @@ proc withNameResolver*(b: SwitchBuilder, nameResolver: NameResolver): SwitchBuil
b.nameResolver = nameResolver
b

proc withAutonat*(b: SwitchBuilder): SwitchBuilder =
b.autonat = true
b

proc withRelayTransport*(b: SwitchBuilder, canHop: bool): SwitchBuilder =
b.isCircuitRelay = true
b.circuitRelayCanHop = canHop
Expand Down Expand Up @@ -255,6 +260,10 @@ proc build*(b: SwitchBuilder): Switch
nameResolver = b.nameResolver,
peerStore = peerStore)

if b.autonat:
let autonat = Autonat.new(switch)
switch.mount(autonat)

if b.isCircuitRelay:
let relay = Relay.new(switch, b.circuitRelayCanHop)
switch.mount(relay)
Expand Down
40 changes: 40 additions & 0 deletions libp2p/dialer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import dial,
stream/connection,
transports/transport,
nameresolving/nameresolver,
upgrademngrs/upgrade,
errors

export dial, errors
Expand Down Expand Up @@ -185,6 +186,45 @@ proc negotiateStream(

return conn

method canDial*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
protos: seq[string]): Future[MultiAddress] {.raises: [Defect], async.} =
lchenut marked this conversation as resolved.
Show resolved Hide resolved
## Create a protocol stream and in order to check
## if a connection is possible.
## Doesn't use the Connection Manager to save it.
##

trace "Check if it can dial", peerId, addrs, protos
for address in addrs: # for each address
let
hostname = address.getHostname()
resolvedAddresses =
if isNil(self.nameResolver): @[address]
else: await self.nameResolver.resolveMAddress(address)

for a in resolvedAddresses: # for each resolved address
for transport in self.transports: # for each transport
if transport.handles(a): # check if it can dial it
trace "Dialing address", address = $a, peerId, hostname
try:
let dialed = await transport.dial(hostname, address)
defer: await dialed.close()
# make sure to assign the peer to the connection
dialed.peerId = peerId
# also keep track of the connection's bottom unsafe transport direction
# required by gossipsub scoring
dialed.transportDir = Direction.Out
let sconn = await transport.upgrader.secure(dialed)
lchenut marked this conversation as resolved.
Show resolved Hide resolved
await sconn.close()
return sconn.observedAddr
except CancelledError as exc:
raise exc
except CatchableError as exc:
continue # Try the next address
raise newException(DialFailedError, "Dial failed")

method dial*(
self: Dialer,
peerId: PeerId,
Expand Down
276 changes: 276 additions & 0 deletions libp2p/protocols/autonat.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
# Nim-LibP2P
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

{.push raises: [Defect].}

import std/[options, sets, sequtils]
import chronos, chronicles, stew/objects
import ./protocol,
../switch,
../multiaddress,
../peerid,
../errors

logScope:
topics = "libp2p identify"
Menduist marked this conversation as resolved.
Show resolved Hide resolved

const
AutonatCodec* = "/libp2p/autonat/1.0.0"
ArbitraryLimit = 128 # TODO: make it a configuration variable or smthg
lchenut marked this conversation as resolved.
Show resolved Hide resolved

type
AutonatError* = object of LPError

MsgType* = enum
Dial = 0
DialResponse = 1

ResponseStatus* = enum
Ok = 0
DialError = 100
DialRefused = 101
BadRequest = 200
InternalError = 300

AutonatPeerInfo* = object
id: Option[PeerID]
addrs: seq[MultiAddress]

AutonatDial* = object
peerInfo: Option[AutonatPeerInfo]

AutonatDialResponse* = object
status*: ResponseStatus
text*: Option[string]
ma*: Option[MultiAddress]

AutonatMsg = object
msgType: MsgType
dial: Option[AutonatDial]
response: Option[AutonatDialResponse]

proc encode*(msg: AutonatMsg): ProtoBuffer =
result = initProtoBuffer()
result.write(1, msg.msgType.uint)
if msg.dial.isSome():
var dial = initProtoBuffer()
if msg.dial.get().peerInfo.isSome():
var bufferPeerInfo = initProtoBuffer()
let peerInfo = msg.dial.get().peerInfo.get()
if peerInfo.id.isSome():
bufferPeerInfo.write(1, peerInfo.id.get())
for ma in peerInfo.addrs:
bufferPeerInfo.write(2, ma.data.buffer)
bufferPeerInfo.finish()
dial.write(1, bufferPeerInfo.buffer)
dial.finish()
result.write(2, dial.buffer)
if msg.response.isSome():
var bufferResponse = initProtoBuffer()
let response = msg.response.get()
bufferResponse.write(1, response.status.uint)
if response.text.isSome():
bufferResponse.write(2, response.text.get())
if response.ma.isSome():
bufferResponse.write(3, response.ma.get())
bufferResponse.finish()
result.write(3, bufferResponse.buffer)
result.finish()

proc encode*(d: AutonatDial): ProtoBuffer =
result = initProtoBuffer()
result.write(1, MsgType.Dial.uint)
var dial = initProtoBuffer()
if d.peerInfo.isSome():
var bufferPeerInfo = initProtoBuffer()
let peerInfo = d.peerInfo.get()
if peerInfo.id.isSome():
bufferPeerInfo.write(1, peerInfo.id.get())
for ma in peerInfo.addrs:
bufferPeerInfo.write(2, ma.data.buffer)
bufferPeerInfo.finish()
dial.write(1, bufferPeerInfo.buffer)
dial.finish()
result.write(2, dial.buffer)
result.finish()

proc encode*(r: AutonatDialResponse): ProtoBuffer =
result = initProtoBuffer()
result.write(1, MsgType.DialResponse.uint)
var bufferResponse = initProtoBuffer()
bufferResponse.write(1, r.status.uint)
if r.text.isSome():
bufferResponse.write(2, r.text.get())
if r.ma.isSome():
bufferResponse.write(3, r.ma.get())
bufferResponse.finish()
result.write(3, bufferResponse.buffer)
result.finish()

proc decode(_: typedesc[AutonatMsg], buf: seq[byte]): Option[AutonatMsg] =
var
msgTypeOrd: uint32
pbDial: ProtoBuffer
pbResponse: ProtoBuffer
msg: AutonatMsg

let
pb = initProtoBuffer(buf)
r1 = pb.getField(1, msgTypeOrd)
r2 = pb.getField(2, pbDial)
r3 = pb.getField(3, pbResponse)
if r1.isErr() or r2.isErr() or r3.isErr(): return none(AutonatMsg)

if r1.get() and not checkedEnumAssign(msg.msgType, msgTypeOrd):
return none(AutonatMsg)
if r2.get():
var
pbPeerInfo: ProtoBuffer
dial: AutonatDial
let
r4 = pbDial.getField(1, pbPeerInfo)
if r4.isErr(): return none(AutonatMsg)

var peerInfo: AutonatPeerInfo
if r4.get():
var pid: PeerId
let
r5 = pbPeerInfo.getField(1, pid)
r6 = pbPeerInfo.getRepeatedField(2, peerInfo.addrs)
if r5.isErr() or r6.isErr(): return none(AutonatMsg)
if r5.get(): peerInfo.id = some(pid)
dial.peerInfo = some(peerInfo)
msg.dial = some(dial)

if r3.get():
var
statusOrd: uint
text: string
ma: MultiAddress
response: AutonatDialResponse

let
r4 = pbResponse.getField(1, statusOrd)
r5 = pbResponse.getField(2, text)
r6 = pbResponse.getField(3, ma)

if r4.isErr() or r5.isErr() or r6.isErr() or
(r4.get() and not checkedEnumAssign(response.status, statusOrd)):
return none(AutonatMsg)
if r5.get(): response.text = some(text)
if r6.get(): response.ma = some(ma)
msg.response = some(response)

return some(msg)

proc sendDial(conn: Connection, pid: PeerId, addrs: seq[MultiAddress]) {.async.} =
let pb = AutonatDial(peerInfo: some(AutonatPeerInfo(
id: some(pid),
addrs: addrs
))).encode()
await conn.writeLp(pb.buffer)

proc sendResponseError(conn: Connection, status: ResponseStatus, text: string = "") {.async.} =
let pb = AutonatDialResponse(
status: status,
text: if text == "": none(string) else: some(text),
ma: none(MultiAddress)
).encode()
await conn.writeLp(pb.buffer)

proc sendResponseOk(conn: Connection, ma: MultiAddress) {.async.} =
let pb = AutonatDialResponse(
status: ResponseStatus.Ok,
text: some("Ok"),
ma: some(ma)
).encode()
await conn.writeLp(pb.buffer)

type
Autonat* = ref object of LPProtocol
switch*: Switch

proc dialBack*(a: Autonat, pid: PeerId, ma: MultiAddress|seq[MultiAddress]):
Future[MultiAddress] {.async.} =
let addrs = when ma is MultiAddress: @[ma] else: ma
let conn = await a.switch.dial(pid, addrs, AutonatCodec)
defer: await conn.close()
await conn.sendDial(a.switch.peerInfo.peerId, a.switch.peerInfo.addrs)
let msgOpt = AutonatMsg.decode(await conn.readLp(1024))
if msgOpt.isNone() or
msgOpt.get().msgType != DialResponse or
msgOpt.get().response.isNone():
raise newException(AutonatError, "Unexpected response")
let response = msgOpt.get().response.get()
if response.status != ResponseStatus.Ok:
raise newException(AutonatError, "Bad status " &
$response.status & " " &
response.text.get(""))
if response.ma.isNone():
raise newException(AutonatError, "Missing address")
return response.ma.get()

proc doDial(a: Autonat, conn: Connection, addrs: seq[MultiAddress]) {.async.} =
try:
let ma = await Dialer(a.switch.dialer).canDial(conn.peerId, addrs, @[AutonatCodec])
lchenut marked this conversation as resolved.
Show resolved Hide resolved
await conn.sendResponseOk(ma)
except CancelledError as exc:
raise exc
except CatchableError as exc:
await conn.sendResponseError(DialError, exc.msg)

proc handleDial(a: Autonat, conn: Connection, msg: AutonatMsg): Future[void] =
if msg.dial.isNone() or msg.dial.get().peerInfo.isNone():
return conn.sendResponseError(BadRequest, "Missing Peer Info")
let peerInfo = msg.dial.get().peerInfo.get()
if peerInfo.id.isSome() and peerInfo.id.get() != conn.peerId:
return conn.sendResponseError(BadRequest, "PeerId mismatch")

if conn.observedAddr[0].isErr() or (not IP4.match(conn.observedAddr[0].get()) and
not IP6.match(conn.observedAddr[0].get())):
return conn.sendResponseError(InternalError, "Expected an IP address")

var addrs = initHashSet[MultiAddress]()
addrs.incl(conn.observedAddr)
for ma in peerInfo.addrs:
let maFirst = ma[0]
if maFirst.isErr() or (not IP4.match(maFirst.get()) and
not IP6.match(maFirst.get())):
lchenut marked this conversation as resolved.
Show resolved Hide resolved
continue
addrs.incl(ma)
lchenut marked this conversation as resolved.
Show resolved Hide resolved
if len(addrs) >= ArbitraryLimit:
break

if len(addrs) == 0:
return conn.sendResponseError(DialRefused, "No dialable address")
return a.doDial(conn, toSeq(addrs))

proc new*(T: typedesc[Autonat], switch: Switch): T =
let autonat = T(switch: switch)
autonat.init()
autonat

method init*(a: Autonat) =
proc handleStream(conn: Connection, proto: string) {.async, gcsafe.} =
try:
let msgOpt = AutonatMsg.decode(await conn.readLp(1024))
if msgOpt.isNone() or msgOpt.get().msgType != MsgType.Dial:
raise newException(AutonatError, "Received malformed message")
let msg = msgOpt.get()
await a.handleDial(conn, msg)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "exception in autonat handler", exc = exc.msg, conn
finally:
trace "exiting autonat handler", conn
await conn.close()

a.handler = handleStream
a.codecs = @[AutonatCodec]
Loading