-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: clean up format pojos in serialized model #3970
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rodesai LGTM
Formats.of( | ||
outputNode.getKsqlTopic().getKeyFormat(), | ||
outputNode.getKsqlTopic().getValueFormat(), | ||
outputNode.getSerdeOptions() | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate code, consider pulling out to one call above the if/else.
new KsqlTopic( | ||
createStream.getKafkaTopicName(), | ||
KeyFormat.of(createStream.getFormats().getKeyFormat(), createStream.getWindowInfo()), | ||
ValueFormat.of(createStream.getFormats().getValueFormat()) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate code: pull out to a method that takes a CreateSourceCommand
and returns KsqlTopic
|
||
@Override | ||
public int hashCode() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: blank line.
public interface Formats { | ||
KeyFormat getKeyFormat(); | ||
public final class Formats { | ||
// TODO: wrap in a type for type safety |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
outstanding todo...
Adding checkstyle to catch this: confluentinc/common#229
@JsonProperty(value = "options", required = true) final Set<SerdeOption> options) { | ||
this.keyFormat = Objects.requireNonNull(keyFormat, "keyFormat"); | ||
this.valueFormat = Objects.requireNonNull(valueFormat, "valueFormat"); | ||
this.options = Objects.requireNonNull(options, "options"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Take immutable copy, then you can mark type as immutable.
(Adding test for this: #3978)
This patch cleans up the format pojos in the serialized model, and drops KsqlTopic from the serialized plan. For plan steps that require kafka io (repartitions, changelogs, etc), formats are described by key and value FormatInfo instances, which just contain the serialzation format (json/avro/csv) and required params (schema name, delimiter). The windowing information is now computed either at the source or by the aggregate nodes, and is passed along via the key serde factory when building the streams app. The ddl commands now directly store the information needed about the kafka topic: name, formats, and windowing info for windowed sources.
5bc2109
to
4ebd6bb
Compare
This patch cleans up the format pojos in the serialized model, and
drops KsqlTopic from the serialized plan.
For plan steps that require kafka io (repartitions, changelogs, etc),
formats are described by key and value FormatInfo instances, which
just contain the serialzation format (json/avro/csv) and required
params (schema name, delimiter).
The windowing information is now computed either at the source or
by the aggregate nodes, and is passed along via the key serde
factory when building the streams app.
The ddl commands now directly store the information needed about
the kafka topic: name, formats, and windowing info for windowed
sources.
This is part of an effort to get the plan schemas to the form proposed
here: #3969