Skip to content

Commit

Permalink
FMWK-445 Refactor value setters in Qualifier builders (#746)
Browse files Browse the repository at this point in the history
* Make setValue() and setSecondValue() accept Object that is read into Value
* Replace setValueAsObj() and setSecondValueAsObj() in MetadataQualifierBuilder with unified setValue() and setSecondValue()
* Add logging whether sIndex filter and filterExp are created
* Cleanup
  • Loading branch information
agrgr authored May 28, 2024
1 parent dae6842 commit 9b09dde
Show file tree
Hide file tree
Showing 23 changed files with 259 additions and 242 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.springframework.data.aerospike.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand All @@ -10,6 +11,7 @@
*/
@Retention(RetentionPolicy.SOURCE)
@Target({ElementType.TYPE, ElementType.CONSTRUCTOR, ElementType.METHOD})
@Documented
public @interface Beta {

}
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,8 @@ private Record getRecord(AerospikePersistentEntity<?> entity, Key key, Query que
private BatchPolicy getBatchPolicyFilterExp(Query query) {
if (queryCriteriaIsNotNull(query)) {
BatchPolicy policy = new BatchPolicy(getAerospikeClient().getBatchPolicyDefault());
policy.filterExp = queryEngine.getFilterExpressionsBuilder().build(query);
Qualifier qualifier = query.getCriteriaObject();
policy.filterExp = queryEngine.getFilterExpressionsBuilder().build(qualifier);
return policy;
}
return null;
Expand Down Expand Up @@ -815,7 +816,8 @@ private <S> Object getRecordMapToTargetClass(AerospikePersistentEntity<?> entity
private Policy getPolicyFilterExp(Query query) {
if (queryCriteriaIsNotNull(query)) {
Policy policy = new Policy(getAerospikeClient().getReadPolicyDefault());
policy.filterExp = queryEngine.getFilterExpressionsBuilder().build(query);
Qualifier qualifier = query.getCriteriaObject();
policy.filterExp = queryEngine.getFilterExpressionsBuilder().build(qualifier);
return policy;
}
return null;
Expand All @@ -826,7 +828,8 @@ private Record getAndTouch(Key key, int expiration, String[] binNames, Query que
.expiration(expiration);

if (queryCriteriaIsNotNull(query)) {
writePolicyBuilder.filterExp(queryEngine.getFilterExpressionsBuilder().build(query));
Qualifier qualifier = query.getCriteriaObject();
writePolicyBuilder.filterExp(queryEngine.getFilterExpressionsBuilder().build(qualifier));
}
WritePolicy writePolicy = writePolicyBuilder.build();

Expand Down Expand Up @@ -1424,7 +1427,6 @@ private <T> Stream<KeyRecord> findRecordsUsingQuery(String setName, Class<T> tar
}

KeyRecordIterator recIterator;

if (targetClass != null) {
String[] binNames = getBinNamesFromTargetClass(targetClass, mappingContext);
recIterator = queryEngine.select(namespace, setName, binNames, query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,8 @@ public <T, S> Mono<?> findByIdUsingQuery(Object id, Class<T> entityClass, Class<
Policy policy = null;
if (queryCriteriaIsNotNull(query)) {
policy = new Policy(reactorClient.getReadPolicyDefault());
policy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(query);
Qualifier qualifier = query.getCriteriaObject();
policy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier);
}
return reactorClient.get(policy, key, binNames)
.filter(keyRecord -> Objects.nonNull(keyRecord.record))
Expand Down Expand Up @@ -1010,7 +1011,8 @@ public <T> Flux<T> findInRange(long offset, long limit, Sort sort, Class<T> targ
private BatchPolicy getBatchPolicyFilterExp(Query query) {
if (queryCriteriaIsNotNull(query)) {
BatchPolicy policy = new BatchPolicy(reactorClient.getAerospikeClient().getBatchPolicyDefault());
policy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(query);
Qualifier qualifier = query.getCriteriaObject();
policy.filterExp = reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier);
return policy;
}
return null;
Expand Down Expand Up @@ -1267,7 +1269,8 @@ private Mono<KeyRecord> getAndTouch(Key key, int expiration, String[] binNames,
.expiration(expiration);

if (queryCriteriaIsNotNull(query)) {
writePolicyBuilder.filterExp(reactorQueryEngine.getFilterExpressionsBuilder().build(query));
Qualifier qualifier = query.getCriteriaObject();
writePolicyBuilder.filterExp(reactorQueryEngine.getFilterExpressionsBuilder().build(qualifier));
}
WritePolicy writePolicy = writePolicyBuilder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@

import com.aerospike.client.exp.Exp;
import com.aerospike.client.exp.Expression;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.aerospike.query.qualifier.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;

import static org.springframework.data.aerospike.query.FilterOperation.dualFilterOperations;
import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;

@Slf4j
public class FilterExpressionsBuilder {

public Expression build(Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteriaObject() : null;
public Expression build(Qualifier qualifier) {
if (qualifier != null && requiresFilterExp(qualifier)) {
return Exp.build(qualifier.getFilterExp());
Exp exp = qualifier.getFilterExp();
if (exp == null) {
log.debug("Query #{}, filterExp is not set", qualifier.hashCode());
} else {
log.debug("Query #{}, filterExp is set", qualifier.hashCode());
}
return Exp.build(exp);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.aerospike.client.command.ParticleType.BOOL;
import static com.aerospike.client.command.ParticleType.INTEGER;
Expand All @@ -54,6 +53,7 @@
import static org.springframework.data.aerospike.util.FilterOperationRegexpBuilder.getNotContaining;
import static org.springframework.data.aerospike.util.FilterOperationRegexpBuilder.getStartsWith;
import static org.springframework.data.aerospike.util.FilterOperationRegexpBuilder.getStringEquals;
import static org.springframework.data.aerospike.util.Utils.ctxArrToString;
import static org.springframework.data.aerospike.util.Utils.getExpType;
import static org.springframework.data.aerospike.util.Utils.getValueExpOrFail;

Expand Down Expand Up @@ -1266,21 +1266,21 @@ public Filter sIndexFilter(Map<QualifierKey, Object> qualifierMap) {
@SuppressWarnings("unchecked")
private static Exp processMetadataFieldInOrNot(Map<QualifierKey, Object> qualifierMap, boolean notIn) {
FilterOperation filterOperation = notIn ? NOTEQ : EQ;
Object value = getValueAsObject(qualifierMap);
Object obj = getValue(qualifierMap).getObject();

Collection<Long> listOfLongs;
try {
listOfLongs = (Collection<Long>) value; // previously validated
listOfLongs = (Collection<Long>) obj; // previously validated
} catch (Exception e) {
String operation = notIn ? "NOT_IN" : "IN";
throw new IllegalStateException("FilterOperation." + operation + " metadata query: expecting value with " +
"type List<Long>");
throw new IllegalStateException("FilterOperation." + operation + " metadata query: expecting value as " +
"a Collection<Long>");
}
Exp[] listElementsExp = listOfLongs.stream().map(item ->
Qualifier.metadataBuilder()
.setMetadataField(getMetadataField(qualifierMap))
.setFilterOperation(filterOperation)
.setValueAsObj(item)
.setValue(Value.get(item))
.build()
.getFilterExp()
).toArray(Exp[]::new);
Expand Down Expand Up @@ -1331,13 +1331,13 @@ private static Optional<Exp> getMetadataExp(Map<QualifierKey, Object> qualifierM
return Optional.of(
operationFunction.apply(
mapMetadataExp(metadataField, getServerVersionSupport(qualifierMap)),
Exp.val(getValueAsLongOrFail(getValueAsObject(qualifierMap)))
Exp.val(getValue(qualifierMap).toLong()) // previously validated
)
);
}
case BETWEEN -> {
Exp metadata = mapMetadataExp(metadataField, getServerVersionSupport(qualifierMap));
Exp value = Exp.val(getValue(qualifierMap).toLong());
Exp value = Exp.val(getValue(qualifierMap).toLong()); // previously validated
Exp secondValue = Exp.val(getSecondValue(qualifierMap).toLong());
return Optional.of(Exp.and(Exp.ge(metadata, value), Exp.lt(metadata, secondValue)));
}
Expand All @@ -1353,15 +1353,6 @@ private static Optional<Exp> getMetadataExp(Map<QualifierKey, Object> qualifierM
return Optional.empty();
}

// expecting value always be of type Long
private static Long getValueAsLongOrFail(Object value) {
try {
return (Long) value;
} catch (Exception e) {
throw new IllegalArgumentException("Expecting value to be of type Long");
}
}

private static Exp mapMetadataExp(CriteriaDefinition.AerospikeMetadata metadataField,
ServerVersionSupport versionSupport) {
return switch (metadataField) {
Expand Down Expand Up @@ -1682,10 +1673,6 @@ protected static Value getValue(Map<QualifierKey, Object> qualifierMap) {
return Value.get(qualifierMap.get(VALUE));
}

protected static Object getValueAsObject(Map<QualifierKey, Object> qualifierMap) {
return qualifierMap.get(VALUE);
}

protected static Value getSecondValue(Map<QualifierKey, Object> qualifierMap) {
return Value.get(qualifierMap.get(SECOND_VALUE));
}
Expand All @@ -1706,10 +1693,6 @@ protected static String getCtxArrAsString(Map<QualifierKey, Object> qualifierMap
return ctxArrToString(ctxArr);
}

private static String ctxArrToString(CTX[] ctxArr) {
return Arrays.stream(ctxArr).map(ctx -> ctx.value.toString()).collect(Collectors.joining("."));
}

protected static ServerVersionSupport getServerVersionSupport(Map<QualifierKey, Object> qualifierMap) {
return (ServerVersionSupport) qualifierMap.get(SERVER_VERSION_SUPPORT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.aerospike.client.query.Statement;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.aerospike.config.AerospikeDataSettings;
import org.springframework.data.aerospike.query.qualifier.Qualifier;
import org.springframework.data.aerospike.repository.query.Query;
Expand All @@ -41,6 +43,7 @@
*/
public class QueryEngine {

private static final Logger logger = LoggerFactory.getLogger(QueryEngine.class);
public static final String SCANS_DISABLED_MESSAGE =
"Query without a filter will initiate a scan. Since scans are potentially dangerous operations, they are " +
"disabled by default in spring-data-aerospike. " +
Expand Down Expand Up @@ -116,7 +119,7 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
}
Statement statement = statementBuilder.build(namespace, set, query, binNames);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = getQueryPolicy(query, true);
QueryPolicy localQueryPolicy = getQueryPolicy(qualifier, true);

if (!scansEnabled && statement.getFilter() == null) {
throw new IllegalStateException(SCANS_DISABLED_MESSAGE);
Expand All @@ -137,7 +140,8 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
public KeyRecordIterator selectForCount(String namespace, String set, @Nullable Query query) {
Statement statement = statementBuilder.build(namespace, set, query);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = getQueryPolicy(query, false);
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteriaObject() : null;
QueryPolicy localQueryPolicy = getQueryPolicy(qualifier, false);

if (!scansEnabled && statement.getFilter() == null) {
throw new IllegalStateException(SCANS_DISABLED_MESSAGE);
Expand All @@ -155,9 +159,9 @@ private Record getRecord(Policy policy, Key key, String[] binNames) {
return client.get(policy, key, binNames);
}

private QueryPolicy getQueryPolicy(Query query, boolean includeBins) {
private QueryPolicy getQueryPolicy(Qualifier qualifier, boolean includeBins) {
QueryPolicy queryPolicy = new QueryPolicy(client.getQueryPolicyDefault());
queryPolicy.filterExp = filterExpressionsBuilder.build(query);
queryPolicy.filterExp = filterExpressionsBuilder.build(qualifier);
queryPolicy.includeBinData = includeBins;
return queryPolicy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Flux<KeyRecord> select(String namespace, String set, String[] binNames, @
}
Statement statement = statementBuilder.build(namespace, set, query, binNames);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = getQueryPolicy(query, true);
QueryPolicy localQueryPolicy = getQueryPolicy(qualifier, true);

if (!scansEnabled && statement.getFilter() == null) {
return Flux.error(new IllegalStateException(QueryEngine.SCANS_DISABLED_MESSAGE));
Expand All @@ -128,7 +128,8 @@ public Flux<KeyRecord> select(String namespace, String set, String[] binNames, @
public Flux<KeyRecord> selectForCount(String namespace, String set, @Nullable Query query) {
Statement statement = statementBuilder.build(namespace, set, query);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = getQueryPolicy(query, false);
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteriaObject() : null;
QueryPolicy localQueryPolicy = getQueryPolicy(qualifier, false);

if (!scansEnabled && statement.getFilter() == null) {
return Flux.error(new IllegalStateException(QueryEngine.SCANS_DISABLED_MESSAGE));
Expand All @@ -137,9 +138,9 @@ public Flux<KeyRecord> selectForCount(String namespace, String set, @Nullable Qu
return client.query(localQueryPolicy, statement);
}

private QueryPolicy getQueryPolicy(Query query, boolean includeBins) {
private QueryPolicy getQueryPolicy(Qualifier qualifier, boolean includeBins) {
QueryPolicy queryPolicy = new QueryPolicy(client.getQueryPolicyDefault());
queryPolicy.filterExp = filterExpressionsBuilder.build(query);
queryPolicy.filterExp = filterExpressionsBuilder.build(qualifier);
queryPolicy.includeBinData = includeBins;
return queryPolicy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

import com.aerospike.client.query.Filter;
import com.aerospike.client.query.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.aerospike.query.cache.IndexesCache;
import org.springframework.data.aerospike.query.model.Index;
import org.springframework.data.aerospike.query.model.IndexedField;
Expand All @@ -32,14 +31,15 @@
import java.util.Optional;

import static org.springframework.data.aerospike.query.QualifierUtils.queryCriteriaIsNotNull;
import static org.springframework.data.aerospike.util.Utils.logQualifierDetails;

/**
* @author peter
* @author Anastasiia Smirnova
*/
@Slf4j
public class StatementBuilder {

private static final Logger log = LoggerFactory.getLogger(StatementBuilder.class);
private final IndexesCache indexesCache;

public StatementBuilder(IndexesCache indexesCache) {
Expand All @@ -58,7 +58,10 @@ public Statement build(String namespace, String set, @Nullable Query query, Stri
stmt.setBinNames(binNames);
}
if (queryCriteriaIsNotNull(query)) {
// statement's filter is set based on the first processed qualifier's filter
// logging query
logQualifierDetails(query.getCriteriaObject(), log);
// statement's filter is set based either on cardinality (the lowest bin values ratio)
// or on order (the first processed filter)
setStatementFilterFromQualifiers(stmt, query.getCriteriaObject());
}
return stmt;
Expand All @@ -67,6 +70,7 @@ public Statement build(String namespace, String set, @Nullable Query query, Stri
private void setStatementFilterFromQualifiers(Statement stmt, Qualifier qualifier) {
// No qualifier, no need to set statement filter
if (qualifier == null) {
log.debug("Query #{}, secondary index filter is not set", qualifier.hashCode());
return;
}

Expand All @@ -77,6 +81,12 @@ private void setStatementFilterFromQualifiers(Statement stmt, Qualifier qualifie
} else if (isIndexedBin(stmt, qualifier)) { // Single qualifier
setFilterFromSingleQualifier(stmt, qualifier);
}
if (stmt.getFilter() != null) {
log.debug("Query #{}, secondary index filter is set on the bin '{}'", qualifier.hashCode(),
stmt.getFilter().getName());
} else {
log.debug("Query #{}, secondary index filter is not set", qualifier.hashCode());
}
}

private void setFilterFromMultipleQualifiers(Statement stmt, Qualifier qualifier) {
Expand Down Expand Up @@ -130,8 +140,8 @@ private boolean isIndexedBin(Statement stmt, Qualifier qualifier) {
}

if (log.isDebugEnabled() && hasField) {
log.debug("Bin {}.{}.{} has secondary index: {}",
stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), hasIndex);
log.debug("Query #{}, bin {}.{}.{} has secondary index: {}",
qualifier.hashCode(), stmt.getNamespace(), stmt.getSetName(), qualifier.getBinName(), hasIndex);
}
return hasIndex;
}
Expand Down
Loading

0 comments on commit 9b09dde

Please sign in to comment.