Skip to content

Commit

Permalink
http_*: add a HTTP API to list changefeed info (#1917)
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen authored Jun 5, 2021
1 parent 05ca2d2 commit 90116b2
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 1 deletion.
146 changes: 146 additions & 0 deletions cdc/http_api_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cdc

import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/httputil"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
)

const (
// apiOpVarChangefeeds is the key of list option in HTTP API
apiOpVarChangefeeds = "state"
)

// JSONTime used to wrap time into json format
type JSONTime time.Time

// MarshalJSON use to specify the time format
func (t JSONTime) MarshalJSON() ([]byte, error) {
stamp := fmt.Sprintf("\"%s\"", time.Time(t).Format("2006-01-02 15:04:05.000"))
return []byte(stamp), nil
}

// err of cdc http api
type httpError struct {
Error string `json:"error"`
}

// ChangefeedCommonInfo holds some common usage information of a changefeed and use by RESTful API only.
type ChangefeedCommonInfo struct {
ID string `json:"id"`
FeedState model.FeedState `json:"state"`
CheckpointTSO uint64 `json:"checkpoint-tso"`
CheckpointTime JSONTime `json:"checkpoint-time"`
RunningError *model.RunningError `json:"error"`
}

// handleHealth check if is this server is health
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// handleChangefeeds dispatch the request to the specified handleFunc according to the request method.
func (s *Server) handleChangefeeds(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodGet {
s.handleChangefeedsList(w, req)
return
}
// Only the get method is allowed at this stage,
// if it is not the get method, an error will be returned directly
writeErrorJSON(w, http.StatusBadRequest, *cerror.ErrSupportGetOnly)
}

// handleChangefeedsList will only received request with Get method from dispatcher.
func (s *Server) handleChangefeedsList(w http.ResponseWriter, req *http.Request) {
err := req.ParseForm()
if err != nil {
writeInternalServerErrorJSON(w, cerror.WrapError(cerror.ErrInternalServerError, err))
return
}
state := req.Form.Get(apiOpVarChangefeeds)

statuses, err := s.etcdClient.GetAllChangeFeedStatus(req.Context())
if err != nil {
writeInternalServerErrorJSON(w, err)
return
}

changefeedIDs := make(map[string]struct{}, len(statuses))
for cid := range statuses {
changefeedIDs[cid] = struct{}{}
}

resps := make([]*ChangefeedCommonInfo, 0)
for changefeedID := range changefeedIDs {
cfInfo, err := s.etcdClient.GetChangeFeedInfo(req.Context(), changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
writeInternalServerErrorJSON(w, err)
return
}
if !httputil.IsFiltered(state, cfInfo.State) {
continue
}
cfStatus, _, err := s.etcdClient.GetChangeFeedStatus(req.Context(), changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
writeInternalServerErrorJSON(w, err)
return
}
resp := &ChangefeedCommonInfo{
ID: changefeedID,
}

if cfInfo != nil {
resp.FeedState = cfInfo.State
resp.RunningError = cfInfo.Error
}

if cfStatus != nil {
resp.CheckpointTSO = cfStatus.CheckpointTs
tm := oracle.GetTimeFromTS(cfStatus.CheckpointTs)
resp.CheckpointTime = JSONTime(tm)
}
resps = append(resps, resp)
}
writeData(w, resps)
}

func writeInternalServerErrorJSON(w http.ResponseWriter, err error) {
writeErrorJSON(w, http.StatusInternalServerError, *cerror.ErrInternalServerError.Wrap(err))
}

func writeErrorJSON(w http.ResponseWriter, statusCode int, cerr errors.Error) {
httpErr := httpError{Error: cerr.Error()}
jsonStr, err := json.MarshalIndent(httpErr, "", " ")
if err != nil {
log.Error("invalid json data", zap.Reflect("data", err), zap.Error(err))
return
}
w.WriteHeader(statusCode)
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(jsonStr)
if err != nil {
log.Error("fail to write data", zap.Error(err))
}
}
1 change: 1 addition & 0 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (s *Server) handleChangefeedQuery(w http.ResponseWriter, req *http.Request)
return
}
}

if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
Expand Down
3 changes: 2 additions & 1 deletion cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ func (s *Server) startStatusHTTP() error {
serverMux.HandleFunc("/capture/owner/rebalance_trigger", s.handleRebalanceTrigger)
serverMux.HandleFunc("/capture/owner/move_table", s.handleMoveTable)
serverMux.HandleFunc("/capture/owner/changefeed/query", s.handleChangefeedQuery)

serverMux.HandleFunc("/admin/log", handleAdminLogLevel)
serverMux.HandleFunc("/api/v1/changefeeds", s.handleChangefeeds)
serverMux.HandleFunc("/api/v1/health", s.handleHealth)

if util.FailpointBuild {
// `http.StripPrefix` is needed because `failpoint.HttpHandler` assumes that it handles the prefix `/`.
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,11 @@ error = '''
fail to create changefeed because start-ts %d is earlier than GC safepoint at %d
'''

["CDC:ErrSupportGetOnly"]
error = '''
this api supports GET method only
'''

["CDC:ErrSupportPostOnly"]
error = '''
this api supports POST method only
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ var (
ErrCaptureResignOwner = errors.Normalize("resign owner failed", errors.RFCCodeText("CDC:ErrCaptureResignOwner"))
ErrWaitHandleOperationTimeout = errors.Normalize("waiting processor to handle the operation finished timeout", errors.RFCCodeText("CDC:ErrWaitHandleOperationTimeout"))
ErrSupportPostOnly = errors.Normalize("this api supports POST method only", errors.RFCCodeText("CDC:ErrSupportPostOnly"))
ErrSupportGetOnly = errors.Normalize("this api supports GET method only", errors.RFCCodeText("CDC:ErrSupportGetOnly"))
ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam"))
ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError"))
ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir"))
Expand Down
19 changes: 19 additions & 0 deletions pkg/httputil/httputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package httputil
import (
"net/http"

"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/security"
)

Expand Down Expand Up @@ -43,3 +44,21 @@ func NewClient(credential *security.Credential) (*Client, error) {
Client: http.Client{Transport: transport},
}, nil
}

// IsFiltered return true if the given feedState matches the whiteList.
func IsFiltered(whiteList string, feedState model.FeedState) bool {
if whiteList == "all" {
return true
}
if whiteList == "" {
switch feedState {
case model.StateNormal:
return true
case model.StateStopped:
return true
case model.StateFailed:
return true
}
}
return whiteList == string(feedState)
}

0 comments on commit 90116b2

Please sign in to comment.