Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

yq: use tomllib if available #185

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 49 additions & 20 deletions yq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import subprocess
import sys
import typing
from datetime import date, datetime, time

import argcomplete
Expand All @@ -28,6 +29,14 @@
__version__ = "0.0.0"


class DecodeError(RuntimeError):
pass


class EncodeError(RuntimeError):
pass


class JSONDateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, (datetime, date, time)):
Expand All @@ -42,6 +51,32 @@ def decode_docs(jq_output, json_decoder):
yield doc


def decode_toml(fp: typing.IO, *, output_format: str) -> typing.Any:
if output_format != 'toml':
try:
import tomllib
if isinstance(fp, io.StringIO):
# HACK: this is just for tests, which kinda defeats the purpose of the tests,
# but there is no StringIO.buffer so there is no way around it
return tomllib.loads(fp.read())
return tomllib.load(fp.buffer if isinstance(fp, io.TextIOWrapper) else fp)
except ImportError:
pass

import tomlkit
return tomlkit.load(fp)


def encode_toml(fp: typing.TextIO, docs: typing.Iterable[typing.Any]):
import tomlkit
for doc in docs:
if not isinstance(doc, dict):
raise EncodeError(
"Error converting JSON to TOML: cannot represent non-object types at top level."
)
tomlkit.dump(doc, fp)


def xq_cli():
cli(input_format="xml", program_name="xq")

Expand Down Expand Up @@ -244,7 +279,7 @@ def yq(
import xmltodict

if xml_item_depth != 0:
raise Exception("xml_item_depth is not supported with xq -x")
raise DecodeError("xml_item_depth is not supported with xq -x")

doc = xmltodict.parse(
input_stream.buffer if isinstance(input_stream, io.TextIOWrapper) else input_stream.read(),
Expand All @@ -254,13 +289,12 @@ def yq(
json.dump(doc, json_buffer, cls=JSONDateTimeEncoder)
json_buffer.write("\n")
elif input_format == "toml":
import tomlkit

doc = tomlkit.load(input_stream) # type: ignore
doc = decode_toml(input_stream, output_format=output_format)
json.dump(doc, json_buffer, cls=JSONDateTimeEncoder)
json_buffer.write("\n")
else:
raise Exception("Unknown input format")
raise DecodeError("Unknown input format")

jq_out, jq_err = jq.communicate(json_buffer.getvalue())
json_decoder = json.JSONDecoder()
if output_format == "yaml" or output_format == "annotated_yaml":
Expand All @@ -286,30 +320,24 @@ def yq(
if xml_root:
doc = {xml_root: doc} # type: ignore
elif not isinstance(doc, dict):
msg = (
"{}: Error converting JSON to XML: cannot represent non-object types at top level. "
raise EncodeError(
"Error converting JSON to XML: cannot represent non-object types at top level. "
"Use --xml-root=name to envelope your output with a root element."
)
exit_func(msg.format(program_name))
full_document = True if xml_dtd else False
try:
xmltodict.unparse(
doc, output=output_stream, full_document=full_document, pretty=True, indent=" "
)
except ValueError as e:
if "Document must have exactly one root" in str(e):
raise Exception(str(e) + " Use --xml-root=name to envelope your output with a root element")
raise EncodeError(str(e) + " Use --xml-root=name to envelope your output with a root element")
else:
raise
output_stream.write("\n")
elif output_format == "toml":
import tomlkit

for doc in decode_docs(jq_out, json_decoder):
if not isinstance(doc, dict):
msg = "{}: Error converting JSON to TOML: cannot represent non-object types at top level."
exit_func(msg.format(program_name))
tomlkit.dump(doc, output_stream)
docs = decode_docs(jq_out, json_decoder)
encode_toml(output_stream, docs)
else:
if input_format == "yaml":
loader_class = get_loader(
Expand Down Expand Up @@ -344,13 +372,12 @@ def emit_entry(path, entry):
if doc:
emit_entry(None, doc)
elif input_format == "toml":
import tomlkit

for input_stream in input_streams:
json.dump(tomlkit.load(input_stream), jq.stdin, cls=JSONDateTimeEncoder) # type: ignore
doc = decode_toml(input_stream, output_format=output_format)
json.dump(doc, jq.stdin, cls=JSONDateTimeEncoder) # type: ignore
jq.stdin.write("\n") # type: ignore
else:
raise Exception("Unknown input format")
raise DecodeError("Unknown input format")

try:
jq.stdin.close() # type: ignore
Expand All @@ -360,5 +387,7 @@ def emit_entry(path, entry):
for input_stream in input_streams:
input_stream.close()
exit_func(jq.returncode)
except (DecodeError, EncodeError) as e:
exit_func("{}: {}".format(program_name, e))
except Exception as e:
exit_func("{}: Error running jq: {}: {}.".format(program_name, type(e).__name__, e))
Loading