Skip to content

Commit

Permalink
writing
Browse files Browse the repository at this point in the history
  • Loading branch information
tagomoris committed May 23, 2016
1 parent f148ed3 commit b1afd1c
Show file tree
Hide file tree
Showing 16 changed files with 727 additions and 56 deletions.
35 changes: 19 additions & 16 deletions lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

require 'json'

require 'fluent/input'
require 'fluent/plugin/input'
require 'fluent/config/error'

module Fluent
module Fluent::Plugin
class DummyInput < Input
Fluent::Plugin.register_input('dummy', self)

helpers :thread, :storage

BIN_NUM = 10

desc "The value is the tag assigned to the generated events."
Expand All @@ -48,37 +50,39 @@ class DummyInput < Input
dummy
end

def configure(conf)
def initialize
super
@storage = nil
end

@increment_value = 0
def configure(conf)
super
@dummy_index = 0
end

def start
super
@running = true
@thread = Thread.new(&method(:run))
end

def shutdown
@running = false
@thread.join
super
@storage = storage_create(type: 'local')
if @auto_increment_key && !@storage.get(:auto_increment_value)
@storage.put(:auto_increment_value, -1)
end

thread_create(:dummy_input, &method(:run))
end

def run
batch_num = (@rate / BIN_NUM).to_i
residual_num = (@rate % BIN_NUM)
while @running
while thread_current_running?
current_time = Time.now.to_i
BIN_NUM.times do
break unless (@running && Time.now.to_i <= current_time)
break unless (thread_current_running? && Time.now.to_i <= current_time)
wait(0.1) { emit(batch_num) }
end
emit(residual_num)
# wait for next second
while @running && Time.now.to_i <= current_time
while thread_current_running? && Time.now.to_i <= current_time
sleep 0.01
end
end
Expand All @@ -97,8 +101,7 @@ def generate
@dummy_index += 1
if @auto_increment_key
d = d.dup
d[@auto_increment_key] = @increment_value
@increment_value += 1
d[@auto_increment_key] = @storage.update(:auto_increment_value){|v| v + 1 }
end
d
end
Expand Down
59 changes: 59 additions & 0 deletions lib/fluent/plugin/out_buffered_null.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin/output'

module Fluent::Plugin
class BufferedNullOutput < Output
# This plugin is for tests of buffer plugins

Fluent::Plugin.register_output('buffered_null', self)

config_section :buffer do
config_set_default :chunk_keys, ['tag']
config_set_default :flush_at_shutdown, true
config_set_default :chunk_bytes_limit, 10 * 1024
end

attr_accessor :feed_proc, :delayed

def initialize
super
@delayed = false
@feed_proc = nil
end

def prefer_delayed_commit
@delayed
end

def write(chunk)
if @feed_proc
@feed_proc.call(chunk)
else
# ignore chunk.read
end
end

def try_write(chunk)
if @feed_proc
@feed_proc.call(chunk)
else
# ignore chunk.read
end
end
end
end
12 changes: 5 additions & 7 deletions lib/fluent/plugin/out_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,26 @@
# limitations under the License.
#

require 'fluent/output'
require 'fluent/plugin/output'

module Fluent
module Fluent::Plugin
class StdoutOutput < Output
Plugin.register_output('stdout', self)
Fluent::Plugin.register_output('stdout', self)

desc 'Output format.(json,hash)'
config_param :output_type, default: 'json'

def configure(conf)
super
@formatter = Plugin.new_formatter(@output_type)
@formatter = Fluent::Plugin.new_formatter(@output_type)
@formatter.configure(conf)
end

def emit(tag, es, chain)
def process(tag, es)
es.each {|time,record|
$log.write "#{Time.at(time).localtime} #{tag}: #{@formatter.format(tag, time, record).chomp}\n"
}
$log.flush

chain.next
end
end
end
18 changes: 14 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,18 @@ def expired?

# for tests
attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_time, :chunk_key_tag
attr_accessor :output_enqueue_thread_waiting
attr_accessor :output_enqueue_thread_waiting, :in_tests

# output_enqueue_thread_waiting: for test of output.rb itself
# in_tests: for tests of plugins with test drivers

def initialize
super
@counters_monitor = Monitor.new
@buffering = false
@delayed_commit = false
@as_secondary = false
@in_tests = false
@primary_instance = nil

# TODO: well organized counters
Expand All @@ -172,6 +176,10 @@ def initialize

@buffer = nil
@secondary = nil
@retry = nil
@dequeued_chunks = nil
@output_flush_threads = nil

@simple_chunking = nil
@chunk_keys = @chunk_key_time = @chunk_key_tag = nil
@flush_mode = nil
Expand Down Expand Up @@ -231,7 +239,7 @@ def configure(conf)
if (@buffering || @buffering.nil?) && !@as_secondary
# When @buffering.nil?, @buffer_config was initialized with default value for all parameters.
# If so, this configuration MUST success.
@chunk_keys = @buffer_config.chunk_keys
@chunk_keys = @buffer_config.chunk_keys.dup
@chunk_key_time = !!@chunk_keys.delete('time')
@chunk_key_tag = !!@chunk_keys.delete('tag')
if @chunk_keys.any?{ |key| key !~ CHUNK_KEY_PATTERN }
Expand Down Expand Up @@ -358,8 +366,10 @@ def start
end
@output_flush_thread_current_position = 0

if @flush_mode == :fast || @chunk_key_time
thread_create(:enqueue_thread, &method(:enqueue_thread_run))
unless @in_tests
if @flush_mode == :fast || @chunk_key_time
thread_create(:enqueue_thread, &method(:enqueue_thread_run))
end
end
end
@secondary.start if @secondary
Expand Down
8 changes: 6 additions & 2 deletions lib/fluent/plugin/storage_local.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ def configure(conf)

@on_memory = false
if !@path && !@_plugin_id_configured
if @autosave || @persistent
if @persistent
raise Fluent::ConfigError, "Plugin @id or path for <storage> required to save data"
else
log.info "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
if @autosave
log.warn "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
else
log.info "both of Plugin @id and path for <storage> are not specified. Using on-memory store."
end
@on_memory = true
end
elsif @path
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/system_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def self.blank_system_config
end

def self.overwrite_system_config(hash)
older = $_system_config || nil
older = defined?($_system_config) ? $_system_config : nil
begin
$_system_config = SystemConfig.new(Fluent::Config::Element.new('system', '', hash, []))
yield
Expand Down Expand Up @@ -111,7 +111,7 @@ def system_config
def system_config_override(opts={})
require 'fluent/engine'
if !instance_variable_defined?("@_system_config") || @_system_config.nil?
@_system_config = ($_system_config || Fluent::Engine.system_config).dup
@_system_config = (defined?($_system_config) ? $_system_config : Fluent::Engine.system_config).dup
end
opts.each_pair do |key, value|
@_system_config.send(:"#{key.to_s}=", value)
Expand Down
Loading

0 comments on commit b1afd1c

Please sign in to comment.