diff --git a/flyteadmin/dataproxy/service.go b/flyteadmin/dataproxy/service.go index 27d03e3eda..6cca7dd01c 100644 --- a/flyteadmin/dataproxy/service.go +++ b/flyteadmin/dataproxy/service.go @@ -179,7 +179,7 @@ func (s Service) CreateDownloadLink(ctx context.Context, req *service.CreateDown } if len(nativeURL) == 0 { - return nil, errors.NewFlyteAdminErrorf(codes.Internal, "no deckUrl found for request [%+v]", req) + return nil, errors.NewFlyteAdminErrorf(codes.NotFound, "no deckUrl found for request [%+v]", req) } ref := storage.DataReference(nativeURL) diff --git a/flyteadmin/dataproxy/service_test.go b/flyteadmin/dataproxy/service_test.go index c66b9df9b7..a9eb792c80 100644 --- a/flyteadmin/dataproxy/service_test.go +++ b/flyteadmin/dataproxy/service_test.go @@ -170,34 +170,50 @@ func (t testMetadata) Exists() bool { return t.exists } -func TestCreateDownloadLink(t *testing.T) { +type CreateDownloadLinkDependencies struct { + dataStore *storage.DataStore + nodeExecutionManager *mocks.NodeExecutionInterface + taskExecutionManager *mocks.TaskExecutionInterface +} + +func setupCreateDownloadLink(t *testing.T) (Service, CreateDownloadLinkDependencies) { dataStore := commonMocks.GetMockStorageClient() nodeExecutionManager := &mocks.NodeExecutionInterface{} - nodeExecutionManager.EXPECT().GetNodeExecution(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) { + taskExecutionManager := &mocks.TaskExecutionInterface{} + s, err := NewService(config.DataProxyConfig{Download: config.DataProxyDownloadConfig{MaxExpiresIn: stdlibConfig.Duration{Duration: time.Hour}}}, nodeExecutionManager, dataStore, taskExecutionManager) + assert.NoError(t, err) + return s, CreateDownloadLinkDependencies{ + dataStore: dataStore, + nodeExecutionManager: nodeExecutionManager, + taskExecutionManager: taskExecutionManager, + } +} + +func TestCreateDownloadLink(t *testing.T) { + nodeExecutionFunc := func(ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) { return &admin.NodeExecution{ Closure: &admin.NodeExecutionClosure{ DeckUri: "s3://something/something", }, }, nil - }) - taskExecutionManager := &mocks.TaskExecutionInterface{} - - s, err := NewService(config.DataProxyConfig{Download: config.DataProxyDownloadConfig{MaxExpiresIn: stdlibConfig.Duration{Duration: time.Hour}}}, nodeExecutionManager, dataStore, taskExecutionManager) - assert.NoError(t, err) + } t.Run("Invalid expiry", func(t *testing.T) { - _, err = s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ + s, dependencies := setupCreateDownloadLink(t) + dependencies.nodeExecutionManager.EXPECT().GetNodeExecution(mock.Anything, mock.Anything).RunAndReturn(nodeExecutionFunc) + _, err := s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ ExpiresIn: durationpb.New(-time.Hour), }) assert.Error(t, err) }) t.Run("item not found", func(t *testing.T) { - dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, ref storage.DataReference) (storage.Metadata, error) { + s, dependencies := setupCreateDownloadLink(t) + dependencies.dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, ref storage.DataReference) (storage.Metadata, error) { return testMetadata{exists: false}, nil } - - _, err = s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ + dependencies.nodeExecutionManager.EXPECT().GetNodeExecution(mock.Anything, mock.Anything).RunAndReturn(nodeExecutionFunc) + _, err := s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ ArtifactType: service.ArtifactType_ARTIFACT_TYPE_DECK, Source: &service.CreateDownloadLinkRequest_NodeExecutionId{ NodeExecutionId: &core.NodeExecutionIdentifier{}, @@ -212,11 +228,12 @@ func TestCreateDownloadLink(t *testing.T) { }) t.Run("valid config", func(t *testing.T) { - dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, ref storage.DataReference) (storage.Metadata, error) { + s, dependencies := setupCreateDownloadLink(t) + dependencies.dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, ref storage.DataReference) (storage.Metadata, error) { return testMetadata{exists: true}, nil } - - _, err = s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ + dependencies.nodeExecutionManager.EXPECT().GetNodeExecution(mock.Anything, mock.Anything).RunAndReturn(nodeExecutionFunc) + _, err := s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ ArtifactType: service.ArtifactType_ARTIFACT_TYPE_DECK, Source: &service.CreateDownloadLinkRequest_NodeExecutionId{ NodeExecutionId: &core.NodeExecutionIdentifier{}, @@ -227,12 +244,40 @@ func TestCreateDownloadLink(t *testing.T) { assert.NoError(t, err) }) + t.Run("no deckUrl found", func(t *testing.T) { + s, dependencies := setupCreateDownloadLink(t) + dependencies.dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, ref storage.DataReference) (storage.Metadata, error) { + return testMetadata{exists: true}, nil + } + nodeExecutionFuncWithoutDecks := func(ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) { + return &admin.NodeExecution{ + Closure: &admin.NodeExecutionClosure{ + DeckUri: "", + }, + }, nil + } + dependencies.nodeExecutionManager.EXPECT().GetNodeExecution(mock.Anything, mock.Anything).RunAndReturn(nodeExecutionFuncWithoutDecks) + _, err := s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ + ArtifactType: service.ArtifactType_ARTIFACT_TYPE_DECK, + Source: &service.CreateDownloadLinkRequest_NodeExecutionId{ + NodeExecutionId: &core.NodeExecutionIdentifier{}, + }, + ExpiresIn: durationpb.New(time.Hour), + }) + assert.Error(t, err) + st, ok := status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.NotFound, st.Code()) + assert.Contains(t, st.Message(), "no deckUrl found for request") + }) + t.Run("head failed", func(t *testing.T) { - dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, ref storage.DataReference) (storage.Metadata, error) { + s, dependencies := setupCreateDownloadLink(t) + dependencies.dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, ref storage.DataReference) (storage.Metadata, error) { return testMetadata{}, fmt.Errorf("head fail") } - - _, err = s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ + dependencies.nodeExecutionManager.EXPECT().GetNodeExecution(mock.Anything, mock.Anything).RunAndReturn(nodeExecutionFunc) + _, err := s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ ArtifactType: service.ArtifactType_ARTIFACT_TYPE_DECK, Source: &service.CreateDownloadLinkRequest_NodeExecutionId{ NodeExecutionId: &core.NodeExecutionIdentifier{}, @@ -247,11 +292,12 @@ func TestCreateDownloadLink(t *testing.T) { }) t.Run("use default ExpiresIn", func(t *testing.T) { - dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, ref storage.DataReference) (storage.Metadata, error) { + s, dependencies := setupCreateDownloadLink(t) + dependencies.dataStore.ComposedProtobufStore.(*commonMocks.TestDataStore).HeadCb = func(ctx context.Context, ref storage.DataReference) (storage.Metadata, error) { return testMetadata{exists: true}, nil } - - _, err = s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ + dependencies.nodeExecutionManager.EXPECT().GetNodeExecution(mock.Anything, mock.Anything).RunAndReturn(nodeExecutionFunc) + _, err := s.CreateDownloadLink(context.Background(), &service.CreateDownloadLinkRequest{ ArtifactType: service.ArtifactType_ARTIFACT_TYPE_DECK, Source: &service.CreateDownloadLinkRequest_NodeExecutionId{ NodeExecutionId: &core.NodeExecutionIdentifier{},