Skip to content

Commit

Permalink
Add REST API to list job queue status, pause/resume job queue and lis…
Browse files Browse the repository at this point in the history
…t schedulers

Signed-off-by: stonezdj <[email protected]>
  • Loading branch information
stonezdj committed Oct 28, 2022
1 parent 703ca8d commit b07b9a3
Show file tree
Hide file tree
Showing 21 changed files with 1,010 additions and 25 deletions.
163 changes: 162 additions & 1 deletion api/v2.0/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4593,6 +4593,115 @@ paths:
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/jobservice/queues:
get:
operationId: listJobQueues
summary: list job queues
description: list job queue
tags:
- jobservice
parameters:
- $ref: '#/parameters/requestId'
responses:
'200':
description: List job queue successfully.
schema:
type: array
items:
$ref: '#/definitions/JobQueue'
'401':
$ref: '#/responses/401'
'403':
$ref: '#/responses/403'
'404':
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/jobservice/queues/{job_type}:
put:
operationId: actionPendingJobs
summary: stop and clean, pause, resume pending jobs in the queue
description: stop and clean, pause, resume pending jobs in the queue
tags:
- jobservice
parameters:
- $ref: '#/parameters/requestId'
- name: job_type
in: path
required: true
type: string
description: The type of the job. 'all' stands for all job types
- name: action_request
in: body
required: true
schema:
$ref: '#/definitions/ActionRequest'
responses:
'200':
description: take action to the jobs in the queue successfully.
'401':
$ref: '#/responses/401'
'403':
$ref: '#/responses/403'
'404':
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/jobservice/schedules:
get:
operationId: listSchedules
description: List schedules
tags:
- jobservice
parameters:
- $ref: '#/parameters/requestId'
- $ref: '#/parameters/page'
- $ref: '#/parameters/pageSize'
responses:
'200':
description: list schedule successfully.
schema:
type: array
items:
type: object
$ref: '#/definitions/ScheduleTask'
headers:
X-Total-Count:
description: The total count of available items
type: integer
Link:
description: Link to previous page and next page
type: string
'401':
$ref: '#/responses/401'
'403':
$ref: '#/responses/403'
'404':
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/jobservice/schedulerstatus:
get:
operationId: getSchedulerStatus
description: Get scheduler status
tags:
- jobservice
parameters:
- $ref: '#/parameters/requestId'
responses:
'200':
description: Get scheduler status successfully.
schema:
type: object
$ref: '#/definitions/SchedulerStatus'
'401':
$ref: '#/responses/401'
'403':
$ref: '#/responses/403'
'404':
$ref: '#/responses/404'
'500':
$ref: '#/responses/500'
/ping:
get:
operationId: getPing
Expand Down Expand Up @@ -9374,4 +9483,56 @@ definitions:
checkin_at:
type: string
format: date-time
description: The checkin time of the worker
description: The checkin time of the worker
ActionRequest:
type: object
description: The request to stop, pause or resume
properties:
action:
type: string
description: The action of the request, should be stop, pause or resume
JobQueue:
type: object
description: the job queue info
properties:
job_type:
type: string
description: The type of the current job
count:
type: integer
description: The count of jobs in the current queue
latency:
type: integer
description: The latency of current queue (seconds)
paused:
type: boolean
description: The paused status of current queue
ScheduleTask:
type: object
description: the schedule task info
properties:
id:
type: integer
description: the id of the Schedule task
vendor_type:
type: string
description: the vendor type of the current schedule task
vendor_id:
type: integer
description: the vendor id of the current task
extra_attrs:
type: string
description: the extra attributes
creation_time:
type: string
format: date-time
revision:
type: integer
description: the revision of current schedule
SchedulerStatus:
type: object
description: the scheduler status
properties:
paused:
type: boolean
description: if the scheduler is paused
144 changes: 134 additions & 10 deletions src/controller/jobmonitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/goharbor/harbor/src/lib/q"
libRedis "github.com/goharbor/harbor/src/lib/redis"
jm "github.com/goharbor/harbor/src/pkg/jobmonitor"
"github.com/goharbor/harbor/src/pkg/scheduler"
"github.com/goharbor/harbor/src/pkg/task"
)

Expand All @@ -42,23 +41,139 @@ type WorkerPoolController interface {
ListWorker(ctx context.Context, poolID string) ([]*jm.Worker, error)
// StopRunningJob stop the running job
StopRunningJob(ctx context.Context, jobID string) error
// ListQueue lists job queues
ListQueue(ctx context.Context) ([]*jm.Queue, error)
// StopPendingJob stop the pending job
StopPendingJob(ctx context.Context, jobType string) error
// PauseJobQueues suspend the all schedules or resume the all schedules
PauseJobQueues(ctx context.Context, jobType string, pause bool) error
// SchedulerStatus get the job scheduler status
SchedulerStatus(ctx context.Context) (bool, error)
}

type workerPoolController struct {
poolManager jm.PoolManager
workerManager jm.WorkerManager
taskManager task.Manager
sch scheduler.Scheduler
monitorClient func() (jm.JobServiceMonitorClient, error)
poolManager jm.PoolManager
workerManager jm.WorkerManager
queueManager jm.QueueManager
taskManager task.Manager
monitorClient func() (jm.JobServiceMonitorClient, error)
jobServiceRedisClient func() (jm.RedisClient, error)
}

func (w *workerPoolController) ListQueue(ctx context.Context) ([]*jm.Queue, error) {
mClient, err := w.monitorClient()
if err != nil {
return nil, err
}
qs, err := mClient.Queues()
if err != nil {
return nil, err
}
redisClient, err := w.jobServiceRedisClient()
if err != nil {
return nil, err
}
// the original queue doesn't include the paused status, fetch it from the redis
statusMap, err := redisClient.AllJobTypeStatus(ctx)
if err != nil {
return nil, err
}
result := make([]*jm.Queue, 0)
for _, q := range qs {
result = append(result, &jm.Queue{
JobType: q.JobName,
Count: q.Count,
Latency: q.Latency,
Paused: statusMap[q.JobName],
})
}
return result, nil
}

func (w *workerPoolController) StopPendingJob(ctx context.Context, jobType string) error {
redisClient, err := w.jobServiceRedisClient()
if err != nil {
return err
}
if strings.EqualFold(jobType, "all") {
jobTypes, err := redisClient.AllJobTypes(ctx)
if err != nil {
return err
}
for _, jobType := range jobTypes {
if err := w.StopPendingJob(ctx, jobType); err != nil {
return err
}
}
return nil
}
jobIDs, err := redisClient.StopPendingJobs(ctx, jobType)
if err != nil {
return err
}
return w.UpdateJobStatusInTask(ctx, jobIDs)
}
func (w *workerPoolController) UpdateJobStatusInTask(ctx context.Context, jobIDs []string) error {
for _, jobID := range jobIDs {
ts, err := w.taskManager.List(ctx, q.New(q.KeyWords{"job_id": jobID}))
if err != nil {
return err
}
if len(ts) == 0 {
continue
}
ts[0].Status = "Stopped"
if err := w.taskManager.Update(ctx, ts[0], "Status"); err != nil {
return err
}
}
return nil
}

func (w *workerPoolController) PauseJobQueues(ctx context.Context, jobType string, pause bool) error {
redisClient, err := w.jobServiceRedisClient()
if err != nil {
return err
}
if strings.EqualFold(jobType, "all") {
jobTypes, err := redisClient.AllJobTypes(ctx)
if err != nil {
return err
}
for _, jobType := range jobTypes {
if err := w.PauseJobQueues(ctx, jobType, pause); err != nil {
return err
}
}
return nil
}
if pause {
return redisClient.PauseJob(ctx, jobType)
}
return redisClient.UnpauseJob(ctx, jobType)
}

func (w *workerPoolController) SchedulerStatus(ctx context.Context) (bool, error) {
redisClient, err := w.jobServiceRedisClient()
if err != nil {
return false, err
}
statusMap, err := redisClient.AllJobTypeStatus(ctx)
if err != nil {
return false, err
}
return statusMap["SCHEDULER"], nil
}

// NewWorkerPoolController ...
func NewWorkerPoolController() WorkerPoolController {
return &workerPoolController{
poolManager: jm.NewPoolManager(),
workerManager: jm.NewWorkerManager(),
taskManager: task.NewManager(),
monitorClient: jobServiceMonitorClient,
poolManager: jm.NewPoolManager(),
workerManager: jm.NewWorkerManager(),
taskManager: task.NewManager(),
queueManager: jm.NewQueueClient(),
monitorClient: jobServiceMonitorClient,
jobServiceRedisClient: jobServiceRedisClient,
}
}

Expand Down Expand Up @@ -116,6 +231,15 @@ func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) {
return work.NewClient(fmt.Sprintf("{%s}", config.Namespace), pool), nil
}

func jobServiceRedisClient() (jm.RedisClient, error) {
cfg, err := job.GlobalClient.JobServiceConfig()
if err != nil {
return nil, err
}
config := cfg.RedisPoolConfig
return jm.NewRedisClient(config)
}

func (w *workerPoolController) ListWorker(ctx context.Context, poolID string) ([]*jm.Worker, error) {
mClient, err := w.monitorClient()
if err != nil {
Expand Down
Loading

0 comments on commit b07b9a3

Please sign in to comment.