Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support parallelize scrapers #356

Merged
merged 8 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/scrape.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ on:
default: "auto_moneyman"
required: false
description: "The name of the worksheet to write to"
parallelScrapes:
default: "1"
required: false
description: "Number of parallel scrapes to run"
schedule:
- cron: "33 10 * * *"
env:
Expand Down Expand Up @@ -57,6 +61,7 @@ jobs:
-e BUXFER_ACCOUNTS
-e TRANSACTION_HASH_TYPE
-e WEB_POST_URL
-e MAX_PARALLEL_SCRAPERS
${{ env.REGISTRY }}/${{ steps.normalize-repository-name.outputs.repository }}:latest
env:
DEBUG: ""
Expand Down Expand Up @@ -85,3 +90,4 @@ jobs:
BUXFER_ACCOUNTS: ${{ secrets.BUXFER_ACCOUNTS }}
TRANSACTION_HASH_TYPE: ${{ secrets.TRANSACTION_HASH_TYPE }}
WEB_POST_URL: ${{ secrets.WEB_POST_URL }}
MAX_PARALLEL_SCRAPERS: ${{ github.event.inputs.parallelScrapes }}
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ Example:
| `TRANSACTION_HASH_TYPE` | `` | The hash type to use for the transaction hash. Can be `moneyman` or empty. The default will be changed to `moneyman` in the upcoming versions |
| `HIDDEN_DEPRECATIONS` | '' | A comma-separated list of deprecations to hide |
| `PUPPETEER_EXECUTABLE_PATH` | `undefined` | An executable path for the scraper. If undefined, defaults to the system browser. |
| `MAX_PARALLEL_SCRAPERS` | `1` | The maximum number of parallel scrapers to run |

### Get notified in telegram

