Skip to content

Commit

Permalink
fix(kamel log): Use integration name for looking up containers
Browse files Browse the repository at this point in the history
Use integration name for looking up containers as a fallback if no
container could be found.

If no container could be identified even with the integration name,
use the first container for the log, assuming its the "main" container.

Fixes apache#347
  • Loading branch information
rhuss committed Jan 21, 2019
1 parent 92362f3 commit f366a15
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 24 deletions.
12 changes: 7 additions & 5 deletions pkg/util/log/annotation_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ import (
type SelectorScraper struct {
client kubernetes.Interface
namespace string
integrationName string
labelSelector string
podScrapers sync.Map
counter uint64
}

// NewSelectorScraper creates a new SelectorScraper
func NewSelectorScraper(client kubernetes.Interface, namespace string, labelSelector string) *SelectorScraper {
func NewSelectorScraper(client kubernetes.Interface, namespace string, integrationName string, labelSelector string) *SelectorScraper {
return &SelectorScraper{
client: client,
namespace: namespace,
integrationName: integrationName,
labelSelector: labelSelector,
}
}
Expand Down Expand Up @@ -122,17 +124,17 @@ func (s *SelectorScraper) synchronize(ctx context.Context, out *bufio.Writer) er
return nil
}

func (s *SelectorScraper) addPodScraper(ctx context.Context, name string, out *bufio.Writer) {
podScraper := NewPodScraper(s.client, s.namespace, name)
func (s *SelectorScraper) addPodScraper(ctx context.Context, podName string, out *bufio.Writer) {
podScraper := NewPodScraper(s.client, s.namespace, podName, s.integrationName)
podCtx, podCancel := context.WithCancel(ctx)
id := atomic.AddUint64(&s.counter, 1)
prefix := "[" + strconv.FormatUint(id, 10) + "] "
podReader := podScraper.Start(podCtx)
s.podScrapers.Store(name, podCancel)
s.podScrapers.Store(podName, podCancel)
go func() {
defer podCancel()

if _, err := out.WriteString(prefix + "Monitoring pod " + name); err != nil {
if _, err := out.WriteString(prefix + "Monitoring pod " + podName); err != nil {
logrus.Error("Cannot write to output: ", err)
return
}
Expand Down
43 changes: 25 additions & 18 deletions pkg/util/log/pod_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,19 @@ var commonUserContainerNames = map[string]bool{

// PodScraper scrapes logs of a specific pod
type PodScraper struct {
namespace string
name string
client kubernetes.Interface
namespace string
podName string
integrationName string
client kubernetes.Interface
}

// NewPodScraper creates a new pod scraper
func NewPodScraper(c kubernetes.Interface, namespace string, name string) *PodScraper {
func NewPodScraper(c kubernetes.Interface, namespace string, podName string, integrationName string) *PodScraper {
return &PodScraper{
namespace: namespace,
name: name,
client: c,
namespace: namespace,
podName: podName,
integrationName: integrationName,
client: c,
}
}

Expand All @@ -69,7 +71,7 @@ func (s *PodScraper) Start(ctx context.Context) *bufio.Reader {
}

func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientCloser func() error) {
containerName, err := s.waitForPodRunning(ctx, s.namespace, s.name)
containerName, err := s.waitForPodRunning(ctx, s.namespace, s.podName, s.integrationName)
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
Expand All @@ -78,7 +80,7 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos
Follow: true,
Container: containerName,
}
byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.name, &logOptions).Context(ctx).Stream()
byteReader, err := s.client.CoreV1().Pods(s.namespace).GetLogs(s.podName, &logOptions).Context(ctx).Stream()
if err != nil {
s.handleAndRestart(ctx, err, 5*time.Second, out, clientCloser)
return
Expand Down Expand Up @@ -106,18 +108,18 @@ func (s *PodScraper) doScrape(ctx context.Context, out *bufio.Writer, clientClos

func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.Duration, out *bufio.Writer, clientCloser func() error) {
if err != nil {
logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.name))
logrus.Warn(errors.Wrap(err, "error caught during log scraping for pod "+s.podName))
}

if ctx.Err() != nil {
logrus.Debug("Pod ", s.name, " will no longer be monitored")
logrus.Debug("Pod ", s.podName, " will no longer be monitored")
if err := clientCloser(); err != nil {
logrus.Warn("Unable to close the client", err)
}
return
}

logrus.Debug("Retrying to scrape pod ", s.name, " logs in ", wait.Seconds(), " seconds...")
logrus.Debug("Retrying to scrape pod ", s.podName, " logs in ", wait.Seconds(), " seconds...")
select {
case <-time.After(wait):
break
Expand All @@ -133,14 +135,14 @@ func (s *PodScraper) handleAndRestart(ctx context.Context, err error, wait time.

// waitForPodRunning waits for a given pod to reach the running state.
// It may return the internal container to watch if present
func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, name string) (string, error) {
func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, podName string, integrationName string) (string, error) {
pod := v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: v1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: podName,
Namespace: namespace,
},
}
Expand Down Expand Up @@ -181,29 +183,34 @@ func (s *PodScraper) waitForPodRunning(ctx context.Context, namespace string, na
}

if recvPod != nil && recvPod.Status.Phase == v1.PodRunning {
return s.chooseContainer(recvPod), nil
return s.chooseContainer(recvPod, integrationName), nil
}
} else if e.Type == watch.Deleted || e.Type == watch.Error {
return "", errors.New("unable to watch pod " + s.name)
return "", errors.New("unable to watch pod " + s.podName)
}
case <-time.After(30 * time.Second):
return "", errors.New("no state change after 30 seconds for pod " + s.name)
return "", errors.New("no state change after 30 seconds for pod " + s.podName)
}
}

}

func (s *PodScraper) chooseContainer(p *v1.Pod) string {
func (s *PodScraper) chooseContainer(p *v1.Pod, integrationName string) string {
if p != nil {
if len(p.Spec.Containers) == 1 {
// Let Kubernetes auto-detect
return ""
}
// Fallback to first container name
containerNameFound := p.Spec.Containers[0].Name
for _, c := range p.Spec.Containers {
if _, ok := commonUserContainerNames[c.Name]; ok {
return c.Name
} else if c.Name == integrationName {
containerNameFound = integrationName
}
}
return containerNameFound
}
return ""
}
2 changes: 1 addition & 1 deletion pkg/util/log/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Print prints integrations logs to the stdout
func Print(ctx context.Context, client kubernetes.Interface, integration *v1alpha1.Integration) error {
scraper := NewSelectorScraper(client, integration.Namespace, "camel.apache.org/integration="+integration.Name)
scraper := NewSelectorScraper(client, integration.Namespace, integration.Name,"camel.apache.org/integration="+integration.Name)
reader := scraper.Start(ctx)

if _, err := io.Copy(os.Stdout, ioutil.NopCloser(reader)); err != nil {
Expand Down

0 comments on commit f366a15

Please sign in to comment.