diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb index 301ebe96d3..d89cef55dc 100644 --- a/lib/fluent/plugin/in_forward.rb +++ b/lib/fluent/plugin/in_forward.rb @@ -14,23 +14,26 @@ # limitations under the License. # -require 'fcntl' -require 'cool.io' +require 'fluent/plugin/input' +require 'fluent/msgpack_factory' require 'yajl' +require 'digest' -require 'fluent/input' +require 'fluent/plugin/socket_util' +require 'fcntl' +require 'cool.io' -module Fluent +module Fluent::Plugin class ForwardInput < Input - Plugin.register_input('forward', self) + Fluent::Plugin.register_input('forward', self) - LISTEN_PORT = 24224 + # See the wiki page below for protocol specification + # https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1 - def initialize - super - require 'fluent/plugin/socket_util' - end + helpers :event_loop + + LISTEN_PORT = 24224 desc 'The port to listen to.' config_param :port, :integer, default: LISTEN_PORT @@ -125,13 +128,12 @@ def configure(conf) }) end end + @lsock = @usock = nil end def start super - @loop = Coolio::Loop.new - socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] if Fluent.windows? socket_manager_path = socket_manager_path.to_i @@ -139,32 +141,17 @@ def start client = ServerEngine::SocketManager::Client.new(socket_manager_path) @lsock = listen(client) - @loop.attach(@lsock) + event_loop_attach(@lsock) @usock = client.listen_udp(@bind, @port) @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request)) - @loop.attach(@hbr) - - @thread = Thread.new(&method(:run)) + event_loop_attach(@hbr) end - def shutdown - # In test cases it occasionally appeared that when detaching a watcher, another watcher is also detached. - # In the case in the iteration of watchers, a watcher that has been already detached is intended to be detached - # and therfore RuntimeError occurs saying that it is not attached to a loop. - # It occurs only when testing for sending responses to ForwardOutput. - # Sending responses needs to write the socket that is previously used only to read - # and a handler has 2 watchers that is used to read and to write. - # This problem occurs possibly because those watchers are thought to be related to each other - # and when detaching one of them the other is also detached for some reasons. - # As a workaround, check if watchers are attached before detaching them. - @loop.watchers.each {|w| w.detach if w.attached? } - @loop.stop - @usock.close - @thread.join - @lsock.close - + def close + @lsock.close if @lsock + @usock.close if @usock super end @@ -176,23 +163,6 @@ def listen(client) s end - #config_param :path, :string, :default => DEFAULT_SOCKET_PATH - #def listen - # if File.exist?(@path) - # File.unlink(@path) - # end - # FileUtils.mkdir_p File.dirname(@path) - # log.debug "listening fluent socket on #{@path}" - # Coolio::UNIXServer.new(@path, Handler, method(:on_message)) - #end - - def run - @loop.run(@blocking_timeout) - rescue => e - log.error "unexpected error", error: e - log.error_backtrace - end - private def handle_connection(conn) @@ -288,29 +258,6 @@ def response(option) nil end - # message Entry { - # 1: long time - # 2: object record - # } - # - # message Forward { - # 1: string tag - # 2: list entries - # 3: object option (optional) - # } - # - # message PackedForward { - # 1: string tag - # 2: raw entries # msgpack stream of Entry - # 3: object option (optional) - # } - # - # message Message { - # 1: string tag - # 2: long? time - # 3: object record - # 4: object option (optional) - # } def on_message(msg, chunk_size, peeraddr) if msg.nil? # for future TCP heartbeat_request @@ -338,7 +285,7 @@ def on_message(msg, chunk_size, peeraddr) # PackedForward option = msg[2] size = (option && option['size']) || 0 - es_class = (option && option['compressed'] == 'gzip') ? CompressedMessagePackEventStream : MessagePackEventStream + es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream es = es_class.new(entries, nil, size.to_i) es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event es = add_source_host(es, peeraddr[2]) if @source_hostname_key @@ -349,12 +296,12 @@ def on_message(msg, chunk_size, peeraddr) es = if @skip_invalid_event check_and_skip_invalid_event(tag, entries, peeraddr) else - es = MultiEventStream.new + es = Fluent::MultiEventStream.new entries.each { |e| record = e[1] next if record.nil? time = e[0] - time = (now ||= Engine.now) if time.to_i == 0 + time = Fluent::Engine.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime es.add(time, record) } es @@ -372,7 +319,7 @@ def on_message(msg, chunk_size, peeraddr) return msg[3] # retry never succeeded so return ack and drop incoming event. end return if record.nil? - time = Engine.now if time.to_i == 0 + time = Fluent::Engine.now if time.to_i == 0 record[@source_hostname_key] = peeraddr[2] if @source_hostname_key router.emit(tag, time, record) option = msg[3] @@ -387,7 +334,7 @@ def invalid_event?(tag, time, record) end def check_and_skip_invalid_event(tag, es, peeraddr) - new_es = MultiEventStream.new + new_es = Fluent::MultiEventStream.new es.each { |time, record| if invalid_event?(tag, time, record) log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record @@ -399,7 +346,7 @@ def check_and_skip_invalid_event(tag, es, peeraddr) end def add_source_host(es, host) - new_es = MultiEventStream.new + new_es = Fluent::MultiEventStream.new es.each { |time, record| record[@source_hostname_key] = host new_es.add(time, record) @@ -430,33 +377,6 @@ def generate_helo(nonce, user_auth_salt) ['HELO', {'nonce' => nonce, 'auth' => (@security ? user_auth_salt : ''), 'keepalive' => !@deny_keepalive}] end - ##### Authentication Handshake - # - # 1. (client) connect to server - # * Socket handshake, checks certificate and its significate (in client, if using SSL) - # 2. (server) - # * check network/domain acl (if enabled) - # * disconnect when failed - # 3. (server) send HELO - # * ['HELO', options(hash)] - # * options: - # * nonce: string (required) - # * auth: string or blank_string (string: authentication required, and its salt is this value) - # 4. (client) send PING - # * ['PING', selfhostname, sharedkey_salt, sha512_hex(sharedkey_salt + selfhostname + nonce + sharedkey), username || '', sha512_hex(auth_salt + username + password) || ''] - # 5. (server) check PING - # * check sharedkey - # * check username / password (if required) - # * send PONG FAILURE if failed - # * ['PONG', false, 'reason of authentication failure', '', ''] - # 6. (server) send PONG - # * ['PONG', bool(authentication result), 'reason if authentication failed', selfhostname, sha512_hex(salt + selfhostname + nonce + sharedkey)] - # 7. (client) check PONG - # * check sharedkey - # * disconnect when failed - # 8. connection established - # * send data from client - def check_ping(message, remote_addr, user_auth_salt, nonce) log.debug "checking ping" # ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || ''] diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 76724744cb..4f8d89fed2 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -14,42 +14,23 @@ # limitations under the License. # -require 'base64' -require 'socket' -require 'fileutils' - -require 'cool.io' - require 'fluent/output' require 'fluent/config/error' +require 'base64' -module Fluent - class ForwardOutputError < StandardError - end - - class ForwardOutputResponseError < ForwardOutputError - end - - class ForwardOutputConnectionClosedError < ForwardOutputError - end +require 'fluent/compat/socket_util' - class ForwardOutputACKTimeoutError < ForwardOutputResponseError - end +module Fluent::Plugin + class ForwardOutput < Output + class Error < StandardError; end + class ResponseError < Error; end + class ConnectionClosedError < Error; end + class ACKTimeoutError < Error; end - class ForwardOutput < ObjectBufferedOutput - Plugin.register_output('forward', self) + Fluent::Plugin.register_output('forward', self) LISTEN_PORT = 24224 - def initialize - super - require 'fluent/plugin/socket_util' - @nodes = [] #=> [Node] - @loop = nil - @thread = nil - @finished = false - end - desc 'The timeout time when sending event logs.' config_param :send_timeout, :time, default: 60 desc 'The transport protocol to use for heartbeats.(udp,tcp,none)' @@ -114,17 +95,34 @@ def initialize config_param :port, :integer, default: LISTEN_PORT, obsoleted: "User section instead." config_param :host, :string, default: nil, obsoleted: "Use section instead." + config_section :buffer do + config_set_default :chunk_keys, ["tag"] + end + attr_reader :read_interval, :recover_sample_size + def initialize + super + + @nodes = [] #=> [Node] + @loop = nil + @thread = nil + @finished = false + end + def configure(conf) super + unless @chunk_key_tag + raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output" + end + @read_interval = @read_interval_msec / 1000.0 @recover_sample_size = @recover_wait / @heartbeat_interval if @dns_round_robin if @heartbeat_type == :udp - raise ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option" + raise Fluent::ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option" end end @@ -147,7 +145,7 @@ def configure(conf) end if @nodes.empty? - raise ConfigError, "forward output plugin requires at least one is required" + raise Fluent::ConfigError, "forward output plugin requires at least one is required" end raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1 @@ -166,7 +164,7 @@ def start if @heartbeat_type == :udp # assuming all hosts use udp - @usock = SocketUtil.create_udp_socket(@nodes.first.host) + @usock = Fluent::Compat::SocketUtil.create_udp_socket(@nodes.first.host) @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat)) @loop.attach(@hb) @@ -183,7 +181,8 @@ def shutdown @finished = true if @loop @loop.watchers.each {|w| w.detach } - @loop.stop + # @loop.stop + @loop.stop rescue nil end @thread.join if @thread @usock.close if @usock @@ -198,9 +197,10 @@ def run log.error_backtrace end - def write_objects(tag, chunk) + def write(chunk) return if chunk.empty? + tag = chunk.metadata.tag error = nil wlen = @weight_array.length @@ -439,10 +439,10 @@ def send_data(tag, chunk) end unless available? - raise ForwardOutputConnectionClosedError, "failed to establish connection with node #{@name}" + raise ConnectionClosedError, "failed to establish connection with node #{@name}" end - option = { 'size' => chunk.size_of_events, 'compressed' => @compress } + option = { 'size' => chunk.size, 'compressed' => @compress } option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response # out_forward always uses Raw32 type for content. @@ -472,13 +472,13 @@ def send_data(tag, chunk) if raw_data.empty? @log.warn "node closed the connection. regard it as unavailable.", host: @host, port: @port disable! - raise ForwardOutputConnectionClosedError, "node #{@host}:#{@port} closed connection" + raise ConnectionClosedError, "node #{@host}:#{@port} closed connection" else @unpacker.feed(raw_data) res = @unpacker.read if res['ack'] != option['chunk'] # Some errors may have occured when ack and chunk id is different, so send the chunk again. - raise ForwardOutputResponseError, "ack in response and chunk id in sent data are different" + raise ResponseError, "ack in response and chunk id in sent data are different" end end @@ -489,7 +489,7 @@ def send_data(tag, chunk) # (2) the node does support sending response but responses have not arrived for some reasons. @log.warn "no response from node. regard it as unavailable.", host: @host, port: @port disable! - raise ForwardOutputACKTimeoutError, "node #{host}:#{port} does not return ACK" + raise ACKTimeoutError, "node #{host}:#{port} does not return ACK" end end @@ -601,11 +601,6 @@ def heartbeat(detect=true) end end - # TODO: #to_msgpack(string) is deprecated - def to_msgpack(out = '') - [@host, @port, @weight, @available].to_msgpack(out) - end - def generate_salt SecureRandom.hex(16) end diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb index 6f63203531..e81185f71f 100644 --- a/lib/fluent/test/driver/base.rb +++ b/lib/fluent/test/driver/base.rb @@ -76,13 +76,6 @@ def broken? def run(timeout: nil, start: true, shutdown: true, &block) instance_start if start - if @instance.respond_to?(:thread_wait_until_start) - @instance.thread_wait_until_start - end - if @instance.respond_to?(:event_loop_wait_until_start) - @instance.event_loop_wait_until_start - end - timeout ||= DEFAULT_TIMEOUT stop_at = Process.clock_gettime(@test_clock_id) + timeout @run_breaking_conditions << ->(){ Process.clock_gettime(@test_clock_id) >= stop_at } @@ -115,6 +108,14 @@ def instance_start @instance.after_start end + if @instance.respond_to?(:thread_wait_until_start) + @instance.thread_wait_until_start + end + + if @instance.respond_to?(:event_loop_wait_until_start) + @instance.event_loop_wait_until_start + end + instance_hook_after_started end diff --git a/lib/fluent/test/driver/base_owner.rb b/lib/fluent/test/driver/base_owner.rb index 1fbb8b3d42..c6a36305a8 100644 --- a/lib/fluent/test/driver/base_owner.rb +++ b/lib/fluent/test/driver/base_owner.rb @@ -71,25 +71,35 @@ def emit_error_event(tag, time, record, error) end def events(tag: nil) - return [] if @event_streams.nil? - selected = @event_streams.select{|e| tag.nil? ? true : e.tag == tag } if block_given? - selected.each do |e| - e.es.each do |time, record| - yield e.tag, time, record + event_streams(tag: tag) do |t, es| + es.each do |time, record| + yield t, time, record end end else list = [] - selected.each do |e| - e.es.each do |time, record| - list << [e.tag, time, record] + event_streams(tag: tag) do |t, es| + es.each do |time, record| + list << [t, time, record] end end list end end + def event_streams(tag: nil) + return [] if @event_streams.nil? + selected = @event_streams.select{|e| tag.nil? ? true : e.tag == tag } + if block_given? + selected.each do |e| + yield e.tag, e.es + end + else + selected.map{|e| [e.tag, e.es] } + end + end + def emit_count @event_streams.size end diff --git a/lib/fluent/test/startup_shutdown.rb b/lib/fluent/test/startup_shutdown.rb index aca4cd824b..62da263f24 100644 --- a/lib/fluent/test/startup_shutdown.rb +++ b/lib/fluent/test/startup_shutdown.rb @@ -15,6 +15,7 @@ # require 'serverengine' +require 'fileutils' module Fluent module Test @@ -28,6 +29,18 @@ def startup def shutdown @server.close end + + def self.setup + @socket_manager_path = ServerEngine::SocketManager::Server.generate_path + @server = ServerEngine::SocketManager::Server.open(@socket_manager_path) + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_path.to_s + end + + def self.teardown + @server.close + # on Windows, socket_manager_path is a TCP port number + FileUtils.rm_f @socket_manager_path unless Fluent.windows? + end end end end diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb index 6877cea548..50b9e8ac0c 100644 --- a/test/plugin/test_in_forward.rb +++ b/test/plugin/test_in_forward.rb @@ -1,6 +1,6 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/test/startup_shutdown' require 'base64' @@ -9,12 +9,19 @@ require 'fluent/plugin/in_forward' require 'fluent/plugin/compressable' +require 'timecop' + class ForwardInputTest < Test::Unit::TestCase include Fluent::Plugin::Compressable def setup Fluent::Test.setup @responses = [] # for testing responses after sending data + @d = nil + end + + def teardown + @d.instance_shutdown if @d end PORT = unused_port @@ -48,12 +55,19 @@ def setup ] def create_driver(conf=CONFIG) - Fluent::Test::InputTestDriver.new(Fluent::ForwardInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::ForwardInput).configure(conf) end - class Configure < self - def test_simple - d = create_driver + sub_test_case '#configure' do + setup do + Fluent::Test::StartupShutdown.setup + end + teardown do + Fluent::Test::StartupShutdown.teardown + end + + test 'simple' do + @d = d = create_driver assert_equal PORT, d.instance.port assert_equal '127.0.0.1', d.instance.bind assert_equal 0, d.instance.linger_timeout @@ -61,8 +75,8 @@ def test_simple assert !d.instance.backlog end - def test_auth - d = create_driver(CONFIG_AUTH) + test 'auth' do + @d = d = create_driver(CONFIG_AUTH) assert_equal PORT, d.instance.port assert_equal '127.0.0.1', d.instance.bind assert_equal 0, d.instance.linger_timeout @@ -74,148 +88,153 @@ def test_auth end end - # TODO: Will add Loop::run arity check with stub/mock library - def connect TCPSocket.new('127.0.0.1', PORT) end - class Message < self - extend Fluent::Test::StartupShutdown - - def test_time - d = create_driver - - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") - Fluent::Engine.now = time - - records = [ - ["tag1", time, {"a"=>1}], - ["tag2", time, {"a"=>2}], - ] + sub_test_case 'message' do + setup do + Fluent::Test::StartupShutdown.setup + end + teardown do + Fluent::Test::StartupShutdown.teardown + end - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do - records.each {|tag, _time, record| - send_data packer.write([tag, 0, record]).to_s - } + test 'time' do + time = event_time("2011-01-02 13:14:15 UTC") + begin + Timecop.freeze(Time.at(time)) + @d = d = create_driver + + records = [ + ["tag1", time, {"a"=>1}], + ["tag2", time, {"a"=>2}], + ] + + d.run(expect_records: records.length, timeout: 5) do + records.each {|tag, _time, record| + send_data packer.write([tag, 0, record]).to_s + } + end + assert_equal(records, d.events.sort_by {|a| a[0] }) + ensure + Timecop.return end - assert_equal(records, d.emits.sort_by {|a| a[0] }) end - def test_plain - d = create_driver + test 'plain' do + @d = d = create_driver - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") records = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}], ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do + d.run(expect_records: records.length, timeout: 5) do records.each {|tag, _time, record| send_data packer.write([tag, _time, record]).to_s } end - assert_equal(records, d.emits) + assert_equal(records, d.events) end - def test_time_as_integer - d = create_driver + test 'time_as_integer' do + @d = d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time_i = event_time("2011-01-02 13:14:15 UTC").to_i records = [ - ["tag1", time, {"a"=>1}], - ["tag2", time, {"a"=>2}], + ["tag1", time_i, {"a"=>1}], + ["tag2", time_i, {"a"=>2}], ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do + d.run(expect_records: records.length, timeout: 5) do records.each {|tag, _time, record| send_data packer.write([tag, _time, record]).to_s } end - assert_equal(records, d.emits) + assert_equal(records, d.events) end - def test_skip_invalid_event - d = create_driver(CONFIG + "skip_invalid_event true") + test 'skip_invalid_event' do + @d = d = create_driver(CONFIG + "skip_invalid_event true") - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") records = [ ["tag1", time, {"a" => 1}], ["tag2", time, {"a" => 2}], ] - d.run do - entries = records.map { |tag, _time, record| [tag, _time, record] } + d.run(shutdown: false, expect_records: 2, timeout: 10) do + entries = [] # These entries are skipped entries << ['tag1', true, {'a' => 3}] << ['tag2', time, 'invalid record'] + entries += records.map { |tag, _time, record| [tag, _time, record] } entries.each {|tag, _time, record| # Without ack, logs are sometimes not saved to logs during test. - send_data packer.write([tag, _time, record]).to_s, try_to_receive_response: true + send_data packer.write([tag, _time, record]).to_s #, try_to_receive_response: true } end - assert_equal 2, d.instance.log.logs.count { |line| line =~ /got invalid event and drop it/ } - assert_equal records[0], d.emits[0] - assert_equal records[1], d.emits[1] + logs = d.instance.log.logs + assert_equal 2, logs.count { |line| line =~ /got invalid event and drop it/ } + assert_equal records[0], d.events[0] + assert_equal records[1], d.events[1] + + d.instance_shutdown end - def test_json - d = create_driver + test 'json_using_integer_time' do + @d = d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time_i = event_time("2011-01-02 13:14:15 UTC").to_i records = [ - ["tag1", time, {"a"=>1}], - ["tag2", time, {"a"=>2}], + ["tag1", time_i, {"a"=>1}], + ["tag2", time_i, {"a"=>2}], ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do + d.run(expect_records: records.length, timeout: 20) do records.each {|tag, _time, record| send_data [tag, _time, record].to_json } end - assert_equal(records, d.emits.sort_by {|a| a[1] }) + assert_equal(records, d.events.sort_by {|a| a[1] }) end - def test_json_with_newline - d = create_driver + test 'json_with_newline' do + @d = d = create_driver - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time_i = event_time("2011-01-02 13:14:15 UTC").to_i records = [ - ["tag1", time, {"a"=>1}], - ["tag2", time, {"a"=>2}], + ["tag1", time_i, {"a"=>1}], + ["tag2", time_i, {"a"=>2}], ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do + d.run(expect_records: records.length, timeout: 20) do records.each {|tag, _time, record| send_data [tag, _time, record].to_json + "\n" } end - assert_equal(records, d.emits.sort_by {|a| a[1] }) + assert_equal(records, d.events.sort_by {|a| a[1] }) end end - class Forward < self - extend Fluent::Test::StartupShutdown + sub_test_case 'forward' do + setup do + Fluent::Test::StartupShutdown.setup + end + teardown do + Fluent::Test::StartupShutdown.teardown + end data(tcp: { config: CONFIG, @@ -229,32 +248,26 @@ class Forward < self auth: true } }) - def test_plain(data) + test 'plain' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") records = [ ["tag1", time, {"a"=>1}], ["tag1", time, {"a"=>2}] ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(expect_records: records.length, timeout: 20) do entries = [] records.each {|tag, _time, record| entries << [_time, record] } send_data packer.write(["tag1", entries]).to_s, **options end - assert_equal(records, d.emits) - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely + assert_equal(records, d.events) end data(tcp: { @@ -269,23 +282,19 @@ def test_plain(data) auth: true } }) - def test_time_as_integer(data) + test 'time_as_integer' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time_i = event_time("2011-01-02 13:14:15 UTC") records = [ - ["tag1", time, {"a"=>1}], - ["tag1", time, {"a"=>2}] + ["tag1", time_i, {"a"=>1}], + ["tag1", time_i, {"a"=>2}] ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(expect_records: records.length, timeout: 20) do entries = [] records.each {|tag, _time, record| entries << [_time, record] @@ -293,9 +302,7 @@ def test_time_as_integer(data) send_data packer.write(["tag1", entries]).to_s, **options end - assert_equal(records, d.emits) - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely + assert_equal(records, d.events) end data(tcp: { @@ -310,23 +317,19 @@ def test_time_as_integer(data) auth: true } }) - def test_skip_invalid_event(data) + test 'skip_invalid_event' do |data| config = data[:config] options = data[:options] - d = create_driver(config + "skip_invalid_event true") + @d = d = create_driver(config + "skip_invalid_event true") - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") records = [ ["tag1", time, {"a" => 1}], ["tag1", time, {"a" => 2}], ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(shutdown: false, expect_records: records.length, timeout: 20) do entries = records.map { |tag, _time, record| [_time, record] } # These entries are skipped entries << ['invalid time', {'a' => 3}] << [time, 'invalid record'] @@ -334,14 +337,20 @@ def test_skip_invalid_event(data) send_data packer.write(["tag1", entries]).to_s, **options end - assert_equal 2, d.instance.log.logs.count { |line| line =~ /skip invalid event/ } + logs = d.instance.log.out.logs + assert{ logs.select{|line| line =~ /skip invalid event/ }.size == 2 } - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely + d.instance_shutdown end end - class PackedForward < self - extend Fluent::Test::StartupShutdown + sub_test_case 'packed forward' do + setup do + Fluent::Test::StartupShutdown.setup + end + teardown do + Fluent::Test::StartupShutdown.teardown + end data(tcp: { config: CONFIG, @@ -355,32 +364,26 @@ class PackedForward < self auth: true } }) - def test_plain(data) + test 'plain' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") records = [ ["tag1", time, {"a"=>1}], ["tag1", time, {"a"=>2}], ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(expect_records: records.length, timeout: 20) do entries = '' records.each {|_tag, _time, record| packer(entries).write([_time, record]).flush } send_data packer.write(["tag1", entries]).to_s, **options end - assert_equal(records, d.emits) - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely + assert_equal(records, d.events) end data(tcp: { @@ -395,32 +398,26 @@ def test_plain(data) auth: true } }) - def test_time_as_integer(data) + test 'time_as_integer' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time_i = event_time("2011-01-02 13:14:15 UTC").to_i records = [ - ["tag1", time, {"a"=>1}], - ["tag1", time, {"a"=>2}], + ["tag1", time_i, {"a"=>1}], + ["tag1", time_i, {"a"=>2}], ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(expect_records: records.length, timeout: 20) do entries = '' records.each {|tag, _time, record| packer(entries).write([_time, record]).flush } send_data packer.write(["tag1", entries]).to_s, **options end - assert_equal(records, d.emits) - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely + assert_equal(records, d.events) end data(tcp: { @@ -435,23 +432,19 @@ def test_time_as_integer(data) auth: true } }) - def test_skip_invalid_event(data) + test 'skip_invalid_event' do |data| config = data[:config] options = data[:options] - d = create_driver(config + "skip_invalid_event true") + @d = d = create_driver(config + "skip_invalid_event true") - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") records = [ ["tag1", time, {"a" => 1}], ["tag1", time, {"a" => 2}], ] - d.expected_emits_length = records.length - d.run_timeout = 2 - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(shutdown: false, expect_records: records.length, timeout: 20) do entries = records.map { |tag, _time, record| [_time, record] } # These entries are skipped entries << ['invalid time', {'a' => 3}] << [time, 'invalid record'] @@ -463,22 +456,28 @@ def test_skip_invalid_event(data) send_data packer.write(["tag1", packed_entries]).to_s, **options end - assert_equal 2, d.instance.log.logs.count { |line| line =~ /skip invalid event/ } + logs = d.instance.log.logs + assert_equal 2, logs.count { |line| line =~ /skip invalid event/ } - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely + d.instance_shutdown end end - class CompressedPackedForward < self - extend Fluent::Test::StartupShutdown + sub_test_case 'compressed packed forward' do + setup do + Fluent::Test::StartupShutdown.setup + end + teardown do + Fluent::Test::StartupShutdown.teardown + end - def test_set_compress_to_option - d = create_driver + test 'set_compress_to_option' do + @d = d = create_driver - time = event_time("2011-01-02 13:14:15 UTC").to_i + time_i = event_time("2011-01-02 13:14:15 UTC").to_i events = [ - ["tag1", time, {"a"=>1}], - ["tag1", time, {"a"=>2}] + ["tag1", time_i, {"a"=>1}], + ["tag1", time_i, {"a"=>2}] ] # create compressed entries @@ -496,16 +495,16 @@ def test_set_compress_to_option end end - assert_equal events, d.emits + assert_equal events, d.events end - def test_create_CompressedMessagePackEventStream_with_gzip_compress_option - d = create_driver + test 'create_CompressedMessagePackEventStream_with_gzip_compress_option' do + @d = d = create_driver - time = event_time("2011-01-02 13:14:15 UTC").to_i + time_i = event_time("2011-01-02 13:14:15 UTC").to_i events = [ - ["tag1", time, {"a"=>1}], - ["tag1", time, {"a"=>2}] + ["tag1", time_i, {"a"=>1}], + ["tag1", time_i, {"a"=>2}] ] # create compressed entries @@ -525,20 +524,24 @@ def test_create_CompressedMessagePackEventStream_with_gzip_compress_option assert_equal 'gzip', option['compressed'] end end - d.emits end end - class Warning < self - extend Fluent::Test::StartupShutdown + sub_test_case 'warning' do + setup do + Fluent::Test::StartupShutdown.setup + end + teardown do + Fluent::Test::StartupShutdown.teardown + end - def test_send_large_chunk_warning - d = create_driver(CONFIG + %[ - chunk_size_warn_limit 16M - chunk_size_limit 32M - ]) + test 'send_large_chunk_warning' do + @d = d = create_driver(CONFIG + %[ + chunk_size_warn_limit 16M + chunk_size_limit 32M + ]) - time = Fluent::EventTime.parse("2014-04-25 13:14:15 UTC") + time = event_time("2014-04-25 13:14:15 UTC") # generate over 16M chunk str = "X" * 1024 * 1024 @@ -546,59 +549,61 @@ def test_send_large_chunk_warning assert chunk.size > (16 * 1024 * 1024) assert chunk.size < (32 * 1024 * 1024) - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, PEERADDR) end end # check emitted data - emits = d.emits + emits = d.events assert_equal 16, emits.size assert emits.map(&:first).all?{|t| t == "test.tag" } assert_equal (0...16).to_a, emits.map{|_tag, t, _record| t - time } # check log - assert d.instance.log.logs.select{|line| + logs = d.instance.log.logs + assert_equal 1, logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ - }.size == 1, "large chunk warning is not logged" + }.size, "large chunk warning is not logged" + + d.instance_shutdown end - def test_send_large_chunk_only_warning - d = create_driver(CONFIG + %[ - chunk_size_warn_limit 16M - ]) - time = Fluent::EventTime.parse("2014-04-25 13:14:15 UTC") + test 'send_large_chunk_only_warning' do + @d = d = create_driver(CONFIG + %[ + chunk_size_warn_limit 16M + ]) + time = event_time("2014-04-25 13:14:15 UTC") # generate over 16M chunk str = "X" * 1024 * 1024 chunk = [ "test.tag", (0...16).map{|i| [time + i, {"data" => str}] } ].to_msgpack - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, PEERADDR) end end # check log - assert d.instance.log.logs.select{ |line| + logs = d.instance.log.logs + assert_equal 1, logs.select{ |line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_warn_limit':/ && line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=16777216 size=16777501/ - }.size == 1, "large chunk warning is not logged" + }.size, "large chunk warning is not logged" + + d.instance_shutdown end - def test_send_large_chunk_limit - d = create_driver(CONFIG + %[ - chunk_size_warn_limit 16M - chunk_size_limit 32M - ]) + test 'send_large_chunk_limit' do + @d = d = create_driver(CONFIG + %[ + chunk_size_warn_limit 16M + chunk_size_limit 32M + ]) - time = Fluent::EventTime.parse("2014-04-25 13:14:15 UTC") + time = event_time("2014-04-25 13:14:15 UTC") # generate over 32M chunk str = "X" * 1024 * 1024 @@ -606,50 +611,56 @@ def test_send_large_chunk_limit assert chunk.size > (32 * 1024 * 1024) # d.run => send_data - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(shutdown: false) do Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj| d.instance.send(:on_message, obj, chunk.size, PEERADDR) end end # check emitted data - emits = d.emits + emits = d.events assert_equal 0, emits.size # check log - assert d.instance.log.logs.select{|line| + logs = d.instance.log.logs + assert_equal 1, logs.select{|line| line =~ / \[warn\]: Input chunk size is larger than 'chunk_size_limit', dropped:/ && line =~ / tag="test.tag" source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" limit=33554432 size=33554989/ - }.size == 1, "large chunk warning is not logged" + }.size, "large chunk warning is not logged" + + d.instance_shutdown end data('string chunk' => 'broken string', 'integer chunk' => 10) - def test_send_broken_chunk(data) - d = create_driver + test 'send_broken_chunk' do |data| + @d = d = create_driver # d.run => send_data - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(shutdown: false) do d.instance.send(:on_message, data, 1000000000, PEERADDR) end # check emitted data - emits = d.emits - assert_equal 0, emits.size + assert_equal 0, d.events.size # check log - assert d.instance.log.logs.select{|line| + logs = d.instance.log.logs + assert_equal 1, logs.select{|line| line =~ / \[warn\]: incoming chunk is broken: source="host: 127.0.0.1, addr: 127.0.0.1, port: \d+" msg=#{data.inspect}/ - }.size == 1, "should not accept broken chunk" + }.size, "should not accept broken chunk" + + d.instance_shutdown end end - class RespondToRequiringAck < self - extend Fluent::Test::StartupShutdown + sub_test_case 'respond to required ack' do + setup do + Fluent::Test::StartupShutdown.setup + end + teardown do + Fluent::Test::StartupShutdown.teardown + end data(tcp: { config: CONFIG, @@ -663,24 +674,21 @@ class RespondToRequiringAck < self auth: true } }) - def test_message(data) + test 'message' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}] ] - d.expected_emits_length = events.length expected_acks = [] - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(expect_records: events.size) do events.each {|tag, _time, record| op = { 'chunk' => Base64.encode64(record.object_id.to_s) } expected_acks << op['chunk'] @@ -688,10 +696,8 @@ def test_message(data) } end - assert_equal events, d.emits + assert_equal events, d.events assert_equal expected_acks, @responses.map { |res| MessagePack.unpack(res)['ack'] } - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely end # FIX: response is not pushed into @responses because IO.select has been blocked until InputForward shutdowns @@ -707,24 +713,21 @@ def test_message(data) auth: true } }) - def test_forward(data) + test 'forward' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ["tag1", time, {"a"=>2}] ] - d.expected_emits_length = events.length expected_acks = [] - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(expect_records: events.size) do entries = [] events.each {|_tag, _time, record| entries << [_time, record] @@ -734,10 +737,8 @@ def test_forward(data) send_data ["tag1", entries, op].to_msgpack, try_to_receive_response: true, **options end - assert_equal events, d.emits + assert_equal events, d.events assert_equal expected_acks, @responses.map { |res| MessagePack.unpack(res)['ack'] } - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely end data(tcp: { @@ -752,24 +753,21 @@ def test_forward(data) auth: true } }) - def test_packed_forward(data) + test 'packed_forward' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ["tag1", time, {"a"=>2}] ] - d.expected_emits_length = events.length expected_acks = [] - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(expect_records: events.size) do entries = '' events.each {|_tag, _time,record| [time, record].to_msgpack(entries) @@ -779,10 +777,8 @@ def test_packed_forward(data) send_data ["tag1", entries, op].to_msgpack, try_to_receive_response: true, **options end - assert_equal events, d.emits + assert_equal events, d.events assert_equal expected_acks, @responses.map { |res| MessagePack.unpack(res)['ack'] } - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely end data( @@ -800,26 +796,21 @@ def test_packed_forward(data) # } # }, ) - def test_message_json(data) + test 'message_json' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time_i = event_time("2011-01-02 13:14:15 UTC") events = [ - ["tag1", time, {"a"=>1}], - ["tag2", time, {"a"=>2}] + ["tag1", time_i, {"a"=>1}], + ["tag2", time_i, {"a"=>2}] ] - d.expected_emits_length = events.length expected_acks = [] - d.expected_emits_length = events.length - d.run_timeout = 2 - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status - + d.run(expect_records: events.size, timeout: 20) do events.each {|tag, _time, record| op = { 'chunk' => Base64.encode64(record.object_id.to_s) } expected_acks << op['chunk'] @@ -827,15 +818,18 @@ def test_message_json(data) } end - assert_equal events, d.emits + assert_equal events, d.events assert_equal expected_acks, @responses.map { |res| JSON.parse(res)['ack'] } - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely end end - class NotRespondToNotRequiringAck < self - extend Fluent::Test::StartupShutdown + sub_test_case 'not respond without required ack' do + setup do + Fluent::Test::StartupShutdown.setup + end + teardown do + Fluent::Test::StartupShutdown.teardown + end data(tcp: { config: CONFIG, @@ -849,31 +843,26 @@ class NotRespondToNotRequiringAck < self auth: true } }) - def test_message(data) + test 'message' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ["tag2", time, {"a"=>2}] ] - d.expected_emits_length = events.length - - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status + d.run(expect_records: events.size, timeout: 20) do events.each {|tag, _time, record| send_data [tag, _time, record].to_msgpack, try_to_receive_response: true, **options } end - assert_equal events, d.emits + assert_equal events, d.events assert_equal [nil, nil], @responses - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely end data(tcp: { @@ -888,22 +877,19 @@ def test_message(data) auth: true } }) - def test_forward(data) + test 'forward' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ["tag1", time, {"a"=>2}] ] - d.expected_emits_length = events.length - - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status + d.run(expect_records: events.size, timeout: 20) do entries = [] events.each {|tag, _time, record| entries << [_time, record] @@ -911,10 +897,8 @@ def test_forward(data) send_data ["tag1", entries].to_msgpack, try_to_receive_response: true, **options end - assert_equal events, d.emits + assert_equal events, d.events assert_equal [nil], @responses - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely end data(tcp: { @@ -929,22 +913,19 @@ def test_forward(data) auth: true } }) - def test_packed_forward(data) + test 'packed_forward' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ["tag1", time, {"a"=>2}] ] - d.expected_emits_length = events.length - - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status + d.run(expect_records: events.size, timeout: 20) do entries = '' events.each {|tag, _time, record| [_time, record].to_msgpack(entries) @@ -952,10 +933,8 @@ def test_packed_forward(data) send_data ["tag1", entries].to_msgpack, try_to_receive_response: true, **options end - assert_equal events, d.emits + assert_equal events, d.events assert_equal [nil], @responses - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely end data( @@ -973,31 +952,26 @@ def test_packed_forward(data) # } # }, ) - def test_message_json(data) + test 'message_json' do |data| config = data[:config] options = data[:options] - d = create_driver(config) + @d = d = create_driver(config) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time_i = event_time("2011-01-02 13:14:15 UTC").to_i events = [ - ["tag1", time, {"a"=>1}], - ["tag2", time, {"a"=>2}] + ["tag1", time_i, {"a"=>1}], + ["tag2", time_i, {"a"=>2}] ] - d.expected_emits_length = events.length - - d.run do - sleep 0.1 until d.instance.instance_eval{ @thread } && d.instance.instance_eval{ @thread }.status + d.run(expect_records: events.size, timeout: 20) do events.each {|tag, _time, record| send_data [tag, _time, record].to_json, try_to_receive_response: true, **options } end - assert_equal events, d.emits + assert_equal events, d.events assert_equal [nil, nil], @responses - - sleep 0.1 while d.instance.instance_eval{ @thread }.status # to confirm that plugin stopped completely end end @@ -1015,11 +989,12 @@ def unpacker def read_data(io, timeout, &block) res = '' select_timeout = 2 - timeout_at = Time.now + timeout + clock_id = Process::CLOCK_MONOTONIC_RAW rescue Process::CLOCK_MONOTONIC + timeout_at = Process.clock_gettime(clock_id) + timeout begin buf = '' io_activated = false - while Time.now < timeout_at + while Process.clock_gettime(clock_id) < timeout_at if IO.select([io], nil, nil, select_timeout) io_activated = true buf = io.readpartial(2048) @@ -1114,7 +1089,12 @@ def send_data(data, try_to_receive_response: false, response_timeout: 5, auth: f end sub_test_case 'source_hostname_key feature' do - extend Fluent::Test::StartupShutdown + setup do + Fluent::Test::StartupShutdown.setup + end + teardown do + Fluent::Test::StartupShutdown.teardown + end test 'message protocol with source_hostname_key' do execute_test { |events| @@ -1146,20 +1126,19 @@ def send_data(data, try_to_receive_response: false, response_timeout: 5, auth: f end def execute_test(&block) - d = create_driver(CONFIG + 'source_hostname_key source') + @d = d = create_driver(CONFIG + 'source_hostname_key source') - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") events = [ ["tag1", time, {"a"=>1}], ["tag1", time, {"a"=>2}] ] - d.expected_emits_length = events.length - d.run do + d.run(expect_records: events.size) do block.call(events) end - d.emits.each { |tag, _time, record| + d.events.each { |tag, _time, record| assert_true record.has_key?('source') } end diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 8d6e9c8f6f..8d0a4c932b 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -1,9 +1,12 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/output' require 'fluent/test/startup_shutdown' require 'fluent/plugin/out_forward' require 'flexmock/test_unit' +require 'fluent/test/driver/input' +require 'fluent/plugin/in_forward' + class ForwardOutputTest < Test::Unit::TestCase extend Fluent::Test::StartupShutdown @@ -29,9 +32,13 @@ def setup ] def create_driver(conf=CONFIG) - d = Fluent::Test::BufferedOutputTestDriver.new(Fluent::ForwardOutput) { + Fluent::Test::Driver::Output.new(Fluent::Plugin::ForwardOutput) { attr_reader :responses, :exceptions + def write(chunk) + super + end + def initialize super @responses = [] @@ -53,12 +60,6 @@ def send_data(tag, chunk) end end }.configure(conf) - router = Object.new - def router.method_missing(name, *args, **kw_args, &block) - Engine.root_agent.event_router.__send__(name, *args, **kw_args, &block) - end - d.instance.router = router - d end def test_configure @@ -173,29 +174,26 @@ def test_send_with_time_as_integer d = create_driver(CONFIG + %[flush_interval 1s]) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - d.register_run_post_condition do - d.instance.responses.length == 1 - end - target_input_driver.run do - d.run do + d.end_if{ d.instance.responses.length == 1 } + d.run(default_tag: 'test') do records.each do |record| - d.emit record, time + d.feed(time, record) end end end - emits = target_input_driver.emits - assert_equal ['test', time, records[0]], emits[0] - assert_equal ['test', time, records[1]], emits[1] - assert(emits[0][1].is_a?(Integer)) - assert(emits[1][1].is_a?(Integer)) + events = target_input_driver.events + assert_equal_event_time(time, events[0][1]) + assert_equal ['test', time, records[0]], events[0] + assert_equal_event_time(time, events[1][1]) + assert_equal ['test', time, records[1]], events[1] assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned assert_empty d.instance.exceptions @@ -209,29 +207,26 @@ def test_send_without_time_as_integer time_as_integer false ]) - time = Fluent::EventTime.parse("2011-01-02 13:14:15 UTC") + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - d.register_run_post_condition do - d.instance.responses.length == 1 - end - target_input_driver.run do - d.run do + d.end_if{ d.instance.responses.length == 1 } + d.run(default_tag: 'test') do records.each do |record| - d.emit record, time + d.feed(time, record) end end end - emits = target_input_driver.emits - assert_equal ['test', time, records[0]], emits[0] - assert_equal ['test', time, records[1]], emits[1] - assert_equal_event_time(time, emits[0][1]) - assert_equal_event_time(time, emits[1][1]) + events = target_input_driver.events + assert_equal_event_time(time, events[0][1]) + assert_equal ['test', time, records[0]], events[0] + assert_equal_event_time(time, events[1][1]) + assert_equal ['test', time, records[1]], events[1] assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned assert_empty d.instance.exceptions @@ -251,24 +246,21 @@ def test_send_comprssed_message_pack_stream_if_compress_is_gzip {"a" => 1}, {"a" => 2} ] - d.register_run_post_condition do - d.instance.responses.length == 1 - end - target_input_driver.run do - d.run do + d.end_if{ d.instance.responses.length == 1 } + d.run(default_tag: 'test') do records.each do |record| - d.emit record, time + d.feed(time, record) end end end event_streams = target_input_driver.event_streams - assert_true event_streams[0].is_a?(Fluent::CompressedMessagePackEventStream) + assert_true event_streams[0][1].is_a?(Fluent::CompressedMessagePackEventStream) - emits = target_input_driver.emits - assert_equal ['test', time, records[0]], emits[0] - assert_equal ['test', time, records[1]], emits[1] + events = target_input_driver.events + assert_equal ['test', time, records[0]], events[0] + assert_equal ['test', time, records[1]], events[1] end def test_send_to_a_node_supporting_responses @@ -276,27 +268,24 @@ def test_send_to_a_node_supporting_responses d = create_driver(CONFIG + %[flush_interval 1s]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - d.register_run_post_condition do - d.instance.responses.length == 1 - end - target_input_driver.run do - d.run do + d.end_if{ d.instance.responses.length == 1 } + d.run(default_tag: 'test') do records.each do |record| - d.emit record, time + d.feed(time, record) end end end - emits = target_input_driver.emits - assert_equal ['test', time, records[0]], emits[0] - assert_equal ['test', time, records[1]], emits[1] + events = target_input_driver.events + assert_equal ['test', time, records[0]], events[0] + assert_equal ['test', time, records[1]], events[1] assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned assert_empty d.instance.exceptions @@ -307,27 +296,24 @@ def test_send_to_a_node_not_supporting_responses d = create_driver(CONFIG + %[flush_interval 1s]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - d.register_run_post_condition do - d.instance.responses.length == 1 - end - target_input_driver.run do - d.run do + d.end_if{ d.instance.responses.length == 1 } + d.run(default_tag: 'test') do records.each do |record| - d.emit record, time + d.feed(time, record) end end end - emits = target_input_driver.emits - assert_equal ['test', time, records[0]], emits[0] - assert_equal ['test', time, records[1]], emits[1] + events = target_input_driver.events + assert_equal ['test', time, records[0]], events[0] + assert_equal ['test', time, records[1]], events[1] assert_equal [nil], d.instance.responses # not attempt to receive responses, so nil is returned assert_empty d.instance.exceptions @@ -342,27 +328,24 @@ def test_require_a_node_supporting_responses_to_respond_with_ack ack_response_timeout 1s ]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - d.register_run_post_condition do - d.instance.responses.length == 1 - end - target_input_driver.run do - d.run do + d.end_if{ d.instance.responses.length == 1 } + d.run(default_tag: 'test') do records.each do |record| - d.emit record, time + d.feed(time, record) end end end - emits = target_input_driver.emits - assert_equal ['test', time, records[0]], emits[0] - assert_equal ['test', time, records[1]], emits[1] + events = target_input_driver.events + assert_equal ['test', time, records[0]], events[0] + assert_equal ['test', time, records[1]], events[1] assert_equal 1, d.instance.responses.length assert d.instance.responses[0].has_key?('ack') @@ -378,30 +361,26 @@ def test_require_a_node_not_supporting_responses_to_respond_with_ack ack_response_timeout 1s ]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - d.register_run_post_condition do - d.instance.responses.length == 1 - end - d.run_timeout = 2 - - assert_raise Fluent::ForwardOutputACKTimeoutError do + assert_raise Fluent::Plugin::ForwardOutput::ACKTimeoutError do target_input_driver.run do - d.run do + d.end_if{ d.instance.responses.length == 1 } + d.run(default_tag: 'test', timeout: 2, wait_flush_completion: false) do records.each do |record| - d.emit record, time + d.feed(time, record) end end end end - emits = target_input_driver.emits - assert_equal ['test', time, records[0]], emits[0] - assert_equal ['test', time, records[1]], emits[1] + events = target_input_driver.events + assert_equal ['test', time, records[0]], events[0] + assert_equal ['test', time, records[1]], events[1] node = d.instance.nodes.first assert_equal false, node.available # node is regarded as unavailable when timeout @@ -421,30 +400,26 @@ def test_require_a_node_not_supporting_responses_2_to_respond_with_ack ack_response_timeout 5s ]) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - d.register_run_post_condition do - d.instance.responses.length == 1 - end - d.run_timeout = 2 - - assert_raise Fluent::ForwardOutputACKTimeoutError do + assert_raise Fluent::Plugin::ForwardOutput::ACKTimeoutError do target_input_driver.run do - d.run do + d.end_if{ d.instance.responses.length == 1 } + d.run(default_tag: 'test', timeout: 2, wait_flush_completion: false) do records.each do |record| - d.emit record, time + d.feed(time, record) end end end end - emits = target_input_driver.emits - assert_equal ['test', time, records[0]], emits[0] - assert_equal ['test', time, records[1]], emits[1] + events = target_input_driver.events + assert_equal ['test', time, records[0]], events[0] + assert_equal ['test', time, records[1]], events[1] assert_equal 0, d.instance.responses.size assert_equal 1, d.instance.exceptions.size # send_data() fails and to be retried @@ -480,32 +455,24 @@ def test_authentication_with_shared_key ] d = create_driver(output_conf) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - target_input_driver.run do - sleep 0.1 until target_input_driver.instance.instance_eval{ @thread } && target_input_driver.instance.instance_eval{ @thread }.status - - d.run do + target_input_driver.run(expect_records: 2, timeout: 15) do + d.run(default_tag: 'test') do records.each do |record| - d.emit(record, time) - end - - # run_post_condition of Fluent::Test::*Driver are buggy: - t = Time.now - while Time.now < t + 15 && target_input_driver.emits.size < 2 - sleep 0.1 + d.feed(time, record) end end end - emits = target_input_driver.emits - assert{ emits != [] } - assert_equal(['test', time, records[0]], emits[0]) - assert_equal(['test', time, records[1]], emits[1]) + events = target_input_driver.events + assert{ events != [] } + assert_equal(['test', time, records[0]], events[0]) + assert_equal(['test', time, records[1]], events[1]) end def test_authentication_with_user_auth @@ -542,39 +509,31 @@ def test_authentication_with_user_auth ] d = create_driver(output_conf) - time = Time.parse("2011-01-02 13:14:15 UTC").to_i + time = event_time("2011-01-02 13:14:15 UTC") records = [ {"a" => 1}, {"a" => 2} ] - target_input_driver.run do - sleep 0.1 until target_input_driver.instance.instance_eval{ @thread } && target_input_driver.instance.instance_eval{ @thread }.status - - d.run do + target_input_driver.run(expect_records: 2, timeout: 15) do + d.run(default_tag: 'test') do records.each do |record| - d.emit(record, time) - end - - # run_post_condition of Fluent::Test::*Driver are buggy: - t = Time.now - while Time.now < t + 15 && target_input_driver.emits.size < 2 - sleep 0.1 + d.feed(time, record) end end end - emits = target_input_driver.emits - assert{ emits != [] } - assert_equal(['test', time, records[0]], emits[0]) - assert_equal(['test', time, records[1]], emits[1]) + events = target_input_driver.events + assert{ events != [] } + assert_equal(['test', time, records[0]], events[0]) + assert_equal(['test', time, records[1]], events[1]) end def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARGET_CONFIG) require 'fluent/plugin/in_forward' # TODO: Support actual TCP heartbeat test - Fluent::Test::InputTestDriver.new(Fluent::ForwardInput) { + Fluent::Test::Driver::Input.new(Fluent::Plugin::ForwardInput) { if response_stub.nil? # do nothing because in_forward responds for ack option in default else @@ -588,7 +547,7 @@ def create_target_input_driver(response_stub: nil, disconnect: false, conf: TARG def test_heartbeat_type_none d = create_driver(CONFIG + "\nheartbeat_type none") node = d.instance.nodes.first - assert_equal Fluent::ForwardOutput::NoneHeartbeatNode, node.class + assert_equal Fluent::Plugin::ForwardOutput::NoneHeartbeatNode, node.class d.instance.start assert_nil d.instance.instance_variable_get(:@loop) # no HeartbeatHandler, or HeartbeatRequestTimer @@ -607,8 +566,8 @@ def test_heartbeat_type_udp timer = d.instance.instance_variable_get(:@timer) hb = d.instance.instance_variable_get(:@hb) assert_equal UDPSocket, usock.class - assert_equal Fluent::ForwardOutput::HeartbeatRequestTimer, timer.class - assert_equal Fluent::ForwardOutput::HeartbeatHandler, hb.class + assert_equal Fluent::Plugin::ForwardOutput::HeartbeatRequestTimer, timer.class + assert_equal Fluent::Plugin::ForwardOutput::HeartbeatHandler, hb.class mock(usock).send("\0", 0, Socket.pack_sockaddr_in(TARGET_PORT, '127.0.0.1')).once timer.disable # call send_heartbeat at just once