Added permission checking. Added unit tests for the db_layer and permissions.

This commit is contained in:
Christopher Hawthorne 2013-11-08 23:35:53 -05:00
parent d858f34ef8
commit 1637c72955
10 changed files with 638 additions and 41 deletions

2
TODO
View File

@ -14,6 +14,8 @@ _Code Quality Improvements_
Document all functions
Conform to python naming conventions and code style
Make db_layer use a helper function to handle most of the database queries
Unify the db_layer interface w.r.t. whether book_ids should be passed as
strings or ints or dicts.
_Bugs_

View File

@ -36,7 +36,9 @@ Keys:
def openLibrary_isbn(ISBN):
isbn = str(ISBN)
try:
jsondata = urlopen("http://openlibrary.org/api/books?format=json&jscmd=data&bibkeys=ISBN:"+isbn, timeout=3)
jsondata = urlopen("http://openlibrary.org/api/books"
"?format=json&jscmd=data&bibkeys=ISBN:"+isbn,
timeout=3)
except URLError:
return {}
openBook = loads(jsondata.read().decode('utf-8'))
@ -62,7 +64,8 @@ def openLibrary_isbn(ISBN):
book["publish location"] += "; " + v['name']
book['publish location'] = book['publish location'][2:]
# for lccn, there maybe be multiple values in the query. I'm just taking the first, but the full list may be useful
# for lccn, there maybe be multiple values in the query. I'm just taking
# the first, but the full list may be useful
if "lccn" in openBook['identifiers']:
book["lccn"]=int(openBook['identifiers']['lccn'][0])
if "publish_date" in openBook:
@ -82,7 +85,9 @@ def openLibrary_isbn(ISBN):
def openLibrary_lccn(LCCN):
lccn = str(LCCN)
try:
jsondata = urlopen("http://openlibrary.org/api/books?format=json&jscmd=data&bibkeys=lccn:"+lccn, timeout=3)
jsondata = urlopen("http://openlibrary.org/api/books"
"?format=json&jscmd=data&bibkeys=lccn:"+lccn,
timeout=3)
except URLError:
return {}
openBook = loads(jsondata.read().decode('utf-8'))
@ -108,7 +113,8 @@ def openLibrary_lccn(LCCN):
book["publish location"] += "; " + v['name']
book['publish location'] = book['publish location'][2:]
# for isbn, there maybe be multiple values in the query. I'm just taking the first, but the full list may be useful
# for isbn, there maybe be multiple values in the query. I'm just taking
# the first, but the full list may be useful
# There are also ISBN's that have non-number values :(
if "isbn_10" in openBook['identifiers']:
book["isbn"]=openBook['identifiers']['isbn_10'][0]

View File

