From e3cea995c72b48a184ebce2f10e96e917c445e26 Mon Sep 17 00:00:00 2001 From: Ildar Nurislamov Date: Tue, 28 Jan 2025 15:54:51 +0400 Subject: [PATCH] rotor: fill source name in function's EventContext rotor: don't count events produced by user recognition as multiplied events --- .../__tests__/user-recognition.test.ts | 3 ++ .../src/functions/bulker-destination.ts | 32 ++++++++++++------- .../src/functions/lib/udf_wrapper.ts | 1 + .../src/functions/user-recognition.ts | 6 +++- libs/core-functions/src/index.ts | 1 + libs/core-functions/src/lib/config-types.ts | 1 + services/rotor/src/lib/functions-chain.ts | 4 ++- services/rotor/src/lib/message-handler.ts | 1 + services/rotor/src/lib/metrics.ts | 4 +-- .../pages/api/admin/export/[name]/index.ts | 1 + 10 files changed, 39 insertions(+), 15 deletions(-) diff --git a/libs/core-functions/__tests__/user-recognition.test.ts b/libs/core-functions/__tests__/user-recognition.test.ts index 91467cc1e..8c2414ae8 100644 --- a/libs/core-functions/__tests__/user-recognition.test.ts +++ b/libs/core-functions/__tests__/user-recognition.test.ts @@ -55,6 +55,7 @@ const expectedEvents: AnalyticsServerEvent[] = [ }, { messageId: "1", + _JITSU_UR_MESSAGE_ID: "4", type: "page", anonymousId: "anon1", userId: "user1", @@ -67,6 +68,7 @@ const expectedEvents: AnalyticsServerEvent[] = [ }, { messageId: "2", + _JITSU_UR_MESSAGE_ID: "4", type: "page", anonymousId: "anon1", userId: "user1", @@ -79,6 +81,7 @@ const expectedEvents: AnalyticsServerEvent[] = [ }, { messageId: "3", + _JITSU_UR_MESSAGE_ID: "4", type: "page", anonymousId: "anon1", userId: "user1", diff --git a/libs/core-functions/src/functions/bulker-destination.ts b/libs/core-functions/src/functions/bulker-destination.ts index 2935be221..a36f92215 100644 --- a/libs/core-functions/src/functions/bulker-destination.ts +++ b/libs/core-functions/src/functions/bulker-destination.ts @@ -13,6 +13,9 @@ import { AnalyticsServerEvent, DataLayoutType } from "@jitsu/protocols/analytics import omit from "lodash/omit"; import { MetricsMeta } from "./lib"; +import { UserRecognitionParameter } from "./user-recognition"; + +const JitsuInternalProperties = [TableNameParameter, UserRecognitionParameter]; export type MappedEvent = { event: any; @@ -25,7 +28,7 @@ export type DataLayoutImpl = ( export function jitsuLegacy(event: AnalyticsServerEvent, ctx: FullContext): MappedEvent { const flat = toJitsuClassic(event, ctx); - return { event: omit(flat, TableNameParameter), table: event[TableNameParameter] ?? "events" }; + return { event: omit(flat, JitsuInternalProperties), table: event[TableNameParameter] ?? "events" }; } export function segmentLayout( @@ -56,7 +59,7 @@ export function segmentLayout( event.context?.groupId || event.traits?.groupId || event.context?.traits?.groupId ); transferFunc(transformed, event.properties); - transferFunc(transformed, event, ["context", "properties", "traits", "type", TableNameParameter]); + transferFunc(transformed, event, ["context", "properties", "traits", "type", ...JitsuInternalProperties]); } else { transformed = { context: {}, @@ -65,7 +68,7 @@ export function segmentLayout( transferFunc(transformed, event.properties); transferFunc(transformed, event.context?.traits); transferFunc(transformed, event.traits); - transferFunc(transformed, event, ["context", "properties", "traits", "type", TableNameParameter]); + transferFunc(transformed, event, ["context", "properties", "traits", "type", ...JitsuInternalProperties]); } break; case "group": @@ -79,7 +82,14 @@ export function segmentLayout( transferFunc(transformed.context.group, event.traits); transferValueFunc(transformed.context, "group_id", event.groupId); transferFunc(transformed, event.properties); - transferFunc(transformed, event, ["context", "properties", "traits", "type", "groupId", TableNameParameter]); + transferFunc(transformed, event, [ + "context", + "properties", + "traits", + "type", + "groupId", + ...JitsuInternalProperties, + ]); } else { transformed = { context: {}, @@ -87,7 +97,7 @@ export function segmentLayout( transferFunc(transformed.context, event.context, ["traits"]); transferFunc(transformed, event.properties); transferFunc(transformed, event.traits); - transferFunc(transformed, event, ["context", "properties", "traits", "type", TableNameParameter]); + transferFunc(transformed, event, ["context", "properties", "traits", "type", ...JitsuInternalProperties]); } break; case "track": @@ -102,13 +112,13 @@ export function segmentLayout( transferFunc(transformed.context.traits, event.properties?.traits, ["groupId"]); transferValueFunc(transformed.context, "group_id", event.context?.groupId || event.context?.traits?.groupId); transferFunc(transformed, event.properties, ["traits"]); - transferFunc(transformed, event, ["context", "properties", "type", TableNameParameter]); + transferFunc(transformed, event, ["context", "properties", "type", ...JitsuInternalProperties]); } else { baseTrackFlat = {}; - transferFunc(baseTrackFlat, event, ["properties", "type", TableNameParameter]); + transferFunc(baseTrackFlat, event, ["properties", "type", ...JitsuInternalProperties]); transformed = {}; transferFunc(transformed, event.properties); - transferFunc(transformed, event, ["properties", "type", TableNameParameter]); + transferFunc(transformed, event, ["properties", "type", ...JitsuInternalProperties]); } break; default: @@ -122,11 +132,11 @@ export function segmentLayout( transferFunc(transformed.context.traits, event.context?.traits, ["groupId"]); transferValueFunc(transformed.context, "group_id", event.context?.groupId || event.context?.traits?.groupId); transferFunc(transformed, event.properties); - transferFunc(transformed, event, ["context", "properties", TableNameParameter]); + transferFunc(transformed, event, ["context", "properties", ...JitsuInternalProperties]); } else { transformed = {}; transferFunc(transformed, event.properties); - transferFunc(transformed, event, ["properties", TableNameParameter]); + transferFunc(transformed, event, ["properties", ...JitsuInternalProperties]); } } if (event[TableNameParameter]) { @@ -167,7 +177,7 @@ export const dataLayouts: Record> = { segment: (event, ctx) => segmentLayout(event, false, ctx), "segment-single-table": (event, ctx) => segmentLayout(event, true, ctx), "jitsu-legacy": jitsuLegacy, - passthrough: event => ({ event: omit(event, TableNameParameter), table: event[TableNameParameter] ?? "events" }), + passthrough: event => ({ event: omit(event, JitsuInternalProperties), table: event[TableNameParameter] ?? "events" }), }; export type BulkerDestinationConfig = { diff --git a/libs/core-functions/src/functions/lib/udf_wrapper.ts b/libs/core-functions/src/functions/lib/udf_wrapper.ts index a07015866..9f2ccf717 100644 --- a/libs/core-functions/src/functions/lib/udf_wrapper.ts +++ b/libs/core-functions/src/functions/lib/udf_wrapper.ts @@ -396,6 +396,7 @@ export async function UDFTestRun({ headers: {}, source: { id: "functionsDebugger-streamId", + name: "Functions Debugger Stream", type: "browser", }, destination: { diff --git a/libs/core-functions/src/functions/user-recognition.ts b/libs/core-functions/src/functions/user-recognition.ts index b2b206ea3..ccdf4608b 100644 --- a/libs/core-functions/src/functions/user-recognition.ts +++ b/libs/core-functions/src/functions/user-recognition.ts @@ -5,6 +5,8 @@ import get from "lodash/get"; import set from "lodash/set"; import merge from "lodash/merge"; +export const UserRecognitionParameter = "_JITSU_UR_MESSAGE_ID"; + export const UserRecognitionConfig = z.object({ /** * Where to look for anonymous id, an array of JSON paths @@ -61,7 +63,9 @@ const UserRecognitionFunction: JitsuFunction { return evs.map(anonEvent => { //merge anonymous event with identified fields - return merge(anonEvent, identifiedFields); + const merged = merge(anonEvent, identifiedFields); + merged[UserRecognitionParameter] = event.messageId; + return merged; }); }); if (res.length === 0) { diff --git a/libs/core-functions/src/index.ts b/libs/core-functions/src/index.ts index bf31e140b..14ddb1894 100644 --- a/libs/core-functions/src/index.ts +++ b/libs/core-functions/src/index.ts @@ -60,6 +60,7 @@ export function isDropResult(result: FuncReturn): boolean { } export * as bulkerDestination from "./functions/bulker-destination"; +export { UserRecognitionParameter } from "./functions/user-recognition"; export { UDFWrapper, UDFTestRun } from "./functions/lib/udf_wrapper"; export type { UDFTestRequest, UDFTestResponse, logType } from "./functions/lib/udf_wrapper"; export { ProfileUDFWrapper, ProfileUDFTestRun, mergeUserTraits } from "./functions/lib/profiles-udf-wrapper"; diff --git a/libs/core-functions/src/lib/config-types.ts b/libs/core-functions/src/lib/config-types.ts index 239fc2242..818dcaa29 100644 --- a/libs/core-functions/src/lib/config-types.ts +++ b/libs/core-functions/src/lib/config-types.ts @@ -42,6 +42,7 @@ export type EnrichedConnectionConfig = { updatedAt?: Date; destinationId: string; streamId: string; + streamName?: string; metricsKeyPrefix: string; usesBulker: boolean; //destinationType diff --git a/services/rotor/src/lib/functions-chain.ts b/services/rotor/src/lib/functions-chain.ts index fc73fe8f8..87819c360 100644 --- a/services/rotor/src/lib/functions-chain.ts +++ b/services/rotor/src/lib/functions-chain.ts @@ -18,6 +18,7 @@ import { MetricsMeta, mongodb, UDFWrapper, + UserRecognitionParameter, wrapperFunction, } from "@jitsu/core-functions"; import Prometheus from "prom-client"; @@ -290,7 +291,8 @@ export async function runChain( const sw = stopwatch(); const rat = new Date(event.receivedAt); const execLogMeta = { - eventIndex: i, + // we don't multiply active incoming metrics for events produced by user recognition + eventIndex: event[UserRecognitionParameter] ? 0 : i, receivedAt: !isNaN(rat.getTime()) ? rat : new Date(), functionId: f.id, metricsMeta: metricsMeta, diff --git a/services/rotor/src/lib/message-handler.ts b/services/rotor/src/lib/message-handler.ts index a7f91ad57..c60f9f734 100644 --- a/services/rotor/src/lib/message-handler.ts +++ b/services/rotor/src/lib/message-handler.ts @@ -102,6 +102,7 @@ export async function rotorMessageHandler( source: { type: message.ingestType, id: connection.streamId, + name: connection.streamName, domain: message.origin?.domain, }, destination: { diff --git a/services/rotor/src/lib/metrics.ts b/services/rotor/src/lib/metrics.ts index 8c026be46..b4aea21a6 100644 --- a/services/rotor/src/lib/metrics.ts +++ b/services/rotor/src/lib/metrics.ts @@ -98,12 +98,12 @@ export function createMetrics( JSON.stringify({ timestamp: e.timestamp, workspaceId: e.workspaceId, - messageId: e.messageId, + messageId: e.key, }) ); streamOld.push("\n"); } - stream.push(JSON.stringify(omit(e, "retries", "messageId"))); + stream.push(JSON.stringify(omit(e, "retries", "messageId", "key"))); stream.push("\n"); }); //close stream diff --git a/webapps/console/pages/api/admin/export/[name]/index.ts b/webapps/console/pages/api/admin/export/[name]/index.ts index e371aefd6..deeb66aa2 100644 --- a/webapps/console/pages/api/admin/export/[name]/index.ts +++ b/webapps/console/pages/api/admin/export/[name]/index.ts @@ -249,6 +249,7 @@ const exports: Export[] = [ type: destinationType, workspaceId: workspace.id, streamId: from.id, + streamName: from.config?.name, destinationId: to.id, usesBulker: !!coreDestinationType?.usesBulker, options: {