-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MSQ profile for Brokers and Historicals. #17140
Conversation
This patch adds a profile of MSQ named "Dart" that runs on Brokers and Historicals, and which is compatible with the standard SQL query API. For more high-level description, and notes on future work, refer to apache#17139. This patch contains the following changes, grouped into packages. Controller (org.apache.druid.msq.dart.controller): The controller runs on Brokers. Main classes are, - DartSqlResource, which serves /druid/v2/sql/dart/. - DartSqlEngine and DartQueryMaker, the entry points from SQL that actually run the MSQ controller code. - DartControllerContext, which configures the MSQ controller. - DartMessageRelays, which sets up relays (see "message relays" below) to read messages from workers' DartControllerClients. - DartTableInputSpecSlicer, which assigns work based on a TimelineServerView. Worker (org.apache.druid.msq.dart.worker) The worker runs on Historicals. Main classes are, - DartWorkerResource, which supplies the regular MSQ WorkerResource, plus Dart-specific APIs. - DartWorkerRunner, which runs MSQ worker code. - DartWorkerContext, which configures the MSQ worker. - DartProcessingBuffersProvider, which provides processing buffers from sliced-up merge buffers. - DartDataSegmentProvider, which provides segments from the Historical's local cache. Message relays (org.apache.druid.messages): To avoid the need for Historicals to contact Brokers during a query, which would create opportunities for queries to get stuck, all connections are opened from Broker to Historical. This is made possible by a message relay system, where the relay server (worker) has an outbox of messages. The relay client (controller) connects to the outbox and retrieves messages. Code for this system lives in the "server" package to keep it separate from the MSQ extension and make it easier to maintain. The worker-to-controller ControllerClient is implemented using message relays. Other changes: - Controller: Added the method "hasWorker". Used by the ControllerMessageListener to notify the appropriate controllers when a worker fails. - WorkerResource: No longer tries to respond more than once in the "httpGetChannelData" API. This comes up when a response due to resolved future is ready at about the same time as a timeout occurs. - MSQTaskQueryMaker: Refactor to separate out some useful functions for reuse in DartQueryMaker. - SqlEngine: Add "queryContext" to "resultTypeForSelect" and "resultTypeForInsert". This allows the DartSqlEngine to modify result format based on whether a "fullReport" context parameter is set. - LimitedOutputStream: New utility class. Used when in "fullReport" mode. - TimelineServerView: Add getDruidServerMetadata as a performance optimization. - CliHistorical: Add SegmentWrangler, so it can query inline data, lookups, etc. - ServiceLocation: Add "fromUri" method, relocating some code from ServiceClientImpl. - FixedServiceLocator: New locator for a fixed set of service locations. Useful for URI locations.
Marked draft since more test coverage is needed, although the main code is reviewable. Please refer to #17139 for a high-level description of the proposed change. |
...multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerServerId.java
Fixed
Show fixed
Hide fixed
...ulti-stage-query/src/main/java/org/apache/druid/msq/dart/worker/http/DartWorkerResource.java
Fixed
Show fixed
Hide fixed
server/src/main/java/org/apache/druid/messages/client/MessageRelays.java
Fixed
Show fixed
Hide fixed
RelDataType resultTypeForInsert(RelDataTypeFactory typeFactory, RelDataType validatedRowType); | ||
RelDataType resultTypeForInsert( | ||
RelDataTypeFactory typeFactory, | ||
RelDataType validatedRowType, |
Check notice
Code scanning / CodeQL
Useless parameter Note
RelDataType resultTypeForInsert( | ||
RelDataTypeFactory typeFactory, | ||
RelDataType validatedRowType, | ||
Map<String, Object> queryContext |
Check notice
Code scanning / CodeQL
Useless parameter Note
@GET | ||
@Produces(MediaType.APPLICATION_JSON) | ||
public GetQueriesResponse doGetRunningQueries( | ||
@QueryParam("selfOnly") final String selfOnly, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: boolean might be more appropriate perhaps?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with String
since that way the API can be /druid/v2/sql/dart/?selfOnly
(in this case a string would be set to ""
, so we can check for non-null string). There might be a cleaner way to do this, I am not sure, but this approach is used elsewhere in the codebase. If you know a better way please LMK.
public LimitedOutputStream(OutputStream out, long limit, Function<Long, String> exceptionMessageFn) | ||
{ | ||
this.out = out; | ||
this.limit = limit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can add a precondition for limit being >= 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
public String getDartQueryId() | ||
{ | ||
return dartQueryId; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it is better to re-use the QueryResourceId
that was added as a part of the merge-buffer allocation code instead of creating a dart query id. Both look semantically equivalent to me - a unique id that indicates the resources that it takes. Or is it better to have them separately, just in case we need them to diverge later?
In any case, should we have a separate class that explains the usages of the dart query id, why it differs and who populates it in a single place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, you're right that it's kind of the same idea. The downside of using QueryResourceId
is that it has a javadoc explaining in detail how it's set for native queries, and adding Dart stuff to that existing javadoc might make it confusing.
The dartQueryId
context parameter is simpler than the QueryResourceId
as well, it's only set in one place (DartSqlResource
) and only read in one place (DartQueryMaker
) rather than potentially being set and read in multiple places. It might make sense to have a class to wrap this, similar to QueryResourceId
, although I am not sure if it's worth it given the simpler nature.
For now I added this javadoc to DartSqlEngine#CTX_DART_QUERY_ID
:
/**
* Dart queryId must be globally unique, so we cannot use the user-provided {@link QueryContexts#CTX_SQL_QUERY_ID}
* or {@link BaseQuery#QUERY_ID}. Instead we generate a UUID in {@link DartSqlResource#doPost}, overriding whatever
* the user may have provided. This becomes the {@link Controller#queryId()}.
*
* The user-provided {@link QueryContexts#CTX_SQL_QUERY_ID} is still registered with the {@link SqlLifecycleManager}
* for purposes of query cancellation.
*
* The user-provided {@link BaseQuery#QUERY_ID} is ignored.
*/
public static final String CTX_DART_QUERY_ID = "dartQueryId";
final DartWorkerClient workerClient | ||
) | ||
{ | ||
this.workerIds = workerIds; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instances of this class seem to be accessed by multiple threads. Given that the worker ids is passed to it through the caller, we should probably defensively wrap them in a synchronized collection to prevent any accidental mutations from the callers of getWorkerIds
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's ok, because the workerIds
list is not modified throughout the lifecycle of the query.
* Add a controller. Throws {@link DruidException} if a controller with the same {@link Controller#queryId()} is | ||
* already registered. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the dartQueryId in place, should this be indexed based on that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is indexed by that currently, since the queryId()
on the controller is set to the dartQueryId
. I'll add a comment to the javadoc for Controller#queryId()
clarifying that.
/** | ||
* Remove a controller from the registry. | ||
*/ | ||
void remove(ControllerHolder holder); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it sufficient to send the query id argument instead of the holder, as done with get(queryId)
?
void remove(ControllerHolder holder); | |
void remove(String id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's like this mostly because it's simpler for the calling code to have remove
accept the entire holder. The calling code is doing something like:
registry.register(holder);
// ... do stuff ...
registry.remove(holder);
So there is symmetry between register
and remove
.
|
||
@Provides | ||
@LazySingleton | ||
public ProcessingBuffersProvider createProcessingBuffersProvider( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be bound to @Dart
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It isn't necessary right now, but might be useful later so I did add the annotation.
processing/src/main/java/org/apache/druid/io/LimitedOutputStream.java
Fixed
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments here and there.
@Override | ||
public List<ResourceAction> getQueryPermissions(String queryId) | ||
{ | ||
return getAdminPermissions(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so only an admin user can run dart queries right now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just for the "internal" APIs like the controller->worker APIs. End users would go through the DartSqlResource
, which doesn't use this class. I will add comments to clarify.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to be very clear, the permission model for Dart queries is the same as regular SQL queries. If Dart is enabled then any regular user can issue queries against the tables that they have permissions for.
workerIds.add(WorkerId.fromDruidServerMetadata(server, queryId).toString()); | ||
} | ||
|
||
// Shuffle workerIds, so we don't bias a single server to always be worker 0 (which tends to do more work). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is worker 0 doing more work? because its the non-leaf?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be the leaf as well as non leaf. Like you would always have atleast one worker in each stage and that would be worker 0 so shuffling serverId's helps us not load one server I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Expanded the comment to say why:
Shuffle workerIds, so we don't bias towards specific servers when running multiple queries concurrently. For any given query, lower-numbered workers tend to do more work, because the controller prefers using lower-numbered workers when maxWorkerCount for a stage is less than the total number of workers.
/** | ||
* Registry for actively-running {@link Controller}. | ||
*/ | ||
public interface DartControllerRegistry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a need for this interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I guess not. I split it because I thought it was going to be useful in testing, but it didn't turn out to be. I'll collapse them.
|
||
// Block until messages are acknowledged. | ||
try { | ||
FutureUtils.getUnchecked(Futures.successfulAsList(futures), false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO we should have some defensive timeout here to avoid blocking forever
if (dartQueryId instanceof String) { | ||
final ControllerHolder holder = controllerRegistry.get((String) dartQueryId); | ||
if (holder != null) { | ||
found = true; | ||
holder.cancel(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we log when the dartQueryId is not of a string type or is null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice for this logic to sit inside the cancellable itself but that would require implementing a DartStatement I suppose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we log when the dartQueryId is not of a string type or is null
Yes, added.
It would be nice for this logic to sit inside the cancellable itself but that would require implementing a DartStatement I suppose.
Yeah it would be a new type of statement. But the way the statement stuff is designed, it isn't easily extensible, so it would be a bigger project to add new statement types. Even the SqlStatementResource
(MSQ async query resource) uses HttpStatement
.
reportFuture = controllerExecutor.submit(() -> { | ||
try { | ||
final ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
final TaskReportQueryListener queryListener = new TaskReportQueryListener( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should these classes (TaskReportQueryListener) be renamed since now they are used outside tasks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. It's still writing a thing called a MSQTaskReport
that extends TaskReport
. So I think the name makes sense. However, it might make sense to define some common object that is used by the MSQTaskReport
container and the Dart API. If it's ok I'd like to leave this thought for the future.
@Override | ||
public void validateContext(Map<String, Object> queryContext) | ||
{ | ||
SqlEngines.validateNoSpecialContextKeys(queryContext, MSQTaskSqlEngine.SYSTEM_CONTEXT_PARAMETERS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO this class should have its own copy of MSQTaskSqlEngine.SYSTEM_CONTEXT_PARAMETERS
or maybe add a comment in the MSQTaskSqlEngine that these context parameters are disallowed for dart engine too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's good to have a shared static, so when keys are added we don't forget to add them in both places. So, I added a javadoc comment to MSQTaskSqlEngine.SYSTEM_CONTEXT_PARAMETERS
mentioning it's used in Dart also.
final List<ProcessingBuffers> pool = new ArrayList<>(poolSize); | ||
|
||
for (int i = 0; i < poolSize; i++) { | ||
final int sliceSize = buffer.capacity() / poolSize / processingThreads; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be moved out of this loop
return getClient(workerId).asyncRequest( | ||
new RequestBuilder(HttpMethod.POST, "/stop"), | ||
IgnoreHttpResponseHandler.INSTANCE | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we remove the worker from the map?
/** | ||
* Relays run on clients, and receive messages from a server. | ||
* Uses {@link MessageRelayClient} to communicate with the {@link MessageRelayResource} on a server. | ||
* that flows upstream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it incomplete?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, it's just erroneous text. I deleted it.
In future, we should look into merging message relays with HTTP sync framework that is used by HTTP based segment and task management. |
Could you elaborate? Do you mean use the same base client object like |
{ | ||
// Fail workers when they're added, because when they're added, they shouldn't be running anything. If they are | ||
// running something, cancel it. | ||
workerFailed(node); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't seem to generate the right error message if the server is added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I just removed this, I think it isn't needed anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be helpful to have an "uber-doc" on how the message relays can be utilized and a high-level flow.
In future, can stuff like HttpServerInventoryView
be refactored to utilize these methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 even I found it confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a package-level javadoc about that to server/src/main/java/org/apache/druid/messages/package-info.java
. Yeah, I think it could be used in the future by the inventory view. Here's the comment:
/**
* Message relays provide a mechanism to send messages from server to client using long polling. The messages are
* sent in order, with acknowledgements from client to server when a message has been successfully delivered.
*
* This is useful when there is some need for some "downstream" servers to send low-latency messages to some
* "upstream" server, but where establishing connections from downstream servers to upstream servers would not be
* desirable. This is typically done when upstream servers want to keep state in-memory that is updated incrementally
* by downstream servers, and where there may be lots of instances of downstream servers.
*
* This structure has two main benefits. First, it prevents upstream servers from being overwhelmed by connections
* from downstream servers. Second, it allows upstream servers to drive the updates of their own state, and better
* handle events like restarts and leader changes.
*
* On the downstream (server) side, messages are placed into an {@link org.apache.druid.messages.server.Outbox}
* and served by a {@link org.apache.druid.messages.server.MessageRelayResource}.
*
* On the upstream (client) side, messages are retrieved by {@link org.apache.druid.messages.client.MessageRelays}
* using {@link org.apache.druid.messages.client.MessageRelayClient}.
*
* This is currently used by Dart (multi-stage-query engine running on Brokers and Historicals) to implement
* worker-to-controller messages. In the future it may also be used to implement
* {@link org.apache.druid.server.coordination.ChangeRequestHttpSyncer}.
*/
import org.apache.druid.messages.client.MessageRelay; | ||
|
||
/** | ||
* An outbox for messages sent from servers to clients. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Is outbox a batching mechanism, or also a way to order through the incoming messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does do batching but it's also order-preserving. I added a comment about that.
|
||
public OutboxQueue() | ||
{ | ||
this.epoch = ThreadLocalRandom.current().nextLong() & Long.MAX_VALUE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if its simpler to have an auto-incrementing counter here instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This also needs to be different from a previous version from the same host after a JVM reboot, so it can't simply start at zero. I figured a random 63 bit int would be sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly minor comments. LGTM
🚀
/** | ||
* Query has been canceled. | ||
*/ | ||
CANCELED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be a logic state called finished ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Controllers are removed as soon as they finish, so a finished state isn't necessary.
|
||
private void workerFailed(final DruidNode node) | ||
{ | ||
for (final ControllerHolder holder : controllerRegistry.getAllHolders()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be api's on the worker, to cancel all work. Lets say a broker switch happens and we donot have the controller for what ever reason, In that case, all dart work on the historical should stop no ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That happens in the DartWorkerRunner#BrokerListener
, which is code on the worker side. It cancels all work associated with a given Broker when that Broker goes offline.
workerIds.add(WorkerId.fromDruidServerMetadata(server, queryId).toString()); | ||
} | ||
|
||
// Shuffle workerIds, so we don't bias a single server to always be worker 0 (which tends to do more work). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be the leaf as well as non leaf. Like you would always have atleast one worker in each stage and that would be worker 0 so shuffling serverId's helps us not load one server I think.
/** | ||
* Production implementation of {@link DartControllerRegistry}. | ||
*/ | ||
public class DartControllerRegistryImpl implements DartControllerRegistry |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would this work in multiple brokers?
What would happen if the broker gets nuked.
Shoudn't it be backed by persistent store like
- MYSQL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO its ok for the query to fail if the broker restarts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes my thinking was the Dart queries are more ephemeral in nature.
/** | ||
* Returns whether this controllerId replaces another one, i.e., if the host is the same and epoch is greater. | ||
*/ | ||
public boolean replaces(final ControllerServerId otherId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could not find references of this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe newer/latest is a better method name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, it turns out this was left over from a previous iteration. I've removed the entire class. Things that used it just use the controller host by itself now, which is a String
.
int nextRoundRobinWorker = 0; | ||
for (final QueryableDataSegment segment : prunedSegments) { | ||
final int worker; | ||
if (segment.getWorkerNumber() == UNKNOWN) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we prune out such segments ?, until we hook up downloading segments on demand on historicals ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that at this point, it's better to fail the query if there is some race (or even bug?) that leads to an UNKNOWN
worker. If we pruned the segment out then we might get partial results without realizing it.
import java.util.stream.Collectors; | ||
|
||
@Path(DartSqlResource.PATH + '/') | ||
public class DartSqlResource extends SqlResource |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shoudn't SQLStatmentResource
be a better endpoint for this ?
The execEngine can be : "dart" ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted Dart queries to be API-compatible with the regular SQL API at /druid/v2/sql/
. I added a comment about this:
/**
* Resource for Dart queries. API-compatible with {@link SqlResource}, so clients can be pointed from
* {@code /druid/v2/sql/} to {@code /druid/v2/sql/dart/} without code changes.
*/
@Path(DartSqlResource.PATH + '/')
public class DartSqlResource extends SqlResource
I do also think it would make sense to add an engine option to SqlStatementResource
in the future.
* | ||
* @return future that resolves to the next batch of messages | ||
*/ | ||
ListenableFuture<MessageBatch<MessageType>> getMessages(String clientHost, long epoch, long startWatermark); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its unclear here that if the order is maintained.
This looks like something that the inventory view would use in the future no ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Order is maintained. I've added a comment about that. And yeah, it could be used in the future by the inventory view.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 even I found it confusing.
|
||
import java.util.Objects; | ||
|
||
public class QueryableDataSegment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Could we rename this class to something Dart or MSQ specific since worker is a concept limit to those engines ?
We have a lot of similar DataSegment classes which causes confusion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed to DartQueryableSegment
.
No, I was referring to https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java |
- Name controller threads properly. - Close worker clients when workers go offline or when the controller exits. Worker -> controller: - Cancel futures when the controller goes offline. Worker -> worker: - Close workerClient when a worker is canceled.
…while writing them.
server/src/main/java/org/apache/druid/messages/client/MessageListener.java
Dismissed
Show dismissed
Hide dismissed
server/src/main/java/org/apache/druid/messages/server/OutboxImpl.java
Dismissed
Show dismissed
Hide dismissed
Thank you for the reviews @abhishekagarwal87 @cryptoe @LakshSingla. |
This patch adds a profile of MSQ named "Dart" that runs on Brokers and Historicals, and which is compatible with the standard SQL query API. For more high-level description, and notes on future work, refer to apache#17139. This patch contains the following changes, grouped into packages. Controller (org.apache.druid.msq.dart.controller): The controller runs on Brokers. Main classes are, - DartSqlResource, which serves /druid/v2/sql/dart/. - DartSqlEngine and DartQueryMaker, the entry points from SQL that actually run the MSQ controller code. - DartControllerContext, which configures the MSQ controller. - DartMessageRelays, which sets up relays (see "message relays" below) to read messages from workers' DartControllerClients. - DartTableInputSpecSlicer, which assigns work based on a TimelineServerView. Worker (org.apache.druid.msq.dart.worker) The worker runs on Historicals. Main classes are, - DartWorkerResource, which supplies the regular MSQ WorkerResource, plus Dart-specific APIs. - DartWorkerRunner, which runs MSQ worker code. - DartWorkerContext, which configures the MSQ worker. - DartProcessingBuffersProvider, which provides processing buffers from sliced-up merge buffers. - DartDataSegmentProvider, which provides segments from the Historical's local cache. Message relays (org.apache.druid.messages): To avoid the need for Historicals to contact Brokers during a query, which would create opportunities for queries to get stuck, all connections are opened from Broker to Historical. This is made possible by a message relay system, where the relay server (worker) has an outbox of messages. The relay client (controller) connects to the outbox and retrieves messages. Code for this system lives in the "server" package to keep it separate from the MSQ extension and make it easier to maintain. The worker-to-controller ControllerClient is implemented using message relays. Other changes: - Controller: Added the method "hasWorker". Used by the ControllerMessageListener to notify the appropriate controllers when a worker fails. - WorkerResource: No longer tries to respond more than once in the "httpGetChannelData" API. This comes up when a response due to resolved future is ready at about the same time as a timeout occurs. - MSQTaskQueryMaker: Refactor to separate out some useful functions for reuse in DartQueryMaker. - SqlEngine: Add "queryContext" to "resultTypeForSelect" and "resultTypeForInsert". This allows the DartSqlEngine to modify result format based on whether a "fullReport" context parameter is set. - LimitedOutputStream: New utility class. Used when in "fullReport" mode. - TimelineServerView: Add getDruidServerMetadata as a performance optimization. - CliHistorical: Add SegmentWrangler, so it can query inline data, lookups, etc. - ServiceLocation: Add "fromUri" method, relocating some code from ServiceClientImpl. - FixedServiceLocator: New locator for a fixed set of service locations. Useful for URI locations.
The timeout handler should fire if the response has not been handled yet (i.e. if responseResolved was previously false). However, it erroneously fires only if the response *was* handled. This causes HTTP 500 errors if the timeout actually does fire. The timeout is 30 seconds, which can be hit during pipelined queries, if an earlier stage of the query hasn't produced its first frame within 30 seconds. This fixes a regression introduced in apache#17140.
The timeout handler should fire if the response has not been handled yet (i.e. if responseResolved was previously false). However, it erroneously fires only if the response *was* handled. This causes HTTP 500 errors if the timeout actually does fire. The timeout is 30 seconds, which can be hit during pipelined queries, if an earlier stage of the query hasn't produced its first frame within 30 seconds. This fixes a regression introduced in #17140.
…he#17328) The timeout handler should fire if the response has not been handled yet (i.e. if responseResolved was previously false). However, it erroneously fires only if the response *was* handled. This causes HTTP 500 errors if the timeout actually does fire. The timeout is 30 seconds, which can be hit during pipelined queries, if an earlier stage of the query hasn't produced its first frame within 30 seconds. This fixes a regression introduced in apache#17140.
…) (#17330) The timeout handler should fire if the response has not been handled yet (i.e. if responseResolved was previously false). However, it erroneously fires only if the response *was* handled. This causes HTTP 500 errors if the timeout actually does fire. The timeout is 30 seconds, which can be hit during pipelined queries, if an earlier stage of the query hasn't produced its first frame within 30 seconds. This fixes a regression introduced in #17140. Co-authored-by: Gian Merlino <[email protected]>
This patch adds a profile of MSQ named "Dart" that runs on Brokers and Historicals, and which is compatible with the standard SQL query API. For more high-level description, and notes on future work, refer to #17139.
This patch contains the following changes, grouped into packages.
Controller (org.apache.druid.msq.dart.controller):
The controller runs on Brokers. Main classes are,
Worker (org.apache.druid.msq.dart.worker)
The worker runs on Historicals. Main classes are,
Message relays (org.apache.druid.messages):
To avoid the need for Historicals to contact Brokers during a query, which would create opportunities for queries to get stuck, all connections are opened from Broker to Historical. This is made possible by a message relay system, where the relay server (worker) has an outbox of messages.
The relay client (controller) connects to the outbox and retrieves messages. Code for this system lives in the "server" package to keep it separate from the MSQ extension and make it easier to maintain. The worker-to-controller ControllerClient is implemented using message relays.
Other changes: