From bef877776a59d1f41f03685224e39a966be0b857 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 3 Jun 2016 17:31:18 +0900 Subject: [PATCH 1/3] cleanup code and fix to call some missed lifecycle methods --- lib/fluent/plugin_helper/storage.rb | 114 ++++++++++++++++------------ test/plugin_helper/test_storage.rb | 87 ++++++++++++++++++--- 2 files changed, 141 insertions(+), 60 deletions(-) diff --git a/lib/fluent/plugin_helper/storage.rb b/lib/fluent/plugin_helper/storage.rb index 1551d5ae7a..f2ab9c799f 100644 --- a/lib/fluent/plugin_helper/storage.rb +++ b/lib/fluent/plugin_helper/storage.rb @@ -14,19 +14,16 @@ # limitations under the License. # -require 'monitor' require 'forwardable' require 'fluent/plugin' require 'fluent/plugin/storage' -require 'fluent/plugin_helper/thread' require 'fluent/plugin_helper/timer' require 'fluent/config/element' module Fluent module PluginHelper module Storage - include Fluent::PluginHelper::Thread include Fluent::PluginHelper::Timer StorageState = Struct.new(:storage, :running) @@ -35,38 +32,34 @@ def storage_create(usage: '', type: nil, conf: nil) s = @_storages[usage] if s && s.running return s.storage - elsif !s - unless type - raise ArgumentError, "BUG: type not specified without configuration" + elsif s + # storage is already created, but not loaded / started + else # !s + if !type + raise ArgumentError, "BUG: both type and conf are not specified" unless conf + raise Fluent::ConfigError, "@type is not specified for " unless conf['@type'] + type = conf['@type'] end storage = Plugin.new_storage(type, parent: self) config = case conf when Fluent::Config::Element conf when Hash + # in code, programmer may use symbols as keys, but Element needs strings conf = Hash[conf.map{|k,v| [k.to_s, v]}] - Fluent::Config::Element.new('storage', '', conf, []) + Fluent::Config::Element.new('storage', usage, conf, []) when nil - Fluent::Config::Element.new('storage', '', {}, []) + Fluent::Config::Element.new('storage', usage, {}, []) else raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'" end storage.configure(config) + if @_storages_started + storage.start + end s = @_storages[usage] = StorageState.new(wrap_instance(storage), false) end - s.storage.load - - if s.storage.autosave && !s.storage.persistent - timer_execute(:storage_autosave, s.storage.autosave_interval, repeat: true) do - begin - s.storage.save - rescue => e - log.error "plugin storage failed to save its data", usage: usage, type: type, error: e - end - end - end - s.running = true s.storage end @@ -84,8 +77,8 @@ def self.included(mod) def initialize super + @_storages_started = false @_storages = {} # usage => storage_state - @_storages_mutex = Mutex.new end def configure(conf) @@ -95,56 +88,75 @@ def configure(conf) if @_storages[section.usage] raise Fluent::ConfigError, "duplicated storages configured: #{section.usage}" end - config = conf.elements(name: 'storage', arg: section.usage).first - raise "storage section with argument '#{section.usage}' not found. it may be a bug." unless config - - storage = Plugin.new_storage(section[:@type]) - storage.owner = self - storage.configure(config) + storage = Plugin.new_storage(section[:@type], parent: self) + storage.configure(section.corresponding_config_element) @_storages[section.usage] = StorageState.new(wrap_instance(storage), false) end end - def stop + def start super - # timer stops automatically + + @_storages_started = true + @_storages.each_pair do |usage, s| + s.storage.start + s.storage.load + + if s.storage.autosave && !s.storage.persistent + timer_execute(:storage_autosave, s.storage.autosave_interval, repeat: true) do + begin + s.storage.save + rescue => e + log.error "plugin storage failed to save its data", usage: usage, type: type, error: e + end + end + end + s.running = true + end end - def shutdown + def storage_operate(method_name, &block) @_storages.each_pair do |usage, s| begin - s.storage.save if s.storage.save_at_shutdown + block.call(s) if block_given? + s.storage.send(method_name) rescue => e - log.error "unexpected error while saving data of plugin storages", usage: usage, storage: s.storage, error: e + log.error "unexpected error while #{method_name}", usage: usage, storage: s.storage, error: e end end + end + def stop super + # timer stops automatically in super + storage_operate(:stop) end - def close - @_storages.each_pair do |usage, s| - begin - s.storage.close - rescue => e - log.error "unexpected error while closing plugin storages", usage: usage, storage: s.storage, error: e - end - s.running = false + def before_shutdown + storage_operate(:before_shutdown) + super + end + + def shutdown + storage_operate(:shutdown) do |s| + s.storage.save if s.storage.save_at_shutdown end + super + end + + def after_shutdown + storage_operate(:after_shutdown) + super + end + def close + storage_operate(:close){|s| s.running = false } super end def terminate - @_storages.each_pair do |usage, s| - begin - s.storage.terminate - rescue => e - log.error "unexpected error while terminating plugin storages", usage: usage, storage: s.storage, error: e - end - end + storage_operate(:terminate) @_storages = {} - super end @@ -170,7 +182,8 @@ def initialize(storage) end def_delegators :@storage, :autosave_interval, :save_at_shutdown - def_delegators :@storage, :close, :terminate + def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate + def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated? def persistent_always? true @@ -257,7 +270,8 @@ def initialize(storage) def_delegators :@storage, :persistent, :autosave, :autosave_interval, :save_at_shutdown def_delegators :@storage, :persistent_always? - def_delegators :@storage, :close, :terminate + def_delegators :@storage, :start, :stop, :before_shutdown, :shutdown, :after_shutdown, :close, :terminate + def_delegators :@storage, :started?, :stopped?, :before_shutdown?, :shutdown?, :after_shutdown?, :closed?, :terminated? def synchronized? true diff --git a/test/plugin_helper/test_storage.rb b/test/plugin_helper/test_storage.rb index c6c9d5dac7..1aa211a53f 100644 --- a/test/plugin_helper/test_storage.rb +++ b/test/plugin_helper/test_storage.rb @@ -169,16 +169,6 @@ class Dummy < Fluent::Plugin::TestBase assert{ s.implementation.is_a? ExampleStorage } end - test 'raises exception for storage creation without explicit type specification' do - @d = d = Dummy.new - d.configure(config_element()) - d.start - - assert_raises ArgumentError do - d.storage_create(usage: 'mydata', conf: config_element('storage', 'mydata', {'@type' => 'example'})) - end - end - test 'creates 2 or more storage plugin instances' do @d = d = Dummy.new conf = config_element('ROOT', '', {}, [ @@ -369,6 +359,8 @@ class Dummy < Fluent::Plugin::TestBase assert_equal 1, s.implementation.load_times assert_equal 0, s.implementation.save_times + d.before_shutdown + d.shutdown assert_equal 1, s.implementation.load_times @@ -408,4 +400,79 @@ class Dummy < Fluent::Plugin::TestBase assert_equal 0, s.implementation.data.size assert_equal 0, s.implementation.saved.size end + + test 'calls lifecycle methods for all plugin instances via owner plugin' do + @d = d = Dummy.new + conf = config_element('ROOT', '', {}, [ config_element('storage', '', {'@type' => 'example'}), config_element('storage', 'e2', {'@type' => 'example'}) ]) + d.configure(conf) + d.start + + i1 = d.storage_create(usage: '') + i2 = d.storage_create(usage: 'e2') + i3 = d.storage_create(usage: 'e3', type: 'ex2') + + assert i1.started? + assert i2.started? + assert i3.started? + + assert !i1.stopped? + assert !i2.stopped? + assert !i3.stopped? + + d.stop + + assert i1.stopped? + assert i2.stopped? + assert i3.stopped? + + assert !i1.before_shutdown? + assert !i2.before_shutdown? + assert !i3.before_shutdown? + + d.before_shutdown + + assert i1.before_shutdown? + assert i2.before_shutdown? + assert i3.before_shutdown? + + assert !i1.shutdown? + assert !i2.shutdown? + assert !i3.shutdown? + + d.shutdown + + assert i1.shutdown? + assert i2.shutdown? + assert i3.shutdown? + + assert !i1.after_shutdown? + assert !i2.after_shutdown? + assert !i3.after_shutdown? + + d.after_shutdown + + assert i1.after_shutdown? + assert i2.after_shutdown? + assert i3.after_shutdown? + + assert !i1.closed? + assert !i2.closed? + assert !i3.closed? + + d.close + + assert i1.closed? + assert i2.closed? + assert i3.closed? + + assert !i1.terminated? + assert !i2.terminated? + assert !i3.terminated? + + d.terminate + + assert i1.terminated? + assert i2.terminated? + assert i3.terminated? + end end From 1f0ea73354cef2b861e3ccb77ec4f661ea39859c Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 3 Jun 2016 17:31:36 +0900 Subject: [PATCH 2/3] add plugin helpers for parser and formatter --- lib/fluent/plugin_helper.rb | 2 + lib/fluent/plugin_helper/formatter.rb | 141 ++++++++++++++++ lib/fluent/plugin_helper/parser.rb | 141 ++++++++++++++++ test/plugin_helper/test_formatter.rb | 212 ++++++++++++++++++++++++ test/plugin_helper/test_parser.rb | 223 ++++++++++++++++++++++++++ 5 files changed, 719 insertions(+) create mode 100644 lib/fluent/plugin_helper/formatter.rb create mode 100644 lib/fluent/plugin_helper/parser.rb create mode 100644 test/plugin_helper/test_formatter.rb create mode 100644 test/plugin_helper/test_parser.rb diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index e2acd32aff..38406aa2e4 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -20,6 +20,8 @@ require 'fluent/plugin_helper/timer' require 'fluent/plugin_helper/child_process' require 'fluent/plugin_helper/storage' +require 'fluent/plugin_helper/parser' +require 'fluent/plugin_helper/formatter' require 'fluent/plugin_helper/retry_state' require 'fluent/plugin_helper/compat_parameters' diff --git a/lib/fluent/plugin_helper/formatter.rb b/lib/fluent/plugin_helper/formatter.rb new file mode 100644 index 0000000000..74dd1bea3c --- /dev/null +++ b/lib/fluent/plugin_helper/formatter.rb @@ -0,0 +1,141 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'monitor' +require 'forwardable' + +require 'fluent/plugin' +require 'fluent/plugin/formatter' +require 'fluent/config/element' + +module Fluent + module PluginHelper + module Formatter + def formatter_create(usage: '', type: nil, conf: nil) + formatter = @_formatters[usage] + return formatter if formatter + + if !type + raise ArgumentError, "BUG: both type and conf are not specified" unless conf + raise Fluent::ConfigError, "@type is not specified for " unless conf['@type'] + type = conf['@type'] + end + formatter = Fluent::Plugin.new_formatter(type, parent: self) + config = case conf + when Fluent::Config::Element + conf + when Hash + # in code, programmer may use symbols as keys, but Element needs strings + conf = Hash[conf.map{|k,v| [k.to_s, v]}] + Fluent::Config::Element.new('format', usage, conf, []) + when nil + Fluent::Config::Element.new('format', usage, {}, []) + else + raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'" + end + formatter.configure(config) + if @_formatters_started + formatter.start + end + + @_formatters[usage] = formatter + formatter + end + + def self.included(mod) + mod.instance_eval do + # minimum section definition to instantiate formatter plugin instances + config_section :format, required: false, multi: true, param_name: :formatter_configs do + config_argument :usage, :string, default: '' + config_param :@type, :string + end + end + end + + attr_reader :_formatters # for tests + + def initialize + super + @_formatters_started = false + @_formatters = {} # usage => formatter + end + + def configure(conf) + super + + @formatter_configs.each do |section| + if @_formatters[section.usage] + raise Fluent::ConfigError, "duplicated formatter configured: #{section.usage}" + end + formatter = Plugin.new_formatter(section[:@type], parent: self) + formatter.configure(section.corresponding_config_element) + @_formatters[section.usage] = formatter + end + end + + def start + super + @_formatters_started = true + @_formatters.each_pair do |usage, formatter| + formatter.start + end + end + + def formatter_operate(method_name, &block) + @_formatters.each_pair do |usage, formatter| + begin + formatter.send(method_name) + block.call(formatter) if block_given? + rescue => e + log.error "unexpected error while #{method_name}", usage: usage, formatter: formatter, error: e + end + end + end + + def stop + super + # timer stops automatically in super + formatter_operate(:stop) + end + + def before_shutdown + formatter_operate(:before_shutdown) + super + end + + def shutdown + formatter_operate(:shutdown) + super + end + + def after_shutdown + formatter_operate(:after_shutdown) + super + end + + def close + formatter_operate(:close) + super + end + + def terminate + formatter_operate(:terminate) + @_formatters = {} + super + end + end + end +end diff --git a/lib/fluent/plugin_helper/parser.rb b/lib/fluent/plugin_helper/parser.rb new file mode 100644 index 0000000000..14725401f5 --- /dev/null +++ b/lib/fluent/plugin_helper/parser.rb @@ -0,0 +1,141 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'monitor' +require 'forwardable' + +require 'fluent/plugin' +require 'fluent/plugin/parser' +require 'fluent/config/element' + +module Fluent + module PluginHelper + module Parser + def parser_create(usage: '', type: nil, conf: nil) + parser = @_parsers[usage] + return parser if parser + + if !type + raise ArgumentError, "BUG: both type and conf are not specified" unless conf + raise Fluent::ConfigError, "@type is not specified for " unless conf['@type'] + type = conf['@type'] + end + parser = Fluent::Plugin.new_parser(type, parent: self) + config = case conf + when Fluent::Config::Element + conf + when Hash + # in code, programmer may use symbols as keys, but Element needs strings + conf = Hash[conf.map{|k,v| [k.to_s, v]}] + Fluent::Config::Element.new('parse', usage, conf, []) + when nil + Fluent::Config::Element.new('parse', usage, {}, []) + else + raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'" + end + parser.configure(config) + if @_parsers_started + parser.start + end + + @_parsers[usage] = parser + parser + end + + def self.included(mod) + mod.instance_eval do + # minimum section definition to instantiate parser plugin instances + config_section :parse, required: false, multi: true, param_name: :parser_configs do + config_argument :usage, :string, default: '' + config_param :@type, :string + end + end + end + + attr_reader :_parsers # for tests + + def initialize + super + @_parsers_started = false + @_parsers = {} # usage => parser + end + + def configure(conf) + super + + @parser_configs.each do |section| + if @_parsers[section.usage] + raise Fluent::ConfigError, "duplicated parsers configured: #{section.usage}" + end + parser = Plugin.new_parser(section[:@type], parent: self) + parser.configure(section.corresponding_config_element) + @_parsers[section.usage] = parser + end + end + + def start + super + @_parsers_started = true + @_parsers.each_pair do |usage, parser| + parser.start + end + end + + def parser_operate(method_name, &block) + @_parsers.each_pair do |usage, parser| + begin + parser.send(method_name) + block.call(parser) if block_given? + rescue => e + log.error "unexpected error while #{method_name}", usage: usage, parser: parser, error: e + end + end + end + + def stop + super + # timer stops automatically in super + parser_operate(:stop) + end + + def before_shutdown + parser_operate(:before_shutdown) + super + end + + def shutdown + parser_operate(:shutdown) + super + end + + def after_shutdown + parser_operate(:after_shutdown) + super + end + + def close + parser_operate(:close) + super + end + + def terminate + parser_operate(:terminate) + @_parsers = {} + super + end + end + end +end diff --git a/test/plugin_helper/test_formatter.rb b/test/plugin_helper/test_formatter.rb new file mode 100644 index 0000000000..48340d7bbd --- /dev/null +++ b/test/plugin_helper/test_formatter.rb @@ -0,0 +1,212 @@ +require_relative '../helper' +require 'fluent/plugin_helper/formatter' +require 'fluent/plugin/base' + +class FormatterHelperTest < Test::Unit::TestCase + class ExampleFormatter < Fluent::Plugin::Formatter + Fluent::Plugin.register_formatter('example', self) + def format(tag, time, record) + "#{tag},#{time.to_i},#{record.keys.sort.join(',')}" # hey, you miss values! :P + end + end + class Example2Formatter < Fluent::Plugin::Formatter + Fluent::Plugin.register_formatter('example2', self) + def format(tag, time, record) + "#{tag},#{time.to_i},#{record.values.sort.join(',')}" # key... + end + end + class Dummy < Fluent::Plugin::TestBase + helpers :formatter + end + + setup do + @d = nil + end + + teardown do + if @d + @d.stop unless @d.stopped? + @d.shutdown unless @d.shutdown? + @d.close unless @d.closed? + @d.terminate unless @d.terminated? + end + end + + test 'can be initialized without any formatters at first' do + d = Dummy.new + assert_equal 0, d._formatters.size + end + + test 'can be configured without format sections' do + d = Dummy.new + assert_nothing_raised do + d.configure(config_element()) + end + assert_equal 0, d._formatters.size + end + + test 'can be configured with a format section' do + d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('format', '', {'@type' => 'example'}) + ]) + assert_nothing_raised do + d.configure(conf) + end + assert_equal 1, d._formatters.size + assert{ d._formatters.values.all?{ |formatter| !formatter.started? } } + end + + test 'can be configured with 2 or more format sections with different usages with each other' do + d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('format', 'default', {'@type' => 'example'}), + config_element('format', 'extra', {'@type' => 'example2'}), + ]) + assert_nothing_raised do + d.configure(conf) + end + assert_equal 2, d._formatters.size + assert{ d._formatters.values.all?{ |formatter| !formatter.started? } } + end + + test 'cannot be configured with 2 format sections with same usage' do + d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('format', 'default', {'@type' => 'example'}), + config_element('format', 'extra', {'@type' => 'example2'}), + config_element('format', 'extra', {'@type' => 'example2'}), + ]) + assert_raises Fluent::ConfigError do + d.configure(conf) + end + end + + test 'creates a format plugin instance which is already configured without usage' do + @d = d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('format', '', {'@type' => 'example'}) + ]) + d.configure(conf) + d.start + + formatter = d.formatter_create + assert{ formatter.is_a? ExampleFormatter } + assert formatter.started? + end + + test 'creates a formatter plugin instance which is already configured with usage' do + @d = d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('format', 'mydata', {'@type' => 'example'}) + ]) + d.configure(conf) + d.start + + formatter = d.formatter_create(usage: 'mydata') + assert{ formatter.is_a? ExampleFormatter } + assert formatter.started? + end + + test 'creates a formatter plugin without configurations' do + @d = d = Dummy.new + d.configure(config_element()) + d.start + + formatter = d.formatter_create(usage: 'mydata', type: 'example', conf: config_element('format', 'mydata')) + assert{ formatter.is_a? ExampleFormatter } + assert formatter.started? + end + + test 'creates 2 or more formatter plugin instances' do + @d = d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('format', 'mydata', {'@type' => 'example'}), + config_element('format', 'secret', {'@type' => 'example2'}) + ]) + d.configure(conf) + d.start + + p1 = d.formatter_create(usage: 'mydata') + p2 = d.formatter_create(usage: 'secret') + assert{ p1.is_a? ExampleFormatter } + assert p1.started? + assert{ p2.is_a? Example2Formatter } + assert p2.started? + end + + test 'calls lifecycle methods for all plugin instances via owner plugin' do + @d = d = Dummy.new + conf = config_element('ROOT', '', {}, [ config_element('format', '', {'@type' => 'example'}), config_element('format', 'e2', {'@type' => 'example'}) ]) + d.configure(conf) + d.start + + i1 = d.formatter_create(usage: '') + i2 = d.formatter_create(usage: 'e2') + i3 = d.formatter_create(usage: 'e3', type: 'example2') + + assert i1.started? + assert i2.started? + assert i3.started? + + assert !i1.stopped? + assert !i2.stopped? + assert !i3.stopped? + + d.stop + + assert i1.stopped? + assert i2.stopped? + assert i3.stopped? + + assert !i1.before_shutdown? + assert !i2.before_shutdown? + assert !i3.before_shutdown? + + d.before_shutdown + + assert i1.before_shutdown? + assert i2.before_shutdown? + assert i3.before_shutdown? + + assert !i1.shutdown? + assert !i2.shutdown? + assert !i3.shutdown? + + d.shutdown + + assert i1.shutdown? + assert i2.shutdown? + assert i3.shutdown? + + assert !i1.after_shutdown? + assert !i2.after_shutdown? + assert !i3.after_shutdown? + + d.after_shutdown + + assert i1.after_shutdown? + assert i2.after_shutdown? + assert i3.after_shutdown? + + assert !i1.closed? + assert !i2.closed? + assert !i3.closed? + + d.close + + assert i1.closed? + assert i2.closed? + assert i3.closed? + + assert !i1.terminated? + assert !i2.terminated? + assert !i3.terminated? + + d.terminate + + assert i1.terminated? + assert i2.terminated? + assert i3.terminated? + end +end diff --git a/test/plugin_helper/test_parser.rb b/test/plugin_helper/test_parser.rb new file mode 100644 index 0000000000..ec2a963ea1 --- /dev/null +++ b/test/plugin_helper/test_parser.rb @@ -0,0 +1,223 @@ +require_relative '../helper' +require 'fluent/plugin_helper/parser' +require 'fluent/plugin/base' +require 'fluent/time' + +class ParserHelperTest < Test::Unit::TestCase + class ExampleParser < Fluent::Plugin::Parser + Fluent::Plugin.register_parser('example', self) + def parse(text) + ary = text.split(/\s*,\s*/) + r = {} + ary.each_with_index do |v, i| + r[i.to_s] = v + end + yield Fluent::EventTime.now, r + end + end + class Example2Parser < Fluent::Plugin::Parser + Fluent::Plugin.register_parser('example2', self) + def parse(text) + ary = text.split(/\s*,\s*/) + r = {} + ary.each_with_index do |v, i| + r[i.to_s] = v + end + yield Fluent::EventTime.now, r + end + end + class Dummy < Fluent::Plugin::TestBase + helpers :parser + end + + setup do + @d = nil + end + + teardown do + if @d + @d.stop unless @d.stopped? + @d.shutdown unless @d.shutdown? + @d.close unless @d.closed? + @d.terminate unless @d.terminated? + end + end + + test 'can be initialized without any parsers at first' do + d = Dummy.new + assert_equal 0, d._parsers.size + end + + test 'can be configured without parse sections' do + d = Dummy.new + assert_nothing_raised do + d.configure(config_element()) + end + assert_equal 0, d._parsers.size + end + + test 'can be configured with a parse section' do + d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('parse', '', {'@type' => 'example'}) + ]) + assert_nothing_raised do + d.configure(conf) + end + assert_equal 1, d._parsers.size + assert{ d._parsers.values.all?{ |parser| !parser.started? } } + end + + test 'can be configured with 2 or more parse sections with different usages with each other' do + d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('parse', 'default', {'@type' => 'example'}), + config_element('parse', 'extra', {'@type' => 'example2'}), + ]) + assert_nothing_raised do + d.configure(conf) + end + assert_equal 2, d._parsers.size + assert{ d._parsers.values.all?{ |parser| !parser.started? } } + end + + test 'cannot be configured with 2 parse sections with same usage' do + d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('parse', 'default', {'@type' => 'example'}), + config_element('parse', 'extra', {'@type' => 'example2'}), + config_element('parse', 'extra', {'@type' => 'example2'}), + ]) + assert_raises Fluent::ConfigError do + d.configure(conf) + end + end + + test 'creates a parse plugin instance which is already configured without usage' do + @d = d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('parse', '', {'@type' => 'example'}) + ]) + d.configure(conf) + d.start + + parser = d.parser_create + assert{ parser.is_a? ExampleParser } + assert parser.started? + end + + test 'creates a parser plugin instance which is already configured with usage' do + @d = d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('parse', 'mydata', {'@type' => 'example'}) + ]) + d.configure(conf) + d.start + + parser = d.parser_create(usage: 'mydata') + assert{ parser.is_a? ExampleParser } + assert parser.started? + end + + test 'creates a parser plugin without configurations' do + @d = d = Dummy.new + d.configure(config_element()) + d.start + + parser = d.parser_create(usage: 'mydata', type: 'example', conf: config_element('parse', 'mydata')) + assert{ parser.is_a? ExampleParser } + assert parser.started? + end + + test 'creates 2 or more parser plugin instances' do + @d = d = Dummy.new + conf = config_element('ROOT', '', {}, [ + config_element('parse', 'mydata', {'@type' => 'example'}), + config_element('parse', 'secret', {'@type' => 'example2'}) + ]) + d.configure(conf) + d.start + + p1 = d.parser_create(usage: 'mydata') + p2 = d.parser_create(usage: 'secret') + assert{ p1.is_a? ExampleParser } + assert p1.started? + assert{ p2.is_a? Example2Parser } + assert p2.started? + end + + test 'calls lifecycle methods for all plugin instances via owner plugin' do + @d = d = Dummy.new + conf = config_element('ROOT', '', {}, [ config_element('parse', '', {'@type' => 'example'}), config_element('parse', 'e2', {'@type' => 'example'}) ]) + d.configure(conf) + d.start + + i1 = d.parser_create(usage: '') + i2 = d.parser_create(usage: 'e2') + i3 = d.parser_create(usage: 'e3', type: 'example2') + + assert i1.started? + assert i2.started? + assert i3.started? + + assert !i1.stopped? + assert !i2.stopped? + assert !i3.stopped? + + d.stop + + assert i1.stopped? + assert i2.stopped? + assert i3.stopped? + + assert !i1.before_shutdown? + assert !i2.before_shutdown? + assert !i3.before_shutdown? + + d.before_shutdown + + assert i1.before_shutdown? + assert i2.before_shutdown? + assert i3.before_shutdown? + + assert !i1.shutdown? + assert !i2.shutdown? + assert !i3.shutdown? + + d.shutdown + + assert i1.shutdown? + assert i2.shutdown? + assert i3.shutdown? + + assert !i1.after_shutdown? + assert !i2.after_shutdown? + assert !i3.after_shutdown? + + d.after_shutdown + + assert i1.after_shutdown? + assert i2.after_shutdown? + assert i3.after_shutdown? + + assert !i1.closed? + assert !i2.closed? + assert !i3.closed? + + d.close + + assert i1.closed? + assert i2.closed? + assert i3.closed? + + assert !i1.terminated? + assert !i2.terminated? + assert !i3.terminated? + + d.terminate + + assert i1.terminated? + assert i2.terminated? + assert i3.terminated? + end +end From 44e0f9679ed966da8a8ae5bad34a537046443d46 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Mon, 6 Jun 2016 12:17:16 +0900 Subject: [PATCH 3/3] fix error messages and removed unused require, useless comments --- lib/fluent/plugin_helper/formatter.rb | 7 ++----- lib/fluent/plugin_helper/parser.rb | 7 ++----- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/lib/fluent/plugin_helper/formatter.rb b/lib/fluent/plugin_helper/formatter.rb index 74dd1bea3c..f3cf46dc85 100644 --- a/lib/fluent/plugin_helper/formatter.rb +++ b/lib/fluent/plugin_helper/formatter.rb @@ -14,9 +14,6 @@ # limitations under the License. # -require 'monitor' -require 'forwardable' - require 'fluent/plugin' require 'fluent/plugin/formatter' require 'fluent/config/element' @@ -30,7 +27,7 @@ def formatter_create(usage: '', type: nil, conf: nil) if !type raise ArgumentError, "BUG: both type and conf are not specified" unless conf - raise Fluent::ConfigError, "@type is not specified for " unless conf['@type'] + raise Fluent::ConfigError, "@type is required in " unless conf['@type'] type = conf['@type'] end formatter = Fluent::Plugin.new_formatter(type, parent: self) @@ -107,7 +104,6 @@ def formatter_operate(method_name, &block) def stop super - # timer stops automatically in super formatter_operate(:stop) end @@ -133,6 +129,7 @@ def close def terminate formatter_operate(:terminate) + @_formatters_started = false @_formatters = {} super end diff --git a/lib/fluent/plugin_helper/parser.rb b/lib/fluent/plugin_helper/parser.rb index 14725401f5..88d3e32a8a 100644 --- a/lib/fluent/plugin_helper/parser.rb +++ b/lib/fluent/plugin_helper/parser.rb @@ -14,9 +14,6 @@ # limitations under the License. # -require 'monitor' -require 'forwardable' - require 'fluent/plugin' require 'fluent/plugin/parser' require 'fluent/config/element' @@ -30,7 +27,7 @@ def parser_create(usage: '', type: nil, conf: nil) if !type raise ArgumentError, "BUG: both type and conf are not specified" unless conf - raise Fluent::ConfigError, "@type is not specified for " unless conf['@type'] + raise Fluent::ConfigError, "@type is required in " unless conf['@type'] type = conf['@type'] end parser = Fluent::Plugin.new_parser(type, parent: self) @@ -107,7 +104,6 @@ def parser_operate(method_name, &block) def stop super - # timer stops automatically in super parser_operate(:stop) end @@ -133,6 +129,7 @@ def close def terminate parser_operate(:terminate) + @_parsers_started = false @_parsers = {} super end