Added permission checking. Added unit tests for the db_layer and permissions.
authorChristopher Hawthorne <cdchawthorne@gmail.com>
Sat, 9 Nov 2013 04:35:53 +0000 (23:35 -0500)
committerChristopher Hawthorne <cdchawthorne@gmail.com>
Sat, 9 Nov 2013 04:35:53 +0000 (23:35 -0500)
TODO
book_data.py
browser.py
db_layer.py
exceptions.py [new file with mode: 0644]
help_bar.py
librarian.py
librarian_tests/test_db_layer.py [new file with mode: 0644]
permissions.py [new file with mode: 0644]
run_tests.sh [new file with mode: 0755]

diff --git a/TODO b/TODO
index c98c126..a4b55fc 100644 (file)
--- a/TODO
+++ b/TODO
@@ -14,6 +14,8 @@ _Code Quality Improvements_
 Document all functions
 Conform to python naming conventions and code style
 Make db_layer use a helper function to handle most of the database queries
+Unify the db_layer interface w.r.t. whether book_ids should be passed as
+strings or ints or dicts.
 
 
 _Bugs_
index e4c7c7a..d5c089a 100644 (file)
@@ -36,7 +36,9 @@ Keys:
 def openLibrary_isbn(ISBN):
     isbn = str(ISBN)
     try:
-        jsondata = urlopen("http://openlibrary.org/api/books?format=json&jscmd=data&bibkeys=ISBN:"+isbn, timeout=3)
+        jsondata = urlopen("http://openlibrary.org/api/books"
+                           "?format=json&jscmd=data&bibkeys=ISBN:"+isbn,
+                           timeout=3)
     except URLError:
         return {}
     openBook = loads(jsondata.read().decode('utf-8'))
@@ -62,7 +64,8 @@ def openLibrary_isbn(ISBN):
             book["publish location"] += "; " + v['name']
         book['publish location'] = book['publish location'][2:]
 
-    # for lccn, there maybe be multiple values in the query. I'm just taking the first, but the full list may be useful
+    # for lccn, there maybe be multiple values in the query. I'm just taking
+    # the first, but the full list may be useful
     if "lccn" in openBook['identifiers']:
         book["lccn"]=int(openBook['identifiers']['lccn'][0])
     if "publish_date" in openBook:
@@ -82,7 +85,9 @@ def openLibrary_isbn(ISBN):
 def openLibrary_lccn(LCCN):
     lccn = str(LCCN)
     try:
-        jsondata = urlopen("http://openlibrary.org/api/books?format=json&jscmd=data&bibkeys=lccn:"+lccn, timeout=3)
+        jsondata = urlopen("http://openlibrary.org/api/books"
+                           "?format=json&jscmd=data&bibkeys=lccn:"+lccn,
+                           timeout=3)
     except URLError:
         return {}
     openBook = loads(jsondata.read().decode('utf-8'))
@@ -108,7 +113,8 @@ def openLibrary_lccn(LCCN):
             book["publish location"] += "; " + v['name']
         book['publish location'] = book['publish location'][2:]
 
-    # for isbn, there maybe be multiple values in the query. I'm just taking the first, but the full list may be useful
+    # for isbn, there maybe be multiple values in the query. I'm just taking
+    # the first, but the full list may be useful
     # There are also ISBN's that have non-number values :(
     if "isbn_10" in openBook['identifiers']:
         book["isbn"]=openBook['identifiers']['isbn_10'][0]
index fc8d422..88a78f0 100644 (file)
@@ -8,8 +8,8 @@ class browserWindow:
     topline = 0
     entries = []
     selected = list()
-    commands = [(' /', 'search'), (' n', 'find next'), (' N', 'find previous'), 
-            ('F6', 'Sort Column'), (' q', 'quit')]
+    commands = [(' /', 'search'), (' n', 'find next'), (' N', 'find previous'),
+                ('F6', 'Sort Column'), (' q', 'quit')]
     cs = []
     # column definitions are in (label, weight, specified width) triples
     columnDefs = [('something',1,None)]
