diff --git a/bin/bindings.properties b/bin/bindings.properties index a2aeb9a64b..a3db9189c6 100644 --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -32,6 +32,7 @@ arangodb:com.yahoo.ycsb.db.ArangoDBClient arangodb3:com.yahoo.ycsb.db.arangodb.ArangoDB3Client azuretablestorage:com.yahoo.ycsb.db.azuretablestorage.AzureClient basic:com.yahoo.ycsb.BasicDB +basicts:com.yahoo.ycsb.BasicTSDB cassandra-cql:com.yahoo.ycsb.db.CassandraCQLClient cassandra2-cql:com.yahoo.ycsb.db.CassandraCQLClient cloudspanner:com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient diff --git a/bin/ycsb b/bin/ycsb index 7fb7518024..59ad00dd0f 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -57,6 +57,7 @@ DATABASES = { "arangodb3" : "com.yahoo.ycsb.db.arangodb.ArangoDB3Client", "asynchbase" : "com.yahoo.ycsb.db.AsyncHBaseClient", "basic" : "com.yahoo.ycsb.BasicDB", + "basicts" : "com.yahoo.ycsb.BasicTSDB", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "cloudspanner" : "com.yahoo.ycsb.db.cloudspanner.CloudSpannerClient", @@ -269,8 +270,8 @@ def main(): warn("Running against a source checkout. In order to get our runtime " "dependencies we'll have to invoke Maven. Depending on the state " "of your system, this may take ~30-45 seconds") - db_location = "core" if binding == "basic" else binding - project = "core" if binding == "basic" else binding + "-binding" + db_location = "core" if (binding == "basic" or binding == "basicts") else binding + project = "core" if (binding == "basic" or binding == "basicts") else binding + "-binding" db_dir = os.path.join(ycsb_home, db_location) # goes first so we can rely on side-effect of package maven_says = get_classpath_from_maven(project) diff --git a/core/src/main/java/com/yahoo/ycsb/BasicDB.java b/core/src/main/java/com/yahoo/ycsb/BasicDB.java index 9e483f2147..15d1101053 100644 --- a/core/src/main/java/com/yahoo/ycsb/BasicDB.java +++ b/core/src/main/java/com/yahoo/ycsb/BasicDB.java @@ -47,16 +47,16 @@ public class BasicDB extends DB { protected static Map inserts; protected static Map deletes; - private boolean verbose; - private boolean randomizedelay; - private int todelay; + protected boolean verbose; + protected boolean randomizedelay; + protected int todelay; protected boolean count; public BasicDB() { todelay = 0; } - private void delay() { + protected void delay() { if (todelay > 0) { long delayNs; if (randomizedelay) { diff --git a/core/src/main/java/com/yahoo/ycsb/BasicTSDB.java b/core/src/main/java/com/yahoo/ycsb/BasicTSDB.java new file mode 100644 index 0000000000..42117ea9e9 --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/BasicTSDB.java @@ -0,0 +1,273 @@ +/** + * Copyright (c) 2017 YCSB contributors All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; + +import com.yahoo.ycsb.workloads.TimeSeriesWorkload; + +/** + * Basic DB for printing out time series workloads and/or tracking the distribution + * of keys and fields. + */ +public class BasicTSDB extends BasicDB { + + /** Time series workload specific counters. */ + protected static Map timestamps; + protected static Map floats; + protected static Map integers; + + private String timestampKey; + private String valueKey; + private String tagPairDelimiter; + private String queryTimeSpanDelimiter; + private long lastTimestamp; + + @Override + public void init() { + super.init(); + + synchronized (MUTEX) { + if (timestamps == null) { + timestamps = new HashMap(); + floats = new HashMap(); + integers = new HashMap(); + } + } + + timestampKey = getProperties().getProperty( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY, + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT); + valueKey = getProperties().getProperty( + TimeSeriesWorkload.VALUE_KEY_PROPERTY, + TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT); + tagPairDelimiter = getProperties().getProperty( + TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY, + TimeSeriesWorkload.PAIR_DELIMITER_PROPERTY_DEFAULT); + queryTimeSpanDelimiter = getProperties().getProperty( + TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY, + TimeSeriesWorkload.QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT); + } + + public Status read(String table, String key, Set fields, Map result) { + delay(); + + if (verbose) { + StringBuilder sb = getStringBuilder(); + sb.append("READ ").append(table).append(" ").append(key).append(" [ "); + if (fields != null) { + for (String f : fields) { + sb.append(f).append(" "); + } + } else { + sb.append(""); + } + + sb.append("]"); + System.out.println(sb); + } + + if (count) { + Set filtered = null; + if (fields != null) { + filtered = new HashSet(); + for (final String field : fields) { + if (field.startsWith(timestampKey)) { + String[] parts = field.split(tagPairDelimiter); + if (parts[1].contains(queryTimeSpanDelimiter)) { + parts = parts[1].split(queryTimeSpanDelimiter); + lastTimestamp = Long.parseLong(parts[0]); + } else { + lastTimestamp = Long.parseLong(parts[1]); + } + synchronized(timestamps) { + Integer ctr = timestamps.get(lastTimestamp); + if (ctr == null) { + timestamps.put(lastTimestamp, 1); + } else { + timestamps.put(lastTimestamp, ctr + 1); + } + } + } else { + filtered.add(field); + } + } + } + incCounter(reads, hash(table, key, filtered)); + } + return Status.OK; + } + + @Override + public Status update(String table, String key, Map values) { + delay(); + + boolean isFloat = false; + + if (verbose) { + StringBuilder sb = getStringBuilder(); + sb.append("UPDATE ").append(table).append(" ").append(key).append(" [ "); + if (values != null) { + final TreeMap tree = new TreeMap(values); + for (Map.Entry entry : tree.entrySet()) { + if (entry.getKey().equals(timestampKey)) { + sb.append(entry.getKey()).append("=") + .append(Utils.bytesToLong(entry.getValue().toArray())).append(" "); + } else if 
(entry.getKey().equals(valueKey)) { + final NumericByteIterator it = (NumericByteIterator) entry.getValue(); + isFloat = it.isFloatingPoint(); + sb.append(entry.getKey()).append("=") + .append(isFloat ? it.getDouble() : it.getLong()).append(" "); + } else { + sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" "); + } + } + } + sb.append("]"); + System.out.println(sb); + } + + if (count) { + if (!verbose) { + isFloat = ((NumericByteIterator) values.get(valueKey)).isFloatingPoint(); + } + int hash = hash(table, key, values); + incCounter(updates, hash); + synchronized(timestamps) { + Integer ctr = timestamps.get(lastTimestamp); + if (ctr == null) { + timestamps.put(lastTimestamp, 1); + } else { + timestamps.put(lastTimestamp, ctr + 1); + } + } + if (isFloat) { + incCounter(floats, hash); + } else { + incCounter(integers, hash); + } + } + + return Status.OK; + } + + @Override + public Status insert(String table, String key, Map values) { + delay(); + + boolean isFloat = false; + + if (verbose) { + StringBuilder sb = getStringBuilder(); + sb.append("INSERT ").append(table).append(" ").append(key).append(" [ "); + if (values != null) { + final TreeMap tree = new TreeMap(values); + for (Map.Entry entry : tree.entrySet()) { + if (entry.getKey().equals(timestampKey)) { + sb.append(entry.getKey()).append("=") + .append(Utils.bytesToLong(entry.getValue().toArray())).append(" "); + } else if (entry.getKey().equals(valueKey)) { + final NumericByteIterator it = (NumericByteIterator) entry.getValue(); + isFloat = it.isFloatingPoint(); + sb.append(entry.getKey()).append("=") + .append(isFloat ? it.getDouble() : it.getLong()).append(" "); + } else { + sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" "); + } + } + } + sb.append("]"); + System.out.println(sb); + } + + if (count) { + if (!verbose) { + isFloat = ((NumericByteIterator) values.get(valueKey)).isFloatingPoint(); + } + int hash = hash(table, key, values); + incCounter(inserts, hash); + synchronized(timestamps) { + Integer ctr = timestamps.get(lastTimestamp); + if (ctr == null) { + timestamps.put(lastTimestamp, 1); + } else { + timestamps.put(lastTimestamp, ctr + 1); + } + } + if (isFloat) { + incCounter(floats, hash); + } else { + incCounter(integers, hash); + } + } + + return Status.OK; + } + + @Override + public void cleanup() { + super.cleanup(); + if (count && counter < 1) { + System.out.println("[TIMESTAMPS], Unique, " + timestamps.size()); + System.out.println("[FLOATS], Unique series, " + floats.size()); + System.out.println("[INTEGERS], Unique series, " + integers.size()); + + long minTs = Long.MAX_VALUE; + long maxTs = Long.MIN_VALUE; + for (final long ts : timestamps.keySet()) { + if (ts > maxTs) { + maxTs = ts; + } + if (ts < minTs) { + minTs = ts; + } + } + System.out.println("[TIMESTAMPS], Min, " + minTs); + System.out.println("[TIMESTAMPS], Max, " + maxTs); + } + } + + @Override + protected int hash(final String table, final String key, final Map values) { + final TreeMap sorted = new TreeMap(); + for (final Entry entry : values.entrySet()) { + if (entry.getKey().equals(valueKey)) { + continue; + } else if (entry.getKey().equals(timestampKey)) { + lastTimestamp = ((NumericByteIterator) entry.getValue()).getLong(); + entry.getValue().reset(); + continue; + } + sorted.put(entry.getKey(), entry.getValue()); + } + // yeah it's ugly but gives us a unique hash without having to add hashers + // to all of the ByteIterators. 
+ StringBuilder buf = new StringBuilder().append(table).append(key); + for (final Entry entry : sorted.entrySet()) { + entry.getValue().reset(); + buf.append(entry.getKey()) + .append(entry.getValue().toString()); + } + return buf.toString().hashCode(); + } + +} \ No newline at end of file diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java new file mode 100644 index 0000000000..3a637eebd1 --- /dev/null +++ b/core/src/main/java/com/yahoo/ycsb/workloads/TimeSeriesWorkload.java @@ -0,0 +1,1286 @@ +/** + * Copyright (c) 2017 YCSB contributors All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb.workloads; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.TimeUnit; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.Client; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.NumericByteIterator; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.Utils; +import com.yahoo.ycsb.Workload; +import com.yahoo.ycsb.WorkloadException; +import com.yahoo.ycsb.generator.DiscreteGenerator; +import com.yahoo.ycsb.generator.Generator; +import com.yahoo.ycsb.generator.HotspotIntegerGenerator; +import com.yahoo.ycsb.generator.IncrementingPrintableStringGenerator; +import com.yahoo.ycsb.generator.NumberGenerator; +import com.yahoo.ycsb.generator.RandomDiscreteTimestampGenerator; +import com.yahoo.ycsb.generator.ScrambledZipfianGenerator; +import com.yahoo.ycsb.generator.SequentialGenerator; +import com.yahoo.ycsb.generator.UniformLongGenerator; +import com.yahoo.ycsb.generator.UnixEpochTimestampGenerator; +import com.yahoo.ycsb.generator.ZipfianGenerator; +import com.yahoo.ycsb.measurements.Measurements; + +/** + * A specialized workload dealing with time series data, i.e. series of discreet + * events associated with timestamps and identifiers. For this workload, identities + * consist of a {@link String} key and a set of {@link String} tag key/value + * pairs. + *

+ * For example:
+ * <table border="1">
+ * <tr><th>Time Series Key</th><th>Tag Keys/Values</th><th>1483228800</th><th>1483228860</th><th>1483228920</th></tr>
+ * <tr><td>AA</td><td>AA=AA, AB=AA</td><td>42.5</td><td>1.0</td><td>85.9</td></tr>
+ * <tr><td>AA</td><td>AA=AA, AB=AB</td><td>-9.4</td><td>76.9</td><td>0.18</td></tr>
+ * <tr><td>AB</td><td>AA=AA, AB=AA</td><td>-93.0</td><td>57.1</td><td>-63.8</td></tr>
+ * <tr><td>AB</td><td>AA=AA, AB=AB</td><td>7.6</td><td>56.1</td><td>-0.3</td></tr>
+ * </table>
+ *

+ * This table shows four time series, each with three measurements at three
+ * different timestamps. Keys, tags, timestamps and values (numeric only at this
+ * time) are generated by this workload. For details on properties and behavior,
+ * see the {@code workloads/tsworkload_template} file. This Javadoc focuses on the
+ * implementation and on how {@link DB} clients can parse the workload.
+ *

+ * In order to avoid having existing DB implementations implement a brand new
+ * interface, this workload uses the existing APIs to encode a few special values
+ * that can be parsed by the client. The special values include the timestamp,
+ * numeric value and some query (read or scan) parameters. For an example of how
+ * to parse the fields, see {@link BasicTSDB}.
+ *

+ * Timestamps + *

+ * Timestamps are presented as Unix Epoch values in units of {@link TimeUnit#SECONDS}, + * {@link TimeUnit#MILLISECONDS} or {@link TimeUnit#NANOSECONDS} based on the + * {@code timestampunits} property. For calls to {@link DB#insert(String, String, java.util.Map)} + * and {@link DB#update(String, String, java.util.Map)}, the timestamp is added to the + * {@code values} map encoded in a {@link NumericByteIterator} with the key defined + * in the {@code timestampkey} property (defaulting to "YCSBTS"). To pull out the timestamp + * when iterating over the values map, cast the {@link ByteIterator} to a + * {@link NumericByteIterator} and call {@link NumericByteIterator#getLong()}. + *

+ * Note that for calls to {@link DB#update(String, String, java.util.Map)},
+ * timestamps earlier than the timestamp generator's current timestamp will be
+ * chosen at random to mimic a lambda architecture or an old job re-reporting
+ * some data.
+ *

+ * For calls to {@link DB#read(String, String, java.util.Set, java.util.Map)} and
+ * {@link DB#scan(String, String, int, java.util.Set, Vector)}, timestamps
+ * are encoded in a {@link StringByteIterator} in a key/value format with the
+ * {@code tagpairdelimiter} separator, e.g. {@code YCSBTS=1483228800}. If
+ * {@code querytimespan} has been set to a positive value then the value will
+ * include a range with the starting (oldest) timestamp followed by the
+ * {@code querytimespandelimiter} separator (default {@code ,}) and the ending
+ * (most recent) timestamp, e.g. {@code YCSBTS=1483228800,1483228920}.
+ *
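For illustration, a client could recover the time range from the read/scan fields roughly as follows (a sketch that assumes the default "YCSBTS" key, "=" pair delimiter and "," time span delimiter, and a `fields` Set as passed to read/scan; the logic mirrors BasicTSDB.read):

long startTs = 0;
long endTs = 0;
for (final String field : fields) {
  if (field.startsWith("YCSBTS")) {
    // leaves either "1483228800" or "1483228800,1483228920"
    final String spec = field.split("=")[1];
    if (spec.contains(",")) {
      final String[] range = spec.split(",");
      startTs = Long.parseLong(range[0]);
      endTs = Long.parseLong(range[1]);
    } else {
      startTs = endTs = Long.parseLong(spec);
    }
  }
}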

+ * For calls to {@link DB#delete(String, String)}, encoding is the same as reads and + * scans but key/value pairs are separated by the {@code deletedelimiter} property value. + *

+ * By default, the starting timestamp is the current system time without any rounding. + * All timestamps are then offsets from that starting value. + *

+ * Values + *

+ * Similar to timestamps, values are encoded in {@link NumericByteIterator}s and
+ * stored in the values map with the key defined in {@code valuekey} (defaulting
+ * to "YCSBV"). Values can either be 64-bit signed {@code long}s or double
+ * precision {@code double}s depending on the {@code valuetype} or
+ * {@code dataintegrity} properties. When parsing out the value, always call
+ * {@link NumericByteIterator#isFloatingPoint()} to determine whether to call
+ * {@link NumericByteIterator#getDouble()} (true) or
+ * {@link NumericByteIterator#getLong()} (false).
+ *
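For illustration, a client iterating the values map on an insert or update might unpack the two special entries roughly like this (a sketch assuming the default "YCSBTS" and "YCSBV" keys and the Map<String, ByteIterator> passed to insert/update):

long timestamp = 0;
double numericValue = 0;
for (final Map.Entry<String, ByteIterator> entry : values.entrySet()) {
  if (entry.getKey().equals("YCSBTS")) {
    timestamp = ((NumericByteIterator) entry.getValue()).getLong();
  } else if (entry.getKey().equals("YCSBV")) {
    final NumericByteIterator it = (NumericByteIterator) entry.getValue();
    // check the encoding before choosing the accessor
    numericValue = it.isFloatingPoint() ? it.getDouble() : (double) it.getLong();
  } else {
    // anything else is a tag key/value pair
    final String tagValue = entry.getValue().toString();
  }
}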

+ * When {@code dataintegrity} is set to true, the value is always a 64-bit signed
+ * integer: the Java hash code of the concatenation of the key and the map of
+ * values (sorted on the map keys and skipping the timestamp and value entries)
+ * XOR'd with the timestamp of the data point. See
+ * {@link #validationFunction(String, long, TreeMap)} for the implementation.
+ *

+ * Keys and Tags + *

+ * As mentioned, the workload generates strings for the keys and tags. On
+ * initialization, three string generators are created using the
+ * {@link IncrementingPrintableStringGenerator} implementation. The generators then
+ * fill three arrays with values based on the number of keys, the number of tags and
+ * the cardinality of each tag key/value pair. This implementation gives us time
+ * series like the example table, where every string starts at something like "AA"
+ * (depending on the length of keys, tag keys and tag values) and continues to "ZZ",
+ * at which point it rolls over back to "AA".
+ *

+ * Each time series must have a unique set of tag keys, i.e. the key "AA" cannot appear + * more than once per time series. If the workload is configured for four tags with a + * tag key length of 2, the keys would be "AA", "AB", "AC" and "AD". + *

+ * Each tag key is then associated with a tag value. Tag values may appear more than once + * in each time series. E.g. time series will usually start with the tags "AA=AA", + * "AB=AA", "AC=AA" and "AD=AA". The {@code tagcardinality} property determines how many + * unique values will be generated per tag key. In the example table above, the + * {@code tagcardinality} property would have been set to {@code 1,2} meaning tag + * key "AA" would always have the tag value "AA" given a cardinality of 1. However + * tag key "AB" would have values "AA" and "AB" due to a cardinality of 2. This + * cardinality map, along with the number of unique time series keys determines how + * many unique time series are generated for the workload. Tag values share a common + * array of generated strings to save on memory. + *
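As a worked example, the table above corresponds to two time series keys ("AA" and "AB") with tagcardinality=1,2: 2 keys × 1 × 2 = 4 unique time series.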

+ * Operation Order + *

+ * The default behavior of the workload (for inserts and updates) is to generate a
+ * value for each time series at a given timestamp before incrementing to the next
+ * timestamp and writing values. This is an idealized workload and some time series
+ * databases are designed for this behavior. In the real world, however, most events
+ * arrive grouped close to the current system time while a number of events are
+ * delayed, so their timestamps lie further in the past. The {@code delayedseries}
+ * property determines the percentage of time series that are delayed by up to
+ * {@code delayedintervals} intervals. E.g. setting this value to 0.05 means that
+ * 5% of the time series will be written with timestamps earlier than the timestamp
+ * generator's current time.
+ *

+ * Reads and Scans + *

+ * For benchmarking queries, some common tasks implemented by almost every time
+ * series database are available and are passed to the client in the fields
+ * {@link Set}:
+ *

+ * GroupBy - A common operation is to aggregate multiple time series into a
+ * single time series via common parameters. For example, a user may want to see the
+ * total network traffic in a data center, so they would issue a SQL query like:
+ * SELECT datacenter, SUM(value) FROM timeseriesdb GROUP BY datacenter;
+ * If {@code groupbyfunction} has been set to a group-by function, then the fields
+ * will contain a key/value pair with the key set in {@code groupbykey}, e.g.
+ * {@code YCSBGB=SUM}.
+ *

+ * Additionally, with grouping enabled, fields for tag keys that should be grouped
+ * on will only have the key defined, without a value or delimiter. E.g. if grouping
+ * on tag key "AA", the field will contain {@code AA} instead of {@code AA=AB}.
+ *

+ * Downsampling - Another common operation is to reduce the resolution of the + * queried time series when fetching a wide time range of data so fewer data points + * are returned. For example, a user may fetch a week of data but if the data is + * recorded on a 1 second interval, that would be over 600k data points so they + * may ask for a 1 hour downsampling (also called bucketing) wherein every hour, all + * of the data points for a "bucket" are aggregated into a single value. + *

+ * To enable downsampling, the {@code downsamplingfunction} property must be set to + * a supported function such as "SUM" and the {@code downsamplinginterval} must be + * set to a valid time interval with the same units as {@code timestampunits}, e.g. + * "3600" which would create 1 hour buckets if the time units were set to seconds. + * With downsampling, query fields will include a key/value pair with + * {@code downsamplingkey} as the key (defaulting to "YCSBDS") and the value being + * a concatenation of {@code downsamplingfunction} and {@code downsamplinginterval}, + * for example {@code YCSBDS=SUM60}. + *
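A sketch of how a client might pick these query parameters out of the fields set, assuming the default "YCSBGB" and "YCSBDS" keys, the "=" pair delimiter and the read-style YCSBDS format documented above:

String groupByFunction = null;
String downsampleSpec = null;
final Set<String> groupedTagKeys = new HashSet<String>();
for (final String field : fields) {
  if (field.startsWith("YCSBGB")) {
    groupByFunction = field.split("=")[1];   // e.g. "SUM"
  } else if (field.startsWith("YCSBDS")) {
    downsampleSpec = field.split("=")[1];    // e.g. "SUM60"
  } else if (!field.contains("=")) {
    groupedTagKeys.add(field);               // bare tag key to group on
  }
}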

+ * Timestamps - For every read, a random timestamp is selected from the interval
+ * set. If {@code querytimespan} has been set to a positive value, then the configured
+ * query time interval is added to the selected timestamp so the read passes the DB
+ * a range of times. Note that during the run phase, if no data was previously loaded,
+ * or if the run phase is configured with a larger {@code recordcount} than the load
+ * phase, reads may be sent to the DB with timestamps that are beyond the written data
+ * time range (or even the system clock of the DB).
+ *

+ * Deletes + *

+ * Because the delete API only accepts a single key, a full key and tag key/value
+ * pair map is flattened into a single string for parsing by the database. Common
+ * workloads include deleting a single time series (wherein all tag keys and values
+ * are defined), deleting all series containing a given tag key and value, or
+ * deleting all of the time series sharing a common time series key.
+ *

+ * Right now the workload supports deletes with a key and a full set of time series
+ * tag key/value pairs, or a key with tags and a group-by on one or more tags
+ * (meaning: delete all of the series with any value for the given tag key). The
+ * parameters are collapsed into a single string delimited with the character in the
+ * {@code deletedelimiter} property. For example, a delete request may look like
+ * {@code AA:AA=AA:AB=AA} to delete the first time series in the table above.
+ *
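A sketch of splitting the flattened delete key back apart, assuming the default ":" delete delimiter and "=" pair delimiter:

// e.g. key = "AA:AA=AA:AB=AA" for the first series in the example table
final String[] parts = key.split(":");
final String seriesKey = parts[0];
for (int i = 1; i < parts.length; i++) {
  if (parts[i].contains("=")) {
    final String[] pair = parts[i].split("=");
    // pair[0] is the tag key, pair[1] the tag value that must match
  } else {
    // a bare tag key: delete matching series regardless of this tag's value
  }
}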

+ * Threads + *

+ * For a multi-threaded execution, the number of time series keys set via the
+ * {@code fieldcount} property must be greater than or equal to the number of
+ * threads set via {@code threads}. This is because each thread chooses a subset
+ * of the total set of time series keys and is responsible for writing values
+ * for each time series containing those keys at each timestamp. Thus each thread
+ * has its own timestamp generator, which increments once every time series the
+ * thread is responsible for has had a value written.
+ *

+ * Each thread may, however, issue reads and scans for any time series in the + * complete set. + *
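For example, with fieldcount=4 time series keys and threads=2, each thread owns two of the four keys and advances its own timestamp generator only after every series under those two keys has received a value for the current interval.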

+ * Sparsity + *

+ * By default, during loads, every time series will have a data point written at
+ * every timestamp in the interval set. This is common in workloads where a sensor
+ * writes a value at regular intervals. However, some time series are only reported
+ * under certain conditions.
+ *

+ * For example, a counter may track the number of errors over a time period for a
+ * web service and only report when the value is greater than 1. Or a time series
+ * may include tags such as a user ID and IP address when a request arrives at the
+ * web service and only report values when that combination is seen. This means the
+ * time series will not have a value at every timestamp, and in some cases there may
+ * be only a single value!
+ *

+ * This workload has a {@code sparsity} parameter that controls how often a time
+ * series records a value. The default value of 0.0 means every series will get a
+ * value at every timestamp. A value of 0.95 means that, for each series, only 5%
+ * of the timestamps in the interval will have a value. The timestamps that receive
+ * values are chosen at random.
+ *

+ * Notes/Warnings + *

+ *

+ *

+ * TODOs + *

+ *

+ */ +public class TimeSeriesWorkload extends Workload { + + /** + * The types of values written to the timeseries store. + */ + public enum ValueType { + INTEGERS("integers"), + FLOATS("floats"), + MIXED("mixednumbers"); + + protected final String name; + + ValueType(final String name) { + this.name = name; + } + + public static ValueType fromString(final String name) { + for (final ValueType type : ValueType.values()) { + if (type.name.equalsIgnoreCase(name)) { + return type; + } + } + throw new IllegalArgumentException("Unrecognized type: " + name); + } + } + + /** Name and default value for the timestamp key property. */ + public static final String TIMESTAMP_KEY_PROPERTY = "timestampkey"; + public static final String TIMESTAMP_KEY_PROPERTY_DEFAULT = "YCSBTS"; + + /** Name and default value for the value key property. */ + public static final String VALUE_KEY_PROPERTY = "valuekey"; + public static final String VALUE_KEY_PROPERTY_DEFAULT = "YCSBV"; + + /** Name and default value for the timestamp interval property. */ + public static final String TIMESTAMP_INTERVAL_PROPERTY = "timestampinterval"; + public static final String TIMESTAMP_INTERVAL_PROPERTY_DEFAULT = "60"; + + /** Name and default value for the timestamp units property. */ + public static final String TIMESTAMP_UNITS_PROPERTY = "timestampunits"; + public static final String TIMESTAMP_UNITS_PROPERTY_DEFAULT = "SECONDS"; + + /** Name and default value for the number of tags property. */ + public static final String TAG_COUNT_PROPERTY = "tagcount"; + public static final String TAG_COUNT_PROPERTY_DEFAULT = "4"; + + /** Name and default value for the tag value cardinality map property. */ + public static final String TAG_CARDINALITY_PROPERTY = "tagcardinality"; + public static final String TAG_CARDINALITY_PROPERTY_DEFAULT = "1, 2, 4, 8"; + + /** Name and default value for the tag key length property. */ + public static final String TAG_KEY_LENGTH_PROPERTY = "tagkeylength"; + public static final String TAG_KEY_LENGTH_PROPERTY_DEFAULT = "8"; + + /** Name and default value for the tag value length property. */ + public static final String TAG_VALUE_LENGTH_PROPERTY = "tagvaluelength"; + public static final String TAG_VALUE_LENGTH_PROPERTY_DEFAULT = "8"; + + /** Name and default value for the tag pair delimiter property. */ + public static final String PAIR_DELIMITER_PROPERTY = "tagpairdelimiter"; + public static final String PAIR_DELIMITER_PROPERTY_DEFAULT = "="; + + /** Name and default value for the delete string delimiter property. */ + public static final String DELETE_DELIMITER_PROPERTY = "deletedelimiter"; + public static final String DELETE_DELIMITER_PROPERTY_DEFAULT = ":"; + + /** Name and default value for the random timestamp write order property. */ + public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY = "randomwritetimestamporder"; + public static final String RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT = "false"; + + /** Name and default value for the random time series write order property. */ + public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY = "randomtimeseriesorder"; + public static final String RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT = "true"; + + /** Name and default value for the value types property. */ + public static final String VALUE_TYPE_PROPERTY = "valuetype"; + public static final String VALUE_TYPE_PROPERTY_DEFAULT = "floats"; + + /** Name and default value for the sparsity property. 
*/ + public static final String SPARSITY_PROPERTY = "sparsity"; + public static final String SPARSITY_PROPERTY_DEFAULT = "0.00"; + + /** Name and default value for the delayed series percentage property. */ + public static final String DELAYED_SERIES_PROPERTY = "delayedseries"; + public static final String DELAYED_SERIES_PROPERTY_DEFAULT = "0.10"; + + /** Name and default value for the delayed series intervals property. */ + public static final String DELAYED_INTERVALS_PROPERTY = "delayedintervals"; + public static final String DELAYED_INTERVALS_PROPERTY_DEFAULT = "5"; + + /** Name and default value for the query time span property. */ + public static final String QUERY_TIMESPAN_PROPERTY = "querytimespan"; + public static final String QUERY_TIMESPAN_PROPERTY_DEFAULT = "0"; + + /** Name and default value for the randomized query time span property. */ + public static final String QUERY_RANDOM_TIMESPAN_PROPERTY = "queryrandomtimespan"; + public static final String QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT = "false"; + + /** Name and default value for the query time stamp delimiter property. */ + public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY = "querytimespandelimiter"; + public static final String QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT = ","; + + /** Name and default value for the group-by key property. */ + public static final String GROUPBY_KEY_PROPERTY = "groupbykey"; + public static final String GROUPBY_KEY_PROPERTY_DEFAULT = "YCSBGB"; + + /** Name and default value for the group-by function property. */ + public static final String GROUPBY_PROPERTY = "groupbyfunction"; + + /** Name and default value for the group-by key map property. */ + public static final String GROUPBY_KEYS_PROPERTY = "groupbykeys"; + + /** Name and default value for the downsampling key property. */ + public static final String DOWNSAMPLING_KEY_PROPERTY = "downsamplingkey"; + public static final String DOWNSAMPLING_KEY_PROPERTY_DEFAULT = "YCSBDS"; + + /** Name and default value for the downsampling function property. */ + public static final String DOWNSAMPLING_FUNCTION_PROPERTY = "downsamplingfunction"; + + /** Name and default value for the downsampling interval property. */ + public static final String DOWNSAMPLING_INTERVAL_PROPERTY = "downsamplinginterval"; + + /** The properties to pull settings from. */ + protected Properties properties; + + /** Generators for keys, tag keys and tag values. */ + protected Generator keyGenerator; + protected Generator tagKeyGenerator; + protected Generator tagValueGenerator; + + /** The timestamp key, defaults to "YCSBTS". */ + protected String timestampKey; + + /** The value key, defaults to "YCSBDS". */ + protected String valueKey; + + /** The number of time units in between timestamps. */ + protected int timestampInterval; + + /** The units of time the timestamp and various intervals represent. */ + protected TimeUnit timeUnits; + + /** Whether or not to randomize the timestamp order when writing. */ + protected boolean randomizeTimestampOrder; + + /** Whether or not to randomize (shuffle) the time series order. NOT compatible + * with data integrity. */ + protected boolean randomizeTimeseriesOrder; + + /** The type of values to generate when writing data. */ + protected ValueType valueType; + + /** Used to calculate an offset for each time series. */ + protected int[] cumulativeCardinality; + + /** The calculated total cardinality based on the config. */ + protected int totalCardinality; + + /** The calculated per-time-series-key cardinality. I.e. 
the number of unique + * tag key and value combinations. */ + protected int perKeyCardinality; + + /** How much data to scan for in each call. */ + protected NumberGenerator scanlength; + + /** A generator used to select a random time series key per read/scan. */ + protected NumberGenerator keychooser; + + /** A generator to select what operation to perform during the run phase. */ + protected DiscreteGenerator operationchooser; + + /** The maximum number of interval offsets from the starting timestamp. Calculated + * based on the number of records configured for the run. */ + protected int maxOffsets; + + /** The number of records or operations to perform for this run. */ + protected int recordcount; + + /** The number of tag pairs per time series. */ + protected int tagPairs; + + /** The table we'll write to. */ + protected String table; + + /** How many time series keys will be generated. */ + protected int numKeys; + + /** The generated list of possible time series key values. */ + protected String[] keys; + + /** The generated list of possible tag key values. */ + protected String[] tagKeys; + + /** The generated list of possible tag value values. */ + protected String[] tagValues; + + /** The cardinality for each tag key. */ + protected int[] tagCardinality; + + /** A helper to skip non-incrementing tag values. */ + protected int firstIncrementableCardinality; + + /** How sparse the data written should be. */ + protected double sparsity; + + /** The percentage of time series that should be delayed in writes. */ + protected double delayedSeries; + + /** The maximum number of intervals to delay a series. */ + protected int delayedIntervals; + + /** Optional query time interval during reads/scans. */ + protected int queryTimeSpan; + + /** Whether or not the actual interval should be randomly chosen, using + * queryTimeSpan as the maximum value. */ + protected boolean queryRandomTimeSpan; + + /** The delimiter for tag pairs in fields. */ + protected String tagPairDelimiter; + + /** The delimiter between parameters for the delete key. */ + protected String deleteDelimiter; + + /** The delimiter between timestamps for query time spans. */ + protected String queryTimeSpanDelimiter; + + /** Whether or not to issue group-by queries. */ + protected boolean groupBy; + + /** The key used for group-by tag keys. */ + protected String groupByKey; + + /** The function used for group-by's. */ + protected String groupByFunction; + + /** The tag keys to group on. */ + protected boolean[] groupBys; + + /** Whether or not to issue downsampling queries. */ + protected boolean downsample; + + /** The key used for downsampling tag keys. */ + protected String downsampleKey; + + /** The downsampling function. */ + protected String downsampleFunction; + + /** The downsampling interval. */ + protected int downsampleInterval; + + /** + * Set to true if want to check correctness of reads. Must also + * be set to true during loading phase to function. + */ + protected boolean dataintegrity; + + /** Measurements to write data integrity results to. 
*/ + protected Measurements measurements = Measurements.getMeasurements(); + + @Override + public void init(final Properties p) throws WorkloadException { + properties = p; + recordcount = + Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, + Client.DEFAULT_RECORD_COUNT)); + if (recordcount == 0) { + recordcount = Integer.MAX_VALUE; + } + timestampKey = p.getProperty(TIMESTAMP_KEY_PROPERTY, TIMESTAMP_KEY_PROPERTY_DEFAULT); + valueKey = p.getProperty(VALUE_KEY_PROPERTY, VALUE_KEY_PROPERTY_DEFAULT); + operationchooser = CoreWorkload.createOperationGenerator(properties); + + final int maxscanlength = + Integer.parseInt(p.getProperty(CoreWorkload.MAX_SCAN_LENGTH_PROPERTY, + CoreWorkload.MAX_SCAN_LENGTH_PROPERTY_DEFAULT)); + String scanlengthdistrib = + p.getProperty(CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY, + CoreWorkload.SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); + + if (scanlengthdistrib.compareTo("uniform") == 0) { + scanlength = new UniformLongGenerator(1, maxscanlength); + } else if (scanlengthdistrib.compareTo("zipfian") == 0) { + scanlength = new ZipfianGenerator(1, maxscanlength); + } else { + throw new WorkloadException( + "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length"); + } + + randomizeTimestampOrder = Boolean.parseBoolean(p.getProperty( + RANDOMIZE_TIMESTAMP_ORDER_PROPERTY, + RANDOMIZE_TIMESTAMP_ORDER_PROPERTY_DEFAULT)); + randomizeTimeseriesOrder = Boolean.parseBoolean(p.getProperty( + RANDOMIZE_TIMESERIES_ORDER_PROPERTY, + RANDOMIZE_TIMESERIES_ORDER_PROPERTY_DEFAULT)); + + // setup the cardinality + numKeys = Integer.parseInt(p.getProperty(CoreWorkload.FIELD_COUNT_PROPERTY, + CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); + tagPairs = Integer.parseInt(p.getProperty(TAG_COUNT_PROPERTY, + TAG_COUNT_PROPERTY_DEFAULT)); + sparsity = Double.parseDouble(p.getProperty(SPARSITY_PROPERTY, SPARSITY_PROPERTY_DEFAULT)); + tagCardinality = new int[tagPairs]; + + final String requestdistrib = + p.getProperty(CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY, + CoreWorkload.REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); + if (requestdistrib.compareTo("uniform") == 0) { + keychooser = new UniformLongGenerator(0, numKeys - 1); + } else if (requestdistrib.compareTo("sequential") == 0) { + keychooser = new SequentialGenerator(0, numKeys - 1); + } else if (requestdistrib.compareTo("zipfian") == 0) { + keychooser = new ScrambledZipfianGenerator(0, numKeys - 1); + //} else if (requestdistrib.compareTo("latest") == 0) { + // keychooser = new SkewedLatestGenerator(transactioninsertkeysequence); + } else if (requestdistrib.equals("hotspot")) { + double hotsetfraction = + Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_DATA_FRACTION, + CoreWorkload.HOTSPOT_DATA_FRACTION_DEFAULT)); + double hotopnfraction = + Double.parseDouble(p.getProperty(CoreWorkload.HOTSPOT_OPN_FRACTION, + CoreWorkload.HOTSPOT_OPN_FRACTION_DEFAULT)); + keychooser = new HotspotIntegerGenerator(0, numKeys - 1, + hotsetfraction, hotopnfraction); + } else { + throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\""); + } + + // figure out the start timestamp based on the units, cardinality and interval + try { + timestampInterval = Integer.parseInt(p.getProperty( + TIMESTAMP_INTERVAL_PROPERTY, TIMESTAMP_INTERVAL_PROPERTY_DEFAULT)); + } catch (NumberFormatException nfe) { + throw new WorkloadException("Unable to parse the " + + TIMESTAMP_INTERVAL_PROPERTY, nfe); + } + + try { + timeUnits = TimeUnit.valueOf(p.getProperty(TIMESTAMP_UNITS_PROPERTY, + 
TIMESTAMP_UNITS_PROPERTY_DEFAULT).toUpperCase()); + } catch (IllegalArgumentException e) { + throw new WorkloadException("Unknown time unit type", e); + } + if (timeUnits == TimeUnit.NANOSECONDS || timeUnits == TimeUnit.MICROSECONDS) { + throw new WorkloadException("YCSB doesn't support " + timeUnits + + " at this time."); + } + + tagPairDelimiter = p.getProperty(PAIR_DELIMITER_PROPERTY, PAIR_DELIMITER_PROPERTY_DEFAULT); + deleteDelimiter = p.getProperty(DELETE_DELIMITER_PROPERTY, DELETE_DELIMITER_PROPERTY_DEFAULT); + dataintegrity = Boolean.parseBoolean( + p.getProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, + CoreWorkload.DATA_INTEGRITY_PROPERTY_DEFAULT)); + + queryTimeSpan = Integer.parseInt(p.getProperty(QUERY_TIMESPAN_PROPERTY, + QUERY_TIMESPAN_PROPERTY_DEFAULT)); + queryRandomTimeSpan = Boolean.parseBoolean(p.getProperty(QUERY_RANDOM_TIMESPAN_PROPERTY, + QUERY_RANDOM_TIMESPAN_PROPERTY_DEFAULT)); + queryTimeSpanDelimiter = p.getProperty(QUERY_TIMESPAN_DELIMITER_PROPERTY, + QUERY_TIMESPAN_DELIMITER_PROPERTY_DEFAULT); + + groupByKey = p.getProperty(GROUPBY_KEY_PROPERTY, GROUPBY_KEY_PROPERTY_DEFAULT); + groupByFunction = p.getProperty(GROUPBY_PROPERTY); + if (groupByFunction != null && !groupByFunction.isEmpty()) { + final String groupByKeys = p.getProperty(GROUPBY_KEYS_PROPERTY); + if (groupByKeys == null || groupByKeys.isEmpty()) { + throw new WorkloadException("Group by was enabled but no keys were specified."); + } + final String[] gbKeys = groupByKeys.split(","); + if (gbKeys.length != tagKeys.length) { + throw new WorkloadException("Only " + gbKeys.length + " group by keys " + + "were specified but there were " + tagKeys.length + " tag keys given."); + } + groupBys = new boolean[gbKeys.length]; + for (int i = 0; i < gbKeys.length; i++) { + groupBys[i] = Integer.parseInt(gbKeys[i].trim()) == 0 ? 
false : true; + } + groupBy = true; + } + + downsampleKey = p.getProperty(DOWNSAMPLING_KEY_PROPERTY, DOWNSAMPLING_KEY_PROPERTY_DEFAULT); + downsampleFunction = p.getProperty(DOWNSAMPLING_FUNCTION_PROPERTY); + if (downsampleFunction != null && !downsampleFunction.isEmpty()) { + final String interval = p.getProperty(DOWNSAMPLING_INTERVAL_PROPERTY); + if (interval == null || interval.isEmpty()) { + throw new WorkloadException("'" + DOWNSAMPLING_INTERVAL_PROPERTY + "' was missing despite '" + + DOWNSAMPLING_FUNCTION_PROPERTY + "' being set."); + } + downsampleInterval = Integer.parseInt(interval); + downsample = true; + } + + delayedSeries = Double.parseDouble(p.getProperty(DELAYED_SERIES_PROPERTY, DELAYED_SERIES_PROPERTY_DEFAULT)); + delayedIntervals = Integer.parseInt(p.getProperty(DELAYED_INTERVALS_PROPERTY, DELAYED_INTERVALS_PROPERTY_DEFAULT)); + + valueType = ValueType.fromString(p.getProperty(VALUE_TYPE_PROPERTY, VALUE_TYPE_PROPERTY_DEFAULT)); + table = p.getProperty(CoreWorkload.TABLENAME_PROPERTY, CoreWorkload.TABLENAME_PROPERTY_DEFAULT); + initKeysAndTags(); + validateSettings(); + } + + @Override + public Object initThread(Properties p, int mythreadid, int threadcount) throws WorkloadException { + if (properties == null) { + throw new WorkloadException("Workload has not been initialized."); + } + return new ThreadState(mythreadid, threadcount); + } + + @Override + public boolean doInsert(DB db, Object threadstate) { + if (threadstate == null) { + throw new IllegalStateException("Missing thread state."); + } + final Map tags = new TreeMap(); + final String key = ((ThreadState)threadstate).nextDataPoint(tags, true); + if (db.insert(table, key, tags) == Status.OK) { + return true; + } + return false; + } + + @Override + public boolean doTransaction(DB db, Object threadstate) { + if (threadstate == null) { + throw new IllegalStateException("Missing thread state."); + } + switch (operationchooser.nextString()) { + case "READ": + doTransactionRead(db, threadstate); + break; + case "UPDATE": + doTransactionUpdate(db, threadstate); + break; + case "INSERT": + doTransactionInsert(db, threadstate); + break; + case "SCAN": + doTransactionScan(db, threadstate); + break; + case "DELETE": + doTransactionDelete(db, threadstate); + break; + default: + return false; + } + return true; + } + + protected void doTransactionRead(final DB db, Object threadstate) { + final ThreadState state = (ThreadState) threadstate; + final String keyname = keys[keychooser.nextValue().intValue()]; + + int offsets = state.queryOffsetGenerator.nextValue().intValue(); + //int offsets = Utils.random().nextInt(maxOffsets - 1); + final long startTimestamp; + if (offsets > 0) { + startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); + } else { + startTimestamp = state.startTimestamp; + } + + // rando tags + Set fields = new HashSet(); + for (int i = 0; i < tagPairs; ++i) { + if (groupBy && groupBys[i]) { + fields.add(tagKeys[i]); + } else { + fields.add(tagKeys[i] + tagPairDelimiter + + tagValues[Utils.random().nextInt(tagCardinality[i])]); + } + } + + if (queryTimeSpan > 0) { + final long endTimestamp; + if (queryRandomTimeSpan) { + endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval)); + } else { + endTimestamp = startTimestamp + queryTimeSpan; + } + fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); + } else { + fields.add(timestampKey + tagPairDelimiter + startTimestamp); + } + if 
(groupBy) { + fields.add(groupByKey + tagPairDelimiter + groupByFunction); + } + if (downsample) { + fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + downsampleInterval); + } + + final Map cells = new HashMap(); + final Status status = db.read(table, keyname, fields, cells); + + if (dataintegrity && status == Status.OK) { + verifyRow(keyname, cells); + } + } + + protected void doTransactionUpdate(final DB db, Object threadstate) { + if (threadstate == null) { + throw new IllegalStateException("Missing thread state."); + } + final Map tags = new TreeMap(); + final String key = ((ThreadState)threadstate).nextDataPoint(tags, false); + db.update(table, key, tags); + } + + protected void doTransactionInsert(final DB db, Object threadstate) { + doInsert(db, threadstate); + } + + protected void doTransactionScan(final DB db, Object threadstate) { + final ThreadState state = (ThreadState) threadstate; + + final String keyname = keys[Utils.random().nextInt(keys.length)]; + + // choose a random scan length + int len = scanlength.nextValue().intValue(); + + int offsets = Utils.random().nextInt(maxOffsets - 1); + final long startTimestamp; + if (offsets > 0) { + startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); + } else { + startTimestamp = state.startTimestamp; + } + + // rando tags + Set fields = new HashSet(); + for (int i = 0; i < tagPairs; ++i) { + if (groupBy && groupBys[i]) { + fields.add(tagKeys[i]); + } else { + fields.add(tagKeys[i] + tagPairDelimiter + + tagValues[Utils.random().nextInt(tagCardinality[i])]); + } + } + + if (queryTimeSpan > 0) { + final long endTimestamp; + if (queryRandomTimeSpan) { + endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval)); + } else { + endTimestamp = startTimestamp + queryTimeSpan; + } + fields.add(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); + } else { + fields.add(timestampKey + tagPairDelimiter + startTimestamp); + } + if (groupBy) { + fields.add(groupByKey + tagPairDelimiter + groupByFunction); + } + if (downsample) { + fields.add(downsampleKey + tagPairDelimiter + downsampleFunction + tagPairDelimiter + downsampleInterval); + } + + final Vector> results = new Vector>(); + db.scan(table, keyname, len, fields, results); + } + + protected void doTransactionDelete(final DB db, Object threadstate) { + final ThreadState state = (ThreadState) threadstate; + + final StringBuilder buf = new StringBuilder().append(keys[Utils.random().nextInt(keys.length)]); + + int offsets = Utils.random().nextInt(maxOffsets - 1); + final long startTimestamp; + if (offsets > 0) { + startTimestamp = state.startTimestamp + state.timestampGenerator.getOffset(offsets); + } else { + startTimestamp = state.startTimestamp; + } + + // rando tags + for (int i = 0; i < tagPairs; ++i) { + if (groupBy && groupBys[i]) { + buf.append(deleteDelimiter) + .append(tagKeys[i]); + } else { + buf.append(deleteDelimiter).append(tagKeys[i] + tagPairDelimiter + + tagValues[Utils.random().nextInt(tagCardinality[i])]); + } + } + + if (queryTimeSpan > 0) { + final long endTimestamp; + if (queryRandomTimeSpan) { + endTimestamp = startTimestamp + (timestampInterval * Utils.random().nextInt(queryTimeSpan / timestampInterval)); + } else { + endTimestamp = startTimestamp + queryTimeSpan; + } + buf.append(deleteDelimiter) + .append(timestampKey + tagPairDelimiter + startTimestamp + queryTimeSpanDelimiter + endTimestamp); + } else { + 
buf.append(deleteDelimiter) + .append(timestampKey + tagPairDelimiter + startTimestamp); + } + + db.delete(table, buf.toString()); + } + + /** + * Parses the values returned by a read or scan operation and determines whether + * or not the integer value matches the hash and timestamp of the original timestamp. + * Only works for raw data points, will not work for group-by's or downsampled data. + * @param key The time series key. + * @param cells The cells read by the DB. + * @return {@link Status#OK} if the data matched or {@link Status#UNEXPECTED_STATE} if + * the data did not match. + */ + protected Status verifyRow(final String key, final Map cells) { + Status verifyStatus = Status.UNEXPECTED_STATE; + long startTime = System.nanoTime(); + + double value = 0; + long timestamp = 0; + final TreeMap validationTags = new TreeMap(); + for (final Entry entry : cells.entrySet()) { + if (entry.getKey().equals(timestampKey)) { + final NumericByteIterator it = (NumericByteIterator) entry.getValue(); + timestamp = it.getLong(); + } else if (entry.getKey().equals(valueKey)) { + final NumericByteIterator it = (NumericByteIterator) entry.getValue(); + value = it.isFloatingPoint() ? it.getDouble() : it.getLong(); + } else { + validationTags.put(entry.getKey(), entry.getValue().toString()); + } + } + + if (validationFunction(key, timestamp, validationTags) == value) { + verifyStatus = Status.OK; + } + long endTime = System.nanoTime(); + measurements.measure("VERIFY", (int) (endTime - startTime) / 1000); + measurements.reportStatus("VERIFY", verifyStatus); + return verifyStatus; + } + + /** + * Function used for generating a deterministic hash based on the combination + * of metric, tags and timestamp. + * @param key A non-null string representing the key. + * @param timestamp A timestamp in the proper units for the workload. + * @param tags A non-null map of tag keys and values NOT including the YCSB + * key or timestamp. + * @return A hash value as an 8 byte integer. + */ + protected long validationFunction(final String key, final long timestamp, + final TreeMap tags) { + final StringBuilder validationBuffer = new StringBuilder(keys[0].length() + + (tagPairs * tagKeys[0].length()) + (tagPairs * tagCardinality[1])); + for (final Entry pair : tags.entrySet()) { + validationBuffer.append(pair.getKey()).append(pair.getValue()); + } + return (long) validationBuffer.toString().hashCode() ^ timestamp; + } + + /** + * Breaks out the keys, tags and cardinality initialization in another method + * to keep CheckStyle happy. + * @throws WorkloadException If something goes pear shaped. 
+ */ + protected void initKeysAndTags() throws WorkloadException { + final int keyLength = Integer.parseInt(properties.getProperty( + CoreWorkload.FIELD_LENGTH_PROPERTY, + CoreWorkload.FIELD_LENGTH_PROPERTY_DEFAULT)); + final int tagKeyLength = Integer.parseInt(properties.getProperty( + TAG_KEY_LENGTH_PROPERTY, TAG_KEY_LENGTH_PROPERTY_DEFAULT)); + final int tagValueLength = Integer.parseInt(properties.getProperty( + TAG_VALUE_LENGTH_PROPERTY, TAG_VALUE_LENGTH_PROPERTY_DEFAULT)); + + keyGenerator = new IncrementingPrintableStringGenerator(keyLength); + tagKeyGenerator = new IncrementingPrintableStringGenerator(tagKeyLength); + tagValueGenerator = new IncrementingPrintableStringGenerator(tagValueLength); + + final int threads = Integer.parseInt(properties.getProperty(Client.THREAD_COUNT_PROPERTY, "1")); + final String tagCardinalityString = properties.getProperty( + TAG_CARDINALITY_PROPERTY, + TAG_CARDINALITY_PROPERTY_DEFAULT); + final String[] tagCardinalityParts = tagCardinalityString.split(","); + int idx = 0; + totalCardinality = numKeys; + perKeyCardinality = 1; + int maxCardinality = 0; + for (final String card : tagCardinalityParts) { + try { + tagCardinality[idx] = Integer.parseInt(card.trim()); + } catch (NumberFormatException nfe) { + throw new WorkloadException("Unable to parse cardinality: " + + card, nfe); + } + if (tagCardinality[idx] < 1) { + throw new WorkloadException("Cardinality must be greater than zero: " + + tagCardinality[idx]); + } + totalCardinality *= tagCardinality[idx]; + perKeyCardinality *= tagCardinality[idx]; + if (tagCardinality[idx] > maxCardinality) { + maxCardinality = tagCardinality[idx]; + } + ++idx; + if (idx >= tagPairs) { + // we have more cardinalities than tag keys so bail at this point. + break; + } + } + if (numKeys < threads) { + throw new WorkloadException("Field count " + numKeys + " (keys for time " + + "series workloads) must be greater or equal to the number of " + + "threads " + threads); + } + + // fill tags without explicit cardinality with 1 + if (idx < tagPairs) { + tagCardinality[idx++] = 1; + } + + for (int i = 0; i < tagCardinality.length; ++i) { + if (tagCardinality[i] > 1) { + firstIncrementableCardinality = i; + break; + } + } + + keys = new String[numKeys]; + tagKeys = new String[tagPairs]; + tagValues = new String[maxCardinality]; + for (int i = 0; i < numKeys; ++i) { + keys[i] = keyGenerator.nextString(); + } + + for (int i = 0; i < tagPairs; ++i) { + tagKeys[i] = tagKeyGenerator.nextString(); + } + + for (int i = 0; i < maxCardinality; i++) { + tagValues[i] = tagValueGenerator.nextString(); + } + if (randomizeTimeseriesOrder) { + Utils.shuffleArray(keys); + Utils.shuffleArray(tagValues); + } + + maxOffsets = (recordcount / totalCardinality) + 1; + final int[] keyAndTagCardinality = new int[tagPairs + 1]; + keyAndTagCardinality[0] = numKeys; + for (int i = 0; i < tagPairs; i++) { + keyAndTagCardinality[i + 1] = tagCardinality[i]; + } + + cumulativeCardinality = new int[keyAndTagCardinality.length]; + for (int i = 0; i < keyAndTagCardinality.length; i++) { + int cumulation = 1; + for (int x = i; x <= keyAndTagCardinality.length - 1; x++) { + cumulation *= keyAndTagCardinality[x]; + } + if (i > 0) { + cumulativeCardinality[i - 1] = cumulation; + } + } + cumulativeCardinality[cumulativeCardinality.length - 1] = 1; + } + + /** + * Makes sure the settings as given are compatible. + * @throws WorkloadException If one or more settings were invalid. 
+ */ + protected void validateSettings() throws WorkloadException { + if (dataintegrity) { + if (valueType != ValueType.INTEGERS) { + throw new WorkloadException("Data integrity was enabled. 'valuetype' must " + + "be set to 'integers'."); + } + if (groupBy) { + throw new WorkloadException("Data integrity was enabled. 'groupbyfunction' must " + + "be empty or null."); + } + if (downsample) { + throw new WorkloadException("Data integrity was enabled. 'downsamplingfunction' must " + + "be empty or null."); + } + if (queryTimeSpan > 0) { + throw new WorkloadException("Data integrity was enabled. 'querytimespan' must " + + "be empty or 0."); + } + if (randomizeTimeseriesOrder) { + throw new WorkloadException("Data integrity was enabled. 'randomizetimeseriesorder' must " + + "be false."); + } + final String startTimestamp = properties.getProperty(CoreWorkload.INSERT_START_PROPERTY); + if (startTimestamp == null || startTimestamp.isEmpty()) { + throw new WorkloadException("Data integrity was enabled. 'insertstart' must " + + "be set to a Unix Epoch timestamp."); + } + } + } + + /** + * Thread state class holding thread local generators and indices. + */ + protected class ThreadState { + /** The timestamp generator for this thread. */ + protected final UnixEpochTimestampGenerator timestampGenerator; + + /** An offset generator to select a random offset for queries. */ + protected final NumberGenerator queryOffsetGenerator; + + /** The current write key index. */ + protected int keyIdx; + + /** The starting fence for writing keys. */ + protected int keyIdxStart; + + /** The ending fence for writing keys. */ + protected int keyIdxEnd; + + /** Indices for each tag value for writes. */ + protected int[] tagValueIdxs; + + /** Whether or not all time series have written values for the current timestamp. */ + protected boolean rollover; + + /** The starting timestamp. */ + protected long startTimestamp; + + /** + * Default ctor. + * @param threadID The zero based thread ID. + * @param threadCount The total number of threads. + * @throws WorkloadException If something went pear shaped. + */ + protected ThreadState(final int threadID, final int threadCount) throws WorkloadException { + int totalThreads = threadCount > 0 ? threadCount : 1; + + if (threadID >= totalThreads) { + throw new IllegalStateException("Thread ID " + threadID + " cannot be greater " + + "than or equal than the thread count " + totalThreads); + } + if (keys.length < threadCount) { + throw new WorkloadException("Thread count " + totalThreads + " must be greater " + + "than or equal to key count " + keys.length); + } + + int keysPerThread = keys.length / totalThreads; + keyIdx = keysPerThread * threadID; + keyIdxStart = keyIdx; + if (totalThreads - 1 == threadID) { + keyIdxEnd = keys.length; + } else { + keyIdxEnd = keyIdxStart + keysPerThread; + } + + tagValueIdxs = new int[tagPairs]; // all zeros + + final String startingTimestamp = + properties.getProperty(CoreWorkload.INSERT_START_PROPERTY); + if (startingTimestamp == null || startingTimestamp.isEmpty()) { + timestampGenerator = randomizeTimestampOrder ? + new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, maxOffsets) : + new UnixEpochTimestampGenerator(timestampInterval, timeUnits); + } else { + try { + timestampGenerator = randomizeTimestampOrder ? 
+ new RandomDiscreteTimestampGenerator(timestampInterval, timeUnits, + Long.parseLong(startingTimestamp), maxOffsets) : + new UnixEpochTimestampGenerator(timestampInterval, timeUnits, + Long.parseLong(startingTimestamp)); + } catch (NumberFormatException nfe) { + throw new WorkloadException("Unable to parse the " + + CoreWorkload.INSERT_START_PROPERTY, nfe); + } + } + // Set the last value properly for the timestamp, otherwise it may start + // one interval ago. + startTimestamp = timestampGenerator.nextValue(); + // TODO - pick it + queryOffsetGenerator = new UniformLongGenerator(0, maxOffsets - 2); + } + + /** + * Generates the next write value for thread. + * @param map An initialized map to populate with tag keys and values as well + * as the timestamp and actual value. + * @param isInsert Whether or not it's an insert or an update. Updates will pick + * an older timestamp (if random isn't enabled). + * @return The next key to write. + */ + protected String nextDataPoint(final Map map, final boolean isInsert) { + int iterations = sparsity <= 0 ? 1 : + Utils.random().nextInt((int) ((double) perKeyCardinality * sparsity)); + if (iterations < 1) { + iterations = 1; + } + while (true) { + iterations--; + if (rollover) { + timestampGenerator.nextValue(); + rollover = false; + } + String key = null; + if (iterations <= 0) { + final TreeMap validationTags; + if (dataintegrity) { + validationTags = new TreeMap(); + } else { + validationTags = null; + } + key = keys[keyIdx]; + int overallIdx = keyIdx * cumulativeCardinality[0]; + for (int i = 0; i < tagPairs; ++i) { + int tvidx = tagValueIdxs[i]; + map.put(tagKeys[i], new StringByteIterator(tagValues[tvidx])); + if (dataintegrity) { + validationTags.put(tagKeys[i], tagValues[tvidx]); + } + if (delayedSeries > 0) { + overallIdx += (tvidx * cumulativeCardinality[i + 1]); + } + } + + if (!isInsert) { + final long delta = (timestampGenerator.currentValue() - startTimestamp) / timestampInterval; + final int intervals = Utils.random().nextInt((int) delta); + map.put(timestampKey, new NumericByteIterator(startTimestamp + (intervals * timestampInterval))); + } else if (delayedSeries > 0) { + // See if the series falls in a delay bucket and calculate an offset earlier + // than the current timestamp value if so. 
+ double pct = (double) overallIdx / (double) totalCardinality; + if (pct < delayedSeries) { + int modulo = overallIdx % delayedIntervals; + if (modulo < 0) { + modulo *= -1; + } + map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue() - + timestampInterval * modulo)); + } else { + map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue())); + } + } else { + map.put(timestampKey, new NumericByteIterator(timestampGenerator.currentValue())); + } + + if (dataintegrity) { + map.put(valueKey, new NumericByteIterator(validationFunction(key, + timestampGenerator.currentValue(), validationTags))); + } else { + switch (valueType) { + case INTEGERS: + map.put(valueKey, new NumericByteIterator(Utils.random().nextInt())); + break; + case FLOATS: + map.put(valueKey, new NumericByteIterator( + Utils.random().nextDouble() * (double) 100000)); + break; + case MIXED: + if (Utils.random().nextBoolean()) { + map.put(valueKey, new NumericByteIterator(Utils.random().nextInt())); + } else { + map.put(valueKey, new NumericByteIterator( + Utils.random().nextDouble() * (double) 100000)); + } + break; + default: + throw new IllegalStateException("Somehow we didn't have a value " + + "type configured that we support: " + valueType); + } + } + } + + boolean tagRollover = false; + for (int i = tagCardinality.length - 1; i >= 0; --i) { + if (tagCardinality[i] <= 1) { + // nothing to increment here + continue; + } + + if (tagValueIdxs[i] + 1 >= tagCardinality[i]) { + tagValueIdxs[i] = 0; + if (i == firstIncrementableCardinality) { + tagRollover = true; + } + } else { + ++tagValueIdxs[i]; + break; + } + } + + if (tagRollover) { + if (keyIdx + 1 >= keyIdxEnd) { + keyIdx = keyIdxStart; + rollover = true; + } else { + ++keyIdx; + } + } + + if (iterations <= 0) { + return key; + } + } + } + } + +} \ No newline at end of file diff --git a/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java new file mode 100644 index 0000000000..55acc1f625 --- /dev/null +++ b/core/src/test/java/com/yahoo/ycsb/workloads/TestTimeSeriesWorkload.java @@ -0,0 +1,550 @@ +/** + * Copyright (c) 2017 YCSB contributors All rights reserved. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb.workloads; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.Vector; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.Client; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.NumericByteIterator; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.Utils; +import com.yahoo.ycsb.WorkloadException; +import com.yahoo.ycsb.measurements.Measurements; + +import org.testng.annotations.Test; + +public class TestTimeSeriesWorkload { + + @Test + public void twoThreads() throws Exception { + final Properties p = getUTProperties(); + Measurements.setProperties(p); + + final TimeSeriesWorkload wl = new TimeSeriesWorkload(); + wl.init(p); + Object threadState = wl.initThread(p, 0, 2); + + MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + + threadState = wl.initThread(p, 1, 2); + db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAB"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + } + + @Test (expectedExceptions = WorkloadException.class) + public void badTimeUnit() throws Exception { + final Properties p = new Properties(); + p.put(TimeSeriesWorkload.TIMESTAMP_UNITS_PROPERTY, "foobar"); + getWorkload(p, true); + } + + @Test (expectedExceptions = WorkloadException.class) + public void failedToInitWorkloadBeforeThreadInit() throws Exception { + final Properties p = getUTProperties(); + final TimeSeriesWorkload wl = 
getWorkload(p, false); + //wl.init(p); // <-- we NEED this :( + final Object threadState = wl.initThread(p, 0, 2); + + final MockDB db = new MockDB(); + wl.doInsert(db, threadState); + } + + @Test (expectedExceptions = IllegalStateException.class) + public void failedToInitThread() throws Exception { + final Properties p = getUTProperties(); + final TimeSeriesWorkload wl = getWorkload(p, true); + + final MockDB db = new MockDB(); + wl.doInsert(db, null); + } + + @Test + public void insertOneKeyTwoTagsLowCardinality() throws Exception { + final Properties p = getUTProperties(); + p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1"); + final TimeSeriesWorkload wl = getWorkload(p, true); + final Object threadState = wl.initThread(p, 0, 1); + + final MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertTrue(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + } + + @Test + public void insertTwoKeysTwoTagsLowCardinality() throws Exception { + final Properties p = getUTProperties(); + + final TimeSeriesWorkload wl = getWorkload(p, true); + final Object threadState = wl.initThread(p, 0, 1); + + final MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + int metricCtr = 0; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertTrue(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + } + if (metricCtr++ > 1) { + assertEquals(db.keys.get(i), "AAAB"); + if (metricCtr >= 4) { + metricCtr = 0; + timestamp += 60; + } + } else { + assertEquals(db.keys.get(i), "AAAA"); + } + } + } + + @Test + public void insertTwoKeysTwoThreads() throws Exception { + final Properties p = getUTProperties(); + + final TimeSeriesWorkload wl = getWorkload(p, true); + Object threadState = wl.initThread(p, 0, 2); + + MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); // <-- key 1 + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertTrue(((NumericByteIterator) db.values.get(i) + 
.get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + + threadState = wl.initThread(p, 1, 2); + db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAB"); // <-- key 2 + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + } + + @Test + public void insertThreeKeysTwoThreads() throws Exception { + // To make sure the distribution doesn't miss any metrics + final Properties p = getUTProperties(); + p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "3"); + + final TimeSeriesWorkload wl = getWorkload(p, true); + Object threadState = wl.initThread(p, 0, 2); + + MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertTrue(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + + threadState = wl.initThread(p, 1, 2); + db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, threadState)); + } + + timestamp = 1451606400; + int metricCtr = 0; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertNotNull(db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)); + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + } + if (metricCtr++ > 1) { + assertEquals(db.keys.get(i), "AAAC"); + if (metricCtr >= 4) { + metricCtr = 0; + timestamp += 60; + } + } else { + assertEquals(db.keys.get(i), "AAAB"); + } + } + } + + @Test + public void insertWithValidation() throws Exception { + final Properties p = getUTProperties(); + p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "1"); + p.put(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true"); + p.put(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); + final TimeSeriesWorkload wl = getWorkload(p, true); + final Object threadState = wl.initThread(p, 0, 1); + + final MockDB db = new MockDB(); + for (int i = 0; i < 74; i++) { + assertTrue(wl.doInsert(db, 
threadState)); + } + + assertEquals(db.keys.size(), 74); + assertEquals(db.values.size(), 74); + long timestamp = 1451606400; + for (int i = 0; i < db.keys.size(); i++) { + assertEquals(db.keys.get(i), "AAAA"); + assertEquals(db.values.get(i).get("AA").toString(), "AAAA"); + assertEquals(Utils.bytesToLong(db.values.get(i).get( + TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT).toArray()), timestamp); + assertFalse(((NumericByteIterator) db.values.get(i) + .get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).isFloatingPoint()); + + // validation check + final TreeMap validationTags = new TreeMap(); + for (final Entry entry : db.values.get(i).entrySet()) { + if (entry.getKey().equals(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT) || + entry.getKey().equals(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT)) { + continue; + } + validationTags.put(entry.getKey(), entry.getValue().toString()); + } + assertEquals(wl.validationFunction(db.keys.get(i), timestamp, validationTags), + ((NumericByteIterator) db.values.get(i).get(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT)).getLong()); + + if (i % 2 == 0) { + assertEquals(db.values.get(i).get("AB").toString(), "AAAA"); + } else { + assertEquals(db.values.get(i).get("AB").toString(), "AAAB"); + timestamp += 60; + } + } + } + + @Test + public void read() throws Exception { + final Properties p = getUTProperties(); + final TimeSeriesWorkload wl = getWorkload(p, true); + final Object threadState = wl.initThread(p, 0, 1); + + final MockDB db = new MockDB(); + for (int i = 0; i < 20; i++) { + wl.doTransactionRead(db, threadState); + } + } + + @Test + public void verifyRow() throws Exception { + final Properties p = getUTProperties(); + final TimeSeriesWorkload wl = getWorkload(p, true); + + final TreeMap validationTags = new TreeMap(); + final HashMap cells = new HashMap(); + + validationTags.put("AA", "AAAA"); + cells.put("AA", new StringByteIterator("AAAA")); + validationTags.put("AB", "AAAB"); + cells.put("AB", new StringByteIterator("AAAB")); + long hash = wl.validationFunction("AAAA", 1451606400L, validationTags); + + cells.put(TimeSeriesWorkload.TIMESTAMP_KEY_PROPERTY_DEFAULT, new NumericByteIterator(1451606400L)); + cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash)); + + assertEquals(wl.verifyRow("AAAA", cells), Status.OK); + + // tweak the last value a bit + for (final ByteIterator it : cells.values()) { + it.reset(); + } + cells.put(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT, new NumericByteIterator(hash + 1)); + assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE); + + // no value cell, returns an unexpected state + for (final ByteIterator it : cells.values()) { + it.reset(); + } + cells.remove(TimeSeriesWorkload.VALUE_KEY_PROPERTY_DEFAULT); + assertEquals(wl.verifyRow("AAAA", cells), Status.UNEXPECTED_STATE); + } + + @Test + public void validateSettingsDataIntegrity() throws Exception { + Properties p = getUTProperties(); + + // data validation incompatibilities + p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true"); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); // now it's ok + p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, "sum"); // now it's not + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p.setProperty(TimeSeriesWorkload.GROUPBY_PROPERTY, ""); + 
p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, "sum"); + p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, "60"); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_FUNCTION_PROPERTY, ""); + p.setProperty(TimeSeriesWorkload.DOWNSAMPLING_INTERVAL_PROPERTY, ""); + p.setProperty(TimeSeriesWorkload.QUERY_TIMESPAN_PROPERTY, "60"); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p = getUTProperties(); + p.setProperty(CoreWorkload.DATA_INTEGRITY_PROPERTY, "true"); + p.setProperty(TimeSeriesWorkload.VALUE_TYPE_PROPERTY, "integers"); + p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "true"); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + + p.setProperty(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false"); + p.setProperty(TimeSeriesWorkload.INSERT_START_PROPERTY, ""); + try { + getWorkload(p, true); + fail("Expected WorkloadException"); + } catch (WorkloadException e) { } + } + + /** Helper method that generates unit testing defaults for the properties map */ + private Properties getUTProperties() { + final Properties p = new Properties(); + p.put(Client.RECORD_COUNT_PROPERTY, "10"); + p.put(CoreWorkload.FIELD_COUNT_PROPERTY, "2"); + p.put(CoreWorkload.FIELD_LENGTH_PROPERTY, "4"); + p.put(TimeSeriesWorkload.TAG_KEY_LENGTH_PROPERTY, "2"); + p.put(TimeSeriesWorkload.TAG_VALUE_LENGTH_PROPERTY, "4"); + p.put(TimeSeriesWorkload.TAG_COUNT_PROPERTY, "2"); + p.put(TimeSeriesWorkload.TAG_CARDINALITY_PROPERTY, "1,2"); + p.put(CoreWorkload.INSERT_START_PROPERTY, "1451606400"); + p.put(TimeSeriesWorkload.DELAYED_SERIES_PROPERTY, "0"); + p.put(TimeSeriesWorkload.RANDOMIZE_TIMESERIES_ORDER_PROPERTY, "false"); + return p; + } + + /** Helper to setup the workload for testing. 
*/ + private TimeSeriesWorkload getWorkload(final Properties p, final boolean init) + throws WorkloadException { + Measurements.setProperties(p); + if (!init) { + return new TimeSeriesWorkload(); + } else { + final TimeSeriesWorkload workload = new TimeSeriesWorkload(); + workload.init(p); + return workload; + } + } + + static class MockDB extends DB { + final List keys = new ArrayList(); + final List> values = + new ArrayList>(); + + @Override + public Status read(String table, String key, Set fields, + Map result) { + return Status.OK; + } + + @Override + public Status scan(String table, String startkey, int recordcount, + Set fields, Vector> result) { + // TODO Auto-generated method stub + return Status.OK; + } + + @Override + public Status update(String table, String key, + Map values) { + // TODO Auto-generated method stub + return Status.OK; + } + + @Override + public Status insert(String table, String key, + Map values) { + keys.add(key); + this.values.add(values); + return Status.OK; + } + + @Override + public Status delete(String table, String key) { + // TODO Auto-generated method stub + return Status.OK; + } + + public void dumpStdout() { + for (int i = 0; i < keys.size(); i++) { + System.out.print("[" + i + "] Key: " + keys.get(i) + " Values: {"); + int x = 0; + for (final Entry entry : values.get(i).entrySet()) { + if (x++ > 0) { + System.out.print(", "); + } + System.out.print("{" + entry.getKey() + " => "); + if (entry.getKey().equals("YCSBV")) { + System.out.print(new String(Utils.bytesToDouble(entry.getValue().toArray()) + "}")); + } else if (entry.getKey().equals("YCSBTS")) { + System.out.print(new String(Utils.bytesToLong(entry.getValue().toArray()) + "}")); + } else { + System.out.print(new String(entry.getValue().toArray()) + "}"); + } + } + System.out.println("}"); + } + } + } +} \ No newline at end of file diff --git a/workloads/tsworkload_template b/workloads/tsworkload_template new file mode 100644 index 0000000000..9b79a0096f --- /dev/null +++ b/workloads/tsworkload_template @@ -0,0 +1,283 @@ +# Copyright (c) 2017 YCSB contributors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. + +# Yahoo! Cloud System Benchmark +# Time Series Workload Template: Default Values +# +# File contains all properties that can be set to define a +# YCSB session. All properties are set to their default +# value if one exists. If not, the property is commented +# out. When a property has a finite number of settings, +# the default is enabled and the alternates are shown in +# comments below it. +# +# Use of each property is explained through comments in Client.java, +# CoreWorkload.java, TimeSeriesWorkload.java or on the YCSB wiki page: +# https://github.com/brianfrankcooper/YCSB/wiki/TimeSeriesWorkload + +# The name of the workload class to use. Always the following. +workload=com.yahoo.ycsb.workloads.TimeSeriesWorkload + +# The default is Java's Long.MAX_VALUE. 
+# The number of records in the table to be inserted in +# the load phase or the number of records already in the +# table before the run phase. +recordcount=1000000 + +# There is no default setting for operationcount but it is +# required to be set. +# The number of operations to use during the run phase. +operationcount=3000000 + +# The number of insertions to do, if different from recordcount. +# Used with insertstart to grow an existing table. +#insertcount= + +# ..::NOTE::.. This is different from the CoreWorkload! +# The starting timestamp of a run as a Unix Epoch numeral in the +# unit set in 'timestampunits'. This is used to determine what +# the first timestamp should be when writing or querying as well +# as how many offsets (based on 'timestampinterval'). +#insertstart= + +# The units represented by the 'insertstart' timestamp as well as +# durations such as 'timestampinterval', 'querytimespan', etc. +# For values, see https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html +# Note that only seconds through nanoseconds are supported. +timestampunits=SECONDS + +# The amount of time between each value in every time series in +# the units of 'timestampunits'. +timestampinterval=60 + +# ..::NOTE::.. This is different from the CoreWorkload! +# Represents the number of unique "metrics" or "keys" for time series. +# E.g. "sys.cpu" may be a single field or "metric" while there may be many +# time series sharing that key (perhaps a host tag with "web01" and "web02" +# as options). +fieldcount=16 + +# The number of characters in the "metric" or "key". +fieldlength=8 + +# --- TODO ---? +# The distribution used to choose the length of a field +fieldlengthdistribution=constant +#fieldlengthdistribution=uniform +#fieldlengthdistribution=zipfian + +# The number of unique tag combinations for each time series. E.g +# if this value is 4, each record will have a key and 4 tag combinations +# such as A=A, B=A, C=A, D=A. +tagcount=4 + +# The cardinality (number of unique values) of each tag value for +# every "metric" or field as a comma separated list. Each value must +# be a number from 1 to Java's Integer.MAX_VALUE and there must be +# 'tagcount' values. If there are more or fewer values than +#'tagcount' then either it is ignored or 1 is substituted respectively. +tagcardinality=1,2,4,8 + +# The length of each tag key in characters. +tagkeylength=8 + +# The length of each tag value in characters. +tagvaluelength=8 + +# The character separating tag keys from tag values when reads, deletes +# or scans are executed against a database. The default is the equals sign +# so a field passed in a read to a DB may look like 'AA=AB'. +tagpairdelimiter== + +# The delimiter between keys and tags when a delete is passed to the DB. +# E.g. if there was a key and a field, the request key would look like: +# 'AA:AA=AB' +deletedelimiter=: + +# Whether or not to randomize the timestamp order when performing inserts +# and updates against a DB. By default all writes perform with the +# timestamps moving linearly forward in time once all time series for a +# given key have been written. +randomwritetimestamporder=false + +# Whether or not to randomly shuffle the time series order when writing. +# This will shuffle the keys, tag keys and tag values. +# ************************************************************************ +# WARNING - When this is enabled, reads and scans will likely return many +# empty results as invalid tag combinations will be chosen. 
Likewise +# this setting is INCOMPATIBLE with data integrity checks. +# ************************************************************************ +randomtimeseriesorder=false + +# The type of numerical data generated for each data point. The values are +# 64 bit signed integers, double precision floating points or a random mix. +# For data integrity, this setting is ignored and values are switched to +# 64 bit signed ints. +#valuetype=integers +valuetype=floats +#valuetype=mixed + +# A value from 0 to 0.999999 representing how sparse each time series +# should be. The higher this value, the greater the time interval between +# values in a single series. For example, if sparsity is 0 and there are +# 10 time series with a 'timestampinterval' of 60 seconds with a total +# time range of 10 intervals, you would see 100 values written, one per +# timestamp interval per time series. If the sparsity is 0.50 then there +# would be only about 50 values written so some time series would have +# missing values at each interval. +sparsity=0.00 + +# The percentage of time series that are "lagging" behind the current +# timestamp of the writer. This is used to mimic a common behavior where +# most sources (agents, sensors, etc) are writing data in sync (same timestamp) +# but a subset are running behind due to buffering, latency issues, etc. +delayedSeries=0.10 + +# The maximum amount of delay for delayed series in interval counts. The +# actual delay is chosen based on a modulo of the series index. +delayedIntervals=5 + +# The fixed or maximum amount of time added to the start time of a +# read or scan operation to generate a query over a range of time +# instead of a single timestamp. Units are shared with 'timestampunits'. +# For example if the value is set to 3600 seconds (1 hour) then +# each read would pick a random start timestamp based on the +#'insertstart' value and number of intervals, then add 3600 seconds +# to create the end time of the query. If this value is 0 then reads +# will only provide a single timestamp. +# WARNING: Cannot be used with 'dataintegrity'. +querytimespan=0 + +# Whether or not reads should choose a random time span (aligned to +# the 'timestampinterval' value) for each read or scan request starting +# at 0 and reaching 'querytimespan' as the max. +queryrandomtimespan=false + +# A delimiter character used to separate the start and end timestamps +# of a read query when 'querytimespan' is enabled. +querytimespandelimiter=, + +# A unique key given to read, scan and delete operations when the +# operation should perform a group-by (multi-series aggregation) on one +# or more tags. If 'groupbyfunction' is set, this key will be given with +# the configured function. +groupbykey=YCSBGB + +# A function name (e.g. 'sum', 'max' or 'avg') passed during reads, +# scans and deletes to cause the database to perform a group-by +# operation on one or more tags. If this value is empty or null +# (default), group-by operations are not performed +#groupbyfunction= + +# A comma separated list of 0s or 1s to denote which of the tag keys +# should be grouped during group-by operations. The number of values +# must match the number of tags in 'tagcount'. +#groupbykeys=0,0,1,1 + +# A unique key given to read and scan operations when the operation +# should downsample the results of a query into lower resolution +# data. If 'downsamplingfunction' is set, this key will be given with +# the configured function. +downsamplingkey=YCSBDS + +# A function name (e.g. 
'sum', 'max' or 'avg') passed during reads and +# scans to cause the database to perform a downsampling operation +# returning lower resolution data. If this value is empty or null +# (default), downsampling is not performed. +#downsamplingfunction= + +# A time interval for which to downsample the raw data into. Shares +# the same units as 'timestampinterval'. This value must be greater +# than 'timestampinterval'. E.g. if the timestamp interval for raw +# data is 60 seconds, the downsampling interval could be 3600 seconds +# to roll up the data into 1 hour buckets. +#downsamplinginterval= + +# What proportion of operations are reads +readproportion=0.10 + +# What proportion of operations are updates +updateproportion=0.00 + +# What proportion of operations are inserts +insertproportion=0.90 + +# The distribution of requests across the keyspace +requestdistribution=zipfian +#requestdistribution=uniform +#requestdistribution=latest + +# The name of the database table to run queries against +table=usertable + +# Whether or not data should be validated during writes and reads. If +# set then the data type is always a 64 bit signed integer and is the +# hash code of the key, timestamp and tags. +dataintegrity=false + +# How the latency measurements are presented +measurementtype=histogram +#measurementtype=timeseries +#measurementtype=raw +# When measurementtype is set to raw, measurements will be output +# as RAW datapoints in the following csv format: +# "operation, timestamp of the measurement, latency in us" +# +# Raw datapoints are collected in-memory while the test is running. Each +# data point consumes about 50 bytes (including java object overhead). +# For a typical run of 1 million to 10 million operations, this should +# fit into memory most of the time. If you plan to do 100s of millions of +# operations per run, consider provisioning a machine with larger RAM when using +# the RAW measurement type, or split the run into multiple runs. +# +# Optionally, you can specify an output file to save raw datapoints. +# Otherwise, raw datapoints will be written to stdout. +# The output file will be appended to if it already exists, otherwise +# a new output file will be created. +#measurement.raw.output_file = /tmp/your_output_file_for_this_run + +# JVM Reporting. +# +# Measure JVM information over time including GC counts, max and min memory +# used, max and min thread counts, max and min system load and others. This +# setting must be enabled in conjunction with the "-s" flag to run the status +# thread. Every "status.interval", the status thread will capture JVM +# statistics and record the results. At the end of the run, max and mins will +# be recorded. +# measurement.trackjvm = false + +# The range of latencies to track in the histogram (milliseconds) +histogram.buckets=1000 + +# Granularity for time series (in milliseconds) +timeseries.granularity=1000 + +# Latency reporting. +# +# YCSB records latency of failed operations separately from successful ones. +# Latency of all OK operations will be reported under their operation name, +# such as [READ], [UPDATE], etc. +# +# For failed operations: +# By default we don't track latency numbers of specific error status. +# We just report latency of all failed operation under one measurement name +# such as [READ-FAILED]. But optionally, user can configure to have either: +# 1. Record and report latency for each and every error status code by +# setting reportLatencyForEachError to true, or +# 2. 
Record and report latency for a select set of error status codes by +# providing a CSV list of Status codes via the "latencytrackederrors" +# property. +# reportlatencyforeacherror=false +# latencytrackederrors="" diff --git a/workloads/tsworkloada b/workloads/tsworkloada new file mode 100644 index 0000000000..868007f2fc --- /dev/null +++ b/workloads/tsworkloada @@ -0,0 +1,46 @@ +# Copyright (c) 2017 YCSB contributors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. + +# Yahoo! Cloud System Benchmark +# Workload A: Small cardinality consistent data for 2 days +# Application example: Typical monitoring of a single compute or small +# sensor station where 90% of the load is write and only 10% is read +# (it's usually much less). All writes are inserts. No sparsity so +# every series will have a value at every timestamp. +# +# Read/insert ratio: 10/90 +# Cardinality: 16 per key (field), 64 fields for a total of 1,024 +# time series. +workload=com.yahoo.ycsb.workloads.TimeSeriesWorkload + +recordcount=1474560 +operationcount=2949120 + +fieldlength=8 +fieldcount=64 +tagcount=4 +tagcardinality=1,2,4,2 + +sparsity=0.0 +delayedSeries=0.0 +delayedIntervals=0 + +timestampunits=SECONDS +timestampinterval=60 +querytimespan=3600 + +readproportion=0.10 +updateproportion=0.00 +insertproportion=0.90
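
For reference, the sizing in tsworkloada follows directly from its settings: tagcardinality=1,2,4,2 yields 1*2*4*2 = 16 unique tag combinations per key, and fieldcount=64 keys gives 16 * 64 = 1,024 distinct time series. With timestampinterval=60 and timestampunits=SECONDS, one day spans 86400 / 60 = 1,440 intervals, so the load phase writes 1,024 * 1,440 = 1,474,560 data points, which is exactly the recordcount above; the run phase's 90% insert mix then extends each series forward from that point.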
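
Each data point handed to a binding's insert() or update() call is a map of ByteIterators: tag keys map to tag value strings, while the timestamp and value travel as NumericByteIterators under the keys that appear as YCSBTS and YCSBV in the MockDB dump above. Below is a minimal, illustrative sketch (not part of this patch) of unpacking such a map, assuming those default key names and reusing only helpers already exercised in the tests (Utils.bytesToLong, Utils.bytesToDouble, NumericByteIterator.isFloatingPoint/getLong):

import java.util.Map;

import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.NumericByteIterator;
import com.yahoo.ycsb.Utils;

/** Illustrative sketch only; key names are the apparent defaults, not read from config. */
public final class TSDataPointSketch {
  private TSDataPointSketch() { }

  /** Prints the tags, timestamp and value of one generated data point. */
  public static void dump(final Map<String, ByteIterator> values) {
    for (final Map.Entry<String, ByteIterator> entry : values.entrySet()) {
      if ("YCSBTS".equals(entry.getKey())) {
        // The timestamp cell is a long in the configured 'timestampunits'.
        System.out.println("timestamp=" + Utils.bytesToLong(entry.getValue().toArray()));
      } else if ("YCSBV".equals(entry.getKey())) {
        // The value cell is a 64 bit signed integer or a double, per 'valuetype'.
        final NumericByteIterator value = (NumericByteIterator) entry.getValue();
        if (value.isFloatingPoint()) {
          System.out.println("value=" + Utils.bytesToDouble(entry.getValue().toArray()));
        } else {
          System.out.println("value=" + value.getLong());
        }
      } else {
        // Everything else is a tag pair: map key = tag key, map value = tag value.
        System.out.println(entry.getKey() + "=" + entry.getValue().toString());
      }
    }
  }
}

On the read path the timestamp constraint is instead encoded into an ordinary field string using tagpairdelimiter and, when querytimespan is enabled, querytimespandelimiter, so with the default delimiters a binding would typically see something like YCSBTS=1451606400 or YCSBTS=1451606400,1451610000 and split the string rather than decode bytes.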