azureservice.go
// Package azurestore provides an Azure Blob Storage based backend.
//
// AzureStore is a storage backend that uses the AzService interface in order to store uploads in Azure Blob Storage.
// It stores each upload in the configured container using two different block blobs: the `[id].info` blob holds the
// file info in JSON format, while the `[id]` blob (without an extension) contains the raw binary data that was uploaded.
// If the upload is not finished within a week, the uncommitted blocks are discarded.
//
// Support for setting the default Container access type and Blob access tier depends on your Azure Storage Account and its limits.
// More information about Container access types and limits:
// https://docs.microsoft.com/en-us/azure/storage/blobs/anonymous-read-access-configure?tabs=portal
// More information about Blob access tiers and limits:
// https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-performance-tiers
// https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview#access-tiers-for-block-blob-data
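//
// A minimal usage sketch (illustrative only; the account, key, container and
// endpoint values below are placeholders, not defaults of this package):
//
//	cfg := &AzConfig{
//		AccountName:         "mystorageaccount",
//		AccountKey:          "<base64-account-key>",
//		ContainerName:       "uploads",
//		ContainerAccessType: "",    // empty means private access
//		BlobAccessTier:      "hot", // optional: "archive", "cool" or "hot"
//		Endpoint:            "https://mystorageaccount.blob.core.windows.net",
//	}
//	service, err := NewAzureService(cfg)
//	if err != nil {
//		// handle error
//	}
//	dataBlob, err := service.NewBlob(context.Background(), "uploadID")
//	infoBlob, err := service.NewBlob(context.Background(), "uploadID.info")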
package azurestore

import (
	"context"
	"encoding/base64"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"sort"
	"strings"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"

	"github.com/tus/tusd/v2/pkg/handler"
)

const (
	InfoBlobSuffix        string = ".info"
	MaxBlockBlobSize      int64  = blockblob.MaxBlocks * blockblob.MaxStageBlockBytes
	MaxBlockBlobChunkSize int64  = blockblob.MaxStageBlockBytes
)
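
// For reference: at the time of writing the SDK defines blockblob.MaxBlocks as
// 50,000 and blockblob.MaxStageBlockBytes as 4000 MiB, so MaxBlockBlobSize works
// out to roughly 190.7 TiB per blob. These limits come from the Azure Blob
// Storage service and may change with the SDK or service version.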

type azService struct {
	ContainerClient *container.Client
	ContainerName   string
	BlobAccessTier  *blob.AccessTier
}

type AzService interface {
	NewBlob(ctx context.Context, name string) (AzBlob, error)
}

type AzConfig struct {
	AccountName         string
	AccountKey          string
	BlobAccessTier      string
	ContainerName       string
	ContainerAccessType string
	Endpoint            string
}

type AzBlob interface {
	// Delete the blob
	Delete(ctx context.Context) error
	// Upload the blob
	Upload(ctx context.Context, body io.ReadSeeker) error
	// Download returns a readcloser to download the contents of the blob
	Download(ctx context.Context) (io.ReadCloser, error)
	// Get the offset of the blob and its indexes
	GetOffset(ctx context.Context) (int64, error)
	// Commit the uploaded blocks to the BlockBlob
	Commit(ctx context.Context) error
}
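
// How a data blob is typically driven through this interface (a sketch, inferred
// from the method comments above): each chunk is staged with Upload, GetOffset is
// used on resume to rebuild the list of already-staged blocks and the current size,
// and Commit turns the staged block list into the final blob once the upload is done.
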
// BlockBlob is the AzBlob implementation for the `[id]` blob that holds the
// upload's raw binary data.
type BlockBlob struct {
	BlobClient     *blockblob.Client
	Indexes        []int
	BlobAccessTier *blob.AccessTier
}

// InfoBlob is the AzBlob implementation for the `[id].info` blob that holds the
// upload's file info in JSON format.
type InfoBlob struct {
	BlobClient *blockblob.Client
}

// NewAzureService creates a new AzService for communicating with the Azure Blob Storage API.
func NewAzureService(config *AzConfig) (AzService, error) {
	// Build a shared key credential from the account name and key.
	cred, err := azblob.NewSharedKeyCredential(config.AccountName, config.AccountKey)
	if err != nil {
		return nil, err
	}

	serviceURL := fmt.Sprintf("%s/%s", config.Endpoint, config.ContainerName)
	retryOpts := policy.RetryOptions{
		MaxRetries:    5,
		RetryDelay:    100 * time.Millisecond, // Retry after 100ms initially
		MaxRetryDelay: 5 * time.Second,        // Max retry delay 5 seconds
	}
	containerClient, err := container.NewClientWithSharedKeyCredential(serviceURL, cred, &container.ClientOptions{
		ClientOptions: azcore.ClientOptions{
			Retry: retryOpts,
		},
	})
	if err != nil {
		return nil, err
	}

	containerCreateOptions := &container.CreateOptions{}
	switch config.ContainerAccessType {
	case "container":
		containerCreateOptions.Access = to.Ptr(container.PublicAccessTypeContainer)
	case "blob":
		containerCreateOptions.Access = to.Ptr(container.PublicAccessTypeBlob)
	default:
		// Leaving Access nil defaults to private access
	}

	_, err = containerClient.Create(context.Background(), containerCreateOptions)
	if err != nil && !strings.Contains(err.Error(), "ContainerAlreadyExists") {
		return nil, err
	}

	// The premium access tiers are not supported yet.
	var blobAccessTier *blob.AccessTier
	switch config.BlobAccessTier {
	case "archive":
		blobAccessTier = to.Ptr(blob.AccessTierArchive)
	case "cool":
		blobAccessTier = to.Ptr(blob.AccessTierCool)
	case "hot":
		blobAccessTier = to.Ptr(blob.AccessTierHot)
	}

	return &azService{
		ContainerClient: containerClient,
		ContainerName:   config.ContainerName,
		BlobAccessTier:  blobAccessTier,
	}, nil
}

// NewBlob determines whether to return an InfoBlob or a BlockBlob, based on the name.
func (service *azService) NewBlob(ctx context.Context, name string) (AzBlob, error) {
	blobClient := service.ContainerClient.NewBlockBlobClient(name)
	if strings.HasSuffix(name, InfoBlobSuffix) {
		return &InfoBlob{BlobClient: blobClient}, nil
	}
	return &BlockBlob{
		BlobClient:     blobClient,
		Indexes:        []int{},
		BlobAccessTier: service.BlobAccessTier,
	}, nil
}
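
// For example (the names are illustrative): NewBlob(ctx, "uploadID.info") yields an
// *InfoBlob for the JSON file info, while NewBlob(ctx, "uploadID") yields a
// *BlockBlob for the binary data of the same upload.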

// Delete the blockBlob from Azure Blob Storage
func (blockBlob *BlockBlob) Delete(ctx context.Context) error {
	// Specify that you want to delete both the blob and its snapshots
	deleteOptions := &azblob.DeleteBlobOptions{
		DeleteSnapshots: to.Ptr(azblob.DeleteSnapshotsOptionTypeInclude),
	}
	_, err := blockBlob.BlobClient.Delete(ctx, deleteOptions)
	return err
}

// Upload stages a block to Azure Blob Storage and records its index so that the
// block can be committed after the upload is finished.
func (blockBlob *BlockBlob) Upload(ctx context.Context, body io.ReadSeeker) error {
	// Keep track of the indexes
	var index int
	if len(blockBlob.Indexes) == 0 {
		index = 0
	} else {
		index = blockBlob.Indexes[len(blockBlob.Indexes)-1] + 1
	}
	blockBlob.Indexes = append(blockBlob.Indexes, index)

	blockID := blockIDIntToBase64(index)
	readSeekCloserBody := readSeekCloser{body}
	_, err := blockBlob.BlobClient.StageBlock(ctx, blockID, readSeekCloserBody, nil)
	return err
}
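
// Note (per the package documentation above): StageBlock only stages data; the
// blob's readable content does not change until Commit sends the block list, and
// blocks that are never committed are discarded after about a week.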

// Download the blockBlob from Azure Blob Storage
func (blockBlob *BlockBlob) Download(ctx context.Context) (io.ReadCloser, error) {
	resp, err := blockBlob.BlobClient.DownloadStream(ctx, nil)
	if err != nil {
		return nil, checkForNotFoundError(err)
	}
	return resp.Body, nil
}

// GetOffset returns the current size of the blob by summing the sizes of all
// committed and uncommitted blocks, and rebuilds the list of block indexes.
func (blockBlob *BlockBlob) GetOffset(ctx context.Context) (int64, error) {
	// For the blob, list each block (ID and size) that is part of it.
	var indexes []int
	var offset int64

	resp, err := blockBlob.BlobClient.GetBlockList(ctx, blockblob.BlockListTypeAll, nil)
	if err != nil {
		return 0, checkForNotFoundError(err)
	}

	// Committed blocks must be added to the offset to know how big the file really is
	for _, block := range resp.CommittedBlocks {
		offset += *block.Size
		indexes = append(indexes, blockIDBase64ToInt(block.Name))
	}
	// The uncommitted blocks are needed as well, so that we can commit them later
	for _, block := range resp.UncommittedBlocks {
		offset += *block.Size
		indexes = append(indexes, blockIDBase64ToInt(block.Name))
	}

	// Sort the block IDs in ascending order. This is required because Azure returns the block lists alphabetically
	// and we store the indexes as base64 encoded ints.
	sort.Ints(indexes)
	blockBlob.Indexes = indexes

	return offset, nil
}
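
// Illustration of why the sort is needed (values computed from the little-endian
// block ID encoding in the helper functions below): index 1 encodes to "AQAAAA=="
// while index 256 encodes to "AAEAAA==", so the alphabetical block list would put
// block 256 before block 1 even though it was staged later.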

// Commit sends the block list so that the staged blocks are committed to the BlockBlob
// after all the blocks have been uploaded.
func (blockBlob *BlockBlob) Commit(ctx context.Context) error {
	base64BlockIDs := make([]string, len(blockBlob.Indexes))
	for i, id := range blockBlob.Indexes {
		base64BlockIDs[i] = blockIDIntToBase64(id)
	}

	_, err := blockBlob.BlobClient.CommitBlockList(ctx, base64BlockIDs, &blockblob.CommitBlockListOptions{
		Tier: blockBlob.BlobAccessTier,
	})
	return err
}

// Delete the infoBlob from Azure Blob Storage
func (infoBlob *InfoBlob) Delete(ctx context.Context) error {
	_, err := infoBlob.BlobClient.Delete(ctx, nil)
	return err
}

// Upload the infoBlob to Azure Blob Storage.
// Because the info file is presumed to be smaller than blockblob.MaxUploadBlobBytes (256 MiB), it can be uploaded in one go.
// Newly uploaded data creates a new block blob or overwrites the existing one.
func (infoBlob *InfoBlob) Upload(ctx context.Context, body io.ReadSeeker) error {
	_, err := infoBlob.BlobClient.UploadStream(ctx, body, nil)
	return err
}

// Download the infoBlob from Azure Blob Storage
func (infoBlob *InfoBlob) Download(ctx context.Context) (io.ReadCloser, error) {
	resp, err := infoBlob.BlobClient.DownloadStream(ctx, nil)
	if err != nil {
		return nil, checkForNotFoundError(err)
	}
	return resp.Body, nil
}

// GetOffset is a no-op because the infoBlob does not use offsets, so it just returns 0, nil.
func (infoBlob *InfoBlob) GetOffset(ctx context.Context) (int64, error) {
	return 0, nil
}

// Commit is a no-op because the infoBlob has no uncommitted blocks, so it just returns nil.
func (infoBlob *InfoBlob) Commit(ctx context.Context) error {
	return nil
}

// === Helper Functions ===

// These helper functions convert a binary block ID to a base64 string and vice versa.
// NOTE: The blockID must be <= 64 bytes and ALL blockIDs for the blob must be the same length.
func blockIDBinaryToBase64(blockID []byte) string {
	return base64.StdEncoding.EncodeToString(blockID)
}

func blockIDBase64ToBinary(blockID *string) []byte {
	binary, _ := base64.StdEncoding.DecodeString(*blockID)
	return binary
}

// These helper functions convert an int block ID to a base64 string and vice versa.
func blockIDIntToBase64(blockID int) string {
	binaryBlockID := (&[4]byte{})[:] // All block IDs are 4 bytes long
	binary.LittleEndian.PutUint32(binaryBlockID, uint32(blockID))
	return blockIDBinaryToBase64(binaryBlockID)
}

func blockIDBase64ToInt(blockID *string) int {
	return int(binary.LittleEndian.Uint32(blockIDBase64ToBinary(blockID)))
}
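
// For example, with this 4-byte little-endian encoding, index 0 becomes "AAAAAA==",
// index 1 becomes "AQAAAA==" and index 256 becomes "AAEAAA==" (values worked out from
// the scheme above, shown purely as an illustration).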

// readSeekCloser is a wrapper that adds a no-op Close method to an io.ReadSeeker.
type readSeekCloser struct {
	io.ReadSeeker
}

// Close implements io.Closer for readSeekCloser.
func (rsc readSeekCloser) Close() error {
	return nil
}

// checkForNotFoundError checks if the error indicates that a resource was not found.
// If so, we return the corresponding tusd error.
func checkForNotFoundError(err error) error {
	var azureError *azcore.ResponseError
	if errors.As(err, &azureError) {
		code := bloberror.Code(azureError.ErrorCode)
		if code == bloberror.BlobNotFound || azureError.StatusCode == 404 {
			return handler.ErrNotFound
		}
	}
	return err
}