Skip to content

Commit

Permalink
Session properties not needed for conversion, removing from request
Browse files Browse the repository at this point in the history
  • Loading branch information
BryanCutler committed Sep 12, 2024
1 parent bf44304 commit b5468e9
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ public SpiCheckerAdapter(com.facebook.presto.spi.plan.PlanChecker checker)
@Override
public void validate(PlanNode planNode, Session session, Metadata metadata, WarningCollector warningCollector)
{
checker.validate(planNode, session.toConnectorSession(), warningCollector);
checker.validate(planNode, warningCollector);
}

@Override
public void validateFragment(PlanFragment planFragment, Session session, Metadata metadata, WarningCollector warningCollector)
{
checker.validateFragment(toSimplePlanFragment(planFragment), session.toConnectorSession(), warningCollector);
checker.validateFragment(toSimplePlanFragment(planFragment), warningCollector);
}

private SimplePlanFragment toSimplePlanFragment(PlanFragment planFragment)
Expand Down
24 changes: 5 additions & 19 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1425,37 +1425,23 @@ void PrestoServer::convertToVeloxPlan(
std::string planFragmentJson = oss.str();
protocol::PlanFragment planFragment = json::parse(planFragmentJson);

// Populate Session values from headers
protocol::SessionRepresentation session = protocol::SessionRepresentation();
session.queryId = headers.getSingleOrEmpty(protocol::PRESTO_QUERY_ID_HEADER);
if (headers.exists(protocol::PRESTO_TIME_ZONE_KEY_HEADER)) {
session.timeZoneKey = strtol(
headers.getSingleOrEmpty(protocol::PRESTO_TIME_ZONE_KEY_HEADER).c_str(),
nullptr,
10);
VELOX_CHECK(errno != ERANGE, "TimeZoneKey is out of range");
}

// Create a task id for a query context needed for plan conversion
protocol::TaskId taskId = session.queryId + ".0.0.0";
std::shared_ptr<velox::core::QueryCtx> queryCtx =
taskManager_->getQueryContextManager()->findOrCreateQueryCtx(
taskId, session);
auto queryCtx = core::QueryCtx::create();
VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool_.get());

// Empty TableWriteInfo needed for plan conversion
// Create static taskId and empty TableWriteInfo needed for plan conversion
protocol::TaskId taskId = "velox-plan-conversion.0.0.0";
auto tableWriteInfo = std::make_shared<protocol::TableWriteInfo>();

// Convert to Velox plan to validate
// Attempt to convert the plan fragment to a Velox plan
converter.toVeloxQueryPlan(planFragment, tableWriteInfo, taskId);

// TODO: send converted plan back in response
} catch (VeloxException& e) {
error = e.message();
} catch (std::exception& e) {
error = e.what();
}

// Return ok status if conversion succeeded or error if failed
if (error.empty()) {
http::sendOkResponse(downstream, json(R"({ "status": "ok" })"));
} else {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
Expand Down Expand Up @@ -85,20 +84,20 @@ private URL getPlanValidateUrl()
}

@Override
public void validate(PlanNode planNode, ConnectorSession session, WarningCollector warningCollector)
public void validate(PlanNode planNode, WarningCollector warningCollector)
{
// NO-OP, only validating fragments
}

@Override
public void validateFragment(SimplePlanFragment planFragment, ConnectorSession connectorSession, WarningCollector warningCollector)
public void validateFragment(SimplePlanFragment planFragment, WarningCollector warningCollector)
{
try {
if (!planFragment.getPartitioning().isCoordinatorOnly() && !isInternalSystemConnector(planFragment.getRoot())) {
runValidation(planFragment, connectorSession);
runValidation(planFragment);
}
else if (LOG.isDebugEnabled()) {
LOG.debug("Skipping Native Plan Validation for query: %s, plan: %s", connectorSession.getQueryId(), planFragment.getId());
LOG.debug("Skipping Native Plan Validation for plan fragment id: %s", planFragment.getId());
}
}
catch (final PrestoException e) {
Expand All @@ -125,41 +124,36 @@ private boolean isInternalSystemConnector(PlanNode planNode)
}
}

