Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new stages. #1908

Merged
merged 7 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@
import com.google.cloud.firestore.pipeline.stages.Limit;
import com.google.cloud.firestore.pipeline.stages.Offset;
import com.google.cloud.firestore.pipeline.stages.RemoveFields;
import com.google.cloud.firestore.pipeline.stages.Replace;
import com.google.cloud.firestore.pipeline.stages.Sample;
import com.google.cloud.firestore.pipeline.stages.SampleOptions;
import com.google.cloud.firestore.pipeline.stages.Select;
import com.google.cloud.firestore.pipeline.stages.Sort;
import com.google.cloud.firestore.pipeline.stages.Stage;
import com.google.cloud.firestore.pipeline.stages.StageUtils;
import com.google.cloud.firestore.pipeline.stages.Union;
import com.google.cloud.firestore.pipeline.stages.Unnest;
import com.google.cloud.firestore.pipeline.stages.UnnestOptions;
import com.google.cloud.firestore.pipeline.stages.Where;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
Expand All @@ -54,6 +60,7 @@
import com.google.firestore.v1.ExecutePipelineRequest;
import com.google.firestore.v1.ExecutePipelineResponse;
import com.google.firestore.v1.StructuredPipeline;
import com.google.firestore.v1.Value;
import com.google.protobuf.ByteString;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracing;
Expand Down Expand Up @@ -586,6 +593,297 @@ public Pipeline sort(Ordering... orders) {
return append(new Sort(orders));
}

/**
* Fully overwrites all fields in a document with those coming from a nested map.
*
* <p>This stage allows you to emit a map value as a document. Each key of the map becomes a field
* on the document that contains the corresponding value.
*
* <p>Example:
*
* <pre>{@code
* // Input.
* // {
* // "name": "John Doe Jr.",
* // "parents": {
* // "father": "John Doe Sr.",
* // "mother": "Jane Doe"
* // }
*
* // Emit parents as document.
* firestore.pipeline().collection("people").replace("parents");
*
* // Output
* // {
* // "father": "John Doe Sr.",
* // "mother": "Jane Doe"
* // }
* }</pre>
*
* @param fieldName The name of the field containing the nested map.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline replace(String fieldName) {
return replace(Field.of(fieldName));
}

/**
* Fully overwrites all fields in a document with those coming from a nested map.
*
* <p>This stage allows you to emit a map value as a document. Each key of the map becomes a field
* on the document that contains the corresponding value.
*
* <p>Example:
*
* <pre>{@code
* // Input.
* // {
* // "name": "John Doe Jr.",
* // "parents": {
* // "father": "John Doe Sr.",
* // "mother": "Jane Doe"
* // }
*
* // Emit parents as document.
* firestore.pipeline().collection("people").replace(Field.of("parents"));
*
* // Output
* // {
* // "father": "John Doe Sr.",
* // "mother": "Jane Doe"
* // }
* }</pre>
*
* @param field The {@link Selectable} field containing the nested map.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline replace(Selectable field) {
return append(new Replace(field));
}

/**
* Performs a pseudo-random sampling of the documents from the previous stage.
*
* <p>This stage will filter documents pseudo-randomly. The 'limit' parameter specifies the number
* of documents to emit from this stage, but if there are fewer documents from previous stage than
* the 'limit' parameter, then no filtering will occur and all documents will pass through.
*
* <p>Example:
*
* <pre>{@code
* // Sample 10 books, if available.
* firestore.pipeline().collection("books")
* .sample(10);
* }</pre>
*
* @param limit The number of documents to emit, if possible.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline sample(int limit) {
SampleOptions options = SampleOptions.docLimit(limit);
return sample(options);
}

/**
* Performs a pseudo-random sampling of the documents from the previous stage.
*
* <p>This stage will filter documents pseudo-randomly. The 'options' parameter specifies how
* sampling will be performed. See {@code SampleOptions} for more information.
*
* <p>Examples:
*
* <pre>{@code
* // Sample 10 books, if available.
* firestore.pipeline().collection("books")
* .sample(SampleOptions.docLimit(10));
*
* // Sample 50% of books.
* firestore.pipeline().collection("books")
* .sample(SampleOptions.percentage(0.5));
* }</pre>
*
* @param options The {@code SampleOptions} specifies how sampling is performed.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline sample(SampleOptions options) {
return append(new Sample(options));
}

/**
* Performs union of all documents from two pipelines, including duplicates.
*
* <p>This stage will pass through documents from previous stage, and also pass through documents
* from previous stage of the `other` {@code Pipeline} given in parameter. The order of documents
* emitted from this stage is undefined.
*
* <p>Example:
*
* <pre>{@code
* // Emit documents from books collection and magazines collection.
* firestore.pipeline().collection("books")
* .union(firestore.pipeline().collection("magazines"));
* }</pre>
*
* @param other The other {@code Pipeline} that is part of union.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline union(Pipeline other) {
return append(new Union(other));
}

/**
* Produces a document for each element in array found in previous stage document.
*
* <p>For each previous stage document, this stage will emit zero or more augmented documents. The
* input array found in the previous stage document field specified by the `fieldName` parameter,
* will for each input array element produce an augmented document. The input array element will
* augment the previous stage document by replacing the field specified by `fieldName` parameter
* with the element value.
*
* <p>In other words, the field containing the input array will be removed from the augmented
* document and replaced by the corresponding array element.
*
* <p>Example:
*
* <pre>{@code
* // Input:
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space", "adventure" ], ... }
*
* // Emit a book document for each tag of the book.
* firestore.pipeline().collection("books")
* .unnest("tags");
*
* // Output:
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "comedy", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "space", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": "adventure", ... }
* }</pre>
*
* @param fieldName The name of the field containing the array.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline unnest(String fieldName) {
// return unnest(Field.of(fieldName));
return append(new Unnest(Field.of(fieldName)));
}

// /**
// * Produces a document for each element in array found in previous stage document.
// *
// * <p>For each previous stage document, this stage will emit zero or more augmented documents.
// * The input array found in the specified by {@code Selectable} expression parameter, will for
// * each input array element produce an augmented document. The input array element will augment
// * the previous stage document by assigning the {@code Selectable} alias the element value.
// *
// * <p>Example:
// *
// * <pre>{@code
// * // Input:
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space",
// "adventure" ], ... }
// *
// * // Emit a book document for each tag of the book.
// * firestore.pipeline().collection("books")
// * .unnest(Field.of("tags").as("tag"));
// *
// * // Output:
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "comedy", "tags": [ "comedy",
// "space", "adventure" ], ... }
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "space", "tags": [ "comedy",
// "space", "adventure" ], ... }
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tag": "adventure", "tags": [
// "comedy", "space", "adventure" ], ... }
// * }</pre>
// *
// * @param field The expression that evaluates to the input array.
// * @return A new {@code Pipeline} object with this stage appended to the stage list.
// */
// @BetaApi
// public Pipeline unnest(Selectable field) {
// return append(new Unnest(field));
// }

