Using table functions on table source results in NPE #4033

Closed
big-andy-coates opened this issue Dec 3, 2019 · 1 comment · Fixed by #4085

big-andy-coates commented Dec 3, 2019

To Reproduce

Add this test to table-functions.json:

    {
      "name": "with table source",
      "statements": [
        "CREATE TABLE TEST (ID BIGINT, MY_ARR ARRAY<BIGINT>) WITH (kafka_topic='test_topic', KEY='ID', value_format='JSON');",
        "CREATE TABLE OUTPUT AS SELECT ID, EXPLODE(MY_ARR) VAL FROM TEST;"
      ],
      "inputs": [
        {"topic": "test_topic", "key": 0, "value": {"ID":  0, "MY_ARR": [1, 2]}},
        {"topic": "test_topic", "key": 1, "value": {"ID":  1, "MY_ARR": [3, 4]}}
      ],
      "outputs": [
        {"topic": "OUTPUT", "key": "0", "value": {"ID":  0, "VAL": 1}},
        {"topic": "OUTPUT", "key": "0", "value": {"ID":  0, "VAL": 2}},
        {"topic": "OUTPUT", "key": "1", "value": {"ID":  1, "VAL": 3}},
        {"topic": "OUTPUT", "key": "1", "value": {"ID":  1, "VAL": 4}}
      ]
    }

Expected behavior

The test case should pass (it may need some tweaking, as it's just a cut-and-paste job).

Actual behaviour

An NPE is thrown:

io.confluent.ksql.util.KsqlStatementException: source
Statement: CREATE TABLE OUTPUT AS SELECT
  TEST.ID ID,
  EXPLODE(TEST.MY_ARR) VAL
FROM TEST TEST
EMIT CHANGES
Statement: CREATE TABLE OUTPUT AS SELECT ID, EXPLODE(MY_ARR) VAL FROM TEST;

	at io.confluent.ksql.test.tools.TestExecutorUtil.execute(TestExecutorUtil.java:321)
	at io.confluent.ksql.test.tools.TestExecutorUtil.lambda$execute$3(TestExecutorUtil.java:267)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at io.confluent.ksql.test.tools.TestExecutorUtil.execute(TestExecutorUtil.java:278)
	at io.confluent.ksql.test.tools.TestExecutorUtil.doBuildQueries(TestExecutorUtil.java:210)
	at io.confluent.ksql.test.tools.TestExecutorUtil.buildStreamsTopologyTestDrivers(TestExecutorUtil.java:100)
	at io.confluent.ksql.test.tools.TestExecutor.buildAndExecuteQuery(TestExecutor.java:135)
	at io.confluent.ksql.test.EndToEndEngineTestUtil.shouldBuildAndExecuteQuery(EndToEndEngineTestUtil.java:46)
	at io.confluent.ksql.test.QueryTranslationTest.shouldBuildAndExecuteQueries(QueryTranslationTest.java:82)
	at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runners.Suite.runChild(Suite.java:128)
	at org.junit.runners.Suite.runChild(Suite.java:27)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: java.lang.NullPointerException: source
	at java.util.Objects.requireNonNull(Objects.java:228)
	at io.confluent.ksql.execution.plan.StreamFlatMap.<init>(StreamFlatMap.java:40)
	at io.confluent.ksql.execution.streams.ExecutionStepFactory.streamFlatMap(ExecutionStepFactory.java:195)
	at io.confluent.ksql.structured.SchemaKStream.flatMap(SchemaKStream.java:539)
	at io.confluent.ksql.planner.plan.FlatMapNode.buildStream(FlatMapNode.java:113)
	at io.confluent.ksql.planner.plan.ProjectNode.buildStream(ProjectNode.java:102)
	at io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode.buildStream(KsqlStructuredDataOutputNode.java:114)
	at io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:73)
	at io.confluent.ksql.engine.QueryEngine.buildPhysicalPlan(QueryEngine.java:105)
	at io.confluent.ksql.engine.EngineExecutor.planQuery(EngineExecutor.java:201)
	at io.confluent.ksql.engine.EngineExecutor.plan(EngineExecutor.java:152)
	at io.confluent.ksql.engine.KsqlEngine.plan(KsqlEngine.java:176)
	at io.confluent.ksql.engine.KsqlEngine.execute(KsqlEngine.java:196)
	at io.confluent.ksql.test.tools.TestExecutorUtil.execute(TestExecutorUtil.java:314)
	... 43 more

@purplefox
Contributor

This is happening because the source is a SchemaKTable, not a SchemaKStream. The flat-mapping code needs a SchemaKStream, and the underlying Kafka Streams KTable doesn't support flatMap anyway.
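
To illustrate why flat-mapping a table is awkward, here is a minimal plain-Java sketch. It is not ksqlDB or Kafka Streams code, and the class and method names are hypothetical. It assumes a table can be modelled as a map holding one value per key and a stream as an append-only list of records. Exploding an array yields several rows that share a key: the stream keeps them all, whereas in a keyed table each row replaces the previous one.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in ksqlDB.
public class ExplodeSemanticsSketch {

    // Stream semantics: every exploded element becomes its own record.
    static List<Map.Entry<Long, Long>> explodeToStream(Map<Long, List<Long>> source) {
        List<Map.Entry<Long, Long>> out = new ArrayList<>();
        for (Map.Entry<Long, List<Long>> row : source.entrySet()) {
            for (Long val : row.getValue()) {
                out.add(new AbstractMap.SimpleEntry<>(row.getKey(), val));
            }
        }
        return out;
    }

    // Table semantics: one value per key, so later elements replace earlier ones.
    static Map<Long, Long> explodeToTable(Map<Long, List<Long>> source) {
        Map<Long, Long> out = new LinkedHashMap<>();
        for (Map.Entry<Long, List<Long>> row : source.entrySet()) {
            for (Long val : row.getValue()) {
                out.put(row.getKey(), val);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same rows as the test inputs in the report: ID -> MY_ARR.
        Map<Long, List<Long>> source = new LinkedHashMap<>();
        source.put(0L, Arrays.asList(1L, 2L));
        source.put(1L, Arrays.asList(3L, 4L));

        System.out.println(explodeToStream(source)); // prints [0=1, 0=2, 1=3, 1=4]
        System.out.println(explodeToTable(source));  // prints {0=2, 1=4}
    }
}
```

Under this simplified model, the four expected output rows in the reported test case only survive with stream semantics, since two of them share key 0 and two share key 1.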

We could just explicitly disallow table functions where the source is a table. An easy workaround is for the user to create the source as a stream. Is there any advantage to using a table here?
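
As a rough sketch of what "explicitly disallow" could look like, the check below rejects a table function on a table source with a descriptive error instead of letting a null source reach the flat-map step. Everything here is hypothetical (`SourceType`, `validate`, the error text); it is not the ksqlDB planner API and not necessarily how the eventual fix was implemented.

```java
// Hypothetical illustration only; none of these names exist in ksqlDB.
public class TableFunctionGuardSketch {

    enum SourceType { STREAM, TABLE }

    // Fail fast with a clear message rather than a NullPointerException later on.
    static void validate(SourceType sourceType, boolean usesTableFunction) {
        if (usesTableFunction && sourceType == SourceType.TABLE) {
            throw new IllegalArgumentException(
                "Table functions are not supported on table sources; "
                    + "declare the source as a stream instead.");
        }
    }

    public static void main(String[] args) {
        validate(SourceType.STREAM, true); // accepted
        try {
            validate(SourceType.TABLE, true); // rejected
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

A check of this kind would turn the opaque `NullPointerException: source` into a message that points the user at the stream workaround.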
