Skip to content

Commit

Permalink
finalize sync process
Browse files Browse the repository at this point in the history
  • Loading branch information
adrien2p committed Jan 31, 2025
1 parent b6bfa89 commit 825d277
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 58 deletions.
27 changes: 19 additions & 8 deletions integration-tests/modules/__tests__/index/sync.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ medusaIntegrationTestRunner({
})

describe("Index engine syncing", () => {
it.only("should sync the data to the index based on the indexation configuration", async () => {
it("should sync the data to the index based on the indexation configuration", async () => {
console.info("[Index engine] Creating products")

await populateData(api, {
productCount: 10,
variantCount: 50,
priceCount: 10,
productCount: 2,
variantCount: 2,
priceCount: 2,
})

console.info("[Index engine] Creating products done")
Expand Down Expand Up @@ -102,7 +102,7 @@ medusaIntegrationTestRunner({

console.info("[Index engine] Sync done")

// 28 ms - 10 + 10 * 50 + 10 * 50 * 10 = 510
// 28 ms - 6511 records
const { data: results } = await indexEngine.query<"product">({
fields: [
"product.*",
Expand All @@ -111,11 +111,11 @@ medusaIntegrationTestRunner({
],
})

expect(results.length).toBe(10)
expect(results.length).toBe(2)
for (const result of results) {
expect(result.variants.length).toBe(50)
expect(result.variants.length).toBe(2)
for (const variant of result.variants) {
expect(variant.prices.length).toBe(10)
expect(variant.prices.length).toBe(2)
}
}
})
Expand Down Expand Up @@ -216,6 +216,17 @@ medusaIntegrationTestRunner({

expect(updatedResults.length).toBe(1)
expect(updatedResults[0].variants.length).toBe(1)

let staledRaws = await dbConnection.raw(
'SELECT * FROM "index_data" WHERE "staled_at" IS NOT NULL'
)

expect(staledRaws.rows.length).toBe(0)

staledRaws = await dbConnection.raw(
'SELECT * FROM "index_relation" WHERE "staled_at" IS NOT NULL'
)
expect(staledRaws.rows.length).toBe(0)
})
},
})
77 changes: 42 additions & 35 deletions packages/modules/index/src/services/data-synchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
CommonEvents,
ContainerRegistrationKeys,
groupBy,
Modules,
promiseAll,
} from "@medusajs/framework/utils"
Expand Down Expand Up @@ -44,6 +45,7 @@ export class DataSynchronizer {
return this.#container.indexDataService
}

// @ts-ignore
get #indexRelationService(): ModulesSdkTypes.IMedusaInternalService<any> {
return this.#container.indexRelationService
}
Expand Down Expand Up @@ -103,28 +105,48 @@ export class DataSynchronizer {

const staleCondition = staleOnly ? { staled_at: { $ne: null } } : {}

// Clean up staled data
await promiseAll([
this.#indexRelationService.delete({
selector: {
...staleCondition,
$or: [
{
parent_name: entities,
const dataToDelete = await this.#indexDataService.list({
...staleCondition,
name: entities,
})

const toDeleteByEntity = groupBy(dataToDelete, "name")

for (const entity of toDeleteByEntity.keys()) {
const records = toDeleteByEntity.get(entity)
const ids = records?.map(
(record: { data: { id: string } }) => record.data.id
)
if (!ids?.length) {
continue
}

if (this.#schemaObjectRepresentation[entity]) {
// Here we assume that some data have been deleted from from the source and we are cleaning since they are still staled in the index and we remove them from the index

// TODO: expand storage provider interface
await (this.#storageProvider as any).onDelete({
entity,
data: ids,
schemaEntityObjectRepresentation:
this.#schemaObjectRepresentation[entity],
})
} else {
// Here we assume that the entity is not indexed anymore as it is not part of the schema object representation and we are cleaning the index
await promiseAll([
this.#indexDataService.delete({
selector: {
name: entity,
},
{
child_name: entities,
}),
this.#indexRelationService.delete({
selector: {
$or: [{ parent_id: entity }, { child_id: entity }],
},
],
},
}),
this.#indexDataService.delete({
selector: {
...staleCondition,
name: entities,
},
}),
])
}),
])
}
}
}

async #updatedStatus(entity: string, status: IndexMetadataStatus) {
Expand Down Expand Up @@ -157,21 +179,6 @@ export class DataSynchronizer {
name: entity,
},
}),
this.#indexRelationService.update({
data: {
staled_at: new Date(),
},
selector: {
$or: [
{
parent_name: entity,
},
{
child_name: entity,
},
],
},
}),
])

const finalAcknoledgement = await this.syncEntity({
Expand Down
20 changes: 5 additions & 15 deletions packages/modules/index/src/services/postgres-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
* @protected
*/
@InjectTransactionManager()
protected async onCreate<
TData extends { id: string; [key: string]: unknown }
>(
async onCreate<TData extends { id: string; [key: string]: unknown }>(
{
entity,
data,
Expand Down Expand Up @@ -434,9 +432,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
* @protected
*/
@InjectTransactionManager()
protected async onUpdate<
TData extends { id: string; [key: string]: unknown }
>(
async onUpdate<TData extends { id: string; [key: string]: unknown }>(
{
entity,
data,
Expand Down Expand Up @@ -480,9 +476,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
* @protected
*/
@InjectTransactionManager()
protected async onDelete<
TData extends { id: string; [key: string]: unknown }
>(
async onDelete<TData extends { id: string; [key: string]: unknown }>(
{
entity,
data,
Expand Down Expand Up @@ -534,9 +528,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
* @protected
*/
@InjectTransactionManager()
protected async onAttach<
TData extends { id: string; [key: string]: unknown }
>(
async onAttach<TData extends { id: string; [key: string]: unknown }>(
{
entity,
data,
Expand Down Expand Up @@ -677,9 +669,7 @@ export class PostgresProvider implements IndexTypes.StorageProvider {
* @protected
*/
@InjectTransactionManager()
protected async onDetach<
TData extends { id: string; [key: string]: unknown }
>(
async onDetach<TData extends { id: string; [key: string]: unknown }>(
{
entity,
data,
Expand Down

0 comments on commit 825d277

Please sign in to comment.