FMWK-199 Refactor implementation of paginated queries #658

Merged · 18 commits · Nov 21, 2023
Changes from 9 commits
@@ -57,6 +57,9 @@ public QueryEngine queryEngine(IAerospikeClient aerospikeClient,
boolean scansEnabled = aerospikeDataSettings().isScansEnabled();
log.debug("AerospikeDataSettings.scansEnabled: {}", scansEnabled);
queryEngine.setScansEnabled(scansEnabled);
long queryMaxRecords = aerospikeDataSettings().getQueryMaxRecords();
log.debug("AerospikeDataSettings.queryMaxRecords: {}", queryMaxRecords);
queryEngine.setQueryMaxRecords(queryMaxRecords);
return queryEngine;
}

@@ -68,6 +68,9 @@ public ReactorQueryEngine reactorQueryEngine(IAerospikeReactorClient aerospikeRe
boolean scansEnabled = aerospikeDataSettings().isScansEnabled();
queryEngine.setScansEnabled(scansEnabled);
log.debug("AerospikeDataSettings.scansEnabled: {}", scansEnabled);
long queryMaxRecords = aerospikeDataSettings().getQueryMaxRecords();
log.debug("AerospikeDataSettings.queryMaxRecords: {}", queryMaxRecords);
queryEngine.setQueryMaxRecords(queryMaxRecords);
return queryEngine;
}

@@ -30,6 +30,8 @@ public class AerospikeDataSettings {
boolean createIndexesOnStartup = true;
@Builder.Default
int indexCacheRefreshFrequencySeconds = 3600;
@Builder.Default
long queryMaxRecords = 10_000;

/*
* (non-Javadoc)
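For context: queryMaxRecords, added above with a default of 10_000, caps how many records the server returns for a single query. Below is a minimal sketch (not part of this PR) of setting a different cap through the Lombok-generated builder; how the resulting settings object reaches the configuration classes above depends on your setup.

```java
import org.springframework.data.aerospike.config.AerospikeDataSettings;

// Sketch only: build settings with a custom cap.
// Builder setter names follow the Lombok builder generated for the fields in the diff above.
class QueryMaxRecordsSettingsSketch {

    static AerospikeDataSettings customSettings() {
        return AerospikeDataSettings.builder()
                .scansEnabled(true)       // queries without a secondary-index filter still need scans enabled
                .queryMaxRecords(5_000L)  // cap each query at 5,000 records instead of the default 10,000
                .build();
    }
}
```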
@@ -24,6 +24,7 @@
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.query.ResultSet;
import org.springframework.data.aerospike.config.AerospikeDataSettings;
import org.springframework.data.aerospike.core.model.GroupedEntities;
import org.springframework.data.aerospike.core.model.GroupedKeys;
import org.springframework.data.aerospike.repository.query.Query;
@@ -74,6 +75,11 @@ public interface AerospikeOperations {
*/
IAerospikeClient getAerospikeClient();

/**
* @return value of configuration parameter {@link AerospikeDataSettings#getQueryMaxRecords()}.
*/
long getQueryMaxRecords();

/**
* Save a document.
* <p>
@@ -896,6 +902,18 @@ <T, S> List<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Clas
*/
<T> Stream<T> findInRange(long offset, long limit, Sort sort, Class<T> targetClass, String setName);

/**
* Find documents in the given entityClass's set using a query and map them to the given target class type. If the
* query has pagination and/or sorting, post-processing must be applied separately.
*
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param targetClass The class to map the document to.
* @param query The {@link Query} to filter results.
* @return A Stream of all matching documents regardless of pagination/sorting; the returned documents are mapped to
* the targetClass type.
*/
<T, S> Stream<S> findUsingQueryWithoutPostProcessing(Class<T> entityClass, Class<S> targetClass, Query query);

/**
* Check if a document exists by providing document id and entityClass (set name will be determined by the given
* entityClass).
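A hedged usage sketch of the new findUsingQueryWithoutPostProcessing contract; Person and the prepared Query instance are placeholders, not part of this PR.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.springframework.data.aerospike.core.AerospikeOperations;
import org.springframework.data.aerospike.repository.query.Query;

// Sketch only: fetch every matching document and leave pagination/sorting to the caller.
class UnpagedQuerySketch {

    static List<Person> fetchAll(AerospikeOperations operations, Query query) {
        // The stream wraps a server-side iterator, so close it when done (try-with-resources).
        try (Stream<Person> results =
                 operations.findUsingQueryWithoutPostProcessing(Person.class, Person.class, query)) {
            // Any limit/offset/sort carried by the query is NOT applied here;
            // the caller decides how (and whether) to post-process.
            return results.collect(Collectors.toList());
        }
    }

    static class Person { String id; String firstName; }
}
```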
@@ -110,6 +110,11 @@ public IAerospikeClient getAerospikeClient() {
return client;
}

@Override
public long getQueryMaxRecords() {
return queryEngine.getQueryMaxRecords();
}

@Override
public void refreshIndexesCache() {
indexRefresher.refreshIndexes();
@@ -814,7 +819,7 @@ public <T, S> Object findByIdUsingQuery(Object id, Class<T> entityClass, Class<S
Assert.notNull(entityClass, "Entity class must not be null!");
Assert.notNull(setName, "Set name must not be null!");

Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
try {
AerospikePersistentEntity<?> entity = mappingContext.getRequiredPersistentEntity(entityClass);
Key key = getKey(id, setName);
@@ -944,6 +949,14 @@ private <T> Stream<T> findUsingQueryWithPostProcessing(String setName, Class<T>
return applyPostProcessingOnResults(results, query);
}

@Override
public <T, S> Stream<S> findUsingQueryWithoutPostProcessing(Class<T> entityClass, Class<S> targetClass,
Query query) {
verifyUnsortedWithOffset(query.getSort(), query.getOffset());
return findUsingQueryWithDistinctPredicate(getSetName(entityClass), targetClass,
getDistinctPredicate(query), query);
}

private <T> Stream<T> findUsingQueryWithDistinctPredicate(String setName, Class<T> targetClass,
Predicate<KeyRecord> distinctPredicate,
Query query) {
@@ -1044,15 +1057,15 @@ public <T> long count(Query query, Class<T> entityClass) {

@Override
public long count(Query query, String setName) {
Stream<KeyRecord> results = findRecordsUsingQuery(setName, query);
Stream<KeyRecord> results = countRecordsUsingQuery(setName, query);
return results.count();
}

private Stream<KeyRecord> findRecordsUsingQuery(String setName, Query query) {
private Stream<KeyRecord> countRecordsUsingQuery(String setName, Query query) {
Assert.notNull(query, "Query must not be null!");
Assert.notNull(setName, "Set name must not be null!");

return findRecordsUsingQuery(setName, null, query);
return countRecordsUsingQuery(setName, null, query);
}

@Override
@@ -1273,7 +1286,7 @@ private <T> Stream<T> applyPostProcessingOnResults(Stream<T> results, Sort sort,
}

private <T> Stream<KeyRecord> findRecordsUsingQuery(String setName, Class<T> targetClass, Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
if (idQualifier != null) {
Expand Down Expand Up @@ -1302,6 +1315,36 @@ private <T> Stream<KeyRecord> findRecordsUsingQuery(String setName, Class<T> tar
});
}

private <T> Stream<KeyRecord> countRecordsUsingQuery(String setName, Class<T> targetClass, Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
if (idQualifier != null) {
// a special flow if an id is given
return findByIdsWithoutMapping(getIdValue(idQualifier), setName, targetClass,
new Query(excludeIdQualifier(qualifier))).stream();
}
}

KeyRecordIterator recIterator;

if (targetClass != null) {
String[] binNames = getBinNamesFromTargetClass(targetClass);
recIterator = queryEngine.selectForCount(namespace, setName, binNames, query);
} else {
recIterator = queryEngine.selectForCount(namespace, setName, null, query);
}

return StreamUtils.createStreamFromIterator(recIterator)
.onClose(() -> {
try {
recIterator.close();
} catch (Exception e) {
log.error("Caught exception while closing query", e);
}
});
}

private List<KeyRecord> findByIdsWithoutMapping(Collection<?> ids, String setName,
Class<?> targetClass, Query query) {
Assert.notNull(ids, "Ids must not be null");
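Worth noting in the count path above: countRecordsUsingQuery delegates to QueryEngine.selectForCount, which disables bin data in the query policy, so counting streams record metadata only. A small sketch of the public entry point this feeds, assuming count(Query, Class) is exposed on AerospikeOperations as the template code suggests; Person is a placeholder.

```java
import org.springframework.data.aerospike.core.AerospikeOperations;
import org.springframework.data.aerospike.repository.query.Query;

// Sketch only: count documents matching a repository-level Query.
// Internally this now streams KeyRecords from selectForCount(...) and counts them
// without deserializing any bins.
class CountSketch {

    static long countMatching(AerospikeOperations operations, Query query) {
        return operations.count(query, Person.class);
    }

    static class Person { String id; }
}
```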
@@ -95,15 +95,15 @@ static void verifyUnsortedWithOffset(Sort sort, long offset) {
static Predicate<KeyRecord> getDistinctPredicate(Query query) {
Predicate<KeyRecord> distinctPredicate;
if (query != null && query.isDistinct()) {
String dotPathString = query.getCriteria().getCriteriaObject().getDotPath();
String dotPathString = query.getQualifier().getDotPath();
if (StringUtils.hasLength(dotPathString)) {
throw new UnsupportedOperationException("DISTINCT queries are currently supported only for the first " +
"level objects, got a query for " + dotPathString);
}

final Set<Object> distinctValues = ConcurrentHashMap.newKeySet();
distinctPredicate = kr -> {
final String distinctField = query.getCriteria().getCriteriaObject().getField();
final String distinctField = query.getQualifier().getField();
if (kr.record == null || kr.record.bins == null) {
return false;
}
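The predicate built above deduplicates KeyRecords by the value of the queried bin: a concurrent set records values already seen, so the first record per value passes and later duplicates are dropped. A self-contained sketch of the same pattern on plain Java streams:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

// Standalone illustration (not project code) of the "distinct by key" pattern.
class DistinctByKeySketch {

    static <T, K> Predicate<T> distinctByKey(Function<T, K> keyExtractor) {
        Set<K> seen = ConcurrentHashMap.newKeySet();
        // Set.add(...) returns false for values already seen, filtering duplicates out.
        return element -> seen.add(keyExtractor.apply(element));
    }

    public static void main(String[] args) {
        Stream.of("apple", "avocado", "banana", "blueberry")
              .filter(distinctByKey(word -> word.charAt(0)))
              .forEach(System.out::println); // prints: apple, banana
    }
}
```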
@@ -21,6 +21,7 @@
import com.aerospike.client.query.IndexCollectionType;
import com.aerospike.client.query.IndexType;
import com.aerospike.client.reactor.IAerospikeReactorClient;
import org.springframework.data.aerospike.config.AerospikeDataSettings;
import org.springframework.data.aerospike.core.model.GroupedEntities;
import org.springframework.data.aerospike.core.model.GroupedKeys;
import org.springframework.data.aerospike.repository.query.Query;
@@ -51,6 +52,11 @@ public interface ReactiveAerospikeOperations {
*/
IAerospikeReactorClient getAerospikeReactorClient();

/**
* @return value of configuration parameter {@link AerospikeDataSettings#getQueryMaxRecords()}.
*/
long getQueryMaxRecords();

/**
* Reactively save document.
* <p>
@@ -868,6 +874,18 @@ <T, S> Flux<?> findByIdsUsingQuery(Collection<?> ids, Class<T> entityClass, Clas
*/
<T, S> Flux<S> findInRange(long offset, long limit, Sort sort, Class<T> entityClass, Class<S> targetClass);

/**
* Reactively find documents in the given entityClass's set using a query and map them to the given target class
* type. If the query has pagination and/or sorting, post-processing must be applied separately.
*
* @param entityClass The class to extract the Aerospike set from. Must not be {@literal null}.
* @param targetClass The class to map the document to.
* @param query The {@link Query} to filter results.
* @return A Flux of all matching documents regardless of pagination/sorting; the returned documents are mapped to
* the targetClass type.
*/
<T, S> Flux<S> findUsingQueryWithoutPostProcessing(Class<T> entityClass, Class<S> targetClass, Query query);

/**
* Reactively check if document exists by providing document id and entityClass (set name will be determined by the
* given entityClass).
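The reactive contract mirrors the blocking one. A hedged usage sketch follows; Person and the Query instance are placeholders, not part of this PR.

```java
import org.springframework.data.aerospike.core.ReactiveAerospikeOperations;
import org.springframework.data.aerospike.repository.query.Query;
import reactor.core.publisher.Flux;

// Sketch only: fetch all matching documents reactively; pagination/sorting
// declared on the query has to be applied downstream by the caller.
class ReactiveUnpagedQuerySketch {

    static Flux<Person> fetchAll(ReactiveAerospikeOperations operations, Query query) {
        return operations.findUsingQueryWithoutPostProcessing(Person.class, Person.class, query);
    }

    static class Person { String id; String firstName; }
}
```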
@@ -974,14 +974,14 @@ public Mono<Long> count(Query query, String setName) {
Assert.notNull(query, "Query must not be null!");
Assert.notNull(setName, "Set for count must not be null!");

return findRecordsUsingQuery(setName, query).count();
return countRecordsUsingQuery(setName, query).count();
}

private Flux<KeyRecord> findRecordsUsingQuery(String setName, Query query) {
private Flux<KeyRecord> countRecordsUsingQuery(String setName, Query query) {
Assert.notNull(query, "Query must not be null!");
Assert.notNull(setName, "Set name must not be null!");

return findRecordsUsingQuery(setName, null, query);
return countRecordsUsingQuery(setName, null, query);
}

@Override
@@ -1094,6 +1094,11 @@ public IAerospikeReactorClient getAerospikeReactorClient() {
return reactorClient;
}

@Override
public long getQueryMaxRecords() {
return reactorQueryEngine.getQueryMaxRecords();
}

private <T> Mono<T> doPersistAndHandleError(T document, AerospikeWriteData data, WritePolicy policy,
Operation[] operations) {
return reactorClient
@@ -1177,6 +1182,13 @@ private <T> Flux<T> findUsingQueryWithPostProcessing(String setName, Class<T> ta
return results;
}

@Override
public <T, S> Flux<S> findUsingQueryWithoutPostProcessing(Class<T> entityClass, Class<S> targetClass, Query query) {
verifyUnsortedWithOffset(query.getSort(), query.getOffset());
return findUsingQueryWithDistinctPredicate(getSetName(entityClass), targetClass,
getDistinctPredicate(query), query);
}

private void verifyUnsortedWithOffset(Sort sort, long offset) {
if ((sort == null || sort.isUnsorted())
&& offset > 0) {
@@ -1229,7 +1241,7 @@ private <T> Flux<T> findUsingQueryWithDistinctPredicate(String setName, Class<T>
}

private <T> Flux<KeyRecord> findRecordsUsingQuery(String setName, Class<T> targetClass, Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
if (idQualifier != null) {
@@ -1243,7 +1255,26 @@ private <T> Flux<KeyRecord> findRecordsUsingQuery(String setName, Class<T> targe
String[] binNames = getBinNamesFromTargetClass(targetClass);
return this.reactorQueryEngine.select(this.namespace, setName, binNames, query);
} else {
return this.reactorQueryEngine.select(this.namespace, setName, query);
return this.reactorQueryEngine.select(this.namespace, setName, null, query);
}
}

private <T> Flux<KeyRecord> countRecordsUsingQuery(String setName, Class<T> targetClass, Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null) {
Qualifier idQualifier = getOneIdQualifier(qualifier);
if (idQualifier != null) {
// a special flow if an id is given
return findByIdsWithoutMapping(getIdValue(idQualifier), setName, targetClass,
new Query(excludeIdQualifier(qualifier)));
}
}

if (targetClass != null) {
String[] binNames = getBinNamesFromTargetClass(targetClass);
return this.reactorQueryEngine.selectForCount(this.namespace, setName, binNames, query);
} else {
return this.reactorQueryEngine.selectForCount(this.namespace, setName, null, query);
}
}

@@ -24,7 +24,7 @@
public class FilterExpressionsBuilder {

public Expression build(Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
if (qualifier != null && excludeIrrelevantFilters(qualifier)) {
return Exp.build(qualifier.toFilterExp());
}
@@ -25,6 +25,7 @@
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;
import lombok.Getter;
import lombok.Setter;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.lang.Nullable;

@@ -53,7 +54,11 @@ public class QueryEngine {
* Scans can potentially slow down Aerospike server, so we are disabling them by default. If you still need to use
* scans, set this property to true.
*/
private boolean scansEnabled = false;
@Setter
private boolean scansEnabled;
@Setter
@Getter
private long queryMaxRecords;

public QueryEngine(IAerospikeClient client, StatementBuilder statementBuilder,
FilterExpressionsBuilder filterExpressionsBuilder, QueryPolicy queryPolicy) {
@@ -85,7 +90,7 @@ public KeyRecordIterator select(String namespace, String set, @Nullable Query qu
* @return A KeyRecordIterator to iterate over the results
*/
public KeyRecordIterator select(String namespace, String set, String[] binNames, @Nullable Query query) {
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getCriteria().getCriteriaObject() : null;
Qualifier qualifier = queryCriteriaIsNotNull(query) ? query.getQualifier() : null;
/*
* singleton using primary key
*/
@@ -105,6 +110,7 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
* query with filters
*/
Statement statement = statementBuilder.build(namespace, set, query, binNames);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = new QueryPolicy(queryPolicy);
localQueryPolicy.filterExp = filterExpressionsBuilder.build(query);

@@ -116,6 +122,30 @@ public KeyRecordIterator select(String namespace, String set, String[] binNames,
return new KeyRecordIterator(namespace, rs);
}

/**
* Select records matching a query, intended for counting only
*
* @param namespace Namespace storing the data
* @param set Set storing the data
* @param binNames Bin names to return from the query
* @param query {@link Query} for filtering results
* @return A KeyRecordIterator to iterate over the results
*/
public KeyRecordIterator selectForCount(String namespace, String set, String[] binNames, @Nullable Query query) {
Statement statement = statementBuilder.build(namespace, set, query, binNames);
statement.setMaxRecords(queryMaxRecords);
QueryPolicy localQueryPolicy = new QueryPolicy(queryPolicy);
localQueryPolicy.filterExp = filterExpressionsBuilder.build(query);
localQueryPolicy.includeBinData = false;

if (!scansEnabled && statement.getFilter() == null) {
throw new IllegalStateException(SCANS_DISABLED_MESSAGE);
}

RecordSet rs = client.query(localQueryPolicy, statement);
return new KeyRecordIterator(namespace, rs);
}

@SuppressWarnings("SameParameterValue")
private Record getRecord(Policy policy, Key key, String[] binNames) {
if (binNames == null || binNames.length == 0) {
@@ -124,10 +154,7 @@ private Record getRecord(Policy policy, Key key, String[] binNames) {
return client.get(policy, key, binNames);
}

public void setScansEnabled(boolean scansEnabled) {
this.scansEnabled = scansEnabled;
}

@Deprecated(since = "4.6.0", forRemoval = true)
public enum Meta {
KEY,
TTL,
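For orientation, the two knobs the engine now sets are standard Aerospike client features: Statement.setMaxRecords(long) caps how many records the server returns, and QueryPolicy.includeBinData = false makes the count path fetch metadata only. A standalone sketch of that client-level pattern; the host, namespace, and set name are placeholders.

```java
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;

// Standalone sketch (not project code) of the client calls used by selectForCount above.
public class CountQuerySketch {

    public static void main(String[] args) {
        try (AerospikeClient client = new AerospikeClient("localhost", 3000)) {
            Statement statement = new Statement();
            statement.setNamespace("test");
            statement.setSetName("Person");
            statement.setMaxRecords(10_000L);   // server-side cap, mirrors queryMaxRecords

            QueryPolicy policy = new QueryPolicy();
            policy.includeBinData = false;      // metadata only: enough for counting

            long count = 0;
            try (RecordSet rs = client.query(policy, statement)) {
                while (rs.next()) {
                    count++;
                }
            }
            System.out.println("Matching records: " + count);
        }
    }
}
```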