Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-123 Handle new behavior of create / drop secondary index #487

Merged
merged 13 commits into from
Feb 20, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,16 @@

import org.springframework.dao.InvalidDataAccessResourceUsageException;

/**
* @deprecated since Aerospike Server ver. 6.1.0.1. Creating secondary index no longer throws an exception if the index
* already exists.
* <p>Use {@link org.springframework.data.aerospike.repository.AerospikeRepository#indexExists(String)}
* for secondary index existence checks.</p>
*/
reugn marked this conversation as resolved.
Show resolved Hide resolved
@Deprecated
public class IndexAlreadyExistsException extends InvalidDataAccessResourceUsageException {

public IndexAlreadyExistsException(String msg, Throwable cause) {
super(msg, cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.query.ResultSet;
import org.springframework.data.aerospike.IndexAlreadyExistsException;
import org.springframework.data.aerospike.core.model.GroupedEntities;
import org.springframework.data.aerospike.core.model.GroupedKeys;
import org.springframework.data.aerospike.repository.query.Query;
Expand Down Expand Up @@ -481,13 +480,6 @@ <T> void createIndex(Class<T> entityClass, String indexName, String binName,
*
* @param indexName The Aerospike index name. Must not be {@literal null}.
* @return true if exists
* @deprecated This operation is deprecated due to complications that are required for guaranteed index existence
* response.
* <p>If you need to conditionally create index — replace this method (indexExists) with {@link #createIndex} and
* catch {@link IndexAlreadyExistsException}.
* <p>More information can be found at: <a href="https://github.com/aerospike/aerospike-client-java/pull/149">
* https://github.com/aerospike/aerospike-client-java/pull/149</a>
*/
@Deprecated
*/
boolean indexExists(String indexName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -82,6 +84,7 @@
@Slf4j
public class AerospikeTemplate extends BaseAerospikeTemplate implements AerospikeOperations {

private static final Pattern INDEX_EXISTS_REGEX_PATTERN = Pattern.compile("^FAIL:(-?\\d+).*$");
private final IAerospikeClient client;
private final QueryEngine queryEngine;
private final IndexRefresher indexRefresher;
Expand Down Expand Up @@ -155,17 +158,42 @@ public <T> void deleteIndex(Class<T> entityClass, String indexName) {
@Override
public boolean indexExists(String indexName) {
Assert.notNull(indexName, "Index name must not be null!");
log.warn("`indexExists` operation is deprecated. Please stop using it as it will be removed " +
"in next major release.");

try {
Node[] nodes = client.getNodes();
Node node = Utils.getRandomNode(nodes);
String response = Info.request(node, "sindex/" + namespace + '/' + indexName);
return !response.startsWith("FAIL:201");
for (Node node : nodes) {
reugn marked this conversation as resolved.
Show resolved Hide resolved
String response = Info.request(node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName);
if (response == null) throw new AerospikeException("Null node response");

if (response.equalsIgnoreCase("true")) {
return true;
} else if (response.equalsIgnoreCase("false")) {
return false;
} else {
Matcher matcher = INDEX_EXISTS_REGEX_PATTERN.matcher(response);
if (matcher.matches()) {
int reason;
try {
reason = Integer.parseInt(matcher.group(1));
} catch (NumberFormatException e) {
throw new AerospikeException("Unexpected node response, unable to parse ResultCode: " +
response);
}

// as for Server ver. >= 6.1.0.1 the response containing ResultCode.INVALID_NAMESPACE
// means that the request should be sent to another node
if (reason != ResultCode.INVALID_NAMESPACE) {
throw new AerospikeException(reason);
}
} else {
throw new AerospikeException("Unexpected node response: " + response);
}
}
}
} catch (AerospikeException e) {
throw translateError(e);
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,4 +441,12 @@ <T> Mono<Void> createIndex(Class<T> entityClass, String indexName, String binNam
* @param indexName The index name. Must not be {@literal null}.
*/
<T> Mono<Void> deleteIndex(Class<T> entityClass, String indexName);

/**
* Check whether an index with the specified name exists in Aerospike.
*
* @param indexName The Aerospike index name. Must not be {@literal null}.
* @return true if exists.
*/
Mono<Boolean> indexExists(String indexName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Bin;
import com.aerospike.client.Info;
import com.aerospike.client.Key;
import com.aerospike.client.Operation;
import com.aerospike.client.Record;
import com.aerospike.client.ResultCode;
import com.aerospike.client.Value;
import com.aerospike.client.cdt.CTX;
import com.aerospike.client.cluster.Node;
Expand Down Expand Up @@ -57,6 +59,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.aerospike.client.ResultCode.KEY_NOT_FOUND_ERROR;
import static java.util.Objects.nonNull;
Expand All @@ -72,6 +76,7 @@
@Slf4j
public class ReactiveAerospikeTemplate extends BaseAerospikeTemplate implements ReactiveAerospikeOperations {

private static final Pattern INDEX_EXISTS_REGEX_PATTERN = Pattern.compile("^FAIL:(-?\\d+).*$");
private final IAerospikeReactorClient reactorClient;
private final ReactorQueryEngine queryEngine;
private final ReactorIndexRefresher reactorIndexRefresher;
Expand Down Expand Up @@ -566,6 +571,47 @@ public <T> Mono<Void> deleteIndex(Class<T> entityClass, String indexName) {
.onErrorMap(this::translateError);
}

@Override
public Mono<Boolean> indexExists(String indexName) {
Assert.notNull(indexName, "Index name must not be null!");

try {
Node[] nodes = reactorClient.getAerospikeClient().getNodes();
for (Node node : nodes) {
String response = Info.request(node, "sindex-exists:ns=" + namespace + ";indexname=" + indexName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see sindex-exists in as-info commands reference: https://docs.aerospike.com/reference/info
Also, we need to verify whether this info call is a node level or a cluster level (your code assumes its a node level info call but I remember that with indexes api it's enough to send as info call to a single node).

Copy link
Author

@agrgr agrgr Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was done according to Brian's letter (attached as a comment to FMWK-123)

if (response == null) throw new AerospikeException("Null node response");

if (response.equalsIgnoreCase("true")) {
return Mono.just(true);
} else if (response.equalsIgnoreCase("false")) {
return Mono.just(false);
} else {
Matcher matcher = INDEX_EXISTS_REGEX_PATTERN.matcher(response);
if (matcher.matches()) {
int reason;
try {
reason = Integer.parseInt(matcher.group(1));
} catch (NumberFormatException e) {
throw new AerospikeException("Unexpected node response, unable to parse ResultCode: " +
response);
}

// as for Server ver. >= 6.1.0.1 the response containing ResultCode.INVALID_NAMESPACE
// means that the request should be sent to another node
if (reason != ResultCode.INVALID_NAMESPACE) {
throw new AerospikeException(reason);
}
} else {
throw new AerospikeException("Unexpected node response: " + response);
}
}
}
} catch (AerospikeException e) {
throw translateError(e);
}
return Mono.just(false);
}

@Override
public IAerospikeReactorClient getAerospikeReactorClient() {
return reactorClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.data.aerospike.repository;

import com.aerospike.client.query.IndexType;
import org.springframework.data.aerospike.IndexAlreadyExistsException;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.data.repository.Repository;

Expand Down Expand Up @@ -52,13 +51,6 @@ public interface AerospikeRepository<T, ID> extends PagingAndSortingRepository<T
*
* @param indexName The Aerospike index name.
* @return true if exists
* @deprecated This operation is deprecated due to complications that are required for guaranteed index existence
* response.
* <p>If you need to conditionally create index — replace this method (indexExists) with {@link #createIndex} and
* catch {@link IndexAlreadyExistsException}.
* <p>More information can be found at: <a href="https://github.com/aerospike/aerospike-client-java/pull/149">
* https://github.com/aerospike/aerospike-client-java/pull/149</a>
*/
@Deprecated
boolean indexExists(String indexName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,15 @@ public <T> void dropIndexIfExists(Class<T> entityClass, String indexName) {
IndexUtils.dropIndex(client, getNamespace(), getSetName(entityClass), indexName);
}

/**
* @deprecated since Aerospike Server ver. 6.1.0.1.
* Use {@link org.springframework.data.aerospike.core.AerospikeTemplate#indexExists(String)}
*/
// Do not use this code in production!
// This will not guarantee the correct answer from Aerospike Server for all cases.
// Also, it requests index status only from one Aerospike node, which is OK for tests, and NOT OK for Production
// cluster.
@Deprecated
public boolean indexExists(String indexName) {
return IndexUtils.indexExists(client, getNamespace(), indexName);
}
Expand Down
68 changes: 56 additions & 12 deletions src/test/java/org/springframework/data/aerospike/IndexUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,55 +11,99 @@
import org.springframework.data.aerospike.query.cache.IndexInfoParser;
import org.springframework.data.aerospike.query.model.Index;

import java.lang.module.ModuleDescriptor;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class IndexUtils {

private static final ModuleDescriptor.Version SERVER_VERSION_6_1_0_1 = ModuleDescriptor.Version.parse("6.1.0.1");

public static void dropIndex(IAerospikeClient client, String namespace, String setName, String indexName) {
waitTillComplete(() -> client.dropIndex(null, namespace, setName, indexName));
if (IndexUtils.isDropCreateBehaviorUpdated(client)) {
waitTillComplete(() -> client.dropIndex(null, namespace, setName, indexName));
} else {
// ignoring ResultCode.INDEX_NOTFOUND for Aerospike Server prior to ver. 6.1.0.1
ignoreErrorAndWait(ResultCode.INDEX_NOTFOUND, () -> client.dropIndex(null, namespace, setName, indexName));
}
}

public static void createIndex(IAerospikeClient client, String namespace, String setName, String indexName,
String binName, IndexType indexType) {
waitTillComplete(() -> client.createIndex(null, namespace, setName, indexName, binName, indexType));
if (IndexUtils.isDropCreateBehaviorUpdated(client)) {
waitTillComplete(() -> client.createIndex(null, namespace, setName, indexName, binName, indexType));
} else {
// ignoring ResultCode.INDEX_ALREADY_EXISTS for Aerospike Server prior to ver. 6.1.0.1
ignoreErrorAndWait(ResultCode.INDEX_ALREADY_EXISTS, () -> client.createIndex(null, namespace, setName,
indexName, binName, indexType));
}
}

public static void createIndex(IAerospikeClient client, String namespace, String setName, String indexName,
String binName, IndexType indexType, IndexCollectionType collectionType) {
waitTillComplete(() -> client.createIndex(null, namespace, setName, indexName, binName, indexType,
collectionType));
if (IndexUtils.isDropCreateBehaviorUpdated(client)) {
waitTillComplete(() -> client.createIndex(null, namespace, setName, indexName, binName, indexType,
collectionType));
} else {
// ignoring ResultCode.INDEX_ALREADY_EXISTS for Aerospike Server prior to ver. 6.1.0.1
ignoreErrorAndWait(ResultCode.INDEX_ALREADY_EXISTS, () -> client.createIndex(null, namespace, setName,
indexName, binName, indexType, collectionType));
}
}

public static List<Index> getIndexes(IAerospikeClient client, String namespace, IndexInfoParser indexInfoParser) {
Node node = getNode(client);
Node node = client.getCluster().getRandomNode();
String response = Info.request(node, "sindex-list:ns=" + namespace + ";b64=true");
return Arrays.stream(response.split(";"))
.map(indexInfoParser::parse)
.collect(Collectors.toList());
}

/**
* @deprecated since Aerospike Server ver. 6.1.0.1. Use
* {@link org.springframework.data.aerospike.core.AerospikeTemplate#indexExists(String)}
*/
public static boolean indexExists(IAerospikeClient client, String namespace, String indexName) {
Node node = getNode(client);
Node node = client.getCluster().getRandomNode();
String response = Info.request(node, "sindex/" + namespace + '/' + indexName);
return !response.startsWith("FAIL:201");
}

public static String getServerVersion(IAerospikeClient client) {
String versionString = Info.request(client.getCluster().getRandomNode(), "version");
return versionString.substring(versionString.lastIndexOf(' ') + 1);
}

/**
* Since Aerospike Server ver. 6.1.0.1 attempting to create a secondary index which already exists or to drop a
* non-existing secondary index now returns success/OK instead of an error.
*/
public static boolean isDropCreateBehaviorUpdated(IAerospikeClient client) {
return ModuleDescriptor.Version.parse(IndexUtils.getServerVersion(client))
.compareTo(SERVER_VERSION_6_1_0_1) >= 0;
}

private static void waitTillComplete(Supplier<IndexTask> supplier) {
IndexTask task = supplier.get();
if (task == null) {
throw new IllegalStateException("task can not be null");
throw new IllegalStateException("Task can not be null");
}
task.waitTillComplete();
}

private static Node getNode(IAerospikeClient client) {
Node[] nodes = client.getNodes();
if (nodes.length == 0) {
throw new AerospikeException(ResultCode.SERVER_NOT_AVAILABLE, "Command failed because cluster is empty.");
private static void ignoreErrorAndWait(int errorCodeToSkip, Supplier<IndexTask> supplier) {
try {
IndexTask task = supplier.get();
if (task == null) {
throw new IllegalStateException("Task can not be null");
}
task.waitTillComplete();
} catch (AerospikeException e) {
if (e.getResultCode() != errorCodeToSkip) {
throw e;
}
}
return nodes[0];
}
}
Loading