Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
shinyoshiaki committed Jan 1, 2024
1 parent a696db4 commit 0e2947f
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 91 deletions.
86 changes: 46 additions & 40 deletions packages/ice/src/ice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { InterfaceAddresses } from "../../common/src/network";
import { Candidate, candidateFoundation, candidatePriority } from "./candidate";
import { DnsLookup } from "./dns/lookup";
import { TransactionError } from "./exceptions";
import { Future, PQueue, difference, future, randomString } from "./helper";
import { Future, PQueue, future, randomString } from "./helper";
import { classes, methods } from "./stun/const";
import { Message, parseMessage } from "./stun/message";
import { StunProtocol } from "./stun/protocol";
Expand All @@ -24,28 +24,29 @@ import { getHostAddresses, normalizeFamilyNodeV18 } from "./utils";
const log = debug("werift-ice : packages/ice/src/ice.ts : log");

export class Connection {
readonly stunServer?: Address;
readonly turnServer?: Address;
readonly useIpv4: boolean;
readonly useIpv6: boolean;
readonly options: IceOptions;

iceControlling: boolean;
localUserName = randomString(4);
localPassword = randomString(22);
remotePassword: string = "";
remoteUsername: string = "";
remoteIsLite = false;
checkList: CandidatePair[] = [];
localCandidates: Candidate[] = [];
stunServer?: Address;
turnServer?: Address;
useIpv4: boolean;
useIpv6: boolean;
options: IceOptions;
remoteCandidatesEnd = false;
_localCandidatesEnd = false;
_tieBreaker: bigint = BigInt(new Uint64BE(randomBytes(64)).toString());
state: IceState = "new";
dnsLookup?: DnsLookup;
restarted = false;

readonly onData = new Event<[Buffer, number]>();
readonly stateChanged = new Event<[IceState]>();
restarting = false;

/**@private */
_localCandidatesEnd = false;
/**@private */
_tieBreaker: bigint = BigInt(new Uint64BE(randomBytes(64)).toString());
private dnsLookup?: DnsLookup;
private _remoteCandidates: Candidate[] = [];
// P2P接続完了したソケット
private nominated?: CandidatePair;
Expand All @@ -58,7 +59,11 @@ export class Connection {
private queryConsentHandle?: Future;
private promiseGatherCandidates?: Event<[]>;

constructor(public iceControlling: boolean, options?: Partial<IceOptions>) {
readonly onData = new Event<[Buffer, number]>();
readonly stateChanged = new Event<[IceState]>();

constructor(iceControlling: boolean, options?: Partial<IceOptions>) {
this.iceControlling = iceControlling;
this.options = {
...defaultOptions,
...options,
Expand Down Expand Up @@ -308,6 +313,7 @@ export class Connection {
this.queryConsentHandle = future(this.queryConsent());

this.setState("connected");
this.restarting = false;
}

private unfreezeInitial() {
Expand Down Expand Up @@ -511,33 +517,14 @@ export class Connection {
}

send = async (data: Buffer) => {
// """
// Send a datagram on the first component.

// If the connection is not established, a `ConnectionError` is raised.

// :param data: The data to be sent.
// """
await this.sendTo(data);
};

private async sendTo(data: Buffer) {
// """
// Send a datagram on the specified component.

// If the connection is not established, a `ConnectionError` is raised.

// :param data: The data to be sent.
// :param component: The component on which to send the data.
// """
const activePair = this.nominated;
if (activePair) {
await activePair.protocol.sendData(data, activePair.remoteAddr);
} else {
// log("Cannot send data, ice not connected");
return;
}
}
};

getDefaultCandidate() {
const candidates = this.localCandidates.sort(
Expand Down Expand Up @@ -675,12 +662,6 @@ export class Connection {
this.sortCheckList();
}

resetNominatedPair() {
log("resetNominatedPair");
this.nominated = undefined;
this.nominating = false;
}

private checkComplete(pair: CandidatePair) {
pair.handle = undefined;
if (pair.state === CandidatePairState.SUCCEEDED) {
Expand Down Expand Up @@ -963,6 +944,31 @@ export class Connection {
log("sendStun error", e);
});
}

restart() {
this.restarting = true;

this.localUserName = randomString(4);
this.localPassword = randomString(22);
this.remotePassword = "";
this.remoteUsername = "";
this.remoteIsLite = false;
this.checkList = [];
this.localCandidates = [];
this.remoteCandidatesEnd = false;
this.state = "new";
this._localCandidatesEnd = false;
this._remoteCandidates = [];
this.nominated = undefined;
this.nominating = false;
this.checkListDone = false;
this.checkListState = new PQueue<number>();
this.earlyChecks = [];
this.localCandidatesStart = false;
this.protocols = [];
this.queryConsentHandle = undefined;
this.promiseGatherCandidates = undefined;
}
}

export class CandidatePair {
Expand Down
2 changes: 1 addition & 1 deletion packages/webrtc/src/dataChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const log = debug("werift:packages/webrtc/src/dataChannel.ts");

export class RTCDataChannel extends EventTarget {
readonly stateChanged = new Event<[DCState]>();
readonly message = new Event<[string | Buffer]>();
readonly onMessage = new Event<[string | Buffer]>();
// todo impl
readonly error = new Event<[Error]>();
readonly bufferedAmountLow = new Event();
Expand Down
66 changes: 43 additions & 23 deletions packages/webrtc/src/peerConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ export class RTCPeerConnection extends EventTarget {
this.transceivers[index] = t;
}

candidatesSent = new Set<string>();

readonly iceGatheringStateChange = new Event<[IceGathererState]>();
readonly iceConnectionStateChange = new Event<[RTCIceConnectionState]>();
readonly signalingStateChange = new Event<[RTCSignalingState]>();
Expand Down Expand Up @@ -147,6 +145,12 @@ export class RTCPeerConnection extends EventTarget {
return this.router.extIdUriMap;
}

get dataChannels() {
const channels =
Object.values(this.sctpTransport?.dataChannels ?? {}) ?? [];
return channels;
}

constructor(config: Partial<PeerConfig> = {}) {
super();

Expand Down Expand Up @@ -419,6 +423,7 @@ export class RTCPeerConnection extends EventTarget {
});

const channel = new RTCDataChannel(this.sctpTransport, parameters);
this.sctpTransport.dataChannels[channel.id] = channel;
return channel;
}

Expand Down Expand Up @@ -528,16 +533,6 @@ export class RTCPeerConnection extends EventTarget {

candidate.foundation = "candidate:" + candidate.foundation;

// prevent ice candidates that have already been sent from being being resent
// when the connection is renegotiated during a later setLocalDescription call.
if (candidate.sdpMid) {
const candidateKey = `${candidate.foundation}:${candidate.sdpMid}`;
if (this.candidatesSent.has(candidateKey)) {
return;
}
this.candidatesSent.add(candidateKey);
}

this.onIceCandidate.execute(candidate.toJSON());
if (this.onicecandidate) {
this.onicecandidate({ candidate: candidate.toJSON() });
Expand Down Expand Up @@ -595,8 +590,7 @@ export class RTCPeerConnection extends EventTarget {
this.setSignalingState("stable");
}

// # assign MID
description.media.forEach((media, i) => {
for (const [i, media] of enumerate(description.media)) {
const mid = media.rtp.muxId!;
this.seenMid.add(mid);
if (["audio", "video"].includes(media.kind)) {
Expand All @@ -608,10 +602,13 @@ export class RTCPeerConnection extends EventTarget {
if (media.kind === "application" && this.sctpTransport) {
this.sctpTransport.mid = mid;
}
});
}

const setupRole = (dtlsTransport: RTCDtlsTransport) => {
const iceTransport = dtlsTransport.iceTransport;
if (iceTransport.connection.restarting) {
return;
}

// # set ICE role
if (description.type === "offer") {
Expand All @@ -634,14 +631,17 @@ export class RTCPeerConnection extends EventTarget {
}
}
};
this.dtlsTransports.forEach((d) => setupRole(d));

for (const d of this.dtlsTransports) {
setupRole(d);
}

// # configure direction
if (["answer", "pranswer"].includes(description.type)) {
this.transceivers.forEach((t) => {
for (const t of this.transceivers) {
const direction = andDirection(t.direction, t.offerDirection);
t.setCurrentDirection(direction);
});
}
}

// for trickle ice
Expand Down Expand Up @@ -670,11 +670,12 @@ export class RTCPeerConnection extends EventTarget {
);
}

description.media
.filter((m) => ["audio", "video"].includes(m.kind))
.forEach((m, i) => {
addTransportDescription(m, this.transceivers[i].dtlsTransport);
});
for (const [i, m] of enumerate(
description.media.filter((m) => ["audio", "video"].includes(m.kind)),
)) {
addTransportDescription(m, this.transceivers[i].dtlsTransport);
}

const sctpMedia = description.media.find((m) => m.kind === "application");
if (this.sctpTransport && sctpMedia) {
addTransportDescription(sctpMedia, this.sctpTransport.dtlsTransport);
Expand Down Expand Up @@ -751,6 +752,18 @@ export class RTCPeerConnection extends EventTarget {

private async connect() {
if (this.transportEstablished) {
await Promise.allSettled(
this.dtlsTransports.map(async (dtlsTransport) => {
const { iceTransport } = dtlsTransport;
if (iceTransport.connection.restarting) {
await iceTransport.start().catch((err) => {
log("iceTransport.start failed", err);
throw err;
});
}
}),
);

return;
}
log("start connect");
Expand Down Expand Up @@ -784,6 +797,13 @@ export class RTCPeerConnection extends EventTarget {
this.setConnectionState("connected");
}

restartIce() {
for (const ice of this.iceTransports) {
ice.restart();
}
this.needNegotiation();
}

private getLocalRtpParams(transceiver: RTCRtpTransceiver): RTCRtpParameters {
if (transceiver.mid == undefined) throw new Error("mid not assigned");

Expand Down
Loading

0 comments on commit 0e2947f

Please sign in to comment.