From 1ad992b26bf1df6078fef23d58f0d18714945ce2 Mon Sep 17 00:00:00 2001 From: Antoon Prins Date: Thu, 4 Nov 2021 15:09:54 +0100 Subject: [PATCH] Merge branch 'main' into datatx-pull-model --- .../grpc/services/gateway/ocmshareprovider.go | 238 +++++------------- 1 file changed, 64 insertions(+), 174 deletions(-) diff --git a/internal/grpc/services/gateway/ocmshareprovider.go b/internal/grpc/services/gateway/ocmshareprovider.go index 09e595534d6..d0675b7c6e8 100644 --- a/internal/grpc/services/gateway/ocmshareprovider.go +++ b/internal/grpc/services/gateway/ocmshareprovider.go @@ -207,36 +207,6 @@ func (s *svc) ListReceivedOCMShares(ctx context.Context, req *ocm.ListReceivedOC func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceivedOCMShareRequest) (*ocm.UpdateReceivedOCMShareResponse, error) { log := appctx.GetLogger(ctx) - - getShareReq := &ocm.GetReceivedOCMShareRequest{Ref: req.Ref} - getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) - if err != nil { - log.Err(err).Msg("gateway: error calling GetReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - - if getShareRes.Status.Code != rpc.Code_CODE_OK { - log.Error().Msg("gateway: error calling GetReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - - share := getShareRes.Share - if share == nil { - panic("gateway: error updating a received share: the share is nil") - } - - // return early if transfer type share has already been accepted - if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER && share.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED { - log.Err(err).Msg("gateway: transfer type share already accepted") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewOK(ctx), - }, nil - } - c, err := pool.GetOCMShareProviderClient(s.c.OCMShareProviderEndpoint) if err != nil { err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient") @@ -275,208 +245,128 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive }, } getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq) - // we don't commit to storage invalid update fields or empty display names. - if req.Field.GetState() == ocm.ShareState_SHARE_STATE_INVALID && req.Field.GetDisplayName() == "" { - log.Error().Msg("the update field is invalid, aborting reference manipulation") - return res, nil - - } - - // TODO(labkode): if update field is displayName we need to do a rename on the storage to align - // share display name and storage filename. - if req.Field.GetState() != ocm.ShareState_SHARE_STATE_INVALID { - if req.Field.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED { - if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { - srcIdp := share.GetShare().GetOwner().GetIdp() - meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{ - Domain: srcIdp, - }) if err != nil { log.Err(err).Msg("gateway: error calling GetReceivedShare") return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, }, nil } if getShareRes.Status.Code != rpc.Code_CODE_OK { log.Error().Msg("gateway: error calling GetReceivedShare") - var srcEndpoint string - var srcEndpointBaseURI string - // target URI scheme will be the webdav endpoint scheme - var srcEndpointScheme string - for _, s := range meshProvider.ProviderInfo.Services { - if strings.ToLower(s.Endpoint.Type.Name) == "webdav" { - url, err := url.Parse(s.Endpoint.Path) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path) - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - srcEndpoint = url.Host - srcEndpointBaseURI = url.Path - srcEndpointScheme = url.Scheme - break - } - } - - var srcToken string - srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] - if !ok { return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewNotFound(ctx, "token not found"), - }, nil - } - switch srcTokenOpaque.Decoder { - case "plain": - srcToken = string(srcTokenOpaque.Value) - default: - err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder) - return &ocm.UpdateReceivedOCMShareResponse{ - Status: status.NewInternal(ctx, err, "error updating received share"), - }, nil - } - - srcPath := path.Join(srcEndpointBaseURI, share.GetShare().Name) - srcTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", srcEndpointScheme, srcToken, srcEndpoint, srcPath) - - // get the webdav endpoint of the grantee's idp - var granteeIdp string - if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { - granteeIdp = share.GetShare().GetGrantee().GetUserId().Idp - } - if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { - granteeIdp = share.GetShare().GetGrantee().GetGroupId().Idp - } - destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - url, err := url.Parse(destWebdavEndpoint) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint) - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, - }, nil - } - destEndpoint := url.Host - destEndpointBaseURI := url.Path - destEndpointScheme := url.Scheme - destToken := ctxpkg.ContextMustGetToken(ctx) - homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{}) - if err != nil { - log.Err(err).Msg("gateway: error calling UpdateReceivedShare") - return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, }, nil } - destPath := path.Join(destEndpointBaseURI, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name)) - destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destEndpoint, destPath) share := getShareRes.Share if share == nil { panic("gateway: error updating a received share: the share is nil") - opaqueObj := &types.Opaque{ - Map: map[string]*types.OpaqueEntry{ - "shareId": { - Decoder: "plain", - Value: []byte(share.GetShare().GetId().OpaqueId), - }, - }, - } - req := &datatx.PullTransferRequest{ - SrcTargetUri: srcTargetURI, - DestTargetUri: destTargetURI, - Opaque: opaqueObj, } if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { - idp := share.GetShare().GetOwner().GetIdp() + srcIdp := share.GetShare().GetOwner().GetIdp() meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{ - Domain: idp, + Domain: srcIdp, }) if err != nil { log.Err(err).Msg("gateway: error calling GetInfoByDomain") return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } - var endpoint string - var endpointScheme string + var srcEndpoint string + var srcEndpointBaseURI string + // target URI scheme will be the webdav endpoint scheme + var srcEndpointScheme string for _, s := range meshProvider.ProviderInfo.Services { - fmt.Printf("provider info service: %v\n", s) - fmt.Printf(" endpoint type name: %v\n", s.Endpoint.Type.Name) - fmt.Printf(" endpoint Path: %v\n", s.Endpoint.Path) - fmt.Printf(" service host: %v\n", s.GetHost()) if strings.ToLower(s.Endpoint.Type.Name) == "webdav" { - url, _ := url.Parse(s.Endpoint.Path) - endpoint = url.Host + url.Path - endpointScheme = url.Scheme + url, err := url.Parse(s.Endpoint.Path) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + srcEndpoint = url.Host + srcEndpointBaseURI = url.Path + srcEndpointScheme = url.Scheme break } } - var token string - tokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] + var srcToken string + srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] if !ok { return &ocm.UpdateReceivedOCMShareResponse{ Status: status.NewNotFound(ctx, "token not found"), }, nil } - switch tokenOpaque.Decoder { + switch srcTokenOpaque.Decoder { case "plain": - token = string(tokenOpaque.Value) + srcToken = string(srcTokenOpaque.Value) default: - err := errtypes.NotSupported("opaque entry decoder not recognized: " + tokenOpaque.Decoder) + err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder) return &ocm.UpdateReceivedOCMShareResponse{ Status: status.NewInternal(ctx, err, "error updating received share"), }, nil } - // TODO we provide all necessary info with the targetURI - // either provide this in a 'proper' way or, - // inject the necessary services into datatx service and resolve everything there - targetURI := fmt.Sprintf("://%s@%s?name=%s&endpointscheme=%s", token, endpoint, share.GetShare().Name, endpointScheme) - // src: https(from src webdav endoint)://token@srcwebdavendpoint?name=path - // target idem taken from the grantee - // /home/DataTransfers/home/mytransfer/innerfolder/ - fmt.Printf("idp: %v\n", idp) - fmt.Printf("endpoint: %v\n", endpoint) - fmt.Printf("token: %v\n", token) - fmt.Printf("targetURI: %v\n", targetURI) + srcPath := path.Join(srcEndpointBaseURI, share.GetShare().Name) + srcTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", srcEndpointScheme, srcToken, srcEndpoint, srcPath) // get the webdav endpoint of the grantee's idp - // assume grantee is of type user - granteeIdpEndpoint, err := s.getWebdavEndpoint(ctx, share.GetShare().GetGrantee().GetUserId().Idp) + var granteeIdp string + if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { + granteeIdp = share.GetShare().GetGrantee().GetUserId().Idp + } + if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { + granteeIdp = share.GetShare().GetGrantee().GetGroupId().Idp + } + destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp) if err != nil { - log.Err(err).Msg("gateway: error calling PullTransfer") + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") return &ocm.UpdateReceivedOCMShareResponse{ - Status: &rpc.Status{ - Code: rpc.Code_CODE_INTERNAL, - }, + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + url, err := url.Parse(destWebdavEndpoint) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + destEndpoint := url.Host + destEndpointBaseURI := url.Path + destEndpointScheme := url.Scheme + destToken := ctxpkg.ContextMustGetToken(ctx) + homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{}) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, }, nil } + destPath := path.Join(destEndpointBaseURI, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name)) + destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destEndpoint, destPath) opaqueObj := &types.Opaque{ Map: map[string]*types.OpaqueEntry{ - "shareId": &types.OpaqueEntry{ + "shareId": { Decoder: "plain", Value: []byte(share.GetShare().GetId().OpaqueId), }, - "endpoint": &types.OpaqueEntry{ - Decoder: "plain", - Value: []byte(granteeIdpEndpoint), - }, }, } req := &datatx.PullTransferRequest{ - TargetUri: targetURI, - Opaque: opaqueObj, + SrcTargetUri: srcTargetURI, + DestTargetUri: destTargetURI, + Opaque: opaqueObj, } res, err := s.PullTransfer(ctx, req) if err != nil {