-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel]Remove two file IO in CRC loading by reusing log listing #4112
base: master
Are you sure you want to change the base?
Conversation
@@ -219,7 +219,7 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter( | |||
endVersionOpt); | |||
|
|||
// Must be final to be used in lambda | |||
final AtomicBoolean hasReturnedAnElement = new AtomicBoolean(false); | |||
final AtomicBoolean hasReturnedlogOrCheckPoint = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hasReturnedCommitOrCheckpoint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can add a comment here. Something like: This variable is used to help determine if we should throw an error if the table history is not reconstructable. Only commit and checkpoint files are applicable.
@@ -266,7 +269,11 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter( | |||
} | |||
} | |||
|
|||
hasReturnedAnElement.set(true); | |||
// Only log and checkpoint could use to construct table state. | |||
if (FileNames.isCommitFile(getName(fs.getPath())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just realized this. isCommitFile
takes in a String path. so does isCheckpointFile
. Want to update all of these occurences?
It can just be : FileNames.isCommitFile(fs.getPath())
@@ -266,7 +269,11 @@ public static CloseableIterator<FileStatus> listDeltaLogFilesAsIter( | |||
} | |||
} | |||
|
|||
hasReturnedAnElement.set(true); | |||
// Only log and checkpoint could use to construct table state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can delete this comment line
@@ -319,10 +323,16 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT | |||
listedFileStatuses, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Errr ... we should just totally re-do this logic. For example, listedCheckpointAndDeltaFileStatuses
is an incorrect variable name now since this list now also contains CRC files.
Something like this:
Map<DeltaLogFileType, List<FileStatus>> partitionedFiles = files.stream()
.collect(Collectors.groupingBy(
file -> {
String name = file.getPath().getName();
if (FileNames.isCommitFile(name)) {
return DeltaLogFileType.COMMIT;
} else if (FileNames.isCheckpointFile(name)) {
return DeltaLogFileType.CHECKPOINT;
} else if (FileNames.isCrcFile(name)) {
return DeltaLogFileType.CRC;
} else {
return DeltaLogFileType.OTHER;
}
},
LinkedHashMap::new, // Ensure order is maintained
Collectors.toList()
));
List<FileStatus> commitFiles = partitionedFiles.getOrDefault(DeltaLogFileType.COMMIT, Collections.emptyList());
List<FileStatus> checkpointFiles = partitionedFiles.getOrDefault(DeltaLogFileType.CHECKPOINT, Collections.emptyList());
List<FileStatus> crcFiles = partitionedFiles.getOrDefault(DeltaLogFileType.CRC, Collections.emptyList());
I just
@@ -493,6 +503,15 @@ public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionT | |||
newVersion, | |||
deltasAfterCheckpoint, | |||
latestCompleteCheckpointFileStatuses, | |||
listedChecksumFileStatues.stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldn't it just be the last one?
expJsonVersionsRead = Seq(11), | ||
expParquetVersionsRead = Seq(10), | ||
expParquetReadSetSizes = Seq(1), | ||
expChecksumReadSet = Nil | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the indentation here is all messaged up (not your fault). I think the bracket here needs to be 2 spaces to the left.
I also think the next loadPandMCheckMetrics
is also wrong -- that entire block needs to go to the left.
@@ -263,4 +269,8 @@ private String formatList(List<FileStatus> list) { | |||
return "\n " | |||
+ list.stream().map(FileStatus::toString).collect(Collectors.joining(",\n ")); | |||
} | |||
|
|||
public Optional<FileStatus> getLatestChecksum() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this should be next to the other gettings, and it should be inserted to be in the same order as the parameter in the constructor
@@ -414,11 +415,15 @@ private Map<String, DomainMetadata> loadDomainMetadataMap(Engine engine) { | |||
snapshotHint.map(SnapshotHint::getVersion).orElse(-1L) + 1, | |||
logSegment.getCheckpointVersionOpt().orElse(0L), | |||
// Only find the CRC within 100 versions. | |||
snapshotVersion - 100, | |||
0L)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why remove the 0L? seems like a good safeguard?
@@ -263,4 +269,8 @@ private String formatList(List<FileStatus> list) { | |||
return "\n " | |||
+ list.stream().map(FileStatus::toString).collect(Collectors.joining(",\n ")); | |||
} | |||
|
|||
public Optional<FileStatus> getLatestChecksum() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and I think these needs method docs? This checksum might not be for the version
that this LogSegment represents, right?
also ... suppose we have 10.checkpoint and 11.json and 12.json ...
what are the possible ranges of the version for which this CRC could be for? Could we have 8.crc? what are the guarantees of this version range?
@@ -414,11 +415,15 @@ private Map<String, DomainMetadata> loadDomainMetadataMap(Engine engine) { | |||
snapshotHint.map(SnapshotHint::getVersion).orElse(-1L) + 1, | |||
logSegment.getCheckpointVersionOpt().orElse(0L), | |||
// Only find the CRC within 100 versions. | |||
snapshotVersion - 100, | |||
0L)); | |||
snapshotVersion - 100)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we still need this crcSearchLowerBound
? we are not even doing a LIST call now?
Which Delta project/connector is this regarding?
Description
Incremental CRC loading for P&M was added in #4077.
For snapshot N, we try to read N.crc, if N.crc missing, we list CRC files and find the one up to N-100. These two file listing operation could be actually avoided, because we have already listed files under _delta_log folder.
This PR collects latest CRC in the first file listing and remove two unnecessary IO.
How was this patch tested?
LogReplay test
Does this PR introduce any user-facing changes?
No