From d4aee94d7e1bf6276e6b9cbf70cbae1a353627f8 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 26 Aug 2024 17:34:01 +0800 Subject: [PATCH 1/8] exports/{userId}/{date}/{uuid}.zip - metadata_{start_page}_to_{end_page}.json - /content - {slug}.html - /highlights - {slug}.md --- packages/export-handler/package.json | 2 + packages/export-handler/src/index.ts | 198 +++++++++++++++---------- yarn.lock | 213 ++++++++++++++++++++++++++- 3 files changed, 331 insertions(+), 82 deletions(-) diff --git a/packages/export-handler/package.json b/packages/export-handler/package.json index 7e7f14fdf9..42d22ba733 100644 --- a/packages/export-handler/package.json +++ b/packages/export-handler/package.json @@ -18,6 +18,7 @@ "dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\"" }, "devDependencies": { + "@types/archiver": "^6.0.2", "@types/chai": "^4.3.4", "@types/mocha": "^10.0.1", "eslint-plugin-prettier": "^4.0.0" @@ -28,6 +29,7 @@ "@omnivore-app/api": "^1.0.4", "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0", + "archiver": "^7.0.1", "csv-stringify": "^6.4.0", "dotenv": "^16.0.1", "jsonwebtoken": "^8.5.1", diff --git a/packages/export-handler/src/index.ts b/packages/export-handler/src/index.ts index 1a3aafa27c..c07c69673f 100644 --- a/packages/export-handler/src/index.ts +++ b/packages/export-handler/src/index.ts @@ -1,10 +1,11 @@ import { File, Storage } from '@google-cloud/storage' -import { Omnivore } from '@omnivore-app/api' +import { Highlight, Omnivore } from '@omnivore-app/api' import { RedisDataSource } from '@omnivore/utils' import * as Sentry from '@sentry/serverless' -import { stringify } from 'csv-stringify' +import archiver from 'archiver' import * as dotenv from 'dotenv' import * as jwt from 'jsonwebtoken' +import { PassThrough } from 'stream' import { v4 as uuidv4 } from 'uuid' import { queueEmailJob } from './job' @@ -35,7 +36,7 @@ const createSignedUrl = async (file: File): Promise => { return signedUrl[0] } -export const sendExportCompletedEmail = async ( +const sendExportCompletedEmail = async ( redisDataSource: RedisDataSource, userId: string, urlToDownload: string @@ -47,6 +48,24 @@ export const sendExportCompletedEmail = async ( }) } +const formatHighlightQuote = (quote: string): string => { + // replace all empty lines with blockquote '>' to preserve paragraphs + return quote.replace(/^(?=\n)$|^\s*?\n/gm, '> ') +} + +const highlightToMarkdown = (highlight: Highlight): string => { + if (highlight.type === 'HIGHLIGHT' && highlight.quote) { + const quote = formatHighlightQuote(highlight.quote) + const labels = highlight.labels?.map((label) => `#${label.name}`).join(' ') + const note = highlight.annotation + return `> ${quote} ${labels ? `\n\n${labels}` : ''}${ + note ? 
`\n\n${note}` : '' + }` + } + + return '' +} + export const exporter = Sentry.GCPFunction.wrapHttpFunction( async (req, res) => { console.log('start to export') @@ -81,103 +100,132 @@ export const exporter = Sentry.GCPFunction.wrapHttpFunction( }) try { - // write the exported data to a csv file and upload it to gcs - // path style: exports///.csv + // export data as a zip file: + // exports/{userId}/{date}/{uuid}.zip + // - metadata.json + // - /content + // - {slug}.html + // - /highlights + // - {slug}.md const dateStr = new Date().toISOString() const fileUuid = uuidv4() - const fullPath = `exports/${claims.uid}/${dateStr}/${fileUuid}.csv` + const fullPath = `exports/${claims.uid}/${dateStr}/${fileUuid}.zip` + const file = createGCSFile(GCS_BUCKET, fullPath) - // stringify the data and pipe it to the write_stream - const stringifier = stringify({ - header: true, - columns: [ - 'id', - 'title', - 'description', - 'labels', - 'author', - 'site_name', - 'original_url', - 'slug', - 'updated_at', - 'saved_at', - 'type', - 'published_at', - 'url', - 'thumbnail', - 'read_at', - 'word_count', - 'reading_progress_percent', - 'archived_at', - ], + // Create a PassThrough stream + const passthroughStream = new PassThrough() + + // Pipe the PassThrough stream to the GCS file write stream + const writeStream = file.createWriteStream({ + metadata: { + contentType: 'application/zip', + }, }) - stringifier - .pipe( - file.createWriteStream({ - contentType: 'text/csv', - }) - ) - .on('error', (err) => { - console.error('error writing to file', err) - }) - .on('finish', () => { - console.log('done writing to file') - }) + passthroughStream.pipe(writeStream) + + // Handle any errors in the streams + writeStream.on('error', (err) => { + console.error('Error writing to GCS:', err) + }) + + writeStream.on('finish', () => { + console.log('File successfully written to GCS') + }) + + // Initialize archiver for zipping files + const archive = archiver('zip', { + zlib: { level: 9 }, // Compression level + }) + + // Handle any archiver errors + archive.on('error', (err) => { + throw err + }) + + // Pipe the archiver output to the PassThrough stream + archive.pipe(passthroughStream) // fetch data from the database const omnivore = new Omnivore({ apiKey: claims.token, }) + const batchSize = 20 let cursor = 0 let hasNext = false do { const response = await omnivore.items.search({ - first: 100, + first: batchSize, after: cursor, - includeContent: false, + includeContent: true, + query: 'in:all', }) const items = response.edges.map((edge) => edge.node) - cursor = response.pageInfo.endCursor - ? 
parseInt(response.pageInfo.endCursor) - : 0 - hasNext = response.pageInfo.hasNextPage + const size = items.length // write data to the csv file - if (items.length > 0) { - // write the list of urls, state and labels to the stream - items.forEach((item) => - stringifier.write({ - id: item.id, - title: item.title, - description: item.description, - labels: item.labels?.map((label) => label.name).join(','), - author: item.author, - site_name: item.siteName, - original_url: item.originalArticleUrl, - slug: item.slug, - updated_at: item.updatedAt, - saved_at: item.savedAt, - type: item.pageType, - published_at: item.publishedAt, - url: item.url, - thumbnail: item.image, - read_at: item.readAt, - word_count: item.wordsCount, - reading_progress_percent: item.readingProgressPercent, - archived_at: item.archivedAt, - }) - ) - - // sleep for 1 second to avoid rate limiting - await new Promise((resolve) => setTimeout(resolve, 1000)) + if (size > 0) { + // Add the metadata.json file to the root of the zip + const metadata = items.map((item) => ({ + id: item.id, + slug: item.slug, + title: item.title, + description: item.description, + author: item.author, + url: item.originalArticleUrl, + state: item.isArchived ? 'archived' : 'active', + readingProgress: item.readingProgressPercent, + thumbnail: item.image, + labels: item.labels?.map((label) => label.name), + savedAt: item.savedAt, + updatedAt: item.updatedAt, + publishedAt: item.publishedAt, + })) + + archive.append(JSON.stringify(metadata, null, 2), { + name: `metadata_${cursor}_to_${cursor + size}.json`, + }) + + // Loop through the items and add files to /content and /highlights directories + items.forEach((item) => { + const slug = item.slug + const content = item.content + const highlights = item.highlights + if (content) { + // Add content files to /content + archive.append(content, { + name: `content/${slug}.html`, + }) + } + + if (highlights?.length) { + const markdown = highlights.map(highlightToMarkdown).join('\n\n') + + // Add highlight files to /highlights + archive.append(markdown, { + name: `highlights/${slug}.md`, + }) + } + }) + + cursor = response.pageInfo.endCursor + ? 
parseInt(response.pageInfo.endCursor) + : 0 + hasNext = response.pageInfo.hasNextPage } } while (hasNext) - stringifier.end() + // Finalize the archive + await archive.finalize() + + // Wait until the zip file is completely written + await new Promise((resolve, reject) => { + writeStream.on('finish', resolve) + writeStream.on('error', reject) + }) // generate a temporary signed url for the csv file const signedUrl = await createSignedUrl(file) diff --git a/yarn.lock b/yarn.lock index 6252be4886..2ecfc53e4f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7807,6 +7807,13 @@ resolved "https://registry.yarnpkg.com/@types/analytics-node/-/analytics-node-3.1.7.tgz#cb97c80ee505094e44a0188c3ad25f70c67e3c65" integrity sha512-qoBHCXqFqC22Up8dus8YIloZ2t1f8MJx9b3E08ZBK04yJ/ai8U2WuFUnaIBiD1okw4VtuNjqKn9mgLHnLxb5OQ== +"@types/archiver@^6.0.2": + version "6.0.2" + resolved "https://registry.yarnpkg.com/@types/archiver/-/archiver-6.0.2.tgz#0daf8c83359cbde69de1e4b33dcade6a48a929e2" + integrity sha512-KmROQqbQzKGuaAbmK+ZcytkJ51+YqDa7NmbXjmtC5YBLSyQYo21YaUnQ3HbaPFKL1ooo6RQ6OPYPIDyxfpDDXw== + dependencies: + "@types/readdir-glob" "*" + "@types/aria-query@^4.2.0": version "4.2.2" resolved "https://registry.yarnpkg.com/@types/aria-query/-/aria-query-4.2.2.tgz#ed4e0ad92306a704f9fb132a0cfcf77486dbe2bc" @@ -8692,6 +8699,13 @@ dependencies: "@types/react" "*" +"@types/readdir-glob@*": + version "1.1.5" + resolved "https://registry.yarnpkg.com/@types/readdir-glob/-/readdir-glob-1.1.5.tgz#21a4a98898fc606cb568ad815f2a0eedc24d412a" + integrity sha512-raiuEPUYqXu+nvtY2Pe8s8FEmZ3x5yAH4VkLdihcPdalvsHltomrRC9BzuStrJ9yk06470hS0Crw0f1pXqD+Hg== + dependencies: + "@types/node" "*" + "@types/request@*": version "2.48.7" resolved "https://registry.yarnpkg.com/@types/request/-/request-2.48.7.tgz#a962d11a26e0d71d9a9913d96bb806dc4d4c2f19" @@ -10192,6 +10206,32 @@ arch@^2.2.0: resolved "https://registry.yarnpkg.com/arch/-/arch-2.2.0.tgz#1bc47818f305764f23ab3306b0bfc086c5a29d11" integrity sha512-Of/R0wqp83cgHozfIYLbBMnej79U/SVGOOyuB3VVFv1NRM/PSFMK12x9KVtiYzJqmnU5WR2qp0Z5rHb7sWGnFQ== +archiver-utils@^5.0.0, archiver-utils@^5.0.2: + version "5.0.2" + resolved "https://registry.yarnpkg.com/archiver-utils/-/archiver-utils-5.0.2.tgz#63bc719d951803efc72cf961a56ef810760dd14d" + integrity sha512-wuLJMmIBQYCsGZgYLTy5FIB2pF6Lfb6cXMSF8Qywwk3t20zWnAi7zLcQFdKQmIB8wyZpY5ER38x08GbwtR2cLA== + dependencies: + glob "^10.0.0" + graceful-fs "^4.2.0" + is-stream "^2.0.1" + lazystream "^1.0.0" + lodash "^4.17.15" + normalize-path "^3.0.0" + readable-stream "^4.0.0" + +archiver@^7.0.1: + version "7.0.1" + resolved "https://registry.yarnpkg.com/archiver/-/archiver-7.0.1.tgz#c9d91c350362040b8927379c7aa69c0655122f61" + integrity sha512-ZcbTaIqJOfCc03QwD468Unz/5Ir8ATtvAHsK+FdXbDIbGfihqh9mrvdcYunQzqn4HrvWWaFyaxJhGZagaJJpPQ== + dependencies: + archiver-utils "^5.0.2" + async "^3.2.4" + buffer-crc32 "^1.0.0" + readable-stream "^4.0.0" + readdir-glob "^1.1.2" + tar-stream "^3.0.0" + zip-stream "^6.0.1" + archy@^1.0.0, archy@~1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/archy/-/archy-1.0.0.tgz#f9c8c13757cc1dd7bc379ac77b2c62a5c2868c40" @@ -10612,6 +10652,11 @@ async@^3.2.0, async@^3.2.3: resolved "https://registry.yarnpkg.com/async/-/async-3.2.3.tgz#ac53dafd3f4720ee9e8a160628f18ea91df196c9" integrity sha512-spZRyzKL5l5BZQrr/6m/SqFdBN0q3OCI0f9rjfBzCMBIP4p75P620rR3gTmaksNOhmzgdxcaxdNfMy6anrbM0g== +async@^3.2.4: + version "3.2.6" + resolved "https://registry.yarnpkg.com/async/-/async-3.2.6.tgz#1b0728e14929d51b85b449b7f06e27c1145e38ce" + integrity 
sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA== + asynciterator.prototype@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/asynciterator.prototype/-/asynciterator.prototype-1.0.0.tgz#8c5df0514936cdd133604dfcc9d3fb93f09b2b62" @@ -11526,6 +11571,11 @@ bser@2.1.1: dependencies: node-int64 "^0.4.0" +buffer-crc32@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/buffer-crc32/-/buffer-crc32-1.0.0.tgz#a10993b9055081d55304bd9feb4a072de179f405" + integrity sha512-Db1SbgBS/fg/392AblrMJk97KggmvYhr4pB5ZIMTWtaivCPMWLkmb7m21cJvpvgK+J3nsU2CmmixNBZx4vFj/w== + buffer-crc32@~0.2.3: version "0.2.13" resolved "https://registry.yarnpkg.com/buffer-crc32/-/buffer-crc32-0.2.13.tgz#0d333e3f00eac50aa1454abd30ef8c2a5d9a7242" @@ -12771,6 +12821,17 @@ component-emitter@^1.2.1, component-emitter@^1.3.0: resolved "https://registry.yarnpkg.com/component-emitter/-/component-emitter-1.3.0.tgz#16e4070fba8ae29b679f2215853ee181ab2eabc0" integrity sha512-Rd3se6QB+sO1TwqZjscQrurpEPIfO0/yYnSin6Q/rD3mOutHvUrCAhJub3r90uNb+SESBuE0QYoB90YdfatsRg== +compress-commons@^6.0.2: + version "6.0.2" + resolved "https://registry.yarnpkg.com/compress-commons/-/compress-commons-6.0.2.tgz#26d31251a66b9d6ba23a84064ecd3a6a71d2609e" + integrity sha512-6FqVXeETqWPoGcfzrXb37E50NP0LXT8kAMu5ooZayhWWdgEY4lBEEcbQNXtkuKQsGduxiIcI4gOTsxTmuq/bSg== + dependencies: + crc-32 "^1.2.0" + crc32-stream "^6.0.0" + is-stream "^2.0.1" + normalize-path "^3.0.0" + readable-stream "^4.0.0" + compressible@^2.0.12, compressible@~2.0.16: version "2.0.18" resolved "https://registry.yarnpkg.com/compressible/-/compressible-2.0.18.tgz#af53cca6b070d4c3c0750fbd77286a6d7cc46fba" @@ -13262,6 +13323,19 @@ cpy@^8.1.2: p-filter "^2.1.0" p-map "^3.0.0" +crc-32@^1.2.0: + version "1.2.2" + resolved "https://registry.yarnpkg.com/crc-32/-/crc-32-1.2.2.tgz#3cad35a934b8bf71f25ca524b6da51fb7eace2ff" + integrity sha512-ROmzCKrTnOwybPcJApAA6WBWij23HVfGVNKqqrZpuyZOHqK2CwHSvpGuyt/UNNvaIjEd8X5IFGp4Mh+Ie1IHJQ== + +crc32-stream@^6.0.0: + version "6.0.0" + resolved "https://registry.yarnpkg.com/crc32-stream/-/crc32-stream-6.0.0.tgz#8529a3868f8b27abb915f6c3617c0fadedbf9430" + integrity sha512-piICUB6ei4IlTv1+653yq5+KoqfBYmj9bw6LqXoOneTMDXk5nM1qt12mFW1caG3LlJXEKW1Bp0WggEmIfQB34g== + dependencies: + crc-32 "^1.2.0" + readable-stream "^4.0.0" + create-ecdh@^4.0.0: version "4.0.4" resolved "https://registry.yarnpkg.com/create-ecdh/-/create-ecdh-4.0.4.tgz#d6e7f4bffa66736085a0762fd3a632684dabcc4e" @@ -15553,7 +15627,7 @@ eventid@^2.0.0: dependencies: uuid "^8.0.0" -events@^3.0.0, events@^3.2.0: +events@^3.0.0, events@^3.2.0, events@^3.3.0: version "3.3.0" resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400" integrity sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q== @@ -17116,6 +17190,18 @@ glob@7.2.0: once "^1.3.0" path-is-absolute "^1.0.0" +glob@^10.0.0: + version "10.4.5" + resolved "https://registry.yarnpkg.com/glob/-/glob-10.4.5.tgz#f4d9f0b90ffdbab09c9d77f5f29b4262517b0956" + integrity sha512-7Bv8RF0k6xjo7d4A/PxYLbUCfb6c+Vpd2/mB2yRDlew7Jb5hEXiCD9ibfO7wpk8i4sevK6DFny9h7EYbM3/sHg== + dependencies: + foreground-child "^3.1.0" + jackspeak "^3.1.2" + minimatch "^9.0.4" + minipass "^7.1.2" + package-json-from-dist "^1.0.0" + path-scurry "^1.11.1" + glob@^10.2.2: version "10.3.10" resolved "https://registry.yarnpkg.com/glob/-/glob-10.3.10.tgz#0351ebb809fd187fe421ab96af83d3a70715df4b" @@ -19269,7 +19355,7 @@ is-stream@^1.1.0: 
resolved "https://registry.yarnpkg.com/is-stream/-/is-stream-1.1.0.tgz#12d4a3dd4e68e0b79ceb8dbc84173ae80d91ca44" integrity sha1-EtSj3U5o4Lec6428hBc66A2RykQ= -is-stream@^2.0.0: +is-stream@^2.0.0, is-stream@^2.0.1: version "2.0.1" resolved "https://registry.yarnpkg.com/is-stream/-/is-stream-2.0.1.tgz#fac1e3d53b97ad5a9d0ae9cef2389f5810a5c077" integrity sha512-hFoiJiTl63nn+kstHGBtewWSKnQLpyb155KHheA1l39uvtO9nWIop1p3udqPcUd/xbF1VLMO4n7OI6p7RbngDg== @@ -19604,6 +19690,15 @@ jackspeak@^2.3.5: optionalDependencies: "@pkgjs/parseargs" "^0.11.0" +jackspeak@^3.1.2: + version "3.4.3" + resolved "https://registry.yarnpkg.com/jackspeak/-/jackspeak-3.4.3.tgz#8833a9d89ab4acde6188942bd1c53b6390ed5a8a" + integrity sha512-OGlZQpz2yfahA/Rd1Y8Cd9SIEsqvXkLVoSw/cgwhnhFMDbsQFeZYoJJ7bIZBS9BcamUW96asq/npPWugM+RQBw== + dependencies: + "@isaacs/cliui" "^8.0.2" + optionalDependencies: + "@pkgjs/parseargs" "^0.11.0" + jaeger-client@^3.15.0: version "3.18.1" resolved "https://registry.yarnpkg.com/jaeger-client/-/jaeger-client-3.18.1.tgz#a8c7a778244ba117f4fb8775eb6aa5508703564e" @@ -20737,6 +20832,13 @@ lazy-universal-dotenv@^3.0.1: dotenv "^8.0.0" dotenv-expand "^5.1.0" +lazystream@^1.0.0: + version "1.0.1" + resolved "https://registry.yarnpkg.com/lazystream/-/lazystream-1.0.1.tgz#494c831062f1f9408251ec44db1cba29242a2638" + integrity sha512-b94GiNHQNy6JNTrt5w6zNyffMrNkXZb3KTkCZJb2V1xaEGCk093vkZ2jk3tpaeP33/OiXC+WvK9AxUebnf5nbw== + dependencies: + readable-stream "^2.0.5" + lcov-parse@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/lcov-parse/-/lcov-parse-1.0.0.tgz#eb0d46b54111ebc561acb4c408ef9363bdc8f7e0" @@ -21597,6 +21699,11 @@ lowlight@^1.14.0: fault "^1.0.0" highlight.js "~10.7.0" +lru-cache@^10.2.0: + version "10.4.3" + resolved "https://registry.yarnpkg.com/lru-cache/-/lru-cache-10.4.3.tgz#410fc8a17b70e598013df257c2446b7f3383f119" + integrity sha512-JNAzZcXrCt42VGLuYz0zfAzDfAvJWW6AfYlDBQyDV5DClI2m5sAmK+OIO7s59XfsRsWHp02jAJrRadPRGTt6SQ== + lru-cache@^4.1.5: version "4.1.5" resolved "https://registry.yarnpkg.com/lru-cache/-/lru-cache-4.1.5.tgz#8bbe50ea85bed59bc9e33dcab8235ee9bcf443cd" @@ -22725,7 +22832,7 @@ minimatch@^9.0.0, minimatch@^9.0.1: dependencies: brace-expansion "^2.0.1" -minimatch@^9.0.3: +minimatch@^9.0.3, minimatch@^9.0.4: version "9.0.5" resolved "https://registry.yarnpkg.com/minimatch/-/minimatch-9.0.5.tgz#d74f9dd6b57d83d8e98cfb82133b03978bc929e5" integrity sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow== @@ -22838,6 +22945,11 @@ minipass@^5.0.0: resolved "https://registry.yarnpkg.com/minipass/-/minipass-7.0.4.tgz#dbce03740f50a4786ba994c1fb908844d27b038c" integrity sha512-jYofLM5Dam9279rdkWzqHozUo4ybjdZmCsDHePy5V/PbBcVMiSZR97gmAy45aqi8CK1lG2ECd356FU86avfwUQ== +minipass@^7.1.2: + version "7.1.2" + resolved "https://registry.yarnpkg.com/minipass/-/minipass-7.1.2.tgz#93a9626ce5e5e66bd4db86849e7515e92340a707" + integrity sha512-qOOzS1cBTWYF4BH8fVePDBOO9iptMnGUEZwNc/cMWnTV2nVLZ7VoNWEPHkYczZA0pdoA7dl6e7FL659nX9S2aw== + ministyle@~0.1.3: version "0.1.4" resolved "https://registry.yarnpkg.com/ministyle/-/ministyle-0.1.4.tgz#b10481eb16aa8f7b6cd983817393a44da0e5a0cd" @@ -24894,6 +25006,11 @@ package-hash@^4.0.0: lodash.flattendeep "^4.4.0" release-zalgo "^1.0.0" +package-json-from-dist@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/package-json-from-dist/-/package-json-from-dist-1.0.0.tgz#e501cd3094b278495eb4258d4c9f6d5ac3019f00" + integrity 
sha512-dATvCeZN/8wQsGywez1mzHtTlP22H8OEfPrVMLNr4/eGa+ijtLn/6M5f0dY8UKNrC2O9UCU6SSoG3qRKnt7STw== + package-json@^6.3.0: version "6.5.0" resolved "https://registry.yarnpkg.com/package-json/-/package-json-6.5.0.tgz#6feedaca35e75725876d0b0e64974697fed145b0" @@ -25265,6 +25382,14 @@ path-scurry@^1.10.1, path-scurry@^1.6.1: lru-cache "^9.1.1 || ^10.0.0" minipass "^5.0.0 || ^6.0.2 || ^7.0.0" +path-scurry@^1.11.1: + version "1.11.1" + resolved "https://registry.yarnpkg.com/path-scurry/-/path-scurry-1.11.1.tgz#7960a668888594a0720b12a911d1a742ab9f11d2" + integrity sha512-Xa4Nw17FS9ApQFJ9umLiJS4orGjm7ZzwUrwamcGQuHSzDyth9boKDaycYdDcZDuqYATXw4HFXgaqWTctW/v1HA== + dependencies: + lru-cache "^10.2.0" + minipass "^5.0.0 || ^6.0.2 || ^7.0.0" + path-to-regexp@0.1.7: version "0.1.7" resolved "https://registry.yarnpkg.com/path-to-regexp/-/path-to-regexp-0.1.7.tgz#df604178005f522f15eb4490e7247a1bfaa67f8c" @@ -27409,6 +27534,19 @@ readable-stream@3, readable-stream@^3.6.2: string_decoder "^1.1.1" util-deprecate "^1.0.1" +readable-stream@^2.0.5: + version "2.3.8" + resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-2.3.8.tgz#91125e8042bba1b9887f49345f6277027ce8be9b" + integrity sha512-8p0AUk4XODgIewSi0l8Epjs+EVnWiK7NoDIEGU0HhE7+ZyY8D1IMY7odu5lRrFXGg71L15KG8QrPmum45RTtdA== + dependencies: + core-util-is "~1.0.0" + inherits "~2.0.3" + isarray "~1.0.0" + process-nextick-args "~2.0.0" + safe-buffer "~5.1.1" + string_decoder "~1.1.1" + util-deprecate "~1.0.1" + readable-stream@^3.0.0, readable-stream@^3.0.2, readable-stream@^3.0.6, readable-stream@^3.1.1, readable-stream@^3.4.0, readable-stream@^3.6.0: version "3.6.0" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.6.0.tgz#337bbda3adc0706bd3e024426a286d4b4b2c9198" @@ -27418,6 +27556,24 @@ readable-stream@^3.0.0, readable-stream@^3.0.2, readable-stream@^3.0.6, readable string_decoder "^1.1.1" util-deprecate "^1.0.1" +readable-stream@^4.0.0: + version "4.5.2" + resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-4.5.2.tgz#9e7fc4c45099baeed934bff6eb97ba6cf2729e09" + integrity sha512-yjavECdqeZ3GLXNgRXgeQEdz9fvDDkNKyHnbHRFtOr7/LcfgBcmct7t/ET+HaCTqfh06OzoAxrkN/IfjJBVe+g== + dependencies: + abort-controller "^3.0.0" + buffer "^6.0.3" + events "^3.3.0" + process "^0.11.10" + string_decoder "^1.3.0" + +readdir-glob@^1.1.2: + version "1.1.3" + resolved "https://registry.yarnpkg.com/readdir-glob/-/readdir-glob-1.1.3.tgz#c3d831f51f5e7bfa62fa2ffbe4b508c640f09584" + integrity sha512-v05I2k7xN8zXvPD9N+z/uhXPaj0sUFCe2rcWZIpBsqxfP7xXFQ0tipAd/wjj1YxWyWtUS5IDJpOG82JKt2EAVA== + dependencies: + minimatch "^5.1.0" + readdir-scoped-modules@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/readdir-scoped-modules/-/readdir-scoped-modules-1.1.0.tgz#8d45407b4f870a0dcaebc0e28670d18e74514309" @@ -29395,7 +29551,7 @@ string-template@~0.2.1: resolved "https://registry.yarnpkg.com/string-template/-/string-template-0.2.1.tgz#42932e598a352d01fc22ec3367d9d84eec6c9add" integrity sha1-QpMuWYo1LQH8IuwzZ9nYTuxsmt0= -"string-width-cjs@npm:string-width@^4.2.0", "string-width@^1.0.2 || 2 || 3 || 4", string-width@^4.2.2, string-width@^4.2.3: +"string-width-cjs@npm:string-width@^4.2.0": version "4.2.3" resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== @@ -29421,6 +29577,15 @@ string-width@^1.0.1: is-fullwidth-code-point "^2.0.0" strip-ansi "^4.0.0" 
+"string-width@^1.0.2 || 2 || 3 || 4", string-width@^4.2.2, string-width@^4.2.3: + version "4.2.3" + resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" + integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== + dependencies: + emoji-regex "^8.0.0" + is-fullwidth-code-point "^3.0.0" + strip-ansi "^6.0.1" + string-width@^3.0.0: version "3.1.0" resolved "https://registry.yarnpkg.com/string-width/-/string-width-3.1.0.tgz#22767be21b62af1081574306f69ac51b62203961" @@ -29561,7 +29726,7 @@ string.prototype.trimstart@^1.0.7: define-properties "^1.2.0" es-abstract "^1.22.1" -string_decoder@^1.0.0, string_decoder@^1.1.1: +string_decoder@^1.0.0, string_decoder@^1.1.1, string_decoder@^1.3.0: version "1.3.0" resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.3.0.tgz#42f114594a46cf1a8e30b0a84f56c78c3edac21e" integrity sha512-hkRX8U1WjJFd8LsDJ2yQ/wWWxaopEsABU1XfkM8A+j0+85JAGppt16cr1Whg6KIbb4okU6Mql6BOj+uup/wKeA== @@ -29575,7 +29740,7 @@ string_decoder@~1.1.1: dependencies: safe-buffer "~5.1.0" -"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.1: +"strip-ansi-cjs@npm:strip-ansi@^6.0.1": version "6.0.1" resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== @@ -29610,6 +29775,13 @@ strip-ansi@^6.0.0: dependencies: ansi-regex "^5.0.0" +strip-ansi@^6.0.1: + version "6.0.1" + resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" + integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== + dependencies: + ansi-regex "^5.0.1" + strip-ansi@^7.0.0: version "7.0.1" resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-7.0.1.tgz#61740a08ce36b61e50e65653f07060d000975fb2" @@ -29957,6 +30129,15 @@ tar-stream@^2.1.4, tar-stream@~2.2.0: inherits "^2.0.3" readable-stream "^3.1.1" +tar-stream@^3.0.0: + version "3.1.7" + resolved "https://registry.yarnpkg.com/tar-stream/-/tar-stream-3.1.7.tgz#24b3fb5eabada19fe7338ed6d26e5f7c482e792b" + integrity sha512-qJj60CXt7IU1Ffyc3NJMjh6EkuCFej46zUqJ4J7pqYlThyd9bO0XBTmcOIhSzZJVWfsLks0+nle/j538YAW9RQ== + dependencies: + b4a "^1.6.4" + fast-fifo "^1.2.0" + streamx "^2.15.0" + tar-stream@^3.1.5: version "3.1.6" resolved "https://registry.yarnpkg.com/tar-stream/-/tar-stream-3.1.6.tgz#6520607b55a06f4a2e2e04db360ba7d338cc5bab" @@ -32309,7 +32490,7 @@ workerpool@6.2.1: resolved "https://registry.yarnpkg.com/workerpool/-/workerpool-6.2.1.tgz#46fc150c17d826b86a008e5a4508656777e9c343" integrity sha512-ILEIE97kDZvF9Wb9f6h5aXK4swSlKGUcOEGiIYb2OOu/IrDU9iwj0fD//SsA6E5ibwJxpEvhullJY4Sl4GcpAw== -"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0", wrap-ansi@^7.0.0: +"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0": version "7.0.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== @@ -32335,6 +32516,15 @@ wrap-ansi@^6.0.1, wrap-ansi@^6.2.0: string-width "^4.1.0" strip-ansi "^6.0.0" +wrap-ansi@^7.0.0: + version "7.0.0" + resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" + integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== + 
dependencies: + ansi-styles "^4.0.0" + string-width "^4.1.0" + strip-ansi "^6.0.0" + wrap-ansi@^8.1.0: version "8.1.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-8.1.0.tgz#56dc22368ee570face1b49819975d9b9a5ead214" @@ -32665,6 +32855,15 @@ yup@^0.31.0: property-expr "^2.0.4" toposort "^2.0.2" +zip-stream@^6.0.1: + version "6.0.1" + resolved "https://registry.yarnpkg.com/zip-stream/-/zip-stream-6.0.1.tgz#e141b930ed60ccaf5d7fa9c8260e0d1748a2bbfb" + integrity sha512-zK7YHHz4ZXpW89AHXUPbQVGKI7uvkd3hzusTdotCg1UxyaVtg0zFJSTfW/Dq5f7OBBVnq6cZIaC8Ti4hb6dtCA== + dependencies: + archiver-utils "^5.0.0" + compress-commons "^6.0.2" + readable-stream "^4.0.0" + zod-to-json-schema@^3.22.3: version "3.22.4" resolved "https://registry.yarnpkg.com/zod-to-json-schema/-/zod-to-json-schema-3.22.4.tgz#f8cc691f6043e9084375e85fb1f76ebafe253d70" From a75acc5185ebca0c45a1bc71240f8987320ec7c7 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Mon, 26 Aug 2024 19:05:09 +0800 Subject: [PATCH 2/8] add export job and get api --- packages/api/src/jobs/export.ts | 40 ++++++++++++++++ packages/api/src/routers/export_router.ts | 58 +++++++++++++++++++++++ packages/api/src/util.ts | 3 ++ packages/api/src/utils/createTask.ts | 23 ++++++++- 4 files changed, 122 insertions(+), 2 deletions(-) create mode 100644 packages/api/src/jobs/export.ts create mode 100644 packages/api/src/routers/export_router.ts diff --git a/packages/api/src/jobs/export.ts b/packages/api/src/jobs/export.ts new file mode 100644 index 0000000000..8d6f6dc5b7 --- /dev/null +++ b/packages/api/src/jobs/export.ts @@ -0,0 +1,40 @@ +import axios from 'axios' +import jwt from 'jsonwebtoken' +import { env } from '../env' +import { findActiveUser } from '../services/user' +import { logger } from '../utils/logger' + +export interface ExportJobData { + userId: string +} + +export const EXPORT_JOB_NAME = 'export' + +export const exportJob = async (jobData: ExportJobData) => { + const { userId } = jobData + const user = await findActiveUser(userId) + if (!user) { + logger.error('user not found', { + userId, + }) + return + } + + logger.info('exporting all items...', { + userId, + }) + + const token = jwt.sign( + { + uid: userId, + }, + env.server.jwtSecret, + { expiresIn: '1d' } + ) + + await axios.post(env.queue.exportTaskHandlerUrl, undefined, { + headers: { + OmnivoreAuthorizationHeader: token, + }, + }) +} diff --git a/packages/api/src/routers/export_router.ts b/packages/api/src/routers/export_router.ts new file mode 100644 index 0000000000..495d1a3fc5 --- /dev/null +++ b/packages/api/src/routers/export_router.ts @@ -0,0 +1,58 @@ +import cors from 'cors' +import express, { Router } from 'express' +import { getClaimsByToken, getTokenByRequest } from '../utils/auth' +import { corsConfig } from '../utils/corsConfig' +import { queueExportJob } from '../utils/createTask' +import { logger } from '../utils/logger' + +export function exportRouter() { + const router = Router() + + // eslint-disable-next-line @typescript-eslint/no-misused-promises + router.get('/', cors(corsConfig), async (req, res) => { + const token = getTokenByRequest(req) + // get claims from token + const claims = await getClaimsByToken(token) + if (!claims) { + logger.error('Token not found') + return res.status(401).send({ + error: 'UNAUTHORIZED', + }) + } + + // get user by uid from claims + const userId = claims.uid + + try { + const job = await queueExportJob(userId) + + if (!job) { + logger.error('Failed to queue export job', { + userId, + }) + return res.status(500).send({ + error: 
'INTERNAL_ERROR', + }) + } + + logger.info('Export job queued', { + userId, + jobId: job.id, + }) + + res.send({ + jobId: job.id, + }) + } catch (error) { + logger.error('Error exporting all items', { + userId, + error, + }) + return res.status(500).send({ + error: 'INTERNAL_ERROR', + }) + } + }) + + return router +} diff --git a/packages/api/src/util.ts b/packages/api/src/util.ts index 9590c2cd61..8d0eefc5cc 100755 --- a/packages/api/src/util.ts +++ b/packages/api/src/util.ts @@ -87,6 +87,7 @@ export interface BackendEnv { integrationExporterUrl: string integrationImporterUrl: string importerMetricsUrl: string + exportTaskHandlerUrl: string } fileUpload: { gcsUploadBucket: string @@ -199,6 +200,7 @@ const nullableEnvVars = [ 'INTERCOM_WEB_SECRET', 'INTERCOM_IOS_SECRET', 'INTERCOM_ANDROID_SECRET', + 'EXPORT_TASK_HANDLER_URL', ] // Allow some vars to be null/empty const envParser = @@ -300,6 +302,7 @@ export function getEnv(): BackendEnv { integrationExporterUrl: parse('INTEGRATION_EXPORTER_URL'), integrationImporterUrl: parse('INTEGRATION_IMPORTER_URL'), importerMetricsUrl: parse('IMPORTER_METRICS_COLLECTOR_URL'), + exportTaskHandlerUrl: parse('EXPORT_TASK_HANDLER_URL'), } const imageProxy = { url: parse('IMAGE_PROXY_URL'), diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 2b21e63b07..4b704d9c0e 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -29,6 +29,7 @@ import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action' import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook' import { SendEmailJobData, SEND_EMAIL_JOB } from '../jobs/email/send_email' import { EXPIRE_FOLDERS_JOB_NAME } from '../jobs/expire_folders' +import { EXPORT_JOB_NAME } from '../jobs/export' import { THUMBNAIL_JOB } from '../jobs/find_thumbnail' import { GENERATE_PREVIEW_CONTENT_JOB } from '../jobs/generate_preview_content' import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items' @@ -113,14 +114,13 @@ export const getJobPriority = (jobName: string): number => { case THUMBNAIL_JOB: return 10 case `${REFRESH_FEED_JOB_NAME}_low`: - case EXPORT_ITEM_JOB_NAME: case CREATE_DIGEST_JOB: return 50 - case EXPORT_ALL_ITEMS_JOB_NAME: case REFRESH_ALL_FEEDS_JOB_NAME: case GENERATE_PREVIEW_CONTENT_JOB: case PRUNE_TRASH_JOB: case EXPIRE_FOLDERS_JOB_NAME: + case EXPORT_JOB_NAME: return 100 default: @@ -1073,4 +1073,23 @@ export const enqueueExpireFoldersJob = async () => { ) } +export const queueExportJob = async (userId: string) => { + const queue = await getQueue() + if (!queue) { + return undefined + } + + return queue.add( + EXPORT_JOB_NAME, + { userId }, + { + jobId: `${EXPORT_JOB_NAME}_${userId}_${JOB_VERSION}`, + removeOnComplete: true, + removeOnFail: true, + priority: getJobPriority(EXPORT_JOB_NAME), + attempts: 1, + } + ) +} + export default createHttpTaskWithToken From 444c78f0cb0cdedee8c47870dbb758d0ab7c9189 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 27 Aug 2024 14:19:02 +0800 Subject: [PATCH 3/8] use async job to handle exporter --- packages/api/package.json | 2 + packages/api/src/jobs/export.ts | 161 ++++++++++++++++++++-- packages/api/src/routers/export_router.ts | 5 +- packages/api/src/services/send_emails.ts | 11 ++ packages/api/src/utils/parser.ts | 20 ++- packages/api/src/utils/uploads.ts | 13 +- packages/export-handler/package.json | 1 - 7 files changed, 192 insertions(+), 21 deletions(-) diff --git a/packages/api/package.json b/packages/api/package.json 
index 826a2217c3..dfb233d4c9 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -51,6 +51,7 @@ "alfaaz": "^1.1.0", "apollo-datasource": "^3.3.1", "apollo-server-express": "^3.6.3", + "archiver": "^7.0.1", "axios": "^0.27.2", "bcryptjs": "^2.4.3", "bullmq": "^5.1.1", @@ -123,6 +124,7 @@ "@istanbuljs/nyc-config-typescript": "^1.0.2", "@types/addressparser": "^1.0.1", "@types/analytics-node": "^3.1.7", + "@types/archiver": "^6.0.2", "@types/bcryptjs": "^2.4.2", "@types/chai": "^4.2.18", "@types/chai-as-promised": "^7.1.5", diff --git a/packages/api/src/jobs/export.ts b/packages/api/src/jobs/export.ts index 8d6f6dc5b7..3c78844110 100644 --- a/packages/api/src/jobs/export.ts +++ b/packages/api/src/jobs/export.ts @@ -1,14 +1,71 @@ -import axios from 'axios' -import jwt from 'jsonwebtoken' -import { env } from '../env' +import archiver, { Archiver } from 'archiver' +import { v4 as uuidv4 } from 'uuid' +import { LibraryItem } from '../entity/library_item' +import { findHighlightsByLibraryItemId } from '../services/highlights' +import { searchLibraryItems } from '../services/library_item' +import { sendExportCompletedEmail } from '../services/send_emails' import { findActiveUser } from '../services/user' import { logger } from '../utils/logger' +import { highlightToMarkdown } from '../utils/parser' +import { createGCSFile, generateDownloadSignedUrl } from '../utils/uploads' export interface ExportJobData { userId: string } export const EXPORT_JOB_NAME = 'export' +const GCS_BUCKET = 'omnivore-export' + +const uploadToBucket = async ( + userId: string, + items: Array, + cursor: number, + size: number, + archive: Archiver +): Promise => { + // Add the metadata.json file to the root of the zip + const metadata = items.map((item) => ({ + id: item.id, + slug: item.slug, + title: item.title, + description: item.description, + author: item.author, + url: item.originalUrl, + state: item.state, + readingProgress: item.readingProgressBottomPercent, + thumbnail: item.thumbnail, + labels: item.labelNames, + savedAt: item.savedAt, + updatedAt: item.updatedAt, + publishedAt: item.publishedAt, + })) + + const endCursor = cursor + size + archive.append(JSON.stringify(metadata, null, 2), { + name: `metadata_${cursor}_to_${endCursor}.json`, + }) + + // Loop through the items and add files to /content and /highlights directories + for (const item of items) { + const slug = item.slug + // Add content files to /content + archive.append(item.readableContent, { + name: `content/${slug}.html`, + }) + + if (item.highlightAnnotations?.length) { + const highlights = await findHighlightsByLibraryItemId(item.id, userId) + const markdown = highlights.map(highlightToMarkdown).join('\n\n') + + // Add highlight files to /highlights + archive.append(markdown, { + name: `highlights/${slug}.md`, + }) + } + } + + return endCursor +} export const exportJob = async (jobData: ExportJobData) => { const { userId } = jobData @@ -24,17 +81,95 @@ export const exportJob = async (jobData: ExportJobData) => { userId, }) - const token = jwt.sign( - { - uid: userId, - }, - env.server.jwtSecret, - { expiresIn: '1d' } - ) + // export data as a zip file: + // exports/{userId}/{date}/{uuid}.zip + // - metadata.json + // - /content + // - {slug}.html + // - /highlights + // - {slug}.md + const dateStr = new Date().toISOString() + const fileUuid = uuidv4() + const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip` + + const file = createGCSFile(GCS_BUCKET, fullPath) - await axios.post(env.queue.exportTaskHandlerUrl, 
undefined, { - headers: { - OmnivoreAuthorizationHeader: token, + // Create a write stream + const writeStream = file.createWriteStream({ + metadata: { + contentType: 'application/zip', }, }) + + // Handle any errors in the streams + writeStream.on('error', (err) => { + console.error('Error writing to GCS:', err) + }) + + writeStream.on('finish', () => { + console.log('File successfully written to GCS') + }) + + // Initialize archiver for zipping files + const archive = archiver('zip', { + zlib: { level: 9 }, // Compression level + }) + + // Handle any archiver errors + archive.on('error', (err) => { + console.error('Error zipping files:', err) + throw err + }) + + // Pipe the archiver output to the write stream + archive.pipe(writeStream) + + try { + // fetch data from the database + const batchSize = 20 + let cursor = 0 + let hasNext = false + do { + const items = await searchLibraryItems( + { + from: cursor, + size: batchSize, + query: 'in:all', + includeContent: false, + includeDeleted: false, + includePending: false, + }, + userId + ) + + const size = items.length + // write data to the csv file + if (size > 0) { + cursor = await uploadToBucket(userId, items, cursor, size, archive) + + hasNext = size === batchSize + } + } while (hasNext) + } catch (err) { + console.error('Error exporting data:', err) + } finally { + // Finalize the archive + await archive.finalize() + } + + // generate a temporary signed url for the zip file + const signedUrl = await generateDownloadSignedUrl(fullPath, { + expires: 60 * 60 * 24, // 24 hours + bucketName: GCS_BUCKET, + }) + + const job = await sendExportCompletedEmail(userId, signedUrl) + if (!job) { + logger.error('failed to send export completed email', { + userId, + signedUrl, + }) + + throw new Error('failed to send export completed email') + } } diff --git a/packages/api/src/routers/export_router.ts b/packages/api/src/routers/export_router.ts index 495d1a3fc5..2b5edb3599 100644 --- a/packages/api/src/routers/export_router.ts +++ b/packages/api/src/routers/export_router.ts @@ -1,5 +1,6 @@ import cors from 'cors' import express, { Router } from 'express' +import { jobStateToTaskState } from '../queue-processor' import { getClaimsByToken, getTokenByRequest } from '../utils/auth' import { corsConfig } from '../utils/corsConfig' import { queueExportJob } from '../utils/createTask' @@ -26,7 +27,7 @@ export function exportRouter() { try { const job = await queueExportJob(userId) - if (!job) { + if (!job || !job.id) { logger.error('Failed to queue export job', { userId, }) @@ -40,8 +41,10 @@ export function exportRouter() { jobId: job.id, }) + const jobState = await job.getState() res.send({ jobId: job.id, + state: jobStateToTaskState(jobState), }) } catch (error) { logger.error('Error exporting all items', { diff --git a/packages/api/src/services/send_emails.ts b/packages/api/src/services/send_emails.ts index f1138cc3a3..3d814f7082 100644 --- a/packages/api/src/services/send_emails.ts +++ b/packages/api/src/services/send_emails.ts @@ -113,3 +113,14 @@ export const sendPasswordResetEmail = async (user: { return !!result } + +export const sendExportCompletedEmail = async ( + userId: string, + urlToDownload: string +) => { + return enqueueSendEmail({ + userId, + subject: 'Your Omnivore export is ready', + html: `

<p>Your export is ready. You can download it from the following link: <a href="${urlToDownload}">${urlToDownload}</a></p>

`, + }) +} diff --git a/packages/api/src/utils/parser.ts b/packages/api/src/utils/parser.ts index 51aa15d06c..3acd068862 100644 --- a/packages/api/src/utils/parser.ts +++ b/packages/api/src/utils/parser.ts @@ -20,7 +20,7 @@ import showdown from 'showdown' import { ILike } from 'typeorm' import { promisify } from 'util' import { v4 as uuid } from 'uuid' -import { Highlight } from '../entity/highlight' +import { Highlight, HighlightType } from '../entity/highlight' import { StatusType } from '../entity/user' import { env } from '../env' import { PageType, PreparedDocumentInput } from '../generated/graphql' @@ -865,3 +865,21 @@ export const parseFeed = async ( return null } } + +const formatHighlightQuote = (quote: string): string => { + // replace all empty lines with blockquote '>' to preserve paragraphs + return quote.replace(/^(?=\n)$|^\s*?\n/gm, '> ') +} + +export const highlightToMarkdown = (highlight: Highlight): string => { + if (highlight.highlightType === HighlightType.Highlight && highlight.quote) { + const quote = formatHighlightQuote(highlight.quote) + const labels = highlight.labels?.map((label) => `#${label.name}`).join(' ') + const note = highlight.annotation + return `> ${quote} ${labels ? `\n\n${labels}` : ''}${ + note ? `\n\n${note}` : '' + }` + } + + return '' +} diff --git a/packages/api/src/utils/uploads.ts b/packages/api/src/utils/uploads.ts index 801f4f623c..c4b71b5cab 100644 --- a/packages/api/src/utils/uploads.ts +++ b/packages/api/src/utils/uploads.ts @@ -67,17 +67,17 @@ export const generateUploadSignedUrl = async ( export const generateDownloadSignedUrl = async ( filePathName: string, config?: { + bucketName?: string expires?: number } ): Promise => { const options: GetSignedUrlConfig = { version: 'v4', action: 'read', - expires: Date.now() + 240 * 60 * 1000, // four hours - ...config, + expires: config?.expires ?? 
Date.now() + 240 * 60 * 1000, // four hours } const [url] = await storage - .bucket(bucketName) + .bucket(config?.bucketName || bucketName) .file(filePathName) .getSignedUrl(options) logger.info(`generating download signed url: ${url}`) @@ -116,8 +116,11 @@ export const uploadToBucket = async ( .save(data, { timeout: 30000, ...options }) // default timeout 30s } -export const createGCSFile = (filename: string): File => { - return storage.bucket(bucketName).file(filename) +export const createGCSFile = ( + filename: string, + selectedBucket = bucketName +): File => { + return storage.bucket(selectedBucket).file(filename) } export const downloadFromUrl = async ( diff --git a/packages/export-handler/package.json b/packages/export-handler/package.json index 42d22ba733..b9c546bdb7 100644 --- a/packages/export-handler/package.json +++ b/packages/export-handler/package.json @@ -30,7 +30,6 @@ "@omnivore/utils": "1.0.0", "@sentry/serverless": "^7.77.0", "archiver": "^7.0.1", - "csv-stringify": "^6.4.0", "dotenv": "^16.0.1", "jsonwebtoken": "^8.5.1", "nodemon": "^2.0.15", From 48b3f736f0fc0606b5ae6cfb55d356fa6541d71a Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 27 Aug 2024 14:57:31 +0800 Subject: [PATCH 4/8] wait for write stream to finish --- packages/api/src/jobs/export.ts | 32 +++++++++++++++++++---------- packages/api/src/queue-processor.ts | 3 +++ packages/api/src/server.ts | 2 ++ packages/api/src/utils/uploads.ts | 8 ++++++++ 4 files changed, 34 insertions(+), 11 deletions(-) diff --git a/packages/api/src/jobs/export.ts b/packages/api/src/jobs/export.ts index 3c78844110..400d7ecbba 100644 --- a/packages/api/src/jobs/export.ts +++ b/packages/api/src/jobs/export.ts @@ -7,14 +7,13 @@ import { sendExportCompletedEmail } from '../services/send_emails' import { findActiveUser } from '../services/user' import { logger } from '../utils/logger' import { highlightToMarkdown } from '../utils/parser' -import { createGCSFile, generateDownloadSignedUrl } from '../utils/uploads' +import { createGCSFile } from '../utils/uploads' export interface ExportJobData { userId: string } export const EXPORT_JOB_NAME = 'export' -const GCS_BUCKET = 'omnivore-export' const uploadToBucket = async ( userId: string, @@ -92,7 +91,7 @@ export const exportJob = async (jobData: ExportJobData) => { const fileUuid = uuidv4() const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip` - const file = createGCSFile(GCS_BUCKET, fullPath) + const file = createGCSFile(fullPath) // Create a write stream const writeStream = file.createWriteStream({ @@ -103,11 +102,11 @@ export const exportJob = async (jobData: ExportJobData) => { // Handle any errors in the streams writeStream.on('error', (err) => { - console.error('Error writing to GCS:', err) + logger.error('Error writing to GCS:', err) }) writeStream.on('finish', () => { - console.log('File successfully written to GCS') + logger.info('File successfully written to GCS') }) // Initialize archiver for zipping files @@ -117,7 +116,6 @@ export const exportJob = async (jobData: ExportJobData) => { // Handle any archiver errors archive.on('error', (err) => { - console.error('Error zipping files:', err) throw err }) @@ -135,7 +133,7 @@ export const exportJob = async (jobData: ExportJobData) => { from: cursor, size: batchSize, query: 'in:all', - includeContent: false, + includeContent: true, includeDeleted: false, includePending: false, }, @@ -151,16 +149,28 @@ export const exportJob = async (jobData: ExportJobData) => { } } while (hasNext) } catch (err) { - console.error('Error 
exporting data:', err) + logger.error('Error exporting data:', err) + + throw err } finally { // Finalize the archive await archive.finalize() } + // Ensure that the writeStream has finished + await new Promise((resolve, reject) => { + writeStream.on('finish', resolve) + writeStream.on('error', reject) + }) + + logger.info('export completed', { + userId, + }) + // generate a temporary signed url for the zip file - const signedUrl = await generateDownloadSignedUrl(fullPath, { - expires: 60 * 60 * 24, // 24 hours - bucketName: GCS_BUCKET, + const [signedUrl] = await file.getSignedUrl({ + action: 'read', + expires: Date.now() + 86400 * 1000, // 15 minutes }) const job = await sendExportCompletedEmail(userId, signedUrl) diff --git a/packages/api/src/queue-processor.ts b/packages/api/src/queue-processor.ts index c0030300f7..cdc7e98174 100644 --- a/packages/api/src/queue-processor.ts +++ b/packages/api/src/queue-processor.ts @@ -35,6 +35,7 @@ import { expireFoldersJob, EXPIRE_FOLDERS_JOB_NAME, } from './jobs/expire_folders' +import { exportJob, EXPORT_JOB_NAME } from './jobs/export' import { findThumbnail, THUMBNAIL_JOB } from './jobs/find_thumbnail' import { generatePreviewContent, @@ -223,6 +224,8 @@ export const createWorker = (connection: ConnectionOptions) => return pruneTrashJob(job.data) case EXPIRE_FOLDERS_JOB_NAME: return expireFoldersJob() + case EXPORT_JOB_NAME: + return exportJob(job.data) default: logger.warning(`[queue-processor] unhandled job: ${job.name}`) } diff --git a/packages/api/src/server.ts b/packages/api/src/server.ts index 069f1f9cf3..09ba829469 100755 --- a/packages/api/src/server.ts +++ b/packages/api/src/server.ts @@ -24,6 +24,7 @@ import { mobileAuthRouter } from './routers/auth/mobile/mobile_auth_router' import { contentRouter } from './routers/content_router' import { digestRouter } from './routers/digest_router' import { explainRouter } from './routers/explain_router' +import { exportRouter } from './routers/export_router' import { integrationRouter } from './routers/integration_router' import { localDebugRouter } from './routers/local_debug_router' import { notificationRouter } from './routers/notification_router' @@ -106,6 +107,7 @@ export const createApp = (): Express => { app.use('/api/tasks', taskRouter()) app.use('/api/digest', digestRouter()) app.use('/api/content', contentRouter()) + app.use('/api/export', exportRouter()) app.use('/svc/pubsub/content', contentServiceRouter()) app.use('/svc/pubsub/links', linkServiceRouter()) diff --git a/packages/api/src/utils/uploads.ts b/packages/api/src/utils/uploads.ts index c4b71b5cab..b397451726 100644 --- a/packages/api/src/utils/uploads.ts +++ b/packages/api/src/utils/uploads.ts @@ -64,6 +64,14 @@ export const generateUploadSignedUrl = async ( return url } +const createSignedUrl = async (file: File): Promise => { + const signedUrl = await file.getSignedUrl({ + action: 'read', + expires: Date.now() + 15 * 60 * 1000, // 15 minutes + }) + return signedUrl[0] +} + export const generateDownloadSignedUrl = async ( filePathName: string, config?: { From 0e523d8c7312d6d10297e0e6f3c3d449329e5592 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 27 Aug 2024 15:33:04 +0800 Subject: [PATCH 5/8] upload readable content before exporting to cache the content --- packages/api/src/jobs/export.ts | 65 +++++++++++++++++++++++++++---- packages/api/src/utils/uploads.ts | 8 ---- 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/packages/api/src/jobs/export.ts b/packages/api/src/jobs/export.ts index 400d7ecbba..666af19818 
100644 --- a/packages/api/src/jobs/export.ts +++ b/packages/api/src/jobs/export.ts @@ -2,12 +2,15 @@ import archiver, { Archiver } from 'archiver' import { v4 as uuidv4 } from 'uuid' import { LibraryItem } from '../entity/library_item' import { findHighlightsByLibraryItemId } from '../services/highlights' -import { searchLibraryItems } from '../services/library_item' +import { + findLibraryItemById, + searchLibraryItems, +} from '../services/library_item' import { sendExportCompletedEmail } from '../services/send_emails' import { findActiveUser } from '../services/user' import { logger } from '../utils/logger' import { highlightToMarkdown } from '../utils/parser' -import { createGCSFile } from '../utils/uploads' +import { contentFilePath, createGCSFile } from '../utils/uploads' export interface ExportJobData { userId: string @@ -15,6 +18,50 @@ export interface ExportJobData { export const EXPORT_JOB_NAME = 'export' +const uploadContent = async ( + userId: string, + libraryItem: LibraryItem, + archive: Archiver +) => { + const filePath = contentFilePath({ + userId, + libraryItemId: libraryItem.id, + format: 'readable', + savedAt: libraryItem.savedAt, + updatedAt: libraryItem.updatedAt, + }) + + const file = createGCSFile(filePath) + + // check if file is already uploaded + const [exists] = await file.exists() + if (!exists) { + logger.info(`File not found: ${filePath}`) + + // upload the content to GCS + const item = await findLibraryItemById(libraryItem.id, userId, { + select: ['readableContent'], + }) + if (!item?.readableContent) { + logger.error('Item not found', { + userId, + libraryItemId: libraryItem.id, + }) + return + } + + await file.save(item.readableContent, { + contentType: 'text/html', + private: true, + }) + } + + // append the existing file to the archive + archive.append(file.createReadStream(), { + name: `content/${libraryItem.slug}.html`, + }) +} + const uploadToBucket = async ( userId: string, items: Array, @@ -46,11 +93,8 @@ const uploadToBucket = async ( // Loop through the items and add files to /content and /highlights directories for (const item of items) { - const slug = item.slug // Add content files to /content - archive.append(item.readableContent, { - name: `content/${slug}.html`, - }) + await uploadContent(userId, item, archive) if (item.highlightAnnotations?.length) { const highlights = await findHighlightsByLibraryItemId(item.id, userId) @@ -58,7 +102,7 @@ const uploadToBucket = async ( // Add highlight files to /highlights archive.append(markdown, { - name: `highlights/${slug}.md`, + name: `highlights/${item.slug}.md`, }) } } @@ -133,7 +177,7 @@ export const exportJob = async (jobData: ExportJobData) => { from: cursor, size: batchSize, query: 'in:all', - includeContent: true, + includeContent: false, includeDeleted: false, includePending: false, }, @@ -173,6 +217,11 @@ export const exportJob = async (jobData: ExportJobData) => { expires: Date.now() + 86400 * 1000, // 15 minutes }) + logger.info('signed url for export:', { + userId, + signedUrl, + }) + const job = await sendExportCompletedEmail(userId, signedUrl) if (!job) { logger.error('failed to send export completed email', { diff --git a/packages/api/src/utils/uploads.ts b/packages/api/src/utils/uploads.ts index b397451726..c4b71b5cab 100644 --- a/packages/api/src/utils/uploads.ts +++ b/packages/api/src/utils/uploads.ts @@ -64,14 +64,6 @@ export const generateUploadSignedUrl = async ( return url } -const createSignedUrl = async (file: File): Promise => { - const signedUrl = await file.getSignedUrl({ - 
action: 'read', - expires: Date.now() + 15 * 60 * 1000, // 15 minutes - }) - return signedUrl[0] -} - export const generateDownloadSignedUrl = async ( filePathName: string, config?: { From f77ded31e1fa7f61a1de3a7c25c98f60aad04577 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 27 Aug 2024 17:48:05 +0800 Subject: [PATCH 6/8] save export tasks in db and check db before starting export --- packages/api/src/entity/export.ts | 51 ++++ packages/api/src/jobs/export.ts | 263 +++++++++++------- packages/api/src/routers/export_router.ts | 24 +- packages/api/src/services/export.ts | 34 +++ packages/api/src/services/send_emails.ts | 34 ++- .../0186.do.create_export_table.sql | 21 ++ .../0186.undo.create_export_table.sql | 9 + 7 files changed, 325 insertions(+), 111 deletions(-) create mode 100644 packages/api/src/entity/export.ts create mode 100644 packages/api/src/services/export.ts create mode 100755 packages/db/migrations/0186.do.create_export_table.sql create mode 100755 packages/db/migrations/0186.undo.create_export_table.sql diff --git a/packages/api/src/entity/export.ts b/packages/api/src/entity/export.ts new file mode 100644 index 0000000000..defb0edd22 --- /dev/null +++ b/packages/api/src/entity/export.ts @@ -0,0 +1,51 @@ +/* +CREATE TABLE omnivore.export ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(), + user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE, + task_id TEXT NOT NULL, + state TEXT NOT NULL, + total_items INT DEFAULT 0, + processed_items INT DEFAULT 0, + signed_url TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); +*/ + +import { + Column, + CreateDateColumn, + Entity, + PrimaryGeneratedColumn, + UpdateDateColumn, +} from 'typeorm' + +@Entity() +export class Export { + @PrimaryGeneratedColumn('uuid') + id!: string + + @Column('uuid') + userId!: string + + @Column('text') + taskId!: string + + @Column('text') + state!: string + + @Column('int', { default: 0 }) + totalItems!: number + + @Column('int', { default: 0 }) + processedItems!: number + + @Column('text', { nullable: true }) + signedUrl?: string + + @CreateDateColumn({ type: 'timestamptz' }) + createdAt!: Date + + @UpdateDateColumn({ type: 'timestamptz' }) + updatedAt!: Date +} diff --git a/packages/api/src/jobs/export.ts b/packages/api/src/jobs/export.ts index 666af19818..41b15a84e0 100644 --- a/packages/api/src/jobs/export.ts +++ b/packages/api/src/jobs/export.ts @@ -1,12 +1,14 @@ import archiver, { Archiver } from 'archiver' import { v4 as uuidv4 } from 'uuid' -import { LibraryItem } from '../entity/library_item' +import { LibraryItem, LibraryItemState } from '../entity/library_item' +import { TaskState } from '../generated/graphql' +import { findExportById, saveExport } from '../services/export' import { findHighlightsByLibraryItemId } from '../services/highlights' import { findLibraryItemById, searchLibraryItems, } from '../services/library_item' -import { sendExportCompletedEmail } from '../services/send_emails' +import { sendExportJobEmail } from '../services/send_emails' import { findActiveUser } from '../services/user' import { logger } from '../utils/logger' import { highlightToMarkdown } from '../utils/parser' @@ -14,10 +16,23 @@ import { contentFilePath, createGCSFile } from '../utils/uploads' export interface ExportJobData { userId: string + exportId: string } export const EXPORT_JOB_NAME = 'export' +const itemStateMappping = (state: LibraryItemState) => { + switch (state) { + case 
LibraryItemState.Archived: + return 'Archived' + case LibraryItemState.ContentNotFetched: + case LibraryItemState.Succeeded: + return 'Active' + default: + return 'Unknown' + } +} + const uploadContent = async ( userId: string, libraryItem: LibraryItem, @@ -77,7 +92,7 @@ const uploadToBucket = async ( description: item.description, author: item.author, url: item.originalUrl, - state: item.state, + state: itemStateMappping(item.state), readingProgress: item.readingProgressBottomPercent, thumbnail: item.thumbnail, labels: item.labelNames, @@ -111,124 +126,162 @@ const uploadToBucket = async ( } export const exportJob = async (jobData: ExportJobData) => { - const { userId } = jobData - const user = await findActiveUser(userId) - if (!user) { - logger.error('user not found', { - userId, - }) - return - } + const { userId, exportId } = jobData - logger.info('exporting all items...', { - userId, - }) + try { + const user = await findActiveUser(userId) + if (!user) { + logger.error('user not found', { + userId, + }) + return + } - // export data as a zip file: - // exports/{userId}/{date}/{uuid}.zip - // - metadata.json - // - /content - // - {slug}.html - // - /highlights - // - {slug}.md - const dateStr = new Date().toISOString() - const fileUuid = uuidv4() - const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip` - - const file = createGCSFile(fullPath) - - // Create a write stream - const writeStream = file.createWriteStream({ - metadata: { - contentType: 'application/zip', - }, - }) + const exportTask = await findExportById(exportId, userId) + if (!exportTask) { + logger.error('export task not found', { + userId, + exportId, + }) + return + } - // Handle any errors in the streams - writeStream.on('error', (err) => { - logger.error('Error writing to GCS:', err) - }) + await saveExport(userId, { + id: exportId, + state: TaskState.Running, + }) - writeStream.on('finish', () => { - logger.info('File successfully written to GCS') - }) + const emailJob = await sendExportJobEmail(userId, 'started') + if (!emailJob) { + logger.error('Failed to send export job email', { + userId, + }) + return + } - // Initialize archiver for zipping files - const archive = archiver('zip', { - zlib: { level: 9 }, // Compression level - }) + logger.info('exporting all items...', { + userId, + }) - // Handle any archiver errors - archive.on('error', (err) => { - throw err - }) + // export data as a zip file: + // exports/{userId}/{date}/{uuid}.zip + // - metadata.json + // - /content + // - {slug}.html + // - /highlights + // - {slug}.md + const dateStr = new Date().toISOString() + const fileUuid = uuidv4() + const fullPath = `exports/${userId}/${dateStr}/${fileUuid}.zip` + + const file = createGCSFile(fullPath) + + // Create a write stream + const writeStream = file.createWriteStream({ + metadata: { + contentType: 'application/zip', + }, + }) - // Pipe the archiver output to the write stream - archive.pipe(writeStream) + // Handle any errors in the streams + writeStream.on('error', (err) => { + logger.error('Error writing to GCS:', err) + }) - try { - // fetch data from the database - const batchSize = 20 - let cursor = 0 - let hasNext = false - do { - const items = await searchLibraryItems( - { - from: cursor, - size: batchSize, - query: 'in:all', - includeContent: false, - includeDeleted: false, - includePending: false, - }, - userId - ) - - const size = items.length - // write data to the csv file - if (size > 0) { - cursor = await uploadToBucket(userId, items, cursor, size, archive) - - hasNext = size === 
batchSize - } - } while (hasNext) - } catch (err) { - logger.error('Error exporting data:', err) - - throw err - } finally { - // Finalize the archive - await archive.finalize() - } + writeStream.on('finish', () => { + logger.info('File successfully written to GCS') + }) - // Ensure that the writeStream has finished - await new Promise((resolve, reject) => { - writeStream.on('finish', resolve) - writeStream.on('error', reject) - }) + // Initialize archiver for zipping files + const archive = archiver('zip', { + zlib: { level: 9 }, // Compression level + }) - logger.info('export completed', { - userId, - }) + // Handle any archiver errors + archive.on('error', (err) => { + throw err + }) - // generate a temporary signed url for the zip file - const [signedUrl] = await file.getSignedUrl({ - action: 'read', - expires: Date.now() + 86400 * 1000, // 15 minutes - }) + // Pipe the archiver output to the write stream + archive.pipe(writeStream) + + try { + // fetch data from the database + const batchSize = 20 + let cursor = 0 + let hasNext = false + do { + const items = await searchLibraryItems( + { + from: cursor, + size: batchSize, + query: 'in:all', + includeContent: false, + includeDeleted: false, + includePending: false, + }, + userId + ) + + const size = items.length + // write data to the csv file + if (size > 0) { + cursor = await uploadToBucket(userId, items, cursor, size, archive) + + hasNext = size === batchSize + } + } while (hasNext) + } finally { + // Finalize the archive + await archive.finalize() + } - logger.info('signed url for export:', { - userId, - signedUrl, - }) + // Ensure that the writeStream has finished + await new Promise((resolve, reject) => { + writeStream.on('finish', resolve) + writeStream.on('error', reject) + }) + + logger.info('export completed', { + userId, + }) + + // generate a temporary signed url for the zip file + const [signedUrl] = await file.getSignedUrl({ + action: 'read', + expires: Date.now() + 86400 * 1000, // 15 minutes + }) - const job = await sendExportCompletedEmail(userId, signedUrl) - if (!job) { - logger.error('failed to send export completed email', { + logger.info('signed url for export:', { userId, signedUrl, }) - throw new Error('failed to send export completed email') + await saveExport(userId, { + id: exportId, + state: TaskState.Succeeded, + }) + + const job = await sendExportJobEmail(userId, 'completed', signedUrl) + if (!job) { + logger.error('failed to send export completed email', { + userId, + signedUrl, + }) + } + } catch (error) { + logger.error('export failed', error) + + await saveExport(userId, { + id: exportId, + state: TaskState.Failed, + }) + + const job = await sendExportJobEmail(userId, 'failed') + if (!job) { + logger.error('failed to send export failed email', { + userId, + }) + } } } diff --git a/packages/api/src/routers/export_router.ts b/packages/api/src/routers/export_router.ts index 2b5edb3599..03f751ed46 100644 --- a/packages/api/src/routers/export_router.ts +++ b/packages/api/src/routers/export_router.ts @@ -1,6 +1,8 @@ import cors from 'cors' import express, { Router } from 'express' import { jobStateToTaskState } from '../queue-processor' +import { countExportsWithin24Hours, saveExport } from '../services/export' +import { sendExportJobEmail } from '../services/send_emails' import { getClaimsByToken, getTokenByRequest } from '../utils/auth' import { corsConfig } from '../utils/corsConfig' import { queueExportJob } from '../utils/createTask' @@ -25,6 +27,17 @@ export function exportRouter() { const userId = 
claims.uid try { + const exportsWithin24Hours = await countExportsWithin24Hours(userId) + if (exportsWithin24Hours >= 3) { + logger.error('User has reached the limit of exports within 24 hours', { + userId, + exportsWithin24Hours, + }) + return res.status(400).send({ + error: 'EXPORT_LIMIT_REACHED', + }) + } + const job = await queueExportJob(userId) if (!job || !job.id) { @@ -41,10 +54,17 @@ export function exportRouter() { jobId: job.id, }) + const taskId = job.id const jobState = await job.getState() + const state = jobStateToTaskState(jobState) + await saveExport(userId, { + taskId: job.id, + state, + }) + res.send({ - jobId: job.id, - state: jobStateToTaskState(jobState), + taskId, + state, }) } catch (error) { logger.error('Error exporting all items', { diff --git a/packages/api/src/services/export.ts b/packages/api/src/services/export.ts new file mode 100644 index 0000000000..f6ea147c49 --- /dev/null +++ b/packages/api/src/services/export.ts @@ -0,0 +1,34 @@ +import { In, MoreThan } from 'typeorm' +import { Export } from '../entity/export' +import { TaskState } from '../generated/graphql' +import { getRepository } from '../repository' + +export const saveExport = async ( + userId: string, + exportData: Partial +): Promise => { + return getRepository(Export).save({ + userId, + ...exportData, + }) +} + +export const countExportsWithin24Hours = async ( + userId: string +): Promise => { + return getRepository(Export).countBy({ + userId, + createdAt: MoreThan(new Date(Date.now() - 24 * 60 * 60 * 1000)), + state: In([TaskState.Pending, TaskState.Running, TaskState.Succeeded]), + }) +} + +export const findExportById = async ( + id: string, + userId: string +): Promise => { + return getRepository(Export).findOneBy({ + id, + userId, + }) +} diff --git a/packages/api/src/services/send_emails.ts b/packages/api/src/services/send_emails.ts index 3d814f7082..ed5c64307b 100644 --- a/packages/api/src/services/send_emails.ts +++ b/packages/api/src/services/send_emails.ts @@ -114,13 +114,39 @@ export const sendPasswordResetEmail = async (user: { return !!result } -export const sendExportCompletedEmail = async ( +export const sendExportJobEmail = async ( userId: string, - urlToDownload: string + state: 'completed' | 'failed' | 'started', + urlToDownload?: string ) => { + let subject = '' + let html = '' + + switch (state) { + case 'completed': + if (!urlToDownload) { + throw new Error('urlToDownload is required') + } + + subject = 'Your Omnivore export is ready' + html = `
Your export is ready. You can download it from the following link: ${urlToDownload}`
+      break
+    case 'failed':
+      subject = 'Your Omnivore export failed'
+      html = 'Your export failed. Please try again later.'
+      break
+    case 'started':
+      subject = 'Your Omnivore export has started'
+      html =
+        'Your export has started. You will receive an email once it is completed.'
+      break
+    default:
+      throw new Error('Invalid state')
+  }
+
   return enqueueSendEmail({
     userId,
-    subject: 'Your Omnivore export is ready',
-    html: `Your export is ready. You can download it from the following link: ${urlToDownload}
`, + subject, + html, }) } diff --git a/packages/db/migrations/0186.do.create_export_table.sql b/packages/db/migrations/0186.do.create_export_table.sql new file mode 100755 index 0000000000..8770f494ea --- /dev/null +++ b/packages/db/migrations/0186.do.create_export_table.sql @@ -0,0 +1,21 @@ +-- Type: DO +-- Name: create_export_table +-- Description: Create a table to store the export information + +BEGIN; + +CREATE TABLE omnivore.export ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(), + user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE, + task_id TEXT NOT NULL, + state TEXT NOT NULL, + total_items INT DEFAULT 0, + processed_items INT DEFAULT 0, + signed_url TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX export_user_id_idx ON omnivore.export(user_id); + +COMMIT; diff --git a/packages/db/migrations/0186.undo.create_export_table.sql b/packages/db/migrations/0186.undo.create_export_table.sql new file mode 100755 index 0000000000..b481b771e2 --- /dev/null +++ b/packages/db/migrations/0186.undo.create_export_table.sql @@ -0,0 +1,9 @@ +-- Type: UNDO +-- Name: create_export_table +-- Description: Create a table to store the export information + +BEGIN; + +DROP TABLE omnivore.export; + +COMMIT; From 82943921b12e3a155fdc3c4b38bcb5e1a4ceb9f7 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Tue, 27 Aug 2024 18:05:06 +0800 Subject: [PATCH 7/8] fix table permission --- packages/api/src/entity/export.ts | 4 ++-- packages/api/src/routers/export_router.ts | 10 ++++++++-- packages/api/src/services/export.ts | 2 +- packages/api/src/utils/createTask.ts | 4 ++-- packages/db/migrations/0186.do.create_export_table.sql | 4 +++- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/packages/api/src/entity/export.ts b/packages/api/src/entity/export.ts index defb0edd22..a1df58d2f3 100644 --- a/packages/api/src/entity/export.ts +++ b/packages/api/src/entity/export.ts @@ -28,8 +28,8 @@ export class Export { @Column('uuid') userId!: string - @Column('text') - taskId!: string + @Column('text', { nullable: true }) + taskId?: string @Column('text') state!: string diff --git a/packages/api/src/routers/export_router.ts b/packages/api/src/routers/export_router.ts index 03f751ed46..7b2a82ba8c 100644 --- a/packages/api/src/routers/export_router.ts +++ b/packages/api/src/routers/export_router.ts @@ -1,5 +1,6 @@ import cors from 'cors' import express, { Router } from 'express' +import { TaskState } from '../generated/graphql' import { jobStateToTaskState } from '../queue-processor' import { countExportsWithin24Hours, saveExport } from '../services/export' import { sendExportJobEmail } from '../services/send_emails' @@ -38,7 +39,11 @@ export function exportRouter() { }) } - const job = await queueExportJob(userId) + const exportTask = await saveExport(userId, { + state: TaskState.Pending, + }) + + const job = await queueExportJob(userId, exportTask.id) if (!job || !job.id) { logger.error('Failed to queue export job', { @@ -58,7 +63,8 @@ export function exportRouter() { const jobState = await job.getState() const state = jobStateToTaskState(jobState) await saveExport(userId, { - taskId: job.id, + id: exportTask.id, + taskId, state, }) diff --git a/packages/api/src/services/export.ts b/packages/api/src/services/export.ts index f6ea147c49..f1d71cfe41 100644 --- a/packages/api/src/services/export.ts +++ b/packages/api/src/services/export.ts @@ -8,8 +8,8 @@ export const saveExport = async ( exportData: 
Partial ): Promise => { return getRepository(Export).save({ - userId, ...exportData, + userId, }) } diff --git a/packages/api/src/utils/createTask.ts b/packages/api/src/utils/createTask.ts index 4b704d9c0e..0998c2231a 100644 --- a/packages/api/src/utils/createTask.ts +++ b/packages/api/src/utils/createTask.ts @@ -1073,7 +1073,7 @@ export const enqueueExpireFoldersJob = async () => { ) } -export const queueExportJob = async (userId: string) => { +export const queueExportJob = async (userId: string, exportId: string) => { const queue = await getQueue() if (!queue) { return undefined @@ -1081,7 +1081,7 @@ export const queueExportJob = async (userId: string) => { return queue.add( EXPORT_JOB_NAME, - { userId }, + { userId, exportId }, { jobId: `${EXPORT_JOB_NAME}_${userId}_${JOB_VERSION}`, removeOnComplete: true, diff --git a/packages/db/migrations/0186.do.create_export_table.sql b/packages/db/migrations/0186.do.create_export_table.sql index 8770f494ea..e1b323a32f 100755 --- a/packages/db/migrations/0186.do.create_export_table.sql +++ b/packages/db/migrations/0186.do.create_export_table.sql @@ -7,10 +7,10 @@ BEGIN; CREATE TABLE omnivore.export ( id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(), user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE, - task_id TEXT NOT NULL, state TEXT NOT NULL, total_items INT DEFAULT 0, processed_items INT DEFAULT 0, + task_id TEXT, signed_url TEXT, created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP @@ -18,4 +18,6 @@ CREATE TABLE omnivore.export ( CREATE INDEX export_user_id_idx ON omnivore.export(user_id); +GRANT SELECT, INSERT, UPDATE, DELETE ON omnivore.export TO omnivore_user; + COMMIT; From 3b9dd90cce55262a3e2aca99d449e3472473a4a0 Mon Sep 17 00:00:00 2001 From: Hongbo Wu Date: Thu, 29 Aug 2024 12:42:42 +0800 Subject: [PATCH 8/8] remove comments --- packages/api/src/entity/export.ts | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/packages/api/src/entity/export.ts b/packages/api/src/entity/export.ts index a1df58d2f3..230248abf6 100644 --- a/packages/api/src/entity/export.ts +++ b/packages/api/src/entity/export.ts @@ -1,17 +1,3 @@ -/* -CREATE TABLE omnivore.export ( - id UUID PRIMARY KEY DEFAULT uuid_generate_v1mc(), - user_id UUID NOT NULL REFERENCES omnivore.user(id) ON DELETE CASCADE, - task_id TEXT NOT NULL, - state TEXT NOT NULL, - total_items INT DEFAULT 0, - processed_items INT DEFAULT 0, - signed_url TEXT, - created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP -); -*/ - import { Column, CreateDateColumn,