Skip to content

Commit

Permalink
Updates on files
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Rogalskiy committed Nov 11, 2021
1 parent 19bf186 commit 416b412
Show file tree
Hide file tree
Showing 8 changed files with 1,632 additions and 0 deletions.
185 changes: 185 additions & 0 deletions src/error/PlaidConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import fs from 'fs';
import plaid from 'plaid';
import mysql2 from 'mysql2';
import moment from 'moment';

// trying to land this as part of proper typescript
// see https://github.com/microsoft/TypeScript/issues/37695
type Obj<T> = Record<string, T>;

export interface PlaidConfig {
clientId: string;
secret: string;
publicKey: string;
env: string;
institutionTokens: Obj<string>;
}

export interface DbConfig {
host: string;
user: string;
password: string;
database: string;
}

interface SyncConfig {
plaid: PlaidConfig;
db: DbConfig;
}

// load config. TODO: validate config
const {plaid: plaidConf, db: dbConf}: SyncConfig = JSON.parse(fs.readFileSync(`./config.json`, `utf-8`));

const dbClient = mysql2.createPool(dbConf).promise();
const plaidClient = new plaid.Client(
plaidConf.clientId,
plaidConf.secret,
plaidConf.publicKey,
plaid.environments[plaidConf.env],
{version: '2019-05-29'},
);

function arrayAsInsertSql(tableName: string, rows: any[]): string {
if (rows.length < 1) {
return ``;
}

const esc = (str: string) => `\`${str}\``;
const sqlParts: string[] = [];
const columns = Object.keys(rows[0]);
sqlParts.push(`INSERT INTO ${esc(tableName)} (${columns.map(esc).join(`, `)}) VALUES`);
rows.forEach((row, idx) => {
const comma = idx < rows.length - 1 ? `,` : ``;
sqlParts.push(`(${columns.map((col) => JSON.stringify(row[col] ?? null)).join(`, `)})${comma}`);
});
const valueUpdates = columns
.filter((col) => col !== `id`)
.map((col) => `${esc(col)}=VALUES(${esc(col)})`)
.join(`, `);
sqlParts.push(`ON DUPLICATE KEY UPDATE ${valueUpdates}`); // upsert if primary key already exists
sqlParts.push(`;`);
return sqlParts.join(`\n`);
}

async function updateTable(tableName: string, rows: any[]) {
const sql = arrayAsInsertSql(tableName, rows);
if (!sql) return; // nothing to execute

const sqlPreview = sql.length > 100 ? `${sql.substr(0, 97).replace(/\n/g, ` `)}...` : sql;
console.log(`Executing ${sqlPreview}`);

// run query
const [queryResult] = await dbClient.execute(sql);
console.log((queryResult as any).info);

// write to filesystem so we have some form of a log
fs.writeFileSync(`tables/${tableName}.sql`, sql);
}

async function fetchCategories() {
console.log(`fetching categories`);
const {categories} = await plaidClient.getCategories();
const categoryRows = categories.map(({category_id, group, hierarchy}) => ({
id: category_id,
group,
category: hierarchy[0],
category1: hierarchy[1],
category2: hierarchy[2],
}));
return categoryRows;
}

async function syncAccounts(institutionTokens: Obj<string>, historyMonths = 1) {
const accountRows = [];
const institutionRows = [];
const transactionRows = [];
const categoryRows = await fetchCategories();

// refreshing transactions is not available in dev env :(
// const accessTokens = Object.values(institutionTokens);
// console.log(`refreshing transactions across ${accessTokens.length} accounts`);
// await Promise.all(accessTokens.map((accessToken) => plaidClient.refreshTransactions(accessToken)));

for (const [institutionName, accessToken] of Object.entries(institutionTokens)) {
console.log(`fetching transactions for`, institutionName, accessToken);

// accounts //
try {
const {accounts, item: institution} = await plaidClient.getAccounts(accessToken);
institutionRows.push({
id: institution.institution_id,
name: institutionName,
});

for (const account of accounts) {
let curBalance = account.balances.current;
if (account.type === `credit` || account.type === `loan`) {
curBalance *= -1;
}

accountRows.push({
id: account.account_id,
institution_id: institution.institution_id,
balance_current: curBalance,
mask: account.mask,
name: account.official_name || account.name,
type: account.type,
subtype: account.subtype,
});
}
} catch (err) {
console.error(institutionName, err);
continue;
}

// transactions //
const startDate = moment().subtract(historyMonths, 'months').format('YYYY-MM-DD');
const endDate = moment().format('YYYY-MM-DD');
const {transactions} = await plaidClient.getAllTransactions(accessToken, startDate, endDate);

for (const tr of transactions) {
if (tr.pending) {
continue; // ignore pending transactions
}

// remove unneccessary prefix
const prefixMatch = tr.name.match(/^Ext Credit Card (Debit|Credit) /);
if (prefixMatch) {
tr.name = tr.name.substr(prefixMatch[0].length);
}

if (!tr.location.country && tr.iso_currency_code === `USD` && tr.location.region) {
tr.location.country = `US`;
}

transactionRows.push({
id: tr.transaction_id,
account_id: tr.account_id,
name: tr.name,
amount: -tr.amount,
date: tr.date,
category_id: tr.category_id,
currency_code: tr.iso_currency_code,
location_city: tr.location.city,
location_state: tr.location.region,
location_country: tr.location.country,
// plaid types not upto date, see https://github.com/plaid/plaid-node/pull/266
payment_channel: (tr as any).payment_channel,
});
}
}

await updateTable(`categories`, categoryRows);
await updateTable(`institutions`, institutionRows);
await updateTable(`accounts`, accountRows);
await updateTable(`transactions`, transactionRows);
}

