Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix artifact v4 upload above 8MB #31664

Merged
merged 18 commits into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion routers/api/actions/artifacts_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,49 @@ func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chun
return chunksMap, nil
}

func listChunksByRunIDV4(st storage.ObjectStorage, runID int64, artifactID int64, blist *BlockList) ([]*chunkFileItem, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Syntetise a chunk list with the correct start/end and artifact entries, based on the xml Blocklist

storageDir := fmt.Sprintf("tmp%d", runID)
var chunks []*chunkFileItem
chunkMap := map[string]*chunkFileItem{}
dummy := &chunkFileItem{}
for _, name := range blist.Latest {
chunkMap[name] = dummy
}
if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
lunny marked this conversation as resolved.
Show resolved Hide resolved
baseName := filepath.Base(fpath)
if !strings.HasPrefix(baseName, "block-") {
return nil
}
// when read chunks from storage, it only contains storage dir and basename,
// no matter the subdirectory setting in storage config
item := chunkFileItem{Path: storageDir + "/" + baseName, ArtifactID: artifactID}
var size int64
var chunkName string
if _, err := fmt.Sscanf(baseName, "block-%d-%d-%s", &item.RunID, &size, &chunkName); err != nil {
return fmt.Errorf("parse content range error: %v", err)
}
item.End = item.Start + size - 1
if _, ok := chunkMap[chunkName]; ok {
chunkMap[chunkName] = &item
}
return nil
}); err != nil {
return nil, err
}
for i, name := range blist.Latest {
chunk, ok := chunkMap[name]
if !ok || chunk.Path == "" {
return nil, fmt.Errorf("missing Chunk (%d/%d): %s", i, len(blist.Latest), name)
}
chunks = append(chunks, chunk)
if i > 0 {
chunk.Start = chunkMap[blist.Latest[i-1]].End + 1
chunk.End += chunk.Start
}
}
return chunks, nil
}

func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
// read all db artifacts by name
artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
Expand Down Expand Up @@ -230,7 +273,7 @@ func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st st
rawChecksum := hash.Sum(nil)
actualChecksum := hex.EncodeToString(rawChecksum)
if !strings.HasSuffix(checksum, actualChecksum) {
return fmt.Errorf("update artifact error checksum is invalid")
return fmt.Errorf("update artifact error checksum is invalid %v vs %v", checksum, actualChecksum)
}
}

Expand Down
119 changes: 91 additions & 28 deletions routers/api/actions/artifactsv4.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,15 @@ package actions
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=block
// 1.3. Continue Upload Zip Content to Blobstorage (unauthenticated request), repeat until everything is uploaded
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=appendBlock
// 1.4. Unknown xml payload to Blobstorage (unauthenticated request), ignored for now
// 1.4. BlockList xml payload to Blobstorage (unauthenticated request)
// Files of about 800MB are parallel in parallel and / or out of order, this file is needed to enshure the correct order
// PUT: http://localhost:3000/twirp/github.actions.results.api.v1.ArtifactService/UploadArtifact?sig=mO7y35r4GyjN7fwg0DTv3-Fv1NDXD84KLEgLpoPOtDI=&expires=2024-01-23+21%3A48%3A37.20833956+%2B0100+CET&artifactName=test&taskID=75&comp=blockList
// Request
// <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
// <BlockList>
// <Latest>blockId1</Latest>
// <Latest>blockId2</Latest>
// </BlockList>
// 1.5. FinalizeArtifact
// Post: /twirp/github.actions.results.api.v1.ArtifactService/FinalizeArtifact
// Request
Expand Down Expand Up @@ -82,6 +89,7 @@ import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/xml"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -295,33 +303,57 @@ func (r *artifactV4Routes) uploadArtifact(ctx *ArtifactContext) {
comp := ctx.Req.URL.Query().Get("comp")
switch comp {
case "block", "appendBlock":
// get artifact by name
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
if err != nil {
log.Error("Error artifact not found: %v", err)
ctx.Error(http.StatusNotFound, "Error artifact not found")
return
blockid := ctx.Req.URL.Query().Get("blockid")
if blockid == "" {
// get artifact by name
artifact, err := r.getArtifactByName(ctx, task.Job.RunID, artifactName)
if err != nil {
log.Error("Error artifact not found: %v", err)
ctx.Error(http.StatusNotFound, "Error artifact not found")
return
}

_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
}
artifact.FileCompressedSize += ctx.Req.ContentLength
artifact.FileSize += ctx.Req.ContentLength
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
log.Error("Error UpdateArtifactByID: %v", err)
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
return
}
} else {
_, err := r.fs.Save(fmt.Sprintf("tmp%d/block-%d-%d-%s", task.Job.RunID, task.Job.RunID, ctx.Req.ContentLength, blockid), ctx.Req.Body, -1)
if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have a blockid, delay ordering chunks to the end and use it's blockid to form the name

I notice that this might be a security issue, because an attacker could control the filesystem name, but what alternatives do we have?
Santizing is probably the way to go here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I switched to base64url encoding of the blockid that doesn't allow to traverse the filesystem or do other bad things

}
}

