Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Warn too many staged chunks at configure #1054

Merged
merged 2 commits into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Output < Base
CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/
CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@a-zA-Z0-9]+\}/

CHUNKING_FIELD_WARN_NUM = 4

config_param :time_as_integer, :bool, default: false

# `<buffer>` and `<secondary>` sections are available only when '#format' and '#write' are implemented
Expand Down Expand Up @@ -253,6 +255,10 @@ def configure(conf)
@output_time_formatter_cache = {}
end

if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM
log.warn "many chunk keys specified, and it may cause too many chunks on your system."
end

# no chunk keys or only tags (chunking can be done without iterating event stream)
@simple_chunking = !@chunk_key_time && @chunk_keys.empty?

Expand Down
22 changes: 21 additions & 1 deletion lib/fluent/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

require 'test/unit'
require 'fluent/env' # for Fluent.windows?
require 'fluent/test/log'
require 'fluent/test/base'
require 'fluent/test/input_test'
require 'fluent/test/output_test'
Expand All @@ -28,4 +29,23 @@
dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO
logdev = Fluent::Test::DummyLogDevice.new
logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
$log ||= Fluent::Log.new(logger)
$log ||= Fluent::Log.new(logger)

module Fluent
module Test
def self.setup
Fluent.__send__(:remove_const, :Engine)
engine = Fluent.const_set(:Engine, EngineClass.new).init(SystemConfig.new)

engine.define_singleton_method(:now=) {|n|
@now = n
}
engine.define_singleton_method(:now) {
@now ||= super()
}

nil
end
end
end

68 changes: 2 additions & 66 deletions lib/fluent/test/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,14 @@
# limitations under the License.
#

require 'fluent/config'
require 'fluent/engine'
require 'fluent/system_config'
require 'fluent/config'
require 'fluent/test/log'
require 'serverengine'

module Fluent
module Test
def self.setup
Fluent.__send__(:remove_const, :Engine)
engine = Fluent.const_set(:Engine, EngineClass.new).init(SystemConfig.new)

engine.define_singleton_method(:now=) {|n|
@now = n
}
engine.define_singleton_method(:now) {
@now ||= super()
}

nil
end

class TestDriver
include ::Test::Unit::Assertions

Expand Down Expand Up @@ -84,57 +71,6 @@ def run(num_waits = 10, &block)
end
end
end

class DummyLogDevice
attr_reader :logs

def initialize
@logs = []
end

def reset
@logs = []
end

def tty?
false
end

def puts(*args)
args.each{ |arg| write(arg + "\n") }
end

def write(message)
@logs.push message
end

def flush
true
end

def close
true
end
end

class TestLogger < Fluent::PluginLogger
def initialize
@logdev = DummyLogDevice.new
dl_opts = {}
dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO
logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts)
log = Fluent::Log.new(logger)
super(log)
end

def reset
@logdev.reset
end

def logs
@logdev.logs
end
end
end
end

Expand Down
73 changes: 73 additions & 0 deletions lib/fluent/test/log.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'serverengine'
require 'fluent/log'

module Fluent
module Test
class DummyLogDevice
attr_reader :logs

def initialize
@logs = []
end

def reset
@logs = []
end

def tty?
false
end

def puts(*args)
args.each{ |arg| write(arg + "\n") }
end

def write(message)
@logs.push message
end

def flush
true
end

def close
true
end
end

class TestLogger < Fluent::PluginLogger
def initialize
@logdev = DummyLogDevice.new
dl_opts = {}
dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO
logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts)
log = Fluent::Log.new(logger)
super(log)
end

def reset
@logdev.reset
end

def logs
@logdev.logs
end
end
end
end
51 changes: 51 additions & 0 deletions test/plugin/test_output_as_buffered.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,57 @@ def waiting(seconds)
Timecop.return
end

sub_test_case 'buffered output configured with many chunk keys' do
setup do
@stored_global_logger = $log
$log = Fluent::Test::TestLogger.new
@hash = {
'flush_mode' => 'interval',
'flush_thread_burst_interval' => 0.01,
'chunk_limit_size' => 1024,
'timekey' => 60,
}
@i = create_output(:buffered)
end
teardown do
$log = @stored_global_logger
end
test 'nothing are warned with less chunk keys' do
chunk_keys = 'time,key1,key2,key3'
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup
@i.start
assert{ logs.select{|log| log.include?('[warn]') }.size == 0 }
end

test 'a warning reported with 4 chunk keys' do
chunk_keys = 'key1,key2,key3,key4'
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup

@i.start # this calls `log.reset`... capturing logs about configure must be done before this line
assert_equal ['key1', 'key2', 'key3', 'key4'], @i.chunk_keys

assert{ logs.select{|log| log.include?('[warn]: many chunk keys specified, and it may cause too many chunks on your system.') }.size == 1 }
end

test 'a warning reported with 4 chunk keys including "tag"' do
chunk_keys = 'tag,key1,key2,key3'
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup
@i.start # this calls `log.reset`... capturing logs about configure must be done before this line
assert{ logs.select{|log| log.include?('[warn]: many chunk keys specified, and it may cause too many chunks on your system.') }.size == 1 }
end

test 'time key is not included for warned chunk keys' do
chunk_keys = 'time,key1,key2,key3'
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_keys,@hash)]))
logs = @i.log.out.logs.dup
@i.start
assert{ logs.select{|log| log.include?('[warn]') }.size == 0 }
end
end

sub_test_case 'buffered output feature without any buffer key, flush_mode: lazy' do
setup do
hash = {
Expand Down