checkout success/failure for integrity error, and database move
[library/.git] / librarian_tests / test_db_layer.py
1 import contextlib
2 import os
3 import sqlite3
4 import unittest
5
6 import db_layer
7 import exceptions
8 import permissions
9
class TestDBSetup(unittest.TestCase):
    """Tests for db_layer.initializeDatabase()."""

    def tearDown(self):
        """Remove the database file so each test starts without one."""
        try:
            os.remove(db_layer.dbFile)
        except FileNotFoundError:
            pass

    def test_db_setup(self):
        """The books table is only queryable after initializeDatabase()."""
        # closing() guarantees the handle is released before tearDown deletes
        # the file, even when an assertion below fails.
        with contextlib.closing(sqlite3.connect(db_layer.dbFile)) as conn:
            # Before initialization there is no books table to select from.
            with self.assertRaises(sqlite3.OperationalError):
                with contextlib.closing(conn.cursor()) as c:
                    c.execute('SELECT * FROM books;')

            db_layer.initializeDatabase()

            # The same query must now run without raising.
            with contextlib.closing(conn.cursor()) as c:
                c.execute('SELECT * FROM books;')

    def test_existing_db_file(self):
        """A second initializeDatabase() call keeps rows that already exist."""
        with contextlib.closing(sqlite3.connect(db_layer.dbFile)) as conn:
            db_layer.initializeDatabase()
            with contextlib.closing(conn.cursor()) as c:
                c.execute('INSERT INTO books (isbn) VALUES (1111111111);')
            conn.commit()

            # Re-initializing on top of an existing file must not wipe data.
            db_layer.initializeDatabase()
            with contextlib.closing(conn.cursor()) as c:
                c.execute('SELECT isbn FROM books;')
                rows = list(c)

        self.assertEqual([1111111111], [row[0] for row in rows])
