Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tiered-limit datastore #301

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/datastore-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@
"./tiered": {
"types": "./dist/src/tiered.d.ts",
"import": "./dist/src/tiered.js"
},
"./tiered-limit": {
"types": "./dist/src/tiered-limit.d.ts",
"import": "./dist/src/tiered-limit.js"
}
},
"eslintConfig": {
Expand Down
1 change: 1 addition & 0 deletions packages/datastore-core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export { ShardingDatastore } from './sharding.js'
export { MountDatastore } from './mount.js'
export { TieredDatastore } from './tiered.js'
export { NamespaceDatastore } from './namespace.js'
export { TieredLimitDatastore } from './tiered-limit.js'

export { Errors }
export { shard }
Expand Down
172 changes: 172 additions & 0 deletions packages/datastore-core/src/tiered-limit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import { logger } from '@libp2p/logger'
import { type Key, type KeyQuery, type Pair, type Query } from 'interface-datastore'
import { BaseDatastore } from './base.js'
import type { AbortOptions, AwaitIterable } from 'interface-store'

/**
* @example for memory store limited to 1MB, where extra data is dropped
*
* ```typescript
* import { MemoryDatastore } from 'datastore-core'
* import { TieredLimitDatastore } from 'datastore-core'
* import { BlackHoleDatastore } from 'datastore-core'
*
* const tieredLimitDatastore = new TieredLimitDatastore({
* maxBytes: 1024 * 1024, // 1MB limit
* store: new MemoryDatastore()
* }, new BlackHoleDatastore())
* ```
*/

const log = logger('datastore:core:tiered-limit')

export class TieredLimitDatastore<T extends BaseDatastore, T2 extends BaseDatastore> extends BaseDatastore {
private readonly primaryStore: T
private readonly backingStore: T2
private readonly maxBytes: number
private currentBytes: number = 0
/**
* Tracks sizes of items
*
* Note: this map is not taken into account when considering the maxBytes limit
*/
private readonly sizeMap = new Map<Key, number>()
/**
* Tracks order for eviction
* keys are added to the end of the array when added or updated
* keys are removed from the start of the array when evicted
* Note: size of keys is not tracked, so if you have large keys, you should
* increase the maxBytes limit accordingly
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* increase the maxBytes limit accordingly
* decrease the maxBytes limit accordingly

*/
private readonly evictionOrder: Key[] = []

constructor ({ maxBytes, store }: { maxBytes: number, store: T }, backingStore: T2) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm not completely happy with these constructor args either...

super()
this.primaryStore = store
this.backingStore = backingStore
this.maxBytes = maxBytes
}

private updateSize (key: Key, sizeDelta: number): void {
this.currentBytes += sizeDelta
if (sizeDelta > 0) {
// If adding or updating size, push key to eviction order
this.evictionOrder.push(key)
} else {
// If reducing size, find and remove the key from eviction order
const index = this.evictionOrder.indexOf(key)
if (index !== -1) this.evictionOrder.splice(index, 1)
}
}

/**
* Evict items from primary store to backing store until required space is available
*/
private async evictSpace (requiredSpace: number): Promise<void> {
if (requiredSpace <= 0) {
return // No need to evict negative space
}

Check warning on line 68 in packages/datastore-core/src/tiered-limit.ts

View check run for this annotation

Codecov / codecov/patch

packages/datastore-core/src/tiered-limit.ts#L67-L68

Added lines #L67 - L68 were not covered by tests
if (this.currentBytes + requiredSpace > this.maxBytes && this.evictionOrder.length > 0) {
log.trace('Evicting %d bytes from primary store to backing store', requiredSpace)
while (this.currentBytes + requiredSpace > this.maxBytes && this.evictionOrder.length > 0) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could probably be smarter here and use .batch()

const keyToEvict = this.evictionOrder.shift() // Get the oldest key
if (keyToEvict == null) {
// this shouldn't happen, but if it does:
// TODO: do we want to just add to the backingStore if we can't evict?
throw new Error('Need to evict but nothing else to evict. Is the item you are trying to add too large?')
}

Check warning on line 77 in packages/datastore-core/src/tiered-limit.ts

View check run for this annotation

Codecov / codecov/patch

packages/datastore-core/src/tiered-limit.ts#L74-L77

Added lines #L74 - L77 were not covered by tests
const size = this.sizeMap.get(keyToEvict)
if (size == null) {
throw new Error('Key to evict not found in size map. This should not happen.')
}

Check warning on line 81 in packages/datastore-core/src/tiered-limit.ts

View check run for this annotation

Codecov / codecov/patch

packages/datastore-core/src/tiered-limit.ts#L80-L81

Added lines #L80 - L81 were not covered by tests
log.trace('Evicting %d bytes for key "%s"', size, keyToEvict.toString())
const value = await this.primaryStore.get(keyToEvict) // Get value to evict
await this.backingStore.put(keyToEvict, value) // Ensure it's saved in the backing store
await this.primaryStore.delete(keyToEvict) // Delete from primary store
this.sizeMap.delete(keyToEvict) // Remove size tracking for this key
this.currentBytes -= size // Update current used bytes
}
log.trace('Eviction complete')
}
}

