Skip to content

Commit

Permalink
feat(ext/net): Add multicasting APIs to DatagramConn (denoland#10706)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgwilym committed Mar 9, 2023
1 parent 521cb4c commit eac4e1f
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 2 deletions.
28 changes: 26 additions & 2 deletions cli/tsc/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ declare namespace Deno {
*/
type ToNativeResultType<T extends NativeResultType = NativeResultType> =
T extends NativeStructType ? BufferSource
: ToNativeResultTypeMap[Exclude<T, NativeStructType>];
: ToNativeResultTypeMap[Exclude<T, NativeStructType>];

/** **UNSTABLE**: New API, yet to be vetted.
*
Expand Down Expand Up @@ -225,7 +225,7 @@ declare namespace Deno {
*/
type FromNativeResultType<T extends NativeResultType = NativeResultType> =
T extends NativeStructType ? Uint8Array
: FromNativeResultTypeMap[Exclude<T, NativeStructType>];
: FromNativeResultTypeMap[Exclude<T, NativeStructType>];

/** **UNSTABLE**: New API, yet to be vetted.
*
Expand Down Expand Up @@ -857,6 +857,30 @@ declare namespace Deno {
* @category Network
*/
export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> {
/** Joins an IPv4 multicast group. */
joinMulticastV4(
address: string,
networkInterface: string,
): Promise<{
/** Leaves the multicast group. */
leave: () => Promise<void>;
/** Sets the multicast loopback option. If enabled, multicast packets will be looped back to the local socket. */
setLoopback: (loopback: boolean) => Promise<void>;
/** Sets the time-to-live of outgoing multicast packets for this socket. */
setTTL: (ttl: number) => Promise<void>;
}>;

/** Joins an IPv6 multicast group. */
joinMulticastV6(
address: string,
networkInterface: number,
): Promise<{
/** Leaves the multicast group. */
leave: () => Promise<void>;
/** Sets the multicast loopback option. If enabled, multicast packets will be looped back to the local socket. */
setLoopback: (loopback: boolean) => Promise<void>;
}>;

/** Waits for and resolves to the next message to the instance.
*
* Messages are received in the format of a tuple containing the data array
Expand Down
59 changes: 59 additions & 0 deletions ext/net/01_net.js
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,65 @@ class Datagram {
return this.#addr;
}

async joinMulticastV4(addr, multiInterface) {
await core.opAsync(
"op_net_join_multi_v4_udp",
this.rid,
{ address: addr, interface: multiInterface },
);

return {
leave: () => {
return core.opAsync(
"op_net_leave_multi_v4_udp",
this.rid,
{ address: addr, interface: multiInterface },
);
},
setLoopback: (loopback) => {
return core.opAsync(
"op_net_set_multi_loopback_udp",
this.rid,
addr,
loopback,
);
},
setTTL: (ttl) => {
return core.opAsync(
"op_net_set_multi_ttl_udp",
this.rid,
ttl,
);
},
};
}

async joinMulticastV6(addr, multiInterface) {
await core.opAsync(
"op_net_join_multi_v6_udp",
this.rid,
{ address: addr, interface: multiInterface },
);

return {
leave: () => {
return core.opAsync(
"op_net_leave_multi_v6_udp",
this.rid,
{ address: addr, interface: multiInterface },
);
},
setLoopback: (loopback) => {
return core.opAsync(
"op_net_set_multi_loopback_udp",
this.rid,
addr,
loopback,
);
},
};
}

async receive(p) {
const buf = p || new Uint8Array(this.bufSize);
let nread;
Expand Down
168 changes: 168 additions & 0 deletions ext/net/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ use socket2::Socket;
use socket2::Type;
use std::borrow::Cow;
use std::cell::RefCell;
use std::net::IpAddr as StdIpAddr;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::rc::Rc;
use std::str::FromStr;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
Expand Down Expand Up @@ -65,6 +69,12 @@ pub fn init<P: NetPermissions + 'static>() -> Vec<OpDecl> {
#[cfg(unix)]
crate::ops_unix::op_net_recv_unixpacket::decl(),
op_net_send_udp::decl::<P>(),
op_net_join_multi_v4_udp::decl::<P>(),
op_net_join_multi_v6_udp::decl::<P>(),
op_net_leave_multi_v4_udp::decl::<P>(),
op_net_leave_multi_v6_udp::decl::<P>(),
op_net_set_multi_loopback_udp::decl::<P>(),
op_net_set_multi_ttl_udp::decl::<P>(),
#[cfg(unix)]
crate::ops_unix::op_net_send_unixpacket::decl::<P>(),
op_dns_resolve::decl::<P>(),
Expand All @@ -85,6 +95,18 @@ pub struct IpAddr {
pub port: u16,
}

#[derive(Deserialize, Serialize)]
pub struct MulticastMembershipV4 {
pub address: String,
pub interface: String,
}

#[derive(Deserialize, Serialize)]
pub struct MulticastMembershipV6 {
pub address: String,
pub interface: u32,
}

impl From<SocketAddr> for IpAddr {
fn from(addr: SocketAddr) -> Self {
Self {
Expand Down Expand Up @@ -185,6 +207,152 @@ where
Ok(nwritten)
}

#[op]
async fn op_net_join_multi_v4_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
membership: MulticastMembershipV4,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

let addr = Ipv4Addr::from_str(membership.address.as_str())?;
let interface_addr = Ipv4Addr::from_str(membership.interface.as_str())?;

socket.join_multicast_v4(addr, interface_addr)?;

Ok(())
}

#[op]
async fn op_net_join_multi_v6_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
membership: MulticastMembershipV6,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

let addr = Ipv6Addr::from_str(membership.address.as_str())?;

socket.join_multicast_v6(&addr, membership.interface)?;

Ok(())
}

#[op]
async fn op_net_leave_multi_v4_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
membership: MulticastMembershipV4,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

let addr = Ipv4Addr::from_str(membership.address.as_str())?;
let interface_addr = Ipv4Addr::from_str(membership.interface.as_str())?;

socket.leave_multicast_v4(addr, interface_addr)?;

Ok(())
}

#[op]
async fn op_net_leave_multi_v6_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
membership: MulticastMembershipV6,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

let addr = Ipv6Addr::from_str(membership.address.as_str())?;

socket.leave_multicast_v6(&addr, membership.interface)?;

Ok(())
}

#[op]
async fn op_net_set_multi_loopback_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
group_address: String,
loopback: bool,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

let addr = StdIpAddr::from_str(group_address.as_str())?;

match addr {
StdIpAddr::V4(_) => {
socket.set_multicast_loop_v4(loopback)?;
}
StdIpAddr::V6(_) => {
socket.set_multicast_loop_v6(loopback)?;
}
}

Ok(())
}

#[op]
async fn op_net_set_multi_ttl_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
ttl: u32,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

socket.set_multicast_ttl_v4(ttl)?;

Ok(())
}

#[op]
pub async fn op_net_connect_tcp<NP>(
state: Rc<RefCell<OpState>>,
Expand Down

0 comments on commit eac4e1f

Please sign in to comment.