diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 7d70098fad..0efcab423c 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -96,6 +96,7 @@ class Output < Base config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.' config_param :retry_randomize, :bool, default: true, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.' + config_param :disable_chunk_backup, :bool, default: false, desc: 'If true, chunks are thrown away when unrecoverable error happens' end config_section :secondary, param_name: :secondary_config, required: false, multi: false, final: true do @@ -1161,17 +1162,21 @@ def try_flush end def backup_chunk(chunk, using_secondary, delayed_commit) - unique_id = dump_unique_id_hex(chunk.unique_id) - safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_') - backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR - backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log") - backup_dir = File.dirname(backup_file) - - log.warn "bad chunk is moved to #{backup_file}" - FileUtils.mkdir_p(backup_dir) unless Dir.exist?(backup_dir) - File.open(backup_file, 'ab', system_config.file_permission || 0644) { |f| - chunk.write_to(f) - } + if @buffer_config.disable_chunk_backup + log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away" + else + unique_id = dump_unique_id_hex(chunk.unique_id) + safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_') + backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR + backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log") + backup_dir = File.dirname(backup_file) + + log.warn "bad chunk is moved to #{backup_file}" + FileUtils.mkdir_p(backup_dir) unless Dir.exist?(backup_dir) + File.open(backup_file, 'ab', system_config.file_permission || 0644) { |f| + chunk.write_to(f) + } + end commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit) end diff --git a/test/command/test_plugin_config_formatter.rb b/test/command/test_plugin_config_formatter.rb index de31d50f80..b2d1494a4a 100644 --- a/test/command/test_plugin_config_formatter.rb +++ b/test/command/test_plugin_config_formatter.rb @@ -155,6 +155,7 @@ def process(tag, es) retry_exponential_backoff_base: float: (2) retry_max_interval: time: (nil) retry_randomize: bool: (true) + disable_chunk_backup: bool: (false) : optional, single @type: string: (nil) : optional, single diff --git a/test/plugin/test_output_as_buffered_backup.rb b/test/plugin/test_output_as_buffered_backup.rb index 1c0dfed145..0d3e5517d5 100644 --- a/test/plugin/test_output_as_buffered_backup.rb +++ b/test/plugin/test_output_as_buffered_backup.rb @@ -279,5 +279,29 @@ def wait_flush(target_file) assert { logs.any? { |l| l.include?("got unrecoverable error in primary and secondary is async output") } } end end + + test 'chunk is thrown away when disable_chunk_backup is true' do + Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do + id = 'backup_test' + hash = { + 'flush_interval' => 1, + 'flush_thread_burst_interval' => 0.1, + 'disable_chunk_backup' => true + } + chunk_id = nil + @i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash)])) + @i.register(:write) { |chunk| + chunk_id = chunk.unique_id; + raise Fluent::UnrecoverableError, "yay, your #write must fail" + } + + flush_chunks + + target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log" + assert_false File.exist?(target) + logs = @i.log.out.logs + assert { logs.any? { |l| l.include?("disable_chunk_backup is true") } } + end + end end end