Expand Down
180 changes: 97 additions & 83 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
},
"homepage": "https://github.com/daniel-hauser/moneyman#readme",
"dependencies": {
"async": "^3.2.6",
"azure-kusto-data": "^6.0.2",
"azure-kusto-ingest": "^6.0.2",
"buxfer-ts-client": "^1.1.0",
Expand All @@ -38,11 +39,12 @@
"google-auth-library": "^9.14.1",
"google-spreadsheet": "^4.1.4",
"hash-it": "^6.0.0",
"israeli-bank-scrapers": "^5.1.4",
"israeli-bank-scrapers": "^5.2.0",
"telegraf": "^4.16.3",
"ynab": "^2.5.0"
},
"devDependencies": {
"@types/async": "^3.2.24",
"@types/debug": "^4.1.12",
"@types/jest": "^29.5.13",
"husky": "^9.1.6",
Expand Down
274 changes: 274 additions & 0 deletions patches/israeli-bank-scrapers+5.2.0.patch

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions src/browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import puppeteer, {
type Browser,
type PuppeteerLaunchOptions,
} from "puppeteer";
import { createLogger } from "./utils/logger.js";

// Chromium flags needed to run reliably in containers/CI:
// avoid /dev/shm exhaustion and disable the sandbox.
export const browserArgs = ["--disable-dev-shm-usage", "--no-sandbox"];
// Optional browser executable override; an empty/unset env var becomes
// `undefined`, which means "no override" for the launch options.
export const browserExecutablePath =
process.env.PUPPETEER_EXECUTABLE_PATH || undefined;

const logger = createLogger("browser");

/**
 * Launches a Puppeteer browser using the shared container-safe arguments
 * and the optional executable-path override from the environment.
 *
 * @returns the launched {@link Browser} instance
 */
export async function createBrowser(): Promise<Browser> {
  const launchOptions = {
    args: browserArgs,
    executablePath: browserExecutablePath,
  } satisfies PuppeteerLaunchOptions;

  logger("Creating browser", launchOptions);
  const browser = await puppeteer.launch(launchOptions);
  return browser;
}
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const {
BUXFER_ACCOUNTS = "",
TRANSACTION_HASH_TYPE = "",
WEB_POST_URL = "",
MAX_PARALLEL_SCRAPERS = "",
} = process.env;

/**
Expand All @@ -34,6 +35,7 @@ export const daysBackToScrape = DAYS_BACK || 10;
export const worksheetName = WORKSHEET_NAME || "_moneyman";
export const futureMonthsToScrape = parseInt(FUTURE_MONTHS, 10);
export const systemTimezone = Intl.DateTimeFormat().resolvedOptions().timeZone;
export const parallelScrapers = MAX_PARALLEL_SCRAPERS || 1;

const accountsToScrape = ACCOUNTS_TO_SCRAPE.split(",")
.filter(Boolean)
Expand Down
135 changes: 87 additions & 48 deletions src/data/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@ import { performance } from "perf_hooks";
import { getAccountTransactions } from "./scrape.js";
import { AccountConfig, AccountScrapeResult } from "../types.js";
import { createLogger } from "../utils/logger.js";
import { createBrowser } from "../browser.js";
import { send, sendError } from "../notifier.js";
import { getFailureScreenShotPath } from "../utils/failureScreenshot.js";
import { ScraperOptions } from "israeli-bank-scrapers";
import { parallelScrapers } from "../config.js";
import { parallelLimit } from "async";
import os from "node:os";

const logger = createLogger("data");
const logger = createLogger("scraper");

export async function scrapeAccounts(
accounts: Array<AccountConfig>,
Expand All @@ -18,67 +25,74 @@ export async function scrapeAccounts(

logger(`scraping %d accounts`, accounts.length);
logger(`start date %s`, startDate.toISOString());

let futureMonths: number | undefined = undefined;
if (!Number.isNaN(futureMonthsToScrape)) {
logger(`months to scrap: %d`, futureMonthsToScrape);
futureMonths = futureMonthsToScrape;
}

const status: Array<string> = [];
const results: Array<AccountScrapeResult> = [];

for (let i = 0; i < accounts.length; i++) {
const account = accounts[i];

logger(`scraping account #${i} (type=${account.companyId})`);
const result = await scrapeAccount(
account,
startDate,
futureMonthsToScrape,
async (message) => {
status[i] = message;
await scrapeStatusChanged?.(status);
},
);

results.push({
companyId: account.companyId,
result,
});
logger(`scraping account #${i} ended`);
logger("Creating a browser");
const browser = await createBrowser();
logger(`Browser created, starting to scrape ${accounts.length} accounts`);

if (Number(parallelScrapers) > 1) {
logger(`Running with ${parallelScrapers} parallel scrapers`);
send(
`System info: ${JSON.stringify(
{
parallelScrapers: Number(parallelScrapers),
availableParallelism: os.availableParallelism(),
totalMemoryGB: (os.totalmem() / 1000000000).toFixed(2),
freeMemoryGB: (os.freemem() / 1000000000).toFixed(2),
cpus: os.cpus().length,
},
null,
2,
)}`,
);
}

logger(`scraping ended`);
const stats = getStats(results);
logger(
`Got ${stats.transactions} transactions from ${stats.accounts} accounts`,
const results = await parallelLimit<AccountConfig, AccountScrapeResult[]>(
accounts.map(
(account, i) => async () =>
scrapeAccount(
logger.extend(`#${i} (${account.companyId})`),
account,
{
browserContext: await browser.createBrowserContext(),
startDate,
companyId: account.companyId,
futureMonthsToScrape: futureMonths,
storeFailureScreenShotPath: getFailureScreenShotPath(
account.companyId,
),
},
async (message, append = false) => {
status[i] = append ? `${status[i]} ${message}` : message;
return scrapeStatusChanged?.(status);
},
),
),
Number(parallelScrapers),
);

const duration = (performance.now() - start) / 1000;
logger(`total duration: ${duration}s`);

logger(`scraping ended, total duration: ${duration.toFixed(1)}s`);
await scrapeStatusChanged?.(status, duration);

return results;
}

export async function scrapeAccount(
account: AccountConfig,
startDate: Date,
futureMonthsToScrape: number,
setStatusMessage: (message: string) => Promise<void>,
) {
let message = "";
const start = performance.now();
const result = await getAccountTransactions(
account,
startDate,
futureMonthsToScrape,
(cid, step) => setStatusMessage((message = `[${cid}] ${step}`)),
);
try {
logger(`closing browser`);
await browser?.close();
} catch (e) {
sendError(e, "browser.close");
logger(`failed to close browser`, e);
}

const duration = (performance.now() - start) / 1000;
logger(`scraping took ${duration.toFixed(1)}s`);
await setStatusMessage(`${message}, took ${duration.toFixed(1)}s`);
return result;
logger(getStats(results));
return results;
}

function getStats(results: Array<AccountScrapeResult>) {
Expand All @@ -99,3 +113,28 @@ function getStats(results: Array<AccountScrapeResult>) {
transactions,
};
}

/**
 * Scrapes a single account, reporting progress through the status callback.
 *
 * @param logger - per-account debug logger (already namespaced by the caller)
 * @param account - credentials/config for the account being scraped
 * @param scraperOptions - options forwarded to the underlying scraper
 * @param setStatusMessage - status reporter; `append` concatenates to the
 *   current status line instead of replacing it
 * @returns the account's company id together with the raw scrape result
 */
async function scrapeAccount(
  logger: debug.IDebugger,
  account: AccountConfig,
  scraperOptions: ScraperOptions,
  setStatusMessage: (message: string, append?: boolean) => Promise<void>,
) {
  logger(`scraping`);
  const startedAt = performance.now();

  const result = await getAccountTransactions(
    account,
    scraperOptions,
    (cid, step) => setStatusMessage(`[${cid}] ${step}`),
  );

  // Report elapsed time both to the debug log and to the shared status line.
  const duration = (performance.now() - startedAt) / 1000;
  logger(`scraping ended, took ${duration.toFixed(1)}s`);
  await setStatusMessage(`, took ${duration.toFixed(1)}s`, true);

  return { companyId: account.companyId, result };
}
21 changes: 7 additions & 14 deletions src/data/scrape.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
import { createScraper, ScraperScrapingResult } from "israeli-bank-scrapers";
import {
createScraper,
ScraperOptions,
ScraperScrapingResult,
} from "israeli-bank-scrapers";
import { AccountConfig } from "../types.js";
import { ScraperErrorTypes } from "israeli-bank-scrapers/lib/scrapers/errors.js";
import { createLogger } from "../utils/logger.js";
import { getFailureScreenShotPath } from "../utils/failureScreenshot.js";

const logger = createLogger("scrape");

export async function getAccountTransactions(
account: AccountConfig,
startDate: Date,
futureMonthsToScrape: number,
options: ScraperOptions,
onProgress: (companyId: string, status: string) => void,
): Promise<ScraperScrapingResult> {
logger(`started`);
try {
const scraper = createScraper({
executablePath: process.env.PUPPETEER_EXECUTABLE_PATH || undefined,
startDate,
companyId: account.companyId,
args: ["--disable-dev-shm-usage", "--no-sandbox"],
futureMonthsToScrape: Number.isNaN(futureMonthsToScrape)
? undefined
: futureMonthsToScrape,
storeFailureScreenShotPath: getFailureScreenShotPath(account.companyId),
});
const scraper = createScraper(options);

scraper.onProgress((companyId, { type }) => {
logger(`[${companyId}] ${type}`);
Expand Down
2 changes: 1 addition & 1 deletion src/utils/failureScreenshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export function getFailureScreenShotPath(companyId: string) {
}

const filePath = path.join(companyDir, `${companyId}-${Date.now()}.png`);
logger("getFailureScreenShotPath", { filePath });
logger("getFailureScreenShotPath %o", filePath);

return filePath;
}
Expand Down