diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index cf764b679a0cf..4ae1de829d558 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -85,6 +85,8 @@ export function getDefaultConfig(): PluginsServerConfig { KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: 1, CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '', CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON, + PERSON_INFO_TO_REDIS_TEAMS: '', + PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min } } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 3fde9f5cdde4d..a633375251247 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -113,6 +113,8 @@ export interface PluginsServerConfig extends Record { CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS: boolean CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: string CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string + PERSON_INFO_TO_REDIS_TEAMS: string + PERSON_INFO_CACHE_TTL: number } export interface Hub extends PluginsServerConfig { diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index 555e0f94aea4e..1f780e41712b0 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -150,12 +150,20 @@ export class DB { /** Whether to write to clickhouse_person_unique_id topic */ writeToPersonUniqueId?: boolean + /** How many seconds to keep person info in Redis cache */ + PERSON_INFO_CACHE_TTL: number + + /** Which teams is person info caching enabled on */ + personInfoCachingEnabledTeams: Set + constructor( postgres: Pool, redisPool: GenericPool, kafkaProducer: KafkaProducerWrapper | undefined, clickhouse: ClickHouse | undefined, - statsd: StatsD | undefined + statsd: StatsD | undefined, + personInfoCacheTtl = 1, + personInfoCachingEnabledTeams: Set = new Set() ) { this.postgres = postgres this.redisPool = redisPool @@ -163,6 +171,8 @@ export class DB { this.clickhouse = clickhouse this.statsd = statsd this.postgresLogsWrapper = new PostgresLogsWrapper(this) + this.PERSON_INFO_CACHE_TTL = personInfoCacheTtl + this.personInfoCachingEnabledTeams = personInfoCachingEnabledTeams } // Postgres @@ -413,6 +423,47 @@ export class DB { } // Person + REDIS_PERSON_ID_PREFIX = 'person_id' + REDIS_PERSON_CREATED_AT_PREFIX = 'person_created_at' + REDIS_PERSON_PROPERTIES_PREFIX = 'person_props' + + private getPersonIdCacheKey(teamId: number, distinctId: string): string { + return `${this.REDIS_PERSON_ID_PREFIX}:${teamId}:${distinctId}` + } + + private getPersonCreatedAtCacheKey(teamId: number, personId: number): string { + return `${this.REDIS_PERSON_CREATED_AT_PREFIX}:${teamId}:${personId}` + } + + private getPersonPropertiesCacheKey(teamId: number, personId: number): string { + return `${this.REDIS_PERSON_PROPERTIES_PREFIX}:${teamId}:${personId}` + } + + private async updatePersonIdCache(teamId: number, distinctId: string, personId: number): Promise { + if (this.personInfoCachingEnabledTeams.has(teamId)) { + await this.redisSet(this.getPersonIdCacheKey(teamId, distinctId), personId, this.PERSON_INFO_CACHE_TTL) + } + } + + private async updatePersonCreatedAtCache(teamId: number, personId: number, createdAt: DateTime): Promise { + if (this.personInfoCachingEnabledTeams.has(teamId)) { + await this.redisSet( + this.getPersonCreatedAtCacheKey(teamId, personId), + createdAt, + this.PERSON_INFO_CACHE_TTL + ) + } + } + + private async updatePersonPropertiesCache(teamId: number, personId: number, properties: Properties): Promise { + if (this.personInfoCachingEnabledTeams.has(teamId)) { + await this.redisSet( + this.getPersonPropertiesCacheKey(teamId, personId), + JSON.stringify(properties), + this.PERSON_INFO_CACHE_TTL + ) + } + } public async fetchPersons(database?: Database.Postgres): Promise public async fetchPersons(database: Database.ClickHouse): Promise @@ -546,6 +597,16 @@ export class DB { await this.kafkaProducer.queueMessages(kafkaMessages) } + // Update person info cache + await Promise.all( + (distinctIds || []) + .map((distinctId) => this.updatePersonIdCache(teamId, distinctId, person.id)) + .concat([ + this.updatePersonPropertiesCache(teamId, person.id, properties), + this.updatePersonCreatedAtCache(teamId, person.id, person.created_at), + ]) + ) + return person } @@ -601,6 +662,8 @@ export class DB { } } + await this.updatePersonPropertiesCache(updatedPerson.team_id, updatedPerson.id, updatedPerson.properties) + return client ? kafkaMessages : updatedPerson } @@ -672,6 +735,7 @@ export class DB { if (this.kafkaProducer && kafkaMessages.length) { await this.kafkaProducer.queueMessages(kafkaMessages) } + await this.updatePersonIdCache(person.team_id, distinctId, person.id) } public async addDistinctIdPooled( @@ -802,6 +866,11 @@ export class DB { ], }) } + await this.updatePersonIdCache( + usefulColumns.team_id, + usefulColumns.distinct_id, + usefulColumns.person_id + ) } } return kafkaMessages diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index 9dfec1a277123..b55e4f6435949 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -186,7 +186,15 @@ export async function createHub( ) status.info('👍', `Redis`) - const db = new DB(postgres, redisPool, kafkaProducer, clickhouse, statsd) + const db = new DB( + postgres, + redisPool, + kafkaProducer, + clickhouse, + statsd, + serverConfig.PERSON_INFO_CACHE_TTL, + new Set(serverConfig.PERSON_INFO_TO_REDIS_TEAMS.split(',').filter(String).map(Number)) + ) const teamManager = new TeamManager(db, serverConfig, statsd) const organizationManager = new OrganizationManager(db) const pluginsApiKeyManager = new PluginsApiKeyManager(db)