refactor: clean up timestamp extraction policy from physical plan #3936

Merged


@rodesai rodesai commented Nov 21, 2019

Description

This patch removes the timestamp extraction policy from the physical
plan. In its place, the plan includes a POJO called TimestampColumn, which
holds the information necessary to construct the timestamp extractor:
the column name and, optionally, a format string. This is part of a larger
effort to decouple the persisted plan from KSQL's internals.

This is part of an effort to get the plan schemas to the form proposed
here: #3969
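
For readers unfamiliar with the change, here is a rough sketch of what a POJO like the one described above could look like. It is an illustration, not the class from this patch: the column is simplified to a plain String, and everything except the class name and `getColumn()` (which appears in the diff discussed later in this conversation) is an assumption.

```java
import java.util.Objects;
import java.util.Optional;

/**
 * Illustrative stand-in for the TimestampColumn POJO described above.
 * The String column type and getFormat() are assumptions made for this sketch.
 */
final class TimestampColumn {
  // Name of the value column that carries the event timestamp.
  private final String column;
  // Optional format string, needed only when the column holds text.
  private final Optional<String> format;

  TimestampColumn(final String column, final Optional<String> format) {
    this.column = Objects.requireNonNull(column, "column");
    this.format = Objects.requireNonNull(format, "format");
  }

  String getColumn() {
    return column;
  }

  Optional<String> getFormat() {
    return format;
  }

  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TimestampColumn)) {
      return false;
    }
    final TimestampColumn that = (TimestampColumn) o;
    return column.equals(that.column) && format.equals(that.format);
  }

  @Override
  public int hashCode() {
    return Objects.hash(column, format);
  }

  @Override
  public String toString() {
    return "TimestampColumn{column=" + column + ", format=" + format + "}";
  }
}
```

Because such a class holds only plain data, it can be serialized into the persisted plan without dragging extractor implementation types along with it.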

@rodesai rodesai requested a review from a team as a code owner November 21, 2019 03:49
@rodesai rodesai force-pushed the clean-up-timestamp-extractor-from-ksqlplan branch 2 times, most recently from 9a34136 to b215558 Compare November 22, 2019 09:00

@big-andy-coates big-andy-coates left a comment

Love this change @rodesai

nits and comments below.

@@ -13,7 +13,7 @@
  * specific language governing permissions and limitations under the License.
  */

-package io.confluent.ksql.util.timestamp;
+package io.confluent.ksql.execution.timestamp;
Contributor

Doesn't it make more sense for this to belong in the KS-specific ksql-streams module, given it's dealing with KS types?

Same for the actual extraction policies themselves?

In fact, do we even need the extraction policies now? Can we not just convert the TimestampColumn straight into a TimestampExtractor?

Contributor Author

Good call

Contributor Author

> In fact, do we even need the extraction policies now? Can we not just convert the TimestampColumn straight into a TimestampExtractor?

Yeah, I noticed this too. Left it out of this patch though.

@@ -172,7 +171,7 @@ public void setUp() {
     ORDERS_SCHEMA,
     SerdeOption.none(),
     KeyField.of(ColumnRef.withoutSource(ColumnName.of("ORDERTIME"))),
-    new MetadataTimestampExtractionPolicy(),
+    Optional.empty(),
Contributor

Is it worth having at least one test that has a timestamp column and checks that it's formatted correctly?

@@ -83,7 +83,7 @@ public void tearDown() {
     schema,
     SerdeOption.none(),
     KeyField.of(schema.value().get(0).ref()),
-    new MetadataTimestampExtractionPolicy(),
+    Optional.empty(),
Contributor

Ditto: probably worth augmenting the existing tests to check this works with an extractor policy.

-    "$ref" : "#/definitions/LongColumnTimestampExtractionPolicy"
-  } ]
+  "timestampColumn" : {
+    "$ref" : "#/definitions/TimestampColumn"
Contributor

much nicer!

Comment on lines +177 to +191
  private static TimestampExtractor timestampExtractor(
      final KsqlConfig ksqlConfig,
      final LogicalSchema sourceSchema,
      final Optional<TimestampColumn> timestampColumn) {
    final TimestampExtractionPolicy timestampPolicy = TimestampExtractionPolicyFactory.create(
        ksqlConfig,
        sourceSchema,
        timestampColumn
    );
    final int timestampIndex = timestampColumn.map(TimestampColumn::getColumn)
        .map(c -> sourceSchema.valueColumnIndex(c).orElseThrow(IllegalStateException::new))
        .orElse(-1);
    return timestampPolicy.create(timestampIndex);
  }

Contributor

As above, I think this code can be simplified more. Do we even need our own TimestampExtractionPolicy any more? Why not just have

   final TimestampExtractor extractor = TimestampExtractorFactory.create(ksqlConfig, sourceSchema, timestampColumn);

i.e. get rid of TimestampExtractionPolicy, rename TimestampExtractionPolicyFactory to TimestampExtractorFactory, move this logic into that class, and have it return a TimestampExtractor.

This PR makes the code cleaner, which is awesome. If you make this change it will be cleaner still.
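
To make the suggestion concrete, here is a minimal, self-contained sketch of a factory that goes straight from an optional timestamp column to an extractor, with no intermediate policy object. Everything in it is hypothetical: `RowTimestampExtractor` is a simplified stand-in for Kafka Streams' `TimestampExtractor`, `TimestampColumnSketch` stands in for `TimestampColumn`, the schema is reduced to a list of column names, the `KsqlConfig` parameter is omitted, and formatted timestamps are assumed to be in UTC and to include both a date and a time.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified stand-in for Kafka Streams' TimestampExtractor. */
interface RowTimestampExtractor {
  long extract(List<Object> rowValues, long recordTimestamp);
}

/** Hypothetical stand-in for TimestampColumn: a column name plus an optional format. */
final class TimestampColumnSketch {
  final String column;
  final Optional<String> format;

  TimestampColumnSketch(final String column, final Optional<String> format) {
    this.column = column;
    this.format = format;
  }
}

/** Sketch of a factory that returns the extractor directly. */
final class TimestampExtractorFactorySketch {
  private TimestampExtractorFactorySketch() {
  }

  static RowTimestampExtractor create(
      final List<String> valueColumnNames,
      final Optional<TimestampColumnSketch> timestampColumn) {
    if (!timestampColumn.isPresent()) {
      // No timestamp column configured: use the record's own timestamp.
      return (row, recordTimestamp) -> recordTimestamp;
    }
    final TimestampColumnSketch tc = timestampColumn.get();
    // Resolve the column index once, up front, and fail fast if it is unknown.
    final int index = valueColumnNames.indexOf(tc.column);
    if (index < 0) {
      throw new IllegalStateException("Unknown timestamp column: " + tc.column);
    }
    if (tc.format.isPresent()) {
      // Text column: parse with the supplied pattern (UTC is an assumption here).
      final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(tc.format.get());
      return (row, recordTimestamp) ->
          LocalDateTime.parse((String) row.get(index), formatter)
              .toInstant(ZoneOffset.UTC)
              .toEpochMilli();
    }
    // Numeric column: the value is assumed to already be epoch milliseconds.
    return (row, recordTimestamp) -> (Long) row.get(index);
  }
}
```

With this shape, the index lookup currently done in `timestampExtractor(...)` would live inside the factory, and callers would only ever see the extractor.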

Contributor Author

Yeah I was thinking the same thing. Left it for a follow-up though, to keep this PR focused.

@rodesai rodesai force-pushed the clean-up-timestamp-extractor-from-ksqlplan branch from b215558 to 83aa109 Compare November 27, 2019 08:09
@rodesai rodesai merged commit cb817aa into confluentinc:master Nov 27, 2019