_, err = appendUploadChunk(r.fs, ctx, artifact, artifact.FileSize, ctx.Req.ContentLength, artifact.RunID)
ctx.JSON(http.StatusCreated, "appended")
case "blocklist":
_, err := r.fs.Save(fmt.Sprintf("tmp%d/%d-blocklist", task.Job.RunID, task.Job.RunID), ctx.Req.Body, -1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now we got the final block order by blockid in xml, save it for the merge step

if err != nil {
log.Error("Error runner api getting task: task is not running")
ctx.Error(http.StatusInternalServerError, "Error runner api getting task: task is not running")
return
}
artifact.FileCompressedSize += ctx.Req.ContentLength
artifact.FileSize += ctx.Req.ContentLength
if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
log.Error("Error UpdateArtifactByID: %v", err)
ctx.Error(http.StatusInternalServerError, "Error UpdateArtifactByID")
return
}
ctx.JSON(http.StatusCreated, "appended")
case "blocklist":
ctx.JSON(http.StatusCreated, "created")
}
}

type BlockList struct {
Latest []string `xml:"Latest"`
}

type Latest struct {
Value string `xml:",chardata"`
}

func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
var req FinalizeArtifactRequest

Expand All @@ -340,18 +372,49 @@ func (r *artifactV4Routes) finalizeArtifact(ctx *ArtifactContext) {
ctx.Error(http.StatusNotFound, "Error artifact not found")
return
}
chunkMap, err := listChunksByRunID(r.fs, runID)

blockListName := fmt.Sprintf("tmp%d/%d-blocklist", runID, runID)
var chunks []*chunkFileItem
s, err := r.fs.Open(blockListName)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
chunks, ok := chunkMap[artifact.ID]
if !ok {
log.Error("Error merge chunks")
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
log.Warn("Error merge chunks: %v", err)
chunkMap, err := listChunksByRunID(r.fs, runID)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
chunks, ok = chunkMap[artifact.ID]
if !ok {
log.Error("Error merge chunks")
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My tests don't upload a blockmap yet, fallback try old broken mode

} else {
err = r.fs.Delete(blockListName)
if err != nil {
log.Warn("Failed to delete blockList %s: %v", blockListName, err)
}

xdec := xml.NewDecoder(s)
blockList := &BlockList{}
err = xdec.Decode(blockList)
if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
chunks, err = listChunksByRunIDV4(r.fs, runID, artifact.ID, blockList)

if err != nil {
log.Error("Error merge chunks: %v", err)
ctx.Error(http.StatusInternalServerError, "Error merge chunks")
return
}
artifact.FileSize = chunks[len(chunks)-1].End + 1
artifact.FileCompressedSize = chunks[len(chunks)-1].End + 1
}

checksum := ""
if req.Hash != nil {
checksum = req.Hash.Value
Expand Down
Loading