diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index 767a4cde7e..0baf9d8a79 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -266,13 +266,16 @@ def setup_watcher(path, pe) line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines)) tw.attach do |watcher| - watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer - event_loop_attach(watcher.stat_trigger) if watcher.enable_stat_watcher + event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger + event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger end tw rescue => e if tw - tw.detach + tw.detach { |watcher| + event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger + event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger + } tw.close end raise e @@ -343,7 +346,10 @@ def update_watcher(path, pe) # so adding close_io argument to avoid this problem. # At shutdown, IOHandler's io will be released automatically after detached the event loop def detach_watcher(tw, close_io = true) - tw.detach + tw.detach { |watcher| + event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger + event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger + } tw.close if close_io flush_buffer(tw) if tw.unwatched && @pf @@ -352,6 +358,8 @@ def detach_watcher(tw, close_io = true) end def detach_watcher_after_rotate_wait(tw) + # Call event_loop_attach/event_loop_detach is high-cost for short-live object. + # If this has a problem with large number of files, use @_event_loop directly instead of timer_execute. timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do detach_watcher(tw) end @@ -479,7 +487,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e @update_watcher = update_watcher @stat_trigger = @enable_stat_watcher ? StatWatcher.new(self, &method(:on_notify)) : nil - @timer_trigger = nil + @timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil @rotate_handler = RotateHandler.new(self, &method(:on_rotate)) @io_handler = nil @@ -513,8 +521,7 @@ def attach end def detach - @timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached? - @stat_trigger.detach if @enable_stat_watcher && @stat_trigger && @stat_trigger.attached? + yield self @io_handler.on_notify if @io_handler end @@ -613,6 +620,21 @@ def swap_state(pe) pe # This pe will be updated in on_rotate after TailWatcher is initialized end + class TimerTrigger < Coolio::TimerWatcher + def initialize(interval, log, &callback) + @callback = callback + @log = log + super(interval, true) + end + + def on_timer + @callback.call + rescue => e + @log.error e.to_s + @log.error_backtrace + end + end + class StatWatcher < Coolio::StatWatcher def initialize(watcher, &callback) @watcher = watcher @@ -629,7 +651,6 @@ def on_change(prev, cur) end end - class FIFO def initialize(from_encoding, encoding) @from_encoding = from_encoding