Skip to content

Commit

Permalink
Merge pull request #856 from stokito/events_filters
Browse files Browse the repository at this point in the history
Events filters
  • Loading branch information
fsouza authored Jan 5, 2021
2 parents f9ebdbc + d87c203 commit ac3178d
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 15 deletions.
66 changes: 53 additions & 13 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,48 @@ package docker
import (
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net"
"net/http"
"net/http/httputil"
"strconv"
"sync"
"sync/atomic"
"time"
)

// EventsOptions to filter events
// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details.
type EventsOptions struct {
// Show events created since this timestamp then stream new events.
Since string

// Show events created until this timestamp then stop streaming.
Until string

// Filter for events. For example:
// map[string][]string{"type": {"container"}, "event": {"start", "die"}}
// will return events when container was started and stopped or killed
//
// Available filters:
// config=<string> config name or ID
// container=<string> container name or ID
// daemon=<string> daemon name or ID
// event=<string> event type
// image=<string> image name or ID
// label=<string> image or container label
// network=<string> network name or ID
// node=<string> node ID
// plugin= plugin name or ID
// scope= local or swarm
// secret=<string> secret name or ID
// service=<string> service name or ID
// type=<string> container, image, volume, network, daemon, plugin, node, service, secret or config
// volume=<string> volume name
Filters map[string][]string
}

// APIEvents represents events coming from the Docker API
// The fields in the Docker API changed in API version 1.22, and
// events for more than images and containers are now fired off.
Expand Down Expand Up @@ -93,9 +124,17 @@ var (
//
// The parameter is a channel through which events will be sent.
func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
return c.AddEventListenerWithOptions(EventsOptions{}, listener)
}

// AddEventListener adds a new listener to container events in the Docker API.
// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details.
//
// The listener parameter is a channel through which events will be sent.
func (c *Client) AddEventListenerWithOptions(options EventsOptions, listener chan<- *APIEvents) error {
var err error
if !c.eventMonitor.isEnabled() {
err = c.eventMonitor.enableEventMonitoring(c)
err = c.eventMonitor.enableEventMonitoring(c, options)
if err != nil {
return err
}
Expand Down Expand Up @@ -165,15 +204,15 @@ func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
return false
}

func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
func (eventState *eventMonitoringState) enableEventMonitoring(c *Client, opts EventsOptions) error {
eventState.Lock()
defer eventState.Unlock()
if !eventState.enabled {
eventState.enabled = true
atomic.StoreInt64(&eventState.lastSeen, 0)
eventState.C = make(chan *APIEvents, 100)
eventState.errC = make(chan error, 1)
go eventState.monitorEvents(c)
go eventState.monitorEvents(c, opts)
}
return nil
}
Expand All @@ -193,7 +232,7 @@ func (eventState *eventMonitoringState) disableEventMonitoring() {
}
}

func (eventState *eventMonitoringState) monitorEvents(c *Client) {
func (eventState *eventMonitoringState) monitorEvents(c *Client, opts EventsOptions) {
const (
noListenersTimeout = 5 * time.Second
noListenersInterval = 10 * time.Millisecond
Expand All @@ -213,7 +252,7 @@ func (eventState *eventMonitoringState) monitorEvents(c *Client) {
return
}

if err = eventState.connectWithRetry(c); err != nil {
if err = eventState.connectWithRetry(c, opts); err != nil {
// terminate if connect failed
eventState.disableEventMonitoring()
return
Expand All @@ -236,7 +275,7 @@ func (eventState *eventMonitoringState) monitorEvents(c *Client) {
eventState.disableEventMonitoring()
return
} else if err != nil {
defer func() { go eventState.monitorEvents(c) }()
defer func() { go eventState.monitorEvents(c, opts) }()
return
}
case <-timeout:
Expand All @@ -245,21 +284,21 @@ func (eventState *eventMonitoringState) monitorEvents(c *Client) {
}
}

func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
func (eventState *eventMonitoringState) connectWithRetry(c *Client, opts EventsOptions) error {
var retries int
eventState.RLock()
eventChan := eventState.C
errChan := eventState.errC
eventState.RUnlock()
err := c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
err := c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
for ; err != nil && retries < maxMonitorConnRetries; retries++ {
waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
time.Sleep(time.Duration(waitTime) * time.Millisecond)
eventState.RLock()
eventChan = eventState.C
errChan = eventState.errC
eventState.RUnlock()
err = c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
err = c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
}
return err
}
Expand Down Expand Up @@ -304,11 +343,12 @@ func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
}
}

func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
uri := "/events"
func (c *Client) eventHijack(opts EventsOptions, startTime int64, eventChan chan *APIEvents, errChan chan error) error {
// on reconnect override initial Since with last event seen time
if startTime != 0 {
uri += fmt.Sprintf("?since=%d", startTime)
opts.Since = strconv.FormatInt(startTime, 10)
}
uri := "/events?" + queryString(opts)
protocol := c.endpointURL.Scheme
address := c.endpointURL.Path
if protocol != "unix" && protocol != "npipe" {
Expand Down
8 changes: 6 additions & 2 deletions event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,12 @@ func testEventListeners(testName string, t *testing.T, buildServer func(http.Han
t.Error(err)
}
}()

err = client.AddEventListener(listener)
filters := map[string][]string{
"type": {"container"},
"event": {"create", "destroy", "start", "stop", "pull", "attach"},
}
opts := EventsOptions{Since: "1374067970", Until: "1442421700", Filters: filters}
err = client.AddEventListenerWithOptions(opts, listener)
if err != nil {
t.Errorf("Failed to add event listener: %s", err)
}
Expand Down

0 comments on commit ac3178d

Please sign in to comment.