Skip to content

Commit

Permalink
[Native] Add e2e test for scaled writer
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Oct 17, 2023
1 parent 53f9760 commit 14f866e
Showing 1 changed file with 134 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createPrestoBenchTables;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createRegion;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createSupplier;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;

Expand Down Expand Up @@ -250,6 +251,138 @@ public void testScaleWriters()
dropTableIfExists(tmpTableName);
}

@Test
public void testCollectColumnStatisticsOnCreateTable()
{
Session session = buildSessionForTableWrite();
String tmpTableName = generateRandomTableName();
// TODO: add varbinary test support once velox supports varbinary in value node
// https://github.com/prestodb/presto/blob/master/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp#L915
assertUpdate(session, format("" +
"CREATE TABLE %s " +
"WITH ( " +
" partitioned_by = ARRAY['p_varchar'] " +
") " +
"AS " +
"SELECT c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar " +
"FROM ( " +
" VALUES " +
" (null, null, null, null, null, null, 'p1'), " +
" (null, null, null, null, null, null, 'p1'), " +
" (true, BIGINT '1', DOUBLE '2.2', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), sequence(0, 10), 'p1')," +
" (false, BIGINT '0', DOUBLE '1.2', TIMESTAMP '2012-08-08 00:00', CAST('abc2' AS VARCHAR), sequence(10, 20), 'p1')," +
" (null, null, null, null, null, null, 'p2'), " +
" (null, null, null, null, null, null, 'p2'), " +
" (true, BIGINT '2', DOUBLE '3.3', TIMESTAMP '2012-09-09 01:00', CAST('cba1' AS VARCHAR), sequence(20, 25), 'p2'), " +
" (false, BIGINT '1', DOUBLE '2.3', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), sequence(30, 35), 'p2') " +
") AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar)", tmpTableName), 8);

assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p1')", tmpTableName),
"SELECT * FROM (VALUES " +
"('c_boolean', null, 2.0E0, 0.5E0, null, null, null), " +
"('c_bigint', null, 2.0E0, 0.5E0, null, '0', '1'), " +
"('c_double', null, 2.0E0, 0.5E0, null, '1.2', '2.2'), " +
"('c_timestamp', null, 2.0E0, 0.5E0, null, null, null), " +
"('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null), " + // 8.0
"('c_array', 184.0E0, null, 0.5, null, null, null), " + // 176
"('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null), " +
"(null, null, null, null, 4.0E0, null, null)) AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar)");
assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p2')", tmpTableName),
"SELECT * FROM (VALUES " +
"('c_boolean', null, 2.0E0, 0.5E0, null, null, null), " +
"('c_bigint', null, 2.0E0, 0.5E0, null, '1', '2'), " +
"('c_double', null, 2.0E0, 0.5E0, null, '2.3', '3.3'), " +
"('c_timestamp', null, 2.0E0, 0.5E0, null, null, null), " +
"('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null), " + // 8
"('c_array', 104.0E0, null, 0.5, null, null, null), " + // 96
"('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null), " +
"(null, null, null, null, 4.0E0, null, null)) AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar)");

// non existing partition
assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p3')", tmpTableName),
"SELECT * FROM (VALUES " +
"('c_boolean', null, 0E0, 0E0, null, null, null), " +
"('c_bigint', null, 0E0, 0E0, null, null, null), " +
"('c_double', null, 0E0, 0E0, null, null, null), " +
"('c_timestamp', null, 0E0, 0E0, null, null, null), " +
"('c_varchar', 0E0, 0E0, 0E0, null, null, null), " +
"('c_array', null, 0E0, 0E0, null, null, null), " +
"('p_varchar', 0E0, 0E0, 0E0, null, null, null), " +
"(null, null, null, null, 0E0, null, null)) AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar)");

dropTableIfExists(tmpTableName);
}

