Skip to content

Commit

Permalink
[#7836] YSQL: Batching IN queries on Hash Keys
Browse files Browse the repository at this point in the history
Summary:
Before this change, IN conditions bound to hash key columns produced one request per possible value of the hash keys. For example, consider a query `SELECT * FROM sample_table WHERE h1 IN (1,4,6,8);` where sample_table has a primary index with `h1` as its full hash component. We send 4 requests, one per IN element, of the form `SELECT * FROM sample_table WHERE h1 = 1;`, `SELECT * FROM sample_table WHERE h1 = 4;` etc. If the IN condition was bound to a range column, we would send the entire filter at once as a singular condition and send just one request per partition of `sample_table_pkey`. The reason we couldn't do this with hash column IN filters was that the `DocRowwiseIterator` could not perform skip scans over hash columns, so it did not have the necessary infrastructure to process IN conditions on hash columns.

This diff fixes the above issue by having IN conditions on hash columns behave similarly to those on range columns. In order to do this, we made the following changes:
  - We adjusted pgsql/qlscanspec and ScanChoices to be able to carry out skip scans on hash column IN conditions.
  - We added infrastructure in pg_doc_op.h to convert IN filters of the form `h1 IN (v1,v2,...,vn)` into a condition expression of the form `(yb_hash_code(h1), h1) IN ((yb_hash_code(v1), v1), (yb_hash_code(v2), v2), (yb_hash_code(v3), v3), ..., (yb_hash_code(vn), vn))`. If we have multiple hash partitions on the table, we form one request per partition, and the RHS of the hash condition on each partition request is ensured to only have values from (v1,v2,...,vn) that are relevant to it. This feature also works similarly for multicolumn hash keys.

This feature is disabled when serializable isolation level is used for now as there isn't infrastructure to lock multiple non-contiguous rows as such filters would require.

This feature's enablement is controlled by the autoflag GUC `yb_enable_hash_batch_in`.
We also added a tserver flag `ysql_hash_batch_permutation_limit` that specifies a limit on the number of hash permutations a query must produce in order to be eligible to use this feature. Without this check, we can materialize an unbounded number of hash permutations in memory and cause an OOM crash.

Test Plan:
```
./yb_build.sh release --java-test org.yb.pgsql.TestPgRegressIndex
./yb_build.sh --java-test 'org.yb.pgsql.TestPgRegressHashInQueries'
```

Reviewers: smishra, neil, amartsinchyk, kpopali

Reviewed By: kpopali

Subscribers: mbautin, kpopali, kannan, ssong, yql, mihnea, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D19672
  • Loading branch information
tanujnay112 committed Mar 17, 2023
1 parent dbd16ee commit fc57665
Show file tree
Hide file tree
Showing 43 changed files with 2,337 additions and 510 deletions.
18 changes: 18 additions & 0 deletions java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Scanner;
import java.util.Set;
import java.util.TreeMap;
Expand Down Expand Up @@ -2132,6 +2134,22 @@ protected String getExplainAnalyzeOutput(Statement stmt, String query) throws Ex
}
}

static final Pattern roundtrips_pattern = Pattern.compile("Storage Read Requests: (\\d+)\\s*$");

protected Long getNumStorageRoundtrips(Statement stmt, String query) throws Exception {
try (ResultSet rs = stmt.executeQuery(
"EXPLAIN (ANALYZE, DIST, COSTS OFF, TIMING OFF) " + query)) {
while (rs.next()) {
String line = rs.getString(1);
Matcher m = roundtrips_pattern.matcher(line);
if (m.find()) {
return Long.parseLong(m.group(1));
}
}
}
return null;
}