async handleSizeForPut (key: Key, value: Uint8Array): Promise<void> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be private

const size = value.byteLength
if (size > this.maxBytes) {
throw new Error(`Item size ${size} exceeds maxBytes limit of ${this.maxBytes}`)
}
const existingSize = this.sizeMap.get(key) ?? 0 // existing size is 0 if not found
const sizeDelta = size - existingSize // if already in the primary store, this will be 0

await this.evictSpace(sizeDelta) // Evict if needed before adding new item

this.sizeMap.set(key, size) // Update size tracking
this.updateSize(key, sizeDelta)
}

async put (key: Key, value: Uint8Array, options?: AbortOptions): Promise<Key> {
log.trace('Putting %s', key.toString())
try {
await this.handleSizeForPut(key, value)
} catch (err: any) {
log.error('Error putting %s to primary store: %s', key.toString(), err)
log.trace('Putting %s to backing store', key.toString())
await this.backingStore.put(key, value, options)
return key
}
log.trace('Putting %s to primary store', key.toString())
await this.primaryStore.put(key, value, options)

// Write to backingstore happens upon eviction
return key
}

async get (key: Key, options?: AbortOptions): Promise<Uint8Array> {
if (await this.primaryStore.has(key)) {
log.trace('Getting %s from primary store', key.toString())
return this.primaryStore.get(key, options)
}
SgtPooki marked this conversation as resolved.
Show resolved Hide resolved

log.trace('Getting %s from backing store', key.toString())
const value = await this.backingStore.get(key, options)
// TODO: Do we always want to put the value back into the primary store? It could be a config option.
await this.put(key, value, options)
return value
}

async has (key: Key, options?: AbortOptions): Promise<boolean> {
if (await this.primaryStore.has(key, options)) {
return true
}
return this.backingStore.has(key, options)
}

private async deleteFromPrimaryStore (key: Key, options?: any): Promise<void> {
if (await this.primaryStore.has(key, options)) {
const size = this.sizeMap.get(key)
if (size != null) {
this.updateSize(key, -size) // Update size tracking
this.sizeMap.delete(key) // Remove size tracking
}
await this.primaryStore.delete(key, options)
}
}

async delete (key: Key, options?: AbortOptions): Promise<void> {
log.trace('Deleting %s', key.toString())
await this.deleteFromPrimaryStore(key, options)
await this.backingStore.delete(key, options)
}

async * _allKeys (q: KeyQuery, options?: AbortOptions): AwaitIterable<Key> {
// TODO: How to handle stores that don't implement _allKeys? Do we want to?
yield * this.primaryStore._allKeys(q, options)
yield * this.backingStore._allKeys(q, options)
}

async * _all (q: Query, options?: AbortOptions): AwaitIterable<Pair> {
// TODO: How to handle stores that don't implement _all? Do we want to?
yield * this.primaryStore._all(q, options)
yield * this.backingStore._all(q, options)
}
}
129 changes: 129 additions & 0 deletions packages/datastore-core/test/tiered-limit.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import { Key } from 'interface-datastore/key'
import { interfaceDatastoreTests } from 'interface-datastore-tests'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { MemoryDatastore } from '../src/memory.js'
import { TieredLimitDatastore } from '../src/tiered-limit.js'
import type { BaseDatastore } from '../src/base.js'

/**
* @typedef {import('interface-datastore').Datastore} Datastore
*/

