Skip to content

Commit

Permalink
fix: pretty, pretty
Browse files Browse the repository at this point in the history
  • Loading branch information
Tadeuchi committed Nov 8, 2023
1 parent 6915601 commit d8a85b1
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 349 deletions.
3 changes: 3 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules
lib
src/__tests__/integration/data
6 changes: 6 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"semi": true,
"trailingComma": "none",
"singleQuote": true,
"printWidth": 120
}
17 changes: 10 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 23 additions & 51 deletions src/PgContractCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import {
EvalStateResult,
LoggerFactory,
PruneStats,
SortKeyCacheResult,
} from "warp-contracts";
import { Pool, PoolClient } from "pg";
import { PgContractCacheOptions } from "./PgContractCacheOptions";

export class PgContractCache<V>
implements BasicSortKeyCache<EvalStateResult<V>>
{
SortKeyCacheResult
} from 'warp-contracts';
import { Pool, PoolClient } from 'pg';
import { PgContractCacheOptions } from './PgContractCacheOptions';

export class PgContractCache<V> implements BasicSortKeyCache<EvalStateResult<V>> {
private readonly logger = LoggerFactory.INST.create(PgContractCache.name);

private readonly pool: Pool;
Expand All @@ -21,7 +19,7 @@ export class PgContractCache<V>
if (!pgCacheOptions) {
this.pgCacheOptions = {
minEntriesPerContract: 10,
maxEntriesPerContract: 100,
maxEntriesPerContract: 100
};
}
this.pool = new Pool(pgCacheOptions);
Expand Down Expand Up @@ -67,7 +65,7 @@ export class PgContractCache<V>
}

async begin(): Promise<void> {
await this.client.query("BEGIN;");
await this.client.query('BEGIN;');
}

async close(): Promise<void> {
Expand All @@ -80,23 +78,18 @@ export class PgContractCache<V>
}

async commit(): Promise<void> {
await this.client.query("COMMIT;");
await this.client.query('COMMIT;');
}

async delete(key: string): Promise<void> {
await this.connection().query(
"DELETE FROM warp.sort_key_cache WHERE key = $1;",
[key]
);
await this.connection().query('DELETE FROM warp.sort_key_cache WHERE key = $1;', [key]);
}

dump(): Promise<any> {
return Promise.resolve(undefined);
}

