Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_forward: Implement handshake protocol #8561

Merged
merged 6 commits into from
Mar 14, 2024

Conversation

cosmo0920
Copy link
Contributor

@cosmo0920 cosmo0920 commented Mar 8, 2024

In forward protocol, there is a definition for handshake.
When using out_forward on Fluentd with <security> directive(s), out_forward on Fluentd hangs due to Fluent Bit didn't implement handshake protocol on its in_forward.
This PR aims to implement handshake protocol. but I didn't implement user authentication part yet. User authentication is also implemented in this PR.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
[SERVICE]
    Flush        5
    Daemon       Off
    Log_Level    debug
    HTTP_Monitor Off
    HTTP_Port    2020

[INPUT]
    Name forward
    Listen 0.0.0.0
    Port 24224
    shared_key  FluentBit
    security.users fluentd changeme
    tls         On
    tls.verify  Off
    tls.crt_file self_signed.crt
    tls.key_file self_signed.key

[OUTPUT]
    Name  stdout
    Match **

The self signed certificate is created by the following:

/path/to/fluent-bit/build$ openssl req -x509 \
            -newkey rsa:4096 \
            -sha256 \
            -nodes \
            -keyout self_signed.key \
            -out self_signed.crt \
            -subj "/CN=test.host.net"

And using Fluentd with its configuration:

<source>
  @type sample
  tag test
  # rate 10
</source>

<match test>
  @type forward
  flush_interval 2
  transport tls
  tls_allow_self_signed_cert true
  tls_cert_path /path/to/fluent-bit/build/self_signed.crt
  tls_verify_hostname false
  <security>
    self_hostname output.testing.local
    shared_key FluentBit
  </security>
  <server>
    host 127.0.0.1
    port 24224
    username fluentd
    password changeme
  </server>
</match>
  • Debug log output from testing the change
Fluent Bit v3.0.0
* Copyright (C) 2015-2024 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

