From 98a1656c259744afb003bf9b4230ec20122b6701 Mon Sep 17 00:00:00 2001 From: Eric Date: Tue, 9 Jul 2024 20:36:20 +0800 Subject: [PATCH 1/3] Add EventService --- .../seatunnel/engine/server/EventService.java | 100 ++++++++++++++++++ .../engine/server/SeaTunnelServer.java | 10 +- .../engine/server/TaskExecutionService.java | 54 +--------- .../engine/server/master/JobMaster.java | 9 ++ 4 files changed, 123 insertions(+), 50 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java new file mode 100644 index 00000000000..0c7b654b216 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/EventService.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server; + +import org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.seatunnel.api.event.Event; +import org.apache.seatunnel.common.utils.RetryUtils; +import org.apache.seatunnel.engine.server.event.JobEventReportOperation; +import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; + +import com.hazelcast.spi.impl.NodeEngineImpl; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public class EventService { + private final BlockingQueue eventBuffer; + + private ExecutorService eventForwardService; + + private final NodeEngineImpl nodeEngine; + + public EventService(NodeEngineImpl nodeEngine) { + eventBuffer = new ArrayBlockingQueue<>(2048); + initEventForwardService(); + this.nodeEngine = nodeEngine; + } + + private void initEventForwardService() { + eventForwardService = + Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build()); + eventForwardService.submit( + () -> { + List events = new ArrayList<>(); + RetryUtils.RetryMaterial retryMaterial = + new RetryUtils.RetryMaterial(2, true, e -> true); + while (!Thread.currentThread().isInterrupted()) { + try { + events.clear(); + + Event first = eventBuffer.take(); + events.add(first); + + eventBuffer.drainTo(events, 500); + JobEventReportOperation operation = new JobEventReportOperation(events); + + RetryUtils.retryWithException( + () -> + NodeEngineUtil.sendOperationToMasterNode( + nodeEngine, operation) + .join(), + retryMaterial); + + log.debug("Event forward success, events " + events.size()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.info("Event forward thread interrupted"); + } catch (Throwable t) { + log.warn("Event forward failed, discard events " + events.size(), t); + } + } + }); + } + + public void reportEvent(Event e) { + while (!eventBuffer.offer(e)) { + eventBuffer.poll(); + log.warn("Event buffer is full, discard the oldest event"); + } + } + + public void shutdownNow() { + if (eventForwardService != null) { + eventForwardService.shutdownNow(); + } + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java index 765869fd030..831b5f5ea5f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java @@ -76,6 +76,8 @@ public class SeaTunnelServer private volatile boolean isRunning = true; + @Getter private EventService eventService; + public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) { this.liveOperationRegistry = new LiveOperationRegistry(); this.seaTunnelConfig = seaTunnelConfig; @@ -110,6 +112,8 @@ public void init(NodeEngine engine, Properties hzProperties) { new DefaultClassLoaderService( seaTunnelConfig.getEngineConfig().isClassloaderCacheMode()); + eventService = new EventService(nodeEngine); + if (EngineConfig.ClusterRole.MASTER_AND_WORKER.ordinal() == seaTunnelConfig.getEngineConfig().getClusterRole().ordinal()) { startWorker(); @@ -143,7 +147,7 @@ private void startMaster() { private void startWorker() { taskExecutionService = new TaskExecutionService( - classLoaderService, nodeEngine, nodeEngine.getProperties()); + classLoaderService, nodeEngine, nodeEngine.getProperties(), eventService); nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService); taskExecutionService.start(); getSlotService(); @@ -170,6 +174,10 @@ public void shutdown(boolean terminate) { if (coordinatorService != null) { coordinatorService.shutdown(); } + + if (eventService != null) { + eventService.shutdownNow(); + } } @Override diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 19878545edc..beb51ac0820 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.api.common.metrics.MetricTags; import org.apache.seatunnel.api.event.Event; import org.apache.seatunnel.common.utils.ExceptionUtils; -import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.common.utils.StringFormatUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; @@ -30,7 +29,6 @@ import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.classloader.ClassLoaderService; import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier; -import org.apache.seatunnel.engine.server.event.JobEventReportOperation; import org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException; import org.apache.seatunnel.engine.server.execution.ExecutionState; import org.apache.seatunnel.engine.server.execution.ProgressState; @@ -49,7 +47,6 @@ import org.apache.seatunnel.engine.server.task.SeaTunnelTask; import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation; import org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation; -import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; import org.apache.commons.collections4.CollectionUtils; @@ -73,7 +70,6 @@ import java.io.IOException; import java.net.URL; -import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -81,7 +77,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -147,13 +142,13 @@ public class TaskExecutionService implements DynamicMetricsProvider { private final ServerConnectorPackageClient serverConnectorPackageClient; - private final BlockingQueue eventBuffer; - private final ExecutorService eventForwardService; + private final EventService eventService; public TaskExecutionService( ClassLoaderService classLoaderService, NodeEngineImpl nodeEngine, - HazelcastProperties properties) { + HazelcastProperties properties, + EventService eventService) { seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); this.hzInstanceName = nodeEngine.getHazelcastInstance().getName(); this.nodeEngine = nodeEngine; @@ -176,42 +171,7 @@ public TaskExecutionService( serverConnectorPackageClient = new ServerConnectorPackageClient(nodeEngine, seaTunnelConfig); - eventBuffer = new ArrayBlockingQueue<>(2048); - eventForwardService = - Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setNameFormat("event-forwarder-%d").build()); - eventForwardService.submit( - () -> { - List events = new ArrayList<>(); - RetryUtils.RetryMaterial retryMaterial = - new RetryUtils.RetryMaterial(2, true, e -> true); - while (!Thread.currentThread().isInterrupted()) { - try { - events.clear(); - - Event first = eventBuffer.take(); - events.add(first); - - eventBuffer.drainTo(events, 500); - JobEventReportOperation operation = new JobEventReportOperation(events); - - RetryUtils.retryWithException( - () -> - NodeEngineUtil.sendOperationToMasterNode( - nodeEngine, operation) - .join(), - retryMaterial); - - logger.fine("Event forward success, events " + events.size()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.info("Event forward thread interrupted"); - } catch (Throwable t) { - logger.warning( - "Event forward failed, discard events " + events.size(), t); - } - } - }); + this.eventService = eventService; } public void start() { @@ -222,7 +182,6 @@ public void shutdown() { isRunning = false; executorService.shutdownNow(); scheduledExecutorService.shutdown(); - eventForwardService.shutdownNow(); } public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation) { @@ -668,10 +627,7 @@ public void printTaskExecutionRuntimeInfo() { } public void reportEvent(Event e) { - while (!eventBuffer.offer(e)) { - eventBuffer.poll(); - logger.warning("Event buffer is full, discard the oldest event"); - } + eventService.reportEvent(e); } private final class BlockingWorker implements Runnable { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 29d8611f139..4c68d29cc4b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -384,6 +384,15 @@ public static void handleSaveMode(SeaTunnelSink sink) { } } + private void reportEventOfSaveMode( + long jobId, TablePath tablePath, int indexOfCount, long startTime, long finishedTime) { + seaTunnelServer + .getEventService() + .reportEvent( + new SaveModeFinishedEvent( + jobId, tablePath, indexOfCount, startTime, finishedTime)); + } + public void handleCheckpointError(long pipelineId, boolean neverRestore) { if (neverRestore) { this.neverNeedRestore(); From f80834bcabfa551daec446758f144e1c92676025 Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 10 Jul 2024 13:40:44 +0800 Subject: [PATCH 2/3] Add EventService --- .../seatunnel/engine/server/TaskExecutionService.java | 1 - .../apache/seatunnel/engine/server/master/JobMaster.java | 9 --------- 2 files changed, 10 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index beb51ac0820..b6bbc6a35b6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -51,7 +51,6 @@ import org.apache.commons.collections4.CollectionUtils; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.hazelcast.instance.impl.NodeState; import com.hazelcast.internal.metrics.DynamicMetricsProvider; import com.hazelcast.internal.metrics.MetricDescriptor; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 4c68d29cc4b..29d8611f139 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -384,15 +384,6 @@ public static void handleSaveMode(SeaTunnelSink sink) { } } - private void reportEventOfSaveMode( - long jobId, TablePath tablePath, int indexOfCount, long startTime, long finishedTime) { - seaTunnelServer - .getEventService() - .reportEvent( - new SaveModeFinishedEvent( - jobId, tablePath, indexOfCount, startTime, finishedTime)); - } - public void handleCheckpointError(long pipelineId, boolean neverRestore) { if (neverRestore) { this.neverNeedRestore(); From b75894a7500d29a5f3b9d9832726b527e07b02e6 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 5 Sep 2024 17:15:49 +0800 Subject: [PATCH 3/3] update test case --- .../org/apache/seatunnel/engine/server/TaskExecutionService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 0835324e79f..b32dd7c6a97 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -53,7 +53,6 @@ import org.apache.commons.collections4.CollectionUtils; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.hazelcast.core.OperationTimeoutException; import com.hazelcast.instance.impl.NodeState; import com.hazelcast.internal.metrics.DynamicMetricsProvider;