Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event property counter #7500

Merged
merged 56 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
598c213
create event property model
mariusandra Dec 3, 2021
b609261
add null
mariusandra Dec 3, 2021
01307be
rename cache vars
mariusandra Dec 3, 2021
37aad8c
update event properties table on ingestion
mariusandra Dec 3, 2021
ed359ea
match date formats
mariusandra Dec 3, 2021
8f80f87
match date formats
mariusandra Dec 3, 2021
17cb696
better string handling
mariusandra Dec 3, 2021
304a740
property type can be null too
mariusandra Dec 3, 2021
586c39b
pass event timestamp
mariusandra Dec 3, 2021
d738f9b
update property type later
mariusandra Dec 3, 2021
4cbc1cc
perform all updates through a buffer object
mariusandra Dec 3, 2021
1ad5949
move to EventPropertyCounter
mariusandra Dec 3, 2021
b9be65f
Merge branch 'master' into event-property-counter
mariusandra Dec 14, 2021
2227052
fix migration
mariusandra Dec 14, 2021
4a1da7c
improve flush last seen at job
mariusandra Dec 14, 2021
e64062a
flush job periodically + env
mariusandra Dec 14, 2021
129532b
upsert all event properties in 1 query
mariusandra Dec 14, 2021
ba1b831
log to statsd
mariusandra Dec 14, 2021
2c3c27d
enable property counter only if experimental mode enabled
mariusandra Dec 14, 2021
f69e68c
use now() instead of event timestamp
mariusandra Dec 14, 2021
1c2a777
fix seconds
mariusandra Dec 14, 2021
356d341
Merge branch 'master' into event-property-counter
mariusandra Dec 14, 2021
6215919
add user/pass for default postgres
mariusandra Dec 14, 2021
30ddc74
add tests
mariusandra Dec 14, 2021
48842e1
use big integers
mariusandra Dec 14, 2021
8e7429f
make query work with 50k props
mariusandra Dec 14, 2021
7162157
processing events saves event properties
mariusandra Dec 14, 2021
6434800
fix script
mariusandra Dec 14, 2021
19c12e1
test date format detection
mariusandra Dec 14, 2021
1ff7d94
default enabled
mariusandra Dec 14, 2021
6486d70
only enable event property counter for specific teams
mariusandra Dec 14, 2021
4895947
Merge branch 'master' into event-property-counter
mariusandra Dec 14, 2021
4026f71
eslint fixes
mariusandra Dec 14, 2021
12d3cc4
Merge branch 'master' into event-property-counter
mariusandra Dec 14, 2021
07b20ee
fix logs double-sync noise in tests
mariusandra Dec 14, 2021
9f16444
fix bigint test
mariusandra Dec 14, 2021
c1dc433
don't do tasks that make no sense
mariusandra Dec 14, 2021
114fd2d
remove dead code
mariusandra Dec 14, 2021
523fddd
simpler test setup
mariusandra Dec 16, 2021
97fdd2d
different contraint name
mariusandra Dec 16, 2021
7a5a8a0
refactor team manager
mariusandra Dec 16, 2021
a61884b
greatly simplify the system
mariusandra Dec 16, 2021
20e893b
fetch cached event properties
mariusandra Dec 16, 2021
3ce42cc
fix team manager and timestamps
mariusandra Dec 16, 2021
0d42e92
add cached entry
mariusandra Dec 16, 2021
fbe080e
also don't cache event properties for teams that have it disabled
mariusandra Dec 16, 2021
8e961b8
remove indexes that are not going to be used
mariusandra Dec 16, 2021
aa638e7
Merge branch 'master' into event-property-counter
mariusandra Dec 16, 2021
e7772ab
remove unused imports
mariusandra Dec 16, 2021
8b01a16
blacked
mariusandra Dec 16, 2021
d070faa
remember event properties with a LRU cache
mariusandra Dec 17, 2021
f0f4f36
fix eslint
mariusandra Dec 17, 2021
a39253a
clean up the last bits
mariusandra Dec 17, 2021
cd463f4
move ONE_HOUR to constants
mariusandra Dec 17, 2021
5db0f2d
use sane_repr
mariusandra Dec 17, 2021
c5469cd
fix typo
mariusandra Dec 17, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ auth: 0012_alter_user_first_name_max_length
axes: 0006_remove_accesslog_trusted
contenttypes: 0002_remove_content_type_name
ee: 0005_project_based_permissioning
posthog: 0191_rename_specialmigration_asyncmigration
posthog: 0192_event_properties
rest_hooks: 0002_swappable_hook_model
sessions: 0001_initial
social_django: 0010_uid_db_index
2 changes: 1 addition & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"prepublishOnly": "yarn build",
"setup:dev:clickhouse": "cd .. && export DEBUG=1 PRIMARY_DB=clickhouse && python manage.py migrate_clickhouse",
"setup:test:ee": "yarn setup:test:postgres && yarn setup:test:clickhouse",
"setup:test:postgres": "cd .. && (dropdb test_posthog || echo 'no db to drop') && createdb test_posthog && DATABASE_URL=postgres://localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
"setup:test:postgres": "cd .. && (PGPASSWORD=posthog dropdb -h localhost -U posthog test_posthog || echo 'no db to drop') && PGPASSWORD=posthog createdb -h localhost -U posthog test_posthog && DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog DEBUG=1 python manage.py migrate",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just going to patch these as I find them... Probably should just get #7675 in as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update deployment docs then? Seems like we need to create this username with a specific password, also to detail what permissions are needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In #7730 I've proposed a slightly different approach so that people with env vars set get to keep them. And people without get sane defaults.

