Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enforce quota #1557

Merged
merged 3 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ func (s *service) InitiateFileUpload(ctx context.Context, req *provider.Initiate
// seealso errtypes.StatusChecksumMismatch
case errtypes.PermissionDenied:
st = status.NewPermissionDenied(ctx, err, "permission denied")
case errtypes.InsufficientStorage:
st = status.NewInsufficientStorage(ctx, err, "insufficient storage")
default:
st = status.NewInternal(ctx, err, "error getting upload id: "+req.Ref.String())
}
Expand Down
3 changes: 3 additions & 0 deletions internal/http/services/owncloud/ocdav/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func HandleErrorStatus(log *zerolog.Logger, w http.ResponseWriter, s *rpc.Status
case rpc.Code_CODE_UNIMPLEMENTED:
log.Debug().Interface("status", s).Msg("not implemented")
w.WriteHeader(http.StatusNotImplemented)
case rpc.Code_CODE_INSUFFICIENT_STORAGE:
log.Debug().Interface("status", s).Msg("insufficient storage")
w.WriteHeader(http.StatusInsufficientStorage)
default:
log.Error().Interface("status", s).Msg("grpc request failed")
w.WriteHeader(http.StatusInternalServerError)
Expand Down
18 changes: 18 additions & 0 deletions pkg/errtypes/errtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ func (e ChecksumMismatch) IsChecksumMismatch() {}
// oc clienst issue: https://github.com/owncloud/core/issues/22711
const StatusChecksumMismatch = 419

// InsufficientStorage is the error to use when there is insufficient storage.
type InsufficientStorage string

func (e InsufficientStorage) Error() string { return "error: insufficient storage: " + string(e) }

// IsInsufficientStorage implements the IsInsufficientStorage interface.
func (e InsufficientStorage) IsInsufficientStorage() {}

// StatusInssufficientStorage 507 is an official http status code to indicate that there is insufficient storage
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/507
const StatusInssufficientStorage = 507

// IsNotFound is the interface to implement
// to specify that an a resource is not found.
type IsNotFound interface {
Expand Down Expand Up @@ -169,3 +181,9 @@ type IsBadRequest interface {
type IsChecksumMismatch interface {
IsChecksumMismatch()
}

// IsInsufficientStorage is the interface to implement
// to specify that a there is insufficient storage.
type IsInsufficientStorage interface {
IsInsufficientStorage()
}
12 changes: 12 additions & 0 deletions pkg/rgrpc/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,18 @@ func NewPermissionDenied(ctx context.Context, err error, msg string) *rpc.Status
}
}

// NewInsufficientStorage returns a Status with INSUFFICIENT_STORAGE and logs the msg.
func NewInsufficientStorage(ctx context.Context, err error, msg string) *rpc.Status {
log := appctx.GetLogger(ctx).With().CallerWithSkipFrameCount(3).Logger()
log.Err(err).Msg(msg)

return &rpc.Status{
Code: rpc.Code_CODE_INSUFFICIENT_STORAGE,
Message: msg,
Trace: getTrace(ctx),
}
}

// NewUnimplemented returns a Status with CODE_UNIMPLEMENTED and logs the msg.
func NewUnimplemented(ctx context.Context, err error, msg string) *rpc.Status {
log := appctx.GetLogger(ctx).With().CallerWithSkipFrameCount(3).Logger()
Expand Down
2 changes: 2 additions & 0 deletions pkg/rhttp/datatx/manager/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ func (m *manager) Handler(fs storage.FS) (http.Handler, error) {
w.WriteHeader(http.StatusForbidden)
case errtypes.InvalidCredentials:
w.WriteHeader(http.StatusUnauthorized)
case errtypes.InsufficientStorage:
w.WriteHeader(http.StatusInsufficientStorage)
default:
sublog.Error().Err(v).Msg("error uploading file")
w.WriteHeader(http.StatusInternalServerError)
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,8 @@ func (fs *Decomposedfs) Shutdown(ctx context.Context) error {

// GetQuota returns the quota available
// TODO Document in the cs3 should we return quota or free space?
func (fs *Decomposedfs) GetQuota(ctx context.Context) (uint64, uint64, error) {
func (fs *Decomposedfs) GetQuota(ctx context.Context) (total uint64, inUse uint64, err error) {
var n *node.Node
var err error
if n, err = fs.lu.HomeOrRootNode(ctx); err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -158,7 +157,7 @@ func (fs *Decomposedfs) GetQuota(ctx context.Context) (uint64, uint64, error) {
if err != nil {
return 0, 0, err
}
total := avail + ri.Size
total = avail + ri.Size

switch {
case quotaStr == node.QuotaUncalculated, quotaStr == node.QuotaUnknown, quotaStr == node.QuotaUnlimited:
Expand All @@ -171,6 +170,7 @@ func (fs *Decomposedfs) GetQuota(ctx context.Context) (uint64, uint64, error) {
}
}
}

return total, ri.Size, nil
}

Expand Down
37 changes: 37 additions & 0 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere

log.Debug().Interface("info", info).Interface("node", n).Interface("metadata", metadata).Msg("Decomposedfs: resolved filename")

_, err = checkQuota(ctx, fs, uint64(info.Size))
if err != nil {
return nil, err
}

upload, err := fs.NewUpload(ctx, info)
if err != nil {
return nil, err
Expand Down Expand Up @@ -417,12 +422,21 @@ func (upload *fileUpload) writeInfo() error {

// FinishUpload finishes an upload and moves the file to the internal destination
func (upload *fileUpload) FinishUpload(ctx context.Context) (err error) {

// ensure cleanup
defer upload.discardChunk()

fi, err := os.Stat(upload.binPath)
if err != nil {
appctx.GetLogger(upload.ctx).Err(err).Msg("Decomposedfs: could not stat uploaded file")
return
}

_, err = checkQuota(upload.ctx, upload.fs, uint64(fi.Size()))
if err != nil {
return err
}

n := node.New(
upload.info.Storage["NodeId"],
upload.info.Storage["NodeParentId"],
Expand Down Expand Up @@ -610,6 +624,12 @@ func (upload *fileUpload) discardChunk() {
return
}
}
if err := os.Remove(upload.infoPath); err != nil {
if !os.IsNotExist(err) {
appctx.GetLogger(upload.ctx).Err(err).Interface("info", upload.info).Str("infoPath", upload.infoPath).Interface("info", upload.info).Msg("Decomposedfs: could not discard chunk info")
return
}
}
}

// To implement the termination extension as specified in https://tus.io/protocols/resumable-upload.html#termination
Expand Down Expand Up @@ -685,3 +705,20 @@ func (upload *fileUpload) ConcatUploads(ctx context.Context, uploads []tusd.Uplo

return
}

func checkQuota(ctx context.Context, fs *Decomposedfs, fileSize uint64) (quotaSufficient bool, err error) {
total, inUse, err := fs.GetQuota(ctx)
if err != nil {
switch err.(type) {
case errtypes.NotFound:
// no quota for this storage (eg. no user context)
return true, nil
default:
return false, err
}
}
if !(total == 0) && fileSize > total-inUse {
return false, errtypes.InsufficientStorage("quota exceeded")
}
return true, nil
}