Skip to content

Commit

Permalink
Refactor Flow End in SDKs (#3171)
Browse files Browse the repository at this point in the history
- **Refactor**
- Improved the efficiency of the `end` method by adding conditional
checks to prevent unnecessary processing.
- Cleaned up the codebase by removing redundant lines in the
asynchronous flow management.

---------

Co-authored-by: Hardik Shingala <[email protected]>
  • Loading branch information
hasit and hdkshingala authored Jan 30, 2024
1 parent a135d73 commit f0ca2d7
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 5 deletions.
8 changes: 8 additions & 0 deletions sdks/aperture-csharp/Core/FeatureFlow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ public FeatureFlowEndResponse End()

if (_checkResponse.LimiterDecisions[i].ConcurrencyLimiterInfo != null)
{
if (_checkResponse.LimiterDecisions[i].ConcurrencyLimiterInfo.RequestId == "") {
continue;
}

var inflightRequest = new InflightRequestRef
{
PolicyHash = _checkResponse.LimiterDecisions[i].PolicyHash,
Expand All @@ -277,6 +281,10 @@ public FeatureFlowEndResponse End()

if (_checkResponse.LimiterDecisions[i].ConcurrencySchedulerInfo != null)
{
if (_checkResponse.LimiterDecisions[i].ConcurrencySchedulerInfo.RequestId == "") {
continue;
}

var inflightRequest = new InflightRequestRef
{
PolicyHash = _checkResponse.LimiterDecisions[i].PolicyHash,
Expand Down
8 changes: 6 additions & 2 deletions sdks/aperture-go/sdk/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,9 @@ func (f *flow) End() EndResponse {

for _, decision := range f.checkResponse.GetLimiterDecisions() {
if decision.GetConcurrencyLimiterInfo() != nil {
if decision.GetConcurrencyLimiterInfo().GetRequestId() == "" {
continue
}
inflightRequest := &checkv1.InflightRequestRef{
PolicyName: decision.PolicyName,
PolicyHash: decision.PolicyHash,
Expand All @@ -324,11 +327,13 @@ func (f *flow) End() EndResponse {
if decision.GetConcurrencyLimiterInfo().GetTokensInfo() != nil {
inflightRequest.Tokens = decision.GetConcurrencyLimiterInfo().GetTokensInfo().GetConsumed()
}

inflightRequests = append(inflightRequests, inflightRequest)
}

if decision.GetConcurrencySchedulerInfo() != nil {
if decision.GetConcurrencySchedulerInfo().GetRequestId() == "" {
continue
}
ref := &checkv1.InflightRequestRef{
PolicyName: decision.PolicyName,
PolicyHash: decision.PolicyHash,
Expand All @@ -339,7 +344,6 @@ func (f *flow) End() EndResponse {
if decision.GetConcurrencySchedulerInfo().GetTokensInfo() != nil {
ref.Tokens = decision.GetConcurrencySchedulerInfo().GetTokensInfo().GetConsumed()
}

inflightRequests = append(inflightRequests, ref)
}
}
Expand Down
6 changes: 6 additions & 0 deletions sdks/aperture-go/sdk/httpflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func (f *httpflow) End() EndResponse {

for _, decision := range f.checkResponse.GetCheckResponse().GetLimiterDecisions() {
if decision.GetConcurrencyLimiterInfo() != nil {
if decision.GetConcurrencyLimiterInfo().GetRequestId() == "" {
continue
}
inflightRequest := &checkv1.InflightRequestRef{
PolicyName: decision.PolicyName,
PolicyHash: decision.PolicyHash,
Expand All @@ -140,6 +143,9 @@ func (f *httpflow) End() EndResponse {
}

if decision.GetConcurrencySchedulerInfo() != nil {
if decision.GetConcurrencySchedulerInfo().GetRequestId() == "" {
continue
}
ref := &checkv1.InflightRequestRef{
PolicyName: decision.PolicyName,
PolicyHash: decision.PolicyHash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,10 @@ public EndResponse end() {

for (LimiterDecision decision : this.checkResponse.getLimiterDecisionsList()) {
if (decision.getConcurrencyLimiterInfo() != null) {
if (decision.getConcurrencyLimiterInfo().getRequestId().isEmpty()) {
continue;
}

InflightRequestRef.Builder refBuilder =
InflightRequestRef.newBuilder()
.setPolicyName(decision.getPolicyName())
Expand All @@ -435,6 +439,10 @@ public EndResponse end() {
}

if (decision.getConcurrencySchedulerInfo() != null) {
if (decision.getConcurrencySchedulerInfo().getRequestId().isEmpty()) {
continue;
}

InflightRequestRef.Builder refBuilder =
InflightRequestRef.newBuilder()
.setPolicyName(decision.getPolicyName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ public EndResponse end() {
for (LimiterDecision decision :
this.checkResponse.getCheckResponse().getLimiterDecisionsList()) {
if (decision.getConcurrencyLimiterInfo() != null) {
if (decision.getConcurrencyLimiterInfo().getRequestId().isEmpty()) {
continue;
}

InflightRequestRef.Builder refBuilder =
InflightRequestRef.newBuilder()
.setPolicyName(decision.getPolicyName())
Expand All @@ -198,6 +202,10 @@ public EndResponse end() {
}

if (decision.getConcurrencySchedulerInfo() != null) {
if (decision.getConcurrencySchedulerInfo().getRequestId().isEmpty()) {
continue;
}

InflightRequestRef.Builder refBuilder =
InflightRequestRef.newBuilder()
.setPolicyName(decision.getPolicyName())
Expand Down
8 changes: 7 additions & 1 deletion sdks/aperture-js/sdk/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class _Flow implements Flow {
if (
(!this.rampMode && this._checkResponse === null) ||
this._checkResponse?.decisionType ===
_aperture_flowcontrol_check_v1_CheckResponse_DecisionType.DECISION_TYPE_ACCEPTED
_aperture_flowcontrol_check_v1_CheckResponse_DecisionType.DECISION_TYPE_ACCEPTED
) {
return true;
} else {
Expand Down Expand Up @@ -493,6 +493,9 @@ export class _Flow implements Flow {

this._checkResponse.limiterDecisions.forEach((decision) => {
if (decision.concurrencyLimiterInfo) {
if (decision.concurrencyLimiterInfo.requestId == "") {
return;
}
inflightRequestRefs.push({
policyName: decision.policyName,
policyHash: decision.policyHash,
Expand All @@ -503,6 +506,9 @@ export class _Flow implements Flow {
});
}
if (decision.concurrencySchedulerInfo) {
if (decision.concurrencySchedulerInfo.requestId == "") {
return;
}
inflightRequestRefs.push({
policyName: decision.policyName,
policyHash: decision.policyHash,
Expand Down
4 changes: 4 additions & 0 deletions sdks/aperture-py/aperture_sdk/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ def end(self) -> EndResponse:
if self.check_response:
for decision in self.check_response.limiter_decisions:
if decision.WhichOneof("details") == "concurrency_limiter_info":
if decision.concurrency_limiter_info.request_id == "":
continue
ref: InflightRequestRef = InflightRequestRef(
policy_name=decision.policy_name,
policy_hash=decision.policy_hash,
Expand All @@ -148,6 +150,8 @@ def end(self) -> EndResponse:
)
inflight_request_ref.append(ref)
elif decision.WhichOneof("details") == "concurrency_scheduler_info":
if decision.concurrency_scheduler_info.request_id == "":
continue
ref: InflightRequestRef = InflightRequestRef(
policy_name=decision.policy_name,
policy_hash=decision.policy_hash,
Expand Down
2 changes: 0 additions & 2 deletions sdks/aperture-py/aperture_sdk/flow_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ async def end(self) -> EndResponse:
if decision.WhichOneof("details") == "concurrency_limiter_info":
if decision.concurrency_limiter_info.request_id == "":
continue

ref: InflightRequestRef = InflightRequestRef(
policy_name=decision.policy_name,
policy_hash=decision.policy_hash,
Expand All @@ -153,7 +152,6 @@ async def end(self) -> EndResponse:
elif decision.WhichOneof("details") == "concurrency_scheduler_info":
if decision.concurrency_scheduler_info.request_id == "":
continue

ref: InflightRequestRef = InflightRequestRef(
policy_name=decision.policy_name,
policy_hash=decision.policy_hash,
Expand Down

0 comments on commit f0ca2d7

Please sign in to comment.