Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fea clb controller sts mesos #670

Merged
merged 8 commits into from
Nov 24, 2020
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ network:pre
clb-controller:pre
mkdir -p ${PACKAGEPATH}/bcs-services/bcs-clb-controller
cp -R ./install/conf/bcs-services/bcs-clb-controller ${PACKAGEPATH}/bcs-services
cp ./bcs-services/bcs-clb-controller/docker/Dockerfile ${PACKAGEPATH}/bcs-services/bcs-clb-controller/Dockerfile.old
go build ${LDFLAG} -o ${PACKAGEPATH}/bcs-services/bcs-clb-controller/bcs-clb-controller ./bcs-services/bcs-clb-controller/main.go
cp ${PACKAGEPATH}/bcs-services/bcs-clb-controller/bcs-clb-controller ${PACKAGEPATH}/bcs-services/bcs-clb-controller/clb-controller

cpuset:pre
mkdir -p ${PACKAGEPATH}/bcs-services/bcs-cpuset-device
Expand Down
67 changes: 50 additions & 17 deletions bcs-common/common/storage/watch/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,55 @@ package watch
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"

"github.com/Tencent/bk-bcs/bcs-common/common/codec"
"github.com/Tencent/bk-bcs/bcs-common/common/http/httpclient"
"github.com/Tencent/bk-bcs/bcs-common/common/types"
"github.com/Tencent/bk-bcs/bcs-services/bcs-storage/storage/errors"
"github.com/Tencent/bk-bcs/bcs-services/bcs-storage/storage/operator"
)

// Get a new Watcher with empty WatchOptions
// New get a new Watcher with empty WatchOptions
func New(client *httpclient.HttpClient) *Watcher {
return NewWithOption(&operator.WatchOptions{}, client)
return NewWithOption(&types.WatchOptions{}, client)
}

