Skip to content

Commit

Permalink
Include StatefulSets in application list (#249) (#255)
Browse files Browse the repository at this point in the history
Changes made in instances.go
* Get kubernetes instances including StatefulSets
* Update GetMetadata method
* Update GetDeploymentConfiguration method
* Update Logstream to read container logs from StatefulSets
Changes made in helmchart:
* include statefulsets as allowed resource for Role/ClusterRole CDR
* Update dashboard.yaml to allow statefulsets in Role/ClusterRole CDR

Signed-off-by: fabrizziocht <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
  • Loading branch information
fabrizziocht and yaron2 authored Jul 7, 2023
1 parent 8f4e17a commit 4c87fc7
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ rules:
resources: ["pods", "pods/log", "namespaces"]
verbs: ["get", "list"]
- apiGroups: ["apps"]
resources: ["deployments", "components", "configurations"]
resources: ["deployments", "components", "configurations", "statefulsets"]
verbs: ["get", "list"]
- apiGroups: ["dapr.io"]
resources: ["components", "configurations"]
Expand Down
1 change: 1 addition & 0 deletions deploy/dashboard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ rules:
"components",
"configurations",
"namespaces",
"statefulsets",
]
verbs: ["get", "list"]
---
Expand Down
221 changes: 214 additions & 7 deletions pkg/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,41 @@ func (i *instances) GetContainers(scope string, id string) []string {
ctx := context.Background()
if i.kubeClient != nil {
resp, err := i.kubeClient.AppsV1().Deployments(scope).List(ctx, (meta_v1.ListOptions{}))
if err != nil || len(resp.Items) == 0 {
if err != nil {
return nil
}
if len(resp.Items) > 0 {
for _, d := range resp.Items {
if d.Spec.Template.Annotations[daprEnabledAnnotation] != "" {
daprID := d.Spec.Template.Annotations[daprIDAnnotation]
if daprID == id {
pods, err := i.kubeClient.CoreV1().Pods(d.GetNamespace()).List(ctx, meta_v1.ListOptions{
LabelSelector: labels.SelectorFromSet(d.Spec.Selector.MatchLabels).String(),
})
if err != nil {
log.Println(err)
return nil
}

for _, d := range resp.Items {
if len(pods.Items) > 0 {
p := pods.Items[0]
out := []string{}
for _, container := range p.Spec.Containers {
out = append(out, container.Name)
}
return out
}
}
}
}
}

respSts, errSts := i.kubeClient.AppsV1().StatefulSets(scope).List(ctx, (meta_v1.ListOptions{}))

if errSts != nil || len(resp.Items) == 0 {
return nil
}
for _, d := range respSts.Items {
if d.Spec.Template.Annotations[daprEnabledAnnotation] != "" {
daprID := d.Spec.Template.Annotations[daprIDAnnotation]
if daprID == id {
Expand All @@ -155,6 +185,7 @@ func (i *instances) GetContainers(scope string, id string) []string {
}
}
}

}
return nil
}
Expand Down Expand Up @@ -209,6 +240,51 @@ func (i *instances) GetLogStream(scope, id, containerName string) ([]io.ReadClos
}
}
}
respSts, errSts := i.kubeClient.AppsV1().StatefulSets(scope).List(ctx, (meta_v1.ListOptions{}))
if errSts != nil {
return nil, errSts
}
for _, d := range respSts.Items {
if d.Spec.Template.Annotations[daprEnabledAnnotation] != "" {
daprID := d.Spec.Template.Annotations[daprIDAnnotation]
if daprID == id {
pods, err := i.kubeClient.CoreV1().Pods(d.GetNamespace()).List(ctx, meta_v1.ListOptions{
LabelSelector: labels.SelectorFromSet(d.Spec.Selector.MatchLabels).String(),
})
if err != nil {
return nil, err
}

var logstreams []io.ReadCloser

for _, p := range pods.Items {
name := p.ObjectMeta.Name

for _, container := range p.Spec.Containers {
if container.Name == containerName {
var tailLines int64 = 100
options := v1.PodLogOptions{}
options.Container = container.Name
options.Timestamps = true
options.TailLines = &tailLines
if len(pods.Items) == 1 {
options.Follow = true
} else {
options.Follow = false // this is necessary to show logs from multiple replicas
}

res := i.kubeClient.CoreV1().Pods(p.ObjectMeta.Namespace).GetLogs(name, &options)
stream, streamErr := res.Stream(ctx)
if streamErr == nil {
logstreams = append(logstreams, stream)
}
}
}
}
return logstreams, nil
}
}
}
}
return nil, fmt.Errorf("could not find logstream for %v, %v, %v", scope, id, containerName)
}
Expand All @@ -218,11 +294,69 @@ func (i *instances) GetDeploymentConfiguration(scope string, id string) string {
ctx := context.Background()
if i.kubeClient != nil {
resp, err := i.kubeClient.AppsV1().Deployments(scope).List(ctx, (meta_v1.ListOptions{}))
if err != nil || len(resp.Items) == 0 {
if err != nil {
return ""
}
if len(resp.Items) > 0 {
for _, d := range resp.Items {
if d.Spec.Template.Annotations[daprEnabledAnnotation] != "" {
daprID := d.Spec.Template.Annotations[daprIDAnnotation]
if daprID == id {
pods, err := i.kubeClient.CoreV1().Pods(d.GetNamespace()).List(ctx, meta_v1.ListOptions{
LabelSelector: labels.SelectorFromSet(d.Spec.Selector.MatchLabels).String(),
})
if err != nil {
log.Println(err)
return ""
}

for _, d := range resp.Items {
if len(pods.Items) > 0 {
p := pods.Items[0]

name := p.ObjectMeta.Name
nspace := p.ObjectMeta.Namespace

restClient := i.kubeClient.CoreV1().RESTClient()
if err != nil {
log.Println(err)
return ""
}

url := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s", nspace, name)
data, err := restClient.Get().RequestURI(url).Stream(ctx)
if err != nil {
log.Println(err)
return ""
}

buf := new(bytes.Buffer)
_, err = buf.ReadFrom(data)
if err != nil {
log.Println(err)
return ""
}
dataStr := buf.String()
j := []byte(dataStr)
y, err := yaml.JSONToYAML(j)
if err != nil {
log.Println(err)
return ""
}

return string(y)
}
}
}
}
}

respSts, errSts := i.kubeClient.AppsV1().StatefulSets(scope).List(ctx, (meta_v1.ListOptions{}))

if errSts != nil || len(respSts.Items) == 0 {
return ""
}

for _, d := range respSts.Items {
if d.Spec.Template.Annotations[daprEnabledAnnotation] != "" {
daprID := d.Spec.Template.Annotations[daprIDAnnotation]
if daprID == id {
Expand Down Expand Up @@ -272,7 +406,6 @@ func (i *instances) GetDeploymentConfiguration(scope string, id string) string {
}
}
}

}
return ""
}
Expand Down Expand Up @@ -373,11 +506,38 @@ func (i *instances) GetMetadata(scope string, id string) MetadataOutput {
var secondaryUrl []string
if i.kubeClient != nil {
resp, err := i.kubeClient.AppsV1().Deployments(scope).List(ctx, (meta_v1.ListOptions{}))
if err != nil || len(resp.Items) == 0 {
if err != nil {
return MetadataOutput{}
}
if len(resp.Items) > 0 {
for _, d := range resp.Items {
if d.Spec.Template.Annotations[daprEnabledAnnotation] != "" {
daprID := d.Spec.Template.Annotations[daprIDAnnotation]
if daprID == id {
pods, err := i.kubeClient.CoreV1().Pods(d.GetNamespace()).List(ctx, meta_v1.ListOptions{
LabelSelector: labels.SelectorFromSet(d.Spec.Selector.MatchLabels).String(),
})
if err != nil {
log.Println(err)
return MetadataOutput{}
}

for _, d := range resp.Items {
if len(pods.Items) > 0 {
p := pods.Items[0]
url = append(url, fmt.Sprintf("http://%v:%v/v1.0/metadata", p.Status.PodIP, 3501))
secondaryUrl = append(secondaryUrl, fmt.Sprintf("http://%v:%v/v1.0/metadata", p.Status.PodIP, 3500))
}
}
}
}
}

respSts, errSts := i.kubeClient.AppsV1().StatefulSets(scope).List(ctx, (meta_v1.ListOptions{}))

if errSts != nil || len(respSts.Items) == 0 {
return MetadataOutput{}
}
for _, d := range respSts.Items {
if d.Spec.Template.Annotations[daprEnabledAnnotation] != "" {
daprID := d.Spec.Template.Annotations[daprIDAnnotation]
if daprID == id {
Expand Down Expand Up @@ -507,11 +667,17 @@ func (i *instances) getKubernetesInstances(scope string) []Instance {
ctx := context.Background()
list := []Instance{}
resp, err := i.kubeClient.AppsV1().Deployments(scope).List(ctx, (meta_v1.ListOptions{}))
respSts, errSts := i.kubeClient.AppsV1().StatefulSets(scope).List(ctx, (meta_v1.ListOptions{}))
if err != nil {
log.Println(err)
return list
}

if errSts != nil {
log.Println(errSts)
return list
}

for _, d := range resp.Items {
if d.Spec.Template.Annotations[daprEnabledAnnotation] != "" {
id := d.Spec.Template.Annotations[daprIDAnnotation]
Expand Down Expand Up @@ -553,6 +719,47 @@ func (i *instances) getKubernetesInstances(scope string) []Instance {
list = append(list, i)
}
}
for _, d := range respSts.Items {
if d.Spec.Template.Annotations[daprIDAnnotation] != "" {
id := d.Spec.Template.Annotations[daprIDAnnotation]
i := Instance{
AppID: id,
HTTPPort: 3500,
GRPCPort: 50001,
Command: "",
Age: age.GetAge(d.CreationTimestamp.Time),
Created: d.GetCreationTimestamp().String(),
PID: -1,
Replicas: int(*d.Spec.Replicas),
SupportsDeletion: false,
SupportsLogs: true,
Address: fmt.Sprintf("%s-dapr:80", id),
Status: fmt.Sprintf("%d/%d", d.Status.ReadyReplicas, d.Status.Replicas),
Labels: getAppLabelValue(d.Labels["app"]),
Selector: getAppLabelValue(d.Spec.Selector.MatchLabels["app"]),
Config: d.Spec.Template.Annotations["dapr.io/config"],
}
if val, ok := d.Spec.Template.Annotations[daprPortAnnotation]; ok {
appPort, err := strconv.Atoi(val)
if err == nil {
i.AppPort = appPort
}
}

s := json_serializer.NewYAMLSerializer(json_serializer.DefaultMetaFactory, nil, nil)
buf := new(bytes.Buffer)
err := s.Encode(&d, buf)
if err != nil {
log.Println(err)
return list
}

i.Manifest = buf.String()

list = append(list, i)
}
}

return list
}

Expand Down

0 comments on commit 4c87fc7

Please sign in to comment.