This repository has been archived by the owner on Jul 1, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdeployment.py
123 lines (92 loc) · 3.96 KB
/
deployment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# vim:set sw=4 ts=4 et:
#
# Copyright (c) 2016-2017 Torchbox Ltd.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely. This software is provided 'as-is', without any express or implied
# warranty.
import json, kubernetes
import kubeutil
# get_deployment: return the named deployment.
def get_deployment(namespace, name):
api_client = kubeutil.get_client()
# We can't use the normal client API here because it returns Python objects
# that can't be converted back into JSON. Instead, fetch the JSON by hand.
resource_path = ('/apis/extensions/v1beta1/namespaces/'
+ namespace
+ '/deployments/'
+ name)
header_params = {}
header_params['Accept'] = api_client.select_header_accept(['application/json'])
header_params['Content-Type'] = api_client.select_header_content_type(['*/*'])
header_params.update(kubeutil.config.api_key)
(resp, code, header) = api_client.call_api(
resource_path, 'GET', {}, {}, header_params, None, [], _preload_content=False)
dp = json.loads(resp.data.decode('utf-8'))
return dp
# get_replicasets: return all the active replicasets for a deployment.
# old replicasets (with zero replicas) are not included.
def get_replicasets(dp):
ret = []
api_client = kubeutil.get_client()
# We can't use the normal client API here because it returns Python objects
# that can't be converted back into JSON. Instead, fetch the JSON by hand.
resource_path = ('/apis/extensions/v1beta1/namespaces/'
+ dp['metadata']['namespace']
+ '/replicasets')
header_params = {}
header_params['Accept'] = api_client.select_header_accept(['application/json'])
header_params['Content-Type'] = api_client.select_header_content_type(['*/*'])
header_params.update(kubeutil.config.api_key)
(resp, code, header) = api_client.call_api(
resource_path, 'GET', {}, {}, header_params, None, [], _preload_content=False)
rslist = json.loads(resp.data.decode('utf-8'))
for rs in rslist['items']:
md = rs['metadata']
# Check if this RS is owned by the correct deployment.
if 'ownerReferences' not in md:
continue
has_owner = False
for owner in md['ownerReferences']:
if owner['kind'] == 'Deployment' and owner['name'] == dp['metadata']['name']:
has_owner = True
break
if not has_owner:
continue
if rs['spec']['replicas'] == 0:
continue
ret.append(rs)
return ret
# get_rs_pods: get all the pods for a replicaset.
def get_rs_pods(rs):
ret = []
api_client = kubeutil.get_client()
# We can't use the normal client API here because it returns Python objects
# that can't be converted back into JSON. Instead, fetch the JSON by hand.
resource_path = ('/api/v1/namespaces/'
+ rs['metadata']['namespace']
+ '/pods')
header_params = {}
header_params['Accept'] = api_client.select_header_accept(['application/json'])
header_params['Content-Type'] = api_client.select_header_content_type(['*/*'])
header_params.update(kubeutil.config.api_key)
(resp, code, header) = api_client.call_api(
resource_path, 'GET', {}, {}, header_params, None, [], _preload_content=False)
podlist = json.loads(resp.data.decode('utf-8'))
for pod in podlist['items']:
md = pod['metadata']
if 'ownerReferences' not in md:
continue
has_owner = False
for owner in md['ownerReferences']:
if owner['kind'] != 'ReplicaSet':
continue
if owner['name'] != rs['metadata']['name']:
continue
has_owner = True
break
if not has_owner:
continue
ret.append(pod)
return ret