diff --git a/README.md b/README.md index 4afce7599..9dafa7e57 100644 --- a/README.md +++ b/README.md @@ -233,8 +233,7 @@ See [config.js](./config.js): - `apiKey`: [Algolia](https://www.algolia.com/) apiKey - **required** - `appId`: [Algolia](https://www.algolia.com/) appId - _default `OFCNCOG2CU`_ - `indexName`: [Algolia](https://www.algolia.com/) indexName - _default `npm-search`_ -- `bootstrapConcurrency`: How many docs to grab from npm registry at once in the bootstrap phase - _default `100`_ -- `replicateConcurrency`: How many changes to grab from npm registry at once in the replicate phase - _default `10`_ +- `bootstrapConcurrency`: How many docs to grab from npm registry at once in the bootstrap phase - _default `25`_ - `seq`: npm registry first [change sequence](http://docs.couchdb.org/en/2.0.0/json-structure.html#changes-information-for-a-database) to start replication. In normal operations you should never have to use this. - _default `0`_ - `npmRegistryEndpoint`: npm registry endpoint to replicate from - _default `https://replicate.npmjs.com/registry`_ diff --git a/src/bootstrap.ts b/src/bootstrap.ts index ec0d2fa23..b074a70a8 100644 --- a/src/bootstrap.ts +++ b/src/bootstrap.ts @@ -107,12 +107,10 @@ async function loop( const newLastId = res.rows[res.rows.length - 1].id; - const saved = await saveDocs({ docs: res.rows, index: bootstrapIndex }); + await saveDocs({ docs: res.rows, index: bootstrapIndex }); await stateManager.save({ bootstrapLastId: newLastId, }); - log.info(` - saved ${saved} packages`); - await logProgress(res.offset, res.rows.length); datadog.timing('loop', Date.now() - start); diff --git a/src/config.ts b/src/config.ts index ddaea2f53..7ee8b1d3d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -163,7 +163,6 @@ export const config = { apiKey: '', indexName: 'npm-search', bootstrapIndexName: 'npm-search-bootstrap', - replicateConcurrency: 1, bootstrapConcurrency: 25, timeToRedoBootstrap: ms('2 weeks'), seq: undefined, diff --git a/src/jsDelivr/__test__/index.test.ts b/src/jsDelivr/__test__/index.test.ts index ed6c58194..79fd84473 100644 --- a/src/jsDelivr/__test__/index.test.ts +++ b/src/jsDelivr/__test__/index.test.ts @@ -6,6 +6,7 @@ jest.mock('../../utils/log', () => { log: { info: jest.fn(), warn: jest.fn(), + error: jest.fn(), }, }; }); @@ -85,8 +86,8 @@ describe('files', () => { version: '3.33.0', }); expect(files).toEqual([]); - expect(log.warn.mock.calls[0][0].message).toMatchInlineSnapshot( - `"Response code 404 (Not Found)"` + expect(log.error.mock.calls[0][0]).toEqual( + 'Failed to fetch https://data.jsdelivr.com/v1/package/npm/thispackagedoesnotexist@3.33.0/flat' ); }); }); @@ -111,8 +112,8 @@ describe('files', () => { }, ]); expect(files).toMatchSnapshot(); - expect(log.warn.mock.calls[0][0].message).toMatchInlineSnapshot( - `"Response code 404 (Not Found)"` + expect(log.error.mock.calls[0][0]).toEqual( + 'Failed to fetch https://data.jsdelivr.com/v1/package/npm/thispackagedoesnotexist@3.33.0/flat' ); }); }); diff --git a/src/jsDelivr/index.ts b/src/jsDelivr/index.ts index 2f20a3301..d4648e434 100644 --- a/src/jsDelivr/index.ts +++ b/src/jsDelivr/index.ts @@ -26,7 +26,7 @@ export async function loadHits(): Promise { hits.set(pkg.name, pkg.hits); }); } catch (e) { - log.error(e); + log.error('Failed to fetch', e); } datadog.timing('jsdelivr.loadHits', Date.now() - start); @@ -85,16 +85,14 @@ export async function getFilesList( } let files: File[] = []; + const url = `${config.jsDelivrPackageEndpoint}/${pkg.name}@${pkg.version}/flat`; try { - const response = await request<{ default: string; files: File[] }>( - `${config.jsDelivrPackageEndpoint}/${pkg.name}@${pkg.version}/flat`, - { - responseType: 'json', - } - ); + const response = await request<{ default: string; files: File[] }>(url, { + responseType: 'json', + }); files = response.body.files; } catch (e) { - log.warn(e); + log.error(`Failed to fetch ${url}`, e); } datadog.timing('jsdelivr.getFilesList', Date.now() - start); diff --git a/src/saveDocs.ts b/src/saveDocs.ts index 404b63989..37ec1df60 100644 --- a/src/saveDocs.ts +++ b/src/saveDocs.ts @@ -41,17 +41,21 @@ export default async function saveDocs({ log.info('🔍 No pkgs found in response.'); return Promise.resolve(0); } - - log.info(`👔 Saving... ${names.length} packages`, names); + log.info(' => ', names); + log.info(' Adding metadata...'); let start2 = Date.now(); const pkgs = await addMetaData(rawPkgs); datadog.timing('saveDocs.addMetaData', Date.now() - start2); + log.info(` Saving...`); + start2 = Date.now(); await index.saveObjects(pkgs); datadog.timing('saveDocs.saveObjects', Date.now() - start2); + log.info(` Saved`); + datadog.timing('saveDocs', Date.now() - start); return pkgs.length; } diff --git a/src/watch.ts b/src/watch.ts index d30790af9..f846a34a0 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -1,28 +1,20 @@ import type { SearchIndex } from 'algoliasearch'; import type { QueueObject } from 'async'; import { queue } from 'async'; -import ms from 'ms'; import type { DatabaseChangesResultItem, DocumentLookupFailure } from 'nano'; import type { StateManager } from './StateManager'; -import { config } from './config'; import * as npm from './npm'; import saveDocs from './saveDocs'; import { datadog } from './utils/datadog'; import { log } from './utils/log'; import * as sentry from './utils/sentry'; -let loopStart = Date.now(); let totalSequence: number; // Cached npmInfo.seq let changesConsumer: QueueObject; /** - * Run watch and catchup. - * - * --- Catchup ? - * If the bootstrap is long or the process has been stopped long enough, - * we are lagging behind few changes. - * Catchup() will paginate through changes that we have missed. + * Run watch. * * --- Watch ? * Watch is "Long Polled. This mode is not paginated and the event system in CouchDB send @@ -43,12 +35,6 @@ let changesConsumer: QueueObject; * until an other package is updated. * It will never be up to date because he receive event at the same pace * as they arrive in listener A, even if it's not the same package. - * - * - * --- We could use catchup with a timeout between poll then? - * Yes ! - * When we are catched up, we could await between poll and we will receive N changes. - * But long-polling is more efficient in term of bandwidth and more reactive. */ async function run( stateManager: StateManager, @@ -60,54 +46,11 @@ async function run( changesConsumer = createChangeConsumer(stateManager, mainIndex); - await catchup(stateManager); - - log.info('🚀 Index is up to date, watch mode activated'); - await watch(stateManager); log.info('🚀 watch is done'); } -/** - * Loop through all changes that may have been missed. - */ -async function catchup(stateManager: StateManager): Promise { - let hasCaughtUp: boolean = false; - - while (!hasCaughtUp) { - loopStart = Date.now(); - - try { - const npmInfo = await npm.getInfo(); - totalSequence = npmInfo.seq; - - const { seq } = await stateManager.get(); - - log.info('🚀 Catchup: continue since sequence [%d]', seq); - - // Get one chunk of changes from registry - const changes = await npm.getChanges({ - since: seq, - limit: config.replicateConcurrency, - include_docs: true, - }); - - for (const change of changes.results) { - changesConsumer.push(change); - } - await changesConsumer.drain(); - - const newState = await stateManager.get(); - if (newState.seq! >= totalSequence) { - hasCaughtUp = true; - } - } catch (err) { - sentry.report(err); - } - } -} - /** * Active synchronous mode with Registry. * Changes are polled with a keep-alived connection. @@ -144,7 +87,7 @@ async function watch(stateManager: StateManager): Promise { } /** - * Process changes. + * Process changes in order. */ async function loop( mainIndex: SearchIndex, @@ -180,20 +123,15 @@ async function loop( } /** - * Log our process through catchup/watch. + * Log our process through watch. * */ -function logProgress(seq: number, nbChanges: number): void { - const ratePerSecond = nbChanges / ((Date.now() - loopStart) / 1000); - const remaining = ((totalSequence - seq) / ratePerSecond) * 1000 || 0; - +function logProgress(seq: number): void { log.info( - `🚀 Synced %d/%d changes (%d%), current rate: %d changes/s (%s remaining)`, + `🚀 Synced %d/%d changes (%d%)`, seq, totalSequence, - Math.floor((Math.max(seq, 1) / totalSequence) * 100), - Math.round(ratePerSecond), - ms(remaining) + Math.floor((Math.max(seq, 1) / totalSequence) * 100) ); } @@ -220,7 +158,7 @@ function createChangeConsumer( await stateManager.save({ seq, }); - logProgress(seq, 1); + logProgress(seq); } catch (err) { sentry.report(err); }