From 895f42ca3b8726c0399dc1041b2bfc11253af744 Mon Sep 17 00:00:00 2001 From: healchow Date: Fri, 28 Jul 2023 10:39:41 +0800 Subject: [PATCH] [INLONG-8564][Manager] Fix the task not working of deleting StreamSources (#8593) --- .../dao/mapper/InlongGroupEntityMapper.java | 7 +- .../dao/mapper/StreamSourceEntityMapper.java | 8 ++- .../mappers/InlongGroupEntityMapper.xml | 18 +---- .../mappers/StreamSourceEntityMapper.xml | 41 ++++++++---- .../group/coordinator/Coordinator.java | 31 --------- .../source/StreamSourceServiceImpl.java | 3 +- ...rTask.java => DeleteStreamSourceTask.java} | 66 +++++++++++-------- .../main/resources/application-dev.properties | 5 +- .../resources/application-prod.properties | 6 +- .../resources/application-test.properties | 5 ++ 10 files changed, 92 insertions(+), 98 deletions(-) delete mode 100644 inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/coordinator/Coordinator.java rename inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/{DelGroupCoordinatorTask.java => DeleteStreamSourceTask.java} (58%) diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java index 7d5b7bf3243..3e8189b3013 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongGroupEntityMapper.java @@ -76,6 +76,7 @@ List> countGroupByUser(@Param(value = "username") String use * @param limit max item count * @return all matched group ids */ + @MultiTenantQuery(with = false) List selectDeletedGroupIdsWithTimeBefore(@Param("timeBefore") Date timeBefore, @Param("limit") Integer limit); @@ -84,10 +85,10 @@ List selectDeletedGroupIdsWithTimeBefore(@Param("timeBefore") Date timeB * * @param timeAfter the latest modify time after which to select * @param limit max item count - * @return + * @return all matched group ids */ - List selectDeletedGroupIdsWithTimeAfter(@Param("timeAfter") Date timeAfter, - @Param("limit") Integer limit); + @MultiTenantQuery(with = false) + List selectDeletedGroupIdsWithTimeAfter(@Param("timeAfter") Date timeAfter, @Param("limit") Integer limit); int updateByPrimaryKey(InlongGroupEntity record); diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index 805c73206a1..ca984fb04b1 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -135,9 +135,6 @@ List selectHeartbeatTimeoutIds(@Param("sourceTypeList") List so int updateByPrimaryKeySelective(StreamSourceEntity record); - int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId, - @Param("status") Integer status); - int updateByPrimaryKey(StreamSourceEntity record); /** @@ -193,6 +190,11 @@ void updateStatusByIds(@Param("idList") List idList, @Param("status") I */ void updateStatusByDeleted(); + int logicalDeleteByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId, + @Param("status") Integer status); + + int logicalDeleteByIds(@Param("idList") List idList, @Param("status") Integer status); + /** * Logical delete stream source by agentIp, change status at same time. * diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml index 600638c7219..923b5f1b539 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml @@ -391,32 +391,20 @@ diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index 7ae871cad4c..d927608c572 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -360,18 +360,6 @@ and inlong_cluster_name = #{clusterName, jdbcType=VARCHAR} - - update stream_source - - is_deleted = id, - previous_status = status, - status = #{status, jdbcType=INTEGER}, - version = version + 1 - - where is_deleted = 0 - and inlong_group_id = #{groupId, jdbcType=VARCHAR} - and inlong_stream_id = #{streamId, jdbcType=VARCHAR} - update stream_source @@ -551,7 +539,6 @@ and status not in (101, 102, 104, 105, 110) - update stream_source @@ -563,6 +550,33 @@ and status not in (99, 201, 301) + + + update stream_source + + is_deleted = id, + previous_status = status, + status = #{status, jdbcType=INTEGER}, + version = version + 1 + + where is_deleted = 0 + and inlong_group_id = #{groupId, jdbcType=VARCHAR} + and inlong_stream_id = #{streamId, jdbcType=VARCHAR} + + + + update stream_source + + is_deleted = id, + previous_status = status, + status = #{status, jdbcType=INTEGER}, + version = version + 1 + + where id in + + #{item} + + update stream_source @@ -577,6 +591,7 @@ and status = #{targetStatus, jdbcType=INTEGER} + delete from stream_source diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/coordinator/Coordinator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/coordinator/Coordinator.java deleted file mode 100644 index 95ba2fc8e84..00000000000 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/coordinator/Coordinator.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.service.group.coordinator; - -/** - * Operator which ensure the consistency of the state of all components within the group - */ -public interface Coordinator { - - /** - * Make inner state of one group to eventual consistency - * @param inlongGroupId - */ - public void coordinate(String inlongGroupId); - -} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index c18aef1fe9c..fe6109fba13 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -494,7 +494,8 @@ public Boolean forceDelete(String groupId, String streamId, String operator) { Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY); Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY); - int sourceCount = sourceMapper.updateByRelatedId(groupId, streamId, SourceStatus.TO_BE_ISSUED_DELETE.getCode()); + int sourceCount = sourceMapper.logicalDeleteByRelatedId(groupId, streamId, + SourceStatus.TO_BE_ISSUED_DELETE.getCode()); int fieldCount = sourceFieldMapper.updateByRelatedId(groupId, streamId); LOGGER.info("success to force delete source for groupId={} and streamId={} by user={}," + " update {} sources and {} fields", groupId, streamId, operator, sourceCount, fieldCount); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DelGroupCoordinatorTask.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java similarity index 58% rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DelGroupCoordinatorTask.java rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java index dcbd21e84db..f4def6697fd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DelGroupCoordinatorTask.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/task/DeleteStreamSourceTask.java @@ -21,7 +21,6 @@ import org.apache.inlong.manager.dao.entity.StreamSourceEntity; import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper; import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper; -import org.apache.inlong.manager.service.group.coordinator.Coordinator; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; @@ -35,67 +34,76 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.TimerTask; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; import java.util.concurrent.TimeUnit; /** - * Stop all stream source which is running when group is deleted + * Delete all stream source which is running but its inlong group was deleted. */ @Slf4j @Service -public class DelGroupCoordinatorTask extends TimerTask implements Coordinator, InitializingBean { +public class DeleteStreamSourceTask extends TimerTask implements InitializingBean { - private static final int INITIAL_DELAY = 300; - private static final int INTERVAL = 1800; + private static final int INITIAL_DELAY_MINUTES = 5; + private static final int INTERVAL_MINUTES = 60; - @Value("${group.compromise.batchSize:100}") + @Value("${group.deleted.batchSize:100}") private Integer batchSize; + @Value("${group.deleted.latest.hours:10}") + private Integer latestHours; + @Autowired private InlongGroupEntityMapper groupMapper; @Autowired private StreamSourceEntityMapper sourceMapper; @Override - public void afterPropertiesSet() throws Exception { - log.info("start delete group compromise task"); - ScheduledExecutorService executor = - Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("group-compromise" - + "-%s").build()); - executor.scheduleWithFixedDelay(this, INITIAL_DELAY, INTERVAL, TimeUnit.SECONDS); + public void afterPropertiesSet() { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("inlong-group-delete-%s").build(); + ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, threadFactory, new AbortPolicy()); + executor.scheduleWithFixedDelay(this, INITIAL_DELAY_MINUTES, INTERVAL_MINUTES, TimeUnit.MINUTES); + + log.info("success to start the delete stream source task"); } @Override public void run() { - LocalDateTime now = LocalDateTime.now(); - LocalDateTime twoHoursAgo = now.minusHours(2).truncatedTo(ChronoUnit.HOURS); - Date modifyTime = Date.from(twoHoursAgo.atZone(ZoneId.systemDefault()).toInstant()); + LocalDateTime currentTime = LocalDateTime.now(); + LocalDateTime latestTime = currentTime.minusHours(latestHours).truncatedTo(ChronoUnit.HOURS); + Date modifyTime = Date.from(latestTime.atZone(ZoneId.systemDefault()).toInstant()); + List groupIds = groupMapper.selectDeletedGroupIdsWithTimeAfter(modifyTime, batchSize); if (CollectionUtils.isEmpty(groupIds)) { return; } - for (String groupId : groupIds) { - coordinate(groupId); - } + + deleteSources(groupIds); } - @Override - public void coordinate(String inlongGroupId) { - List sourceList = sourceMapper.selectByRelatedId(inlongGroupId, null, null); + private void deleteSources(List inlongGroupIds) { + List sourceList = sourceMapper.selectByGroupIds(inlongGroupIds); if (CollectionUtils.isEmpty(sourceList)) { return; } + + List idList = new ArrayList<>(); for (StreamSourceEntity source : sourceList) { - if (SourceStatus.SOURCE_NORMAL.getCode().equals(source.getStatus()) && StringUtils.isNotBlank( - source.getInlongClusterNodeGroup())) { - source.setPreviousStatus(source.getStatus()); - source.setStatus(SourceStatus.TO_BE_ISSUED_DELETE.getCode()); - source.setIsDeleted(source.getId()); - sourceMapper.updateByPrimaryKey(source); + if (SourceStatus.SOURCE_NORMAL.getCode().equals(source.getStatus()) + && StringUtils.isNotBlank(source.getInlongClusterNodeGroup())) { + idList.add(source.getId()); } } + + if (CollectionUtils.isNotEmpty(idList)) { + sourceMapper.logicalDeleteByIds(idList, SourceStatus.TO_BE_ISSUED_DELETE.getCode()); + log.info("success to delete stream source with id in {}", idList); + } } -} \ No newline at end of file +} diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index 5f35ddc8f71..9e77553ac5b 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -98,4 +98,7 @@ source.update.interval=60 source.cleansing.enabled=false source.cleansing.interval=600 - +# Select the InlongGroupIds whose latest modification time is within how many hours, the default is 10 hours +group.deleted.latest.hours=10 +# The maximum size when querying InlongGroupIds in batches, those InlongGroupIds will be used to delete the related StreamSources. +group.deleted.batchSize=100 diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index e57a4329451..c3bb35f3cef 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -97,5 +97,7 @@ source.update.interval=60 source.cleansing.enabled=false source.cleansing.interval=600 -# Group batch size to compromise in one period -group.compromise.batchSize=100 +# Select the InlongGroupIds whose latest modification time is within how many hours, the default is 10 hours +group.deleted.latest.hours=10 +# The maximum size when querying InlongGroupIds in batches, those InlongGroupIds will be used to delete the related StreamSources. +group.deleted.batchSize=100 diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index e0c0d884246..9e77553ac5b 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -97,3 +97,8 @@ source.update.interval=60 # If turned on, tasks in the incorrect state are periodically deleted source.cleansing.enabled=false source.cleansing.interval=600 + +# Select the InlongGroupIds whose latest modification time is within how many hours, the default is 10 hours +group.deleted.latest.hours=10 +# The maximum size when querying InlongGroupIds in batches, those InlongGroupIds will be used to delete the related StreamSources. +group.deleted.batchSize=100