
Commit

Merge pull request #187 from andrewwhitehead/perf-storage
Performance tracking and improvements
andrewwhitehead authored Sep 25, 2019
2 parents e509712 + 5b518ed commit 90a36e7
Showing 6 changed files with 165 additions and 32 deletions.
15 changes: 14 additions & 1 deletion aries_cloudagent/messaging/credentials/manager.py
@@ -490,14 +490,21 @@ async def store_credential(
credential_definition,
raw_credential,
credential_exchange_record.credential_request_metadata,
credential_id=credential_id
credential_id=credential_id,
)

credential = await holder.get_credential(credential_id)

credential_exchange_record.state = CredentialExchange.STATE_STORED
credential_exchange_record.credential_id = credential_id
credential_exchange_record.credential = credential

# clear unnecessary data
credential_exchange_record.credential_offer = None
credential_exchange_record.credential_request = None
credential_exchange_record.raw_credential = None
# credential_request_metadata may be reused

await credential_exchange_record.save(self.context, reason="Store credential")

credential_stored_message = CredentialStored()
@@ -525,6 +532,12 @@ async def credential_stored(self, credential_stored_message: CredentialStored):
},
)

# clear unnecessary data
credential_exchange_record.credential_offer = None
credential_exchange_record.credential_request = None
credential_exchange_record.credential_request_metadata = None
credential_exchange_record.credential_values = None

credential_exchange_record.state = CredentialExchange.STATE_STORED
await credential_exchange_record.save(self.context, reason="Credential stored")

43 changes: 36 additions & 7 deletions aries_cloudagent/messaging/models/base_record.py
@@ -31,6 +31,7 @@ class Meta:
LOG_STATE_FLAG = None
CACHE_TTL = 60
CACHE_ENABLED = False
UNENCRYPTED_TAGS = ()

def __init__(
self,
@@ -83,7 +84,7 @@ def record_value(self) -> dict:
@property
def value(self) -> dict:
"""Accessor for the JSON record value generated for this record."""
ret = self.tags
ret = self.strip_tag_prefix(self.tags)
ret.update({"created_at": self.created_at, "updated_at": self.updated_at})
ret.update(self.record_value)
return ret
@@ -98,8 +99,9 @@ def tags(self) -> dict:
"""Accessor for the record tags generated for this record."""
tags = {"state": self.state}
tags.update(self.record_tags)
unenc = self.UNENCRYPTED_TAGS or ()
# tag values must be non-empty
return {k: v for (k, v) in tags.items() if v}
return {(f"~{k}" if k in unenc else k): v for (k, v) in tags.items() if v}

@classmethod
def cache_key(cls, record_id: str, record_type: str = None):
@@ -186,7 +188,7 @@ async def retrieve_by_id(
result = await storage.get_record(cls.RECORD_TYPE, record_id)
vals = json.loads(result.value)
if result.tags:
vals.update(result.tags)
vals.update(cls.strip_tag_prefix(result.tags))
if cls.CACHE_ENABLED:
await cls.set_cached_key(context, cache_key, vals)

@@ -204,10 +206,10 @@ async def retrieve_by_tag_filter(
"""
storage: BaseStorage = await context.inject(BaseStorage)
result = await storage.search_records(
cls.RECORD_TYPE, tag_filter
cls.RECORD_TYPE, cls.prefix_tag_filter(tag_filter)
).fetch_single()
vals = json.loads(result.value)
vals.update(result.tags)
vals.update(cls.strip_tag_prefix(result.tags))
return cls.from_storage(result.id, vals)

@classmethod
@@ -221,11 +223,13 @@ async def query(
tag_filter: An optional dictionary of tag filter clauses
"""
storage: BaseStorage = await context.inject(BaseStorage)
found = await storage.search_records(cls.RECORD_TYPE, tag_filter).fetch_all()
found = await storage.search_records(
cls.RECORD_TYPE, cls.prefix_tag_filter(tag_filter)
).fetch_all()
result = []
for record in found:
vals = json.loads(record.value)
vals.update(record.tags)
vals.update(cls.strip_tag_prefix(record.tags))
result.append(cls.from_storage(record.id, vals))
return result

@@ -358,6 +362,31 @@ def log_state(
out += f" {k}: {v}\n"
print(out, file=sys.stderr)

@classmethod
def strip_tag_prefix(cls, tags: dict):
"""Strip tilde from unencrypted tag names."""
return (
{(k[1:] if "~" in k else k): v for (k, v) in tags.items()} if tags else {}
)

@classmethod
def prefix_tag_filter(cls, tag_filter: dict):
"""Prefix unencrypted tags used in the tag filter."""
ret = None
if tag_filter:
unenc = cls.UNENCRYPTED_TAGS or ()
ret = {}
for k, v in tag_filter.items():
if k in ("$or", "$and") and isinstance(v, list):
ret[k] = [cls.prefix_tag_filter(clause) for clause in v]
elif k == "$not" and isinstance(v, dict):
ret[k] = cls.prefix_tag_filter(v)
elif k in unenc:
ret[f"~{k}"] = v
else:
ret[k] = v
return ret

def __eq__(self, other: Any) -> bool:
"""Comparison between records."""
if type(other) is type(self):
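
The storage change above introduces a per-record UNENCRYPTED_TAGS setting: tags listed there are written with the wallet's "~" plaintext-tag prefix, tag filters are rewritten to use the prefixed names, and the prefix is stripped again when records are read back. A minimal sketch of the two helpers, mirroring the new unit test below; ExampleRecord and the tag names are illustrative and not part of the commit:

from aries_cloudagent.messaging.models.base_record import BaseRecord


class ExampleRecord(BaseRecord):
    """Hypothetical record class marking "initiator" as an unencrypted tag."""

    UNENCRYPTED_TAGS = {"initiator"}


# Tag filters are rewritten so unencrypted tags carry the "~" prefix used
# for plaintext tag storage in the wallet:
assert ExampleRecord.prefix_tag_filter({"initiator": "self", "thread_id": "t1"}) == {
    "~initiator": "self",
    "thread_id": "t1",
}

# Tags read back from storage have the prefix stripped before being merged
# into the record's values:
assert ExampleRecord.strip_tag_prefix({"~initiator": "self", "thread_id": "t1"}) == {
    "initiator": "self",
    "thread_id": "t1",
}
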
16 changes: 16 additions & 0 deletions aries_cloudagent/messaging/models/tests/test_base_record.py
@@ -25,6 +25,10 @@ class Meta:
model_class = BaseRecordImpl


class UnencTestImpl(BaseRecord):
UNENCRYPTED_TAGS = {"a", "b"}


class TestBaseRecord(AsyncTestCase):
def test_init_undef(self):
with self.assertRaises(TypeError):
@@ -180,3 +184,15 @@ async def test_webhook(self):
topic = "topic"
await record.send_webhook(context, payload, topic=topic)
assert mock_responder.webhooks == [(topic, payload)]

async def test_tag_prefix(self):
tags = {"~x": "a", "y": "b"}
assert UnencTestImpl.strip_tag_prefix(tags) == {"x": "a", "y": "b"}

tags = {"a": "x", "b": "y", "c": "z"}
assert UnencTestImpl.prefix_tag_filter(tags) == {"~a": "x", "~b": "y", "c": "z"}

tags = {"$or": [{"a": "x"}, {"c": "z"}]}
assert UnencTestImpl.prefix_tag_filter(tags) == {
"$or": [{"~a": "x"}, {"c": "z"}]
}
1 change: 1 addition & 0 deletions demo/requirements.txt
@@ -1,2 +1,3 @@
asyncpg~=0.18.0
prompt_toolkit~=2.0.9
git+https://github.com/webpy/webpy.git#egg=web.py
18 changes: 17 additions & 1 deletion demo/runners/performance.py
@@ -94,6 +94,10 @@ def check_received_creds(self) -> (int, int):
async def update_creds(self):
await self.credential_event.wait()

async def set_tag_policy(self, cred_def_id, taggables):
req_body = {"taggables": taggables}
await self.admin_POST(f"/wallet/tag-policy/{cred_def_id}", req_body)


class FaberAgent(BaseAgent):
def __init__(self, port: int, **kwargs):
@@ -205,6 +209,7 @@ async def main(start_port: int, show_timing: bool = False, routing: bool = False

with log_timer("Publish duration:"):
await faber.publish_defs()
# await alice.set_tag_policy(faber.credential_definition_id, ["name"])

with log_timer("Connect duration:"):
if routing:
@@ -292,6 +297,15 @@ async def check_received(agent, issue_count, pb):
avg = recv_timer.duration / issue_count
alice.log(f"Average time per credential: {avg:.2f}s ({1/avg:.2f}/s)")

if alice.postgres:
await alice.collect_postgres_stats(str(issue_count) + " creds")
for line in alice.format_postgres_stats():
alice.log(line)
if faber.postgres:
await faber.collect_postgres_stats(str(issue_count) + " creds")
for line in faber.format_postgres_stats():
faber.log(line)

if show_timing:
timing = await alice.fetch_timing()
if timing:
@@ -358,6 +372,8 @@ async def check_received(agent, issue_count, pb):
require_indy()

try:
asyncio.get_event_loop().run_until_complete(main(args.port, True, args.routing))
asyncio.get_event_loop().run_until_complete(
main(args.port, False, args.routing)
)
except KeyboardInterrupt:
os._exit(1)
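
For reference, the tag-policy call commented out in the runner above targets the agent admin API's wallet tag-policy endpoint (/wallet/tag-policy/{cred_def_id}), which limits the credential attributes stored as queryable tags for a given credential definition. A hedged sketch of issuing the same request directly with aiohttp; the admin URL and credential definition id are placeholders, not values from this demo run:

import asyncio

from aiohttp import ClientSession


async def set_tag_policy(admin_url: str, cred_def_id: str, taggables: list):
    # Mirrors BaseAgent.set_tag_policy above: POST the list of taggable
    # attribute names to the wallet tag-policy endpoint for this cred def.
    async with ClientSession() as session:
        async with session.post(
            f"{admin_url}/wallet/tag-policy/{cred_def_id}",
            json={"taggables": taggables},
        ) as resp:
            resp.raise_for_status()
            return await resp.text()


# Example with placeholder values: only the "name" attribute remains taggable.
# asyncio.get_event_loop().run_until_complete(
#     set_tag_policy("http://localhost:8031", "<cred_def_id>", ["name"])
# )
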
104 changes: 81 additions & 23 deletions demo/runners/support/agent.py
@@ -1,4 +1,5 @@
import asyncio
import asyncpg
import functools
import json
import logging
@@ -117,6 +118,7 @@ def __init__(
)
self.wallet_key = params.get("wallet_key") or self.ident + rand_name
self.did = None
self.wallet_stats = []

async def register_schema_and_creddef(self, schema_name, version, schema_attrs):
# Create a schema
@@ -168,29 +170,8 @@ def get_agent_args(self):
result.extend(
[
("--wallet-storage-type", "postgres_storage"),
(
"--wallet-storage-config",
json.dumps(
{
"url": f"{self.internal_host}:5432",
"tls": "None",
"max_connections": 5,
"min_idle_time": 0,
"connection_timeout": 10,
}
),
),
(
"--wallet-storage-creds",
json.dumps(
{
"account": "postgres",
"password": "mysecretpassword",
"admin_account": "postgres",
"admin_password": "mysecretpassword",
}
),
),
("--wallet-storage-config", json.dumps(self.postgres_config)),
("--wallet-storage-creds", json.dumps(self.postgres_creds)),
]
)
if self.webhook_url:
@@ -426,3 +407,80 @@ def format_timing(self, timing: dict) -> dict:

async def reset_timing(self):
await self.admin_POST("/status/reset", text=True)

@property
def postgres_config(self):
return {
"url": f"{self.internal_host}:5432",
"tls": "None",
"max_connections": 5,
"min_idle_time": 0,
"connection_timeout": 10,
}

@property
def postgres_creds(self):
return {
"account": "postgres",
"password": "mysecretpassword",
"admin_account": "postgres",
"admin_password": "mysecretpassword",
}

async def collect_postgres_stats(self, ident: str, vacuum_full: bool = True):
creds = self.postgres_creds

conn = await asyncpg.connect(
host=self.internal_host,
port="5432",
user=creds["admin_account"],
password=creds["admin_password"],
database=self.wallet_name,
)

tables = ("items", "tags_encrypted", "tags_plaintext")
for t in tables:
await conn.execute(f"VACUUM FULL {t}" if vacuum_full else f"VACUUM {t}")

sizes = await conn.fetch(
"""
SELECT relname AS "relation",
pg_size_pretty(pg_total_relation_size(C.oid)) AS "total_size"
FROM pg_class C
LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE nspname = 'public'
ORDER BY pg_total_relation_size(C.oid) DESC;
"""
)
results = {k: [0, "0B"] for k in tables}
for row in sizes:
if row["relation"] in results:
results[row["relation"]][1] = row["total_size"].replace(" ", "")
for t in tables:
row = await conn.fetchrow(f"""SELECT COUNT(*) AS "count" FROM {t}""")
results[t][0] = row["count"]
self.wallet_stats.append((ident, results))

await conn.close()

def format_postgres_stats(self):
if not self.wallet_stats:
return
yield "{:30} | {:>17} | {:>17} | {:>17}".format(
f"{self.wallet_name} DB", "items", "tags_encrypted", "tags_plaintext"
)
yield "=" * 90
for ident, stats in self.wallet_stats:
yield "{:30} | {:8d} {:>8} | {:8d} {:>8} | {:8d} {:>8}".format(
ident,
stats["items"][0],
stats["items"][1],
stats["tags_encrypted"][0],
stats["tags_encrypted"][1],
stats["tags_plaintext"][0],
stats["tags_plaintext"][1],
)
yield ""

def reset_postgres_stats(self):
self.wallet_stats.clear()
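
Taken together, collect_postgres_stats snapshots the row counts and on-disk sizes of the wallet's items, tags_encrypted and tags_plaintext tables (after an optional VACUUM FULL), and format_postgres_stats renders the accumulated snapshots as a fixed-width table. A minimal sketch of the call pattern, mirroring the performance runner above; `agent` is assumed to be a demo agent configured for Postgres wallet storage (agent.postgres is true):

async def report_wallet_growth(agent, label: str):
    # Record one labelled snapshot of items / tags_encrypted / tags_plaintext
    await agent.collect_postgres_stats(label)
    # Log every accumulated snapshot as one row of the fixed-width table
    for line in agent.format_postgres_stats():
        agent.log(line)
    # Clear the snapshots before the next measurement run
    agent.reset_postgres_stats()
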
