Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Datatx initiate transfer #1759

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/datatx-initiate-transfer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Create transfer type share

`datatx-initiate-transfer` starts the transfer after the transfer type share has been accepted.

https://github.com/cs3org/reva/pull/1759
9 changes: 7 additions & 2 deletions cmd/reva/transfer-get-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main

import (
"errors"
"fmt"
"io"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand All @@ -44,8 +45,11 @@ func transferGetStatusCommand() *command {
return err
}

getStatusRequest := &datatx.GetTransferStatusRequest{}

getStatusRequest := &datatx.GetTransferStatusRequest{
TxId: &datatx.TxId{
OpaqueId: *txID,
},
}
getStatusResponse, err := client.GetTransferStatus(ctx, getStatusRequest)
if err != nil {
return err
Expand All @@ -54,6 +58,7 @@ func transferGetStatusCommand() *command {
return formatError(getStatusResponse.Status)
}

fmt.Printf("get-transfer-status response: %v\n", getStatusResponse)
return nil
}
return cmd
Expand Down
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
_ "github.com/cs3org/reva/pkg/auth/manager/loader"
_ "github.com/cs3org/reva/pkg/auth/registry/loader"
_ "github.com/cs3org/reva/pkg/cbox/loader"
_ "github.com/cs3org/reva/pkg/datatx/manager/loader"
_ "github.com/cs3org/reva/pkg/group/manager/loader"
_ "github.com/cs3org/reva/pkg/metrics/driver/loader"
_ "github.com/cs3org/reva/pkg/ocm/invite/manager/loader"
Expand Down
74 changes: 68 additions & 6 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ import (
"context"

datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
driver "github.com/cs3org/reva/pkg/datatx"
"github.com/cs3org/reva/pkg/datatx/manager/registry"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/rgrpc/status"
"github.com/cs3org/reva/pkg/storage"
fsreg "github.com/cs3org/reva/pkg/storage/fs/registry"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"google.golang.org/grpc"
Expand All @@ -35,19 +39,45 @@ func init() {
}

type config struct {
Driver string `mapstructure:"driver"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
FSDriver string `mapstructure:"fsdriver"`
FSDrivers map[string]map[string]interface{} `mapstructure:"fsdrivers"`
}

type service struct {
conf *config
conf *config
datatx driver.Manager
storage storage.FS
}

func (c *config) init() {
if c.Driver == "" {
c.Driver = "rclone"
}
if c.FSDriver == "" {
c.FSDriver = "localhome"
}
}

func (s *service) Register(ss *grpc.Server) {
datatx.RegisterTxAPIServer(ss, s)
}

func getDatatxManager(c *config) (driver.Manager, error) {
if f, ok := registry.NewFuncs[c.Driver]; ok {
return f(c.Drivers[c.Driver])
}
return nil, errtypes.NotFound("driver not found: " + c.Driver)
}

func getFS(c *config) (storage.FS, error) {
if f, ok := fsreg.NewFuncs[c.FSDriver]; ok {
return f(c.Drivers[c.FSDriver])
}
return nil, errtypes.NotFound("driver not found: " + c.FSDriver)
}

func parseConfig(m map[string]interface{}) (*config, error) {
c := &config{}
if err := mapstructure.Decode(m, c); err != nil {
Expand All @@ -66,8 +96,20 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
}
c.init()

datatx, err := getDatatxManager(c)
if err != nil {
return nil, err
}

fs, err := getFS(c)
if err != nil {
return nil, err
}

service := &service{
conf: c,
conf: c,
datatx: datatx,
storage: fs,
}

return service, nil
Expand All @@ -87,14 +129,34 @@ func (s *service) CreateTransfer(ctx context.Context, req *datatx.CreateTransfer
}, nil
}

func (s *service) GetTransferStatus(ctx context.Context, in *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) {
func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) {
txStatus, err := s.datatx.GetTransferStatus(req.TxId.OpaqueId)
if err != nil {
return &datatx.GetTransferStatusResponse{
Status: status.NewInternal(ctx, err, "error requesting transfer status"),
}, nil
}
return &datatx.GetTransferStatusResponse{
Status: status.NewUnimplemented(ctx, errtypes.NotSupported("GetTransferStatus not implemented"), "GetTransferStatus not implemented"),
Status: status.NewOK(ctx),
TxInfo: &datatx.TxInfo{
Id: req.TxId,
Status: txStatus,
},
}, nil
}

func (s *service) CancelTransfer(ctx context.Context, in *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) {
func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) {
txStatus, err := s.datatx.CancelTransfer(req.TxId.OpaqueId)
if err != nil {
return &datatx.CancelTransferResponse{
Status: status.NewInternal(ctx, err, "error cancelling transfer"),
}, nil
}
return &datatx.CancelTransferResponse{
Status: status.NewUnimplemented(ctx, errtypes.NotSupported("CancelTransfer not implemented"), "CancelTransfer not implemented"),
Status: status.NewOK(ctx),
TxInfo: &datatx.TxInfo{
Id: req.TxId,
Status: txStatus,
},
}, nil
}
65 changes: 44 additions & 21 deletions internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"

"github.com/ReneKroon/ttlcache/v2"
"github.com/cs3org/reva/pkg/datatx"
datatxreg "github.com/cs3org/reva/pkg/datatx/manager/registry"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/sharedconf"
Expand All @@ -42,27 +44,29 @@ func init() {
}

type config struct {
AuthRegistryEndpoint string `mapstructure:"authregistrysvc"`
ApplicationAuthEndpoint string `mapstructure:"applicationauthsvc"`
StorageRegistryEndpoint string `mapstructure:"storageregistrysvc"`
AppRegistryEndpoint string `mapstructure:"appregistrysvc"`
PreferencesEndpoint string `mapstructure:"preferencessvc"`
UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"`
PublicShareProviderEndpoint string `mapstructure:"publicshareprovidersvc"`
OCMShareProviderEndpoint string `mapstructure:"ocmshareprovidersvc"`
OCMInviteManagerEndpoint string `mapstructure:"ocminvitemanagersvc"`
OCMProviderAuthorizerEndpoint string `mapstructure:"ocmproviderauthorizersvc"`
OCMCoreEndpoint string `mapstructure:"ocmcoresvc"`
UserProviderEndpoint string `mapstructure:"userprovidersvc"`
GroupProviderEndpoint string `mapstructure:"groupprovidersvc"`
DataTxEndpoint string `mapstructure:"datatx"`
DataGatewayEndpoint string `mapstructure:"datagateway"`
CommitShareToStorageGrant bool `mapstructure:"commit_share_to_storage_grant"`
CommitShareToStorageRef bool `mapstructure:"commit_share_to_storage_ref"`
DisableHomeCreationOnLogin bool `mapstructure:"disable_home_creation_on_login"`
TransferSharedSecret string `mapstructure:"transfer_shared_secret"`
TransferExpires int64 `mapstructure:"transfer_expires"`
TokenManager string `mapstructure:"token_manager"`
AuthRegistryEndpoint string `mapstructure:"authregistrysvc"`
ApplicationAuthEndpoint string `mapstructure:"applicationauthsvc"`
StorageRegistryEndpoint string `mapstructure:"storageregistrysvc"`
AppRegistryEndpoint string `mapstructure:"appregistrysvc"`
PreferencesEndpoint string `mapstructure:"preferencessvc"`
UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"`
PublicShareProviderEndpoint string `mapstructure:"publicshareprovidersvc"`
OCMShareProviderEndpoint string `mapstructure:"ocmshareprovidersvc"`
OCMInviteManagerEndpoint string `mapstructure:"ocminvitemanagersvc"`
OCMProviderAuthorizerEndpoint string `mapstructure:"ocmproviderauthorizersvc"`
OCMCoreEndpoint string `mapstructure:"ocmcoresvc"`
UserProviderEndpoint string `mapstructure:"userprovidersvc"`
GroupProviderEndpoint string `mapstructure:"groupprovidersvc"`
DataTxEndpoint string `mapstructure:"datatx"`
DataGatewayEndpoint string `mapstructure:"datagateway"`
CommitShareToStorageGrant bool `mapstructure:"commit_share_to_storage_grant"`
CommitShareToStorageRef bool `mapstructure:"commit_share_to_storage_ref"`
DisableHomeCreationOnLogin bool `mapstructure:"disable_home_creation_on_login"`
TransferSharedSecret string `mapstructure:"transfer_shared_secret"`
TransferExpires int64 `mapstructure:"transfer_expires"`
TokenManager string `mapstructure:"token_manager"`
DatatxManager string `mapstructure:"datatx_manager"`
DatatxManagers map[string]map[string]interface{} `mapstructure:"datatx_managers"`
// ShareFolder is the location where to create shares in the recipient's storage provider.
ShareFolder string `mapstructure:"share_folder"`
DataTransfersFolder string `mapstructure:"data_transfers_folder"`
Expand All @@ -83,6 +87,10 @@ func (c *config) init() {
c.DataTransfersFolder = "Data-Transfers"
}

if c.DatatxManager == "" {
c.DatatxManager = "rclone"
}

if c.TokenManager == "" {
c.TokenManager = "jwt"
}
Expand Down Expand Up @@ -120,6 +128,7 @@ type svc struct {
dataGatewayURL url.URL
tokenmgr token.Manager
etagCache *ttlcache.Cache `mapstructure:"etag_cache"`
dtxm datatx.Manager
}

// New creates a new gateway svc that acts as a proxy for any grpc operation.
Expand All @@ -144,6 +153,11 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
return nil, err
}

datatxManager, err := getDatatxManager(c.DatatxManager, c.DatatxManagers)
if err != nil {
return nil, err
}

etagCache := ttlcache.NewCache()
_ = etagCache.SetTTL(time.Duration(c.EtagCacheTTL) * time.Second)
etagCache.SkipTTLExtensionOnHit(true)
Expand All @@ -153,6 +167,7 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
dataGatewayURL: *u,
tokenmgr: tokenManager,
etagCache: etagCache,
dtxm: datatxManager,
}

return s, nil
Expand Down Expand Up @@ -187,3 +202,11 @@ func getTokenManager(manager string, m map[string]map[string]interface{}) (token

return nil, errtypes.NotFound(fmt.Sprintf("driver %s not found for token manager", manager))
}

func getDatatxManager(manager string, m map[string]map[string]interface{}) (datatx.Manager, error) {
if f, ok := datatxreg.NewFuncs[manager]; ok {
return f(m[manager])
}

return nil, errtypes.NotFound(fmt.Sprintf("driver %s not found for token manager", manager))
}
Loading