Skip to content

Commit

Permalink
style: fix mesos-datawatch code style, issue #71 #72
Browse files Browse the repository at this point in the history
  • Loading branch information
DeveloperJim committed Aug 15, 2019
1 parent f2a012b commit caf3a76
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 34 deletions.
6 changes: 4 additions & 2 deletions bcs-mesos/bcs-mesos-watch/cluster/mesos/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import (
schedulertypes "bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"encoding/json"
"fmt"
"github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
"path"
"reflect"
"strconv"
"sync"
"time"

"github.com/samuel/go-zookeeper/zk"
"golang.org/x/net/context"
)

//NSControlInfo store all app info under one namespace
Expand Down Expand Up @@ -376,6 +377,7 @@ func (app *AppWatch) UpdateEvent(old, cur interface{}, force bool) {
app.report.ReportData(data)
}

//GetApplicationChannel get distribution channel for Application
func (app *AppWatch) GetApplicationChannel(application *schedulertypes.Application) string {
index := util.GetHashId(application.ID, ApplicationThreadNum)

Expand Down
10 changes: 8 additions & 2 deletions bcs-mesos/bcs-mesos-watch/cluster/mesos/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ import (
"bk-bcs/bcs-common/pkg/cache"
"bk-bcs/bcs-mesos/bcs-mesos-watch/cluster"
"bk-bcs/bcs-mesos/bcs-mesos-watch/types"

//schedulertypes "bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"bk-bcs/bcs-common/common/blog"
commtypes "bk-bcs/bcs-common/common/types"
"encoding/json"
"fmt"
"golang.org/x/net/context"
"reflect"
"sync"
"time"

"golang.org/x/net/context"
)

//NSControlInfo store all app info under one namespace
Expand All @@ -35,12 +37,14 @@ import (
// cancel context.CancelFunc //for cancel sub goroutine
//}

//ConfigMapInfo wrapper for BCS ConfigMap
type ConfigMapInfo struct {
data *commtypes.BcsConfigMap
syncTime int64
reportTime int64
}

//NewConfigMapWatch create watch for BCS ConfigMap
func NewConfigMapWatch(cxt context.Context, client ZkClient, reporter cluster.Reporter, watchPath string) *ConfigMapWatch {

keyFunc := func(data interface{}) (string, error) {
Expand Down Expand Up @@ -70,6 +74,7 @@ func NewConfigMapWatch(cxt context.Context, client ZkClient, reporter cluster.Re
}
}

//ConfigMapWatch watch for configmap, watch all detail and store to local cache
type ConfigMapWatch struct {
eventLock sync.Mutex //lock for event
report cluster.Reporter //reporter
Expand All @@ -80,7 +85,7 @@ type ConfigMapWatch struct {
watchPath string
}

//to add path and node watch
//Work to add path and node watch
func (watch *ConfigMapWatch) Work() {
watch.ProcessAllConfigmaps()
tick := time.NewTicker(12 * time.Second)
Expand All @@ -96,6 +101,7 @@ func (watch *ConfigMapWatch) Work() {
}
}

//ProcessAllConfigmaps handle all configmap under all namespace
func (watch *ConfigMapWatch) ProcessAllConfigmaps() error {

currTime := time.Now().Unix()
Expand Down
9 changes: 8 additions & 1 deletion bcs-mesos/bcs-mesos-watch/cluster/mesos/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ import (
"bk-bcs/bcs-common/pkg/cache"
"bk-bcs/bcs-mesos/bcs-mesos-watch/cluster"
"bk-bcs/bcs-mesos/bcs-mesos-watch/types"

//schedulertypes "bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"bk-bcs/bcs-common/common/blog"
schedulertypes "bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"encoding/json"
"fmt"
"golang.org/x/net/context"
"reflect"
"sync"
"time"

"golang.org/x/net/context"
)

//NSControlInfo store all app info under one namespace
Expand All @@ -35,12 +37,14 @@ import (
// cancel context.CancelFunc //for cancel sub goroutine
//}

//DeploymentInfo wrapper for BCS Deployment
type DeploymentInfo struct {
data *schedulertypes.Deployment
syncTime int64
reportTime int64
}

//NewDeploymentWatch create deployment watch
func NewDeploymentWatch(cxt context.Context, client ZkClient, reporter cluster.Reporter, watchPath string) *DeploymentWatch {

keyFunc := func(data interface{}) (string, error) {
Expand Down Expand Up @@ -70,6 +74,7 @@ func NewDeploymentWatch(cxt context.Context, client ZkClient, reporter cluster.R
}
}

//DeploymentWatch watch all deployment data and store to local cache
type DeploymentWatch struct {
eventLock sync.Mutex //lock for event
report cluster.Reporter //reporter
Expand All @@ -80,6 +85,7 @@ type DeploymentWatch struct {
watchPath string
}

//Work to add path and node watch
func (watch *DeploymentWatch) Work() {
watch.ProcessAllDeployments()
tick := time.NewTicker(10 * time.Second)
Expand All @@ -95,6 +101,7 @@ func (watch *DeploymentWatch) Work() {
}
}

//ProcessAllDeployments handle all namespace deployment data
func (watch *DeploymentWatch) ProcessAllDeployments() error {

currTime := time.Now().Unix()
Expand Down
8 changes: 7 additions & 1 deletion bcs-mesos/bcs-mesos-watch/cluster/mesos/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ import (
"bk-bcs/bcs-mesos/bcs-mesos-watch/types"
"encoding/json"
"fmt"
"golang.org/x/net/context"
"reflect"
"sync"
"time"

"golang.org/x/net/context"
)

//EndpointInfo wrapper for BCSEndpoint
type EndpointInfo struct {
data *commtypes.BcsEndpoint
syncTime int64
reportTime int64
}

//NewEndpointWatch create endpoint watch
func NewEndpointWatch(cxt context.Context, client ZkClient, reporter cluster.Reporter, watchPath string) *EndpointWatch {

keyFunc := func(data interface{}) (string, error) {
Expand All @@ -52,6 +55,7 @@ func NewEndpointWatch(cxt context.Context, client ZkClient, reporter cluster.Rep
}
}

//EndpointWatch watch for Endpoint and store all datas to local cache
type EndpointWatch struct {
eventLock sync.Mutex //lock for event
report cluster.Reporter //reporter
Expand All @@ -61,6 +65,7 @@ type EndpointWatch struct {
watchPath string
}

//Work handle all Endpoint datas periodically
func (watch *EndpointWatch) Work() {
watch.ProcessAllEndpoints()
tick := time.NewTicker(10 * time.Second)
Expand All @@ -76,6 +81,7 @@ func (watch *EndpointWatch) Work() {
}
}

//ProcessAllEndpoints handle all namespace Endpoint data
func (watch *EndpointWatch) ProcessAllEndpoints() error {

currTime := time.Now().Unix()
Expand Down
37 changes: 24 additions & 13 deletions bcs-mesos/bcs-mesos-watch/cluster/mesos/exportservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,18 @@ import (
schedtypes "bk-bcs/bcs-mesos/bcs-scheduler/src/types"
"encoding/json"
"fmt"
"golang.org/x/net/context"
"reflect"
"strconv"
"strings"
"time"

"golang.org/x/net/context"
)

const (
networkHost = "host"
networkBridge = "bridge"
networkTypeCNM = "cnm"
)

//NewExportServiceWatch create export service watch
Expand Down Expand Up @@ -63,6 +70,7 @@ func esInfoKeyFunc(data interface{}) (string, error) {
return esInfo.bcsService.ObjectMeta.NameSpace + "." + esInfo.bcsService.ObjectMeta.Name, nil
}

//ExportServiceInfo wrapper for ServiceInfo
type ExportServiceInfo struct {
bcsService *commtypes.BcsService
exportService *lbtypes.ExportService
Expand Down Expand Up @@ -438,18 +446,18 @@ func (watch *ExportServiceWatch) addTaskGroup(tskgroup *schedtypes.TaskGroup) {
key, oneTask.ID, err.Error())
continue
}
if strings.ToLower(oneTask.Network) == "host" {
if strings.ToLower(oneTask.Network) == networkHost {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.ContainerPort)
} else if strings.ToLower(oneTask.Network) == "bridge" {
} else if strings.ToLower(oneTask.Network) == networkBridge {
if onePort.HostPort > 0 {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.HostPort)
} else {
backend.TargetIP = bcsInfo.IPAddress
backend.TargetPort = int(onePort.ContainerPort)
}
} else if strings.ToLower(oneTask.NetworkType) == "cnm" {
} else if strings.ToLower(oneTask.NetworkType) == networkTypeCNM {
if onePort.HostPort > 0 {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.HostPort)
Expand Down Expand Up @@ -534,18 +542,18 @@ func (watch *ExportServiceWatch) updateTaskGroup(tskgroup *schedtypes.TaskGroup)
key, oneTask.ID, err.Error())
continue
}
if strings.ToLower(oneTask.Network) == "host" {
if strings.ToLower(oneTask.Network) == networkHost {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.ContainerPort)
} else if strings.ToLower(oneTask.Network) == "bridge" {
} else if strings.ToLower(oneTask.Network) == networkBridge {
if onePort.HostPort > 0 {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.HostPort)
} else {
backend.TargetIP = bcsInfo.IPAddress
backend.TargetPort = int(onePort.ContainerPort)
}
} else if strings.ToLower(oneTask.NetworkType) == "cnm" {
} else if strings.ToLower(oneTask.NetworkType) == networkTypeCNM {
if onePort.HostPort > 0 {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.HostPort)
Expand Down Expand Up @@ -635,18 +643,18 @@ func (watch *ExportServiceWatch) deleteTaskGroup(tskgroup *schedtypes.TaskGroup)
key, oneTask.ID, err.Error())
continue
}
if strings.ToLower(oneTask.Network) == "host" {
if strings.ToLower(oneTask.Network) == networkHost {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.ContainerPort)
} else if strings.ToLower(oneTask.Network) == "bridge" {
} else if strings.ToLower(oneTask.Network) == networkBridge {
if onePort.HostPort > 0 {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.HostPort)
} else {
backend.TargetIP = bcsInfo.IPAddress
backend.TargetPort = int(onePort.ContainerPort)
}
} else if strings.ToLower(oneTask.NetworkType) == "cnm" {
} else if strings.ToLower(oneTask.NetworkType) == networkTypeCNM {
if onePort.HostPort > 0 {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.HostPort)
Expand Down Expand Up @@ -738,6 +746,7 @@ func (watch *ExportServiceWatch) UpdateEvent(obj interface{}) {
watch.report.ReportData(sync)
}

//GetExportserviceChannel get channel for dispatch
func (watch *ExportServiceWatch) GetExportserviceChannel(exportservice *lbtypes.ExportService) string {

index := util.GetHashId(exportservice.ServiceName, ExportserviceThreadNum)
Expand All @@ -746,6 +755,7 @@ func (watch *ExportServiceWatch) GetExportserviceChannel(exportservice *lbtypes.

}

//SyncExportServiceBackends export service backend synchronization
func (watch *ExportServiceWatch) SyncExportServiceBackends(esInfo *ExportServiceInfo) error {
basePath := fmt.Sprintf("%s/application/%s", watch.basePath, esInfo.exportService.Namespace)
blog.Info("sync all taskgroups under(%s)", basePath)
Expand Down Expand Up @@ -818,6 +828,7 @@ func (watch *ExportServiceWatch) SyncExportServiceBackends(esInfo *ExportService
return nil
}

//SyncEpTaskgroupBackend convert taskgroup to exportservice endpoint
func (watch *ExportServiceWatch) SyncEpTaskgroupBackend(esInfo *ExportServiceInfo, taskgroup *schedtypes.TaskGroup) error {
if taskgroup.Status != schedtypes.TASKGROUP_STATUS_RUNNING && taskgroup.Status != schedtypes.TASKGROUP_STATUS_LOST {
blog.V(3).Infof("ExportServiceWatch receive taskgroup add event, TaskGroup %s status %s, do nothing ", taskgroup.ID, taskgroup.Status)
Expand Down Expand Up @@ -859,13 +870,13 @@ func (watch *ExportServiceWatch) SyncEpTaskgroupBackend(esInfo *ExportServiceInf
}

//container docker host network, docker run --net=host
if strings.ToLower(oneTask.Network) == "host" {
if strings.ToLower(oneTask.Network) == networkHost {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.ContainerPort)
blog.V(3).Infof("ExportServiceWatch: service (%s %s) backend targetip %s targetport %d",
esInfo.exportService.Namespace, esInfo.exportService.ServiceName, backend.TargetIP, backend.TargetPort)
//container docker bridge network, docker run --net=bridge
} else if strings.ToLower(oneTask.Network) == "bridge" {
} else if strings.ToLower(oneTask.Network) == networkBridge {
if onePort.HostPort > 0 {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.HostPort)
Expand All @@ -876,7 +887,7 @@ func (watch *ExportServiceWatch) SyncEpTaskgroupBackend(esInfo *ExportServiceInf
blog.V(3).Infof("ExportServiceWatch: service (%s %s) backend targetip %s targetport %d",
esInfo.exportService.Namespace, esInfo.exportService.ServiceName, backend.TargetIP, backend.TargetPort)
//container docker user defined network, docker run --net=mynetwork
} else if strings.ToLower(oneTask.NetworkType) == "cnm" {
} else if strings.ToLower(oneTask.NetworkType) == networkTypeCNM {
if onePort.HostPort > 0 {
backend.TargetIP = bcsInfo.NodeAddress
backend.TargetPort = int(onePort.HostPort)
Expand Down
Loading

0 comments on commit caf3a76

Please sign in to comment.