Skip to content
This repository has been archived by the owner on Jul 30, 2019. It is now read-only.

Commit

Permalink
Merge pull request #4 from pressednet/refactor/fetcher
Browse files Browse the repository at this point in the history
refactor fetcher to find faster an object revision
  • Loading branch information
boone authored Feb 7, 2017
2 parents f9a5039 + 9f9fea2 commit cf00adb
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 75 deletions.
2 changes: 1 addition & 1 deletion lib/outatime/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def initialize(options)
def run
fetcher = Outatime::Fetcher.new(options)

pb = ProgressBar.create(total: fetcher.total_size,
pb = ProgressBar.create(total: nil,
format: "%t: |%B| %f %c/%C %R MB/sec",
rate_scale: lambda { |rate| rate / 1024 / 1024 },
throttle_rate: 0.5)
Expand Down
142 changes: 87 additions & 55 deletions lib/outatime/fetcher.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,45 @@
require 'thread/pool'

# Outatime::Fetcher is responsible for finding the exact revision for each file
# for a given time.
#
# AWS S3 API lists all file revisions in a very particular order and this
# class takes advantage of that to quickly parse and find the revision.
#
# The 'GET Bucket Object Versions'
# (http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETVersion.html)
# returns all file revisions ordered by key (path + filename) and last
# modified time.
#
# For example:
#
# id: 12456, key: "src/file1", last_modified: 7 Feb 11:38
# id: 12357, key: "src/file1", last_modified: 7 Feb 11:37
# id: 12357, key: "src/file1", last_modified: 7 Feb 11:00
# id: 22222, key: "src/file2", last_modified: 7 Feb 11:39
# ...
#
# Keep in mind that this response is paginated, where the max amount of
# revisions per page is 1000.
#
# When a versioned bucket contains a huge amount of files and revisions,
# fetching all data may take a long time. So Outatime::Fetcher starts
# downloading the correct revision even when not all file revisions are
# already fetched, i.e. when it is still downloading the revisions and their
# data. That accelerates the process.
#
# So how do we know when we have all information we need before start downloading
# a specific file revision? As the response is ordered by key (filename) and
# timestamps, fetcher can start downloading if the actual response page
# contains all possible revisions for a file, i.e. its revisions aren't
# paginated.
#
# When the response for a given key (filename) is paginated, the file is not
# downloaded until all revisions are fetched in the next response page.
#
# This algorithm removes the need to fetch all file revisions (which may take
# several requests) for a versioned bucket before starting to download its files,
# but acts on each individual file instead.
module Outatime
class Fetcher
attr_accessor :options
Expand All @@ -15,10 +57,10 @@ class Fetcher
#
def initialize(options = {})
@options = options
@files_mutex = Mutex.new
@fetch_block_mutex = Mutex.new
@s3_client = options[:s3_client] if options[:s3_client]
@from = ::Chronic.parse(@options[:from]) if @options[:from]
@from = ::Chronic.parse(@options[:from]) if @options[:from]
@pool = Thread.pool(@options.fetch(:threads, 20))

# raise if the date/time was not parsed
raise ArgumentError, "The from time was not parseable." if @from.nil?
Expand All @@ -31,7 +73,11 @@ def initialize(options = {})
#
# Returns nothing.
def fetch!(&block)
fetch_objects(object_versions, &block)
object_versions do |object_version|
fetch_object(object_version, &block)
end

@pool.wait(:done)
end

# Public: Returns the objects total size.
Expand All @@ -45,34 +91,29 @@ def total_size
#
# Returns an Array of Aws::S3::Types::ObjectVersion.
def object_versions
puts "fetching object versions from #{@from}" if verbose?
@files ||= begin
versions = []
delete_markers = []

s3_client.list_object_versions(bucket: @options[:bucket],
prefix: @options[:prefix]).each do |response|

versions.concat(filter_future_items(response.versions))
delete_markers.concat(filter_future_items(response.delete_markers))
end

# keep only the latest versions
# AWS lists the latest versions first, so it should be OK to use uniq here.
versions.uniq! { |obj| obj.key }
delete_markers.uniq! { |obj| obj.key }

delete_marker_keys = delete_markers.map { |dm| dm.key }

# check versions to see if we have newer delete_markers
# if so, delete those versions
versions.delete_if do |version|
if dm_index = delete_marker_keys.index(version.key)
if version.last_modified <= delete_markers[dm_index].last_modified
true
remaining_versions = []
remaining_delete_markers = []

s3_client.list_object_versions(bucket: @options[:bucket],
prefix: @options[:prefix]).each do |response|

versions = remaining_versions.concat(response.versions)
versions_by_key = versions.group_by {|v| v.key }
delete_markers = remaining_delete_markers.concat(response.delete_markers)
delete_markers_by_key = delete_markers.group_by {|v| v.key }

versions_by_key.each do |key, versions|
next if response.next_key_marker == key
filter_items(versions).each do |version|
dl_marker = filter_items(Array(delete_markers_by_key[version.key])).first
if dl_marker.nil? || (version.last_modified > dl_marker.last_modified)
yield version
end
end
end

remaining_versions = Array(versions_by_key[response.next_key_marker])
remaining_delete_markers = Array(delete_markers_by_key[response.next_key_marker])
end
end

Expand All @@ -90,33 +131,24 @@ def verbose?
# files - an Array of Aws::S3::Types::ObjectVersion.
#
# Returns nothing.
def fetch_objects(files)
threads = []

@options[:threads].times do
threads << Thread.new do
while !(file = @files_mutex.synchronize { files.pop }).nil? do
dest = Pathname.new("#{@options[:destination]}/#{file.key}")

if file.key.end_with?("/")
puts "Creating s3 subdirectory #{file.key} - #{Time.now}" if verbose?
dest.mkpath
else
dest.dirname.mkpath

puts "Copying from s3 #{file.key} - #{Time.now}" if verbose?
s3_client.get_object(response_target: "#{dest}",
bucket: @options[:bucket],
key: file.key,
version_id: file.version_id)
end

@fetch_block_mutex.synchronize { yield file } if block_given?
end
def fetch_object(file)
@pool.process do
dest = Pathname.new("#{@options[:destination]}/#{file.key}")
if file.key.end_with?("/")
puts "Creating s3 subdirectory #{file.key} - #{Time.now}" if verbose?
dest.mkpath
else
dest.dirname.mkpath

puts "Copying from s3 #{file.key} - #{Time.now}" if verbose?
s3_client.get_object(response_target: "#{dest}",
bucket: @options[:bucket],
key: file.key,
version_id: file.version_id)
end
end

threads.map(&:join)
@fetch_block_mutex.synchronize { yield file } if block_given?
end
end

# Private: Creates the S3 client instance.
Expand All @@ -132,8 +164,8 @@ def s3_client
# items - An Array of objects. Object must respond to #last_modified.
#
# Returns Array.
def filter_future_items(items)
items.keep_if { |obj| obj.last_modified <= @from }
def filter_items(items)
items.keep_if { |obj| obj.last_modified <= @from }.uniq {|obj| obj.key }
end
end
end
2 changes: 1 addition & 1 deletion lib/outatime/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Outatime
VERSION = "0.2.1"
VERSION = "0.3.0"
end
54 changes: 36 additions & 18 deletions spec/fetcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
}
)

allow(subject).to receive(:object_versions).and_return(files)
allow(subject).to receive(:object_versions).and_yield(files[0]).and_yield(files[1])
end

it "downloads the files" do
Expand Down Expand Up @@ -102,9 +102,12 @@
end

describe "#object_versions" do
it "generates the proper object versions" do
s3_client.stub_responses(:list_object_versions,
versions: [
let(:first_response) do
s3_client.stub_data(:list_object_versions,
{
next_key_marker: 'README',
next_version_id_marker: '111',
versions: [
{
key: "future_file",
last_modified: Chronic.parse("2016-11-05 14:49:00.000000000 Z"),
Expand All @@ -127,19 +130,9 @@
},
{
key: "README",
last_modified: Chronic.parse("2016-10-26 14:48:00.000000000 Z"),
last_modified: Chronic.parse("2016-11-05 15:48:00.000000000 Z"),
version_id: "112"
},
{
key: "README",
last_modified: Chronic.parse("2016-10-26 14:47:00.000000000 Z"),
version_id: "111"
},
{
key: "lib/",
last_modified: Chronic.parse("2016-10-26 14:46:00.000000000 Z"),
version_id: "661"
},
],
delete_markers: [
{
Expand All @@ -154,9 +147,34 @@
key: "README",
last_modified: Chronic.parse("2016-10-26 14:40:00.000000000 Z") # delete mark that happened before the last file modification, so it is ignored
}
])
]})
end

let(:second_response) do
s3_client.stub_data(:list_object_versions,
{
versions: [
{
key: "README",
last_modified: Chronic.parse("2016-10-26 14:47:00.000000000 Z"),
version_id: "111"
},
{
key: "lib/",
last_modified: Chronic.parse("2016-10-26 14:46:00.000000000 Z"),
version_id: "661"
},
],
delete_markers: []
})
end

it "generates the proper object versions" do
expect(s3_client).to receive(:list_object_versions)
.and_return([first_response, second_response])

versions = subject.object_versions
versions = []
subject.object_versions {|v| versions << v }

# ensure we are returning S3 object versions
expect(versions.map(&:class).uniq).to eq([Aws::S3::Types::ObjectVersion])
Expand All @@ -167,7 +185,7 @@
# ensure the correct file versions are returned
expect(versions.map(&:version_id))
.to match_array([
"112", # README
"111", # README
"221", # index.html
"661", # lib/
])
Expand Down

0 comments on commit cf00adb

Please sign in to comment.