Removed the Requirement to Install Python and NodeJS (Now Bundled with Borealis)

This commit is contained in:
2025-04-24 00:42:19 -06:00
parent 785265d3e7
commit 9c68cdea84
7786 changed files with 2386458 additions and 217 deletions

View File

@ -0,0 +1,15 @@
from test.support import import_helper, load_package_tests, verbose
# Skip test if _sqlite3 module not installed.
import_helper.import_module('_sqlite3')
import os
import sqlite3
# Implement the unittest "load tests" protocol.
def load_tests(*args):
pkg_dir = os.path.dirname(__file__)
return load_package_tests(pkg_dir, *args)
if verbose:
print(f"test_sqlite3: testing with SQLite version {sqlite3.sqlite_version}")

View File

@ -0,0 +1,4 @@
from test.test_sqlite3 import load_tests # Needed for the "load tests" protocol.
import unittest
unittest.main()

View File

@ -0,0 +1,164 @@
import sqlite3 as sqlite
import unittest
from .util import memory_database
class BackupTests(unittest.TestCase):
def setUp(self):
cx = self.cx = sqlite.connect(":memory:")
cx.execute('CREATE TABLE foo (key INTEGER)')
cx.executemany('INSERT INTO foo (key) VALUES (?)', [(3,), (4,)])
cx.commit()
def tearDown(self):
self.cx.close()
def verify_backup(self, bckcx):
result = bckcx.execute("SELECT key FROM foo ORDER BY key").fetchall()
self.assertEqual(result[0][0], 3)
self.assertEqual(result[1][0], 4)
def test_bad_target(self):
with self.assertRaises(TypeError):
self.cx.backup(None)
with self.assertRaises(TypeError):
self.cx.backup()
def test_bad_target_filename(self):
with self.assertRaises(TypeError):
self.cx.backup('some_file_name.db')
def test_bad_target_same_connection(self):
with self.assertRaises(ValueError):
self.cx.backup(self.cx)
def test_bad_target_closed_connection(self):
with memory_database() as bck:
bck.close()
with self.assertRaises(sqlite.ProgrammingError):
self.cx.backup(bck)
def test_bad_source_closed_connection(self):
with memory_database() as bck:
source = sqlite.connect(":memory:")
source.close()
with self.assertRaises(sqlite.ProgrammingError):
source.backup(bck)
def test_bad_target_in_transaction(self):
with memory_database() as bck:
bck.execute('CREATE TABLE bar (key INTEGER)')
bck.executemany('INSERT INTO bar (key) VALUES (?)', [(3,), (4,)])
with self.assertRaises(sqlite.OperationalError) as cm:
self.cx.backup(bck)
def test_keyword_only_args(self):
with self.assertRaises(TypeError):
with memory_database() as bck:
self.cx.backup(bck, 1)
def test_simple(self):
with memory_database() as bck:
self.cx.backup(bck)
self.verify_backup(bck)
def test_progress(self):
journal = []
def progress(status, remaining, total):
journal.append(status)
with memory_database() as bck:
self.cx.backup(bck, pages=1, progress=progress)
self.verify_backup(bck)
self.assertEqual(len(journal), 2)
self.assertEqual(journal[0], sqlite.SQLITE_OK)
self.assertEqual(journal[1], sqlite.SQLITE_DONE)
def test_progress_all_pages_at_once_1(self):
journal = []
def progress(status, remaining, total):
journal.append(remaining)
with memory_database() as bck:
self.cx.backup(bck, progress=progress)
self.verify_backup(bck)
self.assertEqual(len(journal), 1)
self.assertEqual(journal[0], 0)
def test_progress_all_pages_at_once_2(self):
journal = []
def progress(status, remaining, total):
journal.append(remaining)
with memory_database() as bck:
self.cx.backup(bck, pages=-1, progress=progress)
self.verify_backup(bck)
self.assertEqual(len(journal), 1)
self.assertEqual(journal[0], 0)
def test_non_callable_progress(self):
with self.assertRaises(TypeError) as cm:
with memory_database() as bck:
self.cx.backup(bck, pages=1, progress='bar')
self.assertEqual(str(cm.exception), 'progress argument must be a callable')
def test_modifying_progress(self):
journal = []
def progress(status, remaining, total):
if not journal:
self.cx.execute('INSERT INTO foo (key) VALUES (?)', (remaining+1000,))
self.cx.commit()
journal.append(remaining)
with memory_database() as bck:
self.cx.backup(bck, pages=1, progress=progress)
self.verify_backup(bck)
result = bck.execute("SELECT key FROM foo"
" WHERE key >= 1000"
" ORDER BY key").fetchall()
self.assertEqual(result[0][0], 1001)
self.assertEqual(len(journal), 3)
self.assertEqual(journal[0], 1)
self.assertEqual(journal[1], 1)
self.assertEqual(journal[2], 0)
def test_failing_progress(self):
def progress(status, remaining, total):
raise SystemError('nearly out of space')
with self.assertRaises(SystemError) as err:
with memory_database() as bck:
self.cx.backup(bck, progress=progress)
self.assertEqual(str(err.exception), 'nearly out of space')
def test_database_source_name(self):
with memory_database() as bck:
self.cx.backup(bck, name='main')
with memory_database() as bck:
self.cx.backup(bck, name='temp')
with self.assertRaises(sqlite.OperationalError) as cm:
with memory_database() as bck:
self.cx.backup(bck, name='non-existing')
self.assertIn("unknown database", str(cm.exception))
self.cx.execute("ATTACH DATABASE ':memory:' AS attached_db")
self.cx.execute('CREATE TABLE attached_db.foo (key INTEGER)')
self.cx.executemany('INSERT INTO attached_db.foo (key) VALUES (?)', [(3,), (4,)])
self.cx.commit()
with memory_database() as bck:
self.cx.backup(bck, name='attached_db')
self.verify_backup(bck)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,155 @@
"""sqlite3 CLI tests."""
import sqlite3
import unittest
from sqlite3.__main__ import main as cli
from test.support.os_helper import TESTFN, unlink
from test.support import captured_stdout, captured_stderr, captured_stdin
class CommandLineInterface(unittest.TestCase):
def _do_test(self, *args, expect_success=True):
with (
captured_stdout() as out,
captured_stderr() as err,
self.assertRaises(SystemExit) as cm
):
cli(args)
return out.getvalue(), err.getvalue(), cm.exception.code
def expect_success(self, *args):
out, err, code = self._do_test(*args)
self.assertEqual(code, 0,
"\n".join([f"Unexpected failure: {args=}", out, err]))
self.assertEqual(err, "")
return out
def expect_failure(self, *args):
out, err, code = self._do_test(*args, expect_success=False)
self.assertNotEqual(code, 0,
"\n".join([f"Unexpected failure: {args=}", out, err]))
self.assertEqual(out, "")
return err
def test_cli_help(self):
out = self.expect_success("-h")
self.assertIn("usage: python -m sqlite3", out)
def test_cli_version(self):
out = self.expect_success("-v")
self.assertIn(sqlite3.sqlite_version, out)
def test_cli_execute_sql(self):
out = self.expect_success(":memory:", "select 1")
self.assertIn("(1,)", out)
def test_cli_execute_too_much_sql(self):
stderr = self.expect_failure(":memory:", "select 1; select 2")
err = "ProgrammingError: You can only execute one statement at a time"
self.assertIn(err, stderr)
def test_cli_execute_incomplete_sql(self):
stderr = self.expect_failure(":memory:", "sel")
self.assertIn("OperationalError (SQLITE_ERROR)", stderr)
def test_cli_on_disk_db(self):
self.addCleanup(unlink, TESTFN)
out = self.expect_success(TESTFN, "create table t(t)")
self.assertEqual(out, "")
out = self.expect_success(TESTFN, "select count(t) from t")
self.assertIn("(0,)", out)
class InteractiveSession(unittest.TestCase):
MEMORY_DB_MSG = "Connected to a transient in-memory database"
PS1 = "sqlite> "
PS2 = "... "
def run_cli(self, *args, commands=()):
with (
captured_stdin() as stdin,
captured_stdout() as stdout,
captured_stderr() as stderr,
self.assertRaises(SystemExit) as cm
):
for cmd in commands:
stdin.write(cmd + "\n")
stdin.seek(0)
cli(args)
out = stdout.getvalue()
err = stderr.getvalue()
self.assertEqual(cm.exception.code, 0,
f"Unexpected failure: {args=}\n{out}\n{err}")
return out, err
def test_interact(self):
out, err = self.run_cli()
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertTrue(out.endswith(self.PS1))
self.assertEqual(out.count(self.PS1), 1)
self.assertEqual(out.count(self.PS2), 0)
def test_interact_quit(self):
out, err = self.run_cli(commands=(".quit",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertTrue(out.endswith(self.PS1))
self.assertEqual(out.count(self.PS1), 1)
self.assertEqual(out.count(self.PS2), 0)
def test_interact_version(self):
out, err = self.run_cli(commands=(".version",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(sqlite3.sqlite_version + "\n", out)
self.assertTrue(out.endswith(self.PS1))
self.assertEqual(out.count(self.PS1), 2)
self.assertEqual(out.count(self.PS2), 0)
self.assertIn(sqlite3.sqlite_version, out)
def test_interact_valid_sql(self):
out, err = self.run_cli(commands=("SELECT 1;",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn("(1,)\n", out)
self.assertTrue(out.endswith(self.PS1))
self.assertEqual(out.count(self.PS1), 2)
self.assertEqual(out.count(self.PS2), 0)
def test_interact_incomplete_multiline_sql(self):
out, err = self.run_cli(commands=("SELECT 1",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertTrue(out.endswith(self.PS2))
self.assertEqual(out.count(self.PS1), 1)
self.assertEqual(out.count(self.PS2), 1)
def test_interact_valid_multiline_sql(self):
out, err = self.run_cli(commands=("SELECT 1\n;",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn(self.PS2, out)
self.assertIn("(1,)\n", out)
self.assertTrue(out.endswith(self.PS1))
self.assertEqual(out.count(self.PS1), 2)
self.assertEqual(out.count(self.PS2), 1)
def test_interact_invalid_sql(self):
out, err = self.run_cli(commands=("sel;",))
self.assertIn(self.MEMORY_DB_MSG, err)
self.assertIn("OperationalError (SQLITE_ERROR)", err)
self.assertTrue(out.endswith(self.PS1))
self.assertEqual(out.count(self.PS1), 2)
self.assertEqual(out.count(self.PS2), 0)
def test_interact_on_disk_file(self):
self.addCleanup(unlink, TESTFN)
out, err = self.run_cli(TESTFN, commands=("CREATE TABLE t(t);",))
self.assertIn(TESTFN, err)
self.assertTrue(out.endswith(self.PS1))
out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",))
self.assertIn("(0,)\n", out)
if __name__ == "__main__":
unittest.main()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,233 @@
# Author: Paul Kippes <kippesp@gmail.com>
import unittest
from .util import memory_database
from .util import MemoryDatabaseMixin
from .util import requires_virtual_table
class DumpTests(MemoryDatabaseMixin, unittest.TestCase):
def test_table_dump(self):
expected_sqls = [
"PRAGMA foreign_keys=OFF;",
"""CREATE TABLE "index"("index" blob);"""
,
"""INSERT INTO "index" VALUES(X'01');"""
,
"""CREATE TABLE "quoted""table"("quoted""field" text);"""
,
"""INSERT INTO "quoted""table" VALUES('quoted''value');"""
,
"CREATE TABLE t1(id integer primary key, s1 text, " \
"t1_i1 integer not null, i2 integer, unique (s1), " \
"constraint t1_idx1 unique (i2), " \
"constraint t1_i1_idx1 unique (t1_i1));"
,
"INSERT INTO \"t1\" VALUES(1,'foo',10,20);"
,
"INSERT INTO \"t1\" VALUES(2,'foo2',30,30);"
,
"CREATE TABLE t2(id integer, t2_i1 integer, " \
"t2_i2 integer, primary key (id)," \
"foreign key(t2_i1) references t1(t1_i1));"
,
# Foreign key violation.
"INSERT INTO \"t2\" VALUES(1,2,3);"
,
"CREATE TRIGGER trigger_1 update of t1_i1 on t1 " \
"begin " \
"update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; " \
"end;"
,
"CREATE VIEW v1 as select * from t1 left join t2 " \
"using (id);"
]
[self.cu.execute(s) for s in expected_sqls]
i = self.cx.iterdump()
actual_sqls = [s for s in i]
expected_sqls = [
"PRAGMA foreign_keys=OFF;",
"BEGIN TRANSACTION;",
*expected_sqls[1:],
"COMMIT;",
]
[self.assertEqual(expected_sqls[i], actual_sqls[i])
for i in range(len(expected_sqls))]
def test_table_dump_filter(self):
all_table_sqls = [
"""CREATE TABLE "some_table_2" ("id_1" INTEGER);""",
"""INSERT INTO "some_table_2" VALUES(3);""",
"""INSERT INTO "some_table_2" VALUES(4);""",
"""CREATE TABLE "test_table_1" ("id_2" INTEGER);""",
"""INSERT INTO "test_table_1" VALUES(1);""",
"""INSERT INTO "test_table_1" VALUES(2);""",
]
all_views_sqls = [
"""CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""",
"""CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""",
]
# Create database structure.
for sql in [*all_table_sqls, *all_views_sqls]:
self.cu.execute(sql)
# %_table_% matches all tables.
dump_sqls = list(self.cx.iterdump(filter="%_table_%"))
self.assertEqual(
dump_sqls,
["BEGIN TRANSACTION;", *all_table_sqls, "COMMIT;"],
)
# view_% matches all views.
dump_sqls = list(self.cx.iterdump(filter="view_%"))
self.assertEqual(
dump_sqls,
["BEGIN TRANSACTION;", *all_views_sqls, "COMMIT;"],
)
# %_1 matches tables and views with the _1 suffix.
dump_sqls = list(self.cx.iterdump(filter="%_1"))
self.assertEqual(
dump_sqls,
[
"BEGIN TRANSACTION;",
"""CREATE TABLE "test_table_1" ("id_2" INTEGER);""",
"""INSERT INTO "test_table_1" VALUES(1);""",
"""INSERT INTO "test_table_1" VALUES(2);""",
"""CREATE VIEW "view_1" AS SELECT * FROM "some_table_2";""",
"COMMIT;"
],
)
# some_% matches some_table_2.
dump_sqls = list(self.cx.iterdump(filter="some_%"))
self.assertEqual(
dump_sqls,
[
"BEGIN TRANSACTION;",
"""CREATE TABLE "some_table_2" ("id_1" INTEGER);""",
"""INSERT INTO "some_table_2" VALUES(3);""",
"""INSERT INTO "some_table_2" VALUES(4);""",
"COMMIT;"
],
)
# Only single object.
dump_sqls = list(self.cx.iterdump(filter="view_2"))
self.assertEqual(
dump_sqls,
[
"BEGIN TRANSACTION;",
"""CREATE VIEW "view_2" AS SELECT * FROM "test_table_1";""",
"COMMIT;"
],
)
# % matches all objects.
dump_sqls = list(self.cx.iterdump(filter="%"))
self.assertEqual(
dump_sqls,
["BEGIN TRANSACTION;", *all_table_sqls, *all_views_sqls, "COMMIT;"],
)
def test_dump_autoincrement(self):
expected = [
'CREATE TABLE "t1" (id integer primary key autoincrement);',
'INSERT INTO "t1" VALUES(NULL);',
'CREATE TABLE "t2" (id integer primary key autoincrement);',
]
self.cu.executescript("".join(expected))
# the NULL value should now be automatically be set to 1
expected[1] = expected[1].replace("NULL", "1")
expected.insert(0, "BEGIN TRANSACTION;")
expected.extend([
'DELETE FROM "sqlite_sequence";',
'INSERT INTO "sqlite_sequence" VALUES(\'t1\',1);',
'COMMIT;',
])
actual = [stmt for stmt in self.cx.iterdump()]
self.assertEqual(expected, actual)
def test_dump_autoincrement_create_new_db(self):
self.cu.execute("BEGIN TRANSACTION")
self.cu.execute("CREATE TABLE t1 (id integer primary key autoincrement)")
self.cu.execute("CREATE TABLE t2 (id integer primary key autoincrement)")
self.cu.executemany("INSERT INTO t1 VALUES(?)", ((None,) for _ in range(9)))
self.cu.executemany("INSERT INTO t2 VALUES(?)", ((None,) for _ in range(4)))
self.cx.commit()
with memory_database() as cx2:
query = "".join(self.cx.iterdump())
cx2.executescript(query)
cu2 = cx2.cursor()
dataset = (
("t1", 9),
("t2", 4),
)
for table, seq in dataset:
with self.subTest(table=table, seq=seq):
res = cu2.execute("""
SELECT "seq" FROM "sqlite_sequence" WHERE "name" == ?
""", (table,))
rows = res.fetchall()
self.assertEqual(rows[0][0], seq)
def test_unorderable_row(self):
# iterdump() should be able to cope with unorderable row types (issue #15545)
class UnorderableRow:
def __init__(self, cursor, row):
self.row = row
def __getitem__(self, index):
return self.row[index]
self.cx.row_factory = UnorderableRow
CREATE_ALPHA = """CREATE TABLE "alpha" ("one");"""
CREATE_BETA = """CREATE TABLE "beta" ("two");"""
expected = [
"BEGIN TRANSACTION;",
CREATE_ALPHA,
CREATE_BETA,
"COMMIT;"
]
self.cu.execute(CREATE_BETA)
self.cu.execute(CREATE_ALPHA)
got = list(self.cx.iterdump())
self.assertEqual(expected, got)
def test_dump_custom_row_factory(self):
# gh-118221: iterdump should be able to cope with custom row factories.
def dict_factory(cu, row):
fields = [col[0] for col in cu.description]
return dict(zip(fields, row))
self.cx.row_factory = dict_factory
CREATE_TABLE = "CREATE TABLE test(t);"
expected = ["BEGIN TRANSACTION;", CREATE_TABLE, "COMMIT;"]
self.cu.execute(CREATE_TABLE)
actual = list(self.cx.iterdump())
self.assertEqual(expected, actual)
self.assertEqual(self.cx.row_factory, dict_factory)
@requires_virtual_table("fts4")
def test_dump_virtual_tables(self):
# gh-64662
expected = [
"BEGIN TRANSACTION;",
"PRAGMA writable_schema=ON;",
("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
"VALUES('table','test','test',0,'CREATE VIRTUAL TABLE test USING fts4(example)');"),
"CREATE TABLE 'test_content'(docid INTEGER PRIMARY KEY, 'c0example');",
"CREATE TABLE 'test_docsize'(docid INTEGER PRIMARY KEY, size BLOB);",
("CREATE TABLE 'test_segdir'(level INTEGER,idx INTEGER,start_block INTEGER,"
"leaves_end_block INTEGER,end_block INTEGER,root BLOB,PRIMARY KEY(level, idx));"),
"CREATE TABLE 'test_segments'(blockid INTEGER PRIMARY KEY, block BLOB);",
"CREATE TABLE 'test_stat'(id INTEGER PRIMARY KEY, value BLOB);",
"PRAGMA writable_schema=OFF;",
"COMMIT;"
]
self.cu.execute("CREATE VIRTUAL TABLE test USING fts4(example)")
actual = list(self.cx.iterdump())
self.assertEqual(expected, actual)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,323 @@
# pysqlite2/test/factory.py: tests for the various factories in pysqlite
#
# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import unittest
import sqlite3 as sqlite
from collections.abc import Sequence
from .util import memory_database
from .util import MemoryDatabaseMixin
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
class MyCursor(sqlite.Cursor):
def __init__(self, *args, **kwargs):
sqlite.Cursor.__init__(self, *args, **kwargs)
self.row_factory = dict_factory
class ConnectionFactoryTests(unittest.TestCase):
def test_connection_factories(self):
class DefectFactory(sqlite.Connection):
def __init__(self, *args, **kwargs):
return None
class OkFactory(sqlite.Connection):
def __init__(self, *args, **kwargs):
sqlite.Connection.__init__(self, *args, **kwargs)
with memory_database(factory=OkFactory) as con:
self.assertIsInstance(con, OkFactory)
regex = "Base Connection.__init__ not called."
with self.assertRaisesRegex(sqlite.ProgrammingError, regex):
with memory_database(factory=DefectFactory) as con:
self.assertIsInstance(con, DefectFactory)
def test_connection_factory_relayed_call(self):
# gh-95132: keyword args must not be passed as positional args
class Factory(sqlite.Connection):
def __init__(self, *args, **kwargs):
kwargs["isolation_level"] = None
super(Factory, self).__init__(*args, **kwargs)
with memory_database(factory=Factory) as con:
self.assertIsNone(con.isolation_level)
self.assertIsInstance(con, Factory)
def test_connection_factory_as_positional_arg(self):
class Factory(sqlite.Connection):
def __init__(self, *args, **kwargs):
super(Factory, self).__init__(*args, **kwargs)
regex = (
r"Passing more than 1 positional argument to _sqlite3.Connection\(\) "
r"is deprecated. Parameters 'timeout', 'detect_types', "
r"'isolation_level', 'check_same_thread', 'factory', "
r"'cached_statements' and 'uri' will become keyword-only "
r"parameters in Python 3.15."
)
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
with memory_database(5.0, 0, None, True, Factory) as con:
self.assertIsNone(con.isolation_level)
self.assertIsInstance(con, Factory)
self.assertEqual(cm.filename, __file__)
class CursorFactoryTests(MemoryDatabaseMixin, unittest.TestCase):
def test_is_instance(self):
cur = self.con.cursor()
self.assertIsInstance(cur, sqlite.Cursor)
cur = self.con.cursor(MyCursor)
self.assertIsInstance(cur, MyCursor)
cur = self.con.cursor(factory=lambda con: MyCursor(con))
self.assertIsInstance(cur, MyCursor)
def test_invalid_factory(self):
# not a callable at all
self.assertRaises(TypeError, self.con.cursor, None)
# invalid callable with not exact one argument
self.assertRaises(TypeError, self.con.cursor, lambda: None)
# invalid callable returning non-cursor
self.assertRaises(TypeError, self.con.cursor, lambda con: None)
class RowFactoryTestsBackwardsCompat(MemoryDatabaseMixin, unittest.TestCase):
def test_is_produced_by_factory(self):
cur = self.con.cursor(factory=MyCursor)
cur.execute("select 4+5 as foo")
row = cur.fetchone()
self.assertIsInstance(row, dict)
cur.close()
class RowFactoryTests(MemoryDatabaseMixin, unittest.TestCase):
def setUp(self):
super().setUp()
self.con.row_factory = sqlite.Row
def test_custom_factory(self):
self.con.row_factory = lambda cur, row: list(row)
row = self.con.execute("select 1, 2").fetchone()
self.assertIsInstance(row, list)
def test_sqlite_row_index(self):
row = self.con.execute("select 1 as a_1, 2 as b").fetchone()
self.assertIsInstance(row, sqlite.Row)
self.assertEqual(row["a_1"], 1, "by name: wrong result for column 'a_1'")
self.assertEqual(row["b"], 2, "by name: wrong result for column 'b'")
self.assertEqual(row["A_1"], 1, "by name: wrong result for column 'A_1'")
self.assertEqual(row["B"], 2, "by name: wrong result for column 'B'")
self.assertEqual(row[0], 1, "by index: wrong result for column 0")
self.assertEqual(row[1], 2, "by index: wrong result for column 1")
self.assertEqual(row[-1], 2, "by index: wrong result for column -1")
self.assertEqual(row[-2], 1, "by index: wrong result for column -2")
with self.assertRaises(IndexError):
row['c']
with self.assertRaises(IndexError):
row['a_\x11']
with self.assertRaises(IndexError):
row['a\x7f1']
with self.assertRaises(IndexError):
row[2]
with self.assertRaises(IndexError):
row[-3]
with self.assertRaises(IndexError):
row[2**1000]
with self.assertRaises(IndexError):
row[complex()] # index must be int or string
def test_sqlite_row_index_unicode(self):
row = self.con.execute("select 1 as \xff").fetchone()
self.assertEqual(row["\xff"], 1)
with self.assertRaises(IndexError):
row['\u0178']
with self.assertRaises(IndexError):
row['\xdf']
def test_sqlite_row_slice(self):
# A sqlite.Row can be sliced like a list.
row = self.con.execute("select 1, 2, 3, 4").fetchone()
self.assertEqual(row[0:0], ())
self.assertEqual(row[0:1], (1,))
self.assertEqual(row[1:3], (2, 3))
self.assertEqual(row[3:1], ())
# Explicit bounds are optional.
self.assertEqual(row[1:], (2, 3, 4))
self.assertEqual(row[:3], (1, 2, 3))
# Slices can use negative indices.
self.assertEqual(row[-2:-1], (3,))
self.assertEqual(row[-2:], (3, 4))
# Slicing supports steps.
self.assertEqual(row[0:4:2], (1, 3))
self.assertEqual(row[3:0:-2], (4, 2))
def test_sqlite_row_iter(self):
# Checks if the row object is iterable.
row = self.con.execute("select 1 as a, 2 as b").fetchone()
# Is iterable in correct order and produces valid results:
items = [col for col in row]
self.assertEqual(items, [1, 2])
# Is iterable the second time:
items = [col for col in row]
self.assertEqual(items, [1, 2])
def test_sqlite_row_as_tuple(self):
# Checks if the row object can be converted to a tuple.
row = self.con.execute("select 1 as a, 2 as b").fetchone()
t = tuple(row)
self.assertEqual(t, (row['a'], row['b']))
def test_sqlite_row_as_dict(self):
# Checks if the row object can be correctly converted to a dictionary.
row = self.con.execute("select 1 as a, 2 as b").fetchone()
d = dict(row)
self.assertEqual(d["a"], row["a"])
self.assertEqual(d["b"], row["b"])
def test_sqlite_row_hash_cmp(self):
# Checks if the row object compares and hashes correctly.
row_1 = self.con.execute("select 1 as a, 2 as b").fetchone()
row_2 = self.con.execute("select 1 as a, 2 as b").fetchone()
row_3 = self.con.execute("select 1 as a, 3 as b").fetchone()
row_4 = self.con.execute("select 1 as b, 2 as a").fetchone()
row_5 = self.con.execute("select 2 as b, 1 as a").fetchone()
self.assertTrue(row_1 == row_1)
self.assertTrue(row_1 == row_2)
self.assertFalse(row_1 == row_3)
self.assertFalse(row_1 == row_4)
self.assertFalse(row_1 == row_5)
self.assertFalse(row_1 == object())
self.assertFalse(row_1 != row_1)
self.assertFalse(row_1 != row_2)
self.assertTrue(row_1 != row_3)
self.assertTrue(row_1 != row_4)
self.assertTrue(row_1 != row_5)
self.assertTrue(row_1 != object())
with self.assertRaises(TypeError):
row_1 > row_2
with self.assertRaises(TypeError):
row_1 < row_2
with self.assertRaises(TypeError):
row_1 >= row_2
with self.assertRaises(TypeError):
row_1 <= row_2
self.assertEqual(hash(row_1), hash(row_2))
def test_sqlite_row_as_sequence(self):
# Checks if the row object can act like a sequence.
row = self.con.execute("select 1 as a, 2 as b").fetchone()
as_tuple = tuple(row)
self.assertEqual(list(reversed(row)), list(reversed(as_tuple)))
self.assertIsInstance(row, Sequence)
def test_sqlite_row_keys(self):
# Checks if the row object can return a list of columns as strings.
row = self.con.execute("select 1 as a, 2 as b").fetchone()
self.assertEqual(row.keys(), ['a', 'b'])
def test_fake_cursor_class(self):
# Issue #24257: Incorrect use of PyObject_IsInstance() caused
# segmentation fault.
# Issue #27861: Also applies for cursor factory.
class FakeCursor(str):
__class__ = sqlite.Cursor
self.assertRaises(TypeError, self.con.cursor, FakeCursor)
self.assertRaises(TypeError, sqlite.Row, FakeCursor(), ())
class TextFactoryTests(MemoryDatabaseMixin, unittest.TestCase):
def test_unicode(self):
austria = "Österreich"
row = self.con.execute("select ?", (austria,)).fetchone()
self.assertEqual(type(row[0]), str, "type of row[0] must be unicode")
def test_string(self):
self.con.text_factory = bytes
austria = "Österreich"
row = self.con.execute("select ?", (austria,)).fetchone()
self.assertEqual(type(row[0]), bytes, "type of row[0] must be bytes")
self.assertEqual(row[0], austria.encode("utf-8"), "column must equal original data in UTF-8")
def test_custom(self):
self.con.text_factory = lambda x: str(x, "utf-8", "ignore")
austria = "Österreich"
row = self.con.execute("select ?", (austria,)).fetchone()
self.assertEqual(type(row[0]), str, "type of row[0] must be unicode")
self.assertTrue(row[0].endswith("reich"), "column must contain original data")
class TextFactoryTestsWithEmbeddedZeroBytes(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
self.con.execute("create table test (value text)")
self.con.execute("insert into test (value) values (?)", ("a\x00b",))
def tearDown(self):
self.con.close()
def test_string(self):
# text_factory defaults to str
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), str)
self.assertEqual(row[0], "a\x00b")
def test_bytes(self):
self.con.text_factory = bytes
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), bytes)
self.assertEqual(row[0], b"a\x00b")
def test_bytearray(self):
self.con.text_factory = bytearray
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), bytearray)
self.assertEqual(row[0], b"a\x00b")
def test_custom(self):
# A custom factory should receive a bytes argument
self.con.text_factory = lambda x: x
row = self.con.execute("select value from test").fetchone()
self.assertIs(type(row[0]), bytes)
self.assertEqual(row[0], b"a\x00b")
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,369 @@
# pysqlite2/test/hooks.py: tests for various SQLite-specific hooks
#
# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import contextlib
import sqlite3 as sqlite
import unittest
from test.support.os_helper import TESTFN, unlink
from .util import memory_database, cx_limit, with_tracebacks
from .util import MemoryDatabaseMixin
class CollationTests(MemoryDatabaseMixin, unittest.TestCase):
def test_create_collation_not_string(self):
with self.assertRaises(TypeError):
self.con.create_collation(None, lambda x, y: (x > y) - (x < y))
def test_create_collation_not_callable(self):
with self.assertRaises(TypeError) as cm:
self.con.create_collation("X", 42)
self.assertEqual(str(cm.exception), 'parameter must be callable')
def test_create_collation_not_ascii(self):
self.con.create_collation("collä", lambda x, y: (x > y) - (x < y))
def test_create_collation_bad_upper(self):
class BadUpperStr(str):
def upper(self):
return None
mycoll = lambda x, y: -((x > y) - (x < y))
self.con.create_collation(BadUpperStr("mycoll"), mycoll)
result = self.con.execute("""
select x from (
select 'a' as x
union
select 'b' as x
) order by x collate mycoll
""").fetchall()
self.assertEqual(result[0][0], 'b')
self.assertEqual(result[1][0], 'a')
def test_collation_is_used(self):
def mycoll(x, y):
# reverse order
return -((x > y) - (x < y))
self.con.create_collation("mycoll", mycoll)
sql = """
select x from (
select 'a' as x
union
select 'b' as x
union
select 'c' as x
) order by x collate mycoll
"""
result = self.con.execute(sql).fetchall()
self.assertEqual(result, [('c',), ('b',), ('a',)],
msg='the expected order was not returned')
self.con.create_collation("mycoll", None)
with self.assertRaises(sqlite.OperationalError) as cm:
result = self.con.execute(sql).fetchall()
self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
def test_collation_returns_large_integer(self):
def mycoll(x, y):
# reverse order
return -((x > y) - (x < y)) * 2**32
self.con.create_collation("mycoll", mycoll)
sql = """
select x from (
select 'a' as x
union
select 'b' as x
union
select 'c' as x
) order by x collate mycoll
"""
result = self.con.execute(sql).fetchall()
self.assertEqual(result, [('c',), ('b',), ('a',)],
msg="the expected order was not returned")
def test_collation_register_twice(self):
"""
Register two different collation functions under the same name.
Verify that the last one is actually used.
"""
con = self.con
con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
con.create_collation("mycoll", lambda x, y: -((x > y) - (x < y)))
result = con.execute("""
select x from (select 'a' as x union select 'b' as x) order by x collate mycoll
""").fetchall()
self.assertEqual(result[0][0], 'b')
self.assertEqual(result[1][0], 'a')
def test_deregister_collation(self):
"""
Register a collation, then deregister it. Make sure an error is raised if we try
to use it.
"""
con = self.con
con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
con.create_collation("mycoll", None)
with self.assertRaises(sqlite.OperationalError) as cm:
con.execute("select 'a' as x union select 'b' as x order by x collate mycoll")
self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
class ProgressTests(MemoryDatabaseMixin, unittest.TestCase):
def test_progress_handler_used(self):
"""
Test that the progress handler is invoked once it is set.
"""
progress_calls = []
def progress():
progress_calls.append(None)
return 0
self.con.set_progress_handler(progress, 1)
self.con.execute("""
create table foo(a, b)
""")
self.assertTrue(progress_calls)
def test_opcode_count(self):
"""
Test that the opcode argument is respected.
"""
con = self.con
progress_calls = []
def progress():
progress_calls.append(None)
return 0
con.set_progress_handler(progress, 1)
curs = con.cursor()
curs.execute("""
create table foo (a, b)
""")
first_count = len(progress_calls)
progress_calls = []
con.set_progress_handler(progress, 2)
curs.execute("""
create table bar (a, b)
""")
second_count = len(progress_calls)
self.assertGreaterEqual(first_count, second_count)
def test_cancel_operation(self):
"""
Test that returning a non-zero value stops the operation in progress.
"""
def progress():
return 1
self.con.set_progress_handler(progress, 1)
curs = self.con.cursor()
self.assertRaises(
sqlite.OperationalError,
curs.execute,
"create table bar (a, b)")
def test_clear_handler(self):
"""
Test that setting the progress handler to None clears the previously set handler.
"""
con = self.con
action = 0
def progress():
nonlocal action
action = 1
return 0
con.set_progress_handler(progress, 1)
con.set_progress_handler(None, 1)
con.execute("select 1 union select 2 union select 3").fetchall()
self.assertEqual(action, 0, "progress handler was not cleared")
@with_tracebacks(ZeroDivisionError, name="bad_progress")
def test_error_in_progress_handler(self):
def bad_progress():
1 / 0
self.con.set_progress_handler(bad_progress, 1)
with self.assertRaises(sqlite.OperationalError):
self.con.execute("""
create table foo(a, b)
""")
@with_tracebacks(ZeroDivisionError, name="bad_progress")
def test_error_in_progress_handler_result(self):
class BadBool:
def __bool__(self):
1 / 0
def bad_progress():
return BadBool()
self.con.set_progress_handler(bad_progress, 1)
with self.assertRaises(sqlite.OperationalError):
self.con.execute("""
create table foo(a, b)
""")
def test_progress_handler_keyword_args(self):
regex = (
r"Passing keyword argument 'progress_handler' to "
r"_sqlite3.Connection.set_progress_handler\(\) is deprecated. "
r"Parameter 'progress_handler' will become positional-only in "
r"Python 3.15."
)
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
self.con.set_progress_handler(progress_handler=lambda: None, n=1)
self.assertEqual(cm.filename, __file__)
class TraceCallbackTests(MemoryDatabaseMixin, unittest.TestCase):
@contextlib.contextmanager
def check_stmt_trace(self, cx, expected):
try:
traced = []
cx.set_trace_callback(lambda stmt: traced.append(stmt))
yield
finally:
self.assertEqual(traced, expected)
cx.set_trace_callback(None)
def test_trace_callback_used(self):
"""
Test that the trace callback is invoked once it is set.
"""
traced_statements = []
def trace(statement):
traced_statements.append(statement)
self.con.set_trace_callback(trace)
self.con.execute("create table foo(a, b)")
self.assertTrue(traced_statements)
self.assertTrue(any("create table foo" in stmt for stmt in traced_statements))
def test_clear_trace_callback(self):
"""
Test that setting the trace callback to None clears the previously set callback.
"""
con = self.con
traced_statements = []
def trace(statement):
traced_statements.append(statement)
con.set_trace_callback(trace)
con.set_trace_callback(None)
con.execute("create table foo(a, b)")
self.assertFalse(traced_statements, "trace callback was not cleared")
def test_unicode_content(self):
"""
Test that the statement can contain unicode literals.
"""
unicode_value = '\xf6\xe4\xfc\xd6\xc4\xdc\xdf\u20ac'
con = self.con
traced_statements = []
def trace(statement):
traced_statements.append(statement)
con.set_trace_callback(trace)
con.execute("create table foo(x)")
con.execute("insert into foo(x) values ('%s')" % unicode_value)
con.commit()
self.assertTrue(any(unicode_value in stmt for stmt in traced_statements),
"Unicode data %s garbled in trace callback: %s"
% (ascii(unicode_value), ', '.join(map(ascii, traced_statements))))
def test_trace_callback_content(self):
# set_trace_callback() shouldn't produce duplicate content (bpo-26187)
traced_statements = []
def trace(statement):
traced_statements.append(statement)
queries = ["create table foo(x)",
"insert into foo(x) values(1)"]
self.addCleanup(unlink, TESTFN)
con1 = sqlite.connect(TESTFN, isolation_level=None)
con2 = sqlite.connect(TESTFN)
try:
con1.set_trace_callback(trace)
cur = con1.cursor()
cur.execute(queries[0])
con2.execute("create table bar(x)")
cur.execute(queries[1])
finally:
con1.close()
con2.close()
self.assertEqual(traced_statements, queries)
def test_trace_expanded_sql(self):
expected = [
"create table t(t)",
"BEGIN ",
"insert into t values(0)",
"insert into t values(1)",
"insert into t values(2)",
"COMMIT",
]
with memory_database() as cx, self.check_stmt_trace(cx, expected):
with cx:
cx.execute("create table t(t)")
cx.executemany("insert into t values(?)", ((v,) for v in range(3)))
@with_tracebacks(
sqlite.DataError,
regex="Expanded SQL string exceeds the maximum string length"
)
def test_trace_too_much_expanded_sql(self):
# If the expanded string is too large, we'll fall back to the
# unexpanded SQL statement.
# The resulting string length is limited by the runtime limit
# SQLITE_LIMIT_LENGTH.
template = "select 1 as a where a="
category = sqlite.SQLITE_LIMIT_LENGTH
with memory_database() as cx, cx_limit(cx, category=category) as lim:
ok_param = "a"
bad_param = "a" * lim
unexpanded_query = template + "?"
expected = [unexpanded_query]
with self.check_stmt_trace(cx, expected):
cx.execute(unexpanded_query, (bad_param,))
expanded_query = f"{template}'{ok_param}'"
with self.check_stmt_trace(cx, [expanded_query]):
cx.execute(unexpanded_query, (ok_param,))
@with_tracebacks(ZeroDivisionError, regex="division by zero")
def test_trace_bad_handler(self):
with memory_database() as cx:
cx.set_trace_callback(lambda stmt: 5/0)
cx.execute("select 1")
def test_trace_keyword_args(self):
regex = (
r"Passing keyword argument 'trace_callback' to "
r"_sqlite3.Connection.set_trace_callback\(\) is deprecated. "
r"Parameter 'trace_callback' will become positional-only in "
r"Python 3.15."
)
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
self.con.set_trace_callback(trace_callback=lambda: None)
self.assertEqual(cm.filename, __file__)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,507 @@
# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import datetime
import unittest
import sqlite3 as sqlite
import weakref
import functools
from test import support
from unittest.mock import patch
from .util import memory_database, cx_limit
from .util import MemoryDatabaseMixin
class RegressionTests(MemoryDatabaseMixin, unittest.TestCase):
def test_pragma_user_version(self):
# This used to crash pysqlite because this pragma command returns NULL for the column name
cur = self.con.cursor()
cur.execute("pragma user_version")
def test_pragma_schema_version(self):
# This still crashed pysqlite <= 2.2.1
with memory_database(detect_types=sqlite.PARSE_COLNAMES) as con:
cur = self.con.cursor()
cur.execute("pragma schema_version")
def test_statement_reset(self):
# pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
# reset before a rollback, but only those that are still in the
# statement cache. The others are not accessible from the connection object.
with memory_database(cached_statements=5) as con:
cursors = [con.cursor() for x in range(5)]
cursors[0].execute("create table test(x)")
for i in range(10):
cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
for i in range(5):
cursors[i].execute(" " * i + "select x from test")
con.rollback()
def test_column_name_with_spaces(self):
cur = self.con.cursor()
cur.execute('select 1 as "foo bar [datetime]"')
self.assertEqual(cur.description[0][0], "foo bar [datetime]")
cur.execute('select 1 as "foo baz"')
self.assertEqual(cur.description[0][0], "foo baz")
def test_statement_finalization_on_close_db(self):
# pysqlite versions <= 2.3.3 only finalized statements in the statement
# cache when closing the database. statements that were still
# referenced in cursors weren't closed and could provoke "
# "OperationalError: Unable to close due to unfinalised statements".
cursors = []
# default statement cache size is 100
for i in range(105):
cur = self.con.cursor()
cursors.append(cur)
cur.execute("select 1 x union select " + str(i))
def test_on_conflict_rollback(self):
con = self.con
con.execute("create table foo(x, unique(x) on conflict rollback)")
con.execute("insert into foo(x) values (1)")
try:
con.execute("insert into foo(x) values (1)")
except sqlite.DatabaseError:
pass
con.execute("insert into foo(x) values (2)")
try:
con.commit()
except sqlite.OperationalError:
self.fail("pysqlite knew nothing about the implicit ROLLBACK")
def test_workaround_for_buggy_sqlite_transfer_bindings(self):
"""
pysqlite would crash with older SQLite versions unless
a workaround is implemented.
"""
self.con.execute("create table foo(bar)")
self.con.execute("drop table foo")
self.con.execute("create table foo(bar)")
def test_empty_statement(self):
"""
pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
for "no-operation" statements
"""
self.con.execute("")
def test_type_map_usage(self):
"""
pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
a statement. This test exhibits the problem.
"""
SELECT = "select * from foo"
with memory_database(detect_types=sqlite.PARSE_DECLTYPES) as con:
cur = con.cursor()
cur.execute("create table foo(bar timestamp)")
with self.assertWarnsRegex(DeprecationWarning, "adapter"):
cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
cur.execute(SELECT)
cur.execute("drop table foo")
cur.execute("create table foo(bar integer)")
cur.execute("insert into foo(bar) values (5)")
cur.execute(SELECT)
def test_bind_mutating_list(self):
# Issue41662: Crash when mutate a list of parameters during iteration.
class X:
def __conform__(self, protocol):
parameters.clear()
return "..."
parameters = [X(), 0]
with memory_database(detect_types=sqlite.PARSE_DECLTYPES) as con:
con.execute("create table foo(bar X, baz integer)")
# Should not crash
with self.assertRaises(IndexError):
con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
def test_error_msg_decode_error(self):
# When porting the module to Python 3.0, the error message about
# decoding errors disappeared. This verifies they're back again.
with self.assertRaises(sqlite.OperationalError) as cm:
self.con.execute("select 'xxx' || ? || 'yyy' colname",
(bytes(bytearray([250])),)).fetchone()
msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
self.assertIn(msg, str(cm.exception))
def test_register_adapter(self):
"""
See issue 3312.
"""
self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
def test_set_isolation_level(self):
# See issue 27881.
class CustomStr(str):
def upper(self):
return None
def __del__(self):
con.isolation_level = ""
con = self.con
con.isolation_level = None
for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
with self.subTest(level=level):
con.isolation_level = level
con.isolation_level = level.lower()
con.isolation_level = level.capitalize()
con.isolation_level = CustomStr(level)
# setting isolation_level failure should not alter previous state
con.isolation_level = None
con.isolation_level = "DEFERRED"
pairs = [
(1, TypeError), (b'', TypeError), ("abc", ValueError),
("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
]
for value, exc in pairs:
with self.subTest(level=value):
with self.assertRaises(exc):
con.isolation_level = value
self.assertEqual(con.isolation_level, "DEFERRED")
def test_cursor_constructor_call_check(self):
"""
Verifies that cursor methods check whether base class __init__ was
called.
"""
class Cursor(sqlite.Cursor):
def __init__(self, con):
pass
cur = Cursor(self.con)
with self.assertRaises(sqlite.ProgrammingError):
cur.execute("select 4+5").fetchall()
with self.assertRaisesRegex(sqlite.ProgrammingError,
r'^Base Cursor\.__init__ not called\.$'):
cur.close()
def test_str_subclass(self):
"""
The Python 3.0 port of the module didn't cope with values of subclasses of str.
"""
class MyStr(str): pass
self.con.execute("select ?", (MyStr("abc"),))
def test_connection_constructor_call_check(self):
"""
Verifies that connection methods check whether base class __init__ was
called.
"""
class Connection(sqlite.Connection):
def __init__(self, name):
pass
con = Connection(":memory:")
with self.assertRaises(sqlite.ProgrammingError):
cur = con.cursor()
def test_auto_commit(self):
"""
Verifies that creating a connection in autocommit mode works.
2.5.3 introduced a regression so that these could no longer
be created.
"""
with memory_database(isolation_level=None) as con:
self.assertIsNone(con.isolation_level)
self.assertFalse(con.in_transaction)
def test_pragma_autocommit(self):
"""
Verifies that running a PRAGMA statement that does an autocommit does
work. This did not work in 2.5.3/2.5.4.
"""
cur = self.con.cursor()
cur.execute("create table foo(bar)")
cur.execute("insert into foo(bar) values (5)")
cur.execute("pragma page_size")
row = cur.fetchone()
def test_connection_call(self):
"""
Call a connection with a non-string SQL request: check error handling
of the statement constructor.
"""
self.assertRaises(TypeError, self.con, b"select 1")
def test_collation(self):
def collation_cb(a, b):
return 1
self.assertRaises(UnicodeEncodeError, self.con.create_collation,
# Lone surrogate cannot be encoded to the default encoding (utf8)
"\uDC80", collation_cb)
def test_recursive_cursor_use(self):
"""
http://bugs.python.org/issue10811
Recursively using a cursor, such as when reusing it from a generator led to segfaults.
Now we catch recursive cursor usage and raise a ProgrammingError.
"""
cur = self.con.cursor()
cur.execute("create table a (bar)")
cur.execute("create table b (baz)")
def foo():
cur.execute("insert into a (bar) values (?)", (1,))
yield 1
with self.assertRaises(sqlite.ProgrammingError):
cur.executemany("insert into b (baz) values (?)",
((i,) for i in foo()))
def test_convert_timestamp_microsecond_padding(self):
"""
http://bugs.python.org/issue14720
The microsecond parsing of convert_timestamp() should pad with zeros,
since the microsecond string "456" actually represents "456000".
"""
with memory_database(detect_types=sqlite.PARSE_DECLTYPES) as con:
cur = con.cursor()
cur.execute("CREATE TABLE t (x TIMESTAMP)")
# Microseconds should be 456000
cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
# Microseconds should be truncated to 123456
cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
cur.execute("SELECT * FROM t")
with self.assertWarnsRegex(DeprecationWarning, "converter"):
values = [x[0] for x in cur.fetchall()]
self.assertEqual(values, [
datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
])
def test_invalid_isolation_level_type(self):
# isolation level is a string, not an integer
regex = "isolation_level must be str or None"
with self.assertRaisesRegex(TypeError, regex):
memory_database(isolation_level=123).__enter__()
def test_null_character(self):
# Issue #21147
cur = self.con.cursor()
queries = ["\0select 1", "select 1\0"]
for query in queries:
with self.subTest(query=query):
self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
self.con.execute, query)
with self.subTest(query=query):
self.assertRaisesRegex(sqlite.ProgrammingError, "null char",
cur.execute, query)
def test_surrogates(self):
con = self.con
self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'")
self.assertRaises(UnicodeEncodeError, con, "select '\udcff'")
cur = con.cursor()
self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'")
self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'")
def test_large_sql(self):
msg = "query string is too large"
with memory_database() as cx, cx_limit(cx) as lim:
cu = cx.cursor()
cx("select 1".ljust(lim))
# use a different SQL statement; don't reuse from the LRU cache
cu.execute("select 2".ljust(lim))
sql = "select 3".ljust(lim+1)
self.assertRaisesRegex(sqlite.DataError, msg, cx, sql)
self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql)
def test_commit_cursor_reset(self):
"""
Connection.commit() did reset cursors, which made sqlite3
to return rows multiple times when fetched from cursors
after commit. See issues 10513 and 23129 for details.
"""
con = self.con
con.executescript("""
create table t(c);
create table t2(c);
insert into t values(0);
insert into t values(1);
insert into t values(2);
""")
self.assertEqual(con.isolation_level, "")
counter = 0
for i, row in enumerate(con.execute("select c from t")):
with self.subTest(i=i, row=row):
con.execute("insert into t2(c) values (?)", (i,))
con.commit()
if counter == 0:
self.assertEqual(row[0], 0)
elif counter == 1:
self.assertEqual(row[0], 1)
elif counter == 2:
self.assertEqual(row[0], 2)
counter += 1
self.assertEqual(counter, 3, "should have returned exactly three rows")
def test_bpo31770(self):
"""
The interpreter shouldn't crash in case Cursor.__init__() is called
more than once.
"""
def callback(*args):
pass
cur = sqlite.Cursor(self.con)
ref = weakref.ref(cur, callback)
cur.__init__(self.con)
del cur
# The interpreter shouldn't crash when ref is collected.
del ref
support.gc_collect()
def test_del_isolation_level_segfault(self):
with self.assertRaises(AttributeError):
del self.con.isolation_level
def test_bpo37347(self):
class Printer:
def log(self, *args):
return sqlite.SQLITE_OK
for method in [self.con.set_trace_callback,
functools.partial(self.con.set_progress_handler, n=1),
self.con.set_authorizer]:
printer_instance = Printer()
method(printer_instance.log)
method(printer_instance.log)
self.con.execute("select 1") # trigger seg fault
method(None)
def test_return_empty_bytestring(self):
cur = self.con.execute("select X''")
val = cur.fetchone()[0]
self.assertEqual(val, b'')
def test_table_lock_cursor_replace_stmt(self):
with memory_database() as con:
con = self.con
cur = con.cursor()
cur.execute("create table t(t)")
cur.executemany("insert into t values(?)",
((v,) for v in range(5)))
con.commit()
cur.execute("select t from t")
cur.execute("drop table t")
con.commit()
def test_table_lock_cursor_dealloc(self):
with memory_database() as con:
con.execute("create table t(t)")
con.executemany("insert into t values(?)",
((v,) for v in range(5)))
con.commit()
cur = con.execute("select t from t")
del cur
support.gc_collect()
con.execute("drop table t")
con.commit()
def test_table_lock_cursor_non_readonly_select(self):
with memory_database() as con:
con.execute("create table t(t)")
con.executemany("insert into t values(?)",
((v,) for v in range(5)))
con.commit()
def dup(v):
con.execute("insert into t values(?)", (v,))
return
con.create_function("dup", 1, dup)
cur = con.execute("select dup(t) from t")
del cur
support.gc_collect()
con.execute("drop table t")
con.commit()
def test_executescript_step_through_select(self):
with memory_database() as con:
values = [(v,) for v in range(5)]
with con:
con.execute("create table t(t)")
con.executemany("insert into t values(?)", values)
steps = []
con.create_function("step", 1, lambda x: steps.append((x,)))
con.executescript("select step(t) from t")
self.assertEqual(steps, values)
class RecursiveUseOfCursors(unittest.TestCase):
# GH-80254: sqlite3 should not segfault for recursive use of cursors.
msg = "Recursive use of cursors not allowed"
def setUp(self):
self.con = sqlite.connect(":memory:",
detect_types=sqlite.PARSE_COLNAMES)
self.cur = self.con.cursor()
self.cur.execute("create table test(x foo)")
self.cur.executemany("insert into test(x) values (?)",
[("foo",), ("bar",)])
def tearDown(self):
self.cur.close()
self.con.close()
def test_recursive_cursor_init(self):
conv = lambda x: self.cur.__init__(self.con)
with patch.dict(sqlite.converters, {"INIT": conv}):
self.cur.execute('select x as "x [INIT]", x from test')
self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
self.cur.fetchall)
def test_recursive_cursor_close(self):
conv = lambda x: self.cur.close()
with patch.dict(sqlite.converters, {"CLOSE": conv}):
self.cur.execute('select x as "x [CLOSE]", x from test')
self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
self.cur.fetchall)
def test_recursive_cursor_iter(self):
conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
with patch.dict(sqlite.converters, {"ITER": conv}):
self.cur.execute('select x as "x [ITER]", x from test')
self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
self.cur.fetchall)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,528 @@
# pysqlite2/test/transactions.py: tests transactions
#
# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import unittest
import sqlite3 as sqlite
from contextlib import contextmanager
from test.support.os_helper import TESTFN, unlink
from test.support.script_helper import assert_python_ok
from .util import memory_database
from .util import MemoryDatabaseMixin
class TransactionTests(unittest.TestCase):
def setUp(self):
# We can disable the busy handlers, since we control
# the order of SQLite C API operations.
self.con1 = sqlite.connect(TESTFN, timeout=0)
self.cur1 = self.con1.cursor()
self.con2 = sqlite.connect(TESTFN, timeout=0)
self.cur2 = self.con2.cursor()
def tearDown(self):
try:
self.cur1.close()
self.con1.close()
self.cur2.close()
self.con2.close()
finally:
unlink(TESTFN)
def test_dml_does_not_auto_commit_before(self):
self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)")
self.cur1.execute("create table test2(j)")
self.cur2.execute("select i from test")
res = self.cur2.fetchall()
self.assertEqual(len(res), 0)
def test_insert_starts_transaction(self):
self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)")
self.cur2.execute("select i from test")
res = self.cur2.fetchall()
self.assertEqual(len(res), 0)
def test_update_starts_transaction(self):
self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)")
self.con1.commit()
self.cur1.execute("update test set i=6")
self.cur2.execute("select i from test")
res = self.cur2.fetchone()[0]
self.assertEqual(res, 5)
def test_delete_starts_transaction(self):
self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)")
self.con1.commit()
self.cur1.execute("delete from test")
self.cur2.execute("select i from test")
res = self.cur2.fetchall()
self.assertEqual(len(res), 1)
def test_replace_starts_transaction(self):
self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)")
self.con1.commit()
self.cur1.execute("replace into test(i) values (6)")
self.cur2.execute("select i from test")
res = self.cur2.fetchall()
self.assertEqual(len(res), 1)
self.assertEqual(res[0][0], 5)
def test_toggle_auto_commit(self):
self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)")
self.con1.isolation_level = None
self.assertEqual(self.con1.isolation_level, None)
self.cur2.execute("select i from test")
res = self.cur2.fetchall()
self.assertEqual(len(res), 1)
self.con1.isolation_level = "DEFERRED"
self.assertEqual(self.con1.isolation_level , "DEFERRED")
self.cur1.execute("insert into test(i) values (5)")
self.cur2.execute("select i from test")
res = self.cur2.fetchall()
self.assertEqual(len(res), 1)
def test_raise_timeout(self):
self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)")
with self.assertRaises(sqlite.OperationalError):
self.cur2.execute("insert into test(i) values (5)")
def test_locking(self):
# This tests the improved concurrency with pysqlite 2.3.4. You needed
# to roll back con2 before you could commit con1.
self.cur1.execute("create table test(i)")
self.cur1.execute("insert into test(i) values (5)")
with self.assertRaises(sqlite.OperationalError):
self.cur2.execute("insert into test(i) values (5)")
# NO self.con2.rollback() HERE!!!
self.con1.commit()
def test_rollback_cursor_consistency(self):
"""Check that cursors behave correctly after rollback."""
with memory_database() as con:
cur = con.cursor()
cur.execute("create table test(x)")
cur.execute("insert into test(x) values (5)")
cur.execute("select 1 union select 2 union select 3")
con.rollback()
self.assertEqual(cur.fetchall(), [(1,), (2,), (3,)])
def test_multiple_cursors_and_iternext(self):
# gh-94028: statements are cleared and reset in cursor iternext.
# Provoke the gh-94028 by using a cursor cache.
CURSORS = {}
def sql(cx, sql, *args):
cu = cx.cursor()
cu.execute(sql, args)
CURSORS[id(sql)] = cu
return cu
self.con1.execute("create table t(t)")
sql(self.con1, "insert into t values (?), (?), (?)", "u1", "u2", "u3")
self.con1.commit()
# On second connection, verify rows are visible, then delete them.
count = sql(self.con2, "select count(*) from t").fetchone()[0]
self.assertEqual(count, 3)
changes = sql(self.con2, "delete from t").rowcount
self.assertEqual(changes, 3)
self.con2.commit()
# Back in original connection, create 2 new users.
sql(self.con1, "insert into t values (?)", "u4")
sql(self.con1, "insert into t values (?)", "u5")
# The second connection cannot see uncommitted changes.
count = sql(self.con2, "select count(*) from t").fetchone()[0]
self.assertEqual(count, 0)
# First connection can see its own changes.
count = sql(self.con1, "select count(*) from t").fetchone()[0]
self.assertEqual(count, 2)
# The second connection can now see the changes.
self.con1.commit()
count = sql(self.con2, "select count(*) from t").fetchone()[0]
self.assertEqual(count, 2)
class RollbackTests(unittest.TestCase):
"""bpo-44092: sqlite3 now leaves it to SQLite to resolve rollback issues"""
def setUp(self):
self.con = sqlite.connect(":memory:")
self.cur1 = self.con.cursor()
self.cur2 = self.con.cursor()
with self.con:
self.con.execute("create table t(c)");
self.con.executemany("insert into t values(?)", [(0,), (1,), (2,)])
self.cur1.execute("begin transaction")
select = "select c from t"
self.cur1.execute(select)
self.con.rollback()
self.res = self.cur2.execute(select) # Reusing stmt from cache
def tearDown(self):
self.con.close()
def _check_rows(self):
for i, row in enumerate(self.res):
self.assertEqual(row[0], i)
def test_no_duplicate_rows_after_rollback_del_cursor(self):
del self.cur1
self._check_rows()
def test_no_duplicate_rows_after_rollback_close_cursor(self):
self.cur1.close()
self._check_rows()
def test_no_duplicate_rows_after_rollback_new_query(self):
self.cur1.execute("select c from t where c = 1")
self._check_rows()
class SpecialCommandTests(MemoryDatabaseMixin, unittest.TestCase):
def test_drop_table(self):
self.cur.execute("create table test(i)")
self.cur.execute("insert into test(i) values (5)")
self.cur.execute("drop table test")
def test_pragma(self):
self.cur.execute("create table test(i)")
self.cur.execute("insert into test(i) values (5)")
self.cur.execute("pragma count_changes=1")
class TransactionalDDL(MemoryDatabaseMixin, unittest.TestCase):
def test_ddl_does_not_autostart_transaction(self):
# For backwards compatibility reasons, DDL statements should not
# implicitly start a transaction.
self.con.execute("create table test(i)")
self.con.rollback()
result = self.con.execute("select * from test").fetchall()
self.assertEqual(result, [])
def test_immediate_transactional_ddl(self):
# You can achieve transactional DDL by issuing a BEGIN
# statement manually.
self.con.execute("begin immediate")
self.con.execute("create table test(i)")
self.con.rollback()
with self.assertRaises(sqlite.OperationalError):
self.con.execute("select * from test")
def test_transactional_ddl(self):
# You can achieve transactional DDL by issuing a BEGIN
# statement manually.
self.con.execute("begin")
self.con.execute("create table test(i)")
self.con.rollback()
with self.assertRaises(sqlite.OperationalError):
self.con.execute("select * from test")
class IsolationLevelFromInit(unittest.TestCase):
CREATE = "create table t(t)"
INSERT = "insert into t values(1)"
def setUp(self):
self.traced = []
def _run_test(self, cx):
cx.execute(self.CREATE)
cx.set_trace_callback(lambda stmt: self.traced.append(stmt))
with cx:
cx.execute(self.INSERT)
def test_isolation_level_default(self):
with memory_database() as cx:
self._run_test(cx)
self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"])
def test_isolation_level_begin(self):
with memory_database(isolation_level="") as cx:
self._run_test(cx)
self.assertEqual(self.traced, ["BEGIN ", self.INSERT, "COMMIT"])
def test_isolation_level_deferred(self):
with memory_database(isolation_level="DEFERRED") as cx:
self._run_test(cx)
self.assertEqual(self.traced, ["BEGIN DEFERRED", self.INSERT, "COMMIT"])
def test_isolation_level_immediate(self):
with memory_database(isolation_level="IMMEDIATE") as cx:
self._run_test(cx)
self.assertEqual(self.traced,
["BEGIN IMMEDIATE", self.INSERT, "COMMIT"])
def test_isolation_level_exclusive(self):
with memory_database(isolation_level="EXCLUSIVE") as cx:
self._run_test(cx)
self.assertEqual(self.traced,
["BEGIN EXCLUSIVE", self.INSERT, "COMMIT"])
def test_isolation_level_none(self):
with memory_database(isolation_level=None) as cx:
self._run_test(cx)
self.assertEqual(self.traced, [self.INSERT])
class IsolationLevelPostInit(unittest.TestCase):
QUERY = "insert into t values(1)"
def setUp(self):
self.cx = sqlite.connect(":memory:")
self.cx.execute("create table t(t)")
self.traced = []
self.cx.set_trace_callback(lambda stmt: self.traced.append(stmt))
def tearDown(self):
self.cx.close()
def test_isolation_level_default(self):
with self.cx:
self.cx.execute(self.QUERY)
self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])
def test_isolation_level_begin(self):
self.cx.isolation_level = ""
with self.cx:
self.cx.execute(self.QUERY)
self.assertEqual(self.traced, ["BEGIN ", self.QUERY, "COMMIT"])
def test_isolation_level_deferrred(self):
self.cx.isolation_level = "DEFERRED"
with self.cx:
self.cx.execute(self.QUERY)
self.assertEqual(self.traced, ["BEGIN DEFERRED", self.QUERY, "COMMIT"])
def test_isolation_level_immediate(self):
self.cx.isolation_level = "IMMEDIATE"
with self.cx:
self.cx.execute(self.QUERY)
self.assertEqual(self.traced,
["BEGIN IMMEDIATE", self.QUERY, "COMMIT"])
def test_isolation_level_exclusive(self):
self.cx.isolation_level = "EXCLUSIVE"
with self.cx:
self.cx.execute(self.QUERY)
self.assertEqual(self.traced,
["BEGIN EXCLUSIVE", self.QUERY, "COMMIT"])
def test_isolation_level_none(self):
self.cx.isolation_level = None
with self.cx:
self.cx.execute(self.QUERY)
self.assertEqual(self.traced, [self.QUERY])
class AutocommitAttribute(unittest.TestCase):
"""Test PEP 249-compliant autocommit behaviour."""
legacy = sqlite.LEGACY_TRANSACTION_CONTROL
@contextmanager
def check_stmt_trace(self, cx, expected, reset=True):
try:
traced = []
cx.set_trace_callback(lambda stmt: traced.append(stmt))
yield
finally:
self.assertEqual(traced, expected)
if reset:
cx.set_trace_callback(None)
def test_autocommit_default(self):
with memory_database() as cx:
self.assertEqual(cx.autocommit,
sqlite.LEGACY_TRANSACTION_CONTROL)
def test_autocommit_setget(self):
dataset = (
True,
False,
sqlite.LEGACY_TRANSACTION_CONTROL,
)
for mode in dataset:
with self.subTest(mode=mode):
with memory_database(autocommit=mode) as cx:
self.assertEqual(cx.autocommit, mode)
with memory_database() as cx:
cx.autocommit = mode
self.assertEqual(cx.autocommit, mode)
def test_autocommit_setget_invalid(self):
msg = "autocommit must be True, False, or.*LEGACY"
for mode in "a", 12, (), None:
with self.subTest(mode=mode):
with self.assertRaisesRegex(ValueError, msg):
sqlite.connect(":memory:", autocommit=mode)
def test_autocommit_disabled(self):
expected = [
"SELECT 1",
"COMMIT",
"BEGIN",
"ROLLBACK",
"BEGIN",
]
with memory_database(autocommit=False) as cx:
self.assertTrue(cx.in_transaction)
with self.check_stmt_trace(cx, expected):
cx.execute("SELECT 1")
cx.commit()
cx.rollback()
def test_autocommit_disabled_implicit_rollback(self):
expected = ["ROLLBACK"]
with memory_database(autocommit=False) as cx:
self.assertTrue(cx.in_transaction)
with self.check_stmt_trace(cx, expected, reset=False):
cx.close()
def test_autocommit_enabled(self):
expected = ["CREATE TABLE t(t)", "INSERT INTO t VALUES(1)"]
with memory_database(autocommit=True) as cx:
self.assertFalse(cx.in_transaction)
with self.check_stmt_trace(cx, expected):
cx.execute("CREATE TABLE t(t)")
cx.execute("INSERT INTO t VALUES(1)")
self.assertFalse(cx.in_transaction)
def test_autocommit_enabled_txn_ctl(self):
for op in "commit", "rollback":
with self.subTest(op=op):
with memory_database(autocommit=True) as cx:
meth = getattr(cx, op)
self.assertFalse(cx.in_transaction)
with self.check_stmt_trace(cx, []):
meth() # expect this to pass silently
self.assertFalse(cx.in_transaction)
def test_autocommit_disabled_then_enabled(self):
expected = ["COMMIT"]
with memory_database(autocommit=False) as cx:
self.assertTrue(cx.in_transaction)
with self.check_stmt_trace(cx, expected):
cx.autocommit = True # should commit
self.assertFalse(cx.in_transaction)
def test_autocommit_enabled_then_disabled(self):
expected = ["BEGIN"]
with memory_database(autocommit=True) as cx:
self.assertFalse(cx.in_transaction)
with self.check_stmt_trace(cx, expected):
cx.autocommit = False # should begin
self.assertTrue(cx.in_transaction)
def test_autocommit_explicit_then_disabled(self):
expected = ["BEGIN DEFERRED"]
with memory_database(autocommit=True) as cx:
self.assertFalse(cx.in_transaction)
with self.check_stmt_trace(cx, expected):
cx.execute("BEGIN DEFERRED")
cx.autocommit = False # should now be a no-op
self.assertTrue(cx.in_transaction)
def test_autocommit_enabled_ctx_mgr(self):
with memory_database(autocommit=True) as cx:
# The context manager is a no-op if autocommit=True
with self.check_stmt_trace(cx, []):
with cx:
self.assertFalse(cx.in_transaction)
self.assertFalse(cx.in_transaction)
def test_autocommit_disabled_ctx_mgr(self):
expected = ["COMMIT", "BEGIN"]
with memory_database(autocommit=False) as cx:
with self.check_stmt_trace(cx, expected):
with cx:
self.assertTrue(cx.in_transaction)
self.assertTrue(cx.in_transaction)
def test_autocommit_compat_ctx_mgr(self):
expected = ["BEGIN ", "INSERT INTO T VALUES(1)", "COMMIT"]
with memory_database(autocommit=self.legacy) as cx:
cx.execute("create table t(t)")
with self.check_stmt_trace(cx, expected):
with cx:
self.assertFalse(cx.in_transaction)
cx.execute("INSERT INTO T VALUES(1)")
self.assertTrue(cx.in_transaction)
self.assertFalse(cx.in_transaction)
def test_autocommit_enabled_executescript(self):
expected = ["BEGIN", "SELECT 1"]
with memory_database(autocommit=True) as cx:
with self.check_stmt_trace(cx, expected):
self.assertFalse(cx.in_transaction)
cx.execute("BEGIN")
cx.executescript("SELECT 1")
self.assertTrue(cx.in_transaction)
def test_autocommit_disabled_executescript(self):
expected = ["SELECT 1"]
with memory_database(autocommit=False) as cx:
with self.check_stmt_trace(cx, expected):
self.assertTrue(cx.in_transaction)
cx.executescript("SELECT 1")
self.assertTrue(cx.in_transaction)
def test_autocommit_compat_executescript(self):
expected = ["BEGIN", "COMMIT", "SELECT 1"]
with memory_database(autocommit=self.legacy) as cx:
with self.check_stmt_trace(cx, expected):
self.assertFalse(cx.in_transaction)
cx.execute("BEGIN")
cx.executescript("SELECT 1")
self.assertFalse(cx.in_transaction)
def test_autocommit_disabled_implicit_shutdown(self):
# The implicit ROLLBACK should not call back into Python during
# interpreter tear-down.
code = """if 1:
import sqlite3
cx = sqlite3.connect(":memory:", autocommit=False)
cx.set_trace_callback(print)
"""
assert_python_ok("-c", code, PYTHONIOENCODING="utf-8")
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,547 @@
# pysqlite2/test/types.py: tests for type conversion and detection
#
# Copyright (C) 2005 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import datetime
import unittest
import sqlite3 as sqlite
import sys
try:
import zlib
except ImportError:
zlib = None
from test import support
class SqliteTypeTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
self.cur = self.con.cursor()
self.cur.execute("create table test(i integer, s varchar, f number, b blob)")
def tearDown(self):
self.cur.close()
self.con.close()
def test_string(self):
self.cur.execute("insert into test(s) values (?)", ("Österreich",))
self.cur.execute("select s from test")
row = self.cur.fetchone()
self.assertEqual(row[0], "Österreich")
def test_string_with_null_character(self):
self.cur.execute("insert into test(s) values (?)", ("a\0b",))
self.cur.execute("select s from test")
row = self.cur.fetchone()
self.assertEqual(row[0], "a\0b")
def test_small_int(self):
self.cur.execute("insert into test(i) values (?)", (42,))
self.cur.execute("select i from test")
row = self.cur.fetchone()
self.assertEqual(row[0], 42)
def test_large_int(self):
num = 123456789123456789
self.cur.execute("insert into test(i) values (?)", (num,))
self.cur.execute("select i from test")
row = self.cur.fetchone()
self.assertEqual(row[0], num)
def test_float(self):
val = 3.14
self.cur.execute("insert into test(f) values (?)", (val,))
self.cur.execute("select f from test")
row = self.cur.fetchone()
self.assertEqual(row[0], val)
def test_blob(self):
sample = b"Guglhupf"
val = memoryview(sample)
self.cur.execute("insert into test(b) values (?)", (val,))
self.cur.execute("select b from test")
row = self.cur.fetchone()
self.assertEqual(row[0], sample)
def test_unicode_execute(self):
self.cur.execute("select 'Österreich'")
row = self.cur.fetchone()
self.assertEqual(row[0], "Österreich")
def test_too_large_int(self):
for value in 2**63, -2**63-1, 2**64:
with self.assertRaises(OverflowError):
self.cur.execute("insert into test(i) values (?)", (value,))
self.cur.execute("select i from test")
row = self.cur.fetchone()
self.assertIsNone(row)
def test_string_with_surrogates(self):
for value in 0xd8ff, 0xdcff:
with self.assertRaises(UnicodeEncodeError):
self.cur.execute("insert into test(s) values (?)", (chr(value),))
self.cur.execute("select s from test")
row = self.cur.fetchone()
self.assertIsNone(row)
@unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
@support.bigmemtest(size=2**31, memuse=4, dry_run=False)
def test_too_large_string(self, maxsize):
with self.assertRaises(sqlite.DataError):
self.cur.execute("insert into test(s) values (?)", ('x'*(2**31-1),))
with self.assertRaises(sqlite.DataError):
self.cur.execute("insert into test(s) values (?)", ('x'*(2**31),))
self.cur.execute("select 1 from test")
row = self.cur.fetchone()
self.assertIsNone(row)
@unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
@support.bigmemtest(size=2**31, memuse=3, dry_run=False)
def test_too_large_blob(self, maxsize):
with self.assertRaises(sqlite.DataError):
self.cur.execute("insert into test(s) values (?)", (b'x'*(2**31-1),))
with self.assertRaises(sqlite.DataError):
self.cur.execute("insert into test(s) values (?)", (b'x'*(2**31),))
self.cur.execute("select 1 from test")
row = self.cur.fetchone()
self.assertIsNone(row)
class DeclTypesTests(unittest.TestCase):
class Foo:
def __init__(self, _val):
if isinstance(_val, bytes):
# sqlite3 always calls __init__ with a bytes created from a
# UTF-8 string when __conform__ was used to store the object.
_val = _val.decode('utf-8')
self.val = _val
def __eq__(self, other):
if not isinstance(other, DeclTypesTests.Foo):
return NotImplemented
return self.val == other.val
def __conform__(self, protocol):
if protocol is sqlite.PrepareProtocol:
return self.val
else:
return None
def __str__(self):
return "<%s>" % self.val
class BadConform:
def __init__(self, exc):
self.exc = exc
def __conform__(self, protocol):
raise self.exc
def setUp(self):
self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
self.cur = self.con.cursor()
self.cur.execute("""
create table test(
i int,
s str,
f float,
b bool,
u unicode,
foo foo,
bin blob,
n1 number,
n2 number(5),
bad bad,
cbin cblob)
""")
# override float, make them always return the same number
sqlite.converters["FLOAT"] = lambda x: 47.2
# and implement two custom ones
sqlite.converters["BOOL"] = lambda x: bool(int(x))
sqlite.converters["FOO"] = DeclTypesTests.Foo
sqlite.converters["BAD"] = DeclTypesTests.BadConform
sqlite.converters["WRONG"] = lambda x: "WRONG"
sqlite.converters["NUMBER"] = float
sqlite.converters["CBLOB"] = lambda x: b"blobish"
def tearDown(self):
del sqlite.converters["FLOAT"]
del sqlite.converters["BOOL"]
del sqlite.converters["FOO"]
del sqlite.converters["BAD"]
del sqlite.converters["WRONG"]
del sqlite.converters["NUMBER"]
del sqlite.converters["CBLOB"]
self.cur.close()
self.con.close()
def test_string(self):
# default
self.cur.execute("insert into test(s) values (?)", ("foo",))
self.cur.execute('select s as "s [WRONG]" from test')
row = self.cur.fetchone()
self.assertEqual(row[0], "foo")
def test_small_int(self):
# default
self.cur.execute("insert into test(i) values (?)", (42,))
self.cur.execute("select i from test")
row = self.cur.fetchone()
self.assertEqual(row[0], 42)
def test_large_int(self):
# default
num = 123456789123456789
self.cur.execute("insert into test(i) values (?)", (num,))
self.cur.execute("select i from test")
row = self.cur.fetchone()
self.assertEqual(row[0], num)
def test_float(self):
# custom
val = 3.14
self.cur.execute("insert into test(f) values (?)", (val,))
self.cur.execute("select f from test")
row = self.cur.fetchone()
self.assertEqual(row[0], 47.2)
def test_bool(self):
# custom
self.cur.execute("insert into test(b) values (?)", (False,))
self.cur.execute("select b from test")
row = self.cur.fetchone()
self.assertIs(row[0], False)
self.cur.execute("delete from test")
self.cur.execute("insert into test(b) values (?)", (True,))
self.cur.execute("select b from test")
row = self.cur.fetchone()
self.assertIs(row[0], True)
def test_unicode(self):
# default
val = "\xd6sterreich"
self.cur.execute("insert into test(u) values (?)", (val,))
self.cur.execute("select u from test")
row = self.cur.fetchone()
self.assertEqual(row[0], val)
def test_foo(self):
val = DeclTypesTests.Foo("bla")
self.cur.execute("insert into test(foo) values (?)", (val,))
self.cur.execute("select foo from test")
row = self.cur.fetchone()
self.assertEqual(row[0], val)
def test_error_in_conform(self):
val = DeclTypesTests.BadConform(TypeError)
with self.assertRaises(sqlite.ProgrammingError):
self.cur.execute("insert into test(bad) values (?)", (val,))
with self.assertRaises(sqlite.ProgrammingError):
self.cur.execute("insert into test(bad) values (:val)", {"val": val})
val = DeclTypesTests.BadConform(KeyboardInterrupt)
with self.assertRaises(KeyboardInterrupt):
self.cur.execute("insert into test(bad) values (?)", (val,))
with self.assertRaises(KeyboardInterrupt):
self.cur.execute("insert into test(bad) values (:val)", {"val": val})
def test_unsupported_seq(self):
class Bar: pass
val = Bar()
with self.assertRaises(sqlite.ProgrammingError):
self.cur.execute("insert into test(f) values (?)", (val,))
def test_unsupported_dict(self):
class Bar: pass
val = Bar()
with self.assertRaises(sqlite.ProgrammingError):
self.cur.execute("insert into test(f) values (:val)", {"val": val})
def test_blob(self):
# default
sample = b"Guglhupf"
val = memoryview(sample)
self.cur.execute("insert into test(bin) values (?)", (val,))
self.cur.execute("select bin from test")
row = self.cur.fetchone()
self.assertEqual(row[0], sample)
def test_number1(self):
self.cur.execute("insert into test(n1) values (5)")
value = self.cur.execute("select n1 from test").fetchone()[0]
# if the converter is not used, it's an int instead of a float
self.assertEqual(type(value), float)
def test_number2(self):
"""Checks whether converter names are cut off at '(' characters"""
self.cur.execute("insert into test(n2) values (5)")
value = self.cur.execute("select n2 from test").fetchone()[0]
# if the converter is not used, it's an int instead of a float
self.assertEqual(type(value), float)
def test_convert_zero_sized_blob(self):
self.con.execute("insert into test(cbin) values (?)", (b"",))
cur = self.con.execute("select cbin from test")
# Zero-sized blobs with converters returns None. This differs from
# blobs without a converter, where b"" is returned.
self.assertIsNone(cur.fetchone()[0])
class ColNamesTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
self.cur = self.con.cursor()
self.cur.execute("create table test(x foo)")
sqlite.converters["FOO"] = lambda x: "[%s]" % x.decode("ascii")
sqlite.converters["BAR"] = lambda x: "<%s>" % x.decode("ascii")
sqlite.converters["EXC"] = lambda x: 5/0
sqlite.converters["B1B1"] = lambda x: "MARKER"
def tearDown(self):
del sqlite.converters["FOO"]
del sqlite.converters["BAR"]
del sqlite.converters["EXC"]
del sqlite.converters["B1B1"]
self.cur.close()
self.con.close()
def test_decl_type_not_used(self):
"""
Assures that the declared type is not used when PARSE_DECLTYPES
is not set.
"""
self.cur.execute("insert into test(x) values (?)", ("xxx",))
self.cur.execute("select x from test")
val = self.cur.fetchone()[0]
self.assertEqual(val, "xxx")
def test_none(self):
self.cur.execute("insert into test(x) values (?)", (None,))
self.cur.execute("select x from test")
val = self.cur.fetchone()[0]
self.assertEqual(val, None)
def test_col_name(self):
self.cur.execute("insert into test(x) values (?)", ("xxx",))
self.cur.execute('select x as "x y [bar]" from test')
val = self.cur.fetchone()[0]
self.assertEqual(val, "<xxx>")
# Check if the stripping of colnames works. Everything after the first
# '[' (and the preceding space) should be stripped.
self.assertEqual(self.cur.description[0][0], "x y")
def test_case_in_converter_name(self):
self.cur.execute("select 'other' as \"x [b1b1]\"")
val = self.cur.fetchone()[0]
self.assertEqual(val, "MARKER")
def test_cursor_description_no_row(self):
"""
cursor.description should at least provide the column name(s), even if
no row returned.
"""
self.cur.execute("select * from test where 0 = 1")
self.assertEqual(self.cur.description[0][0], "x")
def test_cursor_description_insert(self):
self.cur.execute("insert into test values (1)")
self.assertIsNone(self.cur.description)
class CommonTableExpressionTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
self.cur = self.con.cursor()
self.cur.execute("create table test(x foo)")
def tearDown(self):
self.cur.close()
self.con.close()
def test_cursor_description_cte_simple(self):
self.cur.execute("with one as (select 1) select * from one")
self.assertIsNotNone(self.cur.description)
self.assertEqual(self.cur.description[0][0], "1")
def test_cursor_description_cte_multiple_columns(self):
self.cur.execute("insert into test values(1)")
self.cur.execute("insert into test values(2)")
self.cur.execute("with testCTE as (select * from test) select * from testCTE")
self.assertIsNotNone(self.cur.description)
self.assertEqual(self.cur.description[0][0], "x")
def test_cursor_description_cte(self):
self.cur.execute("insert into test values (1)")
self.cur.execute("with bar as (select * from test) select * from test where x = 1")
self.assertIsNotNone(self.cur.description)
self.assertEqual(self.cur.description[0][0], "x")
self.cur.execute("with bar as (select * from test) select * from test where x = 2")
self.assertIsNotNone(self.cur.description)
self.assertEqual(self.cur.description[0][0], "x")
class ObjectAdaptationTests(unittest.TestCase):
def cast(obj):
return float(obj)
cast = staticmethod(cast)
def setUp(self):
self.con = sqlite.connect(":memory:")
try:
del sqlite.adapters[int]
except:
pass
sqlite.register_adapter(int, ObjectAdaptationTests.cast)
self.cur = self.con.cursor()
def tearDown(self):
del sqlite.adapters[(int, sqlite.PrepareProtocol)]
self.cur.close()
self.con.close()
def test_caster_is_used(self):
self.cur.execute("select ?", (4,))
val = self.cur.fetchone()[0]
self.assertEqual(type(val), float)
def test_missing_adapter(self):
with self.assertRaises(sqlite.ProgrammingError):
sqlite.adapt(1.) # No float adapter registered
def test_missing_protocol(self):
with self.assertRaises(sqlite.ProgrammingError):
sqlite.adapt(1, None)
def test_defect_proto(self):
class DefectProto():
def __adapt__(self):
return None
with self.assertRaises(sqlite.ProgrammingError):
sqlite.adapt(1., DefectProto)
def test_defect_self_adapt(self):
class DefectSelfAdapt(float):
def __conform__(self, _):
return None
with self.assertRaises(sqlite.ProgrammingError):
sqlite.adapt(DefectSelfAdapt(1.))
def test_custom_proto(self):
class CustomProto():
def __adapt__(self):
return "adapted"
self.assertEqual(sqlite.adapt(1., CustomProto), "adapted")
def test_adapt(self):
val = 42
self.assertEqual(float(val), sqlite.adapt(val))
def test_adapt_alt(self):
alt = "other"
self.assertEqual(alt, sqlite.adapt(1., None, alt))
@unittest.skipUnless(zlib, "requires zlib")
class BinaryConverterTests(unittest.TestCase):
def convert(s):
return zlib.decompress(s)
convert = staticmethod(convert)
def setUp(self):
self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
sqlite.register_converter("bin", BinaryConverterTests.convert)
def tearDown(self):
self.con.close()
def test_binary_input_for_converter(self):
testdata = b"abcdefg" * 10
result = self.con.execute('select ? as "x [bin]"', (memoryview(zlib.compress(testdata)),)).fetchone()[0]
self.assertEqual(testdata, result)
class DateTimeTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
self.cur = self.con.cursor()
self.cur.execute("create table test(d date, ts timestamp)")
def tearDown(self):
self.cur.close()
self.con.close()
def test_sqlite_date(self):
d = sqlite.Date(2004, 2, 14)
with self.assertWarnsRegex(DeprecationWarning, "adapter") as cm:
self.cur.execute("insert into test(d) values (?)", (d,))
self.assertEqual(cm.filename, __file__)
self.cur.execute("select d from test")
with self.assertWarnsRegex(DeprecationWarning, "converter") as cm:
d2 = self.cur.fetchone()[0]
self.assertEqual(cm.filename, __file__)
self.assertEqual(d, d2)
def test_sqlite_timestamp(self):
ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
with self.assertWarnsRegex(DeprecationWarning, "adapter") as cm:
self.cur.execute("insert into test(ts) values (?)", (ts,))
self.assertEqual(cm.filename, __file__)
self.cur.execute("select ts from test")
with self.assertWarnsRegex(DeprecationWarning, "converter") as cm:
ts2 = self.cur.fetchone()[0]
self.assertEqual(cm.filename, __file__)
self.assertEqual(ts, ts2)
def test_sql_timestamp(self):
now = datetime.datetime.now(tz=datetime.UTC)
self.cur.execute("insert into test(ts) values (current_timestamp)")
self.cur.execute("select ts from test")
with self.assertWarnsRegex(DeprecationWarning, "converter"):
ts = self.cur.fetchone()[0]
self.assertEqual(type(ts), datetime.datetime)
self.assertEqual(ts.year, now.year)
def test_date_time_sub_seconds(self):
ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
with self.assertWarnsRegex(DeprecationWarning, "adapter"):
self.cur.execute("insert into test(ts) values (?)", (ts,))
self.cur.execute("select ts from test")
with self.assertWarnsRegex(DeprecationWarning, "converter"):
ts2 = self.cur.fetchone()[0]
self.assertEqual(ts, ts2)
def test_date_time_sub_seconds_floating_point(self):
ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241)
with self.assertWarnsRegex(DeprecationWarning, "adapter"):
self.cur.execute("insert into test(ts) values (?)", (ts,))
self.cur.execute("select ts from test")
with self.assertWarnsRegex(DeprecationWarning, "converter"):
ts2 = self.cur.fetchone()[0]
self.assertEqual(ts, ts2)
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,853 @@
# pysqlite2/test/userfunctions.py: tests for user-defined functions and
# aggregates.
#
# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
# claim that you wrote the original software. If you use this software
# in a product, an acknowledgment in the product documentation would be
# appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
# misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
import sys
import unittest
import sqlite3 as sqlite
from unittest.mock import Mock, patch
from test.support import bigmemtest, gc_collect
from .util import cx_limit, memory_database
from .util import with_tracebacks
def func_returntext():
return "foo"
def func_returntextwithnull():
return "1\x002"
def func_returnunicode():
return "bar"
def func_returnint():
return 42
def func_returnfloat():
return 3.14
def func_returnnull():
return None
def func_returnblob():
return b"blob"
def func_returnlonglong():
return 1<<31
def func_raiseexception():
5/0
def func_memoryerror():
raise MemoryError
def func_overflowerror():
raise OverflowError
class AggrNoStep:
def __init__(self):
pass
def finalize(self):
return 1
class AggrNoFinalize:
def __init__(self):
pass
def step(self, x):
pass
class AggrExceptionInInit:
def __init__(self):
5/0
def step(self, x):
pass
def finalize(self):
pass
class AggrExceptionInStep:
def __init__(self):
pass
def step(self, x):
5/0
def finalize(self):
return 42
class AggrExceptionInFinalize:
def __init__(self):
pass
def step(self, x):
pass
def finalize(self):
5/0
class AggrCheckType:
def __init__(self):
self.val = None
def step(self, whichType, val):
theType = {"str": str, "int": int, "float": float, "None": type(None),
"blob": bytes}
self.val = int(theType[whichType] is type(val))
def finalize(self):
return self.val
class AggrCheckTypes:
def __init__(self):
self.val = 0
def step(self, whichType, *vals):
theType = {"str": str, "int": int, "float": float, "None": type(None),
"blob": bytes}
for val in vals:
self.val += int(theType[whichType] is type(val))
def finalize(self):
return self.val
class AggrSum:
def __init__(self):
self.val = 0.0
def step(self, val):
self.val += val
def finalize(self):
return self.val
class AggrText:
def __init__(self):
self.txt = ""
def step(self, txt):
self.txt = self.txt + txt
def finalize(self):
return self.txt
class FunctionTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
self.con.create_function("returntext", 0, func_returntext)
self.con.create_function("returntextwithnull", 0, func_returntextwithnull)
self.con.create_function("returnunicode", 0, func_returnunicode)
self.con.create_function("returnint", 0, func_returnint)
self.con.create_function("returnfloat", 0, func_returnfloat)
self.con.create_function("returnnull", 0, func_returnnull)
self.con.create_function("returnblob", 0, func_returnblob)
self.con.create_function("returnlonglong", 0, func_returnlonglong)
self.con.create_function("returnnan", 0, lambda: float("nan"))
self.con.create_function("return_noncont_blob", 0,
lambda: memoryview(b"blob")[::2])
self.con.create_function("raiseexception", 0, func_raiseexception)
self.con.create_function("memoryerror", 0, func_memoryerror)
self.con.create_function("overflowerror", 0, func_overflowerror)
self.con.create_function("isblob", 1, lambda x: isinstance(x, bytes))
self.con.create_function("isnone", 1, lambda x: x is None)
self.con.create_function("spam", -1, lambda *x: len(x))
self.con.execute("create table test(t text)")
def tearDown(self):
self.con.close()
def test_func_error_on_create(self):
with self.assertRaises(sqlite.OperationalError):
self.con.create_function("bla", -100, lambda x: 2*x)
def test_func_too_many_args(self):
category = sqlite.SQLITE_LIMIT_FUNCTION_ARG
msg = "too many arguments on function"
with cx_limit(self.con, category=category, limit=1):
self.con.execute("select abs(-1)");
with self.assertRaisesRegex(sqlite.OperationalError, msg):
self.con.execute("select max(1, 2)");
def test_func_ref_count(self):
def getfunc():
def f():
return 1
return f
f = getfunc()
globals()["foo"] = f
# self.con.create_function("reftest", 0, getfunc())
self.con.create_function("reftest", 0, f)
cur = self.con.cursor()
cur.execute("select reftest()")
def test_func_return_text(self):
cur = self.con.cursor()
cur.execute("select returntext()")
val = cur.fetchone()[0]
self.assertEqual(type(val), str)
self.assertEqual(val, "foo")
def test_func_return_text_with_null_char(self):
cur = self.con.cursor()
res = cur.execute("select returntextwithnull()").fetchone()[0]
self.assertEqual(type(res), str)
self.assertEqual(res, "1\x002")
def test_func_return_unicode(self):
cur = self.con.cursor()
cur.execute("select returnunicode()")
val = cur.fetchone()[0]
self.assertEqual(type(val), str)
self.assertEqual(val, "bar")
def test_func_return_int(self):
cur = self.con.cursor()
cur.execute("select returnint()")
val = cur.fetchone()[0]
self.assertEqual(type(val), int)
self.assertEqual(val, 42)
def test_func_return_float(self):
cur = self.con.cursor()
cur.execute("select returnfloat()")
val = cur.fetchone()[0]
self.assertEqual(type(val), float)
if val < 3.139 or val > 3.141:
self.fail("wrong value")
def test_func_return_null(self):
cur = self.con.cursor()
cur.execute("select returnnull()")
val = cur.fetchone()[0]
self.assertEqual(type(val), type(None))
self.assertEqual(val, None)
def test_func_return_blob(self):
cur = self.con.cursor()
cur.execute("select returnblob()")
val = cur.fetchone()[0]
self.assertEqual(type(val), bytes)
self.assertEqual(val, b"blob")
def test_func_return_long_long(self):
cur = self.con.cursor()
cur.execute("select returnlonglong()")
val = cur.fetchone()[0]
self.assertEqual(val, 1<<31)
def test_func_return_nan(self):
cur = self.con.cursor()
cur.execute("select returnnan()")
self.assertIsNone(cur.fetchone()[0])
@with_tracebacks(ZeroDivisionError, name="func_raiseexception")
def test_func_exception(self):
cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select raiseexception()")
cur.fetchone()
self.assertEqual(str(cm.exception), 'user-defined function raised exception')
@with_tracebacks(MemoryError, name="func_memoryerror")
def test_func_memory_error(self):
cur = self.con.cursor()
with self.assertRaises(MemoryError):
cur.execute("select memoryerror()")
cur.fetchone()
@with_tracebacks(OverflowError, name="func_overflowerror")
def test_func_overflow_error(self):
cur = self.con.cursor()
with self.assertRaises(sqlite.DataError):
cur.execute("select overflowerror()")
cur.fetchone()
def test_any_arguments(self):
cur = self.con.cursor()
cur.execute("select spam(?, ?)", (1, 2))
val = cur.fetchone()[0]
self.assertEqual(val, 2)
def test_empty_blob(self):
cur = self.con.execute("select isblob(x'')")
self.assertTrue(cur.fetchone()[0])
def test_nan_float(self):
cur = self.con.execute("select isnone(?)", (float("nan"),))
# SQLite has no concept of nan; it is converted to NULL
self.assertTrue(cur.fetchone()[0])
def test_too_large_int(self):
err = "Python int too large to convert to SQLite INTEGER"
self.assertRaisesRegex(OverflowError, err, self.con.execute,
"select spam(?)", (1 << 65,))
def test_non_contiguous_blob(self):
self.assertRaisesRegex(BufferError,
"underlying buffer is not C-contiguous",
self.con.execute, "select spam(?)",
(memoryview(b"blob")[::2],))
@with_tracebacks(BufferError, regex="buffer.*contiguous")
def test_return_non_contiguous_blob(self):
with self.assertRaises(sqlite.OperationalError):
cur = self.con.execute("select return_noncont_blob()")
cur.fetchone()
def test_param_surrogates(self):
self.assertRaisesRegex(UnicodeEncodeError, "surrogates not allowed",
self.con.execute, "select spam(?)",
("\ud803\ude6d",))
def test_func_params(self):
results = []
def append_result(arg):
results.append((arg, type(arg)))
self.con.create_function("test_params", 1, append_result)
dataset = [
(42, int),
(-1, int),
(1234567890123456789, int),
(4611686018427387905, int), # 63-bit int with non-zero low bits
(3.14, float),
(float('inf'), float),
("text", str),
("1\x002", str),
("\u02e2q\u02e1\u2071\u1d57\u1d49", str),
(b"blob", bytes),
(bytearray(range(2)), bytes),
(memoryview(b"blob"), bytes),
(None, type(None)),
]
for val, _ in dataset:
cur = self.con.execute("select test_params(?)", (val,))
cur.fetchone()
self.assertEqual(dataset, results)
# Regarding deterministic functions:
#
# Between 3.8.3 and 3.15.0, deterministic functions were only used to
# optimize inner loops. From 3.15.0 and onward, deterministic functions
# were permitted in WHERE clauses of partial indices, which allows testing
# based on syntax, iso. the query optimizer.
def test_func_non_deterministic(self):
mock = Mock(return_value=None)
self.con.create_function("nondeterministic", 0, mock, deterministic=False)
with self.assertRaises(sqlite.OperationalError):
self.con.execute("create index t on test(t) where nondeterministic() is not null")
def test_func_deterministic(self):
mock = Mock(return_value=None)
self.con.create_function("deterministic", 0, mock, deterministic=True)
try:
self.con.execute("create index t on test(t) where deterministic() is not null")
except sqlite.OperationalError:
self.fail("Unexpected failure while creating partial index")
def test_func_deterministic_keyword_only(self):
with self.assertRaises(TypeError):
self.con.create_function("deterministic", 0, int, True)
def test_function_destructor_via_gc(self):
# See bpo-44304: The destructor of the user function can
# crash if is called without the GIL from the gc functions
def md5sum(t):
return
with memory_database() as dest:
dest.create_function("md5", 1, md5sum)
x = dest("create table lang (name, first_appeared)")
del md5sum, dest
y = [x]
y.append(y)
del x,y
gc_collect()
@with_tracebacks(OverflowError)
def test_func_return_too_large_int(self):
cur = self.con.cursor()
msg = "string or blob too big"
for value in 2**63, -2**63-1, 2**64:
self.con.create_function("largeint", 0, lambda value=value: value)
with self.assertRaisesRegex(sqlite.DataError, msg):
cur.execute("select largeint()")
@with_tracebacks(UnicodeEncodeError, "surrogates not allowed", "chr")
def test_func_return_text_with_surrogates(self):
cur = self.con.cursor()
self.con.create_function("pychr", 1, chr)
for value in 0xd8ff, 0xdcff:
with self.assertRaises(sqlite.OperationalError):
cur.execute("select pychr(?)", (value,))
@unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
@bigmemtest(size=2**31, memuse=3, dry_run=False)
def test_func_return_too_large_text(self, size):
cur = self.con.cursor()
for size in 2**31-1, 2**31:
self.con.create_function("largetext", 0, lambda size=size: "b" * size)
with self.assertRaises(sqlite.DataError):
cur.execute("select largetext()")
@unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
@bigmemtest(size=2**31, memuse=2, dry_run=False)
def test_func_return_too_large_blob(self, size):
cur = self.con.cursor()
for size in 2**31-1, 2**31:
self.con.create_function("largeblob", 0, lambda size=size: b"b" * size)
with self.assertRaises(sqlite.DataError):
cur.execute("select largeblob()")
def test_func_return_illegal_value(self):
self.con.create_function("badreturn", 0, lambda: self)
msg = "user-defined function raised exception"
self.assertRaisesRegex(sqlite.OperationalError, msg,
self.con.execute, "select badreturn()")
def test_func_keyword_args(self):
regex = (
r"Passing keyword arguments 'name', 'narg' and 'func' to "
r"_sqlite3.Connection.create_function\(\) is deprecated. "
r"Parameters 'name', 'narg' and 'func' will become "
r"positional-only in Python 3.15."
)
def noop():
return None
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
self.con.create_function("noop", 0, func=noop)
self.assertEqual(cm.filename, __file__)
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
self.con.create_function("noop", narg=0, func=noop)
self.assertEqual(cm.filename, __file__)
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
self.con.create_function(name="noop", narg=0, func=noop)
self.assertEqual(cm.filename, __file__)
class WindowSumInt:
def __init__(self):
self.count = 0
def step(self, value):
self.count += value
def value(self):
return self.count
def inverse(self, value):
self.count -= value
def finalize(self):
return self.count
class BadWindow(Exception):
pass
@unittest.skipIf(sqlite.sqlite_version_info < (3, 25, 0),
"Requires SQLite 3.25.0 or newer")
class WindowFunctionTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
self.cur = self.con.cursor()
# Test case taken from https://www.sqlite.org/windowfunctions.html#udfwinfunc
values = [
("a", 4),
("b", 5),
("c", 3),
("d", 8),
("e", 1),
]
with self.con:
self.con.execute("create table test(x, y)")
self.con.executemany("insert into test values(?, ?)", values)
self.expected = [
("a", 9),
("b", 12),
("c", 16),
("d", 12),
("e", 9),
]
self.query = """
select x, %s(y) over (
order by x rows between 1 preceding and 1 following
) as sum_y
from test order by x
"""
self.con.create_window_function("sumint", 1, WindowSumInt)
def tearDown(self):
self.cur.close()
self.con.close()
def test_win_sum_int(self):
self.cur.execute(self.query % "sumint")
self.assertEqual(self.cur.fetchall(), self.expected)
def test_win_error_on_create(self):
self.assertRaises(sqlite.ProgrammingError,
self.con.create_window_function,
"shouldfail", -100, WindowSumInt)
@with_tracebacks(BadWindow)
def test_win_exception_in_method(self):
for meth in "__init__", "step", "value", "inverse":
with self.subTest(meth=meth):
with patch.object(WindowSumInt, meth, side_effect=BadWindow):
name = f"exc_{meth}"
self.con.create_window_function(name, 1, WindowSumInt)
msg = f"'{meth}' method raised error"
with self.assertRaisesRegex(sqlite.OperationalError, msg):
self.cur.execute(self.query % name)
self.cur.fetchall()
@with_tracebacks(BadWindow)
def test_win_exception_in_finalize(self):
# Note: SQLite does not (as of version 3.38.0) propagate finalize
# callback errors to sqlite3_step(); this implies that OperationalError
# is _not_ raised.
with patch.object(WindowSumInt, "finalize", side_effect=BadWindow):
name = "exception_in_finalize"
self.con.create_window_function(name, 1, WindowSumInt)
self.cur.execute(self.query % name)
self.cur.fetchall()
@with_tracebacks(AttributeError)
def test_win_missing_method(self):
class MissingValue:
def step(self, x): pass
def inverse(self, x): pass
def finalize(self): return 42
class MissingInverse:
def step(self, x): pass
def value(self): return 42
def finalize(self): return 42
class MissingStep:
def value(self): return 42
def inverse(self, x): pass
def finalize(self): return 42
dataset = (
("step", MissingStep),
("value", MissingValue),
("inverse", MissingInverse),
)
for meth, cls in dataset:
with self.subTest(meth=meth, cls=cls):
name = f"exc_{meth}"
self.con.create_window_function(name, 1, cls)
with self.assertRaisesRegex(sqlite.OperationalError,
f"'{meth}' method not defined"):
self.cur.execute(self.query % name)
self.cur.fetchall()
@with_tracebacks(AttributeError)
def test_win_missing_finalize(self):
# Note: SQLite does not (as of version 3.38.0) propagate finalize
# callback errors to sqlite3_step(); this implies that OperationalError
# is _not_ raised.
class MissingFinalize:
def step(self, x): pass
def value(self): return 42
def inverse(self, x): pass
name = "missing_finalize"
self.con.create_window_function(name, 1, MissingFinalize)
self.cur.execute(self.query % name)
self.cur.fetchall()
def test_win_clear_function(self):
self.con.create_window_function("sumint", 1, None)
self.assertRaises(sqlite.OperationalError, self.cur.execute,
self.query % "sumint")
def test_win_redefine_function(self):
# Redefine WindowSumInt; adjust the expected results accordingly.
class Redefined(WindowSumInt):
def step(self, value): self.count += value * 2
def inverse(self, value): self.count -= value * 2
expected = [(v[0], v[1]*2) for v in self.expected]
self.con.create_window_function("sumint", 1, Redefined)
self.cur.execute(self.query % "sumint")
self.assertEqual(self.cur.fetchall(), expected)
def test_win_error_value_return(self):
class ErrorValueReturn:
def __init__(self): pass
def step(self, x): pass
def value(self): return 1 << 65
self.con.create_window_function("err_val_ret", 1, ErrorValueReturn)
self.assertRaisesRegex(sqlite.DataError, "string or blob too big",
self.cur.execute, self.query % "err_val_ret")
class AggregateTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
cur = self.con.cursor()
cur.execute("""
create table test(
t text,
i integer,
f float,
n,
b blob
)
""")
cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)",
("foo", 5, 3.14, None, memoryview(b"blob"),))
cur.close()
self.con.create_aggregate("nostep", 1, AggrNoStep)
self.con.create_aggregate("nofinalize", 1, AggrNoFinalize)
self.con.create_aggregate("excInit", 1, AggrExceptionInInit)
self.con.create_aggregate("excStep", 1, AggrExceptionInStep)
self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize)
self.con.create_aggregate("checkType", 2, AggrCheckType)
self.con.create_aggregate("checkTypes", -1, AggrCheckTypes)
self.con.create_aggregate("mysum", 1, AggrSum)
self.con.create_aggregate("aggtxt", 1, AggrText)
def tearDown(self):
self.con.close()
def test_aggr_error_on_create(self):
with self.assertRaises(sqlite.OperationalError):
self.con.create_function("bla", -100, AggrSum)
@with_tracebacks(AttributeError, name="AggrNoStep")
def test_aggr_no_step(self):
cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select nostep(t) from test")
self.assertEqual(str(cm.exception),
"user-defined aggregate's 'step' method not defined")
def test_aggr_no_finalize(self):
cur = self.con.cursor()
msg = "user-defined aggregate's 'finalize' method not defined"
with self.assertRaisesRegex(sqlite.OperationalError, msg):
cur.execute("select nofinalize(t) from test")
val = cur.fetchone()[0]
@with_tracebacks(ZeroDivisionError, name="AggrExceptionInInit")
def test_aggr_exception_in_init(self):
cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select excInit(t) from test")
val = cur.fetchone()[0]
self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error")
@with_tracebacks(ZeroDivisionError, name="AggrExceptionInStep")
def test_aggr_exception_in_step(self):
cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select excStep(t) from test")
val = cur.fetchone()[0]
self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error")
@with_tracebacks(ZeroDivisionError, name="AggrExceptionInFinalize")
def test_aggr_exception_in_finalize(self):
cur = self.con.cursor()
with self.assertRaises(sqlite.OperationalError) as cm:
cur.execute("select excFinalize(t) from test")
val = cur.fetchone()[0]
self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error")
def test_aggr_check_param_str(self):
cur = self.con.cursor()
cur.execute("select checkTypes('str', ?, ?)", ("foo", str()))
val = cur.fetchone()[0]
self.assertEqual(val, 2)
def test_aggr_check_param_int(self):
cur = self.con.cursor()
cur.execute("select checkType('int', ?)", (42,))
val = cur.fetchone()[0]
self.assertEqual(val, 1)
def test_aggr_check_params_int(self):
cur = self.con.cursor()
cur.execute("select checkTypes('int', ?, ?)", (42, 24))
val = cur.fetchone()[0]
self.assertEqual(val, 2)
def test_aggr_check_param_float(self):
cur = self.con.cursor()
cur.execute("select checkType('float', ?)", (3.14,))
val = cur.fetchone()[0]
self.assertEqual(val, 1)
def test_aggr_check_param_none(self):
cur = self.con.cursor()
cur.execute("select checkType('None', ?)", (None,))
val = cur.fetchone()[0]
self.assertEqual(val, 1)
def test_aggr_check_param_blob(self):
cur = self.con.cursor()
cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),))
val = cur.fetchone()[0]
self.assertEqual(val, 1)
def test_aggr_check_aggr_sum(self):
cur = self.con.cursor()
cur.execute("delete from test")
cur.executemany("insert into test(i) values (?)", [(10,), (20,), (30,)])
cur.execute("select mysum(i) from test")
val = cur.fetchone()[0]
self.assertEqual(val, 60)
def test_aggr_no_match(self):
cur = self.con.execute("select mysum(i) from (select 1 as i) where i == 0")
val = cur.fetchone()[0]
self.assertIsNone(val)
def test_aggr_text(self):
cur = self.con.cursor()
for txt in ["foo", "1\x002"]:
with self.subTest(txt=txt):
cur.execute("select aggtxt(?) from test", (txt,))
val = cur.fetchone()[0]
self.assertEqual(val, txt)
def test_agg_keyword_args(self):
regex = (
r"Passing keyword arguments 'name', 'n_arg' and 'aggregate_class' to "
r"_sqlite3.Connection.create_aggregate\(\) is deprecated. "
r"Parameters 'name', 'n_arg' and 'aggregate_class' will become "
r"positional-only in Python 3.15."
)
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
self.con.create_aggregate("test", 1, aggregate_class=AggrText)
self.assertEqual(cm.filename, __file__)
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
self.con.create_aggregate("test", n_arg=1, aggregate_class=AggrText)
self.assertEqual(cm.filename, __file__)
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
self.con.create_aggregate(name="test", n_arg=0,
aggregate_class=AggrText)
self.assertEqual(cm.filename, __file__)
class AuthorizerTests(unittest.TestCase):
@staticmethod
def authorizer_cb(action, arg1, arg2, dbname, source):
if action != sqlite.SQLITE_SELECT:
return sqlite.SQLITE_DENY
if arg2 == 'c2' or arg1 == 't2':
return sqlite.SQLITE_DENY
return sqlite.SQLITE_OK
def setUp(self):
self.con = sqlite.connect(":memory:")
self.con.executescript("""
create table t1 (c1, c2);
create table t2 (c1, c2);
insert into t1 (c1, c2) values (1, 2);
insert into t2 (c1, c2) values (4, 5);
""")
# For our security test:
self.con.execute("select c2 from t2")
self.con.set_authorizer(self.authorizer_cb)
def tearDown(self):
self.con.close()
def test_table_access(self):
with self.assertRaises(sqlite.DatabaseError) as cm:
self.con.execute("select * from t2")
self.assertIn('prohibited', str(cm.exception))
def test_column_access(self):
with self.assertRaises(sqlite.DatabaseError) as cm:
self.con.execute("select c2 from t1")
self.assertIn('prohibited', str(cm.exception))
def test_clear_authorizer(self):
self.con.set_authorizer(None)
self.con.execute("select * from t2")
self.con.execute("select c2 from t1")
def test_authorizer_keyword_args(self):
regex = (
r"Passing keyword argument 'authorizer_callback' to "
r"_sqlite3.Connection.set_authorizer\(\) is deprecated. "
r"Parameter 'authorizer_callback' will become positional-only in "
r"Python 3.15."
)
with self.assertWarnsRegex(DeprecationWarning, regex) as cm:
self.con.set_authorizer(authorizer_callback=lambda: None)
self.assertEqual(cm.filename, __file__)
class AuthorizerRaiseExceptionTests(AuthorizerTests):
@staticmethod
def authorizer_cb(action, arg1, arg2, dbname, source):
if action != sqlite.SQLITE_SELECT:
raise ValueError
if arg2 == 'c2' or arg1 == 't2':
raise ValueError
return sqlite.SQLITE_OK
@with_tracebacks(ValueError, name="authorizer_cb")
def test_table_access(self):
super().test_table_access()
@with_tracebacks(ValueError, name="authorizer_cb")
def test_column_access(self):
super().test_table_access()
class AuthorizerIllegalTypeTests(AuthorizerTests):
@staticmethod
def authorizer_cb(action, arg1, arg2, dbname, source):
if action != sqlite.SQLITE_SELECT:
return 0.0
if arg2 == 'c2' or arg1 == 't2':
return 0.0
return sqlite.SQLITE_OK
class AuthorizerLargeIntegerTests(AuthorizerTests):
@staticmethod
def authorizer_cb(action, arg1, arg2, dbname, source):
if action != sqlite.SQLITE_SELECT:
return 2**32
if arg2 == 'c2' or arg1 == 't2':
return 2**32
return sqlite.SQLITE_OK
if __name__ == "__main__":
unittest.main()

