From 1637c72955caecb88cad38fc4acb3a75527bff1b Mon Sep 17 00:00:00 2001 From: Christopher Hawthorne Date: Fri, 8 Nov 2013 23:35:53 -0500 Subject: [PATCH] Added permission checking. Added unit tests for the db_layer and permissions. --- TODO | 2 + book_data.py | 14 +- browser.py | 25 +- db_layer.py | 69 +++-- exceptions.py | 17 ++ help_bar.py | 3 +- librarian.py | 16 +- librarian_tests/test_db_layer.py | 490 +++++++++++++++++++++++++++++++ permissions.py | 34 +++ run_tests.sh | 9 + 10 files changed, 638 insertions(+), 41 deletions(-) create mode 100644 exceptions.py create mode 100644 librarian_tests/test_db_layer.py create mode 100644 permissions.py create mode 100755 run_tests.sh diff --git a/TODO b/TODO index c98c126..a4b55fc 100644 --- a/TODO +++ b/TODO @@ -14,6 +14,8 @@ _Code Quality Improvements_ Document all functions Conform to python naming conventions and code style Make db_layer use a helper function to handle most of the database queries +Unify the db_layer interface w.r.t. whether book_ids should be passed as +strings or ints or dicts. _Bugs_ diff --git a/book_data.py b/book_data.py index e4c7c7a..d5c089a 100644 --- a/book_data.py +++ b/book_data.py @@ -36,7 +36,9 @@ Keys: def openLibrary_isbn(ISBN): isbn = str(ISBN) try: - jsondata = urlopen("http://openlibrary.org/api/books?format=json&jscmd=data&bibkeys=ISBN:"+isbn, timeout=3) + jsondata = urlopen("http://openlibrary.org/api/books" + "?format=json&jscmd=data&bibkeys=ISBN:"+isbn, + timeout=3) except URLError: return {} openBook = loads(jsondata.read().decode('utf-8')) @@ -62,7 +64,8 @@ def openLibrary_isbn(ISBN): book["publish location"] += "; " + v['name'] book['publish location'] = book['publish location'][2:] - # for lccn, there maybe be multiple values in the query. I'm just taking the first, but the full list may be useful + # for lccn, there maybe be multiple values in the query. I'm just taking + # the first, but the full list may be useful if "lccn" in openBook['identifiers']: book["lccn"]=int(openBook['identifiers']['lccn'][0]) if "publish_date" in openBook: @@ -82,7 +85,9 @@ def openLibrary_isbn(ISBN): def openLibrary_lccn(LCCN): lccn = str(LCCN) try: - jsondata = urlopen("http://openlibrary.org/api/books?format=json&jscmd=data&bibkeys=lccn:"+lccn, timeout=3) + jsondata = urlopen("http://openlibrary.org/api/books" + "?format=json&jscmd=data&bibkeys=lccn:"+lccn, + timeout=3) except URLError: return {} openBook = loads(jsondata.read().decode('utf-8')) @@ -108,7 +113,8 @@ def openLibrary_lccn(LCCN): book["publish location"] += "; " + v['name'] book['publish location'] = book['publish location'][2:] - # for isbn, there maybe be multiple values in the query. I'm just taking the first, but the full list may be useful + # for isbn, there maybe be multiple values in the query. I'm just taking + # the first, but the full list may be useful # There are also ISBN's that have non-number values :( if "isbn_10" in openBook['identifiers']: book["isbn"]=openBook['identifiers']['isbn_10'][0] diff --git a/browser.py b/browser.py index fc8d422..88a78f0 100644 --- a/browser.py +++ b/browser.py @@ -8,8 +8,8 @@ class browserWindow: topline = 0 entries = [] selected = list() - commands = [(' /', 'search'), (' n', 'find next'), (' N', 'find previous'), - ('F6', 'Sort Column'), (' q', 'quit')] + commands = [(' /', 'search'), (' n', 'find next'), (' N', 'find previous'), + ('F6', 'Sort Column'), (' q', 'quit')] cs = [] # column definitions are in (label, weight, specified width) triples columnDefs = [('something',1,None)] @@ -90,7 +90,8 @@ class browserWindow: self.w.addstr(row+3, 1, " ") for k,width in self.columns: if k.lower() in entry: - self.w.addnstr(row+3,cursor,str(entry[k.lower()])+" "*width,width) + self.w.addnstr(row+3, cursor, + str(entry[k.lower()])+" "*width, width) cursor += width+1 else: self.w.addstr(row+3,1," "*(self.mx-2)) @@ -492,10 +493,11 @@ class categorySelector(browserWindow): # first removed the deselected ones uncats = [] cats = [] - for old, new, category in zip(self.original, self.selected, self.entries): - if old and (not new): + for old, new, category in zip(self.original, self.selected, + self.entries): + if old and not new: uncats.append(category) - if (not old) and new: + if not old and new: cats.append(category) db.uncategorizeBook(self.book, uncats) # add the newly selected categories @@ -516,12 +518,15 @@ class categorySelector(browserWindow): class columnSelector(browserWindow): columnDefs = [('Column',100,None)] - entries = [{'column': 'id'}, {'column': 'isbn'}, {'column': 'lccn'}, - {'column': 'title'}, {'column': 'subtitle'}, {'column': 'authors'}, + entries = [ + {'column': 'id'}, {'column': 'isbn'}, {'column': 'lccn'}, + {'column': 'title'}, {'column': 'subtitle'}, {'column': 'authors'}, {'column': 'edition'}, {'column': 'publisher'}, {'column': 'publish year'}, {'column': 'publish month'}, - {'column': 'publish location'}, {'column': 'pages'}, {'column': 'pagination'}, - {'column': 'weight'}, {'column': 'last updated'}] + {'column': 'publish location'}, {'column': 'pages'}, + {'column': 'pagination'}, {'column': 'weight'}, + {'column': 'last updated'}, + ] def __init__(self,window,helpbar,height=40,width=20): self.selected = [False,False,False,False,False,False,False, diff --git a/db_layer.py b/db_layer.py index e055bc9..e4860be 100644 --- a/db_layer.py +++ b/db_layer.py @@ -1,6 +1,8 @@ import sys import sqlite3 +import permissions + dbFile = 'sqLibrary.db' bookTable = 'books' bookCategoryTable='book_categories' @@ -12,7 +14,9 @@ CREATE TABLE IF NOT EXISTS books (id INTEGER PRIMARY KEY AUTOINCREMENT, isbn, lccn, title, subtitle, authors, edition, publisher, publish_year, publish_month, publish_location, - pages, pagination, weight, last_updated DATETIME DEFAULT current_timestamp, deleted BOOLEAN DEFAULT 0); + pages, pagination, weight, + last_updated DATETIME DEFAULT current_timestamp, + deleted BOOLEAN DEFAULT 0); CREATE TABLE IF NOT EXISTS categories (cat_id INTEGER PRIMARY KEY, category STRING UNIQUE ON CONFLICT IGNORE); @@ -43,7 +47,8 @@ BEGIN DELETE FROM book_categories WHERE cat_id = old.cat_id; END; -CREATE TRIGGER IF NOT EXISTS insert_book_category_time AFTER INSERT ON book_categories +CREATE TRIGGER IF NOT EXISTS insert_book_category_time AFTER INSERT +ON book_categories BEGIN UPDATE books SET last_updated = DATETIME('NOW') WHERE id = new.id; END; @@ -52,15 +57,16 @@ END; ################################3 # character escaping, etc for sql queries ################################# -def colify(s): +def _colify(s): return s.replace(" ","_").lower() -def stringify(v): +def _stringify(v): return '"' + str(v).strip().replace('"','""') + '"' ################################### # book functions ################################## +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def addBook(book): conn = sqlite3.connect(dbFile) c = conn.cursor() @@ -68,21 +74,28 @@ def addBook(book): vals = [] for k,v in book.items(): if v!="": - cols.append(colify(k)) - vals.append(stringify(v)) + cols.append(_colify(k)) + vals.append(_stringify(v)) - query = "INSERT INTO "+bookTable+" ("+", ".join(cols)+") VALUES ("+", ".join(vals)+");" + query = ("INSERT INTO "+bookTable+" ("+", ".join(cols)+") VALUES ("+ + ", ".join(vals)+");") c.execute(query) conn.commit() c.close() +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def updateBook(book, bookID): + ''' + Takes book attribute dictionary and a string representating the book ID + number, and returns updates the book accordingly + ''' conn = sqlite3.connect(dbFile) c = conn.cursor() updates=[] for k,v in book.items(): - updates.append(colify(k)+"="+stringify(v)) - query = "UPDATE "+bookTable+" SET " + ", ".join(updates)+" WHERE id = " +str(bookID)+";" + updates.append(_colify(k)+"="+_stringify(v)) + query = ("UPDATE "+bookTable+" SET " + ", ".join(updates)+" WHERE id = " + + str(bookID)+";") c.execute(query) conn.commit() c.close() @@ -97,9 +110,15 @@ def getBooks(): return books def getBooksByCategory(cat): + ''' + Takes a string representating the category ID number, and returns + non-deleted books in that category + ''' conn = sqlite3.connect(dbFile) c = conn.cursor() - query = "SELECT "+",".join(map(colify,columns))+" FROM "+bookTable+" JOIN "+bookCategoryTable+" USING (id) WHERE cat_id = :id AND deleted=0;" + query = ("SELECT "+",".join(map(_colify,columns))+" FROM "+bookTable+ + " JOIN "+bookCategoryTable+ + " USING (id) WHERE cat_id = :id AND deleted=0;") c.execute(query,cat) books = [_query_to_book(b) for b in c] c.close() @@ -124,6 +143,7 @@ def getBookByID(bookid): return book # removes book from catalogue +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def removeBook(bookid): conn = sqlite3.connect(dbFile) c = conn.cursor() @@ -132,6 +152,7 @@ def removeBook(bookid): conn.commit() c.close() +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def removeBooks(books): conn = sqlite3.connect(dbFile) c = conn.cursor() @@ -142,6 +163,7 @@ def removeBooks(books): c.close() # restores trashed books +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def restoreBooks(books): conn = sqlite3.connect(dbFile) c = conn.cursor() @@ -151,7 +173,8 @@ def restoreBooks(books): conn.commit() c.close() -# fully deletes book from removedBooks table +# fully deletes book from books table +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def deleteBook(bookid): conn = sqlite3.connect(dbFile) c = conn.cursor() @@ -160,6 +183,7 @@ def deleteBook(bookid): conn.commit() c.close() +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def deleteBooks(books): conn = sqlite3.connect(dbFile) c = conn.cursor() @@ -180,7 +204,8 @@ def _query_to_book(book_query): def getBookCategories(book): conn = sqlite3.connect(dbFile) c = conn.cursor() - query = "SELECT id,cat_id,category FROM "+bookCategoryTable+" JOIN "+categoryTable+" USING (cat_id) WHERE id = :id ;" + query = ("SELECT id,cat_id,category FROM "+bookCategoryTable+" JOIN "+ + categoryTable+" USING (cat_id) WHERE id = :id ;") c.execute(query,book) cats = [] for book_id,cat_id,cat_name in c: @@ -188,16 +213,19 @@ def getBookCategories(book): c.close() return cats +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def categorizeBook(book, cats): conn = sqlite3.connect(dbFile) c = conn.cursor() - query = "INSERT OR IGNORE INTO "+bookCategoryTable+" (id,cat_id) VALUES (?, ?);" + query = ("INSERT OR IGNORE INTO "+bookCategoryTable+ + " (id,cat_id) VALUES (?, ?);") for cat in cats: args = (book['id'],cat['id']) c.execute(query,args) conn.commit() c.close() +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def uncategorizeBook(book, cats): conn = sqlite3.connect(dbFile) c = conn.cursor() @@ -219,14 +247,17 @@ def getCategories(): c.close() return cats +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def addCategory(cat): conn = sqlite3.connect(dbFile) c = conn.cursor() - query = "INSERT OR IGNORE INTO "+categoryTable+" (category) VALUES ("+stringify(cat)+");" + query = ("INSERT OR IGNORE INTO "+categoryTable+" (category) VALUES (" + +_stringify(cat)+");") c.execute(query) conn.commit() c.close() +@permissions.check_permissions(permissions.PERMISSION_LIBCOM) def deleteCategories(cats): conn = sqlite3.connect(dbFile) c = conn.cursor() @@ -239,20 +270,20 @@ def deleteCategories(cats): ######################################### # Database initialization ######################################### -def createBooksTable(): +def _createBooksTable(): conn = sqlite3.connect(dbFile) c = conn.cursor() c.executescript(bookTableCreation) conn.commit() c.close() -def createTriggers(): +def _createTriggers(): conn = sqlite3.connect(dbFile) c = conn.cursor() c.executescript(bookTriggerCreation) conn.commit() c.close() - -createBooksTable() -createTriggers() +def initializeDatabase(): + _createBooksTable() + _createTriggers() diff --git a/exceptions.py b/exceptions.py new file mode 100644 index 0000000..a82df8a --- /dev/null +++ b/exceptions.py @@ -0,0 +1,17 @@ +import abc + +class LibrarianException(Exception, metaclass=abc.ABCMeta): + @abc.abstractproperty + def error_msg(self): + pass + + def __str__(self): + return self.error_msg + +class PermissionsError(LibrarianException): + def __init__(self, permission_string): + self.permission_string = permission_string + + @property + def error_msg(self): + return "Need privilege level {}".format(self.permission_string) diff --git a/help_bar.py b/help_bar.py index fabe313..880eeac 100644 --- a/help_bar.py +++ b/help_bar.py @@ -28,7 +28,8 @@ class helpBar: r=0 c=0 for key,command in self.commands: - self.w.addnstr(r,c,key+" "+command+" "*self.colWidth,self.colWidth-1) + self.w.addnstr(r,c,key+" "+command+" "*self.colWidth, + self.colWidth-1) self.w.chgat(r,c,2,curses.A_REVERSE) c+=self.colWidth if c > self.colWidth*self.numCols: diff --git a/librarian.py b/librarian.py index dfc3e04..9415321 100755 --- a/librarian.py +++ b/librarian.py @@ -131,12 +131,14 @@ def catMenu(): cat.clear() -m = [("Browse Library", browseMenu), - ("Add Book", addForm), - ("Categories", catMenu), - ("View Trash", trashMenu), - ("",exit), - ("Exit", exit)] -curses.wrapper(menutest, m) +if __name__ == "__main__": + db.initializeDatabase() + m = [("Browse Library", browseMenu), + ("Add Book", addForm), + ("Categories", catMenu), + ("View Trash", trashMenu), + ("",exit), + ("Exit", exit)] + curses.wrapper(menutest, m) diff --git a/librarian_tests/test_db_layer.py b/librarian_tests/test_db_layer.py new file mode 100644 index 0000000..50a0302 --- /dev/null +++ b/librarian_tests/test_db_layer.py @@ -0,0 +1,490 @@ +import contextlib +import os +import sqlite3 +import unittest + +import db_layer +import exceptions +import permissions + +class TestDBSetup(unittest.TestCase): + def tearDown(self): + try: + os.remove(db_layer.dbFile) + except FileNotFoundError: + pass + + def test_db_setup(self): + conn = sqlite3.connect(db_layer.dbFile) + + with self.assertRaises(sqlite3.OperationalError): + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT * FROM books;') + + db_layer.initializeDatabase() + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT * FROM books;') + + def test_existing_db_file(self): + conn = sqlite3.connect(db_layer.dbFile) + db_layer.initializeDatabase() + with contextlib.closing(conn.cursor()) as c: + c.execute('INSERT INTO books (isbn) VALUES (1111111111);') + conn.commit() + + db_layer.initializeDatabase() + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT isbn FROM books;') + rows = list(c) + + self.assertEqual([1111111111], [row[0] for row in rows]) + +class TestDBFunctions(unittest.TestCase): + maxDiff = None + + def assertISBNs(self, expected_isbns, book_dicts): + expected_isbns = frozenset(expected_isbns) + actual_isbns = frozenset(book.get('isbn') for book in book_dicts) + self.assertEqual(expected_isbns, actual_isbns) + + def assertCategories(self, expected_categories, book_dicts): + expected_categories = frozenset(expected_categories) + actual_categories = frozenset(book['category'] for book in book_dicts) + self.assertEqual(expected_categories, actual_categories) + + def setUp(self): + db_layer.initializeDatabase() + permissions._CURRENT_GROUPS_GETTER = lambda: ["office", "libcom"] + + conn = sqlite3.connect(db_layer.dbFile) + + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +INSERT INTO books (isbn, lccn, title, subtitle, authors, edition, publisher, + publish_year, publish_month, publish_location, pages, + pagination, weight) +VALUES +(1111111111111, 2222222222, 'Attack of the bad code', 'Return of the GOTO', + 'cdchawthorne', '1st', 'CSC Publishing', '1992', 'June', 'Waterloo, Canada', + '496', 'xxvi, 493 p', '1 kg'), +(3333333333333, 4444444444, 'Star Wars VI--Return of the Jedi', 'Now in text!', + 'George Lucas', '2nd', 'Lucas Film', '2013', 'November', 'Somewhere, USA', + '8128', 'xx, 8100 p', '10 kg'); + ''') + + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +INSERT INTO books (isbn, lccn, deleted) VALUES +(5555555555555, NULL, 0), (NULL, 6666666666, 0), (7777777777777, NULL, 1); + ''') + + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +INSERT INTO categories (category) +VALUES ('My special category'), ('My second special category'); + ''') + + with contextlib.closing(conn.cursor()) as c: + c.execute('''SELECT rowid, isbn, lccn, last_updated FROM books;''') + for row in c: + if row[1] == 1111111111111: + self.book0_id = row[0] + self.book0_last_updated = row[3] + elif row[1] == 3333333333333: + self.book1_id = row[0] + self.book1_last_updated = row[3] + elif row[1] == 5555555555555: + self.book2_id = row[0] + self.book2_last_updated = row[3] + elif row[2] == 6666666666: + self.book3_id = row[0] + self.book3_last_updated = row[3] + elif row[1] == 7777777777777: + self.book4_id = row[0] + self.book4_last_updated = row[3] + else: + self.assertTrue(False, + "Unexpected data in DB during setup") + + with contextlib.closing(conn.cursor()) as c: + c.execute('''SELECT rowid, category FROM categories;''') + for row in c: + if row[1] == 'My special category': + self.category0_id = row[0] + elif row[1] == 'My second special category': + self.category1_id = row[0] + + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +INSERT INTO book_categories (id, cat_id) VALUES (?, ?), (?, ?), (?, ?), (?, ?); + ''', + (self.book1_id, self.category0_id, + self.book1_id, self.category1_id, + self.book0_id, self.category1_id, + self.book2_id, self.category1_id, + ) + ) + + conn.commit() + + def tearDown(self): + try: + os.remove(db_layer.dbFile) + except FileNotFoundError: + pass + + def test_getBooks(self): + books = db_layer.getBooks() + + expected_book0 = { + 'id': self.book0_id, + 'isbn': 1111111111111, + 'lccn': 2222222222, + 'title': 'Attack of the bad code', + 'subtitle': 'Return of the GOTO', + 'authors': 'cdchawthorne', + 'edition': '1st', + 'publisher': 'CSC Publishing', + 'publish year': '1992', + 'publish month': 'June', + 'publish location': 'Waterloo, Canada', + 'pages': '496', + 'pagination': 'xxvi, 493 p', + 'weight': '1 kg', + 'last updated': self.book0_last_updated, + } + + found_book0 = False + for book in books: + if book.get('isbn') == 1111111111111: + found_book0 = True + self.assertEqual(book, expected_book0) + + self.assertTrue(found_book0, "getBooks() missing book0") + + expected_isbns = [1111111111111, 3333333333333, 5555555555555, None] + self.assertISBNs(expected_isbns, books) + + def test_getBooksByCategory(self): + books = db_layer.getBooksByCategory(str(self.category0_id)) + expected_isbns = [3333333333333] + self.assertISBNs(expected_isbns, books) + + def test_getRemovedBooks(self): + books = db_layer.getRemovedBooks() + expected_isbns = [7777777777777] + self.assertISBNs(expected_isbns, books) + + def test_addBook(self): + db_layer.addBook({'isbn': 8888888888888, 'title': 'New book'}) + + conn = sqlite3.connect(db_layer.dbFile) + + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +SELECT title FROM books WHERE isbn = '8888888888888'; + ''') + rows = list(c) + + self.assertEqual(['New book'], [row[0] for row in rows]) + + def test_updateBook(self): + db_layer.updateBook({'title': 'Attack of the questionable code'}, + str(self.book0_id)) + + conn = sqlite3.connect(db_layer.dbFile) + + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +SELECT title FROM books WHERE id = ?; + ''', (self.book0_id,)) + rows = list(c) + + self.assertEqual(['Attack of the questionable code'], + [row[0] for row in rows]) + + def test_getBookById(self): + book = db_layer.getBookByID(self.book0_id) + self.assertEqual('Attack of the bad code', book['title']) + + def test_removeBook(self): + conn = sqlite3.connect(db_layer.dbFile) + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +SELECT deleted FROM books WHERE id = ?; + ''', (self.book1_id,)) + rows = list(c) + + self.assertEqual([0], [row[0] for row in rows]) + + db_layer.removeBook(self.book1_id) + + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +SELECT deleted FROM books WHERE id = ?; + ''', (self.book1_id,)) + rows = list(c) + + self.assertEqual([1], [row[0] for row in rows]) + + def test_removeBooks(self): + conn = sqlite3.connect(db_layer.dbFile) + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +SELECT deleted FROM books WHERE id = ? OR id = ?; + ''', (self.book0_id, self.book1_id,)) + rows = list(c) + + self.assertEqual([0,0], [row[0] for row in rows]) + + db_layer.removeBooks([str(self.book0_id), str(self.book1_id)]) + + with contextlib.closing(conn.cursor()) as c: + c.execute(''' +SELECT deleted FROM books WHERE id = ? OR id = ?; + ''', (self.book0_id, self.book1_id)) + rows = list(c) + + self.assertEqual([1,1], [row[0] for row in rows]) + + def test_deleteBook(self): + conn = sqlite3.connect(db_layer.dbFile) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;', + (self.book0_id, self.book1_id)) + count = c.fetchone()[0] + + self.assertEqual(2, count) + + db_layer.deleteBook(self.book0_id) + db_layer.deleteBook(self.book1_id) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;', + (self.book0_id, self.book1_id)) + count = c.fetchone()[0] + + self.assertEqual(0, count) + + # Code duplication? What's that? + def test_deleteBooks(self): + conn = sqlite3.connect(db_layer.dbFile) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;', + (self.book0_id, self.book1_id)) + count = c.fetchone()[0] + + self.assertEqual(2, count) + + db_layer.deleteBooks([str(self.book0_id), str(self.book1_id)]) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;', + (self.book0_id, self.book1_id)) + count = c.fetchone()[0] + + self.assertEqual(0, count) + + def test_getBookCategories(self): + categories = db_layer.getBookCategories(str(self.book1_id)) + expected_categories = ['My special category', + 'My second special category'] + self.assertCategories(expected_categories, categories) + + categories = db_layer.getBookCategories(str(self.book0_id)) + expected_categories = ['My second special category'] + self.assertCategories(expected_categories, categories) + + categories = db_layer.getBookCategories(str(self.book3_id)) + expected_categories = [] + self.assertCategories(expected_categories, categories) + + def test_categorizeBook(self): + conn = sqlite3.connect(db_layer.dbFile) + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT cat_id FROM book_categories WHERE id = ?;', + (self.book3_id,)) + rows = list(c) + + self.assertEqual([], rows) + + db_layer.categorizeBook({'id': self.book3_id}, + [{'id': self.category0_id}, + {'id': self.category1_id}]) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT cat_id FROM book_categories WHERE id = ?;', + (self.book3_id,)) + rows = list(c) + + self.assertEqual(frozenset([self.category0_id, self.category1_id]), + frozenset(row[0] for row in rows)) + + def test_uncategorizeBook(self): + conn = sqlite3.connect(db_layer.dbFile) + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT cat_id FROM book_categories WHERE id = ?;', + (self.book1_id,)) + rows = list(c) + + self.assertEqual(frozenset([self.category0_id, self.category1_id]), + frozenset(row[0] for row in rows)) + + db_layer.uncategorizeBook({'id': self.book1_id}, + [{'id': self.category0_id}, + {'id': self.category1_id}]) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT cat_id FROM book_categories WHERE id = ?;', + (self.book3_id,)) + rows = list(c) + + self.assertEqual([], rows) + + def test_getCategories(self): + categories = db_layer.getCategories() + expected_categories = ['My special category', + 'My second special category'] + self.assertCategories(expected_categories, categories) + + def test_addCategory(self): + conn = sqlite3.connect(db_layer.dbFile) + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT category FROM categories;') + rows = list(c) + + expected_categories = ['My special category', + 'My second special category'] + self.assertEqual(frozenset(expected_categories), + frozenset(row[0] for row in rows)) + + db_layer.addCategory('My third special category') + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT category FROM categories;') + rows = list(c) + + expected_categories.append('My third special category') + self.assertEqual(frozenset(expected_categories), + frozenset(row[0] for row in rows)) + + def test_deleteCategories(self): + conn = sqlite3.connect(db_layer.dbFile) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT COUNT(*) FROM categories;') + rows = list(c) + + self.assertEqual(2, rows[0][0]) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT COUNT(*) FROM book_categories;') + rows = list(c) + + self.assertEqual(4, rows[0][0]) + + db_layer.deleteCategories([str(self.category1_id)]) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT COUNT(*) FROM categories;') + rows = list(c) + + self.assertEqual(1, rows[0][0]) + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT COUNT(*) FROM book_categories;') + rows = list(c) + + self.assertEqual(1, rows[0][0]) + +class TestDBFunctionPermissions(unittest.TestCase): + def setUp(self): + db_layer.initializeDatabase() + # Need book in database or getBookByID will error when we test it + conn = sqlite3.connect(db_layer.dbFile) + + with contextlib.closing(conn.cursor()) as c: + c.execute('INSERT INTO books (isbn) VALUES (5555555555555);') + + with contextlib.closing(conn.cursor()) as c: + c.execute('SELECT rowid FROM books;') + self.dummy_book_id = c.fetchone()[0] + + def tearDown(self): + try: + os.remove(db_layer.dbFile) + except FileNotFoundError: + pass + + def verify_permissions(self, fn, authorized_groups, unauthorized_groups): + for group in authorized_groups: + permissions._CURRENT_GROUPS_GETTER = lambda: [group] + fn() + + for group in unauthorized_groups: + permissions._CURRENT_GROUPS_GETTER = lambda: [group] + with self.assertRaises(exceptions.PermissionsError): + fn() + + def test_db_function_permissions(self): + function_dicts = [ + {'fn': lambda: db_layer.addBook({'isbn': 8888888888888}), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.updateBook({'isbn': 9999999999999}, 0), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.getBooks(), + 'authorized_groups': ['libcom', 'office', 'unauthorized'], + 'unauthorized_groups': []}, + {'fn': lambda: db_layer.getBooksByCategory('0'), + 'authorized_groups': ['libcom', 'office', 'unauthorized'], + 'unauthorized_groups': []}, + {'fn': lambda: db_layer.getRemovedBooks(), + 'authorized_groups': ['libcom', 'office', 'unauthorized'], + 'unauthorized_groups': []}, + {'fn': lambda: db_layer.getBookByID(self.dummy_book_id), + 'authorized_groups': ['libcom', 'office', 'unauthorized'], + 'unauthorized_groups': []}, + {'fn': lambda: db_layer.removeBook(0), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.removeBooks(['0']), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.restoreBooks(['0']), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.deleteBook(0), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.deleteBooks(['0']), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.getBookCategories('0'), + 'authorized_groups': ['libcom', 'office', 'unauthorized'], + 'unauthorized_groups': []}, + {'fn': lambda: db_layer.categorizeBook({'id':'0'}, []), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.uncategorizeBook({'id':'0'}, []), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.getCategories(), + 'authorized_groups': ['libcom', 'office', 'unauthorized'], + 'unauthorized_groups': []}, + {'fn': lambda: db_layer.addCategory('Cat5'), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + {'fn': lambda: db_layer.deleteCategories([]), + 'authorized_groups': ['libcom'], + 'unauthorized_groups': ['office', 'unauthorized']}, + ] + + for function_dict in function_dicts: + self.verify_permissions(function_dict['fn'], + function_dict['authorized_groups'], + function_dict['unauthorized_groups']) diff --git a/permissions.py b/permissions.py new file mode 100644 index 0000000..232a05c --- /dev/null +++ b/permissions.py @@ -0,0 +1,34 @@ +import grp +import os + +import exceptions + +class _PermissionLevel: + def __init__(self, group_name, pretty_name): + self.group_name = group_name + self.pretty_name = pretty_name + +PERMISSION_OFFICE = _PermissionLevel("office", "Office worker") +PERMISSION_LIBCOM = _PermissionLevel("libcom", "Library Committee") + +def check_permissions(permission_level): + def decorator(fn): + def wrapped_function(*args, **kwargs): + if not has_permission(permission_level): + raise exceptions.PermissionsError(permission_level.pretty_name) + return fn(*args, **kwargs) + + return wrapped_function + + return decorator + +def has_permission(permission_level): + return permission_level.group_name in _CURRENT_GROUPS_GETTER() + +def _current_group_names(): + group_ids = os.getgroups() + group_names = [grp.getgrgid(group_id).gr_name for group_id in group_ids] + return group_names + +# Hack to allow dependency injection for testing +_CURRENT_GROUPS_GETTER = _current_group_names diff --git a/run_tests.sh b/run_tests.sh new file mode 100755 index 0000000..82c42cc --- /dev/null +++ b/run_tests.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +librarian_dir="$(realpath "$(dirname "$0")")" +librarian_tests_dir="${librarian_dir}/librarian_tests" + +export PYTHONPATH="${librarian_dir}" +cd "${librarian_tests_dir}" + +python -m unittest "$@" test_*.py