Allowing customers to wrap CosmosAsyncContainer #43724

Open
wants to merge 4 commits into
base: main

FabianMeiswinkel
Member

@FabianMeiswinkel FabianMeiswinkel commented Jan 7, 2025

Description

This PR adds a protected ctor for CosmosAsyncContainer to allow extending the container - for example to add custom diagnostics, error handling, retry policies or additional validation, like disallowing certain query functionality.
CosmosAsyncContainer already has several non-final methods, but there was no public/protected ctor. This was intentional: the business logic in CosmosAsyncContainer does not really allow for artificial extensions. For example, it would not make much sense to inject a different store than Cosmos DB, because the logic for metrics, diagnostics and even the PagedIterator returned from queries is intentionally tied to Cosmos DB.
There have been a few customer requests, though, for limited extensibility - for testing and mocking, but also to add shared custom diagnostics and validations that prevent application teams from violating best practices (preventing cross-partition queries, disabling scans in queries etc.).
So this PR opens up the extensibility of CosmosAsyncContainer a bit more. Since the only protected ctor still requires a CosmosAsyncContainer for the actual implementation, extensions can only wrap the original container implementation. This PR is intentionally not trying to extend extensibility to support generic stores etc.

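There are two options for injecting custom containers: wrapping explicitly, or letting the client inject the custom container automatically (shown below for both the sync and the async client).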

  1. Wrap explicitly.
// The protected ctor takes a CosmosAsyncContainer, so the wrapped container comes from the async client.
CosmosAsyncClient clientWithoutInterceptor = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .userAgentSuffix("noInterceptor")
            .buildAsyncClient();

CosmosAsyncContainer normalContainer = clientWithoutInterceptor
    .getDatabase("TestDB")
    .getContainer("TestContainer");

CustomContainer customContainer = new CustomContainer(normalContainer);

private static class CustomContainer extends CosmosAsyncContainer {
    protected CustomContainer(CosmosAsyncContainer toBeWrappedContainer) {
        super(toBeWrappedContainer);
    }
}
  2. Automatically inject a custom container for all containers (sync)
CosmosClient clientWithInterceptor = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .userAgentSuffix("withInterceptor")
            .containerCreationInterceptor(originalContainer -> new CustomContainer(originalContainer))
            .buildClient();

CosmosContainer customContainer = clientWithInterceptor 
    .getDatabase("TestDB")
    .getContainer("TestContainer");

private static class CustomContainer extends CosmosAsyncContainer {
    protected CustomContainer(CosmosAsyncContainer toBeWrappedContainer) {
        super(toBeWrappedContainer);
    }
}
  3. Automatically inject a custom container for all containers (async)
CosmosAsyncClient asyncClientWithInterceptor = new CosmosClientBuilder()
            .endpoint(TestConfigurations.HOST)
            .key(TestConfigurations.MASTER_KEY)
            .userAgentSuffix("withInterceptor")
            .containerCreationInterceptor(originalContainer -> new CustomContainer(originalContainer))
            .buildAsyncClient();

CustomContainer customContainer = (CustomContainer)asyncClientWithInterceptor 
    .getDatabase("TestDB")
    .getContainer("TestContainer");

private static class CustomContainer extends CosmosAsyncContainer {
    protected CustomContainer(CosmosAsyncContainer toBeWrappedContainer) {
        super(toBeWrappedContainer);
    }
}
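
To make the "wrap only" idea concrete, here is a minimal, SDK-free sketch of the pattern. `SketchContainer` and `DisallowQueriesSketch` are hypothetical stand-ins, not classes from azure-cosmos; the stand-in has just two non-final methods and a protected constructor that requires an existing container, which is the shape described above. It is an illustration under those assumptions, not the actual implementation in this PR.

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for CosmosAsyncContainer: NOT the SDK class.
class SketchContainer {
    // null for the "real" container, non-null when this instance wraps another one
    private final SketchContainer delegate;

    SketchContainer() {
        this.delegate = null;
    }

    // Mirrors the idea of the protected ctor: a subclass must hand in an existing container.
    protected SketchContainer(SketchContainer toBeWrapped) {
        this.delegate = Objects.requireNonNull(toBeWrapped, "toBeWrapped");
    }

    public String readItem(String id) {
        return delegate != null ? delegate.readItem(id) : "item-" + id;
    }

    public List<String> queryItems(String query) {
        return delegate != null
            ? delegate.queryItems(query)
            : Collections.singletonList("result of " + query);
    }
}

// Hypothetical wrapper that forwards point reads but rejects every query.
class DisallowQueriesSketch extends SketchContainer {
    DisallowQueriesSketch(SketchContainer toBeWrapped) {
        super(toBeWrapped);
    }

    @Override
    public List<String> queryItems(String query) {
        throw new UnsupportedOperationException("Queries are disabled for this container");
    }
}
```

With this shape, `new DisallowQueriesSketch(new SketchContainer()).readItem("42")` still reaches the wrapped container, while `queryItems(...)` fails fast in the wrapper. Methods the subclass does not override keep the wrapped container's behavior.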

All SDK Contribution checklist:

  • The pull request does not introduce [breaking changes]
  • CHANGELOG is updated for new features, bug fixes or other significant changes.
  • I have read the contribution guidelines.

General Guidelines and Best Practices

  • Title of the pull request is clear and informative.
  • There are a small number of commits, each of which have an informative message. This means that previously merged commits do not appear in the history of the PR. For more information on cleaning up the commits in your PR, see this page.

Testing Guidelines

  • Pull request includes test coverage for the included changes.

@FabianMeiswinkel FabianMeiswinkel marked this pull request as draft January 7, 2025 19:06
@FabianMeiswinkel FabianMeiswinkel changed the title Allowing customers to wrap CosmosAsyncContainer DRAFT: Allowing customers to wrap CosmosAsyncContainer Jan 7, 2025
@github-actions github-actions bot added the Cosmos label Jan 7, 2025
@azure-sdk
Collaborator

API change check

APIView has identified API level changes in this PR and created following API reviews.

com.azure:azure-cosmos

@FabianMeiswinkel FabianMeiswinkel marked this pull request as ready for review January 8, 2025 00:13
@FabianMeiswinkel FabianMeiswinkel changed the title DRAFT: Allowing customers to wrap CosmosAsyncContainer Allowing customers to wrap CosmosAsyncContainer Jan 8, 2025
@Gueorgi

Gueorgi commented Jan 8, 2025

This is a good PR to open up the SDK a lot more in an easy to understand way.

  1. It still leaves no option to extend some methods like readAllItems (package private - no modifier) unless one makes their package appear to be part of the SDK one.

  2. One can't provide a different return object in some cases, like CosmosPagedFlux for example, since it is not extensible (classes that are either final or non-public etc).

  3. If one needs to use this for a wrapper, but the wrapper's clients also try to use this functionality, how can that work? The solution looks to be a util that builds a custom wrapper container-create callback; that util/callback also has to accept the "client custom" wrapper callbacks as a param and invoke them. This becomes a bit more complex to support (a custom wrapper calling another custom wrapper callback that has been passed in etc).

@FabianMeiswinkel
Member Author

Regarding #1 - the public API for readAllItems is meant to read all documents of a logical partition, and this one can also be extended:

/**
 * Reads all the items of a logical partition
 * <!-- src_embed com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
 * <pre>
 * cosmosAsyncContainer
 *     .readAllItems&#40;new PartitionKey&#40;partitionKey&#41;, Passenger.class&#41;
 *     .byPage&#40;100&#41;
 *     .flatMap&#40;passengerFeedResponse -&gt; &#123;
 *         for &#40;Passenger passenger : passengerFeedResponse.getResults&#40;&#41;&#41; &#123;
 *             System.out.println&#40;passenger&#41;;
 *         &#125;
 *         return Flux.empty&#40;&#41;;
 *     &#125;&#41;
 *     .subscribe&#40;&#41;;
 * </pre>
 * <!-- end com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
 * <p>
 * After subscription the operation will be performed. The {@link CosmosPagedFlux} will
 * contain one or several feed responses of the read Cosmos items. In case of
 * failure the {@link CosmosPagedFlux} will error.
 *
 * @param <T> the type parameter.
 * @param partitionKey the partition key value of the documents that need to be read
 * @param classType the class type.
 * @return a {@link CosmosPagedFlux} containing one or several feed response pages
 * of the read Cosmos items or an error.
 */
public <T> CosmosPagedFlux<T> readAllItems(
    PartitionKey partitionKey,
    Class<T> classType) {
    CosmosQueryRequestOptions queryRequestOptions = new CosmosQueryRequestOptions();
    queryRequestOptions.setPartitionKey(partitionKey);
    return this.readAllItems(partitionKey, queryRequestOptions, classType);
}

/**
 * Reads all the items of a logical partition
 * <!-- src_embed com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
 * <pre>
 * cosmosAsyncContainer
 *     .readAllItems&#40;new PartitionKey&#40;partitionKey&#41;, Passenger.class&#41;
 *     .byPage&#40;100&#41;
 *     .flatMap&#40;passengerFeedResponse -&gt; &#123;
 *         for &#40;Passenger passenger : passengerFeedResponse.getResults&#40;&#41;&#41; &#123;
 *             System.out.println&#40;passenger&#41;;
 *         &#125;
 *         return Flux.empty&#40;&#41;;
 *     &#125;&#41;
 *     .subscribe&#40;&#41;;
 * </pre>
 * <!-- end com.azure.cosmos.CosmosAsyncContainer.readAllItems -->
 * After subscription the operation will be performed. The {@link CosmosPagedFlux} will
 * contain one or several feed responses of the read Cosmos items. In case of
 * failure the {@link CosmosPagedFlux} will error.
 *
 * @param <T> the type parameter.
 * @param partitionKey the partition key value of the documents that need to be read
 * @param options the feed options (Optional).
 * @param classType the class type.
 * @return a {@link CosmosPagedFlux} containing one or several feed response pages
 * of the read Cosmos items or an error.
 */
public <T> CosmosPagedFlux<T> readAllItems(
    PartitionKey partitionKey,
    CosmosQueryRequestOptions options,
    Class<T> classType) {
    CosmosAsyncClient client = this.getDatabase().getClient();
    final CosmosQueryRequestOptions requestOptions = options == null ? new CosmosQueryRequestOptions() : options;
    requestOptions.setPartitionKey(partitionKey);
    CosmosQueryRequestOptionsBase<?> cosmosQueryRequestOptionsImpl = queryOptionsAccessor.getImpl(requestOptions);
    applyPolicies(OperationType.Query, ResourceType.Document, cosmosQueryRequestOptionsImpl, this.readManyItemsSpanName);
    return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> {
        QueryFeedOperationState state = new QueryFeedOperationState(
            client,
            this.readAllItemsOfLogicalPartitionSpanName,
            database.getId(),
            this.getId(),
            ResourceType.Document,
            OperationType.ReadFeed,
            queryOptionsAccessor.getQueryNameOrDefault(requestOptions, this.readAllItemsOfLogicalPartitionSpanName),
            requestOptions,
            pagedFluxOptions
        );
        pagedFluxOptions.setFeedOperationState(state);
        return getDatabase()
            .getDocClientWrapper()
            .readAllDocuments(getLink(), partitionKey, state, classType)
            .map(response -> prepareFeedResponse(response, false));
    });
}
I assume you are referring to the internal readAllItems without PK. It is functionally equivalent to running the query "SELECT * from c", so that API is internal-only in the "normal" container as well and as such cannot be extended. queryItems can be extended, though, so I don't think this is really an issue.

Regarding #2 - ACK - this is true, and it is an (intentional) limitation of CosmosPagedFlux. Let's go through the concrete use cases to see whether/how we can unblock them by allowing public CosmosPagedFlux overloads or factory methods.

Regarding #3 - Correct - pipelining would be needed in this case - but I don't quite see why that is a problem.
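
A rough sketch of what such pipelining could look like, assuming the interceptor has the shape of a function from container to container (as the lambda in the PR description suggests). `PipelineContainer` and `InterceptorPipeline` are hypothetical stand-ins, not SDK types; the only real API used is `java.util.function.Function`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a container; it only remembers what it wraps.
class PipelineContainer {
    private final String name;
    private final PipelineContainer inner; // null for the innermost (original) container

    PipelineContainer(String name, PipelineContainer inner) {
        this.name = name;
        this.inner = inner;
    }

    // Names of all layers, outermost first.
    List<String> layers() {
        List<String> result = new ArrayList<>();
        for (PipelineContainer c = this; c != null; c = c.inner) {
            result.add(c.name);
        }
        return result;
    }
}

final class InterceptorPipeline {
    private InterceptorPipeline() {
    }

    // Chains interceptors so that the first one wraps the original container
    // and each later one wraps the result of the previous one.
    static Function<PipelineContainer, PipelineContainer> chain(
        List<Function<PipelineContainer, PipelineContainer>> interceptors) {

        Function<PipelineContainer, PipelineContainer> combined = Function.identity();
        for (Function<PipelineContainer, PipelineContainer> next : interceptors) {
            combined = combined.andThen(next);
        }
        return combined;
    }
}
```

A library that wants to wrap containers itself and still honor an application-supplied callback would pass both to `chain(...)` and register the combined function as the single interceptor. The ordering (library layer innermost, application layer outermost) is a design choice of this sketch.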

@Gueorgi

Gueorgi commented Jan 10, 2025

On #1 I agree - not an issue.
On #2 the use case is providing mocks or cached data etc. Not being able to create a CosmosPagedFlux means one can't return anything but null or what was returned by the underlying SDK. So this prevents the wrapping from being used for mocks and other purposes (e.g. fetching from a different Cosmos client, cold storage etc).
On #3 - in other wrappers like JDBC one can get a wrapper (not knowing it is a wrapper) and create a new one on top of it as well, and this can be mostly transparent to the end user/consumer of the wrapper. This happens with JDBC connection pooling, DataSources etc. Here, using a wrapper is not transparent: the client has to willingly create the wrapper via some utility, and this wrapper has no chance of being used in a 2nd wrapper unless that 2nd wrapper directly calls the first wrapper's callback (not transparent).
