Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(gate): use stream during artifact upload to s3 #841

Merged
merged 13 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
419 changes: 228 additions & 191 deletions .ghjk/lock.json

Large diffs are not rendered by default.

194 changes: 194 additions & 0 deletions deno.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions import_map.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@
"@std/path": "jsr:@std/path@^1.0.2",
"@std/path/": "jsr:/@std/path@^1.0.2/",
"@std/uuid": "jsr:@std/uuid@^1.0.1",
"aws-sdk/client-s3": "https://esm.sh/@aws-sdk/[email protected]?pin=v131",
"aws-sdk/s3-request-presigner": "https://esm.sh/@aws-sdk/[email protected]?pin=v131",
"aws-sdk/client-s3": "https://esm.sh/@aws-sdk/[email protected]?pin=v135",
"aws-sdk/lib-storage": "https://esm.sh/@aws-sdk/[email protected]?pin=v135",
"aws-sdk/s3-request-presigner": "https://esm.sh/@aws-sdk/[email protected]?pin=v135",
"dispose": "https://deno.land/x/[email protected]/mod.ts",
"graphql": "npm:[email protected]",
"jwt": "https://deno.land/x/[email protected]/mod.ts",
Expand Down
1 change: 0 additions & 1 deletion src/typegate/src/runtimes/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import { Runtime } from "./Runtime.ts";
import type { ComputeStage } from "../engine/query_engine.ts";
import type { RuntimeInitParams } from "../types.ts";
// import { iterParentStages, JSONValue } from "../utils.ts";
import {
GetObjectCommand,
type GetObjectCommandInput,
Expand Down
2 changes: 1 addition & 1 deletion src/typegate/src/services/artifact_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ export class ArtifactService {
}

// TODO key?
const hash = await this.store.persistence.save(stream);
const hash = await this.store.persistence.save(stream, meta.sizeInBytes);
if (hash !== meta.hash) {
await this.store.persistence.delete(hash);
logger.warn("hash mismatch: {} {}", hash, meta.hash);
Expand Down
2 changes: 1 addition & 1 deletion src/typegate/src/typegate/artifacts/local.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class LocalArtifactPersistence implements ArtifactPersistence {
await Deno.remove(this.dirs.artifacts, { recursive: true });
}

async save(stream: ReadableStream): Promise<string> {
async save(stream: ReadableStream, size: number): Promise<string> {
const tmpFile = await Deno.makeTempFile({ dir: this.dirs.temp });
const file = await Deno.open(tmpFile, { write: true, truncate: true });
const hasher = createHash("sha256");
Expand Down
2 changes: 1 addition & 1 deletion src/typegate/src/typegate/artifacts/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export type ArtifactMeta = z.infer<typeof artifactMetaSchema>;

export interface ArtifactPersistence extends AsyncDisposable {
dirs: Dirs;
save(stream: ReadableStream): Promise<string>;
save(stream: ReadableStream, size: number): Promise<string>;
delete(hash: string): Promise<void>;
has(hash: string): Promise<boolean>;
/** Fetch the artifact to local file system and returns the path */
Expand Down
74 changes: 35 additions & 39 deletions src/typegate/src/typegate/artifacts/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { getLogger } from "../../log.ts";
// deno-lint-ignore no-external-import
import { createHash } from "node:crypto";
import type { TypegateCryptoKeys } from "../../crypto.ts";
import { S3 } from "aws-sdk/client-s3";
import { S3, S3Client } from "aws-sdk/client-s3";
import type {
ArtifactPersistence,
RefCounter,
Expand All @@ -20,6 +20,7 @@ import { exists } from "@std/fs/exists";
import { dirname } from "@std/path";
import { chunk } from "@std/collections/chunk";
import { ArtifactError } from "./mod.ts";
import { Upload } from "aws-sdk/lib-storage";

const logger = getLogger(import.meta);

Expand Down Expand Up @@ -52,13 +53,14 @@ class SharedArtifactPersistence implements ArtifactPersistence {
): Promise<SharedArtifactPersistence> {
const localShadow = await LocalArtifactPersistence.init(baseDir);
const s3 = new S3(syncConfig.s3);
return new SharedArtifactPersistence(localShadow, s3, syncConfig.s3Bucket);
return new SharedArtifactPersistence(localShadow, s3, syncConfig.s3Bucket, syncConfig);
}

constructor(
private localShadow: LocalArtifactPersistence,
private s3: S3,
private s3Bucket: string,
private syncConfig: SyncConfig
) {}

get dirs() {
Expand All @@ -70,51 +72,45 @@ class SharedArtifactPersistence implements ArtifactPersistence {
this.s3.destroy();
}

async save(stream: ReadableStream<any>): Promise<string> {
async save(stream: ReadableStream<any>, size: number): Promise<string> {
const hasher = createHash("sha256");

// TODO compatibility with Node.js streams?
// const stream2 = stream.pipeThrough(new HashTransformStream(hasher));
//
// const tempKey = resolveS3Key(
// `tmp/${Math.random().toString(36).substring(2)}`,
// );
//
// const _ = await this.s3.putObject({
// Bucket: this.s3Bucket,
// Body: stream2,
// Key: tempKey,
// });
// const hash = hasher.digest("hex");
//
// await this.s3.copyObject({
// Bucket: this.s3Bucket,
// CopySource: tempKey,
// Key: resolveS3Key(hash),
// });
//
// await this.s3.deleteObject({
// Bucket: this.s3Bucket,
// Key: tempKey,
// });
//
// return hash;

const tmpFile = await Deno.makeTempFile({ dir: this.dirs.temp });
const file = await Deno.open(tmpFile, { write: true, truncate: true });
await stream
.pipeThrough(new HashTransformStream(hasher))
.pipeTo(file.writable);
const stream2 = stream.pipeThrough(new HashTransformStream(hasher));

// temporary key is needed as we won't be able to get the hash sum of the file,
// which we use as the key of the object,
// before going through whole stream.
// so we create a temporary key to store the file/object and then copy the object after we have computed the hash.
const tempKey = resolveS3Key(this.s3Bucket,
destifo marked this conversation as resolved.
Show resolved Hide resolved
`tmp/${Math.random().toString(36).substring(2)}`,
);

const upload = new Upload({
client: new S3Client(this.syncConfig.s3),
params: {
Bucket: this.s3Bucket,
Key: tempKey,
Body: stream2,
ContentLength: size,
},
});

const _ = await upload.done();

const hash = hasher.digest("hex");
const body = await Deno.readFile(tmpFile);
logger.info(`persisting artifact to S3: ${hash}`);
const _ = await this.s3.putObject({

await this.s3.copyObject({
Bucket: this.s3Bucket,
Body: body,
CopySource: `${this.s3Bucket}/${tempKey}`,
Key: resolveS3Key(this.s3Bucket, hash),
});


await this.s3.deleteObject({
Bucket: this.s3Bucket,
Key: tempKey,
});

return hash;
}

Expand Down
Loading