Skip to content

Commit

Permalink
wip: import source
Browse files Browse the repository at this point in the history
  • Loading branch information
blacha committed Dec 13, 2024
1 parent 102fc42 commit 94385ad
Show file tree
Hide file tree
Showing 22 changed files with 1,330 additions and 0 deletions.
Empty file.
5 changes: 5 additions & 0 deletions packages/lambda-analytic-cloudfront/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# @basemaps/lambda-analytic-cloudfront

Generate analytics from CloudFront distribution statistics

Every hour, this lambda function runs and generates a rolled-up summary of usage by API key.
41 changes: 41 additions & 0 deletions packages/lambda-analytic-cloudfront/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"name": "@basemaps/lambda-analytic-cloudfront",
"version": "7.11.0",
"private": true,
"repository": {
"type": "git",
"url": "https://github.com/linz/basemaps.git",
"directory": "packages/lambda-analytic-cloudfront"
},
"author": {
"name": "Land Information New Zealand",
"url": "https://linz.govt.nz",
"organization": true
},
"type": "module",
"engines": {
"node": ">=16.0.0"
},
"license": "MIT",
"dependencies": {
"@basemaps/config": "^7.11.0",
"@basemaps/geo": "^7.11.0",
"@basemaps/shared": "^7.11.0",
"ua-parser-js": "^1.0.39"
},
"scripts": {
"test": "node --test",
"bundle": "../../scripts/bundle.mjs package.json"
},
"devDependencies": {
"@elastic/elasticsearch": "^8.16.2",
"@types/ua-parser-js": "^0.7.36"
},
"bundle": {
"entry": "src/index.ts",
"outdir": "dist/",
"external": [
"pino-pretty"
]
}
}
119 changes: 119 additions & 0 deletions packages/lambda-analytic-cloudfront/src/__test__/analytics.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import assert from 'node:assert';
import { beforeEach, describe, it, TestContext } from 'node:test';
import { gzipSync } from 'node:zlib';

import { Env, fsa, FsMemory } from '@basemaps/shared';
import { Client } from '@elastic/elasticsearch';

import { getYesterday } from '../date.js';
import { Elastic } from '../elastic.js';
import { handler } from '../handler.js';
import { LogStats } from '../log.stats.js';
import { LogData } from './log.data.js';

/** Action entry of a bulk request as captured by the stubbed client: names the index the next entry is written to */
interface IndexOperation {
index: { _index: string };
}
/** Flat list of entries handed to the stubbed `bulk()` call: action entries and `LogStats` documents */
type BulkOperation = (IndexOperation | LogStats)[];

describe('analytic lambda', () => {
  const memory = new FsMemory();

  beforeEach(() => {
    fsa.register('mem://', memory);
    memory.files.clear();

    // Keep the pause between bulk requests tiny and drop any client stubbed by a previous test
    Elastic.indexDelay = 1;
    Elastic._client = undefined;
  });

  /** Answer `Env.get` with in-memory locations, rejecting any key these tests do not expect */
  function setupEnv(t: TestContext): void {
    t.mock.method(Env, 'get', (key: string): string => {
      if (key === Env.Analytics.CacheBucket) return 'mem://cache/';
      if (key === Env.Analytics.CloudFrontSourceBucket) return 'mem://source/';
      if (key === Env.Analytics.CloudFrontId) return 'cfid';
      if (key === Env.Analytics.MaxRecords) return '30';
      throw new Error(`Invalid test process.env access ${key}`);
    });
  }

  /** Store the gzipped sample log under yesterday's hourly prefix and return that `YYYY-MM-DD-HH` prefix */
  async function writeSourceLog(): Promise<string> {
    const yesterday = getYesterday();
    const shortDate = yesterday.toISOString().slice(0, 13).replace('T', '-');
    await fsa.write(new URL(`mem://source/cfid.${shortDate}/data.txt.gz`), gzipSync(LogData));
    return shortDate;
  }

  it('should process some log data', async (t) => {
    setupEnv(t);

    // Record every bulk request instead of talking to a real cluster
    const operations: BulkOperation[] = [];
    Elastic._client = {
      bulk(op: { operations: BulkOperation }) {
        operations.push(op.operations);
        return Promise.resolve({});
      },
    } as unknown as Client;

    const shortDate = await writeSourceLog();

    await handler();

    // One call to insert
    assert.equal(operations.length, 1);

    const firstBulk = operations[0];
    const indexOpt = firstBulk[0] as IndexOperation;
    const logOpt = firstBulk[1] as LogStats;

    // First Log line: /v1/tiles/aerial/EPSG:3857/19/516588/320039.webp
    assert.equal(indexOpt.index._index, 'basemaps-history-2020');
    assert.equal(logOpt.apiType, 'd');
    assert.equal(logOpt.tileMatrix, 'EPSG:3857');
    assert.equal(logOpt.tileMatrixId, 'WebMercatorQuad');
    assert.equal(logOpt.tileSet, 'aerial');
    assert.equal(logOpt.z, 19);
    assert.equal(logOpt.cacheHit, 1);
    assert.equal(logOpt.cacheMiss, 0);
    assert.equal(logOpt.total, 1);

    assert.deepEqual(logOpt.ua, { os: 'linux', name: 'chrome', version: '85', variant: 'unknown' });

    // Two files: the source log that was written above and the roll up produced by the handler
    const files = Array.from(memory.files.keys());
    assert.equal(files.length, 2);

    const year = shortDate.slice(0, 4);
    const month = shortDate.slice(5, 7);
    assert.equal(files[1], `mem://cache/RollUpV3/${year}/${month}/${shortDate}.ndjson.gz`);
  });

  it('should write errors to storage', async (t) => {
    setupEnv(t);

    // Every bulk request reports errors
    Elastic._client = {
      bulk() {
        return Promise.resolve({ errors: ['Hello'] });
      },
    } as unknown as Client;

    await writeSourceLog();

    const ret = await handler().catch((e: Error) => e);

    assert.equal(String(ret), 'Error: Failed to index');

    // Two files: the source log that was written above and the error report
    const files = Array.from(memory.files.keys());
    assert.equal(files.length, 2);

    const errorPrefix = `mem://cache/errors-${new Date().toISOString().slice(0, 12)}`;
    assert.ok(files[1].startsWith(errorPrefix));
  });
});
15 changes: 15 additions & 0 deletions packages/lambda-analytic-cloudfront/src/__test__/log.data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { ulid } from 'ulid';

