Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http_*: add a HTTP API to list changefeed info #1917

Merged
merged 28 commits into from
Jun 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0e756b3
http_handler:add an HTTP API
asddongmen May 30, 2021
663eba9
http_handler:update api/v1/changefeeds
asddongmen May 31, 2021
b7f6786
http_handler: update
asddongmen Jun 1, 2021
45e8137
http_*: add unit test && wrap error message
asddongmen Jun 2, 2021
4d93743
Merge branch 'pingcap:master' into changefeed_http_api
asddongmen Jun 2, 2021
acbc236
Merge branch 'changefeed_http_api' of github.com:asddongmen/ticdc int…
asddongmen Jun 2, 2021
7ed1133
http_*: update
asddongmen Jun 3, 2021
8005025
http_*: extract filter as a func && update time format
asddongmen Jun 3, 2021
91f9013
http_*: format http error && add handleHealth
asddongmen Jun 3, 2021
605e028
Merge branch 'master' into changefeed_http_api
asddongmen Jun 3, 2021
ee6f209
http_*: put http api relevant code in a new file
asddongmen Jun 4, 2021
7888fc0
http_*: fix lint error
asddongmen Jun 4, 2021
3b43430
http_*: fix golangCI lint
asddongmen Jun 4, 2021
01333dc
Update cdc/http_status_test.go
Jun 4, 2021
d0ebe08
Merge branch 'master' into changefeed_http_api
asddongmen Jun 4, 2021
83d1dcf
http_*:fix lint error
asddongmen Jun 4, 2021
6d35fca
Merge branch 'changefeed_http_api' of github.com:asddongmen/ticdc int…
asddongmen Jun 4, 2021
c555cbe
Merge branch 'master' into changefeed_http_api
asddongmen Jun 4, 2021
d0fce2d
http_*:update to fit new owner
asddongmen Jun 4, 2021
8d69ced
Merge branch 'master' into changefeed_http_api
asddongmen Jun 5, 2021
dc691a7
Merge branch 'master' into changefeed_http_api
asddongmen Jun 5, 2021
be4bd12
Merge branch 'master' into changefeed_http_api
asddongmen Jun 5, 2021
9a6aad9
http_*: delete error code in httpErr
asddongmen Jun 5, 2021
8b88cde
Merge branch 'changefeed_http_api' of github.com:asddongmen/ticdc int…
asddongmen Jun 5, 2021
f798668
Update pkg/httputil/httputil.go
asddongmen Jun 5, 2021
825ec21
Merge branch 'master' into changefeed_http_api
asddongmen Jun 5, 2021
42a2518
http_handler: update
asddongmen Jun 5, 2021
e1867df
Merge branch 'master' into changefeed_http_api
ti-chi-bot Jun 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions cdc/http_api_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cdc

import (
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/httputil"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
)

const (
// apiOpVarChangefeeds is the key of list option in HTTP API
apiOpVarChangefeeds = "state"
)

// JSONTime used to wrap time into json format
type JSONTime time.Time

// MarshalJSON use to specify the time format
func (t JSONTime) MarshalJSON() ([]byte, error) {
stamp := fmt.Sprintf("\"%s\"", time.Time(t).Format("2006-01-02 15:04:05.000"))
return []byte(stamp), nil
}

// err of cdc http api
type httpError struct {
Error string `json:"error"`
}

// ChangefeedCommonInfo holds some common usage information of a changefeed and use by RESTful API only.
type ChangefeedCommonInfo struct {
ID string `json:"id"`
FeedState model.FeedState `json:"state"`
CheckpointTSO uint64 `json:"checkpoint-tso"`
CheckpointTime JSONTime `json:"checkpoint-time"`
RunningError *model.RunningError `json:"error"`
}

// handleHealth check if is this server is health
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// handleChangefeeds dispatch the request to the specified handleFunc according to the request method.
func (s *Server) handleChangefeeds(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodGet {
s.handleChangefeedsList(w, req)
return
}
// Only the get method is allowed at this stage,
// if it is not the get method, an error will be returned directly
writeErrorJSON(w, http.StatusBadRequest, *cerror.ErrSupportGetOnly)
}

