Skip to content

Commit

Permalink
Move datatx create transfer call to gateway ocmshareprovider.
Browse files Browse the repository at this point in the history
  • Loading branch information
Antoon Prins committed Jun 7, 2021
1 parent 418871c commit 60250dd
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 121 deletions.
64 changes: 43 additions & 21 deletions internal/grpc/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"

"github.com/ReneKroon/ttlcache/v2"
"github.com/cs3org/reva/pkg/datatx"
datatxreg "github.com/cs3org/reva/pkg/datatx/manager/registry"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/sharedconf"
Expand All @@ -42,27 +44,28 @@ func init() {
}

type config struct {
AuthRegistryEndpoint string `mapstructure:"authregistrysvc"`
ApplicationAuthEndpoint string `mapstructure:"applicationauthsvc"`
StorageRegistryEndpoint string `mapstructure:"storageregistrysvc"`
AppRegistryEndpoint string `mapstructure:"appregistrysvc"`
PreferencesEndpoint string `mapstructure:"preferencessvc"`
UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"`
PublicShareProviderEndpoint string `mapstructure:"publicshareprovidersvc"`
OCMShareProviderEndpoint string `mapstructure:"ocmshareprovidersvc"`
OCMInviteManagerEndpoint string `mapstructure:"ocminvitemanagersvc"`
OCMProviderAuthorizerEndpoint string `mapstructure:"ocmproviderauthorizersvc"`
OCMCoreEndpoint string `mapstructure:"ocmcoresvc"`
UserProviderEndpoint string `mapstructure:"userprovidersvc"`
GroupProviderEndpoint string `mapstructure:"groupprovidersvc"`
DataTxEndpoint string `mapstructure:"datatx"`
DataGatewayEndpoint string `mapstructure:"datagateway"`
CommitShareToStorageGrant bool `mapstructure:"commit_share_to_storage_grant"`
CommitShareToStorageRef bool `mapstructure:"commit_share_to_storage_ref"`
DisableHomeCreationOnLogin bool `mapstructure:"disable_home_creation_on_login"`
TransferSharedSecret string `mapstructure:"transfer_shared_secret"`
TransferExpires int64 `mapstructure:"transfer_expires"`
TokenManager string `mapstructure:"token_manager"`
AuthRegistryEndpoint string `mapstructure:"authregistrysvc"`
StorageRegistryEndpoint string `mapstructure:"storageregistrysvc"`
AppRegistryEndpoint string `mapstructure:"appregistrysvc"`
PreferencesEndpoint string `mapstructure:"preferencessvc"`
UserShareProviderEndpoint string `mapstructure:"usershareprovidersvc"`
PublicShareProviderEndpoint string `mapstructure:"publicshareprovidersvc"`
OCMShareProviderEndpoint string `mapstructure:"ocmshareprovidersvc"`
OCMInviteManagerEndpoint string `mapstructure:"ocminvitemanagersvc"`
OCMProviderAuthorizerEndpoint string `mapstructure:"ocmproviderauthorizersvc"`
OCMCoreEndpoint string `mapstructure:"ocmcoresvc"`
UserProviderEndpoint string `mapstructure:"userprovidersvc"`
GroupProviderEndpoint string `mapstructure:"groupprovidersvc"`
DataTxEndpoint string `mapstructure:"datatx"`
DataGatewayEndpoint string `mapstructure:"datagateway"`
CommitShareToStorageGrant bool `mapstructure:"commit_share_to_storage_grant"`
CommitShareToStorageRef bool `mapstructure:"commit_share_to_storage_ref"`
DisableHomeCreationOnLogin bool `mapstructure:"disable_home_creation_on_login"`
TransferSharedSecret string `mapstructure:"transfer_shared_secret"`
TransferExpires int64 `mapstructure:"transfer_expires"`
TokenManager string `mapstructure:"token_manager"`
DatatxManager string `mapstructure:"datatx_manager"`
DatatxManagers map[string]map[string]interface{} `mapstructure:"datatx_managers"`
// ShareFolder is the location where to create shares in the recipient's storage provider.
ShareFolder string `mapstructure:"share_folder"`
DataTransfersFolder string `mapstructure:"data_transfers_folder"`
Expand All @@ -83,6 +86,10 @@ func (c *config) init() {
c.DataTransfersFolder = "Data-Transfers"
}

if c.DatatxManager == "" {
c.DatatxManager = "rclone"
}

if c.TokenManager == "" {
c.TokenManager = "jwt"
}
Expand Down Expand Up @@ -120,6 +127,7 @@ type svc struct {
dataGatewayURL url.URL
tokenmgr token.Manager
etagCache *ttlcache.Cache `mapstructure:"etag_cache"`
dtxm datatx.Manager
}

// New creates a new gateway svc that acts as a proxy for any grpc operation.
Expand All @@ -144,6 +152,11 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
return nil, err
}

