-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Improve][Connector-V2][Doris]Refactor some Doris Sink code as well as support 2pc and cdc #4235
Conversation
826bde8
to
e2349d8
Compare
...doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
Outdated
Show resolved
Hide resolved
...v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
Outdated
Show resolved
Hide resolved
...v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
Outdated
Show resolved
Hide resolved
...or-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
Outdated
Show resolved
Hide resolved
...r-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
Outdated
Show resolved
Hide resolved
...or-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
Outdated
Show resolved
Hide resolved
BTW, It is best to add an adapted Doris version |
It seems that has some confilcts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
/** Serializer for DorisWriterState. */ | ||
public class DorisSinkStateSerializer implements Serializer<DorisSinkState> { | ||
@Override | ||
public byte[] serialize(DorisSinkState dorisSinkState) throws IOException { | ||
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
final DataOutputStream out = new DataOutputStream(baos)) { | ||
out.writeUTF(dorisSinkState.getLabelPrefix()); | ||
out.flush(); | ||
return baos.toByteArray(); | ||
} | ||
} | ||
|
||
@Override | ||
public DorisSinkState deserialize(byte[] serialized) throws IOException { | ||
try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized); | ||
final DataInputStream in = new DataInputStream(bais)) { | ||
final String labelPrefix = in.readUTF(); | ||
final long checkpointId = in.readLong(); | ||
return new DorisSinkState(labelPrefix, checkpointId); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
out.writeUTF(dorisSinkState.getLabelPrefix());
but
final String labelPrefix = in.readUTF();
final long checkpointId = in.readLong();
Not match.
There some problem, you add support for 2pc, but you don't open 2pc in your e2e config. It cause 2pc can't pass with ci. |
Purpose of this pull request
There were some problems with the existing Doris connector, and I refactor some Doris sink code as well as support 2pc and cdc
Due to insufficient workflow test resources, all Doris tests cannot be completed, so I disable the test class first. I have successfully tested it locally. Later, I will streamline the docker image so that the workflow test can be successful
Check list
New License Guide
release-note
.