Issue #11688: Add sqlite3.Connection.set_trace_callback(). Patch by Torsten Landschoff.

This commit is contained in:
Antoine Pitrou 2011-04-04 00:12:04 +02:00
parent d7edf3b82d
commit 5bfa0622ec
4 changed files with 128 additions and 1 deletions

View File

@ -369,6 +369,22 @@ Connection Objects
method with :const:`None` for *handler*. method with :const:`None` for *handler*.
.. method:: Connection.set_trace_callback(trace_callback)
Registers *trace_callback* to be called for each SQL statement that is
actually executed by the SQLite backend.
The only argument passed to the callback is the statement (as string) that
is being executed. The return value of the callback is ignored. Note that
the backend does not only run statements passed to the :meth:`Cursor.execute`
methods. Other sources include the transaction management of the Python
module and the execution of triggers defined in the current database.
Passing :const:`None` as *trace_callback* will disable the trace callback.
.. versionadded:: 3.3
.. method:: Connection.enable_load_extension(enabled) .. method:: Connection.enable_load_extension(enabled)
This routine allows/disallows the SQLite engine to load SQLite extensions This routine allows/disallows the SQLite engine to load SQLite extensions

View File

@ -175,10 +175,56 @@ class ProgressTests(unittest.TestCase):
con.execute("select 1 union select 2 union select 3").fetchall() con.execute("select 1 union select 2 union select 3").fetchall()
self.assertEqual(action, 0, "progress handler was not cleared") self.assertEqual(action, 0, "progress handler was not cleared")
class TraceCallbackTests(unittest.TestCase):
def CheckTraceCallbackUsed(self):
"""
Test that the trace callback is invoked once it is set.
"""
con = sqlite.connect(":memory:")
traced_statements = []
def trace(statement):
traced_statements.append(statement)
con.set_trace_callback(trace)
con.execute("create table foo(a, b)")
self.assertTrue(traced_statements)
self.assertTrue(any("create table foo" in stmt for stmt in traced_statements))
def CheckClearTraceCallback(self):
"""
Test that setting the trace callback to None clears the previously set callback.
"""
con = sqlite.connect(":memory:")
traced_statements = []
def trace(statement):
traced_statements.append(statement)
con.set_trace_callback(trace)
con.set_trace_callback(None)
con.execute("create table foo(a, b)")
self.assertFalse(traced_statements, "trace callback was not cleared")
def CheckUnicodeContent(self):
"""
Test that the statement can contain unicode literals.
"""
unicode_value = '\xf6\xe4\xfc\xd6\xc4\xdc\xdf\u20ac'
con = sqlite.connect(":memory:")
traced_statements = []
def trace(statement):
traced_statements.append(statement)
con.set_trace_callback(trace)
con.execute("create table foo(x)")
con.execute("insert into foo(x) values (?)", (unicode_value,))
con.commit()
self.assertTrue(any(unicode_value in stmt for stmt in traced_statements),
"Unicode data garbled in trace callback")
def suite(): def suite():
collation_suite = unittest.makeSuite(CollationTests, "Check") collation_suite = unittest.makeSuite(CollationTests, "Check")
progress_suite = unittest.makeSuite(ProgressTests, "Check") progress_suite = unittest.makeSuite(ProgressTests, "Check")
return unittest.TestSuite((collation_suite, progress_suite)) trace_suite = unittest.makeSuite(TraceCallbackTests, "Check")
return unittest.TestSuite((collation_suite, progress_suite, trace_suite))
def test(): def test():
runner = unittest.TextTestRunner() runner = unittest.TextTestRunner()

View File

@ -87,6 +87,9 @@ Core and Builtins
Library Library
------- -------
- Issue #11688: Add sqlite3.Connection.set_trace_callback(). Patch by
Torsten Landschoff.
- Issue #11746: Fix SSLContext.load_cert_chain() to accept elliptic curve - Issue #11746: Fix SSLContext.load_cert_chain() to accept elliptic curve
private keys. private keys.

