Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move joins to plan builder #3361

Merged
merged 1 commit into from
Sep 20, 2019

Conversation

rodesai
Copy link
Contributor

@rodesai rodesai commented Sep 16, 2019

Description

Moves code for joining kstreams/ktables out of SchemaKStream/Table
and into execution plan builders. Also moves KsqlValueJoiner into
ksql-streams, so that it can be used from the plan builders. Also
adds a buildKeySerde API for building both windowed and unwindowed
keys, since most of the plan builders dont have type information
for the key.

Testing done

Added unit tests for step builders

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@rodesai rodesai requested a review from a team as a code owner September 16, 2019 18:33
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rodesai

Few nits called out below. Main issue is that the generics on the new code are wrong. We should be striving to have correct generics on new code. See comments inline.

@@ -135,6 +136,28 @@ public KsqlQueryBuilder withKsqlConfig(final KsqlConfig newConfig) {
.push(context);
}

@SuppressWarnings("unchecked")
public KeySerde<Object> buildKeySerde(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generics on this call are wrong. It's a pervasive anti-pattern within the code base, but it's wrong. Returning KeySerde<Object> is saying its a key serde that can handle Objects, but it can't. The returned serde can only handle either Struct or Windowed<Struct>. The fact that you're having to first cast to (KeySerde) is a clear indication of this: KeySerde<Struct> can not be cast to KeySerde<Object> because they are not compatible types.

This is why there are two methods for explicitly handling windowed vs non-windowed serde.

The correct return type for this method is KeySerde<?> i.e. a key serde of an unknown type. This of course gives up type safety, which we should avoid if we can.

The following requires no casts and generates no warnings, though it will of course require other code to change:

   public KeySerde<?> buildKeySerde(
      final KeyFormat keyFormat,
      final PhysicalSchema physicalSchema,
      final QueryContext queryContext
  ) {
    if (keyFormat.getWindowInfo().isPresent()) {
      return buildKeySerde(
          keyFormat.getFormatInfo(),
          keyFormat.getWindowInfo().get(),
          physicalSchema,
          queryContext
      );
    }

    return buildKeySerde(
        keyFormat.getFormatInfo(),
        physicalSchema,
        queryContext
    );
  }

Unfortunately, the generics on our key type are... foobared, and will require some effort to correct and give us type safety. When introducing new functionality or refactoring existing, we should endeavour to make it type safe and then us hacky casts at the point this interacts with old code.

private StreamStreamJoinBuilder() {
}

public static KStream<Object, GenericRow> build(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generics are wrong again. This should be KStream<?, GenericRow>, which again is unfortunately giving up any type information on the key :(

Really, the signature of this should be:

public static <K>  KStream<K, GenericRow> build(
     final KStream<K, GenericRow> left,
      final KStream<K, GenericRow> right,
      final StreamStreamJoin join,
      final KsqlQueryBuilder queryBuilder,
      final JoinedFactory joinedFactory
) {

private StreamTableJoinBuilder() {
}

public static KStream<Object, GenericRow> build(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

public static KStream<Object, GenericRow> build(
final KStream<Object, GenericRow> left,
final KStream<Object, GenericRow> right,
final StreamStreamJoin join,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use of raw type - this should have generic types set.

private TableTableJoinBuilder() {
}

public static KTable<Object, GenericRow> build(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above

private static final JoinWindows WINDOWS = JoinWindows.of(BEFORE).after(AFTER);

@Mock
private KStream leftStream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test us using raw types. Can we please change the test to use generic types please for all these fields.

Same goes for the other tests.

@rodesai rodesai force-pushed the move-join-to-plan-builder branch from 1912094 to b55447b Compare September 18, 2019 17:48
@rodesai rodesai force-pushed the move-join-to-plan-builder branch from b55447b to 20b3c47 Compare September 18, 2019 22:19
@big-andy-coates big-andy-coates requested a review from a team September 19, 2019 15:55
Copy link
Contributor

@big-andy-coates big-andy-coates left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rodesai, LGTM 'cept a few remaining raw types...

import org.apache.kafka.streams.kstream.KTable;

public final class StreamTableJoinBuilder {
private static final String SERDE_CTX = "left";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like this should be somewhere it can be shared with other things that do a join and need the same constant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left them separate because the names don't need to be the same for different join types.

@Mock
private JoinedFactory joinedFactory;

private TableTableJoin join;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raw type?

private TableTableJoin join;

@Rule
public final MockitoRule mockitoRule = MockitoJUnit.rule();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per other PR - can we use @RunWith(MockitoJUnitRunner.class) please?

@SuppressWarnings("unchecked")
private void givenOuterJoin() {
when(leftTable.outerJoin(any(KTable.class), any())).thenReturn(resultTable);
join = new TableTableJoin(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raw type?

Moves code for joining kstreams/ktables out of SchemaKStream/Table
and into execution plan builders. Also moves KsqlValueJoiner into
ksql-streams, so that it can be used from the plan builders. Also
adds a buildKeySerde API for building both windowed and unwindowed
keys, since most of the plan builders dont have type information
for the key.
@rodesai rodesai force-pushed the move-join-to-plan-builder branch from 20b3c47 to e79abcc Compare September 20, 2019 04:31
@rodesai rodesai merged commit e243c74 into confluentinc:master Sep 20, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants