Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move ignore_outgoing to packetbeat #2393

Merged
merged 1 commit into from
Aug 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
*Packetbeat*
- Group HTTP fields under `http.request` and `http.response` {pull}2167[2167]
- Export `http.request.body` and `http.response.body` when configured under `include_body_for` {pull}2167[2167]
- Move `ignore_outgoing` config to `packetbeat.ignore_outgoing` {pull}2393[2393]

*Topbeat*

Expand Down
5 changes: 0 additions & 5 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,6 @@ filebeat.prospectors:
# sub-dictionary. Default is false.
#fields_under_root: false

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true

# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
#refresh_topology_freq: 10
Expand Down
5 changes: 0 additions & 5 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ tags: [
{%- endif -%}
]

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
# ignore_outgoing: true

{% if geoip_paths is not none %}
geoip:
paths: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,18 @@ output:



############################# Shipper #########################################

shipper:
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
# If this options is not defined, the hostname is used.
#name:

# The tags of the shipper are included in their own field with each
# transaction published. Tags make it easy to group servers by different
# logical properties.
#tags: ["service-X", "web-tier"]

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true
############################# Beat #########################################

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
# If this options is not defined, the hostname is used.
#name:

# The tags of the shipper are included in their own field with each
# transaction published. Tags make it easy to group servers by different
# logical properties.
#tags: ["service-X", "web-tier"]



############################# Logging #########################################
Expand Down
5 changes: 0 additions & 5 deletions libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@
# sub-dictionary. Default is false.
#fields_under_root: false

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true

# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
#refresh_topology_freq: 10
Expand Down
37 changes: 0 additions & 37 deletions libbeat/docs/generalconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ Here is an example configuration:
# logical properties.
tags: ["service-X", "web-tier"]

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
ignore_outgoing: true

# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
refresh_topology_freq: 10
Expand Down Expand Up @@ -124,38 +119,6 @@ fields:
region: us-east-1
------------------------------------------------------------------------------

===== ignore_outgoing

If the `ignore_outgoing` option is enabled, the Beat ignores all the
transactions initiated from the server running the Beat.

This is useful when two Beats publish the same transactions. Because one Beat
sees the transaction in its outgoing queue and the other sees it in its incoming
queue, you can end up with duplicate transactions. To remove the duplicates, you
can enable the `ignore_outgoing` option on one of the servers.

For example, in the following scenario, you see a 3-server architecture
where a Beat is installed on each server. t1 is the transaction exchanged between
Server1 and Server2, and t2 is the transaction between Server2 and Server3.

image:./images/option_ignore_outgoing.png[Beats Architecture]

By default, each transaction is indexed twice because Beat2
sees both transactions. So you would see the following published transactions
(when `ignore_outgoing` is false):

- Beat1: t1
- Beat2: t1 and t2
- Beat3: t2

To avoid duplicates, you can force your Beats to send only the incoming
transactions and ignore the transactions created by the local server. So you would
see the following published transactions (when `ignore_outgoing` is true):

- Beat1: none
- Beat2: t1
- Beat3: t2

===== refresh_topology_freq

