Skip to content

Commit

Permalink
[Improve][Connector-V2][JDBC] Add exactly-once for JDBC source connec…
Browse files Browse the repository at this point in the history
…tor (#3750)

* [Improve][Connector-V2][JDBC] Add exactly-once for JDBC source connector
  • Loading branch information
TaoZex authored Dec 26, 2022
1 parent 861004d commit 5328e9d
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 35 deletions.
3 changes: 3 additions & 0 deletions docs/en/connector-v2/source/InfluxDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,17 @@ the `partition_num` of the InfluxDB when you select
> Tips: Ensure that `upper_bound` minus `lower_bound` is divided `bypartition_num`, otherwise the query results will overlap
### epoch [string]

returned time precision
- Optional values: H, m, s, MS, u, n
- default value: n

### query_timeout_sec [int]

the `query_timeout` of the InfluxDB when you select, in seconds

### connect_timeout_ms [long]

the timeout for connecting to InfluxDB, in milliseconds

### common options
Expand Down
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ e.g. If you use MySQL, should download and copy `mysql-connector-java-xxx.jar` t

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [schema projection](../../concept/connector-v2-features.md)

supports query SQL and can achieve projection effect.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> createEnumerator(

@Override
public SourceSplitEnumerator<JdbcSourceSplit, JdbcSourceState> restoreEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceState checkpointState) throws Exception {
return new JdbcSourceSplitEnumerator(enumeratorContext, jdbcSourceOptions, partitionParameter);
return new JdbcSourceSplitEnumerator(enumeratorContext, jdbcSourceOptions, partitionParameter, checkpointState);
}

private SeaTunnelRowType initTableField(Connection conn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.source;

import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split.JdbcNumericBetweenParametersProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;

Expand All @@ -28,6 +30,8 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -38,16 +42,28 @@ public class JdbcSourceSplitEnumerator implements SourceSplitEnumerator<JdbcSour
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitEnumerator.class);
private final SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext;

private final Map<Integer, Set<JdbcSourceSplit>> pendingSplits;
private final Map<Integer, List<JdbcSourceSplit>> pendingSplits;

private final Object stateLock = new Object();
private volatile boolean shouldEnumerate;

private JdbcSourceOptions jdbcSourceOptions;
private final PartitionParameter partitionParameter;

public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter) {
this(enumeratorContext, jdbcSourceOptions, partitionParameter, null);
}

public JdbcSourceSplitEnumerator(SourceSplitEnumerator.Context<JdbcSourceSplit> enumeratorContext, JdbcSourceOptions jdbcSourceOptions, PartitionParameter partitionParameter, JdbcSourceState sourceState) {
this.enumeratorContext = enumeratorContext;
this.jdbcSourceOptions = jdbcSourceOptions;
this.partitionParameter = partitionParameter;
this.pendingSplits = new HashMap<>();
this.shouldEnumerate = sourceState == null;
if (sourceState != null) {
this.shouldEnumerate = sourceState.isShouldEnumerate();
this.pendingSplits.putAll(sourceState.getPendingSplits());
}
}

@Override
Expand All @@ -57,12 +73,25 @@ public void open() {

@Override
public void run() throws Exception {
discoverySplits();
assignPendingSplits();
Set<Integer> readers = enumeratorContext.registeredReaders();
if (shouldEnumerate) {
Set<JdbcSourceSplit> newSplits = discoverySplits();

synchronized (stateLock) {
addPendingSplit(newSplits);
shouldEnumerate = false;
}

assignSplit(readers);
}

LOG.debug("No more splits to assign." +
" Sending NoMoreSplitsEvent to reader {}.", readers);
readers.forEach(enumeratorContext::signalNoMoreSplits);
}

private void discoverySplits() {
List<JdbcSourceSplit> allSplit = new ArrayList<>();
private Set<JdbcSourceSplit> discoverySplits() {
Set<JdbcSourceSplit> allSplit = new HashSet<>();
LOG.info("Starting to calculate splits.");
if (null != partitionParameter) {
int partitionNumber = partitionParameter.getPartitionNumber() != null ?
Expand All @@ -77,30 +106,7 @@ private void discoverySplits() {
} else {
allSplit.add(new JdbcSourceSplit(null, 0));
}
int numReaders = enumeratorContext.currentParallelism();
for (JdbcSourceSplit split : allSplit) {
int ownerReader = split.splitId % numReaders;
pendingSplits.computeIfAbsent(ownerReader, r -> new HashSet<>())
.add(split);
}
LOG.debug("Assigned {} to {} readers.", allSplit, numReaders);
LOG.info("Calculated splits successfully, the size of splits is {}.", allSplit.size());
}

private void assignPendingSplits() {
// Check if there's any pending splits for given readers
for (int pendingReader : enumeratorContext.registeredReaders()) {
// Remove pending assignment for the reader
final Set<JdbcSourceSplit> pendingAssignmentForReader =
pendingSplits.remove(pendingReader);

if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) {
// Assign pending splits to reader
LOG.info("Assigning splits to readers {}", pendingAssignmentForReader);
enumeratorContext.assignSplit(pendingReader, new ArrayList<>(pendingAssignmentForReader));
}
enumeratorContext.signalNoMoreSplits(pendingReader);
}
return allSplit;
}

@Override
Expand All @@ -110,26 +116,72 @@ public void close() throws IOException {

@Override
public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
LOG.debug("Add back splits {} to JdbcSourceSplitEnumerator.",
splits);
if (!splits.isEmpty()) {
addPendingSplit(splits);
assignSplit(Collections.singletonList(subtaskId));
}
}

private void addPendingSplit(Collection<JdbcSourceSplit> splits) {
int readerCount = enumeratorContext.currentParallelism();
for (JdbcSourceSplit split : splits) {
int ownerReader = getSplitOwner(split.splitId(), readerCount);
LOG.info("Assigning {} to {} reader.", split, ownerReader);
pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>())
.add(split);
}
}

private static int getSplitOwner(String tp, int numReaders) {
return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
}

private void assignSplit(Collection<Integer> readers) {
LOG.debug("Assign pendingSplits to readers {}", readers);

for (int reader : readers) {
List<JdbcSourceSplit> assignmentForReader = pendingSplits.remove(reader);
if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
LOG.info("Assign splits {} to reader {}",
assignmentForReader, reader);
try {
enumeratorContext.assignSplit(reader, assignmentForReader);
} catch (Exception e) {
LOG.error("Failed to assign splits {} to reader {}",
assignmentForReader, reader, e);
pendingSplits.put(reader, assignmentForReader);
}
}
}
}

@Override
public int currentUnassignedSplitSize() {
return 0;
return pendingSplits.size();
}

@Override
public void handleSplitRequest(int subtaskId) {

throw new JdbcConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
String.format("Unsupported handleSplitRequest: %d", subtaskId));
}

@Override
public void registerReader(int subtaskId) {
// nothing
LOG.debug("Register reader {} to JdbcSourceSplitEnumerator.",
subtaskId);
if (!pendingSplits.isEmpty()) {
assignSplit(Collections.singletonList(subtaskId));
}
}

@Override
public JdbcSourceState snapshotState(long checkpointId) throws Exception {
return null;
synchronized (stateLock) {
return new JdbcSourceState(shouldEnumerate, pendingSplits);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,18 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.state;

import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceSplit;

import lombok.AllArgsConstructor;
import lombok.Getter;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

@AllArgsConstructor
@Getter
public class JdbcSourceState implements Serializable {
private boolean shouldEnumerate;
private Map<Integer, List<JdbcSourceSplit>> pendingSplits;
}

0 comments on commit 5328e9d

Please sign in to comment.