Skip to content

Commit

Permalink
tests: Add integration test for http api (#2563) (#2597)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 21, 2021
1 parent 36fe023 commit 4c1896a
Show file tree
Hide file tree
Showing 4 changed files with 420 additions and 34 deletions.
67 changes: 36 additions & 31 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ func (h *HTTPHandler) ResumeChangefeed(c *gin.Context) {
// check if the changefeed exists && check if the etcdClient work well
_, _, err := h.capture.etcdClient.GetChangeFeedStatus(c, changefeedID)
if err != nil {
if cerror.ErrChangeFeedNotExists.Equal(err) {
c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
return
}
c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
return
}
Expand Down Expand Up @@ -696,6 +700,37 @@ func (h *HTTPHandler) Health(c *gin.Context) {
c.Status(http.StatusOK)
}

// SetLogLevel changes TiCDC log level dynamically.
// @Summary Change TiCDC log level
// @Description change TiCDC log level dynamically
// @Tags common
// @Accept json
// @Produce json
// @Param log_level body string true "log level"
// @Success 200
// @Failure 400 {object} model.HTTPError
// @Router /api/v1/log [post]
func SetLogLevel(c *gin.Context) {
// get json data from request body
data := struct {
Level string `json:"log_level"`
}{}
err := c.BindJSON(&data)
if err != nil {
c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
return
}

err = logutil.SetLogLevel(data.Level)
if err != nil {
c.IndentedJSON(http.StatusBadRequest,
model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", err)))
return
}
log.Warn("log level changed", zap.String("level", data.Level))
c.Status(http.StatusOK)
}

// forwardToOwner forward an request to owner
func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
// every request can only forward to owner one time
Expand Down Expand Up @@ -727,6 +762,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
return
}

// init a request
req, _ := http.NewRequest(c.Request.Method, c.Request.RequestURI, c.Request.Body)
req.URL.Host = owner.AdvertiseAddr
if tslConfig != nil {
Expand Down Expand Up @@ -766,34 +802,3 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {
return
}
}

// SetLogLevel changes TiCDC log level dynamically.
// @Summary Change TiCDC log level
// @Description change TiCDC log level dynamically
// @Tags common
// @Accept json
// @Produce json
// @Param log_level body string true "log level"
// @Success 200
// @Failure 400 {object} model.HTTPError
// @Router /api/v1/log [post]
func SetLogLevel(c *gin.Context) {
// get json data from request body
data := struct {
Level string `json:"log_level"`
}{}
err := c.BindJSON(&data)
if err != nil {
c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
return
}

err = logutil.SetLogLevel(data.Level)
if err != nil {
c.IndentedJSON(http.StatusBadRequest,
model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", err)))
return
}
log.Warn("log level changed", zap.String("level", data.Level))
c.Status(http.StatusOK)
}
5 changes: 2 additions & 3 deletions cdc/http_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,11 @@ import (

// newRouter create a router for OpenAPI
func newRouter(capture2 *capture.Capture) *gin.Engine {
router := gin.New()

// discard gin log output
gin.SetMode(gin.ReleaseMode)
gin.DefaultWriter = ioutil.Discard

router := gin.New()

// request will timeout after 10 second
router.Use(timeoutMiddleware(time.Second * 10))

Expand Down
65 changes: 65 additions & 0 deletions tests/http_api/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/bin/bash

set -e

CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
sudo pip install -U requests==2.26.0

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR --multiple-upstream-pd true

cd $WORK_DIR

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
# wait for cdc run
sleep 1

TOPIC_NAME="ticdc-http-api-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}";;
*) SINK_URI="mysql://normal:[email protected]:3306/";;
esac

python $CUR/util/test_case.py check_health
python $CUR/util/test_case.py get_status

python $CUR/util/test_case.py create_changefeed "$SINK_URI"

run_sql "CREATE table test.simple(id int primary key, val int);"
run_sql "CREATE table test.\`simple-dash\`(id int primary key, val int);"
# wait for above sql done in the up source
sleep 1

sequential_cases=(
"list_changefeed"
"get_changefeed"
"pause_changefeed"
"update_changefeed"
"resume_changefeed"
"rebalance_table"
"move_table"
"get_processor"
"list_processor"
"set_log_level"
"remove_changefeed"
"resign_owner"
)

for case in $sequential_cases; do {
python $CUR/util/test_case.py "$case";
} done;

cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
Loading

0 comments on commit 4c1896a

Please sign in to comment.