Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] [DataMovement] Added exception handling during internal transfer #46968

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,20 @@ public static async Task<JobPartPlanFile> CreateJobPartPlanFileAsync(
FileName = fileName
};

using (FileStream fileStream = File.Create(result.FileName.ToString()))
try
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
using (FileStream fileStream = File.Create(result.FileName.ToString()))
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception)
{
// will handle if file has not been created yet
File.Delete(result.FileName.ToString());
throw;
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,18 @@ public static async Task<JobPlanFile> CreateJobPlanFileAsync(
string filePath = Path.Combine(checkpointerPath, fileName);

JobPlanFile jobPlanFile = new(id, filePath);
using (FileStream fileStream = File.Create(jobPlanFile.FilePath))
try
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
using (FileStream fileStream = File.Create(jobPlanFile.FilePath))
{
await headerStream.CopyToAsync(fileStream, DefaultBufferSize, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception)
{
// will handle if file has not been created yet
File.Delete(jobPlanFile.FilePath);
throw;
}

return jobPlanFile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,13 @@ public override async Task AddNewJobPartAsync(
CancellationHelper.ThrowIfCancellationRequested(cancellationToken);
headerStream.Position = 0;

if (!_transferStates.ContainsKey(transferId))
{
// We should never get here because AddNewJobAsync should
// always be called first.
throw Errors.MissingTransferIdAddPartCheckpointer(transferId, partNumber);
}

JobPartPlanFile mappedFile = await JobPartPlanFile.CreateJobPartPlanFileAsync(
_pathToCheckpointer,
transferId,
Expand All @@ -121,16 +128,7 @@ public override async Task AddNewJobPartAsync(
cancellationToken).ConfigureAwait(false);

// Add the job part into the current state
if (_transferStates.ContainsKey(transferId))
{
_transferStates[transferId].JobParts.Add(partNumber, mappedFile);
}
else
{
// We should never get here because AddNewJobAsync should
// always be called first.
throw Errors.MissingTransferIdAddPartCheckpointer(transferId, partNumber);
}
_transferStates[transferId].JobParts.Add(partNumber, mappedFile);
}

public override Task<int> CurrentJobPartCountAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ public static ServiceToServiceJobPart CreateJobPartFromCheckpoint(

public override async Task ProcessPartToChunkAsync()
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);

long? fileLength = default;
StorageResourceItemProperties sourceProperties = default;
try
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);

long? fileLength = default;
StorageResourceItemProperties sourceProperties = default;
fileLength = _sourceResource.Length;
sourceProperties = await _sourceResource.GetPropertiesAsync(_cancellationToken).ConfigureAwait(false);
await _destinationResource.SetPermissionsAsync(
Expand All @@ -193,60 +193,58 @@ await _destinationResource.SetPermissionsAsync(
_cancellationToken).ConfigureAwait(false);

fileLength = sourceProperties.ResourceLength;
}
catch (Exception ex)
{
// TODO: logging when given the event handler
await InvokeFailedArg(ex).ConfigureAwait(false);
return;
}
if (!fileLength.HasValue)
{
await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false);
return;
}
long length = fileLength.Value;
if (!fileLength.HasValue)
{
await InvokeFailedArg(Errors.UnableToGetLength()).ConfigureAwait(false);
return;
}
long length = fileLength.Value;

// Perform a single copy operation
if (_initialTransferSize >= length)
{
await QueueChunkToChannelAsync(
async () =>
await StartSingleCallCopy(length).ConfigureAwait(false))
.ConfigureAwait(false);
return;
}
// Perform a single copy operation
if (_initialTransferSize >= length)
{
await QueueChunkToChannelAsync(
async () =>
await StartSingleCallCopy(length).ConfigureAwait(false))
.ConfigureAwait(false);
return;
}

// Perform a series of chunk copies followed by a commit
long blockSize = _transferChunkSize;

_commitBlockHandler = GetCommitController(
expectedLength: length,
blockSize: blockSize,
this,
_destinationResource.TransferType,
sourceProperties);
// If we cannot upload in one shot, initiate the parallel block uploader
if (await CreateDestinationResource(length, blockSize).ConfigureAwait(false))
{
List<(long Offset, long Length)> commitBlockList = GetRangeList(blockSize, length);
if (_destinationResource.TransferType == DataTransferOrder.Unordered)
// Perform a series of chunk copies followed by a commit
long blockSize = _transferChunkSize;

_commitBlockHandler = GetCommitController(
expectedLength: length,
blockSize: blockSize,
this,
_destinationResource.TransferType,
sourceProperties);
// If we cannot upload in one shot, initiate the parallel block uploader
if (await CreateDestinationResource(length, blockSize).ConfigureAwait(false))
{
await QueueStageBlockRequests(commitBlockList, length, sourceProperties).ConfigureAwait(false);
List<(long Offset, long Length)> commitBlockList = GetRangeList(blockSize, length);
if (_destinationResource.TransferType == DataTransferOrder.Unordered)
{
await QueueStageBlockRequests(commitBlockList, length, sourceProperties).ConfigureAwait(false);
}
else // Sequential
{
// Queue the first partitioned block task
await QueueStageBlockRequest(
commitBlockList[0].Offset,
commitBlockList[0].Length,
length,
sourceProperties).ConfigureAwait(false);
}
}
else // Sequential
else
{
// Queue the first partitioned block task
await QueueStageBlockRequest(
commitBlockList[0].Offset,
commitBlockList[0].Length,
length,
sourceProperties).ConfigureAwait(false);
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
}
}
else
catch (Exception ex)
{
await CheckAndUpdateCancellationStateAsync().ConfigureAwait(false);
await InvokeFailedArg(ex).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,10 @@ public override async Task ProcessPartToChunkAsync()
// Attempt to get the length, it's possible the file could
// not be accessible (or does not exist).
string operationName = $"{nameof(TransferManager.StartTransferAsync)}";
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
long? fileLength = default;
try
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
long? fileLength = default;
StorageResourceItemProperties properties = await _sourceResource.GetPropertiesAsync(_cancellationToken).ConfigureAwait(false);
fileLength = properties.ResourceLength;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,15 @@ public void DisposeHandlers()
/// <returns>An IEnumerable that contains the job parts</returns>
public virtual async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync()
{
await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
try
{
await OnJobStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}
int partNumber = 0;

if (_jobParts.Count == 0)
Expand Down Expand Up @@ -324,7 +332,18 @@ public virtual async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync(
}
}

if (!await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false))
bool isEnumerationComplete;
try
{
isEnumerationComplete = await _checkpointer.IsEnumerationCompleteAsync(_dataTransfer.Id, _cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
yield break;
}

if (!isEnumerationComplete)
{
await foreach (JobPartInternal jobPartInternal in GetStorageResourcesAsync().ConfigureAwait(false))
{
Expand All @@ -333,8 +352,15 @@ public virtual async IAsyncEnumerable<JobPartInternal> ProcessJobToJobPartAsync(
}
}

// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
try
{
// Call regardless of the outcome of enumeration so job can pause/finish
await OnEnumerationComplete().ConfigureAwait(false);
}
catch (Exception ex)
{
await InvokeFailedArgAsync(ex).ConfigureAwait(false);
}
}

private async IAsyncEnumerable<JobPartInternal> GetStorageResourcesAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,9 @@ public override async Task ProcessPartToChunkAsync()
{
// we can default the length to 0 because we know the destination is local and
// does not require a length to be created.
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);

try
{
await OnTransferStateChangedAsync(DataTransferState.InProgress).ConfigureAwait(false);
if (!_sourceResource.Length.HasValue)
{
await UnknownDownloadInternal().ConfigureAwait(false);
Expand Down
Loading