Skip to content

Commit

Permalink
Optimize downloading of chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
corny committed Aug 16, 2023
1 parent c4f6e57 commit ae551f8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 78 deletions.
128 changes: 52 additions & 76 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"sync/atomic"
)

const (
Expand Down Expand Up @@ -337,19 +337,8 @@ func (c *Client) GetStreamContext(ctx context.Context, video *Video, format *For
contentLength = c.downloadOnce(req, w, format)
} else {
// we have length information, let's download by chunks!
data, err := c.downloadChunked(ctx, req, format)
if err != nil {
return nil, 0, err
}

go func() {
if _, err := w.Write(data); err != nil {
w.CloseWithError(err)
return
}
c.downloadChunked(ctx, req, w, format)

w.Close() //nolint:errcheck
}()
}

return r, contentLength, nil
Expand Down Expand Up @@ -378,11 +367,6 @@ func (c *Client) downloadOnce(req *http.Request, w *io.PipeWriter, _ *Format) in
return length
}

type chunkData struct {
index int
data []byte
}

func (c *Client) getChunkSize() int64 {
if c.ChunkSize > 0 {
return c.ChunkSize
Expand All @@ -405,71 +389,54 @@ func (c *Client) getMaxRoutines(limit int) int {
return routines
}

func (c *Client) downloadChunked(ctx context.Context, req *http.Request, format *Format) ([]byte, error) {
func (c *Client) downloadChunked(ctx context.Context, req *http.Request, w *io.PipeWriter, format *Format) {
chunks := getChunks(format.ContentLength, c.getChunkSize())
maxRoutines := c.getMaxRoutines(len(chunks))

chunkChan := make(chan chunk, len(chunks))
chunkDataChan := make(chan chunkData, len(chunks))
errChan := make(chan error, 1)

for _, c := range chunks {
chunkChan <- c
cancelCtx, cancel := context.WithCancel(ctx)
abort := func(err error) {
w.CloseWithError(err)
cancel()
}
close(chunkChan)

var wg sync.WaitGroup

currentChunk := atomic.Uint32{}
for i := 0; i < maxRoutines; i++ {
wg.Add(1)

go func() {
defer wg.Done()

for {
select {
case <-ctx.Done():
errChan <- context.DeadlineExceeded
return
case ch, open := <-chunkChan:
if !open {
return
}

data, err := c.downloadChunk(req.Clone(ctx), ch)
if err != nil {
errChan <- err
return
}

chunkDataChan <- chunkData{ch.index, data}
}
i := int(currentChunk.Add(1)) - 1
if i > len(chunks) {
// no more chunks
return
}
}()
}
wg.Wait()

close(errChan)
close(chunkDataChan)
chunk := &chunks[i]
err := c.downloadChunk(req.Clone(cancelCtx), chunk)
close(chunk.data)

for err := range errChan {
if err != nil {
return nil, err
}
if err != nil {
abort(err)
return
}
}()
}

chunkDatas := make([]chunkData, len(chunks))

for cd := range chunkDataChan {
chunkDatas[cd.index] = cd
}
go func() {
// copy chunks into the PipeWriter
for i := 0; i < len(chunks); i++ {
select {
case <-cancelCtx.Done():
return
case data := <-chunks[i].data:
_, err := io.Copy(w, bytes.NewBuffer(data))

data := make([]byte, 0, format.ContentLength)
for _, chunk := range chunkDatas {
data = append(data, chunk.data...)
}
if err != nil {
abort(err)
}
}
}

return data, nil
// everything succeeded
w.Close()
}()
}

// GetStreamURL returns the url for a specific format
Expand Down Expand Up @@ -615,28 +582,37 @@ func (c *Client) httpPostBodyBytes(ctx context.Context, url string, body interfa
return io.ReadAll(resp.Body)
}

// downloadChunk returns the chunk bytes.
// downloadChunk writes the response data into the data channel of the chunk.
// Downloading in multiple chunks is much faster:
// https://github.com/kkdai/youtube/pull/190
func (c *Client) downloadChunk(req *http.Request, chunk chunk) ([]byte, error) {
func (c *Client) downloadChunk(req *http.Request, chunk *chunk) error {
q := req.URL.Query()
q.Set("range", fmt.Sprintf("%d-%d", chunk.start, chunk.end))
req.URL.RawQuery = q.Encode()

resp, err := c.httpDo(req)
if err != nil {
return nil, ErrUnexpectedStatusCode(resp.StatusCode)
return ErrUnexpectedStatusCode(resp.StatusCode)
}
defer resp.Body.Close()

if resp.StatusCode < http.StatusOK && resp.StatusCode >= 300 {
return nil, ErrUnexpectedStatusCode(resp.StatusCode)
return ErrUnexpectedStatusCode(resp.StatusCode)
}

b, err := io.ReadAll(resp.Body)
expected := int(chunk.end-chunk.start) + 1
data, err := io.ReadAll(resp.Body)
n := len(data)

if err != nil {
return nil, fmt.Errorf("read chunk body: %w", err)
return err
}

if n != expected {
return fmt.Errorf("chunk at offset %d has invalid size: expected=%d actual=%d", chunk.start, expected, n)
}

return b, nil
chunk.data <- data

return nil
}
4 changes: 2 additions & 2 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
)

type chunk struct {
index int
start int64
end int64
data chan []byte
}

func getChunks(totalSize, chunkSize int64) []chunk {
Expand All @@ -21,7 +21,7 @@ func getChunks(totalSize, chunkSize int64) []chunk {
end = totalSize - 1
}

chunks = append(chunks, chunk{i, start, end})
chunks = append(chunks, chunk{start, end, make(chan []byte)})
}

return chunks
Expand Down

0 comments on commit ae551f8

Please sign in to comment.