Skip to content

Commit

Permalink
Feature/storage/stream position deadlock bug (#14301)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmcc-msft authored Aug 31, 2020
1 parent c9ea66c commit 398ab3c
Show file tree
Hide file tree
Showing 56 changed files with 7,855 additions and 15 deletions.
1 change: 1 addition & 0 deletions sdk/storage/Azure.Storage.Blobs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release History

## 12.6.0-preview.1 (Unreleased)
- Fixed bug where BlobClient.Upload(), BlockBlobClient.Upload(), AppendBlobClient.AppendBlock(), and PageBlobClient.UploadPages() would deadlock if the content stream's position was not 0.
- Fixed bug in BlobBaseClient.OpenRead() causing us to do more download called than necessary.

## 12.5.1 (2020-08-18)
Expand Down
3 changes: 2 additions & 1 deletion sdk/storage/Azure.Storage.Blobs/src/AppendBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -962,14 +962,15 @@ internal async Task<Response<BlobAppendInfo>> AppendBlockInternal(
try
{
BlobErrors.VerifyHttpsCustomerProvidedKey(Uri, CustomerProvidedKey);
Errors.VerifyStreamPosition(content, nameof(content));

content = content?.WithNoDispose().WithProgress(progressHandler);
return await BlobRestClient.AppendBlob.AppendBlockAsync(
ClientDiagnostics,
Pipeline,
Uri,
body: content,
contentLength: content?.Length ?? 0,
contentLength: (content?.Length - content?.Position) ?? 0,
version: Version.ToVersionString(),
transactionalContentHash: transactionalContentHash,
leaseId: conditions?.LeaseId,
Expand Down
7 changes: 5 additions & 2 deletions sdk/storage/Azure.Storage.Blobs/src/BlockBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -716,12 +716,14 @@ internal virtual async Task<Response<BlobContentInfo>> UploadInternal(
$"{nameof(conditions)}: {conditions}");
try
{
Errors.VerifyStreamPosition(content, nameof(content));

return await BlobRestClient.BlockBlob.UploadAsync(
ClientDiagnostics,
Pipeline,
Uri,
body: content,
contentLength: content?.Length ?? 0,
contentLength: (content?.Length - content?.Position) ?? 0,
version: Version.ToVersionString(),
blobContentType: blobHttpHeaders?.ContentType,
blobContentEncoding: blobHttpHeaders?.ContentEncoding,
Expand Down Expand Up @@ -967,14 +969,15 @@ internal virtual async Task<Response<BlockInfo>> StageBlockInternal(
$"{nameof(conditions)}: {conditions}");
try
{
Errors.VerifyStreamPosition(content, nameof(content));
content = content.WithNoDispose().WithProgress(progressHandler);
return await BlobRestClient.BlockBlob.StageBlockAsync(
ClientDiagnostics,
Pipeline,
Uri,
blockId: base64BlockId,
body: content,
contentLength: content.Length,
contentLength: (content?.Length - content?.Position) ?? 0,
version: Version.ToVersionString(),
transactionalContentHash: transactionalContentHash,
leaseId: conditions?.LeaseId,
Expand Down
5 changes: 3 additions & 2 deletions sdk/storage/Azure.Storage.Blobs/src/PageBlobClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1083,15 +1083,16 @@ internal async Task<Response<PageInfo>> UploadPagesInternal(
$"{nameof(conditions)}: {conditions}");
try
{
Errors.VerifyStreamPosition(content, nameof(content));
content = content?.WithNoDispose().WithProgress(progressHandler);
var range = new HttpRange(offset, content?.Length ?? null);
var range = new HttpRange(offset, (content?.Length - content?.Position) ?? null);

return await BlobRestClient.PageBlob.UploadPagesAsync(
ClientDiagnostics,
Pipeline,
Uri,
body: content,
contentLength: content?.Length ?? 0,
contentLength: (content?.Length - content?.Position) ?? 0,
version: Version.ToVersionString(),
transactionalContentHash: transactionalContentHash,
timeout: default,
Expand Down
51 changes: 51 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/AppendBlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,57 @@ public async Task AppendBlockAsync_ProgressReporting()
Assert.AreEqual(blobSize, progress.List[progress.List.Count - 1]);
}

[Test]
public async Task AppendBlockAsync_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blobName = GetNewBlobName();
AppendBlobClient blob = InstrumentClient(test.Container.GetAppendBlobClient(blobName));
await blob.CreateAsync();
const int blobSize = Constants.KB;
var data = GetRandomBuffer(blobSize);

// Act
using Stream stream = new MemoryStream(data);
stream.Position = stream.Length;

await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.AppendBlockAsync(stream),
e => Assert.AreEqual("content.Position must be less than content.Length. Please set content.Position to the start of the data to upload.", e.Message));
}

[Test]
public async Task AppendBlockAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blobName = GetNewBlobName();
AppendBlobClient blob = InstrumentClient(test.Container.GetAppendBlobClient(blobName));
await blob.CreateAsync();
const int blobSize = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(blobSize);
byte[] expectedData = new byte[blobSize - position];
Array.Copy(data, position, expectedData, 0, blobSize - position);

// Act
using Stream stream = new MemoryStream(data)
{
Position = position
};
await blob.AppendBlockAsync(stream);

// Assert
Response<BlobDownloadInfo> result = await blob.DownloadAsync();
var dataResult = new MemoryStream();
await result.Value.Content.CopyToAsync(dataResult);
Assert.AreEqual(blobSize - position, dataResult.Length);
TestHelper.AssertSequenceEqual(expectedData, dataResult.ToArray());
}

[Test]
public async Task AppendBlockFromUriAsync_Min()
{
Expand Down
88 changes: 88 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/BlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,94 @@ public async Task UploadAsync_File_UploadsBlock()
Assert.AreEqual(BlobType.Block, properties.Value.BlobType);
}

[Test]
public async Task UploadAsync_Stream_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

BlobClient blob = InstrumentClient(test.Container.GetBlobClient(GetNewBlobName()));
long size = Constants.KB;
byte[] data = GetRandomBuffer(size);

using Stream stream = new MemoryStream(data)
{
Position = size
};

// Act
await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.UploadAsync(
content: stream),
e => Assert.AreEqual("content.Position must be less than content.Length. Please set content.Position to the start of the data to upload.", e.Message));
}

[Test]
public async Task UploadAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

BlobClient blob = InstrumentClient(test.Container.GetBlobClient(GetNewBlobName()));
long size = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};

// Act
await blob.UploadAsync(content: stream);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task UploadAsync_NonZeroStreamPositionMultipleBlocks()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlobClient blob = InstrumentClient(test.Container.GetBlobClient(GetNewBlobName()));
long size = 2 * Constants.KB;
long position = 300;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};


BlobUploadOptions options = new BlobUploadOptions
{
TransferOptions = new StorageTransferOptions
{
MaximumTransferSize = 512,
InitialTransferSize = 512
}
};

// Act
await blob.UploadAsync(
content: stream,
options: options);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
[TestCase(1)]
[TestCase(4)]
Expand Down
150 changes: 150 additions & 0 deletions sdk/storage/Azure.Storage.Blobs/tests/BlockBlobClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,66 @@ public async Task StageBlockAsync_ProgressReporting()
Assert.AreEqual(100 * Constants.MB, progress.List[progress.List.Count - 1]);
}

[Test]
public async Task StageBlockAsync_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blockBlobName = GetNewBlobName();
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(blockBlobName));
byte[] data = GetRandomBuffer(Size);

