Commit
Support multipart upload
pingiun committed Aug 3, 2021
1 parent 93e92ec commit ea8917b
Showing 1 changed file with 60 additions and 2 deletions.
server/lib/object-storage.ts
@@ -1,19 +1,25 @@
 import {
+  CompletedPart,
+  CompleteMultipartUploadCommand,
+  CreateMultipartUploadCommand,
   DeleteObjectCommand,
   DeleteObjectsCommand,
   GetObjectCommand,
   ListObjectsV2Command,
   PutObjectCommand,
-  S3Client
+  S3Client,
+  UploadPartCommand
 } from "@aws-sdk/client-s3"
 import { CONFIG } from "@server/initializers/config"
 import { logger } from '@server/helpers/logger'
-import { createReadStream, createWriteStream, ensureDir, ReadStream } from "fs-extra"
+import { createReadStream, createWriteStream, ensureDir, open, close, ReadStream, stat, Stats } from "fs-extra"
 import { Readable } from "stream"
 import { pipeline } from "stream/promises"
 import { dirname } from "path"
+import { min } from "lodash"
 
 type BucketInfo = {BUCKET_NAME: string, PREFIX?: string, BASE_URL?: string}
+const ONE_MIB = 1024 * 1024
 
 function getS3Client () {
   return new S3Client({ endpoint: `https://${CONFIG.OBJECT_STORAGE.ENDPOINT}` })
@@ -31,8 +37,60 @@ async function objectStoragePut (options: {filename: string, content: string | R
   return await s3Client.send(command)
 }
 
+async function multiPartUpload (file: {filename: string, path: string}, stats: Stats, bucketInfo: BucketInfo) {
+  const { filename, path } = file
+  const key = bucketInfo.PREFIX + filename
+  const s3Client = getS3Client()
+  const createMultipartCommand = new CreateMultipartUploadCommand({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Key: key
+  })
+  const createResponse = await s3Client.send(createMultipartCommand)
+  let partNumber = 1
+  const parts: CompletedPart[] = []
+  const fd = await open(path, 'r')
+  for (let start = 0; start < stats.size; start += 10 * ONE_MIB) {
+    logger.debug('Uploading part %d of file to %s/%s%s', partNumber, bucketInfo.BUCKET_NAME, bucketInfo.PREFIX, file.filename)
+    const stream: ReadStream & {byteLength: number} =
+      createReadStream(
+        path,
+        { fd: fd, autoClose: false, start: start, end: (start + 10 * ONE_MIB) - 1 }
+      ) as ReadStream & {byteLength: number}
+    // The S3 SDK needs to know the length of the HTTP body beforehand, but doesn't support
+    // streams with start and end set, so it just tries to stat the file in stream.path.
+    // That fails here because we only want to send part of the file. The stream type is
+    // widened so we can set byteLength ourselves, which the SDK picks up because array
+    // buffers have this field set
+    stream.byteLength = min([ stats.size - start, 10 * ONE_MIB ])
+    const uploadPartCommand = new UploadPartCommand({
+      Bucket: bucketInfo.BUCKET_NAME,
+      Key: key,
+      UploadId: createResponse.UploadId,
+      PartNumber: partNumber,
+      Body: stream
+    })
+    const uploadResponse = await s3Client.send(uploadPartCommand)
+    parts.push({ ETag: uploadResponse.ETag, PartNumber: partNumber })
+    partNumber += 1
+  }
+  await close(fd)
+  const completeUploadCommand = new CompleteMultipartUploadCommand({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Key: key,
+    UploadId: createResponse.UploadId,
+    MultipartUpload: { Parts: parts }
+  })
+  await s3Client.send(completeUploadCommand)
+  logger.debug('Completed upload in %d parts of file to %s/%s%s', partNumber - 1, bucketInfo.BUCKET_NAME, bucketInfo.PREFIX, file.filename)
+}
+
 export async function storeObject (file: {path: string, filename: string}, bucketInfo: BucketInfo) {
   logger.debug('Uploading file to %s/%s%s', bucketInfo.BUCKET_NAME, bucketInfo.PREFIX, file.filename)
+  const stats = await stat(file.path)
+  // If bigger than 5 MiB we do a multipart upload
+  if (stats.size > 5 * ONE_MIB) {
+    return await multiPartUpload(file, stats, bucketInfo)
+  }
   const fileStream = createReadStream(file.path)
   return await objectStoragePut({ filename: file.filename, content: fileStream, bucketInfo })
 }
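The byteLength workaround in the loop is worth isolating: for file streams the SDK derives the Content-Length by stat-ing stream.path, which reports the whole file even when start and end limit the read, so the commit pins the exact part length on the stream itself. A sketch of just that trick, assuming the same 10 MiB part size (partStream and its arguments are hypothetical helpers, not from this commit):

import { createReadStream, ReadStream } from "fs-extra"

const ONE_MIB = 1024 * 1024
const PART_SIZE = 10 * ONE_MIB

// Hypothetical helper: stream one part of a file and tell the SDK its exact length,
// since the SDK cannot infer the length of a ranged file stream by itself
function partStream (path: string, start: number, fileSize: number) {
  const stream = createReadStream(path, {
    start,
    end: start + PART_SIZE - 1 // end is inclusive; reading past EOF just stops early
  }) as ReadStream & { byteLength: number }

  // The SDK checks for a byteLength field (as found on ArrayBuffer) before
  // falling back to stat(stream.path), so the final, shorter part is sized correctly
  stream.byteLength = Math.min(fileSize - start, PART_SIZE)

  return stream
}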

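With that in place, storeObject picks the strategy from the file size: anything above the 5 MiB threshold goes through the multipart path, while smaller files keep using the single PutObject request. A hypothetical call site (the path, filename and bucket values are invented for illustration):

// Hypothetical usage; the file and bucket configuration are illustrative only
await storeObject(
  { path: '/var/www/peertube/storage/videos/abc-720.mp4', filename: 'abc-720.mp4' },
  { BUCKET_NAME: 'videos', PREFIX: 'streaming-playlists/' }
)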