-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmessages.ts
686 lines (608 loc) · 22.7 KB
/
messages.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
import * as Ably from 'ably';
import { messagesChannelName } from './channel.js';
import { ChannelManager } from './channel-manager.js';
import { ChatApi } from './chat-api.js';
import {
DiscontinuityEmitter,
DiscontinuityListener,
EmitsDiscontinuities,
HandlesDiscontinuity,
newDiscontinuityEmitter,
OnDiscontinuitySubscriptionResponse,
} from './discontinuity.js';
import { ErrorCodes } from './errors.js';
import { ChatMessageActions, MessageEvents, RealtimeMessageNames } from './events.js';
import { Logger } from './logger.js';
import { DefaultMessage, Message, MessageHeaders, MessageMetadata, MessageOperationMetadata } from './message.js';
import { parseMessage } from './message-parser.js';
import { PaginatedResult } from './query.js';
import { addListenerToChannelWithoutAttach } from './realtime-extensions.js';
import { ContributesToRoomLifecycle } from './room-lifecycle-manager.js';
import EventEmitter from './utils/event-emitter.js';
/**
* Event names and their respective payloads emitted by the messages feature.
*/
interface MessageEventsMap {
[MessageEvents.Created]: MessageEventPayload;
[MessageEvents.Updated]: MessageEventPayload;
[MessageEvents.Deleted]: MessageEventPayload;
}
/**
* Mapping of chat message actions to message events.
*/
const MessageActionsToEventsMap: Map<ChatMessageActions, MessageEvents> = new Map<ChatMessageActions, MessageEvents>([
[ChatMessageActions.MessageCreate, MessageEvents.Created],
[ChatMessageActions.MessageUpdate, MessageEvents.Updated],
[ChatMessageActions.MessageDelete, MessageEvents.Deleted],
]);
/**
* The order in which results should be returned when performing a paginated query (e.g. message history).
*/
export enum OrderBy {
/**
* Return results in ascending order (oldest first).
*/
OldestFirst = 'oldestFirst',
/**
* Return results in descending order (newest first).
*/
NewestFirst = 'newestFirst',
}
/**
* Options for querying messages in a chat room.
*/
export interface QueryOptions {
/**
* The start of the time window to query from. If provided, the response will include
* messages with timestamps equal to or greater than this value.
*
* @defaultValue The beginning of time
*/
start?: number;
/**
* The end of the time window to query from. If provided, the response will include
* messages with timestamps less than this value.
*
* @defaultValue Now
*/
end?: number;
/**
* The maximum number of messages to return in the response.
*
* @defaultValue 100
*/
limit?: number;
/**
* The direction to query messages in.
* If {@link OrderBy.OldestFirst}, the response will include messages from the start of the time window to the end.
* If {@link OrderBy.NewestFirst}, the response will include messages from the end of the time window to the start.
* If not provided, the default is {@link OrderBy.NewestFirst}.
*
* @defaultValue {@link OrderBy.NewestFirst}
*/
orderBy?: OrderBy;
}
/**
* The parameters supplied to a message action like delete or update.
*/
export interface OperationDetails {
/**
* Optional description for the message action.
*/
description?: string;
/**
* Optional metadata that will be added to the action. Defaults to empty.
*
*/
metadata?: MessageOperationMetadata;
}
/**
* Parameters for deleting a message.
*/
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface DeleteMessageParams extends OperationDetails {}
/**
* Params for sending a text message. Only `text` is mandatory.
*/
export interface SendMessageParams {
/**
* The text of the message.
*/
text: string;
/**
* Optional metadata of the message.
*
* The metadata is a map of extra information that can be attached to chat
* messages. It is not used by Ably and is sent as part of the realtime
* message payload. Example use cases are setting custom styling like
* background or text colors or fonts, adding links to external images,
* emojis, etc.
*
* Do not use metadata for authoritative information. There is no server-side
* validation. When reading the metadata, treat it like user input.
*
*/
metadata?: MessageMetadata;
/**
* Optional headers of the message.
*
* The headers are a flat key-value map and are sent as part of the realtime
* message's extras inside the `headers` property. They can serve similar
* purposes as the metadata, but they are read by Ably and can be used for
* features such as
* [subscription filters](https://faqs.ably.com/subscription-filters).
*
* Do not use the headers for authoritative information. There is no
* server-side validation. When reading the headers, treat them like user
* input.
*
*/
headers?: MessageHeaders;
}
/**
* Params for updating a message. It accepts all parameters that sending a
* message accepts.
*
* Note that updating a message replaces the whole previous message, so all
* metadata and headers that should be kept must be set in the update request,
* or they will be lost.
*/
// eslint-disable-next-line @typescript-eslint/no-empty-object-type
export interface UpdateMessageParams extends SendMessageParams {}
/**
* Payload for a message event.
*/
export interface MessageEventPayload {
/**
* The type of the message event.
*/
type: MessageEvents;
/**
* The message that was received.
*/
message: Message;
}
/**
* A listener for message events in a chat room.
* @param event The message event that was received.
*/
export type MessageListener = (event: MessageEventPayload) => void;
/**
* A response object that allows you to control a message subscription.
*/
export interface MessageSubscriptionResponse {
/**
* Unsubscribe the listener registered with {@link Messages.subscribe} from message events.
*/
unsubscribe: () => void;
/**
* Get the previous messages that were sent to the room before the listener was subscribed.
* @param params Options for the history query.
* @returns A promise that resolves with the paginated result of messages, in newest-to-oldest order.
*/
getPreviousMessages(params: Omit<QueryOptions, 'orderBy'>): Promise<PaginatedResult<Message>>;
}
/**
* This interface is used to interact with messages in a chat room: subscribing
* to new messages, fetching history, or sending messages.
*
* Get an instance via {@link Room.messages}.
*/
export interface Messages extends EmitsDiscontinuities {
/**
* Subscribe to new messages in this chat room.
* @param listener callback that will be called
* @returns A response object that allows you to control the subscription.
*/
subscribe(listener: MessageListener): MessageSubscriptionResponse;
/**
* Unsubscribe all listeners from new messages in the chat room.
*/
unsubscribeAll(): void;
/**
* Get messages that have been previously sent to the chat room, based on the provided options.
*
* @param options Options for the query.
* @returns A promise that resolves with the paginated result of messages. This paginated result can
* be used to fetch more messages if available.
*/
get(options: QueryOptions): Promise<PaginatedResult<Message>>;
/**
* Send a message in the chat room.
*
* This method uses the Ably Chat API endpoint for sending messages.
*
* Note that the Promise may resolve before OR after the message is received
* from the realtime channel. This means you may see the message that was just
* sent in a callback to `subscribe` before the returned promise resolves.
*
* @param params an object containing {text, headers, metadata} for the message
* to be sent. Text is required, metadata and headers are optional.
* @returns A promise that resolves when the message was published.
*/
send(params: SendMessageParams): Promise<Message>;
/**
* Delete a message in the chat room.
*
* This method uses the Ably Chat API REST endpoint for deleting messages.
* It performs a `soft` delete, meaning the message is marked as deleted.
*
* Note that the Promise may resolve before OR after the message is deleted
* from the realtime channel. This means you may see the message that was just
* deleted in a callback to `subscribe` before the returned promise resolves.
*
* Should you wish to restore a deleted message, and providing you have the appropriate permissions,
* you can simply send an update to the original message.
* Note: This is subject to change in future versions, whereby a new permissions model will be introduced
* and a deleted message may not be restorable in this way.
*
* @returns A promise that resolves when the message was deleted.
* @param message - The message to delete.
* @param deleteMessageParams - Optional details to record about the delete action.
* @return A promise that resolves to the deleted message.
*/
delete(message: Message, deleteMessageParams?: DeleteMessageParams): Promise<Message>;
/**
* Update a message in the chat room.
*
* Note that the Promise may resolve before OR after the updated message is
* received from the realtime channel. This means you may see the update that
* was just sent in a callback to `subscribe` before the returned promise
* resolves.
*
* @param message The message to update.
* @param update The new message content including headers and metadata. This
* fully replaces the old content. Everything that's not set will be removed.
* @param details Optional details to record about the update action.
* @returns A promise of the updated message.
*/
update(message: Message, update: UpdateMessageParams, details?: OperationDetails): Promise<Message>;
/**
* Get the underlying Ably realtime channel used for the messages in this chat room.
*
* @returns The realtime channel.
*/
get channel(): Ably.RealtimeChannel;
}
/**
* @inheritDoc
*/
export class DefaultMessages
extends EventEmitter<MessageEventsMap>
implements Messages, HandlesDiscontinuity, ContributesToRoomLifecycle
{
private readonly _roomId: string;
private readonly _channel: Ably.RealtimeChannel;
private readonly _chatApi: ChatApi;
private readonly _clientId: string;
private readonly _listenerSubscriptionPoints: Map<
MessageListener,
Promise<{
fromSerial: string;
}>
>;
private readonly _logger: Logger;
private readonly _discontinuityEmitter: DiscontinuityEmitter = newDiscontinuityEmitter();
/**
* Constructs a new `DefaultMessages` instance.
* @param roomId The unique identifier of the room.
* @param channelManager An instance of the ChannelManager.
* @param chatApi An instance of the ChatApi.
* @param clientId The client ID of the user.
* @param logger An instance of the Logger.
*/
constructor(roomId: string, channelManager: ChannelManager, chatApi: ChatApi, clientId: string, logger: Logger) {
super();
this._roomId = roomId;
this._channel = this._makeChannel(roomId, channelManager);
this._chatApi = chatApi;
this._clientId = clientId;
this._logger = logger;
this._listenerSubscriptionPoints = new Map<MessageListener, Promise<{ fromSerial: string }>>();
}
/**
* Creates the realtime channel for messages.
*/
private _makeChannel(roomId: string, channelManager: ChannelManager): Ably.RealtimeChannel {
const channel = channelManager.get(messagesChannelName(roomId));
addListenerToChannelWithoutAttach({
listener: this._processEvent.bind(this),
events: [RealtimeMessageNames.ChatMessage],
channel: channel,
});
// Handles the case where channel attaches and resume state is false. This can happen when the channel is first attached,
// or when the channel is reattached after a detach. In both cases, we reset the subscription points for all listeners.
channel.on('attached', (message) => {
this._handleAttach(message.resumed);
});
// Handles the case where an update message is received from a channel after a detach and reattach.
channel.on('update', (message) => {
if (message.current === 'attached' && message.previous === 'attached') {
this._handleAttach(message.resumed);
}
});
return channel;
}
/**
* @inheritdoc Messages
*/
private async _getBeforeSubscriptionStart(
listener: MessageListener,
params: Omit<QueryOptions, 'orderBy'>,
): Promise<PaginatedResult<Message>> {
this._logger.trace(`DefaultSubscriptionManager.getBeforeSubscriptionStart();`);
const subscriptionPoint = this._listenerSubscriptionPoints.get(listener);
if (subscriptionPoint === undefined) {
this._logger.error(
`DefaultSubscriptionManager.getBeforeSubscriptionStart(); listener has not been subscribed yet`,
);
throw new Ably.ErrorInfo(
'cannot query history; listener has not been subscribed yet',
40000,
400,
) as unknown as Error;
}
// Get the subscription point of the listener
const subscriptionPointParams = await subscriptionPoint;
// Query messages from the subscription point to the start of the time window
return this._chatApi.getMessages(this._roomId, {
...params,
orderBy: OrderBy.NewestFirst,
...subscriptionPointParams,
});
}
/**
* Handle the case where the channel experiences a detach and reattaches.
*/
private _handleAttach(fromResume: boolean) {
this._logger.trace(`DefaultSubscriptionManager.handleAttach();`);
// Do nothing if we have resumed as there is no discontinuity in the message stream
if (fromResume) return;
// Reset subscription points for all listeners
const newSubscriptionStartResolver = this._subscribeAtChannelAttach();
for (const [listener] of this._listenerSubscriptionPoints.entries()) {
this._listenerSubscriptionPoints.set(listener, newSubscriptionStartResolver);
}
}
/**
* Create a promise that resolves with the attachSerial of the channel or the serial of the latest message.
*/
private async _resolveSubscriptionStart(): Promise<{
fromSerial: string;
}> {
const channelWithProperties = this._getChannelProperties();
// If we are attached, we can resolve with the channelSerial
if (channelWithProperties.state === 'attached') {
if (channelWithProperties.properties.channelSerial) {
return { fromSerial: channelWithProperties.properties.channelSerial };
}
this._logger.error(`DefaultSubscriptionManager.handleAttach(); channelSerial is undefined`);
throw new Ably.ErrorInfo('channel is attached, but channelSerial is not defined', 40000, 400) as unknown as Error;
}
return this._subscribeAtChannelAttach();
}
private _getChannelProperties(): Ably.RealtimeChannel & {
properties: { attachSerial: string | undefined; channelSerial: string | undefined };
} {
// Get the attachSerial from the channel properties
return this._channel as Ably.RealtimeChannel & {
properties: {
attachSerial: string | undefined;
channelSerial: string | undefined;
};
};
}
private async _subscribeAtChannelAttach(): Promise<{ fromSerial: string }> {
const channelWithProperties = this._getChannelProperties();
return new Promise((resolve, reject) => {
// Check if the state is now attached
if (channelWithProperties.state === 'attached') {
// Get the attachSerial from the channel properties
// AttachSerial should always be defined at this point, but we check just in case
this._logger.debug('Messages._subscribeAtChannelAttach(); channel is attached already, using attachSerial', {
attachSerial: channelWithProperties.properties.attachSerial,
});
if (channelWithProperties.properties.attachSerial) {
resolve({ fromSerial: channelWithProperties.properties.attachSerial });
} else {
this._logger.error(`DefaultSubscriptionManager.handleAttach(); attachSerial is undefined`);
reject(
new Ably.ErrorInfo('channel is attached, but attachSerial is not defined', 40000, 400) as unknown as Error,
);
}
}
channelWithProperties.once('attached', () => {
// Get the attachSerial from the channel properties
// AttachSerial should always be defined at this point, but we check just in case
this._logger.debug('Messages._subscribeAtChannelAttach(); channel is now attached, using attachSerial', {
attachSerial: channelWithProperties.properties.attachSerial,
});
if (channelWithProperties.properties.attachSerial) {
resolve({ fromSerial: channelWithProperties.properties.attachSerial });
} else {
this._logger.error(`DefaultSubscriptionManager.handleAttach(); attachSerial is undefined`);
reject(
new Ably.ErrorInfo('channel is attached, but attachSerial is not defined', 40000, 400) as unknown as Error,
);
}
});
});
}
/**
* @inheritdoc Messages
*/
get channel(): Ably.RealtimeChannel {
return this._channel;
}
/**
* @inheritdoc Messages
*/
async get(options: QueryOptions): Promise<PaginatedResult<Message>> {
this._logger.trace('Messages.query();');
return this._chatApi.getMessages(this._roomId, options);
}
/**
* @inheritdoc Messages
*/
async send(params: SendMessageParams): Promise<Message> {
this._logger.trace('Messages.send();', { params });
const { text, metadata, headers } = params;
const response = await this._chatApi.sendMessage(this._roomId, { text, headers, metadata });
return new DefaultMessage(
response.serial,
this._clientId,
this._roomId,
text,
metadata ?? {},
headers ?? {},
ChatMessageActions.MessageCreate,
response.serial,
new Date(response.createdAt),
new Date(response.createdAt), // timestamp is the same as createdAt for new messages
);
}
async update(message: Message, update: UpdateMessageParams, details?: OperationDetails): Promise<Message> {
this._logger.trace('Messages.update();', { message, update, details });
const response = await this._chatApi.updateMessage(this._roomId, message.serial, {
...details,
message: update,
});
const updatedMessage = new DefaultMessage(
message.serial,
message.clientId,
this._roomId,
update.text,
update.metadata ?? {},
update.headers ?? {},
ChatMessageActions.MessageUpdate,
response.version,
new Date(message.createdAt),
new Date(response.timestamp),
{
clientId: this._clientId,
description: details?.description,
metadata: details?.metadata,
},
);
this._logger.debug('Messages.update(); message update successfully', { updatedMessage });
return updatedMessage;
}
/**
* @inheritdoc Messages
*/
async delete(message: Message, params?: DeleteMessageParams): Promise<Message> {
this._logger.trace('Messages.delete();', { params });
const response = await this._chatApi.deleteMessage(this._roomId, message.serial, params);
const deletedMessage: Message = new DefaultMessage(
message.serial,
message.clientId,
this._roomId,
message.text,
message.metadata,
message.headers,
ChatMessageActions.MessageDelete,
response.version,
new Date(message.createdAt),
new Date(response.timestamp),
{
clientId: this._clientId,
description: params?.description,
metadata: params?.metadata,
},
);
this._logger.debug('Messages.delete(); message deleted successfully', { deletedMessage });
return deletedMessage;
}
/**
* @inheritdoc Messages
*/
subscribe(listener: MessageListener): MessageSubscriptionResponse {
this._logger.trace('Messages.subscribe();');
super.on([MessageEvents.Created, MessageEvents.Updated, MessageEvents.Deleted], listener);
// Set the subscription point to a promise that resolves when the channel attaches or with the latest message
const resolvedSubscriptionStart = this._resolveSubscriptionStart();
// Add a handler for unhandled rejections incase the room is released before the subscription point is resolved
resolvedSubscriptionStart.catch(() => {
this._logger.debug('Messages.subscribe(); subscription point was not resolved before the room was released', {
roomId: this._roomId,
});
});
this._listenerSubscriptionPoints.set(listener, resolvedSubscriptionStart);
return {
unsubscribe: () => {
// Remove the listener from the subscription points
this._listenerSubscriptionPoints.delete(listener);
this._logger.trace('Messages.unsubscribe();');
super.off(listener);
},
getPreviousMessages: (params: Omit<QueryOptions, 'orderBy'>) =>
this._getBeforeSubscriptionStart(listener, params),
};
}
/**
* @inheritdoc Messages
*/
unsubscribeAll(): void {
this._logger.trace('Messages.unsubscribeAll();');
super.off();
this._listenerSubscriptionPoints.clear();
}
private _processEvent(channelEventMessage: Ably.InboundMessage) {
this._logger.trace('Messages._processEvent();', {
channelEventMessage,
});
const { action } = channelEventMessage;
const event = MessageActionsToEventsMap.get(action as ChatMessageActions);
if (!event) {
this._logger.debug('Messages._processEvent(); received unknown message action', { action });
return;
}
// Send the message to the listeners
const message = this._parseNewMessage(channelEventMessage);
if (!message) {
return;
}
this.emit(event, { type: event, message: message });
}
/**
* Validate the realtime message and convert it to a chat message.
*/
private _parseNewMessage(channelEventMessage: Ably.InboundMessage): Message | undefined {
try {
return parseMessage(this._roomId, channelEventMessage);
} catch (error: unknown) {
this._logger.error(`failed to parse incoming message;`, { channelEventMessage, error: error as Ably.ErrorInfo });
}
}
/**
* @inheritdoc HandlesDiscontinuity
*/
discontinuityDetected(reason?: Ably.ErrorInfo): void {
this._logger.warn('Messages.discontinuityDetected();', { reason });
this._discontinuityEmitter.emit('discontinuity', reason);
}
/**
* @inheritdoc EmitsDiscontinuities
*/
onDiscontinuity(listener: DiscontinuityListener): OnDiscontinuitySubscriptionResponse {
this._logger.trace('Messages.onDiscontinuity();');
this._discontinuityEmitter.on(listener);
return {
off: () => {
this._discontinuityEmitter.off(listener);
},
};
}
/**
* @inheritdoc ContributesToRoomLifecycle
*/
get attachmentErrorCode(): ErrorCodes {
return ErrorCodes.MessagesAttachmentFailed;
}
/**
* @inheritdoc ContributesToRoomLifecycle
*/
get detachmentErrorCode(): ErrorCodes {
return ErrorCodes.MessagesDetachmentFailed;
}
}