Skip to content

Commit

Permalink
Refactor Selenium Grid scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Viet Nguyen Duc <[email protected]>
  • Loading branch information
VietND96 committed Sep 17, 2024
1 parent f6358d5 commit cb52a32
Show file tree
Hide file tree
Showing 4 changed files with 1,266 additions and 242 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ Here is an overview of all new **experimental** features:
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
- **Selenium Grid Scaler**: Improve logic based on node stereotypes, node sessions and queue requests capabilities ([#6080](https://github.com/kedacore/keda/issues/6080))
- **Selenium Grid Scaler**: Support for Username and Password Authentication of Grid GraphQL endpoint ([#6144](https://github.com/kedacore/keda/issues/6144))
- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

### Fixes
Expand Down
204 changes: 155 additions & 49 deletions pkg/scalers/selenium_grid_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"strings"

Expand All @@ -30,6 +29,8 @@ type seleniumGridScalerMetadata struct {
triggerIndex int

URL string `keda:"name=url, order=triggerMetadata;authParams"`
Username string `keda:"name=username, order=triggerMetadata;authParams, optional"`
Password string `keda:"name=password, order=triggerMetadata;authParams, optional"`
BrowserName string `keda:"name=browserName, order=triggerMetadata"`
SessionBrowserName string `keda:"name=sessionBrowserName, order=triggerMetadata, optional"`
ActivationThreshold int64 `keda:"name=activationThreshold, order=triggerMetadata, optional"`
Expand All @@ -40,40 +41,69 @@ type seleniumGridScalerMetadata struct {
TargetValue int64
}

type seleniumResponse struct {
Data data `json:"data"`
type SeleniumResponse struct {
Data Data `json:"data"`
}

type data struct {
Grid grid `json:"grid"`
SessionsInfo sessionsInfo `json:"sessionsInfo"`
type Data struct {
Grid Grid `json:"grid"`
NodesInfo NodesInfo `json:"nodesInfo"`
SessionsInfo SessionsInfo `json:"sessionsInfo"`
}

type grid struct {
MaxSession int `json:"maxSession"`
NodeCount int `json:"nodeCount"`
type Grid struct {
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
TotalSlots int `json:"totalSlots"`
}

type sessionsInfo struct {
SessionQueueRequests []string `json:"sessionQueueRequests"`
Sessions []seleniumSession `json:"sessions"`
type NodesInfo struct {
Nodes Nodes `json:"nodes"`
}

type seleniumSession struct {
type SessionsInfo struct {
SessionQueueRequests []string `json:"sessionQueueRequests"`
}

type Nodes []struct {
ID string `json:"id"`
Status string `json:"status"`
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
Stereotypes string `json:"stereotypes"`
Sessions Sessions `json:"sessions"`
}

type ReservedNodes struct {
ID string `json:"id"`
SlotCount int `json:"slotCount"`
}

type Sessions []struct {
ID string `json:"id"`
Capabilities string `json:"capabilities"`
NodeID string `json:"nodeId"`
Slot Slot `json:"slot"`
}

type capability struct {
type Slot struct {
ID string `json:"id"`
Stereotype string `json:"stereotype"`
}

type Capability struct {
BrowserName string `json:"browserName"`
BrowserVersion string `json:"browserVersion"`
PlatformName string `json:"platformName"`
}

type Stereotypes []struct {
Slots int `json:"slots"`
Stereotype Capability `json:"stereotype"`
}

const (
DefaultBrowserVersion string = "latest"
DefaultPlatformName string = "linux"
)

func NewSeleniumGridScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
Expand Down Expand Up @@ -117,7 +147,7 @@ func parseSeleniumGridScalerMetadata(config *scalersconfig.ScalerConfig) (*selen
return meta, nil
}

// No cleanup required for selenium grid scaler
// No cleanup required for Selenium Grid scaler
func (s *seleniumGridScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
Expand Down Expand Up @@ -152,7 +182,7 @@ func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.Metri

func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.Logger) (int64, error) {
body, err := json.Marshal(map[string]string{
"query": "{ grid { maxSession, nodeCount }, sessionsInfo { sessionQueueRequests, sessions { id, capabilities, nodeId } } }",
"query": "{ grid { sessionCount, maxSession, totalSlots }, nodesInfo { nodes { id, status, sessionCount, maxSession, slotCount, sessions { id, capabilities }, slot { id, stereotype } } }, sessionsInfo { sessionQueueRequests } }",
})

if err != nil {
Expand All @@ -164,6 +194,10 @@ func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.L
return -1, err
}

if s.metadata.Username != "" && s.metadata.Password != "" {
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)
}

res, err := s.httpClient.Do(req)
if err != nil {
return -1, err
Expand All @@ -186,55 +220,127 @@ func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.L
return v, nil
}

func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int {
var matchingSlots int
for _, stereotype := range stereotypes {
if checkCapabilitiesMatch(stereotype.Stereotype, request, browserName, browserVersion, sessionBrowserName, platformName) {
matchingSlots += stereotype.Slots
}
}
return matchingSlots
}

func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int {
var matchingSessions int
for _, session := range sessions {
var capability = Capability{}
if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil {
if checkCapabilitiesMatch(capability, request, browserName, browserVersion, sessionBrowserName, platformName) {
matchingSessions++
}
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling session capabilities: %s", err))
}
}
return matchingSessions
}

func checkCapabilitiesMatch(capability Capability, requestCapability Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) bool {
// Ensure the logic should be aligned with DefaultSlotMatcher in Selenium Grid - SeleniumHQ/selenium/java/src/org/openqa/selenium/grid/data/DefaultSlotMatcher.java
// A browserName matches when one of the following conditions is met:
// 1. `browserName` in capability matches with `browserName` or `sessionBrowserName` in scaler metadata
// 2. `browserName` in request capability is empty or not provided
var browserNameMatches = strings.EqualFold(capability.BrowserName, browserName) || strings.EqualFold(capability.BrowserName, sessionBrowserName) ||
requestCapability.BrowserName == ""
// A browserVersion matches when one of the following conditions is met:
// 1. `browserVersion` in request capability is empty or not provided or `stable`
// 2. `browserVersion` in capability matches with prefix of the scaler metadata `browserVersion`
// 3. `browserVersion` in scaler metadata is `latest`
var browserVersionMatches = requestCapability.BrowserVersion == "" || requestCapability.BrowserVersion == "stable" ||
strings.HasPrefix(capability.BrowserVersion, browserVersion) || browserVersion == DefaultBrowserVersion
// A platformName matches when one of the following conditions is met:
// 1. `platformName` in request capability is empty or not provided
// 2. `platformName` in capability is empty or not provided
// 3. `platformName` in capability matches with the scaler metadata `platformName`
// 4. `platformName` in scaler metadata is empty or not provided
var platformNameMatches = requestCapability.PlatformName == "" || capability.PlatformName == "" ||
strings.EqualFold(capability.PlatformName, platformName) || platformName == ""
return browserNameMatches && browserVersionMatches && platformNameMatches
}

func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int) int {
for _, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
return reservedNode.SlotCount
}
}
return availableSlots
}

func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int) []ReservedNodes {
for i, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
// Update remaining available slots for the reserved node
reservedNodes[i].SlotCount = slotCount
return reservedNodes
}
}
// Add new reserved node if not found
return append(reservedNodes, ReservedNodes{ID: nodeID, SlotCount: slotCount})
}

func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) (int64, error) {
var count int64
var seleniumResponse = seleniumResponse{}
var availableSlots int
var queueSlots int
var seleniumResponse = SeleniumResponse{}

if err := json.Unmarshal(b, &seleniumResponse); err != nil {
return 0, err
}

var sessionQueueRequests = seleniumResponse.Data.SessionsInfo.SessionQueueRequests
var nodes = seleniumResponse.Data.NodesInfo.Nodes
var reservedNodes []ReservedNodes
for _, sessionQueueRequest := range sessionQueueRequests {
var capability = capability{}
if err := json.Unmarshal([]byte(sessionQueueRequest), &capability); err == nil {
if capability.BrowserName == browserName {
var platformNameMatches = capability.PlatformName == "" || strings.EqualFold(capability.PlatformName, platformName)
if strings.HasPrefix(capability.BrowserVersion, browserVersion) && platformNameMatches {
count++
} else if len(strings.TrimSpace(capability.BrowserVersion)) == 0 && browserVersion == DefaultBrowserVersion && platformNameMatches {
count++
}
var requestCapability = Capability{}
if err := json.Unmarshal([]byte(sessionQueueRequest), &requestCapability); err == nil {
if checkCapabilitiesMatch(requestCapability, requestCapability, browserName, browserVersion, sessionBrowserName, platformName) {
queueSlots++
}
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling session queue requests: %s", err))
logger.Error(err, fmt.Sprintf("Error when unmarshaling sessionQueueRequest capability: %s", err))
}
}

var sessions = seleniumResponse.Data.SessionsInfo.Sessions
for _, session := range sessions {
var capability = capability{}
if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil {
var platformNameMatches = capability.PlatformName == "" || strings.EqualFold(capability.PlatformName, platformName)
if capability.BrowserName == sessionBrowserName {
if strings.HasPrefix(capability.BrowserVersion, browserVersion) && platformNameMatches {
count++
} else if browserVersion == DefaultBrowserVersion && platformNameMatches {
count++
for _, node := range nodes {
// Check if node is UP and has available slots (maxSession > sessionCount)
if strings.EqualFold(node.Status, "UP") && checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount) > 0 {
var stereotypes = Stereotypes{}
var availableSlotsMatch int
if err := json.Unmarshal([]byte(node.Stereotypes), &stereotypes); err == nil {
// Count available slots that match the request capability and scaler metadata
availableSlotsMatch += countMatchingSlotsStereotypes(stereotypes, requestCapability, browserName, browserVersion, sessionBrowserName, platformName)
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling node stereotypes: %s", err))
}
// Count ongoing sessions that match the request capability and scaler metadata
var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger)
// Count remaining available slots can be reserved for this request
var availableSlotsCanBeReserved = checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount)
// Reserve one available slot for the request if available slots match is greater than current sessions match
if availableSlotsMatch > currentSessionsMatch {
availableSlots++
reservedNodes = updateOrAddReservedNode(reservedNodes, node.ID, availableSlotsCanBeReserved-1)
}
}
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling sessions info: %s", err))
}
}

var gridMaxSession = int64(seleniumResponse.Data.Grid.MaxSession)
var gridNodeCount = int64(seleniumResponse.Data.Grid.NodeCount)

if gridMaxSession > 0 && gridNodeCount > 0 {
// Get count, convert count to next highest int64
var floatCount = float64(count) / (float64(gridMaxSession) / float64(gridNodeCount))
count = int64(math.Ceil(floatCount))
if queueSlots > availableSlots {
count = int64(queueSlots - availableSlots)
} else {
count = 0
}

return count, nil
}
Loading

0 comments on commit cb52a32

Please sign in to comment.