datatxManager, err := getDatatxManager(c.DatatxManager, c.DatatxManagers)
if err != nil {
return nil, err
}

etagCache := ttlcache.NewCache()
_ = etagCache.SetTTL(time.Duration(c.EtagCacheTTL) * time.Second)
etagCache.SkipTTLExtensionOnHit(true)
Expand All @@ -153,6 +166,7 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
dataGatewayURL: *u,
tokenmgr: tokenManager,
etagCache: etagCache,
dtxm: datatxManager,
}

return s, nil
Expand Down Expand Up @@ -187,3 +201,11 @@ func getTokenManager(manager string, m map[string]map[string]interface{}) (token

return nil, errtypes.NotFound(fmt.Sprintf("driver %s not found for token manager", manager))
}

func getDatatxManager(manager string, m map[string]map[string]interface{}) (datatx.Manager, error) {
if f, ok := datatxreg.NewFuncs[manager]; ok {
return f(m[manager])
}

return nil, errtypes.NotFound(fmt.Sprintf("driver %s not found for token manager", manager))
}
95 changes: 70 additions & 25 deletions internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"path"
"strings"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/rgrpc/status"
"github.com/cs3org/reva/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/pkg/token"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -219,6 +221,70 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
}, nil
}

getShareReq := &ocm.GetReceivedOCMShareRequest{Ref: req.Ref}
getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq)
if err != nil {
log.Err(err).Msg("gateway: error calling GetReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, nil
}

if getShareRes.Status.Code != rpc.Code_CODE_OK {
log.Error().Msg("gateway: error calling GetReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, nil
}

share := getShareRes.Share
if share == nil {
panic("gateway: error updating a received share: the share is nil")
}

if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER {
srcRemote := share.GetShare().GetOwner().GetIdp()
// TODO do we actually know for sure the home path of the src reva instance?
srcPath := strings.TrimPrefix(share.GetShare().GetName(), "/home")
var srcToken string
srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"]
if !ok {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewNotFound(ctx, "token not found"),
}, nil
}
switch srcTokenOpaque.Decoder {
case "plain":
srcToken = string(srcTokenOpaque.Value)
default:
err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder)
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}
destRemote := share.GetShare().GetGrantee().GetUserId().GetIdp()
destPath := path.Join(s.c.DataTransfersFolder, path.Base(share.GetShare().Name))
destToken, ok := token.ContextGetToken(ctx)
if !ok || destToken == "" {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}

datatxInfoStatus, err := s.dtxm.CreateTransfer(share.GetShare().GetId().OpaqueId, srcRemote, srcPath, srcToken, destRemote, destPath, destToken)
if err != nil {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}
log.Info().Msg("datatx transfer created: " + datatxInfoStatus.String())
return res, nil
}