42
class TestDBFunctions(unittest.TestCase):
    """Tests for the db_layer book and category functions.

    setUp() populates a fresh database with five books (one of them flagged
    deleted), two categories and four book/category links; each test then
    calls one db_layer function and checks either its return value or the
    resulting rows in the database.
    """

    maxDiff = None

    def assertISBNs(self, expected_isbns, book_dicts):
        """Assert that the set of 'isbn' values in book_dicts is as expected.

        Order and duplicates are ignored; a book without an 'isbn' key counts
        as None.
        """
        expected_isbns = frozenset(expected_isbns)
        actual_isbns = frozenset(book.get('isbn') for book in book_dicts)
        self.assertEqual(expected_isbns, actual_isbns)

    def assertCategories(self, expected_categories, book_dicts):
        """Assert that the set of 'category' values in book_dicts is as expected.

        Order and duplicates are ignored.
        """
        expected_categories = frozenset(expected_categories)
        actual_categories = frozenset(book['category'] for book in book_dicts)
        self.assertEqual(expected_categories, actual_categories)

    def _fetch_all(self, sql, params=()):
        """Run a query on a fresh connection and return all rows as a list.

        The connection is closed before returning so that no handle is left
        open when tearDown() removes the database file.
        """
        with contextlib.closing(sqlite3.connect(db_layer.dbFile)) as conn:
            with contextlib.closing(conn.cursor()) as c:
                c.execute(sql, params)
                return list(c)

    def setUp(self):
        """Create the database and load the fixture books and categories."""
        db_layer.initializeDatabase()
        # Make the permissions module report these groups for the current user.
        permissions._CURRENT_GROUPS_GETTER = lambda: ["office", "libcom"]

        with contextlib.closing(sqlite3.connect(db_layer.dbFile)) as conn:
            with contextlib.closing(conn.cursor()) as c:
                # book0 and book1: fully populated records.
                c.execute('''
INSERT INTO books (isbn, lccn, title, subtitle, authors, edition, publisher,
                   publish_year, publish_month, publish_location, pages,
                   pagination, weight)
VALUES
(1111111111111, 2222222222, 'Attack of the bad code', 'Return of the GOTO',
 'cdchawthorne', '1st', 'CSC Publishing', '1992', 'June', 'Waterloo, Canada',
 '496', 'xxvi, 493 p', '1 kg'),
(3333333333333, 4444444444, 'Star Wars VI--Return of the Jedi', 'Now in text!',
 'George Lucas', '2nd', 'Lucas Film', '2013', 'November', 'Somewhere, USA',
 '8128', 'xx, 8100 p', '10 kg');
                ''')

                # book2: isbn only; book3: lccn only; book4: flagged deleted.
                c.execute('''
INSERT INTO books (isbn, lccn, deleted) VALUES
(5555555555555, NULL, 0), (NULL, 6666666666, 0), (7777777777777, NULL, 1);
                ''')

                c.execute('''
INSERT INTO categories (category)
VALUES ('My special category'), ('My second special category');
                ''')

                # Remember the ids and timestamps the database assigned.
                c.execute('''SELECT rowid, isbn, lccn, last_updated FROM books;''')
                for row in c:
                    if row[1] == 1111111111111:
                        self.book0_id = row[0]
                        self.book0_last_updated = row[3]
                    elif row[1] == 3333333333333:
                        self.book1_id = row[0]
                        self.book1_last_updated = row[3]
                    elif row[1] == 5555555555555:
                        self.book2_id = row[0]
                        self.book2_last_updated = row[3]
                    elif row[2] == 6666666666:
                        self.book3_id = row[0]
                        self.book3_last_updated = row[3]
                    elif row[1] == 7777777777777:
                        self.book4_id = row[0]
                        self.book4_last_updated = row[3]
                    else:
                        self.fail("Unexpected data in DB during setup")

                c.execute('''SELECT rowid, category FROM categories;''')
                for row in c:
                    if row[1] == 'My special category':
                        self.category0_id = row[0]
                    elif row[1] == 'My second special category':
                        self.category1_id = row[0]

                # book1 -> both categories; book0 and book2 -> second category
                # only; book3 and book4 stay uncategorized.
                c.execute('''
INSERT INTO book_categories (id, cat_id) VALUES (?, ?), (?, ?), (?, ?), (?, ?);
                ''',
                          (self.book1_id, self.category0_id,
                           self.book1_id, self.category1_id,
                           self.book0_id, self.category1_id,
                           self.book2_id, self.category1_id,
                          )
                )

            conn.commit()

    def tearDown(self):
        """Remove the database file so the next test starts from scratch."""
        try:
            os.remove(db_layer.dbFile)
        except FileNotFoundError:
            pass

    def test_getBooks(self):
        """getBooks() returns full dicts for every book not flagged deleted."""
        books = db_layer.getBooks()

        expected_book0 = {
                'id': self.book0_id,
                'isbn': 1111111111111,
                'lccn': 2222222222,
                'title': 'Attack of the bad code',
                'subtitle': 'Return of the GOTO',
                'authors': 'cdchawthorne',
                'edition': '1st',
                'publisher': 'CSC Publishing',
                'publish year': '1992',
                'publish month': 'June',
                'publish location': 'Waterloo, Canada',
                'pages': '496',
                'pagination': 'xxvi, 493 p',
                'weight': '1 kg',
                'last updated': self.book0_last_updated,
        }

        found_book0 = False
        for book in books:
            if book.get('isbn') == 1111111111111:
                found_book0 = True
                self.assertEqual(book, expected_book0)

        self.assertTrue(found_book0, "getBooks() missing book0")

        # None stands for book3, which has no isbn; book4 (deleted) is absent.
        expected_isbns = [1111111111111, 3333333333333, 5555555555555, None]
        self.assertISBNs(expected_isbns, books)

    def test_getBooksByCategory(self):
        """getBooksByCategory() returns only the books linked to the category."""
        books = db_layer.getBooksByCategory(str(self.category0_id))
        expected_isbns = [3333333333333]
        self.assertISBNs(expected_isbns, books)

    def test_getRemovedBooks(self):
        """getRemovedBooks() returns only the book flagged deleted."""
        books = db_layer.getRemovedBooks()
        expected_isbns = [7777777777777]
        self.assertISBNs(expected_isbns, books)

    def test_addBook(self):
        """addBook() stores a new row carrying the given title."""
        db_layer.addBook({'isbn': 8888888888888, 'title': 'New book'})

        rows = self._fetch_all('''
SELECT title FROM books WHERE isbn = '8888888888888';
        ''')

        self.assertEqual(['New book'], [row[0] for row in rows])

    def test_updateBook(self):
        """updateBook() overwrites the title of the addressed book."""
        db_layer.updateBook({'title': 'Attack of the questionable code'},
                            str(self.book0_id))

        rows = self._fetch_all('SELECT title FROM books WHERE id = ?;',
                               (self.book0_id,))

        self.assertEqual(['Attack of the questionable code'],
                         [row[0] for row in rows])

    def test_getBookById(self):
        """getBookByID() returns the dict of the addressed book."""
        book = db_layer.getBookByID(self.book0_id)
        self.assertEqual('Attack of the bad code', book['title'])

    def test_removeBook(self):
        """removeBook() flips the deleted flag of one book from 0 to 1."""
        query = 'SELECT deleted FROM books WHERE id = ?;'
        params = (self.book1_id,)

        rows = self._fetch_all(query, params)
        self.assertEqual([0], [row[0] for row in rows])

        db_layer.removeBook(self.book1_id)

        rows = self._fetch_all(query, params)
        self.assertEqual([1], [row[0] for row in rows])

    def test_removeBooks(self):
        """removeBooks() flips the deleted flag of every listed book."""
        query = 'SELECT deleted FROM books WHERE id = ? OR id = ?;'
        params = (self.book0_id, self.book1_id)

        rows = self._fetch_all(query, params)
        self.assertEqual([0, 0], [row[0] for row in rows])

        db_layer.removeBooks([str(self.book0_id), str(self.book1_id)])

        rows = self._fetch_all(query, params)
        self.assertEqual([1, 1], [row[0] for row in rows])

    def test_deleteBook(self):
        """deleteBook() removes the addressed row from the books table."""
        query = 'SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;'
        params = (self.book0_id, self.book1_id)

        self.assertEqual(2, self._fetch_all(query, params)[0][0])

        db_layer.deleteBook(self.book0_id)
        db_layer.deleteBook(self.book1_id)

        self.assertEqual(0, self._fetch_all(query, params)[0][0])

    def test_deleteBooks(self):
        """deleteBooks() removes every listed row from the books table."""
        query = 'SELECT COUNT(*) FROM books WHERE id = ? OR id = ?;'
        params = (self.book0_id, self.book1_id)

        self.assertEqual(2, self._fetch_all(query, params)[0][0])

        db_layer.deleteBooks([str(self.book0_id), str(self.book1_id)])

        self.assertEqual(0, self._fetch_all(query, params)[0][0])

    def test_getBookCategories(self):
        """getBookCategories() returns exactly the categories linked to a book."""
        categories = db_layer.getBookCategories(str(self.book1_id))
        expected_categories = ['My special category',
                               'My second special category']
        self.assertCategories(expected_categories, categories)

        categories = db_layer.getBookCategories(str(self.book0_id))
        expected_categories = ['My second special category']
        self.assertCategories(expected_categories, categories)

        # book3 was never categorized in setUp().
        categories = db_layer.getBookCategories(str(self.book3_id))
        expected_categories = []
        self.assertCategories(expected_categories, categories)

    def test_categorizeBook(self):
        """categorizeBook() links the book to every given category."""
        query = 'SELECT cat_id FROM book_categories WHERE id = ?;'
        params = (self.book3_id,)

        self.assertEqual([], self._fetch_all(query, params))

        db_layer.categorizeBook({'id': self.book3_id},
                                [{'id': self.category0_id},
                                 {'id': self.category1_id}])

        rows = self._fetch_all(query, params)
        self.assertEqual(frozenset([self.category0_id, self.category1_id]),
                         frozenset(row[0] for row in rows))

    def test_uncategorizeBook(self):
        """uncategorizeBook() removes the book's links to the given categories."""
        query = 'SELECT cat_id FROM book_categories WHERE id = ?;'
        params = (self.book1_id,)

        rows = self._fetch_all(query, params)
        self.assertEqual(frozenset([self.category0_id, self.category1_id]),
                         frozenset(row[0] for row in rows))

        db_layer.uncategorizeBook({'id': self.book1_id},
                                  [{'id': self.category0_id},
                                   {'id': self.category1_id}])

        # Re-check the book that was just uncategorized (book1). Querying a
        # book that never had categories would pass even if nothing happened.
        self.assertEqual([], self._fetch_all(query, params))

    def test_getCategories(self):
        """getCategories() returns both fixture categories."""
        categories = db_layer.getCategories()
        expected_categories = ['My special category',
                               'My second special category']
        self.assertCategories(expected_categories, categories)

    def test_addCategory(self):
        """addCategory() adds one row to the categories table."""
        query = 'SELECT category FROM categories;'
        expected_categories = ['My special category',
                               'My second special category']

        rows = self._fetch_all(query)
        self.assertEqual(frozenset(expected_categories),
                         frozenset(row[0] for row in rows))

        db_layer.addCategory('My third special category')

        rows = self._fetch_all(query)
        expected_categories.append('My third special category')
        self.assertEqual(frozenset(expected_categories),
                         frozenset(row[0] for row in rows))

    def test_deleteCategories(self):
        """deleteCategories() removes the category and its book links."""
        count_categories = 'SELECT COUNT(*) FROM categories;'
        count_links = 'SELECT COUNT(*) FROM book_categories;'

        self.assertEqual(2, self._fetch_all(count_categories)[0][0])
        self.assertEqual(4, self._fetch_all(count_links)[0][0])

        db_layer.deleteCategories([str(self.category1_id)])

        self.assertEqual(1, self._fetch_all(count_categories)[0][0])
        # Three of the four fixture links pointed at category1; only the
        # book1 -> category0 link should remain.
        self.assertEqual(1, self._fetch_all(count_links)[0][0])
