Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev berg #31

Merged
merged 8 commits into from
May 29, 2019
28 changes: 28 additions & 0 deletions bcs-mesos/bcs-container-executor/app/bcs_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,9 @@ func (executor *BcsExecutor) LaunchTaskGroup(driver exec.ExecutorDriver, taskGro
driver.Stop()
}
}()
go executor.watchStartingTask()
logs.Infof("BcsExecutor init pod success.")

if setupErr := executor.netManager.SetUpPod(executor.podInst); setupErr != nil {
executor.updateTaskGroup(driver, taskGroup, mesos.TaskState_TASK_FAILED, "Pod Setup failed: "+setupErr.Error())
executor.status = ExecutorStatus_SHUTDOWN
Expand All @@ -353,6 +355,7 @@ func (executor *BcsExecutor) LaunchTaskGroup(driver exec.ExecutorDriver, taskGro
return
}
logs.Infof("BcsExecutor Setup pod network success.")

if startErr := executor.podInst.Start(); startErr != nil {
executor.updateTaskGroup(driver, taskGroup, mesos.TaskState_TASK_FAILED, "Pod Start failed: "+startErr.Error())
executor.status = ExecutorStatus_SHUTDOWN
Expand All @@ -362,6 +365,7 @@ func (executor *BcsExecutor) LaunchTaskGroup(driver exec.ExecutorDriver, taskGro
return
}
logs.Infof("BcsExecutor start pod success. update local container info, ready to watch Pod status")

executor.podStatus = container.PodStatus_STARTING
executor.updateTaskGroup(driver, taskGroup, mesos.TaskState_TASK_STARTING, "Pod is starting")
//start success, Get container info and reply to scheduler
Expand All @@ -381,6 +385,29 @@ func (executor *BcsExecutor) LaunchTaskGroup(driver exec.ExecutorDriver, taskGro
return
}

// watchStartingTask periodically re-reports TASK_STARTING for every container
// task of the pod for as long as executor.podStatus is PodStatus_INIT or
// PodStatus_UNKNOWN. It returns as soon as the pod has any other status.
// LaunchTaskGroup runs it in its own goroutine.
//
// NOTE(review): podStatus is read here while LaunchTaskGroup assigns it from
// another goroutine, with no synchronisation visible in this function —
// confirm whether a lock or atomic is required.
func (executor *BcsExecutor) watchStartingTask() {
	// Pause between report rounds. Without it the loop spins a CPU core and
	// floods the driver with identical updates while the pod is initialising.
	// The value is taken from the sleep that was commented out originally.
	const reportInterval = time.Minute

	for {
		if executor.podStatus != container.PodStatus_INIT &&
			executor.podStatus != container.PodStatus_UNKNOWN {
			logs.Infof("pod status %s, then return\n", executor.podStatus)
			return
		}

		for _, task := range executor.podInst.GetContainerTasks() {
			taskinfo, ok := executor.tasks.TaskInfo[task.TaskId]
			if !ok {
				// No mesos task is registered for this container, so an update
				// would carry no TaskId; skip it instead of sending it.
				logs.Infof("task %s has no task info, skip starting status update\n", task.TaskId)
				continue
			}

			update := &mesos.TaskStatus{
				TaskId:  taskinfo.GetTaskId(),
				State:   mesos.TaskState_TASK_STARTING.Enum(),
				Message: proto.String("task is starting"),
				Source:  mesos.TaskStatus_SOURCE_EXECUTOR.Enum(),
			}
			executor.driver.SendStatusUpdate(update)
		}

		time.Sleep(reportInterval)
	}
}

//KillTask kill task by taskId
func (executor *BcsExecutor) KillTask(driver exec.ExecutorDriver, taskID *mesos.TaskID) {
logs.Infof("Kill task %s\n", taskID.GetValue())
Expand Down Expand Up @@ -841,6 +868,7 @@ func (executor *BcsExecutor) customSettingContainer(taskInfo *container.BcsConta
}
//resource info
taskInfo.Resource = dataClass.Resources
taskInfo.LimitResource = dataClass.LimitResources
taskInfo.NetLimit = dataClass.NetLimit
return nil

Expand Down
15 changes: 13 additions & 2 deletions bcs-mesos/bcs-container-executor/container/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (

const (
// StopContainerGraceTime is not used in the visible part of this file.
// NOTE(review): presumably the grace period, in seconds, granted to a
// container when it is stopped — confirm at the call site.
StopContainerGraceTime = 10
// DefaultDockerCpuPeriod is the value CreateContainer assigns to
// hostConfig.CPUPeriod when a CPU limit is set; the same value is multiplied
// by LimitResource.Cpus to derive hostConfig.CPUQuota.
// NOTE(review): presumably expressed in microseconds — confirm against the
// Docker host-config documentation.
DefaultDockerCpuPeriod = 100000
)

//DockerContainer implement container interface
Expand Down Expand Up @@ -362,6 +363,15 @@ func (docker *DockerContainer) CreateContainer(containerName string, containerTa
}
}

if containerTask.LimitResource != nil && containerTask.LimitResource.Cpus > 0 {
hostConfig.CPUPeriod = DefaultDockerCpuPeriod
hostConfig.CPUQuota = int64(containerTask.LimitResource.Cpus * DefaultDockerCpuPeriod)
}
if containerTask.LimitResource != nil && containerTask.LimitResource.Mem >= 4 {
hostConfig.Memory = int64(containerTask.LimitResource.Mem * 1024 * 1024)
hostConfig.MemorySwap = int64(containerTask.LimitResource.Mem * 1024 * 1024)
}

//done(developerJim): setting portMapping
for key, value := range containerTask.PortBindings {
var tmp struct{}
Expand Down Expand Up @@ -521,8 +531,9 @@ func (docker *DockerContainer) InspectContainer(containerName string) (*BcsConta
func (docker *DockerContainer) PullImage(image string) error {
repo, tag := dockerclient.ParseRepositoryTag(image)
pullOpt := dockerclient.PullImageOptions{
Repository: repo,
Tag: tag,
Repository: repo,
Tag: tag,
InactivityTimeout: time.Minute * 3,
}
auth := dockerclient.AuthConfiguration{
Username: docker.user,
Expand Down
3 changes: 2 additions & 1 deletion bcs-mesos/bcs-container-executor/container/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ type BcsContainerTask struct {
PublishAllPorts bool //publish all ports in container
PortBindings map[string]BcsPort //port for container reflection, only useful for docker bridge
Labels []BcsKV //label for container
Resource *bcstypes.Resource //container resource limitation
Resource *bcstypes.Resource //container resource request
LimitResource *bcstypes.Resource // container resource limit
BcsMessages []*bcstypes.BcsMessage //bcs define message
RuntimeConf *BcsContainerInfo //container runtime info
HealthCheck healthcheck.Checker //for health check
Expand Down
19 changes: 16 additions & 3 deletions bcs-mesos/bcs-mesos-driver/mesosdriver/backend/v4http/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,32 @@ func (s *Scheduler) setVersionWithPodSpec(version *types.Version, spec *bcstype.
container := new(types.Container)
c := spec.PodSpec.Containers[i]

if c.Resources.Requests.Cpu == "" && c.Resources.Limits.Cpu != "" {
c.Resources.Requests.Cpu = c.Resources.Limits.Cpu
c.Resources.Requests.Mem = c.Resources.Limits.Mem
c.Resources.Requests.Storage = c.Resources.Limits.Storage
}

container.Type = c.Type
//Resources
container.Resources = new(types.Resource)
container.Resources.Cpus, _ = strconv.ParseFloat(c.Resources.Limits.Cpu, 64)
container.Resources.Mem, _ = strconv.ParseFloat(c.Resources.Limits.Mem, 64)
container.Resources.Disk, _ = strconv.ParseFloat(c.Resources.Limits.Storage, 64)
container.Resources.Cpus, _ = strconv.ParseFloat(c.Resources.Requests.Cpu, 64)
container.Resources.Mem, _ = strconv.ParseFloat(c.Resources.Requests.Mem, 64)
container.Resources.Disk, _ = strconv.ParseFloat(c.Resources.Requests.Storage, 64)

//limit resources
container.LimitResoures = new(types.Resource)
container.LimitResoures.Cpus, _ = strconv.ParseFloat(c.Resources.Limits.Cpu, 64)
container.LimitResoures.Mem, _ = strconv.ParseFloat(c.Resources.Limits.Mem, 64)
container.LimitResoures.Disk, _ = strconv.ParseFloat(c.Resources.Limits.Storage, 64)

container.DataClass = &types.DataClass{
Resources: new(types.Resource),
Msgs: []*types.BcsMessage{},
}

container.DataClass.Resources = container.Resources
container.DataClass.LimitResources = container.LimitResoures

//set network flow limit parameters
container.NetLimit = spec.PodSpec.NetLimit
Expand Down
32 changes: 16 additions & 16 deletions bcs-mesos/bcs-scheduler/src/manager/sched/backend/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ func (b *backend) CreateDeployment(deploymentDef *types.DeploymentDef) (int, err
return comm.BcsErrMesosSchedResourceExist, err
}
deployment := types.Deployment{
ObjectMeta: deploymentDef.ObjectMeta,
Selector: deploymentDef.Selector,
Strategy: deploymentDef.Strategy,
Status: types.DEPLOYMENT_STATUS_DEPLOYING,
ObjectMeta: deploymentDef.ObjectMeta,
Selector: deploymentDef.Selector,
Strategy: deploymentDef.Strategy,
Status: types.DEPLOYMENT_STATUS_DEPLOYING,
RawJson: deploymentDef.RawJson,
RawJsonBackup: nil,
}
Expand Down Expand Up @@ -127,7 +127,7 @@ func (b *backend) CreateDeployment(deploymentDef *types.DeploymentDef) (int, err
deployment.Status = types.DEPLOYMENT_STATUS_RUNNING
if err := b.store.SaveDeployment(&deployment); err != nil {
blog.Error("request create deployment: save(%s.%s), err:%s", ns, name, err.Error())
return comm.BcsErrCommCreateZkNodeFail, err
return comm.BcsErrCommCreateZkNodeFail, err
}
} else {
version := deploymentDef.Version
Expand Down Expand Up @@ -184,7 +184,7 @@ func (b *backend) createDeploymentApplication(version *types.Version) (int, erro

if version == nil {
err := errors.New("version data empty, cannot create application")
return comm.BcsErrCommRequestDataErr, err
return comm.BcsErrCommRequestDataErr, err
}
blog.Info("do create deployment application(%s.%s)", version.RunAs, version.ID)
if version.Instances <= 0 {
Expand Down Expand Up @@ -214,7 +214,7 @@ func (b *backend) createDeploymentApplication(version *types.Version) (int, erro
app, err := b.store.FetchApplication(version.RunAs, version.ID)
if err != nil && err != zk.ErrNoNode {
blog.Error("create deployment application, fetch application(%s.%s) ret:%s", version.RunAs, version.ID, err.Error())
return comm.BcsErrCommGetZkNodeFail, err
return comm.BcsErrCommGetZkNodeFail, err
}

if app != nil {
Expand Down Expand Up @@ -244,12 +244,12 @@ func (b *backend) createDeploymentApplication(version *types.Version) (int, erro
if err := b.SaveVersion(version.RunAs, version.ID, version); err != nil {
blog.Error("create deployment application: fail to SaveVersion(%s.%s), err:%s",
version.RunAs, version.ID, err.Error())
return comm.BcsErrCommCreateZkNodeFail, err
return comm.BcsErrCommCreateZkNodeFail, err
}
if err := b.LaunchApplication(version); err != nil {
blog.Error("create deployment application: application(%s.%s) launch error: %s",
version.RunAs, version.ID, err.Error())
return comm.BcsErrMesosSchedCommon, err
return comm.BcsErrMesosSchedCommon, err
}

return comm.BcsSuccess, nil
Expand Down Expand Up @@ -320,7 +320,7 @@ func (b *backend) UpdateDeployment(deployment *types.DeploymentDef) (int, error)
if app.Status != types.APP_STATUS_RUNNING && app.Status != types.APP_STATUS_ABNORMAL {
err = errors.New("deployment bind application is not running, cannot update, please try later")
blog.Warn("update deployment(%s.%s): application under status(%s) can not update", ns, name, app.Status)
return comm.BcsErrMesosSchedCommon, err
return comm.BcsErrMesosSchedCommon, err
}

for k, v := range currDeployment.Selector {
Expand Down Expand Up @@ -367,7 +367,7 @@ func (b *backend) UpdateDeployment(deployment *types.DeploymentDef) (int, error)
}
if version.CheckConstraints() == false {
err := errors.New("constraints error")
return comm.BcsErrCommRequestDataErr, err
return comm.BcsErrCommRequestDataErr, err
}
// lock extension application
b.store.LockApplication(ns + "." + version.ID)
Expand All @@ -376,7 +376,7 @@ func (b *backend) UpdateDeployment(deployment *types.DeploymentDef) (int, error)
app, err = b.store.FetchApplication(version.RunAs, version.ID)
if err != nil && err != zk.ErrNoNode {
blog.Error("update deployment, fetch application(%s.%s) err:%s", version.RunAs, version.ID, err.Error())
return comm.BcsErrCommGetZkNodeFail, err
return comm.BcsErrCommGetZkNodeFail, err
}
if app != nil {
err = errors.New("application already exist")
Expand Down Expand Up @@ -507,8 +507,8 @@ func (b *backend) CancelUpdateDeployment(ns string, name string) error {
ns, name, deployment.ApplicationExt.ApplicationName)
deployment.ApplicationExt = nil
}
if deployment.Application == nil{

if deployment.Application == nil {
blog.Warn("cancelupdate deployment(%s.%s), no bind application", ns, name)
return errors.New("no application to do recover")
}
Expand Down Expand Up @@ -580,7 +580,7 @@ func (b *backend) DeleteDeployment(ns string, name string, enforce bool) (int, e
ns, name, deployment.Application.ApplicationName)
err = b.sched.InnerDeleteApplication(ns, deployment.Application.ApplicationName, enforce)
if err != nil {
blog.Errorf("delete app (%s:%s) error %s", ns, deployment.ApplicationExt.ApplicationName, err.Error())
blog.Errorf("delete app (%s:%s) error %s", ns, deployment.Application.ApplicationName, err.Error())
}
blog.Info("delete deployment(%s.%s), call delete bind application(%s) return",
ns, name, deployment.Application.ApplicationName)
Expand Down Expand Up @@ -660,7 +660,7 @@ func (b *backend) CheckDeleteDeployment(ns string, name string) {
}
return

}
}
time.Sleep(3 * time.Second)
go b.CheckDeleteDeployment(ns, name)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ func (b *backend) RescheduleTaskgroup(taskgroupId string, hostRetainTime int64)
blog.Error("reschedule taskgroup(%s) fail, get application(%s.%s) return nil", taskgroupId, runAs, appID)
return errors.New("Application not found")
}
if app.Status == types.APP_STATUS_OPERATING {
/*if app.Status == types.APP_STATUS_OPERATING {
blog.Warn("reschedule taskgroup(%s) fail, application(%s.%s) status(%s) err", taskgroupId, runAs, appID, app.Status)
return errors.New("Operation Not Allowed")
}
if app.Status == types.APP_STATUS_ROLLINGUPDATE && app.SubStatus != types.APP_SUBSTATUS_ROLLINGUPDATE_UP {
blog.Error("reschedule taskgroup(%s) fail, application(%s.%s) status(%s:%s) err",
taskgroupId, runAs, appID, app.Status, app.SubStatus)
return errors.New("operation Not Allowed")
}
}*/

b.store.LockApplication(runAs + "." + appID)
defer b.store.UnLockApplication(runAs + "." + appID)
Expand Down Expand Up @@ -108,7 +108,7 @@ func (b *backend) RescheduleTaskgroup(taskgroupId string, hostRetainTime int64)
var rescheduleOpdata sched.TransRescheduleOpData

rescheduleOpdata.TaskGroupID = taskgroup.ID
rescheduleOpdata.Force = true
rescheduleOpdata.Force = true
rescheduleOpdata.IsInner = false
rescheduleOpdata.HostRetainTime = hostRetainTime
if rescheduleOpdata.HostRetainTime > 0 {
Expand Down
10 changes: 7 additions & 3 deletions bcs-mesos/bcs-scheduler/src/manager/sched/scheduler/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package scheduler

import (
"bk-bcs/bcs-common/common/blog"
"bk-bcs/bcs-mesos/bcs-scheduler/src/manager/store"
"bk-bcs/bcs-mesos/bcs-scheduler/src/mesosproto/mesos"
"bk-bcs/bcs-mesos/bcs-scheduler/src/mesosproto/sched"
"bk-bcs/bcs-mesos/bcs-scheduler/src/types"
Expand Down Expand Up @@ -165,6 +166,9 @@ func (s *Scheduler) ProcessCommandMessage(bcsMsg *types.BcsMessage) {
blog.Error("procss command message, but data empty")
return
}
runAs, appID := store.GetRunAsAndAppIDbyTaskID(bcsMsg.ResponseCommandTask.TaskId)
s.store.LockApplication(fmt.Sprintf("%s.%s", runAs, appID))
defer s.store.UnLockApplication(fmt.Sprintf("%s.%s", runAs, appID))

cmdId := bcsMsg.ResponseCommandTask.ID
taskId := bcsMsg.ResponseCommandTask.TaskId
Expand All @@ -177,13 +181,13 @@ func (s *Scheduler) ProcessCommandMessage(bcsMsg *types.BcsMessage) {
}

exist := false
for _, taskGroup := range command.Status.Taskgroups {
for _, taskGroup := range command.Status.Taskgroups {
for _, task := range taskGroup.Tasks {
if taskId == task.TaskId {
task.Status = bcsMsg.ResponseCommandTask.Status
task.Message = bcsMsg.ResponseCommandTask.Message
task.Message = bcsMsg.ResponseCommandTask.Message
task.CommInspect = bcsMsg.ResponseCommandTask.CommInspect
blog.Info("update command(%s) task(%s:%s:%s)", cmdId, taskId, task.Status, task.Message)
blog.Info("update command(%s) task(%s:%s:%s)", cmdId, taskId, task.Status, task.Message)
exist = true
break
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ func createOrLoadFrameworkInfo(config util.Scheduler, store store.Store) (*mesos
Hostname: proto.String(config.Hostname),
FailoverTimeout: proto.Float64(60 * 60 * 24 * 7),
Checkpoint: proto.Bool(true),
Capabilities: []*mesos.FrameworkInfo_Capability{
&mesos.FrameworkInfo_Capability{
Type: mesos.FrameworkInfo_Capability_PARTITION_AWARE.Enum(),
},
},
}

frameworkId, err := store.FetchFrameworkID()
Expand Down Expand Up @@ -1296,7 +1301,7 @@ func (s *Scheduler) GetClusterResource() (*commtype.BcsClusterResource, error) {

// Get cluster current resource information from mesos master
func (s *Scheduler) GetMesosResourceIn(mesosClient *client.Client) (*commtype.BcsClusterResource, error) {

if mesosClient == nil {
blog.Error("get cluster resource error: mesos Client is nil")
return nil, fmt.Errorf("system error: mesos client is nil")
Expand Down
Loading