// handleChangefeedsList will only received request with Get method from dispatcher.
func (s *Server) handleChangefeedsList(w http.ResponseWriter, req *http.Request) {
err := req.ParseForm()
if err != nil {
writeInternalServerErrorJSON(w, cerror.WrapError(cerror.ErrInternalServerError, err))
return
}
state := req.Form.Get(apiOpVarChangefeeds)

statuses, err := s.etcdClient.GetAllChangeFeedStatus(req.Context())
if err != nil {
writeInternalServerErrorJSON(w, err)
return
}

changefeedIDs := make(map[string]struct{}, len(statuses))
for cid := range statuses {
changefeedIDs[cid] = struct{}{}
}

resps := make([]*ChangefeedCommonInfo, 0)
for changefeedID := range changefeedIDs {
cfInfo, err := s.etcdClient.GetChangeFeedInfo(req.Context(), changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
writeInternalServerErrorJSON(w, err)
return
}
if !httputil.IsFiltered(state, cfInfo.State) {
continue
}
cfStatus, _, err := s.etcdClient.GetChangeFeedStatus(req.Context(), changefeedID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
writeInternalServerErrorJSON(w, err)
return
}
resp := &ChangefeedCommonInfo{
ID: changefeedID,
}

if cfInfo != nil {
resp.FeedState = cfInfo.State
resp.RunningError = cfInfo.Error
}

if cfStatus != nil {
resp.CheckpointTSO = cfStatus.CheckpointTs
tm := oracle.GetTimeFromTS(cfStatus.CheckpointTs)
resp.CheckpointTime = JSONTime(tm)
}
resps = append(resps, resp)
}
writeData(w, resps)
}

func writeInternalServerErrorJSON(w http.ResponseWriter, err error) {
writeErrorJSON(w, http.StatusInternalServerError, *cerror.ErrInternalServerError.Wrap(err))
}

func writeErrorJSON(w http.ResponseWriter, statusCode int, cerr errors.Error) {
httpErr := httpError{Error: cerr.Error()}
jsonStr, err := json.MarshalIndent(httpErr, "", " ")
if err != nil {
log.Error("invalid json data", zap.Reflect("data", err), zap.Error(err))
return
}
w.WriteHeader(statusCode)
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(jsonStr)
if err != nil {
log.Error("fail to write data", zap.Error(err))
}
}
1 change: 1 addition & 0 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (s *Server) handleChangefeedQuery(w http.ResponseWriter, req *http.Request)
return
}
}

if s.captureV2 == nil {
// for test only
handleOwnerResp(w, concurrency.ErrElectionNotLeader)
Expand Down
3 changes: 2 additions & 1 deletion cdc/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ func (s *Server) startStatusHTTP() error {
serverMux.HandleFunc("/capture/owner/rebalance_trigger", s.handleRebalanceTrigger)
serverMux.HandleFunc("/capture/owner/move_table", s.handleMoveTable)
serverMux.HandleFunc("/capture/owner/changefeed/query", s.handleChangefeedQuery)

serverMux.HandleFunc("/admin/log", handleAdminLogLevel)
serverMux.HandleFunc("/api/v1/changefeeds", s.handleChangefeeds)
serverMux.HandleFunc("/api/v1/health", s.handleHealth)

if util.FailpointBuild {
// `http.StripPrefix` is needed because `failpoint.HttpHandler` assumes that it handles the prefix `/`.
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,11 @@ error = '''
fail to create changefeed because start-ts %d is earlier than GC safepoint at %d
'''

["CDC:ErrSupportGetOnly"]
error = '''
this api supports GET method only
'''

["CDC:ErrSupportPostOnly"]
error = '''
this api supports POST method only
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ var (
ErrCaptureResignOwner = errors.Normalize("resign owner failed", errors.RFCCodeText("CDC:ErrCaptureResignOwner"))
ErrWaitHandleOperationTimeout = errors.Normalize("waiting processor to handle the operation finished timeout", errors.RFCCodeText("CDC:ErrWaitHandleOperationTimeout"))
ErrSupportPostOnly = errors.Normalize("this api supports POST method only", errors.RFCCodeText("CDC:ErrSupportPostOnly"))
ErrSupportGetOnly = errors.Normalize("this api supports GET method only", errors.RFCCodeText("CDC:ErrSupportGetOnly"))
ErrAPIInvalidParam = errors.Normalize("invalid api parameter", errors.RFCCodeText("CDC:ErrAPIInvalidParam"))
ErrInternalServerError = errors.Normalize("internal server error", errors.RFCCodeText("CDC:ErrInternalServerError"))
ErrOwnerSortDir = errors.Normalize("owner sort dir", errors.RFCCodeText("CDC:ErrOwnerSortDir"))
Expand Down
19 changes: 19 additions & 0 deletions pkg/httputil/httputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package httputil
import (
"net/http"

"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/security"
)

Expand Down Expand Up @@ -43,3 +44,21 @@ func NewClient(credential *security.Credential) (*Client, error) {
Client: http.Client{Transport: transport},
}, nil
}

// IsFiltered return true if the given feedState matches the whiteList.
func IsFiltered(whiteList string, feedState model.FeedState) bool {
if whiteList == "all" {
return true
}
if whiteList == "" {
switch feedState {
case model.StateNormal:
return true
case model.StateStopped:
return true
case model.StateFailed:
return true
}
}
return whiteList == string(feedState)
}