PARTITION BY should support nested fields #2218

Closed
rmoff opened this issue Nov 30, 2018 · 3 comments

rmoff (Member) commented Nov 30, 2018

ksql> DESCRIBE CUSTOMERS_STREAM_SRC;

Name                 : CUSTOMERS_STREAM_SRC
 Field   | Type
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 ROWTIME | BIGINT           (system)
 ROWKEY  | VARCHAR(STRING)  (system)
 BEFORE  | STRUCT<ID BIGINT, FIRST_NAME VARCHAR(STRING), LAST_NAME VARCHAR(STRING), EMAIL VARCHAR(STRING), GENDER VARCHAR(STRING), CLUB_STATUS VARCHAR(STRING), COMMENTS VARCHAR(STRING), CREATE_TS BIGINT, UPDATE_TS BIGINT>
 AFTER   | STRUCT<ID BIGINT, FIRST_NAME VARCHAR(STRING), LAST_NAME VARCHAR(STRING), EMAIL VARCHAR(STRING), GENDER VARCHAR(STRING), CLUB_STATUS VARCHAR(STRING), COMMENTS VARCHAR(STRING), CREATE_TS BIGINT, UPDATE_TS BIGINT>
 SOURCE  | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, TXID VARCHAR(STRING), SCN BIGINT, SNAPSHOT BOOLEAN>
 OP      | VARCHAR(STRING)
 TS_MS   | BIGINT
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

Trying to repartition the stream on AFTER->ID fails:

ksql> CREATE STREAM CUSTOMERS_STREAM WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_STREAM_SRC PARTITION BY AFTER->ID;
line 1:224: mismatched input '->' expecting ';'
Caused by: org.antlr.v4.runtime.InputMismatchException

The workaround is a two-stream approach: first flatten the nested columns into a new stream, then repartition that stream:

CREATE STREAM CUSTOMERS_STREAM1 WITH (PARTITIONS=1) AS
  SELECT AFTER->ID AS ID,
         AFTER->FIRST_NAME AS FIRST_NAME,
         AFTER->LAST_NAME AS LAST_NAME,
         AFTER->EMAIL AS EMAIL,
         AFTER->GENDER AS GENDER,
         AFTER->CLUB_STATUS AS CLUB_STATUS,
         AFTER->COMMENTS AS COMMENTS
  FROM CUSTOMERS_STREAM_SRC;

CREATE STREAM CUSTOMERS_STREAM WITH (PARTITIONS=1) AS SELECT * FROM CUSTOMERS_STREAM1 PARTITION BY ID;
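To make the two steps concrete, here is a minimal Python sketch (not KSQL, and not how KSQL is implemented) that models records as hypothetical `(key, value)` pairs of plain dicts. `flatten` plays the role of the first statement, lifting the `AFTER` struct fields into top-level columns, and `partition_by` plays the role of `PARTITION BY ID`, replacing the record key with that column's value. The function names, and the choice to stringify the new key because `ROWKEY` is shown as `VARCHAR(STRING)` in the schema above, are assumptions for illustration only.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical record model: (key, value), where value maps column -> value.
Record = Tuple[Optional[str], Dict[str, Any]]

# Columns the first statement lifts out of the AFTER struct.
FLATTENED = ["ID", "FIRST_NAME", "LAST_NAME", "EMAIL", "GENDER", "CLUB_STATUS", "COMMENTS"]


def flatten(records: List[Record]) -> List[Record]:
    """Step 1 (like CUSTOMERS_STREAM1): AFTER->X becomes top-level column X.
    The record key is left unchanged."""
    out: List[Record] = []
    for key, value in records:
        after = value.get("AFTER") or {}
        out.append((key, {col: after.get(col) for col in FLATTENED}))
    return out


def partition_by(records: List[Record], column: str) -> List[Record]:
    """Step 2 (like PARTITION BY ID): re-key each record on a top-level column.
    Assumption: the new key is stringified, since ROWKEY is a string above."""
    return [(None if v[column] is None else str(v[column]), v) for _, v in records]


source = [("k1", {"BEFORE": None, "AFTER": {"ID": 42, "FIRST_NAME": "Ann"}, "OP": "c"})]
rekeyed = partition_by(flatten(source), "ID")
print(rekeyed[0][0])  # 42
```

The point of the split is that the second statement only references a top-level column, which is all `PARTITION BY` accepted at the time, as the parser error above shows.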
terryf82 commented
Looks like it is possible to get by with just one stream, as long as you select & alias the nested field first?

CREATE STREAM CUSTOMERS_STREAM WITH (PARTITIONS=1) AS SELECT AFTER->ID AS ID, .... PARTITION BY ID;

It even seems to work with CASE logic, too:

CREATE STREAM CUSTOMERS_STREAM WITH (PARTITIONS=1) AS SELECT CASE WHEN OP = 'd' THEN BEFORE->ID ELSE AFTER->ID END AS ID, .... PARTITION BY ID;
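The CASE expression chooses which struct the key is read from. Assuming the source is a Debezium-style change envelope (the `BEFORE`/`AFTER`/`SOURCE`/`OP` columns suggest one), `AFTER` is null for deletes (`OP = 'd'`), so only `BEFORE` still carries the ID. A small Python sketch of that selection logic follows; the function name is hypothetical and this is not KSQL's evaluation code.

```python
from typing import Any, Dict, Optional


def change_event_id(event: Dict[str, Any]) -> Optional[Any]:
    """Python analogue of:
    CASE WHEN OP = 'd' THEN BEFORE->ID ELSE AFTER->ID END

    Assumes a Debezium-style envelope where AFTER is None for deletes
    (OP = 'd'), so the surviving ID lives in BEFORE.
    """
    image = event.get("BEFORE") if event.get("OP") == "d" else event.get("AFTER")
    # In this sketch a missing struct simply yields None instead of raising.
    return None if image is None else image.get("ID")


print(change_event_id({"OP": "c", "BEFORE": None, "AFTER": {"ID": 7}}))  # 7
print(change_event_id({"OP": "d", "BEFORE": {"ID": 7}, "AFTER": None}))  # 7
```

In this model, keying on `AFTER` alone would return None for every delete, which is why the CASE falls back to `BEFORE`.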

agavra (Contributor) commented Dec 10, 2019

I believe this should be fixed by #4018 - I'll add a QTT test to verify this. @terryf82 - I think your suggestion will no longer work in the next release because of #3982.

agavra (Contributor) commented Dec 10, 2019

Confirmed this works:

    {
      "name": "stream | initially set | partition by struct dereference expression | key in value | no aliasing",
      "statements": [
        "CREATE STREAM INPUT (foo VARCHAR, bar STRUCT<val VARCHAR>) WITH (kafka_topic='input_topic', key='foo', value_format='JSON');",
        "CREATE STREAM OUTPUT AS SELECT bar FROM INPUT PARTITION BY bar->val;"
      ],
      "inputs": [
        {"topic": "input_topic", "key": "1", "value": {"foo": "1", "bar": {"val":  "val"}}}
      ],
      "outputs": [
        {"topic": "OUTPUT", "key": "val", "value": {"BAR": {"VAL": "val"}}}
      ],
      "post": {
        "sources": [
          {"name": "OUTPUT", "type": "stream", "keyField": null}
        ]
      }
    }

I won't commit this because there is already a test for expression support and this doesn't add any coverage.
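For readers unfamiliar with the QTT format, the test case in that comment asserts a single input-to-output mapping. The Python sketch below restates that mapping by hand for this one record; it illustrates the expected records only, not how KSQL evaluates the query, and the function name is hypothetical. The upper-cased `BAR`/`VAL` names mirror the expected output in the test.

```python
import json
from typing import Any, Dict, Tuple


def select_bar_partition_by_val(key: str, value: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Hand-written equivalent of the test's OUTPUT statement for one record.

    SELECT bar            -> keep only the bar column (emitted as BAR with VAL
                             inside, matching the expected output's names).
    PARTITION BY bar->val -> the nested field becomes the new record key.
    """
    bar = value["bar"]
    new_value = {"BAR": {field.upper(): v for field, v in bar.items()}}
    # The incoming key ("1" in the test) is discarded and replaced.
    return bar["val"], new_value


new_key, new_value = select_bar_partition_by_val("1", {"foo": "1", "bar": {"val": "val"}})
print(new_key, json.dumps(new_value))  # val {"BAR": {"VAL": "val"}}
```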

agavra closed this as completed Dec 10, 2019