Skip to content

Commit

Permalink
executor(engine): support openapi registration for job master (#5696)
Browse files Browse the repository at this point in the history
ref #5689
  • Loading branch information
sleepymole authored Jun 2, 2022
1 parent 9a0bfd8 commit d0c1970
Show file tree
Hide file tree
Showing 15 changed files with 411 additions and 80 deletions.
1 change: 1 addition & 0 deletions engine/cmd/executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func main() {
case flag.ErrHelp:
os.Exit(0)
default:
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(2)
}

Expand Down
44 changes: 0 additions & 44 deletions engine/executor/http.go

This file was deleted.

92 changes: 92 additions & 0 deletions engine/executor/openapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"net/http"
"strings"
"sync"

"github.com/gin-gonic/gin"

libModel "github.com/pingcap/tiflow/engine/lib/model"
)

const jobAPIPrefix = "/api/v1/jobs/"

func jobAPIBasePath(jobID libModel.JobID) string {
return jobAPIPrefix + jobID + "/"
}

type jobAPIServer struct {
rwm sync.RWMutex
handlers map[libModel.JobID]http.Handler
}

func newJobAPIServer() *jobAPIServer {
return &jobAPIServer{
handlers: make(map[libModel.JobID]http.Handler),
}
}

func (s *jobAPIServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if h, ok := s.match(r.URL.Path); ok {
h.ServeHTTP(w, r)
} else {
http.NotFound(w, r)
}
}

func (s *jobAPIServer) match(path string) (http.Handler, bool) {
if !strings.HasPrefix(path, jobAPIPrefix) {
return nil, false
}
path = strings.TrimPrefix(path, jobAPIPrefix)
fields := strings.SplitN(path, "/", 2)
if len(fields) != 2 {
return nil, false
}
JobID := fields[0]
s.rwm.RLock()
h, ok := s.handlers[JobID]
s.rwm.RUnlock()
return h, ok
}

func (s *jobAPIServer) initialize(jobID libModel.JobID, f func(apiGroup *gin.RouterGroup)) {
engine := gin.New()
apiGroup := engine.Group(jobAPIBasePath(jobID))
f(apiGroup)

s.rwm.Lock()
s.handlers[jobID] = engine
s.rwm.Unlock()
}

func (s *jobAPIServer) listenStoppedJobs(ctx context.Context, stoppedJobs <-chan libModel.JobID) error {
for {
select {
case jobID, ok := <-stoppedJobs:
if !ok {
return nil
}
s.rwm.Lock()
delete(s.handlers, jobID)
s.rwm.Unlock()
case <-ctx.Done():
return ctx.Err()
}
}
}
95 changes: 95 additions & 0 deletions engine/executor/openapi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"

libModel "github.com/pingcap/tiflow/engine/lib/model"
)

