Skip to content

Commit

Permalink
mongo-tail: Rewrite with Mongo Ruby driver 2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmo0920 committed Oct 20, 2016
1 parent 93d6bf2 commit f111510
Showing 1 changed file with 37 additions and 30 deletions.
67 changes: 37 additions & 30 deletions bin/mongo-tail
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ require 'json'
require 'mongo'

TailConfig = {
:d => 'fluent',
:c => 'out_mongo_backup',
:h => 'localhost',
:p => 27017,
:n => 10
d: 'fluent',
c: 'out_mongo_backup',
h: 'localhost',
p: 27017,
n: 10
}

OptionParser.new { |opt|
Expand All @@ -28,51 +28,58 @@ OptionParser.new { |opt|
opt.parse!(ARGV)
}

def get_client_options(conf)
client_options = {}
client_options[:database] = conf[:d]
client_options
end

def get_collection_options
collection_options = {}
collection_options[:capped] = true
collection_options
end

def get_capped_collection(conf)
db = Mongo::MongoClient.new(conf[:h], conf[:p]).db(conf[:d])
if db.collection_names.include?(conf[:c])
collection = db.collection(conf[:c])
if collection.capped?
collection
else
puts "#{conf[:c]} is not capped. mongo-tail can not tail normal collection."
end
client_options = get_client_options(conf)
collection_options = get_collection_options
client = Mongo::Client.new(["#{conf[:h]}:#{conf[:p]}"], client_options)
collection = client["#{conf[:c]}", collection_options]
if collection.capped?
collection
else
puts "#{conf[:c]} not found: server = #{conf[:h]}:#{conf[:p]}"
puts "#{conf[:c]} is not capped. mongo-tail can not tail normal collection."
end
end

def create_cursor_conf(collection, conf)
def create_skip_number(collection, conf)
skip = collection.count - conf[:n]
cursor_conf = {}
cursor_conf[:skip] = skip if skip > 0
cursor_conf
skip
end

def tail_n(collection, conf)
collection.find({}, create_cursor_conf(collection, conf)).each { |doc|
collection.find().skip(create_skip_number(collection, conf)).each {|doc|
puts doc.to_json
}
end

def tail_forever(collection, conf)
cursor_conf = create_cursor_conf(collection, conf)
cursor_conf[:tailable] = true

cursor = Mongo::Cursor.new(collection, cursor_conf)
loop {
# TODO: Check more detail of cursor state if needed
cursor = Mongo::Cursor.new(collection, cursor_conf) unless cursor.alive?
option['_id'] = {'$gt' => BSON::ObjectId(@last_id)} if @last_id

if doc = cursor.next_document
STDOUT.puts doc.to_json
STDOUT.flush
documents = collection.find(option)
if documents.count >= 1
documents.each {|doc|
STDOUT.puts doc.to_json
STDOUT.flush
if id = doc.delete('_id')
@last_id = id.to_s
end
}
else
sleep 1
end
}
rescue
# ignore Mongo::OperationFailuer at CURSOR_NOT_FOUND
end

exit unless collection = get_capped_collection(TailConfig)
Expand Down

0 comments on commit f111510

Please sign in to comment.