Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[feat][io] Add support for partitioned tables (apache#8527)
Browse files Browse the repository at this point in the history
Co-authored-by: tison <[email protected]>
2 people authored and Demogorgon314 committed Dec 22, 2022
1 parent 4314bfd commit 60b4e22
Showing 1 changed file with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -68,8 +68,13 @@ public static class TableDefinition {
private TableDefinition(TableId tableId, List<ColumnId> columns) {
this(tableId, columns, null, null);
}
private TableDefinition(TableId tableId, List<ColumnId> columns,
List<ColumnId> nonKeyColumns, List<ColumnId> keyColumns) {

private TableDefinition(
TableId tableId,
List<ColumnId> columns,
List<ColumnId> nonKeyColumns,
List<ColumnId> keyColumns
) {
this.tableId = tableId;
this.columns = columns;
this.nonKeyColumns = nonKeyColumns;
@@ -92,13 +97,13 @@ public static TableDefinition of(TableId tableId, List<ColumnId> columns,
*/
public static TableId getTableId(Connection connection, String tableName) throws Exception {
DatabaseMetaData metadata = connection.getMetaData();
try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) {
try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE", "PARTITIONED TABLE"})) {
if (rs.next()) {
String catalogName = rs.getString(1);
String schemaName = rs.getString(2);
String gotTableName = rs.getString(3);
checkState(tableName.equals(gotTableName),
"TableName not match: " + tableName + " Got: " + gotTableName);
"TableName not match: " + tableName + " Got: " + gotTableName);
if (log.isDebugEnabled()) {
log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName);
}
@@ -113,21 +118,23 @@ public static TableId getTableId(Connection connection, String tableName) throws
* Get the {@link TableDefinition} for the given table.
*/
public static TableDefinition getTableDefinition(
Connection connection, TableId tableId,
Connection connection,
TableId tableId,
List<String> keyList,
List<String> nonKeyList,
boolean excludeNonDeclaredFields) throws Exception {
boolean excludeNonDeclaredFields
) throws Exception {
TableDefinition table = TableDefinition.of(
tableId, Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());

keyList = keyList == null ? Collections.emptyList() : keyList;
nonKeyList = nonKeyList == null ? Collections.emptyList() : nonKeyList;

try (ResultSet rs = connection.getMetaData().getColumns(
tableId.getCatalogName(),
tableId.getSchemaName(),
tableId.getTableName(),
null
tableId.getCatalogName(),
tableId.getSchemaName(),
tableId.getTableName(),
null
)) {
while (rs.next()) {
final String columnName = rs.getString(4);
@@ -205,7 +212,7 @@ public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) {
}
StringJoiner setJoiner = new StringJoiner(",");

table.nonKeyColumns.forEach((columnId) ->{
table.nonKeyColumns.forEach((columnId) -> {
StringJoiner equals = new StringJoiner("=");
equals.add(columnId.getName()).add("? ");
setJoiner.add(equals.toString());
@@ -214,8 +221,6 @@ public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) {
}

public static String buildDeleteSql(TableDefinition table) {
return "DELETE FROM "
+ table.tableId.getTableName()
+ combationWhere(table.keyColumns);
return "DELETE FROM " + table.tableId.getTableName() + combationWhere(table.keyColumns);
}
}

0 comments on commit 60b4e22

Please sign in to comment.