From 4c1896a0881aade3b27fb6a895ab3b75e16eee9f Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Sat, 21 Aug 2021 14:30:01 +0800 Subject: [PATCH] tests: Add integration test for http api (#2563) (#2597) --- cdc/capture/http_handler.go | 67 ++++--- cdc/http_router.go | 5 +- tests/http_api/run.sh | 65 +++++++ tests/http_api/util/test_case.py | 317 +++++++++++++++++++++++++++++++ 4 files changed, 420 insertions(+), 34 deletions(-) create mode 100644 tests/http_api/run.sh create mode 100644 tests/http_api/util/test_case.py diff --git a/cdc/capture/http_handler.go b/cdc/capture/http_handler.go index f12e19bdd4d..c223efb8674 100644 --- a/cdc/capture/http_handler.go +++ b/cdc/capture/http_handler.go @@ -305,6 +305,10 @@ func (h *HTTPHandler) ResumeChangefeed(c *gin.Context) { // check if the changefeed exists && check if the etcdClient work well _, _, err := h.capture.etcdClient.GetChangeFeedStatus(c, changefeedID) if err != nil { + if cerror.ErrChangeFeedNotExists.Equal(err) { + c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + return + } c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) return } @@ -696,6 +700,37 @@ func (h *HTTPHandler) Health(c *gin.Context) { c.Status(http.StatusOK) } +// SetLogLevel changes TiCDC log level dynamically. +// @Summary Change TiCDC log level +// @Description change TiCDC log level dynamically +// @Tags common +// @Accept json +// @Produce json +// @Param log_level body string true "log level" +// @Success 200 +// @Failure 400 {object} model.HTTPError +// @Router /api/v1/log [post] +func SetLogLevel(c *gin.Context) { + // get json data from request body + data := struct { + Level string `json:"log_level"` + }{} + err := c.BindJSON(&data) + if err != nil { + c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + return + } + + err = logutil.SetLogLevel(data.Level) + if err != nil { + c.IndentedJSON(http.StatusBadRequest, + model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", err))) + return + } + log.Warn("log level changed", zap.String("level", data.Level)) + c.Status(http.StatusOK) +} + // forwardToOwner forward an request to owner func (h *HTTPHandler) forwardToOwner(c *gin.Context) { // every request can only forward to owner one time @@ -727,6 +762,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { return } + // init a request req, _ := http.NewRequest(c.Request.Method, c.Request.RequestURI, c.Request.Body) req.URL.Host = owner.AdvertiseAddr if tslConfig != nil { @@ -766,34 +802,3 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { return } } - -// SetLogLevel changes TiCDC log level dynamically. -// @Summary Change TiCDC log level -// @Description change TiCDC log level dynamically -// @Tags common -// @Accept json -// @Produce json -// @Param log_level body string true "log level" -// @Success 200 -// @Failure 400 {object} model.HTTPError -// @Router /api/v1/log [post] -func SetLogLevel(c *gin.Context) { - // get json data from request body - data := struct { - Level string `json:"log_level"` - }{} - err := c.BindJSON(&data) - if err != nil { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - - err = logutil.SetLogLevel(data.Level) - if err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", err))) - return - } - log.Warn("log level changed", zap.String("level", data.Level)) - c.Status(http.StatusOK) -} diff --git a/cdc/http_router.go b/cdc/http_router.go index 5d6f203b887..cc99e0ecd8e 100644 --- a/cdc/http_router.go +++ b/cdc/http_router.go @@ -30,12 +30,11 @@ import ( // newRouter create a router for OpenAPI func newRouter(capture2 *capture.Capture) *gin.Engine { - router := gin.New() - // discard gin log output - gin.SetMode(gin.ReleaseMode) gin.DefaultWriter = ioutil.Discard + router := gin.New() + // request will timeout after 10 second router.Use(timeoutMiddleware(time.Second * 10)) diff --git a/tests/http_api/run.sh b/tests/http_api/run.sh new file mode 100644 index 00000000000..0d0f0793b02 --- /dev/null +++ b/tests/http_api/run.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +set -e + +CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function run() { + sudo pip install -U requests==2.26.0 + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR --multiple-upstream-pd true + + cd $WORK_DIR + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + # wait for cdc run + sleep 1 + + TOPIC_NAME="ticdc-http-api-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&kafka-version=${KAFKA_VERSION}";; + *) SINK_URI="mysql://normal:123456@127.0.0.1:3306/";; + esac + + python $CUR/util/test_case.py check_health + python $CUR/util/test_case.py get_status + + python $CUR/util/test_case.py create_changefeed "$SINK_URI" + + run_sql "CREATE table test.simple(id int primary key, val int);" + run_sql "CREATE table test.\`simple-dash\`(id int primary key, val int);" + # wait for above sql done in the up source + sleep 1 + + sequential_cases=( + "list_changefeed" + "get_changefeed" + "pause_changefeed" + "update_changefeed" + "resume_changefeed" + "rebalance_table" + "move_table" + "get_processor" + "list_processor" + "set_log_level" + "remove_changefeed" + "resign_owner" + ) + + for case in $sequential_cases; do { + python $CUR/util/test_case.py "$case"; + } done; + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +check_logs $WORK_DIR +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" diff --git a/tests/http_api/util/test_case.py b/tests/http_api/util/test_case.py new file mode 100644 index 00000000000..c74e4eba400 --- /dev/null +++ b/tests/http_api/util/test_case.py @@ -0,0 +1,317 @@ +import sys +import requests as rq +import time +import json + + +# the max retry time +RETRY_TIME = 10 + +BASE_URL = "http://127.0.0.1:8300/api/v1" + + +# we should write some SQLs in the run.sh after call create_changefeed +def create_changefeed(sink_uri): + url = BASE_URL+"/changefeeds" + # create changefeed + for i in range(1, 4): + data = { + "changefeed_id": "changefeed-test"+str(i), + "sink_uri": "blackhole://", + "ignore_ineligible_table": True + } + # set sink_uri + if sink_uri != "": + data["sink_uri"] = sink_uri + + data = json.dumps(data) + headers = {"Content-Type": "application/json"} + resp = rq.post(url, data=data, headers=headers) + assert resp.status_code == rq.codes.accepted + + # create changefeed fail because sink_uri is invalid + data = json.dumps({ + "changefeed_id": "changefeed-test", + "sink_uri": "mysql://127.0.0.1:1111", + "ignore_ineligible_table": True + }) + headers = {"Content-Type": "application/json"} + resp = rq.post(url, data=data, headers=headers) + assert resp.status_code == rq.codes.bad_request + + print("pass test: create changefeed") + + +def list_changefeed(): + # test state: all + url = BASE_URL+"/changefeeds?state=all" + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + + # test state: normal + url = BASE_URL+"/changefeeds?state=normal" + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + data = resp.json() + for changefeed in data: + assert changefeed["state"] == "normal" + + # test state: stopped + url = BASE_URL+"/changefeeds?state=stopped" + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + data = resp.json() + for changefeed in data: + assert changefeed["state"] == "stopped" + + print("pass test: list changefeed") + +def get_changefeed(): + # test get changefeed success + url = BASE_URL+"/changefeeds/changefeed-test1" + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + + # test get changefeed failed + url = BASE_URL+"/changefeeds/changefeed-not-exists" + resp = rq.get(url) + assert resp.status_code == rq.codes.bad_request + data = resp.json() + assert data["error_code"] == "CDC:ErrChangeFeedNotExists" + + print("pass test: get changefeed") + + +def pause_changefeed(): + # pause changefeed + url = BASE_URL+"/changefeeds/changefeed-test2/pause" + for i in range(RETRY_TIME): + resp = rq.post(url) + if resp.status_code == rq.codes.accepted: + break + assert resp.status_code == rq.codes.accepted + # check if pause changefeed success + url = BASE_URL+"/changefeeds/changefeed-test2" + for i in range(RETRY_TIME): + time.sleep(1) + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + data = resp.json() + if data["state"] == "stopped": + break + assert data["state"] == "stopped" + + # test pause changefeed failed + url = BASE_URL+"/changefeeds/changefeed-not-exists/pause" + resp = rq.post(url) + assert resp.status_code == rq.codes.bad_request + data = resp.json() + assert data["error_code"] == "CDC:ErrChangeFeedNotExists" + + print("pass test: pause changefeed") + +def update_changefeed(): + # update fail + # can only update a stopped changefeed + url = BASE_URL+"/changefeeds/changefeed-test1" + data = json.dumps({"mounter_worker_num": 32}) + headers = {"Content-Type": "application/json"} + resp = rq.put(url, data=data, headers=headers) + assert resp.status_code == rq.codes.bad_request + + # update success + url = BASE_URL+"/changefeeds/changefeed-test2" + data = json.dumps({"mounter_worker_num": 32}) + headers = {"Content-Type": "application/json"} + resp = rq.put(url, data=data, headers=headers) + assert resp.status_code == rq.codes.accepted + + # update fail + # can't update start_ts + url = BASE_URL+"/changefeeds/changefeed-test2" + data = json.dumps({"start_ts": 0}) + headers = {"Content-Type": "application/json"} + resp = rq.put(url, data=data, headers=headers) + assert resp.status_code == rq.codes.bad_request + + print("pass test: update changefeed") + + +def resume_changefeed(): + # resume changefeed + url = BASE_URL+"/changefeeds/changefeed-test2/resume" + resp = rq.post(url) + assert resp.status_code == rq.codes.accepted + + # check if resume changefeed success + url = BASE_URL+"/changefeeds/changefeed-test2" + for i in range(RETRY_TIME): + time.sleep(1) + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + data = resp.json() + if data["state"] == "normal": + break + assert data["state"] == "normal" + + # test resume changefeed failed + url = BASE_URL+"/changefeeds/changefeed-not-exists/resume" + resp = rq.post(url) + assert resp.status_code == rq.codes.bad_request + data = resp.json() + assert data["error_code"] == "CDC:ErrChangeFeedNotExists" + + print("pass test: resume changefeed") + + +def remove_changefeed(): + # remove changefeed + url = BASE_URL+"/changefeeds/changefeed-test3" + resp = rq.delete(url) + assert resp.status_code == rq.codes.accepted + + # check if remove changefeed success + url = BASE_URL+"/changefeeds/changefeed-test3" + for i in range(RETRY_TIME): + time.sleep(1) + resp = rq.get(url) + if resp.status_code == rq.codes.bad_request: + break + + assert resp.status_code == rq.codes.bad_request + assert resp.json()["error_code"] == "CDC:ErrChangeFeedNotExists" + + # test remove changefeed failed + url = BASE_URL+"/changefeeds/changefeed-not-exists" + resp = rq.delete(url) + assert (resp.status_code == rq.codes.bad_request or resp.status_code == rq.codes.internal_server_error) + data = resp.json() + assert data["error_code"] == "CDC:ErrChangeFeedNotExists" + + print("pass test: remove changefeed") + + +def rebalance_table(): + # rebalance_table + url = BASE_URL + "/changefeeds/changefeed-test1/tables/rebalance_table" + resp = rq.post(url) + assert resp.status_code == rq.codes.accepted + + print("pass test: rebalance table") + + +def move_table(): + # move table + url = BASE_URL + "/changefeeds/changefeed-test1/tables/move_table" + data = json.dumps({"capture_id": "test-aaa-aa", "table_id": 11}) + headers = {"Content-Type": "application/json"} + resp = rq.post(url, data=data, headers=headers) + assert resp.status_code == rq.codes.accepted + + # move table fail + # not allow empty capture_id + data = json.dumps({"capture_id": "", "table_id": 11}) + headers = {"Content-Type": "application/json"} + resp = rq.post(url, data=data, headers=headers) + assert resp.status_code == rq.codes.bad_request + + print("pass test: move table") + + +def resign_owner(): + url = BASE_URL + "/owner/resign" + resp = rq.post(url) + assert resp.status_code == rq.codes.accepted + + print("pass test: resign owner") + + +def list_capture(): + url = BASE_URL + "/captures" + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + + print("pass test: list captures") + + +def list_processor(): + url = BASE_URL + "/processors" + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + + print("pass test: list processors") + + +# must at least one table is sync will the test success +def get_processor(): + url = BASE_URL + "/processors" + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + data = resp.json()[0] + url = url + "/" + data["changefeed_id"] + "/" + data["capture_id"] + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + + print("pass test: get processors") + + +def check_health(): + url = BASE_URL + "/health" + for i in range(RETRY_TIME): + time.sleep(1) + resp = rq.get(url) + if resp.status_code == rq.codes.ok: + break + assert resp.status_code == rq.codes.ok + + print("pass test: check health") + + +def get_status(): + url = BASE_URL + "/status" + resp = rq.get(url) + assert resp.status_code == rq.codes.ok + assert resp.json()["is_owner"] + + print("pass test: get status") + + +def set_log_level(): + url = BASE_URL + "/log" + data = json.dumps({"log_level": "debug"}) + headers = {"Content-Type": "application/json"} + resp = rq.post(url, data=data, headers=headers) + assert resp.status_code == rq.codes.ok + + data = json.dumps({"log_level": "info"}) + resp = rq.post(url, data=data, headers=headers) + assert resp.status_code == rq.codes.ok + + print("pass test: set log level") + + +if __name__ == "__main__": + # test all the case as the order list in this map + FUNC_MAP = { + "check_health": check_health, + "get_status": get_status, + "create_changefeed": create_changefeed, + "list_changefeed": list_changefeed, + "get_changefeed": get_changefeed, + "pause_changefeed": pause_changefeed, + "update_changefeed": update_changefeed, + "resume_changefeed": resume_changefeed, + "rebalance_table": rebalance_table, + "move_table": move_table, + "get_processor": get_processor, + "list_processor": list_processor, + "set_log_level": set_log_level, + "remove_changefeed": remove_changefeed, + "resign_owner": resign_owner, + } + + func = FUNC_MAP[sys.argv[1]] + if len(sys.argv) >= 2: + func(*sys.argv[2:]) + else: + func()