Skip to content

Commit

Permalink
feat: Write person ID and properties to redis (#9377)
Browse files Browse the repository at this point in the history
* feat: Write person ID and properties to redis

* comments

* test

* fix ts usage

* move json stringify to caching function

* Pull cache key creation out

* review comments
  • Loading branch information
tiina303 authored Apr 13, 2022
1 parent d8adc89 commit 38bf3ef
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 2 deletions.
2 changes: 2 additions & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ export function getDefaultConfig(): PluginsServerConfig {
KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY: 1,
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: '',
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: KAFKA_EVENTS_JSON,
PERSON_INFO_TO_REDIS_TEAMS: '',
PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min
}
}

Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ export interface PluginsServerConfig extends Record<string, any> {
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS: boolean
CLICKHOUSE_DISABLE_EXTERNAL_SCHEMAS_TEAMS: string
CLICKHOUSE_JSON_EVENTS_KAFKA_TOPIC: string
PERSON_INFO_TO_REDIS_TEAMS: string
PERSON_INFO_CACHE_TTL: number
}

export interface Hub extends PluginsServerConfig {
Expand Down
71 changes: 70 additions & 1 deletion plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,19 +150,29 @@ export class DB {
/** Whether to write to clickhouse_person_unique_id topic */
writeToPersonUniqueId?: boolean

/** How many seconds to keep person info in Redis cache */
PERSON_INFO_CACHE_TTL: number

/** Which teams is person info caching enabled on */
personInfoCachingEnabledTeams: Set<number>

constructor(
postgres: Pool,
redisPool: GenericPool<Redis.Redis>,
kafkaProducer: KafkaProducerWrapper | undefined,
clickhouse: ClickHouse | undefined,
statsd: StatsD | undefined
statsd: StatsD | undefined,
personInfoCacheTtl = 1,
personInfoCachingEnabledTeams: Set<number> = new Set<number>()
) {
this.postgres = postgres
this.redisPool = redisPool
this.kafkaProducer = kafkaProducer
this.clickhouse = clickhouse
this.statsd = statsd
this.postgresLogsWrapper = new PostgresLogsWrapper(this)
this.PERSON_INFO_CACHE_TTL = personInfoCacheTtl
this.personInfoCachingEnabledTeams = personInfoCachingEnabledTeams
}

// Postgres
Expand Down Expand Up @@ -413,6 +423,47 @@ export class DB {
}

// Person
REDIS_PERSON_ID_PREFIX = 'person_id'
REDIS_PERSON_CREATED_AT_PREFIX = 'person_created_at'
REDIS_PERSON_PROPERTIES_PREFIX = 'person_props'

private getPersonIdCacheKey(teamId: number, distinctId: string): string {
return `${this.REDIS_PERSON_ID_PREFIX}:${teamId}:${distinctId}`
}

private getPersonCreatedAtCacheKey(teamId: number, personId: number): string {
return `${this.REDIS_PERSON_CREATED_AT_PREFIX}:${teamId}:${personId}`
}

private getPersonPropertiesCacheKey(teamId: number, personId: number): string {
return `${this.REDIS_PERSON_PROPERTIES_PREFIX}:${teamId}:${personId}`
}

private async updatePersonIdCache(teamId: number, distinctId: string, personId: number): Promise<void> {
if (this.personInfoCachingEnabledTeams.has(teamId)) {
await this.redisSet(this.getPersonIdCacheKey(teamId, distinctId), personId, this.PERSON_INFO_CACHE_TTL)
}
}

private async updatePersonCreatedAtCache(teamId: number, personId: number, createdAt: DateTime): Promise<void> {
if (this.personInfoCachingEnabledTeams.has(teamId)) {
await this.redisSet(
this.getPersonCreatedAtCacheKey(teamId, personId),
createdAt,
this.PERSON_INFO_CACHE_TTL
)
}
}

private async updatePersonPropertiesCache(teamId: number, personId: number, properties: Properties): Promise<void> {
if (this.personInfoCachingEnabledTeams.has(teamId)) {
await this.redisSet(
this.getPersonPropertiesCacheKey(teamId, personId),
JSON.stringify(properties),
this.PERSON_INFO_CACHE_TTL
)
}
}

public async fetchPersons(database?: Database.Postgres): Promise<Person[]>
public async fetchPersons(database: Database.ClickHouse): Promise<ClickHousePerson[]>
Expand Down Expand Up @@ -546,6 +597,16 @@ export class DB {
await this.kafkaProducer.queueMessages(kafkaMessages)
}

// Update person info cache
await Promise.all(
(distinctIds || [])
.map((distinctId) => this.updatePersonIdCache(teamId, distinctId, person.id))
.concat([
this.updatePersonPropertiesCache(teamId, person.id, properties),
this.updatePersonCreatedAtCache(teamId, person.id, person.created_at),
])
)

return person
}

Expand Down Expand Up @@ -601,6 +662,8 @@ export class DB {
}
}

await this.updatePersonPropertiesCache(updatedPerson.team_id, updatedPerson.id, updatedPerson.properties)

return client ? kafkaMessages : updatedPerson
}

Expand Down Expand Up @@ -672,6 +735,7 @@ export class DB {
if (this.kafkaProducer && kafkaMessages.length) {
await this.kafkaProducer.queueMessages(kafkaMessages)
}
await this.updatePersonIdCache(person.team_id, distinctId, person.id)
}

public async addDistinctIdPooled(
Expand Down Expand Up @@ -802,6 +866,11 @@ export class DB {
],
})
}
await this.updatePersonIdCache(
usefulColumns.team_id,
usefulColumns.distinct_id,
usefulColumns.person_id
)
}
}
return kafkaMessages
Expand Down
10 changes: 9 additions & 1 deletion plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,15 @@ export async function createHub(
)
status.info('👍', `Redis`)

const db = new DB(postgres, redisPool, kafkaProducer, clickhouse, statsd)
const db = new DB(
postgres,
redisPool,
kafkaProducer,
clickhouse,
statsd,
serverConfig.PERSON_INFO_CACHE_TTL,
new Set(serverConfig.PERSON_INFO_TO_REDIS_TEAMS.split(',').filter(String).map(Number))
)
const teamManager = new TeamManager(db, serverConfig, statsd)
const organizationManager = new OrganizationManager(db)
const pluginsApiKeyManager = new PluginsApiKeyManager(db)
Expand Down

0 comments on commit 38bf3ef

Please sign in to comment.