[AzDatalake] File Client Upload/Download Support #21261

Merged
merged 62 commits into from
Jul 27, 2023
Changes from 61 commits
Commits
e8167a2
Enable gocritic during linting (#20715)
jhendrixMSFT Apr 28, 2023
86627ae
Cosmos DB: Enable merge support (#20716)
ealsur Apr 28, 2023
8ac8c6d
[azservicebus, azeventhubs] Stress test and logging improvement (#20710)
richardpark-msft May 1, 2023
9111616
update proxy version (#20712)
azure-sdk May 1, 2023
d6bf190
Return an error when you try to send a message that's too large. (#20…
richardpark-msft May 1, 2023
e2693bd
Changes in test that is failing in pipeline (#20693)
siminsavani-msft May 2, 2023
03f0ac3
[azservicebus, azeventhubs] Treat 'entity full' as a fatal error (#20…
richardpark-msft May 2, 2023
838842d
[azservicebus/azeventhubs] Redirect stderr and stdout to tee (#20726)
richardpark-msft May 3, 2023
20b4dd8
Update changelog with latest features (#20730)
jhendrixMSFT May 3, 2023
745d967
pass along the artifact name so we can override it later (#20732)
azure-sdk May 3, 2023
6dfd0cb
[azeventhubs] Fixing checkpoint store race condition (#20727)
richardpark-msft May 3, 2023
ed7f3c7
Fix azidentity troubleshooting guide link (#20736)
chlowell May 3, 2023
b2cddab
[Release] sdk/resourcemanager/paloaltonetworksngfw/armpanngfw/0.1.0 (…
Alancere May 4, 2023
2a8d96d
add sdk/resourcemanager/postgresql/armpostgresql live test (#20685)
Alancere May 4, 2023
0d22aed
add sdk/resourcemanager/eventhub/armeventhub live test (#20686)
Alancere May 4, 2023
5fa7df4
add sdk/resourcemanager/compute/armcompute live test (#20048)
Alancere May 4, 2023
c005ed6
sdk/resourcemanager/network/armnetwork live test (#20331)
Alancere May 4, 2023
36f766d
add sdk/resourcemanager/cosmos/armcosmos live test (#20705)
Alancere May 4, 2023
9c9d62a
Increment package version after release of azcore (#20740)
azure-sdk May 4, 2023
8bc3450
[azeventhubs] Improperly resetting etag in the checkpoint store (#20737)
richardpark-msft May 4, 2023
e1a6152
Eng workflows sync and branch cleanup additions (#20743)
azure-sdk May 4, 2023
04b463d
[azeventhubs] Latest start position can also be inclusive (ie, get th…
richardpark-msft May 4, 2023
8849196
Update GitHubEventProcessor version and remove pull_request_review pr…
azure-sdk May 5, 2023
27f5ee0
Rename DisableAuthorityValidationAndInstanceDiscovery (#20746)
chlowell May 5, 2023
2eec707
fix (#20707)
Alancere May 6, 2023
22db2d4
AzFile (#20739)
souravgupta-msft May 8, 2023
0cbfd88
azfile: Fixing connection string parsing logic (#20798)
souravgupta-msft May 8, 2023
d54fb08
[azadmin] fix flaky test (#20758)
gracewilcox May 8, 2023
ad8ebd9
Prepare azidentity v1.3.0 for release (#20756)
chlowell May 8, 2023
e2a6f70
Fix broken podman link (#20801)
azure-sdk May 8, 2023
a59d912
[azquery] update doc comments (#20755)
gracewilcox May 8, 2023
bd3b467
Fixed contribution section (#20752)
bobtabor-msft May 8, 2023
132a01a
[azeventhubs,azservicebus] Some API cleanup, renames (#20754)
richardpark-msft May 8, 2023
8db51ca
Add supporting features to enable distributed tracing (#20301) (#20708)
jhendrixMSFT May 9, 2023
4a66b4f
Restore ARM CAE support for azcore beta (#20657)
chlowell May 9, 2023
7d4a3cb
Upgrade to stable azcore (#20808)
chlowell May 9, 2023
068c3be
Increment package version after release of data/azcosmos (#20807)
azure-sdk May 9, 2023
8e0f66e
Updating changelog (#20810)
souravgupta-msft May 9, 2023
ce926c4
Add fake package to azcore (#20711)
jhendrixMSFT May 9, 2023
1a145c5
Updating CHANGELOG.md (#20809)
siminsavani-msft May 9, 2023
90dfc5c
changelog (#20811)
tasherif-msft May 9, 2023
c7eda59
Increment package version after release of storage/azfile (#20813)
azure-sdk May 9, 2023
7fac0b5
Update changelog (azblob) (#20815)
siminsavani-msft May 9, 2023
498a2ef
[azquery] migration guide (#20742)
gracewilcox May 9, 2023
ccb967e
Increment package version after release of monitor/azquery (#20820)
azure-sdk May 9, 2023
f4e6a22
[keyvault] prep for release (#20819)
gracewilcox May 10, 2023
8fd8eda
Merge branch 'main' into feature/azdatalake
tasherif-msft May 11, 2023
c94fa00
Merge remote-tracking branch 'upstream/feature/azdatalake' into featu…
tasherif-msft May 11, 2023
fc0b2b5
Merge remote-tracking branch 'upstream/feature/azdatalake' into featu…
tasherif-msft Jun 12, 2023
6fb1694
Merge remote-tracking branch 'upstream/feature/azdatalake' into featu…
tasherif-msft Jun 19, 2023
4f7fe43
Merge remote-tracking branch 'upstream/feature/azdatalake' into featu…
tasherif-msft Jun 26, 2023
3dac9d0
Merge remote-tracking branch 'upstream/feature/azdatalake' into featu…
tasherif-msft Jul 4, 2023
a0a861b
Merge remote-tracking branch 'upstream/feature/azdatalake' into featu…
tasherif-msft Jul 7, 2023
124e27e
Merge remote-tracking branch 'upstream/feature/azdatalake' into featu…
tasherif-msft Jul 19, 2023
0f5a52c
Merge remote-tracking branch 'upstream/feature/azdatalake' into featu…
tasherif-msft Jul 24, 2023
25295fc
added all upload methods
tasherif-msft Jul 25, 2023
1e62e9a
added more tests for upload stream
tasherif-msft Jul 25, 2023
5d6eb85
added more tests
tasherif-msft Jul 25, 2023
5049137
added downloaders
tasherif-msft Jul 26, 2023
efd1cc3
added more tests
tasherif-msft Jul 26, 2023
098208d
cleanup
tasherif-msft Jul 27, 2023
9a10991
feedback
tasherif-msft Jul 27, 2023
193 changes: 193 additions & 0 deletions sdk/storage/azdatalake/file/chunkwriting.go
@@ -0,0 +1,193 @@
//go:build go1.18
// +build go1.18

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

package file

import (
	"bytes"
	"context"
	"errors"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
	"io"
	"sync"
)

// chunkWriter provides methods to upload chunks that represent a file to a server.
// This allows us to provide a local implementation that fakes the server for hermetic testing.
type chunkWriter interface {
	AppendData(context.Context, int64, io.ReadSeekCloser, *AppendDataOptions) (AppendDataResponse, error)
	FlushData(context.Context, int64, *FlushDataOptions) (FlushDataResponse, error)
}

// bufferManager provides an abstraction for the management of buffers.
// This is mostly for testing purposes, but does allow for different implementations without changing the algorithm.
type bufferManager[T ~[]byte] interface {
	// Acquire returns the channel that contains the pool of buffers.
	Acquire() <-chan T

	// Release releases the buffer back to the pool for reuse/cleanup.
	Release(T)

	// Grow grows the number of buffers, up to the predefined max.
	// It returns the total number of buffers or an error.
	// No error is returned if the number of buffers has reached max.
	// This is called only from the reading goroutine.
	Grow() (int, error)

	// Free cleans up all buffers.
	Free()
}

// copyFromReader copies a source io.Reader to file storage using concurrent uploads.
func copyFromReader[T ~[]byte](ctx context.Context, src io.Reader, dst chunkWriter, options UploadStreamOptions, getBufferManager func(maxBuffers int, bufferSize int64) bufferManager[T]) error {
	options.setDefaults()
	actualSize := int64(0)       // total bytes read from src; updated only by the reading loop
	wg := sync.WaitGroup{}       // Used to know when all outgoing chunks have finished processing
	errCh := make(chan error, 1) // contains the first error encountered during processing
	var err error

	buffers := getBufferManager(int(options.Concurrency), options.ChunkSize)
	defer buffers.Free()

	// this controls the lifetime of the uploading goroutines.
	// if an error is encountered, cancel() is called which will terminate all uploads.
	// NOTE: the ordering is important here. cancel MUST execute before
	// cleaning up the buffers so that any uploading goroutines exit first,
	// releasing their buffers back to the pool for cleanup.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// This loop grabs a buffer, reads from the stream into the buffer,
	// then creates a goroutine to upload/stage the chunk.
	for chunkNum := uint32(0); true; chunkNum++ {
		var buffer T
		select {
		case buffer = <-buffers.Acquire():
			// got a buffer
		default:
			// no buffer available; allocate a new buffer if possible
			if _, err := buffers.Grow(); err != nil {
				return err
			}

			// either grab the newly allocated buffer or wait for one to become available
			buffer = <-buffers.Acquire()
		}

		var n int
		n, err = io.ReadFull(src, buffer)

		if n > 0 {
			// some data was read, upload it
			wg.Add(1) // We're posting a buffer to be sent

			// Count the bytes here, in the reading loop, rather than inside the
			// uploading goroutine: concurrent updates to actualSize would be a data race.
			actualSize += int64(n)

			// NOTE: we must pass chunkNum as an arg to our goroutine else
			// it's captured by reference and can change underneath us!
			go func(chunkNum uint32) {
				// Upload the outgoing chunk, matching the number of bytes read
				offset := int64(chunkNum) * options.ChunkSize
				appendDataOpts := options.getAppendDataOptions()
				_, err := dst.AppendData(ctx, offset, streaming.NopCloser(bytes.NewReader(buffer[:n])), appendDataOpts)
				if err != nil {
					select {
					case errCh <- err:
						// error was set
					default:
						// some other error is already set
					}
					cancel()
				}
				buffers.Release(buffer) // The goroutine reading from the stream can reuse this buffer now

				// signal that the chunk has been staged.
				// we MUST do this after attempting to write to errCh
				// to avoid it racing with the reading goroutine.
				wg.Done()
			}(chunkNum)
		} else {
			// nothing was read so the buffer is empty, send it back for reuse/clean-up.
			buffers.Release(buffer)
		}

		if err != nil { // The reader is done, no more outgoing buffers
			if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
				// these are expected errors, we don't surface those
				err = nil
			} else {
				// some other error happened, terminate any outstanding uploads
				cancel()
			}
			break
		}
	}

	wg.Wait() // Wait for all outgoing chunks to complete

	if err != nil {
		// there was an error reading from src, favor this error over any error during staging
		return err
	}

	select {
	case err = <-errCh:
		// there was an error during staging
		return err
	default:
		// no error was encountered
	}

	// All chunks were appended; flush at the final size and return the result of the flush
	flushOpts := options.getFlushDataOptions()
	_, err = dst.FlushData(ctx, actualSize, flushOpts)
	return err
}

// mmbPool implements the bufferManager interface.
// It uses anonymous memory mapped files for buffers.
// Don't use this type directly, use newMMBPool() instead.
type mmbPool struct {
	buffers chan mmb
	count   int
	max     int
	size    int64
}

func newMMBPool(maxBuffers int, bufferSize int64) bufferManager[mmb] {
	return &mmbPool{
		buffers: make(chan mmb, maxBuffers),
		max:     maxBuffers,
		size:    bufferSize,
	}
}

func (pool *mmbPool) Acquire() <-chan mmb {
	return pool.buffers
}

func (pool *mmbPool) Grow() (int, error) {
	if pool.count < pool.max {
		buffer, err := newMMB(pool.size)
		if err != nil {
			return 0, err
		}
		pool.buffers <- buffer
		pool.count++
	}
	return pool.count, nil
}

func (pool *mmbPool) Release(buffer mmb) {
	pool.buffers <- buffer
}

func (pool *mmbPool) Free() {
	for i := 0; i < pool.count; i++ {
		buffer := <-pool.buffers
		buffer.delete()
	}
	pool.count = 0
}
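
Note on the buffer pool: the acquire-or-grow step in copyFromReader can be tried out without memory-mapped files. The sketch below is not part of this PR. `slicePool`, `newSlicePool`, and `acquire` are hypothetical names, and the pool is a non-generic, heap-backed stand-in for mmbPool. It assumes, as the interface comment states, that Grow is only called from a single goroutine.

```go
package main

import "fmt"

// slicePool is a hypothetical heap-backed analogue of mmbPool.
// Buffers live in a channel whose capacity is the maximum pool size.
type slicePool struct {
	buffers chan []byte
	count   int // buffers allocated so far
	max     int // upper bound on count
	size    int64
}

func newSlicePool(maxBuffers int, bufferSize int64) *slicePool {
	return &slicePool{
		buffers: make(chan []byte, maxBuffers),
		max:     maxBuffers,
		size:    bufferSize,
	}
}

func (p *slicePool) Acquire() <-chan []byte { return p.buffers }

func (p *slicePool) Release(b []byte) { p.buffers <- b }

// Grow allocates one more buffer unless the pool is already at max.
func (p *slicePool) Grow() (int, error) {
	if p.count < p.max {
		p.buffers <- make([]byte, p.size)
		p.count++
	}
	return p.count, nil
}

// Free drains every allocated buffer; it blocks until all are released.
func (p *slicePool) Free() {
	for i := 0; i < p.count; i++ {
		<-p.buffers
	}
	p.count = 0
}

// acquire mirrors the select/default step of the reading loop:
// take a free buffer if one exists, otherwise try to grow, then wait.
func acquire(p *slicePool) ([]byte, error) {
	select {
	case b := <-p.Acquire():
		return b, nil
	default:
		if _, err := p.Grow(); err != nil {
			return nil, err
		}
		return <-p.Acquire(), nil
	}
}

func main() {
	pool := newSlicePool(2, 8)
	a, _ := acquire(pool) // pool is empty, so it grows to 1
	b, _ := acquire(pool) // grows to 2, the maximum
	fmt.Println(len(a), len(b), pool.count)

	pool.Release(a)
	c, _ := acquire(pool) // reuses the released buffer, no growth
	fmt.Println(len(c), pool.count)

	pool.Release(b)
	pool.Release(c)
	pool.Free()
	fmt.Println(pool.count)
}
```

Because the channel capacity equals the maximum buffer count, Release never blocks, and a reader that asks for a buffer once the pool is at its maximum simply waits until an upload hands one back.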