describe('TieredLimit', () => {
describe('all stores', () => {
const ms: BaseDatastore[] = []
let store: TieredLimitDatastore<BaseDatastore, BaseDatastore>
beforeEach(() => {
ms.push(new MemoryDatastore())
ms.push(new MemoryDatastore())
store = new TieredLimitDatastore({
maxBytes: 50, // 50 bytes limit for testing purposes.
store: ms[0]
}, ms[1])
})

it('put', async () => {
const k = new Key('hello')
const v = uint8ArrayFromString('world')
await store.put(k, v)
await expect(store.has(k)).to.eventually.be.true()
expect(ms[0].get(k)).to.be.eql(v)
expect(() => ms[1].get(k) as Uint8Array).to.throw('Not Found')
})

it('put - first item over limit', async () => {
const k = new Key('hello-too-big')
const v = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz') // 52 bytes out of 50
expect(v.byteLength).to.be.eql(52)
await store.put(k, v)
expect(ms[0].has(k)).to.be.false()
expect(ms[1].has(k)).to.be.true()
await expect(store.has(k)).to.eventually.be.true()
expect(() => ms[0].get(k) as Uint8Array).to.throw('Not Found')
expect(ms[1].get(k)).to.be.eql(v)
})

it('put - second item over limit', async () => {
const k = new Key('hello')
const v = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyz') // 26 bytes out of 50
const k2 = new Key('hello2')
await store.put(k, v)
await store.put(k2, v)
expect(ms[0].has(k)).to.be.false() // evicted to backing store
expect(ms[1].has(k)).to.be.true()
expect(ms[0].has(k2)).to.be.true()
expect(ms[1].has(k2)).to.be.false()
await expect(store.has(k)).to.eventually.be.true()
await expect(store.has(k2)).to.eventually.be.true()
expect(ms[0].get(k2)).to.be.eql(v)
expect(ms[1].get(k)).to.be.eql(v)
})

it('get - over limit', async () => {
const k = new Key('hello-get-over-limit')
const v = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyz') // 26 bytes out of 50
const k2 = new Key('hello-get-over-limit-2')
await store.put(k, v)
await store.put(k2, v)
expect(ms[0].has(k)).to.be.false() // evicted to backing store
expect(ms[0].has(k2)).to.be.true()
expect(ms[1].has(k)).to.be.true()
expect(ms[1].has(k2)).to.be.false()
await expect(store.has(k)).to.eventually.be.true()
await expect(store.has(k2)).to.eventually.be.true()
expect(ms[0].get(k2)).to.be.eql(v)
expect(ms[1].get(k)).to.be.eql(v)
const gotVal = await store.get(k) // should move from backing store to primary store, and evict k2
expect(gotVal).to.be.eql(v)
expect(ms[0].has(k)).to.be.true()
expect(ms[0].has(k2)).to.be.false() // no longer in primary store
// backing store has both now
expect(ms[1].has(k)).to.be.true()
expect(ms[1].has(k2)).to.be.true()

expect(ms[0].get(k)).to.be.eql(v)
expect(ms[1].get(k)).to.be.eql(v)
expect(ms[1].get(k2)).to.be.eql(v)
})

it('get and has, where available', async () => {
const k = new Key('hello')
const v = uint8ArrayFromString('world')
await ms[1].put(k, v)
const val = await store.get(k)
expect(val).to.be.eql(v)
const exists = await store.has(k)
expect(exists).to.be.eql(true)
})

it('has - key not found', async () => {
expect(await store.has(new Key('hello1'))).to.be.eql(false)
})

it('has and delete', async () => {
const k = new Key('hello')
const v = uint8ArrayFromString('world')
await store.put(k, v)
let res = await Promise.all([ms[0].has(k), ms[1].has(k)])
expect(res).to.be.eql([true, true])
await store.delete(k)
res = await Promise.all([ms[0].has(k), ms[1].has(k)])
expect(res).to.be.eql([false, false])
})
})

describe('inteface-datastore-tiered-limit', () => {
interfaceDatastoreTests({
setup () {
return new TieredLimitDatastore({
maxBytes: 50, // 50 bytes limit
store: new MemoryDatastore()
}, new MemoryDatastore())
},
teardown () { }
})
})
})
Loading