Skip to content

Commit

Permalink
Allow storing query results in a file in Presto on Spark
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Aug 25, 2020
1 parent 6fa3632 commit 6923691
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ protected void setup(Binder binder)
jsonCodecBinder(binder).bindJsonCodec(OperatorStats.class);
jsonCodecBinder(binder).bindJsonCodec(QueryInfo.class);
jsonCodecBinder(binder).bindJsonCodec(PrestoSparkQueryStatusInfo.class);
jsonCodecBinder(binder).bindJsonCodec(PrestoSparkQueryData.class);

// index manager
binder.bind(IndexManager.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public class PrestoSparkQueryExecutionFactory
private final JsonCodec<TaskInfo> taskInfoJsonCodec;
private final JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec;
private final JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec;
private final JsonCodec<PrestoSparkQueryData> queryDataJsonCodec;
private final TransactionManager transactionManager;
private final AccessControl accessControl;
private final Metadata metadata;
Expand All @@ -201,6 +202,7 @@ public PrestoSparkQueryExecutionFactory(
JsonCodec<TaskInfo> taskInfoJsonCodec,
JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec,
JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec,
JsonCodec<PrestoSparkQueryData> queryDataJsonCodec,
TransactionManager transactionManager,
AccessControl accessControl,
Metadata metadata,
Expand All @@ -221,6 +223,7 @@ public PrestoSparkQueryExecutionFactory(
this.taskInfoJsonCodec = requireNonNull(taskInfoJsonCodec, "taskInfoJsonCodec is null");
this.sparkTaskDescriptorJsonCodec = requireNonNull(sparkTaskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null");
this.queryStatusInfoJsonCodec = requireNonNull(queryStatusInfoJsonCodec, "queryStatusInfoJsonCodec is null");
this.queryDataJsonCodec = requireNonNull(queryDataJsonCodec, "queryDataJsonCodec is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.metadata = requireNonNull(metadata, "metadata is null");
Expand All @@ -239,11 +242,13 @@ public IPrestoSparkQueryExecution create(
String sql,
Optional<String> sparkQueueName,
PrestoSparkTaskExecutorFactoryProvider executorFactoryProvider,
Optional<Path> queryStatusInfoOutputPath)
Optional<Path> queryStatusInfoOutputPath,
Optional<Path> queryDataOutputPath)
{
PrestoSparkConfInitializer.checkInitialized(sparkContext);

queryStatusInfoOutputPath.ifPresent(path -> checkArgument(notExists(path), "File already exist: %s", path));
queryDataOutputPath.ifPresent(path -> checkArgument(notExists(path), "File already exist: %s", path));

QueryStateTimer queryStateTimer = new QueryStateTimer(systemTicker());

Expand Down Expand Up @@ -312,12 +317,14 @@ public IPrestoSparkQueryExecution create(
taskInfoJsonCodec,
sparkTaskDescriptorJsonCodec,
queryStatusInfoJsonCodec,
queryDataJsonCodec,
rddFactory,
tableWriteInfo,
transactionManager,
new PagesSerde(blockEncodingManager, Optional.empty(), Optional.empty(), Optional.empty()),
executionExceptionFactory,
queryStatusInfoOutputPath);
queryStatusInfoOutputPath,
queryDataOutputPath);
}
catch (RuntimeException executionFailure) {
queryStateTimer.beginFinishing();
Expand Down Expand Up @@ -356,7 +363,7 @@ public IPrestoSparkQueryExecution create(
Optional.ofNullable(planAndMore),
warningCollector,
OptionalLong.empty());
writePrestoSparkQueryInfo(
writeJsonFile(
queryStatusInfoOutputPath.get(),
prestoSparkQueryStatusInfo,
queryStatusInfoJsonCodec);
Expand Down Expand Up @@ -619,13 +626,10 @@ private static QueryError toQueryError(ExecutionFailureInfo executionFailureInfo
executionFailureInfo.toFailureInfo());
}

private static void writePrestoSparkQueryInfo(
Path queryStatusInfoOutputPath,
PrestoSparkQueryStatusInfo queryInfo,
JsonCodec<PrestoSparkQueryStatusInfo> queryInfoJsonCodec)
private static <T> void writeJsonFile(Path outputPath, T object, JsonCodec<T> codec)
{
try {
Files.write(queryStatusInfoOutputPath, queryInfoJsonCodec.toJsonBytes(queryInfo));
Files.write(outputPath, codec.toJsonBytes(object));
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down Expand Up @@ -654,12 +658,14 @@ public static class PrestoSparkQueryExecution
private final JsonCodec<TaskInfo> taskInfoJsonCodec;
private final JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec;
private final JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec;
private final JsonCodec<PrestoSparkQueryData> queryDataJsonCodec;
private final PrestoSparkRddFactory rddFactory;
private final TableWriteInfo tableWriteInfo;
private final TransactionManager transactionManager;
private final PagesSerde pagesSerde;
private final PrestoSparkExecutionExceptionFactory executionExceptionFactory;
private final Optional<Path> queryStatusInfoOutputPath;
private final Optional<Path> queryDataOutputPath;

private PrestoSparkQueryExecution(
JavaSparkContext sparkContext,
Expand All @@ -678,12 +684,14 @@ private PrestoSparkQueryExecution(
JsonCodec<TaskInfo> taskInfoJsonCodec,
JsonCodec<PrestoSparkTaskDescriptor> sparkTaskDescriptorJsonCodec,
JsonCodec<PrestoSparkQueryStatusInfo> queryStatusInfoJsonCodec,
JsonCodec<PrestoSparkQueryData> queryDataJsonCodec,
PrestoSparkRddFactory rddFactory,
TableWriteInfo tableWriteInfo,
TransactionManager transactionManager,
PagesSerde pagesSerde,
PrestoSparkExecutionExceptionFactory executionExceptionFactory,
Optional<Path> queryStatusInfoOutputPath)
Optional<Path> queryStatusInfoOutputPath,
Optional<Path> queryDataOutputPath)
{
this.sparkContext = requireNonNull(sparkContext, "sparkContext is null");
this.session = requireNonNull(session, "session is null");
Expand All @@ -702,12 +710,14 @@ private PrestoSparkQueryExecution(
this.taskInfoJsonCodec = requireNonNull(taskInfoJsonCodec, "taskInfoJsonCodec is null");
this.sparkTaskDescriptorJsonCodec = requireNonNull(sparkTaskDescriptorJsonCodec, "sparkTaskDescriptorJsonCodec is null");
this.queryStatusInfoJsonCodec = requireNonNull(queryStatusInfoJsonCodec, "queryStatusInfoJsonCodec is null");
this.queryDataJsonCodec = requireNonNull(queryDataJsonCodec, "queryDataJsonCodec is null");
this.rddFactory = requireNonNull(rddFactory, "rddFactory is null");
this.tableWriteInfo = requireNonNull(tableWriteInfo, "tableWriteInfo is null");
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.pagesSerde = requireNonNull(pagesSerde, "pagesSerde is null");
this.executionExceptionFactory = requireNonNull(executionExceptionFactory, "executionExceptionFactory is null");
this.queryStatusInfoOutputPath = requireNonNull(queryStatusInfoOutputPath, "queryStatusInfoOutputPath is null");
this.queryDataOutputPath = requireNonNull(queryDataOutputPath, "queryDataOutputPath is null");
}

@Override
Expand Down Expand Up @@ -805,6 +815,13 @@ else if (executionException instanceof PrestoSparkExecutionException) {
log.error(eventFailure, "Error publishing query completed event");
}

queryDataOutputPath.ifPresent(path -> writeJsonFile(
path,
new PrestoSparkQueryData(
getOutputColumns(planAndMore),
results),
queryDataJsonCodec));

return results;
}

Expand Down Expand Up @@ -926,7 +943,7 @@ private void queryCompletedEvent(Optional<ExecutionFailureInfo> failureInfo, Opt
Optional.of(planAndMore),
warningCollector,
updateCount);
writePrestoSparkQueryInfo(
writeJsonFile(
queryStatusInfoOutputPath.get(),
prestoSparkQueryStatusInfo,
queryStatusInfoJsonCodec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ public MaterializedResult execute(Session session, String sql)
sql,
Optional.empty(),
new TestingPrestoSparkTaskExecutorFactoryProvider(instanceId),
Optional.empty(),
Optional.empty());
List<List<Object>> results = execution.execute();
List<MaterializedRow> rows = results.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ IPrestoSparkQueryExecution create(
String sql,
Optional<String> sparkQueueName,
PrestoSparkTaskExecutorFactoryProvider executorFactoryProvider,
Optional<Path> queryStatusInfoOutputPath);
Optional<Path> queryStatusInfoOutputPath,
Optional<Path> queryDataOutputPath);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spark;

import com.facebook.presto.client.Column;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
 * JSON-serializable container for the results of a Presto-on-Spark query:
 * the output column metadata together with the result rows. Written to the
 * query-data output file when one is requested.
 *
 * <p>Instances are immutable; both lists are defensively snapshotted.
 */
public class PrestoSparkQueryData
{
    private final List<Column> columns;
    private final List<List<Object>> data;

    /**
     * @param columns output column metadata, in select-list order; must not be null
     * @param data result rows, one inner list of values per row; must not be null
     */
    @JsonCreator
    public PrestoSparkQueryData(
            @JsonProperty("columns") List<Column> columns,
            @JsonProperty("data") List<List<Object>> data)
    {
        requireNonNull(columns, "columns is null");
        requireNonNull(data, "data is null");
        // Snapshot into immutable copies so later mutation of the caller's
        // lists cannot leak into this object.
        this.columns = ImmutableList.copyOf(columns);
        this.data = ImmutableList.copyOf(data);
    }

    /**
     * Returns the output column metadata (immutable).
     */
    @JsonProperty
    public List<Column> getColumns()
    {
        return columns;
    }

    /**
     * Returns the result rows (immutable); each inner list holds one row's values.
     */
    @JsonProperty
    public List<List<Object>> getData()
    {
        return data;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void run()
Optional.empty(),
query,
Optional.empty(),
Optional.empty(),
Optional.empty());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public void run(
Optional<String> traceToken,
String query,
Optional<String> sparkQueueName,
Optional<Path> queryStatusInfoOutputPath)
Optional<Path> queryStatusInfoOutputPath,
Optional<Path> queryDataOutputPath)
{
IPrestoSparkQueryExecutionFactory queryExecutionFactory = driverPrestoSparkService.getQueryExecutionFactory();

Expand All @@ -103,7 +104,8 @@ public void run(
query,
sparkQueueName,
new DistributionBasedPrestoSparkTaskExecutorFactoryProvider(distribution),
queryStatusInfoOutputPath);
queryStatusInfoOutputPath,
queryDataOutputPath);

List<List<Object>> results = queryExecution.execute();

Expand Down

0 comments on commit 6923691

Please sign in to comment.