Skip to content

Commit

Permalink
Make sure JPA and JDBC Event Storage Engines respect Event Message type
Browse files Browse the repository at this point in the history
When appending an Event Message that does not implement DomainEventMessage, reading such message out of the event store should not turn it into a DomainEventMessage.

Fixes #1697
  • Loading branch information
abuijze committed Feb 8, 2021
1 parent cfb1c41 commit c61a95b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,11 +636,13 @@ protected TrackedEventData<?> getTrackedEventData(ResultSet resultSet,
GapAwareTrackingToken previousToken) throws SQLException {
long globalSequence = resultSet.getLong(schema.globalIndexColumn());

String aggregateIdentifier = resultSet.getString(schema.aggregateIdentifierColumn());
String eventIdentifier = resultSet.getString(schema.eventIdentifierColumn());
GenericDomainEventEntry<?> domainEvent = new GenericDomainEventEntry<>(
resultSet.getString(schema.typeColumn()),
resultSet.getString(schema.aggregateIdentifierColumn()),
eventIdentifier.equals(aggregateIdentifier) ? null : aggregateIdentifier,
resultSet.getLong(schema.sequenceNumberColumn()),
resultSet.getString(schema.eventIdentifierColumn()),
eventIdentifier,
readTimeStamp(resultSet, schema.timestampColumn()),
resultSet.getString(schema.payloadTypeColumn()),
resultSet.getString(schema.payloadRevisionColumn()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,11 @@ protected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken l
GapAwareTrackingToken token = previousToken;
for (Object[] entry : entries) {
long globalSequence = (Long) entry[0];
String aggregateIdentifier = (String) entry[2];
String eventIdentifier = (String) entry[4];
GenericDomainEventEntry<?> domainEvent = new GenericDomainEventEntry<>(
(String) entry[1], (String) entry[2], (long) entry[3], (String) entry[4], entry[5],
(String) entry[1], eventIdentifier.equals(aggregateIdentifier) ? null : aggregateIdentifier,
(long) entry[3], eventIdentifier, entry[5],
(String) entry[6], (String) entry[7], entry[8], entry[9]
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ protected void testStoreAndLoadEvents() {
assertEquals(1, testSubject.readEvents("otherAggregate").asStream().count());
}

@Test
void testAppendAndReadNonDomainEvent() {
testSubject.appendEvents(new GenericEventMessage<>("Hello world"));

List<? extends TrackedEventMessage<?>> actual = testSubject.readEvents(null, false)
.collect(toList());
assertEquals(1, actual.size());
assertFalse(actual.get(0) instanceof DomainEventMessage);
}

@Test
void testStoreAndLoadEventsArray() {
testSubject.appendEvents(createEvent(0), createEvent(1));
Expand Down

0 comments on commit c61a95b

Please sign in to comment.