diff --git a/cli/src/worker/annex.test.ts b/cli/src/worker/annex.test.ts
new file mode 100644
index 0000000000..7d4f911e1d
--- /dev/null
+++ b/cli/src/worker/annex.test.ts
@@ -0,0 +1,27 @@
+import { assertEquals, join } from "../deps.ts"
+import { annexRelativePath, hashDirLower, hashDirMixed } from "./annex.ts"
+
+Deno.test("annexRelativePath() returns appropriate paths", () => {
+  assertEquals(
+    annexRelativePath("sub-01/anat/sub-01_T1w.nii.gz"),
+    join("..", ".."),
+  )
+})
+
+Deno.test("hashDirLower() returns the correct key prefix", async () => {
+  assertEquals(
+    await hashDirLower(
+      "SHA256E-s311112--c3527d7944a9619afb57863a34e6af7ec3fe4f108e56c860d9e700699ff806fb.nii.gz",
+    ),
+    ["2ed", "6ea"],
+  )
+})
+
+Deno.test("hashDirMixed() returns the correct key prefix", async () => {
+  assertEquals(
+    await hashDirMixed(
+      "SHA256E-s311112--c3527d7944a9619afb57863a34e6af7ec3fe4f108e56c860d9e700699ff806fb.nii.gz",
+    ),
+    ["Xk", "Mx"],
+  )
+})
diff --git a/cli/src/worker/annex.ts b/cli/src/worker/annex.ts
new file mode 100644
index 0000000000..38d002b01c
--- /dev/null
+++ b/cli/src/worker/annex.ts
@@ -0,0 +1,144 @@
+import { GitWorkerContext } from "./types/git-context.ts"
+import { basename, dirname, git, join, relative } from "../deps.ts"
+import { logger } from "../logger.ts"
+
+/**
+ * Why are we using hash wasm over web crypto?
+ * Web crypto cannot do streaming hashes of the common git-annex functions yet.
+ */
+import { createMD5, createSHA256 } from "npm:hash-wasm"
+
+/**
+ * Reusable hash factories
+ */
+const computeHashMD5 = await createMD5()
+const computeHashSHA256 = await createSHA256()
+
+/**
+ * git-annex hashDirLower implementation based on https://git-annex.branchable.com/internals/hashing/
+ * Compute the directory path from a git-annex filename
+ */
+export async function hashDirLower(
+  annexKey: string,
+): Promise<[string, string]> {
+  const computeMD5 = await createMD5()
+  computeMD5.init()
+  computeMD5.update(annexKey)
+  const digest = computeMD5.digest("hex")
+  return [digest.slice(0, 3), digest.slice(3, 6)]
+}
+
+/**
+ * git-annex hashDirMixed implementation based on https://git-annex.branchable.com/internals/hashing/
+ */
+export async function hashDirMixed(
+  annexKey: string,
+): Promise<[string, string]> {
+  const computeMD5 = await createMD5()
+  computeMD5.init()
+  computeMD5.update(annexKey)
+  const digest = computeMD5.digest("binary")
+  const firstWord = new DataView(digest.buffer).getUint32(0, true)
+  const nums = Array.from({ length: 4 }, (_, i) => (firstWord >> (6 * i)) & 31)
+  const letters = nums.map(
+    (num) => "0123456789zqjxkmvwgpfZQJXKMVWGPF".charAt(num),
+  )
+  return [`${letters[1]}${letters[0]}`, `${letters[3]}${letters[2]}`]
+}
+
+/**
+ * Return the relative path to the .git/annex directory from a repo relative path
+ *
+ * Used for symlink path creation
+ */
+export function annexRelativePath(path: string) {
+  return relative(dirname(join("/", path)), "/")
+}
+
+/**
+ * Add a file to a configured annex
+ * @param annexKeys Object with key to file path mappings
+ * @param hash Git annex hash string (e.g. MD5E or SHA256)
+ * @param path Absolute path to the file being added
+ * @param relativePath Repo relative path for file being added
+ * @param size File size (to avoid additional stat call)
+ * @param context GitWorkerContext object
+ */
+export async function annexAdd(
+  annexKeys: Record<string, string>,
+  hash: string,
+  path: string,
+  relativePath: string,
+  size: number,
+  context: GitWorkerContext,
+): Promise<boolean> {
+  // E in the backend means include the file extension
+  let extension = ""
+  if (hash.endsWith("E")) {
+    const filename = basename(relativePath)
+    extension = filename.substring(filename.indexOf("."))
+  }
+  // Compute hash
+  const computeHash = hash.startsWith("MD5")
+    ? computeHashMD5
+    : computeHashSHA256
+  computeHash.init()
+  const stream = context.fs.createReadStream(path, {
+    highWaterMark: 1024 * 1024 * 10,
+  })
+  for await (const data of stream) {
+    computeHash.update(data)
+  }
+  const digest = computeHash.digest("hex")
+  const annexKey = `${hash}-s${size}--${digest}${extension}`
+  const annexPath = join(
+    ".git",
+    "annex",
+    "objects",
+    ...(await hashDirMixed(annexKey)),
+    annexKey,
+    annexKey,
+  )
+  // Path to this file in our repo
+  const fileRepoPath = join(context.repoPath, relativePath)
+
+  let link
+  let forceAdd = false
+  try {
+    // Test if the repo already has this object
+    link = await context.fs.promises.readlink(fileRepoPath)
+  } catch (_err) {
+    forceAdd = true
+  }
+
+  // Calculate the relative symlinks for our file
+  const symlinkTarget = join(
+    annexRelativePath(relativePath),
+    annexPath,
+  )
+
+  // Key has changed if the existing link points to another object
+  if (forceAdd || link !== symlinkTarget) {
+    // Upload this key after the git commit
+    annexKeys[annexKey] = path
+    // This object has a new annex hash, update the symlink and add it
+    const symlinkTarget = join(
+      annexRelativePath(relativePath),
+      annexPath,
+    )
+    // Verify parent directories exist
+    await context.fs.promises.mkdir(dirname(fileRepoPath), { recursive: true })
+    // Remove the existing symlink or git file
+    await context.fs.promises.rm(fileRepoPath, { force: true })
+    // Create our new symlink pointing at the right annex object
+    await context.fs.promises.symlink(symlinkTarget, fileRepoPath)
+    const options = {
+      ...context.config(),
+      filepath: relativePath,
+    }
+    await git.add(options)
+    return true
+  } else {
+    return false
+  }
+}
diff --git a/cli/src/worker/git.test.ts b/cli/src/worker/git.test.ts
index 9182d19946..0c0c7f740a 100644
--- a/cli/src/worker/git.test.ts
+++ b/cli/src/worker/git.test.ts
@@ -1,30 +1,7 @@
-import { annexRelativePath, hashDirLower, hashDirMixed } from "./git.ts"
 import { assertArrayIncludes, assertEquals, git, join, walk, SEPARATOR } from "../deps.ts"
 import { addGitFiles } from "../commands/upload.ts"
 import fs from "node:fs"
 
-Deno.test("annexRelativePath() returns appropriate paths", () => {
-  assertEquals(annexRelativePath("sub-01/anat/sub-01_T1w.nii.gz"), join('..', '..'))
-})
-
-Deno.test("hashDirLower() returns the correct key prefix", async () => {
-  assertEquals(
-    await hashDirLower(
-      "SHA256E-s311112--c3527d7944a9619afb57863a34e6af7ec3fe4f108e56c860d9e700699ff806fb.nii.gz",
-    ),
-    ["2ed", "6ea"],
-  )
-})
-
-Deno.test("hashDirMixed() returns the correct key prefix", async () => {
-  assertEquals(
-    await hashDirMixed(
-      "SHA256E-s311112--c3527d7944a9619afb57863a34e6af7ec3fe4f108e56c860d9e700699ff806fb.nii.gz",
-    ),
-    ["Xk", "Mx"],
-  )
-})
-
 Deno.test("adds git and annexed content given a directory of files", async () => {
   const testUpload = await Deno.makeTempDir()
   const testRepo = await Deno.makeTempDir()
diff --git a/cli/src/worker/git.ts b/cli/src/worker/git.ts
index 0fd8f7e892..bd646a3ff7 100644
--- a/cli/src/worker/git.ts
+++ b/cli/src/worker/git.ts
@@ -1,6 +1,3 @@
-import { git, STAGE, TREE } from "../deps.ts"
-import http from "npm:isomorphic-git@1.25.3/http/node/index.js"
-import fs from "node:fs"
 import { decode } from "https://deno.land/x/djwt@v3.0.1/mod.ts"
 import {
   GitAnnexAttributes,
@@ -8,71 +5,15 @@ import {
   matchGitAttributes,
   parseGitAttributes,
 } from "../gitattributes.ts"
-import { basename, dirname, join, LevelName, relative } from "../deps.ts"
+import { dirname, git, join, STAGE, TREE } from "../deps.ts"
 import { logger, setupLogging } from "../logger.ts"
 import { PromiseQueue } from "./queue.ts"
 import { checkKey, storeKey } from "./transferKey.ts"
 import { ProgressBar } from "../deps.ts"
+import { annexAdd } from "./annex.ts"
+import { GitWorkerContext, GitWorkerEventAdd } from "./types/git-context.ts"
 
-/**
- * Why are we using hash wasm over web crypto?
- * Web crypto cannot do streaming hashes of the common git-annex functions yet.
- */
-import { createMD5, createSHA256 } from "npm:hash-wasm"
-
-interface GitContext {
-  // Current working dataset
-  datasetId: string
-  // The path being uploaded from to OpenNeuro
-  sourcePath: string
-  // The path of our local clone (possibly in virtual fs)
-  repoPath: string
-  // URL for the remote git repo
-  repoEndpoint: string
-  // OpenNeuro git access short lived API key
-  authorization: string
-  // Author name
-  name: string
-  // Author email
-  email: string
-}
-
-/**
- * Events with no arguments
- */
-interface GitWorkerEventGeneric {
-  data: {
-    command: "clone" | "commit" | "done" | "push"
-  }
-}
-
-interface GitWorkerEventSetupData extends GitContext {
-  command: "setup"
-  logLevel: LevelName
-}
-
-/** Setup event to set dataset and repo state for commands until next call */
-interface GitWorkerEventSetup {
-  data: GitWorkerEventSetupData
-}
-
-/** Add event to add one file */
-interface GitWorkerEventAdd {
-  data: {
-    command: "add"
-    // Absolute path on the local system
-    path: string
-    // Dataset relative path
-    relativePath: string
-  }
-}
-
-type GitWorkerEvent =
-  | GitWorkerEventSetup
-  | GitWorkerEventGeneric
-  | GitWorkerEventAdd
-
-let context: GitContext
+let context: GitWorkerContext
 let attributesCache: GitAnnexAttributes
 
 /**
@@ -89,36 +30,21 @@ async function done() {
   await globalThis.close()
 }
 
-function gitOptions(dir: string) {
-  return {
-    fs,
-    http,
-    dir,
-    url: context.repoEndpoint,
-    headers: {
-      Authorization: `Bearer ${context.authorization}`,
-    },
-  }
-}
-
 /**
  * Clone or fetch the draft
  */
 async function update() {
-  const options = gitOptions(context.repoPath)
   try {
-    await fs.promises.access(join(context.repoPath, ".git"))
+    await context.fs.promises.access(join(context.repoPath, ".git"))
     logger.info(
       `Fetching ${context.datasetId} draft from "${context.repoEndpoint}"`,
    )
-    await git.fetch(options)
+    await git.fetch(context.config())
   } catch (_err) {
     logger.info(
      `Cloning ${context.datasetId} draft from "${context.repoEndpoint}"`,
    )
-    await git.clone({
-      ...options,
-    })
+    await git.clone(context.config())
   }
   logger.info(`${context.datasetId} draft fetched!`)
 }
@@ -128,7 +54,7 @@ async function update() {
  */
 async function getGitAttributes(): Promise<GitAnnexAttributes> {
   if (!attributesCache) {
-    const options = gitOptions(context.repoPath)
+    const options = context.config()
     try {
      const oid = await git.resolveRef({ ...options, ref: "main" }) ||
        await git.resolveRef({ ...options, ref: "master" })
@@ -169,55 +95,11 @@ async function shouldBeAnnexed(
   return "SHA256E"
 }
 
-/**
- * git-annex hashDirLower implementation based on https://git-annex.branchable.com/internals/hashing/
- * Compute the directory path from a git-annex filename
- */
-export async function hashDirLower(
-  annexKey: string,
-): Promise<[string, string]> {
-  const computeMD5 = await createMD5()
-  computeMD5.init()
-  computeMD5.update(annexKey)
-  const digest = computeMD5.digest("hex")
-  return [digest.slice(0, 3), digest.slice(3, 6)]
-}
-
-/**
- * Return the relative path to the .git/annex directory from a repo relative path
- *
- * Used for symlink path creation
- */
-export function annexRelativePath(path: string) {
-  return relative(dirname(join("/", path)), "/")
-}
-
-/**
- * git-annex hashDirMixed implementation based on https://git-annex.branchable.com/internals/hashing/
- */
-export async function hashDirMixed(
-  annexKey: string,
-): Promise<[string, string]> {
-  const computeMD5 = await createMD5()
-  computeMD5.init()
-  computeMD5.update(annexKey)
-  const digest = computeMD5.digest("binary")
-  const firstWord = new DataView(digest.buffer).getUint32(0, true)
-  const nums = Array.from({ length: 4 }, (_, i) => (firstWord >> (6 * i)) & 31)
-  const letters = nums.map(
-    (num) => "0123456789zqjxkmvwgpfZQJXKMVWGPF".charAt(num),
-  )
-  return [`${letters[1]}${letters[0]}`, `${letters[3]}${letters[2]}`]
-}
-
-const computeHashMD5 = await createMD5()
-const computeHashSHA256 = await createSHA256()
-
 /**
  * git-annex add equivalent
  */
 async function add(event: GitWorkerEventAdd) {
-  const { size } = await fs.promises.stat(event.data.path)
+  const { size } = await context.fs.promises.stat(event.data.path)
   const annexed = await shouldBeAnnexed(
     event.data.relativePath,
     size,
@@ -225,82 +107,27 @@ async function add(event: GitWorkerEventAdd) {
   if (annexed === "GIT") {
     // Simple add case
     const options = {
-      ...gitOptions(context.repoPath),
+      ...context.config(),
       filepath: event.data.relativePath,
     }
     const targetPath = join(context.repoPath, event.data.relativePath)
     // Verify parent directories exist
-    await fs.promises.mkdir(dirname(targetPath), { recursive: true })
+    await context.fs.promises.mkdir(dirname(targetPath), { recursive: true })
     // Copy non-annexed files for git index creation
-    await fs.promises.copyFile(event.data.path, targetPath)
+    await context.fs.promises.copyFile(event.data.path, targetPath)
     await git.add(options)
     logger.info(`Add\t${event.data.relativePath}`)
   } else {
-    // E in the backend means include the file extension
-    let extension = ""
-    if (annexed.endsWith("E")) {
-      const filename = basename(event.data.relativePath)
-      extension = filename.substring(filename.indexOf("."))
-    }
-    // Compute hash
-    const computeHash = annexed.startsWith("MD5")
-      ? computeHashMD5
-      : computeHashSHA256
-    computeHash.init()
-    const stream = fs.createReadStream(event.data.path, {
-      highWaterMark: 1024 * 1024 * 10,
-    })
-    for await (const data of stream) {
-      computeHash.update(data)
-    }
-    const digest = computeHash.digest("hex")
-    const annexKey = `${annexed}-s${size}--${digest}${extension}`
-    const annexPath = join(
-      ".git",
-      "annex",
-      "objects",
-      ...(await hashDirMixed(annexKey)),
-      annexKey,
-      annexKey,
-    )
-    // Path to this file in our repo
-    const fileRepoPath = join(context.repoPath, event.data.relativePath)
-
-    let link
-    let forceAdd = false
-    try {
-      // Test if the repo already has this object
-      link = await fs.promises.readlink(fileRepoPath)
-    } catch (_err) {
-      forceAdd = true
-    }
-
-    // Calculate the relative symlinks for our file
-    const symlinkTarget = join(
-      annexRelativePath(event.data.relativePath),
-      annexPath,
-    )
-
-    // Key has changed if the existing link points to another object
-    if (forceAdd || link !== symlinkTarget) {
-      // Upload this key after the git commit
-      annexKeys[annexKey] = event.data.path
-      // This object has a new annex hash, update the symlink and add it
-      const symlinkTarget = join(
-        annexRelativePath(event.data.relativePath),
-        annexPath,
+    if (
+      await annexAdd(
+        annexKeys,
+        annexed,
+        event.data.path,
+        event.data.relativePath,
+        size,
+        context,
       )
-      // Verify parent directories exist
-      await fs.promises.mkdir(dirname(fileRepoPath), { recursive: true })
-      // Remove the existing symlink or git file
-      await fs.promises.rm(fileRepoPath, { force: true })
-      // Create our new symlink pointing at the right annex object
-      await fs.promises.symlink(symlinkTarget, fileRepoPath)
-      const options = {
-        ...gitOptions(context.repoPath),
-        filepath: event.data.relativePath,
-      }
-      await git.add(options)
+    ) {
       logger.info(`Annexed\t${event.data.relativePath}`)
     } else {
       logger.info(`Unchanged\t${event.data.relativePath}`)
@@ -327,7 +154,7 @@ interface OpenNeuroGitToken {
  * `git commit` equivalent
  */
 async function commit() {
-  const options = gitOptions(context.repoPath)
+  const options = context.config()
   const decodedToken = decode(context.authorization)
   const { email, name } = decodedToken[1] as OpenNeuroGitToken
   let generateCommit = false
@@ -432,7 +259,7 @@ async function push() {
   console.log("Pushing changes...")
   // Git push
   await git.push(
-    gitOptions(context.repoPath),
+    context.config(),
   )
   const url = new URL(context.repoEndpoint)
   console.log(
@@ -446,15 +273,15 @@ const workQueue = new PromiseQueue()
 // @ts-ignore Expected for workers
 self.onmessage = (event: GitWorkerEvent) => {
   if (event.data.command === "setup") {
-    context = {
-      datasetId: event.data.datasetId,
-      sourcePath: event.data.sourcePath,
-      repoPath: event.data.repoPath,
-      repoEndpoint: event.data.repoEndpoint,
-      authorization: event.data.authorization,
-      name: event.data.name,
-      email: event.data.email,
-    }
+    context = new GitWorkerContext(
+      event.data.datasetId,
+      event.data.sourcePath,
+      event.data.repoPath,
+      event.data.repoEndpoint,
+      event.data.authorization,
+      event.data.name,
+      event.data.email,
+    )
     setupLogging(event.data.logLevel)
   } else if (event.data.command === "clone") {
     workQueue.enqueue(update)
diff --git a/cli/src/worker/types/git-context.ts b/cli/src/worker/types/git-context.ts
new file mode 100644
index 0000000000..075aac0349
--- /dev/null
+++ b/cli/src/worker/types/git-context.ts
@@ -0,0 +1,88 @@
+import http from "npm:isomorphic-git@1.25.3/http/node/index.js"
+import fs from "node:fs"
+import { LevelName } from "../../deps.ts"
+
+export class GitWorkerContext {
+  // Current working dataset ID
+  constructor(
+    public datasetId: string,
+    // The local path being uploaded to OpenNeuro
+    public sourcePath: string,
+    // The path of our local clone (possibly in virtual fs)
+    public repoPath: string,
+    // URL for the remote git repo
+    public repoEndpoint: string,
+    // OpenNeuro git access short lived API key
+    public authorization: string,
+    // Author name
+    public name: string,
+    // Author email
+    public email: string,
+  ) {}
+
+  /**
+   * Return isomorphic-git config
+   */
+  config(dir?: string) {
+    return {
+      fs,
+      http,
+      dir: dir ? dir : this.repoPath,
+      url: this.repoEndpoint,
+      headers: {
+        Authorization: `Bearer ${this.authorization}`,
+      },
+    }
+  }
+
+  /**
+   * Get proxy for fs
+   * TODO - Pick the right fs for the environment
+   */
+  get fs() {
+    return fs
+  }
+
+  /**
+   * Get proxy for http
+   * TODO - Pick the right http for the environment
+   */
+  get http() {
+    return http
+  }
+}
+
+/**
+ * Events with no arguments
+ */
+export interface GitWorkerEventGeneric {
+  data: {
+    command: "clone" | "commit" | "done" | "push"
+  }
+}
+
+export interface GitWorkerEventSetupData extends GitWorkerContext {
+  command: "setup"
+  logLevel: LevelName
+}
+
+/** Setup event to set dataset and repo state for commands until next call */
+export interface GitWorkerEventSetup {
+  data: GitWorkerEventSetupData
+}
+
+/** Add event to add one file */
+export interface GitWorkerEventAdd {
+  data: {
+    command: "add"
+    // Absolute path on the local system
+    path: string
+    // Dataset relative path
+    relativePath: string
+  }
+}
+
+export type GitWorkerEvent =
+  | GitWorkerEventSetup
+  | GitWorkerEventGeneric
+  | GitWorkerEventAdd
diff --git a/deno.json b/deno.json
index dc08c1f6c4..f4734400f4 100644
--- a/deno.json
+++ b/deno.json
@@ -15,6 +15,6 @@
   },
   "tasks": {
     "tests": "deno test --allow-read --allow-write cli/",
-    "coverage": "deno test --allow-read --allow-write --coverage cli/ && deno coverage ./coverage --lcov > coverage.lcov"
+    "coverage": "deno test --allow-read --allow-write --allow-net --coverage cli/ && deno coverage ./coverage --lcov > coverage.lcov"
   }
 }
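
A minimal usage sketch of the helpers extracted into cli/src/worker/annex.ts, showing how a git-annex key maps to its object directory and symlink target. This is not part of the patch: the node:path import and the script's location next to annex.ts are assumptions, while the key, the ["Xk", "Mx"] prefix, and the "../.." relative path come from annex.test.ts above.

// Sketch only, assuming this file sits next to cli/src/worker/annex.ts
import { join } from "node:path"
import { annexRelativePath, hashDirMixed } from "./annex.ts"

// Key and expected ["Xk", "Mx"] prefix taken from annex.test.ts
const annexKey =
  "SHA256E-s311112--c3527d7944a9619afb57863a34e6af7ec3fe4f108e56c860d9e700699ff806fb.nii.gz"
const relativePath = "sub-01/anat/sub-01_T1w.nii.gz"

// Mirrors the annexPath construction inside annexAdd()
const annexPath = join(
  ".git",
  "annex",
  "objects",
  ...(await hashDirMixed(annexKey)),
  annexKey,
  annexKey,
)

// annexRelativePath() returns "../.." for a file two directories deep,
// so the symlink climbs back to the repo root before descending into .git/annex
const symlinkTarget = join(annexRelativePath(relativePath), annexPath)

console.log(annexPath) // .git/annex/objects/Xk/Mx/<key>/<key>
console.log(symlinkTarget) // ../../.git/annex/objects/Xk/Mx/<key>/<key>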