Commit
Support dump and load bytes and add backup limit (#207)
* Support B and allow Limit in filter

* Support limit as option

* Fix help

* Fix typo

* Fix linting issue; Update help message

* Fix lint issues

---------

Co-authored-by: Fata Nugraha <[email protected]>
fatanugraha and Fata Nugraha authored May 11, 2023
1 parent 3ca4ea9 commit d1d4804
Showing 1 changed file with 47 additions and 6 deletions.
53 changes: 47 additions & 6 deletions dynamodump/dynamodump.py
@@ -9,6 +9,7 @@
"""

import argparse
import base64
import boto3
import datetime
import errno
@@ -49,9 +50,26 @@
SCHEMA_FILE = "schema.json"
THREAD_START_DELAY = 1 # seconds

json.JSONEncoder.default = lambda self, obj: (
    obj.isoformat() if isinstance(obj, datetime.datetime) else None
)

_original_default = json.JSONEncoder.default


def encoder(self, obj):
    """Serialize datetime values as ISO-8601 strings and bytes as base64 strings."""
    if isinstance(obj, datetime.datetime):
        return obj.isoformat()

    if isinstance(obj, bytes):
        return base64.b64encode(obj).decode("utf-8")

    # Preserve the stock behaviour (raise TypeError) for unsupported types;
    # the saved reference avoids infinite recursion once default is patched.
    return _original_default(self, obj)


json.JSONEncoder.default = encoder
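# A quick sanity check of the patched encoder (illustrative values, not from
# the module): json.dumps resolves default() through the class, so the
# monkeypatch applies to plain dumps() calls as well.
#
#   >>> json.dumps({"ts": datetime.datetime(2023, 5, 11), "raw": b"\x00\x01"})
#   '{"ts": "2023-05-11T00:00:00", "raw": "AAE="}'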


def process_item_types(dct):
    """Convert base64-encoded "B" (binary) attribute values in a dump back to bytes."""
    for item in dct["Items"]:
        for key in item:
            val = item[key]
            if "B" in val:
                item[key]["B"] = base64.b64decode(val["B"].encode("utf-8"))


def _get_aws_client(
@@ -642,7 +660,12 @@ def do_empty(dynamo, table_name, billing_mode):


def do_backup(
    dynamo, read_capacity, table_queue=None, src_table=None, filter_option=None
    dynamo,
    read_capacity,
    limit=None,
    table_queue=None,
    src_table=None,
    filter_option=None,
):
    """
    Connect to DynamoDB and perform the backup for src_table or each table in table_queue
@@ -693,6 +716,7 @@ def do_backup(
        mkdir_p(args.dumpPath + os.sep + table_name + os.sep + DATA_DIR)

        i = 1
        num_items = 0
        last_evaluated_key = None

        while True:
@@ -725,11 +749,16 @@ def do_backup(
"w+",
)
del scanned_table["ResponseMetadata"]

f.write(json.dumps(scanned_table, indent=JSON_INDENT))
f.close()

i += 1

            num_items += len(scanned_table["Items"])
            # stop once this table reaches the limit; the chunk just written is
            # kept, so the dump may contain slightly more items than the limit
            if limit and num_items >= limit:
                break

            try:
                last_evaluated_key = scanned_table["LastEvaluatedKey"]
            except KeyError:
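For example, since num_items is reset for each table, the limit applies per table: with --limit 100 and scans returning 60 items per chunk, two chunks (120 items) are dumped before the loop stops, because the check runs only after a whole chunk has been written, so the limit is approximate rather than exact.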
@@ -980,8 +1009,9 @@ def do_restore(
                    + DATA_DIR
                    + os.sep
                    + data_file
                )
                ),
            )
            # convert base64-encoded "B" values in the loaded dump back to bytes
            process_item_types(item_data)
            items.extend(item_data["Items"])

# batch write data
@@ -1241,6 +1271,11 @@ def main():
    parser.add_argument(
        "--log", help="Logging level - DEBUG|INFO|WARNING|ERROR|CRITICAL [optional]"
    )
    parser.add_argument(
        "--limit",
        help="Stop backing up a table once at least this many items have been "
        "dumped (applies per table); the scan chunk in progress is still "
        "written, so the dump may slightly exceed the limit [optional]",
        type=int,
    )
    parser.add_argument(
        "-f",
        "--filterOption",
@@ -1342,6 +1377,7 @@ def main():
            do_backup(
                conn,
                args.read_capacity,
                limit=args.limit,
                table_queue=None,
                filter_option=filter_option,
            )
@@ -1350,6 +1386,7 @@
                conn,
                args.read_capacity,
                matching_backup_tables,
                limit=args.limit,
                filter_option=filter_option,
            )
        except AttributeError:
@@ -1362,7 +1399,11 @@ def main():
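            # each worker consumes table names from q and applies the same
            # per-table item limit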
            t = threading.Thread(
                target=do_backup,
                args=(conn, args.readCapacity),
                kwargs={
                    "table_queue": q,
                    "filter_option": filter_option,
                    "limit": args.limit,
                },
            )
            t.start()
            threads.append(t)