/** Random API key for the fixture: a lower-cased ULID prefixed with `d` (presumably the "developer" key type — confirm against the key parser) */
export const DevApiKey = 'd' + ulid().toLowerCase();
/** Random API key for the fixture: a lower-cased ULID prefixed with `c` (presumably the "client" key type — confirm against the key parser) */
export const ClientApiKey = 'c' + ulid().toLowerCase();

/**
 * Sample access log used as test input: `#Version` / `#Fields` header lines followed by
 * seven tile API requests dated 2020-07-28.
 *
 * The first three requests carry `DevApiKey` in their `api=` query parameter, the remaining
 * four carry `ClientApiKey`. Leading and trailing whitespace is trimmed from the final string.
 */
export const LogData = `#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
2020-07-28 01:11:25 AKL50-C1 20753 255.255.255.141 GET d1mez8rta20vo0.cloudfront.net /v1/tiles/aerial/EPSG:3857/19/516588/320039.webp 200 https://bar.com/ Mozilla/5.0%20(X11;%20Linux%20x86_64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/85.0.4183.101%20Safari/537.36 api=${DevApiKey} - Hit sBUoz03SwR_hVZkdj0LVC1s_bKakd9ONcKTYRrQLuIR3VPBQUx5xog== basemaps.linz.govt.nz https 82 0.049 - TLSv1.3 TLS_AES_128_GCM_SHA256 Hit HTTP/2.0 -- 21780 0.049 Hit image/webp 20320 - -
2020-07-28 01:16:13 SYD1-C2 156474 255.255.255.4 GET d1mez8rta20vo0.cloudfront.net /v1/tiles/aerial/NZTM2000Quad/19/516542/319785.png 200 https://www.bar.com/ Mozilla/5.0%20(Macintosh;%20Intel%20Mac%20OS%20X%2010_15_4)%20AppleWebKit/605.1.15%20(KHTML,%20like%20Gecko)%20Version/13.1.2%20Safari/605.1.15 api=${DevApiKey}&foo=bar - Hit 9KNnEESjZA-yVs62ffwtRYNaa0gpYKLeEEHH490dmO7AAu3ZxnPc8Q== basemaps.linz.govt.nz https 77 1.791 - TLSv1.3 TLS_AES_128_GCM_SHA256 Hit HTTP/2.0 - - 19468 0.028 Hit image/png 155886 - -
2020-07-28 01:16:21 SYD1-C2 21223 255.255.255.73 GET d1mez8rta20vo0.cloudfront.net /v1/tiles/topo50/3857/18/257866/162011.jpeg 200 https://bar.com/map/ Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/85.0.4183.102%20Safari/537.36 api=${DevApiKey} - Miss a5nrTCsdsP5EDQ9EXkUQQJMCJTlbRUz5JIxowZ-1kRriRDUmLPxvVQ== basemaps.linz.govt.nz https 76 0.222 - TLSv1.3 TLS_AES_128_GCM_SHA256 Miss HTTP/2.0 - - 57799 0.222 Miss image/jpeg 20797 - -
2020-07-28 01:13:33 SYD4-C2 2588 255.255.255.128 GET d1mez8rta20vo0.cloudfront.net /v1/tiles/topo50/EPSG:3857/WMTSCapabilities.xml 200 - Mozilla/5.0%20QGIS/31006 api=${ClientApiKey} - RefreshHit oflBr-vO5caoVpi2S23hGh9YWMUca-McU_Fl5oN9fqW_H9ea_iS-Kg== basemaps.linz.govt.nz https 243 0.051 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 RefreshHit HTTP/1.1 - - 55515 0.050 RefreshHit text/xml -
2020-07-28 01:13:33 SYD4-C2 2588 255.255.255.128 GET d1mez8rta20vo0.cloudfront.net /v1/tiles/topo50/EPSG:2193/18/257866/162011.pbf 200 - Mozilla/5.0%20QGIS/31006 api=${ClientApiKey} - RefreshHit oflBr-vO5caoVpi2S23hGh9YWMUca-McU_Fl5oN9fqW_H9ea_iS-Kg== basemaps.linz.govt.nz https 243 0.051 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 RefreshHit HTTP/1.1 - - 55515 0.050 RefreshHit text/xml -
2020-07-28 01:13:33 SYD4-C2 2588 255.255.255.128 GET d1mez8rta20vo0.cloudfront.net /v1/tiles/antipodes-islands-satellite-2019-2020-0.5m/NZTM2000Quad/18/257866/162011.webp 200 - Mozilla/5.0%20QGIS/31006 api=${ClientApiKey} - RefreshHit oflBr-vO5caoVpi2S23hGh9YWMUca-McU_Fl5oN9fqW_H9ea_iS-Kg== basemaps.linz.govt.nz https 243 0.051 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 RefreshHit HTTP/1.1 - - 55515 0.050 RefreshHit text/xml -
2020-07-28 01:13:33 SYD4-C2 2588 255.255.255.128 GET d1mez8rta20vo0.cloudfront.net /v1/tiles/elevation/WebMercatorQuad/18/257866/162011.png 200 - Mozilla/5.0%20QGIS/31006 api=${ClientApiKey}&pipeline=terrain-rgb - RefreshHit oflBr-vO5caoVpi2S23hGh9YWMUca-McU_Fl5oN9fqW_H9ea_iS-Kg== basemaps.linz.govt.nz https 243 0.051 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 RefreshHit HTTP/1.1 - - 55515 0.050 RefreshHit text/xml -
`.trim();
33 changes: 33 additions & 0 deletions packages/lambda-analytic-cloudfront/src/date.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
 * Find the upper bound for log processing: the start of the current UTC hour, one day ago.
 *
 * @returns a new `Date` truncated to the hour (UTC) and moved back by one day
 */
