Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/driver: docker progress detection and monitoring #4192

Merged
merged 7 commits into from
May 7, 2018
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ IMPROVEMENTS:
* command: Add -short option to init command that emits a minimal
jobspec [[GH-4239](https://github.com/hashicorp/nomad/issues/4239)]
* discovery: Support Consul gRPC health checks. [[GH-4251](https://github.com/hashicorp/nomad/issues/4251)]
* driver/docker: Add progress monitoring and inactivity detection to docker
image pulls [[GH-4192](https://github.com/hashicorp/nomad/issues/4192)]

## 0.8.3 (April 27, 2018)

Expand Down
3 changes: 2 additions & 1 deletion client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1546,7 +1546,8 @@ func (d *DockerDriver) pullImage(driverConfig *DockerDriverConfig, client *docke

d.emitEvent("Downloading image %s:%s", repo, tag)
coordinator, callerID := d.getDockerCoordinator(client)
return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID)

return coordinator.PullImage(driverConfig.ImageName, authOptions, callerID, d.emitEvent)
}

// authBackend encapsulates a function that resolves registry credentials.
Expand Down
76 changes: 73 additions & 3 deletions client/driver/docker_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ var (
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)

const (
// dockerPullProgressEmitInterval is the interval at which the pull progress
// is emitted to the allocation
dockerPullProgressEmitInterval = 2 * time.Minute
)

// pullFuture is a sharable future for retrieving a pulled images ID and any
// error that may have occurred during the pull.
type pullFuture struct {
Expand Down Expand Up @@ -98,6 +104,14 @@ type dockerCoordinator struct {
// only have one request be sent to Docker
pullFutures map[string]*pullFuture

// pullLoggers is used to track the LogEventFn for each alloc pulling an image.
// If multiple alloc's are attempting to pull the same image, each will need
// to register its own LogEventFn with the coordinator.
pullLoggers map[string][]LogEventFn

// pullLoggerLock is used to sync access to the pullLoggers map
pullLoggerLock sync.RWMutex

// imageRefCount is the reference count of image IDs
imageRefCount map[string]map[string]struct{}

Expand All @@ -114,6 +128,7 @@ func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
return &dockerCoordinator{
dockerCoordinatorConfig: config,
pullFutures: make(map[string]*pullFuture),
pullLoggers: make(map[string][]LogEventFn),
imageRefCount: make(map[string]map[string]struct{}),
deleteFuture: make(map[string]context.CancelFunc),
}
Expand All @@ -130,10 +145,11 @@ func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {

// PullImage is used to pull an image. It returns the pulled imaged ID or an
// error that occurred during the pull
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string) (imageID string, err error) {
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string, emitFn LogEventFn) (imageID string, err error) {
// Get the future
d.imageLock.Lock()
future, ok := d.pullFutures[image]
d.registerPullLogger(image, emitFn)
if !ok {
// Make the future
future = newPullFuture()
Expand Down Expand Up @@ -166,22 +182,41 @@ func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConf
// pullImageImpl is the implementation of pulling an image. The results are
// returned via the passed future
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) {
defer d.clearPullLogger(image)
// Parse the repo and tag
repo, tag := docker.ParseRepositoryTag(image)
if tag == "" {
tag = "latest"
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wouldn't cancel the context created below

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@schmichael convinced me that canceling a parent context will cancel any child contexts.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed this when I removed the pull deadline


pm := newImageProgressManager(image, cancel, d.handlePullInactivity,
d.handlePullProgressReport, d.handleSlowPullProgressReport)
defer pm.stop()

pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
Repository: repo,
Tag: tag,
OutputStream: pm,
RawJSONStream: true,
Context: ctx,
}

// Attempt to pull the image
var auth docker.AuthConfiguration
if authOptions != nil {
auth = *authOptions
}

err := d.client.PullImage(pullOptions, auth)

if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded {
d.logger.Printf("[ERR] driver.docker: timeout pulling container %s:%s", repo, tag)
future.set("", recoverablePullError(ctxErr, image))
return
}

if err != nil {
d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
future.set("", recoverablePullError(err, image))
Expand Down Expand Up @@ -337,6 +372,41 @@ func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {
d.imageLock.Unlock()
}

func (d *dockerCoordinator) registerPullLogger(image string, logger LogEventFn) {
d.pullLoggerLock.Lock()
defer d.pullLoggerLock.Unlock()
if _, ok := d.pullLoggers[image]; !ok {
d.pullLoggers[image] = []LogEventFn{}
}
d.pullLoggers[image] = append(d.pullLoggers[image], logger)
}

func (d *dockerCoordinator) clearPullLogger(image string) {
d.pullLoggerLock.Lock()
defer d.pullLoggerLock.Unlock()
delete(d.pullLoggers, image)
}

func (d *dockerCoordinator) emitEvent(image, message string, args ...interface{}) {
d.pullLoggerLock.RLock()
defer d.pullLoggerLock.RUnlock()
for i := range d.pullLoggers[image] {
go d.pullLoggers[image][i](message, args...)
}
}

func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp time.Time) {
d.logger.Printf("[ERR] driver.docker: image %s pull aborted due to inactivity, last message recevieved at [%s]: %s", image, timestamp.String(), msg)
}

func (d *dockerCoordinator) handlePullProgressReport(image, msg string, _ time.Time) {
d.logger.Printf("[DEBUG] driver.docker: image %s pull progress: %s", image, msg)
}

func (d *dockerCoordinator) handleSlowPullProgressReport(image, msg string, _ time.Time) {
d.emitEvent(image, "Docker image %s pull progress: %s", image, msg)
}

// recoverablePullError wraps the error gotten when trying to pull and image if
// the error is recoverable.
func recoverablePullError(err error, image string) error {
Expand Down
10 changes: 5 additions & 5 deletions client/driver/docker_coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
id := ""
for i := 0; i < 10; i++ {
go func() {
id, _ = coordinator.PullImage(image, nil, uuid.Generate())
id, _ = coordinator.PullImage(image, nil, uuid.Generate(), nil)
}()
}

Expand Down Expand Up @@ -112,7 +112,7 @@ func TestDockerCoordinator_Pull_Remove(t *testing.T) {
callerIDs := make([]string, 10, 10)
for i := 0; i < 10; i++ {
callerIDs[i] = uuid.Generate()
id, _ = coordinator.PullImage(image, nil, callerIDs[i])
id, _ = coordinator.PullImage(image, nil, callerIDs[i], nil)
}

// Check the reference count
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
callerID := uuid.Generate()

// Pull image
id, _ := coordinator.PullImage(image, nil, callerID)
id, _ := coordinator.PullImage(image, nil, callerID, nil)

// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
Expand All @@ -189,7 +189,7 @@ func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
}

// Pull image again within delay
id, _ = coordinator.PullImage(image, nil, callerID)
id, _ = coordinator.PullImage(image, nil, callerID, nil)

// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestDockerCoordinator_No_Cleanup(t *testing.T) {
callerID := uuid.Generate()

// Pull image
id, _ := coordinator.PullImage(image, nil, callerID)
id, _ := coordinator.PullImage(image, nil, callerID, nil)

// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 0 {
Expand Down
Loading