Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-v2][Mongodb]Optimize reading logic #5001

Merged
merged 1 commit into from
Jul 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import org.bson.BsonDocument;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -38,6 +37,8 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** MongoReader reads MongoDB by splits (queries). */
@Slf4j
Expand Down Expand Up @@ -70,7 +71,7 @@ public MongodbReader(
}

@Override
public void open() throws Exception {
public void open() {
if (cursor != null) {
cursor.close();
}
Expand All @@ -87,26 +88,18 @@ public void close() {
public void pollNext(Collector<SeaTunnelRow> output) {
synchronized (output.getCheckpointLock()) {
MongoSplit currentSplit = pendingSplits.poll();
if (null != currentSplit) {
if (currentSplit != null) {
if (cursor != null) {
// current split is in-progress
return;
}
log.info("Prepared to read split {}", currentSplit.splitId());
FindIterable<BsonDocument> rs =
clientProvider
.getDefaultCollection()
.find(currentSplit.getQuery())
.projection(currentSplit.getProjection())
.batchSize(readOptions.getFetchSize())
.noCursorTimeout(readOptions.isNoCursorTimeout())
.maxTime(readOptions.getMaxTimeMS(), TimeUnit.MINUTES);
cursor = rs.iterator();
while (cursor.hasNext()) {
SeaTunnelRow deserialize = deserializer.deserialize(cursor.next());
output.collect(deserialize);
try {
getCursor(currentSplit);
cursorToStream().map(deserializer::deserialize).forEach(output::collect);
} finally {
closeCurrentSplit();
}
closeCurrentSplit();
}
if (noMoreSplit && pendingSplits.isEmpty()) {
// signal to the source that we have reached the end of the data.
Expand All @@ -116,6 +109,23 @@ public void pollNext(Collector<SeaTunnelRow> output) {
}
}

private void getCursor(MongoSplit split) {
cursor =
clientProvider
.getDefaultCollection()
.find(split.getQuery())
.projection(split.getProjection())
.batchSize(readOptions.getFetchSize())
.noCursorTimeout(readOptions.isNoCursorTimeout())
.maxTime(readOptions.getMaxTimeMS(), TimeUnit.MINUTES)
.iterator();
}

private Stream<BsonDocument> cursorToStream() {
Iterable<BsonDocument> iterable = () -> cursor;
return StreamSupport.stream(iterable.spliterator(), false);
}

@Override
public List<MongoSplit> snapshotState(long checkpointId) {
return new ArrayList<>(pendingSplits);
Expand Down