-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: scaffold session-recording-v2 #27498
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pretty nice PR. found nothing blocking & added a couple of opinions :)
import { KafkaMetrics } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/metrics' | ||
import { KafkaParser } from '../../../../../src/main/ingestion-queues/session-recording-v2/kafka/parser' | ||
|
||
const do_gzip = promisify(gzip) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: idk if there is a convention around already but usually is it camelCase for function names.
e.g.
const do_gzip = promisify(gzip) | |
const compressWithGzip = promisify(gzip) |
public async handleEachBatch(messages: Message[], context: { heartbeat: () => void }): Promise<void> { | ||
context.heartbeat() | ||
|
||
if (messages.length !== 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (messages.length !== 0) { | |
if (messages.length > 0) { |
sendTimeoutGuardToSentry: false, | ||
func: async () => { | ||
// Increment message received counter for each message | ||
for (const message of messages) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could be a bit more readable with (pure preference)
messages.forEach((message) => this.metrics.incrementMessageReceived(message.partition));
|
||
this.metrics.observeKafkaBatchSize(messages.length) | ||
this.metrics.observeKafkaBatchSizeKb( | ||
messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could pull this out as
const batchSizeKb = messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024;
const batchSize = messages.length;
and do (also preference)
this.metrics.observeKafkaBatchSize(batchSize);
this.metrics.observeKafkaBatchSizeKb(batchSizeKb);
}) | ||
} | ||
|
||
await runInstrumentedFunction({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(preference): to make things more readable and instead of inlining everything you could pull things out a bit more e.g.
async function processBatchMessages(messages) {
const batchSizeKb = messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024;
const batchSize = messages.length;
// Increment message received counter for each message
messages.forEach((message) => this.metrics.incrementMessageReceived(message.partition));
// Record Kafka batch size and size in KB
this.metrics.observeKafkaBatchSize(batchSize);
this.metrics.observeKafkaBatchSizeKb(batchSizeKb);
// Parse messages
const parsedMessages = await runInstrumentedFunction({
statsKey: 'recordingingesterv2.handleEachBatch.parseKafkaMessages',
func: async () => this.messageProcessor.parseBatch(messages),
});
context.heartbeat();
// Consume messages in parallel or sequentially
if (this.config.SESSION_RECORDING_PARALLEL_CONSUMPTION) {
await consumeMessagesInParallel(parsedMessages);
} else {
await consumeMessagesSequentially(parsedMessages);
}
}
async function consumeMessagesInParallel(parsedMessages) {
await Promise.all(parsedMessages.map((m) => this.consume(m)));
}
async function consumeMessagesSequentially(parsedMessages) {
for (const message of parsedMessages) {
await this.consume(message);
}
}
so in the end we would only have
await runInstrumentedFunction({
statsKey: 'recordingingesterv2.handleEachBatch',
sendTimeoutGuardToSentry: false,
func: async () => {
await processBatchMessages(messages);
},
});
but then again preference as i like things broken out rather than in one function. no need to change your implementation if you do not see that similar :)
import { ParsedMessageData } from './types' | ||
|
||
const GZIP_HEADER = Buffer.from([0x1f, 0x8b, 0x08, 0x00]) | ||
const do_unzip = promisify(unzip) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same with camelCasing it
const do_unzip = promisify(unzip) | |
const decompressWithGzip = promisify(unzip) |
} | ||
} | ||
|
||
private isGzipped(buffer: Buffer): boolean { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we really want to squeeze for performance we could technically get rid of the loop here i think with
private isGzipped(buffer: Buffer): boolean { | |
private isGzipped(buffer: Buffer): boolean { | |
return buffer.length >= GZIP_HEADER.length && buffer.slice(0, GZIP_HEADER.length).equals(GZIP_HEADER); | |
} |
plugin-server/src/main/ingestion-queues/session-recording-v2/teams/team-filter.ts
Show resolved
Hide resolved
private readonly metrics: VersionMetrics | ||
) {} | ||
|
||
public async parseBatch(messages: TInput[]): Promise<MessageWithTeam[]> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as before if ordering does not matter we could do something like
public async parseBatch(messages: TInput[]): Promise<MessageWithTeam[]> {
// Parse the messages batch using the source processor
const processedMessages = await this.sourceProcessor.parseBatch(messages)
// Use Promise.all for parallel processing of the checkLibVersion function
await Promise.all(processedMessages.map((message) => this.checkLibVersion(message)))
return processedMessages
}
to enable concurrency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I don't think we care about the ordering of the warnings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. In theory I might structure things differently but its hard to say till we get to real code so happy to approve 👍
Problem
We are working on a major refactoring of the session recording pipeline to improve its reliability and performance. Scaffolding a new session recording service is the first step in this process.
Changes
Creates a new session-recording-v2 module with a blackhole ingestion service based on the current session-recording implementation.
Does this work well for both Cloud and self-hosted?
Works for both cloud and self-hosted.
How did you test this code?