-
-
Notifications
You must be signed in to change notification settings - Fork 440
/
Copy pathsqlitedb.py
213 lines (183 loc) · 8.71 KB
/
sqlitedb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""SQLite abstraction for coverage.py"""
from __future__ import annotations
import contextlib
import re
import sqlite3
from typing import cast, Any, Iterable, Iterator, List, Optional, Tuple
from coverage.debug import AutoReprMixin, clipped_repr, exc_one_line
from coverage.exceptions import DataError
from coverage.types import TDebugCtl
class SqliteDb(AutoReprMixin):
    """A simple abstraction over a SQLite database.

    Use as a context manager, then you can use it like a
    :class:`python:sqlite3.Connection` object::

        with SqliteDb(filename, debug_control) as db:
            with db.execute("select a, b from some_table") as cur:
                for a, b in cur:
                    etc(a, b)

    Contexts can be nested. Only the outermost ``with`` connects on entry, and
    only the outermost exit hands control to the connection's own ``__exit__``
    and then closes it.

    """

    def __init__(self, filename: str, debug: TDebugCtl) -> None:
        # Debug control: consulted with .should(option), written with .write(msg).
        self.debug = debug
        # Path of the database file, or ":memory:".
        self.filename = filename
        # How many `with` blocks are currently active on this object.
        self.nest = 0
        # The live connection, or None when not connected.
        self.con: Optional[sqlite3.Connection] = None

    def _connect(self) -> None:
        """Connect to the db and do universal initialization.

        Does nothing if a connection is already open. Raises `DataError` if
        the database can't be opened or the journal pragma fails.

        """
        if self.con is not None:
            return

        # It can happen that Python switches threads while the tracer writes
        # data. The second thread will also try to write to the data,
        # effectively causing a nested context. However, given the idempotent
        # nature of the tracer operations, sharing a connection among threads
        # is not a problem.
        if self.debug.should("sql"):
            self.debug.write(f"Connecting to {self.filename!r}")
        try:
            self.con = sqlite3.connect(self.filename, check_same_thread=False)
        except sqlite3.Error as exc:
            raise DataError(f"Couldn't use data file {self.filename!r}: {exc}") from exc

        # SQLite evaluates `X REGEXP Y` as regexp(Y, X), so the callback gets
        # the pattern first and the text to search second.
        self.con.create_function("REGEXP", 2, lambda pat, txt: re.search(pat, txt) is not None)

        # This pragma makes writing faster. It disables rollbacks, but we never need them.
        self.execute_void("pragma journal_mode=off")

        # This pragma makes writing faster. It can fail in unusual situations
        # (https://github.com/nedbat/coveragepy/issues/1646), so use fail_ok=True
        # to keep things going.
        self.execute_void("pragma synchronous=off", fail_ok=True)

    def close(self) -> None:
        """If needed, close the connection.

        A ":memory:" database is deliberately left open.

        """
        if self.con is not None and self.filename != ":memory:":
            self.con.close()
            self.con = None

    def __enter__(self) -> SqliteDb:
        # Only the outermost `with` connects and enters the connection.
        if self.nest == 0:
            self._connect()
            assert self.con is not None
            self.con.__enter__()
        self.nest += 1
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore[no-untyped-def]
        # Only the outermost `with` finishes the connection. Any failure while
        # doing so is reported as a DataError.
        self.nest -= 1
        if self.nest == 0:
            try:
                assert self.con is not None
                try:
                    self.con.__exit__(exc_type, exc_value, traceback)
                finally:
                    # Release the connection even if ending the transaction
                    # failed, so it isn't left open behind the error.
                    self.close()
            except Exception as exc:
                if self.debug.should("sql"):
                    self.debug.write(f"EXCEPTION from __exit__: {exc_one_line(exc)}")
                raise DataError(f"Couldn't end data file {self.filename!r}: {exc}") from exc

    def _execute(self, sql: str, parameters: Iterable[Any]) -> sqlite3.Cursor:
        """Same as :meth:`python:sqlite3.Connection.execute`.

        A failed statement is retried once. If it still fails with a SQLite
        error, `DataError` is raised.

        """
        if self.debug.should("sql"):
            tail = f" with {parameters!r}" if parameters else ""
            self.debug.write(f"Executing {sql!r}{tail}")
        try:
            assert self.con is not None
            try:
                return self.con.execute(sql, parameters)    # type: ignore[arg-type]
            except Exception:
                # In some cases, an error might happen that isn't really an
                # error. Try again immediately.
                # https://github.com/nedbat/coveragepy/issues/1010
                return self.con.execute(sql, parameters)    # type: ignore[arg-type]
        except sqlite3.Error as exc:
            msg = str(exc)
            if self.filename != ":memory:":
                try:
                    # `execute` is the first thing we do with the database, so try
                    # hard to provide useful hints if something goes wrong now.
                    with open(self.filename, "rb") as bad_file:
                        cov4_sig = b"!coverage.py: This is a private format"
                        if bad_file.read(len(cov4_sig)) == cov4_sig:
                            msg = (
                                "Looks like a coverage 4.x data file. " +
                                "Are you mixing versions of coverage?"
                            )
                except Exception:
                    # The hint is best-effort: if the file can't be read, keep
                    # the original SQLite message.
                    pass
            if self.debug.should("sql"):
                self.debug.write(f"EXCEPTION from execute: {exc_one_line(exc)}")
            raise DataError(f"Couldn't use data file {self.filename!r}: {msg}") from exc

    @contextlib.contextmanager
    def execute(
        self,
        sql: str,
        parameters: Iterable[Any] = (),
    ) -> Iterator[sqlite3.Cursor]:
        """Context managed :meth:`python:sqlite3.Connection.execute`.

        Use with a ``with`` statement to auto-close the returned cursor.

        """
        cur = self._execute(sql, parameters)
        try:
            yield cur
        finally:
            cur.close()

    def execute_void(self, sql: str, parameters: Iterable[Any] = (), fail_ok: bool = False) -> None:
        """Same as :meth:`python:sqlite3.Connection.execute` when you don't need the cursor.

        If `fail_ok` is True, then SQLite errors are ignored.

        """
        try:
            # PyPy needs the .close() calls here, or sqlite gets twisted up:
            # https://bitbucket.org/pypy/pypy/issues/2872/default-isolation-mode-is-different-on
            self._execute(sql, parameters).close()
        except DataError:
            if not fail_ok:
                raise

    def execute_for_rowid(self, sql: str, parameters: Iterable[Any] = ()) -> int:
        """Like execute, but returns the lastrowid."""
        with self.execute(sql, parameters) as cur:
            assert cur.lastrowid is not None
            rowid: int = cur.lastrowid
            if self.debug.should("sqldata"):
                self.debug.write(f"Row id result: {rowid!r}")
            return rowid

    def execute_one(self, sql: str, parameters: Iterable[Any] = ()) -> Optional[Tuple[Any, ...]]:
        """Execute a statement and return the one row that results.

        This is like execute(sql, parameters).fetchone(), except it is
        correct in reading the entire result set. This will raise an
        exception if more than one row results.

        Returns a row, or None if there were no rows.

        """
        with self.execute(sql, parameters) as cur:
            rows = list(cur)
        if len(rows) == 0:
            return None
        elif len(rows) == 1:
            return cast(Tuple[Any, ...], rows[0])
        else:
            raise AssertionError(f"SQL {sql!r} shouldn't return {len(rows)} rows")

    def _executemany(self, sql: str, data: List[Any]) -> sqlite3.Cursor:
        """Same as :meth:`python:sqlite3.Connection.executemany`.

        A failed call is retried once. Unlike `_execute`, a second failure is
        not converted to `DataError`; the original exception propagates.

        """
        if self.debug.should("sql"):
            final = ":" if self.debug.should("sqldata") else ""
            self.debug.write(f"Executing many {sql!r} with {len(data)} rows{final}")
            if self.debug.should("sqldata"):
                for i, row in enumerate(data):
                    self.debug.write(f"{i:4d}: {row!r}")
        assert self.con is not None
        try:
            return self.con.executemany(sql, data)
        except Exception:
            # In some cases, an error might happen that isn't really an
            # error. Try again immediately.
            # https://github.com/nedbat/coveragepy/issues/1010
            return self.con.executemany(sql, data)

    def executemany_void(self, sql: str, data: Iterable[Any]) -> None:
        """Same as :meth:`python:sqlite3.Connection.executemany` when you don't need the cursor.

        Nothing is executed if `data` is empty.

        """
        # Materialize the rows so an empty iterable can be detected and the
        # row count can be reported in debug output.
        data = list(data)
        if data:
            self._executemany(sql, data).close()

    def executescript(self, script: str) -> None:
        """Same as :meth:`python:sqlite3.Connection.executescript`."""
        if self.debug.should("sql"):
            self.debug.write(
                f"Executing script with {len(script)} chars: {clipped_repr(script, 100)}"
            )
        assert self.con is not None
        self.con.executescript(script).close()

    def dump(self) -> str:
        """Return a multi-line string, the SQL dump of the database."""
        assert self.con is not None
        return "\n".join(self.con.iterdump())