Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Full Object Checksum API #2026

Merged
merged 6 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/go-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
ENABLE_HTTPS: 1
MINIO_KMS_MASTER_KEY: my-minio-key:6368616e676520746869732070617373776f726420746f206120736563726574
MINIO_CI_CD: true
MINT_NO_FULL_OBJECT: true
run: |
New-Item -ItemType Directory -Path "$env:temp/certs-dir"
Copy-Item -Path testcerts\* -Destination "$env:temp/certs-dir"
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jobs:
MINIO_KMS_MASTER_KEY: my-minio-key:6368616e676520746869732070617373776f726420746f206120736563726574
SSL_CERT_FILE: /tmp/certs-dir/public.crt
MINIO_CI_CD: true
MINT_NO_FULL_OBJECT: true
run: |
sudo apt update -y
sudo apt install devscripts -y
Expand Down
18 changes: 10 additions & 8 deletions api-datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,11 @@ type UploadInfo struct {
// Verified checksum values, if any.
// Values are base64 (standard) encoded.
// For multipart objects this is a checksum of the checksum of each part.
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC64NVME string
}

// RestoreInfo contains information of the restore operation of an archived object
Expand Down Expand Up @@ -215,10 +216,11 @@ type ObjectInfo struct {
Restore *RestoreInfo

// Checksum values
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC64NVME string

Internal *struct {
K int // Data blocks
Expand Down
48 changes: 22 additions & 26 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,7 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5, !opts.DisableContentSha256)
if len(hashSums) == 0 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
addAutoChecksumHeaders(&opts)
}

// Initiate a new multipart upload.
Expand All @@ -113,7 +110,6 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := opts.AutoChecksum.Hasher()
for partNumber <= totalPartsCount {
Expand Down Expand Up @@ -154,7 +150,6 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, sha256Hex: sha256Hex, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
Expand Down Expand Up @@ -182,18 +177,21 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
allParts := make([]ObjectPart, 0, len(partsInfo))
for i := 1; i < partNumber; i++ {
part, ok := partsInfo[i]
if !ok {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
allParts = append(allParts, part)
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ChecksumCRC64NVME: part.ChecksumCRC64NVME,
})
}

Expand All @@ -203,12 +201,8 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{opts.AutoChecksum.Key(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
applyAutoChecksum(&opts, allParts)

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
Expand Down Expand Up @@ -354,10 +348,11 @@ func (c *Client) uploadPart(ctx context.Context, p uploadPartParams) (ObjectPart
// Once successfully uploaded, return completed part.
h := resp.Header
objPart := ObjectPart{
ChecksumCRC32: h.Get("x-amz-checksum-crc32"),
ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
ChecksumCRC32: h.Get(ChecksumCRC32.Key()),
ChecksumCRC32C: h.Get(ChecksumCRC32C.Key()),
ChecksumSHA1: h.Get(ChecksumSHA1.Key()),
ChecksumSHA256: h.Get(ChecksumSHA256.Key()),
ChecksumCRC64NVME: h.Get(ChecksumCRC64NVME.Key()),
}
objPart.Size = p.size
objPart.PartNumber = p.partNumber
Expand Down Expand Up @@ -457,9 +452,10 @@ func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, object
Expiration: expTime,
ExpirationRuleID: ruleID,

ChecksumSHA256: completeMultipartUploadResult.ChecksumSHA256,
ChecksumSHA1: completeMultipartUploadResult.ChecksumSHA1,
ChecksumCRC32: completeMultipartUploadResult.ChecksumCRC32,
ChecksumCRC32C: completeMultipartUploadResult.ChecksumCRC32C,
ChecksumSHA256: completeMultipartUploadResult.ChecksumSHA256,
ChecksumSHA1: completeMultipartUploadResult.ChecksumSHA1,
ChecksumCRC32: completeMultipartUploadResult.ChecksumCRC32,
ChecksumCRC32C: completeMultipartUploadResult.ChecksumCRC32C,
ChecksumCRC64NVME: completeMultipartUploadResult.ChecksumCRC64NVME,
}, nil
}
99 changes: 39 additions & 60 deletions api-put-object-streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
}
withChecksum := c.trailingHeaderSupport
if withChecksum {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
addAutoChecksumHeaders(&opts)
}
// Initiate a new multipart upload.
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
Expand Down Expand Up @@ -240,6 +237,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN

