Skip to content

Commit

Permalink
feat: add publishAll method for synchronous event dispatching
Browse files Browse the repository at this point in the history
  • Loading branch information
sgomez committed Jul 25, 2021
1 parent fbdbcdc commit 9b5a865
Showing 1 changed file with 56 additions and 15 deletions.
71 changes: 56 additions & 15 deletions src/eventstore.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import {
AppendExpectedRevision,
END,
ErrorType,
EventData,
EventStoreDBClient,
FORWARDS,
jsonEvent,
Expand Down Expand Up @@ -37,28 +39,44 @@ export class EventStore
this.client = EventStoreDBClient.connectionString(config.connection);
}

async publish<T extends Event>(event: T) {
const streamName = `${this.category}-${event.aggregateId}`;
const expectedRevision =
event.version <= 0 ? NO_STREAM : BigInt(event.version - 1);
async publishAll<T extends Event>(events: T[]) {
events = [...events];

if (event.aggregateEncrypted) {
event = (await this.keyService.encryptEvent(event)) as T;
if (events.length === 0) {
return;
}

const eventData = jsonEvent({
id: uuid(),
type: event.eventType,
data: event.payload as JSONType,
metadata: event.metadata,
});
const streamName = this.getStreamName(events[0]);
const expectedRevision = this.getExpectedRevision(events[0]);

const eventsData = [];
for (const event of events) {
const eventData = await this.createEventData(event);

eventsData.push(eventData);
}

try {
this.client.appendToStream(streamName, eventsData, {
expectedRevision: expectedRevision,
});
} catch (e) {
this.logger.error(`Error publishing all events: ${e.message}`);
}
}

async publish<T extends Event>(event: T) {
const streamName = this.getStreamName(event);
const expectedRevision = this.getExpectedRevision(event);

const eventData = await this.createEventData(event);

try {
await this.client.appendToStream(streamName, eventData, {
expectedRevision: expectedRevision,
});
} catch (e) {
console.debug('error', e);
this.logger.error(`Error publishing event: ${e.message}`);
}
}

Expand Down Expand Up @@ -89,7 +107,7 @@ export class EventStore
return null;
}

this.logger.debug(error);
this.logger.error(error);
}

return null;
Expand All @@ -112,7 +130,30 @@ export class EventStore
})
.on('data', onEvent);
} catch (error) {
this.logger.debug(error);
this.logger.error(error);
}
}

private getStreamName<T extends Event>(event: T) {
return `${this.category}-${event.aggregateId}`;
}

private getExpectedRevision<T extends Event>(
event: T,
): AppendExpectedRevision {
return event.version <= 0 ? NO_STREAM : BigInt(event.version - 1);
}

private async createEventData<T extends Event>(event: T): Promise<EventData> {
if (event.aggregateEncrypted) {
event = (await this.keyService.encryptEvent(event)) as T;
}

return jsonEvent({
id: uuid(),
type: event.eventType,
data: event.payload as JSONType,
metadata: event.metadata,
});
}
}

0 comments on commit 9b5a865

Please sign in to comment.