export function getYesterday(): Date {
  const oneDayMs = 24 * 60 * 60 * 1000;

  // Truncate "now" to the top of the hour
  const hourStart = new Date();
  hourStart.setUTCMinutes(0, 0, 0);

  // UTC days are always exactly 24 hours long, so stepping back a day is plain subtraction
  return new Date(hourStart.getTime() - oneDayMs);
}

/**
 * Walk backwards one UTC day at a time, yielding each day as `YYYY-MM-DD`.
 *
 * The day of `startDate` is always yielded, even when `endDate` is later than `startDate`.
 * Only minutes, seconds and milliseconds of `startDate` are zeroed, its hour is kept, so
 * the day of `endDate` is yielded only if `endDate` is not later in its day than that hour.
 *
 * @param startDate most recent day to yield, not modified
 * @param endDate iteration stops once the cursor is earlier than this instant
 */
export function* byDay(startDate: Date, endDate: Date): Generator<string> {
  const cursor = new Date(startDate);
  cursor.setUTCMinutes(0, 0, 0);

  do {
    yield cursor.toISOString().slice(0, 10);
    cursor.setUTCDate(cursor.getUTCDate() - 1);
    // Keep going until the cursor is strictly before the end date
  } while (!(cursor.getTime() < endDate.getTime()));
}

/**
 * Walk backwards one calendar month at a time, yielding each month as `YYYY-MM`.
 *
 * The month of `startDate` is always yielded. The cursor keeps the day-of-month of
 * `startDate`, clamped to the length of each month it lands in, so that stepping back
 * from a day that does not exist in the previous month (e.g. March 31 -> "February 31")
 * cannot overflow forward and yield the same month twice.
 *
 * @param startDate most recent month to yield, not modified
 * @param endDate iteration stops once the cursor is earlier than this instant
 */
