Skip to content

Commit

Permalink
rotor: added persistent store metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Jan 15, 2025
1 parent 44c133c commit 5d1d287
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 52 deletions.
29 changes: 27 additions & 2 deletions libs/core-functions/src/functions/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
FuncReturn,
FunctionLogger,
JitsuFunction,
Metrics,
FunctionMetrics,
TTLStore,
} from "@jitsu/protocols/functions";
import {
Expand Down Expand Up @@ -70,6 +70,31 @@ export type MetricsMeta = {
retries?: number;
};

export type FuncChainResult = {
connectionId?: string;
events: AnyEvent[];
execLog: FunctionExecLog;
};

export type FunctionExecRes = {
receivedAt?: any;
eventIndex: number;
event?: any;
metricsMeta?: MetricsMeta;
functionId: string;
error?: any;
dropped?: boolean;
ms: number;
};

export type FunctionExecLog = FunctionExecRes[];

export interface RotorMetrics {
logMetrics: (execLog: FunctionExecLog) => void;
storeStatus: (namespace: string, operation: string, status: string) => void;
close: () => void;
}

export type FetchType = (
url: string,
opts?: FetchOpts,
Expand All @@ -92,7 +117,7 @@ export type FunctionChainContext = {
fetch: InternalFetchType;
store: TTLStore;
anonymousEventsStore?: AnonymousEventsStore;
metrics?: Metrics;
metrics?: FunctionMetrics;
connectionOptions?: any;
};

Expand Down
32 changes: 25 additions & 7 deletions libs/core-functions/src/functions/lib/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import parse from "parse-duration";
import { MongoClient, ReadPreference, Collection } from "mongodb";
import { RetryError } from "@jitsu/functions-lib";
import { getLog, Singleton } from "juava";
import { RotorMetrics } from "./index";

export const defaultTTL = 60 * 60 * 24 * 31; // 31 days
export const maxAllowedTTL = 2147483647; // max allowed value for ttl in redis (68years)
Expand Down Expand Up @@ -69,7 +70,8 @@ export const createMongoStore = (
namespace: string,
mongo: Singleton<MongoClient>,
useLocalCache: boolean,
fast: boolean
fast: boolean,
metrics?: RotorMetrics
): TTLStore => {
const localCache: Record<string, StoreValue> = {};
const readOptions = fast ? { readPreference: ReadPreference.NEAREST } : {};
Expand Down Expand Up @@ -113,22 +115,32 @@ export const createMongoStore = (
}
}

function storeErr(err: any, text: string) {
function storeErr(operation: "get" | "set" | "del" | "ttl", err: any, text: string, metrics?: RotorMetrics) {
log.atError().log(`${text}: ${err.message}`);
if (metrics) {
metrics.storeStatus(namespace, operation, "error");
}
if ((err.message ?? "").includes("timed out")) {
return new RetryError(text + ": Timed out.");
}
return new RetryError(text + ": " + err.message);
}

function success(operation: "get" | "set" | "del" | "ttl", metrics?: RotorMetrics) {
if (metrics) {
metrics.storeStatus(namespace, operation, "success");
}
}

return {
get: async (key: string) => {
try {
const res =
getFromLocalCache(key) || (await ensureCollection().then(c => c.findOne({ _id: key }, readOptions)));
success("get", metrics);
return res ? res.value : undefined;
} catch (err: any) {
throw storeErr(err, `Error getting key ${key} from mongo store ${namespace}`);
throw storeErr("get", err, `Error getting key ${key} from mongo store ${namespace}`);
}
},
getWithTTL: async (key: string) => {
Expand All @@ -139,9 +151,10 @@ export const createMongoStore = (
return undefined;
}
const ttl = res.expireAt ? Math.max(Math.floor((res.expireAt.getTime() - new Date().getTime()) / 1000), 0) : -1;
success("get", metrics);
return { value: res.value, ttl };
} catch (err: any) {
throw storeErr(err, `Error getting key ${key} from mongo store ${namespace}`);
throw storeErr("get", err, `Error getting key ${key} from mongo store ${namespace}`);
}
},
set: async (key: string, obj: any, opts?: SetOpts) => {
Expand All @@ -165,9 +178,12 @@ export const createMongoStore = (
if (useLocalCache) {
localCache[key] = colObj;
}
})
.then(() => {
success("set", metrics);
});
} catch (err: any) {
throw storeErr(err, `Error setting key ${key} in mongo store ${namespace}`);
throw storeErr("set", err, `Error setting key ${key} in mongo store ${namespace}`);
}
},
del: async (key: string) => {
Expand All @@ -179,21 +195,23 @@ export const createMongoStore = (
delete localCache[key];
}
});
success("del", metrics);
} catch (err: any) {
throw storeErr(err, `Error deleting key ${key} from mongo store ${namespace}`);
throw storeErr("del", err, `Error deleting key ${key} from mongo store ${namespace}`);
}
},
ttl: async (key: string) => {
try {
const res =
getFromLocalCache(key) || (await ensureCollection().then(c => c.findOne({ _id: key }, readOptions)));
success("ttl", metrics);
return res
? res.expireAt
? Math.max(Math.floor((res.expireAt.getTime() - new Date().getTime()) / 1000), 0)
: -1
: -2;
} catch (err: any) {
throw storeErr(err, `Error getting key ${key} from mongo store ${namespace}`);
throw storeErr("ttl", err, `Error getting key ${key} from mongo store ${namespace}`);
}
},
};
Expand Down
4 changes: 4 additions & 0 deletions libs/core-functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ export * as mongodbDestination from "./functions/mongodb-destination";
export { mongodb, mongoAnonymousEventsStore } from "./functions/lib/mongodb";
export type {
MetricsMeta,
RotorMetrics,
FuncChainResult,
FunctionExecLog,
FunctionExecRes,
FunctionContext,
FunctionChainContext,
FetchType,
Expand Down
2 changes: 1 addition & 1 deletion services/rotor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import Prometheus from "prom-client";
import { FunctionsHandler, FunctionsHandlerMulti } from "./http/functions";
import { initMaxMindClient, GeoResolver } from "./lib/maxmind";
import { MessageHandlerContext, rotorMessageHandler } from "./lib/message-handler";
import { DummyMetrics, Metrics } from "./lib/metrics";
import { DummyMetrics } from "./lib/metrics";
import { connectionsStore, functionsStore } from "./lib/repositories";
import { Server } from "node:net";
import { getApplicationVersion, getDiagnostics } from "./lib/version";
Expand Down
24 changes: 4 additions & 20 deletions services/rotor/src/lib/functions-chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import {
createTtlStore,
EnrichedConnectionConfig,
EntityStore,
FuncChainResult,
FunctionChainContext,
FunctionConfig,
FunctionContext,
FunctionExecLog,
getBuiltinFunction,
isDropResult,
JitsuFunctionWrapper,
Expand Down Expand Up @@ -69,25 +71,6 @@ udfCache.on("del", (key, value) => {
value.wrapper?.close();
});

export type FuncChainResult = {
connectionId?: string;
events: AnyEvent[];
execLog: FunctionExecLog;
};

export type FunctionExecRes = {
receivedAt?: any;
eventIndex: number;
event?: any;
metricsMeta?: MetricsMeta;
functionId: string;
error?: any;
dropped?: boolean;
ms: number;
};

export type FunctionExecLog = FunctionExecRes[];

export function checkError(chainRes: FuncChainResult) {
for (const el of chainRes.execLog) {
if (el.error && (el.error.name === DropRetryErrorName || el.error.name === RetryErrorName)) {
Expand Down Expand Up @@ -139,7 +122,8 @@ export function buildFunctionChain(
connection.workspaceId,
mongodb,
false,
fastStoreWorkspaceId.includes(connection.workspaceId)
fastStoreWorkspaceId.includes(connection.workspaceId),
rotorContext.metrics
);
if (rotorContext.redisClient) {
store = createMultiStore(store, createTtlStore(connection.workspaceId, rotorContext.redisClient));
Expand Down
4 changes: 2 additions & 2 deletions services/rotor/src/lib/message-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { getLog, requireDefined } from "juava";
import { Metrics } from "./metrics";
import { GeoResolver } from "./maxmind";
import { IngestMessage } from "@jitsu/protocols/async-request";
import { CONNECTION_IDS_HEADER } from "./rotor";
Expand All @@ -17,6 +16,7 @@ import {
EnrichedConnectionConfig,
FunctionConfig,
WorkspaceWithProfiles,
RotorMetrics,
} from "@jitsu/core-functions";
import NodeCache from "node-cache";
import { buildFunctionChain, checkError, FuncChain, FuncChainFilter, runChain } from "./functions-chain";
Expand All @@ -35,7 +35,7 @@ export type MessageHandlerContext = {
functionsStore: EntityStore<FunctionConfig>;
workspaceStore: EntityStore<WorkspaceWithProfiles>;
eventsLogger: EventsStore;
metrics?: Metrics;
metrics?: RotorMetrics;
geoResolver?: GeoResolver;
dummyPersistentStore?: TTLStore;
redisClient?: Redis;
Expand Down
30 changes: 20 additions & 10 deletions services/rotor/src/lib/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import { getLog, requireDefined, stopwatch } from "juava";
import { FunctionExecLog, FunctionExecRes } from "./functions-chain";
import fetch from "node-fetch-commonjs";
import { MetricsMeta, httpAgent, httpsAgent } from "@jitsu/core-functions";
import {
FunctionExecLog,
FunctionExecRes,
MetricsMeta,
httpAgent,
httpsAgent,
RotorMetrics,
} from "@jitsu/core-functions";

import omit from "lodash/omit";
import type { Producer } from "kafkajs";
import { getCompressionType } from "./rotor";
import { Readable } from "stream";
import { randomUUID } from "node:crypto";
import { Counter } from "prom-client";

const log = getLog("metrics");
const bulkerBase = requireDefined(process.env.BULKER_URL, "env BULKER_URL is not defined");
Expand All @@ -27,17 +33,16 @@ type MetricsEvent = MetricsMeta & {
events: number;
};

export interface Metrics {
logMetrics: (execLog: FunctionExecLog) => void;
close: () => void;
}

export const DummyMetrics: Metrics = {
export const DummyMetrics: RotorMetrics = {
logMetrics: () => {},
storeStatus: () => {},
close: () => {},
};

export function createMetrics(producer?: Producer): Metrics {
export function createMetrics(
producer?: Producer,
storeCounter?: Counter<"namespace" | "operation" | "status">
): RotorMetrics {
const buffer: MetricsEvent[] = [];

const flush = async (buf: MetricsEvent[]) => {
Expand Down Expand Up @@ -199,6 +204,11 @@ export function createMetrics(producer?: Producer): Metrics {
buffer.length = 0;
}
},
storeStatus: (namespace: string, operation: string, status: string) => {
if (storeCounter) {
storeCounter.labels(namespace, operation, status).inc();
}
},
close: () => {
clearInterval(interval);
},
Expand Down
20 changes: 12 additions & 8 deletions services/rotor/src/lib/rotor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import dayjs from "dayjs";
import utc from "dayjs/plugin/utc";
dayjs.extend(utc);
import { getRetryPolicy, retryBackOffTime, retryLogMessage } from "./retries";
import { createMetrics, Metrics } from "./metrics";
import { FuncChainFilter, FuncChainResult } from "./functions-chain";
import { createMetrics } from "./metrics";
import { FuncChainFilter } from "./functions-chain";
import type { Admin, Consumer, Producer, KafkaMessage } from "kafkajs";
import { CompressionTypes } from "kafkajs";
import { functionFilter, MessageHandlerContext } from "./message-handler";
import { connectionsStore, functionsStore, workspaceStore } from "./repositories";
import { RotorMetrics, FuncChainResult } from "@jitsu/core-functions";

const log = getLog("kafka-rotor");

Expand Down Expand Up @@ -43,7 +44,7 @@ export type KafkaRotorConfig = {
};

export type KafkaRotor = {
start: () => Promise<Metrics>;
start: () => Promise<RotorMetrics>;
close: () => Promise<void>;
};

Expand All @@ -54,7 +55,7 @@ export function kafkaRotor(cfg: KafkaRotorConfig): KafkaRotor {
let admin: Admin;
let closeQueue: () => Promise<void>;
let interval: any;
let metrics: Metrics;
let metrics: RotorMetrics;
return {
start: async () => {
const kafka = connectToKafka({ defaultAppId: kafkaClientId, ...cfg.credentials });
Expand All @@ -69,7 +70,12 @@ export function kafkaRotor(cfg: KafkaRotorConfig): KafkaRotor {

producer = kafka.producer({ allowAutoTopicCreation: false });
await producer.connect();
metrics = createMetrics(producer);
const storeErrors = new Prometheus.Counter({
name: "rotor_store_statuses",
help: "rotor store statuses",
labelNames: ["namespace", "operation", "status"] as const,
});
metrics = createMetrics(producer, storeErrors);
admin = kafka.admin();

const topicOffsets = new Prometheus.Gauge({
Expand All @@ -87,21 +93,19 @@ export function kafkaRotor(cfg: KafkaRotorConfig): KafkaRotor {
const messagesProcessed = new Prometheus.Counter({
name: "rotor_messages_processed",
help: "messages processed",
// add `as const` here to enforce label names
labelNames: ["topic", "partition"] as const,
});
const messagesRequeued = new Prometheus.Counter({
name: "rotor_messages_requeued",
help: "messages requeued",
// add `as const` here to enforce label names
labelNames: ["topic"] as const,
});
const messagesDeadLettered = new Prometheus.Counter({
name: "rotor_messages_dead_lettered",
help: "messages dead lettered",
// add `as const` here to enforce label names
labelNames: ["topic"] as const,
});

interval = setInterval(async () => {
try {
for (const topic of kafkaTopics) {
Expand Down
4 changes: 2 additions & 2 deletions types/protocols/functions.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export interface Store {
ttl(key: string): Promise<number>;
}

export interface Metrics {
export interface FunctionMetrics {
counter(name: string): {
//increment / decrement counter. Supports negative values
inc: (value: number) => void;
Expand Down Expand Up @@ -70,7 +70,7 @@ export type FunctionContext<P extends AnyProps = AnyProps> = {
log: FunctionLogger;
fetch: FetchType;
store: TTLStore;
metrics?: Metrics;
metrics?: FunctionMetrics;
props: P;
};

Expand Down

0 comments on commit 5d1d287

Please sign in to comment.