Skip to content

Commit

Permalink
Refactor scalardb-test (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
brfrn169 authored Nov 15, 2024
1 parent a5cb40f commit f7864fd
Show file tree
Hide file tree
Showing 39 changed files with 96 additions and 926 deletions.
18 changes: 9 additions & 9 deletions scalardb-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,26 @@ repositories {
}
}

def awssdkVersion = "2.25.1"
def awssdkVersion = "2.29.1"
def dockerVersion = project.hasProperty('dockerVersion') ? project.dockerVersion : 'latest'
def scalarDbVersion = "4.0.0-SNAPSHOT"

dependencies {
implementation 'com.scalar-labs:kelpie:1.2.3'
implementation 'javax.json:javax.json-api:1.1.4'
implementation 'org.glassfish:javax.json:1.1.4'
implementation 'io.github.resilience4j:resilience4j-retry:1.7.1'
implementation 'com.scalar-labs:scalardb:4.0.0-SNAPSHOT'
implementation "com.scalar-labs:scalardb:${scalarDbVersion}"
implementation 'com.google.guava:guava:32.1.3-jre'
implementation 'com.azure:azure-cosmos:4.56.0'
implementation 'com.azure:azure-cosmos:4.64.0'
implementation "software.amazon.awssdk:dynamodb:${awssdkVersion}"
implementation "software.amazon.awssdk:core:${awssdkVersion}"
implementation 'org.apache.commons:commons-dbcp2:2.12.0'
implementation 'com.scalar-labs:scalardb-sql-direct-mode:4.0.0-SNAPSHOT'
implementation 'com.scalar-labs:scalardb-sql-server-mode:4.0.0-SNAPSHOT'
implementation 'com.scalar-labs:scalardb-sql-jdbc:4.0.0-SNAPSHOT'
implementation 'com.scalar-labs:scalardb-graphql:4.0.0-SNAPSHOT'
implementation 'com.scalar-labs:scalardb-cluster-java-client-sdk:4.0.0-SNAPSHOT'
implementation 'com.graphql-java:graphql-java:20.7'
implementation "com.scalar-labs:scalardb-sql-direct-mode:${scalarDbVersion}"
implementation "com.scalar-labs:scalardb-sql-jdbc:${scalarDbVersion}"
implementation "com.scalar-labs:scalardb-graphql:${scalarDbVersion}"
implementation "com.scalar-labs:scalardb-cluster-java-client-sdk:${scalarDbVersion}"
implementation 'com.graphql-java:graphql-java:20.9'
implementation 'com.squareup.okhttp3:okhttp:4.12.0'
}

Expand Down
17 changes: 0 additions & 17 deletions scalardb-test/schema/multiple-clustering-key.json

This file was deleted.

12 changes: 0 additions & 12 deletions scalardb-test/schema/no-clustering-key.json

This file was deleted.

15 changes: 0 additions & 15 deletions scalardb-test/schema/single-clustering-key.json

This file was deleted.

25 changes: 10 additions & 15 deletions scalardb-test/src/main/java/kelpie/scalardb/Common.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.slf4j.LoggerFactory;