@ -8,8 +8,8 @@ class browserWindow:
topline = 0
entries = []
selected = list()
commands = [(' /', 'search'), (' n', 'find next'), (' N', 'find previous'),
('F6', 'Sort Column'), (' q', 'quit')]
commands = [(' /', 'search'), (' n', 'find next'), (' N', 'find previous'),
('F6', 'Sort Column'), (' q', 'quit')]
cs = []
# column definitions are in (label, weight, specified width) triples
columnDefs = [('something',1,None)]
@ -90,7 +90,8 @@ class browserWindow:
self.w.addstr(row+3, 1, " ")
for k,width in self.columns:
if k.lower() in entry:
self.w.addnstr(row+3,cursor,str(entry[k.lower()])+" "*width,width)
self.w.addnstr(row+3, cursor,
str(entry[k.lower()])+" "*width, width)
cursor += width+1
else:
self.w.addstr(row+3,1," "*(self.mx-2))
@ -492,10 +493,11 @@ class categorySelector(browserWindow):
# first removed the deselected ones
uncats = []
cats = []
for old, new, category in zip(self.original, self.selected, self.entries):
if old and (not new):
for old, new, category in zip(self.original, self.selected,
self.entries):
if old and not new:
uncats.append(category)
if (not old) and new:
if not old and new:
cats.append(category)
db.uncategorizeBook(self.book, uncats)
# add the newly selected categories
@ -516,12 +518,15 @@ class categorySelector(browserWindow):
class columnSelector(browserWindow):
columnDefs = [('Column',100,None)]
entries = [{'column': 'id'}, {'column': 'isbn'}, {'column': 'lccn'},
{'column': 'title'}, {'column': 'subtitle'}, {'column': 'authors'},
entries = [
{'column': 'id'}, {'column': 'isbn'}, {'column': 'lccn'},
{'column': 'title'}, {'column': 'subtitle'}, {'column': 'authors'},
{'column': 'edition'}, {'column': 'publisher'},
{'column': 'publish year'}, {'column': 'publish month'},
{'column': 'publish location'}, {'column': 'pages'}, {'column': 'pagination'},
{'column': 'weight'}, {'column': 'last updated'}]
{'column': 'publish location'}, {'column': 'pages'},
{'column': 'pagination'}, {'column': 'weight'},
{'column': 'last updated'},
]
def __init__(self,window,helpbar,height=40,width=20):
self.selected = [False,False,False,False,False,False,False,

View File

@ -1,6 +1,8 @@
import sys
import sqlite3
import permissions
dbFile = 'sqLibrary.db'
bookTable = 'books'
bookCategoryTable='book_categories'
@ -12,7 +14,9 @@ CREATE TABLE IF NOT EXISTS books
(id INTEGER PRIMARY KEY AUTOINCREMENT,
isbn, lccn, title, subtitle, authors, edition,
publisher, publish_year, publish_month, publish_location,
pages, pagination, weight, last_updated DATETIME DEFAULT current_timestamp, deleted BOOLEAN DEFAULT 0);
pages, pagination, weight,
last_updated DATETIME DEFAULT current_timestamp,
deleted BOOLEAN DEFAULT 0);
CREATE TABLE IF NOT EXISTS categories
(cat_id INTEGER PRIMARY KEY, category STRING UNIQUE ON CONFLICT IGNORE);
@ -43,7 +47,8 @@ BEGIN
DELETE FROM book_categories WHERE cat_id = old.cat_id;
END;
CREATE TRIGGER IF NOT EXISTS insert_book_category_time AFTER INSERT ON book_categories
CREATE TRIGGER IF NOT EXISTS insert_book_category_time AFTER INSERT
ON book_categories
BEGIN
UPDATE books SET last_updated = DATETIME('NOW') WHERE id = new.id;
END;
@ -52,15 +57,16 @@ END;
################################3
# character escaping, etc for sql queries
#################################
def colify(s):
def _colify(s):
return s.replace(" ","_").lower()
def stringify(v):
def _stringify(v):
return '"' + str(v).strip().replace('"','""') + '"'
###################################
# book functions
##################################
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def addBook(book):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
@ -68,21 +74,28 @@ def addBook(book):
vals = []
for k,v in book.items():
if v!="":
cols.append(colify(k))
vals.append(stringify(v))
cols.append(_colify(k))
vals.append(_stringify(v))
query = "INSERT INTO "+bookTable+" ("+", ".join(cols)+") VALUES ("+", ".join(vals)+");"
query = ("INSERT INTO "+bookTable+" ("+", ".join(cols)+") VALUES ("+
", ".join(vals)+");")
c.execute(query)
conn.commit()
c.close()
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def updateBook(book, bookID):
'''
Takes book attribute dictionary and a string representating the book ID
number, and returns updates the book accordingly
'''
conn = sqlite3.connect(dbFile)
c = conn.cursor()
updates=[]
for k,v in book.items():
updates.append(colify(k)+"="+stringify(v))
query = "UPDATE "+bookTable+" SET " + ", ".join(updates)+" WHERE id = " +str(bookID)+";"
updates.append(_colify(k)+"="+_stringify(v))
query = ("UPDATE "+bookTable+" SET " + ", ".join(updates)+" WHERE id = " +
str(bookID)+";")
c.execute(query)
conn.commit()
c.close()
@ -97,9 +110,15 @@ def getBooks():
return books
def getBooksByCategory(cat):
'''
Takes a string representating the category ID number, and returns
non-deleted books in that category
'''
conn = sqlite3.connect(dbFile)
c = conn.cursor()
query = "SELECT "+",".join(map(colify,columns))+" FROM "+bookTable+" JOIN "+bookCategoryTable+" USING (id) WHERE cat_id = :id AND deleted=0;"
query = ("SELECT "+",".join(map(_colify,columns))+" FROM "+bookTable+
" JOIN "+bookCategoryTable+
" USING (id) WHERE cat_id = :id AND deleted=0;")
c.execute(query,cat)
books = [_query_to_book(b) for b in c]
c.close()
@ -124,6 +143,7 @@ def getBookByID(bookid):
return book
# removes book from catalogue
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def removeBook(bookid):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
@ -132,6 +152,7 @@ def removeBook(bookid):
conn.commit()
c.close()
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def removeBooks(books):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
@ -142,6 +163,7 @@ def removeBooks(books):
c.close()
# restores trashed books
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def restoreBooks(books):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
@ -151,7 +173,8 @@ def restoreBooks(books):
conn.commit()
c.close()
# fully deletes book from removedBooks table
# fully deletes book from books table
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def deleteBook(bookid):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
@ -160,6 +183,7 @@ def deleteBook(bookid):
conn.commit()
c.close()
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def deleteBooks(books):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
@ -180,7 +204,8 @@ def _query_to_book(book_query):
def getBookCategories(book):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
query = "SELECT id,cat_id,category FROM "+bookCategoryTable+" JOIN "+categoryTable+" USING (cat_id) WHERE id = :id ;"
query = ("SELECT id,cat_id,category FROM "+bookCategoryTable+" JOIN "+
categoryTable+" USING (cat_id) WHERE id = :id ;")
c.execute(query,book)
cats = []
for book_id,cat_id,cat_name in c:
@ -188,16 +213,19 @@ def getBookCategories(book):
c.close()
return cats
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def categorizeBook(book, cats):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
query = "INSERT OR IGNORE INTO "+bookCategoryTable+" (id,cat_id) VALUES (?, ?);"
query = ("INSERT OR IGNORE INTO "+bookCategoryTable+
" (id,cat_id) VALUES (?, ?);")
for cat in cats:
args = (book['id'],cat['id'])
c.execute(query,args)
conn.commit()
c.close()
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def uncategorizeBook(book, cats):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
@ -219,14 +247,17 @@ def getCategories():
c.close()
return cats
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def addCategory(cat):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
query = "INSERT OR IGNORE INTO "+categoryTable+" (category) VALUES ("+stringify(cat)+");"
query = ("INSERT OR IGNORE INTO "+categoryTable+" (category) VALUES ("
+_stringify(cat)+");")
c.execute(query)
conn.commit()
c.close()
@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
def deleteCategories(cats):
conn = sqlite3.connect(dbFile)
c = conn.cursor()
@ -239,20 +270,20 @@ def deleteCategories(cats):
#########################################
# Database initialization
#########################################
def createBooksTable():
def _createBooksTable():
conn = sqlite3.connect(dbFile)
c = conn.cursor()
c.executescript(bookTableCreation)
conn.commit()
c.close()
def createTriggers():
def _createTriggers():
conn = sqlite3.connect(dbFile)
c = conn.cursor()
c.executescript(bookTriggerCreation)
conn.commit()
c.close()
createBooksTable()
createTriggers()
def initializeDatabase():
_createBooksTable()
_createTriggers()

