From d8dc7e3494ac9ca6e50664aac676d73190260493 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 5 Jun 2023 11:55:03 +0530 Subject: [PATCH 1/5] Limit select results in MSQ --- .../apache/druid/msq/exec/ControllerImpl.java | 124 +++++++++--------- .../org/apache/druid/msq/exec/Limits.java | 7 + .../apache/druid/msq/exec/MSQSelectTest.java | 63 +++++++++ .../apache/druid/msq/test/MSQTestBase.java | 2 +- 4 files changed, 134 insertions(+), 62 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index b39249d8111f..70ee715fa998 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1393,67 +1393,69 @@ private Yielder getFinalResultsYielder( return Yielders.each( Sequences.concat( - StreamSupport.stream(queryKernel.getResultPartitionsForStage(finalStageId).spliterator(), false) - .map( - readablePartition -> { - try { - return new FrameChannelSequence( - inputChannels.openChannel( - new StagePartition( - queryKernel.getStageDefinition(finalStageId).getId(), - readablePartition.getPartitionNumber() - ) - ) - ); - } - catch (IOException e) { - throw new RuntimeException(e); - } - } - ).collect(Collectors.toList()) - ).flatMap( - frame -> { - final Cursor cursor = FrameProcessors.makeCursor( - frame, - queryKernel.getStageDefinition(finalStageId).getFrameReader() - ); - - final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); - final ColumnMappings columnMappings = task.getQuerySpec().getColumnMappings(); - @SuppressWarnings("rawtypes") - final List selectors = - columnMappings.getMappings() - .stream() - .map( - mapping -> - columnSelectorFactory.makeColumnValueSelector(mapping.getQueryColumn()) - ).collect(Collectors.toList()); - - final List sqlTypeNames = task.getSqlTypeNames(); - final List retVal = new ArrayList<>(); - while (!cursor.isDone()) { - final Object[] row = new Object[columnMappings.size()]; - for (int i = 0; i < row.length; i++) { - final Object value = selectors.get(i).getObject(); - if (sqlTypeNames == null || task.getSqlResultsContext() == null) { - // SQL type unknown, or no SQL results context: pass-through as is. - row[i] = value; - } else { - row[i] = SqlResults.coerce( - context.jsonMapper(), - task.getSqlResultsContext(), - value, - sqlTypeNames.get(i) - ); - } - } - retVal.add(row); - cursor.advance(); - } - - return Sequences.simple(retVal); - } - ).withBaggage(resultReaderExec::shutdownNow) + StreamSupport.stream(queryKernel.getResultPartitionsForStage(finalStageId).spliterator(), false) + .map( + readablePartition -> { + try { + return new FrameChannelSequence( + inputChannels.openChannel( + new StagePartition( + queryKernel.getStageDefinition(finalStageId).getId(), + readablePartition.getPartitionNumber() + ) + ) + ); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + ).collect(Collectors.toList()) + ).flatMap( + frame -> { + final Cursor cursor = FrameProcessors.makeCursor( + frame, + queryKernel.getStageDefinition(finalStageId).getFrameReader() + ); + + final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); + final ColumnMappings columnMappings = task.getQuerySpec().getColumnMappings(); + @SuppressWarnings("rawtypes") + final List selectors = + columnMappings.getMappings() + .stream() + .map( + mapping -> + columnSelectorFactory.makeColumnValueSelector(mapping.getQueryColumn()) + ).collect(Collectors.toList()); + + final List sqlTypeNames = task.getSqlTypeNames(); + final List retVal = new ArrayList<>(); + while (!cursor.isDone()) { + final Object[] row = new Object[columnMappings.size()]; + for (int i = 0; i < row.length; i++) { + final Object value = selectors.get(i).getObject(); + if (sqlTypeNames == null || task.getSqlResultsContext() == null) { + // SQL type unknown, or no SQL results context: pass-through as is. + row[i] = value; + } else { + row[i] = SqlResults.coerce( + context.jsonMapper(), + task.getSqlResultsContext(), + value, + sqlTypeNames.get(i) + ); + } + } + retVal.add(row); + cursor.advance(); + } + + return Sequences.simple(retVal); + } + ) + .limit(Limits.MAX_SELECT_RESULT_ROWS) + .withBaggage(resultReaderExec::shutdownNow) ); } else { return null; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java index c946fd796c23..18c5024dfbed 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java @@ -90,4 +90,11 @@ public class Limits * {@link ClusterStatisticsMergeMode#SEQUENTIAL} mode is chosen. */ public static final long MAX_WORKERS_FOR_PARALLEL_MERGE = 100; + + /** + * Max number of rows in the query reports of the SELECT queries run by MSQ. This ensures that the reports donot blow + * up for queries operating on larger datasets. The full result of the select query should be available once the + * MSQ is able to run async queries + */ + public static final long MAX_SELECT_RESULT_ROWS = 3_000; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 0ac6550ea7a1..067911a251b4 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -22,13 +22,16 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.frame.util.DurableStorageUtils; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprEval; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.msq.indexing.MSQSpec; import org.apache.druid.msq.indexing.MSQTuningConfig; @@ -1784,6 +1787,66 @@ public void testGroupByOnFooWithDurableStoragePathAssertions() throws IOExceptio } } + @Test + public void testSelectRowsGetTruncatedInReports() throws IOException + { + RowSignature dummyRowSignature = RowSignature.builder().add("timestamp", ColumnType.LONG).build(); + + final int numFiles = 1000; + + final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json"); + final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath()); + + String externalFiles = String.join(", ", Collections.nCopies(numFiles, toReadFileNameAsJson)); + + List result = new ArrayList<>(); + for (int i = 0; i < Limits.MAX_SELECT_RESULT_ROWS; ++i) { + result.add(new Object[]{1}); + } + + testSelectQuery() + .setSql(StringUtils.format( + " SELECT 1 as \"timestamp\"\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [%s],\"type\":\"local\"}',\n" + + " '{\"type\": \"csv\", \"hasHeaderRow\": true}',\n" + + " '[{\"name\": \"timestamp\", \"type\": \"string\"}]'\n" + + " )\n" + + ")", + externalFiles + )) + .setExpectedRowSignature(dummyRowSignature) + .setExpectedMSQSpec( + MSQSpec + .builder() + .query(newScanQueryBuilder() + .dataSource(new ExternalDataSource( + new LocalInputSource(null, null, Collections.nCopies(numFiles, toRead)), + new CsvInputFormat(null, null, null, true, 0), + RowSignature.builder().add("timestamp", ColumnType.STRING).build() + )) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("v0") + .virtualColumns(new ExpressionVirtualColumn("v0", ExprEval.of(1L).toExpr(), ColumnType.LONG)) + .context(defaultScanQueryContext( + context, + RowSignature.builder().add("v0", ColumnType.LONG).build() + )) + .build() + ) + .columnMappings(new ColumnMappings( + ImmutableList.of( + new ColumnMapping("v0", "timestamp") + ) + )) + .tuningConfig(MSQTuningConfig.defaultConfig()) + .build()) + .setQueryContext(context) + .setExpectedResultRows(result) + .verifyResults(); + } + @Test public void testMultiValueStringWithIncorrectType() throws IOException { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index e44d1974b8a3..3104fa8e337b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -1317,7 +1317,7 @@ public void verifyResults() { Preconditions.checkArgument(expectedResultRows != null, "Result rows cannot be null"); Preconditions.checkArgument(expectedRowSignature != null, "Row signature cannot be null"); - Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec not "); + Preconditions.checkArgument(expectedMSQSpec != null, "MultiStageQuery Query spec cannot be null "); Pair, List>> specAndResults = runQueryWithResult(); if (specAndResults == null) { // A fault was expected and the assertion has been done in the runQueryWithResult From 3097e056eced6bb6f037b93c477ea38bcd32b37d Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Tue, 6 Jun 2023 09:24:15 +0530 Subject: [PATCH 2/5] reduce number of files in test --- .../src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 067911a251b4..b09e3aa01f11 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -1792,7 +1792,7 @@ public void testSelectRowsGetTruncatedInReports() throws IOException { RowSignature dummyRowSignature = RowSignature.builder().add("timestamp", ColumnType.LONG).build(); - final int numFiles = 1000; + final int numFiles = 200; final File toRead = MSQTestFileUtils.getResourceAsTemporaryFile(temporaryFolder, this, "/wikipedia-sampled.json"); final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath()); From 0b14367bc3e95e1c9b65abc28a3bc22a16d1e4f4 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Fri, 9 Jun 2023 12:00:15 +0530 Subject: [PATCH 3/5] add truncated flag --- .../apache/druid/msq/exec/ControllerImpl.java | 6 ++-- .../msq/indexing/report/MSQResultsReport.java | 36 +++++++++++++++---- .../indexing/report/MSQTaskReportTest.java | 13 ++----- .../apache/druid/msq/test/MSQTestBase.java | 16 +-------- .../testing/utils/MsqTestQueryHelper.java | 7 ++-- 5 files changed, 39 insertions(+), 39 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 70ee715fa998..845f65b01ad0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1454,7 +1454,9 @@ private Yielder getFinalResultsYielder( return Sequences.simple(retVal); } ) - .limit(Limits.MAX_SELECT_RESULT_ROWS) + // We add one more row than required in the iterator, so that we can determine if the results are + // truncated or not + .limit(Limits.MAX_SELECT_RESULT_ROWS + 1) .withBaggage(resultReaderExec::shutdownNow) ); } else { @@ -2028,7 +2030,7 @@ private static MSQResultsReport makeResultsTaskReport( ); } - return new MSQResultsReport(mappedSignature.build(), sqlTypeNames, resultsYielder); + return new MSQResultsReport(mappedSignature.build(), sqlTypeNames, resultsYielder, null); } private static MSQStatusReport makeStatusReport( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java index c145ae77d57a..56359f8cdf9c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java @@ -27,9 +27,11 @@ import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.segment.column.ColumnType; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -42,17 +44,30 @@ public class MSQResultsReport private final List signature; @Nullable private final List sqlTypeNames; - private final Yielder resultYielder; + private final List results; + private final boolean resultsTruncated; public MSQResultsReport( final List signature, @Nullable final List sqlTypeNames, - final Yielder resultYielder + Yielder resultYielder, + @Nullable Boolean resultsTruncated ) { this.signature = Preconditions.checkNotNull(signature, "signature"); this.sqlTypeNames = sqlTypeNames; - this.resultYielder = Preconditions.checkNotNull(resultYielder, "resultYielder"); + this.results = new ArrayList<>(); + int rowCount = 0; + while (!resultYielder.isDone() && rowCount < Limits.MAX_SELECT_RESULT_ROWS) { + results.add(resultYielder.get()); + resultYielder = resultYielder.next(null); + ++rowCount; + } + if (resultsTruncated != null) { + this.resultsTruncated = !resultYielder.isDone() || resultsTruncated; + } else { + this.resultsTruncated = !resultYielder.isDone(); + } } /** @@ -62,10 +77,11 @@ public MSQResultsReport( static MSQResultsReport fromJson( @JsonProperty("signature") final List signature, @JsonProperty("sqlTypeNames") @Nullable final List sqlTypeNames, - @JsonProperty("results") final List results + @JsonProperty("results") final List results, + @JsonProperty("resultsTruncated") final Boolean resultsTruncated ) { - return new MSQResultsReport(signature, sqlTypeNames, Yielders.each(Sequences.simple(results))); + return new MSQResultsReport(signature, sqlTypeNames, Yielders.each(Sequences.simple(results)), resultsTruncated); } @JsonProperty("signature") @@ -83,9 +99,15 @@ public List getSqlTypeNames() } @JsonProperty("results") - public Yielder getResultYielder() + public List getResults() + { + return results; + } + + @JsonProperty("resultsTruncted") + public boolean isResultsTruncated() { - return resultYielder; + return resultsTruncated; } public static class ColumnAndType diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java index aeca792024a9..6c111c7fd80b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/report/MSQTaskReportTest.java @@ -31,7 +31,6 @@ import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.msq.counters.CounterSnapshotsTree; import org.apache.druid.msq.guice.MSQIndexingModule; @@ -51,7 +50,6 @@ import java.io.File; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -105,7 +103,8 @@ public void testSerdeResultsReport() throws Exception new MSQResultsReport( Collections.singletonList(new MSQResultsReport.ColumnAndType("s", ColumnType.STRING)), ImmutableList.of(SqlTypeName.VARCHAR), - Yielders.each(Sequences.simple(results)) + Yielders.each(Sequences.simple(results)), + null ) ) ); @@ -125,13 +124,7 @@ public void testSerdeResultsReport() throws Exception Assert.assertEquals(report.getPayload().getStatus().getPendingTasks(), report2.getPayload().getStatus().getPendingTasks()); Assert.assertEquals(report.getPayload().getStages(), report2.getPayload().getStages()); - Yielder yielder = report2.getPayload().getResults().getResultYielder(); - final List results2 = new ArrayList<>(); - - while (!yielder.isDone()) { - results2.add(yielder.get()); - yielder = yielder.next(null); - } + final List results2 = report2.getPayload().getResults().getResults(); Assert.assertEquals(results.size(), results2.size()); for (int i = 0; i < results.size(); i++) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 3104fa8e337b..de922451e49e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -69,7 +69,6 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; -import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.metadata.input.InputSourceModule; @@ -769,20 +768,7 @@ public static List getRows(@Nullable MSQResultsReport resultsReport) if (resultsReport == null) { return null; } else { - Yielder yielder = resultsReport.getResultYielder(); - List rows = new ArrayList<>(); - while (!yielder.isDone()) { - rows.add(yielder.get()); - yielder = yielder.next(null); - } - try { - yielder.close(); - } - catch (IOException e) { - throw new ISE("Unable to get results from the report"); - } - - return rows; + return resultsReport.getResults(); } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java index 7525cb9d8749..5e1a40df0737 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java @@ -31,7 +31,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.guava.Yielder; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.msq.indexing.report.MSQResultsReport; import org.apache.druid.msq.indexing.report.MSQTaskReport; @@ -201,17 +200,15 @@ private void compareResults(String taskId, MsqQueryWithResults expectedQueryWith List> actualResults = new ArrayList<>(); - Yielder yielder = resultsReport.getResultYielder(); + List results = resultsReport.getResults(); List rowSignature = resultsReport.getSignature(); - while (!yielder.isDone()) { - Object[] row = yielder.get(); + for (Object[] row : results) { Map rowWithFieldNames = new LinkedHashMap<>(); for (int i = 0; i < row.length; ++i) { rowWithFieldNames.put(rowSignature.get(i).getName(), row[i]); } actualResults.add(rowWithFieldNames); - yielder = yielder.next(null); } QueryResultVerifier.ResultVerificationObject resultsComparison = QueryResultVerifier.compareResults( From acd78d714cc2b4d587b84890534cf0f161113499 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Mon, 12 Jun 2023 10:36:38 +0530 Subject: [PATCH 4/5] avoid materializing select results to list, use iterable instead --- .../apache/druid/msq/exec/ControllerImpl.java | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 845f65b01ad0..24ab74fda666 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -206,6 +206,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -1430,33 +1431,39 @@ private Yielder getFinalResultsYielder( ).collect(Collectors.toList()); final List sqlTypeNames = task.getSqlTypeNames(); - final List retVal = new ArrayList<>(); - while (!cursor.isDone()) { - final Object[] row = new Object[columnMappings.size()]; - for (int i = 0; i < row.length; i++) { - final Object value = selectors.get(i).getObject(); - if (sqlTypeNames == null || task.getSqlResultsContext() == null) { - // SQL type unknown, or no SQL results context: pass-through as is. - row[i] = value; - } else { - row[i] = SqlResults.coerce( - context.jsonMapper(), - task.getSqlResultsContext(), - value, - sqlTypeNames.get(i) - ); - } + Iterable retVal = () -> new Iterator() + { + @Override + public boolean hasNext() + { + return !cursor.isDone(); } - retVal.add(row); - cursor.advance(); - } + @Override + public Object[] next() + { + final Object[] row = new Object[columnMappings.size()]; + for (int i = 0; i < row.length; i++) { + final Object value = selectors.get(i).getObject(); + if (sqlTypeNames == null || task.getSqlResultsContext() == null) { + // SQL type unknown, or no SQL results context: pass-through as is. + row[i] = value; + } else { + row[i] = SqlResults.coerce( + context.jsonMapper(), + task.getSqlResultsContext(), + value, + sqlTypeNames.get(i) + ); + } + } + cursor.advance(); + return row; + } + }; return Sequences.simple(retVal); } ) - // We add one more row than required in the iterator, so that we can determine if the results are - // truncated or not - .limit(Limits.MAX_SELECT_RESULT_ROWS + 1) .withBaggage(resultReaderExec::shutdownNow) ); } else { From f44138f190a9e6c126432acb383102b8664cdd65 Mon Sep 17 00:00:00 2001 From: Laksh Singla Date: Wed, 14 Jun 2023 11:54:15 +0530 Subject: [PATCH 5/5] javadocs --- docs/api-reference/sql-ingestion-api.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/api-reference/sql-ingestion-api.md b/docs/api-reference/sql-ingestion-api.md index a9cceb8d4d96..43acca9b49c8 100644 --- a/docs/api-reference/sql-ingestion-api.md +++ b/docs/api-reference/sql-ingestion-api.md @@ -203,6 +203,8 @@ Keep the following in mind when using the task API to view reports: - As an experimental feature, the MSQ task engine supports running SELECT queries. SELECT query results are written into the `multiStageQuery.payload.results.results` task report key as an array of arrays. The behavior and result format of plain SELECT queries (without INSERT or REPLACE) is subject to change. +- `multiStageQuery.payload.results.resultsTruncated` denote whether the results of the report have been truncated to prevent +the reports from blowing up For an explanation of the fields in a report, see [Report response fields](#report-response-fields).