-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Improve][Connector][PulsarSource]Improve pulsar deserialization #3990
Conversation
.build(); | ||
} else { | ||
// TODO: use format SPI | ||
throw new SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't throw UNSUPPORTED_DATA_TYPE, CommonErrorCode.UNSUPPORTED_OPERATION maybe better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
Please add change to change log in pulsar doc |
done. |
Please add schema / format config into e2e test case. |
…ator-seatunnel into pulsar-source-desrialize
@Hisoka-X add complete. |
Hi, please fix code style. |
…ator-seatunnel into pulsar-source-desrialize
done. |
@Hisoka-X @ashulin @TyrantLucifer PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix the ci error.
@ashulin PTAL. |
Hello everyone, who has time to help review this pr. thanks. |
@TyrantLucifer @Hisoka-X @ashulin PTAL , thanks. |
Hello everyone, who has time to help review this pr. thanks. @EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin |
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.testcontainers</groupId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleted.
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
<version>4.12</version> | ||
<scope>compile</scope> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleted.
<dependency> | ||
<groupId>org.testcontainers</groupId> | ||
<artifactId>pulsar</artifactId> | ||
<version>1.17.6</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use properties
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
<dependency> | ||
<groupId>org.testcontainers</groupId> | ||
<artifactId>pulsar</artifactId> | ||
<version>1.17.6</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use properties
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>jul-to-slf4j</artifactId> | ||
<version>${slf4j.version}</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why add this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is needed when the test is running, and this log class is missing when ci.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @hailin0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hailin0 PTAL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hailin0 PTAL. thanks.
This pr time is quite long, there are other pr waiting for it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin Please approve CI check,thanks. |
Check did not find a failure, why is it prompted 'Some checks were not successful', can you retry the check. |
Some check timout and be canceled. Never mind, ci already pass. |
@EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin PTAL,thanks. |
hi,all,Who has time to help review this pr, it takes a long time, I can add e2e test to #4382 PulsarSink after completion. |
@EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin PTAL, thanks. |
please approve ci. thanks. |
@Hisoka-X @TyrantLucifer @hailin0 please approve ci. thanks. |
Sorry for late response, done |
@Hisoka-X @TyrantLucifer @ashulin @hailin0 @EricJoy2048 PTAL, thanks. |
done. |
@Hisoka-X @TyrantLucifer @ashulin @hailin0 @EricJoy2048 PTAL, thanks. |
Hi all ,Who has time to review , it's been a long time, #4382 pulsar sink is waiting for this pr. |
public static final Option<String> FORMAT = | ||
Options.key("format") | ||
.stringType() | ||
.noDefaultValue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No defaule value? Or the default value is json
? I suggest define the default value in option. public static final String DEFAULT_FORMAT = "json";
is not a good way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been a little busy recently, I'll update it in the next few days.
public static final Option<String> FIELD_DELIMITER = | ||
Options.key("field_delimiter") | ||
.stringType() | ||
.noDefaultValue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above.
} | ||
|
||
sink { | ||
Console {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need check the data read from the source. You can add a LocalFile Sink Or Pulsar Sink and then read the data from the target file(or topic when you use Pulsar Sink). Check every row and column.
#4111 This ability has been merged, I will close this pr. |
Purpose of this pull request
Improve pulsar deserialization
1.Add format and field_delimiter configuration.
2.Rich deserialization methods.
Check list
New License Guide
release-note
.