Skip to content

Commit

Permalink
Use Map.groupBy
Browse files Browse the repository at this point in the history
  • Loading branch information
simolus3 committed Feb 5, 2025
1 parent efb834e commit b8b7836
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 59 deletions.
2 changes: 1 addition & 1 deletion .nvmrc
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v20.12.2
v22.13.1
17 changes: 13 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
"start:service": "pnpm --filter @powersync/service-image watch",
"clean": "pnpm run -r clean",
"release": "pnpm build:production && pnpm changeset publish",
"test": "pnpm run -r test"
"test": "pnpm run -r test",
"vitest": "vitest"
},
"devDependencies": {
"@changesets/cli": "^2.27.8",
"@types/node": "^22.5.5",
"@types/node": "^22.13.1",
"async": "^3.2.4",
"bson": "^6.8.0",
"concurrently": "^8.2.2",
Expand All @@ -34,11 +35,19 @@
"rsocket-core": "1.0.0-alpha.3",
"rsocket-websocket-client": "1.0.0-alpha.3",
"semver": "^7.5.4",
"tsc-watch": "^6.2.0",
"ts-node-dev": "^2.0.0",
"typescript": "^5.6.2",
"tsc-watch": "^6.2.0",
"typescript": "^5.7.3",
"vite-tsconfig-paths": "^4.3.2",
"vitest": "^2.1.1",
"ws": "^8.2.3"
},
"pnpm": {
"ignoredBuiltDependencies": [
"esbuild"
],
"onlyBuiltDependencies": [
"esbuild"
]
}
}
35 changes: 12 additions & 23 deletions packages/service-core/src/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,39 +219,28 @@ async function* streamResponseInner(
// TODO: This is not really how bucket priorities are supposed to be implemented - we might want to listen for new write
// checkpoints while we're serving this one too. When we're syncing low-priority buckets of this checkpoint, we could interrupt
// this batch and serve the high-priority buckets of the new checkpoint first?
bucketsToFetch.sort((a, b) => a.priority - b.priority);
let firstBucketInSamePriority = 0;
let currentPriority = bucketsToFetch[0]!.priority;
const lowestPriority = bucketsToFetch.at(-1)!.priority;

const bucketDataWithPriority = (endIndex?: number) => {
return bucketDataInBatches({
const bucketsByPriority = [...Map.groupBy(bucketsToFetch, (bucket) => bucket.priority).entries()]
bucketsByPriority.sort((a, b) => b[0] - a[0]) // Inverting sort order, high priority buckets have smaller priority values
const lowestPriority = bucketsByPriority.at(-1)?.[0]

// This incrementally updates dataBuckets with each individual bucket position.
// At the end of this, we can be sure that all buckets have data up to the checkpoint.
for (const [priority, buckets] of bucketsByPriority) {
yield* bucketDataInBatches({
storage,
checkpoint,
bucketsToFetch: bucketsToFetch.slice(firstBucketInSamePriority, endIndex),
bucketsToFetch: buckets,
dataBuckets,
raw_data,
binary_data,
signal,
tracker,
user_id: syncParams.user_id,
forPriority: currentPriority !== lowestPriority ? currentPriority : undefined
// Passing undefined will emit a full sync complete message at the end. If we pass a priority, we'll emit a partial
// sync complete message.
forPriority: priority !== lowestPriority ? priority : undefined
});
};

for (let i = 0; i < bucketsToFetch.length; i++) {
if (bucketsToFetch[i].priority == currentPriority) {
continue;
}

// This incrementally updates dataBuckets with each individual bucket position.
// At the end of this, we can be sure that all buckets have data up to the checkpoint.
yield* bucketDataWithPriority(i);
firstBucketInSamePriority = i;
currentPriority = bucketsToFetch[i].priority;
}
// Sync highest priority
yield* bucketDataWithPriority();

await new Promise((resolve) => setTimeout(resolve, 10));
}
Expand Down
Loading

0 comments on commit b8b7836

Please sign in to comment.