Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send matched apis, resources, subscription, application using metadata #2756

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions gateway/enforcer/internal/datastore/api_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type APIStore struct {
apis map[string]*requestconfig.API
mu sync.RWMutex
configStore *ConfigStore
cfg *config.Server
cfg *config.Server
}

// NewAPIStore creates a new instance of APIStore.
Expand Down Expand Up @@ -66,7 +66,8 @@ func (s *APIStore) AddAPIs(apis []*api.Api) {
Tier: api.Tier,
DisableAuthentication: api.DisableAuthentications,
DisableScopes: api.DisableScopes,
Resources: make([]requestconfig.Resource, 0),
Resources: make([]*requestconfig.Resource, 0),
ResourceMap: make(map[string]*requestconfig.Resource, 0),
IsMockedAPI: false, // You can add logic to determine if the API is mocked
MutualSSL: api.MutualSSL,
TransportSecurity: api.TransportSecurity,
Expand Down Expand Up @@ -98,7 +99,8 @@ func (s *APIStore) AddAPIs(apis []*api.Api) {
}
return endpointSecurity
}())
customAPI.Resources = append(customAPI.Resources, resource)
customAPI.Resources = append(customAPI.Resources, &resource)
customAPI.ResourceMap[resource.GetResourceIdentifier()] = &resource
}
}
s.cfg.Logger.Info(fmt.Sprintf("Adding API: %+v", customAPI.BackendJwtConfiguration))
Expand Down Expand Up @@ -209,8 +211,8 @@ func convertBackendJWTTokenInfoToJWTConfig(info *api.BackendJWTTokenInfo, cfg *c
Encoding: info.Encoding,
TokenIssuerDtoMap: make(map[string]dto.TokenIssuer), // Populate if required
JwtExcludedClaims: make(map[string]bool), // Populate if required
PublicCert: publicCert, // Add conversion logic if needed
PrivateKey: privateKey, // Add conversion logic if needed
PublicCert: publicCert, // Add conversion logic if needed
PrivateKey: privateKey, // Add conversion logic if needed
TTL: int64(info.TokenTTL), // Convert int32 to int64
CustomClaims: customClaims,
}
Expand Down
21 changes: 16 additions & 5 deletions gateway/enforcer/internal/datastore/subs_app_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ import (
// - mu: A read-write mutex to ensure thread-safe access to the data store.
// - commonControllerRestBaseUrl: The base URL for the common controller REST API.
type SubscriptionApplicationDataStore struct {
applications map[string]map[string]*subscription_model.Application
applicationMappings map[string]map[string]map[string]*subscription_model.ApplicationMapping
applicationKeyMappings map[string]map[string]*subscription_model.ApplicationKeyMapping
subscriptions map[string]map[string]*subscription_model.Subscription
applications map[string]map[string]*subscription_model.Application // OrganozationID -> ApplicationUUID -> Application
applicationMappings map[string]map[string]map[string]*subscription_model.ApplicationMapping // OrganizationID -> ApplicationRef -> ApplicationMappingUUID -> ApplicationMapping
applicationKeyMappings map[string]map[string]*subscription_model.ApplicationKeyMapping // OrganizationID -> ApplicationKeyMappingCacheKey -> ApplicationKeyMapping
subscriptions map[string]map[string]*subscription_model.Subscription // OrganizationID -> SubscriptionUUID -> Subscription
mu sync.RWMutex
commonControllerRestBaseURL string
}
Expand Down Expand Up @@ -177,6 +177,18 @@ func (ds *SubscriptionApplicationDataStore) GetSubscriptions(org string, subscri
return nil
}

// GetSubscription Get an Subscription by UUID
func (ds *SubscriptionApplicationDataStore) GetSubscription(org string, subscriptionID string) *subscription_model.Subscription {
ds.mu.RLock()
defer ds.mu.RUnlock()
if _, exists := ds.subscriptions[org]; exists {
if _, exists := ds.subscriptions[org][subscriptionID]; exists {
return ds.subscriptions[org][subscriptionID]
}
}
return nil
}

// GetApplicationKeyMapping Get an ApplicationKeyMapping by UUID
func (ds *SubscriptionApplicationDataStore) GetApplicationKeyMapping(org string, appKeyMapKey string) *subscription_model.ApplicationKeyMapping {
ds.mu.RLock()
Expand All @@ -201,7 +213,6 @@ func (ds *SubscriptionApplicationDataStore) GetApplication(org string, appID str
return nil
}


// // UpdateApplication Update an Application
// func (ds *SubscriptionApplicationDataStore) UpdateApplication(app *subscription_model.Application) error {
// ds.mu.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ package dto

// ExternalProcessingEnvoyMetadata represents the metadata extracted from the external processing request.
type ExternalProcessingEnvoyMetadata struct {
JwtAuthenticationData *JwtAuthenticationData `json:"jwtAuthenticationData"`
JwtAuthenticationData *JwtAuthenticationData `json:"jwtAuthenticationData"`
MatchedAPIIdentifier string `json:"matchedAPIIdentifier"`
MatchedResourceIdentifier string `json:"matchedResourceIdentifier"`
MatchedSubscriptionIdentifier string `json:"matchedSubscriptionIdentifier"`
MatchedApplicationIdentifier string `json:"matchedApplicationIdentifier"`
}

// JwtAuthenticationData represents the JWT authentication data.
Expand Down
112 changes: 87 additions & 25 deletions gateway/enforcer/internal/extproc/ext_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ const (
promptTokenIDMetadataKey string = "aitoken:prompttokenid"
completionTokenIDMetadataKey string = "aitoken:completiontokenid"
totalTokenIDMetadataKey string = "aitoken:totaltokenid"
matchedAPIMetadataKey string = "request:matchedapi"
matchedResourceMetadataKey string = "request:matchedresource"
matchedSubscriptionMetadataKey string = "request:matchedsubscription"
matchedApplicationMetadataKey string = "request:matchedapplication"


modelMetadataKey string = "aitoken:model"
)
Expand Down Expand Up @@ -165,7 +170,7 @@ func (s *ExternalProcessingServer) Process(srv envoy_service_proc_v3.ExternalPro
switch v := req.Request.(type) {
case *envoy_service_proc_v3.ProcessingRequest_RequestHeaders:
requestConfigHolder := &requestconfig.Holder{}
attributes, err := extractExternalProcessingAttributes(req.GetAttributes())
attributes, err := extractExternalProcessingXDSRouteMetadataAttributes(req.GetAttributes())
if err != nil {
s.log.Error(err, "failed to extract context attributes")
resp = &envoy_service_proc_v3.ProcessingResponse{
Expand All @@ -181,7 +186,9 @@ func (s *ExternalProcessingServer) Process(srv envoy_service_proc_v3.ExternalPro
}
break
}
apiKey := util.PrepareAPIKey(attributes.VHost, attributes.BasePath, attributes.APIVersion)
requestConfigHolder.MatchedAPI = s.apiStore.GetMatchedAPI(util.PrepareAPIKey(attributes.VHost, attributes.BasePath, attributes.APIVersion))
dynamicMetadataKeyValuePairs[matchedAPIMetadataKey] = apiKey
requestConfigHolder.ExternalProcessingEnvoyAttributes = attributes
metadata, err := extractExternalProcessingMetadata(req.GetMetadataContext())
if err != nil {
Expand All @@ -190,8 +197,10 @@ func (s *ExternalProcessingServer) Process(srv envoy_service_proc_v3.ExternalPro
}
requestConfigHolder.ExternalProcessingEnvoyMetadata = metadata
requestConfigHolder.MatchedResource = httpHandler.GetMatchedResource(requestConfigHolder.MatchedAPI, *requestConfigHolder.ExternalProcessingEnvoyAttributes)


if requestConfigHolder.MatchedResource != nil {
requestConfigHolder.MatchedResource.RouteMetadataAttributes = attributes
dynamicMetadataKeyValuePairs[matchedResourceMetadataKey] = requestConfigHolder.MatchedResource.GetResourceIdentifier()
}
// s.log.Info(fmt.Sprintf("Matched api bjc: %v", requestConfigHolder.MatchedAPI.BackendJwtConfiguration))
// s.log.Info(fmt.Sprintf("Matched Resource: %v", requestConfigHolder.MatchedResource))
// s.log.Info(fmt.Sprintf("req holderrr: %+v\n s: %+v", &requestConfigHolder, &s))
Expand Down Expand Up @@ -256,6 +265,13 @@ func (s *ExternalProcessingServer) Process(srv envoy_service_proc_v3.ExternalPro
resp.Response = &envoy_service_proc_v3.ProcessingResponse_RequestHeaders{
RequestHeaders: rhq,
}
if requestConfigHolder.MatchedApplication != nil {
dynamicMetadataKeyValuePairs[matchedApplicationMetadataKey] = requestConfigHolder.MatchedApplication.UUID
}
if requestConfigHolder.MatchedSubscription != nil {
dynamicMetadataKeyValuePairs[matchedSubscriptionMetadataKey] = requestConfigHolder.MatchedSubscription.UUID
}


case *envoy_service_proc_v3.ProcessingRequest_RequestBody:
// httpBody := req.GetRequestBody()
Expand Down Expand Up @@ -435,14 +451,30 @@ func (s *ExternalProcessingServer) Process(srv envoy_service_proc_v3.ExternalPro
ResponseHeaders: rhq,
},
}
s.log.Info("Response Header Flow")
if s.requestConfigHolder != nil &&
s.requestConfigHolder.MatchedAPI != nil &&
s.requestConfigHolder.MatchedAPI.AiProvider != nil &&
s.requestConfigHolder.MatchedAPI.AiProvider.CompletionToken != nil &&
s.requestConfigHolder.ExternalProcessingEnvoyAttributes.EnableBackendBasedAIRatelimit == "true" &&
s.requestConfigHolder.MatchedAPI.AiProvider.CompletionToken.In == dto.InHeader {
matchedAPI := s.requestConfigHolder.MatchedAPI
metadata, err := extractExternalProcessingMetadata(req.GetMetadataContext())
if err != nil {
s.log.Error(err, "failed to extract context metadata")
break
}
s.cfg.Logger.Info(fmt.Sprintf("metadata: %v", metadata))
matchedAPI := s.apiStore.GetMatchedAPI(metadata.MatchedAPIIdentifier)
if matchedAPI == nil {
s.cfg.Logger.Info(fmt.Sprintf("Matched API not found: %s", metadata.MatchedAPIIdentifier))
break
}
matchedResource := matchedAPI.ResourceMap[metadata.MatchedResourceIdentifier]
if matchedResource == nil {
s.cfg.Logger.Info(fmt.Sprintf("Matched Resource not found: %s", metadata.MatchedResourceIdentifier))
break
}
s.cfg.Logger.Info(fmt.Sprintf("Matched Resource: %v", matchedResource.RouteMetadataAttributes))
matchedSubscription := s.subscriptionApplicationDatastore.GetSubscription(matchedAPI.OrganizationID, metadata.MatchedSubscriptionIdentifier)
matchedApplication := s.subscriptionApplicationDatastore.GetApplication(matchedAPI.OrganizationID, metadata.MatchedApplicationIdentifier)
if matchedAPI.AiProvider != nil &&
matchedAPI.AiProvider.CompletionToken != nil &&
matchedResource.RouteMetadataAttributes != nil &&
matchedResource.RouteMetadataAttributes.EnableBackendBasedAIRatelimit == "true" &&
matchedAPI.AiProvider.CompletionToken.In == dto.InHeader {
s.log.Info("Backend based AI rate limit enabled using headers")
tokenCount, err := ratelimit.ExtractTokenCountFromExternalProcessingResponseHeaders(req.GetResponseHeaders().GetHeaders().GetHeaders(),
matchedAPI.AiProvider.PromptTokens.Value,
Expand All @@ -454,8 +486,8 @@ func (s *ExternalProcessingServer) Process(srv envoy_service_proc_v3.ExternalPro
} else {
s.ratelimitHelper.DoAIRatelimit(tokenCount, true,
matchedAPI.DoSubscriptionAIRLInHeaderReponse,
s.requestConfigHolder.ExternalProcessingEnvoyAttributes.BackendBasedAIRatelimitDescriptorValue,
s.requestConfigHolder.MatchedSubscription, s.requestConfigHolder.MatchedApplication)
matchedResource.RouteMetadataAttributes.BackendBasedAIRatelimitDescriptorValue,
matchedSubscription, matchedApplication)
aiProvider := matchedAPI.AiProvider
dynamicMetadataKeyValuePairs[analytics.AIProviderAPIVersionMetadataKey] = aiProvider.ProviderAPIVersion
dynamicMetadataKeyValuePairs[analytics.AIProviderNameMetadataKey] = aiProvider.ProviderName
Expand Down Expand Up @@ -543,13 +575,29 @@ func (s *ExternalProcessingServer) Process(srv envoy_service_proc_v3.ExternalPro
ResponseBody: rbq,
},
}
if s.requestConfigHolder != nil &&
s.requestConfigHolder.MatchedAPI != nil &&
s.requestConfigHolder.MatchedAPI.AiProvider != nil &&
s.requestConfigHolder.MatchedAPI.AiProvider.CompletionToken != nil &&
s.requestConfigHolder.ExternalProcessingEnvoyAttributes.EnableBackendBasedAIRatelimit == "true" &&
s.requestConfigHolder.MatchedAPI.AiProvider.CompletionToken.In == dto.InBody {
matchedAPI := s.requestConfigHolder.MatchedAPI
metadata, err := extractExternalProcessingMetadata(req.GetMetadataContext())
if err != nil {
s.log.Error(err, "failed to extract context metadata")
break
}
s.cfg.Logger.Info(fmt.Sprintf("metadata: %v", metadata))
matchedAPI := s.apiStore.GetMatchedAPI(metadata.MatchedAPIIdentifier)
if matchedAPI == nil {
s.cfg.Logger.Info(fmt.Sprintf("Matched API not found: %s", metadata.MatchedAPIIdentifier))
break
}
matchedResource := matchedAPI.ResourceMap[metadata.MatchedResourceIdentifier]
if matchedResource == nil {
s.cfg.Logger.Info(fmt.Sprintf("Matched Resource not found: %s", metadata.MatchedResourceIdentifier))
break
}
matchedSubscription := s.subscriptionApplicationDatastore.GetSubscription(matchedAPI.OrganizationID, metadata.MatchedSubscriptionIdentifier)
matchedApplication := s.subscriptionApplicationDatastore.GetApplication(matchedAPI.OrganizationID, metadata.MatchedApplicationIdentifier)
if matchedAPI.AiProvider != nil &&
matchedAPI.AiProvider.CompletionToken != nil &&
matchedResource.RouteMetadataAttributes != nil &&
matchedResource.RouteMetadataAttributes.EnableBackendBasedAIRatelimit == "true" &&
matchedAPI.AiProvider.CompletionToken.In == dto.InBody {
s.log.Info("Backend based AI rate limit enabled using body")
tokenCount, err := ratelimit.ExtractTokenCountFromExternalProcessingResponseBody(req.GetResponseBody().Body,
matchedAPI.AiProvider.PromptTokens.Value,
Expand All @@ -561,8 +609,8 @@ func (s *ExternalProcessingServer) Process(srv envoy_service_proc_v3.ExternalPro
} else {
s.ratelimitHelper.DoAIRatelimit(tokenCount, true,
matchedAPI.DoSubscriptionAIRLInBodyReponse,
s.requestConfigHolder.ExternalProcessingEnvoyAttributes.BackendBasedAIRatelimitDescriptorValue,
s.requestConfigHolder.MatchedSubscription, s.requestConfigHolder.MatchedApplication)
matchedResource.RouteMetadataAttributes.BackendBasedAIRatelimitDescriptorValue,
matchedSubscription, matchedApplication)
aiProvider := matchedAPI.AiProvider
dynamicMetadataKeyValuePairs[analytics.AIProviderAPIVersionMetadataKey] = aiProvider.ProviderAPIVersion
dynamicMetadataKeyValuePairs[analytics.AIProviderNameMetadataKey] = aiProvider.ProviderName
Expand Down Expand Up @@ -692,13 +740,28 @@ func extractExternalProcessingMetadata(data *corev3.Metadata) (*dto.ExternalProc
}
externalProcessingEnvoyMetadata.JwtAuthenticationData = authenticationData
}
if extProcMetadata, exists := filterMatadata[externalProessingMetadataContextKey]; exists {
if matchedAPIKey, exists := extProcMetadata.Fields[matchedAPIMetadataKey]; exists {
externalProcessingEnvoyMetadata.MatchedAPIIdentifier = matchedAPIKey.GetStringValue()
}
if matchedResourceKey, exists := extProcMetadata.Fields[matchedResourceMetadataKey]; exists {
externalProcessingEnvoyMetadata.MatchedResourceIdentifier = matchedResourceKey.GetStringValue()
}
if matchedApplicationKey, exists := extProcMetadata.Fields[matchedApplicationMetadataKey]; exists {
externalProcessingEnvoyMetadata.MatchedApplicationIdentifier = matchedApplicationKey.GetStringValue()
}
if matchedSubscriptionKey, exists := extProcMetadata.Fields[matchedSubscriptionMetadataKey]; exists {
externalProcessingEnvoyMetadata.MatchedSubscriptionIdentifier = matchedSubscriptionKey.GetStringValue()
}

}
return externalProcessingEnvoyMetadata, nil
}
return nil, nil
}

// extractExternalProcessingAttributes extracts the external processing attributes from the given data.
func extractExternalProcessingAttributes(data map[string]*structpb.Struct) (*dto.ExternalProcessingEnvoyAttributes, error) {
// extractExternalProcessingXDSRouteMetadataAttributes extracts the external processing attributes from the given data.
func extractExternalProcessingXDSRouteMetadataAttributes(data map[string]*structpb.Struct) (*dto.ExternalProcessingEnvoyAttributes, error) {

// Get the fields from the map
extProcData, exists := data["envoy.filters.http.ext_proc"]
Expand All @@ -713,7 +776,6 @@ func extractExternalProcessingAttributes(data map[string]*structpb.Struct) (*dto
if field, ok := fields["request.method"]; ok {
method := field.GetStringValue()
attributes.RequestMehod = method
fmt.Printf("******* %s\n", method)
}

// We need to navigate through the nested fields to get the actual values
Expand Down
3 changes: 2 additions & 1 deletion gateway/enforcer/internal/requestconfig/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type API struct {
Tier string `json:"tier"` // API tier (e.g., Unlimited)
DisableAuthentication bool `json:"disableAuthentication"` // Whether authentication is disabled
DisableScopes bool `json:"disableScopes"` // Whether scopes are disabled
Resources []Resource `json:"resources"` // List of resources for the API
Resources []*Resource `json:"resources"` // List of resources for the API
ResourceMap map[string]*Resource `json:"resourceMap"` // Map of resources for the API
IsMockedAPI bool `json:"isMockedApi"` // Whether the API is mocked
MutualSSL string `json:"mutualSSL"` // Mutual SSL configuration
TransportSecurity bool `json:"transportSecurity"` // Whether transport security is enabled
Expand Down
Loading
Loading