Skip to content
3 changes: 3 additions & 0 deletions tools/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ function ci_package_tests_run {
unix-ffi/gettext/test_gettext.py \
unix-ffi/pwd/test_getpwnam.py \
unix-ffi/re/test_re.py \
unix-ffi/sqlite3/test_sqlite3.py \
unix-ffi/sqlite3/test_sqlite3_2.py \
unix-ffi/sqlite3/test_sqlite3_3.py \
unix-ffi/time/test_strftime.py \
; do
echo "Running test $test"
Expand Down
2 changes: 1 addition & 1 deletion unix-ffi/sqlite3/manifest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
metadata(version="0.2.4")
metadata(version="0.3.0")

# Originally written by Paul Sokolovsky.

Expand Down
176 changes: 125 additions & 51 deletions unix-ffi/sqlite3/sqlite3.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
import sys
import ffilib
import uctypes


sq3 = ffilib.open("libsqlite3")

# int sqlite3_open(
# const char *filename, /* Database filename (UTF-8) */
# sqlite3 **ppDb /* OUT: SQLite db handle */
# );
sqlite3_open = sq3.func("i", "sqlite3_open", "sp")
# int sqlite3_close(sqlite3*);
sqlite3_close = sq3.func("i", "sqlite3_close", "p")
# int sqlite3_config(int, ...);
sqlite3_config = sq3.func("i", "sqlite3_config", "ii")
# int sqlite3_get_autocommit(sqlite3*);
sqlite3_get_autocommit = sq3.func("i", "sqlite3_get_autocommit", "p")
# int sqlite3_close_v2(sqlite3*);
sqlite3_close = sq3.func("i", "sqlite3_close_v2", "p")
# int sqlite3_prepare(
# sqlite3 *db, /* Database handle */
# const char *zSql, /* SQL statement, UTF-8 encoded */
# int nByte, /* Maximum length of zSql in bytes. */
# sqlite3_stmt **ppStmt, /* OUT: Statement handle */
# const char **pzTail /* OUT: Pointer to unused portion of zSql */
# );
sqlite3_prepare = sq3.func("i", "sqlite3_prepare", "psipp")
sqlite3_prepare = sq3.func("i", "sqlite3_prepare_v2", "psipp")
# int sqlite3_finalize(sqlite3_stmt *pStmt);
sqlite3_finalize = sq3.func("i", "sqlite3_finalize", "p")
# int sqlite3_step(sqlite3_stmt*);
Expand All @@ -23,20 +32,17 @@
sqlite3_column_count = sq3.func("i", "sqlite3_column_count", "p")
# int sqlite3_column_type(sqlite3_stmt*, int iCol);
sqlite3_column_type = sq3.func("i", "sqlite3_column_type", "pi")
# int sqlite3_column_int(sqlite3_stmt*, int iCol);
sqlite3_column_int = sq3.func("i", "sqlite3_column_int", "pi")
# using "d" return type gives wrong results
# double sqlite3_column_double(sqlite3_stmt*, int iCol);
sqlite3_column_double = sq3.func("d", "sqlite3_column_double", "pi")
# const unsigned char *sqlite3_column_text(sqlite3_stmt*, int iCol);
sqlite3_column_text = sq3.func("s", "sqlite3_column_text", "pi")
# sqlite3_int64 sqlite3_last_insert_rowid(sqlite3*);
# TODO: should return long int
sqlite3_last_insert_rowid = sq3.func("i", "sqlite3_last_insert_rowid", "p")
sqlite3_last_insert_rowid = sq3.func("l", "sqlite3_last_insert_rowid", "p")
# const char *sqlite3_errmsg(sqlite3*);
sqlite3_errmsg = sq3.func("s", "sqlite3_errmsg", "p")

# Too recent
##const char *sqlite3_errstr(int);
# sqlite3_errstr = sq3.func("s", "sqlite3_errstr", "i")


SQLITE_OK = 0
SQLITE_ERROR = 1
Expand All @@ -51,6 +57,11 @@
SQLITE_BLOB = 4
SQLITE_NULL = 5

SQLITE_CONFIG_URI = 17

# For compatibility with CPython sqlite3 driver
LEGACY_TRANSACTION_CONTROL = -1


class Error(Exception):
pass
Expand All @@ -61,79 +72,142 @@ def check_error(db, s):
raise Error(s, sqlite3_errmsg(db))


def get_ptr_size():
return uctypes.sizeof({"ptr": (0 | uctypes.PTR, uctypes.PTR)})


def __prepare_stmt(db, sql):
# Prepares a statement
stmt_ptr = bytes(get_ptr_size())
res = sqlite3_prepare(db, sql, -1, stmt_ptr, None)
check_error(db, res)
return int.from_bytes(stmt_ptr, sys.byteorder)

def __exec_stmt(db, sql):
# Prepares, executes, and finalizes a statement
stmt = __prepare_stmt(db, sql)
sqlite3_step(stmt)
res = sqlite3_finalize(stmt)
check_error(db, res)

def __is_dml(sql):
# Checks if a sql query is a DML, as these get a BEGIN in LEGACY_TRANSACTION_CONTROL
for dml in ["INSERT", "DELETE", "UPDATE", "MERGE"]:
if dml in sql.upper():
return True
return False


class Connections:
def __init__(self, h):
self.h = h
def __init__(self, db, isolation_level, autocommit):
self.db = db
self.isolation_level = isolation_level
self.autocommit = autocommit

def commit(self):
if self.autocommit == LEGACY_TRANSACTION_CONTROL and not sqlite3_get_autocommit(self.db):
__exec_stmt(self.db, "COMMIT")
elif self.autocommit == False:
__exec_stmt(self.db, "COMMIT")
__exec_stmt(self.db, "BEGIN")

def rollback(self):
if self.autocommit == LEGACY_TRANSACTION_CONTROL and not sqlite3_get_autocommit(self.db):
__exec_stmt(self.db, "ROLLBACK")
elif self.autocommit == False:
__exec_stmt(self.db, "ROLLBACK")
__exec_stmt(self.db, "BEGIN")

def cursor(self):
return Cursor(self.h)
return Cursor(self.db, self.isolation_level, self.autocommit)

def close(self):
s = sqlite3_close(self.h)
check_error(self.h, s)
if self.db:
if self.autocommit == False and not sqlite3_get_autocommit(self.db):
__exec_stmt(self.db, "ROLLBACK")

res = sqlite3_close(self.db)
check_error(self.db, res)
self.db = None


class Cursor:
def __init__(self, h):
self.h = h
self.stmnt = None
def __init__(self, db, isolation_level, autocommit):
self.db = db
self.isolation_level = isolation_level
self.autocommit = autocommit
self.stmt = None

def __quote(val):
if isinstance(val, str):
return "'%s'" % val
return str(val)

def execute(self, sql, params=None):
if self.stmt:
# If there is an existing statement, finalize that to free it
res = sqlite3_finalize(self.stmt)
check_error(self.db, res)

if params:
params = [quote(v) for v in params]
params = [self.__quote(v) for v in params]
sql = sql % tuple(params)
print(sql)
b = bytearray(4)
s = sqlite3_prepare(self.h, sql, -1, b, None)
check_error(self.h, s)
self.stmnt = int.from_bytes(b, sys.byteorder)
# print("stmnt", self.stmnt)
self.num_cols = sqlite3_column_count(self.stmnt)
# print("num_cols", self.num_cols)
# If it's not select, actually execute it here
# num_cols == 0 for statements which don't return data (=> modify it)

if __is_dml(sql) and self.autocommit == LEGACY_TRANSACTION_CONTROL and sqlite3_get_autocommit(self.db):
# For compatibility with CPython, add functionality for their default transaction
# behavior. Changing autocommit from LEGACY_TRANSACTION_CONTROL will remove this
__exec_stmt(self.db, "BEGIN " + self.isolation_level)

self.stmt = __prepare_stmt(self.db, sql)
self.num_cols = sqlite3_column_count(self.stmt)

if not self.num_cols:
v = self.fetchone()
# If it's not select, actually execute it here
# num_cols == 0 for statements which don't return data (=> modify it)
assert v is None
self.lastrowid = sqlite3_last_insert_rowid(self.h)
self.lastrowid = sqlite3_last_insert_rowid(self.db)

def close(self):
s = sqlite3_finalize(self.stmnt)
check_error(self.h, s)
if self.stmt:
res = sqlite3_finalize(self.stmt)
check_error(self.db, res)
self.stmt = None

def make_row(self):
def __make_row(self):
res = []
for i in range(self.num_cols):
t = sqlite3_column_type(self.stmnt, i)
# print("type", t)
t = sqlite3_column_type(self.stmt, i)
if t == SQLITE_INTEGER:
res.append(sqlite3_column_int(self.stmnt, i))
res.append(sqlite3_column_int(self.stmt, i))
elif t == SQLITE_FLOAT:
res.append(sqlite3_column_double(self.stmnt, i))
res.append(sqlite3_column_double(self.stmt, i))
elif t == SQLITE_TEXT:
res.append(sqlite3_column_text(self.stmnt, i))
res.append(sqlite3_column_text(self.stmt, i))
else:
raise NotImplementedError
return tuple(res)

def fetchone(self):
res = sqlite3_step(self.stmnt)
# print("step:", res)
res = sqlite3_step(self.stmt)
if res == SQLITE_DONE:
return None
if res == SQLITE_ROW:
return self.make_row()
check_error(self.h, res)
return self.__make_row()
check_error(self.db, res)


def connect(fname, uri=False, isolation_level="", autocommit=LEGACY_TRANSACTION_CONTROL):
if isolation_level not in [None, "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE"]:
raise Error("Invalid option for isolation level")

sqlite3_config(SQLITE_CONFIG_URI, int(uri))

def connect(fname):
b = bytearray(4)
sqlite3_open(fname, b)
h = int.from_bytes(b, sys.byteorder)
return Connections(h)
sqlite_ptr = bytes(get_ptr_size())
sqlite3_open(fname, sqlite_ptr)
db = int.from_bytes(sqlite_ptr, sys.byteorder)

if autocommit == False:
__exec_stmt(db, "BEGIN")

def quote(val):
if isinstance(val, str):
return "'%s'" % val
return str(val)
return Connections(db, isolation_level, autocommit)
3 changes: 3 additions & 0 deletions unix-ffi/sqlite3/test_sqlite3.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@
assert row == e

assert expected == []

cur.close()
conn.close()
3 changes: 3 additions & 0 deletions unix-ffi/sqlite3/test_sqlite3_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@
cur.execute("SELECT * FROM foo")
assert cur.fetchone() == (42,)
assert cur.fetchone() is None

cur.close()
conn.close()
42 changes: 42 additions & 0 deletions unix-ffi/sqlite3/test_sqlite3_3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import sqlite3


def test_autocommit():
conn = sqlite3.connect(":memory:", autocommit=True)

# First cursor creates table and inserts value (DML)
cur = conn.cursor()
cur.execute("CREATE TABLE foo(a int)")
cur.execute("INSERT INTO foo VALUES (42)")
cur.close()

# Second cursor fetches 42 due to the autocommit
cur = conn.cursor()
cur.execute("SELECT * FROM foo")
assert cur.fetchone() == (42,)
assert cur.fetchone() is None

cur.close()
conn.close()

def test_manual():
conn = sqlite3.connect(":memory:", autocommit=False)

# First cursor creates table, insert rolls back
cur = conn.cursor()
cur.execute("CREATE TABLE foo(a int)")
conn.commit()
cur.execute("INSERT INTO foo VALUES (42)")
cur.close()
conn.rollback()

# Second connection fetches nothing due to the rollback
cur = conn.cursor()
cur.execute("SELECT * FROM foo")
assert cur.fetchone() is None

cur.close()
conn.close()

test_autocommit()
test_manual()