export function* byMonth(startDate: Date, endDate: Date): Generator<string> {
  const currentDate = new Date(startDate);
  currentDate.setUTCMinutes(0);
  currentDate.setUTCSeconds(0);
  currentDate.setUTCMilliseconds(0);

  // Day-of-month to aim for in every month visited
  const targetDay = currentDate.getUTCDate();

  while (true) {
    yield currentDate.toISOString().slice(0, 7);

    // Day 0 of the current month is the last day of the previous month
    const previousMonthEnd = new Date(currentDate);
    previousMonthEnd.setUTCDate(0);

    // Set month and day together so a too-large day never rolls over into the following month
    const day = Math.min(targetDay, previousMonthEnd.getUTCDate());
    currentDate.setUTCMonth(currentDate.getUTCMonth() - 1, day);

    if (currentDate.getTime() < endDate.getTime()) break;
  }
}
74 changes: 74 additions & 0 deletions packages/lambda-analytic-cloudfront/src/elastic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { Env, LogType } from '@basemaps/shared';
import { Client } from '@elastic/elasticsearch';

import { LogStats } from './log.stats.js';

/**
 * Bulk indexes rolled up log statistics into yearly `basemaps-history-YYYY` indexes,
 * running one insert at a time.
 */
export class ElasticClient {
  /** Cached client, created on first use of `client`; may be assigned directly to supply a stub */
  _client: Client | undefined;
  /** Between index requests delay this amount, in milliseconds */
  indexDelay: number = 200;

  /**
   * Client for the cluster, created on first access from `Env.Analytics.ElasticId`
   * and `Env.Analytics.ElasticApiKey`.
   *
   * @throws Error when either of the two environment values is unset
   */
  get client(): Client {
    if (this._client == null) {
      const id = Env.get(Env.Analytics.ElasticId);
      const apiKey = Env.get(Env.Analytics.ElasticApiKey);
      if (id == null) throw new Error(`$${Env.Analytics.ElasticId} is unset`);
      if (apiKey == null) throw new Error(`$${Env.Analytics.ElasticApiKey} is unset`);
      this._client = new Client({ cloud: { id }, auth: { apiKey } });
    }
    return this._client;
  }

  /** Responses of bulk requests that reported errors */
  errors: unknown[] = [];
  /** Tail of the insert chain; every new insert starts only after this promise has resolved */
  insertQueue: Promise<void> = Promise.resolve();

  /**
   * Queue a set of records for indexing behind any insert that is already pending.
   *
   * Inserts are chained with `.then()`, so once one insert has failed every insert queued
   * after it rejects with that same error without being run.
   *
   * @param prefix label used in log messages and in the failure message
   * @param combined records to index
   * @param log logger for progress messages
   */
  async insert(prefix: string, combined: Iterable<LogStats>, log: LogType): Promise<void> {
    const queued = this.insertQueue.then(() => this._doInsert(prefix, combined, log));
    this.insertQueue = queued;
    return queued;
  }

  /**
   * Index the records in batches, skipping every record whose `total` is below 1.
   *
   * @throws Error `Failed to index: <prefix>` when a bulk response reports errors; the
   *   response is appended to `errors` first
   */
  async _doInsert(prefix: string, combined: Iterable<LogStats>, log: LogType): Promise<void> {
    // Snapshot the collaborators up front so later reassignment does not affect this run
    const { client, errors, indexDelay } = this;
    const startedAt = performance.now();

    let indexed = 0;
    let skipped = 0;
    // Two entries per record: the index action followed by the document itself
    let batch: unknown[] = [];

    const flush = async (): Promise<void> => {
      const records = batch.length / 2;
      indexed += records;
      log.trace({ prefix, records, skipHits: skipped, total: indexed }, 'log:ingest');

      const response = await client.bulk({ operations: batch });
      if (response.errors) {
        errors.push(response);
        throw new Error('Failed to index: ' + prefix);
      }

      // Give it a little bit of time to index
      await new Promise((resolve) => setTimeout(resolve, indexDelay));
      // Start a fresh array rather than emptying the one that was handed to the client
      batch = [];
    };

    for (const rec of combined) {
      if (rec.total < 1) {
        skipped++;
        continue;
      }

      const action = { index: { _index: `basemaps-history-${rec['@timestamp'].slice(0, 4)}`, _id: rec.id } };
      batch.push(action, rec);
      if (batch.length > 50_000) await flush();
    }

    // Send whatever is left over from the last partial batch
    if (batch.length > 0) await flush();

    if (indexed > 0) {
      log.info({ prefix, skipHits: skipped, total: indexed, duration: performance.now() - startedAt }, 'log:ingest');
      return;
    }
    log.trace({ prefix }, 'log:ingest:skip');
  }
}

/** Shared client instance used across the module */
export const Elastic = new ElasticClient();
Loading

0 comments on commit 94385ad

Please sign in to comment.