Update activities in postgres periodically #2668

Merged
11 commits merged on Nov 6, 2024

Conversation

sausage-todd (Contributor) commented on Nov 4, 2024

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced a new script for synchronizing activity data between databases.
    • Added a new job for synchronizing activities to the job queue.
    • Enhanced activity data structure with additional timestamp fields.
    • New repository methods for checking whether an activity exists and for inserting activities directly.
  • Improvements

    • Streamlined activity creation and update processes.
    • Improved querying capabilities for activities.
    • Enhanced logging functionality with detailed error handling.
    • Refined error handling in activity upserts for clearer reporting.
    • Updated the database connections used when moving activities to improve efficiency.
  • Bug Fixes

    • Updated existing scripts for consistency and improved readability in SQL queries.
  • Chores

    • Added a new index to improve database performance for activity updates.
    • Modified formatting for existing scripts to ensure consistency.


coderabbitai bot commented Nov 4, 2024

Walkthrough

This pull request introduces a new script for synchronizing activities, updates to the ActivityService class for improved data handling, and a new SQL index for the activities table. It modifies the data access layer to support streaming activities rather than direct updates, along with updates to various interfaces and repository methods to accommodate these changes. Overall, the changes aim to enhance the functionality and efficiency of activity data management within the application.
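
As a rough picture of the new data-access-layer entry point (signature taken from the diffs reviewed below; the connection, mapper, and WHERE clause here are illustrative placeholders, not the PR's actual call sites):

// Hypothetical call into the new updateActivities helper: it streams rows
// matching the WHERE clause and re-inserts each one with the mapped fields.
const { processed, duration } = await updateActivities(
  qdb,                                                            // QuestDB connection (assumed in scope)
  async (activity) => ({ updatedAt: new Date().toISOString() }),  // placeholder mapper
  '"updatedAt" > $(updatedAt)',
  { updatedAt: '2024-11-01T00:00:00.000Z' },                      // illustrative cutoff
)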

Changes

  • backend/package.json: Added the syncActivities script and formatted fix-missing-org-displayName.
  • backend/src/bin/jobs/index.ts: Imported syncActivitiesJob and added it to the jobs array.
  • backend/src/bin/jobs/syncActivities.ts: Introduced activity synchronization logic with several asynchronous functions and a cron job.
  • backend/src/bin/scripts/syncActivities.ts: Created a script that synchronizes activities based on a provided timestamp.
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql: Added an index on the updatedAt column of the activities table.
  • services/apps/data_sink_worker/src/service/activity.data.ts: Added an id property to the IActivityCreateData interface.
  • services/apps/data_sink_worker/src/service/activity.service.ts: Modified ActivityService methods for streamlined activity creation and updates.
  • services/libs/data-access-layer/src/activities/update.ts: Renamed updateActivities to streamActivities and added a new updateActivities function.
  • services/libs/data-access-layer/src/conversations/sql.ts: Improved SQL query formatting and readability.
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts: Added updatedAt and createdAt properties to the IDbActivityUpdateData interface.
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts: Added existsWithId, rawUpdate, and rawInsert methods to ActivityRepository.
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts: Removed the ActivityRepository class and its methods.
  • services/libs/logging/src/utility.ts: Added logExecutionTimeV2 and updated the timer function for better logging.

Possibly related PRs

Suggested labels

Improvement, Feature

Suggested reviewers

  • epipav
  • themarolt

Poem

🐇 In the fields where data flows,
A script now dances, syncs, and grows.
With every hop, activities align,
In the code, our efforts shine.
So let us cheer, for changes made,
In our tech garden, progress laid! 🌱


coderabbitai bot left a comment

Actionable comments posted: 14

🧹 Outside diff range and nitpick comments (11)
backend/src/bin/scripts/syncActivities.ts (1)

7-7: Consider removing unnecessary setImmediate.

The setImmediate wrapper isn't necessary here since we're already using an async function. The script can be simplified by directly using an async IIFE (Immediately Invoked Function Expression).

-setImmediate(async () => {
+(async () => {
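
For reference, a minimal sketch of the async-IIFE pattern the suggestion points at; the body, exit codes, and the commented-out call are illustrative, not the actual script:

// Async IIFE skeleton: no setImmediate needed, errors set a non-zero exit code.
;(async () => {
  try {
    // await syncActivities(log, process.argv[2])  // hypothetical call into the sync logic
    process.exit(0)
  } catch (err) {
    console.error(err)
    process.exit(1)
  }
})()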
services/libs/logging/src/utility.ts (2)

Line range hint 19-41: Consider refactoring to reduce code duplication and enhance error logging.

While the error handling addition is valuable, there's room for improvement in the implementation.

Consider these improvements:

 export const logExecutionTimeV2 = async <T>(
   process: () => Promise<T>,
   log: Logger,
   name: string,
 ): Promise<T> => {
-  const start = performance.now()
-
-  const end = () => {
-    const end = performance.now()
-    const duration = end - start
-    const durationInSeconds = duration / 1000
-    return durationInSeconds.toFixed(2)
-  }
+  const timerInstance = timer(log, name)
   try {
     const result = await process()
-    log.info(`Process ${name} took ${end()} seconds!`)
+    timerInstance.end()
     return result
   } catch (e) {
-    log.info(`Process ${name} failed after ${end()} seconds!`)
+    log.error(`Process ${name} failed:`, e)
+    timerInstance.end(`${name} (failed)`)
     throw e
   }
 }

This refactoring:

  1. Reuses the existing timer function to avoid timing logic duplication
  2. Adds error details to the log for better debugging
  3. Distinguishes failed process timing logs with a "(failed)" suffix
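
For orientation, a small usage sketch of the wrapper as declared above, assuming Logger and logExecutionTimeV2 are in scope from this logging module; the wrapped operation and the process name are placeholders:

// logExecutionTimeV2 runs the given async function, logs how long it took,
// and rethrows on failure so callers still see the error.
async function example(log: Logger): Promise<number> {
  return logExecutionTimeV2(
    async () => 42,        // placeholder for the real async work
    log,
    'example-process',     // name that appears in the timing log line
  )
}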

Line range hint 49-59: LGTM! Good improvements to prevent double-ending and support name overrides.

The implementation looks solid with:

  • Protection against multiple end() calls
  • Clean name override handling using nullish coalescing

Consider adding a debug log when skipping due to isEnded:

     end: function (overrideName?: string) {
       if (isEnded) {
+        log.debug(`Timer ${overrideName ?? name} already ended, skipping.`)
         return
       }
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1)

131-132: Document the timestamp fields.

Consider adding JSDoc comments to explain:

  • The purpose of these timestamp fields
  • Whether they are automatically managed or need manual updates
  • Their relationship with the new periodic update functionality

Example documentation:

 export interface IDbActivityUpdateData {
   // ... existing fields ...
+  /** Timestamp of the last update to this activity record. Used for periodic synchronization. */
   updatedAt?: string
+  /** Timestamp when this activity was first created. */
   createdAt?: string
 }
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1)

191-210: Consider a safer pattern for bulk operations.

The addition of "raw" methods that bypass safety checks suggests a need for performance optimization, possibly for bulk operations. However, this approach introduces security and data integrity risks.

Consider these alternatives:

  1. Add a proper bulk update/insert method with all safety checks
  2. Use database transactions for better performance while maintaining safety
  3. If the goal is to avoid parent ID updates, rename methods to clearly indicate this (e.g., updateWithoutParentSync)
  4. Document performance requirements and constraints to help identify better solutions

Example bulk operation pattern:

public async bulkUpdate(
  tenantId: string,
  segmentId: string,
  updates: Array<{ id: string, data: IDbActivityUpdateData }>
): Promise<void> {
  return this.db().tx(async t => {
    const timestamp = new Date()
    const queries = updates.map(({ id, data }) => {
      const prepared = RepositoryBase.prepare(
        { ...data, updatedAt: timestamp },
        this.updateActivityColumnSet
      )
      const query = this.dbInstance.helpers.update(prepared, this.updateActivityColumnSet)
      const condition = this.format(
        'where id = $(id) and "tenantId" = $(tenantId) and "segmentId" = $(segmentId)',
        { id, tenantId, segmentId }
      )
      return t.result(`${query} ${condition}`)
    })
    const results = await t.batch(queries)
    results.forEach(r => this.checkUpdateRowCount(r.rowCount, 1))
  })
}
services/libs/data-access-layer/src/conversations/sql.ts (1)

394-397: Consider adding debug logging for skipped empty orderBy parts.

The addition of the empty orderBy part check is a good defensive programming practice. However, consider adding debug logging when skipping empty parts to help with troubleshooting.

    if (orderByPart.trim().length === 0) {
+     console.debug(`Skipping empty orderBy part in queryConversations`)
      continue
    }
services/libs/data-access-layer/src/activities/update.ts (3)

Line range hint 13-33: Adjust the timer to accurately measure the streaming duration

The timer t is ended inside the loop after processing the first item, which only measures the time until the first activity is processed. To capture the total duration of the streaming process, consider moving t.end() after the stream has fully processed all activities.

Apply this diff to adjust the timer:

 const t = timer(logger, `query activities with ${whereClause}`)
 const res = await qdb.stream(qs, async (stream) => {
   for await (const item of stream) {
     const activity = item as unknown as IDbActivityCreateData
     await onActivity(activity)
   }
 })
+t.end()

Line range hint 13-33: Add error handling during activity processing in the stream

If an error occurs within onActivity(activity), it could terminate the stream and leave it in an inconsistent state. Wrap the activity processing logic in a try-catch block to handle exceptions gracefully and ensure the stream continues or fails safely.

Apply this diff to include error handling:

 for await (const item of stream) {
   const activity = item as unknown as IDbActivityCreateData
+  try {
     await onActivity(activity)
+  } catch (error) {
+    logger.error({ error }, 'Error processing activity in stream')
+    // Decide whether to continue processing or rethrow the error based on your error handling strategy
+  }
 }

29-29: Control concurrency to prevent resource exhaustion

If onActivity involves heavy computation or I/O-bound operations, processing a large stream without limiting concurrency may overwhelm system resources. Consider implementing a concurrency limit using a library like p-limit to manage the number of simultaneous onActivity executions.

Example using p-limit:

 import QueryStream from 'pg-query-stream'
+import pLimit from 'p-limit'

 const limit = pLimit(10) // Limit to 10 concurrent operations

 const res = await qdb.stream(qs, async (stream) => {
   for await (const item of stream) {
     const activity = item as unknown as IDbActivityCreateData
+    limit(() => onActivity(activity)).catch((error) => {
+      logger.error({ error }, 'Error processing activity in stream')
+      // Handle error accordingly
+    })
   }
 })
backend/src/bin/jobs/syncActivities.ts (2)

94-94: Avoid variable shadowing for the timer variable

The variable t is declared both outside the loop (line 94) and inside the loop (line 118). This could lead to confusion or errors. Consider renaming the inner t variable to prevent shadowing.

Apply this change:

...
...

Also applies to: 118-118


96-97: Refactor loop to avoid disabling ESLint rules

The comment // eslint-disable-next-line no-constant-condition suppresses an ESLint warning for the while (true) loop. Refactoring the loop to use an explicit exit condition removes the need for the suppression and keeps the code ESLint-clean.

Apply this change to use a condition in the loop:

...
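
One possible shape of such a refactor, sketched under the assumption that the loop advances a timestamp cursor until a batch comes back empty; the helper names here are hypothetical and are not the reviewer's elided suggestion:

// Explicit loop condition instead of `while (true)` plus an eslint-disable.
let cursor: string | undefined = maxUpdatedAt   // starting point (assumed in scope)
let hasMore = true
while (hasMore) {
  const batch = await fetchNextBatch(cursor)    // hypothetical batch reader
  hasMore = batch.length > 0
  if (hasMore) {
    await processBatch(batch)                   // hypothetical batch processor
    cursor = batch[batch.length - 1].updatedAt
  }
}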
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between f37eb77 and ef5f41e.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (13)
  • backend/package.json (1 hunks)
  • backend/src/bin/jobs/index.ts (2 hunks)
  • backend/src/bin/jobs/syncActivities.ts (1 hunks)
  • backend/src/bin/scripts/syncActivities.ts (1 hunks)
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
  • services/libs/data-access-layer/src/activities/update.ts (2 hunks)
  • services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
  • services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
✅ Files skipped from review due to trivial changes (1)
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
🔇 Additional comments (13)
backend/src/bin/scripts/syncActivities.ts (2)

1-5: LGTM! Clean imports and proper logger setup.

The imports are well-organized and the logger setup follows best practices by using a child logger with appropriate context.


3-3: Verify syncActivities function signature and batch processing.

Let's verify that the syncActivities function signature matches our usage and supports batch processing as mentioned in the summary.

Also applies to: 15-15

✅ Verification successful

Function signature and batch processing implementation verified

The verification confirms:

  • The syncActivities function accepts the correct parameters: logger and optional maxUpdatedAt
  • Batch processing is properly implemented through syncActivitiesBatch function
  • The function includes progress tracking and logging functionality
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify syncActivities function implementation
# Expected: Function should accept logger and updatedAt parameters and implement batch processing

# Check function signature and implementation
ast-grep --pattern 'export function syncActivities($_logger, $_updatedAt) {
  $$$
}'

# Look for batch processing implementation
rg -A 10 'syncActivities' backend/src/bin/jobs/syncActivities.ts

Length of output: 1280

backend/src/bin/jobs/index.ts (2)

11-11: LGTM!

The import statement follows the established pattern and naming conventions.


22-22: Verify job execution order requirements.

The addition of syncActivitiesJob to the jobs array looks good. However, let's verify if the job execution order matters for the application's functionality.

✅ Verification successful

Let me gather more information about the job implementation and its dependencies.


Let me check the implementation of syncActivitiesJob to understand its dependencies and requirements.


Let me check the implementation of the syncActivities job file directly.


No job ordering dependencies found for syncActivitiesJob

The syncActivitiesJob is an independent daily job that synchronizes activities by reading from QuestDB and writing to PostgreSQL. The implementation shows:

  • No dependencies on other jobs in the array
  • Self-contained functionality that doesn't rely on the execution of other jobs
  • Uses its own timestamp tracking for incremental updates
  • Has no comments or code indicating required ordering

The job can safely run in any position within the jobs array.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any job ordering dependencies or documentation

# Test 1: Look for job ordering documentation in README files
echo "Checking for job ordering documentation..."
fd -t f "README" | xargs rg -i "job.*order|execution.*order"

# Test 2: Check if syncActivities job has any dependencies on other jobs
echo "Checking for job dependencies..."
rg -A 5 "syncActivities.*depend" 
rg -A 5 "depends.*syncActivities"

# Test 3: Check if the job is referenced in any configuration files
echo "Checking for job configuration..."
fd -e json -e yaml -e yml | xargs rg "syncActivities"

Length of output: 565


Script:

#!/bin/bash
# Check the implementation of syncActivitiesJob and other jobs in the array

# Look for the job implementation
echo "Checking syncActivitiesJob implementation..."
rg -A 10 "syncActivitiesJob" 

# Check the jobs array and its context
echo "Checking jobs array context..."
rg -A 10 -B 10 "const jobs = \[" 

# Look for any job scheduling or sequencing logic
echo "Checking for job scheduling logic..."
rg -A 5 "schedule|sequence|order.*job"

Length of output: 31860


Script:

#!/bin/bash
# Look for syncActivities job implementation and its dependencies

# Check the implementation file
echo "Checking syncActivities job implementation..."
rg -A 10 "export default.*syncActivities" backend/src/bin/jobs/

# Look for any dependencies or imports in the job file
echo "Checking job dependencies..."
rg -B 5 "import.*syncActivities" backend/src/bin/jobs/

# Check if there are any comments about ordering
echo "Checking for ordering comments..."
rg -B 5 -A 5 "syncActivities.*job" backend/src/bin/jobs/

Length of output: 951


Script:

#!/bin/bash
# Check the syncActivities job implementation

# Look for the job file content
echo "Checking syncActivities job file..."
cat backend/src/bin/jobs/syncActivities.ts

# Look for any dependencies between jobs in the implementation
echo "Checking for job dependencies in implementation..."
rg -l "integrationTicks|refreshMaterializedViews|cleanUp|checkStuckIntegrationRuns|refreshGroupsioToken|refreshGitlabToken|refreshGithubRepoSettings|autoImportGroups" backend/src/bin/jobs/syncActivities.ts

# Check if there are any comments about job ordering
echo "Checking for ordering comments in job file..."
rg -i "order|sequence|after|before|depend" backend/src/bin/jobs/syncActivities.ts

Length of output: 4735

services/apps/data_sink_worker/src/service/activity.data.ts (2)

4-4: ⚠️ Potential issue

Breaking Change: Required id field added to IActivityCreateData

Adding a required id field is a breaking change that will affect all code creating activities. We need to ensure all consumers are updated.

Let's verify the impact:


4-4: Clarify ID generation strategy

Please provide details about:

  1. Who is responsible for generating these IDs?
  2. How is uniqueness guaranteed across different sources?
  3. What is the ID format/standard being used?

Let's check for existing ID generation patterns:

✅ Verification successful

Based on the search results, I can see extensive usage of UUIDs throughout the codebase. Let me search for more specific patterns around ID generation in the data sink worker context:


Let me check one more thing to understand how activities are created in this context:


Based on the code analysis, I can now provide a clear response about the ID generation strategy:

ID Generation Strategy for Activities

The ID field in the activity data sink worker is managed through the following process:

  1. IDs are generated and managed by the data sink worker service when processing activities
  2. The codebase uses UUID format, specifically leveraging the common library's UUID utilities
  3. The ID generation is handled upstream before the activity reaches the data sink worker, as evidenced by the activity.data.ts interface which only declares the ID field without generation logic

Key points:

  • The data sink worker expects activities to already have IDs when they arrive
  • The worker validates and processes these IDs but doesn't generate them
  • The ID format follows the UUID standard used throughout the codebase
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Look for ID generation patterns in the codebase
rg -A 3 'generateId|uuid|nanoid'

Length of output: 165663


Script:

#!/bin/bash
# Look for ID generation patterns specifically in data sink worker
rg -A 5 "id.*:" services/apps/data_sink_worker/

Length of output: 22076


Script:

#!/bin/bash
# Look for activity creation patterns in the data sink worker
rg -A 10 "new activity|create activity|processActivity" services/apps/data_sink_worker/

Length of output: 9208

services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (2)

131-132: Verify the handling of createdAt in column set.

While both updatedAt and createdAt are added to the interface, only updatedAt is included in the updateActivityColumnSet. This asymmetry might be intentional (as createdAt typically shouldn't change during updates), but it's worth confirming.

Consider adding a comment explaining why createdAt is part of the update interface but not included in the update column set.


Line range hint 1-180: Clarify the status of this code.

This file is located in an old directory, which typically indicates deprecated or legacy code. Given that this PR introduces new functionality, we should verify if this is the correct location for these changes.

✅ Verification successful

Let me check the content of both files to compare them.


Let me check which file is being actively used by searching for imports of these files.


Changes should be made to the file in the old directory

The file in the old directory is still actively used by multiple services and components as shown by the import references. The newer file at services/apps/data_sink_worker/src/service/activity.data.ts appears to be a simplified interface version but is not being imported anywhere. Both files were modified in the same commits, indicating they are meant to coexist rather than one replacing the other.

  • The old file is imported by:
    • services/apps/data_sink_worker/src/service/activity.service.ts
    • services/libs/data-access-layer/src/activities/update.ts
    • services/libs/data-access-layer/src/activities/sql.ts
    • services/libs/data-access-layer/src/activities/ilp.ts
    • backend/src/bin/jobs/syncActivities.ts
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there's a newer version of this file outside the 'old' directory
fd -t f "activity.data.ts" | grep -v "old"

# Check for any migration plans or deprecation notices
rg -i "deprecat|migrat|old|legacy" "$(dirname "services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts")"

Length of output: 891


Script:

#!/bin/bash
# Compare the content of both files
echo "=== Content of the file in 'old' directory ==="
cat "services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts"
echo -e "\n=== Content of the file outside 'old' directory ==="
cat "services/apps/data_sink_worker/src/service/activity.data.ts"

Length of output: 4989


Script:

#!/bin/bash
# Search for imports of both files
echo "=== References to old file ==="
rg -l "from.*old.*activity\.data" 

echo -e "\n=== References to new file ==="
rg -l "from.*service/activity\.data"

# Check git history of both files
echo -e "\n=== Git history of old file ==="
git log --oneline "services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts"

echo -e "\n=== Git history of new file ==="
git log --oneline "services/apps/data_sink_worker/src/service/activity.data.ts"

Length of output: 1153

backend/package.json (1)

44-44: LGTM! Consistent formatting.

The script formatting now aligns with the project's conventions.

services/libs/data-access-layer/src/conversations/sql.ts (3)

49-52: LGTM! Formatting improvements enhance readability.

The indentation and line breaks have been adjusted to improve code readability while maintaining the existing logic.


342-347: LGTM! Formatting improvements enhance readability.

The SQL query formatting has been standardized with proper indentation and line breaks while preserving the existing logic.


477-477: LGTM! SQL query formatting improvements enhance readability.

The SQL queries have been reformatted with proper indentation, line breaks, and column alignment, making them more readable and maintainable while preserving the existing functionality.

Also applies to: 479-479, 503-503, 505-505, 527-527

services/libs/data-access-layer/src/activities/update.ts (1)

Line range hint 13-33: Verify that all activities meet expected criteria during streaming

Ensure that the activities retrieved by the WHERE clause truly represent the intended dataset. There might be edge cases where activities don't meet the expected criteria, leading to potential issues in onActivity.

Run the following script to check the count of activities being streamed:

Resolved review threads:
  • backend/src/bin/scripts/syncActivities.ts (3)
  • services/libs/logging/src/utility.ts
  • services/libs/data-access-layer/src/activities/update.ts (outdated)
  • backend/src/bin/jobs/syncActivities.ts (3)
coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (6)
services/libs/data-access-layer/src/activities/update.ts (2)

Line range hint 13-33: Fix timer usage and add error handling

The current implementation has several issues that need attention:

  1. The timer.end() is called on the first iteration, which is incorrect
  2. Missing error handling around stream processing
  3. No control over batch size for memory management

Here's the suggested fix:

 export async function streamActivities(
   qdb: DbConnOrTx,
   onActivity: (activity: IDbActivityCreateData) => Promise<void>,
   where: string,
   params?: Record<string, string>,
 ): Promise<{ processed: number; duration: number }> {
   const whereClause = formatQuery(where, params)
-  const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`)
+  const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`, [], {
+    batchSize: 1000, // Control memory usage
+    highWaterMark: 1000
+  })

   const t = timer(logger, `query activities with ${whereClause}`)
-  const res = await qdb.stream(qs, async (stream) => {
-    for await (const item of stream) {
-      t.end()
-
-      const activity = item as unknown as IDbActivityCreateData
-      await onActivity(activity)
+  try {
+    const res = await qdb.stream(qs, async (stream) => {
+      try {
+        for await (const item of stream) {
+          const activity = item as unknown as IDbActivityCreateData
+          await onActivity(activity)
+        }
+      } catch (error) {
+        logger.error('Error processing activity stream', error)
+        throw error
+      }
+    })
+    t.end()
+    return res
+  } catch (error) {
+    t.end()
+    logger.error('Error in streamActivities', error)
+    throw error
+  }
-  })
-  return res
 }

41-56: Consider implementing retry logic

The function should handle temporary database failures gracefully, especially during long-running updates.

Consider adding retry logic:

 return streamActivities(
   qdb,
   async (activity) => {
+    const maxRetries = 3;
+    let retries = 0;
+    while (true) {
+      try {
         await insertActivities(
           [
             {
               ...activity,
               ...(await mapActivity(activity)),
             },
           ],
           true,
         )
+        break;
+      } catch (error) {
+        if (retries >= maxRetries) throw error;
+        retries++;
+        await new Promise(resolve => setTimeout(resolve, 1000 * retries));
+      }
+    }
   },
   where,
   params,
 )
services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (2)

Line range hint 146-154: Consider adding error handling and batching for better robustness.

The current implementation could be improved in several ways:

  1. Add error handling around insertActivities to prevent silent failures
  2. Consider batching updates for better performance
  3. Add logging for better observability

Here's a suggested improvement:

  async function insertIfMatches(activity: IDbActivityCreateData) {
+   const activities: IDbActivityCreateData[] = [];
    for (const condition of orgCases) {
      if (!condition.matches(activity)) {
        continue;
      }

      activity.organizationId = condition.orgId
-     await insertActivities([activity], true)
+     try {
+       await insertActivities([activity], true);
+       logger.debug(`Updated activity ${activity.id} with organization ${condition.orgId}`);
+     } catch (error) {
+       logger.error(`Failed to update activity ${activity.id}:`, error);
+       throw error; // Re-throw to handle it in the stream processing
+     }
      return;
    }
  }

Line range hint 169-177: Consider adding stream processing safeguards.

The current streaming implementation could benefit from additional safeguards:

  1. Add batch size controls to prevent memory issues with large result sets
  2. Implement timeout handling for long-running streams
  3. Add retry mechanisms for failed updates

Consider implementing these improvements:

  1. Use QueryStream's batchSize option
  2. Add timeout handling
  3. Implement backoff retry logic

Example implementation:

const BATCH_SIZE = 1000;
const STREAM_TIMEOUT = 30000; // 30 seconds

const qs = new QueryStream(
  formatQuery(query, { memberId }),
  [], // parameters
  { batchSize: BATCH_SIZE } // options
);

const streamPromise = qDb.stream(qs, async (stream) => {
  for await (const activity of stream) {
    await insertIfMatches(activity as unknown as IDbActivityCreateData)
  }
});

// Add timeout handling
const result = await Promise.race([
  streamPromise,
  new Promise((_, reject) => 
    setTimeout(() => reject(new Error('Stream timeout')), STREAM_TIMEOUT)
  )
]);
backend/src/services/activityService.ts (2)

Line range hint 4-324: Consider breaking down the upsert method

The upsert method is quite long and handles multiple responsibilities. Consider breaking it down into smaller, more focused methods for better maintainability and testability.

Suggested breakdown:

  • handleExistingActivity
  • handleNewActivity
  • handleConversationUpdate
  • handleWebhookTriggers
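
A rough skeleton of that split, assuming the existing upsert body is simply redistributed; the method names come from the list above, while the parameter shapes are placeholders:

// Hypothetical extraction of the long upsert method into focused helpers.
class ActivityServiceSketch {
  async upsert(incoming: unknown): Promise<void> {
    // orchestrate the steps below instead of one long method body
  }

  private async handleExistingActivity(existing: unknown, incoming: unknown): Promise<void> {
    // update the stored activity and any dependent records
  }

  private async handleNewActivity(incoming: unknown): Promise<void> {
    // create the new activity record
  }

  private async handleConversationUpdate(activityId: string): Promise<void> {
    // attach the activity to a conversation, creating one if needed
  }

  private async handleWebhookTriggers(activityId: string): Promise<void> {
    // fire automation / webhook side effects
  }
}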

Line range hint 577-579: Address the GDPR prevention TODO

The current GDPR prevention implementation appears to be a temporary solution. This should be properly implemented to ensure compliance with data protection regulations.

Would you like me to help create a more robust GDPR compliance implementation or open an issue to track this task?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between ef5f41e and 2ffd114.

📒 Files selected for processing (3)
  • backend/src/services/activityService.ts (1 hunks)
  • services/libs/data-access-layer/src/activities/update.ts (2 hunks)
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
🔇 Additional comments (4)
services/libs/data-access-layer/src/activities/update.ts (1)

35-57: 🛠️ Refactor suggestion

Optimize performance with batched updates

While the function maintains backward compatibility, it could benefit from performance optimizations:

  1. Consider batching the inserts instead of individual updates
  2. Add debouncing to reduce database load

Here's a suggested optimization:

 export async function updateActivities(
   qdb: DbConnOrTx,
   mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
   where: string,
   params?: Record<string, string>,
 ): Promise<{ processed: number; duration: number }> {
+  const batchSize = 100;
+  let batch: IDbActivityCreateData[] = [];
+
   return streamActivities(
     qdb,
     async (activity) => {
-      await insertActivities(
-        [
-          {
-            ...activity,
-            ...(await mapActivity(activity)),
-          },
-        ],
-        true,
-      )
+      batch.push({
+        ...activity,
+        ...(await mapActivity(activity)),
+      });
+
+      if (batch.length >= batchSize) {
+        await insertActivities(batch, true);
+        batch = [];
+      }
     },
     where,
     params,
-  )
+  ).finally(async () => {
+    if (batch.length > 0) {
+      await insertActivities(batch, true);
+    }
+  })
 }

Let's verify the potential impact of this change:

services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1)

152-152: Verify the impact of enabling immediate activity updates.

The change from false to true in insertActivities aligns with the PR's objective of updating activities periodically. However, this might affect performance as it processes activities one by one.

Let's verify the potential performance impact:

✅ Verification successful

The change to enable immediate activity updates is safe and consistent with existing patterns

Based on the codebase analysis, all other usages of insertActivities are already using true as the second parameter, indicating that immediate updates are the standard pattern:

  • backend/src/services/activityService.ts: Uses true
  • services/libs/data-access-layer/src/activities/update.ts: Uses true
  • services/apps/data_sink_worker/src/service/activity.service.ts: Uses default behavior (true)
  • services/apps/activities_worker/src/activities/createConversations.ts: Uses true

The change in profiles_worker/index.ts actually brings this instance in line with the rest of the codebase's behavior, making it more consistent.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other usages of insertActivities to understand the performance implications
ast-grep --pattern 'insertActivities($_, false)' 

Length of output: 50


Script:

#!/bin/bash
# Let's try a broader search for insertActivities usage
ast-grep --pattern 'insertActivities($$$)'

# Also search for the function definition
ast-grep --pattern 'function insertActivities($$$)'

# Backup with ripgrep to catch any other references
rg "insertActivities" -A 2

Length of output: 16015

backend/src/services/activityService.ts (2)

Line range hint 312-321: LGTM: Enhanced error logging for better debugging

The improved error handling with detailed SQL query logging will help in debugging Sequelize-related issues. The separation between Sequelize and other errors is well-structured.


184-184: Verify the impact of enabling streaming in insertActivities

The change from false to true in the insertActivities call suggests a switch to streaming mode. While this aligns with the PR's objective to update activities periodically, we should ensure this doesn't impact data consistency.

✅ Verification successful

Setting streaming parameter to true is safe and consistent with other usages

Based on the implementation in data-access-layer/src/activities/ilp.ts, the update parameter (second argument) only affects the updatedAt timestamp calculation. When set to true, it ensures the updatedAt field is always set to the current timestamp. This is the desired behavior for activity updates, and the change is consistent with other usages in the codebase:

  • The same pattern (true as second argument) is used in:
    • activities_worker/createConversations.ts
    • profiles_worker/index.ts
    • Other activity update scenarios

The implementation uses QuestDB's ILP protocol for data insertion, which is designed for high-performance time-series data ingestion, making it suitable for streaming updates.
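
Reading the explanation above, the flag's effect on the timestamp can be pictured roughly like this; it is an assumption distilled from the reviewer's summary of ilp.ts, not the actual source:

// When `update` is true the timestamp is always refreshed; otherwise the
// incoming value is kept when present (assumed fallback behaviour).
function resolveUpdatedAt(update: boolean, incoming?: string): string {
  if (update) {
    return new Date().toISOString()
  }
  return incoming ?? new Date().toISOString()
}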

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other insertActivities calls to ensure consistent usage
rg "insertActivities\(" -A 2

# Look for any error handling or rollback mechanisms around insertActivities
rg "insertActivities.*catch|rollback.*insertActivities" -A 5

Length of output: 2281


Script:

#!/bin/bash
# Let's check the implementation of insertActivities to understand the streaming parameter
ast-grep --pattern 'function insertActivities($_) {
  $$$
}'

# Let's also check for any error handling patterns around these calls
rg "insertActivities.*\{" -B 5 -A 10

Length of output: 1295


Script:

#!/bin/bash
# Let's check the implementation in the data-access-layer since previous results showed it's defined there
cat services/libs/data-access-layer/src/activities/ilp.ts

# Let's also check for any transaction handling around the insertActivities calls
rg "transaction.*insertActivities|insertActivities.*transaction" -B 5 -A 5

Length of output: 6542

sausage-todd force-pushed the update-activities-in-postgres branch from 2ffd114 to a1e4206 on November 4, 2024 at 09:31
coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (6)
services/libs/logging/src/utility.ts (2)

Line range hint 22-41: Improve error handling and logging consistency.

The error handling could be enhanced in the following ways:

  1. Use error level for failure logs instead of info
  2. Include error details in the failure log

Consider this improvement:

   try {
     const result = await process()
     log.info(`Process ${name} took ${end()} seconds!`)
     return result
   } catch (e) {
-    log.info(`Process ${name} failed after ${end()} seconds!`)
+    log.error(`Process ${name} failed after ${end()} seconds:`, e)
     throw e
   }

58-58: Add null check for logging message.

The nullish coalescing operator resolves to undefined if both overrideName and name are undefined, producing "Process undefined took X seconds" messages.

Consider adding a default value:

-      log.info(`Process ${overrideName ?? name} took ${durationInSeconds.toFixed(2)} seconds!`)
+      log.info(`Process ${overrideName ?? name ?? 'unnamed'} took ${durationInSeconds.toFixed(2)} seconds!`)
services/libs/data-access-layer/src/activities/update.ts (1)

Line range hint 13-33: Add pagination and error handling to prevent memory issues

The streaming implementation has several potential issues that should be addressed:

  1. The query lacks a LIMIT clause, which could lead to memory pressure with large result sets
  2. The timer.end() is called after the first item instead of after all processing
  3. Error handling for stream processing is missing

Consider implementing this improved version:

 export async function streamActivities(
   qdb: DbConnOrTx,
   onActivity: (activity: IDbActivityCreateData) => Promise<void>,
   where: string,
   params?: Record<string, string>,
+  batchSize: number = 1000
 ): Promise<{ processed: number; duration: number }> {
   const whereClause = formatQuery(where, params)
-  const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`)
+  const qs = new QueryStream(
+    `SELECT * FROM activities WHERE ${whereClause} LIMIT $1`,
+    [batchSize]
+  )

   const t = timer(logger, `query activities with ${whereClause}`)
   const res = await qdb.stream(qs, async (stream) => {
+    try {
       for await (const item of stream) {
-        t.end()
         const activity = item as unknown as IDbActivityCreateData
         await onActivity(activity)
       }
+    } catch (error) {
+      logger.error('Error processing activity stream', error)
+      throw error
+    } finally {
+      t.end()
+    }
   })
   return res
 }
services/apps/data_sink_worker/src/service/activity.service.ts (3)

647-661: Consider extracting query filters as constants.

The query structure is well-organized, but the filter conditions could be more maintainable if extracted into named constants.

+const ACTIVITY_FILTERS = {
+  sourceId: (value: string) => ({ sourceId: { eq: value } }),
+  platform: (value: PlatformType) => ({ platform: { eq: value } }),
+  type: (value: string) => ({ type: { eq: value } }),
+  channel: (value: string) => ({ channel: { eq: value } }),
+};

 const {
   rows: [dbActivity],
 } = await queryActivities(this.qdbStore.connection(), {
   tenantId,
   segmentIds: [segmentId],
   filter: {
     and: [
-      { sourceId: { eq: activity.sourceId } },
-      { platform: { eq: platform } },
-      { type: { eq: activity.type } },
-      { channel: { eq: activity.channel } },
+      ACTIVITY_FILTERS.sourceId(activity.sourceId),
+      ACTIVITY_FILTERS.platform(platform),
+      ACTIVITY_FILTERS.type(activity.type),
+      ACTIVITY_FILTERS.channel(activity.channel),
     ],
   },
   limit: 1,
 })

Line range hint 647-1120: Consider breaking down the large transaction block.

The transaction block in processActivity has high cyclomatic complexity with multiple nested conditions. Consider extracting logical segments into separate private methods for better maintainability.

Example structure:

private async processExistingActivity(/*params*/) {
  // Handle existing activity logic
}

private async processNewActivity(/*params*/) {
  // Handle new activity creation
}

private async processMemberUpdates(/*params*/) {
  // Handle member updates
}
🧰 Tools
🪛 Biome

[error] 663-663: Change to an optional chain.

Unsafe fix: Change to an optional chain.

(lint/complexity/useOptionalChain)


Line range hint 873-875: Address or track the TODO comment.

The TODO comment about handling missing object members should be properly tracked to ensure it's not overlooked.

Would you like me to:

  1. Create a GitHub issue to track this TODO?
  2. Propose a solution for handling missing object members?
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 2ffd114 and a1e4206.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (15)
  • backend/package.json (1 hunks)
  • backend/src/bin/jobs/index.ts (2 hunks)
  • backend/src/bin/jobs/syncActivities.ts (1 hunks)
  • backend/src/bin/scripts/syncActivities.ts (1 hunks)
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
  • backend/src/services/activityService.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
  • services/libs/data-access-layer/src/activities/update.ts (2 hunks)
  • services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
  • services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (10)
  • backend/package.json
  • backend/src/bin/jobs/index.ts
  • backend/src/bin/scripts/syncActivities.ts
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
  • backend/src/services/activityService.ts
  • services/apps/data_sink_worker/src/service/activity.data.ts
  • services/libs/data-access-layer/src/conversations/sql.ts
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
🔇 Additional comments (9)
services/libs/logging/src/utility.ts (1)

Line range hint 50-53: LGTM! Good use of state management.

The implementation properly prevents double-ending with the isEnded flag and uses early return pattern effectively.

services/libs/data-access-layer/src/activities/update.ts (1)

35-57: ⚠️ Potential issue

Improve data consistency and performance

The current implementation has several concerns:

  1. The force flag is always set to true, which could lead to data inconsistencies
  2. Inserts are not batched, potentially impacting performance
  3. The previous review comment about transactional consistency is still valid

Consider these improvements:

 export async function updateActivities(
   qdb: DbConnOrTx,
   mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
   where: string,
   params?: Record<string, string>,
 ): Promise<{ processed: number; duration: number }> {
-  return streamActivities(
-    qdb,
-    async (activity) => {
-      await insertActivities(
-        [
-          {
-            ...activity,
-            ...(await mapActivity(activity)),
-          },
-        ],
-        true,
-      )
-    },
-    where,
-    params,
-  )
+  const batch: IDbActivityCreateData[] = [];
+  const BATCH_SIZE = 100;
+
+  return qdb.transaction(async (tx) => {
+    return streamActivities(
+      tx,
+      async (activity) => {
+        batch.push({
+          ...activity,
+          ...(await mapActivity(activity)),
+        });
+
+        if (batch.length >= BATCH_SIZE) {
+          await insertActivities(batch, false, tx);
+          batch.length = 0;
+        }
+      },
+      where,
+      params,
+    ).finally(async () => {
+      if (batch.length > 0) {
+        await insertActivities(batch, false, tx);
+      }
+    });
+  });
 }

Let's verify the usage of force flag in insertActivities:

#!/bin/bash
# Check how insertActivities is used across the codebase
rg -A 5 "insertActivities\(" 
backend/src/bin/jobs/syncActivities.ts (6)

1-13: LGTM! Imports are well-organized and necessary.

The imports cover all required functionality for database access, logging, and job scheduling.


15-22: Handle NULL case in decideUpdatedAt function.

If there are no activities, SELECT MAX("updatedAt") will return NULL, resulting in undefined. This could cause issues in subsequent queries.


24-33: LGTM! Well-implemented helper functions.

Both getTotalActivities and createWhereClause functions are implemented correctly with proper SQL injection prevention.


35-60: Consider batch processing optimization.

Processing activities one by one can be inefficient. Consider implementing bulk operations as suggested in the previous review.


62-130: Add database connection cleanup.

Database connections should be properly closed after use.


132-141: LGTM! Well-structured job definition.

The job is properly configured with appropriate scheduling for daily synchronization.

services/apps/data_sink_worker/src/service/activity.service.ts (1)

1084-1084: Verify UUID collision handling.

While generateUUIDv4() provides a reliable way to generate unique IDs, there should be a mechanism to handle the rare case of UUID collisions.
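
One hedged way to cover that rare case, leaning on the existsWithId repository method this PR adds; the retry count and the way the existence check is passed in are illustrative:

// Mint an id and regenerate it in the unlikely event it already exists.
// `exists` would be backed by something like ActivityRepository.existsWithId;
// generateUUIDv4 is the utility already used in activity.service.ts.
async function generateUnusedActivityId(
  exists: (id: string) => Promise<boolean>,
  maxAttempts = 3,
): Promise<string> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const id = generateUUIDv4()
    if (!(await exists(id))) {
      return id
    }
  }
  throw new Error('Could not generate an unused activity id')
}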

Resolved review thread: backend/src/bin/jobs/syncActivities.ts
sausage-todd force-pushed the update-activities-in-postgres branch from a1e4206 to 53438fb on November 5, 2024 at 11:36
coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (4)
services/libs/logging/src/utility.ts (1)

Line range hint 21-41: Add documentation to clarify usage of V2 vs original function.

While the new implementation adds valuable error handling, it's not clear when developers should use logExecutionTimeV2 versus the original logExecutionTime. Consider adding JSDoc comments to document the differences and use cases.

+/**
+ * Logs the execution time of an async process with enhanced error handling.
+ * Use this version when you need:
+ * - Specific failure logging
+ * - Guaranteed timing logs even on failure
+ * @param process The async process to execute and time
+ * @param log Logger instance
+ * @param name Process name for logging
+ * @returns The result of the process
+ * @throws Re-throws any error from the process after logging
+ */
 export const logExecutionTimeV2 = async <T>(

Additionally, consider consolidating both functions into a single implementation with an options parameter to control the behavior.

interface LogExecutionTimeOptions {
  failureLogging?: boolean;
}

export const logExecutionTime = async <T>(
  process: () => Promise<T>,
  log: Logger,
  name: string,
  options: LogExecutionTimeOptions = {}
): Promise<T> => {
  // Implementation combining both versions
}
backend/src/bin/jobs/syncActivities.ts (1)

1-141: Consider implementing resilience patterns.

The synchronization mechanism could benefit from the following architectural improvements:

  1. Implement retry logic for transient database errors
  2. Add checkpointing to resume from the last successful sync point
  3. Consider implementing a dead letter queue for failed records
  4. Add monitoring and alerting for sync failures or delays

These patterns would make the synchronization more robust and maintainable.
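
As a sketch of the first pattern, a small generic retry-with-backoff helper that the individual sync steps could be wrapped in; it is not tied to this codebase:

// Retries an async operation with exponential backoff for transient failures.
async function withRetries<T>(
  op: () => Promise<T>,
  attempts = 3,
  baseDelayMs = 500,
): Promise<T> {
  let lastError: unknown
  for (let i = 0; i < attempts; i++) {
    try {
      return await op()
    } catch (err) {
      lastError = err
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** i))
    }
  }
  throw lastError
}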

services/apps/data_sink_worker/src/service/activity.service.ts (2)

647-661: Add error handling for database connection issues.

While the new query structure is more robust, it should include explicit error handling for database connection issues.

          const {
            rows: [dbActivity],
-          } = await queryActivities(this.qdbStore.connection(), {
+          } = await queryActivities(this.qdbStore.connection(), {
+          }).catch((error) => {
+            this.log.error('Failed to query activities:', error);
+            throw new Error('Database query failed');
+          });

Line range hint 1084-1104: Simplify ID generation logic.

The current ID handling with nested access could be simplified to improve readability and prevent potential null reference issues.

-                id: dbActivity.id ?? generateUUIDv4(),
+                id: generateActivityId(dbActivity),

Consider adding a helper function:

function generateActivityId(activity?: { id?: string }): string {
  return activity?.id || generateUUIDv4();
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between a1e4206 and 53438fb.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (15)
  • backend/package.json (1 hunks)
  • backend/src/bin/jobs/index.ts (2 hunks)
  • backend/src/bin/jobs/syncActivities.ts (1 hunks)
  • backend/src/bin/scripts/syncActivities.ts (1 hunks)
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
  • backend/src/services/activityService.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
  • services/libs/data-access-layer/src/activities/update.ts (2 hunks)
  • services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
  • services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (11)
  • backend/package.json
  • backend/src/bin/jobs/index.ts
  • backend/src/bin/scripts/syncActivities.ts
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
  • backend/src/services/activityService.ts
  • services/apps/data_sink_worker/src/service/activity.data.ts
  • services/libs/data-access-layer/src/activities/update.ts
  • services/libs/data-access-layer/src/conversations/sql.ts
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
🔇 Additional comments (8)
services/libs/logging/src/utility.ts (1)

Line range hint 45-59: Reconsider optional name parameter to maintain logging consistency.

I agree with the previous review comment about the breaking change concerns. Making the name parameter optional could lead to unclear logs and inconsistent usage patterns.

Additionally, the new overrideName parameter adds complexity without clear benefits:

  1. It creates two ways to specify the name (in the constructor and at the end() call)
  2. The nullish coalescing in the log message (overrideName ?? name) makes it harder to track where the final name comes from

Consider this simpler approach:

-export const timer = (log: Logger, name?: string) => {
+export const timer = (log: Logger, name: string) => {
   const start = performance.now()
   let isEnded = false
   return {
-    end: function (overrideName?: string) {
+    end: function () {
       if (isEnded) {
         return
       }
       isEnded = true
 
       const end = performance.now()
       const duration = end - start
       const durationInSeconds = duration / 1000
-      log.info(`Process ${overrideName ?? name} took ${durationInSeconds.toFixed(2)} seconds!`)
+      log.info(`Process ${name} took ${durationInSeconds.toFixed(2)} seconds!`)
     },
   }
 }

If dynamic naming is needed, consider creating a new function with a clearer purpose rather than overloading the existing one.
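A minimal sketch of such a separate helper, assuming the same Logger type and performance API that timer already uses; the function name is illustrative:

// Illustrative sketch: a timer whose name is supplied only at end() time, so the
// original timer(log, name) can keep its required, unambiguous signature.
export const deferredNameTimer = (log: Logger) => {
  const start = performance.now()
  let isEnded = false
  return {
    end: function (name: string) {
      if (isEnded) {
        return
      }
      isEnded = true
      const durationInSeconds = (performance.now() - start) / 1000
      log.info(`Process ${name} took ${durationInSeconds.toFixed(2)} seconds!`)
    },
  }
}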

backend/src/bin/jobs/syncActivities.ts (6)

1-13: LGTM! Dependencies are well-organized.

The imports cover all necessary functionality for database access, logging, and type definitions.


132-141: LGTM! Job definition is clean and well-structured.

The job is properly configured with a daily schedule using cron-time-generator.


15-22: ⚠️ Potential issue

Handle NULL case in decideUpdatedAt function.

When there are no activities, MAX("updatedAt") returns NULL, but the function doesn't handle this case. This could cause issues in subsequent queries.

Apply this diff to handle the NULL case:

 async function decideUpdatedAt(pgQx: QueryExecutor, maxUpdatedAt?: string): Promise<string> {
   if (!maxUpdatedAt) {
     const result = await pgQx.selectOne('SELECT MAX("updatedAt") AS "maxUpdatedAt" FROM activities')
-    return result?.maxUpdatedAt
+    return result?.maxUpdatedAt ?? new Date(0).toISOString()
   }

   return maxUpdatedAt
 }

35-60: 🛠️ Refactor suggestion

Optimize batch processing for better performance.

The current implementation processes activities one by one, resulting in N+1 database queries. This approach can be inefficient for large batches.

Consider implementing bulk operations as suggested in the previous review to reduce database calls.
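One possible shape for this, sketched against the rawInsert/rawUpdate methods introduced in this PR — the batching itself and the manyOrNone call (pg-promise style) are assumptions, not code from the PR:

// Illustrative sketch: one SELECT per page of activities instead of one per row,
// then route each record to insert or update. Writes are still per-row here; a full
// bulk upsert would go further, but this alone removes the N existence checks.
async function syncActivityBatch(
  db: DbConnOrTx,
  repo: ActivityRepository,
  activities: IDbActivityCreateData[],
): Promise<void> {
  if (activities.length === 0) {
    return
  }

  const rows = await db.manyOrNone<{ id: string }>(
    'SELECT id FROM activities WHERE id IN ($(ids:csv))',
    { ids: activities.map((a) => a.id) },
  )
  const existing = new Set(rows.map((r) => r.id))

  for (const activity of activities) {
    if (existing.has(activity.id)) {
      await repo.rawUpdate(activity.id, activity)
    } else {
      await repo.rawInsert(activity)
    }
  }
}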


65-72: ⚠️ Potential issue

Ensure proper cleanup of database connections.

Database connections should be properly closed to prevent resource leaks.

Implement proper cleanup using try-finally:

 export async function syncActivities(logger: Logger, maxUpdatedAt?: string) {
   logger.info(`Syncing activities from ${maxUpdatedAt}`)
-
-  const qdb = await getClientSQL()
-  const db = await getDbConnection({
-    host: DB_CONFIG.writeHost,
-    port: DB_CONFIG.port,
-    database: DB_CONFIG.database,
-    user: DB_CONFIG.username,
-    password: DB_CONFIG.password,
-  })
+  let qdb, db;
+  try {
+    qdb = await getClientSQL()
+    db = await getDbConnection({
+      host: DB_CONFIG.writeHost,
+      port: DB_CONFIG.port,
+      database: DB_CONFIG.database,
+      user: DB_CONFIG.username,
+      password: DB_CONFIG.password,
+    })
+    // ... rest of the function
+  } finally {
+    await db?.$pool.end()
+    await qdb?.$pool.end()
+  }

96-127: ⚠️ Potential issue

Add safety mechanisms to prevent infinite loops.

The while loop lacks important safety measures such as maximum iterations and timeout.

Implement the safety measures as suggested in the previous review.
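As a reference point, a guarded loop could look like the sketch below; the limits, the fetchNextPage callback, and the function name are placeholders, not values from this PR:

// Illustrative sketch: cap both iteration count and wall-clock time so a cursor that
// stops advancing cannot spin the job forever.
async function runGuardedSyncLoop(
  logger: Logger,
  fetchNextPage: () => Promise<number>, // returns rows processed; 0 means done
): Promise<void> {
  const MAX_ITERATIONS = 10_000
  const MAX_RUNTIME_MS = 6 * 60 * 60 * 1000 // 6 hours

  const startedAt = Date.now()
  for (let i = 0; i < MAX_ITERATIONS; i++) {
    if (Date.now() - startedAt > MAX_RUNTIME_MS) {
      logger.warn('syncActivities stopped: max runtime exceeded')
      return
    }
    const processed = await fetchNextPage()
    if (processed === 0) {
      return
    }
  }
  logger.warn('syncActivities stopped: max iteration count reached')
}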

services/apps/data_sink_worker/src/service/activity.service.ts (1)

4-12: LGTM! Clean import organization.

The new imports support the transition to UUID-based IDs and the new query interface.

@sausage-todd sausage-todd force-pushed the update-activities-in-postgres branch from 53438fb to 78f588b Compare November 6, 2024 06:29

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
services/libs/logging/src/utility.ts (1)

Line range hint 21-42: Add documentation to explain version differences.

While the new implementation improves error handling, it's important to document:

  1. The differences between V1 and V2
  2. When to use each version
  3. Migration guidelines for existing code

Add JSDoc comments:

+/**
+ * Enhanced version of logExecutionTime with better error handling.
+ * Unlike V1, this version:
+ * - Logs different messages for success/failure cases
+ * - Preserves stack traces in error cases
+ * @param process - Async function to execute and time
+ * @param log - Logger instance
+ * @param name - Process name for logging
+ * @returns Result of the process
+ * @throws Original error from the process
+ */
 export const logExecutionTimeV2 = async <T>(
services/apps/data_sink_worker/src/service/activity.service.ts (1)

4-12: LGTM: Improved architecture with DAL abstraction

The shift from direct database operations to using data access layer functions (insertActivities, queryActivities) improves the separation of concerns and maintainability.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 53438fb and 78f588b.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (15)
  • backend/package.json (1 hunks)
  • backend/src/bin/jobs/index.ts (2 hunks)
  • backend/src/bin/jobs/syncActivities.ts (1 hunks)
  • backend/src/bin/scripts/syncActivities.ts (1 hunks)
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
  • backend/src/services/activityService.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
  • services/libs/data-access-layer/src/activities/update.ts (2 hunks)
  • services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
  • services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (12)
  • backend/package.json
  • backend/src/bin/jobs/index.ts
  • backend/src/bin/jobs/syncActivities.ts
  • backend/src/bin/scripts/syncActivities.ts
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
  • backend/src/services/activityService.ts
  • services/apps/data_sink_worker/src/service/activity.data.ts
  • services/libs/data-access-layer/src/activities/update.ts
  • services/libs/data-access-layer/src/conversations/sql.ts
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
🔇 Additional comments (2)
services/libs/logging/src/utility.ts (1)

Line range hint 45-60: Reconsider optional name parameter.

Making the name parameter optional could lead to inconsistent and unclear logs. The overrideName parameter adds complexity without clear benefits.

Consider reverting to required parameters:

-export const timer = (log: Logger, name?: string) => {
+export const timer = (log: Logger, name: string) => {
   const start = performance.now()
   let isEnded = false
   return {
-    end: function (overrideName?: string) {
+    end: function () {
       if (isEnded) {
         return
       }
       isEnded = true

       const end = performance.now()
       const duration = end - start
       const durationInSeconds = duration / 1000
-      log.info(`Process ${overrideName ?? name} took ${durationInSeconds.toFixed(2)} seconds!`)
+      log.info(`Process ${name} took ${durationInSeconds.toFixed(2)} seconds!`)
     },
   }
 }

If dynamic naming is needed, consider creating a separate function or documenting the specific use cases that require it.

services/apps/data_sink_worker/src/service/activity.service.ts (1)

647-661: Consider adding an index for frequent query patterns

The query uses multiple AND conditions on sourceId, platform, type, and channel. Consider adding a composite index to optimize this frequent query pattern.

@sausage-todd sausage-todd force-pushed the update-activities-in-postgres branch from 78f588b to 7bc9240 Compare November 6, 2024 10:42

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (4)
services/apps/entity_merging_worker/src/activities/members.ts (1)

Line range hint 63-74: Add error handling for QuestDB operations

The function should include specific error handling for QuestDB operations, as time-series databases may have different failure modes than traditional RDBMS.

Consider wrapping the database operation in a try-catch block:

  for (const identity of identities.filter(
    (i) =>
      i.type === MemberIdentityType.USERNAME &&
      identitiesWithActivity.some((ai) => ai.platform === i.platform && ai.username === i.value),
  )) {
+   try {
      await moveIdentityActivitiesToNewMember(
        svc.questdbSQL,
        tenantId,
        fromId,
        toId,
        identity.value,
        identity.platform,
      )
+   } catch (error) {
+     svc.log.error(
+       { error, fromId, toId, identity },
+       'Failed to move identity activities to new member in QuestDB'
+     );
+     throw error;
+   }
  }
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)

5-6: Remove unused import IDbActivity

The IDbActivity type is imported but never used in this file. Consider removing it to keep the imports clean.

import { formatQuery } from '../../../queryExecutor'
-import { IDbActivity, IDbActivityCreateData } from '../data_sink_worker/repo/activity.data'
+import { IDbActivityCreateData } from '../data_sink_worker/repo/activity.data'
🧰 Tools
🪛 GitHub Check: lint-format-services

[warning] 6-6:
'IDbActivity' is defined but never used


121-149: LGTM! Well-structured refactor with improved type safety

The changes improve the code by:

  • Using DbConnOrTx for better transaction support
  • Leveraging type-safe callbacks with IDbActivityCreateData
  • Using formatQuery for safer query construction

Consider adding error handling

Since this function is crucial for member identity merging, consider adding error handling to catch and properly handle potential database errors.

export async function moveIdentityActivitiesToNewMember(
  db: DbConnOrTx,
  tenantId: string,
  fromId: string,
  toId: string,
  username: string,
  platform: string,
) {
+  try {
    await updateActivities(
      db,
      async (activity: IDbActivityCreateData) => ({ ...activity, memberId: toId }),
      formatQuery(
        `
          "memberId" = $(fromId)
          and "tenantId" = $(tenantId)
          and "username" = $(username)
          and "platform" = $(platform)
          and "deletedAt" is null
        `,
        {
          fromId,
          tenantId,
          username,
          platform,
        },
      ),
      {
        memberId: fromId,
      },
    )
+  } catch (error) {
+    throw new Error(`Failed to move activities for identity ${username} on ${platform}: ${error.message}`);
+  }
}
services/apps/data_sink_worker/src/service/activity.service.ts (1)

1085-1085: Simplify ID generation logic

In the create path, dbActivity should always be undefined, making the optional chaining unnecessary. Consider simplifying:

-                id: dbActivity?.id ?? generateUUIDv4(),
+                id: generateUUIDv4(),
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 78f588b and 7bc9240.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (18)
  • backend/package.json (1 hunks)
  • backend/src/bin/jobs/index.ts (2 hunks)
  • backend/src/bin/jobs/syncActivities.ts (1 hunks)
  • backend/src/bin/scripts/syncActivities.ts (1 hunks)
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
  • backend/src/services/activityService.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
  • services/apps/entity_merging_worker/src/activities/members.ts (1 hunks)
  • services/libs/data-access-layer/src/activities/sql.ts (0 hunks)
  • services/libs/data-access-layer/src/activities/update.ts (2 hunks)
  • services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2 hunks)
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
  • services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (2)
  • services/libs/data-access-layer/src/activities/sql.ts
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (12)
  • backend/package.json
  • backend/src/bin/jobs/index.ts
  • backend/src/bin/jobs/syncActivities.ts
  • backend/src/bin/scripts/syncActivities.ts
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
  • backend/src/services/activityService.ts
  • services/apps/data_sink_worker/src/service/activity.data.ts
  • services/libs/data-access-layer/src/activities/update.ts
  • services/libs/data-access-layer/src/conversations/sql.ts
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
  • services/libs/logging/src/utility.ts
🧰 Additional context used
🪛 GitHub Check: lint-format-services
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts

[warning] 6-6:
'IDbActivity' is defined but never used

🔇 Additional comments (6)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (4)

127-129: ⚠️ Potential issue

Critical: Add tenant isolation to prevent unauthorized access.

The previous review comment about adding tenant isolation to this method is still valid. The current implementation could allow cross-tenant data access.


132-140: ⚠️ Potential issue

Critical: Add tenant isolation and error handling.

The previous review comment about adding tenant isolation and proper error handling to this method is still valid. The current implementation has security and reliability concerns.


142-145: ⚠️ Potential issue

Critical: Add data integrity checks and tenant isolation.

The previous review comment about adding data integrity checks and tenant isolation to this method is still valid. The current implementation has several critical issues.


127-145: Verify safe usage in synchronization context.

While these "raw" methods might be intended for bulk synchronization operations, we should ensure they're used in a controlled context that maintains proper tenant isolation. Let's verify how these methods are used in the synchronization flow.

✅ Verification successful

Let me gather more specific information about the usage context of these raw methods.

Let's analyze the actual usage patterns and tenant handling in the data sink worker context.


Let me try another search strategy to understand the context better.

Let's analyze how these repository methods are integrated into the service layer.


Based on the search results, I can now generate the final response as we have clear evidence of the usage context.

Raw methods are safely used in controlled sync context

The raw methods are specifically used in the syncActivities job (backend/src/bin/jobs/syncActivities.ts) where they serve a legitimate bulk synchronization purpose. The job:

  • Uses a controlled transaction context
  • Handles activities sequentially with proper existence checks
  • Operates in a worker context, not exposed to direct tenant requests

The architectural pattern is sound as:

  • Raw methods are only called from dedicated sync jobs
  • The sync job uses proper database transaction handling via DbStore
  • The implementation follows the same pattern as other sync workers in the system
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage context of these raw methods
# Expected: These methods should only be called from trusted synchronization contexts with proper tenant isolation

# Search for direct calls to these methods
echo "Searching for raw method calls..."
rg -A 5 "rawUpdate|rawInsert|existsWithId" --type typescript

# Search for potential synchronization contexts
echo "Searching for sync-related files..."
fd -e ts -e js sync

Length of output: 2354


Script:

#!/bin/bash
# Search for raw method calls with better context using ast-grep
echo "Searching for method calls..."
ast-grep --pattern 'rawUpdate($$$)' --pattern 'rawInsert($$$)' --pattern 'existsWithId($$$)'

# Search specifically in data_sink_worker related files
echo "Searching in data sink worker files..."
fd -e ts data_sink_worker --exec rg -l "class.*ActivityRepo|rawUpdate|rawInsert|existsWithId" {}

# Look for tenant context in the data sink worker
echo "Searching for tenant context..."
fd -e ts data_sink_worker --exec rg -A 10 "tenant|TenantId|tenantId" {}

Length of output: 667


Script:

#!/bin/bash
# Search for ActivityRepo class and its usage
echo "Searching for ActivityRepo class definition and usage..."
rg -A 20 "class ActivityRepo" 

# Search for data sink worker service implementation
echo "Searching for data sink worker service..."
fd -e ts -e js sink_worker --exec rg -l "Activity|activity" {}

# Look for the service that uses these repository methods
echo "Searching for service layer..."
rg --type ts -A 10 "new ActivityRepo|activityRepo\." 

Length of output: 10414

services/apps/entity_merging_worker/src/activities/members.ts (1)

69-69: Verify consistency of database connections for activity operations

The change from postgres.writer to questdbSQL for activity operations needs careful consideration:

  1. Ensure this change aligns with other activity-related operations in the codebase
  2. Consider adding QuestDB-specific error handling as it may have different error patterns than PostgreSQL

Let's verify the consistency of database connections used for activity operations:

Consider documenting the rationale for using QuestDB for activity operations in the codebase, especially regarding:

  • Transaction handling strategy
  • Data consistency guarantees
  • Performance implications
✅ Verification successful

The previous results show that QuestDB is consistently used across different services for activity-related operations, but let's verify the specific implementation and configuration to ensure this change aligns with the architecture.


Based on the results from our verification scripts, I can now generate the final response:

Database change from Postgres to QuestDB is architecturally consistent

The switch from postgres.writer to questdbSQL for activity operations in moveIdentityActivitiesToNewMember is consistent with the codebase architecture:

  • QuestDB is used consistently across multiple services for activity-related operations (activities_worker, profiles_worker, entity_merging_worker)
  • Activity data operations are properly segregated between PostgreSQL (for transactional data) and QuestDB (for time-series activity data)
  • The data access layer supports both databases, with clear separation of concerns
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other activity-related operations to ensure consistent database usage
echo "Checking activity-related database operations..."

# Look for similar activity movement operations
rg -A 3 "moveActivities.*To.*Member" --type ts

# Check other QuestDB usage patterns
rg -A 3 "questdbSQL" --type ts

# Look for potential inconsistencies where postgres writer is still used for activities
rg -A 3 "postgres\.writer.*activit" --type ts

Length of output: 11776


Script:

#!/bin/bash
# Let's check the implementation of moveActivitiesToNewMember to understand the database operations
ast-grep --pattern 'export async function moveActivitiesToNewMember'

# Check for any activity-related functions in the data access layer
fd -e ts -e js -x grep -l "activities" {} \; | grep -i "data-access"

# Look for QuestDB configuration and setup
rg -A 5 "questdb.*config" --type ts

# Check for activity-related database operations in the codebase
rg -A 3 "UPDATE.*activities|INSERT.*activities|DELETE.*activities" --type ts

Length of output: 4870

services/apps/data_sink_worker/src/service/activity.service.ts (1)

647-662: Consider adding a composite index for activity queries

The query uses multiple AND conditions which could benefit from a composite index on (tenantId, segmentId, timestamp, sourceId, platform, type, channel) to improve query performance.

Let's verify if an appropriate index exists:

@sausage-todd sausage-todd force-pushed the update-activities-in-postgres branch from 7bc9240 to b5d440e Compare November 6, 2024 15:27
Misha Savelyev added 4 commits November 6, 2024 15:32

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (2)
services/libs/data-access-layer/src/activities/update.ts (1)

Line range hint 13-33: Add error handling and improve performance monitoring

The streaming implementation has several areas for improvement:

  1. Missing error handling for stream processing
  2. Timer ends prematurely after first item
  3. No batch size control for large datasets
  4. Unsafe type casting of stream items

Consider applying these improvements:

 export async function streamActivities(
   qdb: DbConnOrTx,
   onActivity: (activity: IDbActivityCreateData) => Promise<void>,
   where: string,
   params?: Record<string, string>,
 ): Promise<{ processed: number; duration: number }> {
   const whereClause = formatQuery(where, params)
-  const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`)
+  const qs = new QueryStream(
+    `SELECT * FROM activities WHERE ${whereClause}`,
+    [], 
+    { batchSize: 1000 }  // Control memory usage
+  )

   const t = timer(logger, `query activities with ${whereClause}`)
   const res = await qdb.stream(qs, async (stream) => {
+    try {
       for await (const item of stream) {
-        t.end()
         const activity = item as unknown as IDbActivityCreateData
+        if (!isValidActivity(activity)) {
+          logger.warn('Invalid activity data', { activity })
+          continue
+        }
         await onActivity(activity)
       }
+    } catch (error) {
+      logger.error('Error processing activity stream', { error })
+      throw error
+    } finally {
+      t.end()
+    }
   })
   return res
 }

+function isValidActivity(activity: unknown): activity is IDbActivityCreateData {
+  return activity !== null && typeof activity === 'object'
+  // Add more type checks based on IDbActivityCreateData structure
+}
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (1)

5-6: Remove unused import.

The IDbActivity type is imported but never used in this file.

import { formatQuery } from '../../../queryExecutor'
-import { IDbActivity, IDbActivityCreateData } from '../data_sink_worker/repo/activity.data'
+import { IDbActivityCreateData } from '../data_sink_worker/repo/activity.data'
🧰 Tools
🪛 GitHub Check: lint-format-services

[warning] 6-6:
'IDbActivity' is defined but never used

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 7bc9240 and b5d440e.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (18)
  • backend/package.json (1 hunks)
  • backend/src/bin/jobs/index.ts (2 hunks)
  • backend/src/bin/jobs/syncActivities.ts (1 hunks)
  • backend/src/bin/scripts/syncActivities.ts (1 hunks)
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
  • backend/src/services/activityService.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
  • services/apps/entity_merging_worker/src/activities/members.ts (1 hunks)
  • services/libs/data-access-layer/src/activities/sql.ts (0 hunks)
  • services/libs/data-access-layer/src/activities/update.ts (2 hunks)
  • services/libs/data-access-layer/src/conversations/sql.ts (6 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2 hunks)
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts (0 hunks)
  • services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (2)
  • services/libs/data-access-layer/src/activities/sql.ts
  • services/libs/data-access-layer/src/old/apps/search_sync_worker/activity.repo.ts
🚧 Files skipped from review as they are similar to previous changes (12)
  • backend/package.json
  • backend/src/bin/jobs/index.ts
  • backend/src/bin/jobs/syncActivities.ts
  • backend/src/bin/scripts/syncActivities.ts
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
  • backend/src/services/activityService.ts
  • services/apps/data_sink_worker/src/service/activity.data.ts
  • services/apps/entity_merging_worker/src/activities/members.ts
  • services/libs/data-access-layer/src/conversations/sql.ts
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
  • services/libs/logging/src/utility.ts
🧰 Additional context used
🪛 GitHub Check: lint-format-services
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts

[warning] 6-6:
'IDbActivity' is defined but never used

🔇 Additional comments (7)
services/libs/data-access-layer/src/activities/update.ts (1)

35-57: 🛠️ Refactor suggestion

Address transaction consistency and improve update batching

The current implementation has several concerns:

  1. Transaction consistency issue from previous review still applies
  2. Magic boolean parameter (true) at line 51 needs clarification
  3. Updates are performed one at a time, which could be inefficient

The transaction consistency issue raised in the previous review is still valid.

Consider batching updates for better performance:

 export async function updateActivities(
   qdb: DbConnOrTx,
   mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
   where: string,
   params?: Record<string, string>,
 ): Promise<{ processed: number; duration: number }> {
+  const BATCH_SIZE = 100
+  let batch: IDbActivityCreateData[] = []
+
   return streamActivities(
     qdb,
     async (activity) => {
-      await insertActivities(
-        [
-          {
-            ...activity,
-            ...(await mapActivity(activity)),
-          },
-        ],
-        true,
-      )
+      batch.push({
+        ...activity,
+        ...(await mapActivity(activity)),
+      })
+
+      if (batch.length >= BATCH_SIZE) {
+        await insertActivities(batch, true) // TODO: Document the purpose of the boolean parameter
+        batch = []
+      }
     },
     where,
     params,
-  )
+  ).finally(async () => {
+    if (batch.length > 0) {
+      await insertActivities(batch, true)
+    }
+  })
 }

Let's verify the usage of insertActivities to understand the boolean parameter:

#!/bin/bash
# Search for other calls to insertActivities to understand the boolean parameter usage
rg "insertActivities\(" -A 2 -B 2
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (3)

127-129: Security concern: Missing tenant isolation.

The same security concern raised in the previous review still applies to this method.


132-140: Security and reliability concerns in update method.

The security and reliability concerns raised in the previous review still apply. Additionally, for better performance, consider using a prepared statement for the condition.

-    const condition = this.format('where id = $(id)', { id })
-    await this.db().none(`${query} ${condition}`)
+    await this.db().none(`${query} where id = $(id)`, { id })

142-145: Data integrity and security concerns in insert method.

The data integrity and security concerns raised in the previous review still apply. Additionally, let's verify the impact of missing required fields.

#!/bin/bash
# Description: Check if any code relies on auto-generated IDs or timestamps for activities

# Test 1: Check for direct references to activities.id
echo "Checking for direct references to activities.id..."
rg -A 3 'activities\.id' 

# Test 2: Check for timestamp-dependent queries
echo "Checking for timestamp-dependent queries..."
rg -A 3 '"createdAt"|"updatedAt".*activities'

# Test 3: Check for tenant/segment-dependent queries
echo "Checking for tenant/segment isolation..."
rg -A 3 '"tenantId"|"segmentId".*activities'
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)

121-127: Improved type safety with DbConnOrTx.

Good change! Using DbConnOrTx instead of DbStore makes the function more flexible by accepting both connections and transactions.


128-149: Verify the impact of removing batch processing.

The removal of the batchSize parameter might impact performance when processing large datasets. Please verify that this change doesn't cause memory issues or timeouts.

Consider implementing cursor-based pagination if dealing with large datasets to prevent memory issues:

async function* streamActivitiesBatches(
  db: DbConnOrTx,
  query: string,
  params: any,
  batchSize: number = 1000
): AsyncGenerator<IDbActivityCreateData[]> {
  let lastId = '0';
  while (true) {
    const batch = await db.manyOrNone(
      `${query} AND id > $(lastId) ORDER BY id LIMIT $(batchSize)`,
      { ...params, lastId, batchSize }
    );
    if (batch.length === 0) break;
    yield batch;
    lastId = batch[batch.length - 1].id;
  }
}
✅ Verification successful

Based on the search results, I can see that batch processing is widely used throughout the codebase for various operations. Let me verify the specific context of the activity updates by searching for related patterns.


Let me try a different search pattern to find the activity update implementation.


Let me check the implementation of streamActivities since it's used by updateActivities.


Based on the code analysis, I can now provide a final response:

Removal of batch processing is safe - streaming is used instead

The updateActivities function uses PostgreSQL's native streaming capabilities through QueryStream to process activities. This is actually a more efficient approach than batch processing as it:

  1. Maintains constant memory usage regardless of dataset size
  2. Processes records as they arrive without loading everything into memory
  3. Uses database-level cursor internally for optimal performance
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for potential large datasets that might be affected

# Test 1: Check for large activity counts per member
echo "Checking for members with large activity counts:"
rg -A 5 "SELECT.*COUNT.*FROM.*activities.*GROUP.*BY.*member"

# Test 2: Look for existing batch processing patterns
echo "Checking for other batch processing patterns in activities:"
rg -A 5 "LIMIT.*OFFSET|batchSize|chunk"

Length of output: 55125


Script:

#!/bin/bash
# Search for activity update implementations and batch processing
echo "Searching for activity update implementations:"
ast-grep --pattern 'async function updateActivities'

echo -e "\nSearching for activity batch processing:"
rg -A 5 'updateActivities.*batch|batchSize.*updateActivities'

Length of output: 684


Script:

#!/bin/bash
# Search for the activity update implementation in the data access layer
echo "Searching for activity update implementation:"
rg -A 10 "export.*async.*function.*updateActivities" services/libs/data-access-layer/

echo -e "\nSearching for activity update usage patterns:"
rg -A 5 "await updateActivities" services/libs/data-access-layer/

Length of output: 3243


Script:

#!/bin/bash
# Search for streamActivities implementation
echo "Searching for streamActivities implementation:"
rg -A 15 "export.*async.*function.*streamActivities" services/libs/data-access-layer/

echo -e "\nSearching for activity batch processing in streamActivities:"
rg -A 10 "streamActivities.*batch|batchSize.*streamActivities" services/libs/data-access-layer/

Length of output: 1936

services/apps/data_sink_worker/src/service/activity.service.ts (1)

4-10: LGTM: Import changes are well-organized

The new imports, including generateUUIDv4, are properly organized and necessary for the updated functionality.

@sausage-todd sausage-todd force-pushed the update-activities-in-postgres branch from b5d440e to fa56c49 Compare November 6, 2024 15:41

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (3)
services/libs/data-access-layer/src/activities/update.ts (1)

Line range hint 13-33: Add error handling and optimize stream processing.

The stream processing implementation has several areas for improvement:

  1. Missing error handling around the stream processing
  2. Timer.end() is incorrectly placed inside the loop
  3. SQL injection vulnerability in whereClause construction

Apply these improvements:

 export async function streamActivities(
   qdb: DbConnOrTx,
   onActivity: (activity: IDbActivityCreateData) => Promise<void>,
   where: string,
   params?: Record<string, string>,
 ): Promise<{ processed: number; duration: number }> {
   const whereClause = formatQuery(where, params)
   const qs = new QueryStream(`SELECT * FROM activities WHERE ${whereClause}`)
 
   const t = timer(logger, `query activities with ${whereClause}`)
-  const res = await qdb.stream(qs, async (stream) => {
-    for await (const item of stream) {
-      t.end()
-      const activity = item as unknown as IDbActivityCreateData
-      await onActivity(activity)
-    }
-  })
+  try {
+    const res = await qdb.stream(qs, async (stream) => {
+      try {
+        for await (const item of stream) {
+          const activity = item as unknown as IDbActivityCreateData
+          await onActivity(activity)
+        }
+      } catch (error) {
+        logger.error('Error processing stream item', error)
+        throw error
+      }
+    })
+    return res
+  } catch (error) {
+    logger.error('Error in stream processing', error)
+    throw error
+  } finally {
+    t.end()
+  }
-  return res
 }

Additionally, consider implementing batch processing for better performance:

const BATCH_SIZE = 1000;
let batch: IDbActivityCreateData[] = [];

for await (const item of stream) {
  const activity = item as unknown as IDbActivityCreateData;
  batch.push(activity);
  
  if (batch.length >= BATCH_SIZE) {
    await Promise.all(batch.map(onActivity));
    batch = [];
  }
}

if (batch.length > 0) {
  await Promise.all(batch.map(onActivity));
}
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1)

127-145: Consider architectural alternatives for bulk operations.

While the introduction of "raw" methods might be intended for bulk operations or synchronization tasks, bypassing security checks creates significant risks. Consider these alternatives:

  1. Create separate, well-documented internal methods specifically for bulk operations that maintain security checks but are optimized for performance.
  2. Implement batch processing methods that handle multiple records while preserving tenant isolation.
  3. Use database transactions to maintain data integrity during bulk operations.

Example approach:

public async batchUpdate(
  updates: Array<{ id: string; tenantId: string; segmentId: string; data: IDbActivityUpdateData }>,
): Promise<void> {
  return this.db().tx(async (t) => {
    const queries = updates.map(({ id, tenantId, segmentId, data }) => {
      const prepared = RepositoryBase.prepare(
        { ...data, updatedAt: new Date() },
        this.updateActivityColumnSet,
      )
      const query = this.dbInstance.helpers.update(prepared, this.updateActivityColumnSet)
      const condition = this.format(
        'where id = $(id) and "tenantId" = $(tenantId) and "segmentId" = $(segmentId)',
        { id, tenantId, segmentId }
      )
      return t.none(`${query} ${condition}`)
    })
    await t.batch(queries)
  })
}
services/apps/data_sink_worker/src/service/activity.service.ts (1)

647-662: Add error handling for database connection issues.

While the query structure is good, consider adding explicit error handling for potential database connection issues.

          const {
            rows: [dbActivity],
           } = await queryActivities(this.qdbStore.connection(), {
             tenantId,
             segmentIds: [segmentId],
             filter: {
               and: [
                 { timestamp: { eq: activity.timestamp } },
                 { sourceId: { eq: activity.sourceId } },
                 { platform: { eq: platform } },
                 { type: { eq: activity.type } },
                 { channel: { eq: activity.channel } },
               ],
             },
             limit: 1,
+          }).catch(error => {
+            this.log.error('Failed to query activities:', error);
+            throw new Error('Database query failed');
           })
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between b5d440e and fa56c49.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (16)
  • backend/package.json (1 hunks)
  • backend/src/bin/jobs/index.ts (2 hunks)
  • backend/src/bin/jobs/syncActivities.ts (1 hunks)
  • backend/src/bin/scripts/syncActivities.ts (1 hunks)
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql (1 hunks)
  • backend/src/services/activityService.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.data.ts (1 hunks)
  • services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
  • services/apps/entity_merging_worker/src/activities/members.ts (1 hunks)
  • services/libs/data-access-layer/src/activities/sql.ts (0 hunks)
  • services/libs/data-access-layer/src/activities/update.ts (2 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2 hunks)
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts (1 hunks)
  • services/libs/logging/src/utility.ts (2 hunks)
💤 Files with no reviewable changes (1)
  • services/libs/data-access-layer/src/activities/sql.ts
🚧 Files skipped from review as they are similar to previous changes (11)
  • backend/package.json
  • backend/src/bin/jobs/index.ts
  • backend/src/bin/jobs/syncActivities.ts
  • backend/src/bin/scripts/syncActivities.ts
  • backend/src/database/migrations/V1730386050__activities-updated-at-index.sql
  • backend/src/services/activityService.ts
  • services/apps/data_sink_worker/src/service/activity.data.ts
  • services/apps/entity_merging_worker/src/activities/members.ts
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.data.ts
  • services/libs/data-access-layer/src/old/apps/profiles_worker/index.ts
  • services/libs/logging/src/utility.ts
🔇 Additional comments (10)
services/libs/data-access-layer/src/activities/update.ts (2)

35-57: ⚠️ Potential issue

Address transactional consistency and type safety concerns.

The current implementation has several potential issues:

  1. The previous review comment about transactional consistency is still valid
  2. The force flag in insertActivities is set to true without documentation
  3. Type safety concerns with spreading mapActivity result

Additionally:

  1. Document the purpose of the force flag:
 export async function updateActivities(
   qdb: DbConnOrTx,
   mapActivity: (activity: IDbActivityCreateData) => Promise<Partial<IDbActivityCreateData>>,
   where: string,
   params?: Record<string, string>,
 ): Promise<{ processed: number; duration: number }> {
+  // force=true is used to override existing activities
   return streamActivities(
  1. Add type safety:
-      ...(await mapActivity(activity)),
+      ...((await mapActivity(activity)) as Partial<IDbActivityCreateData>),

41-56: Verify the impact of streaming implementation on memory usage.

The streaming implementation might affect memory usage patterns differently from the previous implementation.

Let's analyze the query patterns:

✅ Verification successful

Let me gather more information about the streaming implementation to better understand its memory usage patterns.


Based on the gathered information, I can now provide a final response about the streaming implementation's memory usage.

The streaming implementation is well-designed for memory efficiency

The implementation uses pg-query-stream with a proper streaming pattern that processes records one at a time:

  • Uses QueryStream for reading data without loading entire result sets into memory
  • Processes activities individually through the streamActivities function
  • Inserts are handled one activity at a time, preventing memory accumulation

The memory usage is well-controlled because:

  1. The streaming cursor fetches rows on-demand
  2. Each activity is processed and inserted independently
  3. No batching or accumulation of records is performed in the streaming path
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for potential memory-intensive queries
rg -A 5 "SELECT \* FROM activities" --type ts

# Look for batch size configurations that might need adjustment
rg "BATCH_SIZE.*=.*[0-9]+" --type ts

Length of output: 2146


Script:

#!/bin/bash
# Check the streaming implementation details
ast-grep --pattern 'streamActivities($$$)'

# Look for QueryStream configurations and usage patterns
rg -B 2 -A 5 "QueryStream" --type ts

# Check for any batch processing in activities
rg -B 2 -A 5 "insertActivities" --type ts

Length of output: 16505

services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (3)

127-129: Security concern about tenant isolation was previously raised.

A past review comment already highlighted the need for tenant isolation and query optimization for this method.


132-140: Security and reliability concerns were previously raised.

A past review comment already highlighted critical issues regarding tenant isolation, error handling, and the unsafe nature of the "raw" prefix in this method.


142-145: Data integrity concerns were previously raised.

A past review comment already highlighted critical issues regarding missing required fields, tenant isolation, and the unsafe nature of direct insertion in this method.

services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (3)

5-6: LGTM! Imports are properly aligned with the implementation needs.

The new imports support the refactored implementation of moveIdentityActivitiesToNewMember.


128-149: Great improvements to type safety and query construction!

The refactored implementation brings several improvements:

  • Uses DbConnOrTx for better transaction support
  • Leverages type-safe activity updates through IDbActivityCreateData
  • Improves query maintainability using formatQuery

121-127: Verify the impact of removing batch processing.

The batchSize parameter has been removed from the function signature. While the new implementation is cleaner, we should verify that this doesn't impact performance when processing large datasets.

Let's check if there are any large-scale operations using this function:

services/apps/data_sink_worker/src/service/activity.service.ts (2)

4-10: LGTM: Import changes are well-structured.

The new imports, including generateUUIDv4, are logically grouped and necessary for the updated functionality.


647-662: Verify logging of sensitive information.

The code handles sensitive data like member identities and activities. While logging is used appropriately for debugging, ensure that sensitive information is not inadvertently exposed in logs.

Also applies to: 1085-1104

✅ Verification successful

Logging practices are appropriate and secure

After reviewing the logging patterns in the code:

  • All sensitive data logging is properly handled with appropriate log levels
  • Member identities are only logged in warn-level messages when there are specific identity-related issues
  • Activity IDs are used instead of full activity content in debug/trace logs
  • Error logs contain minimal necessary information for debugging
  • No PII or sensitive authentication data is exposed in logs
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for potential sensitive data logging
# Look for log statements that might contain sensitive information

# Test: Search for logging statements with potential sensitive data
rg -A 2 'this\.log\.(debug|info|warn|error).*member\.' 

Length of output: 56


Script:

#!/bin/bash
# Let's broaden the search to check for any logging patterns in the file and surrounding context
rg -A 2 'this\.log\.' services/apps/data_sink_worker/src/service/activity.service.ts

# Also check for general logging patterns in case different logger instances are used
rg -A 2 '(log|logger)\.(debug|info|warn|error)' services/apps/data_sink_worker/src/service/activity.service.ts

# Let's also check the actual data being queried to understand what sensitive fields might be present
ast-grep --pattern 'queryActivities($_, {
  $$$
})'

Length of output: 25869

@sausage-todd sausage-todd force-pushed the update-activities-in-postgres branch from fa56c49 to 3a42be8 Compare November 6, 2024 16:00

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (3)
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2)

121-149: Consider adding error handling and logging.

The implementation is solid, but for better observability and reliability, consider:

  • Adding error handling with appropriate error messages
  • Including logging for tracking the activity migration process

Example enhancement:

 export async function moveIdentityActivitiesToNewMember(
   db: DbConnOrTx,
   tenantId: string,
   fromId: string,
   toId: string,
   username: string,
   platform: string,
 ) {
+  try {
+    logger.info('Starting activity migration', { fromId, toId, username, platform });
     await updateActivities(
       db,
       async (activity: IDbActivityCreateData) => ({ ...activity, memberId: toId }),
       formatQuery(
         `
           "memberId" = $(fromId)
           and "tenantId" = $(tenantId)
           and "username" = $(username)
           and "platform" = $(platform)
           and "deletedAt" is null
         `,
         {
           fromId,
           tenantId,
           username,
           platform,
         },
       ),
       {
         memberId: fromId,
       },
     )
+    logger.info('Activity migration completed successfully');
+  } catch (error) {
+    logger.error('Failed to migrate activities', { error, fromId, toId });
+    throw new Error(`Failed to migrate activities: ${error.message}`);
+  }
 }

121-149: Consider wrapping in a transaction.

Since this operation modifies critical data, it should ideally be wrapped in a transaction to ensure data consistency.

The caller should handle the transaction, for example:

// Example usage in the caller
await db.transaction(async (tx) => {
  await moveIdentityActivitiesToNewMember(tx, tenantId, fromId, toId, username, platform);
  // Other related operations...
});
services/apps/data_sink_worker/src/service/activity.service.ts (1)

647-662: Add error handling for database query.

While the query structure is good, consider adding error handling for potential database connection issues.

          const {
            rows: [dbActivity],
           } = await queryActivities(this.qdbStore.connection(), {
             tenantId,
             segmentIds: [segmentId],
             filter: {
               and: [
                 { timestamp: { eq: activity.timestamp } },
                 { sourceId: { eq: activity.sourceId } },
                 { platform: { eq: platform } },
                 { type: { eq: activity.type } },
                 { channel: { eq: activity.channel } },
               ],
             },
             limit: 1,
-          })
+          }).catch(error => {
+            this.log.error('Failed to query activities:', error);
+            throw new Error('Database query failed');
+          });
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between fa56c49 and 3a42be8.

📒 Files selected for processing (5)
  • services/apps/data_sink_worker/src/service/activity.service.ts (4 hunks)
  • services/apps/entity_merging_worker/src/activities/members.ts (1 hunks)
  • services/libs/data-access-layer/src/activities/sql.ts (0 hunks)
  • services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (1 hunks)
  • services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (2 hunks)
💤 Files with no reviewable changes (1)
  • services/libs/data-access-layer/src/activities/sql.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • services/apps/entity_merging_worker/src/activities/members.ts
🔇 Additional comments (6)
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/activity.repo.ts (3)

126-128: ⚠️ Potential issue

Critical: Add tenant and segment isolation.

The method lacks tenant and segment isolation, which could allow unauthorized cross-tenant data access. This is a security vulnerability.

The previous review comment about adding tenant isolation is still valid. Apply this safer implementation:

-  public async existsWithId(id: string): Promise<boolean> {
-    const result = await this.db().oneOrNone('select 1 from activities where id = $(id)', { id })
-    return result !== null
+  public async existsWithId(id: string, tenantId: string, segmentId: string): Promise<boolean> {
+    const result = await this.db().one(
+      'SELECT EXISTS(SELECT 1 FROM activities WHERE id = $(id) AND "tenantId" = $(tenantId) AND "segmentId" = $(segmentId)) AS "exists"',
+      { id, tenantId, segmentId }
+    )
+    return result.exists
   }

131-139: ⚠️ Potential issue

Critical: Add tenant isolation and error handling.

The method has several security and reliability concerns that could lead to unauthorized data access and silent failures.

The previous review comment about adding tenant isolation and error handling is still valid. Apply this safer implementation:

-  public async rawUpdate(id: string, data: IDbActivityUpdateData): Promise<void> {
+  public async updateWithoutParentSync(
+    id: string,
+    tenantId: string,
+    segmentId: string,
+    data: IDbActivityUpdateData
+  ): Promise<void> {
     const prepared = RepositoryBase.prepare(
       { ...data, updatedAt: new Date() },
       this.updateActivityColumnSet,
     )
     const query = this.dbInstance.helpers.update(prepared, this.updateActivityColumnSet)
-    const condition = this.format('where id = $(id)', { id })
-    await this.db().none(`${query} ${condition}`)
+    const condition = this.format(
+      'where id = $(id) and "tenantId" = $(tenantId) and "segmentId" = $(segmentId)',
+      { id, tenantId, segmentId }
+    )
+    const result = await this.db().result(`${query} ${condition}`)
+    this.checkUpdateRowCount(result.rowCount, 1)
   }

141-144: ⚠️ Potential issue

Critical: Add data integrity checks and tenant isolation.

The method lacks essential data integrity checks and security measures.

The previous review comment about adding data integrity checks and tenant isolation is still valid. Apply this safer implementation:

-  public async rawInsert(data: IDbActivityCreateData): Promise<void> {
+  public async insertWithoutParentSync(
+    tenantId: string,
+    segmentId: string,
+    data: IDbActivityCreateData
+  ): Promise<string> {
+    const id = generateUUIDv1()
+    const ts = new Date()
     const prepared = RepositoryBase.prepare(
-      data,
+      {
+        ...data,
+        id,
+        tenantId,
+        segmentId,
+        createdAt: ts,
+        updatedAt: ts
+      },
       this.insertActivityColumnSet,
     )
     const query = this.dbInstance.helpers.insert(prepared, this.insertActivityColumnSet)
     await this.db().none(query)
+    return id
   }
services/libs/data-access-layer/src/old/apps/entity_merging_worker/index.ts (1)

5-6: LGTM: Import additions are appropriate.

The new imports for formatQuery and IDbActivityCreateData are well-organized and support the refactored implementation.

services/apps/data_sink_worker/src/service/activity.service.ts (2)

4-10: LGTM: Import statements are properly organized.

The new imports for generateUUIDv4 and queryActivities are correctly added and well-organized.

Also applies to: 12-12


1085-1085: Previous comment about ID generation logic is still valid.

The current ID generation logic could lead to collisions in high-concurrency scenarios.

@sausage-todd sausage-todd merged commit b107506 into main Nov 6, 2024
7 checks passed
@sausage-todd sausage-todd deleted the update-activities-in-postgres branch November 6, 2024 16:08