-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Event property counter #7500
Event property counter #7500
Changes from 38 commits
598c213
b609261
01307be
37aad8c
ed359ea
8f80f87
17cb696
304a740
586c39b
d738f9b
4cbc1cc
1ad5949
b9be65f
2227052
4a1da7c
e64062a
129532b
ba1b831
2c3c27d
f69e68c
1c2a777
356d341
6215919
30ddc74
48842e1
8e7429f
7162157
6434800
19c12e1
1ff7d94
6486d70
4895947
4026f71
12d3cc4
07b20ee
9f16444
c1dc433
114fd2d
523fddd
97fdd2d
7a5a8a0
a61884b
20e893b
3ce42cc
0d42e92
fbe080e
8e961b8
aa638e7
e7772ab
8b01a16
d070faa
f0f4f36
a39253a
cd463f4
5db0f2d
c5469cd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,9 +15,9 @@ export function getDefaultConfig(): PluginsServerConfig { | |
return { | ||
CELERY_DEFAULT_QUEUE: 'celery', | ||
DATABASE_URL: isTestEnv | ||
? 'postgres://localhost:5432/test_posthog' | ||
? 'postgres://posthog:posthog@localhost:5432/test_posthog' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment as for settings.py There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For test/dev environments, this is good and aligns with our new "developing locally" guidelines. |
||
: isDevEnv | ||
? 'postgres://localhost:5432/posthog' | ||
? 'postgres://posthog:posthog@localhost:5432/posthog' | ||
: null, | ||
POSTHOG_DB_NAME: null, | ||
POSTHOG_DB_USER: 'postgres', | ||
|
@@ -78,6 +78,7 @@ export function getDefaultConfig(): PluginsServerConfig { | |
SITE_URL: null, | ||
NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS: '', | ||
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: true, | ||
EXPERIMENTAL_EVENT_PROPERTY_COUNTER_ENABLED_TEAMS: '', | ||
} | ||
} | ||
|
||
|
@@ -135,7 +136,9 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> { | |
'(advanced) corresponds to the length of time a piscina worker should block for when looking for tasks', | ||
NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS: | ||
'(advanced) teams for which to run the new person properties update flow on', | ||
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: 'enable experimental feature to track lastSeenAt', | ||
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: '(advanced) enable experimental feature to track lastSeenAt', | ||
EXPERIMENTAL_EVENT_PROPERTY_COUNTER_ENABLED_TEAMS: | ||
'(advanced) teams for which to enable experimental feature to count event properties', | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,7 @@ export async function startPluginsServer( | |
let piscinaStatsJob: schedule.Job | undefined | ||
let internalMetricsStatsJob: schedule.Job | undefined | ||
let flushLastSeenAtCacheJob: schedule.Job | undefined | ||
let flushEventPropertyCounterJob: schedule.Job | undefined | ||
let pluginMetricsJob: schedule.Job | undefined | ||
let piscina: Piscina | undefined | ||
let queue: Queue | undefined // ingestion queue | ||
|
@@ -81,6 +82,8 @@ export async function startPluginsServer( | |
statusReport.stopStatusReportSchedule() | ||
piscinaStatsJob && schedule.cancelJob(piscinaStatsJob) | ||
internalMetricsStatsJob && schedule.cancelJob(internalMetricsStatsJob) | ||
flushLastSeenAtCacheJob && schedule.cancelJob(flushLastSeenAtCacheJob) | ||
flushEventPropertyCounterJob && schedule.cancelJob(flushEventPropertyCounterJob) | ||
await jobQueueConsumer?.stop() | ||
await scheduleControl?.stopSchedule() | ||
await new Promise<void>((resolve, reject) => | ||
|
@@ -198,10 +201,17 @@ export async function startPluginsServer( | |
}) | ||
} | ||
|
||
// every minute flush lastSeenAt cache | ||
// every 10 seconds past the minute flush lastSeenAt cache | ||
if (serverConfig.EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED) { | ||
flushLastSeenAtCacheJob = schedule.scheduleJob('0 * * * * *', async () => { | ||
await hub!.teamManager.flushLastSeenAtCache() | ||
flushLastSeenAtCacheJob = schedule.scheduleJob('10 * * * * *', async () => { | ||
await piscina!.broadcastTask({ task: 'flushLastSeenAtCache' }) | ||
}) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should be performed on the worker threads. The local teamManager doesn't get any events passing through it. At least it shouldn't. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You know better here; I don't have all the context to make the right call. |
||
} | ||
|
||
// every 20 seconds past the minute flush the event property counter cache | ||
if (serverConfig.EXPERIMENTAL_EVENT_PROPERTY_COUNTER_ENABLED_TEAMS) { | ||
flushEventPropertyCounterJob = schedule.scheduleJob('20 * * * * *', async () => { | ||
await piscina!.broadcastTask({ task: 'flushEventPropertyCounter' }) | ||
}) | ||
} | ||
|
||
|
@@ -268,6 +278,7 @@ export async function stopPiscina(piscina: Piscina): Promise<void> { | |
await Promise.all([ | ||
piscina.broadcastTask({ task: 'flushKafkaMessages' }), | ||
piscina.broadcastTask({ task: 'flushLastSeenAtCache' }), | ||
piscina.broadcastTask({ task: 'flushEventPropertyCounter' }), | ||
delay(2000), | ||
]) | ||
await piscina.destroy() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,8 +36,9 @@ export class PostgresLogsWrapper { | |
this.flushTimeout = null | ||
} | ||
if (this.logs.length > 0) { | ||
await this.db.batchInsertPostgresLogs(this.logs) | ||
const logs = this.logs | ||
this.logs = [] | ||
await this.db.batchInsertPostgresLogs(logs) | ||
Comment on lines
-39
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Flyby fix to reduce a lot of noise (and errors?) in test logs when this flush gets triggered too quickly. |
||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
import { Properties } from '@posthog/plugin-scaffold' | ||
import { StatsD } from 'hot-shots' | ||
import { DateTime } from 'luxon' | ||
|
||
import { TeamId } from '../../types' | ||
import { DB } from '../../utils/db/db' | ||
import { timeoutGuard } from '../../utils/db/utils' | ||
import { status } from '../../utils/status' | ||
|
||
enum EventPropertyType { | ||
Number = 'NUMBER', | ||
String = 'STRING', | ||
Boolean = 'BOOLEAN', | ||
DateTime = 'DATETIME', | ||
} | ||
|
||
interface EventPropertiesCounter { | ||
createdAt: number | ||
lastSeenAt: number | ||
totalVolume: number | ||
propertyType: EventPropertyType | null | ||
propertyTypeFormat: string | null | ||
} | ||
|
||
interface EventNamePropertiesBuffer { | ||
totalVolume: number | ||
uniqueVolume: number | ||
buffer: Map<TeamId, Map<string, EventPropertiesCounter>> | ||
} | ||
|
||
export class EventPropertyCounter { | ||
db: DB | ||
eventPropertiesBuffer: EventNamePropertiesBuffer | ||
lastFlushAt: DateTime | ||
statsd?: StatsD | ||
|
||
constructor(db: DB, statsd?: StatsD) { | ||
this.db = db | ||
this.statsd = statsd | ||
this.eventPropertiesBuffer = { totalVolume: 0, uniqueVolume: 0, buffer: new Map() } | ||
this.lastFlushAt = DateTime.now() | ||
} | ||
|
||
public async updateEventPropertyCounter(teamId: number, event: string, properties: Properties): Promise<void> { | ||
this.updateEventPropertiesBuffer(teamId, event, properties) | ||
// Flush every 2 minutes or 50k unique properties, whichever comes first. | ||
// Additionally, a flush is broadcast every 1 minute from pluginServer. | ||
if ( | ||
this.eventPropertiesBuffer.uniqueVolume > 50000 || | ||
DateTime.now().diff(this.lastFlushAt).as('seconds') > 120 | ||
) { | ||
await this.flush() | ||
} | ||
} | ||
|
||
/** Save information about event properties into a custom buffer */ | ||
public updateEventPropertiesBuffer(teamId: number, event: string, properties: Record<string, any>): void { | ||
const timestamp = DateTime.now().toSeconds() | ||
let bufferForTeam = this.eventPropertiesBuffer.buffer.get(teamId) | ||
if (!bufferForTeam) { | ||
bufferForTeam = new Map() | ||
this.eventPropertiesBuffer.buffer.set(teamId, bufferForTeam) | ||
} | ||
for (const [property, value] of Object.entries(properties)) { | ||
const key = JSON.stringify([event, property]) | ||
let propertyBuffer = bufferForTeam.get(key) | ||
if (!propertyBuffer) { | ||
propertyBuffer = { | ||
createdAt: timestamp, | ||
lastSeenAt: timestamp, | ||
totalVolume: 1, | ||
propertyType: null, | ||
propertyTypeFormat: null, | ||
} | ||
bufferForTeam.set(key, propertyBuffer) | ||
this.eventPropertiesBuffer.uniqueVolume += 1 | ||
} else { | ||
propertyBuffer.createdAt = Math.min(timestamp, propertyBuffer.createdAt) | ||
propertyBuffer.lastSeenAt = Math.max(timestamp, propertyBuffer.lastSeenAt) | ||
propertyBuffer.totalVolume += 1 | ||
} | ||
this.eventPropertiesBuffer.totalVolume += 1 | ||
|
||
propertyBuffer.propertyType = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What happens when we have outlier property values? E.g. I have a bug where every once in a while I send an invalid value (e.g. for a timestamp). Can we base this on the format of the majority of values instead of the latest one? |
||
typeof value === 'number' | ||
? EventPropertyType.Number | ||
: typeof value === 'boolean' | ||
? EventPropertyType.Boolean | ||
: typeof value === 'string' | ||
? EventPropertyType.String | ||
: null | ||
propertyBuffer.propertyTypeFormat = null | ||
|
||
if (propertyBuffer.propertyType === EventPropertyType.String) { | ||
const dateFormat = detectDateFormat(value) | ||
if (dateFormat) { | ||
propertyBuffer.propertyType = EventPropertyType.DateTime | ||
propertyBuffer.propertyTypeFormat = dateFormat | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We won't be doing much with this data right now since the last plan was to base this off of the property definitions table (so set once for all events). Should we thus keep these fields around? I think it might be nice to have, and we will eventually get someone who wants more granularity. But... 🤔 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we shouldn't add them if we're not using them straight away (or we should have clear sign-posting for someone who finds them about what they are there for) |
||
} | ||
} | ||
|
||
public async flush(): Promise<void> { | ||
if (this.eventPropertiesBuffer.totalVolume === 0) { | ||
return | ||
} | ||
const timeout = timeoutGuard( | ||
`Still flushing the event names and properties buffer. Timeout warning after 30 sec!` | ||
) | ||
try { | ||
const startTime = DateTime.now() | ||
const lastFlushedSecondsAgo = DateTime.now().diff(this.lastFlushAt).as('seconds') | ||
const cacheSize = this.eventPropertiesBuffer.uniqueVolume | ||
status.info( | ||
`🚽 Starting flushEventPropertyCounter. Cache size: ${cacheSize} items. Last flushed: ${lastFlushedSecondsAgo} seconds ago.` | ||
) | ||
|
||
const oldBuffer = this.eventPropertiesBuffer | ||
this.eventPropertiesBuffer = { totalVolume: 0, uniqueVolume: 0, buffer: new Map() } | ||
this.lastFlushAt = DateTime.now() | ||
|
||
const columns: any[][] = [[], [], [], [], [], [], [], []] | ||
|
||
for (const [teamId, teamBuffer] of oldBuffer.buffer.entries()) { | ||
for (const [key, propertyBuffer] of teamBuffer.entries()) { | ||
const [event, property] = JSON.parse(key) | ||
const { propertyType, propertyTypeFormat, totalVolume, lastSeenAt, createdAt } = propertyBuffer | ||
columns[0].push(teamId) | ||
columns[1].push(event) | ||
columns[2].push(property) | ||
columns[3].push(propertyType) | ||
columns[4].push(propertyTypeFormat) | ||
columns[5].push(totalVolume) | ||
columns[6].push(DateTime.fromSeconds(createdAt).toUTC()) | ||
columns[7].push(DateTime.fromSeconds(lastSeenAt).toUTC()) | ||
} | ||
} | ||
|
||
// VALUES ${queryValues.join(',')} | ||
await this.db.postgresQuery( | ||
`INSERT INTO posthog_eventproperty(team_id, event, property, property_type, property_type_format, total_volume, created_at, last_seen_at) | ||
SELECT * FROM UNNEST ($1::int[], $2::text[], $3::text[], $4::text[], $5::text[], $6::bigint[], $7::timestamp with time zone[], $8::timestamp with time zone[]) | ||
ON CONFLICT ON CONSTRAINT posthog_eventproperty_team_id_event_property_10910b3b_uniq DO UPDATE SET | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Worth giving the index a name in the migration instead? |
||
total_volume = posthog_eventproperty.total_volume + excluded.total_volume, | ||
created_at = LEAST(posthog_eventproperty.created_at, excluded.created_at), | ||
last_seen_at = GREATEST(posthog_eventproperty.last_seen_at, excluded.last_seen_at), | ||
property_type = CASE WHEN posthog_eventproperty.property_type IS NULL THEN excluded.property_type ELSE posthog_eventproperty.property_type END, | ||
property_type_format = CASE WHEN posthog_eventproperty.property_type_format IS NULL THEN excluded.property_type_format ELSE posthog_eventproperty.property_type_format END | ||
`, | ||
columns, | ||
'eventPropertyCounterFlush' | ||
) | ||
|
||
const elapsedTime = DateTime.now().diff(startTime).as('milliseconds') | ||
this.statsd?.set('flushEventPropertyCounter.Size', cacheSize) | ||
this.statsd?.set('flushEventPropertyCounter.QuerySize', columns[0].length) | ||
this.statsd?.timing('flushEventPropertyCounter', elapsedTime) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: postgresQuery automatically has this, so this is kind of redundant. |
||
status.info(`✅ 🚽 flushEventPropertyCounter finished successfully in ${elapsedTime} ms.`) | ||
} finally { | ||
clearTimeout(timeout) | ||
} | ||
} | ||
} | ||
|
||
/**
 * Detect which of the supported date formats a string value is written in.
 *
 * Recognised shapes (the whole string must match):
 * - `YYYY-MM-DD`                          -> `'YYYY-MM-DD'`
 * - `YYYY-MM-DDTHH:MM:SS[.fff][Z]`        -> `'ISO8601 UTC'`
 * - `YYYY-MM-DDTHH:MM:SS[.fff](+|-)HH:MM` -> `'ISO8601 TZ'`
 *
 * The check is purely structural: digit ranges are not validated, so e.g.
 * `'2021-99-99'` is still reported as `'YYYY-MM-DD'`. A timestamp without any
 * zone designator is reported as `'ISO8601 UTC'`.
 *
 * @param value - the raw string property value to inspect
 * @returns the name of the detected format, or `undefined` when the value does
 *          not look like any supported date format
 */
export function detectDateFormat(value: string): string | void {
    if (/^\d{4}-\d{2}-\d{2}$/.test(value)) {
        return 'YYYY-MM-DD'
    }

    // Fractional seconds (1-3 digits) and the trailing "Z" are each optional and
    // independent, so "…T10:11:12Z" is accepted. The dot is escaped so that it
    // only matches a literal "." and not an arbitrary character.
    if (/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{1,3})?Z?$/.test(value)) {
        return 'ISO8601 UTC'
    }

    // Explicit UTC offset; both positive and negative offsets are accepted.
    if (/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{1,3})?[+-]\d{2}:\d{2}$/.test(value)) {
        return 'ISO8601 TZ'
    }

    return undefined
}
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What more date formats do we need to know about? Regular unix timestamps perhaps? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ClickHouse docs list the formats they can handle here https://clickhouse.com/docs/en/sql-reference/functions/type-conversion-functions/#parsedatetimebesteffort There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think timestamps are pretty common too. I added a comment elsewhere, do wonder if this is required or CH is smart enough to parse them correctly when used. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ClickHouse `parseDateTimeBestEffort` is pretty clever #6619 (comment) The type and format (in the short to medium term) will be most useful in the UI. E.g. if it's a timestamp and not simply a number we can display it as a date, and tell the API it is a date. See #7608 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just going to patch these as I find them... Probably should just get #7675 in as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we update deployment docs then? Seems like we need to create this username with a specific password, also to detail what permissions are needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In #7730 I've proposed a slightly different approach so that people with env vars set get to keep them. And people without get sane defaults.
I haven't extended that to plugin-server since it's being changed here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However I solved this now by replacing
With