Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding command-line utility #8

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cli
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/usr/bin/python3

from keepassxc_browser.cli import main

main()
177 changes: 177 additions & 0 deletions keepassxc_browser/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
#!/usr/bin/python3

import sys
import os
import re
import subprocess
import argparse
import json
import tldextract
Copy link
Owner

@hrehfeld hrehfeld Sep 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing package dep in setup.py. here?

install_requires=[

Also, can we avoid this dep?

from . import protocol as kpp


def find_pass_candidates(domain, connection, auth_id):
try:
candidates = connection.get_logins(auth_id,"https://"+domain)
except kpp.ProtocolError as e:
print("failure: KeePassXC protocol error:", e, file=sys.stderr)
sys.exit(1)

return {
"%s [%s]" % (entry["name"], entry["login"]): (entry["name"], entry["login"], entry["password"]) for entry in candidates
}


def select_candidate(items, command):
process = subprocess.run(command, input='\n'.join(items).encode("utf-8"),
stdout=subprocess.PIPE, shell=True)
return process.stdout.decode("utf-8").strip()


def fetch_candidates(url, auth_id, keyfile):
conn = kpp.Connection()
try:
conn.connect()
except Exception as e:
print("failure: cannot connect to KeePassXC server:", e, file=sys.stderr)
sys.exit(1)

keyfile_name = os.path.expanduser(keyfile)

try:
if os.path.exists(keyfile_name):
try:
keyfile = open(keyfile_name, "r")
cred = kpp.Identity.unserialize(auth_id, keyfile.read())
keyfile.close()
except Exception as e:
print("failure: error when try to read keyfile", e, file=sys.stderr)
sys.exit(1)

conn.change_public_keys(cred)

if not conn.test_associate(cred):
conn.associate(cred)
else:
cred = kpp.Identity(auth_id)
conn.change_public_keys(cred)
conn.associate(cred)
try:
keyfile = open(keyfile_name, "w")
keyfile.write(cred.serialize())
keyfile.close()
except Exception as e:
print("failure: error when trying to write keyfile", e, file=sys.stderr)
sys.exit(1)
except kpp.ProtocolError as e:
print("failure: KeePassXC protocol error:", e, file=sys.stderr)
sys.exit(1)

extract_result = tldextract.extract(url)

# Try to find candidates using targets in the following order: fully-qualified domain name (includes subdomains),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the thought process here? You try to do some sensible matching on the url? What usecase are you covering? At least this needs to be documented (see below @argparse) somehow.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is in parts from the qutebrowser scripts for old KeePass. The idea is, if no match is found for the given domain, to search 'upward' the tree of subdomains, up to the last before the TLD.

# the registered domain name and finally: the IPv4 address if that's what the URL represents
candidates = {}
if extract_result.domain:
fqdn = '.'.join(i for i in extract_result if i)
registered_domain = '.'.join(i for i in extract_result[1:] if i)
else:
if extract_result.ipv4 == "":
print("failure: Format of URL '%s' is invalid!" % arguments.url, file=sys.stderr)
sys.exit(1)
fqdn = ""
registered_domain = ""
for target in filter(None, [fqdn, registered_domain, extract_result.ipv4]):
target_candidates = find_pass_candidates(target, conn, cred)
if target_candidates:
candidates = target_candidates
break
else:
if len(candidates) == 0:
print("failure: no pass candidates for URL '%s' found!" % arguments.url, file=sys.stderr)
sys.exit(1)

return candidates


def main():
argument_parser = argparse.ArgumentParser(description="Fetch credentials from a running KeepassXC instance")
argument_parser.add_argument('url')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc?

argument_parser.add_argument('--store-keyfile', '-k', required=True,
help='Auth token for Keepass-XC')
argument_parser.add_argument('--auth-id', '-i', default='?',
help='Auth ID for Keepass-XC, will be created if it does not exist')
argument_parser.add_argument('--selector-command', '-s', default='dmenu',
help='Command used to select from multiple entries (dmenu-compatible)')
argument_parser.add_argument('--always-select', '-A', action="store_true",
help='run selector command even if there is only one candidate')
argument_parser.add_argument('--format', '-f', choices=("text","text-zero","json"), default='text',
help="Kind of output ('text-zero' means zero-terminated textual output)")
argument_parser.add_argument('--all-candidates', '-a', action='store_true',
help='Output all candidates (do not ask for selection)')
argument_parser.add_argument('--output', '-o', choices=("password","username","both"), default='both',
help='Which information to include in the output')
argument_parser.add_argument('--output-no-title', '-T', action='store_true',
help='Do not include title field in output')
argument_parser.add_argument('--output-no-prefix', '-P', action='store_true',
help='Do not prefix output lines with field type (text output only)')
arguments = argument_parser.parse_args()

candidates = fetch_candidates(arguments.url, arguments.auth_id, arguments.store_keyfile)

output_title = not arguments.output_no_title
output_prefix = not arguments.output_no_prefix
output_username = arguments.output != "password"
output_password = arguments.output != "username"

terminator = "\n" if arguments.format == "text" else "\0"

def print_text_entry(entry):
title, username, password = entry
if output_prefix:
if output_title:
print("title", title, end=terminator)
if output_username:
print("username", username, end=terminator)
if output_password:
print("password", password, end=terminator)
else:
if output_title:
print(title, end=terminator)
if output_username:
print(username, end=terminator)
if output_password:
print(password, end=terminator)

def get_dict_entry(entry):
title, username, password = entry
result = {}
if output_title:
result["title"] = title
if output_username:
result["username"] = username
if output_password:
result["password"] = password
return result

if arguments.all_candidates:
if arguments.format in ("text", "text-zero"):
for entry in sorted(candidates.values()):
print_text_entry(entry)
elif arguments.format == "json":
output = []
for entry in sorted(candidates.values()):
output.append( get_dict_entry(entry) )
json.dump(output, sys.stdout)
else:
if len(candidates) == 1 and not arguments.always_select:
entry = candidates.popitem()[1]
else:
selection = select_candidate(sorted(candidates), arguments.selector_command)
if not selection:
sys.exit(0)
entry = candidates[selection]
if arguments.format in ("text", "text-zero"):
print_text_entry(entry)
elif arguments.format == "json":
json.dump(get_dict_entry(entry), sys.stdout)