___________.__                        __    __________.__  __          ________  
\_   _____/|  |  __ __   ____   _____/  |_  \______   \__|/  |_  ___  _\_____  \ 
 |    __)  |  | |  |  \_/ __ \ /    \   __\  |    |  _/  \   __\ \  \/ / _(__  < 
 |     \   |  |_|  |  /\  ___/|   |  \  |    |    |   \  ||  |    \   / /       \
 \___  /   |____/____/  \___  >___|  /__|    |______  /__||__|     \_/ /______  /
     \/                     \/     \/               \/                        \/ 

[2024/03/11 19:29:49] [ info] Configuration:
[2024/03/11 19:29:49] [ info]  flush time     | 5.000000 seconds
[2024/03/11 19:29:49] [ info]  grace          | 5 seconds
[2024/03/11 19:29:49] [ info]  daemon         | 0
[2024/03/11 19:29:49] [ info] ___________
[2024/03/11 19:29:49] [ info]  inputs:
[2024/03/11 19:29:49] [ info]      forward
[2024/03/11 19:29:49] [ info] ___________
[2024/03/11 19:29:49] [ info]  filters:
[2024/03/11 19:29:49] [ info] ___________
[2024/03/11 19:29:49] [ info]  outputs:
[2024/03/11 19:29:49] [ info]      stdout.0
[2024/03/11 19:29:49] [ info] ___________
[2024/03/11 19:29:49] [ info]  collectors:
[2024/03/11 19:29:49] [ info] [fluent bit] version=3.0.0, commit=df5ea6a818, pid=193377
[2024/03/11 19:29:49] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2024/03/11 19:29:49] [ info] [storage] ver=1.1.6, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2024/03/11 19:29:49] [ info] [cmetrics] version=0.7.0
[2024/03/11 19:29:49] [ info] [ctraces ] version=0.4.0
[2024/03/11 19:29:49] [ info] [input:forward:forward.0] initializing
[2024/03/11 19:29:49] [ info] [input:forward:forward.0] storage_strategy='memory' (memory only)
[2024/03/11 19:29:49] [debug] [forward:forward.0] created event channels: read=21 write=22
[2024/03/11 19:29:49] [debug] [in_fw] Listen='0.0.0.0' TCP_Port=24224
[2024/03/11 19:29:49] [debug] [downstream] listening on 0.0.0.0:24224
[2024/03/11 19:29:49] [ info] [input:forward:forward.0] listening on 0.0.0.0:24224
[2024/03/11 19:29:49] [ info] [output:stdout:stdout.0] worker #0 started
[2024/03/11 19:29:49] [debug] [stdout:stdout.0] created event channels: read=24 write=25
[2024/03/11 19:29:49] [ info] [sp] stream processor started
[2024/03/11 19:29:53] [debug] [input:forward:forward.0] protocol: sending HELO
[2024/03/11 19:29:53] [debug] [input:forward:forward.0] protocol: checking PING
[2024/03/11 19:29:53] [debug] [input:forward:forward.0] protocol: received PING
[2024/03/11 19:29:53] [debug] [input:forward:forward.0] protocol: sending PONG
[2024/03/11 19:29:53] [debug] [socket] could not validate socket status for #40 (don't worry)
[2024/03/11 19:29:54] [debug] [input:forward:forward.0] protocol: sending HELO
[2024/03/11 19:29:54] [debug] [input:forward:forward.0] protocol: checking PING
[2024/03/11 19:29:54] [debug] [input:forward:forward.0] protocol: received PING
[2024/03/11 19:29:54] [debug] [input:forward:forward.0] protocol: sending PONG
[2024/03/11 19:29:54] [debug] [socket] could not validate socket status for #40 (don't worry)
[2024/03/11 19:29:56] [debug] [input:forward:forward.0] protocol: sending HELO
[2024/03/11 19:29:56] [debug] [input:forward:forward.0] protocol: sending HELO
[2024/03/11 19:29:56] [debug] [input:forward:forward.0] protocol: checking PING
[2024/03/11 19:29:56] [debug] [input:forward:forward.0] protocol: received PING
[2024/03/11 19:29:56] [debug] [input:forward:forward.0] protocol: sending PONG
[2024/03/11 19:29:56] [debug] [socket] could not validate socket status for #40 (don't worry)
[2024/03/11 19:29:57] [debug] [input:forward:forward.0] protocol: sending HELO
[2024/03/11 19:29:57] [debug] [input:forward:forward.0] protocol: checking PING
[2024/03/11 19:29:57] [debug] [input:forward:forward.0] protocol: received PING
[2024/03/11 19:29:57] [debug] [input:forward:forward.0] protocol: sending PONG
[2024/03/11 19:29:57] [debug] [input chunk] update output instances with new chunk size diff=81, records=3, input=forward.0
[2024/03/11 19:29:57] [debug] [input:forward:forward.0] protocol: checking PING
[2024/03/11 19:29:57] [debug] [input:forward:forward.0] protocol: received PING
[2024/03/11 19:29:57] [debug] [input:forward:forward.0] protocol: sending PONG
[2024/03/11 19:29:57] [debug] [socket] could not validate socket status for #41 (don't worry)
[2024/03/11 19:29:57] [debug] [socket] could not validate socket status for #40 (don't worry)
[2024/03/11 19:29:58] [debug] [input:forward:forward.0] protocol: sending HELO
[2024/03/11 19:29:58] [debug] [input:forward:forward.0] protocol: checking PING
[2024/03/11 19:29:58] [debug] [input:forward:forward.0] protocol: received PING
[2024/03/11 19:29:58] [debug] [input:forward:forward.0] protocol: sending PONG
[2024/03/11 19:29:58] [debug] [input chunk] update output instances with new chunk size diff=27, records=1, input=forward.0
[2024/03/11 19:29:58] [debug] [socket] could not validate socket status for #40 (don't worry)
[2024/03/11 19:29:59] [debug] [task] created task=0x8119730 id=0 OK
[2024/03/11 19:29:59] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] test: [[1710152993.026812836, {}], {"message"=>"sample"}]
[1] test: [[1710152994.028428225, {}], {"message"=>"sample"}]
[2] test: [[1710152995.030168613, {}], {"message"=>"sample"}]
[3] test: [[1710152996.032160194, {}], {"message"=>"sample"}]
[2024/03/11 19:29:59] [debug] [out flush] cb_destroy coro_id=0
[2024/03/11 19:29:59] [debug] [task] destroy task=0x8119730 (task_id=0)
^C[2024/03/11 19:30:01] [engine] caught signal (SIGINT)
[2024/03/11 19:30:01] [ warn] [engine] service will shutdown in max 5 seconds
[2024/03/11 19:30:01] [ info] [input] pausing forward.0
[2024/03/11 19:30:01] [ info] [engine] service has stopped (0 pending tasks)
[2024/03/11 19:30:01] [ info] [input] pausing forward.0
[2024/03/11 19:30:01] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2024/03/11 19:30:01] [ info] [output:stdout:stdout.0] thread worker #0 stopped
  • Attached Valgrind output that shows no leaks or memory corruption was found
