Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to blocking batch updates #152

Merged
merged 1 commit into from
Jan 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions aerospike-storage-backend/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,6 @@
<artifactId>aerospike-client</artifactId>
</dependency>

<dependency>
<groupId>com.aerospike</groupId>
<artifactId>aerospike-reactor-client</artifactId>
<exclusions>
<!-- take last version from aerospike-batch-updater -->
<exclusion>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Map;

import static com.playtika.janusgraph.aerospike.operations.AerospikeOperations.getValue;
import static com.playtika.janusgraph.aerospike.util.ReactorUtil.block;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -81,14 +80,14 @@ public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery
logger.trace("getSlice({}, tx:{}, {}, start:{}, end:{})",
storeName, txh, keys, query.getSliceStart(), query.getSliceEnd());

return readOperations.getSlice(storeName, keys, query, txh);
return readOperations.getSlice(storeName, keys, query);
}

@Override
public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException{
logger.trace("getSlice({}, tx:{}, {})", storeName, txh, query);

return block(readOperations.getSlice(storeName, query, txh)).getValue();
return readOperations.getSlice(storeName, query);
}

@Override
Expand All @@ -102,7 +101,7 @@ public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> d

//no need in transactional logic
if(transaction.getLocks().isEmpty()){
block(mutateOperations.mutate(storeName, keyValue, mutationMap));
mutateOperations.mutate(storeName, keyValue, mutationMap);
return;
}

Expand All @@ -121,10 +120,14 @@ public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> d
Map<String, Map<Value, Map<Value, Value>>> mutationsByStore = singletonMap(storeName,
singletonMap(keyValue, mutationMap));

block(batchUpdater.update(new BatchUpdate(
new BatchLocks(locksByStore, aerospikeOperations),
new BatchUpdates(mutationsByStore)))
.onErrorMap(ErrorMapper.INSTANCE));
try {
batchUpdater.update(new BatchUpdate(
new BatchLocks(locksByStore, aerospikeOperations),
new BatchUpdates(mutationsByStore)));
} catch (Throwable t) {
throw ErrorMapper.INSTANCE.apply(t);
}

transaction.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static com.playtika.janusgraph.aerospike.operations.AerospikeOperations.getValue;
import static com.playtika.janusgraph.aerospike.util.AerospikeUtils.isEmptyNamespace;
import static com.playtika.janusgraph.aerospike.util.AerospikeUtils.truncateNamespace;
import static com.playtika.janusgraph.aerospike.util.ReactorUtil.block;
import static java.util.Collections.emptyMap;
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.BUFFER_SIZE;

Expand Down Expand Up @@ -136,10 +135,13 @@ public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, St
transaction.getTransactionValidator().accept(locksByStore, mutationsByStore);
}

block(operations.batchUpdater().update(new BatchUpdate(
new BatchLocks(locksByStore, operations.getAerospikeOperations()),
new BatchUpdates(mutationsByStore)))
.onErrorMap(ErrorMapper.INSTANCE));
try {
operations.batchUpdater().update(new BatchUpdate(
new BatchLocks(locksByStore, operations.getAerospikeOperations()),
new BatchUpdates(mutationsByStore)));
} catch (Throwable t) {
throw ErrorMapper.INSTANCE.apply(t);
}
transaction.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ public class ConfigOptions {

public static final ConfigOption<Integer> AEROSPIKE_CONNECTIONS_PER_NODE= new ConfigOption<>(STORAGE_NS,
"aerospike-connections-per-node-parallelism", "Limits how many connections aerospike can hold per node",
ConfigOption.Type.LOCAL, 300);
ConfigOption.Type.LOCAL, 100);

public static final ConfigOption<Integer> PARALLEL_READ_THRESHOLD = new ConfigOption<>(STORAGE_NS,
"parallel-read-threshold", "Number of keys when we should start run reads in parallel",
ConfigOption.Type.LOCAL, 3);

