Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streaming writes flush and sync flow #2856

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 70 additions & 27 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,56 +1108,99 @@ func (fs *fileSystem) lookUpOrCreateChildDirInode(
return child, nil
}

// Synchronize the supplied file inode to GCS, updating the index as
// promoteToGenerationBacked re-registers the given file inode in the file
// system's bookkeeping maps: its entry is dropped from localFileInodes and,
// unless an entry already exists under the same name, it is recorded in
// generationBackedInodes. It is meant to be called once the inode has been
// synced to GCS.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f)
func (fs *fileSystem) promoteToGenerationBacked(f *inode.FileInode) {
	fs.mu.Lock()
	name := f.Name()
	delete(fs.localFileInodes, name)
	// Never overwrite an entry that is already registered under this name.
	if _, present := fs.generationBackedInodes[name]; !present {
		fs.generationBackedInodes[name] = f
	}
	fs.mu.Unlock()

	// fileIndex is deliberately left untouched:
	//
	// The inode lock has been held throughout, so the inode cannot have been
	// evicted from the index in the meantime. If it is absent from the index
	// now, it was already absent when we started, which means it was
	// clobbered remotely.
	//
	// Put differently: the inode is either still indexed, or it has been
	// clobbered and *should* remain anonymous.
}

// Flushes the supplied file inode to GCS, updating the index as
// appropriate.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f)
func (fs *fileSystem) syncFile(
func (fs *fileSystem) flushFile(
ctx context.Context,
f *inode.FileInode) (err error) {
f *inode.FileInode) error {
// FlushFile can be triggered for unlinked files if the fileHandle is open by
// same or another user. This indicates a potential file clobbering scenario:
// - The file was deleted (unlinked) while a handle to it was still open.
if f.IsLocal() && f.IsUnlinked() {
err = &gcsfuse_errors.FileClobberedError{
return &gcsfuse_errors.FileClobberedError{
Err: fmt.Errorf("file %s was unlinked while it was still open, indicating file clobbering", f.Name().LocalName()),
}
return
}

// Sync the inode.
err = f.Sync(ctx)
// Flush the inode.
err := f.Flush(ctx)
if err != nil {
err = fmt.Errorf("FileInode.Sync: %w", err)
// If the inode was local file inode, treat it as unlinked.
fs.mu.Lock()
delete(fs.localFileInodes, f.Name())
fs.mu.Unlock()
return
return err
}

// Once the inode is synced to GCS, it is no longer an localFileInode.
// Delete the entry from localFileInodes map and add it to generationBackedInodes.
fs.mu.Lock()
delete(fs.localFileInodes, f.Name())
_, ok := fs.generationBackedInodes[f.Name()]
if !ok {
fs.generationBackedInodes[f.Name()] = f
// Promote the inode to generationBackedInodes in fs maps.
fs.promoteToGenerationBacked(f)
return nil
}

// Synchronizes the supplied file inode to GCS, updating the index as
// appropriate.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f)
func (fs *fileSystem) syncFile(
ctx context.Context,
f *inode.FileInode) error {
// SyncFile can be triggered for unlinked files if the fileHandle is open by
// same or another user. This indicates a potential file clobbering scenario:
// - The file was deleted (unlinked) while a handle to it was still open.
if f.IsLocal() && f.IsUnlinked() {
return &gcsfuse_errors.FileClobberedError{
Err: fmt.Errorf("file %s was unlinked while it was still open, indicating file clobbering", f.Name().LocalName()),
}
}
fs.mu.Unlock()

// We need not update fileIndex:
//
// We've held the inode lock the whole time, so there's no way that this
// inode could have been booted from the index. Therefore, if it's not in the
// index at the moment, it must not have been in there when we started. That
// is, it must have been clobbered remotely.
//
// In other words, either this inode is still in the index or it has been
// clobbered and *should* be anonymous.
// Sync the inode.
gcsSynced, err := f.Sync(ctx)
if err != nil {
err = fmt.Errorf("FileInode.Sync: %w", err)
// If the inode was local file inode, treat it as unlinked.
fs.mu.Lock()
delete(fs.localFileInodes, f.Name())
fs.mu.Unlock()
return err
}

return
// If gcsSynced is true, it means the inode was fully synced to GCS. In this
// case, we need to promote the inode to generationBackedInodes in fs maps.
if gcsSynced {
fs.promoteToGenerationBacked(f)
}
return nil
}

// Decrement the supplied inode's lookup count, destroying it if the inode says
Expand Down Expand Up @@ -2546,7 +2589,7 @@ func (fs *fileSystem) FlushFile(
defer in.Unlock()

// Flush it.
if err := fs.syncFile(ctx, in); err != nil {
if err := fs.flushFile(ctx, in); err != nil {
return err
}

Expand Down
59 changes: 52 additions & 7 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (f *FileInode) writeUsingTempFile(ctx context.Context, data []byte, offset
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) writeUsingBufferedWrites(ctx context.Context, data []byte, offset int64) error {
err := f.bwh.Write(data, offset)
if err == bufferedwrites.ErrOutOfOrderWrite || err == bufferedwrites.ErrUploadFailure {
if errors.Is(err, bufferedwrites.ErrOutOfOrderWrite) || errors.Is(err, bufferedwrites.ErrUploadFailure) {
// Finalize the object.
flushErr := f.flushUsingBufferedWriteHandler()
if flushErr != nil {
Expand Down Expand Up @@ -729,20 +729,42 @@ func (f *FileInode) fetchLatestGcsObject(ctx context.Context) (*gcs.Object, erro
return latestGcsObj, err
}

// Sync writes out contents to GCS. If this fails due to the generation having been
// clobbered, failure is propagated back to the calling function as an error.
// Sync writes out contents to GCS. If this fails due to the generation
// having been clobbered, failure is propagated back to the calling
// function as an error.
//
// After this method succeeds, SourceGeneration will return the new generation
// by which this inode should be known (which may be the same as before). If it
// For buffered writes, this method only waits for any partial buffers to be
// uploaded to GCS. It does not guarantee that the entire contents of the file
// have been persisted.
//
// For non-buffered writes, this method writes the entire contents to GCS.
// If this method succeeds, SourceGeneration will return the new generation by
// which this inode should be known (which may be the same as before). If it
// fails, the generation will not change.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Sync(ctx context.Context) (err error) {
func (f *FileInode) Sync(ctx context.Context) (gcsSynced bool, err error) {
// If we have not been dirtied, there is nothing to do.
if f.content == nil {
if f.content == nil && f.bwh == nil {
return
}

if f.bwh != nil {
// bwh.Sync does not finalize the upload, so return gcsSynced as false.
return false, f.bwh.Sync()
}
err = f.syncUsingContent(ctx)
if err != nil {
return false, err
}
return true, nil
}

// syncUsingContent syncs the inode content to GCS. It fetches the latest GCS
// object, syncs the content and updates the inode state.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) syncUsingContent(ctx context.Context) (err error) {
latestGcsObj, err := f.fetchLatestGcsObject(ctx)
if err != nil {
return
Expand Down Expand Up @@ -772,6 +794,29 @@ func (f *FileInode) Sync(ctx context.Context) (err error) {
return
}

// Flush persists the inode's pending contents to GCS. A failure caused by the
// generation having been clobbered is returned to the caller as an error.
//
// On success, SourceGeneration reports the generation by which this inode
// should now be known (possibly unchanged). On failure, the generation is
// left as it was.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Flush(ctx context.Context) (err error) {
	switch {
	case f.bwh != nil:
		// A buffered write handler is active, so flush through it.
		return f.flushUsingBufferedWriteHandler()
	case f.content != nil:
		// Dirty content without a buffered write handler: sync it directly.
		return f.syncUsingContent(ctx)
	default:
		// Neither content nor a buffered write handler: the inode was never
		// dirtied, so there is nothing to do.
		return nil
	}
}

func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
if minObj != nil && !f.localFileCache {
f.src = *minObj
Expand Down
Loading
Loading