Skip to content

Commit

Permalink
Add axon-cassandra implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
lion7 committed Sep 30, 2016
1 parent 9486389 commit ac4b76e
Show file tree
Hide file tree
Showing 11 changed files with 871 additions and 12 deletions.
21 changes: 9 additions & 12 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
*.class

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.ear

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
*.iml
*.ipr
*.iws
target/
.idea/
.classpath
.project
.settings/
.idea
121 changes: 121 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2010-2014. Axon Framework
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.axonframework</groupId>
<artifactId>axon</artifactId>
<version>3.0-M3</version>
</parent>

<artifactId>axon-cassandra</artifactId>

<name>Axon Cassandra Components</name>
<description>This module contains components that integrate with Apache Cassandra.</description>

<packaging>bundle</packaging>

<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>2.4.0</version>
<extensions>true</extensions>
<configuration>
<instructions>
<Bundle-SymbolicName>${project.groupId}.${project.artifactId}</Bundle-SymbolicName>
<Bundle-Name>${project.artifactId}</Bundle-Name>
<Bundle-Description>${project.description}</Bundle-Description>
<Bundle-Version>${project.version}</Bundle-Version>
<Export-Package>
org.axonframework.integration.*
</Export-Package>
<Import-Package>
org.slf4j.*,
org.axonframework.*,
*;resolution:=optional
</Import-Package>
</instructions>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<!-- Axon dependencies -->
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- Cassandra dependencies -->
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.1.0</version>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>axon-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hibernate.javax.persistence</groupId>
<artifactId>hibernate-jpa-2.1-api</artifactId>
<version>1.0.0.Final</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.eaio.uuid</groupId>
<artifactId>uuid</artifactId>
<version>3.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.axonframework.cassandra.eventsourcing.eventstore;

import org.axonframework.eventsourcing.eventstore.jdbc.EventSchema;

/**
* Created by gle21221 on 13-9-2016.
*/
public class CassandraEventSchema extends EventSchema {

private CassandraEventSchema() {
}

public static Builder builder() {
return new Builder();
}

public static class Builder extends EventSchema.Builder {
public Builder() {
withPayloadColumn("payloadBuffer").withMetaDataColumn("metaDataBuffer");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package org.axonframework.cassandra.eventsourcing.eventstore;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.axonframework.common.jdbc.PersistenceExceptionResolver;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.eventstore.EventUtils;
import org.axonframework.eventsourcing.eventstore.jdbc.EventSchema;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/**
* Created by gle21221 on 2-9-2016.
*/
public class CassandraEventStorageEngine extends CassandraReadOnlyEventStorageEngine {

private static final String EVENTS_COUNTER_NAME = "events";
private final String batchKey = this + "_BATCH";

private final AtomicLong globalIndexCounter;

public CassandraEventStorageEngine(Serializer serializer, EventUpcasterChain upcasterChain, PersistenceExceptionResolver persistenceExceptionResolver, TransactionManager transactionManager, Integer batchSize, Session session, EventSchema schema) {
super(serializer, upcasterChain, persistenceExceptionResolver, transactionManager, batchSize, session, schema);
this.globalIndexCounter = new AtomicLong(selectCounter(EVENTS_COUNTER_NAME));
}

private static DomainEventEntry asDomainEventEntry(DomainEventMessage<?> eventMessage, Serializer serializer, long globalIndex) {
return new DomainEventEntry(globalIndex, eventMessage, serializer);
}

private static SnapshotEventEntry asSnapshotEventEntry(DomainEventMessage<?> eventMessage, Serializer serializer) {
return new SnapshotEventEntry(eventMessage, serializer);
}

@Override
protected void appendEvents(List<? extends EventMessage<?>> events, Serializer serializer) {
events.stream()
.map(EventUtils::asDomainEventMessage)
.map(e -> asDomainEventEntry(e, serializer, globalIndexCounter.getAndIncrement()))
.map(this::storeEventLogEntry)
.map(eventMapper::saveQuery)
.forEachOrdered(batch()::add);
}

@Override
protected void storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializer) {
batch().add(snapshotMapper.saveQuery(asSnapshotEventEntry(snapshot, serializer)));
}

private DomainEventEntry storeEventLogEntry(DomainEventEntry event) {
EventLogEntry eventLogEntry = new EventLogEntry(event.getGlobalIndex(), event.getAggregateIdentifier(), event.getSequenceNumber());
batch().add(eventLogMapper.saveQuery(eventLogEntry));
return event;
}

private BatchStatement batch() {
UnitOfWork<?> root = CurrentUnitOfWork.get().root();
return root.getOrComputeResource(batchKey, s -> {
BatchStatement batch = new BatchStatement();
root.onCommit(unitOfWork -> session.execute(batch));
return batch;
});
}

private long selectCounter(String name) {
ResultSet resultSet = session.execute("SELECT " + quoted("value") +
" FROM" + quoted("Counters") +
" WHERE " + quoted("name") + " = ? LIMIT 1", name);
return Optional.ofNullable(resultSet.one()).map(row -> row.getLong(0)).orElse(0L);
}

}
Loading

0 comments on commit ac4b76e

Please sign in to comment.