==193377== 
==193377== HEAP SUMMARY:
==193377==     in use at exit: 0 bytes in 0 blocks
==193377==   total heap usage: 12,496 allocs, 12,496 frees, 8,767,279 bytes allocated
==193377== 
==193377== All heap blocks were freed -- no leaks are possible
==193377== 
==193377== For lists of detected and suppressed errors, rerun with: -s
==193377== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Handshake process on in_forward in Fluent Bit is:

1. send HELO                                    (FW_HANDSHAKE_HELO)
2. check PING                                   (none)
3. send PONG                                    (FW_HANDSHAKE_PINGPONG)
4. Mark a connect as established                (FW_HANDSHAKE_ESTABLISHED)
5. Process retrived records

On waiting the actual records, we just early return from the event
handler to wait the next read event for ingested logs.

Signed-off-by: Hiroshi Hatake <[email protected]>
@RicardoAAD
Copy link
Collaborator

Hello @cosmo0920

I tested this implementation and worked with Classic and YAML format:

YAML config:

service:
  flush: 1
  log_level: debug
pipeline:
  inputs:
    - name: forward
      listen: 0.0.0.0
      port: "24224"
      tls: "on"
      tls.verify: "on"
      tls.crt_file: ./working/self_signed.crt
      tls.key_file: ./working/self_signed.key
      shared_key: FluentBit
      tls.debug: "4"
  outputs:
    - name: stdout
      match: '*'

Classic:

[SERVICE]
    Flush        5
    Daemon       Off
    Log_Level    debug
    HTTP_Monitor Off
    HTTP_Port    2020

[INPUT]
    Name forward
    Listen 0.0.0.0
    Port 24224
    shared_key  FluentBit
    #security.users fluentd changeme
    tls         On
    tls.verify  Off
    tls.crt_file self_signed.crt
    tls.key_file self_signed.key
[OUTPUT]
    Name  stdout
    Match **

Fluentd forwarder config

<system>
 log_level debug
</system>

<source>
        @type tail
         path Powershell.csv
         tag topic-rl-xxxx
         path_key tailed_path
         pos_file ./global.pos
         pos_file_compaction_interval 24h
         read_from_head true
         format none
</source>
<match topic-rl-xxxx>
  @type forward
  heartbeat_interval 5s
  connect_timeout 10
  transport tls
  ignore_network_errors_at_startup true
  dns_round_robin true
  tls_cert_path self_signed.crt
  tls_verify_hostname false
  <server>
       host  10.128.0.77
       port 24224
  </server>
  <security>
      self_hostname $HOSTNAME
      shared_key FluentBit
  </security>
  <buffer>
      @type file_single
      path ./tail.buffer
      flush_interval 5s
      overflow_action drop_oldest_chunk
      chunk_limit_size 1m
      total_limit_size 256m
  </buffer>
</match>

@edsiper edsiper merged commit dbfc4f6 into master Mar 14, 2024
44 checks passed
@edsiper edsiper deleted the cosmo0920-handshake-in_forward branch March 14, 2024 21:04
@edsiper edsiper added this to the Fluent Bit v3.0.0 milestone Mar 14, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants