Skip to content

Commit

Permalink
Harmonize storages
Browse files Browse the repository at this point in the history
  • Loading branch information
ajthinking committed Jan 26, 2025
1 parent 27751e2 commit ff1da29
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 57 deletions.
57 changes: 25 additions & 32 deletions packages/core/src/InputObserverController.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ describe('InputObserverController', () => {
});

it('should return undefined if node status is not set', async () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

const status = await mockStorage.getNodeStatus('nodeId');
expect(status).toBeUndefined();
});

it('should set and get node status', async () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

await controller.reportNodeStatus('nodeId', 'BUSY');
Expand All @@ -38,7 +38,7 @@ describe('InputObserverController', () => {
});

it('should emit node status', () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

controller.addNodeStatusObserver({
Expand All @@ -53,7 +53,7 @@ describe('InputObserverController', () => {
});

it('should emit node status with throttle', () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

controller.addNodeStatusObserver({
Expand All @@ -69,7 +69,7 @@ describe('InputObserverController', () => {
});

it('should emit node status with multiple nodes', () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

controller.addNodeStatusObserver({
Expand All @@ -85,7 +85,7 @@ describe('InputObserverController', () => {
});

it('should emit node status with multiple observers', async () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

controller.addNodeStatusObserver({
Expand All @@ -102,11 +102,9 @@ describe('InputObserverController', () => {

it('should emit node status with async storage', async () => {
const nodeStatusStorage = new Map<NodeId, NodeStatus>();
const mockStorage = new DiagramObserverStorage(
undefined,
undefined,
nodeStatusStorage
);
const mockStorage = new DiagramObserverStorage('_');
mockStorage.setNodeStatuses(nodeStatusStorage)

mockStorage.setNodeStatus = async (nodeId: NodeId, status: NodeStatus) => {
await new Promise((resolve) => setTimeout(() => {
nodeStatusStorage.set(nodeId, status);
Expand Down Expand Up @@ -135,15 +133,15 @@ describe('InputObserverController', () => {
});

it('should return undefined if link count is not set', async () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

const count = await mockStorage.getLinkCount('linkId');
expect(count).toBeUndefined();
});

it('should set and get link count', async () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

await controller.reportLinksCount({
Expand All @@ -154,7 +152,7 @@ describe('InputObserverController', () => {
});

it('should emit link count', () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);
const linksCountObserver = { type: RequestObserverType.observeLinkCounts,
linkId: 'linkId', count: 10} as const;
Expand All @@ -171,7 +169,7 @@ describe('InputObserverController', () => {
});

it('should emit link count with throttle', () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);
const linksCountObserver = { type: RequestObserverType.observeLinkCounts,
linkId: 'linkId', count: 10} as const;
Expand All @@ -189,7 +187,7 @@ describe('InputObserverController', () => {
});

it('should emit link count with multiple links', () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

controller.addLinkCountsObserver({
Expand All @@ -209,11 +207,9 @@ describe('InputObserverController', () => {

it('should emit link count with async storage', async () => {
const linkCountsStorage = new Map<LinkId, number>();
const mockStorage = new DiagramObserverStorage(
linkCountsStorage,
undefined,
undefined,
);
const mockStorage = new DiagramObserverStorage('_')
mockStorage.setLinkCounts(linkCountsStorage);

mockStorage.setLinkCount = async (linkId: LinkId, count: number) => {
await new Promise((resolve) => setTimeout(() => {
linkCountsStorage.set(linkId, count);
Expand Down Expand Up @@ -243,7 +239,7 @@ describe('InputObserverController', () => {
});

it('should return undefined if link items are not set', async () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');

const items = await mockStorage.getLinkItems({
linkId: 'linkId', offset: 0, limit: 100
Expand All @@ -252,7 +248,7 @@ describe('InputObserverController', () => {
});

it('should set and get link items', async () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);

await controller.reportItems({
Expand All @@ -264,7 +260,7 @@ describe('InputObserverController', () => {
});

it('should emit link items', () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);
const linkItemsParam: LinkItemsParam = { type: RequestObserverType.observeLinkItems,
linkId: 'linkId', items: [{value: 1}]};
Expand All @@ -282,7 +278,7 @@ describe('InputObserverController', () => {
});

it('should emit link items with throttle', async () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);
const linkItemsParam: LinkItemsParam = { type: RequestObserverType.observeLinkItems,
linkId: 'linkId', items: [{value: 1}]};
Expand All @@ -301,7 +297,7 @@ describe('InputObserverController', () => {
});

it('should emit link items with multiple links', () => {
const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
const controller = new InputObserverController(mockStorage);
const linkItemsParam1: LinkItemsParam = { type: RequestObserverType.observeLinkItems,
linkId: 'linkId1', items: [{value: 1}]};
Expand All @@ -323,11 +319,8 @@ describe('InputObserverController', () => {

it('should emit link items with async storage', async () => {
const linkItemsStorage = new Map<LinkId, ItemValue[]>();
const mockStorage = new DiagramObserverStorage(
undefined,
linkItemsStorage,
undefined,
);
const mockStorage = new DiagramObserverStorage('_')
mockStorage.setLinksItems(linkItemsStorage);

mockStorage.appendLinkItems = async (linkId: LinkId, items: ItemValue[]) => {
await new Promise((resolve) => setTimeout(() => {
Expand Down Expand Up @@ -368,7 +361,7 @@ describe('InputObserverController', () => {
throttleMs:0,
};

const mockStorage = new DiagramObserverStorage();
const mockStorage = new DiagramObserverStorage('_');
controller = new InputObserverController(mockStorage);
});

Expand Down
26 changes: 20 additions & 6 deletions packages/core/src/storage/diagramObserverStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ import { ItemValue } from '../types/ItemValue';
import { LinkId } from '../types/Link';
import { NodeId } from '../types/Node';
import { NodeStatus } from '../Executor';
import { GetLinkItemsParams, ObserverStorage } from '../types/ObserverStorage';
import { DiagramId, GetLinkItemsParams, ObserverStorage } from '../types/ObserverStorage';

/**
* implementation of ObserverStorage using Maps
*/
export class DiagramObserverStorage implements ObserverStorage {
private diagramId: DiagramId;
private linkCountsStorage: Map<LinkId, number> = new Map()
private linkItemsStorage: Map<LinkId, ItemValue[]> = new Map()
private nodeStatusStorage: Map<NodeId, NodeStatus> = new Map()

constructor(
private linkCountsStorage: Map<LinkId, number> = new Map(),
private linkItemsStorage: Map<LinkId, ItemValue[]> = new Map(),
private nodeStatusStorage: Map<NodeId, NodeStatus> = new Map()
) {}
constructor(diagramId: DiagramId) {
this.diagramId = diagramId;
}

// Link Counts
async getLinkCount(linkId: LinkId): Promise<number | undefined> {
Expand All @@ -24,6 +26,10 @@ export class DiagramObserverStorage implements ObserverStorage {
this.linkCountsStorage.set(linkId, count);
}

async setLinkCounts(counts: Map<LinkId, number>): Promise<void> {
this.linkCountsStorage = counts;
}

// Link Items
async getLinkItems({linkId, offset, limit}: GetLinkItemsParams): Promise<ItemValue[] | undefined> {
const storageItems = this.linkItemsStorage.get(linkId)?.slice(offset, offset + limit);
Expand All @@ -34,6 +40,10 @@ export class DiagramObserverStorage implements ObserverStorage {
this.linkItemsStorage.set(linkId, items);
}

async setLinksItems(linksItems: Map<LinkId, ItemValue[]>): Promise<void> {
this.linkItemsStorage = linksItems;
}

async appendLinkItems(linkId: LinkId, items: ItemValue[]): Promise<void> {
const currentItems = this.linkItemsStorage.get(linkId) ?? [];
this.linkItemsStorage.set(linkId, currentItems.concat(items));
Expand All @@ -48,6 +58,10 @@ export class DiagramObserverStorage implements ObserverStorage {
this.nodeStatusStorage.set(nodeId, status);
}

async setNodeStatuses(statuses: Map<NodeId, NodeStatus>): Promise<void> {
this.nodeStatusStorage = statuses;
}

async close(): Promise<void> {
this.linkCountsStorage.clear();
this.linkItemsStorage.clear();
Expand Down
19 changes: 13 additions & 6 deletions packages/ds-ext/src/DiagramEditorProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ export class DiagramEditorProvider implements vscode.CustomEditorProvider<Diagra
private observerStorage!: ObserverStorage;
private config: DataStoryConfig;

async dispose(): Promise<void> {
await this.observerStorage.close();
}

constructor(private readonly context: vscode.ExtensionContext) {
this.config = loadConfig(this.context);
}

async dispose(): Promise<void> {
await this.observerStorage?.close();
}

private async initializeStorage(diagramId: string) {
const storages = {
DUCK_DB: DuckDBStorage,
FILE: FileStorage,
Expand All @@ -44,11 +46,13 @@ export class DiagramEditorProvider implements vscode.CustomEditorProvider<Diagra
if(!Storage) throw new Error(`Unknown storage type: ${this.config.storage}`);

try {
this.observerStorage = new Storage();
this.observerStorage = new Storage(diagramId);
await this.observerStorage.init?.();
console.log(`Successfully initialized storage ${this.config.storage}`);
} catch (error) {
console.log(`Failed to initialize storage ${this.config.storage}. Using in-memory storage instead.`);
this.observerStorage = new DiagramObserverStorage();
this.observerStorage = new DiagramObserverStorage(diagramId);
await this.observerStorage.init?.();
}

this.inputObserverController = new InputObserverController(this.observerStorage);
Expand All @@ -59,6 +63,9 @@ export class DiagramEditorProvider implements vscode.CustomEditorProvider<Diagra
_openContext: vscode.CustomDocumentOpenContext,
_token: vscode.CancellationToken
): Promise<DiagramDocument> {
// Initialize storage with diagram ID from the file name
const diagramId = path.basename(uri.fsPath);
await this.initializeStorage(diagramId);
return DiagramDocument.create(uri);
}

Expand Down
7 changes: 4 additions & 3 deletions packages/ds-ext/src/duckDBStorage.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { GetLinkItemsParams, ItemValue, LinkCount, LinkId, NodeId, ObserverStorage } from '@data-story/core';
import { DiagramId, GetLinkItemsParams, ItemValue, LinkCount, LinkId, NodeId, ObserverStorage } from '@data-story/core';
import type { Database as DatabaseType} from 'duckdb-async';
import { createDataStoryDBPath } from './commands/createDataStoryDBPath';
export class DuckDBStorage implements ObserverStorage {
private db: DatabaseType | null = null;
private insertSequence: bigint = BigInt(0);
private diagramId: DiagramId;

constructor() {
this.init();
constructor(diagramId: DiagramId) {
this.diagramId = diagramId;
}

async init() {
Expand Down
34 changes: 26 additions & 8 deletions packages/ds-ext/src/fileStorage.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { GetLinkItemsParams, ItemValue, LinkId, NodeId, ObserverStorage } from '@data-story/core';
import { GetLinkItemsParams, ItemValue, LinkId, NodeId, ObserverStorage, DiagramId } from '@data-story/core';
import * as fs from 'fs';
import * as path from 'path';
import * as vscode from 'vscode';
Expand All @@ -10,12 +10,30 @@ type FileStorageData = {
}

export class FileStorage implements ObserverStorage {
constructor() {
this.write({
linkCounts: {},
linkItems: {},
nodes: {}
});
constructor(private diagramId: DiagramId) {}

async init(): Promise<void> {
const datastoryDir = path.join(
vscode.workspace.workspaceFolders![0].uri.fsPath,
'.datastory',
'storage',
this.diagramId
);

// Ensure the directory exists
if (!fs.existsSync(datastoryDir)) {
fs.mkdirSync(datastoryDir, { recursive: true });
}

// Initialize storage file if it doesn't exist
const filePath = this.getFilePath();
if (!fs.existsSync(filePath)) {
this.write({
linkCounts: {},
linkItems: {},
nodes: {}
});
}
}

async close() {}
Expand Down Expand Up @@ -61,7 +79,7 @@ export class FileStorage implements ObserverStorage {

private getFilePath() {
const workspacePath = vscode.workspace.workspaceFolders![0].uri.fsPath;
const datastoryDir = path.join(workspacePath, '.datastory');
const datastoryDir = path.join(workspacePath, '.datastory', 'storage', this.diagramId);
return path.join(datastoryDir, 'execution.json');
}

Expand Down
2 changes: 1 addition & 1 deletion packages/nodejs/src/server/SocketServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export class SocketServer {
this.app = app;
this.port = port;
this.messageHandlers = messageHandlers;
const storage = new DiagramObserverStorage();
const storage = new DiagramObserverStorage('_');
this.inputObserverController = new InputObserverController(storage);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/ui/src/components/DataStory/mockJSServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export class MockJSServer {
private messageHandlers = {};

constructor({ app, messageHandlers }: {app: Application, messageHandlers?: MessageHandlers}) {
const storage = new DiagramObserverStorage();
const storage = new DiagramObserverStorage('_');
const inputObserverController = new InputObserverController(storage);

this.messageHandlers = messageHandlers ?? getDefaultMsgHandlers(app, inputObserverController);
Expand Down

0 comments on commit ff1da29

Please sign in to comment.