func TestJobAPIServer(t *testing.T) {
jobAPISrv := newJobAPIServer()

jobAPISrv.initialize("job1", func(apiGroup *gin.RouterGroup) {
apiGroup.GET("/status", func(c *gin.Context) {
c.String(http.StatusOK, "job1 status")
})
})
jobAPISrv.initialize("job2", func(apiGroup *gin.RouterGroup) {
apiGroup.GET("/status", func(c *gin.Context) {
c.String(http.StatusOK, "job2 status")
})
})

// test job1
{
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/api/v1/jobs/job1/status", nil)
jobAPISrv.ServeHTTP(w, r)
require.Equal(t, http.StatusOK, w.Code)
require.Equal(t, "job1 status", w.Body.String())
}
// test job2
{
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/api/v1/jobs/job2/status", nil)
jobAPISrv.ServeHTTP(w, r)
require.Equal(t, http.StatusOK, w.Code)
require.Equal(t, "job2 status", w.Body.String())
}
// test not found
{
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/api/v1/jobs/job3/status", nil)
jobAPISrv.ServeHTTP(w, r)
require.Equal(t, http.StatusNotFound, w.Code)
}

wg := sync.WaitGroup{}
stoppedJobs := make(chan libModel.JobID, 16)
wg.Add(1)
go func() {
defer wg.Done()
err := jobAPISrv.listenStoppedJobs(context.Background(), stoppedJobs)
require.NoError(t, err)
}()

stoppedJobs <- "job1"
require.Eventually(t, func() bool {
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/api/v1/jobs/job1/status", nil)
jobAPISrv.ServeHTTP(w, r)
return w.Code == http.StatusNotFound
}, time.Second, time.Millisecond*100)

stoppedJobs <- "job2"
require.Eventually(t, func() bool {
w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/api/v1/jobs/job2/status", nil)
jobAPISrv.ServeHTTP(w, r)
return w.Code == http.StatusNotFound
}, time.Second, time.Millisecond*100)

close(stoppedJobs)
wg.Wait()
}
46 changes: 40 additions & 6 deletions engine/executor/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@ package executor
import (
"context"
"encoding/json"
"net/http"
"net/http/pprof"
"strings"
"time"

pcErrors "github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
p2pImpl "github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/tcpserver"
"go.etcd.io/etcd/client/pkg/v3/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand All @@ -36,9 +33,12 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/tiflow/dm/dm/common"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/engine/client"
pb "github.com/pingcap/tiflow/engine/enginepb"
"github.com/pingcap/tiflow/engine/executor/worker"
"github.com/pingcap/tiflow/engine/lib"
libModel "github.com/pingcap/tiflow/engine/lib/model"
"github.com/pingcap/tiflow/engine/lib/registry"
"github.com/pingcap/tiflow/engine/model"
Expand All @@ -53,11 +53,16 @@ import (
"github.com/pingcap/tiflow/engine/pkg/meta/metaclient"
pkgOrm "github.com/pingcap/tiflow/engine/pkg/orm"
"github.com/pingcap/tiflow/engine/pkg/p2p"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/pingcap/tiflow/engine/pkg/rpcutil"
"github.com/pingcap/tiflow/engine/pkg/serverutils"
"github.com/pingcap/tiflow/engine/pkg/tenant"
"github.com/pingcap/tiflow/engine/test"
"github.com/pingcap/tiflow/engine/test/mock"
p2pImpl "github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/tcpserver"
)

// Server is a executor server abstraction
Expand Down Expand Up @@ -89,6 +94,7 @@ type Server struct {
p2pMsgRouter p2pImpl.MessageRouter
discoveryKeeper *serverutils.DiscoveryKeepaliver
resourceBroker broker.Broker
jobAPISrv *jobAPIServer
}

// NewServer creates a new executor server instance
Expand All @@ -97,6 +103,7 @@ func NewServer(cfg *Config, ctx *test.Context) *Server {
cfg: cfg,
testCtx: ctx,
cliUpdateCh: make(chan cliUpdateInfo),
jobAPISrv: newJobAPIServer(),
}
return &s
}
Expand Down Expand Up @@ -196,6 +203,10 @@ func (s *Server) makeTask(
log.L().Error("Failed to create worker", zap.Error(err))
return nil, err
}
if jm, ok := newWorker.(lib.BaseJobMasterExt); ok {
jobID := newWorker.ID()
s.jobAPISrv.initialize(jobID, jm.TriggerOpenAPIInitialize)
}
return newWorker, nil
}

Expand Down Expand Up @@ -340,6 +351,12 @@ func (s *Server) Run(ctx context.Context) error {
return s.taskRunner.Run(ctx)
})

wg.Go(func() error {
taskStopReceiver := s.taskRunner.TaskStopReceiver()
defer taskStopReceiver.Close()
return s.jobAPISrv.listenStoppedJobs(ctx, taskStopReceiver.C)
})

err := s.initClients(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -425,7 +442,24 @@ func (s *Server) startTCPService(ctx context.Context, wg *errgroup.Group) error
})

wg.Go(func() error {
return httpHandler(s.tcpServer.HTTP1Listener())
mux := http.NewServeMux()

mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
mux.Handle("/metrics", promutil.HTTPHandlerForMetric())
mux.Handle(jobAPIPrefix, s.jobAPISrv)

httpSrv := &http.Server{
Handler: mux,
}
err := httpSrv.Serve(s.tcpServer.HTTP1Listener())
if err != nil && !common.IsErrNetClosing(err) && err != http.ErrServerClosed {
log.L().Error("http server returned", log.ShortError(err))
}
return err
})
return nil
}
Expand Down
Loading

0 comments on commit d0c1970

Please sign in to comment.