Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FMWK-301 Support beforeLastUpdate parameter in deleteAll #682

Merged
merged 9 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.springframework.data.util.StreamUtils;
import org.springframework.util.Assert;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -538,15 +540,28 @@ private void deleteGroupedEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
@Override
public <T> void deleteAll(Class<T> entityClass) {
Assert.notNull(entityClass, "Class must not be null!");
deleteAll(getSetName(entityClass));
deleteAll(entityClass, null);
}

@Override
public <T> void deleteAll(Class<T> entityClass, Instant beforeLastUpdate) {
Assert.notNull(entityClass, "Class must not be null!");
deleteAll(getSetName(entityClass), beforeLastUpdate);
}

@Override
public void deleteAll(String setName) {
Assert.notNull(setName, "Set name must not be null!");
deleteAll(setName, null);
}

@Override
public void deleteAll(String setName, Instant beforeLastUpdate) {
Assert.notNull(setName, "Set name must not be null!");
Calendar beforeLastUpdateCalendar = convertToCalendar(beforeLastUpdate);

try {
client.truncate(null, getNamespace(), setName, null);
client.truncate(null, getNamespace(), setName, beforeLastUpdateCalendar);
} catch (AerospikeException e) {
throw translateError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@
import org.springframework.data.mapping.model.ConvertingPropertyAccessor;
import org.springframework.util.Assert;

import java.time.Instant;
import java.util.Calendar;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -391,6 +394,26 @@ private <S> S convertIfNecessary(Object source, Class<S> type) {
: converter.getConversionService().convert(source, type);
}

protected Instant convertToInstant(Long millis) {
if (millis == null) return null;

if (millis >= Instant.now().toEpochMilli())
throw new IllegalArgumentException("Last update time (%d) must be less than the current time"
.formatted(millis));
return Instant.ofEpochMilli(millis);
}

protected Calendar convertToCalendar(Instant instant) {
if (instant == null) return null;

Calendar calendar = Calendar.getInstance();
if (instant.toEpochMilli() > calendar.getTimeInMillis())
throw new IllegalArgumentException("Last update time (%d) must be less than the current time"
.formatted(instant.toEpochMilli()));
calendar.setTime(Date.from(instant));
return calendar;
}

protected Operation[] getPutAndGetHeaderOperations(AerospikeWriteData data, boolean firstlyDeleteBins) {
Bin[] bins = data.getBinsAsArray();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Instant;
import java.util.Collection;
import java.util.Map;
import java.util.function.Supplier;
Expand Down Expand Up @@ -507,6 +508,16 @@ public interface ReactiveAerospikeOperations {
*/
<T> Mono<Void> deleteAll(Class<T> entityClass);

/**
* Reactively truncate/delete all records in the set determined by the given entity class.
*
* @param entityClass The class to extract set name from. Must not be {@literal null}.
* @param beforeLastUpdate Delete records before the specified time (must be earlier than the current time at
* millisecond resolution).
* @throws DataAccessException If operation failed (see {@link DefaultAerospikeExceptionTranslator} for details).
*/
<T> Mono<Void> deleteAll(Class<T> entityClass, Instant beforeLastUpdate);

/**
* Reactively truncate/delete all the documents in the given set.
*
Expand All @@ -515,6 +526,16 @@ public interface ReactiveAerospikeOperations {
*/
Mono<Void> deleteAll(String setName);

/**
* Reactively truncate/delete all documents in the given set.
*
* @param setName Set name to truncate/delete all records in.
* @param beforeLastUpdate Delete records before the specified time (must be earlier than the current time at
* millisecond resolution).
* @throws DataAccessException If operation failed (see {@link DefaultAerospikeExceptionTranslator} for details).
*/
Mono<Void> deleteAll(String setName, Instant beforeLastUpdate);

/**
* Find an existing record matching the document's class and id, add map values to the corresponding bins of the
* record and return the modified record mapped to the document's class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -548,16 +550,31 @@ private Mono<Void> deleteEntitiesByGroupedKeys(GroupedKeys groupedKeys) {
public <T> Mono<Void> deleteAll(Class<T> entityClass) {
Assert.notNull(entityClass, "Class must not be null!");

return deleteAll(getSetName(entityClass));
return deleteAll(getSetName(entityClass), null);
}

@Override
public <T> Mono<Void> deleteAll(Class<T> entityClass, Instant beforeLastUpdate) {
Assert.notNull(entityClass, "Class must not be null!");

return deleteAll(getSetName(entityClass), beforeLastUpdate);
}

@Override
public Mono<Void> deleteAll(String setName) {
Assert.notNull(setName, "Set name must not be null!");

return deleteAll(setName, null);
}

@Override
public Mono<Void> deleteAll(String setName, Instant beforeLastUpdate) {
Assert.notNull(setName, "Set name must not be null!");
Calendar beforeLastUpdateCalendar = convertToCalendar(beforeLastUpdate);

try {
return Mono.fromRunnable(
() -> reactorClient.getAerospikeClient().truncate(null, namespace, setName, null));
() -> reactorClient.getAerospikeClient().truncate(null, namespace, setName, beforeLastUpdateCalendar));
} catch (AerospikeException e) {
throw translateError(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@
import org.springframework.data.aerospike.config.BlockingTestConfig;
import org.springframework.data.aerospike.config.CommonTestConfig;
import org.springframework.data.aerospike.core.AerospikeTemplate;
import org.springframework.data.aerospike.query.FilterOperation;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.aerospike.query.QueryEngine;
import org.springframework.data.aerospike.query.cache.IndexRefresher;
import org.springframework.data.aerospike.query.cache.IndexesCache;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;

import java.util.Collection;
import java.util.List;

import static org.springframework.data.aerospike.repository.query.CriteriaDefinition.AerospikeMetadata.LAST_UPDATE_TIME;

@SpringBootTest(
classes = {BlockingTestConfig.class, CommonTestConfig.class},
Expand Down Expand Up @@ -43,4 +49,14 @@ protected <T> void deleteOneByOne(Collection<T> collection) {
protected <T> void deleteOneByOne(Collection<T> collection, String setName) {
collection.forEach(item -> template.delete(item, setName));
}

protected <T> List<T> runLastUpdateTimeQuery(long lastUpdateTimeMillis, FilterOperation operation,
Class<T> entityClass) {
Qualifier lastUpdateTimeLtMillis = Qualifier.metadataBuilder()
.setMetadataField(LAST_UPDATE_TIME)
.setFilterOperation(operation)
.setValue1AsObj(lastUpdateTimeMillis * MILLIS_TO_NANO)
.build();
return template.find(new Query(lastUpdateTimeLtMillis), entityClass).toList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public abstract class BaseIntegrationTests {

public static final String DEFAULT_SET_NAME = "aerospike";
public static final String OVERRIDE_SET_NAME = "testSet1";
protected static final int MILLIS_TO_NANO = 1_000_000;

@Value("${embedded.aerospike.namespace}")
protected String namespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,17 @@
import org.springframework.data.aerospike.config.CommonTestConfig;
import org.springframework.data.aerospike.config.ReactiveTestConfig;
import org.springframework.data.aerospike.core.ReactiveAerospikeTemplate;
import org.springframework.data.aerospike.query.FilterOperation;
import org.springframework.data.aerospike.query.Qualifier;
import org.springframework.data.aerospike.query.cache.ReactorIndexRefresher;
import org.springframework.data.aerospike.repository.query.Query;
import org.springframework.data.aerospike.server.version.ServerVersionSupport;
import reactor.core.publisher.Flux;

import java.io.Serializable;
import java.util.List;

import static org.springframework.data.aerospike.repository.query.CriteriaDefinition.AerospikeMetadata.LAST_UPDATE_TIME;

@SpringBootTest(
classes = {ReactiveTestConfig.class, CommonTestConfig.class},
Expand Down Expand Up @@ -46,4 +52,14 @@ protected <T> void deleteAll(Iterable<T> iterable) {
protected <T> void deleteAll(Iterable<T> iterable, String setName) {
Flux.fromIterable(iterable).flatMap(item -> reactiveTemplate.delete(item, setName)).blockLast();
}

protected <T> List<T> runLastUpdateTimeQuery(long lastUpdateTimeMillis, FilterOperation operation,
Class<T> entityClass) {
Qualifier lastUpdateTimeLtMillis = Qualifier.metadataBuilder()
.setMetadataField(LAST_UPDATE_TIME)
.setFilterOperation(operation)
.setValue1AsObj(lastUpdateTimeMillis * MILLIS_TO_NANO)
.build();
return reactiveTemplate.find(new Query(lastUpdateTimeLtMillis), entityClass).collectList().block();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.data.aerospike.BaseBlockingIntegrationTests;
import org.springframework.data.aerospike.core.model.GroupedKeys;
import org.springframework.data.aerospike.query.FilterOperation;
import org.springframework.data.aerospike.sample.Customer;
import org.springframework.data.aerospike.sample.Person;
import org.springframework.data.aerospike.sample.SampleClasses.CollectionOfObjects;
import org.springframework.data.aerospike.sample.SampleClasses.CustomCollectionClassToDelete;
import org.springframework.data.aerospike.sample.SampleClasses.DocumentWithExpiration;
import org.springframework.data.aerospike.sample.SampleClasses.VersionedClass;
import org.springframework.data.aerospike.utility.AwaitilityUtils;
import org.springframework.test.context.TestPropertySource;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
Expand All @@ -49,6 +54,8 @@ public class AerospikeTemplateDeleteTests extends BaseBlockingIntegrationTests {
public void beforeEach() {
template.deleteAll(Person.class);
template.deleteAll(Customer.class);
template.deleteAll(VersionedClass.class);
template.deleteAll(CollectionOfObjects.class);
}

@Test
Expand Down Expand Up @@ -208,8 +215,7 @@ public void deleteByType_ShouldDeleteAllDocumentsWithCustomSetName() {

template.deleteAll(CustomCollectionClassToDelete.class);

// truncate is async operation that is why we need to wait until
// it completes
// truncate is async operation that is why we need to wait until it completes
await().atMost(TEN_SECONDS)
.untilAsserted(() -> assertThat(template.findByIds(Arrays.asList(id1, id2),
CustomCollectionClassToDelete.class)).isEmpty());
Expand Down Expand Up @@ -356,6 +362,58 @@ public void deleteAll_ShouldDeleteAllDocumentsWithSetName() {
}
}

@Test
public void deleteAll_ShouldDeleteAllDocumentsBeforeGivenLastUpdateTime() {
// batch delete operations are supported starting with Server version 6.0+
if (serverVersionSupport.batchWrite()) {
String id1 = nextId();
String id2 = nextId();
CollectionOfObjects document1 = new CollectionOfObjects(id1, List.of("test1"));
CollectionOfObjects document2 = new CollectionOfObjects(id2, List.of("test2"));

template.save(document1);
AwaitilityUtils.wait(1, MILLISECONDS);

Instant lastUpdateTime = Instant.now();
Instant inFuture = Instant.ofEpochMilli(lastUpdateTime.toEpochMilli() + 10000);
template.save(document2);

// make sure document1 has lastUpdateTime less than specified millis
List<CollectionOfObjects> resultsWithLutLtMillis =
runLastUpdateTimeQuery(lastUpdateTime.toEpochMilli(), FilterOperation.LT, CollectionOfObjects.class);
assertThat(resultsWithLutLtMillis.get(0).getId()).isEqualTo(document1.getId());
assertThat(resultsWithLutLtMillis.get(0).getCollection().iterator().next())
.isEqualTo(document1.getCollection().iterator().next());

assertThatThrownBy(() -> template.deleteAll(CollectionOfObjects.class, inFuture))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageMatching("Last update time (.*) must be less than the current time");

template.deleteAll(CollectionOfObjects.class, lastUpdateTime);
assertThat(template.findByIds(List.of(id1, id2), CollectionOfObjects.class)).hasSize(1);
CollectionOfObjects result = template.findByIds(List.of(id1, id2), CollectionOfObjects.class).get(0);
assertThat(result.getId()).isEqualTo(document2.getId());
assertThat(result.getCollection().iterator().next()).isEqualTo(document2.getCollection().iterator().next());

List<Person> persons = additionalAerospikeTestOperations.saveGeneratedPersons(101);
AwaitilityUtils.wait(1, MILLISECONDS);
lastUpdateTime = Instant.now();
AwaitilityUtils.wait(1, MILLISECONDS);
Person newPerson = new Person(nextId(), "testFirstName");
template.save(newPerson);
persons.add(newPerson);

template.deleteAll(template.getSetName(Person.class), lastUpdateTime);
List<String> personsIds = persons.stream().map(Person::getId).toList();
assertThat(template.findByIds(personsIds, Person.class)).contains(newPerson);

List<Person> persons2 = additionalAerospikeTestOperations.saveGeneratedPersons(1001);
template.deleteAll(Person.class, lastUpdateTime); // persons2 were saved after the given time
personsIds = persons2.stream().map(Person::getId).toList();
assertThat(template.findByIds(personsIds, Person.class)).containsExactlyElementsOf(persons2);
}
}

@Test
public void deleteAll_VersionsMismatch() {
// batch delete operations are supported starting with Server version 6.0+
Expand Down
Loading
Loading