@Test
public void testCollectColumnStatisticsOnInsert()
{
Session session = buildSessionForTableWrite();
String tmpTableName = generateRandomTableName();
assertUpdate(session, format("" +
"CREATE TABLE %s ( " +
" c_boolean BOOLEAN, " +
" c_bigint BIGINT, " +
" c_double DOUBLE, " +
" c_timestamp TIMESTAMP, " +
" c_varchar VARCHAR, " +
" c_array ARRAY(BIGINT), " +
" p_varchar VARCHAR " +
") " +
"WITH ( " +
" partitioned_by = ARRAY['p_varchar'] " +
")", tmpTableName));

assertUpdate(format("" +
"INSERT INTO %s " +
"SELECT c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar " +
"FROM ( " +
" VALUES " +
" (null, null, null, null, null, null, 'p1'), " +
" (null, null, null, null, null, null, 'p1'), " +
" (true, BIGINT '1', DOUBLE '2.2', TIMESTAMP '2012-08-08 01:00', CAST('abc1' AS VARCHAR), sequence(0, 10), 'p1')," +
" (false, BIGINT '0', DOUBLE '1.2', TIMESTAMP '2012-08-08 00:00', CAST('abc2' AS VARCHAR), sequence(10, 20), 'p1')," +
" (null, null, null, null, null, null, 'p2'), " +
" (null, null, null, null, null, null, 'p2'), " +
" (true, BIGINT '2', DOUBLE '3.3', TIMESTAMP '2012-09-09 01:00', CAST('cba1' AS VARCHAR), sequence(20, 25), 'p2'), " +
" (false, BIGINT '1', DOUBLE '2.3', TIMESTAMP '2012-09-09 00:00', CAST('cba2' AS VARCHAR), sequence(30, 35), 'p2') " +
") AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar)", tmpTableName), 8);

assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p1')", tmpTableName),
"SELECT * FROM (VALUES " +
"('c_boolean', null, 2.0E0, 0.5E0, null, null, null), " +
"('c_bigint', null, 2.0E0, 0.5E0, null, '0', '1'), " +
"('c_double', null, 2.0E0, 0.5E0, null, '1.2', '2.2'), " +
"('c_timestamp', null, 2.0E0, 0.5E0, null, null, null), " +
"('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null), " + // 8
"('c_array', 184.0E0, null, 0.5E0, null, null, null), " + // 176
"('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null), " +
"(null, null, null, null, 4.0E0, null, null)) AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar)");
assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p2')", tmpTableName),
"SELECT * FROM (VALUES " +
"('c_boolean', null, 2.0E0, 0.5E0, null, null, null), " +
"('c_bigint', null, 2.0E0, 0.5E0, null, '1', '2'), " +
"('c_double', null, 2.0E0, 0.5E0, null, '2.3', '3.3'), " +
"('c_timestamp', null, 2.0E0, 0.5E0, null, null, null), " +
"('c_varchar', 16.0E0, 2.0E0, 0.5E0, null, null, null), " + // 8
"('c_array', 104.0E0, null, 0.5, null, null, null), " + // 96
"('p_varchar', 8.0E0, 1.0E0, 0.0E0, null, null, null), " +
"(null, null, null, null, 4.0E0, null, null)) AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar)");

// non existing partition
assertQuery(format("SHOW STATS FOR (SELECT * FROM %s WHERE p_varchar = 'p3')", tmpTableName),
"SELECT * FROM (VALUES " +
"('c_boolean', null, 0E0, 0E0, null, null, null), " +
"('c_bigint', null, 0E0, 0E0, null, null, null), " +
"('c_double', null, 0E0, 0E0, null, null, null), " +
"('c_timestamp', null, 0E0, 0E0, null, null, null), " +
"('c_varchar', 0E0, 0E0, 0E0, null, null, null), " +
"('c_array', null, 0E0, 0E0, null, null, null), " +
"('p_varchar', 0E0, 0E0, 0E0, null, null, null), " +
"(null, null, null, null, 0E0, null, null)) AS x (c_boolean, c_bigint, c_double, c_timestamp, c_varchar, c_array, p_varchar)");

dropTableIfExists(tmpTableName);
}

private void dropTableIfExists(String tableName)
{
computeExpected(String.format("DROP TABLE IF EXISTS %s", tableName), ImmutableList.of(BIGINT));
Expand All @@ -265,13 +398,12 @@ private String generateRandomTableName()

private Session buildSessionForTableWrite()
{
// TODO: enable this after column stats collection is enabled.
return Session.builder(getSession())
.setSystemProperty("scale_writers", "true")
.setSystemProperty("table_writer_merge_operator_enabled", "true")
.setSystemProperty("task_writer_count", "1")
.setSystemProperty("task_partitioned_writer_count", "2")
.setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "false")
.setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true")
.setCatalogSessionProperty("hive", "optimized_partition_update_serialization_enabled", "false")
.setCatalogSessionProperty("hive", "orc_compression_codec", "ZSTD")
.build();
Expand Down

0 comments on commit 14f866e

Please sign in to comment.