Skip to content

Commit

Permalink
make checkstyle happy
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Sep 26, 2022
1 parent 4f6f9a7 commit d97c450
Showing 1 changed file with 48 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ public TimelineServiceImpl(@Nonnull AspectDao aspectDao, @Nonnull EntityRegistry
aspects.add(GLOBAL_TAGS_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, GLOBAL_TAGS_ASPECT_NAME, new GlobalTagsDiffer());
}
break;
break;
case OWNER: {
aspects.add(OWNERSHIP_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, OWNERSHIP_ASPECT_NAME, new OwnershipDiffer());
}
break;
break;
case DOCUMENTATION: {
aspects.add(INSTITUTIONAL_MEMORY_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, INSTITUTIONAL_MEMORY_ASPECT_NAME,
Expand All @@ -110,20 +110,20 @@ public TimelineServiceImpl(@Nonnull AspectDao aspectDao, @Nonnull EntityRegistry
aspects.add(SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME, new SchemaMetadataDiffer());
}
break;
break;
case GLOSSARY_TERM: {
aspects.add(GLOSSARY_TERMS_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, GLOSSARY_TERMS_ASPECT_NAME, new GlossaryTermsDiffer());
aspects.add(EDITABLE_SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, EDITABLE_SCHEMA_METADATA_ASPECT_NAME,
new EditableSchemaMetadataDiffer());
}
break;
break;
case TECHNICAL_SCHEMA: {
aspects.add(SCHEMA_METADATA_ASPECT_NAME);
_diffFactory.addDiffer(entityType, elementName, SCHEMA_METADATA_ASPECT_NAME, new SchemaMetadataDiffer());
}
break;
break;
default:
break;
}
Expand Down Expand Up @@ -163,21 +163,25 @@ public List<ChangeTransaction> getTimeline(@Nonnull final Urn urn, @Nonnull fina
startTimeMillis = endTimeMillis - DEFAULT_LOOKBACK_TIME_WINDOW_MILLIS;
}

// Pull full list of aspects for entity and filter timeseries aspects for range query
// Pull full list of aspects for entity and filter timeseries aspects for range
// query
EntitySpec entitySpec = _entityRegistry.getEntitySpec(urn.getEntityType());
List<AspectSpec> aspectSpecs = entitySpec.getAspectSpecs();
Set<String> fullAspectNames = aspectSpecs.stream()
.filter(aspectSpec -> !aspectSpec.isTimeseries())
.map(AspectSpec::getName)
.collect(Collectors.toSet());
List<EntityAspect> aspectsInRange = this._aspectDao.getAspectsInRange(urn, fullAspectNames, startTimeMillis, endTimeMillis);
List<EntityAspect> aspectsInRange = this._aspectDao.getAspectsInRange(urn, fullAspectNames, startTimeMillis,
endTimeMillis);

// Prepopulate with all versioned aspectNames -> ignore timeseries using registry
// Prepopulate with all versioned aspectNames -> ignore timeseries using
// registry
Map<String, TreeSet<EntityAspect>> aspectRowSetMap = constructAspectRowSetMap(urn, fullAspectNames, aspectsInRange);

Map<Long, SortedMap<String, Long>> timestampVersionCache = constructTimestampVersionCache(aspectRowSetMap);

// TODO: There are some extra steps happening here, we need to clean up how transactions get combined across differs
// TODO: There are some extra steps happening here, we need to clean up how
// transactions get combined across differs
SortedMap<Long, List<ChangeTransaction>> semanticDiffs = aspectRowSetMap.entrySet()
.stream()
.filter(entry -> aspectNames.contains(entry.getKey()))
Expand All @@ -186,28 +190,31 @@ public List<ChangeTransaction> getTimeline(@Nonnull final Urn urn, @Nonnull fina
.collect(TreeMap::new, this::combineComputedDiffsPerTransactionId, this::combineComputedDiffsPerTransactionId);
// TODO:Move this down
assignSemanticVersions(semanticDiffs);
List<ChangeTransaction> changeTransactions =
semanticDiffs.values().stream().collect(ArrayList::new, ArrayList::addAll, ArrayList::addAll);
List<ChangeTransaction> changeTransactions = semanticDiffs.values().stream().collect(ArrayList::new,
ArrayList::addAll, ArrayList::addAll);
List<ChangeTransaction> combinedChangeTransactions = combineTransactionsByTimestamp(changeTransactions,
timestampVersionCache);
combinedChangeTransactions.sort(Comparator.comparing(ChangeTransaction::getTimestamp));
return combinedChangeTransactions;
}

/**
* Constructs a map from aspect name to a sorted set of DB aspects by created timestamp. Set includes all aspects
* relevant to an entity and does a lookback by 1 for all aspects, creating sentinel values for when the oldest aspect
* Constructs a map from aspect name to a sorted set of DB aspects by created
* timestamp. Set includes all aspects
* relevant to an entity and does a lookback by 1 for all aspects, creating
* sentinel values for when the oldest aspect
* possible has been retrieved or no value exists in the DB for an aspect
* @param urn urn of the entity
*
* @param urn urn of the entity
* @param fullAspectNames full list of aspects relevant to the entity
* @param aspectsInRange aspects returned by the range query by timestampm
* @param aspectsInRange aspects returned by the range query by timestampm
* @return map constructed as described
*/
private Map<String, TreeSet<EntityAspect>> constructAspectRowSetMap(Urn urn, Set<String> fullAspectNames,
List<EntityAspect> aspectsInRange) {
Map<String, TreeSet<EntityAspect>> aspectRowSetMap = new HashMap<>();
fullAspectNames.forEach(aspectName ->
aspectRowSetMap.put(aspectName, new TreeSet<>(Comparator.comparing(EntityAspect::getCreatedOn))));
fullAspectNames.forEach(
aspectName -> aspectRowSetMap.put(aspectName, new TreeSet<>(Comparator.comparing(EntityAspect::getCreatedOn))));
aspectsInRange.forEach(row -> {
TreeSet<EntityAspect> rowList = aspectRowSetMap.get(row.getAspect());
rowList.add(row);
Expand All @@ -224,7 +231,8 @@ private Map<String, TreeSet<EntityAspect>> constructAspectRowSetMap(Urn urn, Set
oldestAspect = aspectMinVersion.getValue().first();
}
Long nextVersion = nextVersions.get(aspectMinVersion.getKey());
// Fill out sentinel value if the oldest value possible has been retrieved, else get previous version prior to time range
// Fill out sentinel value if the oldest value possible has been retrieved, else
// get previous version prior to time range
if (oldestAspect != null && isOldestPossible(oldestAspect, nextVersion)) {
aspectMinVersion.getValue().add(createSentinel(aspectMinVersion.getKey()));
} else {
Expand Down Expand Up @@ -259,11 +267,15 @@ private MissingEntityAspect createSentinel(String aspectName) {
}

/**
* Constructs a map from timestamp to a sorted map of aspect name -> version for use in constructing the version stamp
* @param aspectRowSetMap map constructed as described in {@link TimelineServiceImpl#constructAspectRowSetMap}
* Constructs a map from timestamp to a sorted map of aspect name -> version for
* use in constructing the version stamp
*
* @param aspectRowSetMap map constructed as described in
* {@link TimelineServiceImpl#constructAspectRowSetMap}
* @return map as described
*/
private Map<Long, SortedMap<String, Long>> constructTimestampVersionCache(Map<String, TreeSet<EntityAspect>> aspectRowSetMap) {
private Map<Long, SortedMap<String, Long>> constructTimestampVersionCache(
Map<String, TreeSet<EntityAspect>> aspectRowSetMap) {
Set<EntityAspect> aspects = aspectRowSetMap.values().stream()
.flatMap(TreeSet::stream)
.filter(aspect -> aspect.getVersion() != -1L)
Expand All @@ -276,7 +288,7 @@ private Map<Long, SortedMap<String, Long>> constructTimestampVersionCache(Map<St
SortedMap<String, Long> versionStampMap = new TreeMap<>(Comparator.naturalOrder());
for (TreeSet<EntityAspect> aspectSet : aspectRowSetMap.values()) {
EntityAspect maybeMatch = null;
for (EntityAspect aspect2 : aspectSet) {
for (EntityAspect aspect2 : aspectSet) {
if (aspect2 instanceof MissingEntityAspect) {
continue;
}
Expand Down Expand Up @@ -379,8 +391,8 @@ private void assignSemanticVersions(SortedMap<Long, List<ChangeTransaction>> cha
assert (transactionId < entry.getKey());
transactionId = entry.getKey();
SemanticChangeType highestChangeInGroup = SemanticChangeType.NONE;
ChangeTransaction highestChangeTransaction =
entry.getValue().stream().max(Comparator.comparing(ChangeTransaction::getSemVerChange)).orElse(null);
ChangeTransaction highestChangeTransaction = entry.getValue().stream()
.max(Comparator.comparing(ChangeTransaction::getSemVerChange)).orElse(null);
if (highestChangeTransaction != null) {
highestChangeInGroup = highestChangeTransaction.getSemVerChange();
}
Expand All @@ -402,11 +414,12 @@ private SemanticVersion getGroupSemanticVersion(SemanticChangeType highestChange
.qualifier(BUILD_VALUE_COMPUTED)
.build();
}
// Evaluate the version for this group based on previous version and the hightest semantic change type in the group.
// Evaluate the version for this group based on previous version and the
// hightest semantic change type in the group.
if (highestChangeInGroup == SemanticChangeType.MAJOR) {
// Bump up major, reset all lower to 0s.
return SemanticVersion.builder()
.majorVersion(previousVersion.getMajorVersion()+1)
.majorVersion(previousVersion.getMajorVersion() + 1)
.minorVersion(0)
.patchVersion(0)
.qualifier(BUILD_VALUE_COMPUTED)
Expand All @@ -415,7 +428,7 @@ private SemanticVersion getGroupSemanticVersion(SemanticChangeType highestChange
// Bump up minor, reset all lower to 0s.
return SemanticVersion.builder()
.majorVersion(previousVersion.getMajorVersion())
.minorVersion(previousVersion.getMinorVersion()+1)
.minorVersion(previousVersion.getMinorVersion() + 1)
.patchVersion(0)
.qualifier(BUILD_VALUE_COMPUTED)
.build();
Expand All @@ -424,17 +437,17 @@ private SemanticVersion getGroupSemanticVersion(SemanticChangeType highestChange
return SemanticVersion.builder()
.majorVersion(previousVersion.getMajorVersion())
.minorVersion(previousVersion.getMinorVersion())
.patchVersion(previousVersion.getPatchVersion()+1)
.patchVersion(previousVersion.getPatchVersion() + 1)
.qualifier(BUILD_VALUE_COMPUTED)
.build();
}
return previousVersion;
}

private List<ChangeTransaction> combineTransactionsByTimestamp(List<ChangeTransaction> changeTransactions, Map<Long,
SortedMap<String, Long>> timestampVersionCache) {
Map<Long, List<ChangeTransaction>> transactionsByTimestamp =
changeTransactions.stream().collect(Collectors.groupingBy(ChangeTransaction::getTimestamp));
private List<ChangeTransaction> combineTransactionsByTimestamp(List<ChangeTransaction> changeTransactions,
Map<Long, SortedMap<String, Long>> timestampVersionCache) {
Map<Long, List<ChangeTransaction>> transactionsByTimestamp = changeTransactions.stream()
.collect(Collectors.groupingBy(ChangeTransaction::getTimestamp));
List<ChangeTransaction> combinedChangeTransactions = new ArrayList<>();
for (List<ChangeTransaction> transactionList : transactionsByTimestamp.values()) {
if (!transactionList.isEmpty()) {
Expand All @@ -444,9 +457,9 @@ private List<ChangeTransaction> combineTransactionsByTimestamp(List<ChangeTransa
for (int i = 1; i < transactionList.size(); i++) {
ChangeTransaction element = transactionList.get(i);
result.getChangeEvents().addAll(element.getChangeEvents());
maxSemanticChangeType =
maxSemanticChangeType.compareTo(element.getSemVerChange()) >= 0 ? maxSemanticChangeType
: element.getSemVerChange();
maxSemanticChangeType = maxSemanticChangeType.compareTo(element.getSemVerChange()) >= 0
? maxSemanticChangeType
: element.getSemVerChange();
maxSemVer = maxSemVer.compareTo(element.getSemVer()) >= 0 ? maxSemVer : element.getSemVer();
}
result.setSemVerChange(maxSemanticChangeType);
Expand Down

0 comments on commit d97c450

Please sign in to comment.