Skip to content

Commit

Permalink
code changes to separate out flush and sync flow (#2856)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashmeenkaur authored Jan 6, 2025
1 parent 52632e8 commit edc5e7b
Show file tree
Hide file tree
Showing 4 changed files with 735 additions and 265 deletions.
97 changes: 70 additions & 27 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1108,56 +1108,99 @@ func (fs *fileSystem) lookUpOrCreateChildDirInode(
return child, nil
}

// Synchronize the supplied file inode to GCS, updating the index as
// promoteToGenerationBacked updates the file system maps for the given file inode
// after it has been synced to GCS.
// The inode is removed from the localFileInodes map and added to the
// generationBackedInodes map.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f)
func (fs *fileSystem) promoteToGenerationBacked(f *inode.FileInode) {
fs.mu.Lock()
delete(fs.localFileInodes, f.Name())
if _, ok := fs.generationBackedInodes[f.Name()]; !ok {
fs.generationBackedInodes[f.Name()] = f
}
fs.mu.Unlock()

// We need not update fileIndex:
//
// We've held the inode lock the whole time, so there's no way that this
// inode could have been booted from the index. Therefore, if it's not in the
// index at the moment, it must not have been in there when we started. That
// is, it must have been clobbered remotely.
//
// In other words, either this inode is still in the index or it has been
// clobbered and *should* be anonymous.
}

// Flushes the supplied file inode to GCS, updating the index as
// appropriate.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f)
func (fs *fileSystem) syncFile(
func (fs *fileSystem) flushFile(
ctx context.Context,
f *inode.FileInode) (err error) {
f *inode.FileInode) error {
// SyncFile can be triggered for unlinked files if the fileHandle is open by
// same or another user. This indicates a potential file clobbering scenario:
// - The file was deleted (unlinked) while a handle to it was still open.
if f.IsLocal() && f.IsUnlinked() {
err = &gcsfuse_errors.FileClobberedError{
return &gcsfuse_errors.FileClobberedError{
Err: fmt.Errorf("file %s was unlinked while it was still open, indicating file clobbering", f.Name().LocalName()),
}
return
}

// Sync the inode.
err = f.Sync(ctx)
// Flush the inode.
err := f.Flush(ctx)
if err != nil {
err = fmt.Errorf("FileInode.Sync: %w", err)
// If the inode was local file inode, treat it as unlinked.
fs.mu.Lock()
delete(fs.localFileInodes, f.Name())
fs.mu.Unlock()
return
return err
}

// Once the inode is synced to GCS, it is no longer an localFileInode.
// Delete the entry from localFileInodes map and add it to generationBackedInodes.
fs.mu.Lock()
delete(fs.localFileInodes, f.Name())
_, ok := fs.generationBackedInodes[f.Name()]
if !ok {
fs.generationBackedInodes[f.Name()] = f
// Promote the inode to generationBackedInodes in fs maps.
fs.promoteToGenerationBacked(f)
return nil
}

// Synchronizes the supplied file inode to GCS, updating the index as
// appropriate.
//
// LOCKS_EXCLUDED(fs.mu)
// LOCKS_REQUIRED(f)
func (fs *fileSystem) syncFile(
ctx context.Context,
f *inode.FileInode) error {
// SyncFile can be triggered for unlinked files if the fileHandle is open by
// same or another user. This indicates a potential file clobbering scenario:
// - The file was deleted (unlinked) while a handle to it was still open.
if f.IsLocal() && f.IsUnlinked() {
return &gcsfuse_errors.FileClobberedError{
Err: fmt.Errorf("file %s was unlinked while it was still open, indicating file clobbering", f.Name().LocalName()),
}
}
fs.mu.Unlock()

// We need not update fileIndex:
//
// We've held the inode lock the whole time, so there's no way that this
// inode could have been booted from the index. Therefore, if it's not in the
// index at the moment, it must not have been in there when we started. That
// is, it must have been clobbered remotely.
//
// In other words, either this inode is still in the index or it has been
// clobbered and *should* be anonymous.
// Sync the inode.
gcsSynced, err := f.Sync(ctx)
if err != nil {
err = fmt.Errorf("FileInode.Sync: %w", err)
// If the inode was local file inode, treat it as unlinked.
fs.mu.Lock()
delete(fs.localFileInodes, f.Name())
fs.mu.Unlock()
return err
}

return
// If gcsSynced is true, it means the inode was fully synced to GCS In this
// case, we need to promote the inode to generationBackedInodes in fs maps.
if gcsSynced {
fs.promoteToGenerationBacked(f)
}
return nil
}

// Decrement the supplied inode's lookup count, destroying it if the inode says
Expand Down Expand Up @@ -2546,7 +2589,7 @@ func (fs *fileSystem) FlushFile(
defer in.Unlock()

// Sync it.
if err := fs.syncFile(ctx, in); err != nil {
if err := fs.flushFile(ctx, in); err != nil {
return err
}

Expand Down
59 changes: 52 additions & 7 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (f *FileInode) writeUsingTempFile(ctx context.Context, data []byte, offset
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) writeUsingBufferedWrites(ctx context.Context, data []byte, offset int64) error {
err := f.bwh.Write(data, offset)
if err == bufferedwrites.ErrOutOfOrderWrite || err == bufferedwrites.ErrUploadFailure {
if errors.Is(err, bufferedwrites.ErrOutOfOrderWrite) || errors.Is(err, bufferedwrites.ErrUploadFailure) {
// Finalize the object.
flushErr := f.flushUsingBufferedWriteHandler()
if flushErr != nil {
Expand Down Expand Up @@ -729,20 +729,42 @@ func (f *FileInode) fetchLatestGcsObject(ctx context.Context) (*gcs.Object, erro
return latestGcsObj, err
}

// Sync writes out contents to GCS. If this fails due to the generation having been
// clobbered, failure is propagated back to the calling function as an error.
// Sync writes out contents to GCS. If this fails due to the generation
// having been clobbered, failure is propagated back to the calling
// function as an error.
//
// After this method succeeds, SourceGeneration will return the new generation
// by which this inode should be known (which may be the same as before). If it
// For buffered writes, this method only waits for any partial buffers to be
// uploaded to GCS. It does not guarantee that the entire contents of the file
// have been persisted.
//
// For non-buffered writes, this method writes the entire contents to GCS.
// If this method succeeds, SourceGeneration will return the new generation by
// which this inode should be known (which may be the same as before). If it
// fails, the generation will not change.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Sync(ctx context.Context) (err error) {
func (f *FileInode) Sync(ctx context.Context) (gcsSynced bool, err error) {
// If we have not been dirtied, there is nothing to do.
if f.content == nil {
if f.content == nil && f.bwh == nil {
return
}

if f.bwh != nil {
// bwh.Sync does not finalize the upload, so return gcsSynced as false.
return false, f.bwh.Sync()
}
err = f.syncUsingContent(ctx)
if err != nil {
return false, err
}
return true, nil
}

// syncUsingContent syncs the inode content to GCS. It fetches the latest GCS
// object, syncs the content and updates the inode state.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) syncUsingContent(ctx context.Context) (err error) {
latestGcsObj, err := f.fetchLatestGcsObject(ctx)
if err != nil {
return
Expand Down Expand Up @@ -772,6 +794,29 @@ func (f *FileInode) Sync(ctx context.Context) (err error) {
return
}

// Flush writes out contents to GCS. If this fails due to the generation
// having been clobbered, failure is propagated back to the calling
// function as an error.
//
// After this method succeeds, SourceGeneration will return the new generation
// by which this inode should be known (which may be the same as before). If it
// fails, the generation will not change.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Flush(ctx context.Context) (err error) {
// If we have not been dirtied, there is nothing to do.
if f.content == nil && f.bwh == nil {
return
}

// Flush using the appropriate method based on whether we're using a
// buffered write handler.
if f.bwh != nil {
return f.flushUsingBufferedWriteHandler()
}
return f.syncUsingContent(ctx)
}

func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
if minObj != nil && !f.localFileCache {
f.src = *minObj
Expand Down
Loading

0 comments on commit edc5e7b

Please sign in to comment.