17
exceptions.py Normal file
View File

@ -0,0 +1,17 @@
import abc
class LibrarianException(Exception, metaclass=abc.ABCMeta):
@abc.abstractproperty
def error_msg(self):
pass
def __str__(self):
return self.error_msg
class PermissionsError(LibrarianException):
def __init__(self, permission_string):
self.permission_string = permission_string
@property
def error_msg(self):
return "Need privilege level {}".format(self.permission_string)

View File

@ -28,7 +28,8 @@ class helpBar:
r=0
c=0
for key,command in self.commands:
self.w.addnstr(r,c,key+" "+command+" "*self.colWidth,self.colWidth-1)
self.w.addnstr(r,c,key+" "+command+" "*self.colWidth,
self.colWidth-1)
self.w.chgat(r,c,2,curses.A_REVERSE)
c+=self.colWidth
if c > self.colWidth*self.numCols:

View File

@ -131,12 +131,14 @@ def catMenu():
cat.clear()
m = [("Browse Library", browseMenu),
("Add Book", addForm),
("Categories", catMenu),
("View Trash", trashMenu),
("",exit),
("Exit", exit)]
curses.wrapper(menutest, m)
if __name__ == "__main__":
db.initializeDatabase()
m = [("Browse Library", browseMenu),
("Add Book", addForm),
("Categories", catMenu),
("View Trash", trashMenu),
("",exit),
("Exit", exit)]
curses.wrapper(menutest, m)