///// main /////
const historyMonths = 1; // sync one month back
syncAccounts(plaidConf.institutionTokens, historyMonths)
.then(() => process.exit(0)) // we need manual process.exit because mysql holds pool
.catch((err) => {
console.error(err);
process.exit(1);
});
92 changes: 92 additions & 0 deletions src/error/Request.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/**
* Interface according to spec from http://www.jsonrpc.org/specification
* JSON-RPC is a stateless, light-weight remote procedure call (RPC) protocol.
* Primarily this specification defines several data structures.
* It is transport agnostic in that the concepts can be used within the same process,
* over sockets, over http, or in many various message passing environments. I
* It uses JSON (RFC 4627) as data format.
*/
export declare namespace JsonRpc2 {
/**
* Request object representation of a rpc call.
* Server always replies with a Response object having the same id.
*/
interface Request extends Notification {
/** An identifier established by the Client */
id: number;
}
/**
* Client can send a request with no expectation of a response.
* Server can send a notification without an explicit request by a client.
*/
interface Notification {
/** Name of the method to be invoked. */
method: string;
/** Parameter values to be used during the invocation of the method. */
params?: any;
/** Version of the JSON-RPC protocol. MUST be exactly "2.0". */
jsonrpc?: '2.0';
}
/**
* Response object representation of a rpc call.
* Response will always contain a result property unless an error occured.
* In which case, an error property is present.
*/
interface Response {
/** An identifier established by the Client. */
id: number;
/** Result object from the Server if method invocation was successful. */
result?: any;
/** Error object from Server if method invocation resulted in an error. */
error?: Error;
/** Version of the JSON-RPC protocol. MUST be exactly "2.0". */
jsonrpc?: '2.0';
}
/**
* Error object representation when a method invocation fails.
*/
interface Error {
/** Indicates the error type that occurred. */
code: ErrorCode;
/** A short description of the error. */
message: string;
/** Additional information about the error */
data?: any;
}
const enum ErrorCode {
/** Parse error Invalid JSON was received by the Server. */
ParseError = -32700,
/** Invalid Request The JSON sent is not a valid Request object. */
InvalidRequest = -32600,
/** The method does not exist / is not available. */
MethodNotFound = -32601,
/** Invalid method parameter(s). */
InvalidParams = 32602,
/** Internal JSON-RPC error. */
InternalError = -32603,
}
type PromiseOrNot<T> = Promise<T> | T;
/** A JsonRPC Client that abstracts the transportation of messages to and from the Server. */
interface Client {
/** Creates a Request object and sends to the Server. Returns the Response from the Server as a Promise. */
call: (method: string, params: any) => Promise<any>;
/** Invokes the handler function when Server sends a notification. */
on: (method: string, handler: (params: any) => void) => void;
/** Sends a notification to the Server. */
notify: (method: string, params?: any) => void;
}
/** A JsonRPC Server that abstracts the transportation of messages to and from the Client */
interface Server {
/**
* Invokes the handler function when Client sends a Request and sends the Response back.
* If handler function returns a Promise, then it waits for the promise to be resolved or rejected before returning.
* It also wraps the handler in a trycatch so it can send an error response when an exception is thrown.
*/
expose: (method: string, handler: (params: any) => Promise<any>) => void;
/** Invokes the handler function when Client sends a notification. */
on: (method: string, handler: (params: any) => void) => void;
/** Sends a notification to the Client. */
notify: (method: string, params?: any) => void;
}
}
export default JsonRpc2;
Loading

0 comments on commit 416b412

Please sign in to comment.