Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: backport #5781 to 6.0.x as a workaround for KAFKA-10179 #5788

Merged
merged 2 commits into from
Jul 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void shouldAggregateWithNoWindow() {
// Then:
assertOutputOf(resultStream0, expected, is(expected));
assertTableCanBeUsedAsSource(expected, is(expected));
assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT, 4, resultStream0, resultStream1);
assertTopicsCleanedUp(TopicCleanupPolicy.COMPACT, 5, resultStream0, resultStream1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyDescription;
Expand Down Expand Up @@ -171,6 +172,7 @@ public void before() {
when(ksqlStreamBuilder.buildValueSerde(any(), any(), any())).thenReturn(rowSerde);
when(ksqlStreamBuilder.getFunctionRegistry()).thenReturn(functionRegistry);

when(rowSerde.serializer()).thenReturn(mock(Serializer.class));
when(rowSerde.deserializer()).thenReturn(mock(Deserializer.class));

when(dataSource.getKsqlTopic()).thenReturn(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static io.confluent.ksql.planner.plan.JoinNode.JoinType.LEFT;
import static io.confluent.ksql.planner.plan.JoinNode.JoinType.OUTER;
import static io.confluent.ksql.planner.plan.PlanTestUtil.SOURCE_NODE;
import static io.confluent.ksql.planner.plan.PlanTestUtil.SOURCE_NODE_FORCE_CHANGELOG;
import static io.confluent.ksql.planner.plan.PlanTestUtil.getNodeByName;
import static java.util.Optional.empty;
import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -216,11 +217,11 @@ public void shouldBuildSourceNode() {
setupTopicClientExpectations(1, 1);
buildJoin();
final TopologyDescription.Source node = (TopologyDescription.Source) getNodeByName(
builder.build(), SOURCE_NODE);
builder.build(), SOURCE_NODE_FORCE_CHANGELOG);
final List<String> successors = node.successors().stream().map(TopologyDescription.Node::name)
.collect(Collectors.toList());
assertThat(node.predecessors(), equalTo(Collections.emptySet()));
assertThat(successors, equalTo(Collections.singletonList("KTABLE-SOURCE-0000000001")));
assertThat(successors, equalTo(Collections.singletonList("KTABLE-SOURCE-0000000002")));
assertThat(node.topicSet(), equalTo(ImmutableSet.of("test2")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ final class PlanTestUtil {

static final String TRANSFORM_NODE = "KSTREAM-TRANSFORMVALUES-0000000001";
static final String SOURCE_NODE = "KSTREAM-SOURCE-0000000000";
static final String SOURCE_NODE_FORCE_CHANGELOG = "KSTREAM-SOURCE-0000000001";

private PlanTestUtil() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@
@Immutable
public final class TableSource extends SourceStep<KTableHolder<Struct>> {

private final Boolean forceChangelog;

public TableSource(
@JsonProperty(value = "properties", required = true)
final ExecutionStepPropertiesV1 properties,
@JsonProperty(value = "topicName", required = true) final String topicName,
@JsonProperty(value = "formats", required = true) final Formats formats,
@JsonProperty("timestampColumn") final Optional<TimestampColumn> timestampColumn,
@JsonProperty(value = "sourceSchema", required = true) final LogicalSchema sourceSchema
@JsonProperty(value = "sourceSchema", required = true) final LogicalSchema sourceSchema,
@JsonProperty(value = "forceChangelog") final Optional<Boolean> forceChangelog
) {
super(properties, topicName, formats, timestampColumn, sourceSchema);
this.forceChangelog = forceChangelog.orElse(false);
}

public Boolean isForceChangelog() {
return forceChangelog;
}

@Override
Expand All @@ -55,11 +63,13 @@ public boolean equals(final Object o) {
&& Objects.equals(topicName, that.topicName)
&& Objects.equals(formats, that.formats)
&& Objects.equals(timestampColumn, that.timestampColumn)
&& Objects.equals(sourceSchema, that.sourceSchema);
&& Objects.equals(sourceSchema, that.sourceSchema)
&& Objects.equals(forceChangelog, that.forceChangelog);
}

@Override
public int hashCode() {
return Objects.hash(properties, topicName, formats, timestampColumn, sourceSchema);
return Objects.hash(
properties, topicName, formats, timestampColumn, sourceSchema, forceChangelog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,27 @@ public void shouldImplementEquals() {
new EqualsTester()
.addEqualityGroup(
new TableSource(
properties1, "topic1", formats1, Optional.of(timestamp1), schema1),
properties1, "topic1", formats1, Optional.of(timestamp1), schema1, Optional.of(false)),
new TableSource(
properties1, "topic1", formats1, Optional.of(timestamp1), schema1))
properties1, "topic1", formats1, Optional.of(timestamp1), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties2, "topic1", formats1, Optional.of(timestamp1), schema1))
properties2, "topic1", formats1, Optional.of(timestamp1), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties1, "topic2", formats1, Optional.of(timestamp1), schema1))
properties1, "topic2", formats1, Optional.of(timestamp1), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties1, "topic1", formats2, Optional.of(timestamp1), schema1))
properties1, "topic1", formats2, Optional.of(timestamp1), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties1, "topic1", formats1, Optional.of(timestamp2), schema1))
properties1, "topic1", formats1, Optional.of(timestamp2), schema1, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties1, "topic1", formats1, Optional.of(timestamp1), schema2))
properties1, "topic1", formats1, Optional.of(timestamp1), schema2, Optional.of(false)))
.addEqualityGroup(
new TableSource(
properties1, "topic1", formats1, Optional.of(timestamp1), schema1, Optional.of(true)))
.testEquals();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM I1 (ID INTEGER KEY, V0 INTEGER, V1 INTEGER) WITH (KAFKA_TOPIC='i1', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "I1",
"schema" : "`ID` INTEGER KEY, `V0` INTEGER, `V1` INTEGER",
"topicName" : "i1",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE I2 (ID INTEGER PRIMARY KEY, V0 INTEGER, V1 INTEGER) WITH (KAFKA_TOPIC='i2', VALUE_FORMAT='JSON');",
"ddlCommand" : {
"@type" : "createTableV1",
"sourceName" : "I2",
"schema" : "`ID` INTEGER KEY, `V0` INTEGER, `V1` INTEGER",
"topicName" : "i2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM I1 I1\nINNER JOIN I2 I2 ON ((AS_VALUE(I1.ID) = I2.ID))\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`I2_ID` INTEGER KEY, `I1_ID` INTEGER, `I1_V0` INTEGER, `I1_V1` INTEGER, `I2_V0` INTEGER, `I2_V1` INTEGER",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
}
},
"queryPlan" : {
"sources" : [ "I1", "I2" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamTableJoinV1",
"properties" : {
"queryContext" : "Join"
},
"joinType" : "INNER",
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"leftSource" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "PrependAliasLeft"
},
"source" : {
"@type" : "streamSelectKeyV2",
"properties" : {
"queryContext" : "LeftSourceKeyed"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Left/Source"
},
"topicName" : "i1",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` INTEGER KEY, `V0` INTEGER, `V1` INTEGER"
},
"keyExpression" : "AS_VALUE(ID)"
},
"keyColumnNames" : [ "I1_KSQL_COL_0" ],
"selectExpressions" : [ "V0 AS I1_V0", "V1 AS I1_V1", "ROWTIME AS I1_ROWTIME", "ID AS I1_ID", "KSQL_COL_0 AS I1_KSQL_COL_0" ]
},
"rightSource" : {
"@type" : "tableSelectV1",
"properties" : {
"queryContext" : "PrependAliasRight"
},
"source" : {
"@type" : "tableSourceV1",
"properties" : {
"queryContext" : "KafkaTopic_Right/Source"
},
"topicName" : "i2",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"sourceSchema" : "`ID` INTEGER KEY, `V0` INTEGER, `V1` INTEGER",
"forceChangelog" : true
},
"keyColumnNames" : [ "I2_ID" ],
"selectExpressions" : [ "V0 AS I2_V0", "V1 AS I2_V1", "ROWTIME AS I2_ROWTIME", "ID AS I2_ID" ]
},
"keyColName" : "I2_ID"
},
"keyColumnNames" : [ "I2_ID" ],
"selectExpressions" : [ "I1_ID AS I1_ID", "I1_V0 AS I1_V0", "I1_V1 AS I1_V1", "I2_V0 AS I2_V0", "I2_V1 AS I2_V1" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.error.classifier.regex" : ""
}
}
Loading