public class Common {
private static final Logger LOGGER = LoggerFactory.getLogger(Common.class);
private static final Logger logger = LoggerFactory.getLogger(Common.class);

private static final int WAIT_MILLS = 1000;
private static final long SLEEP_BASE_MILLIS = 100L;
Expand All @@ -36,31 +36,26 @@ public static DistributedStorage getStorage(Config config) {
return factory.getStorage();
}

public static DistributedTransactionManager getTransactionManager(
Config config, String keyspace, String table) {
public static DistributedTransactionManager getTransactionManager(Config config) {
DatabaseConfig dbConfig = getDatabaseConfig(config);
TransactionFactory factory = new TransactionFactory(dbConfig);
DistributedTransactionManager manager = factory.getTransactionManager();
manager.with(keyspace, table);
return manager;
return factory.getTransactionManager();
}

public static TwoPhaseCommitTransactionManager getTwoPhaseCommitTransactionManager1(
Config config, String keyspace, String table) {
return getTwoPhaseCommitTransactionManager(getDatabaseConfig1(config), keyspace, table);
Config config) {
return getTwoPhaseCommitTransactionManager(getDatabaseConfig1(config));
}

public static TwoPhaseCommitTransactionManager getTwoPhaseCommitTransactionManager2(
Config config, String keyspace, String table) {
return getTwoPhaseCommitTransactionManager(getDatabaseConfig2(config), keyspace, table);
Config config) {
return getTwoPhaseCommitTransactionManager(getDatabaseConfig2(config));
}

private static TwoPhaseCommitTransactionManager getTwoPhaseCommitTransactionManager(
Properties properties, String keyspace, String table) {
Properties properties) {
TransactionFactory factory = TransactionFactory.create(properties);
TwoPhaseCommitTransactionManager manager = factory.getTwoPhaseCommitTransactionManager();
manager.with(keyspace, table);
return manager;
return factory.getTwoPhaseCommitTransactionManager();
}

public static DatabaseConfig getDatabaseConfig(Config config) {
Expand All @@ -74,7 +69,7 @@ public static DatabaseConfig getDatabaseConfig(Config config) {
try {
return new DatabaseConfig(new File(configFile));
} catch (IOException e) {
LOGGER.warn("failed to load the specified config file: " + configFile, e);
logger.warn("failed to load the specified config file: {}", configFile, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class SensorChecker extends PostProcessor {

public SensorChecker(Config config) {
super(config);
this.manager = SensorCommon.getTransactionManager(config);
this.manager = Common.getTransactionManager(config);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,62 +1,61 @@
package kelpie.scalardb.sensor;

import com.scalar.db.api.Consistency;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.io.IntValue;
import com.scalar.db.transaction.consensuscommit.TransactionResult;
import com.scalar.db.io.Key;
import com.scalar.kelpie.config.Config;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.IntStream;
import kelpie.scalardb.Common;

public class SensorCommon {
private static final String KEYSPACE = "sensor";
private static final String NAMESPACE = "sensor";
private static final String TABLE = "tx_sensor";
private static final String TIMESTAMP = "timestamp";
private static final String DEVICE_ID = "device_id";
private static final String REVISION = "revision";

public static DistributedTransactionManager getTransactionManager(Config config) {
return Common.getTransactionManager(config, KEYSPACE, TABLE);
}

public static Scan prepareScan(int timestamp) {
Key partitionKey = new Key(new IntValue(TIMESTAMP, timestamp));

return new Scan(partitionKey)
.withOrdering(new Scan.Ordering(DEVICE_ID, Scan.Ordering.Order.ASC))
.withConsistency(Consistency.LINEARIZABLE);
Key partitionKey = Key.ofInt(TIMESTAMP, timestamp);
return Scan.newBuilder()
.namespace(NAMESPACE)
.table(TABLE)
.partitionKey(partitionKey)
.ordering(Scan.Ordering.asc(DEVICE_ID))
.consistency(Consistency.LINEARIZABLE)
.build();
}

public static Put preparePut(int timestamp, int deviceId, int revision) {
Key partitionKey = new Key(new IntValue(TIMESTAMP, timestamp));
Key clusteringKey = new Key(new IntValue(DEVICE_ID, deviceId));
return new Put(partitionKey, clusteringKey)
.withConsistency(Consistency.LINEARIZABLE)
.withValue(new IntValue(REVISION, revision));
Key partitionKey = Key.ofInt(TIMESTAMP, timestamp);
Key clusteringKey = Key.ofInt(DEVICE_ID, deviceId);
return Put.newBuilder()
.namespace(NAMESPACE)
.table(TABLE)
.partitionKey(partitionKey)
.clusteringKey(clusteringKey)
.consistency(Consistency.LINEARIZABLE)
.intValue(REVISION, revision)
.build();
}

public static boolean hasDuplicatedRevision(List<Result> results) {
IntStream revisions = results.stream().mapToInt(r -> getRevisionFromResult(r));
IntStream revisions = results.stream().mapToInt(SensorCommon::getRevisionFromResult);

Set<Integer> tempSet = new HashSet<>();
return revisions.anyMatch(rev -> !tempSet.add(rev));
}

public static int getMaxRevision(List<Result> results) {
OptionalInt maxRevision = results.stream().mapToInt(r -> getRevisionFromResult(r)).max();
OptionalInt maxRevision = results.stream().mapToInt(SensorCommon::getRevisionFromResult).max();

return maxRevision.orElse(0);
}

private static int getRevisionFromResult(Result result) {
return ((IntValue) result.getValue(REVISION).get()).get();
return result.getInt(REVISION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.json.Json;
import kelpie.scalardb.Common;

public class SensorProcessor extends TimeBasedProcessor {
private final DistributedTransactionManager manager;
Expand All @@ -23,7 +24,7 @@ public class SensorProcessor extends TimeBasedProcessor {

public SensorProcessor(Config config) {
super(config);
this.manager = SensorCommon.getTransactionManager(config);
this.manager = Common.getTransactionManager(config);
this.numDevices = (int) config.getUserLong("test_config", "num_devices");
this.isVerification =
new AtomicBoolean(config.getUserBoolean("test_config", "is_verification", false));
Expand Down

This file was deleted.

Loading

0 comments on commit f7864fd

Please sign in to comment.