Skip to content

Commit

Permalink
Fix tests for event batch publisher implementation (#924)
Browse files Browse the repository at this point in the history
This commit addresses test failures that emerged after introducing
event batching.

The main fixes focus on two areas:

1. Watched Event Delivery
- Previously: Events were published immediately to current subscribers.
- Now: Events are batched and filtered based on subscription status at
  publish time.
- Added logic to prevent sending watched events to clients who already
  know the watch state.

2. Attach Event Sequencing
- Updated tests to account for new event delivery timing
- Modified attach/subscribe sequence for deterministic behavior
- Ensured late subscribers can properly receive synced events under the
  batch system

These changes maintain correct event delivery semantics while
supporting the new batched publishing mechanism.
  • Loading branch information
chacha912 authored Nov 4, 2024
1 parent ea5b6f3 commit 9a6ab62
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 14 deletions.
4 changes: 4 additions & 0 deletions packages/sdk/src/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,10 @@ export class Document<T, P extends Indexable = Indexable> {
const event: Array<WatchedEvent<P> | UnwatchedEvent<P> | BroadcastEvent> =
[];
if (type === PbDocEventType.DOCUMENT_WATCHED) {
if (this.onlineClients.has(publisher) && this.hasPresence(publisher)) {
return;
}

this.addOnlineClient(publisher);
// NOTE(chacha912): We added to onlineClients, but we won't trigger watched event
// unless we also know their initial presence data at this point.
Expand Down
30 changes: 16 additions & 14 deletions packages/sdk/test/integration/client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ describe.sequential('Client', function () {

// 03. [Step2] c1 sync with push-only mode, c2 sync with sync-off mode.
// c3 can get the changes of c1 and c2, because c3 sync with push-pull mode.
c1.changeSyncMode(d1, SyncMode.RealtimePushOnly);
c2.changeSyncMode(d2, SyncMode.RealtimeSyncOff);
await c1.changeSyncMode(d1, SyncMode.RealtimePushOnly);
await c2.changeSyncMode(d2, SyncMode.RealtimeSyncOff);
d1.update((root) => {
root.c1 = 1;
});
Expand All @@ -397,8 +397,8 @@ describe.sequential('Client', function () {
assert.equal(d3.toSortedJSON(), '{"c1":1,"c2":0,"c3":1}', 'd3');

// 04. [Step3] c1 sync with sync-off mode, c2 sync with push-only mode.
c1.changeSyncMode(d1, SyncMode.RealtimeSyncOff);
c2.changeSyncMode(d2, SyncMode.RealtimePushOnly);
await c1.changeSyncMode(d1, SyncMode.RealtimeSyncOff);
await c2.changeSyncMode(d2, SyncMode.RealtimePushOnly);
d1.update((root) => {
root.c1 = 2;
});
Expand All @@ -419,8 +419,8 @@ describe.sequential('Client', function () {
assert.equal(d3.toSortedJSON(), '{"c1":1,"c2":2,"c3":2}', 'd3');

// 05. [Step4] c1 and c2 sync with push-pull mode.
c1.changeSyncMode(d1, SyncMode.Realtime);
c2.changeSyncMode(d2, SyncMode.Realtime);
await c1.changeSyncMode(d1, SyncMode.Realtime);
await c2.changeSyncMode(d2, SyncMode.Realtime);
await eventCollectorD1.waitAndVerifyNthEvent(6, DocEventType.RemoteChange);
await eventCollectorD1.waitAndVerifyNthEvent(7, DocEventType.RemoteChange);
await eventCollectorD1.waitAndVerifyNthEvent(8, DocEventType.RemoteChange);
Expand Down Expand Up @@ -621,7 +621,7 @@ describe.sequential('Client', function () {
c2.sync();

// In push-only mode, remote-change events should not occur.
c2.changeSyncMode(d2, SyncMode.RealtimePushOnly);
await c2.changeSyncMode(d2, SyncMode.RealtimePushOnly);
let remoteChangeOccured = false;
const unsub3 = d2.subscribe((event) => {
if (event.type === DocEventType.RemoteChange) {
Expand All @@ -635,7 +635,7 @@ describe.sequential('Client', function () {
unsub3();
assert.isFalse(remoteChangeOccured);

c2.changeSyncMode(d2, SyncMode.Realtime);
await c2.changeSyncMode(d2, SyncMode.Realtime);

d2.update((root) => {
root.tree.edit(2, 2, { type: 'text', value: 'b' });
Expand Down Expand Up @@ -704,7 +704,7 @@ describe.sequential('Client', function () {
c2.sync();

// In sync-off mode, remote-change events should not occur.
c2.changeSyncMode(d2, SyncMode.RealtimeSyncOff);
await c2.changeSyncMode(d2, SyncMode.RealtimeSyncOff);
let remoteChangeOccured = false;
const unsub3 = d2.subscribe((event) => {
if (event.type === DocEventType.RemoteChange) {
Expand All @@ -718,7 +718,7 @@ describe.sequential('Client', function () {
unsub3();
assert.isFalse(remoteChangeOccured);

c2.changeSyncMode(d2, SyncMode.Realtime);
await c2.changeSyncMode(d2, SyncMode.Realtime);

d2.update((root) => {
root.tree.edit(2, 2, { type: 'text', value: 'b' });
Expand All @@ -745,8 +745,6 @@ describe.sequential('Client', function () {
const docKey = toDocKey(`${task.name}-${new Date().getTime()}`);
const d1 = new yorkie.Document<{ t: Text }>(docKey);
const d2 = new yorkie.Document<{ t: Text }>(docKey);
await c1.attach(d1);
await c2.attach(d2);

const eventCollectorD1 = new EventCollector();
const eventCollectorD2 = new EventCollector();
Expand All @@ -757,6 +755,10 @@ describe.sequential('Client', function () {
eventCollectorD2.add(event.value);
});

await c1.attach(d1);
await c2.attach(d2);
await eventCollectorD1.waitAndVerifyNthEvent(1, DocumentSyncStatus.Synced);

d1.update((root) => {
root.t = new Text();
root.t.edit(0, 0, 'a');
Expand All @@ -767,7 +769,7 @@ describe.sequential('Client', function () {
assert.equal(d2.getRoot().t.toString(), 'a');

eventCollectorD1.reset();
c1.changeSyncMode(d1, SyncMode.RealtimePushOnly);
await c1.changeSyncMode(d1, SyncMode.RealtimePushOnly);
d2.update((root) => {
root.t.edit(1, 1, 'b');
});
Expand All @@ -778,7 +780,7 @@ describe.sequential('Client', function () {
await eventCollectorD2.waitAndVerifyNthEvent(3, DocumentSyncStatus.Synced);

assert.equal(eventCollectorD1.getLength(), 0);
c1.changeSyncMode(d1, SyncMode.Realtime);
await c1.changeSyncMode(d1, SyncMode.Realtime);
await eventCollectorD1.waitAndVerifyNthEvent(1, DocumentSyncStatus.Synced);

assert.equal(d1.getRoot().t.toString(), 'abc');
Expand Down
11 changes: 11 additions & 0 deletions packages/sdk/test/integration/presence_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,23 @@ describe('Presence', function () {
const unsub1 = doc1.subscribe('presence', ({ type, value }) =>
events1.add({ type, value }),
);
assert.deepEqual(
deepSort(doc1.getPresences()),
deepSort([{ clientID: c1ID, presence: { name: 'a' } }]),
);

const doc2 = new yorkie.Document<{}, { name: string }>(docKey);
await c2.attach(doc2, { initialPresence: { name: 'b' } });
const unsub2 = doc2.subscribe('presence', ({ type, value }) =>
events2.add({ type, value }),
);
assert.deepEqual(
deepSort(doc2.getPresences()),
deepSort([
{ clientID: c2ID, presence: { name: 'b' } },
{ clientID: c1ID, presence: { name: 'a' } },
]),
);

await events1.waitAndVerifyNthEvent(1, {
type: DocEventType.Watched,
Expand Down

0 comments on commit 9a6ab62

Please sign in to comment.