From bb180b298846a0f7214d07ee9bcb66a900e368f5 Mon Sep 17 00:00:00 2001 From: xiaofan2012 <41982310+xiaofan2022@users.noreply.github.com> Date: Thu, 26 Oct 2023 17:47:05 +0800 Subject: [PATCH] [Feature][Connector-V2] HTTP supports page increase #5477 (#5561) --------- Co-authored-by: xiaofan2022 Co-authored-by: Eric --- docs/en/connector-v2/source/Http.md | 33 ++++ .../seatunnel/http/config/HttpConfig.java | 19 +++ .../seatunnel/http/config/PageInfo.java | 35 +++++ .../seatunnel/http/source/HttpSource.java | 30 +++- .../http/source/HttpSourceReader.java | 116 +++++++++++--- .../seatunnel/e2e/connector/http/HttpIT.java | 7 + .../http_page_increase_no_page_num.conf | 85 +++++++++++ .../http_page_increase_page_num.conf | 85 +++++++++++ .../src/test/resources/mockserver-config.json | 144 ++++++++++++++++++ 9 files changed, 529 insertions(+), 25 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf diff --git a/docs/en/connector-v2/source/Http.md b/docs/en/connector-v2/source/Http.md index f3e6a221bb0..0c01be813e4 100644 --- a/docs/en/connector-v2/source/Http.md +++ b/docs/en/connector-v2/source/Http.md @@ -48,6 +48,10 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor | schema | Config | No | - | Http and seatunnel data structure mapping | | schema.fields | Config | No | - | The schema fields of upstream data | | json_field | Config | No | - | This parameter helps you configure the schema,so this parameter must be used with schema. 
| +| pageing | Config | No | - | This parameter is used for paging queries | +| pageing.page_field | String | No | - | This parameter is used to specify the page field name in the request parameter | +| pageing.total_page_size | Int | No | - | This parameter is used to control the total number of pages | +| pageing.batch_size | Int | No | - | The batch size returned per request is used to determine whether to continue when the total number of pages is unknown | | content_json | String | No | - | This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. | | format | String | No | json | The format of upstream data, now only support `json` `text`, default `json`. | | method | String | No | get | Http request method, only supports GET, POST method. | @@ -310,6 +314,35 @@ source { - Test data can be found at this link [mockserver-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json) - See this link for task configuration [http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf). 
+### pageing + +```hocon +source { + Http { + url = "http://localhost:8080/mock/queryData" + method = "GET" + format = "json" + params={ + page: "${page}" + } + content_field = "$.data.*" + pageing={ + total_page_size=20 + page_field=page + #when don't know the total_page_size use batch_size if read size URL = Options.key("url").stringType().noDefaultValue().withDescription("Http request url"); + public static final Option TOTAL_PAGE_SIZE = + Options.key("total_page_size") + .longType() + .defaultValue(0L) + .withDescription("total page size"); + public static final Option BATCH_SIZE = + Options.key("batch_size") + .intType() + .defaultValue(100) + .withDescription( + "the batch size returned per request is used to determine whether to continue when the total number of pages is unknown"); + public static final Option PAGE_FIELD = + Options.key("page_field") + .stringType() + .defaultValue("page") + .withDescription( + "this parameter is used to specify the page field name in the request parameter"); + public static final Option> PAGEING = + Options.key("pageing").mapType().noDefaultValue().withDescription("pageing"); public static final Option METHOD = Options.key("method") .enumType(HttpRequestMethod.class) diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java new file mode 100644 index 00000000000..a5c7061347a --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/PageInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seatunnel.connectors.seatunnel.http.config; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import java.io.Serializable; + +@Setter +@Getter +@ToString +public class PageInfo implements Serializable { + + private Long totalPageSize; + + private Integer batchSize; + private String pageField; + private Long pageIndex; +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java index 8e8311b6505..e0a30ad061a 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java @@ -43,6 +43,7 @@ import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField; +import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo; import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException; 
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
@@ -53,6 +54,7 @@
 @AutoService(SeaTunnelSource.class)
 public class HttpSource extends AbstractSingleSplitSource {
     protected final HttpParameter httpParameter = new HttpParameter();
+    protected PageInfo pageInfo;
     protected SeaTunnelRowType rowType;
     protected JsonField jsonField;
     protected String contentField;
@@ -83,6 +85,39 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
         }
         this.httpParameter.buildWithConfig(pluginConfig);
         buildSchemaWithConfig(pluginConfig);
+        buildPagingWithConfig(pluginConfig);
+    }
+
+    /**
+     * Populates {@link #pageInfo} from the optional {@code pageing} block of the plugin config.
+     *
+     * <p>When the block is absent, {@code pageInfo} is left unset. Each option missing from the
+     * block falls back to the default declared in {@link HttpConfig}.
+     *
+     * @param pluginConfig connector configuration that may contain the paging block
+     */
+    private void buildPagingWithConfig(Config pluginConfig) {
+        if (pluginConfig.hasPath(HttpConfig.PAGEING.key())) {
+            pageInfo = new PageInfo();
+            Config pageConfig = pluginConfig.getConfig(HttpConfig.PAGEING.key());
+            if (pageConfig.hasPath(HttpConfig.TOTAL_PAGE_SIZE.key())) {
+                pageInfo.setTotalPageSize(pageConfig.getLong(HttpConfig.TOTAL_PAGE_SIZE.key()));
+            } else {
+                pageInfo.setTotalPageSize(HttpConfig.TOTAL_PAGE_SIZE.defaultValue());
+            }
+
+            if (pageConfig.hasPath(HttpConfig.BATCH_SIZE.key())) {
+                pageInfo.setBatchSize(pageConfig.getInt(HttpConfig.BATCH_SIZE.key()));
+            } else {
+                pageInfo.setBatchSize(HttpConfig.BATCH_SIZE.defaultValue());
+            }
+            if (pageConfig.hasPath(HttpConfig.PAGE_FIELD.key())) {
+                pageInfo.setPageField(pageConfig.getString(HttpConfig.PAGE_FIELD.key()));
+            } else {
+                // without a field name the page index cannot be written into the request params
+                pageInfo.setPageField(HttpConfig.PAGE_FIELD.defaultValue());
+            }
+        }
     }
 
     protected void buildSchemaWithConfig(Config pluginConfig) {
@@ -141,7 +176,8 @@ public AbstractSingleSplitReader createReader(
                 readerContext,
                 this.deserializationSchema,
                 jsonField,
-                contentField);
+                contentField,
+                pageInfo);
     }
 
     private JsonField getJsonField(Config jsonFieldConf) {
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 1b9969a2379..3c4669659ff 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -28,6 +28,7 @@
 import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
 import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
 
@@ -36,6 +37,7 @@
 import com.jayway.jsonpath.JsonPath;
 import com.jayway.jsonpath.Option;
 import com.jayway.jsonpath.ReadContext;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.BufferedReader;
@@ -46,8 +48,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 @Slf4j
+@Setter
 public class HttpSourceReader extends AbstractSingleSplitReader {
     protected final SingleSplitReaderContext context;
     protected final HttpParameter httpParameter;
@@ -61,6 +65,8 @@ public class HttpSourceReader extends AbstractSingleSplitReader {
     private final String contentJson;
     private final Configuration jsonConfiguration =
             Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
+    private boolean noMoreElementFlag = true;
+    private Optional<PageInfo> pageInfoOptional = Optional.empty();
 
     public HttpSourceReader(
             HttpParameter httpParameter,
@@ -75,6 +81,21 @@ public HttpSourceReader(
         this.contentJson = contentJson;
     }
 
+    public HttpSourceReader(
+            HttpParameter httpParameter,
+            SingleSplitReaderContext context,
+            DeserializationSchema deserializationSchema,
+            JsonField jsonField,
+            String contentJson,
+            PageInfo pageInfo) {
+        this.context = context;
+        this.httpParameter = httpParameter;
+        this.deserializationCollector = new DeserializationCollector(deserializationSchema);
+        this.jsonField = jsonField;
+        this.contentJson = contentJson;
+        this.pageInfoOptional = Optional.ofNullable(pageInfo);
+    }
+
     @Override
     public void open() {
         httpClient = new HttpClientProvider(httpParameter);
@@ -87,40 +108,101 @@ public void close() throws IOException {
         }
     }
 
-    @Override
-    public void pollNext(Collector output) throws Exception {
-        try {
-            HttpResponse response =
-                    httpClient.execute(
-                            this.httpParameter.getUrl(),
-                            this.httpParameter.getMethod().getMethod(),
-                            this.httpParameter.getHeaders(),
-                            this.httpParameter.getParams(),
-                            this.httpParameter.getBody());
-            if (HttpResponse.STATUS_OK == response.getCode()) {
-                String content = response.getContent();
-                if (!Strings.isNullOrEmpty(content)) {
-                    if (this.httpParameter.isEnableMultilines()) {
-                        StringReader stringReader = new StringReader(content);
-                        BufferedReader bufferedReader = new BufferedReader(stringReader);
-                        String lineStr;
-                        while ((lineStr = bufferedReader.readLine()) != null) {
-                            collect(output, lineStr);
-                        }
-                    } else {
-                        collect(output, content);
+    /**
+     * Executes one HTTP request with the current parameters and passes the response body to
+     * {@code collect}, line by line when multiline mode is enabled. Non-OK responses are only
+     * logged.
+     *
+     * @param output collector receiving the deserialized rows
+     * @throws Exception if the request or the collection of the response fails
+     */
+    public void pollAndCollectData(Collector output) throws Exception {
+        HttpResponse response =
+                httpClient.execute(
+                        this.httpParameter.getUrl(),
+                        this.httpParameter.getMethod().getMethod(),
+                        this.httpParameter.getHeaders(),
+                        this.httpParameter.getParams(),
+                        this.httpParameter.getBody());
+        if (HttpResponse.STATUS_OK == response.getCode()) {
+            String content = response.getContent();
+            if (!Strings.isNullOrEmpty(content)) {
+                if (this.httpParameter.isEnableMultilines()) {
+                    StringReader stringReader = new StringReader(content);
+                    BufferedReader bufferedReader = new BufferedReader(stringReader);
+                    String lineStr;
+                    while ((lineStr = bufferedReader.readLine()) != null) {
+                        collect(output, lineStr);
                     }
+                } else {
+                    collect(output, content);
                 }
-                return;
             }
+            log.info(
+                    "http client execute success request param:[{}], http response status code:[{}], content:[{}]",
+                    httpParameter.getParams(),
+                    response.getCode(),
+                    response.getContent());
+        } else {
             log.error(
                     "http client execute exception, http response status code:[{}], content:[{}]",
                     response.getCode(),
                     response.getContent());
+        }
+    }
+
+    /**
+     * Writes the current page index into the request parameters under the configured page field,
+     * creating the parameter map first when none exists.
+     *
+     * @param pageInfo paging state providing the field name and the page index
+     */
+    private void updateRequestParam(PageInfo pageInfo) {
+        if (this.httpParameter.getParams() == null) {
+            httpParameter.setParams(new HashMap<>());
+        }
+        this.httpParameter
+                .getParams()
+                .put(pageInfo.getPageField(), pageInfo.getPageIndex().toString());
+    }
+
+    /**
+     * Requests the configured endpoint and forwards the received data to {@code output}.
+     *
+     * <p>With paging configured, pages are requested one after another starting at page 1 until
+     * {@code collect} reports the last page or a request yields no data. Exceptions are logged
+     * rather than rethrown and stop the paging loop.
+     *
+     * @param output collector receiving the deserialized rows
+     */
+    @Override
+    public void pollNext(Collector output) throws Exception {
+        try {
+            if (pageInfoOptional.isPresent()) {
+                noMoreElementFlag = false;
+                Long pageIndex = 1L;
+                while (!noMoreElementFlag) {
+                    PageInfo info = pageInfoOptional.get();
+                    // increment page
+                    info.setPageIndex(pageIndex);
+                    // set request param
+                    updateRequestParam(info);
+                    // A page that yields no data (error status or empty body) never reaches
+                    // collect(), so treat it as the last page unless collect() says otherwise;
+                    // this keeps the loop from requesting pages forever.
+                    noMoreElementFlag = true;
+                    pollAndCollectData(output);
+                    pageIndex += 1;
+                }
+            } else {
+                pollAndCollectData(output);
+            }
         } catch (Exception e) {
             log.error(e.getMessage(), e);
+            // stop paging after a failure so the bounded source can still be closed below
+            noMoreElementFlag = true;
         } finally {
-            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            if (Boundedness.BOUNDED.equals(context.getBoundedness()) && noMoreElementFlag) {
                 // signal to the source that we have reached the end of the data.
                 log.info("Closed the bounded http source");
                 context.signalNoMoreElement();
@@ -140,6 +222,21 @@ private void collect(Collector output, String data) throws IOExcep
             this.initJsonPath(jsonField);
             data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();
         }
+        // page increase
+        if (pageInfoOptional.isPresent()) {
+            // Determine whether the task is completed by specifying the presence of the 'total
+            // page' field
+            PageInfo pageInfo = pageInfoOptional.get();
+            if (pageInfo.getTotalPageSize() > 0) {
+                noMoreElementFlag = pageInfo.getPageIndex() >= pageInfo.getTotalPageSize();
+            } else {
+                // no 'total page' configured
+                int readSize = JsonUtils.stringToJsonNode(data).size();
+                // if read size < BatchSize : read finish
+                // if read size = BatchSize : read next page.
+                noMoreElementFlag = readSize < pageInfo.getBatchSize();
+            }
+        }
         deserializationCollector.collect(data.getBytes(), output);
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
index d65617bb555..bd85ed876e5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpIT.java
@@ -145,6 +145,13 @@ public void testSourceToAssertSink(TestContainer container)
         Container.ExecResult execResult14 =
                 container.executeJob("/http_jsonrequestbody_to_assert.conf");
         Assertions.assertEquals(0, execResult14.getExitCode());
+
+        Container.ExecResult execResult15 =
+                container.executeJob("/http_page_increase_page_num.conf");
+        Assertions.assertEquals(0, execResult15.getExitCode());
+        Container.ExecResult execResult16 =
+                container.executeJob("/http_page_increase_no_page_num.conf");
+
Assertions.assertEquals(0, execResult16.getExitCode()); } public String getMockServerConfig() { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf new file mode 100644 index 00000000000..387201b379d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_no_page_num.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Http { + result_table_name = "http" + url = "http://mockserver:1080/query/pagesNoPageNum" + method = "GET" + format = "json" + json_field = { + name = "$.data[*].name" + age = "$.data[*].age" + } + pageing = { + batch_size=10 + page_field = page + } + schema = { + fields { + name = string + age = int + } + } + } +} + +sink { + Console { + source_table_name = "http" + } + Assert { + source_table_name = "http" + rules { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 12 + }, + { + rule_type = MAX_ROW + rule_value = 12 + } + ] + field_rules = [ + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = age + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf new file mode 100644 index 00000000000..c3202d6f7b2 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_page_increase_page_num.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + Http { + result_table_name = "http" + url = "http://mockserver:1080/query/pages" + method = "GET" + format = "json" + json_field = { + name = "$.data[*].name" + age = "$.data[*].age" + } + pageing = { + total_page_size = 2 + page_field = page + } + schema = { + fields { + name = string + age = int + } + } + } +} + +sink { + Console { + source_table_name = "http" + } + Assert { + source_table_name = "http" + rules { + row_rules = [ + { + rule_type = MIN_ROW + rule_value = 4 + }, + { + rule_type = MAX_ROW + rule_value = 4 + } + ] + field_rules = [ + { + field_name = name + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, + { + field_name = age + field_type = int + field_value = [ + { + rule_type = NOT_NULL + } + ] + } + ] + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json index 2c419277e04..9cb561225d6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json @@ -4526,5 +4526,149 @@ "Content-Type": "application/json" } } + }, + { + "httpRequest": { + "method" : "GET", + "path": "/query/pages", + "queryStringParameters": { + "page": "1" + } + }, + "httpResponse": { + "body": + { + "status": null, + "msg": null, + "data": [ + { + "name": "name1", + "age": 69 + }, + { + "name": "name2", + "age": 51 + } + ], + "currentPageIndex": 1, + "totalPage": 2 + } + } + }, + { + "httpRequest": { + "method" : "GET", + "path": "/query/pages", + "queryStringParameters": { + "page": "2" + } + }, + "httpResponse": { + "body": + { + "status": 
null, + "msg": null, + "data": [ + { + "name": "name1", + "age": 69 + }, + { + "name": "name2", + "age": 51 + } + ], + "currentPageIndex": 2, + "totalPage": 2 + } + } + }, + { + "httpRequest": { + "method" : "GET", + "path": "/query/pagesNoPageNum", + "queryStringParameters": { + "page": "1" + } + }, + "httpResponse": { + "body": + { + "status": null, + "msg": null, + "data": [ + { + "name": "name1", + "age": 69 + }, + { + "name": "name2", + "age": 51 + }, + { + "name": "name3", + "age": 36 + }, + { + "name": "name4", + "age": 51 + }, + { + "name": "name5", + "age": 74 + }, + { + "name": "name6", + "age": 51 + }, + { + "name": "name7", + "age": 67 + }, + { + "name": "name8", + "age": 12 + }, + { + "name": "name9", + "age": 45 + }, + { + "name": "name10", + "age": 23 + } + ], + "currentPageIndex": 1, + "hasNext": true + } + } + }, + { + "httpRequest": { + "method" : "GET", + "path": "/query/pagesNoPageNum", + "queryStringParameters": { + "page": "2" + } + }, + "httpResponse": { + "body": + { + "status": null, + "msg": null, + "data": [ + { + "name": "name11", + "age": 69 + }, + { + "name": "name22", + "age": 51 + } + ], + "currentPageIndex": 2, + "hasNext": false + } + } } ]