Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query continuation implemented for GqlQuery + tests #273

Merged
merged 3 commits into from
Oct 22, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ protected void populatePb(com.google.datastore.v1beta3.RunQueryRequest.Builder r
}

@Override
protected GqlQuery<V> nextQuery(com.google.datastore.v1beta3.QueryResultBatch responsePb) {
// See issue #17
throw new UnsupportedOperationException("paging for this query is not implemented yet");
protected Query<V> nextQuery(com.google.datastore.v1beta3.RunQueryResponse responsePb) {
return StructuredQuery.<V>fromPb(type(), namespace(), responsePb.getQuery())
.nextQuery(responsePb);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ protected abstract Object fromPb(ResultType<V> resultType, String namespace, byt
protected abstract void populatePb(
com.google.datastore.v1beta3.RunQueryRequest.Builder requestPb);

protected abstract Query<V> nextQuery(com.google.datastore.v1beta3.QueryResultBatch responsePb);
protected abstract Query<V> nextQuery(com.google.datastore.v1beta3.RunQueryResponse responsePb);

/**
* Returns a new {@link GqlQuery} builder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class QueryResultsImpl<T> extends AbstractIterator<T> implements QueryResults<T>
private final ResultType<T> queryResultType;
private Query<T> query;
private ResultType<?> actualResultType;
private com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb;
private com.google.datastore.v1beta3.RunQueryResponse runQueryResponsePb;
private com.google.datastore.v1beta3.Query mostRecentQueryPb;
private boolean lastBatch;
private Iterator<com.google.datastore.v1beta3.EntityResult> entityResultPbIter;
Expand All @@ -56,8 +56,8 @@ class QueryResultsImpl<T> extends AbstractIterator<T> implements QueryResults<T>
}
partitionIdPb = pbBuilder.build();
sendRequest();
if (queryResultBatchPb.getSkippedResults() > 0) {
cursor = queryResultBatchPb.getSkippedCursor();
if (runQueryResponsePb.getBatch().getSkippedResults() > 0) {
cursor = runQueryResponsePb.getBatch().getSkippedCursor();
} else {
cursor = mostRecentQueryPb.getStartCursor();
}
Expand All @@ -71,16 +71,14 @@ private void sendRequest() {
}
requestPb.setPartitionId(partitionIdPb);
query.populatePb(requestPb);
com.google.datastore.v1beta3.RunQueryResponse runQueryResponsePb =
datastore.runQuery(requestPb.build());
queryResultBatchPb = runQueryResponsePb.getBatch();
runQueryResponsePb = datastore.runQuery(requestPb.build());
mostRecentQueryPb = runQueryResponsePb.getQuery();
if (mostRecentQueryPb == null) {
mostRecentQueryPb = requestPb.getQuery();
}
lastBatch = queryResultBatchPb.getMoreResults() != MoreResultsType.NOT_FINISHED;
entityResultPbIter = queryResultBatchPb.getEntityResultsList().iterator();
actualResultType = ResultType.fromPb(queryResultBatchPb.getEntityResultType());
lastBatch = runQueryResponsePb.getBatch().getMoreResults() != MoreResultsType.NOT_FINISHED;
entityResultPbIter = runQueryResponsePb.getBatch().getEntityResultsList().iterator();
actualResultType = ResultType.fromPb(runQueryResponsePb.getBatch().getEntityResultType());
if (Objects.equals(queryResultType, ResultType.PROJECTION_ENTITY)) {
// projection entity can represent all type of results
actualResultType = ResultType.PROJECTION_ENTITY;
Expand All @@ -92,7 +90,7 @@ private void sendRequest() {
@Override
protected T computeNext() {
while (!entityResultPbIter.hasNext() && !lastBatch) {
query = query.nextQuery(queryResultBatchPb);
query = query.nextQuery(runQueryResponsePb);
sendRequest();
}
if (!entityResultPbIter.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -844,16 +844,16 @@ protected void populatePb(com.google.datastore.v1beta3.RunQueryRequest.Builder r
}

@Override
protected StructuredQuery<V> nextQuery(com.google.datastore.v1beta3.QueryResultBatch responsePb) {
protected Query<V> nextQuery(com.google.datastore.v1beta3.RunQueryResponse responsePb) {
Builder<V> builder = new Builder<>(type());
builder.mergeFrom(toPb());
builder.startCursor(new Cursor(responsePb.getEndCursor()));
if (offset > 0 && responsePb.getSkippedResults() < offset) {
builder.offset(offset - responsePb.getSkippedResults());
builder.startCursor(new Cursor(responsePb.getBatch().getEndCursor()));
if (offset > 0 && responsePb.getBatch().getSkippedResults() < offset) {
builder.offset(offset - responsePb.getBatch().getSkippedResults());
} else {
builder.offset(0);
if (limit != null) {
builder.limit(limit - responsePb.getEntityResultsCount());
builder.limit(limit - responsePb.getBatch().getEntityResultsCount());
}
}
return builder.build();
Expand Down Expand Up @@ -904,7 +904,9 @@ protected Object fromPb(ResultType<V> resultType, String namespace, byte[] bytes
return fromPb(resultType, namespace, com.google.datastore.v1beta3.Query.parseFrom(bytesPb));
}

private static StructuredQuery<?> fromPb(ResultType<?> resultType, String namespace,
@SuppressWarnings("unchecked")
static <V> StructuredQuery<V> fromPb(
ResultType<?> resultType, String namespace,
com.google.datastore.v1beta3.Query queryPb) {
BaseBuilder<?, ?> builder;
if (resultType.equals(ResultType.ENTITY)) {
Expand All @@ -914,6 +916,6 @@ private static StructuredQuery<?> fromPb(ResultType<?> resultType, String namesp
} else {
builder = new ProjectionEntityQueryBuilder();
}
return builder.namespace(namespace).mergeFrom(queryPb).build();
return (StructuredQuery<V>) builder.namespace(namespace).mergeFrom(queryPb).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.gcloud.datastore.StructuredQuery.OrderBy;
import com.google.gcloud.datastore.StructuredQuery.PropertyFilter;
import com.google.gcloud.spi.DatastoreRpc;
import com.google.gcloud.spi.DatastoreRpc.DatastoreRpcException;
import com.google.gcloud.spi.DatastoreRpc.DatastoreRpcException.Reason;
import com.google.gcloud.spi.DatastoreRpcFactory;

Expand All @@ -43,6 +44,7 @@
import org.junit.runners.JUnit4;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -403,6 +405,38 @@ public void testRunGqlQueryWithCasting() {
assertFalse(results3.hasNext());
}

@Test
public void testGqlQueryPagination() throws DatastoreRpcException {
DatastoreRpcFactory rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class);
DatastoreRpc rpcMock = EasyMock.createStrictMock(DatastoreRpc.class);
EasyMock.expect(rpcFactoryMock.create(EasyMock.anyObject(DatastoreOptions.class)))
.andReturn(rpcMock);
List<com.google.datastore.v1beta3.RunQueryResponse> responses =
buildResponsesForQueryPagination();
for (int i = 0; i < responses.size(); i++) {
EasyMock
.expect(rpcMock.runQuery(
EasyMock.anyObject(com.google.datastore.v1beta3.RunQueryRequest.class)))
.andReturn(responses.get(i));
}
EasyMock.replay(rpcFactoryMock, rpcMock);
DatastoreOptions options =
this.options.toBuilder()
.retryParams(RetryParams.getDefaultInstance())
.serviceRpcFactory(rpcFactoryMock)
.build();
Datastore mockDatastore = DatastoreFactory.instance().get(options);
QueryResults<Key> results =
mockDatastore.run(Query.gqlQueryBuilder(ResultType.KEY, "select __key__ from *").build());
int count = 0;
while (results.hasNext()) {
count += 1;
results.next();
}
assertEquals(count, 5);
EasyMock.verify(rpcFactoryMock, rpcMock);
}

@Test
public void testRunStructuredQuery() {
Query<Entity> query =
Expand Down Expand Up @@ -445,7 +479,92 @@ public void testRunStructuredQuery() {
assertEquals(20, entity.getLong("age"));
assertEquals(1, entity.properties().size());
assertFalse(results4.hasNext());
// TODO(ozarov): construct a test to verify nextQuery/pagination
}

@Test
public void testStructuredQueryPagination() throws DatastoreRpcException {
DatastoreRpcFactory rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class);
DatastoreRpc rpcMock = EasyMock.createStrictMock(DatastoreRpc.class);
EasyMock.expect(rpcFactoryMock.create(EasyMock.anyObject(DatastoreOptions.class)))
.andReturn(rpcMock);
List<com.google.datastore.v1beta3.RunQueryResponse> responses =
buildResponsesForQueryPagination();
for (int i = 0; i < responses.size(); i++) {
EasyMock
.expect(rpcMock.runQuery(
EasyMock.anyObject(com.google.datastore.v1beta3.RunQueryRequest.class)))
.andReturn(responses.get(i));
}
EasyMock.replay(rpcFactoryMock, rpcMock);
DatastoreOptions options =
this.options.toBuilder()
.retryParams(RetryParams.getDefaultInstance())
.serviceRpcFactory(rpcFactoryMock)
.build();
Datastore mockDatastore = DatastoreFactory.instance().get(options);
QueryResults<Key> results = mockDatastore.run(Query.keyQueryBuilder().build());
int count = 0;
while (results.hasNext()) {
count += 1;
results.next();
}
assertEquals(count, 5);
EasyMock.verify(rpcFactoryMock, rpcMock);
}

private List<com.google.datastore.v1beta3.RunQueryResponse> buildResponsesForQueryPagination() {

This comment was marked as spam.

Entity entity4 = Entity.builder(KEY4).set("value", StringValue.of("value")).build();
Entity entity5 = Entity.builder(KEY5).set("value", "value").build();
datastore.add(ENTITY3, entity4, entity5);
List<com.google.datastore.v1beta3.RunQueryResponse> responses = new ArrayList<>();
Query<Key> query = Query.keyQueryBuilder().build();
com.google.datastore.v1beta3.RunQueryRequest.Builder requestPb =
com.google.datastore.v1beta3.RunQueryRequest.newBuilder();
query.populatePb(requestPb);
com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb =
com.google.datastore.v1beta3.RunQueryResponse.newBuilder()
.mergeFrom(((DatastoreImpl) datastore).runQuery(requestPb.build()))
.getBatch();
com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb1 =
com.google.datastore.v1beta3.QueryResultBatch.newBuilder()
.mergeFrom(queryResultBatchPb)
.setMoreResults(
com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED)
.clearEntityResults()
.addAllEntityResults(queryResultBatchPb.getEntityResultsList().subList(0, 1))
.setEndCursor(queryResultBatchPb.getEntityResultsList().get(0).getCursor())
.build();
responses.add(
com.google.datastore.v1beta3.RunQueryResponse.newBuilder()
.setBatch(queryResultBatchPb1)
.build());
com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb2 =
com.google.datastore.v1beta3.QueryResultBatch.newBuilder()
.mergeFrom(queryResultBatchPb)
.setMoreResults(
com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED)
.clearEntityResults()
.addAllEntityResults(queryResultBatchPb.getEntityResultsList().subList(1, 3))
.setEndCursor(queryResultBatchPb.getEntityResultsList().get(2).getCursor())
.build();
responses.add(
com.google.datastore.v1beta3.RunQueryResponse.newBuilder()
.setBatch(queryResultBatchPb2)
.build());
com.google.datastore.v1beta3.QueryResultBatch queryResultBatchPb3 =
com.google.datastore.v1beta3.QueryResultBatch.newBuilder()
.mergeFrom(queryResultBatchPb)
.setMoreResults(
com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NO_MORE_RESULTS)
.clearEntityResults()
.addAllEntityResults(queryResultBatchPb.getEntityResultsList().subList(3, 5))
.setEndCursor(queryResultBatchPb.getEntityResultsList().get(4).getCursor())
.build();
responses.add(
com.google.datastore.v1beta3.RunQueryResponse.newBuilder()
.setBatch(queryResultBatchPb3)
.build());
return responses;
}

@Test
Expand Down