@@ -90,7 +90,8 @@ class browserWindow:
                 self.w.addstr(row+3, 1, " ")
             for k,width in self.columns:
                 if k.lower() in entry:
-                    self.w.addnstr(row+3,cursor,str(entry[k.lower()])+" "*width,width)
+                    self.w.addnstr(row+3, cursor,
+                                   str(entry[k.lower()])+" "*width, width)
                 cursor += width+1
         else:
             self.w.addstr(row+3,1," "*(self.mx-2))
@@ -492,10 +493,11 @@ class categorySelector(browserWindow):
         # first removed the deselected ones
         uncats = []
         cats = []
-        for old, new, category in zip(self.original, self.selected, self.entries):
-            if old and (not new):
+        for old, new, category in zip(self.original, self.selected,
+                                      self.entries):
+            if old and not new:
                 uncats.append(category)
-            if (not old) and new:
+            if not old and new:
                 cats.append(category)
         db.uncategorizeBook(self.book, uncats)
         # add the newly selected categories
@@ -516,12 +518,15 @@ class categorySelector(browserWindow):
 
 class columnSelector(browserWindow):
     columnDefs = [('Column',100,None)]
-    entries = [{'column': 'id'}, {'column': 'isbn'}, {'column': 'lccn'},
-            {'column': 'title'}, {'column': 'subtitle'}, {'column': 'authors'}, 
+    entries = [
+            {'column': 'id'}, {'column': 'isbn'}, {'column': 'lccn'},
+            {'column': 'title'}, {'column': 'subtitle'}, {'column': 'authors'},
             {'column': 'edition'}, {'column': 'publisher'}, 
             {'column': 'publish year'}, {'column': 'publish month'}, 
-            {'column': 'publish location'}, {'column': 'pages'}, {'column': 'pagination'}, 
-            {'column': 'weight'}, {'column': 'last updated'}]
+            {'column': 'publish location'}, {'column': 'pages'},
+            {'column': 'pagination'}, {'column': 'weight'},
+            {'column': 'last updated'},
+    ]
 
     def __init__(self,window,helpbar,height=40,width=20):
         self.selected = [False,False,False,False,False,False,False,
index e055bc9..e4860be 100644 (file)
@@ -1,6 +1,8 @@
 import sys
 import sqlite3
 
+import permissions
+
 dbFile = 'sqLibrary.db'
 bookTable = 'books'
 bookCategoryTable='book_categories'
@@ -12,7 +14,9 @@ CREATE TABLE IF NOT EXISTS books
     (id INTEGER PRIMARY KEY AUTOINCREMENT, 
      isbn, lccn, title, subtitle, authors, edition, 
      publisher, publish_year, publish_month, publish_location, 
-     pages, pagination, weight, last_updated DATETIME DEFAULT current_timestamp, deleted BOOLEAN DEFAULT 0);
+     pages, pagination, weight,
+     last_updated DATETIME DEFAULT current_timestamp,
+     deleted BOOLEAN DEFAULT 0);
 
 CREATE TABLE IF NOT EXISTS categories
     (cat_id INTEGER PRIMARY KEY, category STRING UNIQUE ON CONFLICT IGNORE);
@@ -43,7 +47,8 @@ BEGIN
     DELETE FROM book_categories WHERE cat_id = old.cat_id;
 END;
 
-CREATE TRIGGER IF NOT EXISTS insert_book_category_time AFTER INSERT ON book_categories
+CREATE TRIGGER IF NOT EXISTS insert_book_category_time AFTER INSERT
+ON book_categories
 BEGIN
     UPDATE books SET last_updated = DATETIME('NOW') WHERE id = new.id;
 END;
@@ -52,15 +57,16 @@ END;
 ################################3
 # character escaping, etc for sql queries
 #################################
