Skip to content

Commit

Permalink
Enable failure recovery for Iceberg connector
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Feb 14, 2022
1 parent a8bc0d3 commit ec8d1c0
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.trino.spi.connector.MaterializedViewFreshness;
import io.trino.spi.connector.MaterializedViewNotFoundException;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SystemTable;
Expand Down Expand Up @@ -418,7 +419,7 @@ public void setSchemaAuthorization(ConnectorSession session, String schemaName,
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
Optional<ConnectorTableLayout> layout = getNewTableLayout(session, tableMetadata);
finishCreateTable(session, beginCreateTable(session, tableMetadata, layout), ImmutableList.of(), ImmutableList.of());
finishCreateTable(session, beginCreateTable(session, tableMetadata, layout, RetryMode.NO_RETRIES), ImmutableList.of(), ImmutableList.of());
}

@Override
Expand All @@ -436,7 +437,7 @@ public Optional<ConnectorTableLayout> getNewTableLayout(ConnectorSession session
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
verify(transaction == null, "transaction already set");
transaction = newCreateTableTransaction(catalog, tableMetadata, session);
Expand Down Expand Up @@ -492,7 +493,7 @@ private Optional<ConnectorTableLayout> getWriteLayout(Schema tableSchema, Partit
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
Expand Down Expand Up @@ -563,7 +564,8 @@ public Optional<ConnectorTableExecuteHandle> getTableHandleForExecute(
ConnectorSession session,
ConnectorTableHandle connectorTableHandle,
String procedureName,
Map<String, Object> executeProperties)
Map<String, Object> executeProperties,
RetryMode retryMode)
{
IcebergTableHandle tableHandle = (IcebergTableHandle) connectorTableHandle;

Expand Down Expand Up @@ -1067,7 +1069,7 @@ public boolean delegateMaterializedViewRefreshToConnector(ConnectorSession sessi
}

@Override
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles)
public ConnectorInsertTableHandle beginRefreshMaterializedView(ConnectorSession session, ConnectorTableHandle tableHandle, List<ConnectorTableHandle> sourceTableHandles, RetryMode retryMode)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = catalog.loadTable(session, table.getSchemaTableName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import io.trino.operator.RetryPolicy;
import io.trino.testing.BaseFailureRecoveryTest;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;

import static java.lang.String.format;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

public abstract class BaseIcebergFailureRecoveryTest
extends BaseFailureRecoveryTest
{
protected BaseIcebergFailureRecoveryTest(RetryPolicy retryPolicy)
{
super(retryPolicy);
}

@Override
protected boolean areWriteRetriesSupported()
{
return true;
}

@Override
public void testAnalyzeStatistics()
{
assertThatThrownBy(super::testAnalyzeStatistics)
.hasMessageContaining("This connector does not support analyze");
}

@Override
public void testDelete()
{
assertThatThrownBy(super::testDelete)
.hasMessageContaining("This connector only supports delete where one or more identity-transformed partitions are deleted entirely");
}

@Override
public void testDeleteWithSubquery()
{
assertThatThrownBy(super::testDelete)
.hasMessageContaining("This connector only supports delete where one or more identity-transformed partitions are deleted entirely");
}

@Override
protected void createPartitionedLineitemTable(String tableName, List<String> columns, String partitionColumn)
{
@Language("SQL") String sql = format(
"CREATE TABLE %s WITH (partitioning=array['%s']) AS SELECT %s FROM tpch.tiny.lineitem",
tableName,
partitionColumn,
String.join(",", columns));
getQueryRunner().execute(sql);
}

@Override
public void testUpdate()
{
assertThatThrownBy(super::testUpdate)
.hasMessageContaining("This connector does not support updates");
}

@Override
public void testUpdateWithSubquery()
{
assertThatThrownBy(super::testUpdateWithSubquery)
.hasMessageContaining("This connector does not support updates");
}

@Test(invocationCount = INVOCATION_COUNT)
public void testCreatePartitionedTable()
{
testTableModification(
Optional.empty(),
"CREATE TABLE <table> WITH (partitioning = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders",
Optional.of("DROP TABLE <table>"));
}

@Test(invocationCount = INVOCATION_COUNT)
public void testInsertIntoNewPartition()
{
testTableModification(
Optional.of("CREATE TABLE <table> WITH (partitioning = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders"),
"INSERT INTO <table> SELECT *, 'partition2' p FROM orders",
Optional.of("DROP TABLE <table>"));
}

@Test(invocationCount = INVOCATION_COUNT)
public void testInsertIntoExistingPartition()
{
testTableModification(
Optional.of("CREATE TABLE <table> WITH (partitioning = ARRAY['p']) AS SELECT *, 'partition1' p FROM orders"),
"INSERT INTO <table> SELECT *, 'partition1' p FROM orders",
Optional.of("DROP TABLE <table>"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import io.trino.operator.RetryPolicy;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;

import java.util.List;
import java.util.Map;

public class TestIcebergQueryFailureRecoveryTest
extends BaseIcebergFailureRecoveryTest
{
protected TestIcebergQueryFailureRecoveryTest()
{
super(RetryPolicy.QUERY);
}

@Override
protected QueryRunner createQueryRunner(List<TpchTable<?>> requiredTpchTables, Map<String, String> configProperties, Map<String, String> coordinatorProperties)
throws Exception
{
return IcebergQueryRunner.builder()
.setInitialTables(requiredTpchTables)
.setCoordinatorProperties(coordinatorProperties)
.setExtraProperties(configProperties)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import io.trino.operator.RetryPolicy;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;

import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class TestIcebergTaskFailureRecoveryTest
extends BaseIcebergFailureRecoveryTest
{
protected TestIcebergTaskFailureRecoveryTest()
{
super(RetryPolicy.TASK);
}

@Override
protected QueryRunner createQueryRunner(List<TpchTable<?>> requiredTpchTables, Map<String, String> configProperties, Map<String, String> coordinatorProperties)
throws Exception
{
return IcebergQueryRunner.builder()
.setInitialTables(requiredTpchTables)
.setCoordinatorProperties(coordinatorProperties)
.setExtraProperties(ImmutableMap.<String, String>builder()
.putAll(configProperties)
// currently not supported for fault tolerant execution mode
.put("enable-dynamic-filtering", "false")
.build())
.build();
}

@Override
public void testJoinDynamicFilteringEnabled()
{
assertThatThrownBy(super::testJoinDynamicFilteringEnabled)
.hasMessageContaining("Dynamic filtering is not supported with automatic task retries enabled");
}
}

0 comments on commit ec8d1c0

Please sign in to comment.