View File

@ -904,6 +904,38 @@ static int _progress_handler(void* user_arg)
return rc; return rc;
} }
static void _trace_callback(void* user_arg, const char* statement_string)
{
PyObject *py_statement = NULL;
PyObject *ret = NULL;
#ifdef WITH_THREAD
PyGILState_STATE gilstate;
gilstate = PyGILState_Ensure();
#endif
py_statement = PyUnicode_DecodeUTF8(statement_string,
strlen(statement_string), "replace");
if (py_statement) {
ret = PyObject_CallFunctionObjArgs((PyObject*)user_arg, py_statement, NULL);
Py_DECREF(py_statement);
}
if (ret) {
Py_DECREF(ret);
} else {
if (_enable_callback_tracebacks) {
PyErr_Print();
} else {
PyErr_Clear();
}
}
#ifdef WITH_THREAD
PyGILState_Release(gilstate);
#endif
}
static PyObject* pysqlite_connection_set_authorizer(pysqlite_Connection* self, PyObject* args, PyObject* kwargs) static PyObject* pysqlite_connection_set_authorizer(pysqlite_Connection* self, PyObject* args, PyObject* kwargs)
{ {
PyObject* authorizer_cb; PyObject* authorizer_cb;
@ -963,6 +995,34 @@ static PyObject* pysqlite_connection_set_progress_handler(pysqlite_Connection* s
return Py_None; return Py_None;
} }
static PyObject* pysqlite_connection_set_trace_callback(pysqlite_Connection* self, PyObject* args, PyObject* kwargs)
{
PyObject* trace_callback;
static char *kwlist[] = { "trace_callback", NULL };
if (!pysqlite_check_thread(self) || !pysqlite_check_connection(self)) {
return NULL;
}
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O:set_trace_callback",
kwlist, &trace_callback)) {
return NULL;
}
if (trace_callback == Py_None) {
/* None clears the trace callback previously set */
sqlite3_trace(self->db, 0, (void*)0);
} else {
if (PyDict_SetItem(self->function_pinboard, trace_callback, Py_None) == -1)
return NULL;
sqlite3_trace(self->db, _trace_callback, trace_callback);
}
Py_INCREF(Py_None);
return Py_None;
}
#ifdef HAVE_LOAD_EXTENSION #ifdef HAVE_LOAD_EXTENSION
static PyObject* pysqlite_enable_load_extension(pysqlite_Connection* self, PyObject* args) static PyObject* pysqlite_enable_load_extension(pysqlite_Connection* self, PyObject* args)
{ {
@ -1516,6 +1576,8 @@ static PyMethodDef connection_methods[] = {
#endif #endif
{"set_progress_handler", (PyCFunction)pysqlite_connection_set_progress_handler, METH_VARARGS|METH_KEYWORDS, {"set_progress_handler", (PyCFunction)pysqlite_connection_set_progress_handler, METH_VARARGS|METH_KEYWORDS,
PyDoc_STR("Sets progress handler callback. Non-standard.")}, PyDoc_STR("Sets progress handler callback. Non-standard.")},
{"set_trace_callback", (PyCFunction)pysqlite_connection_set_trace_callback, METH_VARARGS|METH_KEYWORDS,
PyDoc_STR("Sets a trace callback called for each SQL statement (passed as unicode). Non-standard.")},
{"execute", (PyCFunction)pysqlite_connection_execute, METH_VARARGS, {"execute", (PyCFunction)pysqlite_connection_execute, METH_VARARGS,
PyDoc_STR("Executes a SQL statement. Non-standard.")}, PyDoc_STR("Executes a SQL statement. Non-standard.")},
{"executemany", (PyCFunction)pysqlite_connection_executemany, METH_VARARGS, {"executemany", (PyCFunction)pysqlite_connection_executemany, METH_VARARGS,