-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support kafka transactional topics #6496
Conversation
* update kafka to version 2.0.0 * Remove the skipOffsetGaps option since it's not used anymore * Adjust kafka consumer to use transactional semantics * Update tests
@surekhasaharan There's an unused import that is tripping the CI:
|
Meant to fix #5404. (Just commenting here so it shows up in the PR links. @surekhasaharan - FYI - I see you included it in the title but GitHub doesn't automatically create links out of the title. It should be in the description.) |
ok thanks, good to know. |
I saw an integration test error on part 1, and restarted it. I'm not sure if it's an honest error or not. |
thanks @gianm , IT test failure looks legit, I will take a look. |
…e kafka firehose test and deprecate kafka-eight classes
&& assignment.remove(record.partition())) { | ||
log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); | ||
KafkaIndexTask.assignPartitions(consumer, topic, assignment); | ||
stillReading = !assignment.isEmpty(); | ||
} | ||
} | ||
|
||
if (nextOffsets.get(currentPartition) != null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be placed inside the loop, otherwise it only updates the nextOffsets
of at most 1 partition. In the case where the polled records
contain records of more than 1 partition, the nextOffsets of all those partitions should be updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, inside the loop nextOffsets
is updated by nextOffsets.put(record.partition(), record.offset() + 1);
That said, let me double check, I'm a little rusty on this PR now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late reply. If I understand correctly, the line nextOffsets.put(currentPartition, endOffsets.get(currentPartition));
is supposed to update the nextOffsets
of the partitions with polled offsets that have exceeded the partition's endOffsets
. But what if, in the list of polled records
, 2 or more partitions have offsets that exceed their respective endOffsets. Then since nextOffsets.put(currentPartition, endOffsets.get(currentPartition));
is outside the loop, it would only update the nextOffsets
of 1 such partition. I remember talking to @jihoonson about this, he may know more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remember I tried to test these changes when working on #6431 but unit tests were failing because of the above issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
* Support kafka transactional topics * update kafka to version 2.0.0 * Remove the skipOffsetGaps option since it's not used anymore * Adjust kafka consumer to use transactional semantics * Update tests * Remove unused import from test * Fix compilation * Invoke transaction api to fix a unit test * temporary modification of travis.yml for debugging * another attempt to get travis tasklogs * update kafka to 2.0.1 at all places * Remove druid-kafka-eight dependency from integration-tests, remove the kafka firehose test and deprecate kafka-eight classes * Add deprecated in docs for kafka-eight and kafka-simple extensions * Remove skipOffsetGaps and code changes for transaction support * Fix indentation * remove skipOffsetGaps from kinesis * Add transaction api to KafkaRecordSupplierTest * Fix indent * Fix test * update kafka version to 2.1.0
// for Kafka, the end offsets are exclusive, so skip it | ||
if (isEndSequenceOffsetsExclusive() && | ||
createSequenceNumber(record.getSequenceNumber()).compareTo( | ||
createSequenceNumber(endOffsets.get(record.getPartitionId()))) == 0) { | ||
createSequenceNumber(endOffsets.get(record.getPartitionId()))) >= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this possible that the sequence number of record
is larger than the end offset? The end offset must be either unlimited
or max(actually consumed offsets of all replicas)
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i though it could be possible with transactions enabled, as some of the offsets could be used by transactional markers. I did not realize that end offset is unlimited
or max, where is that set ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The initial value of end offset is unlimited, and is updated later in setEndOffsets()
. The endOffsets are sent from the supervisor in setEndOffsets()
which is max of offsets of all running replicas.
* Support kafka transactional topics * update kafka to version 2.0.0 * Remove the skipOffsetGaps option since it's not used anymore * Adjust kafka consumer to use transactional semantics * Update tests * Remove unused import from test * Fix compilation * Invoke transaction api to fix a unit test * temporary modification of travis.yml for debugging * another attempt to get travis tasklogs * update kafka to 2.0.1 at all places * Remove druid-kafka-eight dependency from integration-tests, remove the kafka firehose test and deprecate kafka-eight classes * Add deprecated in docs for kafka-eight and kafka-simple extensions * Remove skipOffsetGaps and code changes for transaction support * Fix indentation * remove skipOffsetGaps from kinesis * Add transaction api to KafkaRecordSupplierTest * Fix indent * Fix test * update kafka version to 2.1.0
This PR adds support for transactional kafka by updating kafka version to latest and modifying code to support this. Found that integration-tests had a dependency on
druid-kafka-eight
which is removed now as well as the integration test for kafka firehose. Kafka-eight usesConsumerConnector
which is removed from latest version of kafka, so this module is incompatible with latest kafka version as is. One option is to fix it to work with latest versions or remove it (as was attempted before with #5841). So with this PR, I am only removing the integration test and deprecatingkafka-eight
. We might want to remove it completely if it's unused and no one wants to keep it.kafka-eight
integration testkafka-eight
module classes