-def colify(s):
+def _colify(s):
     return s.replace(" ","_").lower()
 
-def stringify(v):
+def _stringify(v):
     return '"' + str(v).strip().replace('"','""') + '"'
 
 ###################################
 # book functions
 ##################################
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def addBook(book):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
@@ -68,21 +74,28 @@ def addBook(book):
     vals = []
     for k,v in book.items():
         if v!="":
-            cols.append(colify(k))
-            vals.append(stringify(v))
+            cols.append(_colify(k))
+            vals.append(_stringify(v))
     
-    query = "INSERT INTO "+bookTable+" ("+", ".join(cols)+") VALUES ("+", ".join(vals)+");"
+    query = ("INSERT INTO "+bookTable+" ("+", ".join(cols)+") VALUES ("+
+             ", ".join(vals)+");")
     c.execute(query)
     conn.commit()
     c.close()
 
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def updateBook(book, bookID):
+    '''
+    Takes book attribute dictionary and a string representating the book ID
+    number, and returns updates the book accordingly
+    '''
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
     updates=[]
     for k,v in book.items():
-        updates.append(colify(k)+"="+stringify(v))
-    query = "UPDATE "+bookTable+" SET " +  ", ".join(updates)+" WHERE id = " +str(bookID)+";"
+        updates.append(_colify(k)+"="+_stringify(v))
+    query = ("UPDATE "+bookTable+" SET " +  ", ".join(updates)+" WHERE id = " +
+             str(bookID)+";")
     c.execute(query)
     conn.commit()
     c.close()
@@ -97,9 +110,15 @@ def getBooks():
     return books
 
 def getBooksByCategory(cat):
+    '''
+    Takes a string representating the category ID number, and returns
+    non-deleted books in that category
+    '''
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
-    query = "SELECT "+",".join(map(colify,columns))+" FROM "+bookTable+" JOIN "+bookCategoryTable+" USING (id) WHERE cat_id = :id AND deleted=0;"
+    query = ("SELECT "+",".join(map(_colify,columns))+" FROM "+bookTable+
+             " JOIN "+bookCategoryTable+
+             " USING (id) WHERE cat_id = :id AND deleted=0;")
     c.execute(query,cat)
     books = [_query_to_book(b) for b in c]
     c.close()
@@ -124,6 +143,7 @@ def getBookByID(bookid):
     return book
 
 # removes book from catalogue
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def removeBook(bookid):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
@@ -132,6 +152,7 @@ def removeBook(bookid):
     conn.commit()
     c.close()
 
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def removeBooks(books):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
@@ -142,6 +163,7 @@ def removeBooks(books):
     c.close()
 
 # restores trashed books
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def restoreBooks(books):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
@@ -151,7 +173,8 @@ def restoreBooks(books):
     conn.commit()
     c.close()
 
-# fully deletes book from removedBooks table
+# fully deletes book from books table
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def deleteBook(bookid):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
@@ -160,6 +183,7 @@ def deleteBook(bookid):
     conn.commit()
     c.close()
 
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def deleteBooks(books):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
@@ -180,7 +204,8 @@ def _query_to_book(book_query):
 def getBookCategories(book):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
-    query = "SELECT id,cat_id,category FROM "+bookCategoryTable+" JOIN "+categoryTable+" USING (cat_id) WHERE id = :id ;"
+    query = ("SELECT id,cat_id,category FROM "+bookCategoryTable+" JOIN "+
+             categoryTable+" USING (cat_id) WHERE id = :id ;")
     c.execute(query,book)
     cats = []
     for book_id,cat_id,cat_name in c:
