Skip to content

Commit

Permalink
Merge pull request #1306 from fluent/migrate-forward-plugin-to-v0.14-api
Browse files Browse the repository at this point in the history
Migrate forward plugin to v0.14 api
  • Loading branch information
tagomoris authored Nov 9, 2016
2 parents 38852e7 + 8b2e941 commit a3a8444
Show file tree
Hide file tree
Showing 7 changed files with 473 additions and 596 deletions.
130 changes: 25 additions & 105 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,26 @@
# limitations under the License.
#

require 'fcntl'

require 'cool.io'
require 'fluent/plugin/input'
require 'fluent/msgpack_factory'
require 'yajl'
require 'digest'

require 'fluent/input'
require 'fluent/plugin/socket_util'
require 'fcntl'
require 'cool.io'

module Fluent
module Fluent::Plugin
class ForwardInput < Input
Plugin.register_input('forward', self)
Fluent::Plugin.register_input('forward', self)

LISTEN_PORT = 24224
# See the wiki page below for protocol specification
# https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1

def initialize
super
require 'fluent/plugin/socket_util'
end
helpers :event_loop

LISTEN_PORT = 24224

desc 'The port to listen to.'
config_param :port, :integer, default: LISTEN_PORT
Expand Down Expand Up @@ -125,46 +128,30 @@ def configure(conf)
})
end
end
@lsock = @usock = nil
end

def start
super

@loop = Coolio::Loop.new

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)

@lsock = listen(client)
@loop.attach(@lsock)
event_loop_attach(@lsock)

@usock = client.listen_udp(@bind, @port)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
@loop.attach(@hbr)

@thread = Thread.new(&method(:run))
event_loop_attach(@hbr)
end

def shutdown
# In test cases it occasionally appeared that when detaching a watcher, another watcher is also detached.
# In the case in the iteration of watchers, a watcher that has been already detached is intended to be detached
# and therfore RuntimeError occurs saying that it is not attached to a loop.
# It occurs only when testing for sending responses to ForwardOutput.
# Sending responses needs to write the socket that is previously used only to read
# and a handler has 2 watchers that is used to read and to write.
# This problem occurs possibly because those watchers are thought to be related to each other
# and when detaching one of them the other is also detached for some reasons.
# As a workaround, check if watchers are attached before detaching them.
@loop.watchers.each {|w| w.detach if w.attached? }
@loop.stop
@usock.close
@thread.join
@lsock.close

def close
@lsock.close if @lsock
@usock.close if @usock
super
end

Expand All @@ -176,23 +163,6 @@ def listen(client)
s
end

#config_param :path, :string, :default => DEFAULT_SOCKET_PATH
#def listen
# if File.exist?(@path)
# File.unlink(@path)
# end
# FileUtils.mkdir_p File.dirname(@path)
# log.debug "listening fluent socket on #{@path}"
# Coolio::UNIXServer.new(@path, Handler, method(:on_message))
#end

def run
@loop.run(@blocking_timeout)
rescue => e
log.error "unexpected error", error: e
log.error_backtrace
end

private

def handle_connection(conn)
Expand Down Expand Up @@ -288,29 +258,6 @@ def response(option)
nil
end

# message Entry {
# 1: long time
# 2: object record
# }
#
# message Forward {
# 1: string tag
# 2: list<Entry> entries
# 3: object option (optional)
# }
#
# message PackedForward {
# 1: string tag
# 2: raw entries # msgpack stream of Entry
# 3: object option (optional)
# }
#
# message Message {
# 1: string tag
# 2: long? time
# 3: object record
# 4: object option (optional)
# }
def on_message(msg, chunk_size, peeraddr)
if msg.nil?
# for future TCP heartbeat_request
Expand Down Expand Up @@ -338,7 +285,7 @@ def on_message(msg, chunk_size, peeraddr)
# PackedForward
option = msg[2]
size = (option && option['size']) || 0
es_class = (option && option['compressed'] == 'gzip') ? CompressedMessagePackEventStream : MessagePackEventStream
es_class = (option && option['compressed'] == 'gzip') ? Fluent::CompressedMessagePackEventStream : Fluent::MessagePackEventStream
es = es_class.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
Expand All @@ -349,12 +296,12 @@ def on_message(msg, chunk_size, peeraddr)
es = if @skip_invalid_event
check_and_skip_invalid_event(tag, entries, peeraddr)
else
es = MultiEventStream.new
es = Fluent::MultiEventStream.new
entries.each { |e|
record = e[1]
next if record.nil?
time = e[0]
time = (now ||= Engine.now) if time.to_i == 0
time = Fluent::Engine.now if time.nil? || time.to_i == 0 # `to_i == 0` for empty EventTime
es.add(time, record)
}
es
Expand All @@ -372,7 +319,7 @@ def on_message(msg, chunk_size, peeraddr)
return msg[3] # retry never succeeded so return ack and drop incoming event.
end
return if record.nil?
time = Engine.now if time.to_i == 0
time = Fluent::Engine.now if time.to_i == 0
record[@source_hostname_key] = peeraddr[2] if @source_hostname_key
router.emit(tag, time, record)
option = msg[3]
Expand All @@ -387,7 +334,7 @@ def invalid_event?(tag, time, record)
end

def check_and_skip_invalid_event(tag, es, peeraddr)
new_es = MultiEventStream.new
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
if invalid_event?(tag, time, record)
log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record
Expand All @@ -399,7 +346,7 @@ def check_and_skip_invalid_event(tag, es, peeraddr)
end

def add_source_host(es, host)
new_es = MultiEventStream.new
new_es = Fluent::MultiEventStream.new
es.each { |time, record|
record[@source_hostname_key] = host
new_es.add(time, record)
Expand Down Expand Up @@ -430,33 +377,6 @@ def generate_helo(nonce, user_auth_salt)
['HELO', {'nonce' => nonce, 'auth' => (@security ? user_auth_salt : ''), 'keepalive' => !@deny_keepalive}]
end

##### Authentication Handshake
#
# 1. (client) connect to server
# * Socket handshake, checks certificate and its significate (in client, if using SSL)
# 2. (server)
# * check network/domain acl (if enabled)
# * disconnect when failed
# 3. (server) send HELO
# * ['HELO', options(hash)]
# * options:
# * nonce: string (required)
# * auth: string or blank_string (string: authentication required, and its salt is this value)
# 4. (client) send PING
# * ['PING', selfhostname, sharedkey_salt, sha512_hex(sharedkey_salt + selfhostname + nonce + sharedkey), username || '', sha512_hex(auth_salt + username + password) || '']
# 5. (server) check PING
# * check sharedkey
# * check username / password (if required)
# * send PONG FAILURE if failed
# * ['PONG', false, 'reason of authentication failure', '', '']
# 6. (server) send PONG
# * ['PONG', bool(authentication result), 'reason if authentication failed', selfhostname, sha512_hex(salt + selfhostname + nonce + sharedkey)]
# 7. (client) check PONG
# * check sharedkey
# * disconnect when failed
# 8. connection established
# * send data from client

def check_ping(message, remote_addr, user_auth_salt, nonce)
log.debug "checking ping"
# ['PING', self_hostname, shared_key_salt, sha512_hex(shared_key_salt + self_hostname + nonce + shared_key), username || '', sha512_hex(auth_salt + username + password) || '']
Expand Down
Loading

0 comments on commit a3a8444

Please sign in to comment.