/**
* Produces a document for each element in array found in previous stage document.
*
* <p>For each previous stage document, this stage will emit zero or more augmented documents. The
* input array found in the previous stage document field specified by the `fieldName` parameter,
* will for each input array element produce an augmented document. The input array element will
* augment the previous stage document by replacing the field specified by `fieldName` parameter
* with the element value.
*
* <p>In other words, the field containing the input array will be removed from the augmented
* document and replaced by the corresponding array element.
*
* <p>Example:
*
* <pre>{@code
* // Input:
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space", "adventure" ], ... }
*
* // Emit a book document for each tag of the book.
* firestore.pipeline().collection("books")
* .unnest("tags", UnnestOptions.indexField("tagIndex"));
*
* // Output:
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tags": "comedy", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tags": "space", ... }
* // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tags": "adventure", ... }
* }</pre>
*
* @param fieldName The name of the field containing the array.
* @param options The {@code UnnestOptions} options.
* @return A new {@code Pipeline} object with this stage appended to the stage list.
*/
@BetaApi
public Pipeline unnest(String fieldName, UnnestOptions options) {
// return unnest(Field.of(fieldName), options);
return append(new Unnest(Field.of(fieldName), options));
}

// /**
// * Produces a document for each element in array found in previous stage document.
// *
// * <p>For each previous stage document, this stage will emit zero or more augmented documents.
// * The input array found in the specified by {@code Selectable} expression parameter, will for
// * each input array element produce an augmented document. The input array element will augment
// * the previous stage document by assigning the {@code Selectable} alias the element value.
// *
// * <p>Example:
// *
// * <pre>{@code
// * // Input:
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tags": [ "comedy", "space",
// "adventure" ], ... }
// *
// * // Emit a book document for each tag of the book.
// * firestore.pipeline().collection("books")
// * .unnest(Field.of("tags").as("tag"), UnnestOptions.indexField("tagIndex"));
// *
// * // Output:
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 0, "tag": "comedy",
// "tags": [ "comedy", "space", "adventure" ], ... }
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 1, "tag": "space", "tags":
// [ "comedy", "space", "adventure" ], ... }
// * // { "title": "The Hitchhiker's Guide to the Galaxy", "tagIndex": 2, "tag": "adventure",
// "tags": [ "comedy", "space", "adventure" ], ... }
// * }</pre>
// *
// * @param field The expression that evaluates to the input array.
// * @param options The {@code UnnestOptions} options.
// * @return A new {@code Pipeline} object with this stage appended to the stage list.
// */
// @BetaApi
// public Pipeline unnest(Selectable field, UnnestOptions options) {
// return append(new Unnest(field, options));
// }