protected Long getNumDocdbRequests(Statement stmt, String query) throws Exception {
// Executing query once just in case if master catalog cache is not refreshed
stmt.execute(query);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

package org.yb.pgsql;

import org.junit.Test;
import org.junit.runner.RunWith;

import org.yb.util.YBTestRunnerNonTsanOnly;
import org.yb.util.RegexMatcher;

import java.sql.Connection;
import java.sql.Statement;

import java.util.HashSet;
import java.util.Set;

import static org.yb.AssertionWrappers.*;

@RunWith(value=YBTestRunnerNonTsanOnly.class)
public class TestPgRegressHashInQueries extends BasePgSQLTest {

  @Override
  protected Integer getYsqlRequestLimit() {
    // Flush after every request operator so that the number of storage
    // roundtrips reported by EXPLAIN (ANALYZE, DIST) equals the number of
    // request operators created, making the roundtrip assertions below exact.
    return 1;
  }

  /**
   * Builds the comma-separated literal list "start, start+step, ..., end"
   * ({@code end} is included when it lies on the stride) for use inside an
   * SQL {@code IN (...)} clause.
   */
  private static String inList(int start, int end, int step) {
    StringBuilder values = new StringBuilder();
    for (int v = start; v <= end; v += step) {
      if (values.length() > 0) {
        values.append(", ");
      }
      values.append(v);
    }
    return values.toString();
  }

  @Test
  public void testInQueryBatchingOnHashKey() throws Exception {
    try (Statement statement = connection.createStatement()) {
      statement.execute("CREATE TABLE t1 (a int PRIMARY KEY, b int) SPLIT INTO 3 TABLETS");
      statement.execute("INSERT INTO t1 SELECT i, i FROM (SELECT generate_series(1, 1024) i) t");
    }

    // SELECT * FROM t1 WHERE a IN (1, 2, 3, ..., 511, 512);
    final int numRows = 512;
    String query = "SELECT * FROM t1 WHERE a IN (" + inList(1, numRows, 1) + ")";

    Set<Row> expectedRows = new HashSet<>();
    for (int i = 1; i <= numRows; i++) {
      expectedRows.add(new Row(i, i));
    }

    try (Statement statement = connection.createStatement()) {
      // Without batching, each IN element becomes its own read request.
      statement.execute("SET yb_enable_hash_batch_in = false");
      long noBatchingNumRequests = getNumStorageRoundtrips(statement, query);
      assertEquals(numRows, noBatchingNumRequests);
      assertRowSet(statement, query, expectedRows);

      // With batching, we expect one request per tablet, and the table was
      // split into three tablets.
      statement.execute("SET yb_enable_hash_batch_in = true");
      long batchingNumRequests = getNumStorageRoundtrips(statement, query);
      assertRowSet(statement, query, expectedRows);
      assertEquals(3, batchingNumRequests);
    }
  }

  @Test
  public void testInQueryBatchingOnMixedKey() throws Exception {
    try (Statement statement = connection.createStatement()) {
      statement.execute(
          "CREATE TABLE t1 (a int, b int, PRIMARY KEY(a hash, b asc)) SPLIT INTO 3 TABLETS");
      statement.execute("INSERT INTO t1 SELECT i, i FROM (SELECT generate_series(1, 1024) i) t");
      statement.execute("INSERT INTO t1 SELECT i, i+1 FROM (SELECT generate_series(1, 1024) i) t");
    }

    // a IN (1, 2, ..., 512) AND b IN (2, 4, ..., 510, 512)
    final int upperLimit = 512;
    String query = "SELECT * FROM t1 WHERE a IN (" + inList(1, upperLimit, 1)
        + ") AND b IN (" + inList(2, upperLimit, 2) + ")";

    // For each a = i the table holds b in {i, i+1}; only the even b value
    // survives the filter: odd i contributes (i, i+1), even i contributes (i, i).
    Set<Row> expectedRows = new HashSet<>();
    for (int i = 1; i <= upperLimit; i++) {
      if ((i % 2) == 1) {
        expectedRows.add(new Row(i, i + 1));
      } else {
        expectedRows.add(new Row(i, i));
      }
    }

    try (Statement statement = connection.createStatement()) {
      // Without batching, each value of the hash column's IN list becomes its
      // own read request.
      statement.execute("SET yb_enable_hash_batch_in = false");
      long noBatchingNumRequests = getNumStorageRoundtrips(statement, query);
      assertEquals(upperLimit, noBatchingNumRequests);
      assertRowSet(statement, query, expectedRows);

      // With batching, we expect one request per tablet, and the table was
      // split into three tablets.
      statement.execute("SET yb_enable_hash_batch_in = true");
      long batchingNumRequests = getNumStorageRoundtrips(statement, query);
      assertRowSet(statement, query, expectedRows);
      assertEquals(3, batchingNumRequests);
    }
  }

  @Test
  public void testInQueryBatchingNestLoopHashKey() throws Exception {
    try (Statement statement = connection.createStatement()) {
      statement.execute("CREATE TABLE x (a int PRIMARY KEY, b int) SPLIT INTO 3 TABLETS");
      statement.execute("INSERT INTO x SELECT i*2, i FROM (SELECT generate_series(1, 4096) i) t");
      statement.execute("CREATE TABLE y (a int PRIMARY KEY, b int) SPLIT INTO 3 TABLETS");
      statement.execute("INSERT INTO y SELECT i*5, i FROM (SELECT generate_series(1, 4096) i) t");
    }

    // Batched nested-loop joins internally generate batched IN queries on the
    // join key, so hash IN batching should sharply reduce the RPC count.
    String query = "SELECT * FROM x t1 JOIN y t2 ON t1.a = t2.a";

    // Keys match on common multiples of 2 and 5, i.e. a = 10k with
    // 10k <= 2 * 4096, giving k in [1, 819]. SELECT * yields columns
    // (x.a, x.b, y.a, y.b) = (10k, 5k, 10k, 2k).
    Set<Row> expectedRows = new HashSet<>();
    for (int k = 1; k <= 819; k++) {
      expectedRows.add(new Row(k * 10, k * 5, k * 10, k * 2));
    }

    try (Statement statement = connection.createStatement()) {
      // Force a batched nested-loop join plan.
      statement.execute("SET enable_hashjoin = off");
      statement.execute("SET enable_mergejoin = off");
      statement.execute("SET enable_seqscan = off");
      statement.execute("SET enable_material = off");

      // Small NL batch, no hash batching.
      statement.execute("SET yb_bnl_batch_size = 3;");
      statement.execute("SET yb_enable_hash_batch_in = false");
      long noBatchingSmallBatchSizeNumRPCs = getNumStorageRoundtrips(statement, query);
      assertEquals(4102, noBatchingSmallBatchSizeNumRPCs);
      assertRowSet(statement, query, expectedRows);

      // Large NL batch, still no hash batching: no improvement expected.
      statement.execute("SET yb_bnl_batch_size = 1024;");
      statement.execute("SET yb_enable_hash_batch_in = false");
      long noBatchingLargeBatchSizeNumRPCs = getNumStorageRoundtrips(statement, query);
      assertEquals(4102, noBatchingLargeBatchSizeNumRPCs);
      assertRowSet(statement, query, expectedRows);

      // Large NL batch with hash batching: best RPC count.
      statement.execute("SET yb_bnl_batch_size = 1024;");
      statement.execute("SET yb_enable_hash_batch_in = true");
      long batchingLargeBatchSizeNumRPCs = getNumStorageRoundtrips(statement, query);
      assertEquals(12, batchingLargeBatchSizeNumRPCs);
      assertRowSet(statement, query, expectedRows);
    }
  }

  @Test
  public void schedule() throws Exception {
    runPgRegressTest("yb_hash_in_schedule");
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -812,9 +812,9 @@ SELECT * FROM ft2 WHERE c1 = ANY (ARRAY(SELECT c1 FROM ft1 WHERE c1 < 5));
c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
----+----+-------+------------------------------+--------------------------+----+------------+-----
1 | 1 | 00001 | Fri Jan 02 00:00:00 1970 PST | Fri Jan 02 00:00:00 1970 | 1 | 1 | foo
4 | 4 | 00004 | Mon Jan 05 00:00:00 1970 PST | Mon Jan 05 00:00:00 1970 | 4 | 4 | foo
2 | 2 | 00002 | Sat Jan 03 00:00:00 1970 PST | Sat Jan 03 00:00:00 1970 | 2 | 2 | foo
3 | 3 | 00003 | Sun Jan 04 00:00:00 1970 PST | Sun Jan 04 00:00:00 1970 | 3 | 3 | foo
4 | 4 | 00004 | Mon Jan 05 00:00:00 1970 PST | Mon Jan 05 00:00:00 1970 | 4 | 4 | foo
(4 rows)

-- we should not push order by clause with volatile expressions or unsafe
Expand Down
7 changes: 5 additions & 2 deletions src/postgres/src/backend/nodes/copyfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -902,8 +902,11 @@ _copyYbBatchedNestLoop(const YbBatchedNestLoop *from)
* copy remainder of node
*/
COPY_SCALAR_FIELD(num_hashClauseInfos);
COPY_POINTER_FIELD(hashClauseInfos,
from->num_hashClauseInfos * sizeof(YbBNLHashClauseInfo));

if (from->num_hashClauseInfos > 0)
COPY_POINTER_FIELD(
hashClauseInfos,
from->num_hashClauseInfos * sizeof(YbBNLHashClauseInfo));

for (int i = 0; i < from->num_hashClauseInfos; i++)
{
Expand Down
11 changes: 10 additions & 1 deletion src/postgres/src/backend/utils/misc/guc.c
Original file line number Diff line number Diff line change
Expand Up @@ -2103,7 +2103,16 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},

{
{"yb_enable_hash_batch_in", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("GUC variable that enables batching RPCs of generated for IN queries on hash "
"keys issued to the same tablets."),
NULL
},
&yb_enable_hash_batch_in,
true,
NULL, NULL, NULL
},
{
{"yb_bypass_cond_recheck", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("If true then condition rechecking is bypassed at YSQL if the condition is bound to DocDB."),
Expand Down
1 change: 1 addition & 0 deletions src/postgres/src/test/regress/expected/yb_create_index.out
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,7 @@ SELECT * FROM test_method WHERE h2 = 258;
2 | 258 | 1 | 2 | 10 | 20
(1 row)

DROP TABLE test_method;
-- Test more HASH key cases in PRIMARY KEY
CREATE TABLE test_hash (
h1 int, h2 int, r1 int, r2 int, v1 int, v2 int);
Expand Down
6 changes: 2 additions & 4 deletions src/postgres/src/test/regress/expected/yb_hash_code.out
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ EXPLAIN (COSTS OFF, TIMING OFF, SUMMARY OFF, ANALYZE) SELECT x, yb_hash_code(x)
Sort Method: quicksort Memory: 25kB
-> Index Scan using test_table_one_primary_pkey on test_table_one_primary (actual rows=3 loops=1)
Index Cond: ((x = ANY ('{1,2,3,4}'::integer[])) AND (yb_hash_code(x) < 50000))
Rows Removed by Index Recheck: 1
(6 rows)
(5 rows)

SELECT x, yb_hash_code(x) FROM test_table_one_primary WHERE x IN (1, 2, 3, 4) AND yb_hash_code(x) < 50000 ORDER BY x;
x | yb_hash_code
Expand Down Expand Up @@ -938,9 +937,8 @@ EXPLAIN (COSTS OFF, TIMING OFF, SUMMARY OFF, ANALYZE) SELECT v1, yb_hash_code(v4
----------------------------------------------------------------------------------------------------------------------
Index Only Scan using test_index_only_scan_recheck_v4_v1_idx on test_index_only_scan_recheck (actual rows=2 loops=1)
Index Cond: ((v4 = ANY ('{1,2,3}'::integer[])) AND (yb_hash_code(v4) < 50000))
Rows Removed by Index Recheck: 1
Heap Fetches: 0
(4 rows)
(3 rows)

SELECT v1, yb_hash_code(v4) FROM test_index_only_scan_recheck WHERE v4 IN (1, 2, 3) AND yb_hash_code(v4) < 50000;
v1 | yb_hash_code
Expand Down
Loading

0 comments on commit fc57665

Please sign in to comment.