@@ -188,16 +213,19 @@ def getBookCategories(book):
     c.close()
     return cats
 
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def categorizeBook(book, cats):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
-    query = "INSERT OR IGNORE INTO "+bookCategoryTable+" (id,cat_id) VALUES (?, ?);"
+    query = ("INSERT OR IGNORE INTO "+bookCategoryTable+
+             " (id,cat_id) VALUES (?, ?);")
     for cat in cats:
         args = (book['id'],cat['id'])
         c.execute(query,args)
     conn.commit()
     c.close()
 
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def uncategorizeBook(book, cats):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
@@ -219,14 +247,17 @@ def getCategories():
     c.close()
     return cats
 
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def addCategory(cat):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
-    query = "INSERT OR IGNORE INTO "+categoryTable+" (category) VALUES ("+stringify(cat)+");"
+    query = ("INSERT OR IGNORE INTO "+categoryTable+" (category) VALUES ("
+             +_stringify(cat)+");")
     c.execute(query)
     conn.commit()
     c.close()
 
+@permissions.check_permissions(permissions.PERMISSION_LIBCOM)
 def deleteCategories(cats):
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
@@ -239,20 +270,20 @@ def deleteCategories(cats):
 #########################################
 # Database initialization
 #########################################
-def createBooksTable():
+def _createBooksTable():
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
     c.executescript(bookTableCreation)
     conn.commit()
     c.close()
 
-def createTriggers():
+def _createTriggers():
     conn = sqlite3.connect(dbFile)
     c = conn.cursor()
     c.executescript(bookTriggerCreation)
     conn.commit()
     c.close()
 
-
-createBooksTable()
-createTriggers()
+def initializeDatabase():
+    _createBooksTable()
+    _createTriggers()
diff --git a/exceptions.py b/exceptions.py
new file mode 100644 (file)
index 0000000..a82df8a
--- /dev/null
@@ -0,0 +1,17 @@
+import abc
+
+class LibrarianException(Exception, metaclass=abc.ABCMeta):
+    @abc.abstractproperty
+    def error_msg(self):
+        pass
+
+    def __str__(self):
+        return self.error_msg
+
+class PermissionsError(LibrarianException):
+    def __init__(self, permission_string):
+        self.permission_string = permission_string
+
+    @property
+    def error_msg(self):
+        return "Need privilege level {}".format(self.permission_string)
index fabe313..880eeac 100644 (file)
@@ -28,7 +28,8 @@ class helpBar:
         r=0
         c=0
         for key,command in self.commands:
-            self.w.addnstr(r,c,key+" "+command+" "*self.colWidth,self.colWidth-1)
+            self.w.addnstr(r,c,key+" "+command+" "*self.colWidth,
+                           self.colWidth-1)
             self.w.chgat(r,c,2,curses.A_REVERSE)
             c+=self.colWidth
             if c > self.colWidth*self.numCols:
index dfc3e04..9415321 100755 (executable)
@@ -131,12 +131,14 @@ def catMenu():
     cat.clear()
 
 
