Update activities in postgres periodically #2668
Conversation
Walkthrough
This pull request introduces a new script for synchronizing activities, along with supporting changes to the background jobs, data access layer, and logging utilities.
Actionable comments posted: 14
🧹 Outside diff range and nitpick comments (11)
backend/src/bin/scripts/syncActivities.ts (1)
7-7: Consider removing unnecessary setImmediate.
The setImmediate wrapper isn't necessary here since we're already using an async function. The script can be simplified by using an async IIFE (Immediately Invoked Function Expression) directly.
-setImmediate(async () => {
+(async () => {
services/libs/logging/src/utility.ts (2)
Line range hint 19-41: Consider refactoring to reduce code duplication and enhance error logging.
While the error handling addition is valuable, there's room for improvement in the implementation.
Consider these improvements:
 export const logExecutionTimeV2 = async <T>(
   process: () => Promise<T>,
   log: Logger,
   name: string,
 ): Promise<T> => {
-  const start = performance.now()
-
-  const end = () => {
-    const end = performance.now()
-    const duration = end - start
-    const durationInSeconds = duration / 1000
-    return durationInSeconds.toFixed(2)
-  }
+  const timerInstance = timer(log, name)
   try {
     const result = await process()
-    log.info(`Process ${name} took ${end()} seconds!`)
+    timerInstance.end()
     return result
   } catch (e) {
-    log.info(`Process ${name} failed after ${end()} seconds!`)
+    log.error(`Process ${name} failed:`, e)
+    timerInstance.end(`${name} (failed)`)
     throw e
   }
 }
This refactoring:
- Reuses the existing timer function to avoid timing logic duplication
- Adds error details to the log for better debugging
- Distinguishes failed process timing logs with a "(failed)" suffix
Line range hint 49-59: LGTM! Good improvements to prevent double-ending and support name overrides.
The implementation looks solid with:
- Protection against multiple end() calls
- Clean name override handling using nullish coalescing
Consider adding a debug log when skipping due to isEnded:
 end: function (overrideName?: string) {
   if (isEnded) {
+    log.debug(`Timer ${overrideName ?? name} already ended, skipping.`)
     return
   }
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1)
131-132: Document the timestamp fields.
Consider adding JSDoc comments to explain:
- The purpose of these timestamp fields
- Whether they are automatically managed or need manual updates
- Their relationship with the new periodic update functionality
Example documentation:
 export interface IDbActivityUpdateData {
   // ... existing fields ...
+  /** Timestamp of the last update to this activity record. Used for periodic synchronization. */
   updatedAt?: string
+  /** Timestamp when this activity was first created. */
   createdAt?: string
 }
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1)
191-210: Consider a safer pattern for bulk operations.
The addition of "raw" methods that bypass safety checks suggests a need for performance optimization, possibly for bulk operations. However, this approach introduces security and data integrity risks.
Consider these alternatives:
- Add a proper bulk update/insert method with all safety checks
- Use database transactions for better performance while maintaining safety
- If the goal is to avoid parent ID updates, rename methods to clearly indicate this (e.g., updateWithoutParentSync)
- Document performance requirements and constraints to help identify better solutions
Example bulk operation pattern:
 public async bulkUpdate(
   tenantId: string,
   segmentId: string,
   updates: Array<{ id: string, data: IDbActivityUpdateData }>
 ): Promise<void> {
   return this.db().tx(async (t) => {
     const timestamp = new Date()
     const queries = updates.map(({ id, data }) => {
       const prepared = RepositoryBase.prepare(
         { ...data, updatedAt: timestamp },
         this.updateActivityColumnSet,
       )
       const query = this.dbInstance.helpers.update(prepared, this.updateActivityColumnSet)
       const condition = this.format(
         'where id = $(id) and "tenantId" = $(tenantId) and "segmentId" = $(segmentId)',
         { id, tenantId, segmentId },
       )
       return t.result(`${query} ${condition}`)
     })
     const results = await t.batch(queries)
     results.forEach((r) => this.checkUpdateRowCount(r.rowCount, 1))
   })
 }
services/libs/data-access-layer/src/conversations/sql.ts (1)
394-397: Consider adding debug logging for skipped empty orderBy parts.
The addition of the empty orderBy part check is a good defensive programming practice. However, consider adding debug logging when skipping empty parts to help with troubleshooting.
 if (orderByPart.trim().length === 0) {
+  console.debug(`Skipping empty orderBy part in queryConversations`)
   continue
 }
services/libs/data-access-layer/src/activities/update.ts (3)
Line range hint 13-33: Adjust the timer to accurately measure the streaming duration.
The timer t is ended inside the loop after processing the first item, which only measures the time until the first activity is processed. To capture the total duration of the streaming process, consider moving t.end() after the stream has fully processed all activities.
Apply this diff to adjust the timer:
 const t = timer(logger, `query activities with ${whereClause}`)
 const res = await qdb.stream(qs, async (stream) => {
   for await (const item of stream) {
     const activity = item as unknown as IDbActivityCreateData
     await onActivity(activity)
   }
 })
+t.end()
Line range hint 13-33: Add error handling during activity processing in the stream.
If an error occurs within onActivity(activity), it could terminate the stream and leave it in an inconsistent state. Wrap the activity processing logic in a try-catch block to handle exceptions gracefully and ensure the stream continues or fails safely.
Apply this diff to include error handling:
 for await (const item of stream) {
   const activity = item as unknown as IDbActivityCreateData
+  try {
     await onActivity(activity)
+  } catch (error) {
+    logger.error({ error }, 'Error processing activity in stream')
+    // Decide whether to continue processing or rethrow the error based on your error handling strategy
+  }
 }
29-29: Control concurrency to prevent resource exhaustion.
If onActivity involves heavy computation or I/O-bound operations, processing a large stream without limiting concurrency may overwhelm system resources. Consider implementing a concurrency limit using a library like p-limit to manage the number of simultaneous onActivity executions.
Example using p-limit:
 import QueryStream from 'pg-query-stream'
+import pLimit from 'p-limit'
 const limit = pLimit(10) // Limit to 10 concurrent operations
 const res = await qdb.stream(qs, async (stream) => {
   for await (const item of stream) {
     const activity = item as unknown as IDbActivityCreateData
+    limit(() => onActivity(activity)).catch((error) => {
+      logger.error({ error }, 'Error processing activity in stream')
+      // Handle error accordingly
+    })
   }
 })
backend/src/bin/jobs/syncActivities.ts (2)
94-94: Avoid variable shadowing for the timer variable.
The variable t is declared both outside the loop (line 94) and inside the loop (line 118). This could lead to confusion or errors. Consider renaming the inner t variable to prevent shadowing.
Apply this change (the concrete diff is elided in the original; a hypothetical rename is sketched below):
...
Also applies to: 118-118
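A minimal sketch of the kind of rename being suggested, assuming the outer timer covers the whole sync and the inner one covers a single batch; the helper below is a local stand-in, not the repo's actual logging utility:

```typescript
// Hypothetical illustration: give the outer and inner timers distinct names
// instead of reusing `t`. `startTimer` is a stand-in, not the real utility.
const startTimer = (name: string) => {
  const start = performance.now()
  return {
    end: () => console.info(`${name} took ${((performance.now() - start) / 1000).toFixed(2)}s`),
  }
}

async function syncAllBatches(batches: string[][]): Promise<void> {
  const totalTimer = startTimer('sync activities (total)') // outer timer, previously `t`

  for (const batch of batches) {
    const batchTimer = startTimer(`sync batch of ${batch.length}`) // inner timer, no longer shadows
    await Promise.resolve(batch) // placeholder for the real batch processing
    batchTimer.end()
  }

  totalTimer.end()
}

void syncAllBatches([['a'], ['b', 'c']])
```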
96-97: Refactor loop to avoid disabling ESLint rules.
The comment // eslint-disable-next-line no-constant-condition suppresses an ESLint warning for the while (true) loop. Refactoring the loop enhances code quality and adheres to ESLint guidelines.
Apply this change to use a condition in the loop (elided in the original; a hypothetical version is sketched below):
...
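One possible shape for that refactor, assuming the loop should simply stop once a batch comes back empty; fetchBatch is a hypothetical stand-in for the real data source:

```typescript
// Sketch: drive the loop with an explicit condition instead of `while (true)`,
// so the eslint-disable comment is no longer needed. `fetchBatch` is hypothetical.
async function fetchBatch(offset: number): Promise<number[]> {
  const data = [1, 2, 3, 4, 5]
  return data.slice(offset, offset + 2)
}

async function processAll(): Promise<number> {
  let processed = 0
  let hasMore = true

  while (hasMore) {
    const batch = await fetchBatch(processed)
    if (batch.length === 0) {
      hasMore = false
      continue
    }
    processed += batch.length
  }

  return processed
}

processAll().then((n) => console.info(`processed ${n} rows`))
```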
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (13)
- backend/package.json (1 hunks)
- backend/src/bin/jobs/index.ts (2 hunks)
- backend/src/bin/jobs/syncActivities.ts (1 hunks)
- backend/src/bin/scripts/syncActivities.ts (1 hunks)
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
- services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
- services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
- services/libs/data-access-layer/src/activities/update.ts (2 hunks)
- services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
- services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
✅ Files skipped from review due to trivial changes (1)
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
🔇 Additional comments (13)
backend/src/bin/scripts/syncActivities.ts (2)
1-5
: LGTM! Clean imports and proper logger setup.
The imports are well-organized and the logger setup follows best practices by using a child logger with appropriate context.
3-3
: Verify syncActivities function signature and batch processing.
Let's verify that the syncActivities function signature matches our usage and supports batch processing as mentioned in the summary.
Also applies to: 15-15
✅ Verification successful
Function signature and batch processing implementation verified
The verification confirms:
- The syncActivities function accepts the correct parameters: logger and an optional maxUpdatedAt
- Batch processing is properly implemented through the syncActivitiesBatch function
- The function includes progress tracking and logging functionality (a rough sketch of this shape follows below)
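For orientation, a simplified sketch of the signature this verification describes; types are reduced to the minimum and the real implementation lives in backend/src/bin/jobs/syncActivities.ts:

```typescript
// Simplified sketch of the verified shape, not the actual implementation.
type Logger = { info: (msg: string) => void }

export async function syncActivities(logger: Logger, maxUpdatedAt?: string): Promise<void> {
  logger.info(`Syncing activities updated after ${maxUpdatedAt ?? 'the beginning of time'}`)
  // ...the real function then works through batches (syncActivitiesBatch) and logs progress
}
```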
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify syncActivities function implementation
# Expected: Function should accept logger and updatedAt parameters and implement batch processing
# Check function signature and implementation
ast-grep --pattern 'export function syncActivities($_logger, $_updatedAt) {
$$$
}'
# Look for batch processing implementation
rg -A 10 'syncActivities' backend/src/bin/jobs/syncActivities.ts
Length of output: 1280
backend/src/bin/jobs/index.ts (2)
11-11
: LGTM!
The import statement follows the established pattern and naming conventions.
22-22
: Verify job execution order requirements.
The addition of syncActivitiesJob to the jobs array looks good. However, let's verify if the job execution order matters for the application's functionality.
✅ Verification successful
Let me gather more information about the job implementation and its dependencies.
Let me check the implementation of syncActivitiesJob to understand its dependencies and requirements.
Let me check the implementation of the syncActivities job file directly.
No job ordering dependencies found for syncActivitiesJob
The syncActivitiesJob is an independent daily job that synchronizes activities by reading from QuestDB and writing to PostgreSQL. The implementation shows:
- No dependencies on other jobs in the array
- Self-contained functionality that doesn't rely on the execution of other jobs
- Uses its own timestamp tracking for incremental updates
- Has no comments or code indicating required ordering
The job can safely run in any position within the jobs array.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any job ordering dependencies or documentation
# Test 1: Look for job ordering documentation in README files
echo "Checking for job ordering documentation..."
fd -t f "README" | xargs rg -i "job.*order|execution.*order"
# Test 2: Check if syncActivities job has any dependencies on other jobs
echo "Checking for job dependencies..."
rg -A 5 "syncActivities.*depend"
rg -A 5 "depends.*syncActivities"
# Test 3: Check if the job is referenced in any configuration files
echo "Checking for job configuration..."
fd -e json -e yaml -e yml | xargs rg "syncActivities"
Length of output: 565
Script:
#!/bin/bash
# Check the implementation of syncActivitiesJob and other jobs in the array
# Look for the job implementation
echo "Checking syncActivitiesJob implementation..."
rg -A 10 "syncActivitiesJob"
# Check the jobs array and its context
echo "Checking jobs array context..."
rg -A 10 -B 10 "const jobs = \["
# Look for any job scheduling or sequencing logic
echo "Checking for job scheduling logic..."
rg -A 5 "schedule|sequence|order.*job"
Length of output: 31860
Script:
#!/bin/bash
# Look for syncActivities job implementation and its dependencies
# Check the implementation file
echo "Checking syncActivities job implementation..."
rg -A 10 "export default.*syncActivities" backend/src/bin/jobs/
# Look for any dependencies or imports in the job file
echo "Checking job dependencies..."
rg -B 5 "import.*syncActivities" backend/src/bin/jobs/
# Check if there are any comments about ordering
echo "Checking for ordering comments..."
rg -B 5 -A 5 "syncActivities.*job" backend/src/bin/jobs/
Length of output: 951
Script:
#!/bin/bash
# Check the syncActivities job implementation
# Look for the job file content
echo "Checking syncActivities job file..."
cat backend/src/bin/jobs/syncActivities.ts
# Look for any dependencies between jobs in the implementation
echo "Checking for job dependencies in implementation..."
rg -l "integrationTicks|refreshMaterializedViews|cleanUp|checkStuckIntegrationRuns|refreshGroupsioToken|refreshGitlabToken|refreshGithubRepoSettings|autoImportGroups" backend/src/bin/jobs/syncActivities.ts
# Check if there are any comments about job ordering
echo "Checking for ordering comments in job file..."
rg -i "order|sequence|after|before|depend" backend/src/bin/jobs/syncActivities.ts
Length of output: 4735
services/apps/data_sink_worker/src/service/activity.data.ts (2)
4-4: Breaking Change: Required id field added to IActivityCreateData
Adding a required id field is a breaking change that will affect all code creating activities. We need to ensure all consumers are updated.
Let's verify the impact:
4-4
: Clarify ID generation strategy
Please provide details about:
- Who is responsible for generating these IDs?
- How is uniqueness guaranteed across different sources?
- What is the ID format/standard being used?
Let's check for existing ID generation patterns:
✅ Verification successful
Based on the search results, I can see extensive usage of UUIDs throughout the codebase. Let me search for more specific patterns around ID generation in the data sink worker context:
Let me check one more thing to understand how activities are created in this context:
Based on the code analysis, I can now provide a clear response about the ID generation strategy:
ID Generation Strategy for Activities
The ID field in the activity data sink worker is managed through the following process:
- IDs are generated and managed by the data sink worker service when processing activities
- The codebase uses UUID format, specifically leveraging the common library's UUID utilities
- The ID generation is handled upstream before the activity reaches the data sink worker, as evidenced by the activity.data.ts interface which only declares the ID field without generation logic
Key points:
- The data sink worker expects activities to already have IDs when they arrive
- The worker validates and processes these IDs but doesn't generate them
- The ID format follows the UUID standard used throughout the codebase
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Look for ID generation patterns in the codebase
rg -A 3 'generateId|uuid|nanoid'
Length of output: 165663
Script:
#!/bin/bash
# Look for ID generation patterns specifically in data sink worker
rg -A 5 "id.*:" services/apps/data_sink_worker/
Length of output: 22076
Script:
#!/bin/bash
# Look for activity creation patterns in the data sink worker
rg -A 10 "new activity|create activity|processActivity" services/apps/data_sink_worker/
Length of output: 9208
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (2)
131-132
: Verify the handling of createdAt in column set.
While both updatedAt and createdAt are added to the interface, only updatedAt is included in the updateActivityColumnSet. This asymmetry might be intentional (as createdAt typically shouldn't change during updates), but it's worth confirming.
Consider adding a comment explaining why createdAt is part of the update interface but not included in the update column set; a possible wording is sketched below.
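A possible wording for that comment, assuming the asymmetry is indeed intentional; the interface and column-set names simply mirror the ones discussed above:

```typescript
// Sketch of the suggested documentation, not the actual file contents.
export interface IDbActivityUpdateData {
  // ...existing fields...

  updatedAt?: string
  // `createdAt` is accepted here so callers can pass a full activity object around,
  // but it is deliberately excluded from updateActivityColumnSet so that updates
  // never rewrite the original creation timestamp.
  createdAt?: string
}
```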
Line range hint 1-180
: Clarify the status of this code.
This file is located in an old directory, which typically indicates deprecated or legacy code. Given that this PR introduces new functionality, we should verify if this is the correct location for these changes.
✅ Verification successful
Let me check the content of both files to compare them.
Let me check which file is being actively used by searching for imports of these files.
Changes should be made to the file in the old directory
The file in the old directory is still actively used by multiple services and components, as shown by the import references. The newer file at services/apps/data_sink_worker/src/service/activity.data.ts appears to be a simplified interface version but is not being imported anywhere. Both files were modified in the same commits, indicating they are meant to coexist rather than one replacing the other.
- The old file is imported by:
- services/apps/data_sink_worker/src/service/activity.service.ts
- services/libs/data-access-layer/src/activities/update.ts
- services/libs/data-access-layer/src/activities/sql.ts
- services/libs/data-access-layer/src/activities/ilp.ts
- backend/src/bin/jobs/syncActivities.ts
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check if there's a newer version of this file outside the 'old' directory
fd -t f "activity.data.ts" | grep -v "old"
# Check for any migration plans or deprecation notices
rg -i "deprecat|migrat|old|legacy" "$(dirname "services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts")"
Length of output: 891
Script:
#!/bin/bash
# Compare the content of both files
echo "=== Content of the file in 'old' directory ==="
cat "services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts"
echo -e "\n=== Content of the file outside 'old' directory ==="
cat "services/apps/data_sink_worker/src/service/activity.data.ts"
Length of output: 4989
Script:
#!/bin/bash
# Search for imports of both files
echo "=== References to old file ==="
rg -l "from.*old.*activity\.data"
echo -e "\n=== References to new file ==="
rg -l "from.*service/activity\.data"
# Check git history of both files
echo -e "\n=== Git history of old file ==="
git log --oneline "services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts"
echo -e "\n=== Git history of new file ==="
git log --oneline "services/apps/data_sink_worker/src/service/activity.data.ts"
Length of output: 1153
backend/package.json (1)
44-44
: LGTM! Consistent formatting.
The script formatting now aligns with the project's conventions.
services/libs/data-access-layer/src/conversations/sql.ts (3)
49-52
: LGTM! Formatting improvements enhance readability.
The indentation and line breaks have been adjusted to improve code readability while maintaining the existing logic.
342-347
: LGTM! Formatting improvements enhance readability.
The SQL query formatting has been standardized with proper indentation and line breaks while preserving the existing logic.
477-477
: LGTM! SQL query formatting improvements enhance readability.
The SQL queries have been reformatted with proper indentation, line breaks, and column alignment, making them more readable and maintainable while preserving the existing functionality.
Also applies to: 479-479, 503-503, 505-505, 527-527
services/libs/data-access-layer/src/activities/update.ts (1)
Line range hint 13-33
: Verify that all activities meet expected criteria during streaming
Ensure that the activities retrieved by the WHERE clause truly represent the intended dataset. There might be edge cases where activities don't meet the expected criteria, leading to potential issues in onActivity.
Run the following script to check the count of activities being streamed:
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (6)
services/libs/data-access-layer/src/activities/update.ts (2)
Line range hint 13-33: Fix timer usage and add error handling.
The current implementation has several issues that need attention:
- The timer.end() is called on the first iteration, which is incorrect
- Missing error handling around stream processing
- No control over batch size for memory management
Here's the suggested fix:
 export async function streamActivities(
   qdb: DbConnOrTx,
   onActivity: (activity: IDbActivityCreateData) => Promise<void>,
   where: string,
   params?: Record<string, string>,
 ): Promise<{ processed: number; duration: number }> {
   const whereClause = formatQuery(where, params)
-  const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`)
+  const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`, [], {
+    batchSize: 1000, // Control memory usage
+    highWaterMark: 1000
+  })
   const t = timer(logger, `query activities with ${whereClause}`)
-  const res = await qdb.stream(qs, async (stream) => {
-    for await (const item of stream) {
-      t.end()
-
-      const activity = item as unknown as IDbActivityCreateData
-      await onActivity(activity)
+  try {
+    const res = await qdb.stream(qs, async (stream) => {
+      try {
+        for await (const item of stream) {
+          const activity = item as unknown as IDbActivityCreateData
+          await onActivity(activity)
+        }
+      } catch (error) {
+        logger.error('Error processing activity stream', error)
+        throw error
+      }
+    })
+    t.end()
+    return res
+  } catch (error) {
+    t.end()
+    logger.error('Error in streamActivities', error)
+    throw error
+  }
-  })
-  return res
 }
41-56
: Consider implementing retry logic. The function should handle temporary database failures gracefully, especially during long-running updates. Consider adding retry logic:
Consider adding retry logic:
return streamActivities( qdb, async (activity) => { + const maxRetries = 3; + let retries = 0; + while (true) { + try { await insertActivities( [ { ...activity, ...(await mapActivity(activity)), }, ], true, ) + break; + } catch (error) { + if (retries >= maxRetries) throw error; + retries++; + await new Promise(resolve => setTimeout(resolve, 1000 * retries)); + } + } }, where, params, )services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (2)
Line range hint
146-154
: Consider adding error handling and batching for better robustness. The current implementation could be improved in several ways:
- Add error handling around
insertActivities
to prevent silent failures- Consider batching updates for better performance
- Add logging for better observability
Here's a suggested improvement:
async function insertIfMatches(activity: IDbActivityCreateData) { + const activities: IDbActivityCreateData[] = []; for (const condition of orgCases) { if (!condition.matches(activity)) { continue; } activity.organizationId = condition.orgId - await insertActivities([activity], true) + try { + await insertActivities([activity], true); + logger.debug(`Updated activity ${activity.id} with organization ${condition.orgId}`); + } catch (error) { + logger.error(`Failed to update activity ${activity.id}:`, error); + throw error; // Re-throw to handle it in the stream processing + } return; } }
Line range hint
169-177
: Consider adding stream processing safeguards. The current streaming implementation could benefit from additional safeguards:
- Add batch size controls to prevent memory issues with large result sets
- Implement timeout handling for long-running streams
- Add retry mechanisms for failed updates
Consider implementing these improvements:
- Use
QueryStream
'sbatchSize
option- Add timeout handling
- Implement backoff retry logic
Example implementation:
const BATCH_SIZE = 1000; const STREAM_TIMEOUT = 30000; // 30 seconds const qs = new QueryStream( formatQuery(query, { memberId }), [], // parameters { batchSize: BATCH_SIZE } // options ); const streamPromise = qDb.stream(qs, async (stream) => { for await (const activity of stream) { await insertIfMatches(activity as unknown as IDbActivityCreateData) } }); // Add timeout handling const result = await Promise.race([ streamPromise, new Promise((_, reject) => setTimeout(() => reject(new Error('Stream timeout')), STREAM_TIMEOUT) ) ]);backend/src/services/activityService.ts (2)
Line range hint
4-324
: Consider breaking down the upsert method
The upsert method is quite long and handles multiple responsibilities. Consider breaking it down into smaller, more focused methods for better maintainability and testability.
Suggested breakdown (a sketch follows after this list):
handleExistingActivity
handleNewActivity
handleConversationUpdate
handleWebhookTriggers
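A rough sketch of what that decomposition could look like; the method names come from the list above, while the parameters, types, and bodies are placeholders rather than the service's real API:

```typescript
// Hypothetical decomposition of the long upsert method.
type Activity = { id: string; conversationId?: string }

class ActivityService {
  async upsert(data: Activity, existing?: Activity): Promise<Activity> {
    const activity = existing
      ? await this.handleExistingActivity(existing, data)
      : await this.handleNewActivity(data)

    await this.handleConversationUpdate(activity)
    await this.handleWebhookTriggers(activity)
    return activity
  }

  private async handleExistingActivity(existing: Activity, data: Activity): Promise<Activity> {
    return { ...existing, ...data } // merge incoming fields into the stored activity
  }

  private async handleNewActivity(data: Activity): Promise<Activity> {
    return data // create path
  }

  private async handleConversationUpdate(activity: Activity): Promise<void> {
    // update or create the parent conversation for this activity
  }

  private async handleWebhookTriggers(activity: Activity): Promise<void> {
    // evaluate automations/webhooks that react to the activity
  }
}
```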
Line range hint
577-579
: Address the GDPR prevention TODO. The current GDPR prevention implementation appears to be a temporary solution. This should be properly implemented to ensure compliance with data protection regulations.
Would you like me to help create a more robust GDPR compliance implementation or open an issue to track this task?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (3)
- backend/src/services/activityService.ts (1 hunks)
- services/libs/data-access-layer/src/activities/update.ts (2 hunks)
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
🔇 Additional comments (4)
services/libs/data-access-layer/src/activities/update.ts (1)
35-57
: 🛠️ Refactor suggestion
Optimize performance with batched updates
While the function maintains backward compatibility, it could benefit from performance optimizations:
- Consider batching the inserts instead of individual updates
- Add debouncing to reduce database load
Here's a suggested optimization:
export async function updateActivities(
qdb: DbConnOrTx,
mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
where: string,
params?: Record<string, string>,
): Promise<{ processed: number; duration: number }> {
+ const batchSize = 100;
+ let batch: IDbActivityCreateData[] = [];
+
return streamActivities(
qdb,
async (activity) => {
- await insertActivities(
- [
- {
- ...activity,
- ...(await mapActivity(activity)),
- },
- ],
- true,
- )
+ batch.push({
+ ...activity,
+ ...(await mapActivity(activity)),
+ });
+
+ if (batch.length >= batchSize) {
+ await insertActivities(batch, true);
+ batch = [];
+ }
},
where,
params,
- )
+ ).finally(async () => {
+ if (batch.length > 0) {
+ await insertActivities(batch, true);
+ }
+ })
}
Let's verify the potential impact of this change:
services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1)
152-152
: Verify the impact of enabling immediate activity updates.
The change from false to true in insertActivities aligns with the PR's objective of updating activities periodically. However, this might affect performance as it processes activities one by one.
Let's verify the potential performance impact:
✅ Verification successful
The change to enable immediate activity updates is safe and consistent with existing patterns
Based on the codebase analysis, all other usages of insertActivities are already using true as the second parameter, indicating that immediate updates are the standard pattern:
- backend/src/services/activityService.ts: uses true
- services/libs/data-access-layer/src/activities/update.ts: uses true
- services/apps/data_sink_worker/src/service/activity.service.ts: uses the default behavior (true)
- services/apps/activities_worker/src/activities/createConversations.ts: uses true
The change in profiles_worker/index.ts actually brings this instance in line with the rest of the codebase's behavior, making it more consistent.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other usages of insertActivities to understand the performance implications
ast-grep --pattern 'insertActivities($_, false)'
Length of output: 50
Script:
#!/bin/bash
# Let's try a broader search for insertActivities usage
ast-grep --pattern 'insertActivities($$$)'
# Also search for the function definition
ast-grep --pattern 'function insertActivities($$$)'
# Backup with ripgrep to catch any other references
rg "insertActivities" -A 2
Length of output: 16015
backend/src/services/activityService.ts (2)
Line range hint 312-321
: LGTM: Enhanced error logging for better debugging
The improved error handling with detailed SQL query logging will help in debugging Sequelize-related issues. The separation between Sequelize and other errors is well-structured.
184-184
: Verify the impact of enabling streaming in insertActivities
The change from false to true in the insertActivities call suggests a switch to streaming mode. While this aligns with the PR's objective to update activities periodically, we should ensure this doesn't impact data consistency.
✅ Verification successful
Setting streaming parameter to true is safe and consistent with other usages
Based on the implementation in data-access-layer/src/activities/ilp.ts, the update parameter (second argument) only affects the updatedAt timestamp calculation. When set to true, it ensures the updatedAt field is always set to the current timestamp. This is the desired behavior for activity updates, and the change is consistent with other usages in the codebase:
- The same pattern (true as the second argument) is used in activities_worker/createConversations.ts and profiles_worker/index.ts
- Other activity update scenarios
The implementation uses QuestDB's ILP protocol for data insertion, which is designed for high-performance time-series data ingestion, making it suitable for streaming updates.
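Purely as an illustration of the behaviour described above (this is not the actual ilp.ts code), the update flag's effect on the timestamp is roughly:

```typescript
// Illustrative only: when `update` is true, updatedAt is forced to "now";
// otherwise an existing timestamp may be preserved. The real ilp.ts may differ.
function resolveUpdatedAt(update: boolean, existing?: string): string {
  if (update || !existing) {
    return new Date().toISOString()
  }
  return existing
}

// resolveUpdatedAt(true, '2024-01-01T00:00:00Z') -> the current timestamp
// resolveUpdatedAt(false, '2024-01-01T00:00:00Z') -> '2024-01-01T00:00:00Z'
```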
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other insertActivities calls to ensure consistent usage
rg "insertActivities\(" -A 2
# Look for any error handling or rollback mechanisms around insertActivities
rg "insertActivities.*catch|rollback.*insertActivities" -A 5
Length of output: 2281
Script:
#!/bin/bash
# Let's check the implementation of insertActivities to understand the streaming parameter
ast-grep --pattern 'function insertActivities($_) {
$$$
}'
# Let's also check for any error handling patterns around these calls
rg "insertActivities.*\{" -B 5 -A 10
Length of output: 1295
Script:
#!/bin/bash
# Let's check the implementation in the data-access-layer since previous results showed it's defined there
cat services/libs/data-access-layer/src/activities/ilp.ts
# Let's also check for any transaction handling around the insertActivities calls
rg "transaction.*insertActivities|insertActivities.*transaction" -B 5 -A 5
Length of output: 6542
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (6)
services/libs/logging/src/utility.ts (2)
Line range hint
22-41
: Improve error handling and logging consistency.
The error handling could be enhanced in the following ways:
- Use the error level for failure logs instead of info
- Include error details in the failure log
Consider this improvement:
 try {
   const result = await process()
   log.info(`Process ${name} took ${end()} seconds!`)
   return result
 } catch (e) {
-  log.info(`Process ${name} failed after ${end()} seconds!`)
+  log.error(`Process ${name} failed after ${end()} seconds:`, e)
   throw e
 }
58-58
: Add null check for logging message.
The nullish coalescing operator could fail if both overrideName and name are undefined, leading to "Process undefined took X seconds" messages.
Consider adding a default value:
-  log.info(`Process ${overrideName ?? name} took ${durationInSeconds.toFixed(2)} seconds!`)
+  log.info(`Process ${overrideName ?? name ?? 'unnamed'} took ${durationInSeconds.toFixed(2)} seconds!`)
services/libs/data-access-layer/src/activities/update.ts (1)
Line range hint
13-33
: Add pagination and error handling to prevent memory issues. The streaming implementation has several potential issues that should be addressed:
- The query lacks a LIMIT clause, which could lead to memory pressure with large result sets
- The timer.end() is called after the first item instead of after all processing
- Error handling for stream processing is missing
Consider implementing this improved version:
export async function streamActivities( qdb: DbConnOrTx, onActivity: (activity: IDbActivityCreateData) => Promise<void>, where: string, params?: Record<string, string>, + batchSize: number = 1000 ): Promise<{ processed: number; duration: number }> { const whereClause = formatQuery(where, params) - const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`) + const qs = new QueryStream( + `SELECT * FROM activities WHERE ${whereClause} LIMIT $1`, + [batchSize] + ) const t = timer(logger, `query activities with ${whereClause}`) const res = await qdb.stream(qs, async (stream) => { + try { for await (const item of stream) { - t.end() const activity = item as unknown as IDbActivityCreateData await onActivity(activity) } + } catch (error) { + logger.error('Error processing activity stream', error) + throw error + } finally { + t.end() + } }) return res }services/apps/data_sink_worker/src/service/activity.service.ts (3)
647-661
: Consider extracting query filters as constants. The query structure is well-organized, but the filter conditions could be more maintainable if extracted into named constants.
+const ACTIVITY_FILTERS = { + sourceId: (value: string) => ({ sourceId: { eq: value } }), + platform: (value: PlatformType) => ({ platform: { eq: value } }), + type: (value: string) => ({ type: { eq: value } }), + channel: (value: string) => ({ channel: { eq: value } }), +}; const { rows: [dbActivity], } = await queryActivities(this.qdbStore.connection(), { tenantId, segmentIds: [segmentId], filter: { and: [ - { sourceId: { eq: activity.sourceId } }, - { platform: { eq: platform } }, - { type: { eq: activity.type } }, - { channel: { eq: activity.channel } }, + ACTIVITY_FILTERS.sourceId(activity.sourceId), + ACTIVITY_FILTERS.platform(platform), + ACTIVITY_FILTERS.type(activity.type), + ACTIVITY_FILTERS.channel(activity.channel), ], }, limit: 1, })
Line range hint
647-1120
: Consider breaking down the large transaction block.
The transaction block in processActivity has high cyclomatic complexity with multiple nested conditions. Consider extracting logical segments into separate private methods for better maintainability.
Example structure:
 private async processExistingActivity(/*params*/) {
   // Handle existing activity logic
 }
 private async processNewActivity(/*params*/) {
   // Handle new activity creation
 }
 private async processMemberUpdates(/*params*/) {
   // Handle member updates
 }
🧰 Tools
🪛 Biome
[error] 663-663: Change to an optional chain.
Unsafe fix: Change to an optional chain.
(lint/complexity/useOptionalChain)
Line range hint
873-875
: Address or track the TODO comment. The TODO comment about handling missing object members should be properly tracked to ensure it's not overlooked.
Would you like me to:
- Create a GitHub issue to track this TODO?
- Propose a solution for handling missing object members?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (15)
- backend/package.json (1 hunks)
- backend/src/bin/jobs/index.ts (2 hunks)
- backend/src/bin/jobs/syncActivities.ts (1 hunks)
- backend/src/bin/scripts/syncActivities.ts (1 hunks)
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
- backend/src/services/activityService.ts (1 hunks)
- services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
- services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
- services/libs/data-access-layer/src/activities/update.ts (2 hunks)
- services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
- services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (10)
- backend/package.json
- backend/src/bin/jobs/index.ts
- backend/src/bin/scripts/syncActivities.ts
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
- backend/src/services/activityService.ts
- services/apps/data_sink_worker/src/service/activity.data.ts
- services/libs/data-access-layer/src/conversations/sql.ts
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
🔇 Additional comments (9)
services/libs/logging/src/utility.ts (1)
Line range hint 50-53
: LGTM! Good use of state management.
The implementation properly prevents double-ending with the isEnded flag and uses the early return pattern effectively.
services/libs/data-access-layer/src/activities/update.ts (1)
35-57
:
Improve data consistency and performance
The current implementation has several concerns:
- The force flag is always set to true, which could lead to data inconsistencies
- Inserts are not batched, potentially impacting performance
- The previous review comment about transactional consistency is still valid
Consider these improvements:
export async function updateActivities(
qdb: DbConnOrTx,
mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
where: string,
params?: Record<string, string>,
): Promise<{ processed: number; duration: number }> {
- return streamActivities(
- qdb,
- async (activity) => {
- await insertActivities(
- [
- {
- ...activity,
- ...(await mapActivity(activity)),
- },
- ],
- true,
- )
- },
- where,
- params,
- )
+ const batch: IDbActivityCreateData[] = [];
+ const BATCH_SIZE = 100;
+
+ return qdb.transaction(async (tx) => {
+ return streamActivities(
+ tx,
+ async (activity) => {
+ batch.push({
+ ...activity,
+ ...(await mapActivity(activity)),
+ });
+
+ if (batch.length >= BATCH_SIZE) {
+ await insertActivities(batch, false, tx);
+ batch.length = 0;
+ }
+ },
+ where,
+ params,
+ ).finally(async () => {
+ if (batch.length > 0) {
+ await insertActivities(batch, false, tx);
+ }
+ });
+ });
}
Let's verify the usage of force flag in insertActivities:
#!/bin/bash
# Check how insertActivities is used across the codebase
rg -A 5 "insertActivities\("
backend/src/bin/jobs/syncActivities.ts (6)
1-13
: LGTM! Imports are well-organized and necessary.
The imports cover all required functionality for database access, logging, and job scheduling.
15-22
: Handle NULL case in decideUpdatedAt function.
If there are no activities, SELECT MAX("updatedAt")
will return NULL
, resulting in undefined
. This could cause issues in subsequent queries.
24-33
: LGTM! Well-implemented helper functions.
Both getTotalActivities
and createWhereClause
functions are implemented correctly with proper SQL injection prevention.
35-60
: Consider batch processing optimization.
Processing activities one by one can be inefficient. Consider implementing bulk operations as suggested in the previous review.
62-130
: Add database connection cleanup.
Database connections should be properly closed after use.
132-141
: LGTM! Well-structured job definition.
The job is properly configured with appropriate scheduling for daily synchronization.
services/apps/data_sink_worker/src/service/activity.service.ts (1)
1084-1084
: Verify UUID collision handling.
While generateUUIDv4()
provides a reliable way to generate unique IDs, there should be a mechanism to handle the rare case of UUID collisions.
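One way such a safeguard could look, assuming the insert surfaces Postgres' unique_violation (SQLSTATE 23505) on a duplicate primary key; the helper is illustrative, not part of the codebase:

```typescript
import { randomUUID } from 'crypto'

// Hypothetical guard: retry with a freshly generated id if the insert ever
// collides on the primary key (unique_violation is SQLSTATE 23505 in Postgres).
async function insertWithUniqueId(
  insert: (id: string) => Promise<void>,
  maxAttempts = 3,
): Promise<string> {
  let lastError: unknown
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const id = randomUUID()
    try {
      await insert(id)
      return id
    } catch (err) {
      const code = (err as { code?: string }).code
      if (code !== '23505') {
        throw err // not a collision, bubble up
      }
      lastError = err // extremely unlikely UUIDv4 collision; try a new id
    }
  }
  throw lastError
}
```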
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
services/libs/logging/src/utility.ts (1)
Line range hint
21-41
: Add documentation to clarify usage of V2 vs original function.
While the new implementation adds valuable error handling, it's not clear when developers should use logExecutionTimeV2 versus the original logExecutionTime
. Consider adding JSDoc comments to document the differences and use cases.+/** + * Logs the execution time of an async process with enhanced error handling. + * Use this version when you need: + * - Specific failure logging + * - Guaranteed timing logs even on failure + * @param process The async process to execute and time + * @param log Logger instance + * @param name Process name for logging + * @returns The result of the process + * @throws Re-throws any error from the process after logging + */ export const logExecutionTimeV2 = async <T>(Additionally, consider consolidating both functions into a single implementation with an options parameter to control the behavior.
interface LogExecutionTimeOptions { failureLogging?: boolean; } export const logExecutionTime = async <T>( process: () => Promise<T>, log: Logger, name: string, options: LogExecutionTimeOptions = {} ): Promise<T> => { // Implementation combining both versions }backend/src/bin/jobs/syncActivities.ts (1)
1-141
: Consider implementing resilience patterns. The synchronization mechanism could benefit from the following architectural improvements:
- Implement retry logic for transient database errors
- Add checkpointing to resume from the last successful sync point
- Consider implementing a dead letter queue for failed records
- Add monitoring and alerting for sync failures or delays
These patterns would make the synchronization more robust and maintainable.
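As a sketch of the first item on that list, a generic retry helper with exponential backoff might look like the following; which error codes count as transient is an assumption and would need to match the real driver:

```typescript
// Hypothetical retry wrapper for transient database errors during sync.
const TRANSIENT_CODES = new Set(['57P03', '40001', 'ECONNRESET'])

async function withRetries<T>(
  operation: () => Promise<T>,
  maxAttempts = 5,
  baseDelayMs = 500,
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await operation()
    } catch (err) {
      const code = (err as { code?: string }).code
      const transient = code !== undefined && TRANSIENT_CODES.has(code)
      if (!transient || attempt >= maxAttempts) {
        throw err
      }
      const delay = baseDelayMs * 2 ** (attempt - 1) // exponential backoff
      await new Promise((resolve) => setTimeout(resolve, delay))
    }
  }
}
```

Checkpointing could then be layered on top by persisting the last successfully processed updatedAt after each batch, so a restart resumes from that point rather than from scratch.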
services/apps/data_sink_worker/src/service/activity.service.ts (2)
647-661
: Add error handling for database connection issues. While the new query structure is more robust, it should include explicit error handling for database connection issues.
const { rows: [dbActivity], - } = await queryActivities(this.qdbStore.connection(), { + } = await queryActivities(this.qdbStore.connection(), { + }).catch((error) => { + this.log.error('Failed to query activities:', error); + throw new Error('Database query failed'); + });
Line range hint
1084-1104
: Simplify ID generation logic.
The current ID handling with nested access could be simplified to improve readability and prevent potential null reference issues.
- id: dbActivity.id ?? generateUUIDv4(),
+ id: generateActivityId(dbActivity),
Consider adding a helper function:
 function generateActivityId(activity?: { id?: string }): string {
   return activity?.id || generateUUIDv4();
 }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (15)
- backend/package.json (1 hunks)
- backend/src/bin/jobs/index.ts (2 hunks)
- backend/src/bin/jobs/syncActivities.ts (1 hunks)
- backend/src/bin/scripts/syncActivities.ts (1 hunks)
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
- backend/src/services/activityService.ts (1 hunks)
- services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
- services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
- services/libs/data-access-layer/src/activities/update.ts (2 hunks)
- services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
- services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (11)
- backend/package.json
- backend/src/bin/jobs/index.ts
- backend/src/bin/scripts/syncActivities.ts
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
- backend/src/services/activityService.ts
- services/apps/data_sink_worker/src/service/activity.data.ts
- services/libs/data-access-layer/src/activities/update.ts
- services/libs/data-access-layer/src/conversations/sql.ts
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
🔇 Additional comments (8)
services/libs/logging/src/utility.ts (1)
Line range hint 45-59
: Reconsider optional name parameter to maintain logging consistency.
I agree with the previous review comment about the breaking change concerns. Making the name parameter optional could lead to unclear logs and inconsistent usage patterns.
Additionally, the new overrideName parameter adds complexity without clear benefits:
- It creates two ways to specify the name (constructor and end-time)
- The nullish coalescing in the log message (
overrideName ?? name
) makes it harder to track where the final name comes from
Consider this simpler approach:
-export const timer = (log: Logger, name?: string) => {
+export const timer = (log: Logger, name: string) => {
const start = performance.now()
let isEnded = false
return {
- end: function (overrideName?: string) {
+ end: function () {
if (isEnded) {
return
}
isEnded = true
const end = performance.now()
const duration = end - start
const durationInSeconds = duration / 1000
- log.info(`Process ${overrideName ?? name} took ${durationInSeconds.toFixed(2)} seconds!`)
+ log.info(`Process ${name} took ${durationInSeconds.toFixed(2)} seconds!`)
},
}
}
If dynamic naming is needed, consider creating a new function with a clearer purpose rather than overloading the existing one.
backend/src/bin/jobs/syncActivities.ts (6)
1-13
: LGTM! Dependencies are well-organized.
The imports cover all necessary functionality for database access, logging, and type definitions.
132-141
: LGTM! Job definition is clean and well-structured.
The job is properly configured with a daily schedule using cron-time-generator.
15-22
:
Handle NULL case in decideUpdatedAt function.
When there are no activities, MAX("updatedAt")
returns NULL, but the function doesn't handle this case. This could cause issues in subsequent queries.
Apply this diff to handle the NULL case:
async function decideUpdatedAt(pgQx: QueryExecutor, maxUpdatedAt?: string): Promise<string> {
if (!maxUpdatedAt) {
const result = await pgQx.selectOne('SELECT MAX("updatedAt") AS "maxUpdatedAt" FROM activities')
- return result?.maxUpdatedAt
+ return result?.maxUpdatedAt ?? new Date(0).toISOString()
}
return maxUpdatedAt
}
35-60
: 🛠️ Refactor suggestion
Optimize batch processing for better performance.
The current implementation processes activities one by one, resulting in N+1 database queries. This approach can be inefficient for large batches.
Consider implementing bulk operations as suggested in the previous review to reduce database calls.
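A minimal sketch of what that batching could look like; insertBatch stands in for whatever bulk write the data access layer ends up exposing, so the signature here is an assumption:

```typescript
// Hypothetical batching: flush activities in chunks instead of issuing one
// write per activity, avoiding the N+1 pattern described above.
type ActivityRow = { id: string; updatedAt: string }

async function flushInBatches(
  activities: ActivityRow[],
  insertBatch: (rows: ActivityRow[]) => Promise<void>,
  batchSize = 500,
): Promise<number> {
  let written = 0
  for (let i = 0; i < activities.length; i += batchSize) {
    const chunk = activities.slice(i, i + batchSize)
    await insertBatch(chunk) // one round-trip per chunk
    written += chunk.length
  }
  return written
}
```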
65-72
:
Ensure proper cleanup of database connections.
Database connections should be properly closed to prevent resource leaks.
Implement proper cleanup using try-finally:
export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
logger.info(`Syncing activities from ${maxUpdatedAt}`)
-
- const qdb = await getClientSQL()
- const db = await getDbConnection({
- host: DB_CONFIG.writeHost,
- port: DB_CONFIG.port,
- database: DB_CONFIG.database,
- user: DB_CONFIG.username,
- password: DB_CONFIG.password,
- })
+ let qdb, db;
+ try {
+ qdb = await getClientSQL()
+ db = await getDbConnection({
+ host: DB_CONFIG.writeHost,
+ port: DB_CONFIG.port,
+ database: DB_CONFIG.database,
+ user: DB_CONFIG.username,
+ password: DB_CONFIG.password,
+ })
+ // ... rest of the function
+ } finally {
+ await db?.$pool.end()
+ await qdb?.$pool.end()
+ }
96-127
:
Add safety mechanisms to prevent infinite loops.
The while loop lacks important safety measures such as maximum iterations and timeout.
Implement the safety measures as suggested in the previous review.
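For reference, the kind of guard being asked for might look like this sketch, where an iteration cap and a wall-clock budget both bound the loop; the limits are illustrative:

```typescript
// Hypothetical safety rails around the sync loop: stop after a maximum number
// of iterations or after a hard time budget, whichever is hit first.
async function runBoundedLoop(
  processNextBatch: () => Promise<number>, // returns rows processed in this batch
  maxIterations = 10_000,
  maxDurationMs = 6 * 60 * 60 * 1000, // 6 hours
): Promise<void> {
  const startedAt = Date.now()

  for (let i = 0; i < maxIterations; i++) {
    if (Date.now() - startedAt > maxDurationMs) {
      throw new Error(`sync aborted after exceeding ${maxDurationMs}ms`)
    }
    const processed = await processNextBatch()
    if (processed === 0) {
      return // nothing left to sync: normal exit
    }
  }
  throw new Error(`sync aborted after ${maxIterations} iterations`)
}
```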
services/apps/data_sink_worker/src/service/activity.service.ts (1)
4-12
: LGTM! Clean import organization.
The new imports support the transition to UUID-based IDs and the new query interface.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
services/libs/logging/src/utility.ts (1)
Line range hint
21-42
: Add documentation to explain version differences. While the new implementation improves error handling, it's important to document:
- The differences between V1 and V2
- When to use each version
- Migration guidelines for existing code
Add JSDoc comments:
+/**
+ * Enhanced version of logExecutionTime with better error handling.
+ * Unlike V1, this version:
+ * - Logs different messages for success/failure cases
+ * - Preserves stack traces in error cases
+ * @param process - Async function to execute and time
+ * @param log - Logger instance
+ * @param name - Process name for logging
+ * @returns Result of the process
+ * @throws Original error from the process
+ */
 export const logExecutionTimeV2 = async <T>(
services/apps/data_sink_worker/src/service/activity.service.ts (1)
4-12
: LGTM: Improved architecture with DAL abstraction
The shift from direct database operations to using data access layer functions (insertActivities, queryActivities) improves the separation of concerns and maintainability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (15)
- backend/package.json (1 hunks)
- backend/src/bin/jobs/index.ts (2 hunks)
- backend/src/bin/jobs/syncActivities.ts (1 hunks)
- backend/src/bin/scripts/syncActivities.ts (1 hunks)
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
- backend/src/services/activityService.ts (1 hunks)
- services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
- services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
- services/libs/data-access-layer/src/activities/update.ts (2 hunks)
- services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
- services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (12)
- backend/package.json
- backend/src/bin/jobs/index.ts
- backend/src/bin/jobs/syncActivities.ts
- backend/src/bin/scripts/syncActivities.ts
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
- backend/src/services/activityService.ts
- services/apps/data_sink_worker/src/service/activity.data.ts
- services/libs/data-access-layer/src/activities/update.ts
- services/libs/data-access-layer/src/conversations/sql.ts
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
🔇 Additional comments (2)
services/libs/logging/src/utility.ts (1)
Line range hint 45-60
: Reconsider optional name parameter.
Making the name parameter optional could lead to inconsistent and unclear logs. The overrideName parameter adds complexity without clear benefits.
Consider reverting to required parameters:
-export const timer = (log: Logger, name?: string) => {
+export const timer = (log: Logger, name: string) => {
const start = performance.now()
let isEnded = false
return {
- end: function (overrideName?: string) {
+ end: function () {
if (isEnded) {
return
}
isEnded = true
const end = performance.now()
const duration = end - start
const durationInSeconds = duration / 1000
- log.info(`Process ${overrideName ?? name} took ${durationInSeconds.toFixed(2)} seconds!`)
+ log.info(`Process ${name} took ${durationInSeconds.toFixed(2)} seconds!`)
},
}
}
If dynamic naming is needed, consider creating a separate function or documenting the specific use cases that require it.
services/apps/data_sink_worker/src/service/activity.service.ts (1)
647-661
: Consider adding an index for frequent query patterns
The query uses multiple AND conditions on sourceId, platform, type, and channel. Consider adding a composite index to optimize this frequent query pattern.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
services/apps/entity_merging_worker/src/activities/members.ts (1)
Line range hint
63-74
: Add error handling for QuestDB operations. The function should include specific error handling for QuestDB operations, as time-series databases may have different failure modes than traditional RDBMS.
Consider wrapping the database operation in a try-catch block:
for (const identity of identities.filter( (i) => i.type === MemberIdentityType.USERNAME && identitiesWithActivity.some((ai) => ai.platform === i.platform && ai.username === i.value), )) { + try { await moveIdentityActivitiesToNewMember( svc.questdbSQL, tenantId, fromId, toId, identity.value, identity.platform, ) + } catch (error) { + svc.log.error( + { error, fromId, toId, identity }, + 'Failed to move identity activities to new member in QuestDB' + ); + throw error; + } }services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)
5-6
: Remove unused import IDbActivity
The IDbActivity type is imported but never used in this file. Consider removing it to keep the imports clean.
 import { formatQuery } from '../../../queryExecutor'
-import { IDbActivity, IDbActivityCreateData } from '../data_sink_worker/repo/activity.data'
+import { IDbActivityCreateData } from '../data_sink_worker/repo/activity.data'
🧰 Tools
🪛 GitHub Check: lint-format-services
[warning] 6-6:
'IDbActivity' is defined but never used
121-149: LGTM! Well-structured refactor with improved type safety.
The changes improve the code by:
- Using DbConnOrTx for better transaction support
- Leveraging type-safe callbacks with IDbActivityCreateData
- Using formatQuery for safer query construction
Consider adding error handling
Since this function is crucial for member identity merging, consider adding error handling to catch and properly handle potential database errors.
export async function moveIdentityActivitiesToNewMember(
  db: DbConnOrTx,
  tenantId: string,
  fromId: string,
  toId: string,
  username: string,
  platform: string,
) {
+ try {
    await updateActivities(
      db,
      async (activity: IDbActivityCreateData) => ({ ...activity, memberId: toId }),
      formatQuery(
        `
        "memberId" = $(fromId)
        and "tenantId" = $(tenantId)
        and "username" = $(username)
        and "platform" = $(platform)
        and "deletedAt" is null
        `,
        {
          fromId,
          tenantId,
          username,
          platform,
        },
      ),
      {
        memberId: fromId,
      },
    )
+ } catch (error) {
+   throw new Error(`Failed to move activities for identity ${username} on ${platform}: ${error.message}`);
+ }
}
services/apps/data_sink_worker/src/service/activity.service.ts (1)
1085-1085: Simplify ID generation logic.
In the create path, `dbActivity` should always be undefined, making the optional chaining unnecessary. Consider simplifying:
- id: dbActivity?.id ?? generateUUIDv4(),
+ id: generateUUIDv4(),
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (18)
backend/package.json (1 hunks)
backend/src/bin/jobs/index.ts (2 hunks)
backend/src/bin/jobs/syncActivities.ts (1 hunks)
backend/src/bin/scripts/syncActivities.ts (1 hunks)
backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
backend/src/services/activityService.ts (1 hunks)
services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
services/apps/entity_merging_worker/src/activities/members.ts (1 hunks)
services/libs/data-access-layer/src/activities/sql.ts (0 hunks)
services/libs/data-access-layer/src/activities/update.ts (2 hunks)
services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2 hunks)
services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (2)
- services/libs/data-access-layer/src/activities/sql.ts
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (12)
- backend/package.json
- backend/src/bin/jobs/index.ts
- backend/src/bin/jobs/syncActivities.ts
- backend/src/bin/scripts/syncActivities.ts
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
- backend/src/services/activityService.ts
- services/apps/data_sink_worker/src/service/activity.data.ts
- services/libs/data-access-layer/src/activities/update.ts
- services/libs/data-access-layer/src/conversations/sql.ts
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
- services/libs/logging/src/utility.ts
🧰 Additional context used
🪛 GitHub Check: lint-format-services
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts
[warning] 6-6:
'IDbActivity' is defined but never used
🔇 Additional comments (6)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (4)
127-129: Critical: Add tenant isolation to prevent unauthorized access.
The previous review comment about adding tenant isolation to this method is still valid. The current implementation could allow cross-tenant data access.
132-140: Critical: Add tenant isolation and error handling.
The previous review comment about adding tenant isolation and proper error handling to this method is still valid. The current implementation has security and reliability concerns.
142-145: Critical: Add data integrity checks and tenant isolation.
The previous review comment about adding data integrity checks and tenant isolation to this method is still valid. The current implementation has several critical issues.
127-145
: Verify safe usage in synchronization context.
While these "raw" methods might be intended for bulk synchronization operations, we should ensure they're used in a controlled context that maintains proper tenant isolation. Let's verify how these methods are used in the synchronization flow.
✅ Verification successful
Let me gather more specific information about the usage context of these raw methods.
Let's analyze the actual usage patterns and tenant handling in the data sink worker context.
Let me try another search strategy to understand the context better.
Let's analyze how these repository methods are integrated into the service layer.
Based on the search results, I can now generate the final response as we have clear evidence of the usage context.
Raw methods are safely used in controlled sync context
The raw methods are specifically used in the `syncActivities` job (backend/src/bin/jobs/syncActivities.ts), where they serve a legitimate bulk synchronization purpose. The job:
- Uses a controlled transaction context
- Handles activities sequentially with proper existence checks
- Operates in a worker context, not exposed to direct tenant requests
The architectural pattern is sound as:
- Raw methods are only called from dedicated sync jobs
- The sync job uses proper database transaction handling via `DbStore`
- The implementation follows the same pattern as other sync workers in the system
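For orientation, the apply step described above can be pictured roughly as follows; the structural types stand in for `ActivityRepo` and `IDbActivityCreateData` from this PR, and the real job in backend/src/bin/jobs/syncActivities.ts may be wired differently:

// Sketch of the sequential, idempotent apply loop; not the actual job code.
interface ActivityRowLike {
  id: string
  [key: string]: unknown
}

interface ActivityRepoLike {
  existsWithId(id: string): Promise<boolean>
  rawUpdate(id: string, data: ActivityRowLike): Promise<void>
  rawInsert(data: ActivityRowLike): Promise<void>
}

export async function applyActivityBatch(
  repo: ActivityRepoLike,
  activities: ActivityRowLike[],
): Promise<void> {
  for (const activity of activities) {
    // existence check first, so re-running the job never duplicates rows
    if (await repo.existsWithId(activity.id)) {
      await repo.rawUpdate(activity.id, activity)
    } else {
      await repo.rawInsert(activity)
    }
  }
}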
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the usage context of these raw methods
# Expected: These methods should only be called from trusted synchronization contexts with proper tenant isolation
# Search for direct calls to these methods
echo "Searching for raw method calls..."
rg -A 5 "rawUpdate|rawInsert|existsWithId" --type typescript
# Search for potential synchronization contexts
echo "Searching for sync-related files..."
fd -e ts -e js sync
Length of output: 2354
Script:
#!/bin/bash
# Search for raw method calls with better context using ast-grep
echo "Searching for method calls..."
ast-grep --pattern 'rawUpdate($$$)' --pattern 'rawInsert($$$)' --pattern 'existsWithId($$$)'
# Search specifically in data_sink_worker related files
echo "Searching in data sink worker files..."
fd -e ts data_sink_worker --exec rg -l "class.*ActivityRepo|rawUpdate|rawInsert|existsWithId" {}
# Look for tenant context in the data sink worker
echo "Searching for tenant context..."
fd -e ts data_sink_worker --exec rg -A 10 "tenant|TenantId|tenantId" {}
Length of output: 667
Script:
#!/bin/bash
# Search for ActivityRepo class and its usage
echo "Searching for ActivityRepo class definition and usage..."
rg -A 20 "class ActivityRepo"
# Search for data sink worker service implementation
echo "Searching for data sink worker service..."
fd -e ts -e js sink_worker --exec rg -l "Activity|activity" {}
# Look for the service that uses these repository methods
echo "Searching for service layer..."
rg --type ts -A 10 "new ActivityRepo|activityRepo\."
Length of output: 10414
services/apps/entity_merging_worker/src/activities/members.ts (1)
69-69
: Verify consistency of database connections for activity operations
The change from `postgres.writer` to `questdbSQL` for activity operations needs careful consideration:
- Ensure this change aligns with other activity-related operations in the codebase
- Consider adding QuestDB-specific error handling as it may have different error patterns than PostgreSQL
Let's verify the consistency of database connections used for activity operations:
Consider documenting the rationale for using QuestDB for activity operations in the codebase, especially regarding:
- Transaction handling strategy
- Data consistency guarantees
- Performance implications
✅ Verification successful
The previous results show that QuestDB is consistently used across different services for activity-related operations, but let's verify the specific implementation and configuration to ensure this change aligns with the architecture.
Based on the results from our verification scripts, I can now generate the final response:
Database change from Postgres to QuestDB is architecturally consistent
The switch from `postgres.writer` to `questdbSQL` for activity operations in `moveIdentityActivitiesToNewMember` is consistent with the codebase architecture:
- QuestDB is used consistently across multiple services for activity-related operations (activities_worker, profiles_worker, entity_merging_worker)
- Activity data operations are properly segregated between PostgreSQL (for transactional data) and QuestDB (for time-series activity data)
- The data access layer supports both databases, with clear separation of concerns
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other activity-related operations to ensure consistent database usage
echo "Checking activity-related database operations..."
# Look for similar activity movement operations
rg -A 3 "moveActivities.*To.*Member" --type ts
# Check other QuestDB usage patterns
rg -A 3 "questdbSQL" --type ts
# Look for potential inconsistencies where postgres writer is still used for activities
rg -A 3 "postgres\.writer.*activit" --type ts
Length of output: 11776
Script:
#!/bin/bash
# Let's check the implementation of moveActivitiesToNewMember to understand the database operations
ast-grep --pattern 'export async function moveActivitiesToNewMember'
# Check for any activity-related functions in the data access layer
fd -e ts -e js -x grep -l "activities" {} \; | grep -i "data-access"
# Look for QuestDB configuration and setup
rg -A 5 "questdb.*config" --type ts
# Check for activity-related database operations in the codebase
rg -A 3 "UPDATE.*activities|INSERT.*activities|DELETE.*activities" --type ts
Length of output: 4870
services/apps/data_sink_worker/src/service/activity.service.ts (1)
647-662
: Consider adding a composite index for activity queries
The query uses multiple AND conditions which could benefit from a composite index on (tenantId, segmentId, timestamp, sourceId, platform, type, channel) to improve query performance.
Let's verify if an appropriate index exists:
7bc9240 to b5d440e Compare
So we can quickly figure out what was the latest synced activity
The second arg in `insertActivities` forcefully sets `updatedAt` to "now"
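Sketched out, the flag described above behaves roughly like this; the helper below is illustrative only and not the actual `insertActivities` implementation:

// Illustration only: when the second argument is true, every row's updatedAt is stamped
// with the current time before the insert, which is what lets the periodic sync treat
// "updatedAt" as a high-water mark for the latest synced activity.
interface ActivityWriteRow {
  updatedAt?: Date
  [key: string]: unknown
}

export function withUpdatedAtOverride(rows: ActivityWriteRow[], overrideUpdatedAt: boolean): ActivityWriteRow[] {
  if (!overrideUpdatedAt) {
    return rows
  }
  const now = new Date()
  return rows.map((row) => ({ ...row, updatedAt: now }))
}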
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (2)
services/libs/data-access-layer/src/activities/update.ts (1)
Line range hint 13-33: Add error handling and improve performance monitoring.
The streaming implementation has several areas for improvement:
- Missing error handling for stream processing
- Timer ends prematurely after first item
- No batch size control for large datasets
- Unsafe type casting of stream items
Consider applying these improvements:
export async function streamActivities(
  qdb: DbConnOrTx,
  onActivity: (activity: IDbActivityCreateData) => Promise<void>,
  where: string,
  params?: Record<string, string>,
): Promise<{ processed: number; duration: number }> {
  const whereClause = formatQuery(where, params)
- const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`)
+ const qs = new QueryStream(
+   `SELECT * FROM activities WHERE ${whereClause}`,
+   [],
+   { batchSize: 1000 } // Control memory usage
+ )
  const t = timer(logger, `query activities with ${whereClause}`)
  const res = await qdb.stream(qs, async (stream) => {
+   try {
      for await (const item of stream) {
-       t.end()
        const activity = item as unknown as IDbActivityCreateData
+       if (!isValidActivity(activity)) {
+         logger.warn('Invalid activity data', { activity })
+         continue
+       }
        await onActivity(activity)
      }
+   } catch (error) {
+     logger.error('Error processing activity stream', { error })
+     throw error
+   } finally {
+     t.end()
+   }
  })
  return res
}
+function isValidActivity(activity: unknown): activity is IDbActivityCreateData {
+  return activity !== null && typeof activity === 'object'
+  // Add more type checks based on IDbActivityCreateData structure
+}
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (1)
5-6: Remove unused import.
The `IDbActivity` type is imported but never used in this file.
import { formatQuery } from '../../../queryExecutor'
-import { IDbActivity, IDbActivityCreateData } from '../data_sink_worker/repo/activity.data'
+import { IDbActivityCreateData } from '../data_sink_worker/repo/activity.data'
🧰 Tools
🪛 GitHub Check: lint-format-services
[warning] 6-6:
'IDbActivity' is defined but never used
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (18)
backend/package.json (1 hunks)
backend/src/bin/jobs/index.ts (2 hunks)
backend/src/bin/jobs/syncActivities.ts (1 hunks)
backend/src/bin/scripts/syncActivities.ts (1 hunks)
backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
backend/src/services/activityService.ts (1 hunks)
services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
services/apps/entity_merging_worker/src/activities/members.ts (1 hunks)
services/libs/data-access-layer/src/activities/sql.ts (0 hunks)
services/libs/data-access-layer/src/activities/update.ts (2 hunks)
services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2 hunks)
services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (2)
- services/libs/data-access-layer/src/activities/sql.ts
- services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (12)
- backend/package.json
- backend/src/bin/jobs/index.ts
- backend/src/bin/jobs/syncActivities.ts
- backend/src/bin/scripts/syncActivities.ts
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
- backend/src/services/activityService.ts
- services/apps/data_sink_worker/src/service/activity.data.ts
- services/apps/entity_merging_worker/src/activities/members.ts
- services/libs/data-access-layer/src/conversations/sql.ts
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
- services/libs/logging/src/utility.ts
🧰 Additional context used
🪛 GitHub Check: lint-format-services
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts
[warning] 6-6:
'IDbActivity' is defined but never used
🔇 Additional comments (7)
services/libs/data-access-layer/src/activities/update.ts (1)
35-57
: 🛠️ Refactor suggestion
Address transaction consistency and improve update batching
The current implementation has several concerns:
- Transaction consistency issue from previous review still applies
- Magic boolean parameter (`true`) at line 51 needs clarification
- Updates are performed one at a time, which could be inefficient
The transaction consistency issue raised in the previous review is still valid.
Consider batching updates for better performance:
export async function updateActivities(
qdb: DbConnOrTx,
mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
where: string,
params?: Record<string, string>,
): Promise<{ processed: number; duration: number }> {
+ const BATCH_SIZE = 100
+ let batch: IDbActivityCreateData[] = []
+
return streamActivities(
qdb,
async (activity) => {
- await insertActivities(
- [
- {
- ...activity,
- ...(await mapActivity(activity)),
- },
- ],
- true,
- )
+ batch.push({
+ ...activity,
+ ...(await mapActivity(activity)),
+ })
+
+ if (batch.length >= BATCH_SIZE) {
+ await insertActivities(batch, true) // TODO: Document the purpose of the boolean parameter
+ batch = []
+ }
},
where,
params,
- )
+ ).finally(async () => {
+ if (batch.length > 0) {
+ await insertActivities(batch, true)
+ }
+ })
}
Let's verify the usage of `insertActivities` to understand the boolean parameter:
#!/bin/bash
# Search for other calls to insertActivities to understand the boolean parameter usage
rg "insertActivities\(" -A 2 -B 2
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (3)
127-129
: Security concern: Missing tenant isolation.
The same security concern raised in the previous review still applies to this method.
132-140
: Security and reliability concerns in update method.
The security and reliability concerns raised in the previous review still apply. Additionally, for better performance, consider using a prepared statement for the condition.
- const condition = this.format('where id = $(id)', { id })
- await this.db().none(`${query} ${condition}`)
+ await this.db().none(`${query} where id = $(id)`, { id })
142-145
: Data integrity and security concerns in insert method.
The data integrity and security concerns raised in the previous review still apply. Additionally, let's verify the impact of missing required fields.
#!/bin/bash
# Description: Check if any code relies on auto-generated IDs or timestamps for activities
# Test 1: Check for direct references to activities.id
echo "Checking for direct references to activities.id..."
rg -A 3 'activities\.id'
# Test 2: Check for timestamp-dependent queries
echo "Checking for timestamp-dependent queries..."
rg -A 3 '"createdAt"|"updatedAt".*activities'
# Test 3: Check for tenant/segment-dependent queries
echo "Checking for tenant/segment isolation..."
rg -A 3 '"tenantId"|"segmentId".*activities'
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)
121-127
: Improved type safety with DbConnOrTx.
Good change! Using `DbConnOrTx` instead of `DbStore` makes the function more flexible by accepting both connections and transactions.
128-149
: Verify the impact of removing batch processing.
The removal of the `batchSize` parameter might impact performance when processing large datasets. Please verify that this change doesn't cause memory issues or timeouts.
async function* streamActivitiesBatches(
db: DbConnOrTx,
query: string,
params: any,
batchSize: number = 1000
): AsyncGenerator<IDbActivityCreateData[]> {
let lastId = '0';
while (true) {
const batch = await db.manyOrNone(
`${query} AND id > $(lastId) ORDER BY id LIMIT $(batchSize)`,
{ ...params, lastId, batchSize }
);
if (batch.length === 0) break;
yield batch;
lastId = batch[batch.length - 1].id;
}
}
✅ Verification successful
Based on the search results, I can see that batch processing is widely used throughout the codebase for various operations. Let me verify the specific context of the activity updates by searching for related patterns.
Let me try a different search pattern to find the activity update implementation.
Let me check the implementation of `streamActivities` since it's used by `updateActivities`.
Based on the code analysis, I can now provide a final response:
Removal of batch processing is safe - streaming is used instead
The `updateActivities` function uses PostgreSQL's native streaming capabilities through `QueryStream` to process activities. This is actually a more efficient approach than batch processing as it:
- Maintains constant memory usage regardless of dataset size
- Processes records as they arrive without loading everything into memory
- Uses database-level cursor internally for optimal performance
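As a point of reference, the pattern looks roughly like this when using pg and pg-query-stream directly; this is a generic illustration, not the project's `streamActivities` code:

import { Pool } from 'pg'
import QueryStream from 'pg-query-stream'

// Rows are pulled from a server-side cursor in small batches, so memory stays flat
// even when the activities table is very large.
export async function forEachActivityRow(
  pool: Pool,
  onRow: (row: Record<string, unknown>) => Promise<void>,
): Promise<void> {
  const client = await pool.connect()
  try {
    const stream = client.query(new QueryStream('select * from activities', [], { batchSize: 100 }))
    for await (const row of stream) {
      await onRow(row as Record<string, unknown>)
    }
  } finally {
    client.release()
  }
}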
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential large datasets that might be affected
# Test 1: Check for large activity counts per member
echo "Checking for members with large activity counts:"
rg -A 5 "SELECT.*COUNT.*FROM.*activities.*GROUP.*BY.*member"
# Test 2: Look for existing batch processing patterns
echo "Checking for other batch processing patterns in activities:"
rg -A 5 "LIMIT.*OFFSET|batchSize|chunk"
Length of output: 55125
Script:
#!/bin/bash
# Search for activity update implementations and batch processing
echo "Searching for activity update implementations:"
ast-grep --pattern 'async function updateActivities'
echo -e "\nSearching for activity batch processing:"
rg -A 5 'updateActivities.*batch|batchSize.*updateActivities'
Length of output: 684
Script:
#!/bin/bash
# Search for the activity update implementation in the data access layer
echo "Searching for activity update implementation:"
rg -A 10 "export.*async.*function.*updateActivities" services/libs/data-access-layer/
echo -e "\nSearching for activity update usage patterns:"
rg -A 5 "await updateActivities" services/libs/data-access-layer/
Length of output: 3243
Script:
#!/bin/bash
# Search for streamActivities implementation
echo "Searching for streamActivities implementation:"
rg -A 15 "export.*async.*function.*streamActivities" services/libs/data-access-layer/
echo -e "\nSearching for activity batch processing in streamActivities:"
rg -A 10 "streamActivities.*batch|batchSize.*streamActivities" services/libs/data-access-layer/
Length of output: 1936
services/apps/data_sink_worker/src/service/activity.service.ts (1)
4-10
: LGTM: Import changes are well-organized
The new imports, including `generateUUIDv4`, are properly organized and necessary for the updated functionality.
b5d440e to fa56c49 Compare
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
services/libs/data-access-layer/src/activities/update.ts (1)
Line range hint 13-33: Add error handling and optimize stream processing.
The stream processing implementation has several areas for improvement:
- Missing error handling around the stream processing
- Timer.end() is incorrectly placed inside the loop
- SQL injection vulnerability in whereClause construction
Apply these improvements:
export async function streamActivities(
  qdb: DbConnOrTx,
  onActivity: (activity: IDbActivityCreateData) => Promise<void>,
  where: string,
  params?: Record<string, string>,
): Promise<{ processed: number; duration: number }> {
  const whereClause = formatQuery(where, params)
  const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`)
  const t = timer(logger, `query activities with ${whereClause}`)
- const res = await qdb.stream(qs, async (stream) => {
-   for await (const item of stream) {
-     t.end()
-     const activity = item as unknown as IDbActivityCreateData
-     await onActivity(activity)
-   }
- })
+ try {
+   const res = await qdb.stream(qs, async (stream) => {
+     try {
+       for await (const item of stream) {
+         const activity = item as unknown as IDbActivityCreateData
+         await onActivity(activity)
+       }
+     } catch (error) {
+       logger.error('Error processing stream item', error)
+       throw error
+     }
+   })
+   return res
+ } catch (error) {
+   logger.error('Error in stream processing', error)
+   throw error
+ } finally {
+   t.end()
+ }
- return res
}
Additionally, consider implementing batch processing for better performance:
const BATCH_SIZE = 1000;
let batch: IDbActivityCreateData[] = [];
for await (const item of stream) {
  const activity = item as unknown as IDbActivityCreateData;
  batch.push(activity);
  if (batch.length >= BATCH_SIZE) {
    await Promise.all(batch.map(onActivity));
    batch = [];
  }
}
if (batch.length > 0) {
  await Promise.all(batch.map(onActivity));
}
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1)
127-145: Consider architectural alternatives for bulk operations.
While the introduction of "raw" methods might be intended for bulk operations or synchronization tasks, bypassing security checks creates significant risks. Consider these alternatives:
- Create separate, well-documented internal methods specifically for bulk operations that maintain security checks but are optimized for performance.
- Implement batch processing methods that handle multiple records while preserving tenant isolation.
- Use database transactions to maintain data integrity during bulk operations.
Example approach:
public async batchUpdate(
  updates: Array<{ id: string; tenantId: string; segmentId: string; data: IDbActivityUpdateData }>,
): Promise<void> {
  return this.db().tx(async (t) => {
    const queries = updates.map(({ id, tenantId, segmentId, data }) => {
      const prepared = RepositoryBase.prepare(
        { ...data, updatedAt: new Date() },
        this.updateActivityColumnSet,
      )
      const query = this.dbInstance.helpers.update(prepared, this.updateActivityColumnSet)
      const condition = this.format(
        'where id = $(id) and "tenantId" = $(tenantId) and "segmentId" = $(segmentId)',
        { id, tenantId, segmentId }
      )
      return t.none(`${query} ${condition}`)
    })
    await t.batch(queries)
  })
}
services/apps/data_sink_worker/src/service/activity.service.ts (1)
647-662: Add error handling for database connection issues.
While the query structure is good, consider adding explicit error handling for potential database connection issues.
const {
  rows: [dbActivity],
-} = await queryActivities(this.qdbStore.connection(), {
+} = await queryActivities(this.qdbStore.connection(), {
  tenantId,
  segmentIds: [segmentId],
  filter: {
    and: [
      { timestamp: { eq: activity.timestamp } },
      { sourceId: { eq: activity.sourceId } },
      { platform: { eq: platform } },
      { type: { eq: activity.type } },
      { channel: { eq: activity.channel } },
    ],
  },
  limit: 1,
+}).catch(error => {
+  this.log.error('Failed to query activities:', error);
+  throw new Error('Database query failed');
})
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (16)
backend/package.json (1 hunks)
backend/src/bin/jobs/index.ts (2 hunks)
backend/src/bin/jobs/syncActivities.ts (1 hunks)
backend/src/bin/scripts/syncActivities.ts (1 hunks)
backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
backend/src/services/activityService.ts (1 hunks)
services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
services/apps/entity_merging_worker/src/activities/members.ts (1 hunks)
services/libs/data-access-layer/src/activities/sql.ts (0 hunks)
services/libs/data-access-layer/src/activities/update.ts (2 hunks)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2 hunks)
services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
- services/libs/data-access-layer/src/activities/sql.ts
🚧 Files skipped from review as they are similar to previous changes (11)
- backend/package.json
- backend/src/bin/jobs/index.ts
- backend/src/bin/jobs/syncActivities.ts
- backend/src/bin/scripts/syncActivities.ts
- backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
- backend/src/services/activityService.ts
- services/apps/data_sink_worker/src/service/activity.data.ts
- services/apps/entity_merging_worker/src/activities/members.ts
- services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
- services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
- services/libs/logging/src/utility.ts
🔇 Additional comments (10)
services/libs/data-access-layer/src/activities/update.ts (2)
35-57: Address transactional consistency and type safety concerns.
The current implementation has several potential issues:
- The previous review comment about transactional consistency is still valid
- The force flag in insertActivities is set to true without documentation
- Type safety concerns with spreading mapActivity result
Additionally:
- Document the purpose of the force flag:
export async function updateActivities(
qdb: DbConnOrTx,
mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
where: string,
params?: Record<string, string>,
): Promise<{ processed: number; duration: number }> {
+ // force=true is used to override existing activities
return streamActivities(
- Add type safety:
- ...(await mapActivity(activity)),
+ ...((await mapActivity(activity)) as Partial<IDbActivityCreateData>),
41-56
: Verify the impact of streaming implementation on memory usage.
The streaming implementation might affect memory usage patterns differently from the previous implementation.
Let's analyze the query patterns:
✅ Verification successful
Let me gather more information about the streaming implementation to better understand its memory usage patterns.
Based on the gathered information, I can now provide a final response about the streaming implementation's memory usage.
The streaming implementation is well-designed for memory efficiency ✅
The implementation uses `pg-query-stream` with a proper streaming pattern that processes records one at a time:
- Uses `QueryStream` for reading data without loading entire result sets into memory
- Processes activities individually through the `streamActivities` function
- Inserts are handled one activity at a time, preventing memory accumulation
The memory usage is well-controlled because:
- The streaming cursor fetches rows on-demand
- Each activity is processed and inserted independently
- No batching or accumulation of records is performed in the streaming path
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for potential memory-intensive queries
rg -A 5 "SELECT \* FROM activities" --type ts
# Look for batch size configurations that might need adjustment
rg "BATCH_SIZE.*=.*[0-9]+" --type ts
Length of output: 2146
Script:
#!/bin/bash
# Check the streaming implementation details
ast-grep --pattern 'streamActivities($$$)'
# Look for QueryStream configurations and usage patterns
rg -B 2 -A 5 "QueryStream" --type ts
# Check for any batch processing in activities
rg -B 2 -A 5 "insertActivities" --type ts
Length of output: 16505
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (3)
127-129
: Security concern about tenant isolation was previously raised.
A past review comment already highlighted the need for tenant isolation and query optimization for this method.
132-140
: Security and reliability concerns were previously raised.
A past review comment already highlighted critical issues regarding tenant isolation, error handling, and the unsafe nature of the "raw" prefix in this method.
142-145
: Data integrity concerns were previously raised.
A past review comment already highlighted critical issues regarding missing required fields, tenant isolation, and the unsafe nature of direct insertion in this method.
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (3)
5-6
: LGTM! Imports are properly aligned with the implementation needs.
The new imports support the refactored implementation of moveIdentityActivitiesToNewMember
.
128-149
: Great improvements to type safety and query construction!
The refactored implementation brings several improvements:
- Uses
DbConnOrTx
for better transaction support - Leverages type-safe activity updates through
IDbActivityCreateData
- Improves query maintainability using
formatQuery
121-127
: Verify the impact of removing batch processing.
The batchSize
parameter has been removed from the function signature. While the new implementation is cleaner, we should verify that this doesn't impact performance when processing large datasets.
Let's check if there are any large-scale operations using this function:
services/apps/data_sink_worker/src/service/activity.service.ts (2)
4-10
: LGTM: Import changes are well-structured.
The new imports, including generateUUIDv4
, are logically grouped and necessary for the updated functionality.
647-662
: Verify logging of sensitive information.
The code handles sensitive data like member identities and activities. While logging is used appropriately for debugging, ensure that sensitive information is not inadvertently exposed in logs.
Also applies to: 1085-1104
✅ Verification successful
Logging practices are appropriate and secure
After reviewing the logging patterns in the code:
- All sensitive data logging is properly handled with appropriate log levels
- Member identities are only logged in warn-level messages when there are specific identity-related issues
- Activity IDs are used instead of full activity content in debug/trace logs
- Error logs contain minimal necessary information for debugging
- No PII or sensitive authentication data is exposed in logs
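A small illustration of that id-only logging style, with hypothetical field names; it is not taken from the service itself:

// Log stable identifiers instead of raw identity values or message bodies.
export function activityLogContext(activity: { id: string; memberId?: string; platform?: string }) {
  return { activityId: activity.id, memberId: activity.memberId, platform: activity.platform }
}

// usage: this.log.debug(activityLogContext(activity), 'activity processed')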
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential sensitive data logging
# Look for log statements that might contain sensitive information
# Test: Search for logging statements with potential sensitive data
rg -A 2 'this\.log\.(debug|info|warn|error).*member\.'
Length of output: 56
Script:
#!/bin/bash
# Let's broaden the search to check for any logging patterns in the file and surrounding context
rg -A 2 'this\.log\.' services/apps/data_sink_worker/src/service/activity.service.ts
# Also check for general logging patterns in case different logger instances are used
rg -A 2 '(log|logger)\.(debug|info|warn|error)' services/apps/data_sink_worker/src/service/activity.service.ts
# Let's also check the actual data being queried to understand what sensitive fields might be present
ast-grep --pattern 'queryActivities($_, {
$$$
})'
Length of output: 25869
fa56c49 to 3a42be8 Compare
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)
121-149: Consider adding error handling and logging.
The implementation is solid, but for better observability and reliability, consider:
- Adding error handling with appropriate error messages
- Including logging for tracking the activity migration process
Example enhancement:
export async function moveIdentityActivitiesToNewMember(
  db: DbConnOrTx,
  tenantId: string,
  fromId: string,
  toId: string,
  username: string,
  platform: string,
) {
+ try {
+   logger.info('Starting activity migration', { fromId, toId, username, platform });
    await updateActivities(
      db,
      async (activity: IDbActivityCreateData) => ({ ...activity, memberId: toId }),
      formatQuery(
        `
        "memberId" = $(fromId)
        and "tenantId" = $(tenantId)
        and "username" = $(username)
        and "platform" = $(platform)
        and "deletedAt" is null
        `,
        {
          fromId,
          tenantId,
          username,
          platform,
        },
      ),
      {
        memberId: fromId,
      },
    )
+   logger.info('Activity migration completed successfully');
+ } catch (error) {
+   logger.error('Failed to migrate activities', { error, fromId, toId });
+   throw new Error(`Failed to migrate activities: ${error.message}`);
+ }
}
121-149: Consider wrapping in a transaction.
Since this operation modifies critical data, it should ideally be wrapped in a transaction to ensure data consistency.
The caller should handle the transaction, for example:
// Example usage in the caller
await db.transaction(async (tx) => {
  await moveIdentityActivitiesToNewMember(tx, tenantId, fromId, toId, username, platform);
  // Other related operations...
});
services/apps/data_sink_worker/src/service/activity.service.ts (1)
647-662: Add error handling for database query.
While the query structure is good, consider adding error handling for potential database connection issues.
const {
  rows: [dbActivity],
-} = await queryActivities(this.qdbStore.connection(), {
+} = await queryActivities(this.qdbStore.connection(), {
  tenantId,
  segmentIds: [segmentId],
  filter: {
    and: [
      { timestamp: { eq: activity.timestamp } },
      { sourceId: { eq: activity.sourceId } },
      { platform: { eq: platform } },
      { type: { eq: activity.type } },
      { channel: { eq: activity.channel } },
    ],
  },
  limit: 1,
+}).catch(error => {
+  this.log.error('Failed to query activities:', error);
+  throw new Error('Database query failed');
+});
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (5)
services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
services/apps/entity_merging_worker/src/activities/members.ts (1 hunks)
services/libs/data-access-layer/src/activities/sql.ts (0 hunks)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2 hunks)
💤 Files with no reviewable changes (1)
- services/libs/data-access-layer/src/activities/sql.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- services/apps/entity_merging_worker/src/activities/members.ts
🔇 Additional comments (6)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (3)
126-128: Critical: Add tenant and segment isolation.
The method lacks tenant and segment isolation, which could allow unauthorized cross-tenant data access. This is a security vulnerability.
The previous review comment about adding tenant isolation is still valid. Apply this safer implementation:
- public async existsWithId(id: string): Promise<boolean> {
- const result = await this.db().oneOrNone('select 1 from activities where id = $(id)', { id })
- return result !== null
+ public async existsWithId(id: string, tenantId: string, segmentId: string): Promise<boolean> {
+ return await this.db().oneOrNone(
+ 'SELECT EXISTS(SELECT 1 FROM activities WHERE id = $(id) AND "tenantId" = $(tenantId) AND "segmentId" = $(segmentId))',
+ { id, tenantId, segmentId }
+ )
}
131-139: Critical: Add tenant isolation and error handling.
The method has several security and reliability concerns that could lead to unauthorized data access and silent failures.
The previous review comment about adding tenant isolation and error handling is still valid. Apply this safer implementation:
- public async rawUpdate(id: string, data: IDbActivityUpdateData): Promise<void> {
+ public async updateWithoutParentSync(
+ id: string,
+ tenantId: string,
+ segmentId: string,
+ data: IDbActivityUpdateData
+ ): Promise<void> {
const prepared = RepositoryBase.prepare(
{ ...data, updatedAt: new Date() },
this.updateActivityColumnSet,
)
const query = this.dbInstance.helpers.update(prepared, this.updateActivityColumnSet)
- const condition = this.format('where id = $(id)', { id })
- await this.db().none(`${query} ${condition}`)
+ const condition = this.format(
+ 'where id = $(id) and "tenantId" = $(tenantId) and "segmentId" = $(segmentId)',
+ { id, tenantId, segmentId }
+ )
+ const result = await this.db().result(`${query} ${condition}`)
+ this.checkUpdateRowCount(result.rowCount, 1)
}
141-144: Critical: Add data integrity checks and tenant isolation.
The method lacks essential data integrity checks and security measures.
The previous review comment about adding data integrity checks and tenant isolation is still valid. Apply this safer implementation:
- public async rawInsert(data: IDbActivityCreateData): Promise<void> {
+ public async insertWithoutParentSync(
+ tenantId: string,
+ segmentId: string,
+ data: IDbActivityCreateData
+ ): Promise<string> {
+ const id = generateUUIDv1()
+ const ts = new Date()
const prepared = RepositoryBase.prepare(
- data,
+ {
+ ...data,
+ id,
+ tenantId,
+ segmentId,
+ createdAt: ts,
+ updatedAt: ts
+ },
this.insertActivityColumnSet,
)
const query = this.dbInstance.helpers.insert(prepared, this.insertActivityColumnSet)
await this.db().none(query)
+ return id
}
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (1)
5-6
: LGTM: Import additions are appropriate.
The new imports for `formatQuery` and `IDbActivityCreateData` are well-organized and support the refactored implementation.
services/apps/data_sink_worker/src/service/activity.service.ts (2)
4-10
: LGTM: Import statements are properly organized.
The new imports for `generateUUIDv4` and `queryActivities` are correctly added and well-organized.
Also applies to: 12-12
1085-1085
: Previous comment about ID generation logic is still valid.
The current ID generation logic could lead to collisions in high-concurrency scenarios.
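One common mitigation for that kind of race, shown purely as a sketch and not as what this PR implements, is to derive the id deterministically from the activity's natural key so that concurrent creators produce the same id and collide on the primary key instead of inserting duplicates; the namespace UUID and field list below are assumptions:

import { v5 as uuidv5 } from 'uuid'

// Arbitrary example namespace; a real one would be a fixed project constant.
const ACTIVITY_ID_NAMESPACE = '9b1deb4d-3b7d-4bad-9bdd-2b0d7b3dcb6d'

export function deterministicActivityId(key: {
  tenantId: string
  platform: string
  type: string
  channel: string
  sourceId: string
  timestamp: string
}): string {
  // Same natural key always yields the same UUIDv5, so duplicate inserts fail fast.
  const name = [key.tenantId, key.platform, key.type, key.channel, key.sourceId, key.timestamp].join('|')
  return uuidv5(name, ACTIVITY_ID_NAMESPACE)
}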
Summary by CodeRabbit
Release Notes
New Features
Improvements
Bug Fixes
Chores