Skip to content

Commit

Permalink
Prevent multiple appends within the same event loop creating many bat…
Browse files Browse the repository at this point in the history
…ches.

- immediately cache the promise and await the promise on subsequent appends
- rework backpressured write to work across multiple calls
- add test
  • Loading branch information
George-Payne committed Aug 10, 2022
1 parent 2fe617a commit 088bc45
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 11 deletions.
9 changes: 6 additions & 3 deletions src/Client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,16 +354,19 @@ export class Client {
Client: GRPCClientConstructor<Client>,
debugName: string,
creator: (client: Client) => T | Promise<T>,
cache?: WeakMap<Client, T>
cache?: WeakMap<Client, T | Promise<T>>
) =>
async (): Promise<T> => {
const client = await this.getGRPCClient(Client, debugName);

if (cache && cache.has(client)) return cache.get(client)!;

const stream = await creator(client);
const streamPromise = creator(client);

cache?.set(client, streamPromise);

const stream = await streamPromise;

cache?.set(client, stream);
this.disposableStreams.add(stream);

finished(stream, (err) => {
Expand Down
50 changes: 50 additions & 0 deletions src/__test__/streams/appendToStream-batch-append-flood.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/** @jest-environment ./src/__test__/utils/enableVersionCheck.ts */
import type { Duplex } from "stream";

import {
createTestNode,
matchServerVersion,
optionalDescribe,
} from "@test-utils";
import { EventStoreDBClient, jsonEvent } from "@eventstore/db-client";

describe("appendToStream - batch append - flood", () => {
const supported = matchServerVersion`>=21.10`;

const node = createTestNode();
let client!: EventStoreDBClient;

beforeAll(async () => {
await node.up();
client = new EventStoreDBClient(
{ endpoint: node.uri },
{ rootCertificate: node.rootCertificate }
);
});

afterAll(async () => {
await node.down();
});

optionalDescribe(supported)("Supported (>=21.10)", () => {
test("Can handle multiple appends within the same event loop", async () => {
const streamName = "batchFlood";
const numberOfEvents = 10_000;
const value = "A".repeat(904);

const oneKiloByteEvent = () =>
jsonEvent({
type: "AnyEventType",
data: { value },
});

const requests = [];
for (let i = 0; i < numberOfEvents; i++) {
requests.push(client.appendToStream(streamName, oneKiloByteEvent()));
}
await Promise.all(requests);

await client.dispose();
});
});
});
2 changes: 1 addition & 1 deletion src/streams/appendToStream/batchAppend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import type { AppendToStreamOptions } from ".";

const streamCache = new WeakMap<
StreamsClient,
ReturnType<StreamsClient["batchAppend"]>
Promise<ReturnType<StreamsClient["batchAppend"]>>
>();

const promiseBank = new Map<
Expand Down
75 changes: 68 additions & 7 deletions src/utils/backpressuredWrite.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,72 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import type { ClientWritableStream } from "@grpc/grpc-js";

export const backpressuredWrite = <T>(
const cache = new WeakMap<ClientWritableStream<any>, BackpressureQueue<any>>();

interface QueueItem<T> {
data: T;
resolve: () => void;
reject: (err: Error) => void;
}

class BackpressureQueue<T> {
private stream: ClientWritableStream<T>;
private queue: QueueItem<T>[] = [];
private writing = false;

constructor(stream: ClientWritableStream<T>) {
this.stream = stream;
this.stream.once("error", this.errorOut);
}

write = async (data: T) =>
new Promise<void>((resolve, reject) => {
this.queue.push({ data, resolve, reject });
this.triggerWrite();
});

private triggerWrite = async () => {
if (this.writing) return;

this.writing = true;

while (this.queue.length) {
const { data, resolve } = this.queue.shift()!;

const written = this.stream.write(data);

if (!written) {
await new Promise<void>((r) => this.stream.once("drain", r));
}

resolve();
}

this.cleanUp();
};

private errorOut = (err: Error) => {
this.cleanUp();

while (this.queue.length) {
const { reject } = this.queue.shift()!;
reject(err);
}
};

private cleanUp = () => {
this.stream.removeListener("error", this.errorOut);
cache.delete(this.stream);
};
}

export const backpressuredWrite = async <T>(
stream: ClientWritableStream<T>,
data: T
) =>
new Promise<void>((resolve) => {
const written = stream.write(data);
if (written) return resolve();
stream.once("drain", resolve);
});
) => {
if (!cache.has(stream)) {
cache.set(stream, new BackpressureQueue<T>(stream));
}

await cache.get(stream)!.write(data);
};

0 comments on commit 088bc45

Please sign in to comment.