Skip to content

Commit

Permalink
[Feature][Connector-V2] HTTP supports page increase #5477 (#5561)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: xiaofan2022 <[email protected]>
Co-authored-by: Eric <[email protected]>
  • Loading branch information
3 people authored Oct 26, 2023
1 parent 5174639 commit bb180b2
Show file tree
Hide file tree
Showing 9 changed files with 529 additions and 25 deletions.
33 changes: 33 additions & 0 deletions docs/en/connector-v2/source/Http.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
| schema | Config | No | - | Http and seatunnel data structure mapping |
| schema.fields | Config | No | - | The schema fields of upstream data |
| json_field | Config | No | - | This parameter helps you configure the schema, so this parameter must be used with schema. |
| pageing | Config | No | - | This parameter is used for paging queries |
| pageing.page_field | String | No | - | This parameter is used to specify the page field name in the request parameter |
| pageing.total_page_size | Int | No | - | This parameter is used to control the total number of pages |
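| pageing.batch_size | Int | No | 100 | The expected number of records per page. When the total number of pages is unknown (`total_page_size` is not set), reading continues while a page returns at least `batch_size` records and stops at the first page that returns fewer. |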
| content_json | String | No | - | This parameter can get some json data. If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. |
| format | String | No | json | The format of upstream data, now only support `json` `text`, default `json`. |
| method | String | No | get | Http request method, only supports GET, POST method. |
Expand Down Expand Up @@ -310,6 +314,35 @@ source {
- Test data can be found at this link [mockserver-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-config.json)
- See this link for task configuration [http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf).

### pageing

```hocon
source {
Http {
url = "http://localhost:8080/mock/queryData"
method = "GET"
format = "json"
params={
page: "${page}"
}
content_field = "$.data.*"
pageing={
total_page_size=20
page_field=page
# If total_page_size is unknown, set batch_size instead: reading finishes when a page returns fewer than batch_size records, otherwise the next page is requested.
#batch_size=10
}
schema = {
fields {
name = string
age = string
}
}
}
}
```

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,25 @@ public class HttpConfig {
public static final boolean DEFAULT_ENABLE_MULTI_LINES = false;
public static final Option<String> URL =
Options.key("url").stringType().noDefaultValue().withDescription("Http request url");
/**
 * Total number of pages to request, read from the {@code total_page_size} key of the paging
 * block. Declared default is 0; the reader only uses this value as a stop condition when it
 * is greater than 0 and otherwise falls back to the batch-size comparison.
 */
public static final Option<Long> TOTAL_PAGE_SIZE =
Options.key("total_page_size")
.longType()
.defaultValue(0L)
.withDescription("total page size");
/**
 * Expected number of records per response ({@code batch_size}, declared default 100). When no
 * total page count is configured, a response with fewer records than this ends the paging.
 */
public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
.defaultValue(100)
.withDescription(
"the batch size returned per request is used to determine whether to continue when the total number of pages is unknown");
/**
 * Name of the request parameter that carries the page number ({@code page_field}, declared
 * default {@code "page"}).
 * NOTE(review): confirm that consumers of this option fall back to the declared default when
 * the key is omitted.
 */
public static final Option<String> PAGE_FIELD =
Options.key("page_field")
.stringType()
.defaultValue("page")
.withDescription(
"this parameter is used to specify the page field name in the request parameter");
/**
 * Paging configuration block ({@code pageing}); it has no default, and paging is only set up
 * when this key is present in the plugin configuration.
 */
public static final Option<Map<String, String>> PAGEING =
Options.key("pageing").mapType().noDefaultValue().withDescription("pageing");
public static final Option<HttpRequestMethod> METHOD =
Options.key("method")
.enumType(HttpRequestMethod.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.http.config;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.io.Serializable;

/**
 * Serializable holder for the paging settings of the HTTP source and the page currently being
 * requested. Accessors and {@code toString} are generated by Lombok.
 */
@Getter
@Setter
@ToString
public class PageInfo implements Serializable {

    /** Configured total number of pages; values greater than 0 bound the number of requests. */
    private Long totalPageSize;

    /** Expected number of records per page, used when the total page count is not configured. */
    private Integer batchSize;

    /** Name of the request parameter that receives the page number. */
    private String pageField;

    /** Number of the page being requested. */
    private Long pageIndex;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

Expand All @@ -53,6 +54,7 @@
@AutoService(SeaTunnelSource.class)
public class HttpSource extends AbstractSingleSplitSource<SeaTunnelRow> {
protected final HttpParameter httpParameter = new HttpParameter();
protected PageInfo pageInfo;
protected SeaTunnelRowType rowType;
protected JsonField jsonField;
protected String contentField;
Expand Down Expand Up @@ -83,6 +85,31 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
}
this.httpParameter.buildWithConfig(pluginConfig);
buildSchemaWithConfig(pluginConfig);
buildPagingWithConfig(pluginConfig);
}

/**
 * Builds {@link #pageInfo} from the optional paging block of the plugin configuration.
 *
 * <p>When the paging block is absent, {@code pageInfo} is left untouched (no paging). When it
 * is present, every setting that is not configured explicitly falls back to the default
 * declared on the matching {@link HttpConfig} option, including the page field name, so the
 * reader never receives a {@code null} request-parameter name.
 *
 * @param pluginConfig the connector configuration
 */
private void buildPagingWithConfig(Config pluginConfig) {
    if (!pluginConfig.hasPath(HttpConfig.PAGEING.key())) {
        return;
    }
    Config pageConfig = pluginConfig.getConfig(HttpConfig.PAGEING.key());
    PageInfo info = new PageInfo();

    if (pageConfig.hasPath(HttpConfig.TOTAL_PAGE_SIZE.key())) {
        info.setTotalPageSize(pageConfig.getLong(HttpConfig.TOTAL_PAGE_SIZE.key()));
    } else {
        info.setTotalPageSize(HttpConfig.TOTAL_PAGE_SIZE.defaultValue());
    }

    if (pageConfig.hasPath(HttpConfig.BATCH_SIZE.key())) {
        info.setBatchSize(pageConfig.getInt(HttpConfig.BATCH_SIZE.key()));
    } else {
        info.setBatchSize(HttpConfig.BATCH_SIZE.defaultValue());
    }

    if (pageConfig.hasPath(HttpConfig.PAGE_FIELD.key())) {
        info.setPageField(pageConfig.getString(HttpConfig.PAGE_FIELD.key()));
    } else {
        // Previously left null when omitted, which produced a null request-parameter key.
        info.setPageField(HttpConfig.PAGE_FIELD.defaultValue());
    }

    pageInfo = info;
}

protected void buildSchemaWithConfig(Config pluginConfig) {
Expand Down Expand Up @@ -141,7 +168,8 @@ public AbstractSingleSplitReader<SeaTunnelRow> createReader(
readerContext,
this.deserializationSchema,
jsonField,
contentField);
contentField,
pageInfo);
}

private JsonField getJsonField(Config jsonFieldConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
import org.apache.seatunnel.connectors.seatunnel.http.config.PageInfo;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;

Expand All @@ -36,6 +37,7 @@
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ReadContext;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.io.BufferedReader;
Expand All @@ -46,8 +48,10 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

@Slf4j
@Setter
public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
protected final SingleSplitReaderContext context;
protected final HttpParameter httpParameter;
Expand All @@ -61,6 +65,8 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
private final String contentJson;
private final Configuration jsonConfiguration =
Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
private boolean noMoreElementFlag = true;
private Optional<PageInfo> pageInfoOptional = Optional.empty();

public HttpSourceReader(
HttpParameter httpParameter,
Expand All @@ -75,6 +81,21 @@ public HttpSourceReader(
this.contentJson = contentJson;
}

/**
 * Creates a reader that may page through the HTTP endpoint.
 *
 * @param httpParameter request settings (url, method, headers, params, body)
 * @param context reader context used to query boundedness and signal end of data
 * @param deserializationSchema schema wrapped in the collector that emits rows
 * @param jsonField optional JSON field mapping applied to the response content
 * @param contentJson optional JSON path selecting the part of the response to read
 * @param pageInfo paging settings; may be {@code null}, in which case no paging is performed
 */
public HttpSourceReader(
        HttpParameter httpParameter,
        SingleSplitReaderContext context,
        DeserializationSchema<SeaTunnelRow> deserializationSchema,
        JsonField jsonField,
        String contentJson,
        PageInfo pageInfo) {
    this.httpParameter = httpParameter;
    this.context = context;
    this.jsonField = jsonField;
    this.contentJson = contentJson;
    this.deserializationCollector = new DeserializationCollector(deserializationSchema);
    this.pageInfoOptional = Optional.ofNullable(pageInfo);
}

@Override
public void open() {
httpClient = new HttpClientProvider(httpParameter);
Expand All @@ -87,40 +108,72 @@ public void close() throws IOException {
}
}

@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
try {
HttpResponse response =
httpClient.execute(
this.httpParameter.getUrl(),
this.httpParameter.getMethod().getMethod(),
this.httpParameter.getHeaders(),
this.httpParameter.getParams(),
this.httpParameter.getBody());
if (HttpResponse.STATUS_OK == response.getCode()) {
String content = response.getContent();
if (!Strings.isNullOrEmpty(content)) {
if (this.httpParameter.isEnableMultilines()) {
StringReader stringReader = new StringReader(content);
BufferedReader bufferedReader = new BufferedReader(stringReader);
String lineStr;
while ((lineStr = bufferedReader.readLine()) != null) {
collect(output, lineStr);
}
} else {
collect(output, content);
/**
 * Executes one HTTP request with the current parameters and forwards the response content to
 * {@code collect}.
 *
 * <p>A non-OK status or an empty body is only logged, as before. In addition, when paging is
 * enabled such a response now marks the read as finished: {@code noMoreElementFlag} is
 * otherwise updated only while collecting content, so without this the paging loop would keep
 * requesting further pages forever.
 *
 * @param output collector that receives the deserialized rows
 * @throws Exception if the request or the deserialization fails
 */
public void pollAndCollectData(Collector<SeaTunnelRow> output) throws Exception {
    HttpResponse response =
            httpClient.execute(
                    this.httpParameter.getUrl(),
                    this.httpParameter.getMethod().getMethod(),
                    this.httpParameter.getHeaders(),
                    this.httpParameter.getParams(),
                    this.httpParameter.getBody());

    if (HttpResponse.STATUS_OK != response.getCode()) {
        log.error(
                "http client execute exception, http response status code:[{}], content:[{}]",
                response.getCode(),
                response.getContent());
        stopPagingIfEnabled();
        return;
    }

    String content = response.getContent();
    if (Strings.isNullOrEmpty(content)) {
        log.info(
                "http client execute success request param:[{}], http response status code:[{}], content:[{}]",
                httpParameter.getParams(),
                response.getCode(),
                response.getContent());
        stopPagingIfEnabled();
        return;
    }

    if (this.httpParameter.isEnableMultilines()) {
        // Each line of the response is collected as a separate record.
        try (BufferedReader bufferedReader = new BufferedReader(new StringReader(content))) {
            String lineStr;
            while ((lineStr = bufferedReader.readLine()) != null) {
                collect(output, lineStr);
            }
        }
    } else {
        collect(output, content);
    }
}

/** Ends the paging loop; leaves the flag untouched when paging is not configured. */
private void stopPagingIfEnabled() {
    if (pageInfoOptional.isPresent()) {
        noMoreElementFlag = true;
    }
}

/**
 * Writes the current page index into the request parameters under the configured page field,
 * creating the parameter map first when the request has none.
 *
 * @param pageInfo paging state supplying the field name and the page index
 */
private void updateRequestParam(PageInfo pageInfo) {
    if (Objects.isNull(this.httpParameter.getParams())) {
        this.httpParameter.setParams(new HashMap<>());
    }
    String pageFieldName = pageInfo.getPageField();
    String pageValue = pageInfo.getPageIndex().toString();
    this.httpParameter.getParams().put(pageFieldName, pageValue);
}

@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
try {
if (pageInfoOptional.isPresent()) {
noMoreElementFlag = false;
Long pageIndex = 1L;
while (!noMoreElementFlag) {
PageInfo info = pageInfoOptional.get();
// increment page
info.setPageIndex(pageIndex);
// set request param
updateRequestParam(info);
pollAndCollectData(output);
pageIndex += 1;
}
} else {
pollAndCollectData(output);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
if (Boundedness.BOUNDED.equals(context.getBoundedness()) && noMoreElementFlag) {
// signal to the source that we have reached the end of the data.
log.info("Closed the bounded http source");
context.signalNoMoreElement();
Expand All @@ -140,6 +193,21 @@ private void collect(Collector<SeaTunnelRow> output, String data) throws IOExcep
this.initJsonPath(jsonField);
data = JsonUtils.toJsonNode(parseToMap(decodeJSON(data), jsonField)).toString();
}
// page increase
if (pageInfoOptional.isPresent()) {
// Determine whether the task is completed by specifying the presence of the 'total
// page' field
PageInfo pageInfo = pageInfoOptional.get();
if (pageInfo.getTotalPageSize() > 0) {
noMoreElementFlag = pageInfo.getPageIndex() >= pageInfo.getTotalPageSize();
} else {
// no 'total page' configured
int readSize = JsonUtils.stringToJsonNode(data).size();
// if read size < BatchSize : read finish
// if read size = BatchSize : read next page.
noMoreElementFlag = readSize < pageInfo.getBatchSize();
}
}
deserializationCollector.collect(data.getBytes(), output);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ public void testSourceToAssertSink(TestContainer container)
Container.ExecResult execResult14 =
container.executeJob("/http_jsonrequestbody_to_assert.conf");
Assertions.assertEquals(0, execResult14.getExitCode());

Container.ExecResult execResult15 =
container.executeJob("/http_page_increase_page_num.conf");
Assertions.assertEquals(0, execResult15.getExitCode());
Container.ExecResult execResult16 =
container.executeJob("/http_page_increase_no_page_num.conf");
Assertions.assertEquals(0, execResult16.getExitCode());
}

public String getMockServerConfig() {
Expand Down
Loading

0 comments on commit bb180b2

Please sign in to comment.