Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(deps): metadata-io - remove parquet dependency #6046

Merged
merged 2 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.parquet.SemanticVersion;
import org.apache.maven.artifact.versioning.ComparableVersion;

import static com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils.*;

Expand All @@ -45,12 +45,12 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction
truncateSemanticVersion(changeTransactions.get(changeTransactions.size() - 1).getSemVer());

String semanticVersionFilterString = versionCutoff == null ? latestSemanticVersionString : versionCutoff;
Optional<SemanticVersion> semanticVersionFilterOptional = createSemanticVersion(semanticVersionFilterString);
Optional<ComparableVersion> semanticVersionFilterOptional = createSemanticVersion(semanticVersionFilterString);
if (!semanticVersionFilterOptional.isPresent()) {
return result;
}

SemanticVersion semanticVersionFilter = semanticVersionFilterOptional.get();
ComparableVersion semanticVersionFilter = semanticVersionFilterOptional.get();

List<ChangeTransaction> reversedChangeTransactions = changeTransactions.stream()
.map(TimelineUtils::semanticVersionChangeTransactionPair)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@
import com.linkedin.util.Pair;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.parquet.SemanticVersion;
import org.apache.maven.artifact.versioning.ComparableVersion;


@Slf4j
public class TimelineUtils {

public static Optional<Pair<SemanticVersion, ChangeTransaction>> semanticVersionChangeTransactionPair(
public static Optional<Pair<ComparableVersion, ChangeTransaction>> semanticVersionChangeTransactionPair(
ChangeTransaction changeTransaction) {
Optional<SemanticVersion> semanticVersion = createSemanticVersion(changeTransaction.getSemVer());
Optional<ComparableVersion> semanticVersion = createSemanticVersion(changeTransaction.getSemVer());
return semanticVersion.map(version -> Pair.of(version, changeTransaction));
}

public static Optional<SemanticVersion> createSemanticVersion(String semanticVersionString) {
public static Optional<ComparableVersion> createSemanticVersion(String semanticVersionString) {
String truncatedSemanticVersion = truncateSemanticVersion(semanticVersionString);
try {
SemanticVersion semanticVersion = SemanticVersion.parse(truncatedSemanticVersion);
ComparableVersion semanticVersion = new ComparableVersion(truncatedSemanticVersion);
return Optional.of(semanticVersion);
} catch (SemanticVersion.SemanticVersionParseException e) {
} catch (Exception e) {
return Optional.empty();
}
}
Expand Down
1 change: 0 additions & 1 deletion metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ dependencies {
compile externalDependency.datastaxOssNativeProtocol
compile externalDependency.datastaxOssCore
compile externalDependency.datastaxOssQueryBuilder
compile externalDependency.parquet
compile externalDependency.elasticSearchRest
compile externalDependency.elasticSearchTransport
compile externalDependency.javatuples
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.linkedin.metadata.timeline;

import lombok.Builder;
import lombok.Getter;


@Builder
public class SemanticVersion {
@Getter
private int majorVersion;
@Getter
private int minorVersion;
@Getter
private int patchVersion;
@Getter
private String qualifier;

public String toString() {
return String.format(String.format("%d.%d.%d-%s", majorVersion, minorVersion, patchVersion, qualifier));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.linkedin.metadata.timeline.differ.OwnershipDiffer;
import com.linkedin.metadata.timeline.differ.SchemaMetadataDiffer;
import org.apache.commons.collections.CollectionUtils;
import org.apache.parquet.SemanticVersion;

import javax.annotation.Nonnull;
import java.sql.Timestamp;
Expand Down Expand Up @@ -60,7 +59,7 @@ public class TimelineServiceImpl implements TimelineService {
private static final long DEFAULT_LOOKBACK_TIME_WINDOW_MILLIS = 7 * 24 * 60 * 60 * 1000L; // 1 week lookback
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long FIRST_TRANSACTION_ID = 0;
private static final String BUILD_VALUE_COMPUTED = "-computed";
private static final String BUILD_VALUE_COMPUTED = "computed";

private final AspectDao _aspectDao;
private final AspectDifferFactory _diffFactory;
Expand Down Expand Up @@ -89,12 +88,12 @@ public TimelineServiceImpl(@Nonnull AspectDao aspectDao, @Nonnull EntityRegistry
aspects.add(GLOBAL_TAGS_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, GLOBAL_TAGS_ASPECT_NAME, new GlobalTagsDiffer());
}
break;
break;
case OWNER: {
aspects.add(OWNERSHIP_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, OWNERSHIP_ASPECT_NAME, new OwnershipDiffer());
}
break;
break;
case DOCUMENTATION: {
aspects.add(INSTITUTIONAL_MEMORY_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, INSTITUTIONAL_MEMORY_ASPECT_NAME,
Expand All @@ -111,20 +110,20 @@ public TimelineServiceImpl(@Nonnull AspectDao aspectDao, @Nonnull EntityRegistry
aspects.add(SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME, new SchemaMetadataDiffer());
}
break;
break;
case GLOSSARY_TERM: {
aspects.add(GLOSSARY_TERMS_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, GLOSSARY_TERMS_ASPECT_NAME, new GlossaryTermsDiffer());
aspects.add(EDITABLE_SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, EDITABLE_SCHEMA_METADATA_ASPECT_NAME,
new EditableSchemaMetadataDiffer());
}
break;
break;
case TECHNICAL_SCHEMA: {
aspects.add(SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME, new SchemaMetadataDiffer());
}
break;
break;
default:
break;
}
Expand Down Expand Up @@ -164,21 +163,25 @@ public List<ChangeTransaction> getTimeline(@Nonnull final Urn urn, @Nonnull fina
startTimeMillis = endTimeMillis - DEFAULT_LOOKBACK_TIME_WINDOW_MILLIS;
}

// Pull full list of aspects for entity and filter timeseries aspects for range query
// Pull full list of aspects for entity and filter timeseries aspects for range
// query
EntitySpec entitySpec = _entityRegistry.getEntitySpec(urn.getEntityType());
List<AspectSpec> aspectSpecs = entitySpec.getAspectSpecs();
Set<String> fullAspectNames = aspectSpecs.stream()
.filter(aspectSpec -> !aspectSpec.isTimeseries())
.map(AspectSpec::getName)
.collect(Collectors.toSet());
List<EntityAspect> aspectsInRange = this._aspectDao.getAspectsInRange(urn, fullAspectNames, startTimeMillis, endTimeMillis);
List<EntityAspect> aspectsInRange = this._aspectDao.getAspectsInRange(urn, fullAspectNames, startTimeMillis,
endTimeMillis);

// Prepopulate with all versioned aspectNames -> ignore timeseries using registry
// Prepopulate with all versioned aspectNames -> ignore timeseries using
// registry
Map<String, TreeSet<EntityAspect>> aspectRowSetMap = constructAspectRowSetMap(urn, fullAspectNames, aspectsInRange);

Map<Long, SortedMap<String, Long>> timestampVersionCache = constructTimestampVersionCache(aspectRowSetMap);

// TODO: There are some extra steps happening here, we need to clean up how transactions get combined across differs
// TODO: There are some extra steps happening here, we need to clean up how
// transactions get combined across differs
SortedMap<Long, List<ChangeTransaction>> semanticDiffs = aspectRowSetMap.entrySet()
.stream()
.filter(entry -> aspectNames.contains(entry.getKey()))
Expand All @@ -187,28 +190,31 @@ public List<ChangeTransaction> getTimeline(@Nonnull final Urn urn, @Nonnull fina
.collect(TreeMap::new, this::combineComputedDiffsPerTransactionId, this::combineComputedDiffsPerTransactionId);
// TODO:Move this down
assignSemanticVersions(semanticDiffs);
List<ChangeTransaction> changeTransactions =
semanticDiffs.values().stream().collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
List<ChangeTransaction> changeTransactions = semanticDiffs.values().stream().collect(ArrayList::new,
ArrayList::addAll, ArrayList::addAll);
List<ChangeTransaction> combinedChangeTransactions = combineTransactionsByTimestamp(changeTransactions,
timestampVersionCache);
combinedChangeTransactions.sort(Comparator.comparing(ChangeTransaction::getTimestamp));
return combinedChangeTransactions;
}

/**
* Constructs a map from aspect name to a sorted set of DB aspects by created timestamp. Set includes all aspects
* relevant to an entity and does a lookback by 1 for all aspects, creating sentinel values for when the oldest aspect
* Constructs a map from aspect name to a sorted set of DB aspects by created
* timestamp. Set includes all aspects
* relevant to an entity and does a lookback by 1 for all aspects, creating
* sentinel values for when the oldest aspect
* possible has been retrieved or no value exists in the DB for an aspect
* @param urn urn of the entity
*
* @param urn urn of the entity
* @param fullAspectNames full list of aspects relevant to the entity
* @param aspectsInRange aspects returned by the range query by timestampm
* @param aspectsInRange aspects returned by the range query by timestampm
* @return map constructed as described
*/
private Map<String, TreeSet<EntityAspect>> constructAspectRowSetMap(Urn urn, Set<String> fullAspectNames,
List<EntityAspect> aspectsInRange) {
Map<String, TreeSet<EntityAspect>> aspectRowSetMap = new HashMap<>();
fullAspectNames.forEach(aspectName ->
aspectRowSetMap.put(aspectName, new TreeSet<>(Comparator.comparing(EntityAspect::getCreatedOn))));
fullAspectNames.forEach(
aspectName -> aspectRowSetMap.put(aspectName, new TreeSet<>(Comparator.comparing(EntityAspect::getCreatedOn))));
aspectsInRange.forEach(row -> {
TreeSet<EntityAspect> rowList = aspectRowSetMap.get(row.getAspect());
rowList.add(row);
Expand All @@ -225,7 +231,8 @@ private Map<String, TreeSet<EntityAspect>> constructAspectRowSetMap(Urn urn, Set
oldestAspect = aspectMinVersion.getValue().first();
}
Long nextVersion = nextVersions.get(aspectMinVersion.getKey());
// Fill out sentinel value if the oldest value possible has been retrieved, else get previous version prior to time range
// Fill out sentinel value if the oldest value possible has been retrieved, else
// get previous version prior to time range
if (oldestAspect != null && isOldestPossible(oldestAspect, nextVersion)) {
aspectMinVersion.getValue().add(createSentinel(aspectMinVersion.getKey()));
} else {
Expand Down Expand Up @@ -260,11 +267,15 @@ private MissingEntityAspect createSentinel(String aspectName) {
}

/**
* Constructs a map from timestamp to a sorted map of aspect name -> version for use in constructing the version stamp
* @param aspectRowSetMap map constructed as described in {@link TimelineServiceImpl#constructAspectRowSetMap}
* Constructs a map from timestamp to a sorted map of aspect name -> version for
* use in constructing the version stamp
*
* @param aspectRowSetMap map constructed as described in
* {@link TimelineServiceImpl#constructAspectRowSetMap}
* @return map as described
*/
private Map<Long, SortedMap<String, Long>> constructTimestampVersionCache(Map<String, TreeSet<EntityAspect>> aspectRowSetMap) {
private Map<Long, SortedMap<String, Long>> constructTimestampVersionCache(
Map<String, TreeSet<EntityAspect>> aspectRowSetMap) {
Set<EntityAspect> aspects = aspectRowSetMap.values().stream()
.flatMap(TreeSet::stream)
.filter(aspect -> aspect.getVersion() != -1L)
Expand All @@ -277,7 +288,7 @@ private Map<Long, SortedMap<String, Long>> constructTimestampVersionCache(Map<St
SortedMap<String, Long> versionStampMap = new TreeMap<>(Comparator.naturalOrder());
for (TreeSet<EntityAspect> aspectSet : aspectRowSetMap.values()) {
EntityAspect maybeMatch = null;
for (EntityAspect aspect2 : aspectSet) {
for (EntityAspect aspect2 : aspectSet) {
if (aspect2 instanceof MissingEntityAspect) {
continue;
}
Expand Down Expand Up @@ -380,8 +391,8 @@ private void assignSemanticVersions(SortedMap<Long, List<ChangeTransaction>> cha
assert (transactionId < entry.getKey());
transactionId = entry.getKey();
SemanticChangeType highestChangeInGroup = SemanticChangeType.NONE;
ChangeTransaction highestChangeTransaction =
entry.getValue().stream().max(Comparator.comparing(ChangeTransaction::getSemVerChange)).orElse(null);
ChangeTransaction highestChangeTransaction = entry.getValue().stream()
.max(Comparator.comparing(ChangeTransaction::getSemVerChange)).orElse(null);
if (highestChangeTransaction != null) {
highestChangeInGroup = highestChangeTransaction.getSemVerChange();
}
Expand All @@ -396,27 +407,47 @@ private SemanticVersion getGroupSemanticVersion(SemanticChangeType highestChange
SemanticVersion previousVersion) {
if (previousVersion == null) {
// Start with all 0s if there is no previous version.
return new SemanticVersion(0, 0, 0, null, null, BUILD_VALUE_COMPUTED);
return SemanticVersion.builder()
.majorVersion(0)
.minorVersion(0)
.patchVersion(0)
.qualifier(BUILD_VALUE_COMPUTED)
.build();
}
// Evaluate the version for this group based on previous version and the hightest semantic change type in the group.
// Evaluate the version for this group based on previous version and the
// hightest semantic change type in the group.
if (highestChangeInGroup == SemanticChangeType.MAJOR) {
// Bump up major, reset all lower to 0s.
return new SemanticVersion(previousVersion.major + 1, 0, 0, null, null, BUILD_VALUE_COMPUTED);
return SemanticVersion.builder()
.majorVersion(previousVersion.getMajorVersion() + 1)
.minorVersion(0)
.patchVersion(0)
.qualifier(BUILD_VALUE_COMPUTED)
.build();
} else if (highestChangeInGroup == SemanticChangeType.MINOR) {
// Bump up minor, reset all lower to 0s.
return new SemanticVersion(previousVersion.major, previousVersion.minor + 1, 0, null, null, BUILD_VALUE_COMPUTED);
return SemanticVersion.builder()
.majorVersion(previousVersion.getMajorVersion())
.minorVersion(previousVersion.getMinorVersion() + 1)
.patchVersion(0)
.qualifier(BUILD_VALUE_COMPUTED)
.build();
} else if (highestChangeInGroup == SemanticChangeType.PATCH) {
// Bump up patch.
return new SemanticVersion(previousVersion.major, previousVersion.minor, previousVersion.patch + 1, null, null,
BUILD_VALUE_COMPUTED);
return SemanticVersion.builder()
.majorVersion(previousVersion.getMajorVersion())
.minorVersion(previousVersion.getMinorVersion())
.patchVersion(previousVersion.getPatchVersion() + 1)
.qualifier(BUILD_VALUE_COMPUTED)
.build();
}
return previousVersion;
}

private List<ChangeTransaction> combineTransactionsByTimestamp(List<ChangeTransaction> changeTransactions, Map<Long,
SortedMap<String, Long>> timestampVersionCache) {
Map<Long, List<ChangeTransaction>> transactionsByTimestamp =
changeTransactions.stream().collect(Collectors.groupingBy(ChangeTransaction::getTimestamp));
private List<ChangeTransaction> combineTransactionsByTimestamp(List<ChangeTransaction> changeTransactions,
Map<Long, SortedMap<String, Long>> timestampVersionCache) {
Map<Long, List<ChangeTransaction>> transactionsByTimestamp = changeTransactions.stream()
.collect(Collectors.groupingBy(ChangeTransaction::getTimestamp));
List<ChangeTransaction> combinedChangeTransactions = new ArrayList<>();
for (List<ChangeTransaction> transactionList : transactionsByTimestamp.values()) {
if (!transactionList.isEmpty()) {
Expand All @@ -426,9 +457,9 @@ private List<ChangeTransaction> combineTransactionsByTimestamp(List<ChangeTransa
for (int i = 1; i < transactionList.size(); i++) {
ChangeTransaction element = transactionList.get(i);
result.getChangeEvents().addAll(element.getChangeEvents());
maxSemanticChangeType =
maxSemanticChangeType.compareTo(element.getSemVerChange()) >= 0 ? maxSemanticChangeType
: element.getSemVerChange();
maxSemanticChangeType = maxSemanticChangeType.compareTo(element.getSemVerChange()) >= 0
? maxSemanticChangeType
: element.getSemVerChange();
maxSemVer = maxSemVer.compareTo(element.getSemVer()) >= 0 ? maxSemVer : element.getSemVer();
}
result.setSemVerChange(maxSemanticChangeType);
Expand Down