Skip to content

Commit

Permalink
[Feature] Support flow control in zeta (apache#309)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Sep 1, 2023
1 parent 40a2c67 commit 81601fd
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ public interface EnvCommonOptions {
.withDescription(
"The interval (in milliseconds) between two consecutive checkpoints.");

Option<Integer> READ_LIMIT_ROW_PER_SECOND =
Options.key("read_limit.rows_per_second")
.intType()
.noDefaultValue()
.withDescription(
"The each parallelism row limit per second for read data from source.");

Option<Integer> READ_LIMIT_BYTES_PER_SECOND =
Options.key("read_limit.bytes_per_second")
.intType()
.noDefaultValue()
.withDescription(
"The each parallelism bytes limit per second for read data from source.");
Option<Long> CHECKPOINT_TIMEOUT =
Options.key("checkpoint.timeout")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public static OptionRule getEnvOptionRules() {
EnvCommonOptions.JARS,
EnvCommonOptions.CHECKPOINT_INTERVAL,
EnvCommonOptions.CHECKPOINT_TIMEOUT,
EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
EnvCommonOptions.CUSTOM_PARAMETERS)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public final class SeaTunnelRow implements Serializable {
/** The array to store the actual internal format values. */
private final Object[] fields;

private volatile long size;
private volatile int size;

public SeaTunnelRow(int arity) {
this.fields = new Object[arity];
Expand Down Expand Up @@ -102,7 +102,7 @@ public boolean isNullAt(int pos) {

public long getBytesSize(SeaTunnelRowType rowType) {
if (size == 0) {
long s = 0;
int s = 0;
for (int i = 0; i < fields.length; i++) {
s += getBytesForValue(fields[i], rowType.getFieldType(i));
}
Expand Down Expand Up @@ -194,9 +194,9 @@ private int getBytesForArray(Object v, BasicType<?> dataType) {
}
}

public long getBytesSize() {
public int getBytesSize() {
if (size == 0) {
long s = 0;
int s = 0;
for (int i = 0; i < fields.length; i++) {
s += getBytesForValue(fields[i]);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.core.starter.flowcontrol;

import org.apache.seatunnel.shade.com.google.common.util.concurrent.RateLimiter;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;

public class FlowControlGate {

private final RateLimiter bytesRateLimiter;
private final RateLimiter countRateLimiter;

private FlowControlGate(FlowControlStrategy flowControlStrategy) {
this.bytesRateLimiter = RateLimiter.create(flowControlStrategy.getBytesPerSecond());
this.countRateLimiter = RateLimiter.create(flowControlStrategy.getCountPreSecond());
}

public void audit(SeaTunnelRow row) {
bytesRateLimiter.acquire(row.getBytesSize());
countRateLimiter.acquire();
}

public static FlowControlGate create(FlowControlStrategy flowControlStrategy) {
return new FlowControlGate(flowControlStrategy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.core.starter.flowcontrol;

import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class FlowControlStrategy {

int bytesPerSecond;
int countPreSecond;

public FlowControlStrategy(int bytesPerSecond, int countPreSecond) {
if (bytesPerSecond <= 0 || countPreSecond <= 0) {
throw new IllegalArgumentException(
"bytesPerSecond and countPreSecond must be positive");
}
this.bytesPerSecond = bytesPerSecond;
this.countPreSecond = countPreSecond;
}

public static FlowControlStrategy of(int bytesPerSecond, int countPreSecond) {
return new FlowControlStrategy(bytesPerSecond, countPreSecond);
}

public static FlowControlStrategy ofBytes(int bytesPerSecond) {
return new FlowControlStrategy(bytesPerSecond, Integer.MAX_VALUE);
}

public static FlowControlStrategy ofCount(int countPreSecond) {
return new FlowControlStrategy(Integer.MAX_VALUE, countPreSecond);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.apache.seatunnel.core.starter.flowcontrol;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlowControlGateTest {

private static final int rowSize = 181;

@Test
public void testWithBytes() {
Clock clock = Clock.systemDefaultZone();
FlowControlGate flowControlGate = FlowControlGate.create(FlowControlStrategy.ofBytes(100));
List<SeaTunnelRow> rows = getRows(10);
long start = clock.millis();
for (SeaTunnelRow row : rows) {
flowControlGate.audit(row);
}
long end = clock.millis();
long useTime = rowSize * 10 / 100 * 1000;

Assertions.assertTrue(end - start > useTime * 0.8 && end - start < useTime * 1.2);
}

@Test
public void testWithCount() {
Clock clock = Clock.systemDefaultZone();
FlowControlGate flowControlGate = FlowControlGate.create(FlowControlStrategy.ofCount(2));
List<SeaTunnelRow> rows = getRows(10);
long start = clock.millis();
for (SeaTunnelRow row : rows) {
flowControlGate.audit(row);
}
long end = clock.millis();
long useTime = 10 / 2 * 1000;

Assertions.assertTrue(end - start > useTime * 0.8 && end - start < useTime * 1.2);
}

@Test
public void testWithBytesAndCount() {
Clock clock = Clock.systemDefaultZone();
FlowControlGate flowControlGate = FlowControlGate.create(FlowControlStrategy.of(100, 2));
List<SeaTunnelRow> rows = getRows(10);
long start = clock.millis();
for (SeaTunnelRow row : rows) {
flowControlGate.audit(row);
}
long end = clock.millis();
long useTime = rowSize * 10 / 100 * 1000;

Assertions.assertTrue(end - start > useTime * 0.8 && end - start < useTime * 1.2);
}

/** return row list with size, each row size is 181 */
private List<SeaTunnelRow> getRows(int size) {
Map<String, Object> map = new HashMap<>();
map.put(
"key1",
new SeaTunnelRow(
new Object[] {
1, "test", 1L, new BigDecimal("3333.333"),
}));
map.put(
"key2",
new SeaTunnelRow(
new Object[] {
1, "test", 1L, new BigDecimal("3333.333"),
}));

List<SeaTunnelRow> rows = new ArrayList<>();
for (int i = 0; i < size; i++) {
rows.add(
new SeaTunnelRow(
new Object[] {
1,
"test",
1L,
map,
new BigDecimal("3333.333"),
new String[] {"test2", "test", "3333.333"}
}));
}
return rows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,10 @@ private List<PhysicalVertex> getSourceTask(
(PhysicalExecutionFlow<
SourceAction,
SourceConfig>)
f);
f,
jobImmutableInformation
.getJobConfig()
.getEnvOptions());
} else {
return new TransformSeaTunnelTask(
jobImmutableInformation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -56,31 +58,39 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
private final Meter sourceReceivedBytesPerSeconds;

private volatile boolean emptyThisPollNext;
private FlowControlGate flowControlGate;

public SeaTunnelSourceCollector(
Object checkpointLock,
List<OneInputFlowLifeCycle<Record<?>>> outputs,
MetricsContext metricsContext) {
MetricsContext metricsContext,
FlowControlStrategy flowControlStrategy) {
this.checkpointLock = checkpointLock;
this.outputs = outputs;
sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
sourceReceivedBytes = metricsContext.counter(SOURCE_RECEIVED_BYTES);
sourceReceivedBytesPerSeconds = metricsContext.meter(SOURCE_RECEIVED_BYTES_PER_SECONDS);
if (flowControlStrategy != null) {
flowControlGate = FlowControlGate.create(flowControlStrategy);
}
}

@Override
public void collect(T row) {
try {
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
sourceReceivedCount.inc();
sourceReceivedQPS.markEvent();
if (row instanceof SeaTunnelRow) {
long size = ((SeaTunnelRow) row).getBytesSize();
sourceReceivedBytes.inc(size);
sourceReceivedBytesPerSeconds.markEvent(size);
if (flowControlGate != null) {
flowControlGate.audit((SeaTunnelRow) row);
}
}
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
sourceReceivedCount.inc();
sourceReceivedQPS.markEvent();
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit 81601fd

Please sign in to comment.