-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #40 from hexlabsio/feature/transactional-gets
Transactional Gets
- Loading branch information
Showing
5 changed files
with
255 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
import { TransactGetItemsCommandInput, TransactGetItemsInput } from '@aws-sdk/client-dynamodb'; | ||
import { ReturnConsumedCapacity } from '@aws-sdk/client-dynamodb/dist-types/models'; | ||
import { | ||
ConsumedCapacity, | ||
KeysAndAttributes, | ||
} from '@aws-sdk/client-dynamodb/dist-types/models/models_0'; | ||
import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'; | ||
import { AttributeBuilder } from './attribute-builder'; | ||
import { Projection, ProjectionHandler } from './projector'; | ||
import { TableDefinition } from './table-builder/table-definition'; | ||
import { CamelCaseKeys, DynamoConfig } from './types'; | ||
|
||
export type TypeOrProjection<T, PROJECTION> = (PROJECTION extends null ? T : PROJECTION) | undefined; | ||
|
||
export type TransactGetItemOptions< | ||
TableType, | ||
PROJECTION, | ||
> = CamelCaseKeys< | ||
Pick<KeysAndAttributes, 'ConsistentRead'> & | ||
Pick<TransactGetItemsInput, 'ReturnConsumedCapacity'> | ||
> & { | ||
projection?: Projection<TableType, PROJECTION>; | ||
}; | ||
|
||
export interface TransactGetExecutor<TableTypes extends any[]> { | ||
input: TransactGetItemsCommandInput; | ||
execute(options?: { returnConsumedCapacity?: ReturnConsumedCapacity }): Promise<{ | ||
items: TableTypes; | ||
consumedCapacity?: ConsumedCapacity[]; | ||
}>; | ||
and<B extends any[]>( | ||
other: TransactGetExecutor<B>, | ||
): TransactGetClient<[...TableTypes, ...B]>; | ||
} | ||
|
||
export class TransactGetExecutorHolder<TableTypes extends any[]> | ||
implements TransactGetExecutor<TableTypes> | ||
{ | ||
constructor( | ||
private readonly client: DynamoDBDocument, | ||
public readonly input: TransactGetItemsCommandInput, | ||
) {} | ||
|
||
/** | ||
* Execute the transact get request and get the results. | ||
*/ | ||
async execute(options: { returnConsumedCapacity?: ReturnConsumedCapacity } = {}): Promise<{ | ||
items: TableTypes; | ||
consumedCapacity?: ConsumedCapacity[]; | ||
}> { | ||
return await new TransactGetClient<TableTypes>(this.client, [this]).execute(options); | ||
} | ||
|
||
/** | ||
* Append another set of requests to apply alongside these requests. | ||
* @param other | ||
*/ | ||
and<B extends any[]>( | ||
other: TransactGetExecutor<B>, | ||
): TransactGetClient<[...TableTypes, ...B]> { | ||
return new TransactGetClient(this.client, [this, other]); | ||
} | ||
} | ||
|
||
export class TransactGetClient<TableTypes extends any[]> { | ||
public readonly input: TransactGetItemsCommandInput; | ||
|
||
constructor( | ||
private readonly client: DynamoDBDocument, | ||
private readonly executors: TransactGetExecutor<any>[], | ||
) { | ||
const TransactItems = this.executors.flatMap(it => it.input.TransactItems ?? []) | ||
this.input = { | ||
TransactItems, | ||
}; | ||
} | ||
|
||
and<B extends any[]>( | ||
other: TransactGetExecutor<B>, | ||
): TransactGetClient<[...TableTypes, ...B]> { | ||
return new TransactGetClient<[...TableTypes, ...B]>(this.client, [ | ||
...this.executors, | ||
other, | ||
]); | ||
} | ||
|
||
async execute( | ||
options: { returnConsumedCapacity?: ReturnConsumedCapacity } = {} | ||
): Promise<{ | ||
items: TableTypes; | ||
consumedCapacity?: ConsumedCapacity[]; | ||
}> { | ||
let result = await this.client.transactGet({ | ||
TransactItems: this.input.TransactItems, | ||
ReturnConsumedCapacity: options.returnConsumedCapacity | ||
}); | ||
return { | ||
items: (result.Responses ?? []).map(it => it.Item) as any, | ||
consumedCapacity: result.ConsumedCapacity, | ||
} | ||
} | ||
} | ||
|
||
export type ReturnTypesFor<K extends any[], T> = K extends [any] ? [T] : K extends [any, ...(infer B)] ? [T, ...ReturnTypesFor<B, T>] : T[] | ||
|
||
export class DynamoTransactGetter<TableConfig extends TableDefinition> { | ||
constructor( | ||
private readonly clientConfig: DynamoConfig, | ||
) {} | ||
|
||
get<const K extends TableConfig['keys'][], PROJECTION = null>( | ||
keys: K, | ||
options: TransactGetItemOptions<TableConfig['type'], PROJECTION> = {}, | ||
): TransactGetExecutor<ReturnTypesFor<K, TypeOrProjection<TableConfig['type'], PROJECTION>>> { | ||
const attributeBuilder = AttributeBuilder.create(); | ||
const expression = options.projection && ProjectionHandler.projectionExpressionFor( | ||
attributeBuilder, | ||
options.projection, | ||
); | ||
const input: TransactGetItemsCommandInput = { | ||
TransactItems: keys.map(key => ({ | ||
Get: { | ||
TableName: this.clientConfig.tableName, | ||
Key: key, | ||
...(options.projection ? {ProjectionExpression: expression} : {}), | ||
...attributeBuilder.asInput(), | ||
} | ||
})) | ||
} | ||
const client = this.clientConfig.client; | ||
return new TransactGetExecutorHolder(client, input); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import { DynamoDB } from '@aws-sdk/client-dynamodb'; | ||
import { DynamoDBDocument } from '@aws-sdk/lib-dynamodb'; | ||
import { TableClient } from '../src'; | ||
import { | ||
SimpleTable, | ||
SimpleTable2, | ||
simpleTableDefinition, | ||
simpleTableDefinition2, | ||
} from './tables'; | ||
|
||
const dynamo = new DynamoDB({ | ||
endpoint: { hostname: 'localhost', port: 5001, protocol: 'http:', path: '/' }, | ||
region: 'local-env', | ||
credentials: { accessKeyId: 'x', secretAccessKey: 'x' }, | ||
}); | ||
const dynamoClient = DynamoDBDocument.from(dynamo); | ||
|
||
const testTable = new TableClient(simpleTableDefinition, { | ||
tableName: 'simpleTableDefinitionBatch', | ||
client: dynamoClient, | ||
logStatements: true, | ||
}); | ||
|
||
const testTable2 = new TableClient(simpleTableDefinition2, { | ||
tableName: 'simpleTableDefinitionBatch2', | ||
client: dynamoClient, | ||
logStatements: true, | ||
}); | ||
|
||
const preInserts: SimpleTable[] = new Array(1000).fill(0).map((a, index) => ({ | ||
identifier: index.toString(), | ||
sort: index.toString(), | ||
})); | ||
|
||
const preInserts2: SimpleTable2[] = new Array(1000).fill(0).map((a, index) => ({ | ||
identifier: (10000 + index).toString(), | ||
sort: index.toString(), | ||
text: 'test', | ||
})); | ||
|
||
describe('Transact Getter', () => { | ||
const TableName = 'simpleTableDefinitionBatch'; | ||
const TableName2 = 'simpleTableDefinitionBatch2'; | ||
|
||
beforeAll(async () => { | ||
await Promise.all( | ||
preInserts.map((Item) => dynamoClient.put({TableName, Item})), | ||
); | ||
await Promise.all( | ||
preInserts2.map((Item) => | ||
dynamoClient.put({TableName: TableName2, Item}), | ||
), | ||
); | ||
}, 20000); | ||
|
||
describe('Single Table', () => { | ||
|
||
it('should get single table', async () => { | ||
const executor = | ||
testTable.transaction.get([{identifier: '0'}]) | ||
.and( | ||
testTable.transaction.get([{identifier: '3'}]) | ||
) | ||
.and( | ||
testTable.transaction.get([{identifier: '4'}]) | ||
); | ||
console.log(JSON.stringify(executor.input, null, 2)); | ||
const result = await executor.execute(); | ||
expect(result.items).toEqual([ | ||
{identifier: '0', sort: '0'}, | ||
{identifier: '3', sort: '3'}, | ||
{identifier: '4', sort: '4'}, | ||
]); | ||
}); | ||
|
||
it('should get multi table', async () => { | ||
const executor = | ||
testTable.transaction.get([{identifier: '0'}]) | ||
.and( | ||
testTable2.transaction.get([{identifier: '10000', sort: '0'}]) | ||
) | ||
console.log(JSON.stringify(executor.input, null, 2)); | ||
const result = await executor.execute(); | ||
expect(result.items).toEqual([ | ||
{identifier: '0', sort: '0'}, | ||
{identifier: '10000', sort: '0', text: 'test'}, | ||
]); | ||
}); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters