Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick yurthub code of pool-coordinator-dev #1156

Merged
merged 20 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
16a399e
add yurthub leader election and coordinator framework (#1035)
rambohe-ch Nov 10, 2022
3e69468
pool-coordinator implementation of yurthub (#1073)
Congrool Dec 7, 2022
0e6b9d6
generate coordinator client and pass through SubjectAccessReview acco…
Congrool Jan 4, 2023
7ae8fbb
Fix (#1125)
LaurenceLiZhixin Jan 4, 2023
2bff621
set isPoolCacheSynced as true if lease updated (#1131)
Congrool Jan 10, 2023
39029b3
bugfix: do not send pool-scoped resource list/watch request to pool-c…
Congrool Jan 11, 2023
0a8659c
Fix: pool-coordinator (#1126)
LaurenceLiZhixin Jan 11, 2023
2c1d57b
Bugfix of pool-coordinator and enable unit test to pass (#1137)
Congrool Jan 11, 2023
948ce9f
bugfix: return fake getter if poolcoordinator is not enabled (#1138)
Congrool Jan 12, 2023
ce76fda
mocked etcd client should not use copied locks (#1139)
Congrool Jan 12, 2023
512cec6
add unit test for poolcoordinator cert manager (#1140)
Congrool Jan 12, 2023
edf7ccd
Fix (#1141)
LaurenceLiZhixin Jan 12, 2023
68b3de9
fix coordinator sync problem and update ready condition (#1142)
Congrool Jan 12, 2023
157ac45
Fix: dns bug for endpointslices v1beta1 version (#1144)
LaurenceLiZhixin Jan 13, 2023
7886dbc
Fix: set inElecting to false when handling follow hub, and add metric…
LaurenceLiZhixin Jan 13, 2023
5781203
fix coredns cannot get /apis/discovery.k8s.io/v1 if cluster does not …
Congrool Jan 16, 2023
735b40a
Fix: add PoolScopeResource validation and dynamic configuration suppo…
LaurenceLiZhixin Jan 17, 2023
e84c4f6
bugfix: ReplaceComponentList should not delete resources of other gvr…
Congrool Jan 18, 2023
9bc4d1c
fix go.mod
Congrool Jan 17, 2023
634409d
fix unit test of yurthub options
Congrool Jan 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions charts/openyurt/templates/yurt-controller-manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,10 @@ spec:
spec:
serviceAccountName: yurt-controller-manager
hostNetwork: true
imagePullSecrets:
- name: regsecret
tolerations:
- operator: "Exists"
- operator: "Exists"
affinity:
nodeAffinity:
# we prefer allocating ecm on cloud node
Expand All @@ -180,4 +182,4 @@ spec:
- yurt-controller-manager
{{- if .Values.imagePullSecrets }}
imagePullSecrets: {{ toYaml .Values.imagePullSecrets | nindent 8 }}
{{- end }}
{{- end }}
41 changes: 35 additions & 6 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"net/url"
"path/filepath"
"strings"
"time"

Expand All @@ -38,6 +39,7 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/cmd/yurthub/app/options"
Expand Down Expand Up @@ -89,6 +91,16 @@ type YurtHubConfiguration struct {
YurtHubProxyServerServing *apiserver.DeprecatedInsecureServingInfo
YurtHubDummyProxyServerServing *apiserver.DeprecatedInsecureServingInfo
YurtHubSecureProxyServerServing *apiserver.SecureServingInfo
YurtHubProxyServerAddr string
ProxiedClient kubernetes.Interface
DiskCachePath string
CoordinatorPKIDir string
EnableCoordinator bool
CoordinatorServerURL *url.URL
CoordinatorStoragePrefix string
CoordinatorStorageAddr string // ip:port
CoordinatorClient kubernetes.Interface
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand All @@ -98,6 +110,14 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
return nil, err
}

var coordinatorServerURL *url.URL
if options.EnableCoordinator {
coordinatorServerURL, err = url.Parse(options.CoordinatorServerAddr)
if err != nil {
return nil, err
}
}

storageManager, err := disk.NewDiskStorage(options.DiskCachePath)
if err != nil {
klog.Errorf("could not create storage manager, %v", err)
Expand All @@ -112,7 +132,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
}

workingMode := util.WorkingMode(options.WorkingMode)
sharedFactory, yurtSharedFactory, err := createSharedInformers(fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort), options.EnableNodePool)
proxiedClient, sharedFactory, yurtSharedFactory, err := createClientAndSharedInformers(fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort), options.EnableNodePool)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -145,6 +165,15 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
FilterManager: filterManager,
MinRequestTimeout: options.MinRequestTimeout,
TenantNs: tenantNs,
YurtHubProxyServerAddr: fmt.Sprintf("http://%s:%d", options.YurtHubProxyHost, options.YurtHubProxyPort),
ProxiedClient: proxiedClient,
DiskCachePath: options.DiskCachePath,
CoordinatorPKIDir: filepath.Join(options.RootDir, "poolcoordinator"),
EnableCoordinator: options.EnableCoordinator,
CoordinatorServerURL: coordinatorServerURL,
CoordinatorStoragePrefix: options.CoordinatorStoragePrefix,
CoordinatorStorageAddr: options.CoordinatorStorageAddr,
LeaderElection: options.LeaderElection,
}

certMgr, err := createCertManager(options, us)
Expand Down Expand Up @@ -200,18 +229,18 @@ func parseRemoteServers(serverAddr string) ([]*url.URL, error) {
}

// createSharedInformers create sharedInformers from the given proxyAddr.
func createSharedInformers(proxyAddr string, enableNodePool bool) (informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
func createClientAndSharedInformers(proxyAddr string, enableNodePool bool) (kubernetes.Interface, informers.SharedInformerFactory, yurtinformers.SharedInformerFactory, error) {
var kubeConfig *rest.Config
var yurtClient yurtclientset.Interface
var err error
kubeConfig, err = clientcmd.BuildConfigFromFlags(proxyAddr, "")
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

client, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

fakeYurtClient := &fake.Clientset{}
Expand All @@ -222,11 +251,11 @@ func createSharedInformers(proxyAddr string, enableNodePool bool) (informers.Sha
if enableNodePool {
yurtClient, err = yurtclientset.NewForConfig(kubeConfig)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}

return informers.NewSharedInformerFactory(client, 24*time.Hour),
return client, informers.NewSharedInformerFactory(client, 24*time.Hour),
yurtinformers.NewSharedInformerFactory(yurtClient, 24*time.Hour), nil
}

Expand Down
53 changes: 53 additions & 0 deletions cmd/yurthub/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import (
"time"

"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"

Expand Down Expand Up @@ -76,6 +79,11 @@ type YurtHubOptions struct {
CACertHashes []string
UnsafeSkipCAVerification bool
ClientForTest kubernetes.Interface
EnableCoordinator bool
CoordinatorServerAddr string
CoordinatorStoragePrefix string
CoordinatorStorageAddr string
LeaderElection componentbaseconfig.LeaderElectionConfiguration
}

// NewYurtHubOptions creates a new YurtHubOptions with a default config.
Expand Down Expand Up @@ -109,6 +117,18 @@ func NewYurtHubOptions() *YurtHubOptions {
MinRequestTimeout: time.Second * 1800,
CACertHashes: make([]string, 0),
UnsafeSkipCAVerification: true,
CoordinatorServerAddr: fmt.Sprintf("https://%s:%s", util.DefaultPoolCoordinatorAPIServerSvcName, util.DefaultPoolCoordinatorAPIServerSvcPort),
CoordinatorStorageAddr: fmt.Sprintf("https://%s:%s", util.DefaultPoolCoordinatorEtcdSvcName, util.DefaultPoolCoordinatorEtcdSvcPort),
CoordinatorStoragePrefix: "/registry",
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceName: projectinfo.GetHubName(),
ResourceNamespace: "kube-system",
},
}
return o
}
Expand Down Expand Up @@ -182,6 +202,39 @@ func (o *YurtHubOptions) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&o.MinRequestTimeout, "min-request-timeout", o.MinRequestTimeout, "An optional field indicating at least how long a proxy handler must keep a request open before timing it out. Currently only honored by the local watch request handler(use request parameter timeoutSeconds firstly), which picks a randomized value above this number as the connection timeout, to spread out load.")
fs.StringSliceVar(&o.CACertHashes, "discovery-token-ca-cert-hash", o.CACertHashes, "For token-based discovery, validate that the root CA public key matches this hash (format: \"<type>:<value>\").")
fs.BoolVar(&o.UnsafeSkipCAVerification, "discovery-token-unsafe-skip-ca-verification", o.UnsafeSkipCAVerification, "For token-based discovery, allow joining without --discovery-token-ca-cert-hash pinning.")
fs.BoolVar(&o.EnableCoordinator, "enable-coordinator", o.EnableCoordinator, "make yurthub aware of the pool coordinator")
fs.StringVar(&o.CoordinatorServerAddr, "coordinator-server-addr", o.CoordinatorServerAddr, "Coordinator APIServer address in format https://host:port")
fs.StringVar(&o.CoordinatorStoragePrefix, "coordinator-storage-prefix", o.CoordinatorStoragePrefix, "Pool-Coordinator etcd storage prefix, same as etcd-prefix of Kube-APIServer")
fs.StringVar(&o.CoordinatorStorageAddr, "coordinator-storage-addr", o.CoordinatorStorageAddr, "Address of Pool-Coordinator etcd, in the format host:port")
bindFlags(&o.LeaderElection, fs)
}

// bindFlags binds the LeaderElectionConfiguration struct fields to a flagset
func bindFlags(l *componentbaseconfig.LeaderElectionConfiguration, fs *pflag.FlagSet) {
fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+
"Start a leader election client and gain leadership based on pool coordinator")
fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+
"The duration that non-leader candidates will wait after observing a leadership "+
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
"slot. This is effectively the maximum duration that a leader can be stopped "+
"before it is replaced by another candidate. This is only applicable if leader "+
"election is enabled.")
fs.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+
"The interval between attempts by the acting master to renew a leadership slot "+
"before it stops leading. This must be less than or equal to the lease duration. "+
"This is only applicable if leader election is enabled.")
fs.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
fs.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, ""+
"The type of resource object that is used for locking during "+
"leader election. Supported options are `leases` (default), `endpoints` and `configmaps`.")
fs.StringVar(&l.ResourceName, "leader-elect-resource-name", l.ResourceName, ""+
"The name of resource object that is used for locking during "+
"leader election.")
fs.StringVar(&l.ResourceNamespace, "leader-elect-resource-namespace", l.ResourceNamespace, ""+
"The namespace of resource object that is used for locking during "+
"leader election.")
}

// verifyDummyIP verify the specified ip is valid or not and set the default ip if empty
Expand Down
15 changes: 15 additions & 0 deletions cmd/yurthub/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"time"

"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/leaderelection/resourcelock"
componentbaseconfig "k8s.io/component-base/config"

"github.com/openyurtio/openyurt/pkg/projectinfo"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/disk"
Expand Down Expand Up @@ -60,6 +63,18 @@ func TestNewYurtHubOptions(t *testing.T) {
MinRequestTimeout: time.Second * 1800,
CACertHashes: make([]string, 0),
UnsafeSkipCAVerification: true,
CoordinatorServerAddr: fmt.Sprintf("https://%s:%s", util.DefaultPoolCoordinatorAPIServerSvcName, util.DefaultPoolCoordinatorAPIServerSvcPort),
CoordinatorStorageAddr: fmt.Sprintf("https://%s:%s", util.DefaultPoolCoordinatorEtcdSvcName, util.DefaultPoolCoordinatorEtcdSvcPort),
CoordinatorStoragePrefix: "/registry",
LeaderElection: componentbaseconfig.LeaderElectionConfiguration{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceLock: resourcelock.LeasesResourceLock,
ResourceName: projectinfo.GetHubName(),
ResourceNamespace: "kube-system",
},
}

options := NewYurtHubOptions()
Expand Down
Loading