I haven't extended that to plugin-server since it's being changed here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However I solved this now by replacing

"setup:test:postgres": "cd .. && (PGPASSWORD=posthog dropdb -h localhost -U posthog test_posthog || echo 'no db to drop') && PGPASSWORD=posthog createdb -h localhost -U posthog test_posthog && DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog DEBUG=1 python manage.py migrate",

With

"setup:test:postgres": "cd .. && python manage.py setup_test_environment",

"setup:test:clickhouse": "cd .. && unset KAFKA_URL && export TEST=1 PRIMARY_DB=clickhouse CLICKHOUSE_DATABASE=posthog_test && python manage.py migrate_clickhouse",
"services:start": "cd .. && docker-compose -f ee/docker-compose.ch.yml up zookeeper kafka clickhouse",
"services:stop": "cd .. && docker-compose -f ee/docker-compose.ch.yml down",
Expand Down
9 changes: 6 additions & 3 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ export function getDefaultConfig(): PluginsServerConfig {
return {
CELERY_DEFAULT_QUEUE: 'celery',
DATABASE_URL: isTestEnv
? 'postgres://localhost:5432/test_posthog'
? 'postgres://posthog:posthog@localhost:5432/test_posthog'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as for settings.py

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For test/dev environments, this is good and aligns with our new "developing locally" guidelines.

: isDevEnv
? 'postgres://localhost:5432/posthog'
? 'postgres://posthog:posthog@localhost:5432/posthog'
: null,
POSTHOG_DB_NAME: null,
POSTHOG_DB_USER: 'postgres',
Expand Down Expand Up @@ -78,6 +78,7 @@ export function getDefaultConfig(): PluginsServerConfig {
SITE_URL: null,
NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS: '',
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: true,
EXPERIMENTAL_EVENT_PROPERTY_COUNTER_ENABLED_TEAMS: '',
}
}

Expand Down Expand Up @@ -135,7 +136,9 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
'(advanced) corresponds to the length of time a piscina worker should block for when looking for tasks',
NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS:
'(advanced) teams for which to run the new person properties update flow on',
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: 'enable experimental feature to track lastSeenAt',
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: '(advanced) enable experimental feature to track lastSeenAt',
EXPERIMENTAL_EVENT_PROPERTY_COUNTER_ENABLED_TEAMS:
'(advanced) teams for which to enable experimental feature to count event properties',
}
}

