diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java similarity index 98% rename from inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java rename to inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java index a727eba46bc..7eb16946e95 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/config/OpenApiConstants.java +++ b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.audit.config; +package org.apache.inlong.audit.consts; /** * Open api constants diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java index 766f64f70dd..decec2bd95a 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/AbstractCache.java @@ -37,10 +37,10 @@ import java.util.concurrent.TimeUnit; import static org.apache.inlong.audit.config.ConfigConstants.DATE_FORMAT; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_CACHE_MAX_SIZE; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_EXPIRED_HOURS; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_CACHE_MAX_SIZE; +import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_CACHE_EXPIRED_HOURS; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_CACHE_MAX_SIZE; import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG; /** diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java index 45984e203b7..4160c5da94a 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java +++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/cache/RealTimeQuery.java @@ -51,8 +51,8 @@ import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_IDLE_CONNECTIONS; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MAX_TOTAL_CONNECTIONS; import static org.apache.inlong.audit.config.ConfigConstants.KEY_DATASOURCE_MIN_IDLE_CONNECTIONS; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE; import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL; import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL; import static org.apache.inlong.audit.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL; diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java index fa1c56ab239..8c363a6b098 100644 --- a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java +++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/ApiService.java @@ -48,41 +48,41 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_DAY_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_AUDIT_PROXY_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IDS_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_GET_IPS_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_HOUR_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_MINUTES_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE; -import static org.apache.inlong.audit.config.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT; -import static org.apache.inlong.audit.config.OpenApiConstants.HTTP_RESPOND_CODE; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_BACKLOG_SIZE; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_DAY_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_AUDIT_PROXY_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IDS_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_GET_IPS_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_HOUR_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_MINUTES_PATH; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_REAL_LIMITER_QPS; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_API_THREAD_POOL_SIZE; -import static 
org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_BODY_SUCCESS; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE; -import static org.apache.inlong.audit.config.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_COMPONENT; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_CYCLE; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_ID; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_AUDIT_TAG; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_END_TIME; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_GROUP_Id; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_INLONG_STREAM_Id; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_IP; -import static org.apache.inlong.audit.config.OpenApiConstants.PARAMS_START_TIME; -import static org.apache.inlong.audit.config.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_BACKLOG_SIZE; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_DAY_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_AUDIT_PROXY_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_IDS_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_IPS_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_HOUR_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_MINUTES_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS; +import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE; +import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT; +import static org.apache.inlong.audit.consts.OpenApiConstants.HTTP_RESPOND_CODE; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_BACKLOG_SIZE; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_DAY_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_AUDIT_PROXY_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_IDS_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_IPS_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_HOUR_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_MINUTES_PATH; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_REAL_LIMITER_QPS; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_SUCCESS; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE; +import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_SERVER_BIND_PORT; +import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_COMPONENT; +import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_CYCLE; +import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_ID; +import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_TAG; +import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_END_TIME; +import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_GROUP_Id; +import static 
org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_STREAM_Id;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_IP;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_START_TIME;
+import static org.apache.inlong.audit.consts.OpenApiConstants.VALUE_HTTP_HEADER_CONTENT_TYPE;
 import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
 import static org.apache.inlong.audit.entities.ApiType.DAY;
 import static org.apache.inlong.audit.entities.ApiType.GET_AUDIT_PROXY;
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java
new file mode 100644
index 00000000000..12fb8712704
--- /dev/null
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkParallelismOptimizer.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.manager.pojo.audit.AuditInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static java.lang.Math.ceil;
+import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_CYCLE;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_AUDIT_ID;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_END_TIME;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_GROUP_Id;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_INLONG_STREAM_Id;
+import static org.apache.inlong.audit.consts.OpenApiConstants.PARAMS_START_TIME;
+import static org.apache.inlong.manager.common.consts.InlongConstants.*;
+
+/**
+ * Recommends a Flink job parallelism from the recent message rate of an InLong stream.
+ * <p>
+ * The rate (messages per second) is computed from the records that the InLong audit minutes API
+ * returns for the last DATA_SCALE_COUNTING_RANGE_IN_HOURS hour(s). It is then divided by the
+ * configured maximum number of messages a single core may process per second.
+ */
+@Slf4j
+@Component
+public class FlinkParallelismOptimizer {
+
+    @Value("${audit.query.url:http://127.0.0.1:10080}")
+    public String auditQueryUrl;
+
+    private static final int MAX_PARALLELISM = 2048;
+    private long maximumMessagePerSecondPerCore = 1000L;
+    private static final int DEFAULT_PARALLELISM = 1;
+    private static final long DEFAULT_ERROR_DATA_VOLUME = 0L;
+    private static final FlowType DEFAULT_FLOWTYPE = FlowType.OUTPUT;
+    private static final String DEFAULT_AUDIT_TYPE = "DataProxy";
+    private static final String AUDIT_CYCLE_REALTIME = "1";
+    // maximum data scale counting range in hours
+    private static final int DATA_SCALE_COUNTING_RANGE_IN_HOURS = 1;
+    // sample time format: 2024-08-23T22:47:38.866
+    private static final String AUDIT_QUERY_DATE_TIME_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+
+    private static final String LOGTS_DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    // matches a sign followed by a single-digit hour, e.g. the "+8:" in "GMT+8:00"
+    private static final String TIMEZONE_REGEX = "([+-])(\\d):";
+
+    /**
+     * Calculate the recommended parallelism from the message rate of the first stream in the list.
+     * <p>
+     * A data volume of DEFAULT_ERROR_DATA_VOLUME is used when the list is null or empty, or when
+     * the audit data cannot be retrieved.
+     *
+     * @param streamInfos streams of the job; only the first element is used
+     * @return recommended parallelism, between DEFAULT_PARALLELISM and MAX_PARALLELISM inclusive
+     */
+    public int calculateRecommendedParallelism(List<InlongStreamInfo> streamInfos) {
+        long averageDataVolume = DEFAULT_ERROR_DATA_VOLUME;
+        if (streamInfos == null || streamInfos.isEmpty()) {
+            log.warn("No stream info provided, using default data volume: {}", DEFAULT_ERROR_DATA_VOLUME);
+        } else {
+            try {
+                averageDataVolume = getAverageDataVolume(streamInfos.get(0));
+                log.info("Retrieved data volume: {}", averageDataVolume);
+            } catch (Exception e) {
+                log.error("Error retrieving data volume: {}", e.getMessage(), e);
+                log.warn("Using default data volume: {}", DEFAULT_ERROR_DATA_VOLUME);
+                averageDataVolume = DEFAULT_ERROR_DATA_VOLUME;
+            }
+        }
+        // Clamp while still a long: casting an oversized quotient to int first could wrap around.
+        long rawParallelism = averageDataVolume / maximumMessagePerSecondPerCore;
+        int newParallelism = (int) Math.min(Math.max(rawParallelism, DEFAULT_PARALLELISM), MAX_PARALLELISM);
+        log.info("Calculated parallelism: {} for data volume: {}", newParallelism, averageDataVolume);
+        return newParallelism;
+    }
+
+    /**
+     * Set the maximum number of messages one core may process per second.
+     * A null or non-positive value is rejected and the current value is kept.
+     *
+     * @param maximumMessagePerSecondPerCore The maximum messages per second per core
+     */
+    public void setMaximumMessagePerSecondPerCore(Integer maximumMessagePerSecondPerCore) {
+        if (maximumMessagePerSecondPerCore == null || maximumMessagePerSecondPerCore <= 0) {
+            log.error("Illegal flink.maxpercore property: {}, must be nonnull and positive, using current value: {}",
+                    maximumMessagePerSecondPerCore, this.maximumMessagePerSecondPerCore);
+        } else {
+            this.maximumMessagePerSecondPerCore = maximumMessagePerSecondPerCore;
+        }
+    }
+
+    /**
+     * Query the audit API for the last DATA_SCALE_COUNTING_RANGE_IN_HOURS hour(s) of the stream
+     * and return its average message rate.
+     *
+     * @param streamInfo inlong stream info; its first source supplies the data time zone
+     * @return The average data count per second, or DEFAULT_ERROR_DATA_VOLUME if it cannot be determined
+     */
+    private long getAverageDataVolume(InlongStreamInfo streamInfo) {
+        // Since the audit module uses local time, we need to use ZonedDateTime to get the current time
+        String dataTimeZone = streamInfo.getSourceList().get(0).getDataTimeZone();
+
+        // This regex pattern matches a time zone offset in the format of "GMT+/-X:00"
+        // where X is a single digit (e.g., "GMT+8:00"). The pattern captures the "+" or "-" sign
+        // and the single digit, then it replaces the single digit with two digits by adding a "0" in front of it.
+        // For example, "GMT+8:00" becomes "GMT+08:00" in order to match standard offset-based ZoneId.
+        dataTimeZone = dataTimeZone.replaceAll(TIMEZONE_REGEX, "$10$2:");
+        ZoneId dataZone = ZoneId.of(dataTimeZone);
+
+        ZonedDateTime endTime = ZonedDateTime.now(dataZone);
+        ZonedDateTime startTime = endTime.minusHours(DATA_SCALE_COUNTING_RANGE_IN_HOURS);
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(AUDIT_QUERY_DATE_TIME_FORMAT);
+
+        // count the data volume using the audit id of DEFAULT_AUDIT_TYPE with DEFAULT_FLOWTYPE
+        int auditId = AuditIdEnum.getAuditId(DEFAULT_AUDIT_TYPE, DEFAULT_FLOWTYPE).getValue();
+        StringJoiner urlParameters = new StringJoiner(AMPERSAND)
+                .add(PARAMS_START_TIME + EQUAL + startTime.format(formatter))
+                .add(PARAMS_END_TIME + EQUAL + endTime.format(formatter))
+                .add(PARAMS_INLONG_GROUP_Id + EQUAL + streamInfo.getInlongGroupId())
+                .add(PARAMS_INLONG_STREAM_Id + EQUAL + streamInfo.getInlongStreamId())
+                .add(PARAMS_AUDIT_ID + EQUAL + auditId)
+                .add(PARAMS_AUDIT_CYCLE + EQUAL + AUDIT_CYCLE_REALTIME);
+
+        String url = auditQueryUrl + DEFAULT_API_MINUTES_PATH + QUESTION_MARK + urlParameters;
+
+        return getAverageDataVolumeFromAuditInfo(url);
+    }
+
+    /**
+     * Request audit data from the inlong audit API and return the average message rate it describes.
+     * I/O failures are logged and reported as DEFAULT_ERROR_DATA_VOLUME.
+     *
+     * @param url The URL to request data from
+     * @return The average data count per second, or DEFAULT_ERROR_DATA_VOLUME on failure
+     */
+    private long getAverageDataVolumeFromAuditInfo(String url) {
+        log.debug("Requesting audit data from URL: {}", url);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
+            HttpGet httpGet = new HttpGet(url);
+            try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+                return parseResponseAndCalculateAverageDataVolume(response);
+            } catch (IOException e) {
+                log.error("Error executing HTTP request to audit API: {}", url, e);
+            }
+        } catch (IOException e) {
+            log.error("Error creating or closing HTTP client: {}", url, e);
+        }
+        return DEFAULT_ERROR_DATA_VOLUME;
+    }
+
+    /**
+     * Parse the HTTP response and calculate the average message rate from the audit data: the sum of
+     * all record counts divided by the seconds between the earliest and the latest record timestamp.
+     *
+     * @param response The HTTP response
+     * @return The average data count per second, or DEFAULT_ERROR_DATA_VOLUME when the response has no
+     *         entity, no "data" array, or records that span no time
+     * @throws IOException If an I/O error occurs
+     */
+    private long parseResponseAndCalculateAverageDataVolume(CloseableHttpResponse response) throws IOException {
+        HttpEntity entity = response.getEntity();
+        if (entity == null) {
+            log.warn("Empty response entity from audit API, returning default count.");
+            return DEFAULT_ERROR_DATA_VOLUME;
+        }
+
+        String responseString = EntityUtils.toString(entity);
+        log.debug("Flink dynamic parallelism optimizer got response from audit API: {}", responseString);
+
+        JsonObject jsonObject = JsonParser.parseString(responseString).getAsJsonObject();
+        // Guard against a missing or non-array "data" member instead of failing further down.
+        if (!jsonObject.has("data") || !jsonObject.get("data").isJsonArray()) {
+            log.warn("No audit data array in response from audit API, returning default count.");
+            return DEFAULT_ERROR_DATA_VOLUME;
+        }
+        AuditInfo[] auditInfoArray = new Gson().fromJson(jsonObject.getAsJsonArray("data"), AuditInfo[].class);
+
+        ZonedDateTime minLogTs = null;
+        ZonedDateTime maxLogTs = null;
+        DateTimeFormatter logTsFormatter =
+                DateTimeFormatter.ofPattern(LOGTS_DATE_TIME_FORMAT).withZone(ZoneId.systemDefault());
+        long totalCount = 0L;
+        for (AuditInfo auditData : auditInfoArray) {
+            if (auditData != null) {
+                ZonedDateTime logTs = ZonedDateTime.parse(auditData.getLogTs(), logTsFormatter);
+                if (minLogTs == null || logTs.isBefore(minLogTs)) {
+                    minLogTs = logTs;
+                }
+                if (maxLogTs == null || logTs.isAfter(maxLogTs)) {
+                    maxLogTs = logTs;
+                }
+                log.debug("parsed AuditInfo, Count: {}, Size: {}", auditData.getCount(), auditData.getSize());
+                totalCount += auditData.getCount();
+            } else {
+                log.error("Null AuditInfo found in response data.");
+            }
+        }
+
+        if (minLogTs != null && maxLogTs != null) {
+            long timeDifferenceInSeconds = maxLogTs.toEpochSecond() - minLogTs.toEpochSecond();
+            log.info("Time difference in seconds: {}", timeDifferenceInSeconds);
+            if (timeDifferenceInSeconds > 0) {
+                return totalCount / timeDifferenceInSeconds;
+            }
+        }
+        return DEFAULT_ERROR_DATA_VOLUME;
+    }
+}
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java index f3809ce8aeb..664863b759a 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java @@ -17,14 +17,6 @@ package org.apache.inlong.manager.plugin.flink; -import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.manager.common.exceptions.BusinessException; -import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig; -import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; -import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest; -import org.apache.inlong.manager.plugin.flink.enums.Constants; -import org.apache.inlong.manager.plugin.util.FlinkUtils; - import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobID; @@ -39,6 +31,14 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig; +import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; +import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest; +import org.apache.inlong.manager.plugin.flink.enums.Constants; +import org.apache.inlong.manager.plugin.util.ApplicationContextProvider; +import org.apache.inlong.manager.plugin.util.FlinkUtils; import java.io.File; import java.net.MalformedURLException; @@ -69,6 +69,8 @@ public class FlinkService { // map Configuration to FlinkClientService private final Map flinkClientServices = new HashMap<>(); + private final 
FlinkParallelismOptimizer flinkParallelismOptimizer; + /** * Constructor of FlinkService. */ @@ -76,6 +78,7 @@ public FlinkService() throws Exception { flinkConfig = FlinkUtils.getFlinkConfigFromFile(); parallelism = flinkConfig.getParallelism(); savepointDirectory = flinkConfig.getSavepointDirectory(); + flinkParallelismOptimizer = ApplicationContextProvider.getContext().getBean(FlinkParallelismOptimizer.class); // let spring inject the bean } private static class FlinkServiceHolder { diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java index 7d2948510ca..61885ae6ddf 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java @@ -40,4 +40,8 @@ public class FlinkConfig { // flink version private String version; + private Integer maxpercore; + + // whether to enable dynamic parallelism + private Boolean dynamicParallelism; } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java index b5628e8664f..04be120e146 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java @@ -43,6 +43,10 @@ public class Constants { public static final String FLINK_VERSION = "flink.version"; + public static final String FLINK_MAXPERCORE = "flink.maxpercore"; + + public static final String FLINK_DYNAMIC_PARALLELISM = "flink.dynamicParallelism"; + // dataflow public static final 
String SOURCE_INFO = "source_info";
diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/ApplicationContextProvider.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/ApplicationContextProvider.java
new file mode 100644
index 00000000000..fe6afddb8c2
--- /dev/null
+++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/ApplicationContextProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.util;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+/**
+ * Static holder for the Spring ApplicationContext.
+ * <p>
+ * The context passed to setApplicationContext is kept in a static field so that code which is
+ * not managed by Spring can look beans up through getContext().
+ */
+@Component
+public class ApplicationContextProvider implements ApplicationContextAware {
+
+    private static ApplicationContext context;
+
+    /**
+     * Store the given application context in the static holder.
+     *
+     * @param applicationContext the context to remember
+     */
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) {
+        ApplicationContextProvider.context = applicationContext;
+    }
+
+    /**
+     * Return the stored application context.
+     *
+     * @return the context last passed to setApplicationContext, or null if none has been stored yet
+     */
+    public static ApplicationContext getContext() {
+        return ApplicationContextProvider.context;
+    }
+}
diff --git a/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
index ff34f913f81..c9e155da450 100644
--- a/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
+++ b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
@@ -35,3 +35,7 @@ flink.savepoint.directory=file:///data/inlong-sort/savepoints
 flink.parallelism=1
 # flink stop request drain
 flink.drain=false
+# Flink max data count per core per second
+flink.maxpercore=1000
+# switch on or off dynamic parallelism based on data scale
+flink.dynamicParallelism=true