Skip to content

Commit

Permalink
[Feature][Connector-V2] Support choosing the start page in HTTP paging (#…
Browse files Browse the repository at this point in the history
…7180)

* feature-http page specifies the home page

* update

* update

* update

---------

Co-authored-by: gaoxi <[email protected]>
Co-authored-by: Jia Fan <[email protected]>
  • Loading branch information
3 people authored Aug 22, 2024
1 parent 70efe8e commit ed15f0d
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/Http.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
| pageing.page_field | String | No | - | This parameter is used to specify the page field name in the request parameter |
| pageing.total_page_size | Int | No | - | This parameter is used to control the total number of pages |
| pageing.batch_size | Int | No | - | The batch size returned per request is used to determine whether to continue when the total number of pages is unknown |
| pageing.start_page_number | Long | No | 1 | Specify the page number from which synchronization starts |
| content_json | String | No | - | This parameter can get some json data. If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`. |
| format | String | No | text | The format of upstream data; currently only `json` and `text` are supported, default `text`. |
| method | String | No | get | Http request method, only supports GET, POST method. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public class HttpConfig {
.defaultValue(100)
.withDescription(
"the batch size returned per request is used to determine whether to continue when the total number of pages is unknown");
/**
 * Paging option {@code start_page_number}: a long-valued option that defaults to 1 and names
 * the page from which synchronization starts.
 */
public static final Option<Long> START_PAGE_NUMBER =
Options.key("start_page_number")
.longType()
.defaultValue(1L)
.withDescription("which page to start synchronizing from");
public static final Option<String> PAGE_FIELD =
Options.key("page_field")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ private void buildPagingWithConfig(Config pluginConfig) {
} else {
pageInfo.setTotalPageSize(HttpConfig.TOTAL_PAGE_SIZE.defaultValue());
}
if (pageConfig.hasPath(HttpConfig.START_PAGE_NUMBER.key())) {
pageInfo.setPageIndex(pageConfig.getLong(HttpConfig.START_PAGE_NUMBER.key()));
} else {
pageInfo.setPageIndex(HttpConfig.START_PAGE_NUMBER.defaultValue());
}

if (pageConfig.hasPath(HttpConfig.BATCH_SIZE.key())) {
pageInfo.setBatchSize(pageConfig.getInt(HttpConfig.BATCH_SIZE.key()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public OptionRule.Builder getHttpBuilder() {
.optional(HttpConfig.PARAMS)
.optional(HttpConfig.FORMAT)
.optional(HttpConfig.BODY)
.optional(HttpConfig.PAGEING)
.optional(HttpConfig.JSON_FIELD)
.optional(HttpConfig.CONTENT_FIELD)
.conditional(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
try {
if (pageInfoOptional.isPresent()) {
noMoreElementFlag = false;
Long pageIndex = 1L;
PageInfo info = pageInfoOptional.get();
Long pageIndex = info.getPageIndex();
while (!noMoreElementFlag) {
PageInfo info = pageInfoOptional.get();
// increment page
info.setPageIndex(pageIndex);
// set request param
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ public void testSourceToAssertSink(TestContainer container)

Container.ExecResult execResult18 = container.executeJob("/httpnoschema_to_http.conf");
Assertions.assertEquals(0, execResult18.getExitCode());

Container.ExecResult execResult19 =
container.executeJob("/http_page_increase_start_num.conf");
Assertions.assertEquals(0, execResult19.getExitCode());
}

@DisabledOnContainer(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Job environment: a single-parallelism job in BATCH mode.
env {
parallelism = 1
job.mode = "BATCH"
}

# HTTP source for the start-page paging test: sends GET requests to the mock server
# and parses the responses as JSON.
source {
Http {
result_table_name = "http"
url = "http://mockserver:1080/query/pages"
method = "GET"
format = "json"
# JSONPath expressions selecting the name and age values from the response's data array.
json_field = {
name = "$.data[*].name"
age = "$.data[*].age"
}
# Paging settings; the page number is passed in the request parameter named "page".
pageing = {
total_page_size = 2
page_field = page
# Start from page 2 rather than the default of 1.
start_page_number = 2
}
# Output schema: one string column and one int column.
schema = {
fields {
name = string
age = int
}
}
}
}

# Assert sink that checks the rows read from the "http" table.
sink {
Assert {
source_table_name = "http"
rules {
# Row-count rules: minimum and maximum are both 2, i.e. exactly two rows are expected.
row_rules = [
{
rule_type = MIN_ROW
rule_value = 2
},
{
rule_type = MAX_ROW
rule_value = 2
}
]
# Field rules: both name and age carry a NOT_NULL rule.
field_rules = [
{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = age
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
}
}

0 comments on commit ed15f0d

Please sign in to comment.