-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MongoDB: More ad hoc fixes for supporting real-world data #255
Conversation
cratedb_toolkit/io/mongodb/export.py
Outdated
if isinstance(value, dict): | ||
if len(value) == 1: | ||
if "$binary" in value and value["$binary"]["subType"] in ["03", "04"]: | ||
decoded = UUID(bytes=base64.b64decode(value["$binary"]["base64"])) | ||
return extract_value(decoded, parent_type) | ||
for k, v in value.items(): | ||
if k.startswith("$"): | ||
return extract_value(v, k.lstrip("$")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure enough, decoding MongoDB Extended JSON should not be performed manually, but should use _json_convert
instead, like the CDC subsystem is doing it already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Through decode_bson
, the machinery now uses BSON's _json_convert
.
cratedb-toolkit/cratedb_toolkit/io/mongodb/copy.py
Lines 52 to 59 in bc9362b
# Converge multiple MongoDB documents into SQL parameters for `executemany` operation. | |
parameters: t.List[DocumentDict] = [] | |
for document in data: | |
record = self.converter.convert(self.decode_bson(document)) | |
oid: str = self.get_document_key(record) | |
parameters.append({"oid": oid, "record": record}) | |
return SQLOperation(sql, parameters) |
tests/io/mongodb/test_cli.py
Outdated
DOCUMENT_IN = { | ||
"id": bson.Binary.from_uuid(UUID("d575540f-759d-4653-a4c4-4a9e410f1aa1")), | ||
"value": { | ||
"name": "foobar", | ||
"active": True, | ||
"created": dateutil.parser.parse("2020-06-19T15:03:53.727Z"), | ||
"timestamp": bson.datetime_ms.DatetimeMS(1455141600000), | ||
}, | ||
} | ||
DOCUMENT_OUT = { | ||
"__id": mock.ANY, | ||
"id": "d575540f-759d-4653-a4c4-4a9e410f1aa1", | ||
"value": { | ||
"name": "foobar", | ||
"active": True, | ||
"created": 1592579033000, | ||
"timestamp": 1455141600000, | ||
}, | ||
} | ||
DOCUMENT_DDL = """ | ||
CREATE TABLE IF NOT EXISTS "testdrive"."demo" ( | ||
"__id" TEXT, | ||
"id" TEXT, | ||
"value" OBJECT(DYNAMIC) AS ( | ||
"name" TEXT, | ||
"active" BOOLEAN, | ||
"created" TIMESTAMP WITH TIME ZONE, | ||
"timestamp" TIMESTAMP WITH TIME ZONE | ||
) | ||
)""".lstrip() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
66399b9
to
c899a30
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets a go.
7c8e999
to
bc9362b
Compare
cratedb_toolkit/io/mongodb/export.py
Outdated
""" | ||
# Manual treatment. | ||
# Some nested objects have been defined as strings, probably in previous schema versions. | ||
if "users" in value: | ||
for user_item in value["users"]: | ||
if "user" in user_item and not isinstance(user_item["user"], dict): | ||
user_item["user"] = {"id": user_item["user"]} | ||
""" | ||
|
||
if "createdBy" in value and not isinstance(value["createdBy"], dict): | ||
value["createdBy"] = {"id": value["createdBy"]} | ||
|
||
# Prune invalid date representations. | ||
for key in ["start_date", "end_date"]: | ||
if key in value: | ||
if not isinstance(value[key], dict): | ||
del value[key] | ||
elif "date" in value[key]: | ||
if isinstance(value[key]["date"], str): | ||
del value[key] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While others are configurable per --treatment=Path
option using a corresponding YAML file now, those three special treatments have not been generalized yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those rules now have been generalized further to be controlled using the treatment file.
convert_dict:
- name: createdBy
wrapper_name: id
- name: user
wrapper_name: id
prune_invalid_date:
- "start_date"
- "end_date"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The --treatment
option has been dissolved again. Special treatments have been integrated into Zyp.
zyp-treatment-all.yaml
is an example file representing a Zyp project that includes definitions of special treatments for a specific collection.
6ec4aab
to
5802e19
Compare
60193ce
to
f9ddac5
Compare
pyproject.toml
Outdated
"commons-codec[mongodb,zyp]>=0.0.14", | ||
"commons-codec[mongodb,zyp] @ git+https://github.com/crate/commons-codec@zyp-treatments", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course, this needs another release of commons-codec beforehand.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
commons-codec v0.0.15 has been released.
... to use double leading underscores.
bson.errors.InvalidBSON: year 292278994 is out of range Consider Using CodecOptions(datetime_conversion=DATETIME_AUTO) or MongoClient(datetime_conversion='DATETIME_AUTO'). See: https://pymongo.readthedocs.io/en/stable/examples/datetimes.html#handling-out-of-range-datetimes
By default, assume `TEXT` as inner type.
... based on the first 10,000 documents.
This means relevant column definitions will not be included into the SQL DDL.
- Use `--transformation` option for applying special treatments. Certain fields should be stored as lists, some need to be ignored for now, others need to be treated manually, etc. - Use pagination on source collection, for creating batches towards CrateDB.
About
end_date: Long('1455141600000')
.ctk load table
use thedata OBJECT(DYNAMIC)
mapping strategy.Add--treatment
option for applying special treatments to certain items on real-world data.--transformation
option) for applying special treatments to certain items on real-world data.file+bson://...
.Documentation
https://cratedb-toolkit--255.org.readthedocs.build/io/mongodb/loader.html
Install
Backlog
batch-size
./cc @hlcianfagna, @hammerhead, @lservini, @zolbatar, @juanpardo, @widmogrod