-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: move joins to plan builder #3361
feat: move joins to plan builder #3361
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rodesai
Few nits called out below. Main issue is that the generics on the new code are wrong. We should be striving to have correct generics on new code. See comments inline.
@@ -135,6 +136,28 @@ public KsqlQueryBuilder withKsqlConfig(final KsqlConfig newConfig) { | |||
.push(context); | |||
} | |||
|
|||
@SuppressWarnings("unchecked") | |||
public KeySerde<Object> buildKeySerde( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The generics on this call are wrong. It's a pervasive anti-pattern within the code base, but it's wrong. Returning KeySerde<Object>
is saying its a key serde that can handle Object
s, but it can't. The returned serde can only handle either Struct
or Windowed<Struct>
. The fact that you're having to first cast to (KeySerde)
is a clear indication of this: KeySerde<Struct>
can not be cast to KeySerde<Object>
because they are not compatible types.
This is why there are two methods for explicitly handling windowed vs non-windowed serde.
The correct return type for this method is KeySerde<?>
i.e. a key serde of an unknown type. This of course gives up type safety, which we should avoid if we can.
The following requires no casts and generates no warnings, though it will of course require other code to change:
public KeySerde<?> buildKeySerde(
final KeyFormat keyFormat,
final PhysicalSchema physicalSchema,
final QueryContext queryContext
) {
if (keyFormat.getWindowInfo().isPresent()) {
return buildKeySerde(
keyFormat.getFormatInfo(),
keyFormat.getWindowInfo().get(),
physicalSchema,
queryContext
);
}
return buildKeySerde(
keyFormat.getFormatInfo(),
physicalSchema,
queryContext
);
}
Unfortunately, the generics on our key type are... foobared, and will require some effort to correct and give us type safety. When introducing new functionality or refactoring existing, we should endeavour to make it type safe and then us hacky casts at the point this interacts with old code.
private StreamStreamJoinBuilder() { | ||
} | ||
|
||
public static KStream<Object, GenericRow> build( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generics are wrong again. This should be KStream<?, GenericRow>
, which again is unfortunately giving up any type information on the key :(
Really, the signature of this should be:
public static <K> KStream<K, GenericRow> build(
final KStream<K, GenericRow> left,
final KStream<K, GenericRow> right,
final StreamStreamJoin join,
final KsqlQueryBuilder queryBuilder,
final JoinedFactory joinedFactory
) {
private StreamTableJoinBuilder() { | ||
} | ||
|
||
public static KStream<Object, GenericRow> build( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above
public static KStream<Object, GenericRow> build( | ||
final KStream<Object, GenericRow> left, | ||
final KStream<Object, GenericRow> right, | ||
final StreamStreamJoin join, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use of raw type - this should have generic types set.
private TableTableJoinBuilder() { | ||
} | ||
|
||
public static KTable<Object, GenericRow> build( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above
private static final JoinWindows WINDOWS = JoinWindows.of(BEFORE).after(AFTER); | ||
|
||
@Mock | ||
private KStream leftStream; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test us using raw types. Can we please change the test to use generic types please for all these fields.
Same goes for the other tests.
1912094
to
b55447b
Compare
b55447b
to
20b3c47
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rodesai, LGTM 'cept a few remaining raw types...
import org.apache.kafka.streams.kstream.KTable; | ||
|
||
public final class StreamTableJoinBuilder { | ||
private static final String SERDE_CTX = "left"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feels like this should be somewhere it can be shared with other things that do a join and need the same constant.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left them separate because the names don't need to be the same for different join types.
@Mock | ||
private JoinedFactory joinedFactory; | ||
|
||
private TableTableJoin join; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raw type?
private TableTableJoin join; | ||
|
||
@Rule | ||
public final MockitoRule mockitoRule = MockitoJUnit.rule(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per other PR - can we use @RunWith(MockitoJUnitRunner.class)
please?
@SuppressWarnings("unchecked") | ||
private void givenOuterJoin() { | ||
when(leftTable.outerJoin(any(KTable.class), any())).thenReturn(resultTable); | ||
join = new TableTableJoin( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
raw type?
Moves code for joining kstreams/ktables out of SchemaKStream/Table and into execution plan builders. Also moves KsqlValueJoiner into ksql-streams, so that it can be used from the plan builders. Also adds a buildKeySerde API for building both windowed and unwindowed keys, since most of the plan builders dont have type information for the key.
20b3c47
to
e79abcc
Compare
Description
Moves code for joining kstreams/ktables out of SchemaKStream/Table
and into execution plan builders. Also moves KsqlValueJoiner into
ksql-streams, so that it can be used from the plan builders. Also
adds a buildKeySerde API for building both windowed and unwindowed
keys, since most of the plan builders dont have type information
for the key.
Testing done
Added unit tests for step builders
Reviewer checklist