private void runValidation(SimplePlanFragment planFragment, ConnectorSession connectorSession)
private void runValidation(SimplePlanFragment planFragment)
throws IOException
{
LOG.debug("Starting native plan validation, query: %s, plan: %s", connectorSession.getQueryId(), planFragment.getId());
LOG.debug("Starting native plan validation for plan fragment id: %s", planFragment.getId());

String requestBodyJson = planFragmentJsonCodec.toJson(planFragment);
LOG.debug("Native validation for plan fragment: %s", requestBodyJson);

Request request = buildRequest(connectorSession, requestBodyJson);
Request request = buildRequest(requestBodyJson);

try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
String responseBody = response.body() != null ? response.body().string() : "{}";
LOG.error("Query failed by native plan checker with code: %d, response: %s", response.code(), responseBody);
LOG.error("Native plan checker failed with code: %d, response: %s", response.code(), responseBody);
if (config.isQueryFailOnError()) {
throw new PrestoException(QUERY_REJECTED, "Query failed by native plan checker with code: " + response.code() + ", message: " + responseBody);
throw new PrestoException(QUERY_REJECTED, "Query failed by native plan checker with code: " + response.code() + ", response: " + responseBody);
}
}
}

LOG.debug("Native plan validation successful, query: %s, plan: %s", connectorSession.getQueryId(), planFragment.getId());
LOG.debug("Native plan validation complete for plan fragment id: %s, successful=%s failOnError=%s", planFragment.getId(), response.isSuccessful(), config.isQueryFailOnError());
}
}

private Request buildRequest(ConnectorSession connectorSession, String requestBodyJson)
private Request buildRequest(String requestBodyJson)
{
Request.Builder builder = new Request.Builder()
.url(getPlanValidateUrl())
.addHeader("CONTENT_TYPE", "APPLICATION_JSON")
.post(RequestBody.create(JSON_CONTENT_TYPE, requestBodyJson));

// Add Session values to header
builder.addHeader(PRESTO_QUERY_ID, connectorSession.getQueryId());
builder.addHeader(PRESTO_TIME_ZONE, String.valueOf(connectorSession.getTimeZoneKey().getKey()));
// TODO: systemProperties, catalogProperties not in ConnectorSession

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
package com.facebook.presto.plancheckerproviders;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.common.type.TimeZoneKey;
import com.facebook.presto.plancheckerproviders.nativechecker.NativePlanChecker;
import com.facebook.presto.plancheckerproviders.nativechecker.NativePlanCheckerConfig;
import com.facebook.presto.plancheckerproviders.nativechecker.NativePlanCheckerProvider;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.NodePoolType;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.plan.PartitioningHandle;
import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.plan.SimplePlanFragment;
Expand Down Expand Up @@ -73,9 +72,6 @@ public void testNativePlanMockValidate()
SimplePlanFragment fragmentMock = mock(SimplePlanFragment.class);
when(fragmentMock.getRoot()).thenReturn(nodeMock);
when(fragmentMock.getPartitioning()).thenReturn(handleMock);
ConnectorSession sessionMock = mock(ConnectorSession.class);
when(sessionMock.getQueryId()).thenReturn("nativePlanValidate-test-queryId-0001");
when(sessionMock.getTimeZoneKey()).thenReturn(TimeZoneKey.UTC_KEY);

JsonCodec<SimplePlanFragment> jsonCodecMock = spy(PLAN_FRAGMENT_JSON_CODEC);
when(jsonCodecMock.toJson(any(SimplePlanFragment.class))).thenReturn("{}");
Expand All @@ -87,12 +83,12 @@ public void testNativePlanMockValidate()
NativePlanChecker checker = new NativePlanChecker(nodeManager, jsonCodecMock, config);

server.enqueue(new MockResponse().setBody("{ \"status\": \"ok\" }"));
checker.validateFragment(fragmentMock, sessionMock, null);
checker.validateFragment(fragmentMock, null);

config.setQueryFailOnError(true);
server.enqueue(new MockResponse().setResponseCode(500).setBody("{ \"error\": \"fubar\" }"));
assertThrows(IllegalStateException.class,
() -> checker.validateFragment(fragmentMock, sessionMock, null));
assertThrows(PrestoException.class,
() -> checker.validateFragment(fragmentMock, null));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@

package com.facebook.presto.spi.plan;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.WarningCollector;

public interface PlanChecker
{
void validate(PlanNode planNode, ConnectorSession session, WarningCollector warningCollector);
void validate(PlanNode planNode, WarningCollector warningCollector);

default void validateFragment(SimplePlanFragment planFragment, ConnectorSession session, WarningCollector warningCollector)
default void validateFragment(SimplePlanFragment planFragment, WarningCollector warningCollector)
{
validate(planFragment.getRoot(), session, warningCollector);
validate(planFragment.getRoot(), warningCollector);
}
}

0 comments on commit b5468e9

Please sign in to comment.