diff --git a/CHANGELOG.md b/CHANGELOG.md index e5a03067de0..16e5a54d42d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ IMPROVEMENTS: * command: Add -short option to init command that emits a minimal jobspec [[GH-4239](https://github.com/hashicorp/nomad/issues/4239)] * discovery: Support Consul gRPC health checks. [[GH-4251](https://github.com/hashicorp/nomad/issues/4251)] + * driver/docker: Add progress monitoring and inactivity detection to docker + image pulls [[GH-4192](https://github.com/hashicorp/nomad/issues/4192)] ## 0.8.3 (April 27, 2018) diff --git a/client/driver/docker.go b/client/driver/docker.go index b0b23bea522..cc31e547eae 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -1546,7 +1546,8 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke d.emitEvent("Downloading image %s:%s", repo, tag) coordinator, callerID := d.getDockerCoordinator(client) - return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID) + + return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID, d.emitEvent) } // authBackend encapsulates a function that resolves registry credentials. diff --git a/client/driver/docker_coordinator.go b/client/driver/docker_coordinator.go index ab034c45bbe..47186062433 100644 --- a/client/driver/docker_coordinator.go +++ b/client/driver/docker_coordinator.go @@ -25,6 +25,12 @@ var ( imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`) ) +const ( + // dockerPullProgressEmitInterval is the interval at which the pull progress + // is emitted to the allocation + dockerPullProgressEmitInterval = 2 * time.Minute +) + // pullFuture is a sharable future for retrieving a pulled images ID and any // error that may have occurred during the pull. type pullFuture struct { @@ -98,6 +104,14 @@ type dockerCoordinator struct { // only have one request be sent to Docker pullFutures map[string]*pullFuture + // pullLoggers is used to track the LogEventFn for each alloc pulling an image. + // If multiple alloc's are attempting to pull the same image, each will need + // to register its own LogEventFn with the coordinator. + pullLoggers map[string][]LogEventFn + + // pullLoggerLock is used to sync access to the pullLoggers map + pullLoggerLock sync.RWMutex + // imageRefCount is the reference count of image IDs imageRefCount map[string]map[string]struct{} @@ -114,6 +128,7 @@ func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { return &dockerCoordinator{ dockerCoordinatorConfig: config, pullFutures: make(map[string]*pullFuture), + pullLoggers: make(map[string][]LogEventFn), imageRefCount: make(map[string]map[string]struct{}), deleteFuture: make(map[string]context.CancelFunc), } @@ -130,10 +145,11 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator { // PullImage is used to pull an image. It returns the pulled imaged ID or an // error that occurred during the pull -func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string) (imageID string, err error) { +func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string, emitFn LogEventFn) (imageID string, err error) { // Get the future d.imageLock.Lock() future, ok := d.pullFutures[image] + d.registerPullLogger(image, emitFn) if !ok { // Make the future future = newPullFuture() @@ -166,14 +182,25 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf // pullImageImpl is the implementation of pulling an image. The results are // returned via the passed future func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) { + defer d.clearPullLogger(image) // Parse the repo and tag repo, tag := docker.ParseRepositoryTag(image) if tag == "" { tag = "latest" } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pm := newImageProgressManager(image, cancel, d.handlePullInactivity, + d.handlePullProgressReport, d.handleSlowPullProgressReport) + defer pm.stop() + pullOptions := docker.PullImageOptions{ - Repository: repo, - Tag: tag, + Repository: repo, + Tag: tag, + OutputStream: pm, + RawJSONStream: true, + Context: ctx, } // Attempt to pull the image @@ -181,7 +208,15 @@ func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.Auth if authOptions != nil { auth = *authOptions } + err := d.client.PullImage(pullOptions, auth) + + if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded { + d.logger.Printf("[ERR] driver.docker: timeout pulling container %s:%s", repo, tag) + future.set("", recoverablePullError(ctxErr, image)) + return + } + if err != nil { d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err) future.set("", recoverablePullError(err, image)) @@ -337,6 +372,41 @@ func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) { d.imageLock.Unlock() } +func (d *dockerCoordinator) registerPullLogger(image string, logger LogEventFn) { + d.pullLoggerLock.Lock() + defer d.pullLoggerLock.Unlock() + if _, ok := d.pullLoggers[image]; !ok { + d.pullLoggers[image] = []LogEventFn{} + } + d.pullLoggers[image] = append(d.pullLoggers[image], logger) +} + +func (d *dockerCoordinator) clearPullLogger(image string) { + d.pullLoggerLock.Lock() + defer d.pullLoggerLock.Unlock() + delete(d.pullLoggers, image) +} + +func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{}) { + d.pullLoggerLock.RLock() + defer d.pullLoggerLock.RUnlock() + for i := range d.pullLoggers[image] { + go d.pullLoggers[image][i](message, args...) + } +} + +func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp time.Time) { + d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg) +} + +func (d *dockerCoordinator) handlePullProgressReport(image, msg string, _ time.Time) { + d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg) +} + +func (d *dockerCoordinator) handleSlowPullProgressReport(image, msg string, _ time.Time) { + d.emitEvent(image, "Docker image %s pull progress: %s", image, msg) +} + // recoverablePullError wraps the error gotten when trying to pull and image if // the error is recoverable. func recoverablePullError(err error, image string) error { diff --git a/client/driver/docker_coordinator_test.go b/client/driver/docker_coordinator_test.go index 1df049b28e6..c81cee99b9e 100644 --- a/client/driver/docker_coordinator_test.go +++ b/client/driver/docker_coordinator_test.go @@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) { id := "" for i := 0; i < 10; i++ { go func() { - id, _ = coordinator.PullImage(image, nil, uuid.Generate()) + id, _ = coordinator.PullImage(image, nil, uuid.Generate(), nil) }() } @@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) { callerIDs := make([]string, 10, 10) for i := 0; i < 10; i++ { callerIDs[i] = uuid.Generate() - id, _ = coordinator.PullImage(image, nil, callerIDs[i]) + id, _ = coordinator.PullImage(image, nil, callerIDs[i], nil) } // Check the reference count @@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, callerID) + id, _ := coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) { } // Pull image again within delay - id, _ = coordinator.PullImage(image, nil, callerID) + id, _ = coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 1 { @@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) { callerID := uuid.Generate() // Pull image - id, _ := coordinator.PullImage(image, nil, callerID) + id, _ := coordinator.PullImage(image, nil, callerID, nil) // Check the reference count if references := coordinator.imageRefCount[id]; len(references) != 0 { diff --git a/client/driver/docker_progress.go b/client/driver/docker_progress.go new file mode 100644 index 00000000000..912c2d4a884 --- /dev/null +++ b/client/driver/docker_progress.go @@ -0,0 +1,282 @@ +package driver + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "sync" + "time" + + "github.com/docker/docker/pkg/jsonmessage" + units "github.com/docker/go-units" +) + +const ( + // dockerPullActivityDeadline is the default value set in the imageProgressManager + // when newImageProgressManager is called + dockerPullActivityDeadline = 2 * time.Minute + + // dockerImageProgressReportInterval is the default value set in the + // imageProgressManager when newImageProgressManager is called + dockerImageProgressReportInterval = 10 * time.Second + + // dockerImageSlowProgressReportInterval is the default value set in the + // imageProgressManager when newImageProgressManager is called + dockerImageSlowProgressReportInterval = 2 * time.Minute +) + +// layerProgress tracks the state and downloaded bytes of a single layer within +// a docker image +type layerProgress struct { + id string + status layerProgressStatus + currentBytes int64 + totalBytes int64 +} + +type layerProgressStatus int + +const ( + layerProgressStatusUnknown layerProgressStatus = iota + layerProgressStatusStarting + layerProgressStatusWaiting + layerProgressStatusDownloading + layerProgressStatusVerifying + layerProgressStatusDownloaded + layerProgressStatusExtracting + layerProgressStatusComplete + layerProgressStatusExists +) + +func lpsFromString(status string) layerProgressStatus { + switch status { + case "Pulling fs layer": + return layerProgressStatusStarting + case "Waiting": + return layerProgressStatusWaiting + case "Downloading": + return layerProgressStatusDownloading + case "Verifying Checksum": + return layerProgressStatusVerifying + case "Download complete": + return layerProgressStatusDownloaded + case "Extracting": + return layerProgressStatusExtracting + case "Pull complete": + return layerProgressStatusComplete + case "Already exists": + return layerProgressStatusExists + default: + return layerProgressStatusUnknown + } +} + +// imageProgress tracks the status of each child layer as its pulled from a +// docker image repo +type imageProgress struct { + sync.RWMutex + lastMessage *jsonmessage.JSONMessage + timestamp time.Time + layers map[string]*layerProgress + pullStart time.Time +} + +// get returns a status message and the timestamp of the last status update +func (p *imageProgress) get() (string, time.Time) { + p.RLock() + defer p.RUnlock() + + if p.lastMessage == nil { + return "No progress", p.timestamp + } + + var pulled, pulling, waiting int + for _, l := range p.layers { + switch { + case l.status == layerProgressStatusStarting || + l.status == layerProgressStatusWaiting: + waiting++ + case l.status == layerProgressStatusDownloading || + l.status == layerProgressStatusVerifying: + pulling++ + case l.status >= layerProgressStatusDownloaded: + pulled++ + } + } + + elapsed := time.Now().Sub(p.pullStart) + cur := p.currentBytes() + total := p.totalBytes() + var est int64 + if cur != 0 { + est = (elapsed.Nanoseconds() / cur * total) - elapsed.Nanoseconds() + } + + return fmt.Sprintf("Pulled %d/%d (%s/%s) layers: %d waiting/%d pulling - est %.1fs remaining", + pulled, len(p.layers), units.BytesSize(float64(cur)), units.BytesSize(float64(total)), + waiting, pulling, time.Duration(est).Seconds()), p.timestamp +} + +// set takes a status message received from the docker engine api during an image +// pull and updates the status of the corresponding layer +func (p *imageProgress) set(msg *jsonmessage.JSONMessage) { + p.Lock() + defer p.Unlock() + + p.lastMessage = msg + p.timestamp = time.Now() + + lps := lpsFromString(msg.Status) + if lps == layerProgressStatusUnknown { + return + } + + layer, ok := p.layers[msg.ID] + if !ok { + layer = &layerProgress{id: msg.ID} + p.layers[msg.ID] = layer + } + layer.status = lps + if msg.Progress != nil && lps == layerProgressStatusDownloading { + layer.currentBytes = msg.Progress.Current + layer.totalBytes = msg.Progress.Total + } else if lps == layerProgressStatusDownloaded { + layer.currentBytes = layer.totalBytes + } +} + +// currentBytes iterates through all image layers and sums the total of +// current bytes. The caller is responsible for acquiring a read lock on the +// imageProgress struct +func (p *imageProgress) currentBytes() int64 { + var b int64 + for _, l := range p.layers { + b += l.currentBytes + } + return b +} + +// totalBytes iterates through all image layers and sums the total of +// total bytes. The caller is responsible for acquiring a read lock on the +// imageProgress struct +func (p *imageProgress) totalBytes() int64 { + var b int64 + for _, l := range p.layers { + b += l.totalBytes + } + return b +} + +// progressReporterFunc defines the method for handling inactivity and report +// events from the imageProgressManager. The image name, current status message +// and timestamp of last received status update are passed in. +type progressReporterFunc func(image string, msg string, timestamp time.Time) + +// imageProgressManager tracks the progress of pulling a docker image from an +// image repository. +// It also implemented the io.Writer interface so as to be passed to the docker +// client pull image method in order to receive status updates from the docker +// engine api. +type imageProgressManager struct { + imageProgress *imageProgress + image string + activityDeadline time.Duration + inactivityFunc progressReporterFunc + reportInterval time.Duration + reporter progressReporterFunc + slowReportInterval time.Duration + slowReporter progressReporterFunc + lastSlowReport time.Time + cancel context.CancelFunc + stopCh chan struct{} + buf bytes.Buffer +} + +func newImageProgressManager( + image string, cancel context.CancelFunc, + inactivityFunc, reporter, slowReporter progressReporterFunc) *imageProgressManager { + + pm := &imageProgressManager{ + image: image, + activityDeadline: dockerPullActivityDeadline, + inactivityFunc: inactivityFunc, + reportInterval: dockerImageProgressReportInterval, + reporter: reporter, + slowReportInterval: dockerImageSlowProgressReportInterval, + slowReporter: slowReporter, + imageProgress: &imageProgress{ + timestamp: time.Now(), + layers: make(map[string]*layerProgress), + }, + cancel: cancel, + stopCh: make(chan struct{}), + } + + pm.start() + return pm +} + +// start intiates the ticker to trigger the inactivity and reporter handlers +func (pm *imageProgressManager) start() { + now := time.Now() + pm.imageProgress.pullStart = now + pm.lastSlowReport = now + go func() { + ticker := time.NewTicker(dockerImageProgressReportInterval) + for { + select { + case <-ticker.C: + msg, lastStatusTime := pm.imageProgress.get() + t := time.Now() + if t.Sub(lastStatusTime) > pm.activityDeadline { + pm.inactivityFunc(pm.image, msg, lastStatusTime) + pm.cancel() + return + } + if t.Sub(pm.lastSlowReport) > pm.slowReportInterval { + pm.slowReporter(pm.image, msg, lastStatusTime) + pm.lastSlowReport = t + } + pm.reporter(pm.image, msg, lastStatusTime) + case <-pm.stopCh: + return + } + } + }() +} + +func (pm *imageProgressManager) stop() { + close(pm.stopCh) +} + +func (pm *imageProgressManager) Write(p []byte) (n int, err error) { + n, err = pm.buf.Write(p) + var msg jsonmessage.JSONMessage + + for { + line, err := pm.buf.ReadBytes('\n') + if err == io.EOF { + // Partial write of line; push back onto buffer and break until full line + pm.buf.Write(line) + break + } + if err != nil { + return n, err + } + err = json.Unmarshal(line, &msg) + if err != nil { + return n, err + } + + if msg.Error != nil { + // error received from the docker engine api + return n, msg.Error + } + + pm.imageProgress.set(&msg) + } + + return +} diff --git a/client/driver/docker_progress_test.go b/client/driver/docker_progress_test.go new file mode 100644 index 00000000000..2eb2e86569b --- /dev/null +++ b/client/driver/docker_progress_test.go @@ -0,0 +1,52 @@ +package driver + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_DockerImageProgressManager(t *testing.T) { + + pm := &imageProgressManager{ + imageProgress: &imageProgress{ + timestamp: time.Now(), + layers: make(map[string]*layerProgress), + }, + } + + _, err := pm.Write([]byte(`{"status":"Pulling from library/golang","id":"1.9.5"} +{"status":"Pulling fs layer","progressDetail":{},"id":"c73ab1c6897b"} +{"status":"Pulling fs layer","progressDetail":{},"id":"1ab373b3deae"} +`)) + require.NoError(t, err) + require.Equal(t, 2, len(pm.imageProgress.layers), "number of layers should be 2") + + cur := pm.imageProgress.currentBytes() + require.Zero(t, cur) + tot := pm.imageProgress.totalBytes() + require.Zero(t, tot) + + _, err = pm.Write([]byte(`{"status":"Pulling fs layer","progress`)) + require.NoError(t, err) + require.Equal(t, 2, len(pm.imageProgress.layers), "number of layers should be 2") + + _, err = pm.Write([]byte(`Detail":{},"id":"b542772b4177"}` + "\n")) + require.NoError(t, err) + require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3") + + _, err = pm.Write([]byte(`{"status":"Downloading","progressDetail":{"current":45800,"total":4335495},"progress":"[\u003e ] 45.8kB/4.335MB","id":"b542772b4177"} +{"status":"Downloading","progressDetail":{"current":113576,"total":11108010},"progress":"[\u003e ] 113.6kB/11.11MB","id":"1ab373b3deae"} +{"status":"Downloading","progressDetail":{"current":694257,"total":4335495},"progress":"[========\u003e ] 694.3kB/4.335MB","id":"b542772b4177"}` + "\n")) + require.NoError(t, err) + require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3") + require.Equal(t, int64(807833), pm.imageProgress.currentBytes()) + require.Equal(t, int64(15443505), pm.imageProgress.totalBytes()) + + _, err = pm.Write([]byte(`{"status":"Download complete","progressDetail":{},"id":"b542772b4177"}` + "\n")) + require.NoError(t, err) + require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3") + require.Equal(t, int64(4449071), pm.imageProgress.currentBytes()) + require.Equal(t, int64(15443505), pm.imageProgress.totalBytes()) +}