diff --git a/scalardb-test/build.gradle b/scalardb-test/build.gradle
index 44c9c84..2041924 100644
--- a/scalardb-test/build.gradle
+++ b/scalardb-test/build.gradle
@@ -25,26 +25,26 @@ repositories {
     }
 }
 
-def awssdkVersion = "2.25.1"
+def awssdkVersion = "2.29.1"
 def dockerVersion = project.hasProperty('dockerVersion') ? project.dockerVersion : 'latest'
+def scalarDbVersion = "4.0.0-SNAPSHOT"
 
 dependencies {
     implementation 'com.scalar-labs:kelpie:1.2.3'
     implementation 'javax.json:javax.json-api:1.1.4'
     implementation 'org.glassfish:javax.json:1.1.4'
     implementation 'io.github.resilience4j:resilience4j-retry:1.7.1'
-    implementation 'com.scalar-labs:scalardb:4.0.0-SNAPSHOT'
+    implementation "com.scalar-labs:scalardb:${scalarDbVersion}"
     implementation 'com.google.guava:guava:32.1.3-jre'
-    implementation 'com.azure:azure-cosmos:4.56.0'
+    implementation 'com.azure:azure-cosmos:4.64.0'
     implementation "software.amazon.awssdk:dynamodb:${awssdkVersion}"
     implementation "software.amazon.awssdk:core:${awssdkVersion}"
     implementation 'org.apache.commons:commons-dbcp2:2.12.0'
-    implementation 'com.scalar-labs:scalardb-sql-direct-mode:4.0.0-SNAPSHOT'
-    implementation 'com.scalar-labs:scalardb-sql-server-mode:4.0.0-SNAPSHOT'
-    implementation 'com.scalar-labs:scalardb-sql-jdbc:4.0.0-SNAPSHOT'
-    implementation 'com.scalar-labs:scalardb-graphql:4.0.0-SNAPSHOT'
-    implementation 'com.scalar-labs:scalardb-cluster-java-client-sdk:4.0.0-SNAPSHOT'
-    implementation 'com.graphql-java:graphql-java:20.7'
+    implementation "com.scalar-labs:scalardb-sql-direct-mode:${scalarDbVersion}"
+    implementation "com.scalar-labs:scalardb-sql-jdbc:${scalarDbVersion}"
+    implementation "com.scalar-labs:scalardb-graphql:${scalarDbVersion}"
+    implementation "com.scalar-labs:scalardb-cluster-java-client-sdk:${scalarDbVersion}"
+    implementation 'com.graphql-java:graphql-java:20.9'
     implementation 'com.squareup.okhttp3:okhttp:4.12.0'
 }
diff --git a/scalardb-test/schema/multiple-clustering-key.json b/scalardb-test/schema/multiple-clustering-key.json
deleted file mode 100644
index da5f032..0000000
--- a/scalardb-test/schema/multiple-clustering-key.json
+++ /dev/null
@@ -1,17 +0,0 @@
-{
-  "storage.multiple-clustering-key": {
-    "partition-key": [
-      "pkey"
-    ],
-    "clustering-key": [
-      "ckey1",
-      "ckey2"
-    ],
-    "columns": {
-      "pkey": "INT",
-      "ckey1": "INT",
-      "ckey2": "INT",
-      "col": "INT"
-    }
-  }
-}
diff --git a/scalardb-test/schema/no-clustering-key.json b/scalardb-test/schema/no-clustering-key.json
deleted file mode 100644
index ba89f57..0000000
--- a/scalardb-test/schema/no-clustering-key.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
-  "storage.no-clustering-key": {
-    "partition-key": [
-      "pkey"
-    ],
-    "clustering-key": [],
-    "columns": {
-      "pkey": "INT",
-      "col": "INT"
-    }
-  }
-}
diff --git a/scalardb-test/schema/single-clustering-key.json b/scalardb-test/schema/single-clustering-key.json
deleted file mode 100644
index 235a10c..0000000
--- a/scalardb-test/schema/single-clustering-key.json
+++ /dev/null
@@ -1,15 +0,0 @@
-{
-  "storage.single-clustering-key": {
-    "partition-key": [
-      "pkey"
-    ],
-    "clustering-key": [
-      "ckey"
-    ],
-    "columns": {
-      "pkey": "INT",
-      "ckey": "INT",
-      "col": "INT"
-    }
-  }
-}
diff --git a/scalardb-test/src/main/java/kelpie/scalardb/Common.java b/scalardb-test/src/main/java/kelpie/scalardb/Common.java
index f735b98..de50dcd 100644
--- a/scalardb-test/src/main/java/kelpie/scalardb/Common.java
+++ b/scalardb-test/src/main/java/kelpie/scalardb/Common.java
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
 
 public class Common {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(Common.class);
+  private static final Logger logger = LoggerFactory.getLogger(Common.class);
   private static final int WAIT_MILLS = 1000;
   private static final long SLEEP_BASE_MILLIS = 100L;
 
@@ -36,31 +36,26 @@ public static DistributedStorage getStorage(Config config) {
     return factory.getStorage();
   }
 
-  public static DistributedTransactionManager getTransactionManager(
-      Config config, String keyspace, String table) {
+  public static DistributedTransactionManager getTransactionManager(Config config) {
     DatabaseConfig dbConfig = getDatabaseConfig(config);
     TransactionFactory factory = new TransactionFactory(dbConfig);
-    DistributedTransactionManager manager = factory.getTransactionManager();
-    manager.with(keyspace, table);
-    return manager;
+    return factory.getTransactionManager();
   }
 
   public static TwoPhaseCommitTransactionManager getTwoPhaseCommitTransactionManager1(
-      Config config, String keyspace, String table) {
-    return getTwoPhaseCommitTransactionManager(getDatabaseConfig1(config), keyspace, table);
+      Config config) {
+    return getTwoPhaseCommitTransactionManager(getDatabaseConfig1(config));
   }
 
   public static TwoPhaseCommitTransactionManager getTwoPhaseCommitTransactionManager2(
-      Config config, String keyspace, String table) {
-    return getTwoPhaseCommitTransactionManager(getDatabaseConfig2(config), keyspace, table);
+      Config config) {
+    return getTwoPhaseCommitTransactionManager(getDatabaseConfig2(config));
   }
 
   private static TwoPhaseCommitTransactionManager getTwoPhaseCommitTransactionManager(
-      Properties properties, String keyspace, String table) {
+      Properties properties) {
     TransactionFactory factory = TransactionFactory.create(properties);
-    TwoPhaseCommitTransactionManager manager = factory.getTwoPhaseCommitTransactionManager();
-    manager.with(keyspace, table);
-    return manager;
+    return factory.getTwoPhaseCommitTransactionManager();
   }
 
   public static DatabaseConfig getDatabaseConfig(Config config) {
@@ -74,7 +69,7 @@ public static DatabaseConfig getDatabaseConfig(Config config) {
       try {
         return new DatabaseConfig(new File(configFile));
       } catch (IOException e) {
-        LOGGER.warn("failed to load the specified config file: " + configFile, e);
+        logger.warn("failed to load the specified config file: {}", configFile, e);
       }
     }
 
diff --git a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorChecker.java b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorChecker.java
index 39b3f22..5b86686 100644
--- a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorChecker.java
+++ b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorChecker.java
@@ -20,7 +20,7 @@ public class SensorChecker extends PostProcessor {
 
   public SensorChecker(Config config) {
     super(config);
-    this.manager = SensorCommon.getTransactionManager(config);
+    this.manager = Common.getTransactionManager(config);
   }
 
   @Override
diff --git a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorCommon.java b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorCommon.java
index 0ed9c81..f4b98c1 100644
--- a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorCommon.java
+++ b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorCommon.java
@@ -1,62 +1,61 @@
 package kelpie.scalardb.sensor;
 
 import com.scalar.db.api.Consistency;
-import com.scalar.db.api.DistributedTransactionManager;
 import com.scalar.db.api.Put;
 import com.scalar.db.api.Result;
 import com.scalar.db.api.Scan;
-import com.scalar.db.io.IntValue;
-import com.scalar.db.transaction.consensuscommit.TransactionResult;
 import com.scalar.db.io.Key;
-import com.scalar.kelpie.config.Config;
 import java.util.HashSet;
 import java.util.List;
 import java.util.OptionalInt;
 import java.util.Set;
 import java.util.stream.IntStream;
-import kelpie.scalardb.Common;
 
 public class SensorCommon {
-  private static final String KEYSPACE = "sensor";
+  private static final String NAMESPACE = "sensor";
   private static final String TABLE = "tx_sensor";
   private static final String TIMESTAMP = "timestamp";
   private static final String DEVICE_ID = "device_id";
   private static final String REVISION = "revision";
 
-  public static DistributedTransactionManager getTransactionManager(Config config) {
-    return Common.getTransactionManager(config, KEYSPACE, TABLE);
-  }
-
   public static Scan prepareScan(int timestamp) {
-    Key partitionKey = new Key(new IntValue(TIMESTAMP, timestamp));
-
-    return new Scan(partitionKey)
-        .withOrdering(new Scan.Ordering(DEVICE_ID, Scan.Ordering.Order.ASC))
-        .withConsistency(Consistency.LINEARIZABLE);
+    Key partitionKey = Key.ofInt(TIMESTAMP, timestamp);
+    return Scan.newBuilder()
+        .namespace(NAMESPACE)
+        .table(TABLE)
+        .partitionKey(partitionKey)
+        .ordering(Scan.Ordering.asc(DEVICE_ID))
+        .consistency(Consistency.LINEARIZABLE)
+        .build();
   }
 
   public static Put preparePut(int timestamp, int deviceId, int revision) {
-    Key partitionKey = new Key(new IntValue(TIMESTAMP, timestamp));
-    Key clusteringKey = new Key(new IntValue(DEVICE_ID, deviceId));
-    return new Put(partitionKey, clusteringKey)
-        .withConsistency(Consistency.LINEARIZABLE)
-        .withValue(new IntValue(REVISION, revision));
+    Key partitionKey = Key.ofInt(TIMESTAMP, timestamp);
+    Key clusteringKey = Key.ofInt(DEVICE_ID, deviceId);
+    return Put.newBuilder()
+        .namespace(NAMESPACE)
+        .table(TABLE)
+        .partitionKey(partitionKey)
+        .clusteringKey(clusteringKey)
+        .consistency(Consistency.LINEARIZABLE)
+        .intValue(REVISION, revision)
+        .build();
   }
 
   public static boolean hasDuplicatedRevision(List<Result> results) {
-    IntStream revisions = results.stream().mapToInt(r -> getRevisionFromResult(r));
+    IntStream revisions = results.stream().mapToInt(SensorCommon::getRevisionFromResult);
     Set<Integer> tempSet = new HashSet<>();
     return revisions.anyMatch(rev -> !tempSet.add(rev));
   }
 
   public static int getMaxRevision(List<Result> results) {
-    OptionalInt maxRevision = results.stream().mapToInt(r -> getRevisionFromResult(r)).max();
+    OptionalInt maxRevision = results.stream().mapToInt(SensorCommon::getRevisionFromResult).max();
     return maxRevision.orElse(0);
   }
 
   private static int getRevisionFromResult(Result result) {
-    return ((IntValue) result.getValue(REVISION).get()).get();
+    return result.getInt(REVISION);
   }
 }
diff --git a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorProcessor.java
index 6c31cd4..ad96d92 100644
--- a/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorProcessor.java
+++ b/scalardb-test/src/main/java/kelpie/scalardb/sensor/SensorProcessor.java
@@ -14,6 +14,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.json.Json;
+import kelpie.scalardb.Common;
 
 public class SensorProcessor extends TimeBasedProcessor {
   private final DistributedTransactionManager manager;
@@ -23,7 +24,7 @@ public class SensorProcessor extends TimeBasedProcessor {
 
   public SensorProcessor(Config config) {
     super(config);
-    this.manager = SensorCommon.getTransactionManager(config);
+    this.manager =
Common.getTransactionManager(config); this.numDevices = (int) config.getUserLong("test_config", "num_devices"); this.isVerification = new AtomicBoolean(config.getUserBoolean("test_config", "is_verification", false)); diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeyPreparer.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeyPreparer.java deleted file mode 100644 index bea3484..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeyPreparer.java +++ /dev/null @@ -1,112 +0,0 @@ -package kelpie.scalardb.storage; - -import static kelpie.scalardb.storage.MultipleClusteringKeySchema.NUM_CLUSTERING_KEY1; -import static kelpie.scalardb.storage.MultipleClusteringKeySchema.NUM_CLUSTERING_KEY2; -import static kelpie.scalardb.storage.MultipleClusteringKeySchema.preparePut; -import static kelpie.scalardb.storage.StorageCommon.DEFAULT_POPULATION_CONCURRENCY; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.Put; -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.PreProcessor; -import io.github.resilience4j.retry.Retry; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.IntStream; -import kelpie.scalardb.Common; - -public class MultipleClusteringKeyPreparer extends PreProcessor { - - private final int BATCH_SIZE = 20; - - private final DistributedStorage storage; - - public MultipleClusteringKeyPreparer(Config config) { - super(config); - storage = Common.getStorage(config); - } - - @Override - public void execute() { - logInfo("insert initial values... 
"); - - int concurrency = - (int) - config.getUserLong( - "test_config", "population_concurrency", DEFAULT_POPULATION_CONCURRENCY); - ExecutorService es = Executors.newCachedThreadPool(); - List> futures = new ArrayList<>(); - IntStream.range(0, concurrency) - .forEach( - i -> { - CompletableFuture future = - CompletableFuture.runAsync(() -> new PopulationRunner(i).run(), es); - futures.add(future); - }); - - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - logInfo("all records have been inserted"); - } - - @Override - public void close() { - try { - storage.close(); - } catch (Exception e) { - logWarn("Failed to close the storage", e); - } - } - - private class PopulationRunner { - private final int id; - - public PopulationRunner(int threadId) { - this.id = threadId; - } - - public void run() { - int concurrency = - (int) - config.getUserLong( - "test_config", "population_concurrency", DEFAULT_POPULATION_CONCURRENCY); - int numKeys = (int) config.getUserLong("test_config", "num_keys"); - int numPerThread = (numKeys + concurrency - 1) / concurrency; - int startPKey = numPerThread * id; - int endPKey = Math.min(numPerThread * (id + 1), numKeys); - IntStream.range(startPKey, endPKey).forEach(this::populate); - } - - private void populate(int pkey) { - Runnable populate = - () -> { - try { - for (int i = 0; i < NUM_CLUSTERING_KEY1; i++) { - List puts = new ArrayList<>(NUM_CLUSTERING_KEY2); - for (int j = 0; j < NUM_CLUSTERING_KEY2; j++) { - Put put = preparePut(pkey, i, j, ThreadLocalRandom.current().nextInt()); - puts.add(put); - } - StorageCommon.batchPut(storage, puts, BATCH_SIZE); - MultipleClusteringKeyPreparer.this.logInfo( - "(pkey=" + pkey + ", ckey1=" + i + ") inserted"); - } - } catch (Exception e) { - throw new RuntimeException("population failed, retry", e); - } - }; - - Retry retry = Common.getRetryWithFixedWaitDuration("populate"); - Runnable decorated = Retry.decorateRunnable(retry, populate); - try { - decorated.run(); - } catch (Exception e) { - logError("population failed repeatedly!"); - throw e; - } - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeyReaderProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeyReaderProcessor.java deleted file mode 100644 index b0f28be..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeyReaderProcessor.java +++ /dev/null @@ -1,67 +0,0 @@ -package kelpie.scalardb.storage; - -import static kelpie.scalardb.storage.MultipleClusteringKeySchema.NUM_CLUSTERING_KEY1; -import static kelpie.scalardb.storage.MultipleClusteringKeySchema.NUM_CLUSTERING_KEY2; -import static kelpie.scalardb.storage.MultipleClusteringKeySchema.prepareScan; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.Result; -import com.scalar.db.api.Scan; -import com.scalar.db.api.Scan.Ordering.Order; -import com.scalar.db.api.Scanner; -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.TimeBasedProcessor; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import kelpie.scalardb.transfer.TransferCommon; - -public class MultipleClusteringKeyReaderProcessor extends TimeBasedProcessor { - private final DistributedStorage storage; - private final int numKeys; - private final Order ckey1Order; - private final Order ckey2Order; - - public MultipleClusteringKeyReaderProcessor(Config config) { - super(config); - storage = TransferCommon.getStorage(config); - numKeys = (int) 
config.getUserLong("test_config", "num_keys"); - - boolean reverse = config.getUserBoolean("test_config", "reverse_scan", false); - Order ckey1ClusteringOrder = - Order.valueOf(config.getUserString("test_config", "ckey1_clustering_order", "ASC")); - Order ckey2ClusteringOrder = - Order.valueOf(config.getUserString("test_config", "ckey2_clustering_order", "ASC")); - ckey1Order = !reverse ? ckey1ClusteringOrder : StorageCommon.reverseOrder(ckey1ClusteringOrder); - ckey2Order = !reverse ? ckey2ClusteringOrder : StorageCommon.reverseOrder(ckey2ClusteringOrder); - } - - @Override - protected void executeEach() throws Exception { - int pkey = ThreadLocalRandom.current().nextInt(numKeys); - int ckey1 = ThreadLocalRandom.current().nextInt(NUM_CLUSTERING_KEY1); - Scan scan = prepareScan(pkey, ckey1, ckey1Order, ckey2Order); - try (Scanner scanner = storage.scan(scan)) { - List results = scanner.all(); - if (results.size() != NUM_CLUSTERING_KEY2) { - logWarn( - "the number of results of the scan for (pkey=" - + pkey - + ", ckey1=" - + ckey1 - + ") should be " - + NUM_CLUSTERING_KEY2 - + ", but " - + results.size()); - } - } - } - - @Override - public void close() { - try { - storage.close(); - } catch (Exception e) { - logWarn("Failed to close the storage", e); - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeySchema.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeySchema.java deleted file mode 100644 index 36c44d6..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeySchema.java +++ /dev/null @@ -1,44 +0,0 @@ -package kelpie.scalardb.storage; - -import com.scalar.db.api.Put; -import com.scalar.db.api.Scan; -import com.scalar.db.api.Scan.Ordering; -import com.scalar.db.api.Scan.Ordering.Order; -import com.scalar.db.io.Key; - -public final class MultipleClusteringKeySchema { - - public static final String NAMESPACE = "storage"; - public static final String TABLE = "multiple-clustering-key"; - public static final String PARTITION_KEY = "pkey"; - public static final String CLUSTERING_KEY1 = "ckey1"; - public static final String CLUSTERING_KEY2 = "ckey2"; - public static final String COL = "col"; - - public static final int NUM_CLUSTERING_KEY1 = 10; - public static final int NUM_CLUSTERING_KEY2 = 100; - - private MultipleClusteringKeySchema() {} - - public static Put preparePut(int pkey, int ckey1, int ckey2, int colValue) { - Key partitionKey = new Key(PARTITION_KEY, pkey); - Key clusteringKey = - Key.newBuilder().addInt(CLUSTERING_KEY1, ckey1).addInt(CLUSTERING_KEY2, ckey2).build(); - return new Put(partitionKey, clusteringKey) - .withValue(COL, colValue) - .forNamespace(NAMESPACE) - .forTable(TABLE); - } - - public static Scan prepareScan(int pkey, int ckey1, Order ckey1Order, Order ckey2Order) { - Key partitionKey = new Key(PARTITION_KEY, pkey); - Key clusteringKey1 = new Key(CLUSTERING_KEY1, ckey1); - return new Scan(partitionKey) - .withStart(clusteringKey1) - .withEnd(clusteringKey1) - .withOrdering(new Ordering(CLUSTERING_KEY1, ckey1Order)) - .withOrdering(new Ordering(CLUSTERING_KEY2, ckey2Order)) - .forNamespace(NAMESPACE) - .forTable(TABLE); - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeyWriterProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeyWriterProcessor.java deleted file mode 100644 index e4873bb..0000000 --- 
a/scalardb-test/src/main/java/kelpie/scalardb/storage/MultipleClusteringKeyWriterProcessor.java +++ /dev/null @@ -1,43 +0,0 @@ -package kelpie.scalardb.storage; - -import static kelpie.scalardb.storage.MultipleClusteringKeySchema.NUM_CLUSTERING_KEY1; -import static kelpie.scalardb.storage.MultipleClusteringKeySchema.NUM_CLUSTERING_KEY2; -import static kelpie.scalardb.storage.MultipleClusteringKeySchema.preparePut; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.Put; -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.TimeBasedProcessor; -import java.util.concurrent.ThreadLocalRandom; -import kelpie.scalardb.transfer.TransferCommon; - -public class MultipleClusteringKeyWriterProcessor extends TimeBasedProcessor { - private final DistributedStorage storage; - private final int numKeys; - - public MultipleClusteringKeyWriterProcessor(Config config) { - super(config); - storage = TransferCommon.getStorage(config); - numKeys = (int) config.getUserLong("test_config", "num_keys"); - } - - @Override - protected void executeEach() throws Exception { - int pkey = ThreadLocalRandom.current().nextInt(numKeys); - int ckey1 = ThreadLocalRandom.current().nextInt(NUM_CLUSTERING_KEY1); - int ckey2 = ThreadLocalRandom.current().nextInt(NUM_CLUSTERING_KEY2); - int colValue = ThreadLocalRandom.current().nextInt(); - - Put put = preparePut(pkey, ckey1, ckey2, colValue); - storage.put(put); - } - - @Override - public void close() { - try { - storage.close(); - } catch (Exception e) { - logWarn("Failed to close the storage", e); - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeyPreparer.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeyPreparer.java deleted file mode 100644 index d060199..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeyPreparer.java +++ /dev/null @@ -1,100 +0,0 @@ -package kelpie.scalardb.storage; - -import static kelpie.scalardb.storage.NoClusteringKeySchema.preparePut; -import static kelpie.scalardb.storage.StorageCommon.DEFAULT_POPULATION_CONCURRENCY; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.PreProcessor; -import io.github.resilience4j.retry.Retry; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.IntStream; -import kelpie.scalardb.Common; -import kelpie.scalardb.transfer.TransferCommon; - -public class NoClusteringKeyPreparer extends PreProcessor { - - private final DistributedStorage storage; - - public NoClusteringKeyPreparer(Config config) { - super(config); - this.storage = TransferCommon.getStorage(config); - } - - @Override - public void execute() { - logInfo("insert initial values... 
"); - - int concurrency = - (int) - config.getUserLong( - "test_config", "population_concurrency", DEFAULT_POPULATION_CONCURRENCY); - ExecutorService es = Executors.newCachedThreadPool(); - List> futures = new ArrayList<>(); - IntStream.range(0, concurrency) - .forEach( - i -> { - CompletableFuture future = - CompletableFuture.runAsync(() -> new PopulationRunner(i).run(), es); - futures.add(future); - }); - - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - logInfo("all records have been inserted"); - } - - @Override - public void close() { - try { - storage.close(); - } catch (Exception e) { - logWarn("Failed to close the storage", e); - } - } - - private class PopulationRunner { - private final int id; - - public PopulationRunner(int threadId) { - this.id = threadId; - } - - public void run() { - int concurrency = - (int) - config.getUserLong( - "test_config", "population_concurrency", DEFAULT_POPULATION_CONCURRENCY); - int numKeys = (int) config.getUserLong("test_config", "num_keys"); - int numPerThread = (numKeys + concurrency - 1) / concurrency; - int start = numPerThread * id; - int end = Math.min(numPerThread * (id + 1), numKeys); - IntStream.range(start, end).forEach(this::populate); - } - - private void populate(int pkey) { - Runnable populate = - () -> { - try { - storage.put(preparePut(pkey, ThreadLocalRandom.current().nextInt())); - NoClusteringKeyPreparer.this.logInfo("pkey=" + pkey + " inserted"); - } catch (Exception e) { - throw new RuntimeException("population failed, retry", e); - } - }; - - Retry retry = Common.getRetryWithFixedWaitDuration("populate"); - Runnable decorated = Retry.decorateRunnable(retry, populate); - try { - decorated.run(); - } catch (Exception e) { - logError("population failed repeatedly!"); - throw e; - } - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeyReaderProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeyReaderProcessor.java deleted file mode 100644 index 4bd7033..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeyReaderProcessor.java +++ /dev/null @@ -1,40 +0,0 @@ -package kelpie.scalardb.storage; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.Get; -import com.scalar.db.api.Result; -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.TimeBasedProcessor; -import java.util.Optional; -import java.util.concurrent.ThreadLocalRandom; -import kelpie.scalardb.transfer.TransferCommon; - -public class NoClusteringKeyReaderProcessor extends TimeBasedProcessor { - private final DistributedStorage storage; - private final int numKeys; - - public NoClusteringKeyReaderProcessor(Config config) { - super(config); - storage = TransferCommon.getStorage(config); - numKeys = (int) config.getUserLong("test_config", "num_keys"); - } - - @Override - protected void executeEach() throws Exception { - int pkey = ThreadLocalRandom.current().nextInt(numKeys); - Get get = NoClusteringKeySchema.prepareGet(pkey); - Optional result = storage.get(get); - if (!result.isPresent()) { - logWarn("the results should exist, but not"); - } - } - - @Override - public void close() { - try { - storage.close(); - } catch (Exception e) { - logWarn("Failed to close the storage", e); - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeySchema.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeySchema.java deleted file mode 100644 index 6fa2deb..0000000 
--- a/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeySchema.java +++ /dev/null @@ -1,25 +0,0 @@ -package kelpie.scalardb.storage; - -import com.scalar.db.api.Get; -import com.scalar.db.api.Put; -import com.scalar.db.io.Key; - -public class NoClusteringKeySchema { - - public static final String NAMESPACE = "storage"; - public static final String TABLE = "no-clustering-key"; - public static final String PARTITION_KEY = "pkey"; - public static final String COL = "col"; - - private NoClusteringKeySchema() {} - - public static Put preparePut(int pkey, int colValue) { - Key partitionKey = new Key(PARTITION_KEY, pkey); - return new Put(partitionKey).withValue(COL, colValue).forNamespace(NAMESPACE).forTable(TABLE); - } - - public static Get prepareGet(int pkey) { - Key partitionKey = new Key(PARTITION_KEY, pkey); - return new Get(partitionKey).forNamespace(NAMESPACE).forTable(TABLE); - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeyWriterProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeyWriterProcessor.java deleted file mode 100644 index db33473..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/NoClusteringKeyWriterProcessor.java +++ /dev/null @@ -1,38 +0,0 @@ -package kelpie.scalardb.storage; - -import static kelpie.scalardb.storage.NoClusteringKeySchema.preparePut; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.Put; -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.TimeBasedProcessor; -import java.util.concurrent.ThreadLocalRandom; -import kelpie.scalardb.transfer.TransferCommon; - -public class NoClusteringKeyWriterProcessor extends TimeBasedProcessor { - private final DistributedStorage storage; - private final int numKeys; - - public NoClusteringKeyWriterProcessor(Config config) { - super(config); - storage = TransferCommon.getStorage(config); - numKeys = (int) config.getUserLong("test_config", "num_keys"); - } - - @Override - protected void executeEach() throws Exception { - int pkey = ThreadLocalRandom.current().nextInt(numKeys); - int colValue = ThreadLocalRandom.current().nextInt(); - Put put = preparePut(pkey, colValue); - storage.put(put); - } - - @Override - public void close() { - try { - storage.close(); - } catch (Exception e) { - logWarn("Failed to close the storage", e); - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeyPreparer.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeyPreparer.java deleted file mode 100644 index 1b56ab9..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeyPreparer.java +++ /dev/null @@ -1,109 +0,0 @@ -package kelpie.scalardb.storage; - -import static kelpie.scalardb.storage.SingleClusteringKeySchema.NUM_CLUSTERING_KEY; -import static kelpie.scalardb.storage.SingleClusteringKeySchema.preparePut; -import static kelpie.scalardb.storage.StorageCommon.DEFAULT_POPULATION_CONCURRENCY; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.Put; -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.PreProcessor; -import io.github.resilience4j.retry.Retry; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.IntStream; -import kelpie.scalardb.Common; -import 
kelpie.scalardb.transfer.TransferCommon; - -public class SingleClusteringKeyPreparer extends PreProcessor { - - private final int BATCH_SIZE = 20; - - private final DistributedStorage storage; - - public SingleClusteringKeyPreparer(Config config) { - super(config); - this.storage = TransferCommon.getStorage(config); - } - - @Override - public void execute() { - logInfo("insert initial values... "); - - int concurrency = - (int) - config.getUserLong( - "test_config", "population_concurrency", DEFAULT_POPULATION_CONCURRENCY); - ExecutorService es = Executors.newCachedThreadPool(); - List> futures = new ArrayList<>(); - IntStream.range(0, concurrency) - .forEach( - i -> { - CompletableFuture future = - CompletableFuture.runAsync(() -> new PopulationRunner(i).run(), es); - futures.add(future); - }); - - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); - logInfo("all records have been inserted"); - } - - @Override - public void close() { - try { - storage.close(); - } catch (Exception e) { - logWarn("Failed to close the storage", e); - } - } - - private class PopulationRunner { - private final int id; - - public PopulationRunner(int threadId) { - this.id = threadId; - } - - public void run() { - int concurrency = - (int) - config.getUserLong( - "test_config", "population_concurrency", DEFAULT_POPULATION_CONCURRENCY); - int numKeys = (int) config.getUserLong("test_config", "num_keys"); - int numPerThread = (numKeys + concurrency - 1) / concurrency; - int start = numPerThread * id; - int end = Math.min(numPerThread * (id + 1), numKeys); - IntStream.range(start, end).forEach(this::populate); - } - - private void populate(int pkey) { - Runnable populate = - () -> { - try { - List puts = new ArrayList<>(NUM_CLUSTERING_KEY); - for (int i = 0; i < NUM_CLUSTERING_KEY; i++) { - Put put = preparePut(pkey, i, ThreadLocalRandom.current().nextInt()); - puts.add(put); - } - StorageCommon.batchPut(storage, puts, BATCH_SIZE); - SingleClusteringKeyPreparer.this.logInfo("pkey=" + pkey + " inserted"); - } catch (Exception e) { - throw new RuntimeException("population failed, retry", e); - } - }; - - Retry retry = Common.getRetryWithFixedWaitDuration("populate"); - Runnable decorated = Retry.decorateRunnable(retry, populate); - try { - decorated.run(); - } catch (Exception e) { - logError("population failed repeatedly!"); - throw e; - } - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeyReaderProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeyReaderProcessor.java deleted file mode 100644 index ebef12a..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeyReaderProcessor.java +++ /dev/null @@ -1,59 +0,0 @@ -package kelpie.scalardb.storage; - -import static kelpie.scalardb.storage.SingleClusteringKeySchema.NUM_CLUSTERING_KEY; -import static kelpie.scalardb.storage.SingleClusteringKeySchema.prepareScan; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.Result; -import com.scalar.db.api.Scan; -import com.scalar.db.api.Scan.Ordering.Order; -import com.scalar.db.api.Scanner; -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.TimeBasedProcessor; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; -import kelpie.scalardb.transfer.TransferCommon; - -public class SingleClusteringKeyReaderProcessor extends TimeBasedProcessor { - private final DistributedStorage storage; - private final int numKeys; - private final Order 
ckeyOrder; - - public SingleClusteringKeyReaderProcessor(Config config) { - super(config); - storage = TransferCommon.getStorage(config); - numKeys = (int) config.getUserLong("test_config", "num_keys"); - - boolean reverse = config.getUserBoolean("test_config", "reverse_scan", false); - Order ckey1ClusteringOrder = - Order.valueOf(config.getUserString("test_config", "ckey_clustering_order", "ASC")); - ckeyOrder = !reverse ? ckey1ClusteringOrder : StorageCommon.reverseOrder(ckey1ClusteringOrder); - } - - @Override - protected void executeEach() throws Exception { - int pkey = ThreadLocalRandom.current().nextInt(numKeys); - Scan scan = prepareScan(pkey, ckeyOrder); - try (Scanner scanner = storage.scan(scan)) { - List results = scanner.all(); - if (results.size() != NUM_CLUSTERING_KEY) { - logWarn( - "the number of results of the scan for (pkey=" - + pkey - + ") should be " - + NUM_CLUSTERING_KEY - + ", but " - + results.size()); - } - } - } - - @Override - public void close() { - try { - storage.close(); - } catch (Exception e) { - logWarn("Failed to close the storage", e); - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeySchema.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeySchema.java deleted file mode 100644 index c0e93de..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeySchema.java +++ /dev/null @@ -1,37 +0,0 @@ -package kelpie.scalardb.storage; - -import com.scalar.db.api.Put; -import com.scalar.db.api.Scan; -import com.scalar.db.api.Scan.Ordering; -import com.scalar.db.api.Scan.Ordering.Order; -import com.scalar.db.io.Key; - -public final class SingleClusteringKeySchema { - - public static final String NAMESPACE = "storage"; - public static final String TABLE = "single-clustering-key"; - public static final String PARTITION_KEY = "pkey"; - public static final String CLUSTERING_KEY = "ckey"; - public static final String COL = "col"; - - public static final int NUM_CLUSTERING_KEY = 100; - - private SingleClusteringKeySchema() {} - - public static Put preparePut(int pkey, int ckey, int colValue) { - Key partitionKey = new Key(PARTITION_KEY, pkey); - Key clusteringKey = new Key(CLUSTERING_KEY, ckey); - return new Put(partitionKey, clusteringKey) - .withValue(COL, colValue) - .forNamespace(NAMESPACE) - .forTable(TABLE); - } - - public static Scan prepareScan(int pkey, Order ckeyOrder) { - Key partitionKey = new Key(PARTITION_KEY, pkey); - return new Scan(partitionKey) - .withOrdering(new Ordering(CLUSTERING_KEY, ckeyOrder)) - .forNamespace(NAMESPACE) - .forTable(TABLE); - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeyWriterProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeyWriterProcessor.java deleted file mode 100644 index 3d10e1a..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/SingleClusteringKeyWriterProcessor.java +++ /dev/null @@ -1,41 +0,0 @@ -package kelpie.scalardb.storage; - -import static kelpie.scalardb.storage.SingleClusteringKeySchema.NUM_CLUSTERING_KEY; -import static kelpie.scalardb.storage.SingleClusteringKeySchema.preparePut; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.Put; -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.TimeBasedProcessor; -import java.util.concurrent.ThreadLocalRandom; -import kelpie.scalardb.transfer.TransferCommon; - -public class SingleClusteringKeyWriterProcessor extends 
TimeBasedProcessor { - private final DistributedStorage storage; - private final int numKeys; - - public SingleClusteringKeyWriterProcessor(Config config) { - super(config); - storage = TransferCommon.getStorage(config); - numKeys = (int) config.getUserLong("test_config", "num_keys"); - } - - @Override - protected void executeEach() throws Exception { - int pkey = ThreadLocalRandom.current().nextInt(numKeys); - int ckey = ThreadLocalRandom.current().nextInt(NUM_CLUSTERING_KEY); - int colValue = ThreadLocalRandom.current().nextInt(); - - Put put = preparePut(pkey, ckey, colValue); - storage.put(put); - } - - @Override - public void close() { - try { - storage.close(); - } catch (Exception e) { - logWarn("Failed to close the storage", e); - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/StorageCommon.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/StorageCommon.java deleted file mode 100644 index 9ec9526..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/StorageCommon.java +++ /dev/null @@ -1,41 +0,0 @@ -package kelpie.scalardb.storage; - -import com.scalar.db.api.DistributedStorage; -import com.scalar.db.api.Put; -import com.scalar.db.api.Scan.Ordering.Order; -import com.scalar.db.exception.storage.ExecutionException; -import java.util.ArrayList; -import java.util.List; - -public final class StorageCommon { - - public static final long DEFAULT_POPULATION_CONCURRENCY = 32L; - - private StorageCommon() {} - - public static Order reverseOrder(Order order) { - switch (order) { - case ASC: - return Order.DESC; - case DESC: - return Order.ASC; - default: - throw new AssertionError("unknown order: " + order); - } - } - - public static void batchPut(DistributedStorage storage, List puts, int batchSize) - throws ExecutionException { - List buffer = new ArrayList<>(batchSize); - for (Put put : puts) { - buffer.add(put); - if (buffer.size() == batchSize) { - storage.put(buffer); - buffer.clear(); - } - } - if (!buffer.isEmpty()) { - storage.put(buffer); - } - } -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/storage/StorageReporter.java b/scalardb-test/src/main/java/kelpie/scalardb/storage/StorageReporter.java deleted file mode 100644 index 81ea090..0000000 --- a/scalardb-test/src/main/java/kelpie/scalardb/storage/StorageReporter.java +++ /dev/null @@ -1,19 +0,0 @@ -package kelpie.scalardb.storage; - -import com.scalar.kelpie.config.Config; -import com.scalar.kelpie.modules.PostProcessor; - -public class StorageReporter extends PostProcessor { - - public StorageReporter(Config config) { - super(config); - } - - @Override - public void execute() { - getSummary(); - } - - @Override - public void close() {} -} diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/NontransactionalTransferPreparer.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/NontransactionalTransferPreparer.java index 61693d8..ec2fb25 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/NontransactionalTransferPreparer.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/NontransactionalTransferPreparer.java @@ -21,7 +21,7 @@ public class NontransactionalTransferPreparer extends PreProcessor { public NontransactionalTransferPreparer(Config config) { super(config); - this.storage = TransferCommon.getStorage(config); + this.storage = Common.getStorage(config); } @Override diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/NontransactionalTransferProcessor.java 
b/scalardb-test/src/main/java/kelpie/scalardb/transfer/NontransactionalTransferProcessor.java index 73c74b3..9740997 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/NontransactionalTransferProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/NontransactionalTransferProcessor.java @@ -9,6 +9,7 @@ import com.scalar.kelpie.modules.TimeBasedProcessor; import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; +import kelpie.scalardb.Common; public class NontransactionalTransferProcessor extends TimeBasedProcessor { private final DistributedStorage storage; @@ -16,7 +17,7 @@ public class NontransactionalTransferProcessor extends TimeBasedProcessor { public NontransactionalTransferProcessor(Config config) { super(config); - this.storage = TransferCommon.getStorage(config); + this.storage = Common.getStorage(config); this.numAccounts = (int) config.getUserLong("test_config", "num_accounts"); } diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferCommon.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferCommon.java index e42028f..211eadd 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferCommon.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferCommon.java @@ -7,7 +7,6 @@ import com.scalar.db.api.Get; import com.scalar.db.api.Put; import com.scalar.db.api.Result; -import com.scalar.db.api.TwoPhaseCommitTransactionManager; import com.scalar.db.exception.storage.ExecutionException; import com.scalar.db.exception.transaction.AbortException; import com.scalar.db.exception.transaction.TransactionException; @@ -21,7 +20,7 @@ import kelpie.scalardb.Common; public class TransferCommon { - public static final String KEYSPACE = "transfer"; + public static final String NAMESPACE = "transfer"; public static final String TABLE = "tx_transfer"; public static final String ACCOUNT_ID = "account_id"; public static final String ACCOUNT_TYPE = "account_type"; @@ -30,47 +29,38 @@ public class TransferCommon { public static final int INITIAL_BALANCE = 10000; public static final int NUM_TYPES = 2; - public static DistributedTransactionManager getTransactionManager(Config config) { - return Common.getTransactionManager(config, KEYSPACE, TABLE); - } - - public static TwoPhaseCommitTransactionManager getTwoPhaseCommitTransactionManager1( - Config config) { - return Common.getTwoPhaseCommitTransactionManager1(config, KEYSPACE, TABLE); - } - - public static TwoPhaseCommitTransactionManager getTwoPhaseCommitTransactionManager2( - Config config) { - return Common.getTwoPhaseCommitTransactionManager2(config, KEYSPACE, TABLE); - } - - public static DistributedStorage getStorage(Config config) { - DistributedStorage storage = Common.getStorage(config); - storage.with(KEYSPACE, TABLE); - return storage; - } - public static Get prepareGet(int id, int type) { - Key partitionKey = new Key(ACCOUNT_ID, id); - Key clusteringKey = new Key(ACCOUNT_TYPE, type); - - return new Get(partitionKey, clusteringKey).withConsistency(Consistency.LINEARIZABLE); + Key partitionKey = Key.ofInt(ACCOUNT_ID, id); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, type); + + return Get.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .consistency(Consistency.LINEARIZABLE) + .build(); } public static Put preparePut(int id, int type, int amount) { - Key partitionKey = new Key(ACCOUNT_ID, id); - Key clusteringKey = new Key(ACCOUNT_TYPE, type); - return new 
Put(partitionKey, clusteringKey) - .withConsistency(Consistency.LINEARIZABLE) - .withValue(BALANCE, amount); + Key partitionKey = Key.ofInt(ACCOUNT_ID, id); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, type); + return Put.newBuilder() + .namespace(NAMESPACE) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .consistency(Consistency.LINEARIZABLE) + .intValue(BALANCE, amount) + .build(); } public static List readRecordsWithRetry(Config config) { int maxRetry = (int) config.getUserLong("test_config", "checker_max_retries_for_read", 10L); long retryIntervalSleepTime = config.getUserLong("test_config", "checker_retry_interval_millis", 1000L); - DistributedTransactionManager manager = getTransactionManager(config); - DistributedStorage storage = getStorage(config); + DistributedTransactionManager manager = Common.getTransactionManager(config); + DistributedStorage storage = Common.getStorage(config); Retry retry = Common.getRetryWithExponentialBackoff("readBalances", maxRetry, retryIntervalSleepTime); Supplier> decorated = @@ -138,7 +128,7 @@ private static List readRecords( } public static int getBalanceFromResult(Result result) { - return result.getValue(BALANCE).get().getAsInt(); + return result.getInt(BALANCE); } public static int getTotalInitialBalance(Config config) { @@ -151,6 +141,6 @@ public static int getActualTotalVersion(List results) { } public static int getActualTotalBalance(List results) { - return results.stream().mapToInt(r -> r.getValue(BALANCE).get().getAsInt()).sum(); + return results.stream().mapToInt(r -> r.getInt(BALANCE)).sum(); } } diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferPreparer.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferPreparer.java index 49a9150..362873d 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferPreparer.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferPreparer.java @@ -25,7 +25,7 @@ public class TransferPreparer extends PreProcessor { public TransferPreparer(Config config) { super(config); - this.manager = TransferCommon.getTransactionManager(config); + this.manager = Common.getTransactionManager(config); } @Override diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferProcessor.java index a230128..cd2c55c 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferProcessor.java @@ -16,6 +16,7 @@ import java.util.concurrent.ThreadLocalRandom; import javax.json.Json; import javax.json.JsonObjectBuilder; +import kelpie.scalardb.Common; public class TransferProcessor extends TimeBasedProcessor { private final DistributedTransactionManager manager; @@ -28,7 +29,7 @@ public class TransferProcessor extends TimeBasedProcessor { public TransferProcessor(Config config) { super(config); - this.manager = TransferCommon.getTransactionManager(config); + this.manager = Common.getTransactionManager(config); this.numAccounts = (int) config.getUserLong("test_config", "num_accounts"); this.isVerification = config.getUserBoolean("test_config", "is_verification", false); diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferWithTwoPhaseCommitTransactionProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferWithTwoPhaseCommitTransactionProcessor.java index bb6b321..1bbe842 100644 --- 
a/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferWithTwoPhaseCommitTransactionProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/TransferWithTwoPhaseCommitTransactionProcessor.java @@ -16,6 +16,7 @@ import java.util.concurrent.ThreadLocalRandom; import javax.json.Json; import javax.json.JsonObjectBuilder; +import kelpie.scalardb.Common; public class TransferWithTwoPhaseCommitTransactionProcessor extends TimeBasedProcessor { @@ -30,8 +31,8 @@ public class TransferWithTwoPhaseCommitTransactionProcessor extends TimeBasedPro public TransferWithTwoPhaseCommitTransactionProcessor(Config config) { super(config); - manager1 = TransferCommon.getTwoPhaseCommitTransactionManager1(config); - manager2 = TransferCommon.getTwoPhaseCommitTransactionManager2(config); + manager1 = Common.getTwoPhaseCommitTransactionManager1(config); + manager2 = Common.getTwoPhaseCommitTransactionManager2(config); this.numAccounts = (int) config.getUserLong("test_config", "num_accounts"); this.isVerification = config.getUserBoolean("test_config", "is_verification", false); diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/WriteSkewTransferProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/WriteSkewTransferProcessor.java index 8db17ca..2b8a08c 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/WriteSkewTransferProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/WriteSkewTransferProcessor.java @@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.json.Json; import javax.json.JsonObjectBuilder; +import kelpie.scalardb.Common; public class WriteSkewTransferProcessor extends TimeBasedProcessor { private final DistributedTransactionManager manager; @@ -35,7 +36,7 @@ public class WriteSkewTransferProcessor extends TimeBasedProcessor { public WriteSkewTransferProcessor(Config config) { super(config); - this.manager = TransferCommon.getTransactionManager(config); + this.manager = Common.getTransactionManager(config); this.numAccounts = (int) config.getUserLong("test_config", "num_accounts"); this.isVerification = diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/cosmos/NontransactionalTransferPreparer.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/cosmos/NontransactionalTransferPreparer.java index 8d36f7b..4746843 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/cosmos/NontransactionalTransferPreparer.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/cosmos/NontransactionalTransferPreparer.java @@ -29,7 +29,7 @@ public NontransactionalTransferPreparer(Config config) { client = CosmosUtil.createCosmosClient(config); container = - client.getDatabase(TransferCommon.KEYSPACE).getContainer(TransferCommon.TABLE + "_cosmos"); + client.getDatabase(TransferCommon.NAMESPACE).getContainer(TransferCommon.TABLE + "_cosmos"); } @Override diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/cosmos/NontransactionalTransferProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/cosmos/NontransactionalTransferProcessor.java index 9707697..d459ecb 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/cosmos/NontransactionalTransferProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/cosmos/NontransactionalTransferProcessor.java @@ -25,7 +25,7 @@ public NontransactionalTransferProcessor(Config config) { super(config); client = CosmosUtil.createCosmosClient(config); - container = 
client.getDatabase(TransferCommon.KEYSPACE).getContainer(TABLE); + container = client.getDatabase(TransferCommon.NAMESPACE).getContainer(TABLE); numAccounts = (int) config.getUserLong("test_config", "num_accounts"); useStoredProcedure = config.getUserBoolean("cosmos_config", "use_stored_procedure"); if (useStoredProcedure) { diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/graphql/TransferProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/graphql/TransferProcessor.java index c2a379d..445f331 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/graphql/TransferProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/graphql/TransferProcessor.java @@ -77,9 +77,9 @@ public TransferProcessor(Config config) throws Exception { try { ScalarDbSchema.Builder scalarDBSchemaBuilder = ScalarDbSchema.newBuilder(); scalarDBSchemaBuilder.tableMetadata( - TransferCommon.KEYSPACE, + TransferCommon.NAMESPACE, TransferCommon.TABLE, - transactionAdmin.getTableMetadata(TransferCommon.KEYSPACE, TransferCommon.TABLE)); + transactionAdmin.getTableMetadata(TransferCommon.NAMESPACE, TransferCommon.TABLE)); scalarDbSchema = scalarDBSchemaBuilder.build(); } finally { try { diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/jdbc/NontransactionalTransferProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/jdbc/NontransactionalTransferProcessor.java index bcb04d0..7491b10 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/jdbc/NontransactionalTransferProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/jdbc/NontransactionalTransferProcessor.java @@ -65,7 +65,7 @@ private void transfer(int fromId, int toId, int amount) throws SQLException { private int get(int id, int type) throws SQLException { final String enclosedFullTableName = - rdbEngine.encloseFullTableName(TransferCommon.KEYSPACE, TransferCommon.TABLE); + rdbEngine.encloseFullTableName(TransferCommon.NAMESPACE, TransferCommon.TABLE); final String enclosedAccountId = rdbEngine.enclose(TransferCommon.ACCOUNT_ID); final String enclosedAccountType = rdbEngine.enclose(TransferCommon.ACCOUNT_TYPE); final String enclosedBalance = rdbEngine.enclose(TransferCommon.BALANCE); @@ -94,7 +94,7 @@ private int get(int id, int type) throws SQLException { private void put(int id, int type, int amount) throws SQLException { final String enclosedFullTableName = - rdbEngine.encloseFullTableName(TransferCommon.KEYSPACE, TransferCommon.TABLE); + rdbEngine.encloseFullTableName(TransferCommon.NAMESPACE, TransferCommon.TABLE); final String enclosedAccountId = rdbEngine.enclose(TransferCommon.ACCOUNT_ID); final String enclosedAccountType = rdbEngine.enclose(TransferCommon.ACCOUNT_TYPE); final String enclosedBalance = rdbEngine.enclose(TransferCommon.BALANCE); diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferPreparer.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferPreparer.java index 44c348e..b5ebe7e 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferPreparer.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferPreparer.java @@ -102,7 +102,7 @@ private void populateWithTx(int startId, int endId) { PreparedStatement preparedStatement = sqlSession.prepareStatement( "INSERT INTO " - + TransferCommon.KEYSPACE + + TransferCommon.NAMESPACE + "." 
+ TransferCommon.TABLE + " VALUES(?,?,?)"); diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferProcessor.java index 2797c9d..939bd93 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferProcessor.java @@ -60,7 +60,7 @@ private void transfer(SqlSession sqlSession, int fromId, int toId, int amount) { preparedStatement = sqlSession.prepareStatement( "SELECT * FROM " - + TransferCommon.KEYSPACE + + TransferCommon.NAMESPACE + "." + TransferCommon.TABLE + " WHERE " @@ -85,7 +85,7 @@ record = resultSet.one(); preparedStatement = sqlSession.prepareStatement( "UPDATE " - + TransferCommon.KEYSPACE + + TransferCommon.NAMESPACE + "." + TransferCommon.TABLE + " SET " diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferWithTwoPhaseCommitTransactionProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferWithTwoPhaseCommitTransactionProcessor.java index cccf4d0..604dd6c 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferWithTwoPhaseCommitTransactionProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/TransferWithTwoPhaseCommitTransactionProcessor.java @@ -64,7 +64,7 @@ private void transfer( final String SELECT_QUERY = "SELECT * FROM " - + TransferCommon.KEYSPACE + + TransferCommon.NAMESPACE + "." + TransferCommon.TABLE + " WHERE " @@ -75,7 +75,7 @@ private void transfer( final String UPDATE_QUERY = "UPDATE " - + TransferCommon.KEYSPACE + + TransferCommon.NAMESPACE + "." + TransferCommon.TABLE + " SET " diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferPreparer.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferPreparer.java index 7201279..0c0b243 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferPreparer.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferPreparer.java @@ -100,7 +100,7 @@ private void populateWithTx(int startId, int endId) { try (PreparedStatement preparedStatement = connection.prepareStatement( "INSERT INTO " - + TransferCommon.KEYSPACE + + TransferCommon.NAMESPACE + "." + TransferCommon.TABLE + " VALUES(?,?,?)")) { diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferProcessor.java index bfc619c..b5d4b1b 100644 --- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferProcessor.java +++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferProcessor.java @@ -56,7 +56,7 @@ private void transfer(Connection connection, int fromId, int toId, int amount) try (PreparedStatement preparedStatement = connection.prepareStatement( "SELECT * FROM " - + TransferCommon.KEYSPACE + + TransferCommon.NAMESPACE + "." + TransferCommon.TABLE + " WHERE " @@ -88,7 +88,7 @@ private void transfer(Connection connection, int fromId, int toId, int amount) try (PreparedStatement preparedStatement = connection.prepareStatement( "UPDATE " - + TransferCommon.KEYSPACE + + TransferCommon.NAMESPACE + "." 
                 + TransferCommon.TABLE
                 + " SET "
diff --git a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferWithTwoPhaseCommitTransactionProcessor.java b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferWithTwoPhaseCommitTransactionProcessor.java
index de48698..2c85868 100644
--- a/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferWithTwoPhaseCommitTransactionProcessor.java
+++ b/scalardb-test/src/main/java/kelpie/scalardb/transfer/sql/jdbc/TransferWithTwoPhaseCommitTransactionProcessor.java
@@ -63,7 +63,7 @@ private void transfer(
     final String SELECT_QUERY =
         "SELECT * FROM "
-            + TransferCommon.KEYSPACE
+            + TransferCommon.NAMESPACE
             + "."
             + TransferCommon.TABLE
             + " WHERE "
@@ -74,7 +74,7 @@ private void transfer(
     final String UPDATE_QUERY =
         "UPDATE "
-            + TransferCommon.KEYSPACE
+            + TransferCommon.NAMESPACE
             + "."
             + TransferCommon.TABLE
             + " SET "
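
Notes for readers porting similar Kelpie modules: the CRUD changes in this diff replace the old IntValue/withValue style with ScalarDB's builder API, and each operation now carries its namespace and table instead of relying on manager.with(keyspace, table). The sketch below condenses that pattern against the transfer.tx_transfer table defined in TransferCommon; the wrapper class name is made up for illustration, and the exact builder methods should be checked against the ScalarDB version on the classpath.

import com.scalar.db.api.Consistency;
import com.scalar.db.api.Get;
import com.scalar.db.api.Put;
import com.scalar.db.api.Result;
import com.scalar.db.io.Key;
import java.util.Optional;

// Hypothetical helper mirroring the builder-style prepareGet/preparePut in TransferCommon.
public class BuilderApiSketch {

  // The namespace and table are attached to the operation itself.
  public static Get prepareGet(int id, int type) {
    return Get.newBuilder()
        .namespace("transfer")
        .table("tx_transfer")
        .partitionKey(Key.ofInt("account_id", id))
        .clusteringKey(Key.ofInt("account_type", type))
        .consistency(Consistency.LINEARIZABLE)
        .build();
  }

  // intValue(...) replaces withValue(new IntValue(...)).
  public static Put preparePut(int id, int type, int amount) {
    return Put.newBuilder()
        .namespace("transfer")
        .table("tx_transfer")
        .partitionKey(Key.ofInt("account_id", id))
        .clusteringKey(Key.ofInt("account_type", type))
        .consistency(Consistency.LINEARIZABLE)
        .intValue("balance", amount)
        .build();
  }

  // Columns are read directly from the Result instead of casting an IntValue.
  public static int getBalance(Optional<Result> result) {
    return result.map(r -> r.getInt("balance")).orElse(0);
  }
}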
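
The Common.java hunk likewise drops the keyspace/table binding from manager creation, so a single DistributedTransactionManager can now serve every table in the suite. Below is a minimal sketch of how a processor obtains and uses such a manager, assuming the standard begin/commit flow and reusing the prepareGet/preparePut helpers from the previous sketch; the properties file name is a placeholder.

import com.scalar.db.api.DistributedTransaction;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.Result;
import com.scalar.db.service.TransactionFactory;
import java.io.FileInputStream;
import java.util.Optional;
import java.util.Properties;

public class TransactionManagerSketch {

  public static void main(String[] args) throws Exception {
    // Load ScalarDB connection settings; "scalardb.properties" is a placeholder path.
    Properties properties = new Properties();
    try (FileInputStream in = new FileInputStream("scalardb.properties")) {
      properties.load(in);
    }

    // No manager.with(namespace, table) any more; the operations carry that information.
    TransactionFactory factory = TransactionFactory.create(properties);
    DistributedTransactionManager manager = factory.getTransactionManager();

    DistributedTransaction tx = manager.begin();
    try {
      // Read a balance and write it back unchanged, just to show the round trip.
      Optional<Result> account = tx.get(BuilderApiSketch.prepareGet(0, 0));
      int balance = account.map(r -> r.getInt("balance")).orElse(0);
      tx.put(BuilderApiSketch.preparePut(0, 0, balance));
      tx.commit();
    } catch (Exception e) {
      tx.abort();
      throw e;
    } finally {
      manager.close();
    }
  }
}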
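
Similarly, the two-phase commit managers are now created from the config alone, and TransferWithTwoPhaseCommitTransactionProcessor pulls them straight from Common. The sketch below outlines the coordinator/participant handshake those managers drive, assuming ScalarDB's usual start/join, prepare, validate, commit sequence for two-phase commit transactions; the written balances are dummy values rather than a faithful copy of the processor's transfer logic.

import com.scalar.db.api.TwoPhaseCommitTransaction;
import com.scalar.db.api.TwoPhaseCommitTransactionManager;
import com.scalar.kelpie.config.Config;
import kelpie.scalardb.Common;

public class TwoPhaseCommitSketch {

  // config is the Kelpie test config that Common uses to build the two managers.
  public static void transfer(Config config, int fromId, int toId) throws Exception {
    TwoPhaseCommitTransactionManager manager1 = Common.getTwoPhaseCommitTransactionManager1(config);
    TwoPhaseCommitTransactionManager manager2 = Common.getTwoPhaseCommitTransactionManager2(config);

    // The coordinator starts the transaction; the participant joins it by ID.
    TwoPhaseCommitTransaction tx1 = manager1.start();
    TwoPhaseCommitTransaction tx2 = manager2.join(tx1.getId());
    try {
      // Dummy writes; a real transfer would read both balances first.
      tx1.put(BuilderApiSketch.preparePut(fromId, 0, 9900));
      tx2.put(BuilderApiSketch.preparePut(toId, 0, 10100));

      // Both sides must prepare and validate before either side commits.
      tx1.prepare();
      tx2.prepare();
      tx1.validate();
      tx2.validate();
      tx1.commit();
      tx2.commit();
    } catch (Exception e) {
      tx1.rollback();
      tx2.rollback();
      throw e;
    } finally {
      manager1.close();
      manager2.close();
    }
  }
}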