Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(datastore): Observe Query should not return deleted items, fixes - #2069 #2522

Merged
merged 17 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,42 +132,50 @@ public void observeQuery(
Objects.requireNonNull(onObservationStarted);
Objects.requireNonNull(onObservationError);
Objects.requireNonNull(onObservationComplete);
onObservationStarted.accept(this);

Consumer<Object> onItemChanged = value -> {

@SuppressWarnings("unchecked")
@SuppressWarnings("unchecked")
StorageItemChange<T> itemChanged = (StorageItemChange<T>) value;
try {
if (sqlQueryProcessor.modelExists(itemChanged.item(), options.getQueryPredicate())) {
updateCompleteItemMap(itemChanged);
} else if (itemChanged.type() == StorageItemChange.Type.UPDATE) {
completeItemMap.remove(itemChanged.item().getPrimaryKeyString());
} else if (itemChanged.type() == StorageItemChange.Type.DELETE) {
completeItemMap.remove(itemChanged.item().getPrimaryKeyString());
}
collect(itemChanged, onQuerySnapshot, itemClass, options, onObservationError);
} catch (DataStoreException exception) {
onObservationError.accept(exception);
}
};
threadPool.submit(() -> queryLocalData(itemClass, options, onQuerySnapshot, onObservationError));

disposable = itemChangeSubject
.filter(x -> x.item().getClass().isAssignableFrom(itemClass))
.subscribe(
onItemChanged::accept,
failure -> {
if (failure instanceof DataStoreException) {
onObservationError.accept((DataStoreException) failure);
return;
}
onObservationError.accept(new DataStoreException(
"Failed to observe items in storage adapter.",
failure,
"Inspect the failure details."
));
},
onObservationComplete::call
);
threadPool.submit(() -> queryLocalData(
itemClass,
options,
value -> {
disposable = itemChangeSubject
.filter(x -> x.item().getClass().isAssignableFrom(itemClass))
.subscribe(
onItemChanged::accept,
failure -> {
if (failure instanceof DataStoreException) {
onObservationError.accept((DataStoreException) failure);
return;
}
onObservationError.accept(new DataStoreException(
"Failed to observe items in storage adapter.",
failure,
"Inspect the failure details."
));
},
onObservationComplete::call
);
onObservationStarted.accept(this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 Is it strictly necessary for the onObservationStarted to be so late?

There is a slight behaviour change here in that if the initial query fails the caller will never see the onStarted callback, only onError (prior behaviour was to see onStarted and then onError). I'm not sure that is a problem, in fact it may be more correct than the prior implementation.

I was also looking at whether it would be possible for onItemChange to get invoked prior on onObservationStarted, it's a bit ambiguous to me but I don't think so?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had requested this change after diving in together because it was easy to replicate incorrect values and counts if mutations were made before the initial query was successful. Once we report onObservationStarted, my expectation is that no observations would be missed once that value is returned. This was not the case with the immediate callback.

If completeItemMap is empty (which is the case before its populated with initial values from a successful query, any item changes (such as a delete) will not properly be updated/removed from competeItemMap.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks.

onQuerySnapshot.accept(value);
},
onObservationError
));
}

private void queryLocalData(@NonNull Class<T> itemClass,
Expand All @@ -176,15 +184,15 @@ private void queryLocalData(@NonNull Class<T> itemClass,
@NonNull Consumer<DataStoreException> onObservationError) {
List<T> models = sqlQueryProcessor.queryOfflineData(itemClass,
Where.matchesAndSorts(options.getQueryPredicate(),
options.getSortBy()), onObservationError);
options.getSortBy()), onObservationError);
Consumer<DataStoreException> onQueryError = value -> {
cancel();
onObservationError.accept(value);
};
callOnQuerySnapshot(onQuerySnapshot, itemClass, onQueryError, models);
for (T model : models) {
completeItemMap.put(model.getPrimaryKeyString(), model);
}
callOnQuerySnapshot(onQuerySnapshot, itemClass, onQueryError, models);
}

/***
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public void observeQueryReturnsRecordsBasedOnMaxTime() throws InterruptedExcepti
List<BlogOwner> datastoreResultList = new ArrayList<>();
int maxRecords = 50;
datastoreResultList.add(blogOwner);
Consumer<Cancelable> observationStarted = NoOpConsumer.create();
SyncStatus mockSyncStatus = mock(SyncStatus.class);
when(mockSyncStatus.get(any(), any())).thenReturn(false);
Subject<StorageItemChange<? extends Model>> subject =
Expand All @@ -132,6 +131,27 @@ public void observeQueryReturnsRecordsBasedOnMaxTime() throws InterruptedExcepti
count.getAndIncrement();
};
Consumer<DataStoreException> onObservationError = NoOpConsumer.create();
Consumer<Cancelable> observationStarted = value -> {
for (int i = 0; i < 5; i++) {
BlogOwner itemChange = BlogOwner.builder()
.name("Alan Turing" + i)
.build();
try {
subject.onNext(StorageItemChange.<BlogOwner>builder()
.changeId(UUID.randomUUID().toString())
.initiator(StorageItemChange.Initiator.SYNC_ENGINE)
.item(itemChange)
.patchItem(SerializedModel.create(itemChange,
ModelSchema.fromModelClass(BlogOwner.class)))
.modelSchema(ModelSchema.fromModelClass(BlogOwner.class))
.predicate(QueryPredicates.all())
.type(StorageItemChange.Type.UPDATE)
.build());
} catch (AmplifyException exception) {
exception.printStackTrace();
}
}
};
Action onObservationComplete = () -> { };
SqlQueryProcessor mockSqlQueryProcessor = mock(SqlQueryProcessor.class);
when(mockSqlQueryProcessor.queryOfflineData(eq(BlogOwner.class), any(), any()))
Expand All @@ -153,26 +173,6 @@ public void observeQueryReturnsRecordsBasedOnMaxTime() throws InterruptedExcepti
onObservationError,
onObservationComplete);
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
for (int i = 0; i < 5; i++) {
BlogOwner itemChange = BlogOwner.builder()
.name("Alan Turing" + i)
//.id("" + i + "")
.build();
try {
subject.onNext(StorageItemChange.<BlogOwner>builder()
.changeId(UUID.randomUUID().toString())
.initiator(StorageItemChange.Initiator.SYNC_ENGINE)
.item(itemChange)
.patchItem(SerializedModel.create(itemChange,
ModelSchema.fromModelClass(BlogOwner.class)))
.modelSchema(ModelSchema.fromModelClass(BlogOwner.class))
.predicate(QueryPredicates.all())
.type(StorageItemChange.Type.UPDATE)
.build());
} catch (AmplifyException exception) {
exception.printStackTrace();
}
}
Assert.assertTrue(changeLatch.await(5, TimeUnit.SECONDS));
}

Expand Down Expand Up @@ -264,4 +264,72 @@ public void observeQueryCancelsTheOperationOnQueryError() throws DataStoreExcept
onObservationError,
onObservationComplete);
}

/***
* Observe Query Should not return deleted record.
* @throws InterruptedException InterruptedException
* @throws DataStoreException DataStoreException
*/
@Test
public void observeQueryShouldNotReturnDeletedRecord() throws InterruptedException, DataStoreException {
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch changeLatch = new CountDownLatch(1);
BlogOwner blogOwner = BlogOwner.builder()
.name("Alan Turing")
.build();
List<BlogOwner> datastoreResultList = new ArrayList<>();
int maxRecords = 50;
datastoreResultList.add(blogOwner);
SyncStatus mockSyncStatus = mock(SyncStatus.class);
when(mockSyncStatus.get(any(), any())).thenReturn(true);
Subject<StorageItemChange<? extends Model>> subject =
PublishSubject.<StorageItemChange<? extends Model>>create().toSerialized();
Consumer<DataStoreQuerySnapshot<BlogOwner>> onQuerySnapshot = value -> {
if (latch.getCount() > 0) {
Assert.assertTrue(value.getItems().contains(blogOwner));
latch.countDown();
} else if (latch.getCount() == 0) {
Assert.assertFalse(value.getItems().contains(blogOwner));
changeLatch.countDown();
}
};
Consumer<Cancelable> observationStarted = value -> {
try {
subject.onNext(StorageItemChange.<BlogOwner>builder()
.changeId(UUID.randomUUID().toString())
.initiator(StorageItemChange.Initiator.SYNC_ENGINE)
.item(blogOwner)
.patchItem(SerializedModel.create(blogOwner,
ModelSchema.fromModelClass(BlogOwner.class)))
.modelSchema(ModelSchema.fromModelClass(BlogOwner.class))
.predicate(QueryPredicates.all())
.type(StorageItemChange.Type.DELETE)
.build());
} catch (AmplifyException exception) {
exception.printStackTrace();
}
};
Consumer<DataStoreException> onObservationError = NoOpConsumer.create();
Action onObservationComplete = () -> { };
SqlQueryProcessor mockSqlQueryProcessor = mock(SqlQueryProcessor.class);
when(mockSqlQueryProcessor.queryOfflineData(eq(BlogOwner.class), any(), any()))
.thenReturn(datastoreResultList);
when(mockSqlQueryProcessor.modelExists(any(), any())).thenReturn(false);
ExecutorService threadPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 5);
ObserveQueryExecutor<BlogOwner> observeQueryExecutor = new ObserveQueryExecutor<>(subject,
mockSqlQueryProcessor,
threadPool,
mockSyncStatus,
new ModelSorter<>(),
maxRecords, 2);
observeQueryExecutor.observeQuery(
BlogOwner.class,
new ObserveQueryOptions(), observationStarted,
onQuerySnapshot,
onObservationError,
onObservationComplete);
Assert.assertTrue(latch.await(1, TimeUnit.SECONDS));
Assert.assertTrue(changeLatch.await(10, TimeUnit.SECONDS));
}
}