feat: add tiered-limit datastore #301
base: main
@@ -0,0 +1,206 @@
import { logger } from '@libp2p/logger'
import { type Key, type KeyQuery, type Pair, type Query } from 'interface-datastore'
import { BaseDatastore } from './base.js'
import type { AbortOptions, AwaitIterable } from 'interface-store'

/**
 * @example for a memory store limited to 1MB, where extra data is dropped
 *
 * ```typescript
 * import { MemoryDatastore } from 'datastore-core'
 * import { TieredLimitDatastore } from 'datastore-core'
 * import { BlackHoleDatastore } from 'datastore-core'
 *
 * const tieredLimitDatastore = new TieredLimitDatastore({
 *   maxBytes: 1024 * 1024, // 1MB limit
 *   store: new MemoryDatastore()
 * }, new BlackHoleDatastore())
 * ```
 */

const log = logger('datastore:core:tiered-limit')

export class TieredLimitDatastore<T extends BaseDatastore, T2 extends BaseDatastore> extends BaseDatastore {
  private readonly primaryStore: T
  private readonly backingStore: T2
  private readonly maxBytes: number
  private currentBytes: number = 0
  /**
   * Tracks sizes of items
   *
   * Note: this map is not taken into account when considering the maxBytes limit
   */
  private readonly sizeMap = new Map<Key, number>()
  /**
   * Tracks order for eviction:
   * keys are added to the end of the array when added or updated and
   * removed from the start of the array when evicted.
   *
   * Note: the size of keys is not tracked, so if you have large keys you should
   * increase the maxBytes limit accordingly
   */
  private readonly evictionOrder: Key[] = []

  constructor ({ maxBytes, store }: { maxBytes: number, store: T }, backingStore: T2) {
Review comment: i'm not completely happy with these constructor args either... (see the sketch after this listing)
    super()
    this.primaryStore = store
    this.backingStore = backingStore
    this.maxBytes = maxBytes
  }

  private updateSize (key: Key, sizeDelta: number): void {
    this.currentBytes += sizeDelta
    if (sizeDelta > 0) {
      // If adding or updating size, push key to eviction order
      this.evictionOrder.push(key)
    } else {
      // If reducing size, find and remove the key from eviction order
      const index = this.evictionOrder.indexOf(key)
      if (index !== -1) this.evictionOrder.splice(index, 1)
    }
  }

  /**
   * Evict items from primary store to backing store until required space is available
   */
  private async evictSpace (requiredSpace: number): Promise<void> {
    if (requiredSpace <= 0) {
      return // No need to evict negative space
    }
    if (this.currentBytes + requiredSpace > this.maxBytes && this.evictionOrder.length > 0) {
      log.trace('Evicting %d bytes from primary store to backing store', requiredSpace)
      while (this.currentBytes + requiredSpace > this.maxBytes && this.evictionOrder.length > 0) {
Review comment: we could probably be smarter here and use
        const keyToEvict = this.evictionOrder.shift() // Get the oldest key
        if (keyToEvict == null) {
          // this shouldn't happen, but if it does:
          // TODO: do we want to just add to the backingStore if we can't evict?
          throw new Error('Need to evict but nothing else to evict. Is the item you are trying to add too large?')
        }
        const size = this.sizeMap.get(keyToEvict)
        if (size == null) {
          throw new Error('Key to evict not found in size map. This should not happen.')
        }
        log.trace('Evicting %d bytes for key "%s"', size, keyToEvict.toString())
        const value = await this.primaryStore.get(keyToEvict) // Get value to evict
        await this.backingStore.put(keyToEvict, value) // Ensure it's saved in the backing store
        await this.primaryStore.delete(keyToEvict) // Delete from primary store
        this.sizeMap.delete(keyToEvict) // Remove size tracking for this key
        this.currentBytes -= size // Update current used bytes
      }
      log.trace('Eviction complete')
    }
  }

  async handleSizeForPut (key: Key, value: Uint8Array): Promise<void> {
Review comment: should be private
    const size = value.byteLength
    if (size > this.maxBytes) {
      throw new Error(`Item size ${size} exceeds maxBytes limit of ${this.maxBytes}`)
    }
    const existingSize = this.sizeMap.get(key) ?? 0 // existing size is 0 if not found
    const sizeDelta = size - existingSize // if already in the primary store, this will be 0

    await this.evictSpace(sizeDelta) // Evict if needed before adding new item

    this.sizeMap.set(key, size) // Update size tracking
    this.updateSize(key, sizeDelta)
  }

  async put (key: Key, value: Uint8Array, options?: AbortOptions): Promise<Key> {
    log.trace('Putting %s', key.toString())
    try {
      await this.handleSizeForPut(key, value)
    } catch (err: any) {
      log.error('Error putting %s to primary store: %s', key.toString(), err)
      log.trace('Putting %s to backing store', key.toString())
      await this.backingStore.put(key, value, options)
      return key
    }
    log.trace('Putting %s to primary store', key.toString())
    await this.primaryStore.put(key, value, options)

    // Write to the backing store happens upon eviction
    return key
  }

  /**
   * If the key is in the primary store, remove it and add it to the back of the eviction order
   */
  private refreshKeyEvictionOrder (key: Key): void {
    const index = this.evictionOrder.indexOf(key)
    if (index !== -1) {
      this.evictionOrder.splice(index, 1) // Remove from eviction order
      this.evictionOrder.push(key) // Add to end of eviction order
    }
  }

  async get (key: Key, options?: AbortOptions): Promise<Uint8Array> {
    if (await this.primaryStore.has(key)) {
      log.trace('Getting %s from primary store', key.toString())
      this.refreshKeyEvictionOrder(key)
      return this.primaryStore.get(key, options)
    }

    log.trace('Getting %s from backing store', key.toString())
    const value = await this.backingStore.get(key, options)
    // TODO: Do we always want to put the value back into the primary store? It could be a config option.
    await this.put(key, value, options)
    return value
  }

  async has (key: Key, options?: AbortOptions): Promise<boolean> {
    if (await this.primaryStore.has(key, options)) {
      return true
    }
    return this.backingStore.has(key, options)
  }

  private async deleteFromPrimaryStore (key: Key, options?: any): Promise<void> {
    if (await this.primaryStore.has(key, options)) {
      const size = this.sizeMap.get(key)
      if (size != null) {
        this.updateSize(key, -size) // Update size tracking
        this.sizeMap.delete(key) // Remove size tracking
      }
      await this.primaryStore.delete(key, options)
    }
  }

  async delete (key: Key, options?: AbortOptions): Promise<void> {
    log.trace('Deleting %s', key.toString())
    await this.deleteFromPrimaryStore(key, options)
    await this.backingStore.delete(key, options)
  }

  async * _allKeys (q: KeyQuery, options?: AbortOptions): AwaitIterable<Key> {
    // TODO: How to handle stores that don't implement _allKeys? Do we want to?
    const seenKeys = new Set<Key>()

    for await (const key of this.primaryStore._allKeys(q, options)) {
      seenKeys.add(key)
      yield key
    }

    // yield keys from the backing store, excluding duplicates
    for await (const key of this.backingStore._allKeys(q, options)) {
      if (!seenKeys.has(key)) {
        yield key
      }
    }
  }

  async * _all (q: Query, options?: AbortOptions): AwaitIterable<Pair> {
    // TODO: How to handle stores that don't implement _all? Do we want to?
    const seenKeys = new Set<Key>()

    for await (const pair of this.primaryStore._all(q, options)) {
      seenKeys.add(pair.key)
      yield pair
    }

    // yield pairs from the backing store, excluding duplicates
    for await (const pair of this.backingStore._all(q, options)) {
      if (!seenKeys.has(pair.key)) {
        yield pair
      }
    }
  }
}
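The constructor comment above points at the mixed argument style: an options object for the primary store plus a positional backing store. Purely as an illustration of one direction the API could take (this is not what the PR implements, and the interface and property names below are hypothetical), a single init object would keep the three settings together:

```typescript
import type { BaseDatastore } from './base.js'

// Hypothetical alternative shape: not part of this PR
export interface TieredLimitDatastoreInit {
  /** size-limited store that receives writes first */
  primary: BaseDatastore
  /** store that evicted (or oversized) entries spill over into */
  backing: BaseDatastore
  /** byte budget enforced on the primary store */
  maxBytes: number
}

// which would make the constructor: constructor (init: TieredLimitDatastoreInit)
```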
@@ -0,0 +1,129 @@
/* eslint-env mocha */

import { expect } from 'aegir/chai'
import { Key } from 'interface-datastore/key'
import { interfaceDatastoreTests } from 'interface-datastore-tests'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { MemoryDatastore } from '../src/memory.js'
import { TieredLimitDatastore } from '../src/tiered-limit.js'
import type { BaseDatastore } from '../src/base.js'

/**
 * @typedef {import('interface-datastore').Datastore} Datastore
 */

describe('TieredLimit', () => {
  describe('all stores', () => {
    const ms: BaseDatastore[] = []
    let store: TieredLimitDatastore<BaseDatastore, BaseDatastore>
    beforeEach(() => {
      ms.push(new MemoryDatastore())
      ms.push(new MemoryDatastore())
      store = new TieredLimitDatastore({
        maxBytes: 50, // 50 byte limit for testing purposes
        store: ms[0]
      }, ms[1])
    })

    it('put', async () => {
      const k = new Key('hello')
      const v = uint8ArrayFromString('world')
      await store.put(k, v)
      await expect(store.has(k)).to.eventually.be.true()
      expect(ms[0].get(k)).to.be.eql(v)
      expect(() => ms[1].get(k) as Uint8Array).to.throw('Not Found')
    })

    it('put - first item over limit', async () => {
      const k = new Key('hello-too-big')
      const v = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz') // 52 bytes, over the 50 byte limit
      expect(v.byteLength).to.be.eql(52)
      await store.put(k, v)
      expect(ms[0].has(k)).to.be.false()
      expect(ms[1].has(k)).to.be.true()
      await expect(store.has(k)).to.eventually.be.true()
      expect(() => ms[0].get(k) as Uint8Array).to.throw('Not Found')
      expect(ms[1].get(k)).to.be.eql(v)
    })

    it('put - second item over limit', async () => {
      const k = new Key('hello')
      const v = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyz') // 26 bytes out of 50
      const k2 = new Key('hello2')
      await store.put(k, v)
      await store.put(k2, v)
      expect(ms[0].has(k)).to.be.false() // evicted to backing store
      expect(ms[1].has(k)).to.be.true()
      expect(ms[0].has(k2)).to.be.true()
      expect(ms[1].has(k2)).to.be.false()
      await expect(store.has(k)).to.eventually.be.true()
      await expect(store.has(k2)).to.eventually.be.true()
      expect(ms[0].get(k2)).to.be.eql(v)
      expect(ms[1].get(k)).to.be.eql(v)
    })

    it('get - over limit', async () => {
      const k = new Key('hello-get-over-limit')
      const v = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyz') // 26 bytes out of 50
      const k2 = new Key('hello-get-over-limit-2')
      await store.put(k, v)
      await store.put(k2, v)
      expect(ms[0].has(k)).to.be.false() // evicted to backing store
      expect(ms[0].has(k2)).to.be.true()
      expect(ms[1].has(k)).to.be.true()
      expect(ms[1].has(k2)).to.be.false()
      await expect(store.has(k)).to.eventually.be.true()
      await expect(store.has(k2)).to.eventually.be.true()
      expect(ms[0].get(k2)).to.be.eql(v)
      expect(ms[1].get(k)).to.be.eql(v)
      const gotVal = await store.get(k) // should move from backing store to primary store, and evict k2
      expect(gotVal).to.be.eql(v)
      expect(ms[0].has(k)).to.be.true()
      expect(ms[0].has(k2)).to.be.false() // no longer in primary store
      // backing store has both now
      expect(ms[1].has(k)).to.be.true()
      expect(ms[1].has(k2)).to.be.true()

      expect(ms[0].get(k)).to.be.eql(v)
      expect(ms[1].get(k)).to.be.eql(v)
      expect(ms[1].get(k2)).to.be.eql(v)
    })

    it('get and has, where available', async () => {
      const k = new Key('hello')
      const v = uint8ArrayFromString('world')
      await ms[1].put(k, v)
      const val = await store.get(k)
      expect(val).to.be.eql(v)
      const exists = await store.has(k)
      expect(exists).to.be.eql(true)
    })

    it('has - key not found', async () => {
      expect(await store.has(new Key('hello1'))).to.be.eql(false)
    })

    it('has and delete', async () => {
      const k = new Key('hello')
      const v = uint8ArrayFromString('world')
      await store.put(k, v)
      let res = await Promise.all([ms[0].has(k), ms[1].has(k)])
      expect(res).to.be.eql([true, true])
      await store.delete(k)
      res = await Promise.all([ms[0].has(k), ms[1].has(k)])
      expect(res).to.be.eql([false, false])
    })
  })

  describe('interface-datastore-tiered-limit', () => {
    interfaceDatastoreTests({
      setup () {
        return new TieredLimitDatastore({
          maxBytes: 50, // 50 byte limit
          store: new MemoryDatastore()
        }, new MemoryDatastore())
      },
      teardown () { }
    })
  })
})
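Taken together, the behaviour these tests exercise is: put writes to the size-limited primary store (falling back to the backing store when a single value exceeds maxBytes), the oldest keys spill over to the backing store once the limit would be exceeded, and get promotes values found in the backing store back into the primary store. A minimal usage sketch of that flow, assuming MemoryDatastore from datastore-core, an illustrative import path for the new class, and the same 50 byte limit as the tests above:

```typescript
import { MemoryDatastore } from 'datastore-core'
import { Key } from 'interface-datastore/key'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { TieredLimitDatastore } from './tiered-limit.js' // illustrative path: adjust to wherever the class is exported from

const primary = new MemoryDatastore()
const backing = new MemoryDatastore()

// 50 byte limit, mirroring the tests above
const store = new TieredLimitDatastore({ maxBytes: 50, store: primary }, backing)

// the class tracks sizes and eviction order per Key instance, so the same Key objects are reused throughout
const a = new Key('/a')
const b = new Key('/b')
const value = uint8ArrayFromString('abcdefghijklmnopqrstuvwxyz') // 26 bytes

await store.put(a, value) // fits: the primary store now holds 26 of 50 bytes
await store.put(b, value) // would be 52 bytes, so 'a' is evicted to the backing store

await store.get(a) // found in the backing store, then put back into the primary store, evicting 'b'
// the primary store now holds 'a'; the backing store holds both 'a' and 'b'
```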