View File

@ -0,0 +1,85 @@
import contextlib
import functools
import io
import re
import sqlite3
import test.support
import unittest
# Helper for temporary memory databases
def memory_database(*args, **kwargs):
cx = sqlite3.connect(":memory:", *args, **kwargs)
return contextlib.closing(cx)
# Temporarily limit a database connection parameter
@contextlib.contextmanager
def cx_limit(cx, category=sqlite3.SQLITE_LIMIT_SQL_LENGTH, limit=128):
try:
_prev = cx.setlimit(category, limit)
yield limit
finally:
cx.setlimit(category, _prev)
def with_tracebacks(exc, regex="", name=""):
"""Convenience decorator for testing callback tracebacks."""
def decorator(func):
_regex = re.compile(regex) if regex else None
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
with test.support.catch_unraisable_exception() as cm:
# First, run the test with traceback enabled.
with check_tracebacks(self, cm, exc, _regex, name):
func(self, *args, **kwargs)
# Then run the test with traceback disabled.
func(self, *args, **kwargs)
return wrapper
return decorator
@contextlib.contextmanager
def check_tracebacks(self, cm, exc, regex, obj_name):
"""Convenience context manager for testing callback tracebacks."""
sqlite3.enable_callback_tracebacks(True)
try:
buf = io.StringIO()
with contextlib.redirect_stderr(buf):
yield
self.assertEqual(cm.unraisable.exc_type, exc)
if regex:
msg = str(cm.unraisable.exc_value)
self.assertIsNotNone(regex.search(msg))
if obj_name:
self.assertEqual(cm.unraisable.object.__name__, obj_name)
finally:
sqlite3.enable_callback_tracebacks(False)
class MemoryDatabaseMixin:
def setUp(self):
self.con = sqlite3.connect(":memory:")
self.cur = self.con.cursor()
def tearDown(self):
self.cur.close()
self.con.close()
@property
def cx(self):
return self.con
@property
def cu(self):
return self.cur
def requires_virtual_table(module):
with memory_database() as cx:
supported = (module,) in list(cx.execute("PRAGMA module_list"))
reason = f"Requires {module!r} virtual table support"
return unittest.skipUnless(supported, reason)