Skip to content

Commit

Permalink
UBERF-8578: Fix extra stat call for storage adapter
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
  • Loading branch information
haiodo committed Nov 8, 2024
1 parent ad54f56 commit 10e5d3a
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 62 deletions.
21 changes: 12 additions & 9 deletions server/collaboration/src/utils/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ export async function yDocFromStorage (
ydoc?: YDoc
): Promise<YDoc | undefined> {
// stat the object to ensure it exists, because read will throw an error in this case
const blob = await storageAdapter.stat(ctx, workspace, documentId)
if (blob === undefined) {
return undefined
}
try {
const buffer = await storageAdapter.read(ctx, workspace, documentId)

// no need to apply gc because we load existing document
// it is either already gc-ed, or gc not needed and it is disabled
ydoc ??= new YDoc({ guid: generateId(), gc: false })
// no need to apply gc because we load existing document
// it is either already gc-ed, or gc not needed and it is disabled
ydoc ??= new YDoc({ guid: generateId(), gc: false })

const buffer = await storageAdapter.read(ctx, workspace, documentId)
return yDocFromBuffer(Buffer.concat(buffer as any), ydoc)
return yDocFromBuffer(Buffer.concat(buffer as any), ydoc)
} catch (err: any) {
if (err.code === 'NoSuchKey') {
return undefined
}
throw err
}
}

/** @public */
Expand Down
13 changes: 12 additions & 1 deletion server/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ import type { Request, Response } from '@hcengineering/rpc'
import type { Token } from '@hcengineering/server-token'
import { type Readable } from 'stream'
import type { DbAdapter, DomainHelper } from './adapter'
import { type StorageAdapter } from './storage'
import type { StatisticsElement } from './stats'
import { type StorageAdapter } from './storage'

export interface ServerFindOptions<T extends Doc> extends FindOptions<T> {
domain?: Domain // Allow to find for Doc's in specified domain only.
Expand Down Expand Up @@ -519,6 +519,17 @@ export interface StorageConfig {
port?: number
}

export class NoSuchKeyError extends Error {
code: string
constructor (
msg: string,
readonly cause?: any
) {
super(msg)
this.code = 'NoSuchKey'
}
}

export interface StorageConfiguration {
default: string
storages: StorageConfig[]
Expand Down
2 changes: 1 addition & 1 deletion server/front/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ export function start (
}

let blobInfo = await ctx.with(
'notoken-stat',
'stat',
{ workspace: payload.workspace.name },
async (ctx) => await config.storageAdapter.stat(ctx, payload.workspace, uuid)
)
Expand Down
32 changes: 19 additions & 13 deletions server/s3/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import core, {
} from '@hcengineering/core'
import { getMetadata } from '@hcengineering/platform'
import serverCore, {
NoSuchKeyError,
type BlobStorageIterator,
type ListBlobResult,
type StorageAdapter,
Expand Down Expand Up @@ -316,21 +317,26 @@ export class S3Service implements StorageAdapter {
}

async doGet (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string, range?: string): Promise<Readable> {
const res = await this.client.getObject({
Bucket: this.getBucketId(workspaceId),
Key: this.getDocumentKey(workspaceId, objectName),
Range: range
})
try {
const res = await this.client.getObject({
Bucket: this.getBucketId(workspaceId),
Key: this.getDocumentKey(workspaceId, objectName),
Range: range
})

const stream = res.Body?.transformToWebStream()
const stream = res.Body?.transformToWebStream()

if (stream !== undefined) {
return Readable.fromWeb(stream as ReadableStream<any>)
} else {
const readable = new Readable()
readable._read = () => {}
readable.push(null)
return readable
if (stream !== undefined) {
return Readable.fromWeb(stream as ReadableStream<any>)
} else {
const readable = new Readable()
readable._read = () => {}
readable.push(null)
return readable
}
} catch (err: any) {
// In case of error return undefined
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`, err)
}
}

Expand Down
72 changes: 34 additions & 38 deletions server/server-storage/src/fallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ export class FallbackStorageAdapter implements StorageAdapter, StorageAdapterEx
return result
}

@withContext('aggregator-delete', {})
@withContext('fallback-delete', {})
async delete (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
for (const { adapter } of this.adapters) {
if (await adapter.exists(ctx, workspaceId)) {
Expand All @@ -153,7 +153,7 @@ export class FallbackStorageAdapter implements StorageAdapter, StorageAdapterEx
}
}

@withContext('aggregator-remove', {})
@withContext('fallback-remove', {})
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
// Group by provider and delegate into it.
for (const { adapter } of this.adapters) {
Expand All @@ -173,61 +173,57 @@ export class FallbackStorageAdapter implements StorageAdapter, StorageAdapterEx
}
}

@withContext('aggregator-stat', {})
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Blob | undefined> {
const result = await this.findProvider(ctx, workspaceId, name)
if (result !== undefined) {
result.stat.provider = result.name
}
return result?.stat
}

@withContext('aggregator-get', {})
async get (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Readable> {
const result = await this.findProvider(ctx, workspaceId, name)
if (result === undefined) {
throw new NoSuchKeyError(`${workspaceId.name} missing ${name}`)
}
return await result.adapter.get(ctx, workspaceId, result.stat._id)
}

@withContext('find-provider', {})
private async findProvider (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string
): Promise<{ name: string, adapter: StorageAdapter, stat: Blob } | undefined> {
// Group by provider and delegate into it.
@withContext('fallback-stat', {})
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob | undefined> {
for (const { name, adapter } of this.adapters) {
const stat = await adapter.stat(ctx, workspaceId, objectName)
if (stat !== undefined) {
return { name, adapter, stat }
stat.provider = name
return stat
}
}
}

@withContext('fallback-get', {})
async get (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Readable> {
for (const { adapter } of this.adapters) {
try {
return await adapter.get(ctx, workspaceId, objectName)
} catch (err: any) {
// ignore
}
}
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
}

@withContext('aggregator-partial', {})
@withContext('fallback-partial', {})
async partial (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
offset: number,
length?: number | undefined
): Promise<Readable> {
const result = await this.findProvider(ctx, workspaceId, objectName)
if (result === undefined) {
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
for (const { adapter } of this.adapters) {
try {
return await adapter.partial(ctx, workspaceId, objectName, offset, length)
} catch (err: any) {
// ignore
}
}
return await result.adapter.partial(ctx, workspaceId, result.stat._id, offset, length)
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
}

@withContext('aggregator-read', {})
@withContext('fallback-read', {})
async read (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Buffer[]> {
const result = await this.findProvider(ctx, workspaceId, objectName)
if (result === undefined) {
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
for (const { adapter } of this.adapters) {
try {
return await adapter.read(ctx, workspaceId, objectName)
} catch (err: any) {
// Ignore
}
}
return await result.adapter.read(ctx, workspaceId, result.stat._id)
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
}

@withContext('aggregator-put', {})
Expand Down

0 comments on commit 10e5d3a

Please sign in to comment.