402
class TestDBFunctionPermissions(unittest.TestCase):
    """Checks which groups may call each db_layer function."""

    def setUp(self):
        """Create the database and store one book for getBookByID to find."""
        db_layer.initializeDatabase()
        # Need book in database or getBookByID will error when we test it
        with contextlib.closing(sqlite3.connect(db_layer.dbFile)) as conn:
            with contextlib.closing(conn.cursor()) as c:
                c.execute('INSERT INTO books (isbn) VALUES (5555555555555);')

            with contextlib.closing(conn.cursor()) as c:
                c.execute('SELECT rowid FROM books;')
                self.dummy_book_id = c.fetchone()[0]

            # Without a commit the insert is rolled back when the connection
            # closes, so connections opened later would never see the book.
            conn.commit()

    def tearDown(self):
        """Remove the database file so the next test starts from scratch."""
        try:
            os.remove(db_layer.dbFile)
        except FileNotFoundError:
            pass

    def verify_permissions(self, fn, authorized_groups, unauthorized_groups):
        """Call fn as each group and check that it is allowed or refused.

        fn is a zero-argument callable. For every group in authorized_groups
        it must return without raising; for every group in
        unauthorized_groups it must raise exceptions.PermissionsError. The
        current group is simulated by replacing
        permissions._CURRENT_GROUPS_GETTER.
        """
        for group in authorized_groups:
            # Bind group as a default so the getter keeps this iteration's
            # value instead of looking up the loop variable when called.
            permissions._CURRENT_GROUPS_GETTER = lambda group=group: [group]
            fn()

        for group in unauthorized_groups:
            permissions._CURRENT_GROUPS_GETTER = lambda group=group: [group]
            with self.assertRaises(exceptions.PermissionsError):
                fn()

    def test_db_function_permissions(self):
        """Functions that write need 'libcom'; functions that read are open to all."""
        function_dicts = [
                {'fn': lambda: db_layer.addBook({'isbn': 8888888888888}),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.updateBook({'isbn': 9999999999999}, 0),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.getBooks(),
                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
                 'unauthorized_groups': []},
                {'fn': lambda: db_layer.getBooksByCategory('0'),
                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
                 'unauthorized_groups': []},
                {'fn': lambda: db_layer.getRemovedBooks(),
                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
                 'unauthorized_groups': []},
                {'fn': lambda: db_layer.getBookByID(self.dummy_book_id),
                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
                 'unauthorized_groups': []},
                {'fn': lambda: db_layer.removeBook(0),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.removeBooks(['0']),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.restoreBooks(['0']),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.deleteBook(0),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.deleteBooks(['0']),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.getBookCategories('0'),
                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
                 'unauthorized_groups': []},
                {'fn': lambda: db_layer.categorizeBook({'id':'0'}, []),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.uncategorizeBook({'id':'0'}, []),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.getCategories(),
                 'authorized_groups': ['libcom', 'office', 'unauthorized'],
                 'unauthorized_groups': []},
                {'fn': lambda: db_layer.addCategory('Cat5'),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
                {'fn': lambda: db_layer.deleteCategories([]),
                 'authorized_groups': ['libcom'],
                 'unauthorized_groups': ['office', 'unauthorized']},
        ]

        for function_dict in function_dicts:
            self.verify_permissions(function_dict['fn'],
                                    function_dict['authorized_groups'],
                                    function_dict['unauthorized_groups'])
489                                     function_dict['authorized_groups'],
490                                     function_dict['unauthorized_groups'])