Skip to content

Commit

Permalink
[bugfix][zeta] Fixed multi-table job data loss and latency issues (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y committed Jul 6, 2023
1 parent 67a4a44 commit 0fa45a7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("MagicNumber")
@Slf4j
Expand Down Expand Up @@ -113,43 +111,6 @@ public void close() throws IOException {
}
}

public CompletableFuture<Boolean> registryScheduleFlushTask(
ScheduledExecutorService scheduledExecutorService) {
// todo Register when the job started, Unload at the end(pause/cancel/crash) of the job
CompletableFuture<Boolean> completedFuture = new CompletableFuture();
Runnable scheduleFlushTask =
new Runnable() {
@Override
public void run() {
if (!prepareClose
&& shuffleBufferSize > 0
&& System.currentTimeMillis() - lastModify
> shuffleBatchFlushInterval) {

try {
shuffleFlush();
} catch (Exception e) {
log.error("Execute schedule task error.", e);
}
}

// submit next task
if (!prepareClose) {
Runnable nextScheduleFlushTask = this;
scheduledExecutorService.schedule(
nextScheduleFlushTask,
shuffleBatchFlushInterval,
TimeUnit.MILLISECONDS);
} else {
completedFuture.complete(true);
}
}
};
scheduledExecutorService.schedule(
scheduleFlushTask, shuffleBatchFlushInterval, TimeUnit.MILLISECONDS);
return completedFuture;
}

private synchronized void shuffleItem(Record<?> record) {
String shuffleKey = shuffleStrategy.createShuffleKey(record, pipelineId, taskIndex);
shuffleBuffer.computeIfAbsent(shuffleKey, key -> new LinkedList<>()).add(record);
Expand All @@ -160,8 +121,6 @@ private synchronized void shuffleItem(Record<?> record) {
&& System.currentTimeMillis() - lastModify > shuffleBatchFlushInterval)) {
shuffleFlush();
}

lastModify = System.currentTimeMillis();
}

private synchronized void shuffleFlush() {
Expand All @@ -185,5 +144,6 @@ private synchronized void shuffleFlush() {
shuffleQueueBatch.clear();
}
shuffleBufferSize = 0;
lastModify = System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle
private final ShuffleAction shuffleAction;
private final int shuffleBatchSize;
private final IQueue<Record<?>>[] shuffles;
private List<Record<?>> unsentBuffer;
private Map<Integer, List<Record<?>>> unsentBufferMap = new HashMap<>();
private final Map<Integer, Barrier> alignedBarriers = new HashMap<>();
private long currentCheckpointId = Long.MAX_VALUE;
private int alignedBarriersCounter = 0;
Expand Down Expand Up @@ -71,6 +71,8 @@ public void collect(Collector<Record<?>> collector) throws Exception {

for (int i = 0; i < shuffles.length; i++) {
IQueue<Record<?>> shuffleQueue = shuffles[i];
List<Record<?>> unsentBuffer =
unsentBufferMap.computeIfAbsent(i, k -> new LinkedList<>());
if (shuffleQueue.size() == 0) {
emptyShuffleQueueCount++;
continue;
Expand All @@ -84,9 +86,9 @@ public void collect(Collector<Record<?>> collector) throws Exception {
List<Record<?>> shuffleBatch = new LinkedList<>();
if (alignedBarriersCounter > 0) {
shuffleBatch.add(shuffleQueue.take());
} else if (unsentBuffer != null && !unsentBuffer.isEmpty()) {
shuffleBatch = unsentBuffer;
unsentBuffer = null;
} else if (!unsentBuffer.isEmpty()) {
shuffleBatch.addAll(unsentBuffer);
unsentBuffer.clear();
}

shuffleQueue.drainTo(shuffleBatch, shuffleBatchSize);
Expand Down Expand Up @@ -121,9 +123,8 @@ public void collect(Collector<Record<?>> collector) throws Exception {
}

if (recordIndex + 1 < shuffleBatch.size()) {
unsentBuffer =
new LinkedList<>(
shuffleBatch.subList(recordIndex + 1, shuffleBatch.size()));
unsentBuffer.addAll(
shuffleBatch.subList(recordIndex + 1, shuffleBatch.size()));
}
break;
} else {
Expand Down

0 comments on commit 0fa45a7

Please sign in to comment.