Skip to content

Commit

Permalink
Merge pull request #2117 from fluent/output-disable-chunk-backup
Browse files Browse the repository at this point in the history
output: Add disable_chunk_backup for ignore broken chunks. fix #2112
  • Loading branch information
repeatedly authored Sep 12, 2018
2 parents 33fe27e + f16ae05 commit 0337300
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 11 deletions.
27 changes: 16 additions & 11 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class Output < Base
config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'

config_param :retry_randomize, :bool, default: true, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.'
config_param :disable_chunk_backup, :bool, default: false, desc: 'If true, chunks are thrown away when unrecoverable error happens'
end

config_section :secondary, param_name: :secondary_config, required: false, multi: false, final: true do
Expand Down Expand Up @@ -1161,17 +1162,21 @@ def try_flush
end

def backup_chunk(chunk, using_secondary, delayed_commit)
unique_id = dump_unique_id_hex(chunk.unique_id)
safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
backup_dir = File.dirname(backup_file)

log.warn "bad chunk is moved to #{backup_file}"
FileUtils.mkdir_p(backup_dir) unless Dir.exist?(backup_dir)
File.open(backup_file, 'ab', system_config.file_permission || 0644) { |f|
chunk.write_to(f)
}
if @buffer_config.disable_chunk_backup
log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away"
else
unique_id = dump_unique_id_hex(chunk.unique_id)
safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
backup_dir = File.dirname(backup_file)

log.warn "bad chunk is moved to #{backup_file}"
FileUtils.mkdir_p(backup_dir) unless Dir.exist?(backup_dir)
File.open(backup_file, 'ab', system_config.file_permission || 0644) { |f|
chunk.write_to(f)
}
end
commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit)
end

Expand Down
1 change: 1 addition & 0 deletions test/command/test_plugin_config_formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def process(tag, es)
retry_exponential_backoff_base: float: (2)
retry_max_interval: time: (nil)
retry_randomize: bool: (true)
disable_chunk_backup: bool: (false)
<secondary>: optional, single
@type: string: (nil)
<buffer>: optional, single
Expand Down
24 changes: 24 additions & 0 deletions test/plugin/test_output_as_buffered_backup.rb
Original file line number Diff line number Diff line change
Expand Up @@ -279,5 +279,29 @@ def wait_flush(target_file)
assert { logs.any? { |l| l.include?("got unrecoverable error in primary and secondary is async output") } }
end
end

test 'chunk is thrown away when disable_chunk_backup is true' do
Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
id = 'backup_test'
hash = {
'flush_interval' => 1,
'flush_thread_burst_interval' => 0.1,
'disable_chunk_backup' => true
}
chunk_id = nil
@i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash)]))
@i.register(:write) { |chunk|
chunk_id = chunk.unique_id;
raise Fluent::UnrecoverableError, "yay, your #write must fail"
}

flush_chunks

target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
assert_false File.exist?(target)
logs = @i.log.out.logs
assert { logs.any? { |l| l.include?("disable_chunk_backup is true") } }
end
end
end
end

0 comments on commit 0337300

Please sign in to comment.