Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to v0.14 plugin api #928

Merged
merged 29 commits into from
May 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1426fc1
Fix to do commit/rollback for 2 or more chunks at once
tagomoris May 12, 2016
a826141
fix to use Buffer#write
tagomoris May 12, 2016
9b3e3ba
fix tests with Buffer#write
tagomoris May 12, 2016
cd65e0e
Implement overflow_action (buffer_queue_full_action in v0.12)
tagomoris May 12, 2016
76c8264
set shorter flush_burst_interval to make teardown shorter
tagomoris May 12, 2016
6de12b0
add v0.12 plugin compatibility
tagomoris May 12, 2016
36a37c4
for readability
tagomoris May 9, 2016
e02204a
Fix to do commit/rollback for 2 or more chunks at once
tagomoris May 12, 2016
f2ec3b5
add hack to call super surely (even if v0.12 style plugin does not ca…
tagomoris Apr 28, 2016
783a393
new plugin uses #emit_events instead of #emit
tagomoris May 13, 2016
b27cdd1
remove useless method definitions (CallSuperMixin now searches class …
tagomoris May 13, 2016
3aa3dbe
fix to have corresponding config element on section objects
tagomoris May 13, 2016
397facf
add MultiOutput as a kind of Output plugin
tagomoris May 13, 2016
6c8a5af
switching plugin lifecycle
tagomoris May 13, 2016
8a1223e
fix to load OutputChain always
tagomoris May 13, 2016
9650d10
fix test not to raise ConfigError for missing store sections
tagomoris May 13, 2016
ce28b96
fix agent to call lifecycle methods for MultiOutput plugins
tagomoris May 16, 2016
5236e16
add tests for Fluent::Plugin::MultiOutput
tagomoris May 16, 2016
3047691
remove buffer plugins with old API
tagomoris May 16, 2016
7ee586c
rename new buffer plugin implementations as default one
tagomoris May 16, 2016
b6c8d04
rename memory2/file2 to memory/file and fix to use new error class fo…
tagomoris May 16, 2016
05decc1
delete unused class definition
tagomoris May 16, 2016
d8ecb10
use #emit_events instead of #emit for plugins
tagomoris May 16, 2016
46ae141
fix but, not to set buffer_path to path in <buffer> section
tagomoris May 16, 2016
18fa283
fix to call self_chain.next recursively to call next plugin instance …
tagomoris May 16, 2016
0cd2844
add configuration example with buf_file
tagomoris May 16, 2016
3afb1fd
fix to call super to initialize internal state
tagomoris May 16, 2016
dabce88
add configuration example to forward events from remote to remote
tagomoris May 17, 2016
eefaa15
add kwarg (to be ignored) to have same arguments between #to_msgpack_…
tagomoris May 17, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions example/in_out_forward.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<source>
@type forward
port 24224
</source>

<match test.**>
@type forward
buffer_type file
buffer_path /tmp/fluentd.forward.buffer
num_threads 10
flush_interval 1s
<server>
host 127.0.0.1
port 24225
</server>
</match>

18 changes: 18 additions & 0 deletions example/out_forward_buf_file.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<source>
@type dummy
tag test
</source>

<match test>
@type forward
buffer_path /tmp/fluentd.forward
buffer_type file
flush_interval 5
send_timeout 60
heartbeat_type tcp
heartbeat_interval 1
<server>
host 127.0.0.1
port 24224
</server>
</match>
104 changes: 54 additions & 50 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@ def initialize(log:)
@context = nil
@outputs = []
@filters = []
@started_outputs = []
@started_filters = []

@lifecycle_control_list = nil
# lifecycle_control_list is the list of plugins in this agent, and ordered
# from plugins which DOES emit, then DOESN'T emit
# (input -> output w/ router -> filter -> output w/o router)
# for start: use this order DESC
# (because plugins which appears later in configurations will receive events from plugins which appears ealier)
# for stop/before_shutdown/shutdown/after_shutdown/close/terminate: use this order ASC
@lifecycle_cache = nil

@log = log
@event_router = EventRouter.new(NoMatchMatch.new(log), self)
Expand Down Expand Up @@ -64,63 +71,60 @@ def configure(conf)
}
end

def start
@outputs.each { |o|
o.start
@started_outputs << o
}
def lifecycle_control_list
return @lifecycle_control_list if @lifecycle_control_list

@filters.each { |f|
f.start
@started_filters << f
lifecycle_control_list = {
input: [],
output_with_router: [],
filter: [],
output: [],
}
end

def shutdown
@started_filters.map { |f|
Thread.new do
begin
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_type_from_class(f.class), plugin_id: f.plugin_id
f.shutdown
rescue => e
log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error: e
log.warn_backtrace
end
if self.respond_to?(:inputs)
inputs.each do |i|
lifecycle_control_list[:input] << i
end
end
recursive_output_traverse = ->(o) {
if o.has_router?
lifecycle_control_list[:output_with_router] << o
else
lifecycle_control_list[:output] << o
end
}.each { |t| t.join }

# Output plugin as filter emits records at shutdown so emit problem still exist.
# This problem will be resolved after actual filter mechanizm.
@started_outputs.map { |o|
Thread.new do
begin
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_type_from_class(o.class), plugin_id: o.plugin_id
o.shutdown
rescue => e
log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error: e
log.warn_backtrace

if o.respond_to?(:outputs)
o.outputs.each do |store|
recursive_output_traverse.call(store)
end
end
}.each { |t| t.join }
end
}
outputs.each do |o|
recursive_output_traverse.call(o)
end
filters.each do |f|
lifecycle_control_list[:filter] << f
end

def flush!
flush_recursive(@outputs)
@lifecycle_control_list = lifecycle_control_list
end

def flush_recursive(array)
array.each { |o|
begin
if o.is_a?(BufferedOutput)
o.force_flush
elsif o.is_a?(MultiOutput)
flush_recursive(o.outputs)
end
rescue => e
log.debug "error while force flushing", error: e
log.debug_backtrace
def lifecycle(desc: false)
kind_list = if desc
[:output, :filter, :output_with_router]
else
[:output_with_router, :filter, :output]
end
kind_list.each do |kind|
list = if desc
lifecycle_control_list[kind].reverse
else
lifecycle_control_list[kind]
end
display_kind = (kind == :output_with_router ? :output : kind)
list.each do |instance|
yield instance, display_kind
end
}
end
end

def add_match(type, pattern, conf)
Expand Down
Loading