Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unittests for models #93

Merged
merged 6 commits into from
Jan 18, 2016
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
tests for SQLQueryTest
  • Loading branch information
Alkalit committed Jan 18, 2016

Verified

This commit was signed with the committer’s verified signature.
mbklein Michael B. Klein
commit d6d64d02e42fafad80e953d7ccf0368cde38a4ab
233 changes: 230 additions & 3 deletions project/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,8 @@
from .factories import RequestMinFactory, SQLQueryFactory, ResponseFactory


# TODO test atomicity

# http://stackoverflow.com/questions/13397038/uuid-max-character-length
# UUID_MAX_LENGTH = 36

@@ -31,7 +33,6 @@ def test_uuid_is_primary_key(self):

self.assertIsInstance(self.obj.id, uuid.UUID)

# 2016-01-13 16:08:11.531676+00:00
@freeze_time('2016-01-01 12:00:00')
def test_start_time_field_default(self):

@@ -200,11 +201,237 @@ def test_content_type_if_header_have_content_type(self):


class SQLQueryManagerTest(TestCase):
pass

def test_if_no_args_passed(self):
pass
def test_if_one_arg_passed(self):
pass
def if_a_few_args_passed(self):
pass
def if_objs_kw_arg_passed(self):
pass
def if_not_the_objs_kw_arg_passed(self):
pass


class SQLQueryTest(TestCase):
pass

def setUp(self):

self.obj = SQLQueryFactory.create()
self.end_time = datetime.datetime(2016, 1, 1, 12, 0, 5, tzinfo=pytz.UTC)
self.start_time = datetime.datetime(2016, 1, 1, 12, 0, 0, tzinfo=pytz.UTC)

@freeze_time('2016-01-01 12:00:00')
def test_start_time_field_default(self):

obj = SQLQueryFactory.create()
self.assertEqual(obj.start_time, datetime.datetime(2016, 1, 1, 12, 0, 0, tzinfo=pytz.UTC))

def test_is_m2o_related_to_request(self):

request = RequestMinFactory()
self.obj.request = request
self.obj.save()

self.assertIn(self.obj, request.queries.all())

def test_query_manager_instance(self):

self.assertIsInstance(models.SQLQuery.objects, models.SQLQueryManager)

def test_traceback_ln_only(self):

self.obj.traceback = """Traceback (most recent call last):
File "/home/user/some_script.py", line 10, in some_func
pass
File "/usr/lib/python2.7/bdb.py", line 20, in trace_dispatch
return self.dispatch_return(frame, arg)
File "/usr/lib/python2.7/bdb.py", line 30, in dispatch_return
if self.quitting: raise BdbQuit
BdbQuit"""

output = ('Traceback (most recent call last):\n'
' pass\n'
' return self.dispatch_return(frame, arg)\n'
' if self.quitting: raise BdbQuit')

self.assertEqual(self.obj.traceback_ln_only, output)

def test_formatted_query_if_no_query(self):

self.obj.query = ""
self.obj.formatted_query

def test_formatted_query_if_has_a_query(self):

query = """SELECT Book.title AS Title,
COUNT(*) AS Authors
FROM Book
JOIN Book_author
ON Book.isbn = Book_author.isbn
GROUP BY Book.title;"""

self.obj.query = query
self.obj.formatted_query

def test_num_joins_if_no_joins_in_query(self):

query = """SELECT Book.title AS Title,
COUNT(*) AS Authors
FROM Book
GROUP BY Book.title;"""

self.obj.query = query

self.assertEqual(self.obj.num_joins, 0)

def test_num_joins_if_joins_in_query(self):

query = """SELECT p.id
FROM Person p
JOIN address a ON p.Id = a.Person_ID
JOIN address_type at ON a.Type_ID = at.Id
JOIN `option` o ON p.Id = o.person_Id
JOIN option_address_type oat ON o.id = oat.option_id
WHERE a.country_id = 1 AND at.id <> oat.type_id;"""

self.obj.query = query
self.assertEqual(self.obj.num_joins, 4)

# FIXME bug, not a feature
def test_num_joins_if_no_joins_in_query_but_this_word_searched(self):

query = """SELECT Book.title FROM Book WHERE Book.title=`Join the dark side, Luke!`;"""

self.obj.query = query
self.assertEqual(self.obj.num_joins, 1)

def test_tables_involved_if_no_query(self):

self.obj.query = ''

self.assertEqual(self.obj.tables_involved, [])

def test_tables_involved_if_query_has_only_a_from_token(self):

query = """SELECT * FROM Book;"""
self.obj.query = query
self.assertEqual(self.obj.tables_involved, ['Book;'])

def test_tables_involved_if_query_has_a_join_token(self):

query = """SELECT p.id FROM Person p JOIN Address a ON p.Id = a.Person_ID;"""
self.obj.query = query
self.assertEqual(self.obj.tables_involved, ['Person', 'Address'])

def test_tables_involved_if_query_has_an_as_token(self):

query = 'SELECT Book.title AS Title FROM Book GROUP BY Book.title;'
self.obj.query = query
self.assertEqual(self.obj.tables_involved, ['Title', 'Book'])

# FIXME bug, not a feature
def test_tables_involved_check_with_fake_a_from_token(self):

query = """SELECT * FROM Book WHERE Book.title=`EVIL FROM WITHIN`;"""
self.obj.query = query
self.assertEqual(self.obj.tables_involved, ['Book', 'WITHIN`;'])

# FIXME bug, not a feature
def test_tables_involved_check_with_fake_a_join_token(self):

query = """SELECT * FROM Book WHERE Book.title=`Luke, join the dark side!`;"""
self.obj.query = query
self.assertEqual(self.obj.tables_involved, ['Book', 'the'])