// if we don't need to create/delete references then we return early.
if !s.c.CommitShareToStorageGrant && !s.c.CommitShareToStorageRef {
return res, nil
Expand All @@ -234,31 +300,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
// TODO(labkode): if update field is displayName we need to do a rename on the storage to align
// share display name and storage filename.
if req.Field.GetState() != ocm.ShareState_SHARE_STATE_INVALID {
if req.Field.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED {
getShareReq := &ocm.GetReceivedOCMShareRequest{Ref: req.Ref}
getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq)
if err != nil {
log.Err(err).Msg("gateway: error calling GetReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, nil
}

if getShareRes.Status.Code != rpc.Code_CODE_OK {
log.Error().Msg("gateway: error calling GetReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, nil
}

share := getShareRes.Share
if share == nil {
panic("gateway: error updating a received share: the share is nil")
}
if req.Field.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED || req.Field.GetState() == ocm.ShareState_SHARE_STATE_PENDING {

createRefStatus, err := s.createOCMReference(ctx, share.Share)
return &ocm.UpdateReceivedOCMShareResponse{
Expand Down Expand Up @@ -348,6 +390,7 @@ func (s *svc) createOCMReference(ctx context.Context, share *ocm.Share) (*rpc.St
Path: refPath,
TargetUri: targetURI,
}
fmt.Printf("createReferenceReq: %v \n", createRefReq)

c, err := s.findByPath(ctx, refPath)
if err != nil {
Expand All @@ -370,5 +413,7 @@ func (s *svc) createOCMReference(ctx context.Context, share *ocm.Share) (*rpc.St
return status.NewInternal(ctx, err, "error updating received share"), nil
}

fmt.Printf("reference: %v \n", createRefRes)

return status.NewOK(ctx), nil
}
77 changes: 2 additions & 75 deletions internal/grpc/services/ocmshareprovider/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@ package ocmshareprovider

import (
"context"
"path"
"strings"

ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
"github.com/cs3org/reva/pkg/appctx"
"github.com/cs3org/reva/pkg/datatx"
datatxreg "github.com/cs3org/reva/pkg/datatx/manager/registry"
"github.com/cs3org/reva/pkg/errtypes"
"github.com/cs3org/reva/pkg/ocm/share"
"github.com/cs3org/reva/pkg/ocm/share/manager/registry"
"github.com/cs3org/reva/pkg/rgrpc"
"github.com/cs3org/reva/pkg/rgrpc/status"
"github.com/cs3org/reva/pkg/token"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"google.golang.org/grpc"
Expand All @@ -43,16 +37,13 @@ func init() {
}

type config struct {
Driver string `mapstructure:"driver"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
DatatxDriver string `mapstructure:"datatxdriver"`
DatatxDrivers map[string]map[string]interface{} `mapstructure:"datatxdrivers"`
Driver string `mapstructure:"driver"`
Drivers map[string]map[string]interface{} `mapstructure:"drivers"`
}

type service struct {
conf *config
sm share.Manager
dtxm datatx.Manager
}

func (c *config) init() {
Expand All @@ -72,13 +63,6 @@ func getShareManager(c *config) (share.Manager, error) {
return nil, errtypes.NotFound("driver not found: " + c.Driver)
}

func getDatatxManager(c *config) (datatx.Manager, error) {
if f, ok := datatxreg.NewFuncs[c.DatatxDriver]; ok {
return f(c.DatatxDrivers[c.DatatxDriver])
}
return nil, errtypes.NotFound("datatx driver not found: " + c.DatatxDriver)
}

func parseConfig(m map[string]interface{}) (*config, error) {
c := &config{}
if err := mapstructure.Decode(m, c); err != nil {
Expand All @@ -102,15 +86,9 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) {
return nil, err
}

dtxm, err := getDatatxManager(c)
if err != nil {
return nil, err
}

service := &service{
conf: c,
sm: sm,
dtxm: dtxm,
}

return service, nil
Expand Down Expand Up @@ -269,64 +247,13 @@ func (s *service) ListReceivedOCMShares(ctx context.Context, req *ocm.ListReceiv
}

func (s *service) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceivedOCMShareRequest) (*ocm.UpdateReceivedOCMShareResponse, error) {
log := appctx.GetLogger(ctx)

_, err := s.sm.UpdateReceivedShare(ctx, req.Ref, req.Field) // TODO(labkode): check what to update
if err != nil {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}

// initiate transfer in case this is a transfer type share
receivedShare, err := s.sm.GetReceivedShare(ctx, req.Ref)
if err != nil {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}
if receivedShare.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER {
srcRemote := receivedShare.GetShare().GetOwner().GetIdp()
// remove the home path for webdav transfer calls
// TODO do we actually know for sure the home path of the src reva instance ??
srcPath := strings.TrimPrefix(receivedShare.GetShare().GetName(), "/home")
var srcToken string
srcTokenOpaque, ok := receivedShare.GetShare().Grantee.Opaque.Map["token"]
if !ok {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewNotFound(ctx, "token not found"),
}, nil
}
switch srcTokenOpaque.Decoder {
case "plain":
srcToken = string(srcTokenOpaque.Value)
default:
err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder)
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}

destRemote := receivedShare.GetShare().GetGrantee().GetUserId().GetIdp()
// TODO how to get the data transfers folder?
destPath := path.Join("/Data-Transfers", path.Base(receivedShare.GetShare().Name))
destToken, ok := token.ContextGetToken(ctx)
if !ok || destToken == "" {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}

datatxInfoStatus, err := s.dtxm.CreateTransfer(receivedShare.GetShare().GetId().OpaqueId, srcRemote, srcPath, srcToken, destRemote, destPath, destToken)
if err != nil {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}
log.Info().Msg("datatx transfer created: " + datatxInfoStatus.String())

}

res := &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewOK(ctx),
}
Expand Down

0 comments on commit 60250dd

Please sign in to comment.