Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1835][CIP-8] Add tier writer base and memory tier writer #3065

Closed
wants to merge 4 commits into from

Conversation

FMX
Copy link
Contributor

@FMX FMX commented Jan 15, 2025

What changes were proposed in this pull request?

  1. Add tier writer base.
  2. Add memory tier writer.
  3. Add corresponding UTs.

Code coverage report:
截屏2025-01-15 17 53 23

NOTE:
Evict test needs file tier writer, so the branch coverage is not 100% in the memory tier.

Why are the changes needed?

Refactor partition data writer for CIP-8.

Does this PR introduce any user-facing change?

NO.

How was this patch tested?

GA.

@FMX FMX requested a review from RexXiong January 15, 2025 09:54
}
waitTime -= WAIT_INTERVAL_MS
}
if (counter.get > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numPendingWrites can bypass counter.get>0

flushBuffer = inputBuffer
}

def close(evict: Boolean = false): Long = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we synchronized when close?

metaHandler.beforeWrite(buf)
ensureNotClosed()
writerInternal(buf)
updateMemoryMetric(buf.readableBytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move updateMemoryMetric to MemoryTierWriter

val dupBuf = flushBuffer.retainedDuplicate()
// this flusher buffer is from memory tier writer, so that we can not keep the buffer
flushTask = genFlushTask(finalFlush, false)
MemoryManager.instance.releaseMemoryFileStorage(numBytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move to MemoryTierWriter, releaseMemoryFileStorage and incrementDiskBuffer after evict finished

storageManager) {

override def needEvict(): Boolean = {
flushBuffer.readableBytes() > memoryFileStorageMaxFileSize && storageManager.localOrDfsStorageAvailable()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lazy val for storageManager.localOrDfsStorageAvailable()


protected def writerInternal(buf: ByteBuf): Unit

def updateMemoryMetric(numBytes: Int): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateMemoryMetric can be removed.

Copy link
Contributor

@RexXiong RexXiong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, merge to main(v0.6.0)

@RexXiong RexXiong closed this in a77a64b Jan 23, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants