diff --git a/lib/outatime/cli.rb b/lib/outatime/cli.rb
index 6f66de6..9933889 100644
--- a/lib/outatime/cli.rb
+++ b/lib/outatime/cli.rb
@@ -22,7 +22,7 @@ def initialize(options)
 
     def run
       fetcher = Outatime::Fetcher.new(options)
-      pb = ProgressBar.create(total: fetcher.total_size,
+      pb = ProgressBar.create(total: nil,
                               format: "%t: |%B| %f %c/%C %R MB/sec",
                               rate_scale: lambda { |rate| rate / 1024 / 1024 },
                               throttle_rate: 0.5)
diff --git a/lib/outatime/fetcher.rb b/lib/outatime/fetcher.rb
index f6faa26..2f895d4 100644
--- a/lib/outatime/fetcher.rb
+++ b/lib/outatime/fetcher.rb
@@ -1,3 +1,45 @@
+require 'thread/pool'
+
+# Outatime::Fetcher is responsible for finding the exact revision of each file
+# for a given point in time.
+#
+# The AWS S3 API lists all file revisions in a very particular order, and this
+# class takes advantage of that to quickly parse and find the revision.
+#
+# The 'GET Bucket Object Versions' call
+# (http://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGETVersion.html)
+# returns all file revisions ordered by key (path + filename) and last
+# modified time.
+#
+# For example:
+#
+#   id: 12456, key: "src/file1", last_modified: 7 Feb 11:38
+#   id: 12357, key: "src/file1", last_modified: 7 Feb 11:37
+#   id: 12357, key: "src/file1", last_modified: 7 Feb 11:00
+#   id: 22222, key: "src/file2", last_modified: 7 Feb 11:39
+#   ...
+#
+# Keep in mind that this response is paginated, with a maximum of 1,000
+# revisions per page.
+#
+# When a versioned bucket contains a huge number of files and revisions,
+# fetching all of this data may take a long time. So Outatime::Fetcher starts
+# downloading the correct revision of a file even before the full revision
+# listing has been fetched, i.e. while it is still requesting further pages of
+# revisions. That speeds up the whole process.
+#
+# So how do we know we have all the information we need before we start
+# downloading a specific file revision? Since the response is ordered by key
+# (filename) and timestamp, the fetcher can start downloading as soon as the
+# current response page contains all possible revisions for a file, i.e. its
+# revisions are not split across pages.
+#
+# When the revisions for a given key (filename) span a page boundary, the file
+# is not downloaded until its remaining revisions arrive on the next page.
+#
+# This algorithm removes the need to fetch every revision in a versioned
+# bucket (which may take several requests) before any download starts, and
+# instead acts on each individual file.
 module Outatime
   class Fetcher
     attr_accessor :options
@@ -15,10 +57,10 @@ class Fetcher
     #
     def initialize(options = {})
       @options           = options
-      @files_mutex       = Mutex.new
       @fetch_block_mutex = Mutex.new
       @s3_client         = options[:s3_client] if options[:s3_client]
-      @from              = ::Chronic.parse(@options[:from]) if @options[:from]
+      @from = ::Chronic.parse(@options[:from]) if @options[:from]
+      @pool = Thread.pool(@options.fetch(:threads, 20))
 
       # raise if the date/time was not parsed
       raise ArgumentError, "The from time was not parseable." if @from.nil?
@@ -31,7 +73,11 @@ def initialize(options = {})
     #
     # Returns nothing.
     def fetch!(&block)
-      fetch_objects(object_versions, &block)
+      object_versions do |object_version|
+        fetch_object(object_version, &block)
+      end
+
+      @pool.wait(:done)
     end
 
     # Public: Returns the objects total size.
@@ -45,34 +91,29 @@ def total_size
     #
     # Returns an Array of Aws::S3::Types::ObjectVersion.
     def object_versions
-      puts "fetching object versions from #{@from}" if verbose?
-      @files ||= begin
-        versions = []
-        delete_markers = []
-
-        s3_client.list_object_versions(bucket: @options[:bucket],
-                                       prefix: @options[:prefix]).each do |response|
-
-          versions.concat(filter_future_items(response.versions))
-          delete_markers.concat(filter_future_items(response.delete_markers))
-        end
-
-        # keep only the latest versions
-        # AWS lists the latest versions first, so it should be OK to use uniq here.
-        versions.uniq! { |obj| obj.key }
-        delete_markers.uniq! { |obj| obj.key }
-
-        delete_marker_keys = delete_markers.map { |dm| dm.key }
-
-        # check versions to see if we have newer delete_markers
-        # if so, delete those versions
-        versions.delete_if do |version|
-          if dm_index = delete_marker_keys.index(version.key)
-            if version.last_modified <= delete_markers[dm_index].last_modified
-              true
+      remaining_versions = []
+      remaining_delete_markers = []
+
+      s3_client.list_object_versions(bucket: @options[:bucket],
+                                     prefix: @options[:prefix]).each do |response|
+
+        versions = remaining_versions.concat(response.versions)
+        versions_by_key = versions.group_by {|v| v.key }
+        delete_markers = remaining_delete_markers.concat(response.delete_markers)
+        delete_markers_by_key = delete_markers.group_by {|v| v.key }
+
+        versions_by_key.each do |key, versions|
+          next if response.next_key_marker == key
+          filter_items(versions).each do |version|
+            dl_marker = filter_items(Array(delete_markers_by_key[version.key])).first
+            if dl_marker.nil? || (version.last_modified > dl_marker.last_modified)
+              yield version
             end
           end
         end
+
+        remaining_versions = Array(versions_by_key[response.next_key_marker])
+        remaining_delete_markers = Array(delete_markers_by_key[response.next_key_marker])
       end
     end
 
@@ -90,33 +131,24 @@ def verbose?
     # files - an Array of Aws::S3::Types::ObjectVersion.
     #
     # Returns nothing.
-    def fetch_objects(files)
-      threads = []
-
-      @options[:threads].times do
-        threads << Thread.new do
-          while !(file = @files_mutex.synchronize { files.pop }).nil? do
-            dest = Pathname.new("#{@options[:destination]}/#{file.key}")
-
-            if file.key.end_with?("/")
-              puts "Creating s3 subdirectory #{file.key} - #{Time.now}" if verbose?
-              dest.mkpath
-            else
-              dest.dirname.mkpath
-
-              puts "Copying from s3 #{file.key} - #{Time.now}" if verbose?
-              s3_client.get_object(response_target: "#{dest}",
-                                   bucket: @options[:bucket],
-                                   key: file.key,
-                                   version_id: file.version_id)
-            end
-
-            @fetch_block_mutex.synchronize { yield file } if block_given?
-          end
+    def fetch_object(file)
+      @pool.process do
+        dest = Pathname.new("#{@options[:destination]}/#{file.key}")
+        if file.key.end_with?("/")
+          puts "Creating s3 subdirectory #{file.key} - #{Time.now}" if verbose?
+          dest.mkpath
+        else
+          dest.dirname.mkpath
+
+          puts "Copying from s3 #{file.key} - #{Time.now}" if verbose?
+          s3_client.get_object(response_target: "#{dest}",
+                               bucket: @options[:bucket],
+                               key: file.key,
+                               version_id: file.version_id)
         end
-      end
-      threads.map(&:join)
+        @fetch_block_mutex.synchronize { yield file } if block_given?
+      end
     end
 
     # Private: Creates the S3 client instance.
@@ -132,8 +164,8 @@ def s3_client
     # items - An Array of objects. Object must respond to #last_modified.
     #
     # Returns Array.
-    def filter_future_items(items)
-      items.keep_if { |obj| obj.last_modified <= @from }
+    def filter_items(items)
+      items.keep_if { |obj| obj.last_modified <= @from }.uniq {|obj| obj.key }
     end
   end
 end
diff --git a/lib/outatime/version.rb b/lib/outatime/version.rb
index c310bca..57db59b 100644
--- a/lib/outatime/version.rb
+++ b/lib/outatime/version.rb
@@ -1,3 +1,3 @@
 module Outatime
-  VERSION = "0.2.1"
+  VERSION = "0.3.0"
 end
diff --git a/spec/fetcher_spec.rb b/spec/fetcher_spec.rb
index 0dbfc00..c38aebb 100644
--- a/spec/fetcher_spec.rb
+++ b/spec/fetcher_spec.rb
@@ -68,7 +68,7 @@
       }
     )
 
-    allow(subject).to receive(:object_versions).and_return(files)
+    allow(subject).to receive(:object_versions).and_yield(files[0]).and_yield(files[1])
   end
 
   it "downloads the files" do
@@ -102,9 +102,12 @@
   end
 
   describe "#object_versions" do
-    it "generates the proper object versions" do
-      s3_client.stub_responses(:list_object_versions,
-        versions: [
+    let(:first_response) do
+      s3_client.stub_data(:list_object_versions,
+        {
+          next_key_marker: 'README',
+          next_version_id_marker: '111',
+          versions: [
           {
             key: "future_file",
             last_modified: Chronic.parse("2016-11-05 14:49:00.000000000 Z"),
             version_id: "212"
           },
           {
             key: "index.html",
             last_modified: Chronic.parse("2016-10-26 14:47:59.000000000 Z"),
             version_id: "221"
@@ -127,19 +130,9 @@
           },
           {
             key: "README",
-            last_modified: Chronic.parse("2016-10-26 14:48:00.000000000 Z"),
+            last_modified: Chronic.parse("2016-11-05 15:48:00.000000000 Z"),
             version_id: "112"
           },
-          {
-            key: "README",
-            last_modified: Chronic.parse("2016-10-26 14:47:00.000000000 Z"),
-            version_id: "111"
-          },
-          {
-            key: "lib/",
-            last_modified: Chronic.parse("2016-10-26 14:46:00.000000000 Z"),
-            version_id: "661"
-          },
        ],
        delete_markers: [
          {
@@ -154,9 +147,34 @@
            key: "README",
            last_modified: Chronic.parse("2016-10-26 14:40:00.000000000 Z") # delete mark that happened before the last file modification, so it is ignored
          }
-        ])
+        ]})
+    end
+
+    let(:second_response) do
+      s3_client.stub_data(:list_object_versions,
+        {
+          versions: [
+            {
+              key: "README",
+              last_modified: Chronic.parse("2016-10-26 14:47:00.000000000 Z"),
+              version_id: "111"
+            },
+            {
+              key: "lib/",
+              last_modified: Chronic.parse("2016-10-26 14:46:00.000000000 Z"),
+              version_id: "661"
+            },
+          ],
+          delete_markers: []
+        })
+    end
+
+    it "generates the proper object versions" do
+      expect(s3_client).to receive(:list_object_versions)
+        .and_return([first_response, second_response])
 
-      versions = subject.object_versions
+      versions = []
+      subject.object_versions {|v| versions << v }
 
      # ensure we are returning S3 object versions
      expect(versions.map(&:class).uniq).to eq([Aws::S3::Types::ObjectVersion])
@@ -167,7 +185,7 @@
      # ensure the correct file versions are returned
      expect(versions.map(&:version_id))
        .to match_array([
-          "112", # README
+          "111", # README
           "221", # index.html
           "661", # lib/
         ])
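
A minimal usage sketch of the new block-based API follows. It is not taken from the repository: the bucket, prefix, destination and from values are made up, while the option keys and the fetch! block are the ones read by Fetcher#initialize, Fetcher#object_versions and Fetcher#fetch_object in the diff above.

    require 'outatime'

    # Hypothetical values; only the option keys come from the diff.
    fetcher = Outatime::Fetcher.new(
      bucket:      "example-bucket",    # S3 bucket to restore from
      prefix:      "src/",              # only keys under this prefix
      destination: "./restored",        # local directory to write into
      from:        "yesterday at 5pm",  # parsed with Chronic in #initialize
      threads:     20,                  # size of the thread/pool download pool
      verbose:     true
    )

    # fetch! yields each Aws::S3::Types::ObjectVersion after its download
    # finishes; the CLI uses this block to advance its progress bar.
    fetcher.fetch! do |file|
      puts "restored #{file.key} (version #{file.version_id})"
    end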
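
The page-boundary handling in Fetcher#object_versions can also be illustrated in isolation. The sketch below is only a toy model, not the library code: it ignores delete markers and the @from time filter and uses plain hashes instead of S3 object versions, but it shows how the versions of a page's next_key_marker are carried over until the following page arrives.

    # Toy model of the carry-over logic, assuming each page lists versions
    # ordered by key, as the 'GET Bucket Object Versions' call returns them.
    Page = Struct.new(:versions, :next_key_marker)

    def each_complete_key(pages)
      remaining = []

      pages.each do |page|
        by_key = (remaining + page.versions).group_by { |v| v[:key] }

        by_key.each do |key, versions|
          # This key may continue on the next page, so hold it back for now.
          next if key == page.next_key_marker
          yield key, versions
        end

        remaining = Array(by_key[page.next_key_marker])
      end
    end

    pages = [
      Page.new([{ key: "src/file1", id: 12456 }, { key: "src/file2", id: 22222 }], "src/file2"),
      Page.new([{ key: "src/file2", id: 22223 }, { key: "src/file3", id: 33333 }], nil)
    ]

    each_complete_key(pages) { |key, versions| puts "#{key}: #{versions.map { |v| v[:id] }}" }
    # => src/file1: [12456]
    #    src/file2: [22222, 22223]
    #    src/file3: [33333]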