diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md index 2691192a336..d483c5d3548 100644 --- a/docs/en/Connector-v2-release-state.md +++ b/docs/en/Connector-v2-release-state.md @@ -44,6 +44,7 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex | [LocalFile](connector-v2/source/LocalFile.md) | Source | Beta | 2.2.0-beta | | [MongoDB](connector-v2/source/MongoDB.md) | Source | Beta | 2.2.0-beta | | [MongoDB](connector-v2/sink/MongoDB.md) | Sink | Beta | 2.2.0-beta | +| [MyHours](connector-v2/source/MyHours.md) | Source | Alpha | 2.2.0-beta | | [Neo4j](connector-v2/sink/Neo4j.md) | Sink | Alpha | 2.2.0-beta | | [OssFile](connector-v2/sink/OssFile.md) | Sink | Alpha | 2.2.0-beta | | [OssFile](connector-v2/source/OssFile.md) | Source | Beta | 2.2.0-beta | diff --git a/docs/en/connector-v2/source/MyHours.md b/docs/en/connector-v2/source/MyHours.md new file mode 100644 index 00000000000..c5ca3268c83 --- /dev/null +++ b/docs/en/connector-v2/source/MyHours.md @@ -0,0 +1,177 @@ +# My Hours + +> My Hours source connector + +## Description + +Used to read data from My Hours. + +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [schema projection](../../concept/connector-v2-features.md) +- [ ] [parallelism](../../concept/connector-v2-features.md) +- [ ] [support user-defined split](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +| --------------------------- | ------ | -------- | ------------- | +| url | String | Yes | - | +| email | String | Yes | - | +| password | String | Yes | - | +| method | String | No | get | +| schema.fields | Config | No | - | +| format | String | No | json | +| params | Map | No | - | +| body | String | No | - | +| poll_interval_ms | int | No | - | +| retry | int | No | - | +| retry_backoff_multiplier_ms | int | No | 100 | +| retry_backoff_max_ms | int | No | 10000 | +| common-options | config | No | - | + +### url [String] + +http request url + +### email [String] + +email for login + +### password [String] + +password for login + +### method [String] + +http request method, only supports GET, POST method + +### params [Map] + +http params + +### body [String] + +http body + +### poll_interval_ms [int] + +request http api interval(millis) in stream mode + +### retry [int] + +The max retry times if request http return to `IOException` + +### retry_backoff_multiplier_ms [int] + +The retry-backoff times(millis) multiplier if request http failed + +### retry_backoff_max_ms [int] + +The maximum retry-backoff times(millis) if request http failed + +### format [String] + +the format of upstream data, now only support `json` `text`, default `json`. + +when you assign format is `json`, you should also assign schema option, for example: + +upstream data is the following: + +```json + +{"code": 200, "data": "get success", "success": true} + +``` + +you should assign schema as the following: + +```hocon + +schema { + fields { + code = int + data = string + success = boolean + } +} + +``` + +connector will generate data as the following: + +| code | data | success | +|------|-------------|---------| +| 200 | get success | true | + +when you assign format is `text`, connector will do nothing for upstream data, for example: + +upstream data is the following: + +```json + +{"code": 200, "data": "get success", "success": true} + +``` + +connector will generate data as the following: + +| content | +|---------| +| {"code": 200, "data": "get success", "success": true} | + +### schema [Config] + +#### fields [Config] + +the schema fields of upstream data + +### common options + +Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details + +## Example + +```hocon +MyHours{ + url = "https://api2.myhours.com/api/Projects/getAll" + email = "seatunnel@test.com" + password = "seatunnel" + schema { + fields { + name = string + archived = boolean + dateArchived = string + dateCreated = string + clientName = string + budgetAlertPercent = string + budgetType = int + totalTimeLogged = double + budgetValue = double + totalAmount = double + totalExpense = double + laborCost = double + totalCost = double + billableTimeLogged = double + totalBillableAmount = double + billable = boolean + roundType = int + roundInterval = int + budgetSpentPercentage = double + budgetTarget = int + budgetPeriodType = string + budgetSpent = string + id = string + } + } +} +``` + +## Changelog + +### next version + +- Add My Hours Source Connector diff --git a/plugin-mapping.properties b/plugin-mapping.properties index c943de4e35a..e7fe7c79ff8 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -142,4 +142,5 @@ seatunnel.sink.Amazondynamodb = connector-amazondynamodb seatunnel.source.Cassandra = connector-cassandra seatunnel.sink.Cassandra = connector-cassandra seatunnel.sink.StarRocks = connector-starrocks +seatunnel.source.MyHours = connector-http-myhours seatunnel.sink.InfluxDB = connector-influxdb \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java index 0eae378722f..67743428197 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/client/HttpClientProvider.java @@ -100,11 +100,11 @@ public void onRetry(Attempt attempt) { .build(); } - public HttpResponse execute(String url, String method, Map headers, Map params) throws Exception { + public HttpResponse execute(String url, String method, Map headers, Map params, String body) throws Exception { // convert method option to uppercase method = method.toUpperCase(Locale.ROOT); if (HttpPost.METHOD_NAME.equals(method)) { - return doPost(url, headers, params); + return doPost(url, headers, params, body); } if (HttpGet.METHOD_NAME.equals(method)) { return doGet(url, headers, params); @@ -243,6 +243,31 @@ public HttpResponse doPost(String url, Map headers, String body) return getResponse(httpPost); } + /** + * Send a post request with request headers , request parameters and request body + * + * @param url request address + * @param headers request header map + * @param params request parameter map + * @param body request body + * @return http response result + * @throws Exception information + */ + public HttpResponse doPost(String url, Map headers, Map params, String body) throws Exception { + // create a new http get + HttpPost httpPost = new HttpPost(url); + // set default request config + httpPost.setConfig(REQUEST_CONFIG); + // set request header + addHeaders(httpPost, headers); + // set request params + addParameters(httpPost, params); + // add body in request + addBody(httpPost, body); + // return http response + return getResponse(httpPost); + } + /** * Send a put request without request parameters * diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java index f2f1b33cc60..aaccb57ac5b 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpParameter.java @@ -28,15 +28,15 @@ @Data @SuppressWarnings("MagicNumber") public class HttpParameter implements Serializable { - private String url; - private String method; - private Map headers; - private Map params; - private String body; - private int pollIntervalMillis; - private int retry; - private int retryBackoffMultiplierMillis = 100; - private int retryBackoffMaxMillis = 10000; + protected String url; + protected String method; + protected Map headers; + protected Map params; + protected String body; + protected int pollIntervalMillis; + protected int retry; + protected int retryBackoffMultiplierMillis = 100; + protected int retryBackoffMaxMillis = 10000; public void buildWithConfig(Config pluginConfig) { // set url diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java index 9887f7c71ed..8fbb9bff2f0 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java @@ -65,6 +65,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException { throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); } this.httpParameter.buildWithConfig(pluginConfig); + buildSchemaWithConfig(pluginConfig); + } + + protected void buildSchemaWithConfig(Config pluginConfig) { if (pluginConfig.hasPath(HttpConfig.SCHEMA)) { Config schema = pluginConfig.getConfig(HttpConfig.SCHEMA); this.rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType(); diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java index 1582c644238..5bacae516b8 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java @@ -61,7 +61,7 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { try { - HttpResponse response = httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams()); + HttpResponse response = httpClient.execute(this.httpParameter.getUrl(), this.httpParameter.getMethod(), this.httpParameter.getHeaders(), this.httpParameter.getParams(), this.httpParameter.getBody()); if (HttpResponse.STATUS_OK == response.getCode()) { String content = response.getContent(); if (!Strings.isNullOrEmpty(content)) { diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml b/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml new file mode 100644 index 00000000000..c4d9d61c29e --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/pom.xml @@ -0,0 +1,40 @@ + + + + + connector-http + org.apache.seatunnel + ${revision} + + 4.0.0 + + connector-http-myhours + + + + org.apache.seatunnel + connector-http-base + ${project.version} + + + + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java new file mode 100644 index 00000000000..9d7e7a9375f --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/MyHoursSource.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.myhours.source; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider; +import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource; +import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader; +import org.apache.seatunnel.connectors.seatunnel.myhours.source.config.MyHoursSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.myhours.source.config.MyHoursSourceParameter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; +import com.google.common.base.Strings; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +@Slf4j +@AutoService(SeaTunnelSource.class) +public class MyHoursSource extends HttpSource { + private final MyHoursSourceParameter myHoursSourceParameter = new MyHoursSourceParameter(); + + @Override + public String getPluginName() { + return "MyHours"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, MyHoursSourceConfig.URL, MyHoursSourceConfig.EMAIL, MyHoursSourceConfig.PASSWORD); + if (!result.isSuccess()) { + throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg()); + } + //Login to get accessToken + String accessToken = null; + accessToken = getAccessToken(pluginConfig); + this.myHoursSourceParameter.buildWithConfig(pluginConfig, accessToken); + buildSchemaWithConfig(pluginConfig); + } + + @Override + public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { + return new HttpSourceReader(this.myHoursSourceParameter, readerContext, this.deserializationSchema); + } + + private String getAccessToken(Config pluginConfig){ + MyHoursSourceParameter myHoursLoginParameter = new MyHoursSourceParameter(); + myHoursLoginParameter.buildWithLoginConfig(pluginConfig); + HttpClientProvider loginHttpClient = new HttpClientProvider(myHoursLoginParameter); + try { + HttpResponse response = loginHttpClient.doPost(myHoursLoginParameter.getUrl(), myHoursLoginParameter.getBody()); + if (HttpResponse.STATUS_OK == response.getCode()) { + String content = response.getContent(); + if (!Strings.isNullOrEmpty(content)) { + Map contentMap = JsonUtils.toMap(content); + return contentMap.get(MyHoursSourceConfig.ACCESSTOKEN); + } + } + throw new RuntimeException(String.format("login http client execute exception, http response status code:[%d], content:[%s]", + response.getCode(), + response.getContent())); + } catch (Exception e) { + throw new RuntimeException("login http client execute exception"); + } finally { + if (Objects.nonNull(loginHttpClient)) { + try { + loginHttpClient.close(); + } catch (IOException e) { + log.warn(e.getMessage(), e); + } + } + } + } + +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java new file mode 100644 index 00000000000..32466a76152 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceConfig.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.myhours.source.config; + +public class MyHoursSourceConfig { + public static final String URL = "url"; + public static final String POST = "POST"; + public static final String EMAIL = "email"; + public static final String PASSWORD = "password"; + public static final String GRANTTYPE = "grantType"; + public static final String CLIENTID = "clientId"; + public static final String API = "api"; + public static final String AUTHORIZATION = "Authorization"; + public static final String ACCESSTOKEN = "accessToken"; + public static final String ACCESSTOKEN_PREFIX = "Bearer"; + public static final String AUTHORIZATION_URL = "https://api2.myhours.com/api/tokens/login"; +} diff --git a/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java new file mode 100644 index 00000000000..40beedb2763 --- /dev/null +++ b/seatunnel-connectors-v2/connector-http/connector-http-myhours/src/main/java/org/apache/seatunnel/connectors/seatunnel/myhours/source/config/MyHoursSourceParameter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.myhours.source.config; + +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import java.util.HashMap; +import java.util.Map; + +public class MyHoursSourceParameter extends HttpParameter { + public void buildWithConfig(Config pluginConfig, String accessToken) { + super.buildWithConfig(pluginConfig); + //put authorization in headers + this.headers = this.getHeaders() == null ? new HashMap<>() : this.getHeaders(); + this.headers.put(MyHoursSourceConfig.AUTHORIZATION, MyHoursSourceConfig.ACCESSTOKEN_PREFIX + " " + accessToken); + this.setHeaders(this.headers); + } + + public void buildWithLoginConfig(Config pluginConfig) { + // set url + this.setUrl(MyHoursSourceConfig.AUTHORIZATION_URL); + // set method + this.setMethod(MyHoursSourceConfig.POST); + // set body + Map bodyParams = new HashMap(); + String email = pluginConfig.getString(MyHoursSourceConfig.EMAIL); + String password = pluginConfig.getString(MyHoursSourceConfig.PASSWORD); + bodyParams.put(MyHoursSourceConfig.GRANTTYPE, MyHoursSourceConfig.PASSWORD); + bodyParams.put(MyHoursSourceConfig.EMAIL, email); + bodyParams.put(MyHoursSourceConfig.PASSWORD, password); + bodyParams.put(MyHoursSourceConfig.CLIENTID, MyHoursSourceConfig.API); + String body = JsonUtils.toJsonString(bodyParams); + this.setBody(body); + // set retry + if (pluginConfig.hasPath(HttpConfig.RETRY)) { + this.setRetry(pluginConfig.getInt(HttpConfig.RETRY)); + if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)) { + this.setRetryBackoffMultiplierMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)); + } + if (pluginConfig.hasPath(HttpConfig.RETRY_BACKOFF_MAX_MS)) { + this.setRetryBackoffMaxMillis(pluginConfig.getInt(HttpConfig.RETRY_BACKOFF_MAX_MS)); + } + } + } +} diff --git a/seatunnel-connectors-v2/connector-http/pom.xml b/seatunnel-connectors-v2/connector-http/pom.xml index 5b699bef0d3..7e61374d956 100644 --- a/seatunnel-connectors-v2/connector-http/pom.xml +++ b/seatunnel-connectors-v2/connector-http/pom.xml @@ -33,6 +33,7 @@ connector-http-base connector-http-feishu connector-http-wechat + connector-http-myhours \ No newline at end of file diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index d29e0d706d5..3cf94f3b1b7 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -153,6 +153,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-http-myhours + ${project.version} + provided + org.apache.seatunnel connector-jdbc