Skip to content

Commit

Permalink
Add position to events from stream, if returned by the server.
Browse files Browse the repository at this point in the history
- update types
- update and unify GRPC event converters
  • Loading branch information
George-Payne committed Jul 18, 2022
1 parent 9de9e2c commit 322b110
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {

import type { BaseOptions, PersistentSubscriptionToAll } from "../types";
import {
convertPersistentSubscriptionToAllGrpcEvent,
convertPersistentSubscriptionGrpcEvent,
debug,
UnsupportedError,
} from "../utils";
Expand Down Expand Up @@ -93,7 +93,7 @@ Client.prototype.subscribeToPersistentSubscriptionToAll = function (
return stream;
}
),
convertPersistentSubscriptionToAllGrpcEvent,
convertPersistentSubscriptionGrpcEvent,
duplexOptions
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type {
} from "../types";
import {
debug,
convertPersistentSubscriptionToStreamGrpcEvent,
convertPersistentSubscriptionGrpcEvent,
createStreamIdentifier,
} from "../utils";
import { Client } from "../Client";
Expand Down Expand Up @@ -108,7 +108,7 @@ Client.prototype.subscribeToPersistentSubscriptionToStream = function <
return stream;
}
),
convertPersistentSubscriptionToStreamGrpcEvent,
convertPersistentSubscriptionGrpcEvent,
duplexOptions
);
};
Expand Down
8 changes: 2 additions & 6 deletions src/streams/readAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type {
AllStreamResolvedEvent,
StreamingRead,
} from "../types";
import { debug, convertAllStreamGrpcEvent } from "../utils";
import { debug, convertGrpcEvent } from "../utils";
import { BACKWARDS, FORWARDS, START } from "../constants";
import { Client } from "../Client";

Expand Down Expand Up @@ -136,9 +136,5 @@ Client.prototype.readAll = function (
)
);

return new ReadStream(
createGRPCStream,
convertAllStreamGrpcEvent,
readableOptions
);
return new ReadStream(createGRPCStream, convertGrpcEvent, readableOptions);
};
4 changes: 2 additions & 2 deletions src/streams/subscribeToAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type {
BaseOptions,
Filter,
} from "../types";
import { convertAllStreamGrpcEvent, debug } from "../utils";
import { convertGrpcEvent, debug } from "../utils";
import { Client } from "../Client";
import { END, EVENT_TYPE, START, STREAM_NAME } from "../constants";

Expand Down Expand Up @@ -161,7 +161,7 @@ Client.prototype.subscribeToAll = function (

return new Subscription(
createGRPCStream,
convertAllStreamGrpcEvent,
convertGrpcEvent,
readableOptions,
filter?.checkpointReached
);
Expand Down
6 changes: 6 additions & 0 deletions src/types/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ export type RecordedEvent<E extends EventType = EventType> = E extends EventType
metadata: E["metadata"] extends MetadataType
? E["metadata"]
: MetadataType | undefined;

/**
* Position of this event in the transaction log.
* Only returned on server versions later than 22.6.0.
*/
position?: Position;
}
: never;

Expand Down
95 changes: 24 additions & 71 deletions src/utils/convertGrpcEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ import type { ReadResp as PersistentReadResp } from "../../generated/persistent_

import { debug } from "./debug";
import type {
AllStreamResolvedEvent,
EventType,
EventTypeToRecordedEvent,
LinkEvent,
PersistentSubscriptionToStreamResolvedEvent,
PersistentSubscriptionToAllResolvedEvent,
Position,
ResolvedEvent,
} from "../types";
Expand All @@ -25,10 +23,9 @@ export type GRPCRecordedEvent =

export type ConvertGrpcEvent<GRPCEvent, E> = (grpcEvent: GRPCEvent) => E;