/**
* Adds a generic stage to the pipeline.
*
Expand Down Expand Up @@ -648,7 +946,7 @@ public Pipeline genericStage(String name, List<Object> params) {
*/
@BetaApi
public ApiFuture<List<PipelineResult>> execute() {
return execute(null, null);
return execute((ByteString) null, (com.google.protobuf.Timestamp) null);
}

/**
Expand Down Expand Up @@ -701,6 +999,22 @@ public void execute(ApiStreamObserver<PipelineResult> observer) {
executeInternal(null, null, observer);
}

// @BetaApi
// public void execute(ApiStreamObserver<PipelineResult> observer, PipelineOptions options) {
// throw new RuntimeException("Not Implemented");
// }
//
// @BetaApi
// public ApiFuture<List<PipelineResult>> explain() {
// throw new RuntimeException("Not Implemented");
// }
//
// @BetaApi
// public void explain(ApiStreamObserver<PipelineResult> observer, PipelineExplainOptions options)
// {
// throw new RuntimeException("Not Implemented");
// }

ApiFuture<List<PipelineResult>> execute(
@Nullable final ByteString transactionId, @Nullable com.google.protobuf.Timestamp readTime) {
SettableApiFuture<List<PipelineResult>> futureResult = SettableApiFuture.create();
Expand Down Expand Up @@ -767,12 +1081,18 @@ public void onError(Throwable t) {
});
}

@InternalApi
private com.google.firestore.v1.Pipeline toProto() {
return com.google.firestore.v1.Pipeline.newBuilder()
.addAllStages(stages.transform(StageUtils::toStageProto))
.build();
}

@InternalApi
public com.google.firestore.v1.Value toProtoValue() {
return Value.newBuilder().setPipelineValue(toProto()).build();
}

private void pipelineInternalStream(
ExecutePipelineRequest request, PipelineResultObserver resultObserver) {
ResponseObserver<ExecutePipelineResponse> observer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.google.cloud.firestore.pipeline.expressions.Expr;
import com.google.cloud.firestore.pipeline.expressions.ExprWithAlias;
import com.google.cloud.firestore.pipeline.expressions.Field;
import com.google.cloud.firestore.pipeline.expressions.Fields;
import com.google.cloud.firestore.pipeline.expressions.FilterCondition;
import com.google.cloud.firestore.pipeline.expressions.Selectable;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -173,11 +172,6 @@ public static Map<String, Expr> selectablesToMap(Selectable... selectables) {
} else if (proj instanceof AccumulatorTarget) {
AccumulatorTarget aggregatorProj = (AccumulatorTarget) proj;
projMap.put(aggregatorProj.getFieldName(), aggregatorProj.getAccumulator());
} else if (proj instanceof Fields) {
Fields fieldsProj = (Fields) proj;
if (fieldsProj.getFields() != null) {
fieldsProj.getFields().forEach(f -> projMap.put(f.getPath().getEncodedPath(), f));
}
} else if (proj instanceof ExprWithAlias) {
ExprWithAlias exprWithAlias = (ExprWithAlias) proj;
projMap.put(exprWithAlias.getAlias(), exprWithAlias.getExpr());
Expand Down
Loading
Loading