Expand Down
17 changes: 14 additions & 3 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export async function startPluginsServer(
let piscinaStatsJob: schedule.Job | undefined
let internalMetricsStatsJob: schedule.Job | undefined
let flushLastSeenAtCacheJob: schedule.Job | undefined
let flushEventPropertyCounterJob: schedule.Job | undefined
let pluginMetricsJob: schedule.Job | undefined
let piscina: Piscina | undefined
let queue: Queue | undefined // ingestion queue
Expand Down Expand Up @@ -81,6 +82,8 @@ export async function startPluginsServer(
statusReport.stopStatusReportSchedule()
piscinaStatsJob && schedule.cancelJob(piscinaStatsJob)
internalMetricsStatsJob && schedule.cancelJob(internalMetricsStatsJob)
flushLastSeenAtCacheJob && schedule.cancelJob(flushLastSeenAtCacheJob)
flushEventPropertyCounterJob && schedule.cancelJob(flushEventPropertyCounterJob)
await jobQueueConsumer?.stop()
await scheduleControl?.stopSchedule()
await new Promise<void>((resolve, reject) =>
Expand Down Expand Up @@ -198,10 +201,17 @@ export async function startPluginsServer(
})
}

// every minute flush lastSeenAt cache
// every 10 seconds past the minute flush lastSeenAt cache
if (serverConfig.EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED) {
flushLastSeenAtCacheJob = schedule.scheduleJob('0 * * * * *', async () => {
await hub!.teamManager.flushLastSeenAtCache()
flushLastSeenAtCacheJob = schedule.scheduleJob('10 * * * * *', async () => {
await piscina!.broadcastTask({ task: 'flushLastSeenAtCache' })
})
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be performed on the worker threads. The local teamManager doesn't get any events passing through it. At least it shouldn't.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know better here, don't have all the context to make the right call

}

// every 20 seconds past the minute flush the event property counter cache
if (serverConfig.EXPERIMENTAL_EVENT_PROPERTY_COUNTER_ENABLED_TEAMS) {
flushEventPropertyCounterJob = schedule.scheduleJob('20 * * * * *', async () => {
await piscina!.broadcastTask({ task: 'flushEventPropertyCounter' })
})
}

Expand Down Expand Up @@ -268,6 +278,7 @@ export async function stopPiscina(piscina: Piscina): Promise<void> {
await Promise.all([
piscina.broadcastTask({ task: 'flushKafkaMessages' }),
piscina.broadcastTask({ task: 'flushLastSeenAtCache' }),
piscina.broadcastTask({ task: 'flushEventPropertyCounter' }),
delay(2000),
])
await piscina.destroy()
Expand Down
15 changes: 15 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { PluginMetricsManager } from './utils/plugin-metrics'
import { UUID } from './utils/utils'
import { ActionManager } from './worker/ingestion/action-manager'
import { ActionMatcher } from './worker/ingestion/action-matcher'
import { EventPropertyCounter } from './worker/ingestion/event-property-counter'
import { HookCommander } from './worker/ingestion/hooks'
import { OrganizationManager } from './worker/ingestion/organization-manager'
import { EventsProcessor } from './worker/ingestion/process-event'
Expand Down Expand Up @@ -104,6 +105,7 @@ export interface PluginsServerConfig extends Record<string, any> {
SITE_URL: string | null
NEW_PERSON_PROPERTIES_UPDATE_ENABLED_TEAMS: string
EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED: boolean
EXPERIMENTAL_EVENT_PROPERTY_COUNTER_ENABLED_TEAMS: string
}

export interface Hub extends PluginsServerConfig {
Expand Down Expand Up @@ -131,6 +133,7 @@ export interface Hub extends PluginsServerConfig {
pluginConfigSecretLookup: Map<string, PluginConfigId>
// tools
teamManager: TeamManager
eventPropertyCounter: EventPropertyCounter
organizationManager: OrganizationManager
pluginsApiKeyManager: PluginsApiKeyManager
actionManager: ActionManager
Expand Down Expand Up @@ -761,6 +764,18 @@ export interface PropertyDefinitionType {
team_id: number
}

export interface EventPropertyType {
id: string
event: string
property: string
property_type: string | null
property_type_format: string | null
total_volume: number | null
created_at: string // DateTime
last_seen_at: string // DateTime
team_id: number
}

export type PluginFunction = 'onEvent' | 'onAction' | 'processEvent' | 'onSnapshot' | 'pluginTask'

export enum CeleryTriggeredJobOperation {
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
ElementGroup,
Event,
EventDefinitionType,
EventPropertyType,
Group,
GroupTypeIndex,
GroupTypeToColumnIndex,
Expand Down Expand Up @@ -1119,6 +1120,13 @@ export class DB {
).rows as PropertyDefinitionType[]
}

// EventProperty

public async fetchEventProperties(): Promise<EventPropertyType[]> {
return (await this.postgresQuery('SELECT * FROM posthog_eventproperty', undefined, 'fetchEventProperties'))
.rows as EventPropertyType[]
}

// Action & ActionStep & Action<>Event

public async fetchAllActionsGroupedByTeam(): Promise<Record<Team['id'], Record<Action['id'], Action>>> {
Expand Down
3 changes: 3 additions & 0 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { JobQueueManager } from '../../main/job-queues/job-queue-manager'
import { Hub, PluginsServerConfig } from '../../types'
import { ActionManager } from '../../worker/ingestion/action-manager'
import { ActionMatcher } from '../../worker/ingestion/action-matcher'
import { EventPropertyCounter } from '../../worker/ingestion/event-property-counter'
import { HookCommander } from '../../worker/ingestion/hooks'
import { OrganizationManager } from '../../worker/ingestion/organization-manager'
import { EventsProcessor } from '../../worker/ingestion/process-event'
Expand Down Expand Up @@ -165,6 +166,7 @@ export async function createHub(
serverConfig.SITE_URL,
serverConfig.EXPERIMENTAL_EVENTS_LAST_SEEN_ENABLED
)
const eventPropertyCounter = new EventPropertyCounter(db, statsd)
const organizationManager = new OrganizationManager(db)
const pluginsApiKeyManager = new PluginsApiKeyManager(db)
const actionManager = new ActionManager(db)
Expand All @@ -191,6 +193,7 @@ export async function createHub(
pluginSchedulePromises: { runEveryMinute: {}, runEveryHour: {}, runEveryDay: {} },

teamManager,
eventPropertyCounter,
organizationManager,
pluginsApiKeyManager,
actionManager,
Expand Down
3 changes: 2 additions & 1 deletion plugin-server/src/utils/db/postgres-logs-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ export class PostgresLogsWrapper {
this.flushTimeout = null
}
if (this.logs.length > 0) {
await this.db.batchInsertPostgresLogs(this.logs)
const logs = this.logs
this.logs = []
await this.db.batchInsertPostgresLogs(logs)
Comment on lines -39 to +41
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flyby fix to reduce a lot of noise (and errors?) in test logs when this flush gets triggered too quickly.

}
}
}
178 changes: 178 additions & 0 deletions plugin-server/src/worker/ingestion/event-property-counter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import { Properties } from '@posthog/plugin-scaffold'
import { StatsD } from 'hot-shots'
import { DateTime } from 'luxon'

import { TeamId } from '../../types'
import { DB } from '../../utils/db/db'
import { timeoutGuard } from '../../utils/db/utils'
import { status } from '../../utils/status'

enum EventPropertyType {
Number = 'NUMBER',
String = 'STRING',
Boolean = 'BOOLEAN',
DateTime = 'DATETIME',
}

interface EventPropertiesCounter {
createdAt: number
lastSeenAt: number
totalVolume: number
propertyType: EventPropertyType | null
propertyTypeFormat: string | null
}

interface EventNamePropertiesBuffer {
totalVolume: number
uniqueVolume: number
buffer: Map<TeamId, Map<string, EventPropertiesCounter>>
}

export class EventPropertyCounter {
db: DB
eventPropertiesBuffer: EventNamePropertiesBuffer
lastFlushAt: DateTime
statsd?: StatsD

constructor(db: DB, statsd?: StatsD) {
this.db = db
this.statsd = statsd
this.eventPropertiesBuffer = { totalVolume: 0, uniqueVolume: 0, buffer: new Map() }
this.lastFlushAt = DateTime.now()
}

public async updateEventPropertyCounter(teamId: number, event: string, properties: Properties): Promise<void> {
this.updateEventPropertiesBuffer(teamId, event, properties)
// Flush every 2 minutes or 50k unique properties, whichever comes first.
// Additionally, a flush is broadcast every 1 minute from pluginServer.
if (
this.eventPropertiesBuffer.uniqueVolume > 50000 ||
DateTime.now().diff(this.lastFlushAt).as('seconds') > 120
) {
await this.flush()
}
}

/** Save information about event properties into a custom buffer */
public updateEventPropertiesBuffer(teamId: number, event: string, properties: Record<string, any>): void {
const timestamp = DateTime.now().toSeconds()
let bufferForTeam = this.eventPropertiesBuffer.buffer.get(teamId)
if (!bufferForTeam) {
bufferForTeam = new Map()
this.eventPropertiesBuffer.buffer.set(teamId, bufferForTeam)
}
for (const [property, value] of Object.entries(properties)) {
const key = JSON.stringify([event, property])
let propertyBuffer = bufferForTeam.get(key)
if (!propertyBuffer) {
propertyBuffer = {
createdAt: timestamp,
lastSeenAt: timestamp,
totalVolume: 1,
propertyType: null,
propertyTypeFormat: null,
}
bufferForTeam.set(key, propertyBuffer)
this.eventPropertiesBuffer.uniqueVolume += 1
} else {
propertyBuffer.createdAt = Math.min(timestamp, propertyBuffer.createdAt)
propertyBuffer.lastSeenAt = Math.max(timestamp, propertyBuffer.lastSeenAt)
propertyBuffer.totalVolume += 1
}
this.eventPropertiesBuffer.totalVolume += 1

propertyBuffer.propertyType =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when we have outlier property values? e.g. I have a bug that every once in a while I send an invalid value (e.g. for a timestamp)? Can we base this based on the format of the majority instead of the latest?

typeof value === 'number'
? EventPropertyType.Number
: typeof value === 'boolean'
? EventPropertyType.Boolean
: typeof value === 'string'
? EventPropertyType.String
: null
propertyBuffer.propertyTypeFormat = null

if (propertyBuffer.propertyType === EventPropertyType.String) {
const dateFormat = detectDateFormat(value)
if (dateFormat) {
propertyBuffer.propertyType = EventPropertyType.DateTime
propertyBuffer.propertyTypeFormat = dateFormat
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We won't be doing much with this data right now since the last plan was to base this off of the property definitions table (so set once for all events). Should we thus keep these fields around? I think it might be nice to have, and we will eventually get someone who wants more granularity. But... 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shouldn't add them if we're not using them straight away (or we should have clear sign-posting for someone who finds them about what they are there for)

}
}

public async flush(): Promise<void> {
if (this.eventPropertiesBuffer.totalVolume === 0) {
return
}
const timeout = timeoutGuard(
`Still flushing the event names and properties buffer. Timeout warning after 30 sec!`
)
try {
const startTime = DateTime.now()
const lastFlushedSecondsAgo = DateTime.now().diff(this.lastFlushAt).as('seconds')
const cacheSize = this.eventPropertiesBuffer.uniqueVolume
status.info(
`🚽 Starting flushEventPropertyCounter. Cache size: ${cacheSize} items. Last flushed: ${lastFlushedSecondsAgo} seconds ago.`
)

const oldBuffer = this.eventPropertiesBuffer
this.eventPropertiesBuffer = { totalVolume: 0, uniqueVolume: 0, buffer: new Map() }
this.lastFlushAt = DateTime.now()

const columns: any[][] = [[], [], [], [], [], [], [], []]

for (const [teamId, teamBuffer] of oldBuffer.buffer.entries()) {
for (const [key, propertyBuffer] of teamBuffer.entries()) {
const [event, property] = JSON.parse(key)
const { propertyType, propertyTypeFormat, totalVolume, lastSeenAt, createdAt } = propertyBuffer
columns[0].push(teamId)
columns[1].push(event)
columns[2].push(property)
columns[3].push(propertyType)
columns[4].push(propertyTypeFormat)
columns[5].push(totalVolume)
columns[6].push(DateTime.fromSeconds(createdAt).toUTC())
columns[7].push(DateTime.fromSeconds(lastSeenAt).toUTC())
}
}

// VALUES ${queryValues.join(',')}
await this.db.postgresQuery(
`INSERT INTO posthog_eventproperty(team_id, event, property, property_type, property_type_format, total_volume, created_at, last_seen_at)
SELECT * FROM UNNEST ($1::int[], $2::text[], $3::text[], $4::text[], $5::text[], $6::bigint[], $7::timestamp with time zone[], $8::timestamp with time zone[])
ON CONFLICT ON CONSTRAINT posthog_eventproperty_team_id_event_property_10910b3b_uniq DO UPDATE SET
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth giving the index a name in the migration instead?

total_volume = posthog_eventproperty.total_volume + excluded.total_volume,
created_at = LEAST(posthog_eventproperty.created_at, excluded.created_at),
last_seen_at = GREATEST(posthog_eventproperty.last_seen_at, excluded.last_seen_at),
property_type = CASE WHEN posthog_eventproperty.property_type IS NULL THEN excluded.property_type ELSE posthog_eventproperty.property_type END,
property_type_format = CASE WHEN posthog_eventproperty.property_type_format IS NULL THEN excluded.property_type_format ELSE posthog_eventproperty.property_type_format END
`,
columns,
'eventPropertyCounterFlush'
)

const elapsedTime = DateTime.now().diff(startTime).as('milliseconds')
this.statsd?.set('flushEventPropertyCounter.Size', cacheSize)
this.statsd?.set('flushEventPropertyCounter.QuerySize', columns[0].length)
this.statsd?.timing('flushEventPropertyCounter', elapsedTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: postgresQuery automatically has this, so this is kind of redundant.

status.info(`✅ 🚽 flushEventPropertyCounter finished successfully in ${elapsedTime} ms.`)
} finally {
clearTimeout(timeout)
}
}
}

export function detectDateFormat(value: string): string | void {
if (value.match(/^\d{4}-\d{2}-\d{2}$/)) {
return 'YYYY-MM-DD'
}

if (value.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(.\d{1,3}?Z?|)$/)) {
return 'ISO8601 UTC'
}

if (value.match(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(.\d{1,3}|)\+\d{2}:\d{2}$/)) {
return 'ISO8601 TZ'
}
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What more date formats do we need to know about? Regular unix timestamps perhaps?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think timestamps are pretty common too. I added a comment elsewhere, do wonder if this is required or CH is smart enough to parse them correctly when used.

Copy link
Member

@pauldambra pauldambra Dec 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClickHouse `parseDateTimeBestEffort is pretty clever #6619 (comment)

The type and format (in the short to medium term) will be most useful in the UI. E.g. if it's a timestamp and not simply a number we can display it as a date, and tell the API it is a date. See #7608

Loading