View File

@ -0,0 +1,490 @@
import contextlib
import os
import sqlite3
import unittest
import db_layer
import exceptions
import permissions
class TestDBSetup(unittest.TestCase):
def tearDown(self):
try:
os.remove(db_layer.dbFile)
except FileNotFoundError:
pass
def test_db_setup(self):
conn = sqlite3.connect(db_layer.dbFile)
with self.assertRaises(sqlite3.OperationalError):
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT * FROM books;')
db_layer.initializeDatabase()
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT * FROM books;')
def test_existing_db_file(self):
conn = sqlite3.connect(db_layer.dbFile)
db_layer.initializeDatabase()
with contextlib.closing(conn.cursor()) as c:
c.execute('INSERT INTO books (isbn) VALUES (1111111111);')
conn.commit()
db_layer.initializeDatabase()
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT isbn FROM books;')
rows = list(c)
self.assertEqual([1111111111], [row[0] for row in rows])
class TestDBFunctions(unittest.TestCase):
maxDiff = None
def assertISBNs(self, expected_isbns, book_dicts):
expected_isbns = frozenset(expected_isbns)
actual_isbns = frozenset(book.get('isbn') for book in book_dicts)
self.assertEqual(expected_isbns, actual_isbns)
def assertCategories(self, expected_categories, book_dicts):
expected_categories = frozenset(expected_categories)
actual_categories = frozenset(book['category'] for book in book_dicts)
self.assertEqual(expected_categories, actual_categories)
def setUp(self):
db_layer.initializeDatabase()
permissions._CURRENT_GROUPS_GETTER = lambda: ["office", "libcom"]
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('''
INSERT INTO books (isbn, lccn, title, subtitle, authors, edition, publisher,
publish_year, publish_month, publish_location, pages,
pagination, weight)
VALUES
(1111111111111, 2222222222, 'Attack of the bad code', 'Return of the GOTO',
'cdchawthorne', '1st', 'CSC Publishing', '1992', 'June', 'Waterloo, Canada',
'496', 'xxvi, 493 p', '1 kg'),
(3333333333333, 4444444444, 'Star Wars VI--Return of the Jedi', 'Now in text!',
'George Lucas', '2nd', 'Lucas Film', '2013', 'November', 'Somewhere, USA',
'8128', 'xx, 8100 p', '10 kg');
''')
with contextlib.closing(conn.cursor()) as c:
c.execute('''
INSERT INTO books (isbn, lccn, deleted) VALUES
(5555555555555, NULL, 0), (NULL, 6666666666, 0), (7777777777777, NULL, 1);
''')
with contextlib.closing(conn.cursor()) as c:
c.execute('''
INSERT INTO categories (category)
VALUES ('My special category'), ('My second special category');
''')
with contextlib.closing(conn.cursor()) as c:
c.execute('''SELECT rowid, isbn, lccn, last_updated FROM books;''')
for row in c:
if row[1] == 1111111111111:
self.book0_id = row[0]
self.book0_last_updated = row[3]
elif row[1] == 3333333333333:
self.book1_id = row[0]
self.book1_last_updated = row[3]
elif row[1] == 5555555555555:
self.book2_id = row[0]
self.book2_last_updated = row[3]
elif row[2] == 6666666666:
self.book3_id = row[0]
self.book3_last_updated = row[3]
elif row[1] == 7777777777777:
self.book4_id = row[0]
self.book4_last_updated = row[3]
else:
self.assertTrue(False,
"Unexpected data in DB during setup")
with contextlib.closing(conn.cursor()) as c:
c.execute('''SELECT rowid, category FROM categories;''')
for row in c:
if row[1] == 'My special category':
self.category0_id = row[0]
elif row[1] == 'My second special category':
self.category1_id = row[0]
with contextlib.closing(conn.cursor()) as c:
c.execute('''
INSERT INTO book_categories (id, cat_id) VALUES (?, ?), (?, ?), (?, ?), (?, ?);
''',
(self.book1_id, self.category0_id,
self.book1_id, self.category1_id,
self.book0_id, self.category1_id,
self.book2_id, self.category1_id,
)
)
conn.commit()
def tearDown(self):
try:
os.remove(db_layer.dbFile)
except FileNotFoundError:
pass
def test_getBooks(self):
books = db_layer.getBooks()
expected_book0 = {
'id': self.book0_id,
'isbn': 1111111111111,
'lccn': 2222222222,
'title': 'Attack of the bad code',
'subtitle': 'Return of the GOTO',
'authors': 'cdchawthorne',
'edition': '1st',
'publisher': 'CSC Publishing',
'publish year': '1992',
'publish month': 'June',
'publish location': 'Waterloo, Canada',
'pages': '496',
'pagination': 'xxvi, 493 p',
'weight': '1 kg',
'last updated': self.book0_last_updated,
}
found_book0 = False
for book in books:
if book.get('isbn') == 1111111111111:
found_book0 = True
self.assertEqual(book, expected_book0)
self.assertTrue(found_book0, "getBooks() missing book0")
expected_isbns = [1111111111111, 3333333333333, 5555555555555, None]
self.assertISBNs(expected_isbns, books)
def test_getBooksByCategory(self):
books = db_layer.getBooksByCategory(str(self.category0_id))
expected_isbns = [3333333333333]
self.assertISBNs(expected_isbns, books)
def test_getRemovedBooks(self):
books = db_layer.getRemovedBooks()
expected_isbns = [7777777777777]
self.assertISBNs(expected_isbns, books)
def test_addBook(self):
db_layer.addBook({'isbn': 8888888888888, 'title': 'New book'})
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('''
SELECT title FROM books WHERE isbn = '8888888888888';
''')
rows = list(c)
self.assertEqual(['New book'], [row[0] for row in rows])
def test_updateBook(self):
db_layer.updateBook({'title': 'Attack of the questionable code'},
str(self.book0_id))
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('''
SELECT title FROM books WHERE id = ?;
''', (self.book0_id,))
rows = list(c)
self.assertEqual(['Attack of the questionable code'],
[row[0] for row in rows])
def test_getBookById(self):
book = db_layer.getBookByID(self.book0_id)
self.assertEqual('Attack of the bad code', book['title'])
def test_removeBook(self):
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('''
SELECT deleted FROM books WHERE id = ?;
''', (self.book1_id,))
rows = list(c)
self.assertEqual([0], [row[0] for row in rows])
db_layer.removeBook(self.book1_id)
with contextlib.closing(conn.cursor()) as c:
c.execute('''
SELECT deleted FROM books WHERE id = ?;
''', (self.book1_id,))
rows = list(c)
self.assertEqual([1], [row[0] for row in rows])
def test_removeBooks(self):
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('''
SELECT deleted FROM books WHERE id = ? OR id = ?;
''', (self.book0_id, self.book1_id,))
rows = list(c)
self.assertEqual([0,0], [row[0] for row in rows])
db_layer.removeBooks([str(self.book0_id), str(self.book1_id)])
with contextlib.closing(conn.cursor()) as c:
c.execute('''
SELECT deleted FROM books WHERE id = ? OR id = ?;
''', (self.book0_id, self.book1_id))
rows = list(c)
self.assertEqual([1,1], [row[0] for row in rows])
def test_deleteBook(self):
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;',
(self.book0_id, self.book1_id))
count = c.fetchone()[0]
self.assertEqual(2, count)
db_layer.deleteBook(self.book0_id)
db_layer.deleteBook(self.book1_id)
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;',
(self.book0_id, self.book1_id))
count = c.fetchone()[0]
self.assertEqual(0, count)
# Code duplication? What's that?
def test_deleteBooks(self):
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;',
(self.book0_id, self.book1_id))
count = c.fetchone()[0]
self.assertEqual(2, count)
db_layer.deleteBooks([str(self.book0_id), str(self.book1_id)])
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;',
(self.book0_id, self.book1_id))
count = c.fetchone()[0]
self.assertEqual(0, count)
def test_getBookCategories(self):
categories = db_layer.getBookCategories(str(self.book1_id))
expected_categories = ['My special category',
'My second special category']
self.assertCategories(expected_categories, categories)
categories = db_layer.getBookCategories(str(self.book0_id))
expected_categories = ['My second special category']
self.assertCategories(expected_categories, categories)
categories = db_layer.getBookCategories(str(self.book3_id))
expected_categories = []
self.assertCategories(expected_categories, categories)
def test_categorizeBook(self):
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT cat_id FROM book_categories WHERE id = ?;',
(self.book3_id,))
rows = list(c)
self.assertEqual([], rows)
db_layer.categorizeBook({'id': self.book3_id},
[{'id': self.category0_id},
{'id': self.category1_id}])
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT cat_id FROM book_categories WHERE id = ?;',
(self.book3_id,))
rows = list(c)
self.assertEqual(frozenset([self.category0_id, self.category1_id]),
frozenset(row[0] for row in rows))
def test_uncategorizeBook(self):
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT cat_id FROM book_categories WHERE id = ?;',
(self.book1_id,))
rows = list(c)
self.assertEqual(frozenset([self.category0_id, self.category1_id]),
frozenset(row[0] for row in rows))
db_layer.uncategorizeBook({'id': self.book1_id},
[{'id': self.category0_id},
{'id': self.category1_id}])
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT cat_id FROM book_categories WHERE id = ?;',
(self.book3_id,))
rows = list(c)
self.assertEqual([], rows)
def test_getCategories(self):
categories = db_layer.getCategories()
expected_categories = ['My special category',
'My second special category']
self.assertCategories(expected_categories, categories)
def test_addCategory(self):
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT category FROM categories;')
rows = list(c)
expected_categories = ['My special category',
'My second special category']
self.assertEqual(frozenset(expected_categories),
frozenset(row[0] for row in rows))
db_layer.addCategory('My third special category')
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT category FROM categories;')
rows = list(c)
expected_categories.append('My third special category')
self.assertEqual(frozenset(expected_categories),
frozenset(row[0] for row in rows))
def test_deleteCategories(self):
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT COUNT(*) FROM categories;')
rows = list(c)
self.assertEqual(2, rows[0][0])
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT COUNT(*) FROM book_categories;')
rows = list(c)
self.assertEqual(4, rows[0][0])
db_layer.deleteCategories([str(self.category1_id)])
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT COUNT(*) FROM categories;')
rows = list(c)
self.assertEqual(1, rows[0][0])
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT COUNT(*) FROM book_categories;')
rows = list(c)
self.assertEqual(1, rows[0][0])
class TestDBFunctionPermissions(unittest.TestCase):
def setUp(self):
db_layer.initializeDatabase()
# Need book in database or getBookByID will error when we test it
conn = sqlite3.connect(db_layer.dbFile)
with contextlib.closing(conn.cursor()) as c:
c.execute('INSERT INTO books (isbn) VALUES (5555555555555);')
with contextlib.closing(conn.cursor()) as c:
c.execute('SELECT rowid FROM books;')
self.dummy_book_id = c.fetchone()[0]
def tearDown(self):
try:
os.remove(db_layer.dbFile)
except FileNotFoundError:
pass
def verify_permissions(self, fn, authorized_groups, unauthorized_groups):
for group in authorized_groups:
permissions._CURRENT_GROUPS_GETTER = lambda: [group]
fn()
for group in unauthorized_groups:
permissions._CURRENT_GROUPS_GETTER = lambda: [group]
with self.assertRaises(exceptions.PermissionsError):
fn()
def test_db_function_permissions(self):
function_dicts = [
{'fn': lambda: db_layer.addBook({'isbn': 8888888888888}),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.updateBook({'isbn': 9999999999999}, 0),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.getBooks(),
'authorized_groups': ['libcom', 'office', 'unauthorized'],
'unauthorized_groups': []},
{'fn': lambda: db_layer.getBooksByCategory('0'),
'authorized_groups': ['libcom', 'office', 'unauthorized'],
'unauthorized_groups': []},
{'fn': lambda: db_layer.getRemovedBooks(),
'authorized_groups': ['libcom', 'office', 'unauthorized'],
'unauthorized_groups': []},
{'fn': lambda: db_layer.getBookByID(self.dummy_book_id),
'authorized_groups': ['libcom', 'office', 'unauthorized'],
'unauthorized_groups': []},
{'fn': lambda: db_layer.removeBook(0),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.removeBooks(['0']),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.restoreBooks(['0']),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.deleteBook(0),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.deleteBooks(['0']),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.getBookCategories('0'),
'authorized_groups': ['libcom', 'office', 'unauthorized'],
'unauthorized_groups': []},
{'fn': lambda: db_layer.categorizeBook({'id':'0'}, []),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.uncategorizeBook({'id':'0'}, []),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.getCategories(),
'authorized_groups': ['libcom', 'office', 'unauthorized'],
'unauthorized_groups': []},
{'fn': lambda: db_layer.addCategory('Cat5'),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
{'fn': lambda: db_layer.deleteCategories([]),
'authorized_groups': ['libcom'],
'unauthorized_groups': ['office', 'unauthorized']},
]
for function_dict in function_dicts:
self.verify_permissions(function_dict['fn'],
function_dict['authorized_groups'],
function_dict['unauthorized_groups'])

34
permissions.py Normal file
View File

@ -0,0 +1,34 @@
import grp
import os
import exceptions
class _PermissionLevel:
def __init__(self, group_name, pretty_name):
self.group_name = group_name
self.pretty_name = pretty_name
PERMISSION_OFFICE = _PermissionLevel("office", "Office worker")
PERMISSION_LIBCOM = _PermissionLevel("libcom", "Library Committee")
def check_permissions(permission_level):
def decorator(fn):
def wrapped_function(*args, **kwargs):
if not has_permission(permission_level):
raise exceptions.PermissionsError(permission_level.pretty_name)
return fn(*args, **kwargs)
return wrapped_function
return decorator
def has_permission(permission_level):
return permission_level.group_name in _CURRENT_GROUPS_GETTER()
def _current_group_names():
group_ids = os.getgroups()
group_names = [grp.getgrgid(group_id).gr_name for group_id in group_ids]
return group_names
# Hack to allow dependency injection for testing
_CURRENT_GROUPS_GETTER = _current_group_names

9
run_tests.sh Executable file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
librarian_dir="$(realpath "$(dirname "$0")")"
librarian_tests_dir="${librarian_dir}/librarian_tests"
export PYTHONPATH="${librarian_dir}"
cd "${librarian_tests_dir}"
python -m unittest "$@" test_*.py