Skip to content

Commit

Permalink
added new tests
Browse files Browse the repository at this point in the history
  • Loading branch information
skarpenko authored and kptfh committed Mar 12, 2019
1 parent 1e8d7c5 commit 2e745ab
Show file tree
Hide file tree
Showing 16 changed files with 242 additions and 137 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# gzip-reactive
Reactive gzip encoder/decoder
# aerospike storage backend
Aerospike storage backend for Janusgraph
Original file line number Diff line number Diff line change
Expand Up @@ -119,21 +119,14 @@ public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws Back
@Override
public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
AerospikeTransaction transaction = (AerospikeTransaction)txh;
for(AerospikeLock lock : transaction.getLocks()){
if(lock.key.compareTo(key) != 0){
throw new IllegalArgumentException("Unexpected lock in direct mutation transaction:" + lock);
}
}
StoreLocks locks = new StoreLocks(name, transaction.getLocks());

AerospikeLocks locks = new AerospikeLocks(transaction.getLocks().size());
locks.addLocks(transaction.getLocks());
lockOperations.acquireLocks(locks.locksMap);

lockOperations.acquireLocks(locks.getLocksMap());

mutate(key, additions, deletions, false);
mutate(key, additions, deletions);
}

void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, boolean andUnlock) {
void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions) {
List<Operation> operations = new ArrayList<>(3);

if(!deletions.isEmpty()) {
Expand All @@ -158,9 +151,7 @@ void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletion
operations.add(MapOperation.size(ENTRIES_BIN_NAME));
}

if(andUnlock){
operations.add(UNLOCK_OPERATION);
}
operations.add(UNLOCK_OPERATION);

Key aerospikeKey = getKey(key);
Record record = client.operate(null, aerospikeKey, operations.toArray(new Operation[0]));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,22 @@
public class AerospikeStoreManager extends AbstractStoreManager implements KeyColumnValueStoreManager {

private static final int DEFAULT_PORT = 3000;
static final int AEROSPIKE_BUFFER_SIZE = Integer.MAX_VALUE / 2;

private final StoreFeatures features;

private final AerospikeClient client;

private final Configuration configuration;
private final boolean useLocking;

private final ThreadPoolExecutor scanExecutor = new ThreadPoolExecutor(0, 1,
1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
private final ThreadPoolExecutor scanExecutor;

private final ThreadPoolExecutor aerospikeExecutor = new ThreadPoolExecutor(4, 40,
1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
private final ThreadPoolExecutor aerospikeExecutor;

public AerospikeStoreManager(Configuration configuration) {
super(configuration);

Preconditions.checkArgument(configuration.get(BUFFER_SIZE) == Integer.MAX_VALUE,
Preconditions.checkArgument(configuration.get(BUFFER_SIZE) == AEROSPIKE_BUFFER_SIZE,
"Set unlimited buffer size as we use deferred locking approach");

int port = storageConfig.has(STORAGE_PORT) ? storageConfig.get(STORAGE_PORT) : DEFAULT_PORT;
Expand All @@ -63,13 +61,16 @@ public AerospikeStoreManager(Configuration configuration) {
client = new AerospikeClient(clientPolicy, hosts);

this.configuration = configuration;
this.useLocking = configuration.get(USE_LOCKING);

features = new StandardStoreFeatures.Builder()
.keyConsistent(configuration)
.persists(true)
.locking(useLocking)
.optimisticLocking(true) //caused by deferred locking, actual locking happens just before transaction commit
//here we promise to take care of locking.
//If false janusgraph will do it via ExpectedValueCheckingStoreManager that is less effective
.locking(true)
//caused by deferred locking approach used in this storage backend,
//actual locking happens just before transaction commit
.optimisticLocking(true)
.distributed(true)
.multiQuery(true)
.batchMutation(true)
Expand All @@ -83,6 +84,12 @@ public AerospikeStoreManager(Configuration configuration) {
.build();

registerUdfs(client);

scanExecutor = new ThreadPoolExecutor(0, configuration.get(SCAN_PARALLELISM),
1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());

aerospikeExecutor = new ThreadPoolExecutor(4, configuration.get(AEROSPIKE_PARALLELISM),
1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
}

static void registerUdfs(AerospikeClient client){
Expand All @@ -109,7 +116,7 @@ public StoreTransaction beginTransaction(final BaseTransactionConfig config) {

@Override
public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
Map<String, AerospikeLocks> locksByStore = acquireLocks(((AerospikeTransaction) txh).getLocks(), mutations);
List<StoreLocks> locksByStore = acquireLocks(((AerospikeTransaction) txh).getLocks());

try {
Map<String, Set<StaticBuffer>> mutatedByStore = mutateMany(mutations);
Expand All @@ -120,15 +127,15 @@ public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, St
}
}

private Map<String, Set<StaticBuffer>> mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations) {
private Map<String, Set<StaticBuffer>> mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations) throws PermanentBackendException {

List<CompletableFuture<?>> futures = new ArrayList<>();
Map<String, Set<StaticBuffer>> mutatedByStore = new ConcurrentHashMap<>();

mutations.forEach((storeName, entry) -> {
final AerospikeKeyColumnValueStore store = openDatabase(storeName);
entry.forEach((key, mutation) -> futures.add(runAsync(() -> {
store.mutate(key, mutation.getAdditions(), mutation.getDeletions(), useLocking);
store.mutate(key, mutation.getAdditions(), mutation.getDeletions());
mutatedByStore.compute(storeName, (s, keys) -> {
Set<StaticBuffer> keysResult = keys != null ? keys : new HashSet<>();
keysResult.add(key);
Expand All @@ -142,49 +149,31 @@ private Map<String, Set<StaticBuffer>> mutateMany(Map<String, Map<StaticBuffer,
return mutatedByStore;
}

private Map<String, AerospikeLocks> acquireLocks(List<AerospikeLock> locks, Map<String, Map<StaticBuffer, KCVMutation>> mutations) throws BackendException {
private List<StoreLocks> acquireLocks(List<AerospikeLock> locks) throws BackendException {
Map<String, List<AerospikeLock>> locksByStore = locks.stream()
.collect(Collectors.groupingBy(lock -> lock.storeName));
Map<String, AerospikeLocks> locksAllByStore = new HashMap<>(locksByStore.size());
List<StoreLocks> locksAllByStore = new ArrayList<>(locksByStore.size());
for(Map.Entry<String, List<AerospikeLock>> entry : locksByStore.entrySet()){
String storeName = entry.getKey();
List<AerospikeLock> locksForStore = entry.getValue();
Map<StaticBuffer, KCVMutation> mutationsForStore = mutations.getOrDefault(storeName, emptyMap());
AerospikeLocks locksAll = new AerospikeLocks(locksForStore.size() + mutationsForStore.size());
locksAll.addLocks(locksForStore);
if (useLocking) {
locksAll.addLockOnKeys(mutationsForStore.keySet());
}

StoreLocks storeLocks = new StoreLocks(storeName, locksForStore);
final AerospikeKeyColumnValueStore store = openDatabase(storeName);
store.getLockOperations().acquireLocks(locksAll.getLocksMap());
locksAllByStore.put(storeName, locksAll);
store.getLockOperations().acquireLocks(storeLocks.locksMap);
locksAllByStore.add(storeLocks);
}
return locksAllByStore;
}

private void releaseLocks(Map<String, AerospikeLocks> locksByStore, Map<String, Set<StaticBuffer>> mutatedByStore){
locksByStore.forEach((storeName, locksForStore) -> {
final AerospikeKeyColumnValueStore store = openDatabase(storeName);
Set<StaticBuffer> mutatedForStore = mutatedByStore.get(storeName);
List<StaticBuffer> keysToRelease = locksForStore.getLocksMap().keySet().stream()
private void releaseLocks(List<StoreLocks> locksByStore, Map<String, Set<StaticBuffer>> mutatedByStore) throws PermanentBackendException {
for(StoreLocks storeLocks : locksByStore){
final AerospikeKeyColumnValueStore store = openDatabase(storeLocks.storeName);
Set<StaticBuffer> mutatedForStore = mutatedByStore.get(storeLocks.storeName);
List<StaticBuffer> keysToRelease = storeLocks.locksMap.keySet().stream()
//ignore mutated keys as they already have been released
.filter(key -> !mutatedForStore.contains(key))
.collect(Collectors.toList());
store.getLockOperations().releaseLockOnKeys(keysToRelease);
});
}

//called from AerospikeTransaction
void releaseLocks(List<AerospikeLock> locks){
Map<String, List<AerospikeLock>> locksByStore = locks.stream()
.collect(Collectors.groupingBy(lock -> lock.storeName));
locksByStore.forEach((storeName, locksForStore) -> {
AerospikeLocks locksAll = new AerospikeLocks(locksForStore.size());
locksAll.addLocks(locksForStore);
final AerospikeKeyColumnValueStore store = openDatabase(storeName);
store.getLockOperations().releaseLockOnKeys(locksAll.getLocksMap().keySet());
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ public class ConfigOptions {
public static final ConfigOption<String> NAMESPACE = new ConfigOption<>(STORAGE_NS,
"namespace", "Aerospike namespace to use", ConfigOption.Type.LOCAL, "test");

public static final ConfigOption<Boolean> USE_LOCKING = new ConfigOption<>(STORAGE_NS,
"pessimistic_locking", "Whether to use pessimistic locking", ConfigOption.Type.LOCAL, Boolean.TRUE);


public static final ConfigOption<Integer> LOCK_TTL = new ConfigOption<>(STORAGE_NS,
"lock_ttl", "How long to keep locks (in seconds)", ConfigOption.Type.LOCAL, 5);

public static final ConfigOption<Boolean> ALLOW_SCAN = new ConfigOption<>(STORAGE_NS,
"allow_scan", "Whether to allow scans on graph. Can't be changed after graph creation", ConfigOption.Type.LOCAL, false);

public static final ConfigOption<Integer> SCAN_PARALLELISM = new ConfigOption<>(STORAGE_NS,
"scan_parallelism", "How many threads may perform scan operations simultaneously", ConfigOption.Type.LOCAL, 1);

public static final ConfigOption<Integer> AEROSPIKE_PARALLELISM = new ConfigOption<>(STORAGE_NS,
"aerospike_parallelism", "Limits how many parallel calls allowed to aerospike", ConfigOption.Type.LOCAL, 100);

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.playtika.janusgraph.aerospike;

import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;

import java.util.Collection;
Expand All @@ -11,5 +12,5 @@ interface LockOperations {

void acquireLocks(Map<StaticBuffer, List<AerospikeLock>> locks) throws BackendException;

void releaseLockOnKeys(Collection<StaticBuffer> keys);
void releaseLockOnKeys(Collection<StaticBuffer> keys) throws PermanentBackendException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.PermanentBackendException;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.TemporaryBackendException;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.locking.PermanentLockingException;
import org.janusgraph.diskstorage.locking.TemporaryLockingException;

import java.util.*;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -73,9 +74,9 @@ public void acquireLocks(Map<StaticBuffer, List<AerospikeLock>> locks) throws Ba
allOf(futures);

if(lockResults.keySet().contains(CHECK_FAILED)){
throw new PermanentBackendException("Some pre-lock checks failed:"+lockResults.keySet());
throw new PermanentLockingException("Some pre-lock checks failed:"+lockResults.keySet());
} else if(lockResults.keySet().contains(ALREADY_LOCKED)){
throw new TemporaryBackendException("Some locks not released yet:"+lockResults.keySet());
throw new TemporaryLockingException("Some locks not released yet:"+lockResults.keySet());
}

} catch (Throwable t){
Expand Down Expand Up @@ -107,25 +108,18 @@ enum LockResult {
}

@Override
public void releaseLockOnKeys(Collection<StaticBuffer> keys) {
public void releaseLockOnKeys(Collection<StaticBuffer> keys) throws PermanentBackendException {
releaseLocks(keys.stream()
.map(store::getKey)
.collect(Collectors.toList())
);
}

private void releaseLocks(List<Key> keys) {
if(keys != null) {
private void releaseLocks(List<Key> keys) throws PermanentBackendException {
if(keys != null && !keys.isEmpty()) {
List<CompletableFuture<?>> futures = new ArrayList<>();
keys.forEach(key -> futures.add(runAsync(() -> {
try {
client.operate(null, key, UNLOCK_OPERATION);
} catch (AerospikeException e) {
if(e.getResultCode() != ResultCode.KEY_NOT_FOUND_ERROR){
throw e;
}
}
})));
keys.forEach(key -> futures.add(runAsync(
() -> client.operate(null, key, UNLOCK_OPERATION))));
allOf(futures);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.playtika.janusgraph.aerospike;

import org.janusgraph.diskstorage.StaticBuffer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableMap;

public class StoreLocks {

public final String storeName;
public final Map<StaticBuffer, List<AerospikeLock>> locksMap;

public StoreLocks(String storeName, List<AerospikeLock> locks) {
this.storeName = storeName;
Map<StaticBuffer, List<AerospikeLock>> locksMap = new HashMap<>(locks.size());

for(AerospikeLock lock : locks){
locksMap.compute(lock.key, (key, locksForKey) -> {
if(locksForKey == null || locksForKey.isEmpty()){
return singletonList(lock);
} else if(locksForKey.size() == 1){
List<AerospikeLock> locksForKeyNew = new ArrayList<>(locksForKey);
locksForKeyNew.add(lock);
return locksForKeyNew;
} else {
locksForKey.add(lock);
return locksForKey;
}
});
}

this.locksMap = unmodifiableMap(locksMap);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
package com.playtika.janusgraph.aerospike.util;

import org.janusgraph.diskstorage.PermanentBackendException;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AsyncUtil {

public static void allOf(List<CompletableFuture<?>> futures){
public static void allOf(List<CompletableFuture<?>> futures) throws PermanentBackendException {
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException();
throw new PermanentBackendException(e);
}
}
}
Loading

0 comments on commit 2e745ab

Please sign in to comment.