Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #21457 to 7.x: Remove nil-zero metrics and linux-exclusive metrics from Metricbeat #21597

Merged
merged 2 commits into from
Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Move service config under metrics and simplify metric types. {pull}18691[18691]
- Fix ECS compliance of user.id field in system/users metricset {pull}19019[19019]
- Rename googlecloud stackdriver metricset to metrics. {pull}19718[19718]
- Remove "invalid zero" metrics on Windows and Darwin, don't report linux-only memory and diskio metrics when running under agent. {pull}21457[21457]

*Packetbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
sigar "github.com/elastic/gosigar"
)

// mapping fields which output by `iostat -x` on linux
// IOMetric contains mapping fields which are outputed by `iostat -x` on linux
//
// Device: rrqm/s wrqm/s r/s w/s rsec/s wsec/s avgrq-sz avgqu-sz await r_await w_await svctm %util
// sda 0.06 0.78 0.09 0.27 9.42 8.06 48.64 0.00 1.34 0.99 1.45 0.77 0.03
type DiskIOMetric struct {
type IOMetric struct {
ReadRequestMergeCountPerSec float64 `json:"rrqmCps"`
WriteRequestMergeCountPerSec float64 `json:"wrqmCps"`
ReadRequestCountPerSec float64 `json:"rrqCps"`
Expand All @@ -46,8 +46,9 @@ type DiskIOMetric struct {
BusyPct float64 `json:"busy"`
}

type DiskIOStat struct {
// IOStat carries disk statistics for all devices
type IOStat struct {
lastDiskIOCounters map[string]disk.IOCountersStat
lastCpu sigar.Cpu
curCpu sigar.Cpu
lastCPU sigar.Cpu
curCPU sigar.Cpu
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
"github.com/elastic/beats/v7/libbeat/metric/system/cpu"
)

func Get_CLK_TCK() uint32 {
// GetCLKTCK emulates the _SC_CLK_TCK syscall
func GetCLKTCK() uint32 {
// return uint32(C.sysconf(C._SC_CLK_TCK))
// NOTE: _SC_CLK_TCK should be fetched from sysconf using cgo
return uint32(100)
Expand All @@ -38,77 +39,78 @@ func IOCounters(names ...string) (map[string]disk.IOCountersStat, error) {
}

// NewDiskIOStat :init DiskIOStat object.
func NewDiskIOStat() *DiskIOStat {
return &DiskIOStat{
func NewDiskIOStat() *IOStat {
return &IOStat{
lastDiskIOCounters: map[string]disk.IOCountersStat{},
}
}

// OpenSampling creates current cpu sampling
// need call as soon as get IOCounters.
func (stat *DiskIOStat) OpenSampling() error {
return stat.curCpu.Get()
func (stat *IOStat) OpenSampling() error {
return stat.curCPU.Get()
}

// CalIOStatistics calculates IO statistics.
func (stat *DiskIOStat) CalIOStatistics(result *DiskIOMetric, counter disk.IOCountersStat) error {
// CalcIOStatistics calculates IO statistics.
func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, error) {
var last disk.IOCountersStat
var ok bool

// if last counter not found, create one and return all 0
if last, ok = stat.lastDiskIOCounters[counter.Name]; !ok {
stat.lastDiskIOCounters[counter.Name] = counter
return nil
return IOMetric{}, nil
}

// calculate the delta ms between the CloseSampling and OpenSampling
deltams := 1000.0 * float64(stat.curCpu.Total()-stat.lastCpu.Total()) / float64(cpu.NumCores) / float64(Get_CLK_TCK())
deltams := 1000.0 * float64(stat.curCPU.Total()-stat.lastCPU.Total()) / float64(cpu.NumCores) / float64(GetCLKTCK())
if deltams <= 0 {
return errors.New("The delta cpu time between close sampling and open sampling is less or equal to 0")
return IOMetric{}, errors.New("The delta cpu time between close sampling and open sampling is less or equal to 0")
}

rd_ios := counter.ReadCount - last.ReadCount
rd_merges := counter.MergedReadCount - last.MergedReadCount
rd_bytes := counter.ReadBytes - last.ReadBytes
rd_ticks := counter.ReadTime - last.ReadTime
wr_ios := counter.WriteCount - last.WriteCount
wr_merges := counter.MergedWriteCount - last.MergedWriteCount
wr_bytes := counter.WriteBytes - last.WriteBytes
wr_ticks := counter.WriteTime - last.WriteTime
rdIOs := counter.ReadCount - last.ReadCount
rdMerges := counter.MergedReadCount - last.MergedReadCount
rdBytes := counter.ReadBytes - last.ReadBytes
rdTicks := counter.ReadTime - last.ReadTime
wrIOs := counter.WriteCount - last.WriteCount
wrMerges := counter.MergedWriteCount - last.MergedWriteCount
wrBytes := counter.WriteBytes - last.WriteBytes
wrTicks := counter.WriteTime - last.WriteTime
ticks := counter.IoTime - last.IoTime
aveq := counter.WeightedIO - last.WeightedIO
n_ios := rd_ios + wr_ios
n_ticks := rd_ticks + wr_ticks
n_bytes := rd_bytes + wr_bytes
nIOs := rdIOs + wrIOs
nTicks := rdTicks + wrTicks
nBytes := rdBytes + wrBytes
size := float64(0)
wait := float64(0)
svct := float64(0)

if n_ios > 0 {
size = float64(n_bytes) / float64(n_ios)
wait = float64(n_ticks) / float64(n_ios)
svct = float64(ticks) / float64(n_ios)
if nIOs > 0 {
size = float64(nBytes) / float64(nIOs)
wait = float64(nTicks) / float64(nIOs)
svct = float64(ticks) / float64(nIOs)
}

queue := float64(aveq) / deltams
per_sec := func(x uint64) float64 {
perSec := func(x uint64) float64 {
return 1000.0 * float64(x) / deltams
}

result.ReadRequestMergeCountPerSec = per_sec(rd_merges)
result.WriteRequestMergeCountPerSec = per_sec(wr_merges)
result.ReadRequestCountPerSec = per_sec(rd_ios)
result.WriteRequestCountPerSec = per_sec(wr_ios)
result.ReadBytesPerSec = per_sec(rd_bytes)
result.WriteBytesPerSec = per_sec(wr_bytes)
result := IOMetric{}
result.ReadRequestMergeCountPerSec = perSec(rdMerges)
result.WriteRequestMergeCountPerSec = perSec(wrMerges)
result.ReadRequestCountPerSec = perSec(rdIOs)
result.WriteRequestCountPerSec = perSec(wrIOs)
result.ReadBytesPerSec = perSec(rdBytes)
result.WriteBytesPerSec = perSec(wrBytes)
result.AvgRequestSize = size
result.AvgQueueSize = queue
result.AvgAwaitTime = wait
if rd_ios > 0 {
result.AvgReadAwaitTime = float64(rd_ticks) / float64(rd_ios)
if rdIOs > 0 {
result.AvgReadAwaitTime = float64(rdTicks) / float64(rdIOs)
}
if wr_ios > 0 {
result.AvgWriteAwaitTime = float64(wr_ticks) / float64(wr_ios)
if wrIOs > 0 {
result.AvgWriteAwaitTime = float64(wrTicks) / float64(wrIOs)
}
result.AvgServiceTime = svct
result.BusyPct = 100.0 * float64(ticks) / deltams
Expand All @@ -117,10 +119,11 @@ func (stat *DiskIOStat) CalIOStatistics(result *DiskIOMetric, counter disk.IOCou
}

stat.lastDiskIOCounters[counter.Name] = counter
return nil
return result, nil

}

func (stat *DiskIOStat) CloseSampling() {
stat.lastCpu = stat.curCpu
// CloseSampling closes the disk sampler
func (stat *IOStat) CloseSampling() {
stat.lastCPU = stat.curCPU
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,53 +27,11 @@ import (
"github.com/stretchr/testify/assert"

sigar "github.com/elastic/gosigar"

mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/metricbeat/module/system"
)

func Test_Get_CLK_TCK(t *testing.T) {
func Test_GetCLKTCK(t *testing.T) {
//usually the tick is 100
assert.Equal(t, uint32(100), Get_CLK_TCK())
}

func TestDataNameFilter(t *testing.T) {
oldFS := system.HostFS
newFS := "_meta/testdata"
system.HostFS = &newFS
defer func() {
system.HostFS = oldFS
}()

conf := map[string]interface{}{
"module": "system",
"metricsets": []string{"diskio"},
"diskio.include_devices": []string{"sda", "sda1", "sda2"},
}

f := mbtest.NewReportingMetricSetV2Error(t, conf)
data, errs := mbtest.ReportingFetchV2Error(f)
assert.Empty(t, errs)
assert.Equal(t, 3, len(data))
}

func TestDataEmptyFilter(t *testing.T) {
oldFS := system.HostFS
newFS := "_meta/testdata"
system.HostFS = &newFS
defer func() {
system.HostFS = oldFS
}()

conf := map[string]interface{}{
"module": "system",
"metricsets": []string{"diskio"},
}

f := mbtest.NewReportingMetricSetV2Error(t, conf)
data, errs := mbtest.ReportingFetchV2Error(f)
assert.Empty(t, errs)
assert.Equal(t, 10, len(data))
assert.Equal(t, uint32(100), GetCLKTCK())
}

func TestDiskIOStat_CalIOStatistics(t *testing.T) {
Expand All @@ -85,27 +43,27 @@ func TestDiskIOStat_CalIOStatistics(t *testing.T) {
Name: "iostat",
}

stat := &DiskIOStat{
stat := &IOStat{
lastDiskIOCounters: map[string]disk.IOCountersStat{
"iostat": disk.IOCountersStat{
"iostat": {
ReadCount: 3,
WriteCount: 5,
ReadTime: 7,
WriteTime: 11,
Name: "iostat",
},
},
lastCpu: sigar.Cpu{Idle: 100},
curCpu: sigar.Cpu{Idle: 1},
lastCPU: sigar.Cpu{Idle: 100},
curCPU: sigar.Cpu{Idle: 1},
}

expected := DiskIOMetric{
expected := IOMetric{
AvgAwaitTime: 24.0 / 22.0,
AvgReadAwaitTime: 1.2,
AvgWriteAwaitTime: 1,
}
var got DiskIOMetric
err := stat.CalIOStatistics(&got, counter)

got, err := stat.CalcIOStatistics(counter)
if err != nil {
t.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ import (
)

// NewDiskIOStat :init DiskIOStat object.
func NewDiskIOStat() *DiskIOStat {
return &DiskIOStat{
func NewDiskIOStat() *IOStat {
return &IOStat{
lastDiskIOCounters: map[string]disk.IOCountersStat{},
}
}

// OpenSampling stub for linux implementation.
func (stat *DiskIOStat) OpenSampling() error {
func (stat *IOStat) OpenSampling() error {
return nil
}

// CalIOStatistics stub for linux implementation.
func (stat *DiskIOStat) CalIOStatistics(result *DiskIOMetric, counter disk.IOCountersStat) error {
return errors.New("not implemented out of linux")
// CalcIOStatistics stub for linux implementation.
func (stat *IOStat) CalcIOStatistics(rcounter disk.IOCountersStat) (IOMetric, error) {
return IOMetric{}, errors.New("not implemented out of linux")
}

// CloseSampling stub for linux implementation.
func (stat *DiskIOStat) CloseSampling() {}
func (stat *IOStat) CloseSampling() {}

// IOCounters should map functionality to disk package for linux os.
func IOCounters(names ...string) (map[string]disk.IOCountersStat, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ import (
)

// NewDiskIOStat :init DiskIOStat object.
func NewDiskIOStat() *DiskIOStat {
return &DiskIOStat{
func NewDiskIOStat() *IOStat {
return &IOStat{
lastDiskIOCounters: map[string]disk.IOCountersStat{},
}
}

// OpenSampling stub for linux implementation.
func (stat *DiskIOStat) OpenSampling() error {
func (stat *IOStat) OpenSampling() error {
return nil
}

// CalIOStatistics stub for linux implementation.
func (stat *DiskIOStat) CalIOStatistics(result *DiskIOMetric, counter disk.IOCountersStat) error {
return errors.New("iostat is not implement for Windows")
// CalcIOStatistics stub for linux implementation.
func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, error) {
return IOMetric{}, errors.New("iostat is not implement for Windows")
}

// CloseSampling stub for linux implementation.
func (stat *DiskIOStat) CloseSampling() {}
func (stat *IOStat) CloseSampling() {}

// IOCounters should map functionality to disk package for linux os.
func IOCounters(names ...string) (map[string]disk.IOCountersStat, error) {
Expand Down
Loading