// Gather the responses as they occur and update any
// progress bar.
allParts := make([]ObjectPart, 0, totalPartsCount)
for u := 1; u <= totalPartsCount; u++ {
select {
case <-ctx.Done():
Expand All @@ -248,16 +246,17 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
if uploadRes.Error != nil {
return UploadInfo{}, uploadRes.Error
}

allParts = append(allParts, uploadRes.Part)
// Update the totalUploadedSize.
totalUploadedSize += uploadRes.Size
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
ChecksumCRC32: uploadRes.Part.ChecksumCRC32,
ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C,
ChecksumSHA1: uploadRes.Part.ChecksumSHA1,
ChecksumSHA256: uploadRes.Part.ChecksumSHA256,
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
ChecksumCRC32: uploadRes.Part.ChecksumCRC32,
ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C,
ChecksumSHA1: uploadRes.Part.ChecksumSHA1,
ChecksumSHA256: uploadRes.Part.ChecksumSHA256,
ChecksumCRC64NVME: uploadRes.Part.ChecksumCRC64NVME,
})
}
}
Expand All @@ -275,15 +274,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
AutoChecksum: opts.AutoChecksum,
}
if withChecksum {
// Add hash of hashes.
crc := opts.AutoChecksum.Hasher()
for _, part := range complMultipartUpload.Parts {
cs, err := base64.StdEncoding.DecodeString(part.Checksum(opts.AutoChecksum))
if err == nil {
crc.Write(cs)
}
}
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
applyAutoChecksum(&opts, allParts)
}

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
Expand Down Expand Up @@ -312,10 +303,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
}

if !opts.SendContentMd5 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
addAutoChecksumHeaders(&opts)
}

// Calculate the optimal parts info for a given size.
Expand All @@ -342,7 +330,6 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := opts.AutoChecksum.Hasher()
md5Hash := c.md5Hasher()
Expand Down Expand Up @@ -389,7 +376,6 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set(opts.AutoChecksum.KeyCapitalized(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

// Update progress reader appropriately to the latest offset
Expand Down Expand Up @@ -420,18 +406,21 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
allParts := make([]ObjectPart, 0, len(partsInfo))
for i := 1; i < partNumber; i++ {
part, ok := partsInfo[i]
if !ok {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
allParts = append(allParts, part)
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ChecksumCRC64NVME: part.ChecksumCRC64NVME,
})
}

Expand All @@ -442,12 +431,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
applyAutoChecksum(&opts, allParts)
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
Expand Down Expand Up @@ -475,10 +459,7 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
opts.AutoChecksum = opts.Checksum
}
if !opts.SendContentMd5 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = opts.AutoChecksum.String()
addAutoChecksumHeaders(&opts)
}

// Cancel all when an error occurs.
Expand Down Expand Up @@ -510,7 +491,6 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
crc := opts.AutoChecksum.Hasher()

// Total data read and written to server. should be equal to 'size' at the end of the call.
Expand Down Expand Up @@ -570,7 +550,6 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set(opts.AutoChecksum.Key(), base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

wg.Add(1)
Expand Down Expand Up @@ -630,18 +609,21 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
allParts := make([]ObjectPart, 0, len(partsInfo))
for i := 1; i < partNumber; i++ {
part, ok := partsInfo[i]
if !ok {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
allParts = append(allParts, part)
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
ChecksumCRC64NVME: part.ChecksumCRC64NVME,
})
}

Expand All @@ -652,12 +634,8 @@ func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketNam
ServerSideEncryption: opts.ServerSideEncryption,
AutoChecksum: opts.AutoChecksum,
}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{opts.AutoChecksum.KeyCapitalized(): base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
applyAutoChecksum(&opts, allParts)

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
Expand Down Expand Up @@ -823,9 +801,10 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string,
ExpirationRuleID: ruleID,

// Checksum values
ChecksumCRC32: h.Get("x-amz-checksum-crc32"),
ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
ChecksumCRC32: h.Get(ChecksumCRC32.Key()),
ChecksumCRC32C: h.Get(ChecksumCRC32C.Key()),
ChecksumSHA1: h.Get(ChecksumSHA1.Key()),
ChecksumSHA256: h.Get(ChecksumSHA256.Key()),
ChecksumCRC64NVME: h.Get(ChecksumCRC64NVME.Key()),
}, nil
}
Loading
Loading