Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added NifiNodeGroupAutoscaler with basic scaling strategies #89

Merged
merged 29 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6d5debc
Added NifiNodeGroupAutoscaler with basic scaling strategies
mh013370 Apr 22, 2022
44e0159
fix typo, update CRDs
mh013370 Apr 23, 2022
b73015a
update changelog, add CRD documentation to site
mh013370 Apr 25, 2022
806ef9d
Add missing finalizer
mh013370 Apr 25, 2022
95007c1
fix docs
mh013370 Apr 25, 2022
2aac444
Update api/v1alpha1/nifinodegroupautoscaler_types.go
mh013370 May 13, 2022
8035701
do not create HPA, centralize nodeId assignment, refactor scaling str…
mh013370 May 16, 2022
bd633f9
added NodeConfig to autoscaler spec
mh013370 May 16, 2022
1c773b7
Update docs
mh013370 May 16, 2022
1f021bc
relocate nodeId assignment logic to strategy package since that's whe…
mh013370 May 16, 2022
b34c98b
Added NifiNodeGroupAutoscaler with basic scaling strategies
mh013370 Apr 22, 2022
61b5af1
fix typo, update CRDs
mh013370 Apr 23, 2022
cf49b77
update changelog, add CRD documentation to site
mh013370 Apr 25, 2022
f76a5c2
Add missing finalizer
mh013370 Apr 25, 2022
f728f2a
fix docs
mh013370 Apr 25, 2022
29279bc
Update api/v1alpha1/nifinodegroupautoscaler_types.go
mh013370 May 13, 2022
1ebf92f
do not create HPA, centralize nodeId assignment, refactor scaling str…
mh013370 May 16, 2022
5aaff70
added NodeConfig to autoscaler spec
mh013370 May 16, 2022
b958073
Update docs
mh013370 May 16, 2022
dc09428
relocate nodeId assignment logic to strategy package since that's whe…
mh013370 May 16, 2022
98afaa5
make node status creation time optional to avoid breaking pre-existin…
mh013370 May 16, 2022
6f5df2a
merge
mh013370 May 16, 2022
1a27dde
fixed LIFO downscale bug, added unit tests
mh013370 May 16, 2022
37dc526
revert minor change
mh013370 May 16, 2022
935c200
Merge branch 'master' into create-nodegroup-autoscaler
mh013370 Jun 16, 2022
17df0c2
update CRDs after merging master
mh013370 Jun 23, 2022
385f748
Merge branch 'master' into create-nodegroup-autoscaler
mh013370 Jun 27, 2022
c40bd64
update CRDs after merging master in
mh013370 Jun 27, 2022
cf7946d
Merge branch 'master' into create-nodegroup-autoscaler
erdrix Jul 13, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,6 @@ testbin/*

# editor and IDE paraphernalia

*~
*~

vendor/
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Added

- [PR #89](https://github.com/konpyutaika/nifikop/pull/89) - **[Operator/NifiNodeGroupAutoscaler]** Add NifiNodeGroupAutoscaler to automatically horizontally scale a NifiCluster resource via the Kubernetes HorizontalPodAutoscaler.
- [PR #108](https://github.com/konpyutaika/nifikop/pull/108) - **[Operator/Logging]** Migrated from logr library to zap
- [PR #112](https://github.com/konpyutaika/nifikop/pull/112) - **[Documentation]** Add section to explain how to upgrade from 0.7.6 to 0.8.0.
- [PR #114](https://github.com/konpyutaika/nifikop/pull/114) - **[Operator/NifiCluster]** Added ability to set the `PodSpec.HostAliases` to provide Pod-level override of hostname resolution when DNS and other options are not applicable.
Expand Down
48 changes: 14 additions & 34 deletions PROJECT
Original file line number Diff line number Diff line change
@@ -1,81 +1,61 @@
domain: konpyutaika.com
layout: go.kubebuilder.io/v3
layout:
- go.kubebuilder.io/v3
plugins:
manifests.sdk.operatorframework.io/v2: {}
scorecard.sdk.operatorframework.io/v2: {}
projectName: nifikop
repo: github.com/konpyutaika/nifikop
resources:
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiCluster
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiUserGroup
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiUser
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiRegistryClient
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiDataflow
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
# TODO(user): Uncomment the below line if this resource's CRD is namespace scoped, else delete it.
# namespaced: true
# TODO(user): Uncomment the below line if this resource implements a controller, else delete it.
# controller: true
domain: konpyutaika.com
group: nifi
kind: NifiParameterContext
# TODO(user): Update the package path for your API if the below value is incorrect.
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
namespaced: true
controller: true
domain: konpyutaika.com
group: nifi
kind: NifiNodeGroupAutoscaler
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
version: "3"
plugins:
manifests.sdk.operatorframework.io/v2: {}
scorecard.sdk.operatorframework.io/v2: {}
35 changes: 35 additions & 0 deletions api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package v1alpha1

import (
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// DataflowState defines the state of a NifiDataflow
Expand All @@ -25,6 +27,18 @@ type ActionStep string
// ClusterState holds info about the cluster state
type ClusterState string

// NodeGroupAutoscalerState holds info about the autoscaler state
type NodeGroupAutoscalerState string

// ClusterReplicas holds info about the current number of replicas in the cluster
type ClusterReplicas int32

// ClusterReplicaSelector holds info about the pod selector for cluster replicas
type ClusterReplicaSelector string

// ClusterScalingStrategy holds info about how a cluster should be scaled
type ClusterScalingStrategy string

// ConfigurationState holds info about the configuration state
type ConfigurationState string

Expand Down Expand Up @@ -325,6 +339,9 @@ type NodeState struct {
InitClusterNode InitClusterNode `json:"initClusterNode"`
// PodIsReady whether or not the associated pod is ready
PodIsReady bool `json:"podIsReady"`
// CreationTime is the time at which this node was created. This must be sortable.
// +optional
CreationTime metav1.Time `json:"creationTime,omitempty"`
}

// RackAwarenessState holds info about rack awareness status
Expand Down Expand Up @@ -392,6 +409,24 @@ const (
NotInitClusterNode InitClusterNode = false
)

// Known NodeGroupAutoscalerState values and the ClusterScalingStrategy values
// available for upscaling and downscaling.
const (
// AutoscalerStateOutOfSync describes the status of a NifiNodeGroupAutoscaler as out of sync
AutoscalerStateOutOfSync NodeGroupAutoscalerState = "OutOfSync"
// AutoscalerStateInSync describes the status of a NifiNodeGroupAutoscaler as in sync
AutoscalerStateInSync NodeGroupAutoscalerState = "InSync"

// GracefulClusterUpscaleStrategy is the upscale strategy representing the
// 'Scale > Disconnect the nodes > Offload data > Reconnect the node' sequence
GracefulClusterUpscaleStrategy ClusterScalingStrategy = "graceful"
// SimpleClusterUpscaleStrategy is the upscale strategy that simply adds a node to the cluster and nothing else
SimpleClusterUpscaleStrategy ClusterScalingStrategy = "simple"
// LIFOClusterDownscaleStrategy is the downscale strategy that removes the last node added
LIFOClusterDownscaleStrategy ClusterScalingStrategy = "lifo"
// NonPrimaryClusterDownscaleStrategy is the downscale strategy that avoids primary/coordinator nodes
NonPrimaryClusterDownscaleStrategy ClusterScalingStrategy = "nonprimary"
// LeastBusyClusterDownscaleStrategy is the downscale strategy that targets the nodes which are
// least busy in terms of the number of flowfiles in queues
LeastBusyClusterDownscaleStrategy ClusterScalingStrategy = "leastbusy"
)

func ClusterRefsEquals(clusterRefs []ClusterReference) bool {
c1 := clusterRefs[0]
name := c1.Name
Expand Down
54 changes: 51 additions & 3 deletions api/v1alpha1/nificluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package v1alpha1

import (
"fmt"
"sort"
"strconv"
"strings"

cmmeta "github.com/jetstack/cert-manager/pkg/apis/meta/v1"
Expand All @@ -15,7 +17,7 @@ const (
HttpListenerType = "http"
HttpsListenerType = "https"
S2sListenerType = "s2s"
prometheusListenerType = "prometheus"
PrometheusListenerType = "prometheus"
)

// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
Expand Down Expand Up @@ -75,7 +77,7 @@ type NifiClusterSpec struct {
// NodeUserIdentityTemplate specifies the template to be used when naming the node user identity (e.g. node-%d-mysuffix)
NodeUserIdentityTemplate *string `json:"nodeUserIdentityTemplate,omitempty"`
// all node requires an image, unique id, and storageConfigs settings
Nodes []Node `json:"nodes"`
Nodes []Node `json:"nodes" patchStrategy:"merge" patchMergeKey:"id"`
// Defines the configuration for PodDisruptionBudget
DisruptionBudget DisruptionBudget `json:"disruptionBudget,omitempty"`
// LdapConfiguration specifies the configuration if you want to use LDAP
Expand Down Expand Up @@ -161,6 +163,9 @@ type Node struct {
ReadOnlyConfig *ReadOnlyConfig `json:"readOnlyConfig,omitempty"`
// node configuration
NodeConfig *NodeConfig `json:"nodeConfig,omitempty"`
// Labels are used to distinguish nodes from one another. They are also used by NifiNodeGroupAutoscaler
// to be automatically scaled. See NifiNodeGroupAutoscaler.Spec.NodeLabelsSelector
Labels map[string]string `json:"labels,omitempty"`
}

type ReadOnlyConfig struct {
Expand Down Expand Up @@ -726,7 +731,7 @@ func (nProperties NifiProperties) GetAuthorizer() string {
func (nSpec *NifiClusterSpec) GetMetricPort() *int {

for _, iListener := range nSpec.ListenersConfig.InternalListeners {
if iListener.Type == prometheusListenerType {
if iListener.Type == PrometheusListenerType {
val := int(iListener.ContainerPort)
return &val
}
Expand Down Expand Up @@ -804,3 +809,46 @@ func (cluster NifiCluster) IsReady() bool {
// Id returns the identifier of the cluster, which is the value of cluster.Name.
func (cluster *NifiCluster) Id() string {
return cluster.Name
}

// Pair couples a string key with a timestamp. GetCreationTimeOrderedNodes uses it to
// hold a node id (the key of Status.NodesState) together with that node's creation time.
type Pair struct {
	Key   string
	Value metav1.Time
}

// PairList is a list of Pair that implements sort.Interface, ordering its
// elements by Value from earliest to latest.
type PairList []Pair

// Len reports the number of pairs in the list.
func (p PairList) Len() int {
	return len(p)
}

// Swap exchanges the pairs at positions i and j.
func (p PairList) Swap(i, j int) {
	p[j], p[i] = p[i], p[j]
}

// Less reports whether the timestamp at position i is strictly before the one at position j.
func (p PairList) Less(i, j int) bool {
	earlier, later := &p[i].Value, &p[j].Value
	return earlier.Before(later)
}

// GetCreationTimeOrderedNodes returns the nodes of the cluster ordered by the time they were
// created, in ascending order: older nodes are at the beginning of the list, newer nodes at the end.
//
// Only nodes that have an entry in Status.NodesState AND are still declared in Spec.Nodes are
// returned. Status entries whose key is not a valid node id, or that reference a node which is
// no longer part of the spec, are skipped rather than reported as a zero-value Node.
//
// Nodes for clusters that existed prior to this feature (v0.11.0+) will not have a creationTime.
// In this case, LIFO will not be able to reliably determine the oldest node. To keep the result
// deterministic anyway, nodes with identical creation times are ordered by ascending node id.
// A rolling restart of nodes in the cluster will resolve this issue going forward.
func (cluster *NifiCluster) GetCreationTimeOrderedNodes() []Node {
	nodesById := nodesToIdMap(cluster.Spec.Nodes)

	// datedNode ties a spec node to the creation time recorded in the cluster status.
	type datedNode struct {
		node     Node
		creation Pair
	}

	dated := make([]datedNode, 0, len(cluster.Status.NodesState))
	for key, state := range cluster.Status.NodesState {
		// Status keys are node ids rendered as strings; anything else cannot be matched to a node.
		id, err := strconv.ParseInt(key, 10, 32)
		if err != nil {
			continue
		}
		node, found := nodesById[int32(id)]
		if !found {
			// The status still tracks a node that is not (or no longer) declared in the spec.
			continue
		}
		dated = append(dated, datedNode{node: node, creation: Pair{Key: key, Value: state.CreationTime}})
	}

	// Map iteration order is random, so ties on creation time are broken by node id
	// to guarantee the same output for the same cluster state.
	sort.Slice(dated, func(i, j int) bool {
		a, b := &dated[i], &dated[j]
		if a.creation.Value.Equal(&b.creation.Value) {
			return a.node.Id < b.node.Id
		}
		return a.creation.Value.Before(&b.creation.Value)
	})

	timeOrderedNodes := make([]Node, 0, len(dated))
	for i := range dated {
		timeOrderedNodes = append(timeOrderedNodes, dated[i].node)
	}
	return timeOrderedNodes
}

// nodesToIdMap indexes the given nodes by their Id. When several nodes share
// the same Id, the one appearing last in the slice is the one kept.
func nodesToIdMap(nodes []Node) (nodeMap map[int32]Node) {
	nodeMap = map[int32]Node{}
	for idx := range nodes {
		current := nodes[idx]
		nodeMap[current.Id] = current
	}
	return nodeMap
}
46 changes: 46 additions & 0 deletions api/v1alpha1/nificluster_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package v1alpha1

import (
"testing"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1"
)

// TestGetCreationTimeOrderedNodes verifies that nodes are returned in ascending order of the
// creation time recorded in the cluster status, regardless of their order in the spec.
func TestGetCreationTimeOrderedNodes(t *testing.T) {
	// A single reference instant keeps the offsets between nodes exact.
	now := time.Now().UTC()

	cluster := &NifiCluster{
		Spec: NifiClusterSpec{
			Nodes: []Node{
				{Id: 2, NodeConfigGroup: "scale-group", Labels: map[string]string{"scale_me": "true"}},
				{Id: 3, NodeConfigGroup: "scale-group", Labels: map[string]string{"scale_me": "true"}},
				{Id: 4, NodeConfigGroup: "scale-group", Labels: map[string]string{"scale_me": "true"}},
				{Id: 5, NodeConfigGroup: "other-group", Labels: map[string]string{"other_group": "true"}},
			},
		},
		Status: NifiClusterStatus{
			NodesState: map[string]NodeState{
				"2": {
					CreationTime: v1.NewTime(now.Add(5 * time.Hour)),
				},
				"3": {
					CreationTime: v1.NewTime(now.Add(15 * time.Hour)),
				},
				"4": {
					CreationTime: v1.NewTime(now.Add(10 * time.Hour)),
				},
				"5": {
					CreationTime: v1.NewTime(now.Add(20 * time.Hour)),
				},
			},
		},
	}

	nodeList := cluster.GetCreationTimeOrderedNodes()

	// Fatalf, not Errorf: the index accesses below would panic on a shorter list.
	if len(nodeList) != 4 {
		t.Fatalf("Incorrect node list: %+v", nodeList)
	}
	if nodeList[0].Id != 2 || nodeList[1].Id != 4 || nodeList[2].Id != 3 || nodeList[3].Id != 5 {
		t.Errorf("Incorrect node list: %+v", nodeList)
	}
}
Loading