Skip to content

Commit

Permalink
Added new option to cancel and ignore completed job error, and output…
Browse files Browse the repository at this point in the history
… a summary instead (#2519)
  • Loading branch information
gapra-msft authored Jan 18, 2024
1 parent 36cf27d commit 7401768
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 78 deletions.
21 changes: 16 additions & 5 deletions cmd/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ package cmd
import (
"errors"
"fmt"

"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/spf13/cobra"
)

// TODO should this command be removed? Previously AzCopy was supposed to have an independent backend (out of proc)
// TODO but that's not the plan anymore
type rawCancelCmdArgs struct {
jobID string
jobID string
ignoreCompletedJobError bool
}

func (raw rawCancelCmdArgs) cook() (cookedCancelCmdArgs, error) {
Expand All @@ -42,11 +42,12 @@ func (raw rawCancelCmdArgs) cook() (cookedCancelCmdArgs, error) {
return cookedCancelCmdArgs{}, fmt.Errorf("invalid jobId string passed: %q", raw.jobID)
}

return cookedCancelCmdArgs{jobID: jobID}, nil
return cookedCancelCmdArgs{jobID: jobID, ignoreCompletedJobError: raw.ignoreCompletedJobError}, nil
}

type cookedCancelCmdArgs struct {
jobID common.JobID
jobID common.JobID
ignoreCompletedJobError bool
}

// handles the cancel command
Expand All @@ -55,6 +56,14 @@ func (cca cookedCancelCmdArgs) process() error {
var cancelJobResponse common.CancelPauseResumeResponse
Rpc(common.ERpcCmd.CancelJob(), cca.jobID, &cancelJobResponse)
if !cancelJobResponse.CancelledPauseResumed {
if cca.ignoreCompletedJobError && cancelJobResponse.JobStatus == common.EJobStatus.Completed() {
glcm.Info(cancelJobResponse.ErrorMsg)
resp := common.ListJobSummaryResponse{}
rpcCmd := common.ERpcCmd.ListJobSummary()
Rpc(rpcCmd, &cca.jobID, &resp)
PrintJobProgressSummary(resp)
return nil
}
return errors.New(cancelJobResponse.ErrorMsg)
}
return nil
Expand Down Expand Up @@ -88,7 +97,7 @@ func init() {

err = cooked.process()
if err != nil {
glcm.Error("failed to perform copy command due to error " + err.Error())
glcm.Error("failed to perform cancel command due to error " + err.Error())
}

glcm.Exit(nil, common.EExitCode.Success())
Expand All @@ -98,4 +107,6 @@ func init() {
Hidden: true,
}
rootCmd.AddCommand(cancelCmd)

cancelCmd.PersistentFlags().BoolVar(&raw.ignoreCompletedJobError, "ignore-error-if-completed", false, "")
}
13 changes: 7 additions & 6 deletions common/rpc-models.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,14 @@ type ListJobTransfersRequest struct {
}

type ResumeJobRequest struct {
JobID JobID
SourceSAS string
DestinationSAS string
JobID JobID
SourceSAS string
DestinationSAS string
SrcServiceClient *ServiceClient
DstServiceClient *ServiceClient
IncludeTransfer map[string]int
ExcludeTransfer map[string]int
CredentialInfo CredentialInfo
IncludeTransfer map[string]int
ExcludeTransfer map[string]int
CredentialInfo CredentialInfo
}

// represents the Details and details of a single transfer
Expand All @@ -362,6 +362,7 @@ type TransferDetail struct {
type CancelPauseResumeResponse struct {
ErrorMsg string
CancelledPauseResumed bool
JobStatus JobStatus
}

// represents the list of Details and details of number of transfers
Expand Down
9 changes: 7 additions & 2 deletions e2etest/declarativeHelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ type params struct {
destNull bool

disableParallelTesting bool
trailingDot common.TrailingDotOption
decompress bool

trailingDot common.TrailingDotOption
decompress bool
// looks like this for a folder transfer:
/*
INFO: source: /New folder/New Text Document.txt dest: /Test/New folder/New Text Document.txt
Expand All @@ -191,6 +192,9 @@ type params struct {
INFO: source: dest: /New Text Document.txt
*/

// cancel params
ignoreErrorIfCompleted bool

// benchmark params
mode string
fileCount int
Expand All @@ -214,6 +218,7 @@ func (Operation) Sync() Operation { return Operation(1 << 1) }
func (Operation) CopyAndSync() Operation { return eOperation.Copy() | eOperation.Sync() }
func (Operation) Remove() Operation { return Operation(1 << 2) }
func (Operation) Resume() Operation { return Operation(1 << 7) } // Resume should only ever be combined with Copy or Sync, and is a mid-job cancel/resume.
func (Operation) Cancel() Operation { return Operation(1 << 3) }
func (Operation) Benchmark() Operation { return Operation(1 << 4) }

func (o Operation) String() string {
Expand Down
3 changes: 2 additions & 1 deletion e2etest/declarativeRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func RunScenarios(
// construct all the scenarios
scenarios := make([]scenario, 0)
for _, op := range operations.getValues() {
if op == eOperation.Resume() {
if op == eOperation.Resume() || op == eOperation.Cancel() {
continue
}

Expand Down Expand Up @@ -180,6 +180,7 @@ func RunScenarios(
hs: hsToUse,
fs: fs.DeepCopy(),
needResume: operations&eOperation.Resume() != 0,
needCancel: operations&eOperation.Cancel() != 0,
stripTopDir: p.stripTopDir,
}

Expand Down
34 changes: 34 additions & 0 deletions e2etest/declarativeScenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type scenario struct {
a asserter
state scenarioState // TODO: does this really need to be a separate struct?
needResume bool
needCancel bool
chToStdin chan string
}

Expand Down Expand Up @@ -153,6 +154,14 @@ func (s *scenario) Run() {
return // resume failed. No point in running validation
}

// cancel if needed
if s.needCancel {
s.cancelAzCopy(azcopyDir)
}
if s.a.Failed() {
return // resume failed. No point in running validation
}

// check
s.validateTransferStates(azcopyDir)
if s.a.Failed() {
Expand Down Expand Up @@ -341,6 +350,31 @@ func (s *scenario) runAzCopy(logDirectory string) {
s.state.result = &result
}

func (s *scenario) cancelAzCopy(logDir string) {
r := newTestRunner()
s.operation = eOperation.Cancel()
r.SetAllFlags(s)

afterStart := func() string { return "" }
result, wasClean, err := r.ExecuteAzCopyCommand(
eOperation.Cancel(),
s.state.result.jobID.String(),
"",
false,
false,
s.fromTo,
afterStart,
s.chToStdin,
logDir,
)

if !wasClean {
s.a.AssertNoErr(err, "running AzCopy")
}

s.state.result = &result
}

func (s *scenario) resumeAzCopy(logDir string) {
s.chToStdin = make(chan string) // unubuffered seems the most predictable for our usages
defer close(s.chToStdin)
Expand Down
9 changes: 9 additions & 0 deletions e2etest/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ func (t *TestRunner) SetAllFlags(s *scenario) {
return
}

if o == eOperation.Cancel() {
set("ignore-error-if-completed", p.ignoreErrorIfCompleted, "")
return
}

// TODO: TODO: nakulkar-msft there will be many more to add here
set("recursive", p.recursive, false)
set("as-subdir", !p.invertedAsSubdir, true)
Expand Down Expand Up @@ -268,6 +273,8 @@ func (t *TestRunner) ExecuteAzCopyCommand(operation Operation, src, dst string,
verb = "remove"
case eOperation.Resume():
verb = "jobs resume"
case eOperation.Cancel():
verb = "cancel"
case eOperation.Benchmark():
verb = "bench"
default:
Expand All @@ -279,6 +286,8 @@ func (t *TestRunner) ExecuteAzCopyCommand(operation Operation, src, dst string,
args = args[:2]
} else if operation == eOperation.Resume() {
args = args[:3]
} else if operation == eOperation.Cancel() {
args = args[:2]
}
args = append(args, t.computeArgs()...)
if needsFromTo {
Expand Down
61 changes: 61 additions & 0 deletions e2etest/zt_cancel_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright © Microsoft <[email protected]>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package e2etest

import (
"github.com/Azure/azure-storage-azcopy/v10/common"
"testing"
)

func TestCancel_CompletedJobCopySync(t *testing.T) {
RunScenarios(t, eOperation.CopyAndSync()|eOperation.Cancel(), eTestFromTo.Other(common.EFromTo.BlobBlob()), eValidate.Auto(), anonymousAuthOnly, anonymousAuthOnly, params{
recursive: true,
ignoreErrorIfCompleted: true,
}, nil, testFiles{
defaultSize: "1K",
shouldTransfer: []interface{}{
folder(""),
f("file1.txt"),
},
}, EAccountType.Standard(), EAccountType.Standard(), "")
}

func TestCancel_CompletedJobRemove(t *testing.T) {
blobRemove := TestFromTo{
desc: "BlobRemove",
useAllTos: true,
froms: []common.Location{
common.ELocation.Blob(),
},
tos: []common.Location{
common.ELocation.Unknown(),
},
}
RunScenarios(t, eOperation.Remove()|eOperation.Cancel(), blobRemove, eValidate.Auto(), anonymousAuthOnly, anonymousAuthOnly, params{
ignoreErrorIfCompleted: true,
}, nil, testFiles{
defaultSize: "1K",
shouldTransfer: []interface{}{
folder(""),
f("file1.txt"),
},
}, EAccountType.Standard(), EAccountType.Standard(), "")
}
70 changes: 7 additions & 63 deletions jobsAdmin/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ func ExecuteNewCopyJobPartOrder(order common.CopyJobPartOrderRequest) common.Cop
})
// Supply no plan MMF because we don't have one, and AddJobPart will create one on its own.
// Add this part to the Job and schedule its transfers
args := &ste.AddJobPartArgs {
PartNum: order.PartNum,
PlanFile: jppfn,
ExistingPlanMMF: nil,
SrcClient: order.SrcServiceClient,
DstClient: order.DstServiceClient,
SourceTokenCred: order.CredentialInfo.S2SSourceTokenCredential,
args := &ste.AddJobPartArgs{
PartNum: order.PartNum,
PlanFile: jppfn,
ExistingPlanMMF: nil,
SrcClient: order.SrcServiceClient,
DstClient: order.DstServiceClient,
SourceTokenCred: order.CredentialInfo.S2SSourceTokenCredential,
ScheduleTransfers: true,
}
jm.AddJobPart2(args)
Expand Down Expand Up @@ -226,62 +226,6 @@ func CancelPauseJobOrder(jobID common.JobID, desiredJobStatus common.JobStatus)
return jm.CancelPauseJobOrder(desiredJobStatus)
}

/*
// Search for the Part 0 of the Job, since the Part 0 status concludes the actual status of the Job
jpm, found := jm.JobPartMgr(0)
if !found {
return common.CancelPauseResumeResponse{
CancelledPauseResumed: false,
ErrorMsg: fmt.Sprintf("job with JobId %s has a missing 0th part", jobID.String()),
}
}
jpp0 := jpm.Plan()
var jr common.CancelPauseResumeResponse
switch jpp0.JobStatus() { // Current status
case common.EJobStatus.Completed(): // You can't change state of a completed job
jr = common.CancelPauseResumeResponse{
CancelledPauseResumed: false,
ErrorMsg: fmt.Sprintf("Can't %s JobID=%v because it has already completed", verb, jobID),
}
case common.EJobStatus.Cancelled():
// If the status of Job is cancelled, it means that it has already been cancelled
// No need to cancel further
jr = common.CancelPauseResumeResponse{
CancelledPauseResumed: false,
ErrorMsg: fmt.Sprintf("cannot cancel the job %s since it is already cancelled", jobID),
}
case common.EJobStatus.Cancelling():
// If the status of Job is cancelling, it means that it has already been requested for cancellation
// No need to cancel further
jr = common.CancelPauseResumeResponse{
CancelledPauseResumed: true,
ErrorMsg: fmt.Sprintf("cannot cancel the job %s since it has already been requested for cancellation", jobID),
}
case common.EJobStatus.InProgress():
// If the Job status is in Progress and Job is not completely ordered
// Job cannot be resumed later, hence graceful cancellation is not required
// hence sending the response immediately. Response CancelPauseResumeResponse
// returned has CancelledPauseResumed set to false, because that will let
// Job immediately stop.
fallthrough
case common.EJobStatus.Paused(): // Logically, It's OK to pause an already-paused job
jpp0.SetJobStatus(desiredJobStatus)
msg := fmt.Sprintf("JobID=%v %s", jobID,
common.Iff(desiredJobStatus == common.EJobStatus.Paused(), "paused", "canceled"))
if jm.ShouldLog(pipeline.LogInfo) {
jm.Log(pipeline.LogInfo, msg)
}
jm.Cancel() // Stop all inflight-chunks/transfer for this job (this includes all parts)
jr = common.CancelPauseResumeResponse{
CancelledPauseResumed: true,
ErrorMsg: msg,
}
}
return jr
}
*/
func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeResponse {
// Strip '?' if present as first character of the source sas / destination sas
if len(req.SourceSAS) > 0 && req.SourceSAS[0] == '?' {
Expand Down
Loading

0 comments on commit 7401768

Please sign in to comment.