async get(
cacheKey: CacheKey
): Promise<SortKeyCacheResult<EvalStateResult<V>> | null> {
async get(cacheKey: CacheKey): Promise<SortKeyCacheResult<EvalStateResult<V>> | null> {
const result = await this.connection().query(
`SELECT value
FROM warp.sort_key_cache
Expand All @@ -114,11 +107,9 @@ export class PgContractCache<V>
return null;
}

async getLast(
key: string
): Promise<SortKeyCacheResult<EvalStateResult<V>> | null> {
async getLast(key: string): Promise<SortKeyCacheResult<EvalStateResult<V>> | null> {
const result = await this.connection().query(
"SELECT sort_key, value FROM warp.sort_key_cache WHERE key = $1 ORDER BY sort_key DESC LIMIT 1",
'SELECT sort_key, value FROM warp.sort_key_cache WHERE key = $1 ORDER BY sort_key DESC LIMIT 1',
[key]
);

Expand All @@ -132,18 +123,13 @@ export class PgContractCache<V>
}

async getLastSortKey(): Promise<string | null> {
const result = await this.connection().query(
"SELECT max(sort_key) as sort_key FROM warp.sort_key_cache"
);
return result.rows[0].sort_key == "" ? null : result.rows[0].sortKey;
const result = await this.connection().query('SELECT max(sort_key) as sort_key FROM warp.sort_key_cache');
return result.rows[0].sort_key == '' ? null : result.rows[0].sortKey;
}

async getLessOrEqual(
key: string,
sortKey: string
): Promise<SortKeyCacheResult<EvalStateResult<V>> | null> {
async getLessOrEqual(key: string, sortKey: string): Promise<SortKeyCacheResult<EvalStateResult<V>> | null> {
const result = await this.connection().query(
"SELECT sort_key, value FROM warp.sort_key_cache WHERE key = $1 AND sort_key <= $2 ORDER BY sort_key DESC LIMIT 1",
'SELECT sort_key, value FROM warp.sort_key_cache WHERE key = $1 AND sort_key <= $2 ORDER BY sort_key DESC LIMIT 1',
[key, sortKey]
);

Expand All @@ -158,13 +144,9 @@ export class PgContractCache<V>

async open(): Promise<void> {
const conf = this.pgCacheOptions;
this.logger.info(
`Connecting pg... ${conf.user}@${conf.host}:${conf.port}/${conf.database}`
);
this.logger.info(`Connecting pg... ${conf.user}@${conf.host}:${conf.port}/${conf.database}`);
this.client = await this.pool.connect();
await this.client.query(
"CREATE schema if not exists warp; SET search_path TO 'warp';"
);
await this.client.query("CREATE schema if not exists warp; SET search_path TO 'warp';");
await this.sortKeyTable();
await this.validityTable();
this.logger.info(`Connected`);
Expand Down Expand Up @@ -222,7 +204,7 @@ export class PgContractCache<V>
entriesBefore: allItems,
entriesAfter: allItems - deleted,
sizeBefore: -1,
sizeAfter: -1,
sizeAfter: -1
};
}

Expand All @@ -247,13 +229,7 @@ export class PgContractCache<V>
SET valid = EXCLUDED.valid,
sort_key = EXCLUDED.sort_key,
error_message = EXCLUDED.error_message`,
[
stateCacheKey.key,
stateCacheKey.sortKey,
tx,
value.validity[tx],
value.errorMessages[tx],
]
[stateCacheKey.key, stateCacheKey.sortKey, tx, value.validity[tx], value.errorMessages[tx]]
);
}
}
Expand Down Expand Up @@ -288,11 +264,7 @@ export class PgContractCache<V>
/**
* Executed in a separate pool client, so that in can be used by a separate worker.
*/
async setSignature(
cacheKey: CacheKey,
hash: string,
signature: string
): Promise<void> {
async setSignature(cacheKey: CacheKey, hash: string, signature: string): Promise<void> {
this.logger.debug(`Attempting to set signature`, cacheKey, signature);
const result = await this.pool.query(
`
Expand All @@ -311,7 +283,7 @@ export class PgContractCache<V>
}

async rollback(): Promise<void> {
await this.client.query("ROLLBACK;");
await this.client.query('ROLLBACK;');
}

storage<S>(): S {
Expand Down
2 changes: 1 addition & 1 deletion src/PgContractCacheOptions.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ClientConfig } from "pg";
import { ClientConfig } from 'pg';

export interface PgContractCacheOptions extends ClientConfig {
minEntriesPerContract: number;
Expand Down
79 changes: 25 additions & 54 deletions src/PgSortKeyCache.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import {
BatchDBOp,
CacheKey,
LoggerFactory,
PruneStats,
SortKeyCache,
SortKeyCacheResult,
} from "warp-contracts";
import { Pool, PoolClient } from "pg";
import { SortKeyCacheRangeOptions } from "warp-contracts/lib/types/cache/SortKeyCacheRangeOptions";
import { PgSortKeyCacheOptions } from "./PgSortKeyCacheOptions";
import { BatchDBOp, CacheKey, LoggerFactory, PruneStats, SortKeyCache, SortKeyCacheResult } from 'warp-contracts';
import { Pool, PoolClient } from 'pg';
import { SortKeyCacheRangeOptions } from 'warp-contracts/lib/types/cache/SortKeyCacheRangeOptions';
import { PgSortKeyCacheOptions } from './PgSortKeyCacheOptions';

export class PgSortKeyCache<V> implements SortKeyCache<V> {
private readonly logger = LoggerFactory.INST.create(PgSortKeyCache.name);
Expand All @@ -19,17 +12,15 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {

constructor(private readonly pgCacheOptions: PgSortKeyCacheOptions) {
if (!pgCacheOptions.tableName) {
throw new Error("Table name cannot be empty");
throw new Error('Table name cannot be empty');
}
this.tableName = pgCacheOptions.tableName;
}

private async createTableIfNotExists() {
await this.connection().query(
"CREATE schema if not exists warp; SET search_path TO 'warp';"
);
await this.connection().query("CREATE schema if not exists warp; SET search_path TO 'warp';");
this.logger.info(`Attempting to create table ${this.tableName}`);
const query = `
const query = `
CREATE TABLE IF NOT EXISTS "${this.tableName}"
(
id bigserial,
Expand All @@ -48,7 +39,7 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {
if (this.client == null) {
this.client = await this.pool.connect();
}
await this.client.query("BEGIN;");
await this.client.query('BEGIN;');
}

async close(): Promise<void> {
Expand All @@ -68,14 +59,11 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {
this.logger.error(`Called commit when no connection established.`);
return;
}
await this.client.query("COMMIT;");
await this.client.query('COMMIT;');
}

async delete(key: string): Promise<void> {
await this.connection().query(
`DELETE FROM warp."${this.tableName}" WHERE key = $1;`,
[key]
);
await this.connection().query(`DELETE FROM warp."${this.tableName}" WHERE key = $1;`, [key]);
}

dump(): Promise<any> {
Expand Down Expand Up @@ -104,35 +92,24 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {
);

if (result && result.rows && result.rows.length > 0) {
return new SortKeyCacheResult<V>(
result.rows[0].sort_key,
result.rows[0].value
);
return new SortKeyCacheResult<V>(result.rows[0].sort_key, result.rows[0].value);
}
return null;
}

async getLastSortKey(): Promise<string | null> {
const result = await this.connection().query(
`SELECT max(sort_key) as sort_key FROM warp."${this.tableName}";`
);
return result.rows[0].sort_key == "" ? null : result.rows[0].sortKey;
const result = await this.connection().query(`SELECT max(sort_key) as sort_key FROM warp."${this.tableName}";`);
return result.rows[0].sort_key == '' ? null : result.rows[0].sortKey;
}

async getLessOrEqual(
key: string,
sortKey: string
): Promise<SortKeyCacheResult<V> | null> {
async getLessOrEqual(key: string, sortKey: string): Promise<SortKeyCacheResult<V> | null> {
const result = await this.connection().query(
`SELECT sort_key, value FROM warp."${this.tableName}" WHERE key = $1 AND sort_key <= $2 ORDER BY sort_key DESC LIMIT 1;`,
[key, sortKey]
);

if (result && result.rows.length > 0) {
return new SortKeyCacheResult<V>(
result.rows[0].sort_key,
result.rows[0].value
);
return new SortKeyCacheResult<V>(result.rows[0].sort_key, result.rows[0].value);
}
return null;
}
Expand All @@ -144,7 +121,7 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {
});

this.logger.info(`Connecting pg... ${conf.user}@${conf.host}:${conf.port}/${conf.database}`);
await this.pool.query( "CREATE schema if not exists warp; SET search_path TO 'warp';");
await this.pool.query("CREATE schema if not exists warp; SET search_path TO 'warp';");
await this.createTableIfNotExists();
this.logger.info(`Setup finished`);
}
Expand Down Expand Up @@ -201,7 +178,7 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {
entriesBefore: allItems,
entriesAfter: allItems - deleted,
sizeBefore: -1,
sizeAfter: -1,
sizeAfter: -1
};
}

Expand Down Expand Up @@ -251,7 +228,7 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {
this.logger.error(`Rollback called, but no connection established`);
return;
}
await this.client.query("ROLLBACK;");
await this.client.query('ROLLBACK;');
}

storage<S>(): S {
Expand All @@ -273,9 +250,9 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {
try {
await this.begin();
for (const op of opStack) {
if (op.type === "put") {
if (op.type === 'put') {
await this.put(op.key, op.value);
} else if (op.type === "del") {
} else if (op.type === 'del') {
await this.delete(op.key);
}
}
Expand All @@ -300,11 +277,8 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {
return Promise.resolve(undefined);
}

async keys(
sortKey: string,
options?: SortKeyCacheRangeOptions
): Promise<string[]> {
const order = options?.reverse ? "DESC" : "ASC";
async keys(sortKey: string, options?: SortKeyCacheRangeOptions): Promise<string[]> {
const order = options?.reverse ? 'DESC' : 'ASC';
const result = await this.connection().query({
text: `WITH latest_values AS (SELECT DISTINCT ON (key) key, sort_key, value
FROM warp."${this.tableName}"
Expand All @@ -318,16 +292,13 @@ export class PgSortKeyCache<V> implements SortKeyCache<V> {
from latest_values
order by key ${order};`,
values: [sortKey, options?.gte, options?.lt, options?.limit],
rowMode: "array",
rowMode: 'array'
});
return result.rows.flat();
}

async kvMap(
sortKey: string,
options?: SortKeyCacheRangeOptions
): Promise<Map<string, V>> {
const order = options?.reverse ? "DESC" : "ASC";
async kvMap(sortKey: string, options?: SortKeyCacheRangeOptions): Promise<Map<string, V>> {
const order = options?.reverse ? 'DESC' : 'ASC';
const result = await this.connection().query(
`
WITH latest_values AS (SELECT DISTINCT ON (key) key, sort_key, value
Expand Down
2 changes: 1 addition & 1 deletion src/PgSortKeyCacheOptions.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ClientConfig } from "pg";
import { ClientConfig } from 'pg';

export interface PgSortKeyCacheOptions extends ClientConfig {
tableName: string;
Expand Down
Loading

0 comments on commit d8a85b1

Please sign in to comment.