Datalake inputstream (#21322)
* DataLakeFileClient add openInputStream()

* datalake inputstream tests

added tests
removed unnecessary openInputStream overload

* changelog

* Fixed blob artifacts from feature port

Fixed docstrings and parameters that used "blob" instead of "file"
terminology.
Fixed a docstring link that referenced cut blob-only functionality.

* FileInputStream fix and docs issues

DataLakeFileInputStream now uses logger.logThrowableAsError.
Header and docstring fixes.

* Casts throwable to match checked exception type

* CI fixes

* fixes

* checkstyle

added a suppression to deal with a checkstyle bug
minor fixes

* Refactored StorageInputStream

StorageInputStream now has an implementation of dispatchRead() and only
delegates out the implementation of the client read operation itself.
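
A minimal sketch of the shape described here, assuming hypothetical names (`executeRead` and the class name are illustrative; only `dispatchRead` and the fault-state fields appear in the diffs below):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Sketch: the shared base class owns dispatchRead() and its fault bookkeeping;
// subclasses supply only the raw client read operation.
abstract class StorageInputStreamSketch extends InputStream {
    protected boolean streamFaulted;
    protected IOException lastError;

    // One shared implementation after the refactor.
    protected synchronized ByteBuffer dispatchRead(int readLength, long offset) throws IOException {
        try {
            return executeRead(readLength, offset); // the only per-service piece
        } catch (RuntimeException e) {
            this.streamFaulted = true;
            this.lastError = new IOException(e);
            throw this.lastError;
        }
    }

    // Implemented per service (blob, datalake, ...); the name is illustrative.
    protected abstract ByteBuffer executeRead(int readLength, long offset);
}
```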

* checkstyle suppression and pr feedback

* PR feedback

* import cleanup

* Undo abstract method shifting

* import cleanup

* imports and checkstyle suppression to match blobs

* minor fixes

* PR comments

* docstring

* api rewrite

openInputStream returns a result class containing InputStream and
PathProperties members.
The returned InputStream is a BlobInputStream instance.
Ported the blob InputStream test class
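
A short usage sketch of the rewritten API (hedged; `client` is an existing DataLakeFileClient and the buffer size is arbitrary):

```java
DataLakeFileOpenInputStreamResult result = client.openInputStream();
PathProperties properties = result.getProperties(); // captured when the stream was opened
try (InputStream stream = result.getInputStream()) { // a BlobInputStream under the hood
    byte[] buffer = new byte[4096];
    int bytesRead = stream.read(buffer);
}
```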

* PR feedback

* Styling

* reverted *Result type implementing Closeable

* cleanup imports

Co-authored-by: jschrepp-MSFT <[email protected]>
jaschrep-msft authored Jun 29, 2021
1 parent 70e7db4 commit e603f6a
Showing 27 changed files with 4,185 additions and 19 deletions.
BlobInputStream.java
@@ -64,11 +64,9 @@ public final class BlobInputStream extends StorageInputStream {
@Override
protected synchronized ByteBuffer dispatchRead(final int readLength, final long offset) throws IOException {
try {
-            ByteBuffer currentBuffer = this.blobClient.downloadWithResponse(new BlobRange(offset,
-                (long) readLength), null, this.accessCondition, false)
-                .flatMap(response -> {
-                    return FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap);
-                })
+            ByteBuffer currentBuffer = this.blobClient.downloadWithResponse(
+                new BlobRange(offset, (long) readLength), null, this.accessCondition, false)
+                .flatMap(response -> FluxUtil.collectBytesInByteBufferStream(response.getValue()).map(ByteBuffer::wrap))
.block();

this.bufferSize = readLength;
@@ -77,15 +75,13 @@ protected synchronized ByteBuffer dispatchRead(final int readLength, final long offset) throws IOException {
} catch (final BlobStorageException e) {
this.streamFaulted = true;
this.lastError = new IOException(e);

throw this.lastError;
}
}

/**
-     * Gets the blob properties.
-     * <p>
-     * If no data has been read from the stream, a network call is made to get properties. Otherwise, the blob
-     * properties obtained from the download are stored.
+     * Gets the blob properties as fetched upon download.
*
* @return {@link BlobProperties}
*/
StorageInputStream.java
@@ -147,7 +147,7 @@ public synchronized void close() {
}

/**
-     * Dispatches a read operation of N bytes.
+     * Dispatches a read operation of N bytes and updates stream state accordingly.
*
* @param readLength An <code>int</code> which represents the number of bytes to read.
* @param offset The start point of data to be acquired.
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-file-datalake/CHANGELOG.md
@@ -1,6 +1,7 @@
# Release History

## 12.7.0-beta.1 (Unreleased)
+- Added support for openInputStream to sync data lake file clients
- Added support for the 2020-10-02 service version.
- Added support to specify Parquet Input Serialization when querying a file.
- Updated DownloadRetryOptions.maxRetryRequests to default downloads to retry 5 times.
DataLakeFileAsyncClient.java
@@ -253,7 +253,6 @@ public Mono<PathInfo> upload(Flux<ByteBuffer> data, ParallelTransferOptions para

/**
* Creates a new file.
-     * <p>
* To avoid overwriting, pass "*" to {@link DataLakeRequestConditions#setIfNoneMatch(String)}.
*
* <p><strong>Code Samples</strong></p>
DataLakeFileClient.java
@@ -17,17 +17,23 @@
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobQueryResponse;
import com.azure.storage.blob.options.BlobDownloadToFileOptions;
+import com.azure.storage.blob.options.BlobInputStreamOptions;
+import com.azure.storage.blob.specialized.BlobInputStream;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.ParallelTransferOptions;
import com.azure.storage.common.Utility;
import com.azure.storage.common.implementation.Constants;
+import com.azure.storage.common.implementation.FluxInputStream;
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.common.implementation.UploadUtils;
+import com.azure.storage.file.datalake.implementation.models.InternalDataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.implementation.util.DataLakeImplUtils;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
+import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.FileQueryAsyncResponse;
+import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
import com.azure.storage.file.datalake.models.FileQueryResponse;
@@ -207,7 +213,6 @@ public PathInfo upload(InputStream data, long length, boolean overwrite) {

/**
* Creates a new file.
-     * <p>
* To avoid overwriting, pass "*" to {@link DataLakeRequestConditions#setIfNoneMatch(String)}.
*
* <p><strong>Code Samples</strong></p>
@@ -496,6 +501,30 @@ public FileReadResponse readWithResponse(OutputStream stream, FileRange range, D
}, logger);
}

+    /**
+     * Opens a file input stream to download the file. Locks on ETags.
+     *
+     * @return A {@link DataLakeFileOpenInputStreamResult} object that contains the stream to use for reading from
+     * the file.
+     * @throws DataLakeStorageException If a storage service error occurred.
+     */
+    public DataLakeFileOpenInputStreamResult openInputStream() {
+        return openInputStream(null);
+    }
+
+    /**
+     * Opens a file input stream to download the specified range of the file. Defaults to ETag locking if the option
+     * is not specified.
+     *
+     * @param options {@link DataLakeFileInputStreamOptions}
+     * @return A {@link DataLakeFileOpenInputStreamResult} object that contains the stream to use for reading from
+     * the file.
+     * @throws DataLakeStorageException If a storage service error occurred.
+     */
+    public DataLakeFileOpenInputStreamResult openInputStream(DataLakeFileInputStreamOptions options) {
+        BlobInputStreamOptions convertedOptions = Transforms.toBlobInputStreamOptions(options);
+        BlobInputStream inputStream = blockBlobClient.openInputStream(convertedOptions);
+        return new InternalDataLakeFileOpenInputStreamResult(inputStream,
+            Transforms.toPathProperties(inputStream.getProperties()));
+    }

/**
* Reads the entire file into a file specified by the path.
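The new methods above can be exercised as follows. This is a hedged sketch, not an official sample: the endpoint, file system, and path names are assumptions, and error handling is omitted.

```java
DataLakeFileClient fileClient = new DataLakePathClientBuilder()
    .endpoint("https://<account>.dfs.core.windows.net") // placeholder account
    .credential(new DefaultAzureCredentialBuilder().build())
    .fileSystemName("sample-filesystem")                 // illustrative names
    .pathName("sample-file.txt")
    .buildFileClient();

// Open only the first kilobyte; ETag locking is the default.
DataLakeFileInputStreamOptions options = new DataLakeFileInputStreamOptions()
    .setRange(new FileRange(0, 1024L));
DataLakeFileOpenInputStreamResult result = fileClient.openInputStream(options);

try (InputStream stream = result.getInputStream()) {
    System.out.println("File size: " + result.getProperties().getFileSize());
    byte[] contents = new byte[1024];
    int read = stream.read(contents);
}
```

Note that only the stream itself is closed; per the commit message, the *Result type intentionally does not implement Closeable.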
Transforms.java
@@ -34,8 +34,10 @@
import com.azure.storage.blob.models.BlobRetentionPolicy;
import com.azure.storage.blob.models.BlobServiceProperties;
import com.azure.storage.blob.models.BlobSignedIdentifier;
+import com.azure.storage.blob.models.ConsistentReadControl;
import com.azure.storage.blob.models.ListBlobContainersOptions;
import com.azure.storage.blob.models.StaticWebsite;
+import com.azure.storage.blob.options.BlobInputStreamOptions;
import com.azure.storage.blob.options.BlobQueryOptions;
import com.azure.storage.blob.options.UndeleteBlobContainerOptions;
import com.azure.storage.file.datalake.implementation.models.BlobItemInternal;
@@ -84,6 +86,7 @@
import com.azure.storage.file.datalake.models.PathProperties;
import com.azure.storage.file.datalake.models.PublicAccessType;
import com.azure.storage.file.datalake.models.UserDelegationKey;
+import com.azure.storage.file.datalake.options.DataLakeFileInputStreamOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
import com.azure.storage.file.datalake.options.FileSystemUndeleteOptions;

@@ -224,6 +227,32 @@ static BlobHttpHeaders toBlobHttpHeaders(PathHttpHeaders pathHTTPHeaders) {
.setContentMd5(pathHTTPHeaders.getContentMd5());
}

+    static BlobInputStreamOptions toBlobInputStreamOptions(DataLakeFileInputStreamOptions options) {
+        if (options == null) {
+            return null;
+        }
+        return new BlobInputStreamOptions()
+            .setBlockSize(options.getBlockSize())
+            .setRange(toBlobRange(options.getRange()))
+            .setRequestConditions(toBlobRequestConditions(options.getRequestConditions()))
+            .setConsistentReadControl(toBlobConsistentReadControl(options.getConsistentReadControl()));
+    }
+
+    static com.azure.storage.blob.models.ConsistentReadControl toBlobConsistentReadControl(
+        com.azure.storage.file.datalake.models.ConsistentReadControl datalakeConsistentReadControl) {
+        if (datalakeConsistentReadControl == null) {
+            return null;
+        }
+        switch (datalakeConsistentReadControl) {
+            case NONE:
+                return ConsistentReadControl.NONE;
+            case ETAG:
+                return ConsistentReadControl.ETAG;
+            default:
+                throw new IllegalArgumentException("Could not convert ConsistentReadControl");
+        }
+    }
+
static BlobRange toBlobRange(FileRange fileRange) {
if (fileRange == null) {
return null;
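The conversion helpers above follow the null-in/null-out adapter pattern used throughout this class, with an exhaustive switch for the enum mapping. A standalone sketch of the same pattern with toy types (the SDK's Transforms class is package-private; these names are illustrative):

```java
enum DataLakeMode { NONE, ETAG }
enum BlobMode { NONE, ETAG }

final class ModeAdapter {
    private ModeAdapter() {
    }

    // Null-in/null-out spares callers from special-casing unset options;
    // the default branch surfaces any future enum constant loudly.
    static BlobMode toBlobMode(DataLakeMode mode) {
        if (mode == null) {
            return null;
        }
        switch (mode) {
            case NONE:
                return BlobMode.NONE;
            case ETAG:
                return BlobMode.ETAG;
            default:
                throw new IllegalArgumentException("Could not convert " + mode);
        }
    }
}
```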
InternalDataLakeFileOpenInputStreamResult.java (new file)
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.implementation.models;

import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult;
import com.azure.storage.file.datalake.models.PathProperties;

import java.io.InputStream;

public class InternalDataLakeFileOpenInputStreamResult implements DataLakeFileOpenInputStreamResult {

    private final InputStream inputStream;
    private final PathProperties properties;

    public InternalDataLakeFileOpenInputStreamResult(InputStream inputStream, PathProperties properties) {
        this.inputStream = inputStream;
        this.properties = properties;
    }

    @Override
    public InputStream getInputStream() {
        return inputStream;
    }

    @Override
    public PathProperties getProperties() {
        return properties;
    }
}
ConsistentReadControl.java (new file)
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.models;

/**
 * Defines values to indicate what strategy the SDK should use when reading from a file to ensure the view of the data
 * is consistent and not changed during the read.
 */
public enum ConsistentReadControl {
    /**
     * No consistent read control. The client will honor user-provided {@link DataLakeRequestConditions#getIfMatch()}.
     */
    NONE,

    /**
     * Default value. Consistent read control based on eTag.
     * If {@link DataLakeRequestConditions#getIfMatch()} is set, the client will honor this value.
     * Otherwise, {@link DataLakeRequestConditions#getIfMatch()} is set to the latest eTag.
     * Note: Modification of the base file will result in an {@code IOException} or a {@code BlobStorageException} if
     * eTag is the only form of consistent read control being employed.
     */
    ETAG,
}
DataLakeFileOpenInputStreamResult.java (new file)
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.models;

import java.io.InputStream;

/**
* Result of opening an {@link InputStream} to a datalake file.
*/
public interface DataLakeFileOpenInputStreamResult {
    /**
     * @return the {@link InputStream} of the target file.
     */
    InputStream getInputStream();

    /**
     * @return the {@link PathProperties} of the target file.
     */
    PathProperties getProperties();
}
DataLakeFileInputStreamOptions.java (new file)
@@ -0,0 +1,89 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.file.datalake.options;

import com.azure.core.annotation.Fluent;
import com.azure.storage.file.datalake.models.ConsistentReadControl;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.FileRange;

/**
 * Extended options that may be passed when opening a file input stream.
 */
@Fluent
public class DataLakeFileInputStreamOptions {

    private FileRange range;
    private DataLakeRequestConditions requestConditions;
    private Integer blockSize;
    private ConsistentReadControl consistentReadControl;

    /**
     * @return {@link FileRange}
     */
    public FileRange getRange() {
        return range;
    }

    /**
     * @param range {@link FileRange}
     * @return The updated options.
     */
    public DataLakeFileInputStreamOptions setRange(FileRange range) {
        this.range = range;
        return this;
    }

    /**
     * @return {@link DataLakeRequestConditions}
     */
    public DataLakeRequestConditions getRequestConditions() {
        return requestConditions;
    }

    /**
     * @param requestConditions {@link DataLakeRequestConditions}
     * @return The updated options.
     */
    public DataLakeFileInputStreamOptions setRequestConditions(DataLakeRequestConditions requestConditions) {
        this.requestConditions = requestConditions;
        return this;
    }

    /**
     * @return The size of each data chunk returned from the service. If block size is large, the input stream will
     * make fewer network calls, but each individual call will send more data and will therefore take longer.
     * The default value is 4 MB.
     */
    public Integer getBlockSize() {
        return blockSize;
    }

    /**
     * @param blockSize The size of each data chunk returned from the service. If block size is large, the input
     * stream will make fewer network calls, but each individual call will send more data and will therefore take
     * longer. The default value is 4 MB.
     * @return The updated options.
     */
    public DataLakeFileInputStreamOptions setBlockSize(Integer blockSize) {
        this.blockSize = blockSize;
        return this;
    }

    /**
     * @return {@link ConsistentReadControl} The default is ETag.
     */
    public ConsistentReadControl getConsistentReadControl() {
        return consistentReadControl;
    }

    /**
     * @param consistentReadControl {@link ConsistentReadControl} The default is ETag.
     * @return The updated options.
     */
    public DataLakeFileInputStreamOptions setConsistentReadControl(ConsistentReadControl consistentReadControl) {
        this.consistentReadControl = consistentReadControl;
        return this;
    }
}
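
Given the block-size tradeoff documented above, a caller streaming a large file over a fast connection might raise the chunk size. A sketch; the 8 MB figure is an illustrative choice, not a recommendation:

```java
DataLakeFileInputStreamOptions options = new DataLakeFileInputStreamOptions()
    .setBlockSize(8 * 1024 * 1024) // 8 MB chunks: fewer round trips, more data per call
    .setRange(new FileRange(0));   // whole file, starting at offset 0
```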
@@ -8,15 +8,12 @@ import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.blob.BlobUrlParts
import com.azure.storage.blob.models.BlobErrorCode
import com.azure.storage.blob.models.BlobStorageException
import com.azure.storage.blob.options.BlobParallelUploadOptions
import com.azure.storage.common.ParallelTransferOptions
import com.azure.storage.common.ProgressReceiver
import com.azure.storage.common.implementation.Constants
import com.azure.storage.blob.models.BlockListType
import com.azure.storage.common.test.shared.extensions.LiveOnly
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion
import com.azure.storage.common.test.shared.policy.MockFailureResponsePolicy
import com.azure.storage.file.datalake.models.DownloadRetryOptions
import com.azure.storage.file.datalake.models.AccessTier
import com.azure.storage.file.datalake.models.DataLakeRequestConditions
import com.azure.storage.file.datalake.models.DataLakeStorageException