diff --git a/packages/ice/src/ice.ts b/packages/ice/src/ice.ts index 0aa909cb..369c8d5e 100644 --- a/packages/ice/src/ice.ts +++ b/packages/ice/src/ice.ts @@ -13,7 +13,7 @@ import { InterfaceAddresses } from "../../common/src/network"; import { Candidate, candidateFoundation, candidatePriority } from "./candidate"; import { DnsLookup } from "./dns/lookup"; import { TransactionError } from "./exceptions"; -import { Future, PQueue, difference, future, randomString } from "./helper"; +import { Future, PQueue, future, randomString } from "./helper"; import { classes, methods } from "./stun/const"; import { Message, parseMessage } from "./stun/message"; import { StunProtocol } from "./stun/protocol"; @@ -24,6 +24,13 @@ import { getHostAddresses, normalizeFamilyNodeV18 } from "./utils"; const log = debug("werift-ice : packages/ice/src/ice.ts : log"); export class Connection { + readonly stunServer?: Address; + readonly turnServer?: Address; + readonly useIpv4: boolean; + readonly useIpv6: boolean; + readonly options: IceOptions; + + iceControlling: boolean; localUserName = randomString(4); localPassword = randomString(22); remotePassword: string = ""; @@ -31,21 +38,15 @@ export class Connection { remoteIsLite = false; checkList: CandidatePair[] = []; localCandidates: Candidate[] = []; - stunServer?: Address; - turnServer?: Address; - useIpv4: boolean; - useIpv6: boolean; - options: IceOptions; remoteCandidatesEnd = false; - _localCandidatesEnd = false; - _tieBreaker: bigint = BigInt(new Uint64BE(randomBytes(64)).toString()); state: IceState = "new"; - dnsLookup?: DnsLookup; - restarted = false; - - readonly onData = new Event<[Buffer, number]>(); - readonly stateChanged = new Event<[IceState]>(); + restarting = false; + /**@private */ + _localCandidatesEnd = false; + /**@private */ + _tieBreaker: bigint = BigInt(new Uint64BE(randomBytes(64)).toString()); + private dnsLookup?: DnsLookup; private _remoteCandidates: Candidate[] = []; // P2P接続完了したソケット private nominated?: CandidatePair; @@ -58,7 +59,11 @@ export class Connection { private queryConsentHandle?: Future; private promiseGatherCandidates?: Event<[]>; - constructor(public iceControlling: boolean, options?: Partial) { + readonly onData = new Event<[Buffer, number]>(); + readonly stateChanged = new Event<[IceState]>(); + + constructor(iceControlling: boolean, options?: Partial) { + this.iceControlling = iceControlling; this.options = { ...defaultOptions, ...options, @@ -308,6 +313,7 @@ export class Connection { this.queryConsentHandle = future(this.queryConsent()); this.setState("connected"); + this.restarting = false; } private unfreezeInitial() { @@ -511,25 +517,6 @@ export class Connection { } send = async (data: Buffer) => { - // """ - // Send a datagram on the first component. - - // If the connection is not established, a `ConnectionError` is raised. - - // :param data: The data to be sent. - // """ - await this.sendTo(data); - }; - - private async sendTo(data: Buffer) { - // """ - // Send a datagram on the specified component. - - // If the connection is not established, a `ConnectionError` is raised. - - // :param data: The data to be sent. - // :param component: The component on which to send the data. - // """ const activePair = this.nominated; if (activePair) { await activePair.protocol.sendData(data, activePair.remoteAddr); @@ -537,7 +524,7 @@ export class Connection { // log("Cannot send data, ice not connected"); return; } - } + }; getDefaultCandidate() { const candidates = this.localCandidates.sort( @@ -675,12 +662,6 @@ export class Connection { this.sortCheckList(); } - resetNominatedPair() { - log("resetNominatedPair"); - this.nominated = undefined; - this.nominating = false; - } - private checkComplete(pair: CandidatePair) { pair.handle = undefined; if (pair.state === CandidatePairState.SUCCEEDED) { @@ -963,6 +944,31 @@ export class Connection { log("sendStun error", e); }); } + + restart() { + this.restarting = true; + + this.localUserName = randomString(4); + this.localPassword = randomString(22); + this.remotePassword = ""; + this.remoteUsername = ""; + this.remoteIsLite = false; + this.checkList = []; + this.localCandidates = []; + this.remoteCandidatesEnd = false; + this.state = "new"; + this._localCandidatesEnd = false; + this._remoteCandidates = []; + this.nominated = undefined; + this.nominating = false; + this.checkListDone = false; + this.checkListState = new PQueue(); + this.earlyChecks = []; + this.localCandidatesStart = false; + this.protocols = []; + this.queryConsentHandle = undefined; + this.promiseGatherCandidates = undefined; + } } export class CandidatePair { diff --git a/packages/webrtc/src/dataChannel.ts b/packages/webrtc/src/dataChannel.ts index 879782c8..e7a8a43b 100644 --- a/packages/webrtc/src/dataChannel.ts +++ b/packages/webrtc/src/dataChannel.ts @@ -9,7 +9,7 @@ const log = debug("werift:packages/webrtc/src/dataChannel.ts"); export class RTCDataChannel extends EventTarget { readonly stateChanged = new Event<[DCState]>(); - readonly message = new Event<[string | Buffer]>(); + readonly onMessage = new Event<[string | Buffer]>(); // todo impl readonly error = new Event<[Error]>(); readonly bufferedAmountLow = new Event(); diff --git a/packages/webrtc/src/peerConnection.ts b/packages/webrtc/src/peerConnection.ts index c586191c..5193a7c6 100644 --- a/packages/webrtc/src/peerConnection.ts +++ b/packages/webrtc/src/peerConnection.ts @@ -96,8 +96,6 @@ export class RTCPeerConnection extends EventTarget { this.transceivers[index] = t; } - candidatesSent = new Set(); - readonly iceGatheringStateChange = new Event<[IceGathererState]>(); readonly iceConnectionStateChange = new Event<[RTCIceConnectionState]>(); readonly signalingStateChange = new Event<[RTCSignalingState]>(); @@ -147,6 +145,12 @@ export class RTCPeerConnection extends EventTarget { return this.router.extIdUriMap; } + get dataChannels() { + const channels = + Object.values(this.sctpTransport?.dataChannels ?? {}) ?? []; + return channels; + } + constructor(config: Partial = {}) { super(); @@ -419,6 +423,7 @@ export class RTCPeerConnection extends EventTarget { }); const channel = new RTCDataChannel(this.sctpTransport, parameters); + this.sctpTransport.dataChannels[channel.id] = channel; return channel; } @@ -528,16 +533,6 @@ export class RTCPeerConnection extends EventTarget { candidate.foundation = "candidate:" + candidate.foundation; - // prevent ice candidates that have already been sent from being being resent - // when the connection is renegotiated during a later setLocalDescription call. - if (candidate.sdpMid) { - const candidateKey = `${candidate.foundation}:${candidate.sdpMid}`; - if (this.candidatesSent.has(candidateKey)) { - return; - } - this.candidatesSent.add(candidateKey); - } - this.onIceCandidate.execute(candidate.toJSON()); if (this.onicecandidate) { this.onicecandidate({ candidate: candidate.toJSON() }); @@ -595,8 +590,7 @@ export class RTCPeerConnection extends EventTarget { this.setSignalingState("stable"); } - // # assign MID - description.media.forEach((media, i) => { + for (const [i, media] of enumerate(description.media)) { const mid = media.rtp.muxId!; this.seenMid.add(mid); if (["audio", "video"].includes(media.kind)) { @@ -608,10 +602,13 @@ export class RTCPeerConnection extends EventTarget { if (media.kind === "application" && this.sctpTransport) { this.sctpTransport.mid = mid; } - }); + } const setupRole = (dtlsTransport: RTCDtlsTransport) => { const iceTransport = dtlsTransport.iceTransport; + if (iceTransport.connection.restarting) { + return; + } // # set ICE role if (description.type === "offer") { @@ -634,14 +631,17 @@ export class RTCPeerConnection extends EventTarget { } } }; - this.dtlsTransports.forEach((d) => setupRole(d)); + + for (const d of this.dtlsTransports) { + setupRole(d); + } // # configure direction if (["answer", "pranswer"].includes(description.type)) { - this.transceivers.forEach((t) => { + for (const t of this.transceivers) { const direction = andDirection(t.direction, t.offerDirection); t.setCurrentDirection(direction); - }); + } } // for trickle ice @@ -670,11 +670,12 @@ export class RTCPeerConnection extends EventTarget { ); } - description.media - .filter((m) => ["audio", "video"].includes(m.kind)) - .forEach((m, i) => { - addTransportDescription(m, this.transceivers[i].dtlsTransport); - }); + for (const [i, m] of enumerate( + description.media.filter((m) => ["audio", "video"].includes(m.kind)), + )) { + addTransportDescription(m, this.transceivers[i].dtlsTransport); + } + const sctpMedia = description.media.find((m) => m.kind === "application"); if (this.sctpTransport && sctpMedia) { addTransportDescription(sctpMedia, this.sctpTransport.dtlsTransport); @@ -751,6 +752,18 @@ export class RTCPeerConnection extends EventTarget { private async connect() { if (this.transportEstablished) { + await Promise.allSettled( + this.dtlsTransports.map(async (dtlsTransport) => { + const { iceTransport } = dtlsTransport; + if (iceTransport.connection.restarting) { + await iceTransport.start().catch((err) => { + log("iceTransport.start failed", err); + throw err; + }); + } + }), + ); + return; } log("start connect"); @@ -784,6 +797,13 @@ export class RTCPeerConnection extends EventTarget { this.setConnectionState("connected"); } + restartIce() { + for (const ice of this.iceTransports) { + ice.restart(); + } + this.needNegotiation(); + } + private getLocalRtpParams(transceiver: RTCRtpTransceiver): RTCRtpParameters { if (transceiver.mid == undefined) throw new Error("mid not assigned"); diff --git a/packages/webrtc/src/transport/ice.ts b/packages/webrtc/src/transport/ice.ts index 1113b47c..8b0572d4 100644 --- a/packages/webrtc/src/transport/ice.ts +++ b/packages/webrtc/src/transport/ice.ts @@ -14,8 +14,6 @@ export class RTCIceTransport { readonly onStateChange = new Event<[RTCIceConnectionState]>(); - private waitStart?: Event<[]>; - constructor(private gather: RTCIceGatherer) { this.connection = this.gather.connection; this.connection.stateChanged.subscribe((state) => { @@ -65,18 +63,21 @@ export class RTCIceTransport { this.connection.remotePassword !== remoteParameters.password) ) { log("restartIce", remoteParameters); - this.connection.resetNominatedPair(); + this.gather.restart(); } this.connection.setRemoteParams(remoteParameters); } async start() { - if (this.state === "closed") throw new Error("RTCIceTransport is closed"); - if (!this.connection.remotePassword || !this.connection.remoteUsername) + if (this.state === "closed") { + throw new Error("RTCIceTransport is closed"); + } + if (!this.connection.remotePassword || !this.connection.remoteUsername) { throw new Error("remoteParams missing"); + } - if (this.waitStart) await this.waitStart.asPromise(); - this.waitStart = new Event(); + if (this.gather.waitStart) await this.gather.waitStart.asPromise(); + this.gather.waitStart = new Event(); this.setState("checking"); @@ -87,7 +88,11 @@ export class RTCIceTransport { throw error; } - this.waitStart.complete(); + this.gather.waitStart.complete(); + } + + restart() { + this.gather.restart(); } async stop() { @@ -113,8 +118,10 @@ export const IceGathererStates = ["new", "gathering", "complete"] as const; export type IceGathererState = (typeof IceGathererStates)[number]; export class RTCIceGatherer { + candidatesSent = new Set(); onIceCandidate: (candidate: IceCandidate) => void = () => {}; gatheringState: IceGathererState = "new"; + waitStart?: Event<[]>; readonly onGatheringStateChange = new Event<[IceGathererState]>(); readonly connection: Connection; @@ -126,9 +133,18 @@ export class RTCIceGatherer { async gather() { if (this.gatheringState === "new") { this.setState("gathering"); - await this.connection.gatherCandidates((candidate) => - this.onIceCandidate(candidateFromIce(candidate)), - ); + await this.connection.gatherCandidates((candidate) => { + // prevent ice candidates that have already been sent from being being resent + // when the connection is renegotiated during a later setLocalDescription call. + + const candidateKey = candidate.foundation; + if (this.candidatesSent.has(candidateKey)) { + return; + } + + this.candidatesSent.add(candidateKey); + this.onIceCandidate(candidateFromIce(candidate)); + }); this.setState("complete"); } } @@ -146,7 +162,15 @@ export class RTCIceGatherer { return params; } - private setState(state: IceGathererState) { + restart() { + this.setState("new"); + this.candidatesSent.clear(); + this.waitStart = undefined; + + this.connection.restart(); + } + + setState(state: IceGathererState) { if (state !== this.gatheringState) { this.gatheringState = state; this.onGatheringStateChange.execute(state); diff --git a/packages/webrtc/src/transport/sctp.ts b/packages/webrtc/src/transport/sctp.ts index 3fc67162..e262f7a3 100644 --- a/packages/webrtc/src/transport/sctp.ts +++ b/packages/webrtc/src/transport/sctp.ts @@ -194,7 +194,7 @@ export class RTCSctpTransport { } })(); - channel.message.execute(msg); + channel.onMessage.execute(msg); channel.emit("message", { data: msg }); if (channel.onmessage) { channel.onmessage({ data: msg }); diff --git a/packages/webrtc/tests/datachannel/send.test.ts b/packages/webrtc/tests/datachannel/send.test.ts index 81cbc7dd..cbde685a 100644 --- a/packages/webrtc/tests/datachannel/send.test.ts +++ b/packages/webrtc/tests/datachannel/send.test.ts @@ -53,7 +53,7 @@ describe.each([{}, { negotiated: true, id: 0 }])( }; createDataChannelPair(options).then(([channel1, channel2]) => { - channel2.message.subscribe(onMessage); + channel2.onMessage.subscribe(onMessage); channel1.send(helloBuffer); channel1.send(unicodeString); diff --git a/packages/webrtc/tests/integrate/datachannel.test.ts b/packages/webrtc/tests/integrate/datachannel.test.ts index ff47e25a..91e18456 100644 --- a/packages/webrtc/tests/integrate/datachannel.test.ts +++ b/packages/webrtc/tests/integrate/datachannel.test.ts @@ -10,9 +10,9 @@ describe("datachannel", () => { pc2.onDataChannel.subscribe((channel) => { Promise.all([ - channel.message.watch((v) => v === "1"), - channel.message.watch((v) => v === "2"), - channel.message.watch((v) => v === "3"), + channel.onMessage.watch((v) => v === "1"), + channel.onMessage.watch((v) => v === "2"), + channel.onMessage.watch((v) => v === "3"), ]).then(async () => { await pc1.close(); await pc2.close(); diff --git a/packages/webrtc/tests/integrate/peerConnection.test.ts b/packages/webrtc/tests/integrate/peerConnection.test.ts index 7c684f9a..8fe0701b 100644 --- a/packages/webrtc/tests/integrate/peerConnection.test.ts +++ b/packages/webrtc/tests/integrate/peerConnection.test.ts @@ -17,7 +17,7 @@ describe("peerConnection", () => { const pc2 = new RTCPeerConnection({}); pc2.onDataChannel.subscribe((channel) => { - channel.message.subscribe((data) => { + channel.onMessage.subscribe((data) => { expect(data.toString()).toBe("hello"); done(); }); @@ -94,7 +94,7 @@ describe("peerConnection", () => { const dc = pcOffer.createDataChannel("chat"); pcAnswer.onDataChannel.subscribe((channel) => { - channel.message.subscribe(async (data) => { + channel.onMessage.subscribe(async (data) => { expect(data.toString()).toBe("hello"); channel.close(); await Promise.all([ @@ -189,7 +189,7 @@ describe("initial config", () => { }; callee.onDataChannel.subscribe((channel) => { - channel.message.once(() => { + channel.onMessage.once(() => { caller.close(); callee.close(); done(); @@ -222,7 +222,7 @@ describe("initial config", () => { }; callee.onDataChannel.subscribe((channel) => { - channel.message.once(() => { + channel.onMessage.once(() => { caller.close(); callee.close(); done(); @@ -255,7 +255,7 @@ describe("initial config", () => { }; callee.onDataChannel.subscribe((channel) => { - channel.message.once(() => { + channel.onMessage.once(() => { caller.close(); callee.close(); done(); diff --git a/packages/webrtc/tests/integrate/restartIce.test.ts b/packages/webrtc/tests/integrate/restartIce.test.ts new file mode 100644 index 00000000..e450a4b7 --- /dev/null +++ b/packages/webrtc/tests/integrate/restartIce.test.ts @@ -0,0 +1,71 @@ +import { RTCPeerConnection } from "../../src"; + +describe("restartIce", () => { + it( + "test", + () => + new Promise(async (done) => { + const o = new RTCPeerConnection({ + iceServers: [{ urls: "stun:stun.l.google.com:19302" }], + }); + const a = new RTCPeerConnection({ + iceServers: [{ urls: "stun:stun.l.google.com:19302" }], + }); + + o.onIceCandidate.subscribe((candidate) => { + a.addIceCandidate(candidate); + }); + a.onIceCandidate.subscribe((candidate) => { + o.addIceCandidate(candidate); + }); + + o.createDataChannel("dc"); + + const offer = await o.createOffer(); + o.setLocalDescription(offer); + await a.setRemoteDescription(offer); + + const answer = await a.createAnswer(); + a.setLocalDescription(answer); + await o.setRemoteDescription(answer); + + if (a.dataChannels.length === 0) { + await a.onDataChannel.asPromise(); + } + + setTimeout(() => { + o.dataChannels[0].send(Buffer.from("o")); + }, 10); + { + const res = await a.dataChannels[0].onMessage.asPromise(); + expect(res.toString()).toBe("o"); + } + + o.restartIce(); + + { + const offer = await o.createOffer(); + o.setLocalDescription(offer); + await a.setRemoteDescription(offer); + + const answer = await a.createAnswer(); + a.setLocalDescription(answer); + await o.setRemoteDescription(answer); + } + + setTimeout(() => { + o.dataChannels[0].send(Buffer.from("o")); + }, 10); + { + const res = await a.dataChannels[0].onMessage.asPromise(); + expect(res.toString()).toBe("o"); + } + + o.close(); + a.close(); + + done(); + }), + 600_000, + ); +}); diff --git a/packages/webrtc/tests/integrate/trickle.test.ts b/packages/webrtc/tests/integrate/trickle.test.ts index 9a1db0fa..97aaf199 100644 --- a/packages/webrtc/tests/integrate/trickle.test.ts +++ b/packages/webrtc/tests/integrate/trickle.test.ts @@ -14,7 +14,7 @@ describe("trickle", () => { iceServers: [{ urls: "stun:stun.l.google.com:19302" }], }); pcAnswer.onDataChannel.subscribe((dc) => { - dc.message.subscribe((data) => { + dc.onMessage.subscribe((data) => { expect(data.toString()).toBe("hello"); done(); }); @@ -53,7 +53,7 @@ describe("trickle", () => { iceServers: [{ urls: "stun:stun.l.google.com:19302" }], }); pcAnswer.onDataChannel.subscribe((dc) => { - dc.message.subscribe((data) => { + dc.onMessage.subscribe((data) => { expect(data.toString()).toBe("hello"); done(); }); diff --git a/packages/webrtc/tests/transport/sctp.test.ts b/packages/webrtc/tests/transport/sctp.test.ts index 81ae8bb2..bfe8a23e 100644 --- a/packages/webrtc/tests/transport/sctp.test.ts +++ b/packages/webrtc/tests/transport/sctp.test.ts @@ -42,7 +42,7 @@ describe("RTCSctpTransportTest", () => { const serverChannels = trackChannels(server); serverChannels.event.subscribe((channel) => { channel.send(Buffer.from("ping")); - channel.message.subscribe((data) => { + channel.onMessage.subscribe((data) => { expect(data.toString()).toBe("pong"); done(); }); @@ -52,7 +52,7 @@ describe("RTCSctpTransportTest", () => { client, new RTCDataChannelParameters({ label: "chat", id: 1 }), ); - channel.message.subscribe((data) => { + channel.onMessage.subscribe((data) => { expect(data.toString()).toBe("ping"); channel.send(Buffer.from("pong")); }); diff --git a/packages/webrtc/tests/utils.ts b/packages/webrtc/tests/utils.ts index 25cc5da8..fe3a899e 100644 --- a/packages/webrtc/tests/utils.ts +++ b/packages/webrtc/tests/utils.ts @@ -116,7 +116,7 @@ export function awaitMessage(channel: RTCDataChannel) { r(e.data); }), ), - channel.message.asPromise(), + channel.onMessage.asPromise(), ]).then(([msg]) => resolve(msg)); channel.error.once(reject);