Add coordinator API for unused segments #14846

Merged

adarshsanjeev
Contributor

There is currently an issue caused by inconsistent metadata between the worker and the controller in MSQ. A controller can receive one set of segments, which are then marked as unused by, say, a compaction job. The worker is then unable to get the segment information from MetadataResource.

This PR resolves this by allowing the worker to fetch the segment information even if it is unused.

Release Notes

  • Adds includeUnused as an optional parameter to the org.apache.druid.server.http.MetadataResource#getSegment coordinator API. The API will also return unused segments if the parameter is set.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@kfaraz
Contributor

kfaraz commented Aug 16, 2023

Do we really want to fetch the information of unused segments in the worker task? Wouldn't the task just discard the segment once it realises that the fetched segment is unused?

@adarshsanjeev
Contributor Author

The controller already has a picture of the metadata containing only used segments at that point and assigns segments to the workers. On the worker side, we just want to fetch the segment if it has not been deleted completely, even if it is not used anymore.

Contributor

@cryptoe cryptoe left a comment

Thanks for the PR.
Left some comments.

/**
 * Fetches segment metadata for the given dataSource and segmentId.
 */
ListenableFuture<DataSegment> fetchSegment(String dataSource, String segmentId);
Contributor

I feel we should rename the fetchUsedSegment call to fetchSegment and have it take a boolean "includeUnused".

The callers in most places can set this variable to false.

We can javadoc the behavior changes with this parameter.

WDYT ?
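
A minimal sketch of the single-method shape suggested above, assuming a synchronous in-memory lookup. `SegmentLookupSketch`, `Segment`, and `put` are hypothetical names used only for illustration; the client method in the diff returns a `ListenableFuture<DataSegment>` and also takes a dataSource, both of which are left out here to keep the flag's effect visible.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for a segment metadata lookup; not Druid's actual classes.
class SegmentLookupSketch {
    // Minimal segment record: an id plus a "used" flag.
    static final class Segment {
        final String id;
        final boolean used;

        Segment(String id, boolean used) {
            this.id = id;
            this.used = used;
        }
    }

    private final Map<String, Segment> segmentsById = new HashMap<>();

    void put(String id, boolean used) {
        segmentsById.put(id, new Segment(id, used));
    }

    // One entry point instead of separate "used only" and "any" lookups.
    // Most callers would pass includeUnused = false.
    Optional<Segment> fetchSegment(String segmentId, boolean includeUnused) {
        Segment segment = segmentsById.get(segmentId);
        if (segment == null) {
            return Optional.empty(); // deleted completely, or never existed
        }
        if (!segment.used && !includeUnused) {
            return Optional.empty(); // unused segments are hidden by default
        }
        return Optional.of(segment);
    }
}
```

With this shape, existing call sites keep their behaviour by passing `false`, and only the MSQ worker path would need to pass `true`.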

Contributor Author

I made it a separate API instead of a parameter since this probably shouldn't be used in most other places, so adding an extra parameter for every current call seemed untidy. I am okay with changing it to a parameter since it would cut down on the interface though.

Contributor

@cryptoe cryptoe Aug 18, 2023

Since there are 3 places in production + test code where you would have to make the changes, I guess it should be fine?

 *
 * @return DataSegment segment corresponding to given id
 */
DataSegment retrieveSegmentForId(String id);
Contributor

I think we should merge this method with DataSegment retrieveUsedSegmentForId(String id); by taking the same boolean parameter as discussed above.

@abhishekagarwal87
Contributor

@kfaraz - Here is a situation. Let's say you launch an MSQ job to read data from a Druid table. At time T0, the controller has a view of the used segments that make up the table. The controller takes the IDs of these segments and distributes them to the workers. By the time the workers operate on a segment at time T1, the segment is no longer available because the interval it corresponds to has been compacted. A worker could simply discard the segment if it had just fallen out of the retention window. But here, the workers would instead have to find the new segments. Instead of taking that route, we are going to let the job run on the previous snapshot of the system. For that to happen, MSQ workers need to be able to find segments that have been marked unused.
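
The T0/T1 timeline can be modelled in a few lines. This is a sketch under stated assumptions, not Druid code: `SnapshotReadSketch` and all of its methods are hypothetical, and segment metadata is reduced to an id with a used flag.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the T0/T1 timeline; not Druid code.
class SnapshotReadSketch {
    // Segment id -> used flag. An id missing from the map has been deleted completely.
    private final Map<String, Boolean> usedById = new LinkedHashMap<>();

    void publish(String segmentId) {
        usedById.put(segmentId, true);
    }

    // What compaction effectively does to the segments it overshadows.
    void markUnused(String segmentId) {
        usedById.replace(segmentId, false);
    }

    // Controller at T0: record the ids of all currently used segments.
    List<String> snapshotUsedIds() {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, Boolean> entry : usedById.entrySet()) {
            if (entry.getValue()) {
                ids.add(entry.getKey());
            }
        }
        return ids;
    }

    // Worker at T1: return the snapshot ids that can still be resolved.
    List<String> resolve(List<String> snapshotIds, boolean includeUnused) {
        List<String> resolved = new ArrayList<>();
        for (String id : snapshotIds) {
            Boolean used = usedById.get(id);
            if (used != null && (used || includeUnused)) {
                resolved.add(id);
            }
        }
        return resolved;
    }
}
```

If the controller snapshots two v1 segments and compaction then marks them unused and publishes a v2 segment, `resolve(snapshot, false)` comes back empty while `resolve(snapshot, true)` still returns the full snapshot, which is the behaviour the job needs in order to keep running on the T0 view.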

@kfaraz
Contributor

kfaraz commented Aug 17, 2023

Thanks for the clarification, @abhishekagarwal87 !

Instead of taking that route, we are going to let the job run on the previous snapshot of the system.

If the segments of the previous snapshot (v1) have already been marked unused, that implies that a new version (v2) has completely overshadowed v1. At this point, if we continue with v1 in an MSQ replace job, wouldn't that mean that we would overwrite v2 and potentially miss out on some data that was present in v2 but not in v1?

Also, to avoid this, would a better alternative be to fetch the list of used and non-overshadowed segments in the controller task? IIUC, the case where new segments of v2 are created (resulting in overshadowing of v1) after the controller task has already started should cause the MSQ workers to fail anyway.

cc: @cryptoe

@adarshsanjeev
Contributor Author

@kfaraz

Wouldn't that mean that we would overwrite v2 and potentially miss out on some data that was present in v2 but not in v1? Would a better alternative be to fetch the list of used and non-overshadowed segments in the controller task?

Yes, it would. Only the controller decides which version to use, which it does by fetching the list of used and non-overshadowed segments. This change is only to the getSegmentById API, which the worker calls with an ID from the list of segments that the controller has already fetched. This is the case today as well; however, the worker currently cannot access any segment that is marked as unused after the controller fetches it. This PR changes this behaviour.

the case where new segments of v2 are created (that result in overshadowing of v1) after the controller task has already started should cause the MSQ workers to fail anyway.

That is currently the case; however, it causes an issue where tasks like compaction/reindex can stop any MSQ task from running. Is there a reason to prefer failing the task over providing results from the snapshot at which the MSQ task was started? I think the snapshot approach makes sense.

@cryptoe
Contributor

cryptoe commented Aug 18, 2023

If the segments of the previous snapshot (v1) have already been marked unused, that implies that a new version (v2) has completely overshadowed v1. At this point, if we continue with v1 in an MSQ replace job, wouldn't that mean that we would overwrite v2 and potentially miss out on some data that was present in v2 but not in v1?

Adding on to what @adarshsanjeev said, there are 3 distinct use cases:

  1. Select: select * from foo. This is okay to run on the snapshot version from the coordinator, since this is also the case for the current query engine.
  2. Insert into a new DS: insert into bar select * from foo. This is the same as the select in point 1.
  3. Insert/Replace into the same DS: insert/replace into foo select * from foo. The controller takes a lock on the target datasource, i.e. foo, so we cannot miss out on data in the same DS: any compaction/reindex that is triggered will run into lock contention.
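
The three cases reduce to one question: does the query write to the datasource it reads from? A minimal sketch of that classification follows. `SnapshotCaseSketch`, `Guard`, and `guardFor` are hypothetical names and do not correspond to anything in the Druid codebase.

```java
// Hypothetical classification of the three cases; names are illustrative only.
class SnapshotCaseSketch {
    enum Guard {
        SNAPSHOT_READ, // cases 1 and 2: reading the controller's snapshot is acceptable
        TARGET_LOCK    // case 3: the lock on the target datasource prevents overwrites
    }

    // target is null for a plain SELECT, which writes to no datasource.
    static Guard guardFor(String source, String target) {
        if (target != null && target.equals(source)) {
            return Guard.TARGET_LOCK;
        }
        return Guard.SNAPSHOT_READ;
    }
}
```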

@kfaraz
Contributor

kfaraz commented Aug 18, 2023

Thanks for the clarification, @adarshsanjeev , @cryptoe .
I was thinking about the 3rd case and it makes sense for it to have a lock, thus preventing any overwrites while this task is already in progress.

So I assume this PR is to ensure that cases 1 and 2 work correctly?

@cryptoe
Contributor

cryptoe commented Aug 18, 2023

So I assume this PR is to ensure that cases 1 and 2 work correctly?

Exactly 🚀

Contributor

@cryptoe cryptoe left a comment

Left minor comments.
Changes LGTM!!

{
final String path = StringUtils.format(
- "/druid/coordinator/v1/metadata/datasources/%s/segments/%s",
+ "/druid/coordinator/v1/metadata/datasources/%s/segments/%s%s",
Contributor

This looks better to read. We can adjust %s to true/false based on the flag passed. Wdyt?

Suggested change
- "/druid/coordinator/v1/metadata/datasources/%s/segments/%s%s",
+ "/druid/coordinator/v1/metadata/datasources/%s/segments/%s?includeUnused=%s",

Contributor Author

I added it this way for consistency with the other APIs. The current pattern is to add a flag without value and check if it has been passed or not. ("includeOvershadowedStatus", "full", etc). Do we want to start moving away from this?

Contributor

Something like

"/druid/coordinator/v1/datasources/%s/handoffComplete?interval=%s&partitionNumber=%d&version=%s",
is more readable.

Contributor Author

I agree with that. Changing to this pattern.
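
For reference, a rough sketch of building the path with an explicit flag value, using the format string from the suggested change. `SegmentPathSketch` and `segmentPath` are hypothetical names, plain `String.format` stands in for Druid's `StringUtils.format`, and the sketch assumes the dataSource and segmentId are already URL-safe.

```java
// Hypothetical helper; assumes dataSource and segmentId are already URL-safe.
class SegmentPathSketch {
    static String segmentPath(String dataSource, String segmentId, boolean includeUnused) {
        // The boolean is rendered as "true" or "false" by %s.
        return String.format(
            "/druid/coordinator/v1/metadata/datasources/%s/segments/%s?includeUnused=%s",
            dataSource,
            segmentId,
            includeUnused
        );
    }
}
```

Spelling out the value makes the request self-describing in logs, at the cost of diverging from the older value-less flags such as "full".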


taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_IS_REINDEX,
MSQControllerTask.isReindexTask(task)
Contributor

Let's rename this to isReplaceTask?

Contributor Author

It's only for a replace task which reads from the datasource it is replacing, so wouldn't reindex be better?

Contributor

replaceInputDataSourceTask? How does that sound?

@abhishekagarwal87 abhishekagarwal87 merged commit dfb5a98 into apache:master Aug 23, 2023
@LakshSingla LakshSingla added this to the 28.0 milestone Oct 12, 2023