[IcebergIO] cleanups, improvements, and test tweaks #33592

Merged 2 commits on Jan 22, 2025. Changes shown below are from 1 commit.
.github/trigger_files/IO_Iceberg_Integration_Tests.json (2 changes: 1 addition & 1 deletion)
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 1
+  "modification": 2
 }
IcebergCatalogConfig.java
@@ -24,11 +24,15 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.checkerframework.dataflow.qual.Pure;
 
 @AutoValue
 public abstract class IcebergCatalogConfig implements Serializable {
+  private transient @MonotonicNonNull Catalog cachedCatalog;
+
   @Pure
   @Nullable
   public abstract String getCatalogName();
@@ -47,6 +51,9 @@ public static Builder builder() {
   }
 
   public org.apache.iceberg.catalog.Catalog catalog() {
+    if (cachedCatalog != null) {
+      return cachedCatalog;
+    }
     String catalogName = getCatalogName();
     if (catalogName == null) {
       catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
@@ -63,7 +70,8 @@ public org.apache.iceberg.catalog.Catalog catalog() {
     for (Map.Entry<String, String> prop : confProps.entrySet()) {
       config.set(prop.getKey(), prop.getValue());
     }
-    return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
+    cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
+    return cachedCatalog;
   }
 
   @AutoValue.Builder
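Taken together, these hunks memoize the expensive CatalogUtil.buildIcebergCatalog(...) call so repeated catalog() invocations reuse one instance per config object; the @MonotonicNonNull annotation tells the nullness checker the field only ever goes from null to non-null. A minimal sketch of the pattern, where buildCatalog() is a hypothetical stand-in for the construction logic in the diff:

import java.io.Serializable;

// Minimal sketch, not the Beam class: buildCatalog() is a hypothetical
// stand-in for the CatalogUtil.buildIcebergCatalog(...) call above.
public abstract class LazyCatalogConfig implements Serializable {
  // transient: the cache is dropped on serialization, so every deserialized
  // copy (e.g. on a new worker) rebuilds the catalog on first use.
  private transient Object cachedCatalog;

  protected abstract Object buildCatalog();

  public Object catalog() {
    if (cachedCatalog == null) {
      cachedCatalog = buildCatalog(); // constructed at most once per instance
    }
    return cachedCatalog;
  }
}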
@@ -158,7 +158,7 @@ class DestinationState {
     boolean write(Record record) {
       routingPartitionKey.partition(getPartitionableRecord(record));
 
-      if (!writers.asMap().containsKey(routingPartitionKey) && openWriters >= maxNumWriters) {
+      if (writers.getIfPresent(routingPartitionKey) == null && openWriters >= maxNumWriters) {
         return false;
       }
       RecordWriter writer = fetchWriterForPartition(routingPartitionKey);
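This swap replaces a map-view membership probe with Guava's direct cache lookup. A self-contained comparison of the two checks; the cache, key, and size here are illustrative stand-ins for Beam's writer cache and partition key:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class CacheLookupExample {
  public static void main(String[] args) {
    Cache<String, Integer> cache = CacheBuilder.newBuilder().maximumSize(10).build();
    cache.put("pk-1", 42);

    // Before: materialize the ConcurrentMap view, then probe it for the key.
    boolean viaMapView = cache.asMap().containsKey("pk-1");

    // After: a single direct lookup that returns the value (or null) in one call.
    boolean viaGetIfPresent = cache.getIfPresent("pk-1") != null;

    System.out.println(viaMapView + " " + viaGetIfPresent); // prints: true true
  }
}

Besides being one call instead of two, getIfPresent is the idiomatic Guava way to test for a cached entry, and it leaves the value in hand if the caller needs it next.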
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.iceberg.catalog;
 
-import java.io.IOException;
 import java.util.Map;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
@@ -50,7 +49,7 @@ public Catalog createCatalog() {
   }
 
   @Override
-  public void catalogCleanup() throws IOException {
+  public void catalogCleanup() {
     for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
       // only delete tables that were created in this test run
       if (tableIdentifier.name().contains(String.valueOf(SALT))) {
@@ -70,6 +69,7 @@ public Map<String, Object> managedIcebergConfig(String tableId) {
             .put("gcp_location", "us-central1")
             .put("warehouse", warehouse)
             .put("catalog-impl", BQMS_CATALOG)
+            .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
             .build())
         .build();
   }
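The GCS-backed test catalogs now pin io-impl to GCSFileIO. In Iceberg, this property (the CatalogProperties.FILE_IO_IMPL key, literally "io-impl") names the FileIO class a catalog instantiates for reading and writing table files. A hedged sketch of roughly how such a property is resolved, using Iceberg's public CatalogUtil.loadFileIO helper; this is a standalone illustration assuming the iceberg-gcp module is on the classpath, not code from this PR:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.io.FileIO;

public class FileIoResolution {
  public static FileIO gcsFileIo() {
    Map<String, String> properties = new HashMap<>();
    // The same value the tests pass under "io-impl":
    String impl = "org.apache.iceberg.gcp.gcs.GCSFileIO";
    // Reflectively instantiates the named FileIO and initializes it with properties.
    return CatalogUtil.loadFileIO(impl, properties, new Configuration());
  }
}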
HadoopCatalogIT.java
@@ -18,11 +18,14 @@
 package org.apache.beam.sdk.io.iceberg.catalog;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 
 public class HadoopCatalogIT extends IcebergCatalogBaseIT {
@@ -46,7 +49,12 @@ public Catalog createCatalog() {
 
   @Override
   public void catalogCleanup() throws IOException {
-    ((HadoopCatalog) catalog).close();
+    HadoopCatalog hadoopCatalog = (HadoopCatalog) catalog;
+    List<TableIdentifier> tables = hadoopCatalog.listTables(Namespace.of(testName.getMethodName()));
+    for (TableIdentifier identifier : tables) {
+      hadoopCatalog.dropTable(identifier);
+    }
+    hadoopCatalog.close();
   }
 
   @Override
@@ -58,6 +66,7 @@ public Map<String, Object> managedIcebergConfig(String tableId) {
         ImmutableMap.<String, String>builder()
             .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
             .put("warehouse", warehouse)
+            .put("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO")
             .build())
         .build();
   }
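Previously catalogCleanup only closed the catalog, leaving test tables behind on the warehouse path; it now drops each table in the test's namespace first. The same order in isolation; the namespace argument is illustrative:

import java.io.IOException;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Hedged sketch of the cleanup order above: drop this run's tables first,
// then close the catalog so no orphaned metadata or data files remain.
public class HadoopCatalogCleanup {
  public static void cleanup(HadoopCatalog catalog, String namespace) throws IOException {
    for (TableIdentifier id : catalog.listTables(Namespace.of(namespace))) {
      catalog.dropTable(id); // the single-argument dropTable purges data files too
    }
    catalog.close(); // HadoopCatalog is Closeable; release held resources last
  }
}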
HiveCatalogIT.java
@@ -17,17 +17,21 @@
  */
 package org.apache.beam.sdk.io.iceberg.catalog;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.io.iceberg.catalog.hiveutils.HiveMetastoreExtension;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
 /**
  * Read and write tests using {@link HiveCatalog}.
@@ -37,18 +41,33 @@
  */
 public class HiveCatalogIT extends IcebergCatalogBaseIT {
   private static HiveMetastoreExtension hiveMetastoreExtension;
-  private static final String TEST_DB = "test_db";
 
+  private String testDb() {
+    return "test_db_" + testName.getMethodName();
+  }
+
   @Override
   public String tableId() {
-    return String.format("%s.%s", TEST_DB, testName.getMethodName());
+    return String.format("%s.%s", testDb(), "test_table");
   }
 
+  @BeforeClass
+  public static void setUpClass() throws MetaException {
+    String warehouse = warehouse(HiveCatalogIT.class);
+    hiveMetastoreExtension = new HiveMetastoreExtension(warehouse);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (hiveMetastoreExtension != null) {
+      hiveMetastoreExtension.cleanup();
+    }
+  }
+
   @Override
   public void catalogSetup() throws Exception {
-    hiveMetastoreExtension = new HiveMetastoreExtension(warehouse);
-    String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(TEST_DB);
-    Database db = new Database(TEST_DB, "description", dbPath, Maps.newHashMap());
+    String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(testDb());
+    Database db = new Database(testDb(), "description", dbPath, Maps.newHashMap());
     hiveMetastoreExtension.metastoreClient().createDatabase(db);
   }
 
@@ -66,7 +85,11 @@ public Catalog createCatalog() {
   @Override
   public void catalogCleanup() throws Exception {
     if (hiveMetastoreExtension != null) {
-      hiveMetastoreExtension.cleanup();
+      List<String> tables = hiveMetastoreExtension.metastoreClient().getAllTables(testDb());
+      for (String table : tables) {
+        hiveMetastoreExtension.metastoreClient().dropTable(testDb(), table, true, false);
+      }
+      hiveMetastoreExtension.metastoreClient().dropDatabase(testDb());
    }
  }
 
@@ -81,7 +104,10 @@ public Map<String, Object> managedIcebergConfig(String tableId) {
 
     return ImmutableMap.<String, Object>builder()
         .put("table", tableId)
-        .put("name", "hive_" + catalogName)
+        .put("catalog_name", "hive_" + catalogName)
+        .put(
+            "catalog_properties",
+            ImmutableMap.of("io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO"))
        .put("config_properties", confProperties)
        .build();
  }
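Beyond moving metastore setup and teardown to @BeforeClass/@AfterClass, the diff scopes the database name to the running test method so catalogCleanup can drop exactly that method's tables and database, and it renames the managed config key name to catalog_name while threading io-impl through catalog_properties. A runnable JUnit 4 sketch of just the per-test naming trick; the class and assertion are illustrative, and testName mirrors the TestName rule the ITs already use:

import static org.junit.Assert.assertTrue;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

public class PerTestNamingExample {
  // Same mechanism the ITs use: TestName exposes the current method's name.
  @Rule public TestName testName = new TestName();

  private String testDb() {
    return "test_db_" + testName.getMethodName();
  }

  @Test
  public void databaseNameIsScopedToThisMethod() {
    // Each test method gets its own database name, so repeated or concurrent
    // runs cannot collide on shared metastore state.
    assertTrue(testDb().endsWith("databaseNameIsScopedToThisMethod"));
  }
}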