Skip to content

Commit

Permalink
clean chunk upload code
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Aug 28, 2024
1 parent 53ed534 commit 788ff27
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 63 deletions.
72 changes: 42 additions & 30 deletions pkg/storage/fs/cephfs/chunking.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ func IsChunked(fn string) (bool, error) {
}

// ChunkBLOBInfo stores info about a particular chunk
// example: given /users/peter/myfile.txt-chunking-1234-10-2
type ChunkBLOBInfo struct {
Path string
TransferID string
TotalChunks int
CurrentChunk int
Path string // example: /users/peter/myfile.txt
TransferID string // example: 1234
TotalChunks int // example: 10
CurrentChunk int // example: 2
}

// Not using the resource path in the chunk folder name allows uploading to
Expand Down Expand Up @@ -86,8 +87,8 @@ func GetChunkBLOBInfo(path string) (*ChunkBLOBInfo, error) {
// ChunkHandler manages chunked uploads, storing the chunks in a temporary directory
// until it gets the final chunk which is then returned.
type ChunkHandler struct {
user *User
chunkFolder string
user *User
uploadFolder string // example: /users/peter/.uploads
}

// NewChunkHandler creates a handler for chunked uploads.
Expand All @@ -97,31 +98,44 @@ func NewChunkHandler(ctx context.Context, fs *cephfs) *ChunkHandler {
return &ChunkHandler{u, path.Join(u.home, fs.conf.UploadFolder)}
}

func (c *ChunkHandler) getChunkTempFileName() string {
func (c *ChunkHandler) getTempFileName() string {
return fmt.Sprintf("__%d_%s", time.Now().Unix(), uuid.New().String())
}

func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (path string, err error) {
path = filepath.Join(c.chunkFolder, i.uploadID())
func (c *ChunkHandler) getAndCreateTransferFolderName(i *ChunkBLOBInfo) (path string, err error) {
path = filepath.Join(c.uploadFolder, i.uploadID())
c.user.op(func(cv *cacheVal) {
err = cv.mount.MakeDir(path, 0777)
})

return
}

// TODO(labkode): I don't like how this function looks; it should be refactored
func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chunk string, err error) {
chunkInfo, err := GetChunkBLOBInfo(path)
if err != nil {
err = fmt.Errorf("error getting chunk info from path: %s", path)
return
}

chunkTempFilename := c.getChunkTempFileName()
transferFolderName, err := c.getAndCreateTransferFolderName(chunkInfo)
if err != nil {
// TODO(labkode): skip error for now
// err = fmt.Errorf("error getting transfer folder name", err)
return
}
fmt.Println("debugging: transferfoldername", transferFolderName)

// here we write a temporary file that will later be renamed into the transfer folder
// using the chunk's sequence number as its filename.
// we do not store these pre-rename temporary files inside the transfer folder,
// to avoid errors when counting the number of chunks for finalizing the transfer.
tmpFilename := c.getTempFileName()
c.user.op(func(cv *cacheVal) {
var tmpFile *goceph.File
target := filepath.Join(c.chunkFolder, chunkTempFilename)
fmt.Println("debugging savechunk", target, c.chunkFolder, chunkTempFilename)
target := filepath.Join(c.uploadFolder, tmpFilename)
fmt.Println("debugging savechunk, target: ", target)
tmpFile, err = cv.mount.Open(target, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms)
defer closeFile(tmpFile)
if err != nil {
Expand All @@ -133,15 +147,9 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu
return
}

chunksFolderName, err := c.getChunkFolderName(chunkInfo)
if err != nil {
return
}
// c.logger.Info().Log("chunkfolder", chunksFolderName)

chunkTarget := filepath.Join(chunksFolderName, strconv.Itoa(chunkInfo.CurrentChunk))
chunkTarget := filepath.Join(transferFolderName, strconv.Itoa(chunkInfo.CurrentChunk))
c.user.op(func(cv *cacheVal) {
err = cv.mount.Rename(chunkTempFilename, chunkTarget)
err = cv.mount.Rename(tmpFilename, chunkTarget)
})
if err != nil {
return
Expand All @@ -158,7 +166,7 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu
var entry *goceph.DirEntry
var chunkFile, assembledFile *goceph.File

dir, err = cv.mount.OpenDir(chunksFolderName)
dir, err = cv.mount.OpenDir(transferFolderName)
defer closeDir(dir)

for entry, err = dir.ReadDir(); entry != nil && err == nil; entry, err = dir.ReadDir() {
Expand All @@ -171,16 +179,20 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu
return
}

chunk = filepath.Join(c.chunkFolder, c.getChunkTempFileName())
assembledFile, err = cv.mount.Open(chunk, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms)
// from this point on we have all the necessary chunks,
// so we create a temporary file into which all the chunks will be written
// before it is renamed to the requested location (in the example: /users/peter/myfile.txt)

assemblyFilename := filepath.Join(c.uploadFolder, c.getTempFileName())
assembledFile, err = cv.mount.Open(assemblyFilename, os.O_CREATE|os.O_WRONLY, c.user.fs.conf.FilePerms)
defer closeFile(assembledFile)
defer deleteFile(cv.mount, chunk)
defer deleteFile(cv.mount, assemblyFilename)
if err != nil {
return
}

for i := 0; i < numEntries; i++ {
target := filepath.Join(chunksFolderName, strconv.Itoa(i))
target := filepath.Join(transferFolderName, strconv.Itoa(i))

chunkFile, err = cv.mount.Open(target, os.O_RDONLY, 0)
if err != nil {
Expand All @@ -193,22 +205,22 @@ func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chu
}
}

// necessary approach in case assembly fails
// clean all the chunks that made the assembly file
for i := 0; i < numEntries; i++ {
target := filepath.Join(chunksFolderName, strconv.Itoa(i))
target := filepath.Join(transferFolderName, strconv.Itoa(i))
err = cv.mount.Unlink(target)
if err != nil {
return
}
}
_ = cv.mount.Unlink(chunksFolderName)
})

return true, chunk, nil
return
}

// WriteChunk saves an intermediate chunk temporarily and assembles all chunks
// once the final one is received.
// this function will return the original filename (myfile.txt) and the assemblyPath when
// the upload is completed
func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, error) {
finish, chunk, err := c.saveChunk(fn, r)
if err != nil {
Expand Down
73 changes: 40 additions & 33 deletions pkg/storage/fs/cephfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,46 +40,53 @@ func (fs *cephfs) Upload(ctx context.Context, ref *provider.Reference, r io.Read
if err != nil {
return errors.Wrap(err, "cephfs: error checking if path is chunked")
}
if ok {
fmt.Println("debugging: chunked upload", p)
var assembledFile string
p, assembledFile, err = NewChunkHandler(ctx, fs).WriteChunk(p, r)
fmt.Println("debugging assembly", p, assembledFile, r)
if err != nil {
return errors.Wrapf(err, "error writing chunk %v %v %v", p, r, assembledFile)
}
if p == "" {
return errtypes.PartialContent(ref.String())
}

if !ok {
fmt.Println("debugging: upload is not chunked", p)
var file io.WriteCloser
user.op(func(cv *cacheVal) {
r, err = cv.mount.Open(assembledFile, os.O_RDONLY, 0)
})
if err != nil {
return errors.Wrap(err, "cephfs: error opening assembled file")
}
defer r.Close()
defer user.op(func(cv *cacheVal) {
_ = cv.mount.Unlink(assembledFile)
file, err = cv.mount.Open(p, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fs.conf.FilePerms)
if err != nil {
err = errors.Wrap(err, "cephfs: error opening binary file")
return
}
defer file.Close()

_, err = io.Copy(file, r)
if err != nil {
err = errors.Wrap(err, "cephfs: error writing to binary file")
return
}
})

return nil
}

var file io.WriteCloser
user.op(func(cv *cacheVal) {
file, err = cv.mount.Open(p, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, fs.conf.FilePerms)
if err != nil {
err = errors.Wrap(err, "cephfs: error opening binary file")
return
}
defer file.Close()
// upload is chunked
fmt.Println("debugging: upload is chunked", p)

var assembledFile string

_, err = io.Copy(file, r)
if err != nil {
err = errors.Wrap(err, "cephfs: error writing to binary file")
return
}
// initiate the chunk handler
originalFilename, assembledFile, err := NewChunkHandler(ctx, fs).WriteChunk(p, r)
fmt.Println("debugging: assembly file", originalFilename, assembledFile, r)
if err != nil {
return errors.Wrapf(err, "error writing chunk %v %v %v", p, r, assembledFile)
}
if originalFilename == "" { // means we wrote a chunk only
return errtypes.PartialContent(ref.String())
}
user.op(func(cv *cacheVal) {
err = cv.mount.Rename(assembledFile, originalFilename)
})
if err != nil {
return errors.Wrap(err, "cephfs: error renaming assembled file")
}
defer user.op(func(cv *cacheVal) {
_ = cv.mount.Unlink(assembledFile)
})
return nil

return err
}

func (fs *cephfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, metadata map[string]string) (map[string]string, error) {
Expand Down

0 comments on commit 788ff27

Please sign in to comment.