// Get a new Watcher with provided WatchOptions
func NewWithOption(opts *operator.WatchOptions, client *httpclient.HttpClient) *Watcher {
// EventType event type
type EventType int32

const (
// Nop no operation event
Nop EventType = iota
// Add add event
Add
// Del delete event
Del
// Chg change event
Chg
// SChg self change event
SChg
// Brk event
Brk EventType = -1
)

// Event event of watch
type Event struct {
Type EventType `json:"type"`
Value operator.M `json:"value"`
}

var (
// EventWatchBreak watch break event
EventWatchBreak = &Event{Type: Brk, Value: nil}
// EventWatchBreakBytes watch break event content
EventWatchBreakBytes, _ = json.Marshal(EventWatchBreak)
)

// NewWithOption get a new Watcher with provided WatchOptions
func NewWithOption(opts *types.WatchOptions, client *httpclient.HttpClient) *Watcher {
return &Watcher{
opts: opts,
client: client,
Expand All @@ -42,14 +75,14 @@ func NewWithOption(opts *operator.WatchOptions, client *httpclient.HttpClient) *
type Watcher struct {
client *httpclient.HttpClient

opts *operator.WatchOptions
storageUrl []string
opts *types.WatchOptions
storageURL []string
ctx context.Context
cancel context.CancelFunc
closed bool

resp *http.Response
event *operator.Event
event *Event
err error

nextSignal chan struct{}
Expand All @@ -62,10 +95,10 @@ func (w *Watcher) Connect(storageURL []string) (err error) {
return errors.EventWatchAlreadyConnect
}

w.storageUrl = storageURL
w.storageURL = storageURL

if w.opts == nil {
w.opts = &operator.WatchOptions{}
w.opts = &types.WatchOptions{}
}

if err = w.connect(); err != nil {
Expand All @@ -87,7 +120,7 @@ func (w *Watcher) connect() (err error) {
return
}

for _, u := range w.storageUrl {
for _, u := range w.storageURL {
r, err := http.NewRequest("POST", u, body)
if err != nil {
continue
Expand All @@ -110,13 +143,13 @@ func (w *Watcher) watching() {
case <-w.ctx.Done():
return
case <-w.nextSignal:
w.event = new(operator.Event)
w.event = new(Event)
if w.err = codec.DecJsonReader(w.resp.Body, w.event); w.err == io.ErrUnexpectedEOF && !w.closed {
if w.err = w.connect(); w.err != nil {
w.event = operator.EventWatchBreak
w.event = EventWatchBreak
}
}
if w.event.Type == operator.Nop {
if w.event.Type == Nop {
w.Close()
return
}
Expand All @@ -125,7 +158,7 @@ func (w *Watcher) watching() {
}
}

// Stop watch and close the connection. It must be stop watch first then close connection,
// Close stop watch and close the connection. It must be stop watch first then close connection,
// because watching() will check if the watch is stop after get a EOF error and if not it will reconnect.
func (w *Watcher) Close() {
w.cancel()
Expand All @@ -137,12 +170,12 @@ func (w *Watcher) Close() {
}
}

// Get next event
func (w *Watcher) Next() (*operator.Event, error) {
// Next get next event
func (w *Watcher) Next() (*Event, error) {
w.nextSignal <- struct{}{}
select {
case <-w.ctx.Done():
return operator.EventWatchBreak, nil
return EventWatchBreak, nil
case <-w.receiveSignal:
return w.event, w.err
}
Expand Down
27 changes: 24 additions & 3 deletions bcs-common/common/types/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package types
import (
"encoding/base64"
"encoding/json"
"time"

"github.com/Tencent/bk-bcs/bcs-common/common/codec"
)

Expand Down Expand Up @@ -122,6 +124,7 @@ type BcsStorageRenderIf struct {
User string `json:"user"`
}

// Gen generate string
func (render *BcsStorageRenderIf) Gen() (r string, err error) {
s, err := json.Marshal(render)
if err != nil {
Expand All @@ -130,6 +133,7 @@ func (render *BcsStorageRenderIf) Gen() (r string, err error) {
return base64.StdEncoding.EncodeToString(s), nil
}

// GetData get data
func (render *BcsStorageRenderIf) GetData() (dc *DeployConfig, err error) {
var tmp []byte
if err = codec.EncJson(render.Data, &tmp); err != nil {
Expand Down Expand Up @@ -159,19 +163,36 @@ type BcsStorageMetricIf struct {
Data interface{} `json:"data"`
}

// BcsStorageClusterSettingsIf define storage cluster-ip relationship interface data interaction
// BcsStorageClusterRelationIf define storage cluster-ip relationship interface data interaction
type BcsStorageClusterRelationIf struct {
Ips []string `json:"ips"`
}

// BcsHostIf define storage set host config interface data interaction
// BcsStorageHostIf define storage set host config interface data interaction
type BcsStorageHostIf struct {
Ip string `json:"ip"`
ClusterId string `json:"clusterId"`
Data interface{} `json:"data"`
}

// BcsStableVersionIf define storage stableVersion interface data interaction
// BcsStorageStableVersionIf define storage stableVersion interface data interaction
type BcsStorageStableVersionIf struct {
Version string `json:"version"`
}

// WatchOptions watch options
type WatchOptions struct {
// Only watch the node itself, including children added, children removed and node value change.
// Will not receive existing children's event.
SelfOnly bool `json:"selfOnly"`

// Max time of events will received. Watch will be ended after the last event. 0 for infinity.
MaxEvents uint `json:"maxEvents"`

// The max waiting time of each event. Watch will be ended after timeout. 0 for no limit.
Timeout time.Duration `json:"timeout"`

// The value-change event will be checked if it's different from last status. If not then this event
// will be ignored. And it will not trigger timeout reset.
MustDiff string `json:"mustDiff"`
}
4 changes: 2 additions & 2 deletions bcs-network/qcloud-eip/eip/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func getMaxPrivateIPNumPerENI(cores, mem int) int {
if cores == 2 {
return 10
}
if cores == 4 && mem < 16 {
if cores == 4 && mem <= 16 {
return 10
}
if cores == 4 && mem > 16 {
Expand All @@ -129,7 +129,7 @@ func getMaxENINumPerCVM(cores, mem int) int {
if cores == 2 {
return 2
}
if cores == 4 && mem < 16 {
if cores == 4 && mem <= 16 {
return 4
}
if cores == 4 && mem > 16 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,10 @@ func (clbRule *ClbRule) ToString() string {

// ClbStatefulSetHttPRule http rule for stateful set
type ClbStatefulSetHttpRule struct {
StartPort int `json:"startPort"`
StartIndex int `json:"startIndex,omitempty"`
EndIndex int `json:"endIndex,omitempty"`
StartPort int `json:"startPort"`
StartIndex int `json:"startIndex,omitempty"`
EndIndex int `json:"endIndex,omitempty"`
SegmentLength int `json:"segmentLength,omitempty"`
ClbHttpRule
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ type KubeRegistry struct {
}

// NewKubeRegistry create new registry for clb ingress
func NewKubeRegistry(clbname string, informer informerv1.ClbIngressInformer, lister listerv1.ClbIngressLister, client ingressClientV1.ClbV1Interface) (clbingress.Registry, error) {
func NewKubeRegistry(
clbname string,
informer informerv1.ClbIngressInformer,
lister listerv1.ClbIngressLister,
client ingressClientV1.ClbV1Interface) (clbingress.Registry, error) {

return &KubeRegistry{
clbName: clbname,
Expand Down Expand Up @@ -71,7 +75,8 @@ func (kr *KubeRegistry) ListIngresses() ([]*ingressv1.ClbIngress, error) {
blog.V(5).Infof("index: %d ingress for clb %s\n ingress: %v", index, kr.clbName, ingress)
}
// get ingresses for all clb
requirementIngressForAll, err := labels.NewRequirement("bmsf.tencent.com/clbname", selection.Equals, []string{"all"})
requirementIngressForAll, err := labels.NewRequirement(
"bmsf.tencent.com/clbname", selection.Equals, []string{"all"})
if err != nil {
return nil, fmt.Errorf("create requirement of clb ingress for all clb failed, err %s", err.Error())
}
Expand Down
Loading