Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add write transaction #468

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lib/dynamoid/adapter_plugin/aws_sdk_v3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,27 @@ def batch_write_item(table_name, objects, options = {})
end
end

def transact_write_items(table_name, models, _options = {})
out_put = process(table_name, models)
begin
response = client.transact_write_items({

transact_items: out_put,
return_consumed_capacity: 'TOTAL',
return_item_collection_metrics: 'SIZE'
})
rescue StandardError => e
puts e
end
end

def process(table_name, objects)
objects.map do |m|
key, attrib = m.flatten
{ key => { item: sanitize_item(attrib), table_name: table_name } }
end
end

# Get many items at once from DynamoDB. More efficient than getting each item individually.
#
# If optional block is passed `nil` will be returned and the block will be called for each read batch of items,
Expand Down
13 changes: 13 additions & 0 deletions lib/dynamoid/persistence.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require 'dynamoid/persistence/upsert'
require 'dynamoid/persistence/save'
require 'dynamoid/persistence/update_validations'
require 'dynamoid/persistence/transact'

# encoding: utf-8
module Dynamoid
Expand Down Expand Up @@ -159,6 +160,18 @@ def import(array_of_attributes)
Import.call(self, array_of_attributes)
end

# perfom multiple atomic operations synchronously
#
# similar to +import+ transact is a low-level method and won't have
# mechanisms like callback and validation
#
# users = User.transact({condition_check: {}, put: {}, delete: {}, update: {}})
# @param array_of_attributes [Array<Hash>]
# @return [Array] Created models
def transact(list_of_operations)
Transact.call(self, list_of_operations)
end

# Create a model.
#
# Initializes a new model and immediately saves it to DynamoDB.
Expand Down
57 changes: 57 additions & 0 deletions lib/dynamoid/persistence/transact.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# frozen_string_literal: true

require 'securerandom'
module Dynamoid
module Persistence
# @private
class Transact
def self.call(model_class, set_of_operations)
new(model_class, set_of_operations).call
end

def initialize(model_class, set_of_operations)
@model_class = model_class
@set_of_operations = set_of_operations
end

def call
models = @set_of_operations.map(&method(:build_model))
transact(models)
end

def build_model(attributes)
# attrs = attributes.symbolize_keys

type, attrib = attributes.flatten

attrs = attrib.symbolize_keys

operations = {}

if @model_class.timestamps_enabled?
time_now = DateTime.now.in_time_zone(Time.zone)
attrs[:created_at] ||= time_now
attrs[:updated_at] ||= time_now
end

@model_class.build(attrs).tap do |model|
model.hash_key = SecureRandom.uuid if model.hash_key.blank?
operations[type] = model
end

operations
end

def transact(models)
Dynamoid.adapter.transact_write_items(@model_class.table_name, array_of_dumped_attributes(models))
end

def array_of_dumped_attributes(models)
models.map do |m|
key, model = m.flatten
{ key => Dumping.dump_attributes(model.attributes, @model_class.attributes) }
end
end
end
end
end
18 changes: 16 additions & 2 deletions spec/dynamoid/persistence_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1306,7 +1306,7 @@ def log_message
obj = document_class.create(title: 'New Document')

expect {
document_class.update_fields(obj.id, { title: 'New title', publisher: 'New publisher' } )
document_class.update_fields(obj.id, { title: 'New title', publisher: 'New publisher' })
}.to raise_error Dynamoid::Errors::UnknownAttribute
end
end
Expand Down Expand Up @@ -1491,7 +1491,7 @@ def log_message
obj = document_class.create(title: 'New Document')

expect {
document_class.upsert(obj.id, { title: 'New title', publisher: 'New publisher' } )
document_class.upsert(obj.id, { title: 'New title', publisher: 'New publisher' })
}.to raise_error Dynamoid::Errors::UnknownAttribute
end
end
Expand Down Expand Up @@ -2594,6 +2594,20 @@ def log_message
end
end

describe '.transact' do
before do
Address.create_table
end

it 'process transaction' do
expect do
Address.transact([{
put: { city: 'Chicago' }
}, { put: { city: 'New York' } }])
end.to change { Address.count }.by(2)
end
end

describe '.import' do
before do
Address.create_table
Expand Down