Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a relationship to manage ER based on tableConstraints #18892

Merged
merged 9 commits into from
Dec 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public EntityNotFoundException(String message) {
super(Response.Status.NOT_FOUND, ERROR_TYPE, message);
}

private EntityNotFoundException(String message, Throwable cause) {
public EntityNotFoundException(String message, Throwable cause) {
super(Response.Status.NOT_FOUND, ERROR_TYPE, message, cause);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2741,25 +2741,6 @@ default List<String> listAfter(ListFilter filter, int limit, String afterName, S
return listAfter(
getTableName(), filter.getQueryParams(), condition, condition, limit, afterName, afterId);
}

@ConnectionAwareSqlQuery(
value =
"SELECT json FROM table_entity "
+ "WHERE JSON_SEARCH(JSON_EXTRACT(json, '$.tableConstraints[*].referredColumns'), "
+ "'one', :fqn) IS NOT NULL",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT json "
+ "FROM table_entity "
+ "WHERE EXISTS ("
+ " SELECT 1"
+ " FROM jsonb_array_elements(json->'tableConstraints') AS constraints"
+ " CROSS JOIN jsonb_array_elements_text(constraints->'referredColumns') AS referredColumn "
+ " WHERE referredColumn LIKE :fqn"
+ ")",
connectionType = POSTGRES)
List<String> findRelatedTables(@Bind("fqn") String fqn);
}

interface StoredProcedureDAO extends EntityDAO<StoredProcedure> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -89,6 +90,7 @@
import org.openmetadata.schema.type.csv.CsvFile;
import org.openmetadata.schema.type.csv.CsvHeader;
import org.openmetadata.schema.type.csv.CsvImportResult;
import org.openmetadata.sdk.exception.EntitySpecViolationException;
import org.openmetadata.sdk.exception.SuggestionException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.CatalogExceptionMessage;
Expand Down Expand Up @@ -676,6 +678,7 @@ public void prepare(Table table, boolean update) {
.withDatabase(schema.getDatabase())
.withService(schema.getService())
.withServiceType(schema.getServiceType());
validateTableConstraints(table);
}

@Override
Expand All @@ -693,6 +696,8 @@ public void storeEntity(Table table, boolean update) {

// Restore the relationships
table.withColumns(columnWithTags).withService(service);
// Store ER relationships based on table constraints
addConstraintRelationship(table, table.getTableConstraints());
}

@Override
Expand Down Expand Up @@ -1117,6 +1122,33 @@ private List<CustomMetric> getCustomMetrics(Table table, String columnName) {
return customMetrics;
}

private void validateTableConstraints(Table table) {
if (!nullOrEmpty(table.getTableConstraints())) {
Set<TableConstraint> constraintSet = new HashSet<>();
for (TableConstraint constraint : table.getTableConstraints()) {
if (!constraintSet.add(constraint)) {
throw new EntitySpecViolationException(
"Duplicate constraint found in request: " + constraint);
}
for (String column : constraint.getColumns()) {
validateColumn(table, column);
}
if (!nullOrEmpty(constraint.getReferredColumns())) {
for (String column : constraint.getReferredColumns()) {
String toParent = FullyQualifiedName.getParentFQN(column);
String columnName = FullyQualifiedName.getColumnName(column);
try {
Table toTable = findByName(toParent, NON_DELETED);
validateColumn(toTable, columnName);
} catch (EntityNotFoundException e) {
throw new EntitySpecViolationException("Table not found: " + toParent);
}
}
}
}
}
}

/** Handles entity updated from PUT and POST operation. */
public class TableUpdater extends ColumnEntityUpdater {
public TableUpdater(Table original, Table updated, Operation operation) {
Expand All @@ -1129,7 +1161,7 @@ public void entitySpecificUpdate() {
Table updatedTable = updated;
DatabaseUtil.validateColumns(updatedTable.getColumns());
recordChange("tableType", origTable.getTableType(), updatedTable.getTableType());
updateConstraints(origTable, updatedTable);
updateTableConstraints(origTable, updatedTable, operation);
updateColumns(
COLUMN_FIELD, origTable.getColumns(), updated.getColumns(), EntityUtil.columnMatch);
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
Expand All @@ -1138,10 +1170,26 @@ public void entitySpecificUpdate() {
recordChange("locationPath", original.getLocationPath(), updated.getLocationPath());
}

private void updateConstraints(Table origTable, Table updatedTable) {
private void updateTableConstraints(Table origTable, Table updatedTable, Operation operation) {
validateTableConstraints(updatedTable);
if (operation.isPatch()
&& !nullOrEmpty(updatedTable.getTableConstraints())
&& !nullOrEmpty(origTable.getTableConstraints())) {
List<TableConstraint> newConstraints = new ArrayList<>();
for (TableConstraint constraint : updatedTable.getTableConstraints()) {
TableConstraint existing =
origTable.getTableConstraints().stream()
.filter(c -> EntityUtil.tableConstraintMatch.test(c, constraint))
.findAny()
.orElse(null);
if (existing == null) {
newConstraints.add(constraint);
}
}
checkDuplicateTableConstraints(origTable, newConstraints);
}
List<TableConstraint> origConstraints = listOrEmpty(origTable.getTableConstraints());
List<TableConstraint> updatedConstraints = listOrEmpty(updatedTable.getTableConstraints());

origConstraints.sort(EntityUtil.compareTableConstraint);
origConstraints.stream().map(TableConstraint::getColumns).forEach(Collections::sort);

Expand All @@ -1157,6 +1205,66 @@ private void updateConstraints(Table origTable, Table updatedTable) {
added,
deleted,
EntityUtil.tableConstraintMatch);

// manage table ER relationship based on table constraints
addConstraintRelationship(origTable, added);
deleteConstraintRelationship(origTable, deleted);
}
}

private void checkDuplicateTableConstraints(
Table origTable, List<TableConstraint> newConstraints) {
if (!nullOrEmpty(origTable.getTableConstraints()) && !nullOrEmpty(newConstraints)) {
Set<TableConstraint> origConstraints =
new HashSet<>(listOrEmpty(origTable.getTableConstraints()));
for (TableConstraint constraint : newConstraints) {
if (!origConstraints.add(constraint)) {
throw new EntitySpecViolationException("Table Constraint is Duplicate: " + constraint);
}
}
}
}

private void addConstraintRelationship(Table table, List<TableConstraint> constraints) {
if (!nullOrEmpty(constraints)) {
for (TableConstraint constraint : constraints) {
if (!nullOrEmpty(constraint.getReferredColumns())) {
for (String column : constraint.getReferredColumns()) {
String toParent = FullyQualifiedName.getParentFQN(column);
try {
EntityReference toTable =
Entity.getEntityReferenceByName(TABLE, toParent, NON_DELETED);
addRelationship(
table.getId(), toTable.getId(), TABLE, TABLE, Relationship.RELATED_TO);
} catch (EntityNotFoundException e) {
throw EntityNotFoundException.byName(
String.format(
"Failed to add table constraint due to missing table %s", toParent));
}
}
}
}
}
}

private void deleteConstraintRelationship(Table table, List<TableConstraint> constraints) {
if (!nullOrEmpty(constraints)) {
for (TableConstraint constraint : constraints) {
if (!nullOrEmpty(constraint.getReferredColumns())) {
for (String column : constraint.getReferredColumns()) {
String toParent = FullyQualifiedName.getParentFQN(column);
try {
EntityReference toTable = Entity.getEntityReferenceByName(TABLE, toParent, ALL);
deleteRelationship(
table.getId(), TABLE, toTable.getId(), TABLE, Relationship.RELATED_TO);
} catch (EntityNotFoundException e) {
throw EntityNotFoundException.byName(
String.format(
"Failed to add table constraint due to missing table %s", toParent));
}
}
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addDisplayNameToCustomProperty;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addEditGlossaryTermsToDataConsumerPolicy;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addRelationsForTableConstraints;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addViewAllRuleToOrgPolicy;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.migrateServiceTypesAndConnections;

Expand All @@ -22,5 +23,6 @@ public void runDataMigration() {
addViewAllRuleToOrgPolicy(collectionDAO);
addEditGlossaryTermsToDataConsumerPolicy(collectionDAO);
addDisplayNameToCustomProperty(handle, false);
addRelationsForTableConstraints(handle, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addDisplayNameToCustomProperty;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addEditGlossaryTermsToDataConsumerPolicy;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addRelationsForTableConstraints;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.addViewAllRuleToOrgPolicy;
import static org.openmetadata.service.migration.utils.v160.MigrationUtil.migrateServiceTypesAndConnections;

Expand All @@ -22,5 +23,6 @@ public void runDataMigration() {
addViewAllRuleToOrgPolicy(collectionDAO);
addEditGlossaryTermsToDataConsumerPolicy(collectionDAO);
addDisplayNameToCustomProperty(handle, true);
addRelationsForTableConstraints(handle, true);
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
package org.openmetadata.service.migration.utils.v160;

import static org.openmetadata.common.utils.CommonUtil.listOf;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.schema.type.Include.NON_DELETED;
import static org.openmetadata.service.Entity.TABLE;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.policies.Policy;
import org.openmetadata.schema.entity.policies.accessControl.Rule;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.schema.type.Relationship;
import org.openmetadata.schema.type.TableConstraint;
import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.PolicyRepository;
import org.openmetadata.service.jdbi3.TableRepository;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;

@Slf4j
Expand Down Expand Up @@ -114,6 +125,89 @@ public static void addOperationsToPolicyRule(
}
}

public static void addRelationsForTableConstraints(Handle handle, boolean postgresql) {
LOG.info("Starting table constraint relationship migration");
final int batchSize = 1000;
int offset = 0;
String fetchQuery =
"SELECT id, json FROM table_entity "
+ "WHERE JSON_LENGTH(JSON_EXTRACT(json, '$.tableConstraints')) > 0 "
+ "AND JSON_LENGTH(JSON_EXTRACT(json, '$.tableConstraints[*].referredColumns')) > 0 "
+ "LIMIT :limit OFFSET :offset";

if (postgresql) {
fetchQuery =
"SELECT id, json FROM table_entity "
+ "WHERE jsonb_typeof(json->'tableConstraints') = 'array' "
+ "AND jsonb_array_length(json->'tableConstraints') > 0 "
+ "AND EXISTS ("
+ " SELECT 1 FROM jsonb_array_elements(json->'tableConstraints') AS tc "
+ " WHERE jsonb_typeof(tc->'referredColumns') = 'array' "
+ " AND jsonb_array_length(tc->'referredColumns') > 0"
+ ") "
+ "LIMIT :limit OFFSET :offset";
}

TableRepository tableRepository = (TableRepository) Entity.getEntityRepository(TABLE);

while (true) {
List<Map<String, Object>> tables =
handle
.createQuery(fetchQuery)
.bind("limit", batchSize)
.bind("offset", offset)
.mapToMap()
.list();

if (tables.isEmpty()) {
break;
}

for (Map<String, Object> tableRow : tables) {
String tableId = (String) tableRow.get("id");
String json = tableRow.get("json").toString();
try {
Table table = JsonUtils.readValue(json, Table.class);
addConstraintRelationship(table, table.getTableConstraints(), tableRepository);
} catch (Exception e) {
LOG.error("Error processing table ID '{}': {}", tableId, e.getMessage());
}
}

offset += batchSize;
LOG.debug("Processed of table constraint up to offset {}", offset);
}
}

private static void addConstraintRelationship(
Table table, List<TableConstraint> constraints, TableRepository tableRepository) {
if (!nullOrEmpty(constraints)) {
for (TableConstraint constraint : constraints) {
if (!nullOrEmpty(constraint.getReferredColumns())) {
List<EntityReference> relationships =
tableRepository.findTo(table.getId(), TABLE, Relationship.RELATED_TO, TABLE);
Map<UUID, EntityReference> relatedTables = new HashMap<>();
relationships.forEach(r -> relatedTables.put(r.getId(), r));
for (String column : constraint.getReferredColumns()) {
String toParent = FullyQualifiedName.getParentFQN(column);
try {
EntityReference toTable =
Entity.getEntityReferenceByName(TABLE, toParent, NON_DELETED);
if (!relatedTables.containsKey(toTable.getId())) {
tableRepository.addRelationship(
table.getId(), toTable.getId(), TABLE, TABLE, Relationship.RELATED_TO);
}
} catch (EntityNotFoundException e) {
throw EntityNotFoundException.byName(
String.format(
"Failed to add table constraint due to missing table %s", toParent));
}
}
}
}
}
}

public static void migrateServiceTypesAndConnections(Handle handle, boolean postgresql) {
LOG.info("Starting service type and connection type migrations");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,15 @@ static List<Map<String, Object>> populateEntityRelationshipData(Table entity) {
// We need to query the table_entity table to find the references this current table
// has with other tables. We pick this info from the ES however in case of re-indexing this info
// needs to be picked from the db
CollectionDAO dao = Entity.getCollectionDAO();
List<String> json_array =
dao.tableDAO().findRelatedTables(entity.getFullyQualifiedName() + "%");
harshach marked this conversation as resolved.
Show resolved Hide resolved
for (String json : json_array) {
Table foreign_table = JsonUtils.readValue(json, Table.class);
processConstraints(foreign_table, entity, constraints, false);
List<CollectionDAO.EntityRelationshipRecord> relatedTables =
Entity.getCollectionDAO()
.relationshipDAO()
.findFrom(entity.getId(), Entity.TABLE, Relationship.RELATED_TO.ordinal());

for (CollectionDAO.EntityRelationshipRecord table : relatedTables) {
Table foreignTable =
Entity.getEntity(Entity.TABLE, table.getId(), "tableConstraints", NON_DELETED);
processConstraints(foreignTable, entity, constraints, false);
}
return constraints;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class EnumBackwardCompatibilityTest {
/** */
@Test
void testRelationshipEnumBackwardCompatible() {
assertEquals(22, Relationship.values().length);
assertEquals(23, Relationship.values().length);
assertEquals(21, Relationship.DEFAULTS_TO.ordinal());
assertEquals(20, Relationship.EDITED_BY.ordinal());
assertEquals(19, Relationship.EXPERT.ordinal());
Expand Down
Loading
Loading