Skip to content

Commit

Permalink
add if-match check to finish upload
Browse files Browse the repository at this point in the history
  • Loading branch information
David Christofas committed Feb 22, 2022
1 parent 24c4944 commit e25b9b3
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 9 deletions.
9 changes: 5 additions & 4 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,17 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
}, nil
}

if ifMatch := req.GetIfMatch(); ifMatch != "" {
metadata := map[string]string{}
ifMatch := req.GetIfMatch()
if ifMatch != "" {
sRes, err := s.Stat(ctx, &provider.StatRequest{Ref: req.Ref})
if err != nil {
return nil, err
}

switch sRes.Status.Code {
case rpc.Code_CODE_OK:
if sRes.Info.Etag != req.GetIfMatch() {
if sRes.Info.Etag != ifMatch {
return &provider.InitiateFileUploadResponse{
Status: status.NewFailedPrecondition(ctx, errors.New("etag doesn't match"), "etag doesn't match"),
}, nil
Expand All @@ -311,7 +313,7 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
Status: sRes.Status,
}, nil
}

metadata["if-match"] = ifMatch
}

// FIXME these should be part of the InitiateFileUploadRequest object
Expand All @@ -321,7 +323,6 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
}
}

metadata := map[string]string{}
var uploadLength int64
if req.Opaque != nil && req.Opaque.Map != nil {
if req.Opaque.Map["Upload-Length"] != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/rhttp/datatx/manager/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
w.WriteHeader(http.StatusUnauthorized)
case errtypes.InsufficientStorage:
w.WriteHeader(http.StatusInsufficientStorage)
case errtypes.PreconditionFailed:
w.WriteHeader(http.StatusPreconditionFailed)
default:
sublog.Error().Err(v).Msg("error uploading file")
w.WriteHeader(http.StatusInternalServerError)
Expand Down
2 changes: 2 additions & 0 deletions pkg/rhttp/datatx/manager/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
w.WriteHeader(http.StatusUnauthorized)
case errtypes.InsufficientStorage:
w.WriteHeader(http.StatusInsufficientStorage)
case errtypes.PreconditionFailed:
w.WriteHeader(http.StatusPreconditionFailed)
default:
sublog.Error().Err(v).Msg("error uploading file")
w.WriteHeader(http.StatusInternalServerError)
Expand Down
26 changes: 21 additions & 5 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,24 +124,27 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere
}

if metadata != nil {
if metadata["mtime"] != "" {
info.MetaData["mtime"] = metadata["mtime"]
if mtime, ok := metadata["mtime"]; ok {
info.MetaData["mtime"] = mtime
}
if _, ok := metadata["sizedeferred"]; ok {
info.SizeIsDeferred = true
}
if metadata["checksum"] != "" {
parts := strings.SplitN(metadata["checksum"], " ", 2)
if checksum, ok := metadata["checksum"]; ok {
parts := strings.SplitN(checksum, " ", 2)
if len(parts) != 2 {
return nil, errtypes.BadRequest("invalid checksum format. must be '[algorithm] [checksum]'")
}
switch parts[0] {
case "sha1", "md5", "adler32":
info.MetaData["checksum"] = metadata["checksum"]
info.MetaData["checksum"] = checksum
default:
return nil, errtypes.BadRequest("unsupported checksum algorithm: " + parts[0])
}
}
if ifMatch, ok := metadata["if-match"]; ok {
info.MetaData["if-match"] = ifMatch
}
}

log.Debug().Interface("info", info).Interface("node", n).Interface("metadata", metadata).Msg("Decomposedfs: resolved filename")
Expand Down Expand Up @@ -538,6 +541,19 @@ func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {
// if target exists create new version
versionsPath := ""
if fi, err = os.Stat(targetPath); err == nil {
// When the if-match header was set we need to check if the
// etag still matches before finishing the upload.
if ifMatch, ok := upload.info.MetaData["if-match"]; ok {
var targetEtag string
targetEtag, err = node.CalculateEtag(n.ID, fi.ModTime())
if err != nil {
return errtypes.InternalError(err.Error())
}
if ifMatch != targetEtag {
return errtypes.PreconditionFailed("etag doesn't match")
}
}

// FIXME move versioning to blobs ... no need to copy all the metadata! well ... it does if we want to version metadata...
// versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
versionsPath = upload.fs.lu.InternalPath(n.ID + ".REV." + fi.ModTime().UTC().Format(time.RFC3339Nano))
Expand Down

0 comments on commit e25b9b3

Please sign in to comment.