The refresh interval of the topology map in
Expand Down
7 changes: 0 additions & 7 deletions libbeat/publisher/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ type BeatPublisher struct {
Index string
Output []*outputWorker
TopologyOutput outputs.TopologyOutputer
ignoreOutgoing bool
geoLite *libgeo.GeoIP
Processors *processors.Processors

Expand Down Expand Up @@ -87,7 +86,6 @@ type ShipperConfig struct {
common.EventMetadata `config:",inline"` // Fields and tags to add to each event.
Name string `config:"name"`
RefreshTopologyFreq time.Duration `config:"refresh_topology_freq"`
Ignore_outgoing bool `config:"ignore_outgoing"`
Topology_expire int `config:"topology_expire"`
Geoip common.Geoip `config:"geoip"`

Expand Down Expand Up @@ -145,10 +143,6 @@ func (publisher *BeatPublisher) GeoLite() *libgeo.GeoIP {
return publisher.geoLite
}

func (publisher *BeatPublisher) IgnoreOutgoing() bool {
return publisher.ignoreOutgoing
}

func (publisher *BeatPublisher) Connect() Client {
atomic.AddUint32(&publisher.numClients, 1)
return newClient(publisher)
Expand Down Expand Up @@ -207,7 +201,6 @@ func (publisher *BeatPublisher) init(
processors *processors.Processors,
) error {
var err error
publisher.ignoreOutgoing = shipper.Ignore_outgoing
publisher.Processors = processors

publisher.disabled = *publishDisabled
Expand Down
9 changes: 6 additions & 3 deletions libbeat/scripts/migrate_beat_config_1_x_to_5_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def migrate_packetbeat(content):
Changes things like `interfaces:` to `packetbeat.interfaces:`
at the top level.
"""
sections = ["interfaces", "protocols", "procs", "runoptions"]
sections = ["interfaces", "protocols", "procs", "runoptions", "ignore_outgoing"]
lines = content.splitlines()
outlines = []
for line in lines:
Expand Down Expand Up @@ -61,8 +61,9 @@ def main():

with open(args.file, "r") as f:
content = f.read()
out = migrate_packetbeat(content)
out = migrate_shipper(out)
# Shipper must be migrated first for ignore_outgoing to be applied properly
out = migrate_shipper(content)
out = migrate_packetbeat(out)

if args.dry:
print(out)
Expand Down Expand Up @@ -92,6 +93,7 @@ def test_migrate_packetbeat():
ports: [53]
runoptions:
procs:
ignore_outgoing: true
"""

output = migrate_packetbeat(test)
Expand All @@ -109,6 +111,7 @@ def test_migrate_packetbeat():
ports: [53]
packetbeat.runoptions:
packetbeat.procs:
packetbeat.ignore_outgoing: true
"""


Expand Down
6 changes: 0 additions & 6 deletions libbeat/tests/system/config/mockbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ tags: [
{%- endif -%}]


# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
# ignore_outgoing: true



############################# Output ############################################

Expand Down
5 changes: 0 additions & 5 deletions metricbeat/metricbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,6 @@ metricbeat.modules:
# sub-dictionary. Default is false.
#fields_under_root: false

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true

# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
#refresh_topology_freq: 10
Expand Down
5 changes: 0 additions & 5 deletions metricbeat/tests/system/config/metricbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ tags: [
{%- endif -%}
]

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
# ignore_outgoing: true

{% if geoip_paths is not none %}
geoip:
paths: [
Expand Down
3 changes: 2 additions & 1 deletion packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func (pb *Packetbeat) init(b *beat.Beat) error {
if b.Config.Shipper.BulkQueueSize != nil {
bulkQueueSize = *b.Config.Shipper.BulkQueueSize
}
pb.Pub, err = publish.NewPublisher(b.Publisher, queueSize, bulkQueueSize)

pb.Pub, err = publish.NewPublisher(b.Publisher, queueSize, bulkQueueSize, pb.Config.IgnoreOutgoing)
if err != nil {
return fmt.Errorf("Initializing publisher failed: %v", err)
}
Expand Down
11 changes: 6 additions & 5 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
)

type Config struct {
Interfaces InterfacesConfig `config:"interfaces"`
Flows *Flows `config:"flows"`
Protocols map[string]*common.Config `config:"protocols"`
Procs procs.ProcsConfig `config:"procs"`
RunOptions droppriv.RunOptions
Interfaces InterfacesConfig `config:"interfaces"`
Flows *Flows `config:"flows"`
Protocols map[string]*common.Config `config:"protocols"`
Procs procs.ProcsConfig `config:"procs"`
IgnoreOutgoing bool `config:"ignore_outgoing"`
RunOptions droppriv.RunOptions
}

type InterfacesConfig struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,38 @@ NOTE: This setting disables automatic generation of the BPF filter. If
you use this setting, it's your responsibility to keep the BPF filters in sync with the
ports defined in the `protocols` section.

===== ignore_outgoing

If the `ignore_outgoing` option is enabled, Packetbeat ignores all the
transactions initiated from the server running Packetbeat.

This is useful when two Packetbeat instances publish the same transactions. Because one Packetbeat
sees the transaction in its outgoing queue and the other sees it in its incoming
queue, you can end up with duplicate transactions. To remove the duplicates, you
can enable the `packetbeat.ignore_outgoing` option on one of the servers.

For example, in the following scenario, you see a 3-server architecture
where a Beat is installed on each server. t1 is the transaction exchanged between
Server1 and Server2, and t2 is the transaction between Server2 and Server3.

image:./images/option_ignore_outgoing.png[Beats Architecture]

By default, each transaction is indexed twice because Beat2
sees both transactions. So you would see the following published transactions
(when `ignore_outgoing` is false):

- Beat1: t1
- Beat2: t1 and t2
- Beat3: t2

To avoid duplicates, you can force your Beats to send only the incoming
transactions and ignore the transactions created by the local server. So you would
see the following published transactions (when `ignore_outgoing` is true):

- Beat1: none
- Beat2: t1
- Beat3: t2


[[configuration-flows]]
=== Flows Configuration
Expand Down
4 changes: 4 additions & 0 deletions packetbeat/etc/beat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,7 @@ packetbeat.protocols.nfs:
# - process: app
# cmdline_grep: gunicorn

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#packetbeat.ignore_outgoing: true
9 changes: 4 additions & 5 deletions packetbeat/packetbeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,10 @@ packetbeat.protocols.nfs:
# - process: app
# cmdline_grep: gunicorn

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#packetbeat.ignore_outgoing: true

#================================ General =====================================

Expand All @@ -468,11 +472,6 @@ packetbeat.protocols.nfs:
# sub-dictionary. Default is false.
#fields_under_root: false

# Uncomment the following if you want to ignore transactions created
# by the server on which the shipper is installed. This option is useful
# to remove duplicates if shippers are installed on multiple servers.
#ignore_outgoing: true

# How often (in seconds) shippers are publishing their IPs to the topology map.
# The default is 10 seconds.
#refresh_topology_freq: 10
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type TopologyProvider interface {
IsPublisherIP(ip string) bool
GetServerName(ip string) string
GeoLite() *libgeo.GeoIP
IgnoreOutgoing() bool
}

func (t *ChanTransactions) PublishTransaction(event common.MapStr) bool {
Expand All @@ -58,6 +57,7 @@ var debugf = logp.MakeDebug("publish")
func NewPublisher(
pub publisher.Publisher,
hwm, bulkHWM int,
ignoreOutgoing bool,
) (*PacketbeatPublisher, error) {
topo, ok := pub.(TopologyProvider)
if !ok {
Expand All @@ -68,7 +68,7 @@ func NewPublisher(
pub: pub,
topo: topo,
geoLite: topo.GeoLite(),
ignoreOutgoing: topo.IgnoreOutgoing(),
ignoreOutgoing: ignoreOutgoing,
client: pub.Connect(),
done: make(chan struct{}),
trans: make(chan common.MapStr, hwm),
Expand Down
6 changes: 3 additions & 3 deletions packetbeat/publish/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestFilterEvent(t *testing.T) {

func TestDirectionOut(t *testing.T) {
publisher := newTestPublisher([]string{"192.145.2.4"})
ppub, _ := NewPublisher(publisher, 1000, 1)
ppub, _ := NewPublisher(publisher, 1000, 1, false)

event := common.MapStr{
"src": &common.Endpoint{
Expand All @@ -89,7 +89,7 @@ func TestDirectionOut(t *testing.T) {

func TestDirectionIn(t *testing.T) {
publisher := newTestPublisher([]string{"192.145.2.5"})
ppub, _ := NewPublisher(publisher, 1000, 1)
ppub, _ := NewPublisher(publisher, 1000, 1, false)

event := common.MapStr{
"src": &common.Endpoint{
Expand Down Expand Up @@ -121,7 +121,7 @@ func newTestPublisher(ips []string) *publisher.BeatPublisher {

func TestNoDirection(t *testing.T) {
publisher := newTestPublisher([]string{"192.145.2.6"})
ppub, _ := NewPublisher(publisher, 1000, 1)
ppub, _ := NewPublisher(publisher, 1000, 1, false)

event := common.MapStr{
"src": &common.Endpoint{
Expand Down
Loading