Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector][PulsarSource]Improve pulsar deserialization #3990

Closed
wants to merge 22 commits into from

Conversation

lightzhao
Copy link
Contributor

Purpose of this pull request

Improve pulsar deserialization
1.Add format and field_delimiter configuration.
2.Rich deserialization methods.

Check list

.build();
} else {
// TODO: use format SPI
throw new SeaTunnelJsonFormatException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't throw UNSUPPORTED_DATA_TYPE, CommonErrorCode.UNSUPPORTED_OPERATION maybe better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

@Hisoka-X
Copy link
Member

Hisoka-X commented Jan 24, 2023

Please add change to change log in pulsar doc

@lightzhao
Copy link
Contributor Author

Please add change to change log in pulsar doc

done.

@Hisoka-X
Copy link
Member

Please add schema / format config into e2e test case.

@lightzhao
Copy link
Contributor Author

Please add schema / format config into e2e test case.

@Hisoka-X add complete.

@lightzhao lightzhao requested a review from Hisoka-X February 16, 2023 11:05
@Hisoka-X
Copy link
Member

Hi, please fix code style.

@lightzhao
Copy link
Contributor Author

Hi, please fix code style.

done.

@Hisoka-X Hisoka-X requested a review from ashulin February 18, 2023 02:16
@lightzhao
Copy link
Contributor Author

@Hisoka-X @ashulin @TyrantLucifer PTAL.

@lightzhao
Copy link
Contributor Author

Copy link
Member

@TyrantLucifer TyrantLucifer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the ci error.

@lightzhao
Copy link
Contributor Author

Please fix the ci error.
I don't know why these two ci errors occur occasionally. My local compilation is all right.

image

@lightzhao
Copy link
Contributor Author

lightzhao commented Mar 3, 2023

@ashulin PTAL.

@lightzhao lightzhao requested review from TyrantLucifer and removed request for Hisoka-X and ashulin March 17, 2023 01:23
@lightzhao
Copy link
Contributor Author

Hello everyone, who has time to help review this pr. thanks.

@lightzhao lightzhao requested review from Hisoka-X and removed request for ashulin March 30, 2023 03:09
@lightzhao
Copy link
Contributor Author

@TyrantLucifer @Hisoka-X @ashulin PTAL , thanks.

@lightzhao lightzhao requested a review from TyrantLucifer April 3, 2023 02:40
@lightzhao
Copy link
Contributor Author

Hello everyone, who has time to help review this pr. thanks. @EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin

@lightzhao lightzhao requested a review from ashulin April 4, 2023 02:12
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted.

Comment on lines 89 to 93
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>compile</scope>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleted.

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<version>1.17.6</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use properties

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
<version>1.17.6</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use properties

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
<version>${slf4j.version}</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is needed when the test is running, and this log class is missing when ci.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hailin0 PTAL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hailin0 PTAL. thanks.
This pr time is quite long, there are other pr waiting for it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lightzhao
Copy link
Contributor Author

@EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin Please approve CI check,thanks.

@lightzhao
Copy link
Contributor Author

Check did not find a failure, why is it prompted 'Some checks were not successful', can you retry the check.

@Hisoka-X
Copy link
Member

Hisoka-X commented Apr 6, 2023

Check did not find a failure, why is it prompted 'Some checks were not successful', can you retry the check.

Some check timout and be canceled. Never mind, ci already pass.

@lightzhao
Copy link
Contributor Author

@EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin PTAL,thanks.

@lightzhao
Copy link
Contributor Author

hi,all,Who has time to help review this pr, it takes a long time, I can add e2e test to #4382 PulsarSink after completion.

@lightzhao
Copy link
Contributor Author

@EricJoy2048 @TyrantLucifer @Hisoka-X @ashulin PTAL, thanks.

@lightzhao
Copy link
Contributor Author

Is there a problem with e2e?
image

@lightzhao
Copy link
Contributor Author

please approve ci. thanks.

@lightzhao
Copy link
Contributor Author

@Hisoka-X @TyrantLucifer @hailin0 please approve ci. thanks.

@Hisoka-X
Copy link
Member

Hisoka-X commented May 6, 2023

@Hisoka-X @TyrantLucifer @hailin0 please approve ci. thanks.

Sorry for late response, done

@lightzhao
Copy link
Contributor Author

@Hisoka-X @TyrantLucifer @ashulin @hailin0 @EricJoy2048 PTAL, thanks.

@lightzhao
Copy link
Contributor Author

Please ensure that your pull request's CI/CD process passes. Thank you.

done.

@lightzhao
Copy link
Contributor Author

@Hisoka-X @TyrantLucifer @ashulin @hailin0 @EricJoy2048 PTAL, thanks.

@lightzhao
Copy link
Contributor Author

Hi all ,Who has time to review , it's been a long time, #4382 pulsar sink is waiting for this pr.

public static final Option<String> FORMAT =
Options.key("format")
.stringType()
.noDefaultValue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No defaule value? Or the default value is json? I suggest define the default value in option. public static final String DEFAULT_FORMAT = "json"; is not a good way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been a little busy recently, I'll update it in the next few days.

public static final Option<String> FIELD_DELIMITER =
Options.key("field_delimiter")
.stringType()
.noDefaultValue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.

}

sink {
Console {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need check the data read from the source. You can add a LocalFile Sink Or Pulsar Sink and then read the data from the target file(or topic when you use Pulsar Sink). Check every row and column.

@lightzhao
Copy link
Contributor Author

#4111 This ability has been merged, I will close this pr.

@lightzhao lightzhao closed this May 24, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants