Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add checking for new checkpoint #14353

Merged
merged 13 commits into from
Sep 4, 2023
Merged

Add checking for new checkpoint #14353

merged 13 commits into from
Sep 4, 2023

Conversation

panhongan
Copy link
Contributor

@panhongan panhongan commented May 30, 2023

Fixes #XXXX.

org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.CheckpointNotice#handle()

Description

There was NPE when parsed the checkpoint in context.
I checked from the overlord log.

  1. Previous task
2023-05-19T02:49:20,653 INFO [qtp1751325469-204] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Checkpointing [KafkaDataSourceMetadata{Seekable
StreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='xxxxxxxxx', partitionSequenceNumberMap={16=24076850286}, exclusivePar
titions=[]}}] for taskGroup [16]

2023-05-19T02:49:20,662 WARN [IndexTaskClient-xxxxxxxxx.10-6] org.apache.druid.indexing.common.IndexTaskClient - Exception while sending request
org.apache.druid.java.util.common.IAE: Received 400 Bad Request with body: Can't pause, task is not in a pausable state (state: [PAUSED])
        at org.apache.druid.indexing.common.IndexTaskClient.submitRequest(IndexTaskClient.java:356) ~[druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
        at org.apache.druid.indexing.common.IndexTaskClient.submitRequestWithEmptyContent(IndexTaskClient.java:220) ~[druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
        at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient.pause(SeekableStreamIndexTaskClient.java:108) ~[druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
        at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient.lambda$pauseAsync$4(SeekableStreamIndexTaskClient.java:339) ~[druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_222]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
2023-05-19T02:49:20,668 INFO [KafkaSupervisor-xxxxxxxxx-Worker-7] org.apache.druid.indexing.overlord.RemoteTaskRunner - Shutdown [index_kafka_xxxxxxxxx_0ac74dd2c82397f_knoggfhl] because: [An exception occured while waiting for task [index_kafka_xxxxxxxxx_0ac74dd2c82397f_knoggfhl] to pause: [org.apache.druid.java.util.common.IAE: Received 400 Bad Request with body: Can't pause, task is not in a pausable state (state: [PAUSED])]]

2023-05-19T02:49:20,778 INFO [KafkaSupervisor-xxxxxxxxx-Worker-7] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - All tasks in taskGroup [16] have failed, tasks will be re-created
2023-05-19T02:49:20,778 INFO [KafkaSupervisor-xxxxxxxxx] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Handled checkpoint notice, new checkpoint is [null] for taskGroup [16]
  1. Relay task
2023-05-19T02:58:04,739 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception while running task.
java.lang.NullPointerException: null
	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:213) ~[guava-16.0.1.jar:?]
	at org.apache.druid.indexing.seekablestream.SequenceMetadata.<init>(SequenceMetadata.java:80) ~[druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.initializeSequences(SeekableStreamIndexTaskRunner.java:329) ~[druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:382) ~[druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:277) [druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:164) [druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:421) [druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:393) [druid-indexing-service-0.19.0-iap9.jar:0.19.0-iap9]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]

And the checkpoint contains null, like:

"context" : {
    "checkpoints" : "{\"0\":{\"16\":24076850286},\"1\":null}",
    "IS_INCREMENTAL_HANDOFF_SUPPORTED" : true,
    "forceTimeChunkLock" : true
  }

Fixed the bug ...

final Map<PartitionIdType, SequenceOffsetType> newCheckpoint = checkpointTaskGroup(taskGroup, false).get();
if (MapUtils.isNotEmpty(newCheckpoint)) {
  taskGroup.addNewCheckpoint(newCheckpoint);
  log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId);
} else {
  log.warn("New checkpoint is null for taskGroup [%s]", taskGroupId);
}

Release note


Key changed classes in this PR
  • org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.CheckpointNotice

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@panhongan panhongan marked this pull request as draft May 30, 2023 07:50
@panhongan panhongan marked this pull request as ready for review May 30, 2023 08:52
@cryptoe
Copy link
Contributor

cryptoe commented Jun 6, 2023

@panhongan Have you tested this on a local druid cluster ?

@panhongan
Copy link
Contributor Author

@panhongan Have you tested this on a local druid cluster ?

Yes, I tested in my local druid cluster.

@AmatyaAvadhanula
Copy link
Contributor

@panhongan thank you for testing this on a cluster. Could you please look at the failing checks as well?


EasyMock.reset(spec);
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [SeekableStreamSupervisorSpec.getDataSchema](1) should be avoided because it has been deprecated.
@panhongan
Copy link
Contributor Author

panhongan commented Aug 12, 2023

@panhongan thank you for testing this on a cluster. Could you please look at the failing checks as well?

@AmatyaAvadhanula @cryptoe @pranavbhole
I fixed the issue, can you help review and approve?

Copy link
Contributor

@AmatyaAvadhanula AmatyaAvadhanula left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM @panhongan!

@AmatyaAvadhanula AmatyaAvadhanula merged commit d4e972e into apache:master Sep 4, 2023
jakubmatyszewski pushed a commit to jakubmatyszewski/druid that referenced this pull request Sep 8, 2023
Check that a checkpoint is non-empty before adding it to the checkpoint sequence 
in a SeekableStreamSupervisor
@LakshSingla LakshSingla added this to the 28.0 milestone Oct 12, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants