Skip to content

Commit

Permalink
Structured Message: DataLake Append (#43275)
Browse files (browse the repository at this point in the history)
* datalake append

* null fix

* fixes
  • Loading branch information
jaschrep-msft authored Apr 17, 2024
1 parent d3a625d commit 69ff822
Show file tree
Hide file tree
Showing 11 changed files with 94 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Security.Cryptography;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Diagnostics;
using Azure.Core.Pipeline;
using Azure.Core.TestFramework;
using Azure.Storage.Shared;
Expand Down Expand Up @@ -232,7 +233,7 @@ void AssertChecksum(RequestHeaders headers, string headerName)
};
}

#if BlobSDK
#if BlobSDK || DataLakeSDK
internal static Action<Request> GetRequestStructuredMessageAssertion(
StructuredMessage.Flags flags,
Func<Request, bool> isStructuredMessageExpected = default,
Expand Down Expand Up @@ -315,7 +316,7 @@ void AssertChecksum(ResponseHeaders headers, string headerName)
};
}

#if BlobSDK
#if BlobSDK || DataLakeSDK
internal static Action<Response> GetResponseStructuredMessageAssertion(
StructuredMessage.Flags flags,
Func<Response, bool> isStructuredMessageExpected = default)
Expand Down Expand Up @@ -422,7 +423,7 @@ public virtual async Task UploadPartitionSuccessfulHashComputation(StorageChecks
};

// make pipeline assertion for checking checksum was present on upload
#if BlobSDK
#if BlobSDK || DataLakeSDK
var assertion = algorithm.ResolveAuto() == StorageChecksumAlgorithm.StorageCrc64
? GetRequestStructuredMessageAssertion(StructuredMessage.Flags.StorageCrc64, null, dataLength)
: GetRequestChecksumHeaderAssertion(algorithm);
Expand Down Expand Up @@ -531,9 +532,9 @@ public virtual async Task UploadPartitionTamperedStreamThrows(StorageChecksumAlg
// Act
streamTamperPolicy.TransformRequestBody = true;
AsyncTestDelegate operation = async () => await UploadPartitionAsync(client, stream, validationOptions);

using var listener = AzureEventSourceListener.CreateConsoleLogger();
// Assert
#if BlobSDK
#if BlobSDK || DataLakeSDK
AssertWriteChecksumMismatch(operation, algorithm,
expectStructuredMessage: algorithm.ResolveAuto() == StorageChecksumAlgorithm.StorageCrc64);
#else
Expand All @@ -553,7 +554,7 @@ public virtual async Task UploadPartitionUsesDefaultClientValidationOptions(
var data = GetRandomBuffer(dataLength);

// make pipeline assertion for checking checksum was present on upload
#if BlobSDK
#if BlobSDK || DataLakeSDK
var assertion = clientAlgorithm.ResolveAuto() == StorageChecksumAlgorithm.StorageCrc64
? GetRequestStructuredMessageAssertion(StructuredMessage.Flags.StorageCrc64, null, dataLength)
: GetRequestChecksumHeaderAssertion(clientAlgorithm);
Expand Down Expand Up @@ -599,7 +600,7 @@ public virtual async Task UploadPartitionOverwritesDefaultClientValidationOption
};

// make pipeline assertion for checking checksum was present on upload
#if BlobSDK
#if BlobSDK || DataLakeSDK
var assertion = overrideAlgorithm.ResolveAuto() == StorageChecksumAlgorithm.StorageCrc64
? GetRequestStructuredMessageAssertion(StructuredMessage.Flags.StorageCrc64, null, dataLength)
: GetRequestChecksumHeaderAssertion(overrideAlgorithm);
Expand Down Expand Up @@ -1021,7 +1022,7 @@ public virtual async Task ParallelUploadOneShotSuccessfulHashComputation(Storage
};

// make pipeline assertion for checking checksum was present on upload
#if BlobSDK
#if BlobSDK || DataLakeSDK
var assertion = algorithm.ResolveAuto() == StorageChecksumAlgorithm.StorageCrc64
? GetRequestStructuredMessageAssertion(StructuredMessage.Flags.StorageCrc64, ParallelUploadIsChecksumExpected, dataLength)
: GetRequestChecksumHeaderAssertion(algorithm, isChecksumExpected: ParallelUploadIsChecksumExpected);
Expand Down Expand Up @@ -1115,7 +1116,7 @@ public virtual async Task ParallelUploadUsesDefaultClientValidationOptions(
};

// make pipeline assertion for checking checksum was present on upload
#if BlobSDK
#if BlobSDK || DataLakeSDK
var assertion = clientAlgorithm.ResolveAuto() == StorageChecksumAlgorithm.StorageCrc64 && !split
? GetRequestStructuredMessageAssertion(StructuredMessage.Flags.StorageCrc64, ParallelUploadIsChecksumExpected, dataLength)
: GetRequestChecksumHeaderAssertion(clientAlgorithm, isChecksumExpected: ParallelUploadIsChecksumExpected);
Expand Down Expand Up @@ -1173,7 +1174,7 @@ public virtual async Task ParallelUploadOverwritesDefaultClientValidationOptions
};

// make pipeline assertion for checking checksum was present on upload
#if BlobSDK
#if BlobSDK || DataLakeSDK
var assertion = overrideAlgorithm.ResolveAuto() == StorageChecksumAlgorithm.StorageCrc64 && !split
? GetRequestStructuredMessageAssertion(StructuredMessage.Flags.StorageCrc64, ParallelUploadIsChecksumExpected, dataLength)
: GetRequestChecksumHeaderAssertion(overrideAlgorithm, isChecksumExpected: ParallelUploadIsChecksumExpected);
Expand Down Expand Up @@ -1751,7 +1752,7 @@ public virtual async Task DownloadSuccessfulHashVerification(StorageChecksumAlgo
Assert.IsTrue(dest.ToArray().SequenceEqual(data));
}

#if BlobSDK
#if BlobSDK || DataLakeSDK
[TestCase(StorageChecksumAlgorithm.StorageCrc64, Constants.StructuredMessage.MaxDownloadCrcWithHeader, false, false)]
[TestCase(StorageChecksumAlgorithm.StorageCrc64, Constants.StructuredMessage.MaxDownloadCrcWithHeader-1, false, false)]
[TestCase(StorageChecksumAlgorithm.StorageCrc64, Constants.StructuredMessage.MaxDownloadCrcWithHeader+1, true, false)]
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/Azure.Storage.Files.DataLake/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "net",
"TagPrefix": "net/storage/Azure.Storage.Files.DataLake",
"Tag": "net/storage/Azure.Storage.Files.DataLake_2eae678081"
"Tag": "net/storage/Azure.Storage.Files.DataLake_ade992acfe"
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
<Compile Include="$(AzureStorageSharedSources)StorageServerTimeoutPolicy.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StorageVersionExtensions.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessage.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessageDecodingStream.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessageEncodingStream.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)TransferValidationOptionsExtensions.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)UriExtensions.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)UriQueryParamsCollection.cs" LinkBase="Shared" />
Expand Down
40 changes: 34 additions & 6 deletions sdk/storage/Azure.Storage.Files.DataLake/src/DataLakeFileClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Azure.Storage.Blobs.Models;
using Azure.Storage.Files.DataLake.Models;
using Azure.Storage.Sas;
using Azure.Storage.Shared;
using Metadata = System.Collections.Generic.IDictionary<string, string>;

namespace Azure.Storage.Files.DataLake
Expand Down Expand Up @@ -2331,13 +2332,36 @@ internal virtual async Task<Response> AppendInternal(
using (ClientConfiguration.Pipeline.BeginLoggingScope(nameof(DataLakeFileClient)))
{
// prepare transfer validation: structured message encoding for CRC64, otherwise a content hash
ContentHasher.GetHashResult hashResult = await ContentHasher.GetHashOrDefaultInternal(
content,
validationOptions,
async,
cancellationToken).ConfigureAwait(false);
ContentHasher.GetHashResult hashResult = null;
long contentLength = (content?.Length - content?.Position) ?? 0;
long? structuredContentLength = default;
string structuredBodyType = null;
if (content != null &&
validationOptions != null &&
validationOptions.ChecksumAlgorithm.ResolveAuto() == StorageChecksumAlgorithm.StorageCrc64 &&
validationOptions.PrecalculatedChecksum.IsEmpty)
{
// report progress in terms of caller bytes, not encoded bytes
structuredContentLength = contentLength;
structuredBodyType = Constants.StructuredMessage.CrcStructuredMessage;
content = content.WithNoDispose().WithProgress(progressHandler);
content = new StructuredMessageEncodingStream(
content,
Constants.StructuredMessage.DefaultSegmentContentLength,
StructuredMessage.Flags.StorageCrc64);
contentLength = content.Length - content.Position;
}
else
{
// compute hash BEFORE attaching progress handler
hashResult = await ContentHasher.GetHashOrDefaultInternal(
content,
validationOptions,
async,
cancellationToken).ConfigureAwait(false);
content = content?.WithNoDispose().WithProgress(progressHandler);
}

content = content?.WithNoDispose().WithProgress(progressHandler);
ClientConfiguration.Pipeline.LogMethodEnter(
nameof(DataLakeFileClient),
message:
Expand Down Expand Up @@ -2372,6 +2396,8 @@ internal virtual async Task<Response> AppendInternal(
encryptionKey: ClientConfiguration.CustomerProvidedKey?.EncryptionKey,
encryptionKeySha256: ClientConfiguration.CustomerProvidedKey?.EncryptionKeyHash,
encryptionAlgorithm: ClientConfiguration.CustomerProvidedKey?.EncryptionAlgorithm == null ? null : EncryptionAlgorithmTypeInternal.AES256,
structuredBodyType: structuredBodyType,
structuredContentLength: structuredContentLength,
leaseId: leaseId,
leaseAction: leaseAction,
leaseDuration: leaseDurationLong,
Expand All @@ -2391,6 +2417,8 @@ internal virtual async Task<Response> AppendInternal(
encryptionKey: ClientConfiguration.CustomerProvidedKey?.EncryptionKey,
encryptionKeySha256: ClientConfiguration.CustomerProvidedKey?.EncryptionKeyHash,
encryptionAlgorithm: ClientConfiguration.CustomerProvidedKey?.EncryptionAlgorithm == null ? null : EncryptionAlgorithmTypeInternal.AES256,
structuredBodyType: structuredBodyType,
structuredContentLength: structuredContentLength,
leaseId: leaseId,
leaseAction: leaseAction,
leaseDuration: leaseDurationLong,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 69ff822

Please sign in to comment.