Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow specifying the number of range keys in CassandraTransactionalKeyValue workload #16

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/com/yugabyte/sample/apps/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,7 @@ public static enum Type {

// The number of client connections to establish to each host in the YugaByte DB cluster.
public int concurrentClients = 4;

// Number of range keys to include in the primary key.
public int numRangeKeys = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason to default this to -1 but the static defaults in the app to 0?

}
6 changes: 0 additions & 6 deletions src/main/java/com/yugabyte/sample/apps/CassandraKeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@
import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Logger;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.yugabyte.sample.common.SimpleLoadGenerator.Key;

/**
* This workload writes and reads some random string keys from a CQL server. By default, this app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class CassandraTransactionalKeyValue extends CassandraKeyValue {
// The number of unique keys to write. This determines the number of inserts (as opposed to
// updates).
appConfig.numUniqueKeysToWrite = NUM_UNIQUE_KEYS;

appConfig.numRangeKeys = 0;
}

// The default table name to create and use for CRUD ops.
Expand All @@ -57,22 +59,67 @@ public class CassandraTransactionalKeyValue extends CassandraKeyValue {
public CassandraTransactionalKeyValue() {
}

private String getRangeKeysForSchemaDefinition() {
StringBuilder sb = new StringBuilder();
for (int i = 1; i <= appConfig.numRangeKeys; ++i) {
sb.append("r" + i + " varchar, ");
}
return sb.toString();
}

private String getPrimaryKeyStr() {
if (appConfig.numRangeKeys <= 0) {
return "k";
}
return "(k)" + getCommasAndRangeKeys();
}


private String getCommasAndRangeKeys() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add a comment that this returns a string with a , at the start, I almost missed that...

StringBuilder sb = new StringBuilder();
for (int i = 1; i <= appConfig.numRangeKeys; ++i) {
sb.append(", r" );
sb.append(i);
}
return sb.toString();
}

private String getRepeatedPlaceholderForRangeKeys(String placeholder) {
StringBuilder sb = new StringBuilder();
for (int i = 1; i <= appConfig.numRangeKeys; ++i) {
sb.append(", ");
sb.append(placeholder);
}
return sb.toString();
}

@Override
public List<String> getCreateTableStatements() {
String createStmt = String.format(
"CREATE TABLE IF NOT EXISTS %s (k varchar, v bigint, primary key (k)) " +
"CREATE TABLE IF NOT EXISTS %s (k varchar, " +
getRangeKeysForSchemaDefinition() +
"v bigint, " +
"primary key (" + getPrimaryKeyStr() + ")) " +
"WITH transactions = { 'enabled' : true };", getTableName());
return Arrays.asList(createStmt);
}

@Override
public String getTableName() {
return appConfig.tableName != null ? appConfig.tableName : DEFAULT_TABLE_NAME;
if (appConfig.tableName != null)
return appConfig.tableName;
if (appConfig.numRangeKeys == 0) {
return DEFAULT_TABLE_NAME;
}
return DEFAULT_TABLE_NAME + "_" + appConfig.numRangeKeys + "range_key" + (
appConfig.numRangeKeys > 1 ? "s" : "");
}

private PreparedStatement getPreparedSelect() {
return getPreparedSelect(String.format("SELECT k, v, writetime(v) FROM %s WHERE k in :k;",
getTableName()),
appConfig.localReads);
return getPreparedSelect(String.format(
"SELECT k" + getCommasAndRangeKeys() + ", v, writetime(v) AS wt FROM %s WHERE k in :k;",
getTableName()),
appConfig.localReads);
}

private void verifyValue(Key key, long value1, long value2) {
Expand All @@ -92,18 +139,19 @@ public long doRead() {
}
// Do the read from Cassandra.
// Bind the select statement.
BoundStatement select = getPreparedSelect().bind(Arrays.asList(key.asString() + "_1",
key.asString() + "_2"));
String k1 = key.asString() + "_1";
String k2 = key.asString() + "_2";
BoundStatement select = getPreparedSelect().bind(Arrays.asList(k1, k2));
ResultSet rs = getCassandraClient().execute(select);
List<Row> rows = rs.all();
if (rows.size() != 2) {
LOG.fatal("Read key: " + key.asString() + " expected 2 row in result, got " + rows.size());
return 1;
}
verifyValue(key, rows.get(0).getLong(1), rows.get(1).getLong(1));
if (rows.get(0).getLong(2) != rows.get(1).getLong(2)) {
verifyValue(key, rows.get(0).getLong("v"), rows.get(1).getLong("v"));
if (rows.get(0).getLong("wt") != rows.get(1).getLong("wt")) {
LOG.fatal("Writetime mismatch for key: " + key.toString() + ", " +
rows.get(0).getLong(2) + " vs " + rows.get(1).getLong(2));
rows.get(0).getLong("wt") + " vs " + rows.get(1).getLong("wt"));
}

LOG.debug("Read key: " + key.toString());
Expand All @@ -113,8 +161,10 @@ public long doRead() {
protected PreparedStatement getPreparedInsert() {
return getPreparedInsert(String.format(
"BEGIN TRANSACTION" +
" INSERT INTO %s (k, v) VALUES (:k1, :v1);" +
" INSERT INTO %s (k, v) VALUES (:k2, :v2);" +
" INSERT INTO %s (k" + getCommasAndRangeKeys() + ", v) VALUES (:k1" +
getRepeatedPlaceholderForRangeKeys(":k1") + ", :v1);" +
" INSERT INTO %s (k" + getCommasAndRangeKeys() + ", v) VALUES (:k2" +
getRepeatedPlaceholderForRangeKeys(":k2") + ", :v2);" +
"END TRANSACTION;",
getTableName(),
getTableName()));
Expand Down Expand Up @@ -150,7 +200,8 @@ public long doWrite(int threadIdx) {
@Override
public List<String> getWorkloadDescription() {
return Arrays.asList(
"Key-value app with multi-row transactions. Each write txn inserts a pair of unique string keys with the same value.",
"Key-value app with multi-row transactions. Each write txn inserts a pair of unique string " +
"keys with the same value.",
" There are multiple readers and writers that update these keys and read them in pair ",
"indefinitely. The number of reads and writes to perform can be specified as a parameter.");
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/yugabyte/sample/common/CmdLineOpts.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ public void initialize(CommandLine commandLine) throws ClassNotFoundException {
AppBase.appConfig.concurrentClients = Integer.parseInt(
commandLine.getOptionValue("concurrent_clients"));
}
if (appName.equals(CassandraTransactionalKeyValue.class.getSimpleName())) {
if (commandLine.hasOption("num_range_keys")) {
AppBase.appConfig.numRangeKeys = Integer.parseInt(
commandLine.getOptionValue("num_range_keys"));
}
}
}

/**
Expand Down Expand Up @@ -604,6 +610,12 @@ public static CmdLineOpts createFromArgs(String[] args) throws Exception {
options.addOption("batch_write", false,
"[CassandraSecondaryIndex] Enable batch write of key values.");

// Options for CassandraTransactionalKeyValue app.
options.addOption(
"num_range_keys", true,
"[CassandraTransactionalKeyValue] Use this number of range key components in the primary " +
"key.");

// Options for Redis Pipelined Key Value
options.addOption(
"pipeline_length", true,
Expand Down