# FIXME bug, not a feature
def test_tables_involved_check_with_fake_an_as_token(self):

query = """SELECT * FROM Book WHERE Book.title=`AS SOON AS POSIABLE`;"""
self.obj.query = query
self.assertEqual(self.obj.tables_involved, ['Book', 'POSIABLE`;'])

def test_tables_involved_if_query_has_subquery(self):

query = '''SELECT A.Col1, A.Col2, B.Col1,B.Col2
FROM (SELECT RealTableZ.Col1, RealTableY.Col2, RealTableY.ID AS ID
FROM RealTableZ
LEFT OUTER JOIN RealTableY ON RealTableZ.ForeignKeyY=RealTableY.ID
WHERE RealTableY.Col11>14
) AS B INNER JOIN A
ON A.ForeignKeyY=B.ID;'''
self.obj.query = query
self.assertEqual(self.obj.tables_involved, ['ID', 'RealTableZ', 'RealTableY', 'B', 'A'])

# FIXME bug, not a feature
def test_tables_involved_if_query_has_django_aliase_on_column_names(self):

query = 'SELECT foo AS bar FROM some_table;'
self.obj.query = query
self.assertEqual(self.obj.tables_involved, ['bar', 'some_table;'])

def test_save_if_no_end_and_start_time(self):

obj = SQLQueryFactory.create()

self.assertEqual(obj.time_taken, None)

@freeze_time('2016-01-01 12:00:00')
def test_save_if_has_end_time(self):

# datetime.datetime(2016, 1, 1, 12, 0, 5, tzinfo=pytz.UTC)
obj = SQLQueryFactory.create(end_time=self.end_time)

self.assertEqual(obj.time_taken, 5000.0)

@freeze_time('2016-01-01 12:00:00')
def test_save_if_has_start_time(self):

obj = SQLQueryFactory.create(start_time=self.start_time)

self.assertEqual(obj.time_taken, None)

def test_save_if_has_end_and_start_time(self):

obj = SQLQueryFactory.create(start_time=self.start_time, end_time=self.end_time)

self.assertEqual(obj.time_taken, 5000.0)

def test_save_if_has_pk_and_request(self):

self.obj.request = RequestMinFactory.create()
self.obj.save()
self.assertEqual(self.obj.request.num_sql_queries, 0)

def test_save_if_has_no_pk(self):

obj = SQLQueryFactory.build(start_time=self.start_time, end_time=self.end_time)
obj.request = RequestMinFactory.create()
obj.save()
self.assertEqual(obj.request.num_sql_queries, 1)

# should not rise
def test_save_if_has_no_request(self):

obj = SQLQueryFactory.build(start_time=self.start_time, end_time=self.end_time)
obj.save()

# FIXME a bug
def test_delete_if_no_related_requests(self):

with self.assertRaises(AttributeError):
self.obj.delete()

# self.assertNotIn(self.obj, models.SQLQuery.objects.all())

def test_delete_if_has_request(self):

self.obj.request = RequestMinFactory.create()
self.obj.save()
self.obj.delete()

self.assertNotIn(self.obj, models.SQLQuery.objects.all())


class BaseProfileTest(TestCase):
20 changes: 14 additions & 6 deletions silk/models.py
Original file line number Diff line number Diff line change
@@ -66,7 +66,7 @@ def total_meta_time(self):
# defined in atomic transaction within SQLQuery save()/delete() as well
# as in bulk_create of SQLQueryManager
# TODO: This is probably a bad way to do this, .count() will prob do?
num_sql_queries = IntegerField(default=0)
num_sql_queries = IntegerField(default=0) # TODO replace with count()

@property
def time_spent_on_sql_queries(self):
@@ -127,6 +127,7 @@ def headers(self):
return CaseInsensitiveDictionary(raw)


# TODO rewrite docstring
class SQLQueryManager(models.Manager):
def bulk_create(self, *args, **kwargs):
"""ensure that num_sql_queries remains consistent. Bulk create does not call
@@ -135,6 +136,7 @@ def bulk_create(self, *args, **kwargs):
objs = args[0]
else:
objs = kwargs.get('objs')

with atomic():
request_counter = Counter([x.request_id for x in objs])
requests = Request.objects.filter(pk__in=request_counter.keys())
@@ -158,6 +160,7 @@ class SQLQuery(models.Model):
traceback = TextField()
objects = SQLQueryManager()

# TODO docstring
@property
def traceback_ln_only(self):
return '\n'.join(self.traceback.split('\n')[::2])
@@ -177,13 +180,15 @@ def tables_involved(self):
TODO: Can probably parse the SQL using sqlparse etc and pull out table info that way?"""
components = [x.strip() for x in self.query.split()]
tables = []
for idx, c in enumerate(components):

for idx, component in enumerate(components):
# TODO: If django uses aliases on column names they will be falsely identified as tables...
if c.lower() == 'from' or c.lower() == 'join' or c.lower() == 'as':
if component.lower() == 'from' or component.lower() == 'join' or component.lower() == 'as':
try:
nxt = components[idx + 1]
if not nxt.startswith('('): # Subquery
stripped = nxt.strip().strip(',')
_next = components[idx + 1]
if not _next.startswith('('): # Subquery
stripped = _next.strip().strip(',')

if stripped:
tables.append(stripped)
except IndexError: # Reach the end
@@ -192,13 +197,16 @@ def tables_involved(self):

@atomic()
def save(self, *args, **kwargs):

if self.end_time and self.start_time:
interval = self.end_time - self.start_time
self.time_taken = interval.total_seconds() * 1000

if not self.pk:
if self.request:
self.request.num_sql_queries += 1
self.request.save()

super(SQLQuery, self).save(*args, **kwargs)

@atomic()