From f46ecf970c658ae34b8d5fc3e73369c31ac79e90 Mon Sep 17 00:00:00 2001 From: Robin Townsend Date: Mon, 21 Nov 2022 11:58:27 -0500 Subject: [PATCH 1/5] Refactor GroupCall participant management This refactoring brings a number of improvements to GroupCall, which I've unfortunately had to combine into a single commit due to coupling: - Moves the expiration timestamp field on call membership state to be per-device - Makes the participants of a group call visible without having to enter the call yourself - Enables users to join group calls from multiple devices - Identifies active speakers by their call feed, rather than just their user ID - Plays nicely with clients that can be in multiple calls in a room at once - Fixes a memory leak caused by the call retry loop never stopping - Changes GroupCall to update its state synchronously, and write back to room state asynchronously - This was already sort of halfway being done, but now we'd be committing to it - Generally improves the robustness of the state machine - It means that group call joins will appear instant, in a sense For many reasons, this is a breaking change. --- spec/test-utils/webrtc.ts | 1 + spec/unit/webrtc/call.spec.ts | 33 +- spec/unit/webrtc/groupCall.spec.ts | 207 ++-- .../unit/webrtc/groupCallEventHandler.spec.ts | 53 +- src/utils.ts | 13 + src/webrtc/call.ts | 9 + src/webrtc/callEventHandler.ts | 4 +- src/webrtc/callFeed.ts | 7 +- src/webrtc/groupCall.ts | 1044 +++++++++-------- src/webrtc/groupCallEventHandler.ts | 8 - 10 files changed, 723 insertions(+), 656 deletions(-) diff --git a/spec/test-utils/webrtc.ts b/spec/test-utils/webrtc.ts index efee15d1239..8d9423796a3 100644 --- a/spec/test-utils/webrtc.ts +++ b/spec/test-utils/webrtc.ts @@ -431,6 +431,7 @@ export class MockCallMatrixClient extends TypedEventEmitter { return { - getMember: (userId) => { + getMember: (userId: string) => { if (userId === opponentMember.userId) { return opponentMember; } @@ -521,10 +524,12 @@ describe('Call', function() { it("should correctly generate local SDPStreamMetadata", async () => { const callPromise = call.placeCallWithCallFeeds([new CallFeed({ client: client.client, - // @ts-ignore Mock - stream: new MockMediaStream("local_stream1", [new MockMediaStreamTrack("track_id", "audio")]), + stream: new MockMediaStream( + "local_stream1", [new MockMediaStreamTrack("track_id", "audio")], + ) as unknown as MediaStream, roomId: call.roomId, userId: client.getUserId(), + deviceId: undefined, purpose: SDPStreamMetadataPurpose.Usermedia, audioMuted: false, videoMuted: false, @@ -534,8 +539,10 @@ describe('Call', function() { call.getOpponentMember = jest.fn().mockReturnValue({ userId: "@bob:bar.uk" }); (call as any).pushNewLocalFeed( - new MockMediaStream("local_stream2", [new MockMediaStreamTrack("track_id", "video")]), - SDPStreamMetadataPurpose.Screenshare, "feed_id2", + new MockMediaStream( + "local_stream2", [new MockMediaStreamTrack("track_id", "video")], + ) as unknown as MediaStream, + SDPStreamMetadataPurpose.Screenshare, ); await call.setMicrophoneMuted(true); @@ -563,20 +570,18 @@ describe('Call', function() { new CallFeed({ client: client.client, userId: client.getUserId(), - // @ts-ignore Mock - stream: localUsermediaStream, + deviceId: undefined, + stream: localUsermediaStream as unknown as MediaStream, purpose: SDPStreamMetadataPurpose.Usermedia, - id: "local_usermedia_feed_id", audioMuted: false, videoMuted: false, }), new CallFeed({ client: client.client, userId: client.getUserId(), - // @ts-ignore Mock - stream: localScreensharingStream, + deviceId: undefined, + stream: localScreensharingStream as unknown as MediaStream, purpose: SDPStreamMetadataPurpose.Screenshare, - id: "local_screensharing_feed_id", audioMuted: false, videoMuted: false, }), diff --git a/spec/unit/webrtc/groupCall.spec.ts b/spec/unit/webrtc/groupCall.spec.ts index fa84490c146..20a40c1e4ee 100644 --- a/spec/unit/webrtc/groupCall.spec.ts +++ b/spec/unit/webrtc/groupCall.spec.ts @@ -23,6 +23,7 @@ import { Room, RoomMember, } from '../../../src'; +import { RoomStateEvent } from "../../../src/models/room-state"; import { GroupCall, GroupCallEvent, GroupCallState } from "../../../src/webrtc/groupCall"; import { MatrixClient } from "../../../src/client"; import { @@ -53,18 +54,19 @@ const FAKE_USER_ID_3 = "@charlie:test.dummy"; const FAKE_STATE_EVENTS = [ { getContent: () => ({ - ["m.expires_ts"]: Date.now() + ONE_HOUR, + "m.calls": [], }), getStateKey: () => FAKE_USER_ID_1, getRoomId: () => FAKE_ROOM_ID, }, { getContent: () => ({ - ["m.expires_ts"]: Date.now() + ONE_HOUR, - ["m.calls"]: [{ - ["m.call_id"]: FAKE_CONF_ID, - ["m.devices"]: [{ + "m.calls": [{ + "m.call_id": FAKE_CONF_ID, + "m.devices": [{ device_id: FAKE_DEVICE_ID_2, + session_id: FAKE_SESSION_ID_2, + expires_ts: Date.now() + ONE_HOUR, feeds: [], }], }], @@ -73,11 +75,13 @@ const FAKE_STATE_EVENTS = [ getRoomId: () => FAKE_ROOM_ID, }, { getContent: () => ({ - ["m.expires_ts"]: Date.now() + ONE_HOUR, - ["m.calls"]: [{ - ["m.call_id"]: FAKE_CONF_ID, - ["m.devices"]: [{ + "m.expires_ts": Date.now() + ONE_HOUR, + "m.calls": [{ + "m.call_id": FAKE_CONF_ID, + "m.devices": [{ device_id: "user3_device", + session_id: "user3_session", + expires_ts: Date.now() + ONE_HOUR, feeds: [], }], }], @@ -111,6 +115,8 @@ class MockCall { public state = CallState.Ringing; public opponentUserId = FAKE_USER_ID_1; + public opponentDeviceId = FAKE_DEVICE_ID_1; + public opponentMember = { userId: this.opponentUserId }; public callId = "1"; public localUsermediaFeed = { setAudioVideoMuted: jest.fn(), @@ -129,9 +135,11 @@ class MockCall { public removeListener = jest.fn(); public getOpponentMember(): Partial { - return { - userId: this.opponentUserId, - }; + return this.opponentMember; + } + + public getOpponentDeviceId(): string { + return this.opponentDeviceId; } public typed(): MatrixCall { return this as unknown as MatrixCall; } @@ -326,8 +334,8 @@ describe('Group Call', function() { describe("call feeds changing", () => { let call: MockCall; - const currentFeed = new MockCallFeed(FAKE_USER_ID_1, new MockMediaStream("current")); - const newFeed = new MockCallFeed(FAKE_USER_ID_1, new MockMediaStream("new")); + const currentFeed = new MockCallFeed(FAKE_USER_ID_1, FAKE_DEVICE_ID_1, new MockMediaStream("current")); + const newFeed = new MockCallFeed(FAKE_USER_ID_1, FAKE_DEVICE_ID_1, new MockMediaStream("new")); beforeEach(async () => { jest.spyOn(currentFeed, "dispose"); @@ -358,7 +366,7 @@ describe('Group Call', function() { }); it("replaces usermedia feed", async () => { - groupCall.userMediaFeeds = [currentFeed.typed()]; + groupCall.userMediaFeeds.push(currentFeed.typed()); call.remoteUsermediaFeed = newFeed.typed(); // @ts-ignore Mock @@ -368,7 +376,7 @@ describe('Group Call', function() { }); it("removes usermedia feed", async () => { - groupCall.userMediaFeeds = [currentFeed.typed()]; + groupCall.userMediaFeeds.push(currentFeed.typed()); // @ts-ignore Mock groupCall.onCallFeedsChanged(call); @@ -387,7 +395,7 @@ describe('Group Call', function() { }); it("replaces screenshare feed", async () => { - groupCall.screenshareFeeds = [currentFeed.typed()]; + groupCall.screenshareFeeds.push(currentFeed.typed()); call.remoteScreensharingFeed = newFeed.typed(); // @ts-ignore Mock @@ -397,7 +405,7 @@ describe('Group Call', function() { }); it("removes screenshare feed", async () => { - groupCall.screenshareFeeds = [currentFeed.typed()]; + groupCall.screenshareFeeds.push(currentFeed.typed()); // @ts-ignore Mock groupCall.onCallFeedsChanged(call); @@ -408,7 +416,7 @@ describe('Group Call', function() { describe("feed replacing", () => { it("replaces usermedia feed", async () => { - groupCall.userMediaFeeds = [currentFeed.typed()]; + groupCall.userMediaFeeds.push(currentFeed.typed()); // @ts-ignore Mock groupCall.replaceUserMediaFeed(currentFeed, newFeed); @@ -422,7 +430,7 @@ describe('Group Call', function() { }); it("replaces screenshare feed", async () => { - groupCall.screenshareFeeds = [currentFeed.typed()]; + groupCall.screenshareFeeds.push(currentFeed.typed()); // @ts-ignore Mock groupCall.replaceScreenshareFeed(currentFeed, newFeed); @@ -489,7 +497,10 @@ describe('Group Call', function() { it("sends metadata updates before unmuting in PTT mode", async () => { const mockCall = new MockCall(FAKE_ROOM_ID, groupCall.groupCallId); - groupCall.calls.push(mockCall as unknown as MatrixCall); + groupCall.calls.set( + mockCall.getOpponentMember() as RoomMember, + new Map([[mockCall.getOpponentDeviceId(), mockCall.typed()]]), + ); let metadataUpdateResolve: () => void; const metadataUpdatePromise = new Promise(resolve => { @@ -511,7 +522,10 @@ describe('Group Call', function() { it("sends metadata updates after muting in PTT mode", async () => { const mockCall = new MockCall(FAKE_ROOM_ID, groupCall.groupCallId); - groupCall.calls.push(mockCall as unknown as MatrixCall); + groupCall.calls.set( + mockCall.getOpponentMember() as RoomMember, + new Map([[mockCall.getOpponentDeviceId(), mockCall.typed()]]), + ); // the call starts muted, so unmute to get in the right state to test await groupCall.setMicrophoneMuted(false); @@ -560,7 +574,7 @@ describe('Group Call', function() { if (eventType === EventType.GroupCallMemberPrefix) { const fakeEvent = { getContent: () => content, - getRoomId: () => FAKE_ROOM_ID, + getRoomId: () => roomId, getStateKey: () => statekey, } as unknown as MatrixEvent; @@ -574,8 +588,8 @@ describe('Group Call', function() { // just add it once. subMap.set(statekey, fakeEvent); - groupCall1.onMemberStateChanged(fakeEvent); - groupCall2.onMemberStateChanged(fakeEvent); + client1Room.currentState.emit(RoomStateEvent.Update, client1Room.currentState); + client2Room.currentState.emit(RoomStateEvent.Update, client2Room.currentState); } return Promise.resolve({ "event_id": "foo" }); }; @@ -584,9 +598,17 @@ describe('Group Call', function() { client2.sendStateEvent.mockImplementation(fakeSendStateEvents); const client1Room = new Room(FAKE_ROOM_ID, client1.typed(), FAKE_USER_ID_1); - const client2Room = new Room(FAKE_ROOM_ID, client2.typed(), FAKE_USER_ID_2); + client1Room.currentState.members[FAKE_USER_ID_1] = client2Room.currentState.members[FAKE_USER_ID_1] = { + userId: FAKE_USER_ID_1, + membership: "join", + } as unknown as RoomMember; + client1Room.currentState.members[FAKE_USER_ID_2] = client2Room.currentState.members[FAKE_USER_ID_2] = { + userId: FAKE_USER_ID_2, + membership: "join", + } as unknown as RoomMember; + groupCall1 = new GroupCall( client1.typed(), client1Room, GroupCallType.Video, false, GroupCallIntent.Prompt, FAKE_CONF_ID, ); @@ -594,20 +616,6 @@ describe('Group Call', function() { groupCall2 = new GroupCall( client2.typed(), client2Room, GroupCallType.Video, false, GroupCallIntent.Prompt, FAKE_CONF_ID, ); - - client1Room.currentState.members[FAKE_USER_ID_1] = { - userId: FAKE_USER_ID_1, - } as unknown as RoomMember; - client1Room.currentState.members[FAKE_USER_ID_2] = { - userId: FAKE_USER_ID_2, - } as unknown as RoomMember; - - client2Room.currentState.members[FAKE_USER_ID_1] = { - userId: FAKE_USER_ID_1, - } as unknown as RoomMember; - client2Room.currentState.members[FAKE_USER_ID_2] = { - userId: FAKE_USER_ID_2, - } as unknown as RoomMember; }); afterEach(function() { @@ -672,8 +680,10 @@ describe('Group Call', function() { expect(client1.sendToDevice).toHaveBeenCalled(); - const oldCall = groupCall1.getCallByUserId(client2.userId); - oldCall!.emit(CallEvent.Hangup, oldCall!); + const oldCall = groupCall1.calls.get( + groupCall1.room.getMember(client2.userId)!, + )!.get(client2.deviceId)!; + oldCall.emit(CallEvent.Hangup, oldCall!); client1.sendToDevice.mockClear(); @@ -691,9 +701,11 @@ describe('Group Call', function() { // to even be created... let newCall: MatrixCall | undefined; while ( - (newCall = groupCall1.getCallByUserId(client2.userId)) === undefined || - newCall.peerConn === undefined || - newCall.callId == oldCall!.callId + (newCall = groupCall1.calls.get( + groupCall1.room.getMember(client2.userId)!, + )?.get(client2.deviceId)) === undefined + || newCall.peerConn === undefined + || newCall.callId == oldCall.callId ) { await flushPromises(); } @@ -733,7 +745,9 @@ describe('Group Call', function() { groupCall1.setMicrophoneMuted(false); groupCall1.setLocalVideoMuted(false); - const call = groupCall1.getCallByUserId(client2.userId)!; + const call = groupCall1.calls.get( + groupCall1.room.getMember(client2.userId)!, + )!.get(client2.deviceId)!; call.isMicrophoneMuted = jest.fn().mockReturnValue(true); call.setMicrophoneMuted = jest.fn(); call.isLocalVideoMuted = jest.fn().mockReturnValue(true); @@ -765,7 +779,14 @@ describe('Group Call', function() { ? FAKE_STATE_EVENTS.find(e => e.getStateKey() === userId) || FAKE_STATE_EVENTS : { getContent: () => ([]) }; }); - room.getMember = jest.fn().mockImplementation((userId) => ({ userId })); + room.currentState.members[FAKE_USER_ID_1] = { + userId: FAKE_USER_ID_1, + membership: "join", + } as unknown as RoomMember; + room.currentState.members[FAKE_USER_ID_2] = { + userId: FAKE_USER_ID_2, + membership: "join", + } as unknown as RoomMember; }); describe("local muting", () => { @@ -773,17 +794,13 @@ describe('Group Call', function() { const groupCall = await createAndEnterGroupCall(mockClient, room); groupCall.localCallFeed!.setAudioVideoMuted = jest.fn(); - const setAVMutedArray = groupCall.calls.map(call => { - call.localUsermediaFeed!.setAudioVideoMuted = jest.fn(); - return call.localUsermediaFeed!.setAudioVideoMuted; - }); - const tracksArray = groupCall.calls.reduce((acc: MediaStreamTrack[], call: MatrixCall) => { - acc.push(...call.localUsermediaStream!.getAudioTracks()); - return acc; - }, []); - const sendMetadataUpdateArray = groupCall.calls.map(call => { - call.sendMetadataUpdate = jest.fn(); - return call.sendMetadataUpdate; + const setAVMutedArray: ((audioMuted: boolean | null, videoMuted: boolean | null) => void)[] = []; + const tracksArray: MediaStreamTrack[] = []; + const sendMetadataUpdateArray: (() => Promise)[] = []; + groupCall.forEachCall(call => { + setAVMutedArray.push(call.localUsermediaFeed!.setAudioVideoMuted = jest.fn()); + tracksArray.push(...call.localUsermediaStream!.getAudioTracks()); + sendMetadataUpdateArray.push(call.sendMetadataUpdate = jest.fn()); }); await groupCall.setMicrophoneMuted(true); @@ -801,18 +818,14 @@ describe('Group Call', function() { const groupCall = await createAndEnterGroupCall(mockClient, room); groupCall.localCallFeed!.setAudioVideoMuted = jest.fn(); - const setAVMutedArray = groupCall.calls.map(call => { - call.localUsermediaFeed!.setAudioVideoMuted = jest.fn(); + const setAVMutedArray: ((audioMuted: boolean | null, videoMuted: boolean | null) => void)[] = []; + const tracksArray: MediaStreamTrack[] = []; + const sendMetadataUpdateArray: (() => Promise)[] = []; + groupCall.forEachCall(call => { call.localUsermediaFeed!.isVideoMuted = jest.fn().mockReturnValue(true); - return call.localUsermediaFeed!.setAudioVideoMuted; - }); - const tracksArray = groupCall.calls.reduce((acc: MediaStreamTrack[], call: MatrixCall) => { - acc.push(...call.localUsermediaStream!.getVideoTracks()); - return acc; - }, []); - const sendMetadataUpdateArray = groupCall.calls.map(call => { - call.sendMetadataUpdate = jest.fn(); - return call.sendMetadataUpdate; + setAVMutedArray.push(call.localUsermediaFeed!.setAudioVideoMuted = jest.fn()); + tracksArray.push(...call.localUsermediaStream!.getVideoTracks()); + sendMetadataUpdateArray.push(call.sendMetadataUpdate = jest.fn()); }); await groupCall.setLocalVideoMuted(true); @@ -847,7 +860,7 @@ describe('Group Call', function() { // It takes a bit of time for the calls to get created await sleep(10); - const call = groupCall.calls[0]; + const call = groupCall.calls.get(groupCall.room.getMember(FAKE_USER_ID_2)!)!.get(FAKE_DEVICE_ID_2)!; call.getOpponentMember = () => ({ userId: call.invitee }) as RoomMember; // @ts-ignore Mock call.pushRemoteFeed(new MockMediaStream("stream", [ @@ -856,7 +869,7 @@ describe('Group Call', function() { ])); call.onSDPStreamMetadataChangedReceived(metadataEvent); - const feed = groupCall.getUserMediaFeedByUserId(call.invitee!); + const feed = groupCall.getUserMediaFeed(call.invitee!, call.getOpponentDeviceId()!); expect(feed!.isAudioMuted()).toBe(true); expect(feed!.isVideoMuted()).toBe(false); @@ -870,7 +883,7 @@ describe('Group Call', function() { // It takes a bit of time for the calls to get created await sleep(10); - const call = groupCall.calls[0]; + const call = groupCall.calls.get(groupCall.room.getMember(FAKE_USER_ID_2)!)!.get(FAKE_DEVICE_ID_2)!; call.getOpponentMember = () => ({ userId: call.invitee }) as RoomMember; // @ts-ignore Mock call.pushRemoteFeed(new MockMediaStream("stream", [ @@ -879,7 +892,7 @@ describe('Group Call', function() { ])); call.onSDPStreamMetadataChangedReceived(metadataEvent); - const feed = groupCall.getUserMediaFeedByUserId(call.invitee!); + const feed = groupCall.getUserMediaFeed(call.invitee!, call.getOpponentDeviceId()!); expect(feed!.isAudioMuted()).toBe(false); expect(feed!.isVideoMuted()).toBe(true); @@ -945,12 +958,16 @@ describe('Group Call', function() { expect(mockCall.reject).not.toHaveBeenCalled(); expect(mockCall.answerWithCallFeeds).toHaveBeenCalled(); - expect(groupCall.calls).toEqual([mockCall]); + expect(groupCall.calls).toEqual(new Map([[ + groupCall.room.getMember(FAKE_USER_ID_1)!, + new Map([[FAKE_DEVICE_ID_1, mockCall]]), + ]])); }); it("replaces calls if it already has one with the same user", async () => { const oldMockCall = new MockCall(room.roomId, groupCall.groupCallId); const newMockCall = new MockCall(room.roomId, groupCall.groupCallId); + newMockCall.opponentMember = oldMockCall.opponentMember; // Ensure referential equality newMockCall.callId = "not " + oldMockCall.callId; mockClient.emit(CallEventHandlerEvent.Incoming, oldMockCall as unknown as MatrixCall); @@ -958,7 +975,10 @@ describe('Group Call', function() { expect(oldMockCall.hangup).toHaveBeenCalled(); expect(newMockCall.answerWithCallFeeds).toHaveBeenCalled(); - expect(groupCall.calls).toEqual([newMockCall]); + expect(groupCall.calls).toEqual(new Map([[ + groupCall.room.getMember(FAKE_USER_ID_1)!, + new Map([[FAKE_DEVICE_ID_1, newMockCall]]), + ]])); }); it("starts to process incoming calls when we've entered", async () => { @@ -988,7 +1008,14 @@ describe('Group Call', function() { mockClient = typedMockClient.typed(); room = new Room(FAKE_ROOM_ID, mockClient, FAKE_USER_ID_1); - room.getMember = jest.fn().mockImplementation((userId) => ({ userId })); + room.currentState.members[FAKE_USER_ID_1] = { + userId: FAKE_USER_ID_1, + membership: "join", + } as unknown as RoomMember; + room.currentState.members[FAKE_USER_ID_2] = { + userId: FAKE_USER_ID_2, + membership: "join", + } as unknown as RoomMember; room.currentState.getStateEvents = jest.fn().mockImplementation((type: EventType, userId: string) => { return type === EventType.GroupCallMemberPrefix ? FAKE_STATE_EVENTS.find(e => e.getStateKey() === userId) || FAKE_STATE_EVENTS @@ -999,21 +1026,20 @@ describe('Group Call', function() { }); it("sending screensharing stream", async () => { - const onNegotiationNeededArray = groupCall.calls.map(call => { + const onNegotiationNeededArray: (() => Promise)[] = []; + groupCall.forEachCall(call => { // @ts-ignore Mock - call.gotLocalOffer = jest.fn(); - // @ts-ignore Mock - return call.gotLocalOffer; + onNegotiationNeededArray.push(call.gotLocalOffer = jest.fn()); }); - let enabledResult; + let enabledResult: boolean; enabledResult = await groupCall.setScreensharingEnabled(true); expect(enabledResult).toEqual(true); expect(typedMockClient.mediaHandler.getScreensharingStream).toHaveBeenCalled(); MockRTCPeerConnection.triggerAllNegotiations(); expect(groupCall.screenshareFeeds).toHaveLength(1); - groupCall.calls.forEach(c => { + groupCall.forEachCall(c => { expect(c.getLocalFeeds().find(f => f.purpose === SDPStreamMetadataPurpose.Screenshare)).toBeDefined(); }); onNegotiationNeededArray.forEach(f => expect(f).toHaveBeenCalled()); @@ -1036,7 +1062,7 @@ describe('Group Call', function() { // It takes a bit of time for the calls to get created await sleep(10); - const call = groupCall.calls[0]; + const call = groupCall.calls.get(groupCall.room.getMember(FAKE_USER_ID_2)!)!.get(FAKE_DEVICE_ID_2)!; call.getOpponentMember = () => ({ userId: call.invitee }) as RoomMember; call.onNegotiateReceived({ getContent: () => ({ @@ -1057,7 +1083,7 @@ describe('Group Call', function() { ])); expect(groupCall.screenshareFeeds).toHaveLength(1); - expect(groupCall.getScreenshareFeedByUserId(call.invitee!)).toBeDefined(); + expect(groupCall.getScreenshareFeed(call.invitee!, call.getOpponentDeviceId()!)).toBeDefined(); groupCall.terminate(); }); @@ -1097,12 +1123,16 @@ describe('Group Call', function() { ); room = new Room(FAKE_ROOM_ID, mockClient.typed(), FAKE_USER_ID_1); + room.currentState.members[FAKE_USER_ID_1] = { + userId: FAKE_USER_ID_1, + } as unknown as RoomMember; groupCall = await createAndEnterGroupCall(mockClient.typed(), room); mediaFeed1 = new CallFeed({ client: mockClient.typed(), roomId: FAKE_ROOM_ID, userId: FAKE_USER_ID_2, + deviceId: FAKE_DEVICE_ID_1, stream: (new MockMediaStream("foo", [])).typed(), purpose: SDPStreamMetadataPurpose.Usermedia, audioMuted: false, @@ -1114,6 +1144,7 @@ describe('Group Call', function() { client: mockClient.typed(), roomId: FAKE_ROOM_ID, userId: FAKE_USER_ID_3, + deviceId: FAKE_DEVICE_ID_1, stream: (new MockMediaStream("foo", [])).typed(), purpose: SDPStreamMetadataPurpose.Usermedia, audioMuted: false, @@ -1136,15 +1167,15 @@ describe('Group Call', function() { mediaFeed2.speakingVolumeSamples = [0, 0]; jest.runOnlyPendingTimers(); - expect(groupCall.activeSpeaker).toEqual(FAKE_USER_ID_2); - expect(onActiveSpeakerEvent).toHaveBeenCalledWith(FAKE_USER_ID_2); + expect(groupCall.activeSpeaker).toEqual(mediaFeed1); + expect(onActiveSpeakerEvent).toHaveBeenCalledWith(mediaFeed1); mediaFeed1.speakingVolumeSamples = [0, 0]; mediaFeed2.speakingVolumeSamples = [100, 100]; jest.runOnlyPendingTimers(); - expect(groupCall.activeSpeaker).toEqual(FAKE_USER_ID_3); - expect(onActiveSpeakerEvent).toHaveBeenCalledWith(FAKE_USER_ID_3); + expect(groupCall.activeSpeaker).toEqual(mediaFeed2); + expect(onActiveSpeakerEvent).toHaveBeenCalledWith(mediaFeed2); }); }); diff --git a/spec/unit/webrtc/groupCallEventHandler.spec.ts b/spec/unit/webrtc/groupCallEventHandler.spec.ts index 6712b0f09f0..8df7356f471 100644 --- a/spec/unit/webrtc/groupCallEventHandler.spec.ts +++ b/spec/unit/webrtc/groupCallEventHandler.spec.ts @@ -16,26 +16,21 @@ limitations under the License. import { mocked } from "jest-mock"; +import { ClientEvent } from "../../../src/client"; +import { RoomMember } from "../../../src/models/room-member"; +import { SyncState } from "../../../src/sync"; import { - ClientEvent, - GroupCall, GroupCallIntent, GroupCallState, GroupCallType, - IContent, - MatrixEvent, - Room, - RoomState, -} from "../../../src"; -import { SyncState } from "../../../src/sync"; -import { GroupCallTerminationReason } from "../../../src/webrtc/groupCall"; + GroupCallTerminationReason, +} from "../../../src/webrtc/groupCall"; +import { IContent, MatrixEvent } from "../../../src/models/event"; +import { Room } from "../../../src/models/room"; +import { RoomState } from "../../../src/models/room-state"; import { GroupCallEventHandler, GroupCallEventHandlerEvent } from "../../../src/webrtc/groupCallEventHandler"; import { flushPromises } from "../../test-utils/flushPromises"; -import { - makeMockGroupCallMemberStateEvent, - makeMockGroupCallStateEvent, - MockCallMatrixClient, -} from "../../test-utils/webrtc"; +import { makeMockGroupCallStateEvent, MockCallMatrixClient } from "../../test-utils/webrtc"; const FAKE_USER_ID = "@alice:test.dummy"; const FAKE_DEVICE_ID = "AAAAAAA"; @@ -47,6 +42,7 @@ describe('Group Call Event Handler', function() { let groupCallEventHandler: GroupCallEventHandler; let mockClient: MockCallMatrixClient; let mockRoom: Room; + let mockMember: RoomMember; beforeEach(() => { mockClient = new MockCallMatrixClient( @@ -54,13 +50,21 @@ describe('Group Call Event Handler', function() { ); groupCallEventHandler = new GroupCallEventHandler(mockClient.typed()); + mockMember = { + userId: FAKE_USER_ID, + membership: "join", + } as unknown as RoomMember; + mockRoom = { + on: () => {}, + off: () => {}, roomId: FAKE_ROOM_ID, currentState: { getStateEvents: jest.fn().mockReturnValue([makeMockGroupCallStateEvent( FAKE_ROOM_ID, FAKE_GROUP_CALL_ID, )]), }, + getMember: (userId: string) => userId === FAKE_USER_ID ? mockMember : null, } as unknown as Room; (mockClient as any).getRoom = jest.fn().mockReturnValue(mockRoom); @@ -211,27 +215,6 @@ describe('Group Call Event Handler', function() { ); }); - it("sends member events to group calls", async () => { - await groupCallEventHandler.start(); - - const mockGroupCall = { - onMemberStateChanged: jest.fn(), - }; - - groupCallEventHandler.groupCalls.set(FAKE_ROOM_ID, mockGroupCall as unknown as GroupCall); - - const mockStateEvent = makeMockGroupCallMemberStateEvent(FAKE_ROOM_ID, FAKE_GROUP_CALL_ID); - - mockClient.emitRoomState( - mockStateEvent, - { - roomId: FAKE_ROOM_ID, - } as unknown as RoomState, - ); - - expect(mockGroupCall.onMemberStateChanged).toHaveBeenCalledWith(mockStateEvent); - }); - describe("ignoring invalid group call state events", () => { let mockClientEmit: jest.Func; diff --git a/src/utils.ts b/src/utils.ts index 201be5b3e4a..71871d3b737 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -694,3 +694,16 @@ export function sortEventsByLatestContentTimestamp(left: MatrixEvent, right: Mat export function isSupportedReceiptType(receiptType: string): boolean { return [ReceiptType.Read, ReceiptType.ReadPrivate].includes(receiptType as ReceiptType); } + +/** + * Determines whether two maps are equal. + * @param eq The equivalence relation to compare values by. Defaults to strict equality. + */ +export function mapsEqual(x: Map, y: Map, eq = (v1: V, v2: V): boolean => v1 === v2): boolean { + if (x.size !== y.size) return false; + for (const [k, v1] of x) { + const v2 = y.get(k); + if (v2 === undefined || !eq(v1, v2)) return false; + } + return true; +} diff --git a/src/webrtc/call.ts b/src/webrtc/call.ts index 00a9b37d78c..8b4882c960a 100644 --- a/src/webrtc/call.ts +++ b/src/webrtc/call.ts @@ -462,6 +462,10 @@ export class MatrixCall extends TypedEventEmitter public stream: MediaStream; public sdpMetadataStreamId: string; public userId: string; + public readonly deviceId: string | undefined; public purpose: SDPStreamMetadataPurpose; public speakingVolumeSamples: number[]; @@ -86,6 +88,7 @@ export class CallFeed extends TypedEventEmitter this.client = opts.client; this.roomId = opts.roomId; this.userId = opts.userId; + this.deviceId = opts.deviceId; this.purpose = opts.purpose; this.audioMuted = opts.audioMuted; this.videoMuted = opts.videoMuted; @@ -156,7 +159,8 @@ export class CallFeed extends TypedEventEmitter * @returns {boolean} is local? */ public isLocal(): boolean { - return this.userId === this.client.getUserId(); + return this.userId === this.client.getUserId() + && (this.deviceId === undefined || this.deviceId === this.client.getDeviceId()); } /** @@ -282,6 +286,7 @@ export class CallFeed extends TypedEventEmitter client: this.client, roomId: this.roomId, userId: this.userId, + deviceId: this.deviceId, stream, purpose: this.purpose, audioMuted: this.audioMuted, diff --git a/src/webrtc/groupCall.ts b/src/webrtc/groupCall.ts index cafcadb105d..b676d6d9540 100644 --- a/src/webrtc/groupCall.ts +++ b/src/webrtc/groupCall.ts @@ -1,7 +1,8 @@ import { TypedEventEmitter } from "../models/typed-event-emitter"; import { CallFeed, SPEAKING_THRESHOLD } from "./callFeed"; import { MatrixClient } from "../client"; -import { CallErrorCode, +import { + CallErrorCode, CallEvent, CallEventHandlerMap, CallState, @@ -13,15 +14,16 @@ import { CallErrorCode, } from "./call"; import { RoomMember } from "../models/room-member"; import { Room } from "../models/room"; +import { RoomStateEvent } from "../models/room-state"; import { logger } from "../logger"; import { ReEmitter } from "../ReEmitter"; import { SDPStreamMetadataPurpose } from "./callEventTypes"; -import { ISendEventResponse } from "../@types/requests"; import { MatrixEvent } from "../models/event"; import { EventType } from "../@types/event"; import { CallEventHandlerEvent } from "./callEventHandler"; import { GroupCallEventHandlerEvent } from "./groupCallEventHandler"; import { IScreensharingOpts } from "./mediaHandler"; +import { mapsEqual } from "../utils"; export enum GroupCallIntent { Ring = "m.ring", @@ -52,15 +54,15 @@ export enum GroupCallEvent { export type GroupCallEventHandlerMap = { [GroupCallEvent.GroupCallStateChanged]: (newState: GroupCallState, oldState: GroupCallState) => void; - [GroupCallEvent.ActiveSpeakerChanged]: (activeSpeaker: string) => void; - [GroupCallEvent.CallsChanged]: (calls: MatrixCall[]) => void; + [GroupCallEvent.ActiveSpeakerChanged]: (activeSpeaker: CallFeed | undefined) => void; + [GroupCallEvent.CallsChanged]: (calls: Map>) => void; [GroupCallEvent.UserMediaFeedsChanged]: (feeds: CallFeed[]) => void; [GroupCallEvent.ScreenshareFeedsChanged]: (feeds: CallFeed[]) => void; [GroupCallEvent.LocalScreenshareStateChanged]: ( isScreensharing: boolean, feed?: CallFeed, sourceId?: string, ) => void; [GroupCallEvent.LocalMuteStateChanged]: (audioMuted: boolean, videoMuted: boolean) => void; - [GroupCallEvent.ParticipantsChanged]: (participants: RoomMember[]) => void; + [GroupCallEvent.ParticipantsChanged]: (participants: Map>) => void; [GroupCallEvent.Error]: (error: GroupCallError) => void; }; @@ -112,6 +114,7 @@ export interface IGroupCallRoomMemberFeed { export interface IGroupCallRoomMemberDevice { "device_id": string; "session_id": string; + "expires_ts": number; "feeds": IGroupCallRoomMemberFeed[]; } @@ -123,18 +126,21 @@ export interface IGroupCallRoomMemberCallState { export interface IGroupCallRoomMemberState { "m.calls": IGroupCallRoomMemberCallState[]; - "m.expires_ts": number; } export enum GroupCallState { LocalCallFeedUninitialized = "local_call_feed_uninitialized", InitializingLocalCallFeed = "initializing_local_call_feed", LocalCallFeedInitialized = "local_call_feed_initialized", - Entering = "entering", Entered = "entered", Ended = "ended", } +export interface ParticipantState { + sessionId: string; + screensharing: boolean; +} + interface ICallHandlers { onCallFeedsChanged: (feeds: CallFeed[]) => void; onCallStateChanged: (state: CallState, oldState: CallState | undefined) => void; @@ -142,14 +148,7 @@ interface ICallHandlers { onCallReplaced: (newCall: MatrixCall) => void; } -const CALL_MEMBER_STATE_TIMEOUT = 1000 * 60 * 60; // 1 hour - -const callMemberStateIsExpired = (event: MatrixEvent): boolean => { - const now = Date.now(); - const content = event?.getContent() ?? {}; - const expiresAt = typeof content["m.expires_ts"] === "number" ? content["m.expires_ts"] : -Infinity; - return expiresAt <= now; -}; +const DEVICE_TIMEOUT = 1000 * 60 * 60; // 1 hour function getCallUserId(call: MatrixCall): string | null { return call.getOpponentMember()?.userId || call.invitee || null; @@ -165,24 +164,22 @@ export class GroupCall extends TypedEventEmitter< public participantTimeout = 1000 * 15; public pttMaxTransmitTime = 1000 * 20; - public state = GroupCallState.LocalCallFeedUninitialized; - public activeSpeaker?: string; // userId + public activeSpeaker?: CallFeed; public localCallFeed?: CallFeed; public localScreenshareFeed?: CallFeed; public localDesktopCapturerSourceId?: string; - public calls: MatrixCall[] = []; - public participants: RoomMember[] = []; - public userMediaFeeds: CallFeed[] = []; - public screenshareFeeds: CallFeed[] = []; + public readonly calls = new Map>(); + public readonly userMediaFeeds: CallFeed[] = []; + public readonly screenshareFeeds: CallFeed[] = []; public groupCallId: string; - private callHandlers: Map = new Map(); - private activeSpeakerLoopTimeout?: ReturnType; - private retryCallLoopTimeout?: ReturnType; - private retryCallCounts: Map = new Map(); + private callHandlers = new Map>(); // User ID -> device ID -> handlers + private activeSpeakerLoopInterval?: ReturnType; + private retryCallLoopInterval?: ReturnType; + private retryCallCounts: Map> = new Map(); private reEmitter: ReEmitter; private transmitTimer: ReturnType | null = null; - private memberStateExpirationTimers: Map> = new Map(); + private participantsExpirationTimer: ReturnType | null = null; private resendMemberStateTimer: ReturnType | null = null; private initWithAudioMuted = false; private initWithVideoMuted = false; @@ -199,11 +196,13 @@ export class GroupCall extends TypedEventEmitter< ) { super(); this.reEmitter = new ReEmitter(this); - this.groupCallId = groupCallId || genCallID(); + this.groupCallId = groupCallId ?? genCallID(); + this.updateParticipants(); - for (const stateEvent of this.getMemberStateEvents()) { - this.onMemberStateChanged(stateEvent); - } + room.on(RoomStateEvent.Update, this.onRoomState); + this.on(GroupCallEvent.ParticipantsChanged, this.onParticipantsChanged); + this.on(GroupCallEvent.GroupCallStateChanged, this.onStateChanged); + this.on(GroupCallEvent.LocalScreenshareStateChanged, this.onLocalFeedsChanged); } public async create(): Promise { @@ -226,10 +225,55 @@ export class GroupCall extends TypedEventEmitter< return this; } - private setState(newState: GroupCallState): void { - const oldState = this.state; - this.state = newState; - this.emit(GroupCallEvent.GroupCallStateChanged, newState, oldState); + private _state = GroupCallState.LocalCallFeedUninitialized; + + /** + * The group call's state. + */ + public get state(): GroupCallState { + return this._state; + } + + private set state(value: GroupCallState) { + const prevValue = this._state; + if (value !== prevValue) { + this._state = value; + this.emit(GroupCallEvent.GroupCallStateChanged, value, prevValue); + } + } + + private _participants = new Map>(); + + /** + * The current participants in the call, as a map from members to device IDs + * to participant info. + */ + public get participants(): Map> { + return this._participants; + } + + private set participants(value: Map>) { + const prevValue = this._participants; + const participantStateEqual = (x: ParticipantState, y: ParticipantState): boolean => + x.sessionId === y.sessionId && x.screensharing === y.screensharing; + const deviceMapsEqual = (x: Map, y: Map): boolean => + mapsEqual(x, y, participantStateEqual); + + // Only update if the map actually changed + if (!mapsEqual(value, prevValue, deviceMapsEqual)) { + this._participants = value; + this.emit(GroupCallEvent.ParticipantsChanged, value); + } + } + + /** + * Executes the given callback on all calls in this group call. + * @param f The callback. + */ + public forEachCall(f: (call: MatrixCall) => void): void { + for (const deviceMap of this.calls.values()) { + for (const call of deviceMap.values()) f(call); + } } public getLocalFeeds(): CallFeed[] { @@ -242,8 +286,9 @@ export class GroupCall extends TypedEventEmitter< } public hasLocalParticipant(): boolean { - const userId = this.client.getUserId(); - return this.participants.some((member) => member.userId === userId); + return this.participants.get( + this.room.getMember(this.client.getUserId()!)!, + )?.has(this.client.getDeviceId()!) ?? false; } public async initLocalCallFeed(): Promise { @@ -253,7 +298,7 @@ export class GroupCall extends TypedEventEmitter< throw new Error(`Cannot initialize local call feed in the "${this.state}" state.`); } - this.setState(GroupCallState.InitializingLocalCallFeed); + this.state = GroupCallState.InitializingLocalCallFeed; let stream: MediaStream; @@ -268,7 +313,7 @@ export class GroupCall extends TypedEventEmitter< try { stream = await this.client.getMediaHandler().getUserMediaStream(true, this.type === GroupCallType.Video); } catch (error) { - this.setState(GroupCallState.LocalCallFeedUninitialized); + this.state = GroupCallState.LocalCallFeedUninitialized; throw error; } finally { this.off(GroupCallEvent.GroupCallStateChanged, onState); @@ -277,12 +322,11 @@ export class GroupCall extends TypedEventEmitter< // The call could've been disposed while we were waiting if (disposed) throw new Error("Group call disposed"); - const userId = this.client.getUserId()!; - const callFeed = new CallFeed({ client: this.client, roomId: this.room.roomId, - userId, + userId: this.client.getUserId()!, + deviceId: this.client.getDeviceId()!, stream, purpose: SDPStreamMetadataPurpose.Usermedia, audioMuted: this.initWithAudioMuted || stream.getAudioTracks().length === 0 || this.isPtt, @@ -295,7 +339,7 @@ export class GroupCall extends TypedEventEmitter< this.localCallFeed = callFeed; this.addUserMediaFeed(callFeed); - this.setState(GroupCallState.LocalCallFeedInitialized); + this.state = GroupCallState.LocalCallFeedInitialized; return callFeed; } @@ -316,42 +360,29 @@ export class GroupCall extends TypedEventEmitter< } public async enter(): Promise { - if (!(this.state === GroupCallState.LocalCallFeedUninitialized || - this.state === GroupCallState.LocalCallFeedInitialized)) { - throw new Error(`Cannot enter call in the "${this.state}" state`); - } - if (this.state === GroupCallState.LocalCallFeedUninitialized) { await this.initLocalCallFeed(); + } else if (this.state !== GroupCallState.LocalCallFeedInitialized) { + throw new Error(`Cannot enter call in the "${this.state}" state`); } - this.addParticipant(this.room.getMember(this.client.getUserId()!)!); - - await this.sendMemberStateEvent(); - - this.activeSpeaker = undefined; - - this.setState(GroupCallState.Entered); - logger.log(`Entered group call ${this.groupCallId}`); + this.state = GroupCallState.Entered; this.client.on(CallEventHandlerEvent.Incoming, this.onIncomingCall); - const calls = this.client.callEventHandler!.calls.values(); - - for (const call of calls) { + for (const call of this.client.callEventHandler!.calls.values()) { this.onIncomingCall(call); } - // Set up participants for the members currently in the room. - // Other members will be picked up by the RoomState.members event. - for (const stateEvent of this.getMemberStateEvents()) { - this.onMemberStateChanged(stateEvent); - } - - this.retryCallLoopTimeout = setTimeout(this.onRetryCallLoop, this.retryCallInterval); + this.retryCallLoopInterval = setInterval(this.onRetryCallLoop, this.retryCallInterval); + this.activeSpeaker = undefined; this.onActiveSpeakerLoop(); + this.activeSpeakerLoopInterval = setInterval( + this.onActiveSpeakerLoop, + this.activeSpeakerInterval, + ); } private dispose(): void { @@ -369,79 +400,63 @@ export class GroupCall extends TypedEventEmitter< this.client.getMediaHandler().stopAllStreams(); - if (this.state !== GroupCallState.Entered) { - return; + if (this.transmitTimer !== null) { + clearTimeout(this.transmitTimer); + this.transmitTimer = null; } - this.removeParticipant(this.room.getMember(this.client.getUserId()!)!); - - this.removeMemberStateEvent(); + if (this.retryCallLoopInterval !== undefined) { + clearInterval(this.retryCallLoopInterval); + this.retryCallLoopInterval = undefined; + } - while (this.calls.length > 0) { - this.removeCall(this.calls[this.calls.length - 1], CallErrorCode.UserHangup); + if (this.state !== GroupCallState.Entered) { + return; } + this.forEachCall(call => this.disposeCall(call, CallErrorCode.UserHangup)); + this.calls.clear(); + this.activeSpeaker = undefined; - clearTimeout(this.activeSpeakerLoopTimeout); + clearInterval(this.activeSpeakerLoopInterval); this.retryCallCounts.clear(); - clearTimeout(this.retryCallLoopTimeout); - - for (const [userId] of this.memberStateExpirationTimers) { - clearTimeout(this.memberStateExpirationTimers.get(userId)); - this.memberStateExpirationTimers.delete(userId); - } - - if (this.transmitTimer !== null) { - clearTimeout(this.transmitTimer); - this.transmitTimer = null; - } + clearInterval(this.retryCallLoopInterval); this.client.removeListener(CallEventHandlerEvent.Incoming, this.onIncomingCall); } public leave(): void { - if (this.transmitTimer !== null) { - clearTimeout(this.transmitTimer); - this.transmitTimer = null; - } - this.dispose(); - this.setState(GroupCallState.LocalCallFeedUninitialized); + this.state = GroupCallState.LocalCallFeedUninitialized; } public async terminate(emitStateEvent = true): Promise { this.dispose(); - if (this.transmitTimer !== null) { - clearTimeout(this.transmitTimer); - this.transmitTimer = null; - } - - this.participants = []; + this.room.off(RoomStateEvent.Update, this.onRoomState); this.client.groupCallEventHandler!.groupCalls.delete(this.room.roomId); + this.client.emit(GroupCallEventHandlerEvent.Ended, this); + this.state = GroupCallState.Ended; if (emitStateEvent) { const existingStateEvent = this.room.currentState.getStateEvents( EventType.GroupCallPrefix, this.groupCallId, - ); + )!; await this.client.sendStateEvent( this.room.roomId, EventType.GroupCallPrefix, { - ...existingStateEvent!.getContent(), - ["m.terminated"]: GroupCallTerminationReason.CallEnded, + ...existingStateEvent.getContent(), + "m.terminated": GroupCallTerminationReason.CallEnded, }, this.groupCallId, ); } - - this.client.emit(GroupCallEventHandlerEvent.Ended, this); - this.setState(GroupCallState.Ended); } - /** + /* * Local Usermedia */ @@ -489,17 +504,18 @@ export class GroupCall extends TypedEventEmitter< } } - for (const call of this.calls) { - call.localUsermediaFeed?.setAudioVideoMuted(muted, null); - } + this.forEachCall(call => call.localUsermediaFeed?.setAudioVideoMuted(muted, null)); - if (sendUpdatesBefore) { - try { - await Promise.all(this.calls.map(c => c.sendMetadataUpdate())); - } catch (e) { - logger.info("Failed to send one or more metadata updates", e); - } - } + const sendUpdates = async (): Promise => { + const updates: Promise[] = []; + this.forEachCall(call => updates.push(call.sendMetadataUpdate())); + + await Promise.all(updates).catch( + e => logger.info("Failed to send some metadata updates", e), + ); + }; + + if (sendUpdatesBefore) await sendUpdates(); if (this.localCallFeed) { logger.log(`groupCall ${this.groupCallId} setMicrophoneMuted stream ${ @@ -515,19 +531,10 @@ export class GroupCall extends TypedEventEmitter< this.initWithAudioMuted = muted; } - for (const call of this.calls) { - setTracksEnabled(call.localUsermediaFeed!.stream.getAudioTracks(), !muted); - } - + this.forEachCall(call => setTracksEnabled(call.localUsermediaFeed!.stream.getAudioTracks(), !muted)); this.emit(GroupCallEvent.LocalMuteStateChanged, muted, this.isLocalVideoMuted()); - if (!sendUpdatesBefore) { - try { - await Promise.all(this.calls.map(c => c.sendMetadataUpdate())); - } catch (e) { - logger.info("Failed to send one or more metadata updates", e); - } - } + if (!sendUpdatesBefore) await sendUpdates(); return true; } @@ -555,11 +562,12 @@ export class GroupCall extends TypedEventEmitter< this.initWithVideoMuted = muted; } - for (const call of this.calls) { - call.setLocalVideoMuted(muted); - } + const updates: Promise[] = []; + this.forEachCall(call => updates.push(call.setLocalVideoMuted(muted))); + await Promise.all(updates); this.emit(GroupCallEvent.LocalMuteStateChanged, this.isMicrophoneMuted(), muted); + return true; } @@ -591,6 +599,7 @@ export class GroupCall extends TypedEventEmitter< client: this.client, roomId: this.room.roomId, userId: this.client.getUserId()!, + deviceId: this.client.getDeviceId()!, stream, purpose: SDPStreamMetadataPurpose.Screenshare, audioMuted: false, @@ -606,11 +615,7 @@ export class GroupCall extends TypedEventEmitter< ); // TODO: handle errors - await Promise.all(this.calls.map(call => call.pushLocalFeed( - this.localScreenshareFeed!.clone(), - ))); - - await this.sendMemberStateEvent(); + this.forEachCall(call => call.pushLocalFeed(this.localScreenshareFeed!.clone())); return true; } catch (error) { @@ -625,14 +630,13 @@ export class GroupCall extends TypedEventEmitter< return false; } } else { - await Promise.all(this.calls.map(call => { + this.forEachCall(call => { if (call.localScreensharingFeed) call.removeLocalFeed(call.localScreensharingFeed); - })); + }); this.client.getMediaHandler().stopScreensharingStream(this.localScreenshareFeed!.stream); this.removeScreenshareFeed(this.localScreenshareFeed!); this.localScreenshareFeed = undefined; this.localDesktopCapturerSourceId = undefined; - await this.sendMemberStateEvent(); this.emit(GroupCallEvent.LocalScreenshareStateChanged, false, undefined, undefined); return false; } @@ -642,7 +646,7 @@ export class GroupCall extends TypedEventEmitter< return !!this.localScreenshareFeed; } - /** + /* * Call Setup * * There are two different paths for calls to be created: @@ -669,352 +673,172 @@ export class GroupCall extends TypedEventEmitter< return; } - const opponentMemberId = newCall.getOpponentMember()?.userId; - const existingCall = opponentMemberId ? this.getCallByUserId(opponentMemberId) : null; - - if (existingCall && existingCall.callId === newCall.callId) { + const opponent = newCall.getOpponentMember(); + if (opponent === undefined) { + logger.warn("Incoming call with no member. Ignoring."); return; } - logger.log(`GroupCall: incoming call from: ${opponentMemberId} with ID ${newCall.callId}`); + const deviceMap = this.calls.get(opponent) ?? new Map(); + const prevCall = deviceMap.get(newCall.getOpponentDeviceId()!); - // we are handlng this call as a PTT call, so enable PTT semantics - newCall.isPtt = this.isPtt; + if (prevCall?.callId === newCall.callId) return; - // Check if the user calling has an existing call and use this call instead. - if (existingCall) { - this.replaceCall(existingCall, newCall); - } else { - this.addCall(newCall); - } + logger.log(`GroupCall: incoming call from ${opponent.userId} with ID ${newCall.callId}`); + if (prevCall) this.disposeCall(prevCall, CallErrorCode.Replaced); + + this.initCall(newCall); newCall.answerWithCallFeeds(this.getLocalFeeds().map((feed) => feed.clone())); + + deviceMap.set(newCall.getOpponentDeviceId()!, newCall); + this.calls.set(opponent, deviceMap); + this.emit(GroupCallEvent.CallsChanged, this.calls); }; /** - * Room Member State + * Determines whether a given participant expects us to call them (versus + * them calling us). + * @param userId The participant's user ID. + * @param deviceId The participant's device ID. + * @returns Whether we need to place an outgoing call to the participant. */ - - private getMemberStateEvents(): MatrixEvent[]; - private getMemberStateEvents(userId: string): MatrixEvent | null; - private getMemberStateEvents(userId?: string): MatrixEvent[] | MatrixEvent | null { - if (userId != null) { - const event = this.room.currentState.getStateEvents(EventType.GroupCallMemberPrefix, userId); - return callMemberStateIsExpired(event!) ? null : event; - } else { - return this.room.currentState.getStateEvents(EventType.GroupCallMemberPrefix) - .filter(event => !callMemberStateIsExpired(event)); - } - } - - private async sendMemberStateEvent(): Promise { - const send = (): Promise => this.updateMemberCallState({ - "m.call_id": this.groupCallId, - "m.devices": [ - { - "device_id": this.client.getDeviceId()!, - "session_id": this.client.getSessionId(), - "feeds": this.getLocalFeeds().map((feed) => ({ - purpose: feed.purpose, - })), - // TODO: Add data channels - }, - ], - // TODO "m.foci" - }); - - const res = await send(); - - // Clear the old interval first, so that it isn't forgot - if (this.resendMemberStateTimer !== null) clearInterval(this.resendMemberStateTimer); - // Resend the state event every so often so it doesn't become stale - this.resendMemberStateTimer = setInterval(async () => { - logger.log("Resending call member state"); - await send(); - }, CALL_MEMBER_STATE_TIMEOUT * 3 / 4); - - return res; - } - - private async removeMemberStateEvent(): Promise { - if (this.resendMemberStateTimer !== null) clearInterval(this.resendMemberStateTimer); - this.resendMemberStateTimer = null; - return await this.updateMemberCallState(undefined, true); - } - - private async updateMemberCallState( - memberCallState?: IGroupCallRoomMemberCallState, - keepAlive = false, - ): Promise { + private wantsOutgoingCall(userId: string, deviceId: string): boolean { const localUserId = this.client.getUserId()!; - - const memberState = this.getMemberStateEvents(localUserId)?.getContent(); - - let calls: IGroupCallRoomMemberCallState[] = []; - - // Sanitize existing member state event - if (memberState && Array.isArray(memberState["m.calls"])) { - calls = memberState["m.calls"].filter((call) => !!call); - } - - const existingCallIndex = calls.findIndex((call) => call && call["m.call_id"] === this.groupCallId); - - if (existingCallIndex !== -1) { - if (memberCallState) { - calls.splice(existingCallIndex, 1, memberCallState); - } else { - calls.splice(existingCallIndex, 1); - } - } else if (memberCallState) { - calls.push(memberCallState); - } - - const content = { - "m.calls": calls, - "m.expires_ts": Date.now() + CALL_MEMBER_STATE_TIMEOUT, - }; - - return this.client.sendStateEvent( - this.room.roomId, EventType.GroupCallMemberPrefix, content, localUserId, { keepAlive }, + const localDeviceId = this.client.getDeviceId()!; + return ( + // If a user's ID is less than our own, they'll call us + userId >= localUserId + // If this is another one of our devices, compare device IDs to tell whether it'll call us + && (userId !== localUserId || deviceId > localDeviceId) ); } - public onMemberStateChanged = async (event: MatrixEvent): Promise => { - // If we haven't entered the call yet, we don't care - if (this.state !== GroupCallState.Entered) { - return; - } - - // The member events may be received for another room, which we will ignore. - if (event.getRoomId() !== this.room.roomId) return; - - const member = this.room.getMember(event.getStateKey()!); - if (!member) { - logger.warn(`Couldn't find room member for ${event.getStateKey()}: ignoring member state event!`); - return; - } - - // Don't process your own member. - const localUserId = this.client.getUserId()!; - if (member.userId === localUserId) return; - - logger.debug(`Processing member state event for ${member.userId}`); - - const ignore = (): void => { - this.removeParticipant(member); - clearTimeout(this.memberStateExpirationTimers.get(member.userId)); - this.memberStateExpirationTimers.delete(member.userId); - }; - - const content = event.getContent(); - const callsState = !callMemberStateIsExpired(event) && Array.isArray(content["m.calls"]) - ? content["m.calls"].filter((call) => call) - : []; // Ignore expired device data - - if (callsState.length === 0) { - logger.info(`Ignoring member state from ${member.userId} member not in any calls.`); - ignore(); - return; - } - - // Currently we only support a single call per room. So grab the first call. - const callState = callsState[0]; - const callId = callState["m.call_id"]; - - if (!callId) { - logger.warn(`Room member ${member.userId} does not have a valid m.call_id set. Ignoring.`); - ignore(); - return; - } - - if (callId !== this.groupCallId) { - logger.warn(`Call id ${callId} does not match group call id ${this.groupCallId}, ignoring.`); - ignore(); - return; - } - - this.addParticipant(member); - - clearTimeout(this.memberStateExpirationTimers.get(member.userId)); - this.memberStateExpirationTimers.set(member.userId, setTimeout(() => { - logger.warn(`Call member state for ${member.userId} has expired`); - this.removeParticipant(member); - }, content["m.expires_ts"] - Date.now())); - - // Only initiate a call with a user who has a userId that is lexicographically - // less than your own. Otherwise, that user will call you. - if (member.userId < localUserId) { - logger.debug(`Waiting for ${member.userId} to send call invite.`); - return; - } - - const opponentDevice = this.getDeviceForMember(member.userId); - - if (!opponentDevice) { - logger.warn(`No opponent device found for ${member.userId}, ignoring.`); - this.emit( - GroupCallEvent.Error, - new GroupCallUnknownDeviceError(member.userId), - ); - return; - } - - const existingCall = this.getCallByUserId(member.userId); - - if ( - existingCall && - existingCall.getOpponentSessionId() === opponentDevice.session_id - ) { - return; - } - - const newCall = createNewMatrixCall( - this.client, - this.room.roomId, - { - invitee: member.userId, - opponentDeviceId: opponentDevice.device_id, - opponentSessionId: opponentDevice.session_id, - groupCallId: this.groupCallId, - }, - ); - - if (!newCall) { - logger.error("Failed to create call!"); - return; - } - - if (existingCall) { - logger.debug(`Replacing call ${existingCall.callId} to ${member.userId} with ${newCall.callId}`); - this.replaceCall(existingCall, newCall, CallErrorCode.NewSession); - } else { - logger.debug(`Adding call ${newCall.callId} to ${member.userId}`); - this.addCall(newCall); - } - - newCall.isPtt = this.isPtt; - - const requestScreenshareFeed = opponentDevice.feeds.some( - (feed) => feed.purpose === SDPStreamMetadataPurpose.Screenshare); - - logger.debug( - `Placing call to ${member.userId}/${opponentDevice.device_id} session ID ${opponentDevice.session_id}.`, - ); + /** + * Places calls to all participants that we're responsible for calling. + */ + private placeOutgoingCalls(): void { + let callsChanged = false; + + for (const [member, participantMap] of this.participants) { + const callMap = this.calls.get(member) ?? new Map(); + + for (const [deviceId, participant] of participantMap) { + const prevCall = callMap.get(deviceId); + + if ( + prevCall?.getOpponentSessionId() !== participant.sessionId + && this.wantsOutgoingCall(member.userId, deviceId) + ) { + callsChanged = true; + + if (prevCall !== undefined) { + logger.debug(`Replacing call ${prevCall.callId} to ${member.userId} ${deviceId}`); + this.disposeCall(prevCall, CallErrorCode.NewSession); + } + + const newCall = createNewMatrixCall( + this.client, + this.room.roomId, + { + invitee: member.userId, + opponentDeviceId: deviceId, + opponentSessionId: participant.sessionId, + groupCallId: this.groupCallId, + }, + ); + + if (newCall === null) { + logger.error(`Failed to create call with ${member.userId} ${deviceId}`); + callMap.delete(deviceId); + } else { + this.initCall(newCall); + callMap.set(deviceId, newCall); + + logger.debug( + `Placing call to ${member.userId} ${deviceId} (session ${participant.sessionId})`, + ); + + newCall.placeCallWithCallFeeds( + this.getLocalFeeds().map(feed => feed.clone()), + participant.screensharing, + ).then(() => { + if (this.dataChannelsEnabled) { + newCall.createDataChannel("datachannel", this.dataChannelOptions); + } + }).catch(e => { + logger.warn(`Failed to place call to ${member.userId}`, e); + + if (e instanceof CallError && e.code === GroupCallErrorCode.UnknownDevice) { + this.emit(GroupCallEvent.Error, e); + } else { + this.emit( + GroupCallEvent.Error, + new GroupCallError( + GroupCallErrorCode.PlaceCallFailed, + `Failed to place call to ${member.userId}`, + ), + ); + } + + this.disposeCall(newCall, CallErrorCode.SignallingFailed); + if (callMap.get(deviceId) === newCall) callMap.delete(deviceId); + }); + } + } + } - try { - await newCall.placeCallWithCallFeeds( - this.getLocalFeeds().map(feed => feed.clone()), - requestScreenshareFeed, - ); - } catch (e) { - logger.warn(`Failed to place call to ${member.userId}!`, e); - if (e instanceof CallError && e.code === GroupCallErrorCode.UnknownDevice) { - this.emit(GroupCallEvent.Error, e); + if (callMap.size > 0) { + this.calls.set(member, callMap); } else { - this.emit( - GroupCallEvent.Error, - new GroupCallError( - GroupCallErrorCode.PlaceCallFailed, - `Failed to place call to ${member.userId}.`, - ), - ); + this.calls.delete(member); } - this.removeCall(newCall, CallErrorCode.SignallingFailed); - return; - } - - if (this.dataChannelsEnabled) { - newCall.createDataChannel("datachannel", this.dataChannelOptions); } - }; - - public getDeviceForMember(userId: string): IGroupCallRoomMemberDevice | undefined { - const memberStateEvent = this.getMemberStateEvents(userId); - if (!memberStateEvent) { - return undefined; - } - - const memberState = memberStateEvent.getContent(); - const memberGroupCallState = memberState["m.calls"]?.find( - (call) => call && call["m.call_id"] === this.groupCallId); - - if (!memberGroupCallState) { - return undefined; - } - - const memberDevices = memberGroupCallState["m.devices"]; + if (callsChanged) this.emit(GroupCallEvent.CallsChanged, this.calls); + } - if (!memberDevices || memberDevices.length === 0) { - return undefined; - } + /* + * Room Member State + */ - // NOTE: For now we only support one device so we use the device id in the first source. - return memberDevices[0]; + private getMemberStateEvents(): MatrixEvent[]; + private getMemberStateEvents(userId: string): MatrixEvent | null; + private getMemberStateEvents(userId?: string): MatrixEvent[] | MatrixEvent | null { + return userId === undefined + ? this.room.currentState.getStateEvents(EventType.GroupCallMemberPrefix) + : this.room.currentState.getStateEvents(EventType.GroupCallMemberPrefix, userId); } private onRetryCallLoop = (): void => { - for (const event of this.getMemberStateEvents()) { - const memberId = event.getStateKey()!; - const existingCall = this.calls.find((call) => getCallUserId(call) === memberId); - const retryCallCount = this.retryCallCounts.get(memberId) || 0; - - if (!existingCall && retryCallCount < 3) { - this.retryCallCounts.set(memberId, retryCallCount + 1); - this.onMemberStateChanged(event); + let needsRetry = false; + + for (const [member, participantMap] of this.participants) { + const callMap = this.calls.get(member); + let retriesMap = this.retryCallCounts.get(member); + + for (const [deviceId, participant] of participantMap) { + const call = callMap?.get(deviceId); + const retries = retriesMap?.get(deviceId) ?? 0; + + if ( + call?.getOpponentSessionId() !== participant.sessionId + && this.wantsOutgoingCall(member.userId, deviceId) + && retries < 3 + ) { + if (retriesMap === undefined) { + retriesMap = new Map(); + this.retryCallCounts.set(member, retriesMap); + } + retriesMap.set(deviceId, retries + 1); + needsRetry = true; + } } } - this.retryCallLoopTimeout = setTimeout(this.onRetryCallLoop, this.retryCallInterval); + if (needsRetry) this.placeOutgoingCalls(); }; - /** - * Call Event Handlers - */ - - public getCallByUserId(userId: string): MatrixCall | undefined { - return this.calls.find((call) => getCallUserId(call) === userId); - } - - private addCall(call: MatrixCall): void { - this.calls.push(call); - this.initCall(call); - this.emit(GroupCallEvent.CallsChanged, this.calls); - } - - private replaceCall( - existingCall: MatrixCall, - replacementCall: MatrixCall, - hangupReason = CallErrorCode.Replaced, - ): void { - const existingCallIndex = this.calls.indexOf(existingCall); - - if (existingCallIndex === -1) { - throw new Error("Couldn't find call to replace"); - } - - this.calls.splice(existingCallIndex, 1, replacementCall); - - this.disposeCall(existingCall, hangupReason); - this.initCall(replacementCall); - - this.emit(GroupCallEvent.CallsChanged, this.calls); - } - - private removeCall(call: MatrixCall, hangupReason: CallErrorCode): void { - this.disposeCall(call, hangupReason); - - const callIndex = this.calls.indexOf(call); - - if (callIndex === -1) { - throw new Error("Couldn't find call to remove"); - } - - this.calls.splice(callIndex, 1); - - this.emit(GroupCallEvent.CallsChanged, this.calls); - } - private initCall(call: MatrixCall): void { const opponentMemberId = getCallUserId(call); @@ -1028,9 +852,15 @@ export class GroupCall extends TypedEventEmitter< oldState?: CallState, ): void => this.onCallStateChanged(call, state, oldState); const onCallHangup = this.onCallHangup; - const onCallReplaced = (newCall: MatrixCall): void => this.replaceCall(call, newCall); + const onCallReplaced = (newCall: MatrixCall): void => this.onCallReplaced(call, newCall); + + let deviceMap = this.callHandlers.get(opponentMemberId); + if (deviceMap === undefined) { + deviceMap = new Map(); + this.callHandlers.set(opponentMemberId, deviceMap); + } - this.callHandlers.set(opponentMemberId, { + deviceMap.set(call.getOpponentDeviceId()!, { onCallFeedsChanged, onCallStateChanged, onCallHangup, @@ -1042,6 +872,8 @@ export class GroupCall extends TypedEventEmitter< call.on(CallEvent.Hangup, onCallHangup); call.on(CallEvent.Replaced, onCallReplaced); + call.isPtt = this.isPtt; + this.reEmitter.reEmit(call, Object.values(CallEvent)); onCallFeedsChanged(); @@ -1049,24 +881,27 @@ export class GroupCall extends TypedEventEmitter< private disposeCall(call: MatrixCall, hangupReason: CallErrorCode): void { const opponentMemberId = getCallUserId(call); + const opponentDeviceId = call.getOpponentDeviceId()!; if (!opponentMemberId) { throw new Error("Cannot dispose call without user id"); } + const deviceMap = this.callHandlers.get(opponentMemberId)!; const { onCallFeedsChanged, onCallStateChanged, onCallHangup, onCallReplaced, - } = this.callHandlers.get(opponentMemberId)!; + } = deviceMap.get(opponentDeviceId)!; call.removeListener(CallEvent.FeedsChanged, onCallFeedsChanged); call.removeListener(CallEvent.State, onCallStateChanged); call.removeListener(CallEvent.Hangup, onCallHangup); call.removeListener(CallEvent.Replaced, onCallReplaced); - this.callHandlers.delete(opponentMemberId); + deviceMap.delete(opponentMemberId); + if (deviceMap.size === 0) this.callHandlers.delete(opponentMemberId); if (call.hangupReason === CallErrorCode.Replaced) { return; @@ -1076,13 +911,13 @@ export class GroupCall extends TypedEventEmitter< call.hangup(hangupReason, false); } - const usermediaFeed = this.getUserMediaFeedByUserId(opponentMemberId); + const usermediaFeed = this.getUserMediaFeed(opponentMemberId, opponentDeviceId); if (usermediaFeed) { this.removeUserMediaFeed(usermediaFeed); } - const screenshareFeed = this.getScreenshareFeedByUserId(opponentMemberId); + const screenshareFeed = this.getScreenshareFeed(opponentMemberId, opponentDeviceId); if (screenshareFeed) { this.removeScreenshareFeed(screenshareFeed); @@ -1091,12 +926,13 @@ export class GroupCall extends TypedEventEmitter< private onCallFeedsChanged = (call: MatrixCall): void => { const opponentMemberId = getCallUserId(call); + const opponentDeviceId = call.getOpponentDeviceId()!; if (!opponentMemberId) { throw new Error("Cannot change call feeds without user id"); } - const currentUserMediaFeed = this.getUserMediaFeedByUserId(opponentMemberId); + const currentUserMediaFeed = this.getUserMediaFeed(opponentMemberId, opponentDeviceId); const remoteUsermediaFeed = call.remoteUsermediaFeed; const remoteFeedChanged = remoteUsermediaFeed !== currentUserMediaFeed; @@ -1110,7 +946,7 @@ export class GroupCall extends TypedEventEmitter< } } - const currentScreenshareFeed = this.getScreenshareFeedByUserId(opponentMemberId); + const currentScreenshareFeed = this.getScreenshareFeed(opponentMemberId, opponentDeviceId); const remoteScreensharingFeed = call.remoteScreensharingFeed; const remoteScreenshareFeedChanged = remoteScreensharingFeed !== currentScreenshareFeed; @@ -1145,24 +981,49 @@ export class GroupCall extends TypedEventEmitter< } if (state === CallState.Connected) { - this.retryCallCounts.delete(getCallUserId(call)!); + const opponent = call.getOpponentMember()!; + const retriesMap = this.retryCallCounts.get(opponent); + retriesMap?.delete(call.getOpponentDeviceId()!); + if (retriesMap?.size === 0) this.retryCallCounts.delete(opponent); } }; private onCallHangup = (call: MatrixCall): void => { - if (call.hangupReason === CallErrorCode.Replaced) { - return; + if (call.hangupReason === CallErrorCode.Replaced) return; + + const opponent = call.getOpponentMember() ?? this.room.getMember(call.invitee!)!; + const deviceMap = this.calls.get(opponent); + + // Sanity check that this call is in fact in the map + if (deviceMap?.get(call.getOpponentDeviceId()!) === call) { + this.disposeCall(call, call.hangupReason as CallErrorCode); + deviceMap.delete(call.getOpponentDeviceId()!); + if (deviceMap.size === 0) this.calls.delete(opponent); + this.emit(GroupCallEvent.CallsChanged, this.calls); } + }; + + private onCallReplaced = (prevCall: MatrixCall, newCall: MatrixCall): void => { + const opponent = prevCall.getOpponentMember()!; - this.removeCall(call, call.hangupReason as CallErrorCode); + let deviceMap = this.calls.get(opponent); + if (deviceMap === undefined) { + deviceMap = new Map(); + this.calls.set(opponent, deviceMap); + } + + this.disposeCall(prevCall, CallErrorCode.Replaced); + this.initCall(newCall); + deviceMap.set(prevCall.getOpponentDeviceId()!, newCall); + this.emit(GroupCallEvent.CallsChanged, this.calls); }; - /** + /* * UserMedia CallFeed Event Handlers */ - public getUserMediaFeedByUserId(userId: string): CallFeed | undefined { - return this.userMediaFeeds.find((feed) => feed.userId === userId); + public getUserMediaFeed(userId: string, deviceId: string): CallFeed | undefined { + return this.userMediaFeeds.find(f => f.userId === userId && f.deviceId! === deviceId); } private addUserMediaFeed(callFeed: CallFeed): void { @@ -1172,7 +1033,9 @@ export class GroupCall extends TypedEventEmitter< } private replaceUserMediaFeed(existingFeed: CallFeed, replacementFeed: CallFeed): void { - const feedIndex = this.userMediaFeeds.findIndex((feed) => feed.userId === existingFeed.userId); + const feedIndex = this.userMediaFeeds.findIndex( + f => f.userId === existingFeed.userId && f.deviceId! === existingFeed.deviceId, + ); if (feedIndex === -1) { throw new Error("Couldn't find user media feed to replace"); @@ -1186,7 +1049,9 @@ export class GroupCall extends TypedEventEmitter< } private removeUserMediaFeed(callFeed: CallFeed): void { - const feedIndex = this.userMediaFeeds.findIndex((feed) => feed.userId === callFeed.userId); + const feedIndex = this.userMediaFeeds.findIndex( + f => f.userId === callFeed.userId && f.deviceId! === callFeed.deviceId, + ); if (feedIndex === -1) { throw new Error("Couldn't find user media feed to remove"); @@ -1197,36 +1062,27 @@ export class GroupCall extends TypedEventEmitter< callFeed.dispose(); this.emit(GroupCallEvent.UserMediaFeedsChanged, this.userMediaFeeds); - if ( - this.activeSpeaker === callFeed.userId && - this.userMediaFeeds.length > 0 - ) { - this.activeSpeaker = this.userMediaFeeds[0].userId; + if (this.activeSpeaker === callFeed) { + this.activeSpeaker = this.userMediaFeeds[0]; this.emit(GroupCallEvent.ActiveSpeakerChanged, this.activeSpeaker); } } private onActiveSpeakerLoop = (): void => { let topAvg: number | undefined = undefined; - let nextActiveSpeaker: string | undefined = undefined; + let nextActiveSpeaker: CallFeed | undefined = undefined; for (const callFeed of this.userMediaFeeds) { - if (callFeed.userId === this.client.getUserId() && this.userMediaFeeds.length > 1) { - continue; - } - - let total = 0; - - for (let i = 0; i < callFeed.speakingVolumeSamples.length; i++) { - const volume = callFeed.speakingVolumeSamples[i]; - total += Math.max(volume, SPEAKING_THRESHOLD); - } + if (callFeed.isLocal() && this.userMediaFeeds.length > 1) continue; + const total = callFeed.speakingVolumeSamples.reduce( + (acc, volume) => acc + Math.max(volume, SPEAKING_THRESHOLD), + ); const avg = total / callFeed.speakingVolumeSamples.length; if (!topAvg || avg > topAvg) { topAvg = avg; - nextActiveSpeaker = callFeed.userId; + nextActiveSpeaker = callFeed; } } @@ -1234,19 +1090,14 @@ export class GroupCall extends TypedEventEmitter< this.activeSpeaker = nextActiveSpeaker; this.emit(GroupCallEvent.ActiveSpeakerChanged, this.activeSpeaker); } - - this.activeSpeakerLoopTimeout = setTimeout( - this.onActiveSpeakerLoop, - this.activeSpeakerInterval, - ); }; - /** + /* * Screenshare Call Feed Event Handlers */ - public getScreenshareFeedByUserId(userId: string): CallFeed | undefined { - return this.screenshareFeeds.find((feed) => feed.userId === userId); + public getScreenshareFeed(userId: string, deviceId: string): CallFeed | undefined { + return this.screenshareFeeds.find(f => f.userId === userId && f.deviceId! === deviceId); } private addScreenshareFeed(callFeed: CallFeed): void { @@ -1255,7 +1106,9 @@ export class GroupCall extends TypedEventEmitter< } private replaceScreenshareFeed(existingFeed: CallFeed, replacementFeed: CallFeed): void { - const feedIndex = this.screenshareFeeds.findIndex((feed) => feed.userId === existingFeed.userId); + const feedIndex = this.screenshareFeeds.findIndex( + f => f.userId === existingFeed.userId && f.deviceId! === existingFeed.deviceId, + ); if (feedIndex === -1) { throw new Error("Couldn't find screenshare feed to replace"); @@ -1268,7 +1121,9 @@ export class GroupCall extends TypedEventEmitter< } private removeScreenshareFeed(callFeed: CallFeed): void { - const feedIndex = this.screenshareFeeds.findIndex((feed) => feed.userId === callFeed.userId); + const feedIndex = this.screenshareFeeds.findIndex( + f => f.userId === callFeed.userId && f.deviceId! === callFeed.deviceId, + ); if (feedIndex === -1) { throw new Error("Couldn't find screenshare feed to remove"); @@ -1281,30 +1136,201 @@ export class GroupCall extends TypedEventEmitter< } /** - * Participant Management + * Recalculates and updates the participant map to match the room state. */ + private updateParticipants(): void { + if (this.participantsExpirationTimer !== null) { + clearTimeout(this.participantsExpirationTimer); + this.participantsExpirationTimer = null; + } - private addParticipant(member: RoomMember): void { - if (this.participants.find((m) => m.userId === member.userId)) { + if (this.state === GroupCallState.Ended) { + this.participants = new Map(); return; } - this.participants.push(member); + const participants = new Map>(); + const now = Date.now(); + const entered = this.state === GroupCallState.Entered; + let nextExpiration = Infinity; + + for (const e of this.getMemberStateEvents()) { + const member = this.room.getMember(e.getStateKey()!); + const content = e.getContent>(); + const calls: Record[] = Array.isArray(content["m.calls"]) ? content["m.calls"] : []; + const call = calls.find(call => call["m.call_id"] === this.groupCallId); + const devices: Record[] = Array.isArray(call?.["m.devices"]) ? call!["m.devices"] : []; + + // Filter out invalid and expired devices + let validDevices = devices.filter(d => ( + typeof d.device_id === "string" + && typeof d.session_id === "string" + && typeof d.expires_ts === "number" + && d.expires_ts > now + && Array.isArray(d.feeds) + )) as unknown as IGroupCallRoomMemberDevice[]; + + // Apply local echo for the unentered case + if (!entered && member?.userId === this.client.getUserId()!) { + validDevices = validDevices.filter(d => d.device_id !== this.client.getDeviceId()!); + } + + // Must have a connected device and be joined to the room + if (validDevices.length > 0 && member?.membership === "join") { + const deviceMap = new Map(); + participants.set(member, deviceMap); + + for (const d of validDevices) { + deviceMap.set(d.device_id, { + sessionId: d.session_id, + screensharing: d.feeds.some(f => f.purpose === SDPStreamMetadataPurpose.Screenshare), + }); + if (d.expires_ts < nextExpiration) nextExpiration = d.expires_ts; + } + } + } + + // Apply local echo for the entered case + if (entered) { + const localMember = this.room.getMember(this.client.getUserId()!)!; + let deviceMap = participants.get(localMember); + if (deviceMap === undefined) { + deviceMap = new Map(); + participants.set(localMember, deviceMap); + } + + if (!deviceMap.has(this.client.getDeviceId()!)) { + deviceMap.set(this.client.getDeviceId()!, { + sessionId: this.client.getSessionId(), + screensharing: this.getLocalFeeds().some(f => f.purpose === SDPStreamMetadataPurpose.Screenshare), + }); + } + } + + this.participants = participants; + if (nextExpiration < Infinity) { + this.participantsExpirationTimer = setTimeout(() => this.updateParticipants(), nextExpiration - now); + } + } + + /** + * Updates the local user's member state with the devices returned by the given function. + * @param fn A function from the current devices to the new devices. + * @param keepAlive Whether the request should outlive the window. + */ + private async updateDevices( + fn: (devices: IGroupCallRoomMemberDevice[]) => IGroupCallRoomMemberDevice[], + keepAlive = false, + ): Promise { + const now = Date.now(); + const localUserId = this.client.getUserId()!; + + const event = this.getMemberStateEvents(localUserId); + const content = event?.getContent>() ?? {}; + const calls: Record[] = Array.isArray(content["m.calls"]) ? content["m.calls"] : []; + + let call: Record | null = null; + const otherCalls: Record[] = []; + for (const c of calls) { + if (c["m.call_id"] === this.groupCallId) { + call = c; + } else { + otherCalls.push(c); + } + } + if (call === null) call = {}; + + const devices: Record[] = Array.isArray(call["m.devices"]) ? call["m.devices"] : []; + + // Filter out invalid and expired devices + const validDevices = devices.filter(d => ( + typeof d.device_id === "string" + && typeof d.session_id === "string" + && typeof d.expires_ts === "number" + && d.expires_ts > now + && Array.isArray(d.feeds) + )) as unknown as IGroupCallRoomMemberDevice[]; + + const newDevices = fn(validDevices); + const newCalls = [...otherCalls as unknown as IGroupCallRoomMemberCallState[]]; + if (newDevices.length > 0) { + newCalls.push({ + ...call, + "m.call_id": this.groupCallId, + "m.devices": newDevices, + }); + } + + const newContent: IGroupCallRoomMemberState = { "m.calls": newCalls }; - this.emit(GroupCallEvent.ParticipantsChanged, this.participants); - this.client.emit(GroupCallEventHandlerEvent.Participants, this.participants, this); + await this.client.sendStateEvent( + this.room.roomId, EventType.GroupCallMemberPrefix, newContent, localUserId, { keepAlive }, + ); } - private removeParticipant(member: RoomMember): void { - const index = this.participants.findIndex((m) => m.userId === member.userId); + private async addDeviceToMemberState(): Promise { + await this.updateDevices(devices => [ + ...devices.filter(d => d.device_id !== this.client.getDeviceId()!), + { + "device_id": this.client.getDeviceId()!, + "session_id": this.client.getSessionId(), + "expires_ts": Date.now() + DEVICE_TIMEOUT, + "feeds": this.getLocalFeeds().map(feed => ({ purpose: feed.purpose })), + // TODO: Add data channels + }, + ]); + } - if (index === -1) { - return; + private async updateMemberState(): Promise { + // Clear the old update interval before proceeding + if (this.resendMemberStateTimer !== null) { + clearInterval(this.resendMemberStateTimer); + this.resendMemberStateTimer = null; } - this.participants.splice(index, 1); + if (this.state === GroupCallState.Entered) { + // Add the local device + await this.addDeviceToMemberState(); - this.emit(GroupCallEvent.ParticipantsChanged, this.participants); - this.client.emit(GroupCallEventHandlerEvent.Participants, this.participants, this); + // Resend the state event every so often so it doesn't become stale + this.resendMemberStateTimer = setInterval(async () => { + logger.log("Resending call member state"); + try { + await this.addDeviceToMemberState(); + } catch (e) { + logger.error("Failed to resend call member state", e); + } + }, DEVICE_TIMEOUT * 3 / 4); + } else { + // Remove the local device + await this.updateDevices( + devices => devices.filter(d => d.device_id !== this.client.getDeviceId()!), + true, + ); + } } + + private onRoomState = (): void => this.updateParticipants(); + + private onParticipantsChanged = (): void => { + if (this.state === GroupCallState.Entered) this.placeOutgoingCalls(); + }; + + private onStateChanged = (newState: GroupCallState, oldState: GroupCallState): void => { + if ( + newState === GroupCallState.Entered + || oldState === GroupCallState.Entered + || newState === GroupCallState.Ended + ) { + // We either entered, left, or ended the call + this.updateParticipants(); + this.updateMemberState().catch(e => logger.error("Failed to update member state devices", e)); + } + }; + + private onLocalFeedsChanged = (): void => { + if (this.state === GroupCallState.Entered) { + this.updateMemberState().catch(e => logger.error("Failed to update member state feeds", e)); + } + }; } diff --git a/src/webrtc/groupCallEventHandler.ts b/src/webrtc/groupCallEventHandler.ts index 6a1084152a9..de0e533572b 100644 --- a/src/webrtc/groupCallEventHandler.ts +++ b/src/webrtc/groupCallEventHandler.ts @@ -220,14 +220,6 @@ export class GroupCallEventHandler { logger.warn(`Multiple group calls detected for room: ${ state.roomId}. Multiple group calls are currently unsupported.`); } - } else if (eventType === EventType.GroupCallMemberPrefix) { - const groupCall = this.groupCalls.get(state.roomId); - - if (!groupCall) { - return; - } - - groupCall.onMemberStateChanged(event); } }; } From 44da9040f4fb3de32dcc5abae563b04265c7ced8 Mon Sep 17 00:00:00 2001 From: Robin Townsend Date: Fri, 25 Nov 2022 23:44:34 -0500 Subject: [PATCH 2/5] Emit an event for outgoing group calls --- src/client.ts | 1 + src/webrtc/groupCall.ts | 1 + src/webrtc/groupCallEventHandler.ts | 2 ++ 3 files changed, 4 insertions(+) diff --git a/src/client.ts b/src/client.ts index 732c0731626..b091a31ec45 100644 --- a/src/client.ts +++ b/src/client.ts @@ -896,6 +896,7 @@ export type EmittedEvents = ClientEvent | CallEvent // re-emitted by call.ts using Object.values | CallEventHandlerEvent.Incoming | GroupCallEventHandlerEvent.Incoming + | GroupCallEventHandlerEvent.Outgoing | GroupCallEventHandlerEvent.Ended | GroupCallEventHandlerEvent.Participants | HttpApiEvent.SessionLoggedOut diff --git a/src/webrtc/groupCall.ts b/src/webrtc/groupCall.ts index b676d6d9540..07559ded007 100644 --- a/src/webrtc/groupCall.ts +++ b/src/webrtc/groupCall.ts @@ -207,6 +207,7 @@ export class GroupCall extends TypedEventEmitter< public async create(): Promise { this.client.groupCallEventHandler!.groupCalls.set(this.room.roomId, this); + this.client.emit(GroupCallEventHandlerEvent.Outgoing, this); await this.client.sendStateEvent( this.room.roomId, diff --git a/src/webrtc/groupCallEventHandler.ts b/src/webrtc/groupCallEventHandler.ts index de0e533572b..b1768ee90dd 100644 --- a/src/webrtc/groupCallEventHandler.ts +++ b/src/webrtc/groupCallEventHandler.ts @@ -31,12 +31,14 @@ import { SyncState } from '../sync'; export enum GroupCallEventHandlerEvent { Incoming = "GroupCall.incoming", + Outgoing = "GroupCall.outgoing", Ended = "GroupCall.ended", Participants = "GroupCall.participants", } export type GroupCallEventHandlerEventHandlerMap = { [GroupCallEventHandlerEvent.Incoming]: (call: GroupCall) => void; + [GroupCallEventHandlerEvent.Outgoing]: (call: GroupCall) => void; [GroupCallEventHandlerEvent.Ended]: (call: GroupCall) => void; [GroupCallEventHandlerEvent.Participants]: (participants: RoomMember[], call: GroupCall) => void; }; From c54d61e15896e442ba12b37fea5c8817451cbe1b Mon Sep 17 00:00:00 2001 From: Robin Townsend Date: Fri, 25 Nov 2022 23:45:45 -0500 Subject: [PATCH 3/5] Put creation timestamps on group calls --- src/webrtc/groupCall.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/webrtc/groupCall.ts b/src/webrtc/groupCall.ts index 07559ded007..b5c5dac57a6 100644 --- a/src/webrtc/groupCall.ts +++ b/src/webrtc/groupCall.ts @@ -197,6 +197,9 @@ export class GroupCall extends TypedEventEmitter< super(); this.reEmitter = new ReEmitter(this); this.groupCallId = groupCallId ?? genCallID(); + this.creationTs = room.currentState.getStateEvents( + EventType.GroupCallPrefix, this.groupCallId, + )?.getTs() ?? null; this.updateParticipants(); room.on(RoomStateEvent.Update, this.onRoomState); @@ -206,6 +209,7 @@ export class GroupCall extends TypedEventEmitter< } public async create(): Promise { + this.creationTs = Date.now(); this.client.groupCallEventHandler!.groupCalls.set(this.room.roomId, this); this.client.emit(GroupCallEventHandlerEvent.Outgoing, this); @@ -267,6 +271,20 @@ export class GroupCall extends TypedEventEmitter< } } + private _creationTs: number | null = null; + + /** + * The timestamp at which the call was created, or null if it has not yet + * been created. + */ + public get creationTs(): number | null { + return this._creationTs; + } + + private set creationTs(value: number | null) { + this._creationTs = value; + } + /** * Executes the given callback on all calls in this group call. * @param f The callback. From 19e02e894f77f4b23be6b993021560178646d50e Mon Sep 17 00:00:00 2001 From: Robin Townsend Date: Fri, 25 Nov 2022 23:47:01 -0500 Subject: [PATCH 4/5] Add a method for cleaning group call member state --- src/webrtc/groupCall.ts | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/webrtc/groupCall.ts b/src/webrtc/groupCall.ts index b5c5dac57a6..7713d6de9a0 100644 --- a/src/webrtc/groupCall.ts +++ b/src/webrtc/groupCall.ts @@ -1,6 +1,6 @@ import { TypedEventEmitter } from "../models/typed-event-emitter"; import { CallFeed, SPEAKING_THRESHOLD } from "./callFeed"; -import { MatrixClient } from "../client"; +import { MatrixClient, IMyDevice } from "../client"; import { CallErrorCode, CallEvent, @@ -1234,11 +1234,12 @@ export class GroupCall extends TypedEventEmitter< /** * Updates the local user's member state with the devices returned by the given function. - * @param fn A function from the current devices to the new devices. + * @param fn A function from the current devices to the new devices. If it + * returns null, the update will be skipped. * @param keepAlive Whether the request should outlive the window. */ private async updateDevices( - fn: (devices: IGroupCallRoomMemberDevice[]) => IGroupCallRoomMemberDevice[], + fn: (devices: IGroupCallRoomMemberDevice[]) => IGroupCallRoomMemberDevice[] | null, keepAlive = false, ): Promise { const now = Date.now(); @@ -1271,6 +1272,8 @@ export class GroupCall extends TypedEventEmitter< )) as unknown as IGroupCallRoomMemberDevice[]; const newDevices = fn(validDevices); + if (newDevices === null) return; + const newCalls = [...otherCalls as unknown as IGroupCallRoomMemberCallState[]]; if (newDevices.length > 0) { newCalls.push({ @@ -1329,6 +1332,27 @@ export class GroupCall extends TypedEventEmitter< } } + /** + * Cleans up our member state by filtering out logged out devices, inactive + * devices, and our own device (if we know we haven't entered). + */ + public async cleanMemberState(): Promise { + const { devices: myDevices } = await this.client.getDevices(); + const deviceMap = new Map(myDevices.map(d => [d.device_id, d])); + + // updateDevices takes care of filtering out inactive devices for us + await this.updateDevices(devices => { + const newDevices = devices.filter(d => { + const device = deviceMap.get(d.device_id); + return device?.last_seen_ts !== undefined + && !(d.device_id === this.client.getDeviceId()! && this.state !== GroupCallState.Entered); + }); + + // Skip the update if the devices are unchanged + return newDevices.length === devices.length ? null : newDevices; + }); + } + private onRoomState = (): void => this.updateParticipants(); private onParticipantsChanged = (): void => { From 5511a6ef8c9532f73039839779c831562272d46a Mon Sep 17 00:00:00 2001 From: Robin Townsend Date: Sat, 26 Nov 2022 00:28:11 -0500 Subject: [PATCH 5/5] Fix tests --- spec/unit/webrtc/groupCall.spec.ts | 26 ++++++++++++------- .../unit/webrtc/groupCallEventHandler.spec.ts | 12 ++++++--- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/spec/unit/webrtc/groupCall.spec.ts b/spec/unit/webrtc/groupCall.spec.ts index 20a40c1e4ee..047474f8e12 100644 --- a/spec/unit/webrtc/groupCall.spec.ts +++ b/spec/unit/webrtc/groupCall.spec.ts @@ -58,6 +58,7 @@ const FAKE_STATE_EVENTS = [ }), getStateKey: () => FAKE_USER_ID_1, getRoomId: () => FAKE_ROOM_ID, + getTs: () => 0, }, { getContent: () => ({ @@ -73,6 +74,7 @@ const FAKE_STATE_EVENTS = [ }), getStateKey: () => FAKE_USER_ID_2, getRoomId: () => FAKE_ROOM_ID, + getTs: () => 0, }, { getContent: () => ({ "m.expires_ts": Date.now() + ONE_HOUR, @@ -88,9 +90,21 @@ const FAKE_STATE_EVENTS = [ }), getStateKey: () => "user3", getRoomId: () => FAKE_ROOM_ID, + getTs: () => 0, }, ]; +const mockGetStateEvents = (type: EventType, userId?: string): MatrixEvent[] | MatrixEvent | null => { + if (type === EventType.GroupCallMemberPrefix) { + return userId === undefined + ? FAKE_STATE_EVENTS as MatrixEvent[] + : FAKE_STATE_EVENTS.find(e => e.getStateKey() === userId) as MatrixEvent; + } else { + const fakeEvent = { getContent: () => ({}), getTs: () => 0 } as MatrixEvent; + return userId === undefined ? [fakeEvent] : fakeEvent; + } +}; + const ONE_HOUR = 1000 * 60 * 60; const createAndEnterGroupCall = async (cli: MatrixClient, room: Room): Promise => { @@ -774,11 +788,7 @@ describe('Group Call', function() { mockClient = typedMockClient as unknown as MatrixClient; room = new Room(FAKE_ROOM_ID, mockClient, FAKE_USER_ID_1); - room.currentState.getStateEvents = jest.fn().mockImplementation((type: EventType, userId: string) => { - return type === EventType.GroupCallMemberPrefix - ? FAKE_STATE_EVENTS.find(e => e.getStateKey() === userId) || FAKE_STATE_EVENTS - : { getContent: () => ([]) }; - }); + room.currentState.getStateEvents = jest.fn().mockImplementation(mockGetStateEvents); room.currentState.members[FAKE_USER_ID_1] = { userId: FAKE_USER_ID_1, membership: "join", @@ -1016,11 +1026,7 @@ describe('Group Call', function() { userId: FAKE_USER_ID_2, membership: "join", } as unknown as RoomMember; - room.currentState.getStateEvents = jest.fn().mockImplementation((type: EventType, userId: string) => { - return type === EventType.GroupCallMemberPrefix - ? FAKE_STATE_EVENTS.find(e => e.getStateKey() === userId) || FAKE_STATE_EVENTS - : { getContent: () => ([]) }; - }); + room.currentState.getStateEvents = jest.fn().mockImplementation(mockGetStateEvents); groupCall = await createAndEnterGroupCall(mockClient, room); }); diff --git a/spec/unit/webrtc/groupCallEventHandler.spec.ts b/spec/unit/webrtc/groupCallEventHandler.spec.ts index 8df7356f471..de70e42085b 100644 --- a/spec/unit/webrtc/groupCallEventHandler.spec.ts +++ b/spec/unit/webrtc/groupCallEventHandler.spec.ts @@ -55,14 +55,20 @@ describe('Group Call Event Handler', function() { membership: "join", } as unknown as RoomMember; + const mockEvent = makeMockGroupCallStateEvent(FAKE_ROOM_ID, FAKE_GROUP_CALL_ID); + mockRoom = { on: () => {}, off: () => {}, roomId: FAKE_ROOM_ID, currentState: { - getStateEvents: jest.fn().mockReturnValue([makeMockGroupCallStateEvent( - FAKE_ROOM_ID, FAKE_GROUP_CALL_ID, - )]), + getStateEvents: jest.fn((type, key) => { + if (type === mockEvent.getType()) { + return key === undefined ? [mockEvent] : mockEvent; + } else { + return key === undefined ? [] : null; + } + }), }, getMember: (userId: string) => userId === FAKE_USER_ID ? mockMember : null, } as unknown as Room;