Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

output: Add disable_chunk_backup for ignore broken chunks. fix #2112 #2117

Merged
merged 2 commits into from
Sep 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class Output < Base
config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'

config_param :retry_randomize, :bool, default: true, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.'
config_param :disable_chunk_backup, :bool, default: false, desc: 'If true, chunks are thrown away when unrecoverable error happens'
end

config_section :secondary, param_name: :secondary_config, required: false, multi: false, final: true do
Expand Down Expand Up @@ -1161,17 +1162,21 @@ def try_flush
end

def backup_chunk(chunk, using_secondary, delayed_commit)
unique_id = dump_unique_id_hex(chunk.unique_id)
safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
backup_dir = File.dirname(backup_file)

log.warn "bad chunk is moved to #{backup_file}"
FileUtils.mkdir_p(backup_dir) unless Dir.exist?(backup_dir)
File.open(backup_file, 'ab', system_config.file_permission || 0644) { |f|
chunk.write_to(f)
}
if @buffer_config.disable_chunk_backup
log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away"
else
unique_id = dump_unique_id_hex(chunk.unique_id)
safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
backup_dir = File.dirname(backup_file)

log.warn "bad chunk is moved to #{backup_file}"
FileUtils.mkdir_p(backup_dir) unless Dir.exist?(backup_dir)
File.open(backup_file, 'ab', system_config.file_permission || 0644) { |f|
chunk.write_to(f)
}
end
commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit)
end

Expand Down
1 change: 1 addition & 0 deletions test/command/test_plugin_config_formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def process(tag, es)
retry_exponential_backoff_base: float: (2)
retry_max_interval: time: (nil)
retry_randomize: bool: (true)
disable_chunk_backup: bool: (false)
<secondary>: optional, single
@type: string: (nil)
<buffer>: optional, single
Expand Down
24 changes: 24 additions & 0 deletions test/plugin/test_output_as_buffered_backup.rb
Original file line number Diff line number Diff line change
Expand Up @@ -279,5 +279,29 @@ def wait_flush(target_file)
assert { logs.any? { |l| l.include?("got unrecoverable error in primary and secondary is async output") } }
end
end

test 'chunk is thrown away when disable_chunk_backup is true' do
Fluent::SystemConfig.overwrite_system_config('root_dir' => TMP_DIR) do
id = 'backup_test'
hash = {
'flush_interval' => 1,
'flush_thread_burst_interval' => 0.1,
'disable_chunk_backup' => true
}
chunk_id = nil
@i.configure(config_element('ROOT', '', {'@id' => id}, [config_element('buffer', 'tag', hash)]))
@i.register(:write) { |chunk|
chunk_id = chunk.unique_id;
raise Fluent::UnrecoverableError, "yay, your #write must fail"
}

flush_chunks

target = "#{TMP_DIR}/backup/worker0/#{id}/#{@i.dump_unique_id_hex(chunk_id)}.log"
assert_false File.exist?(target)
logs = @i.log.out.logs
assert { logs.any? { |l| l.include?("disable_chunk_backup is true") } }
end
end
end
end