Skip to content

Commit

Permalink
Collect Docker Swarm service metrics in docker input plugin (#3141)
Browse files Browse the repository at this point in the history
  • Loading branch information
adityacs authored and danielnelson committed Oct 3, 2017
1 parent 9257f3b commit 308f4af
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 0 deletions.
13 changes: 13 additions & 0 deletions plugins/inputs/docker/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
docker "github.com/docker/docker/client"
"github.com/docker/go-connections/sockets"
)
Expand All @@ -20,6 +21,9 @@ type Client interface {
ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ContainerStats(ctx context.Context, containerID string, stream bool) (types.ContainerStats, error)
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error)
TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error)
}

func NewEnvClient() (Client, error) {
Expand Down Expand Up @@ -65,3 +69,12 @@ func (c *SocketClient) ContainerStats(ctx context.Context, containerID string, s
// ContainerInspect forwards the inspect request for containerID to the
// wrapped docker client and hands its result back unchanged.
func (c *SocketClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
	inspected, err := c.client.ContainerInspect(ctx, containerID)
	return inspected, err
}
// ServiceList forwards the service listing request to the wrapped docker
// client and hands its result back unchanged.
func (c *SocketClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
	services, err := c.client.ServiceList(ctx, options)
	return services, err
}
// TaskList forwards the task listing request to the wrapped docker client
// and hands its result back unchanged.
func (c *SocketClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) {
	tasks, err := c.client.TaskList(ctx, options)
	return tasks, err
}
// NodeList forwards the node listing request to the wrapped docker client
// and hands its result back unchanged.
func (c *SocketClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) {
	nodes, err := c.client.NodeList(ctx, options)
	return nodes, err
}
83 changes: 83 additions & 0 deletions plugins/inputs/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"regexp"
"strconv"
Expand All @@ -14,6 +15,7 @@ import (
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
Expand All @@ -35,6 +37,8 @@ type Docker struct {
Endpoint string
ContainerNames []string

GatherServices bool `toml:"gather_services"`

Timeout internal.Duration
PerDevice bool `toml:"perdevice"`
Total bool `toml:"total"`
Expand Down Expand Up @@ -82,6 +86,9 @@ var sampleConfig = `
## To use environment variables (ie, docker-machine), set endpoint = "ENV"
endpoint = "unix:///var/run/docker.sock"
## Set to true to collect Swarm metrics(desired_replicas, running_replicas)
gather_services = false
## Only collect metrics for these containers, collect all if empty
container_names = []
Expand Down Expand Up @@ -160,6 +167,13 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
acc.AddError(err)
}

if d.GatherServices {
err := d.gatherSwarmInfo(acc)
if err != nil {
acc.AddError(err)
}
}

// List containers
opts := types.ContainerListOptions{}
ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
Expand Down Expand Up @@ -187,6 +201,75 @@ func (d *Docker) Gather(acc telegraf.Accumulator) error {
return nil
}

// gatherSwarmInfo reports one "docker_swarm" point per Swarm service, tagged
// with service_id, service_name and service_mode and carrying the fields
// tasks_running and tasks_desired.
//
// Services, tasks and nodes are listed through d.client under one context
// bounded by d.Timeout; the first listing error is returned unchanged.
// Nothing is emitted when the service list is empty.
//
// tasks_desired is the configured replica count for replicated services and
// the number of tasks not marked for shutdown for global services.
func (d *Docker) gatherSwarmInfo(acc telegraf.Accumulator) error {
	ctx, cancel := context.WithTimeout(context.Background(), d.Timeout.Duration)
	defer cancel()

	services, err := d.client.ServiceList(ctx, types.ServiceListOptions{})
	if err != nil {
		return err
	}
	if len(services) == 0 {
		return nil
	}

	tasks, err := d.client.TaskList(ctx, types.TaskListOptions{})
	if err != nil {
		return err
	}

	nodes, err := d.client.NodeList(ctx, types.NodeListOptions{})
	if err != nil {
		return err
	}

	// Set of node IDs whose state is anything other than "down".
	activeNodes := make(map[string]struct{}, len(nodes))
	for _, n := range nodes {
		if n.Status.State != swarm.NodeStateDown {
			activeNodes[n.ID] = struct{}{}
		}
	}

	running := make(map[string]int, len(services))
	tasksNoShutdown := make(map[string]int, len(services))
	for _, task := range tasks {
		if task.DesiredState != swarm.TaskStateShutdown {
			tasksNoShutdown[task.ServiceID]++
		}

		// Only tasks placed on a non-down node count as running.
		// NOTE(review): intended to match the replica count computed by the
		// docker CLI's service listing — confirm against that implementation.
		if _, nodeActive := activeNodes[task.NodeID]; nodeActive && task.Status.State == swarm.TaskStateRunning {
			running[task.ServiceID]++
		}
	}

	for _, service := range services {
		now := time.Now()
		tags := map[string]string{
			"service_id":   service.ID,
			"service_name": service.Spec.Name,
		}
		fields := make(map[string]interface{}, 2)

		mode := service.Spec.Mode
		switch {
		case mode.Replicated != nil && mode.Replicated.Replicas != nil:
			tags["service_mode"] = "replicated"
			fields["tasks_running"] = running[service.ID]
			fields["tasks_desired"] = *mode.Replicated.Replicas
		case mode.Global != nil:
			tags["service_mode"] = "global"
			fields["tasks_running"] = running[service.ID]
			fields["tasks_desired"] = tasksNoShutdown[service.ID]
		default:
			// Neither mode is set: there is nothing meaningful to report, so
			// skip the service instead of emitting a point with no fields.
			log.Printf("E! Unknown Replicas Mode")
			continue
		}

		acc.AddFields("docker_swarm", fields, tags, now)
	}

	return nil
}

func (d *Docker) gatherInfo(acc telegraf.Accumulator) error {
// Init vars
dataFields := make(map[string]interface{})
Expand Down
82 changes: 82 additions & 0 deletions plugins/inputs/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/influxdata/telegraf/testutil"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
"github.com/stretchr/testify/require"
)

Expand All @@ -16,6 +17,9 @@ type MockClient struct {
ContainerListF func(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
ContainerStatsF func(ctx context.Context, containerID string, stream bool) (types.ContainerStats, error)
ContainerInspectF func(ctx context.Context, containerID string) (types.ContainerJSON, error)
ServiceListF func(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error)
TaskListF func(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error)
NodeListF func(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error)
}

func (c *MockClient) Info(ctx context.Context) (types.Info, error) {
Expand Down Expand Up @@ -44,6 +48,27 @@ func (c *MockClient) ContainerInspect(
return c.ContainerInspectF(ctx, containerID)
}

// ServiceList implements the Client method of the same name by invoking the
// ServiceListF stub configured on the mock.
func (c *MockClient) ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) {
	stub := c.ServiceListF
	return stub(ctx, options)
}

// TaskList implements the Client method of the same name by invoking the
// TaskListF stub configured on the mock.
func (c *MockClient) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) {
	stub := c.TaskListF
	return stub(ctx, options)
}

// NodeList implements the Client method of the same name by invoking the
// NodeListF stub configured on the mock.
func (c *MockClient) NodeList(ctx context.Context, options types.NodeListOptions) ([]swarm.Node, error) {
	stub := c.NodeListF
	return stub(ctx, options)
}

func newClient(host string, tlsConfig *tls.Config) (Client, error) {
return &MockClient{
InfoF: func(context.Context) (types.Info, error) {
Expand All @@ -58,6 +83,15 @@ func newClient(host string, tlsConfig *tls.Config) (Client, error) {
ContainerInspectF: func(context.Context, string) (types.ContainerJSON, error) {
return containerInspect, nil
},
ServiceListF: func(context.Context, types.ServiceListOptions) ([]swarm.Service, error) {
return ServiceList, nil
},
TaskListF: func(context.Context, types.TaskListOptions) ([]swarm.Task, error) {
return TaskList, nil
},
NodeListF: func(context.Context, types.NodeListOptions) ([]swarm.Node, error) {
return NodeList, nil
},
}, nil
}

Expand Down Expand Up @@ -227,6 +261,15 @@ func TestDocker_WindowsMemoryContainerStats(t *testing.T) {
ContainerInspectF: func(ctx context.Context, containerID string) (types.ContainerJSON, error) {
return containerInspect, nil
},
ServiceListF: func(context.Context, types.ServiceListOptions) ([]swarm.Service, error) {
return ServiceList, nil
},
TaskListF: func(context.Context, types.TaskListOptions) ([]swarm.Task, error) {
return TaskList, nil
},
NodeListF: func(context.Context, types.NodeListOptions) ([]swarm.Node, error) {
return NodeList, nil
},
}, nil
},
}
Expand Down Expand Up @@ -436,3 +479,42 @@ func TestDockerGatherInfo(t *testing.T) {
},
)
}

// TestDockerGatherSwarmInfo checks that gatherSwarmInfo emits a "docker_swarm"
// point for both the replicated and the global service served by the mock
// client, with the expected task counts and tags.
func TestDockerGatherSwarmInfo(t *testing.T) {
	var acc testutil.Accumulator
	d := Docker{
		newClient: newClient,
	}

	// Gather runs first; presumably this is what populates d.client from
	// newClient before gatherSwarmInfo is called directly — TODO confirm.
	err := acc.GatherError(d.Gather)
	require.NoError(t, err)

	// A failure while listing services, tasks or nodes must fail the test
	// rather than surface later as a confusing missing-metric assertion.
	err = d.gatherSwarmInfo(&acc)
	require.NoError(t, err)

	// Replicated service: desired count is the configured replica count.
	acc.AssertContainsTaggedFields(t,
		"docker_swarm",
		map[string]interface{}{
			"tasks_running": int(2),
			"tasks_desired": uint64(2),
		},
		map[string]string{
			"service_id":   "qolkls9g5iasdiuihcyz9rnx2",
			"service_name": "test1",
			"service_mode": "replicated",
		},
	)

	// Global service: desired count is the number of tasks not shut down.
	acc.AssertContainsTaggedFields(t,
		"docker_swarm",
		map[string]interface{}{
			"tasks_running": int(1),
			"tasks_desired": int(1),
		},
		map[string]string{
			"service_id":   "qolkls9g5iasdiuihcyz9rn3",
			"service_name": "test2",
			"service_mode": "global",
		},
	)
}
74 changes: 74 additions & 0 deletions plugins/inputs/docker/docker_testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/registry"
"github.com/docker/docker/api/types/swarm"
)

var info = types.Info{
Expand Down Expand Up @@ -133,6 +134,79 @@ var containerList = []types.Container{
},
}

// two is the replica count referenced by pointer from the replicated service
// fixture below.
var two uint64 = 2

// ServiceList is the Swarm service fixture returned by the mock client: one
// replicated service ("test1") wanting two replicas and one global service
// ("test2").
var ServiceList = []swarm.Service{
	{
		ID: "qolkls9g5iasdiuihcyz9rnx2",
		Spec: swarm.ServiceSpec{
			Annotations: swarm.Annotations{Name: "test1"},
			Mode: swarm.ServiceMode{
				Replicated: &swarm.ReplicatedService{Replicas: &two},
			},
		},
	},
	{
		ID: "qolkls9g5iasdiuihcyz9rn3",
		Spec: swarm.ServiceSpec{
			Annotations: swarm.Annotations{Name: "test2"},
			Mode: swarm.ServiceMode{
				Global: &swarm.GlobalService{},
			},
		},
	},
}

// TaskList is the Swarm task fixture returned by the mock client: two running
// tasks for the replicated service and one running task for the global
// service, all placed on the same node.
var TaskList = []swarm.Task{
	{
		ID:           "kwh0lv7hwwbh",
		ServiceID:    "qolkls9g5iasdiuihcyz9rnx2",
		NodeID:       "0cl4jturcyd1ks3fwpd010kor",
		Status:       swarm.TaskStatus{State: "running"},
		DesiredState: "running",
	},
	{
		ID:           "u78m5ojbivc3",
		ServiceID:    "qolkls9g5iasdiuihcyz9rnx2",
		NodeID:       "0cl4jturcyd1ks3fwpd010kor",
		Status:       swarm.TaskStatus{State: "running"},
		DesiredState: "running",
	},
	{
		ID:           "1n1uilkhr98l",
		ServiceID:    "qolkls9g5iasdiuihcyz9rn3",
		NodeID:       "0cl4jturcyd1ks3fwpd010kor",
		Status:       swarm.TaskStatus{State: "running"},
		DesiredState: "running",
	},
}

// NodeList is the Swarm node fixture returned by the mock client: two entries
// in the "ready" state.
// NOTE(review): both entries carry the same ID — confirm whether that is
// intentional.
var NodeList = []swarm.Node{
	{
		ID:     "0cl4jturcyd1ks3fwpd010kor",
		Status: swarm.NodeStatus{State: "ready"},
	},
	{
		ID:     "0cl4jturcyd1ks3fwpd010kor",
		Status: swarm.NodeStatus{State: "ready"},
	},
}

func containerStats() types.ContainerStats {
var stat types.ContainerStats
jsonStat := `
Expand Down

0 comments on commit 308f4af

Please sign in to comment.