public static final ConfigOption<Integer> AEROSPIKE_READ_TIMEOUT = new ConfigOption<>(STORAGE_NS,
"aerospike-read-timeout", "Total transaction timeout in milliseconds to aerospike read operations." +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,22 @@
package com.playtika.janusgraph.aerospike.operations;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.AerospikeException;
import com.aerospike.client.Host;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Value;
import com.aerospike.client.async.EventLoops;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.reactor.AerospikeReactorClient;
import com.aerospike.client.reactor.IAerospikeReactorClient;
import com.aerospike.client.reactor.retry.AerospikeReactorRetryClient;
import com.playtika.janusgraph.aerospike.AerospikePolicyProvider;
import com.playtika.janusgraph.aerospike.util.NamedThreadFactory;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_HOSTS;
Expand All @@ -39,27 +31,27 @@ public class AerospikeOperations {
private final String namespace;
private final String graphPrefix;
private final IAerospikeClient client;
private final ExecutorService aerospikeExecutor;

private IAerospikeReactorClient reactorClient;
private final AerospikePolicyProvider aerospikePolicyProvider;

public AerospikeOperations(String graphPrefix, String namespace,
IAerospikeClient client,
IAerospikeReactorClient reactorClient,
AerospikePolicyProvider aerospikePolicyProvider) {
AerospikePolicyProvider aerospikePolicyProvider,
ExecutorService aerospikeExecutor) {
this.graphPrefix = graphPrefix+".";
this.namespace = namespace;
this.client = client;
this.reactorClient = reactorClient;
this.aerospikeExecutor = aerospikeExecutor;
this.aerospikePolicyProvider = aerospikePolicyProvider;
}

public IAerospikeClient getClient() {
return client;
}

public IAerospikeReactorClient getReactorClient() {
return reactorClient;
public ExecutorService getAerospikeExecutor() {
return aerospikeExecutor;
}

public String getNamespace() {
Expand Down Expand Up @@ -95,46 +87,6 @@ public void close() {
aerospikePolicyProvider.close();
}

public static IAerospikeReactorClient buildAerospikeReactorClient(
IAerospikeClient aerospikeClient, EventLoops eventLoops){
return new AerospikeReactorRetryClient(
new AerospikeReactorClient(aerospikeClient, eventLoops),
retryOnNoMoreConnections());
}

//TODO Move to aerospike reactor client
public static final int BACKOFF_NANOS = 100;

public static Function<Flux<Throwable>, ? extends Publisher<?>> retryOnNoMoreConnections() {
return retryOn((throwable) -> throwable instanceof AerospikeException.Connection && ((AerospikeException.Connection)throwable).getResultCode() == -7,
BACKOFF_NANOS);
}

private static final Duration NEGATIVE_DURATION = Duration.ofSeconds(-1);

public static Function<Flux<Throwable>, ? extends Publisher<?>> retryOn(Predicate<Throwable> retryOn, int backoffNanos) {
AtomicLong backOff = new AtomicLong();
return retry((throwable, integer) -> retryOn.test(throwable)
? Duration.ofNanos(backOff.addAndGet(backoffNanos)) : NEGATIVE_DURATION);
}

public static Function<Flux<Throwable>, ? extends Publisher<?>> retry(BiFunction<Throwable, Integer, Duration> retryDelay) {
return (throwableFlux) -> {
return throwableFlux.zipWith(Flux.range(1, 2147483647), (error, index) -> {
Duration delay = retryDelay.apply(error, index);
if (delay.isNegative()) {
throw Exceptions.propagate(error);
} else {
return Tuples.of(delay, error);
}
}).concatMap((tuple2) -> {
return !tuple2.getT1().isZero() ? Mono.delay(tuple2.getT1()).map((time) -> {
return tuple2.getT2();
}) : Mono.just(tuple2.getT2());
});
};
}

public static IAerospikeClient buildAerospikeClient(Configuration configuration, ClientPolicy clientPolicy) {
int port = configuration.has(STORAGE_PORT) ? configuration.get(STORAGE_PORT) : DEFAULT_PORT;

Expand All @@ -144,4 +96,10 @@ public static IAerospikeClient buildAerospikeClient(Configuration configuration,
return new AerospikeClient(clientPolicy, hosts);
}

public static ExecutorService executorService(int maxThreads){
return new ThreadPoolExecutor(0, maxThreads,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("janus-aerospike", "janus-aerospike"));
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.playtika.janusgraph.aerospike.operations;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.MapOperation;
Expand All @@ -11,9 +13,7 @@
import com.aerospike.client.cdt.MapReturnType;
import com.aerospike.client.cdt.MapWriteMode;
import com.aerospike.client.policy.WritePolicy;
import com.aerospike.client.reactor.IAerospikeReactorClient;
import com.playtika.janusgraph.aerospike.AerospikePolicyProvider;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -28,7 +28,7 @@ public class BasicMutateOperations implements MutateOperations{

private final WritePolicy mutatePolicy;
private final WritePolicy deletePolicy;
private AerospikeOperations aerospikeOperations;
private final AerospikeOperations aerospikeOperations;

public BasicMutateOperations(AerospikeOperations aerospikeOperations) {
this.aerospikeOperations = aerospikeOperations;
Expand All @@ -39,7 +39,7 @@ public BasicMutateOperations(AerospikeOperations aerospikeOperations) {
}

@Override
public Mono<Void> mutate(String storeName, Value key, Map<Value, Value> mutation) {
public void mutate(String storeName, Value key, Map<Value, Value> mutation) {
Key aerospikeKey = aerospikeOperations.getKey(storeName, key);
List<Operation> operations = new ArrayList<>(3);
List<Value> keysToRemove = new ArrayList<>(mutation.size());
Expand Down Expand Up @@ -68,20 +68,22 @@ public Mono<Void> mutate(String storeName, Value key, Map<Value, Value> mutation
entriesNoOperationIndex = -1;
}

IAerospikeReactorClient client = aerospikeOperations.getReactorClient();
return client.operate(mutatePolicy, aerospikeKey, operations.toArray(new Operation[0]))
.onErrorResume(throwable -> throwable instanceof AerospikeException
&& ((AerospikeException)throwable).getResultCode() == ResultCode.KEY_NOT_FOUND_ERROR,
throwable -> Mono.empty())
.flatMap(keyRecord -> {
if(entriesNoOperationIndex != -1){
long entriesNoAfterMutation = (Long)keyRecord.record.getList(ENTRIES_BIN_NAME).get(entriesNoOperationIndex);
if(entriesNoAfterMutation == 0){
return client.delete(deletePolicy, aerospikeKey);
}
}
return Mono.empty();
}).then();
IAerospikeClient client = aerospikeOperations.getClient();
try {
Record record = client.operate(mutatePolicy, aerospikeKey, operations.toArray(new Operation[0]));

if(entriesNoOperationIndex != -1){
long entriesNoAfterMutation = (Long)record.getList(ENTRIES_BIN_NAME).get(entriesNoOperationIndex);
if(entriesNoAfterMutation == 0){
client.delete(deletePolicy, aerospikeKey);
}
}

} catch (AerospikeException ae) {
if(ae.getResultCode() != ResultCode.KEY_NOT_FOUND_ERROR){
throw ae;
}
}
}

private static WritePolicy buildMutationPolicy(AerospikePolicyProvider policyProvider){
Expand Down
Loading