using Stream stream = new MemoryStream(data)
{
Position = Size
};

// Act
await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.StageBlockAsync(
base64BlockId: ToBase64(GetNewBlockName()),
content: stream),
e => Assert.AreEqual("content.Position must be less than content.Length. Please set content.Position to the start of the data to upload.", e.Message));
}

[Test]
public async Task StageBlockAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(GetNewBlobName()));
long size = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};

string blockId = ToBase64(GetNewBlockName());

// Act
await blob.StageBlockAsync(
blockId,
content: stream);

await blob.CommitBlockListAsync(new List<string>()
{
blockId
});

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task StageBlockFromUriAsync_Min()
{
Expand Down Expand Up @@ -2060,6 +2120,96 @@ public async Task UploadAsync_VersionId()
}
}

[Test]
public async Task UploadAsync_InvalidStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
var blockBlobName = GetNewBlobName();
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(blockBlobName));
byte[] data = GetRandomBuffer(Size);

using Stream stream = new MemoryStream(data)
{
Position = Size
};

// Act
await TestHelper.AssertExpectedExceptionAsync<ArgumentException>(
blob.UploadAsync(
content: stream),
e => Assert.AreEqual("content.Position must be less than content.Length. Please set content.Position to the start of the data to upload.", e.Message));
}

[Test]
public async Task UploadAsync_NonZeroStreamPosition()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(GetNewBlobName()));
long size = Constants.KB;
long position = 512;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};

// Act
await blob.UploadAsync(content: stream);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task UploadAsync_NonZeroStreamPositionMultipleBlocks()
{
await using DisposingContainer test = await GetTestContainerAsync();

// Arrange
BlockBlobClient blob = InstrumentClient(test.Container.GetBlockBlobClient(GetNewBlobName()));
long size = 2 * Constants.KB;
long position = 300;
byte[] data = GetRandomBuffer(size);
byte[] expectedData = new byte[size - position];
Array.Copy(data, position, expectedData, 0, size - position);

using Stream stream = new MemoryStream(data)
{
Position = position
};


BlobUploadOptions options = new BlobUploadOptions
{
TransferOptions = new StorageTransferOptions
{
MaximumTransferSize = 512,
InitialTransferSize = 512
}
};

// Act
await blob.UploadAsync(
content: stream,
options: options);

// Assert
Response<BlobDownloadInfo> downloadResponse = await blob.DownloadAsync();
var actual = new MemoryStream();
await downloadResponse.Value.Content.CopyToAsync(actual);
TestHelper.AssertSequenceEqual(expectedData, actual.ToArray());
}

[Test]
public async Task GetBlockBlobClient_AsciiName()
{
Expand Down
Loading

0 comments on commit 398ab3c

Please sign in to comment.