
Support kafka transactional topics #6496

Merged — 30 commits merged into apache:master on Feb 18, 2019

Conversation

@surekhasaharan commented Oct 19, 2018

This PR adds support for transactional Kafka topics by updating the Kafka client to the latest version and modifying the consumer code accordingly. While working on this I found that integration-tests had a dependency on druid-kafka-eight, which is now removed, along with the integration test for the Kafka firehose. kafka-eight uses ConsumerConnector, which was removed from recent versions of Kafka, so that module is incompatible with the latest Kafka client as is. One option is to fix it to work with recent versions; another is to remove it (as was attempted before with #5841). With this PR, I am only removing the integration test and deprecating kafka-eight. We may want to remove it completely if it is unused and no one wants to keep it.

  • update kafka version to latest v2.0.1
  • Remove the skipOffsetGaps option since it's not used or required anymore
  • Adjust kafka consumer to use transactional semantics
  • Remove kafka-eight integration test
  • Deprecate kafka-eight module classes
  • Update tests
  • Update docs
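Reading transactionally produced topics hinges on the consumer's isolation level. A minimal sketch of the relevant consumer configuration — the broker address and group id below are placeholders, and the property names are standard Kafka consumer configs, not Druid-specific settings:

```java
import java.util.Properties;

public class TxnConsumerConfig {
    // Builds consumer properties for reading only committed transactional records.
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "druid-example");           // placeholder group id
        // read_committed makes poll() skip records from aborted transactions
        // and withhold records of open transactions until they commit.
        props.put("isolation.level", "read_committed");
        props.put("enable.auto.commit", "false");         // offsets managed by the task
        return props;
    }

    public static void main(String[] args) {
        System.out.println(TxnConsumerConfig.build().getProperty("isolation.level"));
    }
}
```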

@gianm (Contributor) commented Oct 20, 2018

@surekhasaharan There's an unused import that is tripping the CI:

```
[ERROR] /home/travis/build/apache/incubator-druid/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java:187:8: Unused import - java.util.concurrent.atomic.AtomicInteger. [UnusedImports]
```

@gianm (Contributor) commented Oct 20, 2018

Meant to fix #5404. (Just commenting here so it shows up in the PR links. @surekhasaharan - FYI - I see you included it in the title but GitHub doesn't automatically create links out of the title. It should be in the description.)

@surekhasaharan (Author) replied:

> Meant to fix #5404. (Just commenting here so it shows up in the PR links. @surekhasaharan - FYI - I see you included it in the title but GitHub doesn't automatically create links out of the title. It should be in the description.)

ok thanks, good to know.

@gianm (Contributor) commented Oct 21, 2018

I saw an integration test error on part 1, and restarted it. I'm not sure if it's an honest error or not.

@surekhasaharan (Author) commented:

Thanks @gianm, the integration test failure looks legit; I will take a look.

@gianm gianm added the WIP label Oct 25, 2018
@jihoonson jihoonson removed the WIP label Nov 21, 2018
@jihoonson jihoonson self-assigned this Nov 27, 2018
```java
      && assignment.remove(record.partition())) {
    log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
    KafkaIndexTask.assignPartitions(consumer, topic, assignment);
    stillReading = !assignment.isEmpty();
  }
}

if (nextOffsets.get(currentPartition) != null
```
Contributor:

I think this should be placed inside the loop, otherwise it only updates the nextOffsets of at most 1 partition. In the case where the polled records contain records of more than 1 partition, the nextOffsets of all those partitions should be updated.
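The pattern being suggested can be sketched with a plain map standing in for the consumer state — the names here are illustrative, not Druid's actual fields:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetTracking {
    // Minimal stand-in for a polled Kafka record: (partition, offset).
    record Rec(int partition, long offset) {}

    // Advance the next offset to read for every partition seen in this poll.
    // Doing this per record, inside the loop, ensures a batch containing
    // records from several partitions advances all of them, not just one.
    static void advance(Map<Integer, Long> nextOffsets, List<Rec> polled) {
        for (Rec r : polled) {
            nextOffsets.put(r.partition(), r.offset() + 1); // next = last seen + 1
        }
    }

    public static void main(String[] args) {
        Map<Integer, Long> next = new HashMap<>();
        advance(next, List.of(new Rec(0, 5), new Rec(1, 9), new Rec(0, 6)));
        System.out.println(next.get(0) + " " + next.get(1)); // 7 10
    }
}
```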

Author:

Thanks. Inside the loop, nextOffsets is updated by `nextOffsets.put(record.partition(), record.offset() + 1);`. That said, let me double-check; I'm a little rusty on this PR now.

Contributor:

Sorry for the late reply. If I understand correctly, the line `nextOffsets.put(currentPartition, endOffsets.get(currentPartition));` is supposed to update the nextOffsets of partitions whose polled offsets have exceeded the partition's endOffsets. But what if, in the list of polled records, 2 or more partitions have offsets that exceed their respective endOffsets? Since `nextOffsets.put(currentPartition, endOffsets.get(currentPartition));` sits outside the loop, it would only update the nextOffsets of 1 such partition. I remember talking to @jihoonson about this; he may know more.
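The fix being asked for — clamping every overshooting partition rather than a single currentPartition after the loop — can be sketched as follows. Names and the map-based state are illustrative, not Druid's actual code:

```java
import java.util.HashMap;
import java.util.Map;

public class EndOffsetCap {
    // Clamp the next offset of every partition to its end offset. If this
    // logic ran once outside the record loop against a single
    // "currentPartition", only one of the overshooting partitions would be
    // corrected; iterating over all partitions fixes each of them.
    static void clampAll(Map<Integer, Long> nextOffsets, Map<Integer, Long> endOffsets) {
        for (Map.Entry<Integer, Long> e : nextOffsets.entrySet()) {
            long end = endOffsets.getOrDefault(e.getKey(), Long.MAX_VALUE);
            if (e.getValue() > end) {
                e.setValue(end); // this partition overshot its end offset
            }
        }
    }

    public static void main(String[] args) {
        // Two partitions overshoot at once: both must be clamped.
        Map<Integer, Long> next = new HashMap<>(Map.of(0, 12L, 1, 30L));
        Map<Integer, Long> end = Map.of(0, 10L, 1, 20L);
        clampAll(next, end);
        System.out.println(next.get(0) + " " + next.get(1)); // 10 20
    }
}
```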

Contributor:

I remember I tried to test these changes when working on #6431 but unit tests were failing because of the above issue.

@gianm gianm mentioned this pull request Jan 11, 2019
@jon-wei jon-wei removed this from the 0.14.0 milestone Feb 5, 2019
@surekhasaharan surekhasaharan removed the WIP label Feb 6, 2019
@gianm (Contributor) left a review:
LGTM

@gianm gianm merged commit 80a2ef7 into apache:master Feb 18, 2019
gianm pushed a commit to implydata/druid-public that referenced this pull request Mar 9, 2019
* Support kafka transactional topics

* update kafka to version 2.0.0
* Remove the skipOffsetGaps option since it's not used anymore
* Adjust kafka consumer to use transactional semantics
* Update tests

* Remove unused import from test

* Fix compilation

* Invoke transaction api to fix a unit test

* temporary modification of travis.yml for debugging

* another attempt to get travis tasklogs

* update kafka to 2.0.1 at all places

* Remove druid-kafka-eight dependency from integration-tests, remove the kafka firehose test and deprecate kafka-eight classes

* Add deprecated in docs for kafka-eight and kafka-simple extensions

* Remove skipOffsetGaps and code changes for transaction support

* Fix indentation

* remove skipOffsetGaps from kinesis

* Add transaction api to KafkaRecordSupplierTest

* Fix indent

* Fix test

* update kafka version to 2.1.0
```diff
 // for Kafka, the end offsets are exclusive, so skip it
 if (isEndSequenceOffsetsExclusive() &&
     createSequenceNumber(record.getSequenceNumber()).compareTo(
-        createSequenceNumber(endOffsets.get(record.getPartitionId()))) == 0) {
+        createSequenceNumber(endOffsets.get(record.getPartitionId()))) >= 0) {
```
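Background on the `>=` comparison: with transactions, Kafka writes control records (commit/abort markers) that occupy offsets but are never delivered to a read_committed consumer, so the first delivered record at or beyond an exclusive end offset may strictly exceed it. A minimal sketch of the check, with illustrative names:

```java
public class EndOffsetCheck {
    // For Kafka, end offsets are exclusive: a record at the end offset is
    // already past the range. With transactional markers consuming offsets,
    // the first delivered record may jump past the end offset entirely, so
    // the comparison must be >= rather than ==.
    static boolean reachedEnd(long recordOffset, long endOffset) {
        return recordOffset >= endOffset;
    }

    public static void main(String[] args) {
        System.out.println(reachedEnd(10, 10)); // true: exactly at the exclusive end
        System.out.println(reachedEnd(11, 10)); // true: jumped past via a txn marker
        System.out.println(reachedEnd(9, 10));  // false: still reading
    }
}
```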
Contributor:

Is it possible that the sequence number of a record is larger than the end offset? The end offset must be either unlimited or max(actually consumed offsets of all replicas).

Author:

I thought it could be possible with transactions enabled, since some of the offsets can be consumed by transactional markers. I did not realize that the end offset is unlimited or a max; where is that set?

Contributor:

The initial value of the end offset is unlimited, and it is updated later in setEndOffsets(). The endOffsets sent from the supervisor in setEndOffsets() are the max of the offsets of all running replicas.
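The lifecycle described above can be sketched as follows; the field and method names are illustrative, not Druid's actual API:

```java
import java.util.HashMap;
import java.util.Map;

public class EndOffsets {
    // The task starts with an effectively unlimited end offset per partition;
    // the supervisor later narrows it via setEndOffsets() to the max offset
    // consumed by any running replica.
    private final Map<Integer, Long> endOffsets = new HashMap<>();

    EndOffsets(int... partitions) {
        for (int p : partitions) {
            endOffsets.put(p, Long.MAX_VALUE); // initially unlimited
        }
    }

    void setEndOffsets(Map<Integer, Long> fromSupervisor) {
        endOffsets.putAll(fromSupervisor); // supervisor-provided max per partition
    }

    long end(int partition) {
        return endOffsets.get(partition);
    }

    public static void main(String[] args) {
        EndOffsets e = new EndOffsets(0, 1);
        System.out.println(e.end(0) == Long.MAX_VALUE); // true before the supervisor call
        e.setEndOffsets(Map.of(0, 100L, 1, 250L));
        System.out.println(e.end(0)); // 100
    }
}
```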

@surekhasaharan surekhasaharan deleted the transactional-kafka branch April 8, 2019 22:48
gianm pushed a commit to implydata/druid-public that referenced this pull request Apr 9, 2019
@jihoonson jihoonson added this to the 0.15.0 milestone May 16, 2019
@jihoonson jihoonson changed the title Support kafka transactional topics (#5404) Support kafka transactional topics Sep 16, 2019
5 participants