From b1afd1c6df0bf08d833c152020bbb64d67286e6b Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Tue, 24 May 2016 00:47:13 +0900 Subject: [PATCH] writing --- lib/fluent/plugin/in_dummy.rb | 35 +-- lib/fluent/plugin/out_buffered_null.rb | 59 ++++++ lib/fluent/plugin/out_stdout.rb | 12 +- lib/fluent/plugin/output.rb | 18 +- lib/fluent/plugin/storage_local.rb | 8 +- lib/fluent/system_config.rb | 4 +- lib/fluent/test/driver/base.rb | 223 ++++++++++++++++++++ lib/fluent/test/driver/event_feeder.rb | 98 +++++++++ lib/fluent/test/driver/filter.rb | 35 +++ lib/fluent/test/driver/input.rb | 31 +++ lib/fluent/test/driver/output.rb | 53 +++++ lib/fluent/test/driver/test_event_router.rb | 45 ++++ test/helper.rb | 29 +++ test/plugin/test_in_dummy.rb | 20 +- test/plugin/test_out_buffered_null.rb | 79 +++++++ test/plugin/test_out_stdout.rb | 34 +-- 16 files changed, 727 insertions(+), 56 deletions(-) create mode 100644 lib/fluent/plugin/out_buffered_null.rb create mode 100644 lib/fluent/test/driver/base.rb create mode 100644 lib/fluent/test/driver/event_feeder.rb create mode 100644 lib/fluent/test/driver/filter.rb create mode 100644 lib/fluent/test/driver/input.rb create mode 100644 lib/fluent/test/driver/output.rb create mode 100644 lib/fluent/test/driver/test_event_router.rb create mode 100644 test/plugin/test_out_buffered_null.rb diff --git a/lib/fluent/plugin/in_dummy.rb b/lib/fluent/plugin/in_dummy.rb index 914b571ae9..5f1bb71c4b 100644 --- a/lib/fluent/plugin/in_dummy.rb +++ b/lib/fluent/plugin/in_dummy.rb @@ -16,13 +16,15 @@ require 'json' -require 'fluent/input' +require 'fluent/plugin/input' require 'fluent/config/error' -module Fluent +module Fluent::Plugin class DummyInput < Input Fluent::Plugin.register_input('dummy', self) + helpers :thread, :storage + BIN_NUM = 10 desc "The value is the tag assigned to the generated events." @@ -48,37 +50,39 @@ class DummyInput < Input dummy end - def configure(conf) + def initialize super + @storage = nil + end - @increment_value = 0 + def configure(conf) + super @dummy_index = 0 end def start super - @running = true - @thread = Thread.new(&method(:run)) - end - def shutdown - @running = false - @thread.join - super + @storage = storage_create(type: 'local') + if @auto_increment_key && !@storage.get(:auto_increment_value) + @storage.put(:auto_increment_value, -1) + end + + thread_create(:dummy_input, &method(:run)) end def run batch_num = (@rate / BIN_NUM).to_i residual_num = (@rate % BIN_NUM) - while @running + while thread_current_running? current_time = Time.now.to_i BIN_NUM.times do - break unless (@running && Time.now.to_i <= current_time) + break unless (thread_current_running? && Time.now.to_i <= current_time) wait(0.1) { emit(batch_num) } end emit(residual_num) # wait for next second - while @running && Time.now.to_i <= current_time + while thread_current_running? && Time.now.to_i <= current_time sleep 0.01 end end @@ -97,8 +101,7 @@ def generate @dummy_index += 1 if @auto_increment_key d = d.dup - d[@auto_increment_key] = @increment_value - @increment_value += 1 + d[@auto_increment_key] = @storage.update(:auto_increment_value){|v| v + 1 } end d end diff --git a/lib/fluent/plugin/out_buffered_null.rb b/lib/fluent/plugin/out_buffered_null.rb new file mode 100644 index 0000000000..b3a53bdb8b --- /dev/null +++ b/lib/fluent/plugin/out_buffered_null.rb @@ -0,0 +1,59 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/output' + +module Fluent::Plugin + class BufferedNullOutput < Output + # This plugin is for tests of buffer plugins + + Fluent::Plugin.register_output('buffered_null', self) + + config_section :buffer do + config_set_default :chunk_keys, ['tag'] + config_set_default :flush_at_shutdown, true + config_set_default :chunk_bytes_limit, 10 * 1024 + end + + attr_accessor :feed_proc, :delayed + + def initialize + super + @delayed = false + @feed_proc = nil + end + + def prefer_delayed_commit + @delayed + end + + def write(chunk) + if @feed_proc + @feed_proc.call(chunk) + else + # ignore chunk.read + end + end + + def try_write(chunk) + if @feed_proc + @feed_proc.call(chunk) + else + # ignore chunk.read + end + end + end +end diff --git a/lib/fluent/plugin/out_stdout.rb b/lib/fluent/plugin/out_stdout.rb index 47968d3481..00a456cba9 100644 --- a/lib/fluent/plugin/out_stdout.rb +++ b/lib/fluent/plugin/out_stdout.rb @@ -14,28 +14,26 @@ # limitations under the License. # -require 'fluent/output' +require 'fluent/plugin/output' -module Fluent +module Fluent::Plugin class StdoutOutput < Output - Plugin.register_output('stdout', self) + Fluent::Plugin.register_output('stdout', self) desc 'Output format.(json,hash)' config_param :output_type, default: 'json' def configure(conf) super - @formatter = Plugin.new_formatter(@output_type) + @formatter = Fluent::Plugin.new_formatter(@output_type) @formatter.configure(conf) end - def emit(tag, es, chain) + def process(tag, es) es.each {|time,record| $log.write "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n" } $log.flush - - chain.next end end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 2944cd88c4..c51e6ab561 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -141,7 +141,10 @@ def expired? # for tests attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_time, :chunk_key_tag - attr_accessor :output_enqueue_thread_waiting + attr_accessor :output_enqueue_thread_waiting, :in_tests + + # output_enqueue_thread_waiting: for test of output.rb itself + # in_tests: for tests of plugins with test drivers def initialize super @@ -149,6 +152,7 @@ def initialize @buffering = false @delayed_commit = false @as_secondary = false + @in_tests = false @primary_instance = nil # TODO: well organized counters @@ -172,6 +176,10 @@ def initialize @buffer = nil @secondary = nil + @retry = nil + @dequeued_chunks = nil + @output_flush_threads = nil + @simple_chunking = nil @chunk_keys = @chunk_key_time = @chunk_key_tag = nil @flush_mode = nil @@ -231,7 +239,7 @@ def configure(conf) if (@buffering || @buffering.nil?) && !@as_secondary # When @buffering.nil?, @buffer_config was initialized with default value for all parameters. # If so, this configuration MUST success. - @chunk_keys = @buffer_config.chunk_keys + @chunk_keys = @buffer_config.chunk_keys.dup @chunk_key_time = !!@chunk_keys.delete('time') @chunk_key_tag = !!@chunk_keys.delete('tag') if @chunk_keys.any?{ |key| key !~ CHUNK_KEY_PATTERN } @@ -358,8 +366,10 @@ def start end @output_flush_thread_current_position = 0 - if @flush_mode == :fast || @chunk_key_time - thread_create(:enqueue_thread, &method(:enqueue_thread_run)) + unless @in_tests + if @flush_mode == :fast || @chunk_key_time + thread_create(:enqueue_thread, &method(:enqueue_thread_run)) + end end end @secondary.start if @secondary diff --git a/lib/fluent/plugin/storage_local.rb b/lib/fluent/plugin/storage_local.rb index 8c21e84822..fd88f318df 100644 --- a/lib/fluent/plugin/storage_local.rb +++ b/lib/fluent/plugin/storage_local.rb @@ -27,10 +27,14 @@ def configure(conf) @on_memory = false if !@path && !@_plugin_id_configured - if @autosave || @persistent + if @persistent raise Fluent::ConfigError, "Plugin @id or path for required to save data" else - log.info "both of Plugin @id and path for are not specified. Using on-memory store." + if @autosave + log.warn "both of Plugin @id and path for are not specified. Using on-memory store." + else + log.info "both of Plugin @id and path for are not specified. Using on-memory store." + end @on_memory = true end elsif @path diff --git a/lib/fluent/system_config.rb b/lib/fluent/system_config.rb index 7ee516dbd7..aa6937bc4c 100644 --- a/lib/fluent/system_config.rb +++ b/lib/fluent/system_config.rb @@ -51,7 +51,7 @@ def self.blank_system_config end def self.overwrite_system_config(hash) - older = $_system_config || nil + older = defined?($_system_config) ? $_system_config : nil begin $_system_config = SystemConfig.new(Fluent::Config::Element.new('system', '', hash, [])) yield @@ -111,7 +111,7 @@ def system_config def system_config_override(opts={}) require 'fluent/engine' if !instance_variable_defined?("@_system_config") || @_system_config.nil? - @_system_config = ($_system_config || Fluent::Engine.system_config).dup + @_system_config = (defined?($_system_config) ? $_system_config : Fluent::Engine.system_config).dup end opts.each_pair do |key, value| @_system_config.send(:"#{key.to_s}=", value) diff --git a/lib/fluent/test/driver/base.rb b/lib/fluent/test/driver/base.rb new file mode 100644 index 0000000000..1958f99537 --- /dev/null +++ b/lib/fluent/test/driver/base.rb @@ -0,0 +1,223 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/test/driver/test_event_router' + +require 'timeout' + +module Fluent + module Test + module Driver + class Base + def initialize(klass, opts: {}, &block) + if klass.is_a?(Class) + if block + # Create new class for test w/ overwritten methods + # klass.dup is worse because its ancestors does NOT include original class name + klass = Class.new(klass) + klass.module_eval(&block) + end + @instance = klass.new + else + @instance = klass + end + if opts + @instance.system_config_override(opts) + end + @instance.log = TestLogger.new + @logs = @instance.log.out.logs + + @run_post_conditions = [] + @run_breaking_conditions = [] + + @broken = false + + @event_streams = nil + @error_events = nil + end + + attr_reader :instance, :logs + + def configure(conf, syntax: :v1) + if conf.is_a?(Fluent::Config::Element) + @config = conf + else + @config = Config.parse(conf, "(test)", "(test_dir)", syntax: syntax) + end + + if @instance.respond_to?(:router=) + @event_streams = [] + @error_events = [] + + driver = self + mojule = Module.new do + define_method(:event_emitter_router) do |label_name| + TestEventRouter.new(driver) + end + end + @instance.singleton_class.module_eval do + prepend mojule + end + end + + @instance.configure(@config) + self + end + + def end_if(&block) + raise ArgumentError, "block is not given" unless block_given? + @run_post_conditions << block + end + + def break_if(&block) + raise ArgumentError, "block is not given" unless block_given? + @run_breaking_conditions << block + end + + def broken? + @broken + end + + Emit = Struct.new(:tag, :es) + ErrorEvent = Struct.new(:tag, :time, :record, :error) + + # via TestEventRouter + def emit_event_stream(tag, es) + @event_streams << Emit.new(tag, es) + end + + def emit_error_event(tag, time, record, error) + @error_events << ErrorEvent.new(tag, time, record, error) + end + + def events(tag: nil) + selected = @event_streams.select{|e| tag.nil? ? true : e.tag == tag } + if block_given? + selected.each do |e| + e.es.each do |time, record| + yield e.tag, time, record + end + end + else + list = [] + selected.each do |e| + e.es.each do |time, record| + list << [e.tag, time, record] + end + end + list + end + end + + def error_events(tag: nil) + selected = @error_events.select{|e| tag.nil? ? true : e.tag == tag } + if block_given? + selected.each do |e| + yield e.tag, e.time, e.record, e.error + end + else + selected.map{|e| [e.tag, e.time, e.record, e.error] } + end + end + + def run(expect_emits: nil, expect_records: nil, timeout: nil, start: true, shutdown: true, &block) + instance_start if start + + begin + run_actual(expect_emits: expect_emits, expect_records: expect_records, timeout: timeout, &block) + ensure + instance_shutdown if shutdown + end + end + + def instance_start + @instance.start unless @instance.started? + end + + def instance_shutdown + @instance.stop unless @instance.stopped? + @instance.before_shutdown? unless @instance.before_shutdown? + @instance.shutdown unless @instance.shutdown? + @instance.after_shutdown? unless @instance.after_shutdown? + @instance.close unless @instance.closed? + @instance.terminate unless @instance.terminated? + end + + def run_actual(expect_emits: nil, expect_records: nil, timeout: nil, &block) + if @instance.respond_to?(:_threads) + until @instance._threads.values.all?(&:alive?) + sleep 0.01 + end + end + + if @instance.respond_to?(:event_loop_running?) + until @instance.event_loop_running? + sleep 0.01 + end + end + + if expect_emits + @run_post_conditions << ->(){ @emit_streams.size >= expect_emits } + end + if expect_records + @run_post_conditions << ->(){ @emit_streams.reduce(0){|a, e| a + e.es.size } >= expected_records } + end + if timeout + stop_at = Time.now + timeout + @run_breaking_conditions << ->(){ Time.now >= stop_at } + end + + if !block_given? && @run_post_conditions.empty? && @run_breaking_conditions.empty? + raise ArgumentError, "no stop conditions nor block specified" + end + + if !block_given? + block = ->(){ sleep(0.1) until stop? } + end + + if timeout + begin + Timeout.timeout(timeout * 1.1) do |sec| + block.call + end + rescue Timeout::Error + @broken = true + end + else + block.call + end + end + + def stop? + # Should stop running if post conditions are not registered. + return true unless @run_post_conditions + + # Should stop running if all of the post conditions are true. + return true if @run_post_conditions.all? {|proc| proc.call } + + # Should stop running if some of the breaking conditions is true. + # In this case, some post conditions may be not true. + if @run_breaking_conditions.any? {|proc| proc.call } + @broken = true + return true + end + + false + end + end + end + end +end diff --git a/lib/fluent/test/driver/event_feeder.rb b/lib/fluent/test/driver/event_feeder.rb new file mode 100644 index 0000000000..46d5f67ada --- /dev/null +++ b/lib/fluent/test/driver/event_feeder.rb @@ -0,0 +1,98 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/event' +require 'fluent/time' + +module Fluent + module Test + module Driver + module EventFeeder + def initialize(klass, opts: {}, &block) + super + @default_tag = nil + @feed_method = nil + end + + def run(default_tag: nil, **kwargs, &block) + @feed_method = if @instance.respond_to?(:filter_stream) + :filter_stream + else + :emit_events + end + if default_tag + @default_tag = default_tag + end + super(**kwargs, &block) + end + + def feed_to_plugin(tag, es) + @instance.__send__(@feed_method, tag, es) + end + + # d.run do + # d.feed('tag', time, {record}) + # d.feed('tag', [ [time, {record}], [time, {record}], ... ]) + # d.feed('tag', es) + # end + # d.run(default_tag: 'tag') do + # d.feed({record}) + # d.feed(time, {record}) + # d.feed([ [time, {record}], [time, {record}], ... ]) + # d.feed(es) + # end + def feed(*args) + case args.size + when 1 + raise ArgumentError, "tag not specified without default_tag" unless @default_tag + case args.first + when Fluent::EventStream + feed_to_plugin(@default_tag, args.first) + when Array + feed_to_plugin(@default_tag, ArrayEventStream.new(args.first)) + when Hash + record = args.first + time = Fluent::EventTime.now + feed_to_plugin(@default_tag, OneEventStream.new(time, record)) + else + raise ArgumentError, "unexpected events object (neither event(Hash), EventStream nor Array): #{args.first.class}" + end + when 2 + if args[0].is_a?(String) && (args[1].is_a?(Array) || args[1].is_a?(Fluent::EventStream)) + tag, es = args + es = ArrayEventStream.new(es) if es.is_a?(Array) + feed_to_plugin(tag, es) + elsif @default_tag && (args[0].is_a?(Fluent::EventTime) || args[0].is_a?(Integer)) && args[1].is_a?(Hash) + time, record = args + feed_to_plugin(@default_tag, OneEventStream.new(time, record)) + else + raise ArgumentError, "unexpected values of argument: #{args[0].class}, #{args[1].class}" + end + when 3 + tag, time, record = args + if tag.is_a?(String) && (time.is_a?(Fluent::EventTime) || time.is_a?(Integer)) && record.is_a?(Hash) + feed_to_plugin(tag, OneEventStream.new(time, record)) + else + raise ArgumentError, "unexpected values of argument: #{tag.class}, #{time.class}, #{record.class}" + end + else + raise ArgumentError, "unexpected number of arguments: #{args}" + end + end + end + end + end +end diff --git a/lib/fluent/test/driver/filter.rb b/lib/fluent/test/driver/filter.rb new file mode 100644 index 0000000000..8f9ef831c7 --- /dev/null +++ b/lib/fluent/test/driver/filter.rb @@ -0,0 +1,35 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/test/driver/base' +require 'fluent/test/driver/event_feeder' + +require 'fluent/plugin/filter' + +module Fluent + module Test + module Driver + class Filter < Base + include EventFeeder + + def initialize(klass, opts: {}, &block) + super + raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Filter" unless @instance.is_a? Fluent::Plugin::Filter + end + end + end + end +end diff --git a/lib/fluent/test/driver/input.rb b/lib/fluent/test/driver/input.rb new file mode 100644 index 0000000000..0b831d5154 --- /dev/null +++ b/lib/fluent/test/driver/input.rb @@ -0,0 +1,31 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/test/driver/base' +require 'fluent/plugin/input' + +module Fluent + module Test + module Driver + class Input < Base + def initialize(klass, opts: {}, &block) + super + raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Input" unless @instance.is_a? Fluent::Plugin::Input + end + end + end + end +end diff --git a/lib/fluent/test/driver/output.rb b/lib/fluent/test/driver/output.rb new file mode 100644 index 0000000000..544139de04 --- /dev/null +++ b/lib/fluent/test/driver/output.rb @@ -0,0 +1,53 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/test/driver/base' +require 'fluent/test/driver/event_feeder' + +require 'fluent/plugin/output' + +module Fluent + module Test + module Driver + class Output < Base + include EventFeeder + + def initialize(klass, opts: {}, &block) + super + raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Output" unless @instance.is_a? Fluent::Plugin::Output + @instance.in_tests = true + @flush_buffer_at_cleanup = nil + end + + def run(flush: false, **kwargs, &block) + @flush_buffer_at_cleanup = flush + super(**kwargs, &block) + end + + def run_actual(**kwargs, &block) + super(**kwargs, &block) + if @flush_buffer_at_cleanup + @instance.force_flush + end + end + + def flush + @instance.force_flush + end + end + end + end +end diff --git a/lib/fluent/test/driver/test_event_router.rb b/lib/fluent/test/driver/test_event_router.rb new file mode 100644 index 0000000000..4379149849 --- /dev/null +++ b/lib/fluent/test/driver/test_event_router.rb @@ -0,0 +1,45 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/event' + +module Fluent + module Test + module Driver + class TestEventRouter + def initialize(driver) + @driver = driver + end + + def emit(tag, time, record) + @driver.emit_event_stream(tag, OneEventStream.new(time, record)) + end + + def emit_array(tag, array) + @driver.emit_event_stream(tag, ArrayEventStream.new(array)) + end + + def emit_stream(tag, es) + @driver.emit_event_stream(tag, es) + end + + def emit_error_event(tag, time, record, error) + @driver.emit_error_event(tag, time, record, error) + end + end + end + end +end diff --git a/test/helper.rb b/test/helper.rb index c132203a4d..62b1736331 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -46,6 +46,7 @@ def to_masked_element require 'fluent/log' require 'fluent/plugin_id' require 'fluent/plugin_helper' +require 'fluent/msgpack_factory' require 'fluent/time' require 'serverengine' @@ -78,6 +79,19 @@ def event_time(str=nil) end end +def msgpack(type) + case type + when :factory + Fluent::MessagePackFactory.factory + when :packer + Fluent::MessagePackFactory.packer + when :unpacker + Fluent::MessagePackFactory.unpacker + else + raise ArgumentError, "unknown msgpack object type '#{type}'" + end +end + def unused_port(num = 1) ports = [] sockets = [] @@ -94,6 +108,21 @@ def unused_port(num = 1) end end +def waiting(seconds, logs: nil, plugin: nil) + begin + Timeout.timeout(seconds) do + yield + end + rescue Timeout::Error + if logs + STDERR.print(*logs) + elsif plugin + STDERR.print(*plugin.log.out.logs) + end + raise + end +end + def ipv6_enabled? require 'socket' diff --git a/test/plugin/test_in_dummy.rb b/test/plugin/test_in_dummy.rb index 4fc4f405d4..ce8172ca7f 100644 --- a/test/plugin/test_in_dummy.rb +++ b/test/plugin/test_in_dummy.rb @@ -1,5 +1,5 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/input' require 'fluent/plugin/in_dummy' class DummyTest < Test::Unit::TestCase @@ -8,7 +8,7 @@ def setup end def create_driver(conf) - Fluent::Test::InputTestDriver.new(Fluent::DummyInput).configure(conf) + Fluent::Test::Driver::Input.new(Fluent::Plugin::DummyInput).configure(conf) end sub_test_case 'configure' do @@ -71,11 +71,9 @@ def create_driver(conf) test 'simple' do d = create_driver(config) - d.run { - # d.run sleeps 0.5 sec - } - emits = d.emits - emits.each do |tag, time, record| + d.run(timeout: 0.5) + + d.events.each do |tag, time, record| assert_equal("dummy", tag) assert_equal({"foo"=>"bar"}, record) assert(time.is_a?(Fluent::EventTime)) @@ -84,11 +82,9 @@ def create_driver(conf) test 'with auto_increment_key' do d = create_driver(config + %[auto_increment_key id]) - d.run { - # d.run sleeps 0.5 sec - } - emits = d.emits - emits.each_with_index do |(tag, _time, record), i| + d.run(timeout: 0.5) + + d.events.each_with_index do |(tag, _time, record), i| assert_equal("dummy", tag) assert_equal({"foo"=>"bar", "id"=>i}, record) end diff --git a/test/plugin/test_out_buffered_null.rb b/test/plugin/test_out_buffered_null.rb new file mode 100644 index 0000000000..1df7468729 --- /dev/null +++ b/test/plugin/test_out_buffered_null.rb @@ -0,0 +1,79 @@ +require_relative '../helper' +require 'fluent/test/driver/output' +require 'fluent/plugin/out_buffered_null' + +class BufferedNullOutputTestCase < Test::Unit::TestCase + sub_test_case 'BufferedNullOutput' do + test 'default chunk limit size is 100' do + d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('') + assert_equal 10 * 1024, d.instance.buffer_config.chunk_bytes_limit + assert d.instance.buffer_config.flush_at_shutdown + assert_equal ['tag'], d.instance.buffer_config.chunk_keys + assert d.instance.chunk_key_tag + assert !d.instance.chunk_key_time + assert_equal [], d.instance.chunk_keys + end + + test 'writes standard formattted chunks' do + d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('') + t = event_time("2016-05-23 00:22:13 -0800") + d.run(default_tag: 'test', flush: true) do + d.feed(t, {"message" => "null null null"}) + d.feed(t, {"message" => "null null"}) + d.feed(t, {"message" => "null"}) + end + + assert_equal 3, d.instance.emit_count + assert_equal 3, d.instance.emit_records + end + + test 'check for chunk passed to #write' do + d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('') + data = [] + d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] } + + t = event_time("2016-05-23 00:22:13 -0800") + d.run(default_tag: 'test', flush: true) do + d.feed(t, {"message" => "null null null"}) + d.feed(t, {"message" => "null null"}) + d.feed(t, {"message" => "null"}) + end + + assert_equal 1, data.size + chunk_id, tag, binary = data.first + events = [] + Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj } + assert_equal 'test', tag + assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events + end + + test 'check for chunk passed to #try_write' do + d = Fluent::Test::Driver::Output.new(Fluent::Plugin::BufferedNullOutput).configure('') + data = [] + d.instance.feed_proc = ->(chunk){ data << [chunk.unique_id, chunk.metadata.tag, chunk.read] } + d.instance.delayed = true + + t = event_time("2016-05-23 00:22:13 -0800") + d.run(default_tag: 'test', flush: true, shutdown: false) do + d.feed(t, {"message" => "null null null"}) + d.feed(t, {"message" => "null null"}) + d.feed(t, {"message" => "null"}) + end + + assert_equal 1, data.size + chunk_id, tag, binary = data.first + events = [] + Fluent::MessagePackFactory.unpacker.feed_each(binary){|obj| events << obj } + assert_equal 'test', tag + assert_equal [ [t, {"message" => "null null null"}], [t, {"message" => "null null"}], [t, {"message" => "null"}] ], events + + assert_equal [chunk_id], d.instance.buffer.dequeued.keys + + d.instance.commit_write(chunk_id) + + assert_equal [], d.instance.buffer.dequeued.keys + + d.instance_shutdown + end + end +end diff --git a/test/plugin/test_out_stdout.rb b/test/plugin/test_out_stdout.rb index 49e0567057..4701493d8e 100644 --- a/test/plugin/test_out_stdout.rb +++ b/test/plugin/test_out_stdout.rb @@ -1,5 +1,5 @@ require_relative '../helper' -require 'fluent/test' +require 'fluent/test/driver/output' require 'fluent/plugin/out_stdout' class StdoutOutputTest < Test::Unit::TestCase @@ -11,7 +11,7 @@ def setup ] def create_driver(conf = CONFIG) - Fluent::Test::OutputTestDriver.new(Fluent::StdoutOutput).configure(conf) + Fluent::Test::Driver::Output.new(Fluent::Plugin::StdoutOutput).configure(conf) end def test_configure @@ -34,28 +34,36 @@ def test_configure_output_type data('oj' => 'oj', 'yajl' => 'yajl') def test_emit_json(data) d = create_driver(CONFIG + "\noutput_type json\njson_parser #{data}") - time = Time.now - out = capture_log { d.emit({'test' => 'test'}, Fluent::EventTime.from_time(time)) } - assert_equal "#{time.localtime} test: {\"test\":\"test\"}\n", out + time = event_time() + out = capture_log do + d.run(default_tag: 'test') do + d.feed(time, {'test' => 'test'}) + end + end + assert_equal "#{Time.at(time).localtime} test: {\"test\":\"test\"}\n", out if data == 'yajl' # NOTE: Float::NAN is not jsonable - assert_raise(Yajl::EncodeError) { d.emit({'test' => Float::NAN}, time) } + assert_raise(Yajl::EncodeError) { d.feed('test', time, {'test' => Float::NAN}) } else - out = capture_log { d.emit({'test' => Float::NAN}, Fluent::EventTime.from_time(time)) } - assert_equal "#{time.localtime} test: {\"test\":NaN}\n", out + out = capture_log { d.feed('test', time, {'test' => Float::NAN}) } + assert_equal "#{Time.at(time).localtime} test: {\"test\":NaN}\n", out end end def test_emit_hash d = create_driver(CONFIG + "\noutput_type hash") - time = Time.now - out = capture_log { d.emit({'test' => 'test'}, time) } - assert_equal "#{time.localtime} test: {\"test\"=>\"test\"}\n", out + time = event_time() + out = capture_log do + d.run(default_tag: 'test') do + d.feed(time, {'test' => 'test'}) + end + end + assert_equal "#{Time.at(time).localtime} test: {\"test\"=>\"test\"}\n", out # NOTE: Float::NAN is not jsonable, but hash string can output it. - out = capture_log { d.emit({'test' => Float::NAN}, Fluent::EventTime.from_time(time)) } - assert_equal "#{time.localtime} test: {\"test\"=>NaN}\n", out + out = capture_log { d.feed('test', time, {'test' => Float::NAN}) } + assert_equal "#{Time.at(time).localtime} test: {\"test\"=>NaN}\n", out end private