Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Query Profiling #1609

Merged
merged 42 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7978c13
Update protos
ehsannas Oct 17, 2023
093c230
Query profiling APIs.
ehsannas Jul 25, 2023
509891b
Add explain() and explainAnalyze() implementation for Query.
ehsannas Oct 17, 2023
e1a2cd4
WIP: Add explain() and explainAnalyze() for aggregate queries.
ehsannas Oct 17, 2023
9527010
Fix aggregate query profiling.
ehsannas Oct 18, 2023
6fcc3fe
format.
ehsannas Oct 18, 2023
83e68b3
Update the test.
ehsannas Oct 18, 2023
2638b3e
Don't use .* imports.
ehsannas Oct 18, 2023
803e17e
decodeStruct should take Nullable value.
ehsannas Oct 26, 2023
8b09a3e
Address comments.
ehsannas Nov 22, 2023
6c18db9
Format.
ehsannas Nov 22, 2023
ec1ec7f
introduce QueryPlan class to make it possible to add node-level info …
ehsannas Jan 3, 2024
d38d889
don't make QueryPlan c'tor public.
ehsannas Jan 4, 2024
8c2de21
Remove print statement.
ehsannas Jan 4, 2024
67bfe83
undo manual proto changes.
ehsannas Jan 9, 2024
2b0b099
Merge remote-tracking branch 'origin/main' into query_profiling_5_new
ehsannas Jan 9, 2024
61bb62a
Address feedback.
ehsannas Jan 11, 2024
8f584ff
address feedback.
ehsannas Jan 11, 2024
0b81cfd
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 11, 2024
32b6003
address feedback.
ehsannas Jan 11, 2024
c15d575
v2.
ehsannas Feb 10, 2024
803c7ff
Add ExplainMetrics. Rename Plan to PlanSummary.
ehsannas Feb 22, 2024
8617ea3
Update tests.
ehsannas Feb 22, 2024
c547b4b
Fixes for Query and AggregateQuery explain methods.
ehsannas Feb 22, 2024
f9220c7
fix up the implementation based on the new protos.
ehsannas Mar 5, 2024
9c87e97
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 5, 2024
544d1c0
Fix: explain a query with empty result set.
ehsannas Mar 8, 2024
2373b4e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 8, 2024
abf9c72
Add check for the regular query flow as well.
ehsannas Mar 8, 2024
029017d
Add explainStream API.
ehsannas Mar 8, 2024
328c18d
Update the explainStream API based on API feedback.
ehsannas Mar 11, 2024
7b209cd
merge Query.java and AggregateQuery.java with origin/main.
ehsannas Mar 11, 2024
de7fb63
Merge remote-tracking branch 'origin/main' into query_profiling_impl_…
ehsannas Mar 13, 2024
1367dc4
Remove bytesReturned from the API.
ehsannas Mar 13, 2024
556d81e
minor improvements.
ehsannas Mar 18, 2024
524db49
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 18, 2024
dccb7ce
Address feedback and add ExplainOptions unit tests.
ehsannas Mar 21, 2024
cb1d76a
Separate out the ResponseDeliverer logic for different flows.
ehsannas Mar 21, 2024
26d5ded
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 21, 2024
b37ff0e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Mar 25, 2024
fbe301c
Address feedback.
ehsannas Mar 25, 2024
847a047
Address feedback.
ehsannas Mar 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-firestore'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-firestore:3.18.0'
implementation 'com.google.cloud:google-cloud-firestore:3.19.3'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-firestore" % "3.18.0"
libraryDependencies += "com.google.cloud" % "google-cloud-firestore" % "3.19.3"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -222,7 +222,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-firestore/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-firestore.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-firestore/3.18.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-firestore/3.19.3
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.v1.RunAggregationQueryRequest;
import com.google.firestore.v1.RunAggregationQueryResponse;
import com.google.firestore.v1.RunQueryRequest;
Expand All @@ -40,7 +41,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand All @@ -49,9 +49,9 @@
public class AggregateQuery {
@Nonnull private final Query query;

@Nonnull private List<AggregateField> aggregateFieldList;
@Nonnull private final List<AggregateField> aggregateFieldList;

@Nonnull private Map<String, String> aliasMap;
@Nonnull private final Map<String, String> aliasMap;

AggregateQuery(@Nonnull Query query, @Nonnull List<AggregateField> aggregateFields) {
this.query = query;
Expand All @@ -75,6 +75,26 @@ public ApiFuture<AggregateQuerySnapshot> get() {
return get(null, null);
}

/**
* Plans and optionally executes this query. Returns an ApiFuture that will be resolved with the
* planner information, statistics from the query execution (if any), and the query results (if
* any).
*
* @return An ApiFuture that will be resolved with the planner information, statistics from the
* query execution (if any), and the query results (if any).
*/
@Nonnull
public ApiFuture<ExplainResults<AggregateQuerySnapshot>> explain(ExplainOptions options) {
AggregateQueryExplainResponseDeliverer responseDeliverer =
new AggregateQueryExplainResponseDeliverer(
/* transactionId= */ null,
/* readTime= */ null,
/* startTimeNanos= */ query.rpcContext.getClock().nanoTime(),
/* explainOptions= */ options);
runQuery(responseDeliverer);
return responseDeliverer.getFuture();
}

@Nonnull
ApiFuture<AggregateQuerySnapshot> get(
@Nullable final ByteString transactionId, @Nullable com.google.protobuf.Timestamp readTime) {
Expand All @@ -85,25 +105,34 @@ ApiFuture<AggregateQuerySnapshot> get(
return responseDeliverer.getFuture();
}

private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
private <T> void runQuery(ResponseDeliverer<T> responseDeliverer) {
RunAggregationQueryRequest request =
toProto(responseDeliverer.transactionId, responseDeliverer.readTime);
AggregateQueryResponseObserver responseObserver =
new AggregateQueryResponseObserver(responseDeliverer);
toProto(
responseDeliverer.getTransactionId(),
responseDeliverer.getReadTime(),
responseDeliverer.getExplainOptions());
AggregateQueryResponseObserver<T> responseObserver =
new AggregateQueryResponseObserver<T>(responseDeliverer);
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse> callable =
query.rpcContext.getClient().runAggregationQueryCallable();
query.rpcContext.streamRequest(request, responseObserver, callable);
}

private final class AggregateQueryResponseDeliverer {
@Nonnull
private Map<String, Value> convertServerAggregateFieldsMapToClientAggregateFieldsMap(
@Nonnull Map<String, Value> data) {
ImmutableMap.Builder<String, Value> builder = ImmutableMap.builder();
data.forEach((serverAlias, value) -> builder.put(aliasMap.get(serverAlias), value));
return builder.build();
}

@Nullable private final ByteString transactionId;
@Nullable private final com.google.protobuf.Timestamp readTime;
private abstract static class ResponseDeliverer<T> {
private final @Nullable ByteString transactionId;
private final @Nullable com.google.protobuf.Timestamp readTime;
private final long startTimeNanos;
private final SettableApiFuture<AggregateQuerySnapshot> future = SettableApiFuture.create();
private final AtomicBoolean isFutureCompleted = new AtomicBoolean(false);
private final SettableApiFuture<T> future = SettableApiFuture.create();

AggregateQueryResponseDeliverer(
ResponseDeliverer(
@Nullable ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime,
long startTimeNanos) {
Expand All @@ -112,52 +141,148 @@ private final class AggregateQueryResponseDeliverer {
this.startTimeNanos = startTimeNanos;
}

ApiFuture<AggregateQuerySnapshot> getFuture() {
@Nullable
ByteString getTransactionId() {
return transactionId;
}

@Nullable
com.google.protobuf.Timestamp getReadTime() {
return readTime;
}

long getStartTimeNanos() {
return startTimeNanos;
}

@Nullable
ExplainOptions getExplainOptions() {
return null;
}

ApiFuture<T> getFuture() {
return future;
}

void deliverResult(@Nonnull Map<String, Value> data, Timestamp readTime) {
if (isFutureCompleted.compareAndSet(false, true)) {
Map<String, Value> mappedData = new HashMap<>();
data.forEach((serverAlias, value) -> mappedData.put(aliasMap.get(serverAlias), value));
future.set(new AggregateQuerySnapshot(AggregateQuery.this, readTime, mappedData));
}
protected void setFuture(T value) {
future.set(value);
}

void deliverError(Throwable throwable) {
if (isFutureCompleted.compareAndSet(false, true)) {
future.setException(throwable);
future.setException(throwable);
}

abstract void deliverResult(
@Nullable Map<String, Value> serverData,
Timestamp readTime,
@Nullable ExplainMetrics metrics);
}

private class AggregateQueryResponseDeliverer extends ResponseDeliverer<AggregateQuerySnapshot> {
AggregateQueryResponseDeliverer(
@Nullable ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime,
long startTimeNanos) {
super(transactionId, readTime, startTimeNanos);
}

@Override
void deliverResult(
@Nullable Map<String, Value> serverData,
Timestamp readTime,
@Nullable ExplainMetrics metrics) {
if (serverData == null) {
deliverError(new RuntimeException("Did not receive any aggregate query results."));
return;
}
setFuture(
new AggregateQuerySnapshot(
AggregateQuery.this,
readTime,
convertServerAggregateFieldsMapToClientAggregateFieldsMap(serverData)));
}
}

private final class AggregateQueryResponseObserver
implements ResponseObserver<RunAggregationQueryResponse> {
private final class AggregateQueryExplainResponseDeliverer
extends ResponseDeliverer<ExplainResults<AggregateQuerySnapshot>> {
private final @Nullable ExplainOptions explainOptions;

private final AggregateQueryResponseDeliverer responseDeliverer;
private StreamController streamController;
AggregateQueryExplainResponseDeliverer(
@Nullable ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime,
long startTimeNanos,
@Nullable ExplainOptions explainOptions) {
super(transactionId, readTime, startTimeNanos);
this.explainOptions = explainOptions;
}

AggregateQueryResponseObserver(AggregateQueryResponseDeliverer responseDeliverer) {
this.responseDeliverer = responseDeliverer;
@Override
@Nullable
ExplainOptions getExplainOptions() {
return explainOptions;
}

@Override
public void onStart(StreamController streamController) {
this.streamController = streamController;
void deliverResult(
@Nullable Map<String, Value> serverData,
Timestamp readTime,
@Nullable ExplainMetrics metrics) {
// The server is required to provide ExplainMetrics for explain queries.
if (metrics == null) {
deliverError(new RuntimeException("Did not receive any metrics for explain query."));
return;
}
AggregateQuerySnapshot snapshot =
serverData == null
? null
: new AggregateQuerySnapshot(
AggregateQuery.this,
readTime,
convertServerAggregateFieldsMapToClientAggregateFieldsMap(serverData));
setFuture(new ExplainResults<>(metrics, snapshot));
}
}

private final class AggregateQueryResponseObserver<T>
implements ResponseObserver<RunAggregationQueryResponse> {
private final ResponseDeliverer<T> responseDeliverer;
private Timestamp readTime = Timestamp.MAX_VALUE;
@Nullable private Map<String, Value> aggregateFieldsMap = null;
@Nullable private ExplainMetrics metrics = null;

AggregateQueryResponseObserver(ResponseDeliverer<T> responseDeliverer) {
this.responseDeliverer = responseDeliverer;
}

private boolean isExplainQuery() {
return this.responseDeliverer.getExplainOptions() != null;
}

@Override
public void onStart(StreamController streamController) {}

@Override
public void onResponse(RunAggregationQueryResponse response) {
// Close the stream to avoid it dangling, since we're not expecting any more responses.
streamController.cancel();
tom-andersen marked this conversation as resolved.
Show resolved Hide resolved
if (response.hasReadTime()) {
readTime = Timestamp.fromProto(response.getReadTime());
}

if (response.hasResult()) {
aggregateFieldsMap = response.getResult().getAggregateFieldsMap();
}

// Extract the aggregations and read time from the RunAggregationQueryResponse.
Timestamp readTime = Timestamp.fromProto(response.getReadTime());
if (response.hasExplainMetrics()) {
metrics = new ExplainMetrics(response.getExplainMetrics());
}

// Deliver the result; even though the `RunAggregationQuery` RPC is a "streaming" RPC, meaning
// that `onResponse()` can be called multiple times, it _should_ only be called once. But even
// if it is called more than once, `responseDeliverer` will drop superfluous results.
responseDeliverer.deliverResult(response.getResult().getAggregateFieldsMap(), readTime);
if (!isExplainQuery()) {
// Deliver the result; even though the `RunAggregationQuery` RPC is a "streaming" RPC,
// meaning that `onResponse()` can be called multiple times, it _should_ only be called
// once for non-explain queries. But even if it is called more than once,
// `responseDeliverer` will drop superfluous results. For explain queries, there will
// be more than one response, and the last response will contain the metrics.
onComplete();
}
tom-andersen marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand All @@ -170,17 +295,26 @@ public void onError(Throwable throwable) {
}

private boolean shouldRetry(Throwable throwable) {
// Do not retry EXPLAIN requests because it'd be executing
// multiple queries. This means stats would have to be aggregated,
// and that may not even make sense for many statistics.
if (isExplainQuery()) {
return false;
}

Set<StatusCode.Code> retryableCodes =
FirestoreSettings.newBuilder().runAggregationQuerySettings().getRetryableCodes();
return query.shouldRetryQuery(
throwable,
responseDeliverer.transactionId,
responseDeliverer.startTimeNanos,
responseDeliverer.getTransactionId(),
responseDeliverer.getStartTimeNanos(),
retryableCodes);
}

@Override
public void onComplete() {}
public void onComplete() {
responseDeliverer.deliverResult(aggregateFieldsMap, readTime, metrics);
}
}

/**
Expand All @@ -191,13 +325,14 @@ public void onComplete() {}
*/
@Nonnull
public RunAggregationQueryRequest toProto() {
return toProto(null, null);
return toProto(/* transactionId= */ null, /* readTime= */ null, /* explainOptions= */ null);
}

@Nonnull
RunAggregationQueryRequest toProto(
@Nullable final ByteString transactionId,
@Nullable final com.google.protobuf.Timestamp readTime) {
@Nullable final com.google.protobuf.Timestamp readTime,
@Nullable ExplainOptions explainOptions) {
RunQueryRequest runQueryRequest = query.toProto();

RunAggregationQueryRequest.Builder request = RunAggregationQueryRequest.newBuilder();
Expand All @@ -209,6 +344,10 @@ RunAggregationQueryRequest toProto(
request.setReadTime(readTime);
}

if (explainOptions != null) {
request.setExplainOptions(explainOptions.toProto());
}

StructuredAggregationQuery.Builder structuredAggregationQuery =
request.getStructuredAggregationQueryBuilder();
structuredAggregationQuery.setStructuredQuery(runQueryRequest.getStructuredQuery());
Expand Down
Loading
Loading