Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect Docker Swarm metrics in docker input plugin #3141

Merged
merged 14 commits into from
Oct 3, 2017
13 changes: 13 additions & 0 deletions plugins/inputs/docker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
docker "github.com/docker/docker/client"
"github.com/docker/go-connections/sockets"
)
Expand All @@ -20,6 +21,9 @@ type Client interface {
ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ContainerStats(ctx context.Context, containerID string, stream bool) (types.ContainerStats, error)
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error)
TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error)
}

func NewEnvClient() (Client, error) {
Expand Down Expand Up @@ -65,3 +69,12 @@ func (c *SocketClient) ContainerStats(ctx context.Context, containerID string, s
// ContainerInspect forwards the inspect request for containerID to the
// underlying c.client and returns its result unchanged.
func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
	inspected, err := c.client.ContainerInspect(ctx, containerID)
	return inspected, err
}

// ServiceList forwards the service listing request, with the given options,
// to the underlying c.client and returns its result unchanged.
func (c *SocketClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
	services, err := c.client.ServiceList(ctx, options)
	return services, err
}

// TaskList forwards the task listing request, with the given options, to the
// underlying c.client and returns its result unchanged.
func (c *SocketClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) {
	tasks, err := c.client.TaskList(ctx, options)
	return tasks, err
}

// NodeList forwards the node listing request, with the given options, to the
// underlying c.client and returns its result unchanged.
func (c *SocketClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) {
	nodes, err := c.client.NodeList(ctx, options)
	return nodes, err
}
83 changes: 83 additions & 0 deletions plugins/inputs/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"regexp"
"strconv"
Expand All @@ -14,6 +15,7 @@ import (
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
Expand All @@ -35,6 +37,8 @@ type Docker struct {
Endpoint string
ContainerNames []string

GatherServices bool `toml:"gather_services"`

Timeout internal.Duration
PerDevice bool `toml:"perdevice"`
Total bool `toml:"total"`
Expand Down Expand Up @@ -82,6 +86,9 @@ var sampleConfig = `
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
endpoint = "unix:///var/run/docker.sock"

## Set to true to collect Swarm metrics(desired_replicas, running_replicas)
gather_services = false

## Only collect metrics for these containers, collect all if empty
container_names = []

Expand Down Expand Up @@ -160,6 +167,13 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
acc.AddError(err)
}

if d.GatherServices {
err := d.gatherSwarmInfo(acc)
if err != nil {
acc.AddError(err)
}
}

// List containers
opts := types.ContainerListOptions{}
ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
Expand Down Expand Up @@ -187,6 +201,75 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
return nil
}

// gatherSwarmInfo reports per-service Swarm task counts as "docker_swarm"
// metrics.
//
// Services are listed first; tasks and nodes are only listed when at least
// one service exists. Every reported service carries the tags service_id,
// service_name and service_mode, and the fields tasks_running and
// tasks_desired:
//   - replicated services: tasks_desired is the configured replica count;
//   - global services: tasks_desired is the number of tasks of that service
//     whose desired state is not shutdown.
//
// A task is counted as running only when its state is running and it is
// placed on a node that is not down. All list calls share one context bounded
// by d.Timeout; the first error from any of them is returned unchanged.
func (d *Docker) gatherSwarmInfo(acc telegraf.Accumulator) error {
	ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
	defer cancel()

	services, err := d.client.ServiceList(ctx, types.ServiceListOptions{})
	if err != nil {
		return err
	}
	// Nothing to report, so skip the task and node queries entirely.
	if len(services) == 0 {
		return nil
	}

	tasks, err := d.client.TaskList(ctx, types.TaskListOptions{})
	if err != nil {
		return err
	}

	nodes, err := d.client.NodeList(ctx, types.NodeListOptions{})
	if err != nil {
		return err
	}

	// Set of node IDs whose state is anything other than "down".
	activeNodes := make(map[string]struct{}, len(nodes))
	for _, n := range nodes {
		if n.Status.State != swarm.NodeStateDown {
			activeNodes[n.ID] = struct{}{}
		}
	}

	running := make(map[string]int)
	tasksNoShutdown := make(map[string]int)
	for _, task := range tasks {
		if task.DesiredState != swarm.TaskStateShutdown {
			tasksNoShutdown[task.ServiceID]++
		}

		if task.Status.State != swarm.TaskStateRunning {
			continue
		}
		// A task still marked running on a node that is down is not
		// actually serving, so it must not inflate the running count.
		if _, nodeActive := activeNodes[task.NodeID]; nodeActive {
			running[task.ServiceID]++
		}
	}

	// One timestamp for the whole gather keeps all services aligned.
	now := time.Now()
	for _, service := range services {
		tags := map[string]string{
			"service_id":   service.ID,
			"service_name": service.Spec.Name,
		}
		fields := make(map[string]interface{}, 2)

		switch {
		case service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil:
			tags["service_mode"] = "replicated"
			fields["tasks_running"] = running[service.ID]
			fields["tasks_desired"] = *service.Spec.Mode.Replicated.Replicas
		case service.Spec.Mode.Global != nil:
			tags["service_mode"] = "global"
			fields["tasks_running"] = running[service.ID]
			fields["tasks_desired"] = tasksNoShutdown[service.ID]
		default:
			log.Printf("E! Unknown Replicas Mode")
			// There are no fields to report for this service.
			continue
		}

		acc.AddFields("docker_swarm", fields, tags, now)
	}

	return nil
}

func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
// Init vars
dataFields := make(map[string]interface{})
Expand Down
82 changes: 82 additions & 0 deletions plugins/inputs/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/influxdata/telegraf/testutil"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
"github.com/stretchr/testify/require"
)

Expand All @@ -16,6 +17,9 @@ type MockClient struct {
ContainerListF func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ContainerStatsF func(ctx context.Context, containerID string, stream bool) (types.ContainerStats, error)
ContainerInspectF func(ctx context.Context, containerID string) (types.ContainerJSON, error)
ServiceListF func(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error)
TaskListF func(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
NodeListF func(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error)
}

func (c *MockClient) Info(ctx context.Context) (types.Info, error) {
Expand Down Expand Up @@ -44,6 +48,27 @@ func (c *MockClient) ContainerInspect(
return c.ContainerInspectF(ctx, containerID)
}

// ServiceList delegates to the ServiceListF stub configured on the mock.
func (c *MockClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
	services, err := c.ServiceListF(ctx, options)
	return services, err
}

// TaskList delegates to the TaskListF stub configured on the mock.
func (c *MockClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) {
	tasks, err := c.TaskListF(ctx, options)
	return tasks, err
}

// NodeList delegates to the NodeListF stub configured on the mock.
func (c *MockClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) {
	nodes, err := c.NodeListF(ctx, options)
	return nodes, err
}

func newClient(host string, tlsConfig *tls.Config) (Client, error) {
return &MockClient{
InfoF: func(context.Context) (types.Info, error) {
Expand All @@ -58,6 +83,15 @@ func newClient(host string, tlsConfig *tls.Config) (Client, error) {
ContainerInspectF: func(context.Context, string) (types.ContainerJSON, error) {
return containerInspect, nil
},
ServiceListF: func(context.Context, types.ServiceListOptions) ([]swarm.Service, error) {
return ServiceList, nil
},
TaskListF: func(context.Context, types.TaskListOptions) ([]swarm.Task, error) {
return TaskList, nil
},
NodeListF: func(context.Context, types.NodeListOptions) ([]swarm.Node, error) {
return NodeList, nil
},
}, nil
}

Expand Down Expand Up @@ -227,6 +261,15 @@ func TestDocker_WindowsMemoryContainerStats(t *testing.T) {
ContainerInspectF: func(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return containerInspect, nil
},
ServiceListF: func(context.Context, types.ServiceListOptions) ([]swarm.Service, error) {
return ServiceList, nil
},
TaskListF: func(context.Context, types.TaskListOptions) ([]swarm.Task, error) {
return TaskList, nil
},
NodeListF: func(context.Context, types.NodeListOptions) ([]swarm.Node, error) {
return NodeList, nil
},
}, nil
},
}
Expand Down Expand Up @@ -436,3 +479,42 @@ func TestDockerGatherInfo(t *testing.T) {
},
)
}

// TestDockerGatherSwarmInfo checks that gatherSwarmInfo emits the expected
// docker_swarm metrics for the replicated and the global fixture services.
func TestDockerGatherSwarmInfo(t *testing.T) {
	var acc testutil.Accumulator
	d := Docker{
		newClient: newClient,
	}

	// GatherServices is left unset, so Gather itself does not collect swarm
	// metrics; it is run first, presumably to set up d.client via newClient.
	err := acc.GatherError(d.Gather)
	require.NoError(t, err)

	// A failure here must fail the test instead of being silently dropped.
	err = d.gatherSwarmInfo(&acc)
	require.NoError(t, err)

	// test docker_swarm measurement for the replicated service
	acc.AssertContainsTaggedFields(t,
		"docker_swarm",
		map[string]interface{}{
			"tasks_running": int(2),
			"tasks_desired": uint64(2),
		},
		map[string]string{
			"service_id":   "qolkls9g5iasdiuihcyz9rnx2",
			"service_name": "test1",
			"service_mode": "replicated",
		},
	)

	// test docker_swarm measurement for the global service
	acc.AssertContainsTaggedFields(t,
		"docker_swarm",
		map[string]interface{}{
			"tasks_running": int(1),
			"tasks_desired": int(1),
		},
		map[string]string{
			"service_id":   "qolkls9g5iasdiuihcyz9rn3",
			"service_name": "test2",
			"service_mode": "global",
		},
	)
}
74 changes: 74 additions & 0 deletions plugins/inputs/docker/docker_testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/api/types/swarm"
)

var info = types.Info{
Expand Down Expand Up @@ -133,6 +134,79 @@ var containerList = []types.Container{
},
}

// two backs the Replicas pointer of the replicated fixture service below.
var two uint64 = 2

// ServiceList is the swarm service fixture: one replicated service ("test1")
// with two desired replicas and one global service ("test2").
var ServiceList = []swarm.Service{
	{
		ID: "qolkls9g5iasdiuihcyz9rnx2",
		Spec: swarm.ServiceSpec{
			Annotations: swarm.Annotations{Name: "test1"},
			Mode: swarm.ServiceMode{
				Replicated: &swarm.ReplicatedService{Replicas: &two},
			},
		},
	},
	{
		ID: "qolkls9g5iasdiuihcyz9rn3",
		Spec: swarm.ServiceSpec{
			Annotations: swarm.Annotations{Name: "test2"},
			Mode: swarm.ServiceMode{
				Global: &swarm.GlobalService{},
			},
		},
	},
}

// TaskList is the swarm task fixture: two running tasks for the first
// fixture service and one running task for the second, all on the same node.
var TaskList = []swarm.Task{
	{
		ID:           "kwh0lv7hwwbh",
		ServiceID:    "qolkls9g5iasdiuihcyz9rnx2",
		NodeID:       "0cl4jturcyd1ks3fwpd010kor",
		Status:       swarm.TaskStatus{State: swarm.TaskStateRunning},
		DesiredState: swarm.TaskStateRunning,
	},
	{
		ID:           "u78m5ojbivc3",
		ServiceID:    "qolkls9g5iasdiuihcyz9rnx2",
		NodeID:       "0cl4jturcyd1ks3fwpd010kor",
		Status:       swarm.TaskStatus{State: swarm.TaskStateRunning},
		DesiredState: swarm.TaskStateRunning,
	},
	{
		ID:           "1n1uilkhr98l",
		ServiceID:    "qolkls9g5iasdiuihcyz9rn3",
		NodeID:       "0cl4jturcyd1ks3fwpd010kor",
		Status:       swarm.TaskStatus{State: swarm.TaskStateRunning},
		DesiredState: swarm.TaskStateRunning,
	},
}

// NodeList is the swarm node fixture: two entries, both in the ready state.
// NOTE(review): both entries carry the same ID, which looks like a copy-paste
// leftover; confirm whether the second node should have a distinct ID.
var NodeList = []swarm.Node{
	{
		ID:     "0cl4jturcyd1ks3fwpd010kor",
		Status: swarm.NodeStatus{State: swarm.NodeStateReady},
	},
	{
		ID:     "0cl4jturcyd1ks3fwpd010kor",
		Status: swarm.NodeStatus{State: swarm.NodeStateReady},
	},
}

func containerStats() types.ContainerStats {
var stat types.ContainerStats
jsonStat := `
Expand Down