Skip to content

Commit

Permalink
Interface out log writer for docker/podman formats
Browse files Browse the repository at this point in the history
- Interface the different container runtimes.
- Adds GRPC TLS flags. Allow the grpc client to either:
  - Use a cert specified by the CERT_FILE env var
  - Use an insecure GRPC connection using the INSECURE env var
  - Use the system's default certs t connect to the GRPC server.
  • Loading branch information
kleesc committed Nov 26, 2020
1 parent ee757ce commit ee2d41f
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 81 deletions.
50 changes: 25 additions & 25 deletions buildctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ const (

// Context represents the internal state of a build.
type Context struct {
client rpc.Client
writer containerclient.LogWriter
client rpc.Client
writer containerclient.LogWriter
containerClient containerclient.Client
args *rpc.BuildArgs
metadata *dockerfile.Metadata
buildpackDir string
buildID string
cacheTag string
args *rpc.BuildArgs
metadata *dockerfile.Metadata
buildpackDir string
buildID string
cacheTag string
}

// New connects to the docker daemon and sets up the initial state of a build
Expand All @@ -49,10 +49,10 @@ func New(client rpc.Client, args *rpc.BuildArgs, dockerHost, containerRuntime st
log.Infof("connected to docker host: %s", dockerHost)

return &Context{
client: client,
writer: containerclient.NewRPCWriter(client),
client: client,
writer: containerclient.NewRPCWriter(client, containerRuntime),
containerClient: containerClient,
args: args,
args: args,
}, nil
}

Expand Down Expand Up @@ -199,10 +199,10 @@ func primeCache(w containerclient.LogWriter, containerClient containerclient.Cli
err := retryDockerRequest(w, func() error {
return containerClient.PullImage(
containerclient.PullImageOptions{
Repository: args.FullRepoName(),
Registry: args.Registry,
Tag: cachedTag,
OutputStream: w,
Repository: args.FullRepoName(),
Registry: args.Registry,
Tag: cachedTag,
OutputStream: w,
},
containerclient.AuthConfiguration{
Username: "$token",
Expand All @@ -225,10 +225,10 @@ func pullBaseImage(w containerclient.LogWriter, containerClient containerclient.
}

pullOptions := containerclient.PullImageOptions{
Registry: args.Registry,
Repository: df.BaseImage,
Tag: df.BaseImageTag,
OutputStream: w,
Registry: args.Registry,
Repository: df.BaseImage,
Tag: df.BaseImageTag,
OutputStream: w,
}

// Only pull the base image with auth when it is in our own registry.
Expand Down Expand Up @@ -289,9 +289,9 @@ func pushBuiltImage(w containerclient.LogWriter, containerClient containerclient
for _, tagName := range args.TagNames {
// Setup tag options.
tagOptions := containerclient.TagImageOptions{
Repository: args.FullRepoName(),
Tag: tagName,
Force: true,
Repository: args.FullRepoName(),
Tag: tagName,
Force: true,
}

// Tag the image.
Expand All @@ -310,10 +310,10 @@ func pushBuiltImage(w containerclient.LogWriter, containerClient containerclient
err = retryDockerRequest(w, func() error {
return containerClient.PushImage(
containerclient.PushImageOptions{
Repository: args.FullRepoName(),
Registry: args.Registry,
Tag: tagName,
OutputStream: w,
Repository: args.FullRepoName(),
Registry: args.Registry,
Tag: tagName,
OutputStream: w,
},
containerclient.AuthConfiguration{
Username: "$token",
Expand Down
20 changes: 12 additions & 8 deletions cmd/quay-builder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"context"
"crypto/tls"
"os"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -24,16 +26,14 @@ func main() {
containerRuntime := os.Getenv("CONTAINER_RUNTIME")
dockerHost := os.Getenv("DOCKER_HOST")
token := os.Getenv("TOKEN")
endpoint := os.Getenv("ENDPOINT")
server := os.Getenv("SERVER")
certFile := os.Getenv("TLS_CERT_PATH")
insecure := os.Getenv("INSECURE")

log.Infof("starting quay-builder: %s", version.Version)

if endpoint == "" && server == "" {
log.Fatal("missing or empty ENDPOINT and SERVER env vars: one is required")
} else if endpoint == "" {
endpoint = server + "/b1/buildmanager"
if server == "" {
log.Fatal("missing or empty SERVER env vars: required format <host>:<port>")
}

if dockerHost == "" {
Expand All @@ -50,14 +50,18 @@ func main() {
log.Fatalf("invalid TLS config: %s", err)
}
opts = append(opts, grpc.WithTransportCredentials(tlsCfg))
} else {
} else if strings.ToLower(insecure) == "true" {
opts = append(opts, grpc.WithInsecure())
} else {
// Load the default system certs
tlsCfg := credentials.NewTLS(&tls.Config{})
opts = append(opts, grpc.WithTransportCredentials(tlsCfg))
}

// Attempt to connect to gRPC server (blocking)
log.Infof("connecting to gRPC server...: %s", endpoint)
log.Infof("connecting to gRPC server...: %s", server)
opts = append(opts, grpc.WithBlock(), grpc.WithTimeout(connectTimeout))
conn, err := grpc.Dial(endpoint, opts...)
conn, err := grpc.Dial(server, opts...)
if err != nil {
log.Fatalf("failed to dial grpc server: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,6 @@ const (
pushingStatus = "Pushing"
)

// LogWriter represents anything that can stream Docker logs from the daemon
// and check if any error has occured.
type LogWriter interface {
// ErrResponse returns an error that occurred from Docker and resets the
// state of that internal error value to nil. If there is no error, returns
// false as part of the tuple.
ErrResponse() (error, bool)

// ResetError throws away any error state from previously streamed logs.
ResetError()

io.Writer
}

// partialBuffer represents a buffer of data that was unable to be previously
// serialized because it was not enough data was provided to form valid JSON.
type partialBuffer []byte
Expand All @@ -49,25 +35,17 @@ func (pb *partialBuffer) getAndEmpty(in []byte) (ret []byte) {
return
}

// RPCWriter implements a Writer that consumes encoded JSON data and buffers it
// DockerRPCWriter implements a RPCWriter that consumes encoded JSON data and buffers it
// until it has a valid JSON object and then logs it to an rpc.Client.
type RPCWriter struct {
type DockerRPCWriter struct {
client rpc.Client
errResponse *Response
partialBuffer *partialBuffer
hasPartialBuffer bool
}

// NewRPCWriter allocates a new Writer that streams logs via an RPC client.
func NewRPCWriter(client rpc.Client) LogWriter {
return &RPCWriter{
client: client,
partialBuffer: new(partialBuffer),
}
}

// Write implements the io.Writer interface for RPCWriter.
func (w *RPCWriter) Write(p []byte) (n int, err error) {
func (w *DockerRPCWriter) Write(p []byte) (n int, err error) {
originalLength := len(p)

// Note: Sometimes Docker returns to us only the beginning of a stream,
Expand Down Expand Up @@ -135,7 +113,7 @@ func (w *RPCWriter) Write(p []byte) (n int, err error) {

// ErrResponse returns an error that occurred from Docker and then calls
// ResetError().
func (w *RPCWriter) ErrResponse() (error, bool) {
func (w *DockerRPCWriter) ErrResponse() (error, bool) {
err := w.errResponse
w.ResetError()

Expand All @@ -147,25 +125,10 @@ func (w *RPCWriter) ErrResponse() (error, bool) {
}

// ResetError throws away any error state from previously streamed logs.
func (w *RPCWriter) ResetError() {
func (w *DockerRPCWriter) ResetError() {
w.errResponse = nil
}

// Response represents a response from a Docker™ daemon.
type Response struct {
Error string `json:"error,omitempty"`
Stream string `json:"stream,omitempty"`
Status string `json:"status,omitempty"`
ID string `json:"id,omitempty"`
ProgressDetail progressDetail `json:"progressDetail,omitempty"`
}

// progressDetail represents the progress made by a Docker™ command.
type progressDetail struct {
Current int `json:"current,omitempty"`
Total int `json:"total,omitempty"`
}

type filter struct {
lastSent *Response
}
Expand Down
65 changes: 59 additions & 6 deletions containerclient/interface.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package containerclient

import (
"fmt"
"io"
"strings"

log "github.com/sirupsen/logrus"

"github.com/quay/quay-builder/rpc"
)

type BuildImageOptions struct {
Expand Down Expand Up @@ -74,15 +77,65 @@ type Client interface {
PruneImages(PruneImagesOptions) (*PruneImagesResults, error)
}

func NewClient(host, runtime string) (Client, error) {
runtime = strings.ToLower(runtime)
if runtime != "docker" && runtime != "podman" {
return nil, fmt.Errorf("Invalid container runtime: %s", runtime)
func NewClient(host, containerRuntime string) (Client, error) {
containerRuntime = strings.ToLower(containerRuntime)
if containerRuntime != "docker" && containerRuntime != "podman" {
log.Fatal("Invalid container runtime:", containerRuntime)
}

if runtime == "docker" {
if containerRuntime == "docker" {
return NewDockerClient(host)
} else {
return NewPodmanClient(host)
}
}

// LogWriter represents anything that can stream Docker logs from the daemon
// and check if any error has occured.
type LogWriter interface {
// ErrResponse returns an error that occurred from Docker and resets the
// state of that internal error value to nil. If there is no error, returns
// false as part of the tuple.
ErrResponse() (error, bool)

// ResetError throws away any error state from previously streamed logs.
ResetError()

io.Writer
}

// NewRPCWriter allocates a new Writer that streams logs via an RPC client.
func NewRPCWriter(client rpc.Client, containerRuntime string) LogWriter {
containerRuntime = strings.ToLower(containerRuntime)
if containerRuntime != "docker" && containerRuntime != "podman" {
log.Fatal("Invalid container runtime:", containerRuntime)
}

if containerRuntime == "docker" {
return &DockerRPCWriter{
client: client,
partialBuffer: new(partialBuffer),
}
} else {
return &PodmanRPCWriter{
client: client,
partialBuffer: new(partialBuffer),
}
}

}

// Response represents a response from a Docker™ daemon or podman.
type Response struct {
Error string `json:"error,omitempty"`
Stream string `json:"stream,omitempty"`
Status string `json:"status,omitempty"`
ID string `json:"id,omitempty"`
ProgressDetail progressDetail `json:"progressDetail,omitempty"`
}

// progressDetail represents the progress made by a Docker™ command.
type progressDetail struct {
Current int `json:"current,omitempty"`
Total int `json:"total,omitempty"`
}
55 changes: 55 additions & 0 deletions containerclient/podman_log_write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package containerclient

import (
"encoding/json"

log "github.com/sirupsen/logrus"

"github.com/quay/quay-builder/rpc"
)

// PodmanRPCWriter implements a RPCWriter.
// Unlike the Docker daemon, Podman's build call outputs plain string, and not JSON encoded data,
// so we need to serialize each line into a Response struct before logging it to an rpc.Client.
type PodmanRPCWriter struct {
client rpc.Client
errResponse *Response
partialBuffer *partialBuffer
hasPartialBuffer bool
}

// Write implements the io.Writer interface for RPCWriter.
func (w *PodmanRPCWriter) Write(p []byte) (n int, err error) {
// Unlike docker, libpod parses the JSON encoded data from stream before writing the output,
// without the option of returning the raw data instead.
// Instead of decoding the stream into a Response, we set the Response's "Stream" before
// marshaling it into JSON to be logged.
originalLength := len(p)

var m Response
m.Stream = string(p)

jsonData, err := json.Marshal(&m)
if err != nil {
log.Fatalf("Error when marshaling logs: %v", err)
}

err = w.client.PublishBuildLogEntry(string(jsonData))
if err != nil {
log.Fatalf("Failed to publish log entry: %v", err)
}

return originalLength, nil
}

func (w *PodmanRPCWriter) ErrResponse() (error, bool) {
// libpod already parses the JSON stream before writing to output.
// So the error would not be returned from the output stream,. but as
// the return value of the API call instead.
// See https://github.com/containers/podman/blob/master/pkg/bindings/images/build.go#L175
return nil, false
}

func (w *PodmanRPCWriter) ResetError() {
w.errResponse = nil
}

0 comments on commit ee2d41f

Please sign in to comment.