Skip to content

Commit

Permalink
[INLONG-8564][Manager] Fix the task not working of deleting StreamSou…
Browse files Browse the repository at this point in the history
…rces (#8593)
  • Loading branch information
healchow authored Jul 28, 2023
1 parent 0bf55ec commit 895f42c
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ List<Map<String, Object>> countGroupByUser(@Param(value = "username") String use
* @param limit max item count
* @return all matched group ids
*/
@MultiTenantQuery(with = false)
List<String> selectDeletedGroupIdsWithTimeBefore(@Param("timeBefore") Date timeBefore,
@Param("limit") Integer limit);

Expand All @@ -84,10 +85,10 @@ List<String> selectDeletedGroupIdsWithTimeBefore(@Param("timeBefore") Date timeB
*
* @param timeAfter the latest modify time after which to select
* @param limit max item count
* @return
* @return all matched group ids
*/
List<String> selectDeletedGroupIdsWithTimeAfter(@Param("timeAfter") Date timeAfter,
@Param("limit") Integer limit);
@MultiTenantQuery(with = false)
List<String> selectDeletedGroupIdsWithTimeAfter(@Param("timeAfter") Date timeAfter, @Param("limit") Integer limit);

int updateByPrimaryKey(InlongGroupEntity record);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,6 @@ List<Integer> selectHeartbeatTimeoutIds(@Param("sourceTypeList") List<String> so

int updateByPrimaryKeySelective(StreamSourceEntity record);

int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
@Param("status") Integer status);

int updateByPrimaryKey(StreamSourceEntity record);

/**
Expand Down Expand Up @@ -193,6 +190,11 @@ void updateStatusByIds(@Param("idList") List<Integer> idList, @Param("status") I
*/
void updateStatusByDeleted();

int logicalDeleteByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId,
@Param("status") Integer status);

int logicalDeleteByIds(@Param("idList") List<Integer> idList, @Param("status") Integer status);

/**
* Logical delete stream source by agentIp, change status at same time.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,32 +391,20 @@
</where>
</select>
<select id="selectDeletedGroupIdsWithTimeBefore" resultType="java.lang.String">
<bind name="_isInlongService" value="LoginUser.InlongService"/>
select inlong_group_id
from inlong_group
<where>
modify_time &lt;= #{timeBefore, jdbcType=TIMESTAMP}
<if test="_isInlongService == false">
and tenant = #{tenant,jdbcType=VARCHAR}
</if>
</where>
where modify_time &lt;= #{timeBefore, jdbcType=TIMESTAMP}
group by inlong_group_id
having min(is_deleted) > 0
limit #{limit, jdbcType=INTEGER}
</select>
<select id="selectDeletedGroupIdsWithTimeAfter" resultType="java.lang.String">
<bind name="_isInlongService" value="LoginUser.InlongService"/>
select inlong_group_id
from inlong_group
<where>
modify_time >= #{timeAfter, jdbcType=TIMESTAMP}
<if test="_isInlongService == false">
and tenant = #{tenant,jdbcType=VARCHAR}
</if>
</where>
where modify_time >= #{timeAfter, jdbcType=TIMESTAMP}
group by inlong_group_id
having min(is_deleted) > 0
limit #{limit, jdbcType=INTEGER}
limit #{limit, jdbcType=INTEGER}
</select>

<update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.InlongGroupEntity">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,18 +360,6 @@
and inlong_cluster_name = #{clusterName, jdbcType=VARCHAR}
</where>
</select>
<update id="updateByRelatedId">
update stream_source
<set>
is_deleted = id,
previous_status = status,
status = #{status, jdbcType=INTEGER},
version = version + 1
</set>
where is_deleted = 0
and inlong_group_id = #{groupId, jdbcType=VARCHAR}
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</update>

<update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
update stream_source
Expand Down Expand Up @@ -551,7 +539,6 @@
and status not in (101, 102, 104, 105, 110)
</where>
</update>

<update id="updateStatusByDeleted">
update stream_source
<set>
Expand All @@ -563,6 +550,33 @@
and status not in (99, 201, 301)
</where>
</update>

<update id="logicalDeleteByRelatedId">
update stream_source
<set>
is_deleted = id,
previous_status = status,
status = #{status, jdbcType=INTEGER},
version = version + 1
</set>
where is_deleted = 0
and inlong_group_id = #{groupId, jdbcType=VARCHAR}
and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</update>

<update id="logicalDeleteByIds">
update stream_source
<set>
is_deleted = id,
previous_status = status,
status = #{status, jdbcType=INTEGER},
version = version + 1
</set>
where id in
<foreach item="item" index="index" collection="idList" open="(" close=")" separator=",">
#{item}
</foreach>
</update>
<update id="logicalDeleteByAgentIp">
update stream_source
<set>
Expand All @@ -577,6 +591,7 @@
and status = #{targetStatus, jdbcType=INTEGER}
</if>
</update>

<delete id="deleteByRelatedId">
delete
from stream_source
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,8 @@ public Boolean forceDelete(String groupId, String streamId, String operator) {
Preconditions.expectNotBlank(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY);
Preconditions.expectNotBlank(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY);

int sourceCount = sourceMapper.updateByRelatedId(groupId, streamId, SourceStatus.TO_BE_ISSUED_DELETE.getCode());
int sourceCount = sourceMapper.logicalDeleteByRelatedId(groupId, streamId,
SourceStatus.TO_BE_ISSUED_DELETE.getCode());
int fieldCount = sourceFieldMapper.updateByRelatedId(groupId, streamId);
LOGGER.info("success to force delete source for groupId={} and streamId={} by user={},"
+ " update {} sources and {} fields", groupId, streamId, operator, sourceCount, fieldCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.group.coordinator.Coordinator;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -35,67 +34,76 @@
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.TimeUnit;

/**
* Stop all stream source which is running when group is deleted
* Delete all stream source which is running but its inlong group was deleted.
*/
@Slf4j
@Service
public class DelGroupCoordinatorTask extends TimerTask implements Coordinator, InitializingBean {
public class DeleteStreamSourceTask extends TimerTask implements InitializingBean {

private static final int INITIAL_DELAY = 300;
private static final int INTERVAL = 1800;
private static final int INITIAL_DELAY_MINUTES = 5;
private static final int INTERVAL_MINUTES = 60;

@Value("${group.compromise.batchSize:100}")
@Value("${group.deleted.batchSize:100}")
private Integer batchSize;
@Value("${group.deleted.latest.hours:10}")
private Integer latestHours;

@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
private StreamSourceEntityMapper sourceMapper;

@Override
public void afterPropertiesSet() throws Exception {
log.info("start delete group compromise task");
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("group-compromise"
+ "-%s").build());
executor.scheduleWithFixedDelay(this, INITIAL_DELAY, INTERVAL, TimeUnit.SECONDS);
public void afterPropertiesSet() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("inlong-group-delete-%s").build();
ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, threadFactory, new AbortPolicy());
executor.scheduleWithFixedDelay(this, INITIAL_DELAY_MINUTES, INTERVAL_MINUTES, TimeUnit.MINUTES);

log.info("success to start the delete stream source task");
}

@Override
public void run() {
LocalDateTime now = LocalDateTime.now();
LocalDateTime twoHoursAgo = now.minusHours(2).truncatedTo(ChronoUnit.HOURS);
Date modifyTime = Date.from(twoHoursAgo.atZone(ZoneId.systemDefault()).toInstant());
LocalDateTime currentTime = LocalDateTime.now();
LocalDateTime latestTime = currentTime.minusHours(latestHours).truncatedTo(ChronoUnit.HOURS);
Date modifyTime = Date.from(latestTime.atZone(ZoneId.systemDefault()).toInstant());

List<String> groupIds = groupMapper.selectDeletedGroupIdsWithTimeAfter(modifyTime, batchSize);
if (CollectionUtils.isEmpty(groupIds)) {
return;
}
for (String groupId : groupIds) {
coordinate(groupId);
}

deleteSources(groupIds);
}

@Override
public void coordinate(String inlongGroupId) {
List<StreamSourceEntity> sourceList = sourceMapper.selectByRelatedId(inlongGroupId, null, null);
private void deleteSources(List<String> inlongGroupIds) {
List<StreamSourceEntity> sourceList = sourceMapper.selectByGroupIds(inlongGroupIds);
if (CollectionUtils.isEmpty(sourceList)) {
return;
}

List<Integer> idList = new ArrayList<>();
for (StreamSourceEntity source : sourceList) {
if (SourceStatus.SOURCE_NORMAL.getCode().equals(source.getStatus()) && StringUtils.isNotBlank(
source.getInlongClusterNodeGroup())) {
source.setPreviousStatus(source.getStatus());
source.setStatus(SourceStatus.TO_BE_ISSUED_DELETE.getCode());
source.setIsDeleted(source.getId());
sourceMapper.updateByPrimaryKey(source);
if (SourceStatus.SOURCE_NORMAL.getCode().equals(source.getStatus())
&& StringUtils.isNotBlank(source.getInlongClusterNodeGroup())) {
idList.add(source.getId());
}
}

if (CollectionUtils.isNotEmpty(idList)) {
sourceMapper.logicalDeleteByIds(idList, SourceStatus.TO_BE_ISSUED_DELETE.getCode());
log.info("success to delete stream source with id in {}", idList);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,7 @@ source.update.interval=60
source.cleansing.enabled=false
source.cleansing.interval=600


# Select the InlongGroupIds whose latest modification time is within how many hours, the default is 10 hours
group.deleted.latest.hours=10
# The maximum size when querying InlongGroupIds in batches, those InlongGroupIds will be used to delete the related StreamSources.
group.deleted.batchSize=100
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,7 @@ source.update.interval=60
source.cleansing.enabled=false
source.cleansing.interval=600

# Group batch size to compromise in one period
group.compromise.batchSize=100
# Select the InlongGroupIds whose latest modification time is within how many hours, the default is 10 hours
group.deleted.latest.hours=10
# The maximum size when querying InlongGroupIds in batches, those InlongGroupIds will be used to delete the related StreamSources.
group.deleted.batchSize=100
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,8 @@ source.update.interval=60
# If turned on, tasks in the incorrect state are periodically deleted
source.cleansing.enabled=false
source.cleansing.interval=600

# Select the InlongGroupIds whose latest modification time is within how many hours, the default is 10 hours
group.deleted.latest.hours=10
# The maximum size when querying InlongGroupIds in batches, those InlongGroupIds will be used to delete the related StreamSources.
group.deleted.batchSize=100

0 comments on commit 895f42c

Please sign in to comment.