-m = [("Browse Library", browseMenu),
-     ("Add Book", addForm),
-     ("Categories", catMenu),
-     ("View Trash", trashMenu),
-     ("",exit),
-     ("Exit", exit)]
-curses.wrapper(menutest, m)
+if __name__ == "__main__":
+    db.initializeDatabase()
+    m = [("Browse Library", browseMenu),
+         ("Add Book", addForm),
+         ("Categories", catMenu),
+         ("View Trash", trashMenu),
+         ("",exit),
+         ("Exit", exit)]
+    curses.wrapper(menutest, m)
 
 
diff --git a/librarian_tests/test_db_layer.py b/librarian_tests/test_db_layer.py
new file mode 100644 (file)
index 0000000..50a0302
--- /dev/null
@@ -0,0 +1,490 @@
+import contextlib
+import os
+import sqlite3
+import unittest
+
+import db_layer
+import exceptions
+import permissions
+
+class TestDBSetup(unittest.TestCase):
+    def tearDown(self):
+        try:
+            os.remove(db_layer.dbFile)
+        except FileNotFoundError:
+            pass
+
+    def test_db_setup(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+
+        with self.assertRaises(sqlite3.OperationalError):
+            with contextlib.closing(conn.cursor()) as c:
+                c.execute('SELECT * FROM books;')
+
+        db_layer.initializeDatabase()
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT * FROM books;')
+
+    def test_existing_db_file(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+        db_layer.initializeDatabase()
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('INSERT INTO books (isbn) VALUES (1111111111);')
+        conn.commit()
+
+        db_layer.initializeDatabase()
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT isbn FROM books;')
+            rows = list(c)
+
+        self.assertEqual([1111111111], [row[0] for row in rows])
+
+class TestDBFunctions(unittest.TestCase):
+    maxDiff = None
+
+    def assertISBNs(self, expected_isbns, book_dicts):
+        expected_isbns = frozenset(expected_isbns)
+        actual_isbns = frozenset(book.get('isbn') for book in book_dicts)
+        self.assertEqual(expected_isbns, actual_isbns)
+
+    def assertCategories(self, expected_categories, book_dicts):
+        expected_categories = frozenset(expected_categories)
+        actual_categories = frozenset(book['category'] for book in book_dicts)
+        self.assertEqual(expected_categories, actual_categories)
+
+    def setUp(self):
+        db_layer.initializeDatabase()
+        permissions._CURRENT_GROUPS_GETTER = lambda: ["office", "libcom"]
+
+        conn = sqlite3.connect(db_layer.dbFile)
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+INSERT INTO books (isbn, lccn, title, subtitle, authors, edition, publisher,
+                   publish_year, publish_month, publish_location, pages,
+                   pagination, weight)
+VALUES
+(1111111111111, 2222222222, 'Attack of the bad code', 'Return of the GOTO',
+ 'cdchawthorne', '1st', 'CSC Publishing', '1992', 'June', 'Waterloo, Canada',
+ '496', 'xxvi, 493 p', '1 kg'),
+(3333333333333, 4444444444, 'Star Wars VI--Return of the Jedi', 'Now in text!',
+ 'George Lucas', '2nd', 'Lucas Film', '2013', 'November', 'Somewhere, USA',
+ '8128', 'xx, 8100 p', '10 kg');
+            ''')
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+INSERT INTO books (isbn, lccn, deleted) VALUES
+(5555555555555, NULL, 0), (NULL, 6666666666, 0), (7777777777777, NULL, 1);
+            ''')
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+INSERT INTO categories (category)
+VALUES ('My special category'), ('My second special category');
+            ''')
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''SELECT rowid, isbn, lccn, last_updated FROM books;''')
+            for row in c:
+                if row[1] == 1111111111111:
+                    self.book0_id = row[0]
+                    self.book0_last_updated = row[3]
+                elif row[1] == 3333333333333:
+                    self.book1_id = row[0]
+                    self.book1_last_updated = row[3]
+                elif row[1] == 5555555555555:
+                    self.book2_id = row[0]
+                    self.book2_last_updated = row[3]
+                elif row[2] == 6666666666:
+                    self.book3_id = row[0]
+                    self.book3_last_updated = row[3]
+                elif row[1] == 7777777777777:
+                    self.book4_id = row[0]
+                    self.book4_last_updated = row[3]
+                else:
+                    self.assertTrue(False,
+                                    "Unexpected data in DB during setup")
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''SELECT rowid, category FROM categories;''')
+            for row in c:
+                if row[1] == 'My special category':
+                    self.category0_id = row[0]
+                elif row[1] == 'My second special category':
+                    self.category1_id = row[0]
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+INSERT INTO book_categories (id, cat_id) VALUES (?, ?), (?, ?), (?, ?), (?, ?);
+            ''',
+                      (self.book1_id, self.category0_id,
+                       self.book1_id, self.category1_id,
+                       self.book0_id, self.category1_id,
+                       self.book2_id, self.category1_id,
+                      )
+            )
+
+        conn.commit()
+
+    def tearDown(self):
+        try:
+            os.remove(db_layer.dbFile)
+        except FileNotFoundError:
+            pass
+
+    def test_getBooks(self):
+        books = db_layer.getBooks()
+
+        expected_book0 = {
+                'id': self.book0_id,
+                'isbn': 1111111111111,
+                'lccn': 2222222222,
+                'title': 'Attack of the bad code',
+                'subtitle': 'Return of the GOTO',
+                'authors': 'cdchawthorne',
+                'edition': '1st',
+                'publisher': 'CSC Publishing',
+                'publish year': '1992',
+                'publish month': 'June',
+                'publish location': 'Waterloo, Canada',
+                'pages': '496',
+                'pagination': 'xxvi, 493 p',
+                'weight': '1 kg',
+                'last updated': self.book0_last_updated,
+        }
+
+        found_book0 = False
+        for book in books:
+            if book.get('isbn') == 1111111111111:
+                found_book0 = True
+                self.assertEqual(book, expected_book0)
+
+        self.assertTrue(found_book0, "getBooks() missing book0")
+
+        expected_isbns = [1111111111111, 3333333333333, 5555555555555, None]
+        self.assertISBNs(expected_isbns, books)
+
+    def test_getBooksByCategory(self):
+        books = db_layer.getBooksByCategory(str(self.category0_id))
+        expected_isbns = [3333333333333]
+        self.assertISBNs(expected_isbns, books)
+
+    def test_getRemovedBooks(self):
+        books = db_layer.getRemovedBooks()
+        expected_isbns = [7777777777777]
+        self.assertISBNs(expected_isbns, books)
+
+    def test_addBook(self):
+        db_layer.addBook({'isbn': 8888888888888, 'title': 'New book'})
+        
+        conn = sqlite3.connect(db_layer.dbFile)
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+SELECT title FROM books WHERE isbn = '8888888888888';
+            ''')
+            rows = list(c)
+        
+        self.assertEqual(['New book'], [row[0] for row in rows])
+
+    def test_updateBook(self):
+        db_layer.updateBook({'title': 'Attack of the questionable code'},
+                            str(self.book0_id))
+
+        conn = sqlite3.connect(db_layer.dbFile)
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+SELECT title FROM books WHERE id = ?;
+            ''', (self.book0_id,))
+            rows = list(c)
+        
+        self.assertEqual(['Attack of the questionable code'],
+                         [row[0] for row in rows])
+
+    def test_getBookById(self):
+        book = db_layer.getBookByID(self.book0_id)
+        self.assertEqual('Attack of the bad code', book['title'])
+
+    def test_removeBook(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+SELECT deleted FROM books WHERE id = ?;
+            ''', (self.book1_id,))
+            rows = list(c)
+
+        self.assertEqual([0], [row[0] for row in rows])
+
+        db_layer.removeBook(self.book1_id)
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+SELECT deleted FROM books WHERE id = ?;
+            ''', (self.book1_id,))
+            rows = list(c)
+
+        self.assertEqual([1], [row[0] for row in rows])
+
+    def test_removeBooks(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+SELECT deleted FROM books WHERE id = ? OR id = ?;
+            ''', (self.book0_id, self.book1_id,))
+            rows = list(c)
+
+        self.assertEqual([0,0], [row[0] for row in rows])
+
+        db_layer.removeBooks([str(self.book0_id), str(self.book1_id)])
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('''
+SELECT deleted FROM books WHERE id = ? OR id = ?;
+            ''', (self.book0_id, self.book1_id))
+            rows = list(c)
+
+        self.assertEqual([1,1], [row[0] for row in rows])
+
+    def test_deleteBook(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;',
+                      (self.book0_id, self.book1_id))
+            count = c.fetchone()[0]
+
+        self.assertEqual(2, count)
+
+        db_layer.deleteBook(self.book0_id)
+        db_layer.deleteBook(self.book1_id)
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;',
+                      (self.book0_id, self.book1_id))
+            count = c.fetchone()[0]
+
+        self.assertEqual(0, count)
+
+    # Code duplication? What's that?
+    def test_deleteBooks(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;',
+                      (self.book0_id, self.book1_id))
+            count = c.fetchone()[0]
+
+        self.assertEqual(2, count)
+
+        db_layer.deleteBooks([str(self.book0_id), str(self.book1_id)])
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;',
+                      (self.book0_id, self.book1_id))
+            count = c.fetchone()[0]
+
+        self.assertEqual(0, count)
+
+    def test_getBookCategories(self):
+        categories = db_layer.getBookCategories(str(self.book1_id))
+        expected_categories = ['My special category',
+                               'My second special category']
+        self.assertCategories(expected_categories, categories)
+
+        categories = db_layer.getBookCategories(str(self.book0_id))
+        expected_categories = ['My second special category']
+        self.assertCategories(expected_categories, categories)
+
+        categories = db_layer.getBookCategories(str(self.book3_id))
+        expected_categories = []
+        self.assertCategories(expected_categories, categories)
+
+    def test_categorizeBook(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT cat_id FROM book_categories WHERE id = ?;',
+                      (self.book3_id,))
+            rows = list(c)
+
+        self.assertEqual([], rows)
+
+        db_layer.categorizeBook({'id': self.book3_id},
+                                [{'id': self.category0_id},
+                                 {'id': self.category1_id}])
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT cat_id FROM book_categories WHERE id = ?;',
+                      (self.book3_id,))
+            rows = list(c)
+
+        self.assertEqual(frozenset([self.category0_id, self.category1_id]),
+                         frozenset(row[0] for row in rows))
+
+    def test_uncategorizeBook(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT cat_id FROM book_categories WHERE id = ?;',
+                      (self.book1_id,))
+            rows = list(c)
+
+        self.assertEqual(frozenset([self.category0_id, self.category1_id]),
+                         frozenset(row[0] for row in rows))
+
+        db_layer.uncategorizeBook({'id': self.book1_id},
+                                  [{'id': self.category0_id},
+                                   {'id': self.category1_id}])
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT cat_id FROM book_categories WHERE id = ?;',
+                      (self.book3_id,))
+            rows = list(c)
+
+        self.assertEqual([], rows)
+
+    def test_getCategories(self):
+        categories = db_layer.getCategories()
+        expected_categories = ['My special category',
+                               'My second special category']
+        self.assertCategories(expected_categories, categories)
+
+    def test_addCategory(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT category FROM categories;')
+            rows = list(c)
+
+        expected_categories = ['My special category',
+                               'My second special category']
+        self.assertEqual(frozenset(expected_categories),
+                         frozenset(row[0] for row in rows))
+        
+        db_layer.addCategory('My third special category')
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT category FROM categories;')
+            rows = list(c)
+
+        expected_categories.append('My third special category')
+        self.assertEqual(frozenset(expected_categories),
+                         frozenset(row[0] for row in rows))
+
+    def test_deleteCategories(self):
+        conn = sqlite3.connect(db_layer.dbFile)
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT COUNT(*) FROM categories;')
+            rows = list(c)
+
+        self.assertEqual(2, rows[0][0])
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT COUNT(*) FROM book_categories;')
+            rows = list(c)
+
+        self.assertEqual(4, rows[0][0])
+
+        db_layer.deleteCategories([str(self.category1_id)])
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT COUNT(*) FROM categories;')
+            rows = list(c)
+
+        self.assertEqual(1, rows[0][0])
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT COUNT(*) FROM book_categories;')
+            rows = list(c)
+
+        self.assertEqual(1, rows[0][0])
+
+class TestDBFunctionPermissions(unittest.TestCase):
+    def setUp(self):
+        db_layer.initializeDatabase()
+        # Need book in database or getBookByID will error when we test it
+        conn = sqlite3.connect(db_layer.dbFile)
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('INSERT INTO books (isbn) VALUES (5555555555555);')
+
+        with contextlib.closing(conn.cursor()) as c:
+            c.execute('SELECT rowid FROM books;')
+            self.dummy_book_id = c.fetchone()[0]
+
+    def tearDown(self):
+        try:
+            os.remove(db_layer.dbFile)
+        except FileNotFoundError:
+            pass
+
+    def verify_permissions(self, fn, authorized_groups, unauthorized_groups):
+        for group in authorized_groups:
+            permissions._CURRENT_GROUPS_GETTER = lambda: [group]
+            fn()
+
+        for group in unauthorized_groups:
+            permissions._CURRENT_GROUPS_GETTER = lambda: [group]
+            with self.assertRaises(exceptions.PermissionsError):
+                fn()
+
+    def test_db_function_permissions(self):
+        function_dicts = [
+                {'fn': lambda: db_layer.addBook({'isbn': 8888888888888}),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.updateBook({'isbn': 9999999999999}, 0),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.getBooks(),
+                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
+                 'unauthorized_groups': []},
+                {'fn': lambda: db_layer.getBooksByCategory('0'),
+                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
+                 'unauthorized_groups': []},
+                {'fn': lambda: db_layer.getRemovedBooks(),
+                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
+                 'unauthorized_groups': []},
+                {'fn': lambda: db_layer.getBookByID(self.dummy_book_id),
+                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
+                 'unauthorized_groups': []},
+                {'fn': lambda: db_layer.removeBook(0),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.removeBooks(['0']),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.restoreBooks(['0']),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.deleteBook(0),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.deleteBooks(['0']),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.getBookCategories('0'),
+                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
+                 'unauthorized_groups': []},
+                {'fn': lambda: db_layer.categorizeBook({'id':'0'}, []),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.uncategorizeBook({'id':'0'}, []),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.getCategories(),
+                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
+                 'unauthorized_groups': []},
+                {'fn': lambda: db_layer.addCategory('Cat5'),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+                {'fn': lambda: db_layer.deleteCategories([]),
+                 'authorized_groups': ['libcom'],
+                 'unauthorized_groups': ['office', 'unauthorized']},
+        ]
+
+        for function_dict in function_dicts:
+            self.verify_permissions(function_dict['fn'],
+                                    function_dict['authorized_groups'],
+                                    function_dict['unauthorized_groups'])
diff --git a/permissions.py b/permissions.py
new file mode 100644 (file)
index 0000000..232a05c
--- /dev/null
@@ -0,0 +1,34 @@
+import grp
+import os
+
+import exceptions
+
+class _PermissionLevel:
+    def __init__(self, group_name, pretty_name):
+        self.group_name = group_name
+        self.pretty_name = pretty_name
+
+PERMISSION_OFFICE = _PermissionLevel("office", "Office worker")
+PERMISSION_LIBCOM = _PermissionLevel("libcom", "Library Committee")
+
+def check_permissions(permission_level):
+    def decorator(fn):
+        def wrapped_function(*args, **kwargs):
+            if not has_permission(permission_level):
+                raise exceptions.PermissionsError(permission_level.pretty_name)
+            return fn(*args, **kwargs)
+
+        return wrapped_function
+
+    return decorator
+
+def has_permission(permission_level):
+    return permission_level.group_name in _CURRENT_GROUPS_GETTER()
+
+def _current_group_names():
+    group_ids = os.getgroups()
+    group_names = [grp.getgrgid(group_id).gr_name for group_id in group_ids]
+    return group_names
+
+# Hack to allow dependency injection for testing
+_CURRENT_GROUPS_GETTER = _current_group_names
diff --git a/run_tests.sh b/run_tests.sh
new file mode 100755 (executable)
index 0000000..82c42cc
--- /dev/null
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+
+librarian_dir="$(realpath "$(dirname "$0")")"
+librarian_tests_dir="${librarian_dir}/librarian_tests"
+
+export PYTHONPATH="${librarian_dir}"
+cd "${librarian_tests_dir}"
+
+python -m unittest "$@" test_*.py