-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathbitwarden.py
298 lines (246 loc) · 11.1 KB
/
bitwarden.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# -*- coding: utf-8 -*-
# Copyright (c) 2022, Jonathan Lung <[email protected]>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = """
name: bitwarden
author:
- Jonathan Lung (@lungj) <[email protected]>
requirements:
- bw (command line utility)
- be logged into bitwarden
- bitwarden vault unlocked
- E(BW_SESSION) environment variable set
short_description: Retrieve secrets from Bitwarden
version_added: 5.4.0
description:
- Retrieve secrets from Bitwarden.
options:
_terms:
description: Key(s) to fetch values for from login info.
required: true
type: list
elements: str
search:
description:
- Field to retrieve, for example V(name) or V(id).
- If set to V(id), only zero or one element can be returned.
Use the Jinja C(first) filter to get the only list element.
- If set to V(None) or V(''), or if O(_terms) is empty, records are not filtered by fields.
type: str
default: name
version_added: 5.7.0
field:
description: Field to fetch. Leave unset to fetch whole response.
type: str
collection_id:
description:
- Collection ID to filter results by collection. Leave unset to skip filtering.
- O(collection_id) and O(collection_name) are mutually exclusive.
type: str
version_added: 6.3.0
collection_name:
description:
- Collection name to filter results by collection. Leave unset to skip filtering.
- O(collection_id) and O(collection_name) are mutually exclusive.
type: str
version_added: 10.4.0
organization_id:
description: Organization ID to filter results by organization. Leave unset to skip filtering.
type: str
version_added: 8.5.0
bw_session:
description: Pass session key instead of reading from env.
type: str
version_added: 8.4.0
result_count:
description:
- Number of results expected for the lookup query. Task will fail if O(result_count)
is set but does not match the number of query results. Leave empty to skip this check.
type: int
version_added: 10.4.0
"""
EXAMPLES = """
- name: "Get 'password' from all Bitwarden records named 'a_test'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='password') }}
- name: "Get 'password' from Bitwarden record with ID 'bafba515-af11-47e6-abe3-af1200cd18b2'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'bafba515-af11-47e6-abe3-af1200cd18b2', search='id', field='password') | first }}
- name: "Get 'password' from all Bitwarden records named 'a_test' from collection"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='password', collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}
- name: "Get list of all full Bitwarden records named 'a_test'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test') }}
- name: "Get custom field 'api_key' from all Bitwarden records named 'a_test'"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='api_key') }}
- name: "Get 'password' from all Bitwarden records named 'a_test', using given session key"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', field='password', bw_session='bXZ9B5TXi6...') }}
- name: "Get all Bitwarden records from collection"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', None, collection_id='bafba515-af11-47e6-abe3-af1200cd18b2') }}
- name: "Get all Bitwarden records from collection"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', None, collection_name='my_collections/test_collection') }}
- name: "Get Bitwarden record named 'a_test', ensure there is exactly one match"
ansible.builtin.debug:
msg: >-
{{ lookup('community.general.bitwarden', 'a_test', result_count=1) }}
"""
RETURN = """
_raw:
description:
- A one-element list that contains a list of requested fields or JSON objects of matches.
- If you use C(query), you get a list of lists. If you use C(lookup) without C(wantlist=true),
this always gets reduced to a list of field values or JSON objects.
type: list
elements: list
"""
from subprocess import Popen, PIPE
from ansible.errors import AnsibleError, AnsibleOptionsError
from ansible.module_utils.common.text.converters import to_bytes, to_text
from ansible.parsing.ajson import AnsibleJSONDecoder
from ansible.plugins.lookup import LookupBase
class BitwardenException(AnsibleError):
pass
class Bitwarden(object):
def __init__(self, path='bw'):
self._cli_path = path
self._session = None
@property
def cli_path(self):
return self._cli_path
@property
def session(self):
return self._session
@session.setter
def session(self, value):
self._session = value
@property
def unlocked(self):
out, err = self._run(['status'], stdin="")
decoded = AnsibleJSONDecoder().raw_decode(out)[0]
return decoded['status'] == 'unlocked'
def _run(self, args, stdin=None, expected_rc=0):
if self.session:
args += ['--session', self.session]
p = Popen([self.cli_path] + args, stdout=PIPE, stderr=PIPE, stdin=PIPE)
out, err = p.communicate(to_bytes(stdin))
rc = p.wait()
if rc != expected_rc:
if len(args) > 2 and args[0] == 'get' and args[1] == 'item' and b'Not found.' in err:
return 'null', ''
raise BitwardenException(err)
return to_text(out, errors='surrogate_or_strict'), to_text(err, errors='surrogate_or_strict')
def _get_matches(self, search_value, search_field, collection_id=None, organization_id=None):
"""Return matching records whose search_field is equal to key.
"""
# Prepare set of params for Bitwarden CLI
if search_field == 'id':
params = ['get', 'item', search_value]
else:
params = ['list', 'items']
if search_value:
params.extend(['--search', search_value])
if collection_id:
params.extend(['--collectionid', collection_id])
if organization_id:
params.extend(['--organizationid', organization_id])
out, err = self._run(params)
# This includes things that matched in different fields.
initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]
if search_field == 'id':
if initial_matches is None:
initial_matches = []
else:
initial_matches = [initial_matches]
# Filter to only include results from the right field, if a search is requested by value or field
return [item for item in initial_matches
if not search_value or not search_field or item.get(search_field) == search_value]
def get_field(self, field, search_value, search_field="name", collection_id=None, organization_id=None):
"""Return a list of the specified field for records whose search_field match search_value
and filtered by collection if collection has been provided.
If field is None, return the whole record for each match.
"""
matches = self._get_matches(search_value, search_field, collection_id, organization_id)
if not field:
return matches
field_matches = []
for match in matches:
# if there are no custom fields, then `match` has no key 'fields'
if 'fields' in match:
custom_field_found = False
for custom_field in match['fields']:
if field == custom_field['name']:
field_matches.append(custom_field['value'])
custom_field_found = True
break
if custom_field_found:
continue
if 'login' in match and field in match['login']:
field_matches.append(match['login'][field])
continue
if field in match:
field_matches.append(match[field])
continue
if matches and not field_matches:
raise AnsibleError(f"field {field} does not exist in {search_value}")
return field_matches
def get_collection_ids(self, collection_name: str, organization_id=None) -> list[str]:
"""Return matching IDs of collections whose name is equal to collection_name."""
# Prepare set of params for Bitwarden CLI
params = ['list', 'collections', '--search', collection_name]
if organization_id:
params.extend(['--organizationid', organization_id])
out, err = self._run(params)
# This includes things that matched in different fields.
initial_matches = AnsibleJSONDecoder().raw_decode(out)[0]
# Filter to only return the ID of a collections with exactly matching name
return [item['id'] for item in initial_matches
if str(item.get('name')).lower() == collection_name.lower()]
class LookupModule(LookupBase):
def run(self, terms=None, variables=None, **kwargs):
self.set_options(var_options=variables, direct=kwargs)
field = self.get_option('field')
search_field = self.get_option('search')
collection_id = self.get_option('collection_id')
collection_name = self.get_option('collection_name')
organization_id = self.get_option('organization_id')
result_count = self.get_option('result_count')
_bitwarden.session = self.get_option('bw_session')
if not _bitwarden.unlocked:
raise AnsibleError("Bitwarden Vault locked. Run 'bw unlock'.")
if not terms:
terms = [None]
if collection_name and collection_id:
raise AnsibleOptionsError("'collection_name' and 'collection_id' are mutually exclusive!")
elif collection_name:
collection_ids = _bitwarden.get_collection_ids(collection_name, organization_id)
if not collection_ids:
raise BitwardenException("No matching collections found!")
else:
collection_ids = [collection_id]
results = [
_bitwarden.get_field(field, term, search_field, collection_id, organization_id)
for collection_id in collection_ids
for term in terms
]
for result in results:
if result_count is not None and len(result) != result_count:
raise BitwardenException(
f"Number of results doesn't match result_count! ({len(result)} != {result_count})")
return results
_bitwarden = Bitwarden()