Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mint: Global db lock #391

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions cashu/core/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,19 @@ def _parse_timestamp(value, _):
self.lock = asyncio.Lock()

@asynccontextmanager
async def connect(self):
await self.lock.acquire()
try:
async with self.engine.connect() as conn: # type: ignore
async with conn.begin() as txn:
wconn = Connection(conn, txn, self.type, self.name, self.schema)

async def connect(self, lock_table: Optional[str] = None):
async with self.engine.connect() as conn: # type: ignore
async with conn.begin() as txn:
wconn = Connection(conn, txn, self.type, self.name, self.schema)
if lock_table:
if self.type in {POSTGRES, COCKROACH}:
await wconn.execute(
f"LOCK TABLE {table_with_schema(self, lock_table)} IN"
" EXCLUSIVE MODE;"
)
else:
await wconn.execute("BEGIN EXCLUSIVE;")
try:
if self.schema:
if self.type in {POSTGRES, COCKROACH}:
await wconn.execute(
Expand All @@ -164,10 +170,13 @@ async def connect(self):
await wconn.execute(
f"ATTACH '{self.path}' AS {self.schema}"
)

yield wconn
finally:
self.lock.release()
finally:
if lock_table:
if self.type in {POSTGRES, COCKROACH}:
pass
elif self.type == SQLITE:
await wconn.execute("COMMIT;")

async def fetchall(self, query: str, values: tuple = ()) -> list:
async with self.connect() as conn:
Expand Down
69 changes: 50 additions & 19 deletions cashu/mint/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ async def unset_proof_pending(
self, *, proof: Proof, db: Database, conn: Optional[Connection] = None
) -> None: ...

@abstractmethod
async def set_quote_pending(
self,
*,
db: Database,
quote_id: str,
conn: Optional[Connection] = None,
) -> None: ...

@abstractmethod
async def unset_quote_pending(
self, *, quote_id: str, db: Database, conn: Optional[Connection] = None
) -> None: ...

@abstractmethod
async def store_keyset(
self,
Expand All @@ -95,6 +109,7 @@ async def store_keyset(
@abstractmethod
async def get_balance(
self,
*,
db: Database,
conn: Optional[Connection] = None,
) -> int: ...
Expand Down Expand Up @@ -409,23 +424,6 @@ async def update_mint_quote(
),
)

# async def update_mint_quote_paid(
# self,
# *,
# quote_id: str,
# paid: bool,
# db: Database,
# conn: Optional[Connection] = None,
# ) -> None:
# await (conn or db).execute(
# f"UPDATE {table_with_schema(db, 'mint_quotes')} SET paid = ? WHERE"
# " quote = ?",
# (
# paid,
# quote_id,
# ),
# )

async def store_melt_quote(
self,
*,
Expand Down Expand Up @@ -459,9 +457,9 @@ async def get_melt_quote(
self,
*,
quote_id: str,
db: Database,
checking_id: Optional[str] = None,
request: Optional[str] = None,
db: Database,
conn: Optional[Connection] = None,
) -> Optional[MeltQuote]:
clauses = []
Expand Down Expand Up @@ -511,8 +509,8 @@ async def update_melt_quote(
async def store_keyset(
self,
*,
db: Database,
keyset: MintKeyset,
db: Database,
conn: Optional[Connection] = None,
) -> None:
await (conn or db).execute( # type: ignore
Expand All @@ -536,8 +534,40 @@ async def store_keyset(
),
)

async def set_quote_pending(
self,
*,
quote_id: str,
db: Database,
conn: Optional[Connection] = None,
) -> None:
await (conn or db).execute(
f"""
INSERT INTO {table_with_schema(db, 'quotes_pending')}
(quote)
VALUES (?)
""",
(quote_id,),
)

async def unset_quote_pending(
self,
*,
quote_id: str,
db: Database,
conn: Optional[Connection] = None,
) -> None:
await (conn or db).execute(
f"""
DELETE FROM {table_with_schema(db, 'quotes_pending')}
WHERE quote = ?
""",
(quote_id,),
)

async def get_balance(
self,
*,
db: Database,
conn: Optional[Connection] = None,
) -> int:
Expand Down Expand Up @@ -590,6 +620,7 @@ async def get_keyset(

async def get_proof_used(
self,
*,
db: Database,
secret: str,
conn: Optional[Connection] = None,
Expand Down
97 changes: 56 additions & 41 deletions cashu/mint/ledger.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import copy
import math
import time
Expand Down Expand Up @@ -58,10 +57,6 @@

class Ledger(LedgerVerification, LedgerSpendingConditions):
backends: Mapping[Method, Mapping[Unit, LightningBackend]] = {}
locks: Dict[str, asyncio.Lock] = {} # holds multiprocessing locks
proofs_pending_lock: asyncio.Lock = (
asyncio.Lock()
) # holds locks for proofs_pending database
keysets: Dict[str, MintKeyset] = {}

def __init__(
Expand Down Expand Up @@ -419,10 +414,11 @@ async def mint(
await self._verify_outputs(outputs)
sum_amount_outputs = sum([b.amount for b in outputs])

self.locks[quote_id] = (
self.locks.get(quote_id) or asyncio.Lock()
) # create a new lock if it doesn't exist
async with self.locks[quote_id]:
quote = await self.get_mint_quote(quote_id=quote_id)

# set quote pending to avoid race conditions
await self._set_quote_pending(quote_id)
try:
quote = await self.get_mint_quote(quote_id=quote_id)
assert quote.paid, QuoteNotPaidError()
assert not quote.issued, "quote already issued"
Expand All @@ -438,7 +434,12 @@ async def mint(
logger.trace(f"crud: setting quote {quote_id} as issued")
quote.issued = True
await self.crud.update_mint_quote(quote=quote, db=self.db)
del self.locks[quote_id]
except Exception as e:
logger.trace(f"Mint exception: {e}")
raise e
finally:
await self._unset_quote_pending(quote_id)

return promises

async def melt_quote(
Expand Down Expand Up @@ -671,8 +672,9 @@ async def melt(
# verify inputs and their spending conditions
await self.verify_inputs_and_outputs(proofs=proofs)

# set proofs to pending to avoid race conditions
# set proofs and quote to pending to avoid race conditions
await self._set_proofs_pending(proofs)
await self._set_quote_pending(quote)
try:
melt_quote = await self.melt_mint_settle_internally(melt_quote)

Expand Down Expand Up @@ -718,8 +720,8 @@ async def melt(
logger.trace(f"Melt exception: {e}")
raise e
finally:
# delete proofs from pending list
await self._unset_proofs_pending(proofs)
await self._unset_quote_pending(quote)

return melt_quote.proof or "", return_promises

Expand All @@ -746,6 +748,7 @@ async def split(
"""
logger.trace("split called")

# set proofs pending to avoid race conditions
await self._set_proofs_pending(proofs)
try:
# explicitly check that amount of inputs is equal to amount of outputs
Expand All @@ -766,7 +769,6 @@ async def split(
logger.trace(f"split failed: {e}")
raise e
finally:
# delete proofs from pending list
await self._unset_proofs_pending(proofs)

logger.trace("split successful")
Expand Down Expand Up @@ -909,51 +911,64 @@ async def check_proofs_state(self, secrets: List[str]) -> List[ProofState]:
return states

async def _set_proofs_pending(self, proofs: List[Proof]) -> None:
"""If none of the proofs is in the pending table (_validate_proofs_pending), adds proofs to
the list of pending proofs or removes them. Used as a mutex for proofs.
"""Adds proofs to the list of pending proofs. If any of the proofs is
already in the list of pending proofs, raises an exception.

Note: The lock is enforced by the unique constraint on the secret column
in the database.

Args:
proofs (List[Proof]): Proofs to add to pending table.

Raises:
Exception: At least one proof already in pending table.
"""
# first we check whether these proofs are pending already
async with self.proofs_pending_lock:
async with self.db.connect() as conn:
await self._validate_proofs_pending(proofs, conn)
for p in proofs:
try:
await self.crud.set_proof_pending(
proof=p, db=self.db, conn=conn
)
except Exception:
raise TransactionError("proofs already pending.")
async with self.db.connect("proofs_pending") as conn:
# await self._validate_proofs_pending(proofs, conn)
for p in proofs:
try:
await self.crud.set_proof_pending(proof=p, db=self.db, conn=conn)
except Exception as e:
logger.trace(f"crud: set_proof_pending failed: {e}")
raise TransactionError("proofs already pending.")

async def _unset_proofs_pending(self, proofs: List[Proof]) -> None:
"""Deletes proofs from pending table.

Args:
proofs (List[Proof]): Proofs to delete.
"""
async with self.proofs_pending_lock:
async with self.db.connect() as conn:
for p in proofs:
await self.crud.unset_proof_pending(proof=p, db=self.db, conn=conn)
async with self.db.connect("proofs_pending") as conn:
for p in proofs:
await self.crud.unset_proof_pending(proof=p, db=self.db, conn=conn)

async def _validate_proofs_pending(
self, proofs: List[Proof], conn: Optional[Connection] = None
) -> None:
"""Checks if any of the provided proofs is in the pending proofs table.
async def _set_quote_pending(self, quote_id: str) -> None:
"""Adds quote to the list of pending quotes. If the quote is already in the list of pending quotes, raises an exception.

Note: The lock is enforced by the unique constraint on the checking_id column in the database.

Args:
proofs (List[Proof]): Proofs to check.
quote_id (str): Quote id to add to pending table.

Raises:
Exception: At least one of the proofs is in the pending table.
Exception: Quote already in pending table.
"""
proofs_pending = await self.crud.get_proofs_pending(db=self.db, conn=conn)
for p in proofs:
for pp in proofs_pending:
if p.secret == pp.secret:
raise TransactionError("proofs are pending.")
async with self.db.connect("quotes_pending") as conn:
try:
await self.crud.set_quote_pending(
quote_id=quote_id, db=self.db, conn=conn
)
except Exception as e:
logger.trace(f"crud: set_quote_pending failed: {e}")
raise TransactionError("quote already pending.")

async def _unset_quote_pending(self, quote_id: str) -> None:
"""Deletes quote from pending table.

Args:
quote_id (str): Quote id to delete.
"""
async with self.db.connect("quotes_pending") as conn:
await self.crud.unset_quote_pending(
quote_id=quote_id, db=self.db, conn=conn
)
16 changes: 16 additions & 0 deletions cashu/mint/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,19 @@ async def m013_keysets_add_encrypted_seed(db: Database):
f"ALTER TABLE {table_with_schema(db, 'keysets')} ADD COLUMN"
" seed_encryption_method TEXT"
)


async def m014_pending_quotes_table(db: Database) -> None:
"""
Store pending quotes.
"""
async with db.connect() as conn:
await conn.execute(f"""
CREATE TABLE IF NOT EXISTS {table_with_schema(db, 'quotes_pending')} (
quote TEXT NOT NULL,

UNIQUE (quote)

);
""")

2 changes: 1 addition & 1 deletion cashu/mint/verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def _verify_no_duplicate_proofs(self, proofs: List[Proof]) -> bool:

def _verify_no_duplicate_outputs(self, outputs: List[BlindedMessage]) -> bool:
B_s = [od.B_ for od in outputs]
if len(B_s) != len(list(set(B_s))):
if len(B_s) != len(set(B_s)):
return False
return True

Expand Down
Loading
Loading