-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Validate X-Druid-Task-Id header in request/response and support retrying on outdated TaskLocation information, add KafkaIndexTaskClient unit tests #3006
Conversation
final Optional<ChatHandler> handler = handlers.get(handlerId); | ||
|
||
if (handler.isPresent()) { | ||
return handler.get(); | ||
} else { | ||
return Response.status(Response.Status.NOT_FOUND).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This return Response()
is a big troll; the code does return a 404 to the caller, but not for the reason you would think. It returns a 404 because there's no method in the returned object annotated with a @path param so the request doesn't get matched to any endpoint and thus the Jetty framework returns a 404. The Response object itself is not respected; in other words, you can't just set a response header on the object and have it propagate back to the caller.
Hence, I think the options are you'd either have to return a dummy object that would match any path so you would have access to the HTTP response, or you'd have to do it on the servlet filter level, which is what this PR does. I changed this to just return null since it does the exact same thing and might save someone else from being trolled in the future. :P
null | ||
); | ||
request = new Request(method, serviceUri.toURL()); | ||
request.addHeader("X-Druid-Task-Id", id); // used to validate that we are talking to the correct worker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use teh constant defined for this value?
👍 |
private ObjectMapper mapper; | ||
|
||
@Inject | ||
public KafkaIndexTaskClientFactory(@Global HttpClient httpClient, ObjectMapper mapper) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You assume the ObjectMapper
is @Json
latter on. Can you annotate it as such here to make sure you always get the json one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Didn't realize until you pointed it out that there was a @smile mapper as well.
Relatively minor comment to help future developers. Otherwise 👍 |
…kLocation may not remain constant
This PR is a fix for the issue described in #2993:
To restate the issue, it is possible that the location of a task (specifically the worker's port) reported by the RTR is incorrect. This may happen if a MM is restarted and recovers the tasks that were running when it was shutdown but chooses different port assignments for the tasks. The RTR's view of the task locations will get updated eventually through ZK announcements, but for a period of time there will be inconsistencies. If we try to communicate with a task during this time, it's possible that we might end up sending a command to an unintended recipient which isn't a great situation.
There's two main changes in this PR:
On the ChatHandler side, if the incoming request has a X-Druid-Task-Id header, validate that this header matches the task's ID and if not return a 404. As part of the response, set the X-Druid-Task-Id header with the task's actual ID.
On the KafkaIndexTaskClient's side, set X-Druid-Task-Id header in the request, and if we get back a 404 and the header in the response doesn't match the ID of the task we thought we were talking to, wait for a bit and retry later.
(2) is complicated by the issue that the previous implementation of KafkaIndexTaskClient assumed that the host and port passed in during a request would stay constant across retries. Since this is no longer the case, in order to keep the retry logic abstracted from the caller, we need to pass in a TaskLocationProvider that's defined by the caller which can be queried on each retry to detect when the host and port have changed.
Also added unit tests for KafkaIndexTaskClient since the logic has become somewhat more complex.
Closes #2993