"""Unit tests for ``easy_profile.profiler.SessionProfiler``."""
from collections import Counter
|
|
from queue import Queue
|
|
import time
|
|
import unittest
|
|
from unittest import mock
|
|
|
|
from sqlalchemy import create_engine, event
|
|
from sqlalchemy.engine.base import Engine
|
|
|
|
from easy_profile.profiler import DebugQuery, SessionProfiler, SQL_OPERATORS
|
|
from easy_profile.reporters import Reporter
|
|
|
|
|
|
# Fixture queries shared by the tests below. Every query lasts exactly one
# second, so only the start timestamp is listed and the end timestamp is
# derived from it. The first statement is intentionally repeated so that
# duplicate detection has something to find.
debug_queries = [
    DebugQuery(sql, params, started_at, started_at + 1)
    for sql, params, started_at in (
        ("SELECT id FROM users", {}, 1541489542),
        ("SELECT id FROM users", {}, 1541489542),
        ("SELECT name FROM users", {}, 1541489543),
        ("SELECT gender FROM users", {}, 1541489544),
        (
            "INSERT INTO users (name) VALUES (%(param_1)s)",
            {"param_1": "Arthur Dent"},
            1541489545,
        ),
        (
            "INSERT INTO users (name) VALUES (%(param_1)s)",
            {"param_1": "Ford Prefect"},
            1541489546,
        ),
        (
            "UPDATE users SET name=(%(param_1)s)",
            {"param_1": "Prefect Ford"},
            1541489547,
        ),
        ("DELETE FROM users", {}, 1541489548),
    )
]
|
|
|
|
|
|
class TestSessionProfiler(unittest.TestCase):
    """Tests for ``SessionProfiler``.

    Covers construction, the ``begin``/``commit`` lifecycle, statistics
    bookkeeping, the cursor-execute event hooks, and the context-manager and
    decorator interfaces.
    """

    def test_initialization_default(self):
        """A profiler built without arguments targets the ``Engine`` class."""
        profiler = SessionProfiler()
        self.assertIs(profiler.engine, Engine)
        self.assertEqual(profiler.db_name, "default")
        self.assertFalse(profiler.alive)
        self.assertIsNone(profiler.queries)

    def test_initialization_custom(self):
        """A profiler built with an engine keeps it and reports its db name."""
        engine = create_engine("sqlite:///test")
        profiler = SessionProfiler(engine)
        self.assertIs(profiler.engine, engine)
        self.assertEqual(profiler.db_name, "test")

    def test_begin(self):
        """``begin`` resets stats, marks the session alive and adds hooks."""
        profiler = SessionProfiler()
        # begin() attaches listeners to the shared Engine class; make sure
        # they are detached again so they cannot leak into other tests.
        self.addCleanup(self._remove_listeners, profiler)
        with mock.patch.object(profiler, "_reset_stats") as mocked:
            profiler.begin()
            mocked.assert_called()
        self.assertTrue(profiler.alive)
        self.assertIsInstance(profiler.queries, Queue)
        self.assertTrue(profiler.queries.empty())
        self.assertTrue(event.contains(
            profiler.engine,
            profiler._before,
            profiler._before_cursor_execute
        ))
        self.assertTrue(event.contains(
            profiler.engine,
            profiler._after,
            profiler._after_cursor_execute
        ))

    def test_begin_alive(self):
        """``begin`` on an already alive session raises ``AssertionError``."""
        profiler = SessionProfiler()
        profiler.alive = True
        with self.assertRaises(AssertionError) as exec_info:
            profiler.begin()

        error = exec_info.exception
        self.assertEqual(str(error), "Profiling session has already begun")

    def test_commit(self):
        """``commit`` collects stats, ends the session and removes hooks."""
        profiler = SessionProfiler()
        # Safety net: if commit() fails before detaching the listeners they
        # must still not outlive this test.
        self.addCleanup(self._remove_listeners, profiler)
        profiler.begin()
        with mock.patch.object(profiler, "_get_stats") as mocked:
            profiler.commit()
            mocked.assert_called()
        self.assertFalse(profiler.alive)
        self.assertFalse(event.contains(
            profiler.engine,
            profiler._before,
            profiler._before_cursor_execute
        ))
        self.assertFalse(event.contains(
            profiler.engine,
            profiler._after,
            profiler._after_cursor_execute
        ))

    def test_commit_alive(self):
        """``commit`` on a session that is not alive raises an error."""
        profiler = SessionProfiler()
        profiler.alive = False
        with self.assertRaises(AssertionError) as exec_info:
            profiler.commit()

        error = exec_info.exception
        self.assertEqual(str(error), "Profiling session is already committed")

    def test__reset_stats(self):
        """``_reset_stats`` zeroes every counter and empties collections."""
        profiler = SessionProfiler()
        profiler._reset_stats()
        stats = profiler._stats
        self.assertEqual(stats["total"], 0)
        self.assertEqual(stats["duration"], 0)
        # Check every known operator (not just a hand-picked subset) so a
        # counter such as "delete" cannot be silently left out.
        for op in SQL_OPERATORS:
            with self.subTest(operator=op):
                self.assertEqual(stats[op], 0)
        self.assertEqual(stats["call_stack"], [])
        self.assertEqual(stats["duplicates"], Counter())
        self.assertEqual(stats["db"], profiler.db_name)

    def test__get_stats(self):
        """``_get_stats`` aggregates the queued queries into statistics."""
        profiler = SessionProfiler()
        profiler.queries = Queue()
        profiler._reset_stats()
        duplicates = Counter()
        for query in debug_queries:
            profiler.queries.put(query)
            # First sighting of a statement records 0, each repeat adds one.
            duplicates_count = duplicates.get(query.statement, -1)
            duplicates[query.statement] = duplicates_count + 1

        stats = profiler._get_stats()

        for op in SQL_OPERATORS:
            keyword = op.upper()
            expected = sum(
                1 for query in debug_queries if keyword in query.statement
            )
            self.assertEqual(stats[op], expected)

        self.assertEqual(stats["db"], profiler.db_name)
        self.assertEqual(stats["total"], len(debug_queries))
        self.assertListEqual(debug_queries, stats["call_stack"])
        self.assertDictEqual(stats["duplicates"], duplicates)

    @mock.patch("easy_profile.profiler._timer")
    def test__before_cursor_execute(self, mocked):
        """The "before" hook stores the timer value on the context."""
        expected_time = time.time()
        mocked.return_value = expected_time
        profiler = SessionProfiler()
        context = mock.Mock()
        profiler._before_cursor_execute(
            conn=None,
            cursor=None,
            statement=None,
            parameters={},
            context=context,
            executemany=None
        )
        self.assertEqual(context._query_start_time, expected_time)

    @mock.patch("easy_profile.profiler._timer")
    def test__after_cursor_execute(self, mocked):
        """The "after" hook enqueues a ``DebugQuery`` for the statement."""
        expected_query = debug_queries[0]
        mocked.return_value = expected_query.end_time
        profiler = SessionProfiler()
        context = mock.Mock()
        context._query_start_time = expected_query.start_time
        with profiler:
            profiler._after_cursor_execute(
                conn=None,
                cursor=None,
                statement=expected_query.statement,
                parameters=expected_query.parameters,
                context=context,
                executemany=None
            )
            # Non-blocking read: if the hook enqueued nothing the test fails
            # immediately with queue.Empty instead of hanging forever.
            actual_query = profiler.queries.get_nowait()
            self.assertEqual(actual_query, expected_query)

    def test_stats(self):
        """``stats`` is available on a freshly created profiler."""
        profiler = SessionProfiler()
        self.assertIsNotNone(profiler.stats)

    @mock.patch("easy_profile.profiler.SessionProfiler.begin")
    @mock.patch("easy_profile.profiler.SessionProfiler.commit")
    def test_contextmanager_interface(self, mocked_commit, mocked_begin):
        """Entering the context calls ``begin``; leaving calls ``commit``."""
        profiler = SessionProfiler()
        with profiler:
            mocked_begin.assert_called()
        mocked_commit.assert_called()

    def test_decorator(self):
        """Calling the profiler yields a decorator that gathers stats."""
        engine = self._create_engine()
        profiler = SessionProfiler(engine)
        wrapper = profiler()
        wrapper(self._decorated_func)(engine)
        # Test profile statistics
        self.assertEqual(profiler.stats["db"], "undefined")
        self.assertEqual(profiler.stats["total"], 4)
        self.assertEqual(profiler.stats["select"], 3)
        self.assertEqual(profiler.stats["delete"], 1)
        self.assertEqual(profiler.stats["duplicates_count"], 1)

    def test_decorator_path(self):
        """An explicit ``path`` is forwarded to the reporter."""
        expected_path = "test_path"
        engine = self._create_engine()
        profiler = SessionProfiler(engine)
        reporter = mock.Mock(spec=Reporter)
        # Get profiler decorator with specified path
        wrapper = profiler(path=expected_path, reporter=reporter)
        wrapper(self._decorated_func)(engine)
        # Test that reporter method report was called with expected path
        reporter.report.assert_called_with(expected_path, profiler.stats)

    def test_decorator_path_callback(self):
        """The value returned by ``path_callback`` is used as the path."""
        expected_path = "path_callback"

        def _callback(func, *args, **kwargs):
            return expected_path

        engine = self._create_engine()
        profiler = SessionProfiler(engine)
        reporter = mock.Mock(spec=Reporter)
        # Get profiler decorator with specified path_callback
        wrapper = profiler(path_callback=_callback, reporter=reporter)
        wrapper(self._decorated_func)(engine)
        # Test that reporter method report was called with expected path
        reporter.report.assert_called_with(expected_path, profiler.stats)

    def test_decorator_path_and_path_callback(self):
        """With both ``path`` and ``path_callback`` the callback wins."""
        expected_path = "path_callback"

        def _callback(func, *args, **kwargs):
            return expected_path

        engine = self._create_engine()
        profiler = SessionProfiler(engine)
        reporter = mock.Mock(spec=Reporter)
        # Get profiler decorator with both path and path_callback specified
        wrapper = profiler(
            path="fail",
            path_callback=_callback,
            reporter=reporter
        )
        wrapper(self._decorated_func)(engine)
        # Test that reporter method report was called with the callback path
        reporter.report.assert_called_with(expected_path, profiler.stats)

    def _create_engine(self):
        """Creates and returns sqlalchemy engine."""
        return create_engine("sqlite://")

    def _decorated_func(self, engine):
        """Function for testing profiler as decorator."""
        engine.execute("CREATE TABLE users (id int, name varchar(8))")
        engine.execute("SELECT id FROM users")
        engine.execute("SELECT id FROM users")
        engine.execute("SELECT name FROM users")
        engine.execute("DELETE FROM users")

    def _remove_listeners(self, profiler):
        """Detach the profiler's cursor-execute listeners if still attached."""
        hooks = (
            (profiler._before, profiler._before_cursor_execute),
            (profiler._after, profiler._after_cursor_execute),
        )
        for identifier, listener in hooks:
            # Guarded so the cleanup is a no-op after a successful commit().
            if event.contains(profiler.engine, identifier, listener):
                event.remove(profiler.engine, identifier, listener)
|