Introduce StorageConnector for Azure #14660

Merged: 31 commits, Aug 9, 2023
Commits
26f5bfb
initial commit
LakshSingla Jul 5, 2023
2b9787a
classes n code
LakshSingla Jul 6, 2023
4631946
Merge branch 'master' into gcs-storage-connector
LakshSingla Jul 6, 2023
d41e967
add stubs for other classes
LakshSingla Jul 7, 2023
71c50fa
goc changes stash
LakshSingla Jul 12, 2023
d07b740
version before batch delete
LakshSingla Jul 14, 2023
7c482c5
storage connector final
LakshSingla Jul 19, 2023
39eff75
Merge branch 'master' into gcs-storage-connector
LakshSingla Jul 19, 2023
5730c35
cleanup S3 storage connector
LakshSingla Jul 19, 2023
b119af8
change byte format
LakshSingla Jul 19, 2023
8448be4
add azure files
LakshSingla Jul 25, 2023
7953f73
revert gcs changes
LakshSingla Jul 26, 2023
f657137
remove files
LakshSingla Jul 26, 2023
cffbb7e
remove RetryableAzureOutputStream since that is already done in the a…
LakshSingla Jul 26, 2023
efb3b85
docs
LakshSingla Jul 27, 2023
2782636
add tests, comments, validation
LakshSingla Aug 1, 2023
86ad32a
add coverage
LakshSingla Aug 2, 2023
d2b68f3
more test coverage, review
LakshSingla Aug 3, 2023
ef18323
tests fix
LakshSingla Aug 3, 2023
f87266d
fix import
LakshSingla Aug 3, 2023
9cc6bf3
add more tests
LakshSingla Aug 4, 2023
7e138ef
fixup tests
LakshSingla Aug 4, 2023
eafe836
more tests
LakshSingla Aug 6, 2023
47aeae6
more coverage
LakshSingla Aug 6, 2023
08cecb0
refactor, add comments
LakshSingla Aug 8, 2023
56062f4
Merge branch 'master' into azure-storage-connector
LakshSingla Aug 8, 2023
b458bd6
docs
LakshSingla Aug 8, 2023
850cb94
spellcheck
LakshSingla Aug 8, 2023
8639351
create dir before checking for permissions
LakshSingla Aug 8, 2023
575e2ec
check fix
LakshSingla Aug 8, 2023
e1da797
build fix
LakshSingla Aug 8, 2023
29 changes: 22 additions & 7 deletions docs/multi-stage-query/reference.md
@@ -349,20 +349,35 @@ SQL-based ingestion supports using durable storage to store intermediate files t

### Durable storage configurations

The following common service properties control how durable storage behaves:
Durable storage is supported on Amazon S3 and Microsoft Azure storage. A few common configurations control the behavior for both services, as documented below. In addition to the common configurations,
there are a few properties specific to each storage service that must be set.

Common properties that configure the behavior of durable storage:

|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
|`druid.msq.intermediate.storage.enable` | true | Required. Whether to enable durable storage for the cluster. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md).|
|`druid.msq.intermediate.storage.type` | `s3` for Amazon S3 | Required. The type of storage to use. `s3` is the only supported storage type. |
|`druid.msq.intermediate.storage.bucket` | n/a | The S3 bucket to store intermediate files. |
|`druid.msq.intermediate.storage.prefix` | n/a | S3 prefix to store intermediate stage results. Provide a unique value for the prefix. Don't share the same prefix between clusters. If the location includes other files or directories, then they will get cleaned up as well. |
|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on the local disk to temporarily store intermediate stage results. |
|`druid.msq.intermediate.storage.enable` | false | Whether to enable durable storage for the cluster. Set it to `true` to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md).|
|`druid.msq.intermediate.storage.type` | n/a | Required. The type of storage to use. Set it to `s3` for Amazon S3 or `azure` for Azure. |
|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on the local disk for temporarily storing files while uploading and downloading data. |
|`druid.msq.intermediate.storage.maxRetry` | 10 | Optional. Defines the maximum number of times to attempt API calls to the durable storage to avoid failures due to transient errors. |
|`druid.msq.intermediate.storage.chunkSize` | 100MiB | Optional. Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A larger chunk size reduces the number of API calls made to the durable storage; however, it requires more disk space to store the temporary chunks. Druid uses a default of 100 MiB if the value is not provided.|

The following properties must be set, in addition to the common properties, to enable durable storage on Amazon S3:

|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
|`druid.msq.intermediate.storage.bucket` | n/a | Required. The S3 bucket where files are uploaded to and downloaded from. |
|`druid.msq.intermediate.storage.prefix` | n/a | Required. Path prepended to all the paths uploaded to the bucket to namespace the connector's files. Provide a unique value for the prefix and do not share the same prefix between different clusters. If the location includes other files or directories, then they might get cleaned up as well. |
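
For example, a minimal sketch of the service properties for durable storage backed by S3 might look like the following (the bucket, prefix, and temporary directory values are placeholders for illustration):

```properties
# Enable durable storage and point it at S3 (illustrative values).
druid.msq.intermediate.storage.enable=true
druid.msq.intermediate.storage.type=s3
druid.msq.intermediate.storage.tempDir=/tmp/intermediate-files
druid.msq.intermediate.storage.bucket=your-bucket
druid.msq.intermediate.storage.prefix=durable/your-cluster
```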

The following properties must be set, in addition to the common properties, to enable durable storage on Azure:

|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
|`druid.msq.intermediate.storage.container` | n/a | Required. The Azure container where the files are uploaded to and downloaded from. |
|`druid.msq.intermediate.storage.prefix` | n/a | Required. Path prepended to all the paths uploaded to the container to namespace the connector's files. Provide a unique value for the prefix and do not share the same prefix between different clusters. If the location includes other files or directories, then they might get cleaned up as well. |
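
Similarly, a minimal sketch for durable storage backed by Azure might look like the following (the container, prefix, and temporary directory values are placeholders for illustration):

```properties
# Enable durable storage and point it at an Azure container (illustrative values).
druid.msq.intermediate.storage.enable=true
druid.msq.intermediate.storage.type=azure
druid.msq.intermediate.storage.tempDir=/tmp/intermediate-files
druid.msq.intermediate.storage.container=your-container
druid.msq.intermediate.storage.prefix=durable/your-cluster
```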

In addition to the common service properties, there are certain properties that you configure on the Overlord specifically to clean up intermediate files:
Durable storage creates files on the remote storage, and those files are cleaned up once the job no longer requires them. However, if tasks exit abruptly due to failures, these files might not get cleaned up.
Therefore, there are certain properties that you configure on the Overlord specifically to clean up intermediate files for tasks that have completed and no longer require these files:

|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
13 changes: 12 additions & 1 deletion extensions-core/azure-extensions/pom.xml
@@ -92,7 +92,7 @@
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-assistedinject</artifactId>
<version>${guice.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -152,6 +152,17 @@
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -139,7 +139,7 @@ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> pre
public long getObjectSize(CloudObjectLocation location)
{
try {
final CloudBlob blobWithAttributes = storage.getBlobReferenceWithAttributes(
final CloudBlob blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
location.getBucket(),
location.getPath()
);
@@ -60,7 +60,7 @@ public InputStream openStream() throws IOException
public InputStream openStream(long offset) throws IOException
{
try {
return azureStorage.getBlobInputStream(offset, containerName, blobPath);
return azureStorage.getBlockBlobInputStream(offset, containerName, blobPath);
}
catch (StorageException | URISyntaxException e) {
if (AzureUtils.AZURE_RETRY.apply(e)) {
@@ -183,7 +183,7 @@ DataSegment uploadDataSegment(
)
throws StorageException, IOException, URISyntaxException
{
azureStorage.uploadBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath);
azureStorage.uploadBlockBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath);

final DataSegment outSegment = segment
.withSize(size)
@@ -23,19 +23,26 @@
import com.google.common.base.Supplier;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;

import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -48,6 +55,9 @@ public class AzureStorage
{
private static final boolean USE_FLAT_BLOB_LISTING = true;

// Default value from Azure library
private static final int DELTA_BACKOFF_MS = 30_000;

private static final Logger log = new Logger(AzureStorage.class);

/**
@@ -70,14 +80,28 @@ public AzureStorage(

public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath)
throws StorageException, URISyntaxException
{
return emptyCloudBlobDirectory(containerName, virtualDirPath, null);
}

public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath, final Integer maxAttempts)
throws StorageException, URISyntaxException
{
List<String> deletedFiles = new ArrayList<>();
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);

for (ListBlobItem blobItem : container.listBlobs(virtualDirPath, true, null, null, null)) {
Iterable<ListBlobItem> blobItems = container.listBlobs(
virtualDirPath,
USE_FLAT_BLOB_LISTING,
null,
getRequestOptionsWithRetry(maxAttempts),
null
);

for (ListBlobItem blobItem : blobItems) {
CloudBlob cloudBlob = (CloudBlob) blobItem;
log.info("Removing file[%s] from Azure.", cloudBlob.getName());
if (cloudBlob.deleteIfExists()) {
log.debug("Removing file[%s] from Azure.", cloudBlob.getName());
if (cloudBlob.deleteIfExists(DeleteSnapshotsOption.NONE, null, getRequestOptionsWithRetry(maxAttempts), null)) {
deletedFiles.add(cloudBlob.getName());
}
}
@@ -89,7 +113,7 @@ public List<String> emptyCloudBlobDirectory(final String containerName, final St
return deletedFiles;
}

public void uploadBlob(final File file, final String containerName, final String blobPath)
public void uploadBlockBlob(final File file, final String containerName, final String blobPath)
throws IOException, StorageException, URISyntaxException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
@@ -98,36 +122,127 @@ public void uploadBlob(final File file, final String containerName, final String
}
}

public CloudBlob getBlobReferenceWithAttributes(final String containerName, final String blobPath)
public OutputStream getBlockBlobOutputStream(
final String containerName,
final String blobPath,
@Nullable final Integer streamWriteSizeBytes,
Integer maxAttempts
) throws URISyntaxException, StorageException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
CloudBlockBlob blockBlobReference = container.getBlockBlobReference(blobPath);

if (blockBlobReference.exists()) {
throw new RE("Reference already exists");
}

if (streamWriteSizeBytes != null) {
blockBlobReference.setStreamWriteSizeInBytes(streamWriteSizeBytes);
}

return blockBlobReference.openOutputStream(null, getRequestOptionsWithRetry(maxAttempts), null);

}

public CloudBlob getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
final CloudBlockBlob blobReference = getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath);
blobReference.downloadAttributes();
return blobReference;
}

public long getBlobLength(final String containerName, final String blobPath)
public long getBlockBlobLength(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength();
}

public InputStream getBlockBlobInputStream(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
return getBlockBlobInputStream(0L, containerName, blobPath);
}

public InputStream getBlockBlobInputStream(long offset, final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
return getBlockBlobInputStream(offset, null, containerName, blobPath);
}

public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
return getBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength();
return getBlockBlobInputStream(offset, length, containerName, blobPath, null);
}

public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath, Integer maxAttempts)
throws URISyntaxException, StorageException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
return container.getBlockBlobReference(blobPath)
.openInputStream(offset, length, null, getRequestOptionsWithRetry(maxAttempts), null);
}

public InputStream getBlobInputStream(final String containerName, final String blobPath)
public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts)
throws URISyntaxException, StorageException
{
return getBlobInputStream(0L, containerName, blobPath);
CloudBlobContainer cloudBlobContainer = getOrCreateCloudBlobContainer(containerName);
BlobDeleteBatchOperation blobDeleteBatchOperation = new BlobDeleteBatchOperation();
for (String path : paths) {
CloudBlob blobReference = cloudBlobContainer.getBlockBlobReference(path);
blobDeleteBatchOperation.addSubOperation(blobReference);
}
cloudBlobClient.get().executeBatch(blobDeleteBatchOperation, getRequestOptionsWithRetry(maxAttempts), null);
}

public InputStream getBlobInputStream(long offset, final String containerName, final String blobPath)
public List<String> listDir(final String containerName, final String virtualDirPath)
throws URISyntaxException, StorageException
{
return listDir(containerName, virtualDirPath, null);
}

public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts)
throws StorageException, URISyntaxException
{
List<String> files = new ArrayList<>();
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
return container.getBlockBlobReference(blobPath).openInputStream(offset, null, null, null, null);

for (ListBlobItem blobItem :
container.listBlobs(virtualDirPath, USE_FLAT_BLOB_LISTING, null, getRequestOptionsWithRetry(maxAttempts), null)) {
CloudBlob cloudBlob = (CloudBlob) blobItem;
files.add(cloudBlob.getName());
}

return files;
}

public boolean getBlobExists(String container, String blobPath) throws URISyntaxException, StorageException
public boolean getBlockBlobExists(String container, String blobPath) throws URISyntaxException, StorageException
{
return getBlockBlobExists(container, blobPath, null);
}


public boolean getBlockBlobExists(String container, String blobPath, Integer maxAttempts)
throws URISyntaxException, StorageException
{
return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath).exists();
return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath)
.exists(null, getRequestOptionsWithRetry(maxAttempts), null);
}

/**
* If maxAttempts is provided, this method returns request options with retry built in.
* Retry backoff is exponential backoff, with maxAttempts set to the one provided
*/
@Nullable
private BlobRequestOptions getRequestOptionsWithRetry(Integer maxAttempts)
{
if (maxAttempts == null) {
return null;
}
BlobRequestOptions requestOptions = new BlobRequestOptions();
requestOptions.setRetryPolicyFactory(new RetryExponentialRetry(DELTA_BACKOFF_MS, maxAttempts));
return requestOptions;
}

@VisibleForTesting
@@ -51,7 +51,7 @@
public class AzureStorageDruidModule implements DruidModule
{

static final String SCHEME = "azure";
public static final String SCHEME = "azure";
public static final String
STORAGE_CONNECTION_STRING_WITH_KEY = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s";
public static final String
@@ -95,7 +95,7 @@ private void pushTaskFile(final File logFile, String taskKey)
try {
AzureUtils.retryAzureOperation(
() -> {
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
azureStorage.uploadBlockBlob(logFile, config.getContainer(), taskKey);
return null;
},
config.getMaxTries()
@@ -129,12 +129,12 @@ private Optional<InputStream> streamTaskFile(final String taskid, final long off
{
final String container = config.getContainer();
try {
if (!azureStorage.getBlobExists(container, taskKey)) {
if (!azureStorage.getBlockBlobExists(container, taskKey)) {
return Optional.absent();
}
try {
final long start;
final long length = azureStorage.getBlobLength(container, taskKey);
final long length = azureStorage.getBlockBlobLength(container, taskKey);

if (offset > 0 && offset < length) {
start = offset;
@@ -144,7 +144,7 @@ private Optional<InputStream> streamTaskFile(final String taskid, final long off
start = 0;
}

InputStream stream = azureStorage.getBlobInputStream(container, taskKey);
InputStream stream = azureStorage.getBlockBlobInputStream(container, taskKey);
stream.skip(start);

return Optional.of(stream);