-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
adding command-line utility #8
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#!/usr/bin/python3 | ||
|
||
from keepassxc_browser.cli import main | ||
|
||
main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
#!/usr/bin/python3 | ||
|
||
import sys | ||
import os | ||
import re | ||
import subprocess | ||
import argparse | ||
import json | ||
import tldextract | ||
from . import protocol as kpp | ||
|
||
|
||
def find_pass_candidates(domain, connection, auth_id): | ||
try: | ||
candidates = connection.get_logins(auth_id,"https://"+domain) | ||
except kpp.ProtocolError as e: | ||
print("failure: KeePassXC protocol error:", e, file=sys.stderr) | ||
sys.exit(1) | ||
|
||
return { | ||
"%s [%s]" % (entry["name"], entry["login"]): (entry["name"], entry["login"], entry["password"]) for entry in candidates | ||
} | ||
|
||
|
||
def select_candidate(items, command): | ||
process = subprocess.run(command, input='\n'.join(items).encode("utf-8"), | ||
stdout=subprocess.PIPE, shell=True) | ||
return process.stdout.decode("utf-8").strip() | ||
|
||
|
||
def fetch_candidates(url, auth_id, keyfile): | ||
conn = kpp.Connection() | ||
try: | ||
conn.connect() | ||
except Exception as e: | ||
print("failure: cannot connect to KeePassXC server:", e, file=sys.stderr) | ||
sys.exit(1) | ||
|
||
keyfile_name = os.path.expanduser(keyfile) | ||
|
||
try: | ||
if os.path.exists(keyfile_name): | ||
try: | ||
keyfile = open(keyfile_name, "r") | ||
cred = kpp.Identity.unserialize(auth_id, keyfile.read()) | ||
keyfile.close() | ||
except Exception as e: | ||
print("failure: error when try to read keyfile", e, file=sys.stderr) | ||
sys.exit(1) | ||
|
||
conn.change_public_keys(cred) | ||
|
||
if not conn.test_associate(cred): | ||
conn.associate(cred) | ||
else: | ||
cred = kpp.Identity(auth_id) | ||
conn.change_public_keys(cred) | ||
conn.associate(cred) | ||
try: | ||
keyfile = open(keyfile_name, "w") | ||
keyfile.write(cred.serialize()) | ||
keyfile.close() | ||
except Exception as e: | ||
print("failure: error when trying to write keyfile", e, file=sys.stderr) | ||
sys.exit(1) | ||
except kpp.ProtocolError as e: | ||
print("failure: KeePassXC protocol error:", e, file=sys.stderr) | ||
sys.exit(1) | ||
|
||
extract_result = tldextract.extract(url) | ||
|
||
# Try to find candidates using targets in the following order: fully-qualified domain name (includes subdomains), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the thought process here? You try to do some sensible matching on the url? What usecase are you covering? At least this needs to be documented (see below @argparse) somehow. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is in parts from the qutebrowser scripts for old KeePass. The idea is, if no match is found for the given domain, to search 'upward' the tree of subdomains, up to the last before the TLD. |
||
# the registered domain name and finally: the IPv4 address if that's what the URL represents | ||
candidates = {} | ||
if extract_result.domain: | ||
fqdn = '.'.join(i for i in extract_result if i) | ||
registered_domain = '.'.join(i for i in extract_result[1:] if i) | ||
else: | ||
if extract_result.ipv4 == "": | ||
print("failure: Format of URL '%s' is invalid!" % arguments.url, file=sys.stderr) | ||
sys.exit(1) | ||
fqdn = "" | ||
registered_domain = "" | ||
for target in filter(None, [fqdn, registered_domain, extract_result.ipv4]): | ||
target_candidates = find_pass_candidates(target, conn, cred) | ||
if target_candidates: | ||
candidates = target_candidates | ||
break | ||
else: | ||
if len(candidates) == 0: | ||
print("failure: no pass candidates for URL '%s' found!" % arguments.url, file=sys.stderr) | ||
sys.exit(1) | ||
|
||
return candidates | ||
|
||
|
||
def main(): | ||
argument_parser = argparse.ArgumentParser(description="Fetch credentials from a running KeepassXC instance") | ||
argument_parser.add_argument('url') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. doc? |
||
argument_parser.add_argument('--store-keyfile', '-k', required=True, | ||
help='Auth token for Keepass-XC') | ||
argument_parser.add_argument('--auth-id', '-i', default='?', | ||
help='Auth ID for Keepass-XC, will be created if it does not exist') | ||
argument_parser.add_argument('--selector-command', '-s', default='dmenu', | ||
help='Command used to select from multiple entries (dmenu-compatible)') | ||
argument_parser.add_argument('--always-select', '-A', action="store_true", | ||
help='run selector command even if there is only one candidate') | ||
argument_parser.add_argument('--format', '-f', choices=("text","text-zero","json"), default='text', | ||
help="Kind of output ('text-zero' means zero-terminated textual output)") | ||
argument_parser.add_argument('--all-candidates', '-a', action='store_true', | ||
help='Output all candidates (do not ask for selection)') | ||
argument_parser.add_argument('--output', '-o', choices=("password","username","both"), default='both', | ||
help='Which information to include in the output') | ||
argument_parser.add_argument('--output-no-title', '-T', action='store_true', | ||
help='Do not include title field in output') | ||
argument_parser.add_argument('--output-no-prefix', '-P', action='store_true', | ||
help='Do not prefix output lines with field type (text output only)') | ||
arguments = argument_parser.parse_args() | ||
|
||
candidates = fetch_candidates(arguments.url, arguments.auth_id, arguments.store_keyfile) | ||
|
||
output_title = not arguments.output_no_title | ||
output_prefix = not arguments.output_no_prefix | ||
output_username = arguments.output != "password" | ||
output_password = arguments.output != "username" | ||
|
||
terminator = "\n" if arguments.format == "text" else "\0" | ||
|
||
def print_text_entry(entry): | ||
title, username, password = entry | ||
if output_prefix: | ||
if output_title: | ||
print("title", title, end=terminator) | ||
if output_username: | ||
print("username", username, end=terminator) | ||
if output_password: | ||
print("password", password, end=terminator) | ||
else: | ||
if output_title: | ||
print(title, end=terminator) | ||
if output_username: | ||
print(username, end=terminator) | ||
if output_password: | ||
print(password, end=terminator) | ||
|
||
def get_dict_entry(entry): | ||
title, username, password = entry | ||
result = {} | ||
if output_title: | ||
result["title"] = title | ||
if output_username: | ||
result["username"] = username | ||
if output_password: | ||
result["password"] = password | ||
return result | ||
|
||
if arguments.all_candidates: | ||
if arguments.format in ("text", "text-zero"): | ||
for entry in sorted(candidates.values()): | ||
print_text_entry(entry) | ||
elif arguments.format == "json": | ||
output = [] | ||
for entry in sorted(candidates.values()): | ||
output.append( get_dict_entry(entry) ) | ||
json.dump(output, sys.stdout) | ||
else: | ||
if len(candidates) == 1 and not arguments.always_select: | ||
entry = candidates.popitem()[1] | ||
else: | ||
selection = select_candidate(sorted(candidates), arguments.selector_command) | ||
if not selection: | ||
sys.exit(0) | ||
entry = candidates[selection] | ||
if arguments.format in ("text", "text-zero"): | ||
print_text_entry(entry) | ||
elif arguments.format == "json": | ||
json.dump(get_dict_entry(entry), sys.stdout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing package dep in setup.py. here?
python-keepassxc-browser/setup.py
Line 7 in 610ea63
Also, can we avoid this dep?