export const convertGrpcEvent: ConvertGrpcEvent<
StreamsReadResp.ReadEvent,
ResolvedEvent
> = (grpcEvent) => {
export const convertGrpcEvent = <T extends ResolvedEvent>(
grpcEvent: StreamsReadResp.ReadEvent
): T => {
const resolved: ResolvedEvent = {};

if (grpcEvent.hasEvent()) {
Expand All @@ -43,13 +40,14 @@ export const convertGrpcEvent: ConvertGrpcEvent<
resolved.commitPosition = BigInt(grpcEvent.getCommitPosition()!);
}

return resolved;
return resolved as T;
};

export const convertPersistentSubscriptionToStreamGrpcEvent: ConvertGrpcEvent<
PersistentReadResp.ReadEvent,
PersistentSubscriptionToStreamResolvedEvent
> = (grpcEvent) => {
export const convertPersistentSubscriptionGrpcEvent = <
T extends PersistentSubscriptionToStreamResolvedEvent
>(
grpcEvent: PersistentReadResp.ReadEvent
): T => {
const resolved: PersistentSubscriptionToStreamResolvedEvent = {
retryCount: grpcEvent.hasRetryCount() ? grpcEvent.getRetryCount() : 0,
};
Expand All @@ -66,74 +64,25 @@ export const convertPersistentSubscriptionToStreamGrpcEvent: ConvertGrpcEvent<
resolved.commitPosition = BigInt(grpcEvent.getCommitPosition()!);
}

return resolved;
};

export const convertAllStreamGrpcEvent: ConvertGrpcEvent<
StreamsReadResp.ReadEvent,
AllStreamResolvedEvent
> = (grpcEvent) => {
const resolved: AllStreamResolvedEvent = {};

if (grpcEvent.hasEvent()) {
const event = grpcEvent.getEvent()!;
resolved.event = {
...convertGrpcRecord(event),
position: extractPosition(event),
};
}

if (grpcEvent.hasLink()) {
const link = grpcEvent.getLink()!;
resolved.link = {
...convertGrpcRecord<LinkEvent>(link),
position: extractPosition(link),
};
}

if (grpcEvent.hasCommitPosition()) {
resolved.commitPosition = BigInt(grpcEvent.getCommitPosition()!);
}

return resolved;
return resolved as T;
};

export const convertPersistentSubscriptionToAllGrpcEvent: ConvertGrpcEvent<
PersistentReadResp.ReadEvent,
PersistentSubscriptionToAllResolvedEvent
> = (grpcEvent) => {
const resolved: PersistentSubscriptionToAllResolvedEvent = {
retryCount: grpcEvent.hasRetryCount() ? grpcEvent.getRetryCount() : 0,
};

if (grpcEvent.hasEvent()) {
const event = grpcEvent.getEvent()!;
resolved.event = {
...convertGrpcRecord(event),
position: extractPosition(event),
};
}
const extractPosition = (
grpcRecord: GRPCRecordedEvent
): Position | undefined => {
const commit = grpcRecord.getCommitPosition();
const prepare = grpcRecord.getPreparePosition();

if (grpcEvent.hasLink()) {
const link = grpcEvent.getLink()!;
resolved.link = {
...convertGrpcRecord<LinkEvent>(link),
position: extractPosition(link),
if (commit != null && prepare != null) {
return {
commit: BigInt(commit),
prepare: BigInt(prepare),
};
}

if (grpcEvent.hasCommitPosition()) {
resolved.commitPosition = BigInt(grpcEvent.getCommitPosition()!);
}

return resolved;
return undefined;
};

const extractPosition = (grpcRecord: GRPCRecordedEvent): Position => ({
commit: BigInt(grpcRecord.getCommitPosition()),
prepare: BigInt(grpcRecord.getPreparePosition()),
});

const safeParseJSON = <T = unknown>(
str: string,
fallback: (str: string) => T,
Expand Down Expand Up @@ -183,6 +132,8 @@ export const convertGrpcRecord = <E extends EventType = EventType>(
const metadata: E["metadata"] = parseMetadata(grpcRecord, id);
const isJson = contentType === "application/json";

const position = extractPosition(grpcRecord);

if (isJson) {
const dataStr = Buffer.from(grpcRecord.getData()).toString("utf8");

Expand All @@ -201,6 +152,7 @@ export const convertGrpcRecord = <E extends EventType = EventType>(
metadata,
isJson,
created,
position,
} as EventTypeToRecordedEvent<E>;
}

Expand All @@ -215,5 +167,6 @@ export const convertGrpcRecord = <E extends EventType = EventType>(
metadata,
isJson,
created,
position,
} as EventTypeToRecordedEvent<E>;
};

0 comments on commit 322b110

Please sign in to comment.