refactor: clean up timestamp extraction policy from physical plan #3936
9a34136 to b215558
Love this change @rodesai
nits and comments below.
@@ -13,7 +13,7 @@
  * specific language governing permissions and limitations under the License.
  */

-package io.confluent.ksql.util.timestamp;
+package io.confluent.ksql.execution.timestamp;
Doesn't it make more sense for this to belong in the KS-specific ksql-streams module, given it's dealing with KS types?
Same for the actual extraction policies themselves?
In fact, do we even need the extraction policies now? Can we not just convert the TimestampColumn straight into a TimestampExtractor?
Good call
> In fact, do we even need the extraction policies now? Can we not just convert the TimestampColumn straight into a TimestampExtractor?
Yeah, I noticed this too. Left it out of this patch though.
@@ -172,7 +171,7 @@ public void setUp() {
         ORDERS_SCHEMA,
         SerdeOption.none(),
         KeyField.of(ColumnRef.withoutSource(ColumnName.of("ORDERTIME"))),
-        new MetadataTimestampExtractionPolicy(),
+        Optional.empty(),
Is it worth having at least one test that has a timestamp column, and testing that it's formatted correctly?
@@ -83,7 +83,7 @@ public void tearDown() {
         schema,
         SerdeOption.none(),
         KeyField.of(schema.value().get(0).ref()),
-        new MetadataTimestampExtractionPolicy(),
+        Optional.empty(),
Ditto, probably worth augmenting the existing tests to check this works with an extractor policy.
-    "$ref" : "#/definitions/LongColumnTimestampExtractionPolicy"
-  } ]
+  "timestampColumn" : {
+    "$ref" : "#/definitions/TimestampColumn"
much nicer!
  private static TimestampExtractor timestampExtractor(
      final KsqlConfig ksqlConfig,
      final LogicalSchema sourceSchema,
      final Optional<TimestampColumn> timestampColumn) {
    final TimestampExtractionPolicy timestampPolicy = TimestampExtractionPolicyFactory.create(
        ksqlConfig,
        sourceSchema,
        timestampColumn
    );
    final int timestampIndex = timestampColumn.map(TimestampColumn::getColumn)
        .map(c -> sourceSchema.valueColumnIndex(c).orElseThrow(IllegalStateException::new))
        .orElse(-1);
    return timestampPolicy.create(timestampIndex);
  }
As above, I think this code can be simplified more. Do we even need our own TimestampExtractionPolicy any more? Why not just have
final TimestampExtractor extractor = TimestampExtractorFactory.create(ksqlConfig, sourceSchema, timestampColumn);
i.e. get rid of TimestampExtractionPolicy, rename TimestampExtractionPolicyFactory -> TimestampExtractorFactory, move this logic into that class, and have it return TimestampExtractor.
This PR makes the code cleaner, which is awesome. If you make this change it will be cleaner still.
Yeah I was thinking the same thing. Left it for a follow-up though, to keep this PR focused.
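For illustration only, the follow-up suggested above might look roughly like the sketch below. It is not code from this PR. RowTimestampExtractor and TimestampColumnSpec are hypothetical stand-ins for Kafka Streams' TimestampExtractor and for TimestampColumn, the schema is reduced to a list of column names, and the KsqlConfig argument is omitted, so the no-column case simply returns the record's own timestamp in place of whatever default the real factory would derive from config. Parsing formatted strings as UTC is also an assumption.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for Kafka Streams' TimestampExtractor, reduced to a row of values.
interface RowTimestampExtractor {
  long extract(List<Object> row, long recordTimestamp);
}

// Hypothetical stand-in for TimestampColumn: a column name plus an optional format string.
final class TimestampColumnSpec {
  final String column;
  final Optional<String> format;

  TimestampColumnSpec(final String column, final Optional<String> format) {
    this.column = column;
    this.format = format;
  }
}

// Sketch of a factory that returns the extractor directly, with no intermediate policy object.
final class TimestampExtractorFactorySketch {
  private TimestampExtractorFactorySketch() {
  }

  static RowTimestampExtractor create(
      final List<String> valueColumns,
      final Optional<TimestampColumnSpec> timestampColumn) {
    if (!timestampColumn.isPresent()) {
      // Assumption: fall back to the record's own timestamp.
      return (row, recordTimestamp) -> recordTimestamp;
    }
    final TimestampColumnSpec spec = timestampColumn.get();
    final int index = valueColumns.indexOf(spec.column);
    if (index < 0) {
      throw new IllegalStateException("Unknown timestamp column: " + spec.column);
    }
    if (spec.format.isPresent()) {
      // String column: parse with the supplied pattern, interpreted as UTC (an assumption).
      final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(spec.format.get());
      return (row, recordTimestamp) ->
          LocalDateTime.parse((String) row.get(index), formatter)
              .toInstant(ZoneOffset.UTC)
              .toEpochMilli();
    }
    // Numeric column: the value is taken to be epoch milliseconds already.
    return (row, recordTimestamp) -> ((Number) row.get(index)).longValue();
  }
}
```

With this shape the caller no longer needs to compute the column index itself; the lookup and the choice of extractor live in one place.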
This patch removes the timestamp extraction policy from the physical plan. Instead, we include a POJO called TimestampColumn that holds the information necessary to construct the timestamp extractor: the column name and, optionally, a format string.
b215558 to 83aa109
Description
This patch removes the timestamp extraction policy from the physical
plan. Instead, we include a POJO called TimestampColumn that holds
the information necessary to construct the timestamp extractor:
the column name and, optionally, a format string. This is part of a larger
effort to decouple the persisted plan from KSQL's internals.
This is part of an effort to get the plan schemas to the form proposed
here: #3969
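As a rough illustration of the shape described above, a value object carrying just a column name and an optional format string could look like the sketch below. This is not the class from the patch: the name TimestampColumnSketch and the getFormat accessor are assumptions, and the column is a plain String here, whereas the real class may use a richer column reference type.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch of a TimestampColumn-style POJO: pure data, no Kafka Streams types,
// so it can be persisted as part of a plan.
final class TimestampColumnSketch {
  private final String column;
  private final Optional<String> format;

  TimestampColumnSketch(final String column, final Optional<String> format) {
    this.column = Objects.requireNonNull(column, "column");
    this.format = Objects.requireNonNull(format, "format");
  }

  String getColumn() {
    return column;
  }

  // Assumed accessor name; empty means the column already holds a numeric timestamp.
  Optional<String> getFormat() {
    return format;
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TimestampColumnSketch)) {
      return false;
    }
    final TimestampColumnSketch that = (TimestampColumnSketch) o;
    return column.equals(that.column) && format.equals(that.format);
  }

  @Override
  public int hashCode() {
    return Objects.hash(column, format);
  }

  @Override
  public String toString() {
    return "TimestampColumnSketch{column=" + column + ", format=" + format + "}";
  }
}
```

Because the object holds only data, two plans that name the same column and format compare equal regardless of how the extractor is later built from them.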