Update from db 2.7.5.
[kopensolaris-gnu/glibc.git] / db2 / btree / bt_recno.c
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1997, 1998
5  *      Sleepycat Software.  All rights reserved.
6  */
7
8 #include "config.h"
9
10 #ifndef lint
11 static const char sccsid[] = "@(#)bt_recno.c    10.53 (Sleepycat) 12/11/98";
12 #endif /* not lint */
13
14 #ifndef NO_SYSTEM_INCLUDES
15 #include <sys/types.h>
16
17 #include <errno.h>
18 #include <limits.h>
19 #include <string.h>
20 #endif
21
22 #include "db_int.h"
23 #include "db_page.h"
24 #include "btree.h"
25 #include "db_ext.h"
26 #include "shqueue.h"
27 #include "db_shash.h"
28 #include "lock.h"
29 #include "lock_ext.h"
30
31 static int __ram_add __P((DBC *, db_recno_t *, DBT *, u_int32_t, u_int32_t));
32 static int __ram_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
33 static int __ram_fmap __P((DBC *, db_recno_t));
34 static int __ram_i_delete __P((DBC *));
35 static int __ram_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
36 static int __ram_source __P((DB *, RECNO *, const char *));
37 static int __ram_sync __P((DB *, u_int32_t));
38 static int __ram_update __P((DBC *, db_recno_t, int));
39 static int __ram_vmap __P((DBC *, db_recno_t));
40 static int __ram_writeback __P((DBC *));
41
42 /*
43  * In recno, there are two meanings to the on-page "deleted" flag.  If we're
44  * re-numbering records, it means the record was implicitly created.  We skip
45  * over implicitly created records if doing a cursor "next" or "prev", and
46  * return DB_KEYEMPTY if they're explicitly requested..  If not re-numbering
47  * records, it means that the record was implicitly created, or was deleted.
48  * We skip over implicitly created or deleted records if doing a cursor "next"
49  * or "prev", and return DB_KEYEMPTY if they're explicitly requested.
50  *
51  * If we're re-numbering records, then we have to detect in the cursor that
52  * a record was deleted, and adjust the cursor as necessary on the next get.
53  * If we're not re-numbering records, then we can detect that a record has
54  * been deleted by looking at the actual on-page record, so we completely
55  * ignore the cursor's delete flag.  This is different from the B+tree code.
56  * It also maintains whether the cursor references a deleted record in the
57  * cursor, and it doesn't always check the on-page value.
58  */
59 #define CD_SET(dbp, cp) {                                               \
60         if (F_ISSET(dbp, DB_RE_RENUMBER))                               \
61                 F_SET(cp, C_DELETED);                                   \
62 }
63 #define CD_CLR(dbp, cp) {                                               \
64         if (F_ISSET(dbp, DB_RE_RENUMBER))                               \
65                 F_CLR(cp, C_DELETED);                                   \
66 }
67 #define CD_ISSET(dbp, cp)                                               \
68         (F_ISSET(dbp, DB_RE_RENUMBER) && F_ISSET(cp, C_DELETED))
69
70 /*
71  * __ram_open --
72  *      Recno open function.
73  *
74  * PUBLIC: int __ram_open __P((DB *, DB_INFO *));
75  */
76 int
77 __ram_open(dbp, dbinfo)
78         DB *dbp;
79         DB_INFO *dbinfo;
80 {
81         BTREE *t;
82         DBC *dbc;
83         RECNO *rp;
84         int ret, t_ret;
85
86         /* Allocate and initialize the private btree structure. */
87         if ((ret = __os_calloc(1, sizeof(BTREE), &t)) != 0)
88                 return (ret);
89         dbp->internal = t;
90         __bam_setovflsize(dbp);
91
92         /* Allocate and initialize the private recno structure. */
93         if ((ret = __os_calloc(1, sizeof(*rp), &rp)) != 0)
94                 return (ret);
95         /* Link in the private recno structure. */
96         t->recno = rp;
97
98         /*
99          * Intention is to make sure all of the user's selections are okay
100          * here and then use them without checking.
101          */
102         if (dbinfo == NULL) {
103                 rp->re_delim = '\n';
104                 rp->re_pad = ' ';
105                 rp->re_fd = -1;
106                 F_SET(rp, RECNO_EOF);
107         } else {
108                 /*
109                  * If the user specified a source tree, open it and map it in.
110                  *
111                  * !!!
112                  * We don't complain if the user specified transactions or
113                  * threads.  It's possible to make it work, but you'd better
114                  * know what you're doing!
115                  */
116                 if (dbinfo->re_source == NULL) {
117                         rp->re_fd = -1;
118                         F_SET(rp, RECNO_EOF);
119                 } else {
120                         if ((ret =
121                             __ram_source(dbp, rp, dbinfo->re_source)) != 0)
122                         goto err;
123                 }
124
125                 /* Copy delimiter, length and padding values. */
126                 rp->re_delim =
127                     F_ISSET(dbp, DB_RE_DELIMITER) ? dbinfo->re_delim : '\n';
128                 rp->re_pad = F_ISSET(dbp, DB_RE_PAD) ? dbinfo->re_pad : ' ';
129
130                 if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
131                         if ((rp->re_len = dbinfo->re_len) == 0) {
132                                 __db_err(dbp->dbenv,
133                                     "record length must be greater than 0");
134                                 ret = EINVAL;
135                                 goto err;
136                         }
137                 } else
138                         rp->re_len = 0;
139         }
140
141         /* Initialize the remaining fields/methods of the DB. */
142         dbp->am_close = __ram_close;
143         dbp->del = __ram_delete;
144         dbp->put = __ram_put;
145         dbp->stat = __bam_stat;
146         dbp->sync = __ram_sync;
147
148         /* Start up the tree. */
149         if ((ret = __bam_read_root(dbp)) != 0)
150                 goto err;
151
152         /* Set the overflow page size. */
153         __bam_setovflsize(dbp);
154
155         /* If we're snapshotting an underlying source file, do it now. */
156         if (dbinfo != NULL && F_ISSET(dbinfo, DB_SNAPSHOT)) {
157                 /* Allocate a cursor. */
158                 if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
159                         goto err;
160
161                 /* Do the snapshot. */
162                 if ((ret = __ram_update(dbc,
163                     DB_MAX_RECORDS, 0)) != 0 && ret == DB_NOTFOUND)
164                         ret = 0;
165
166                 /* Discard the cursor. */
167                 if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
168                         ret = t_ret;
169
170                 if (ret != 0)
171                         goto err;
172         }
173
174         return (0);
175
176 err:    /* If we mmap'd a source file, discard it. */
177         if (rp->re_smap != NULL)
178                 (void)__db_unmapfile(rp->re_smap, rp->re_msize);
179
180         /* If we opened a source file, discard it. */
181         if (rp->re_fd != -1)
182                 (void)__os_close(rp->re_fd);
183         if (rp->re_source != NULL)
184                 __os_freestr(rp->re_source);
185
186         __os_free(rp, sizeof(*rp));
187
188         return (ret);
189 }
190
191 /*
192  * __ram_delete --
193  *      Recno db->del function.
194  */
195 static int
196 __ram_delete(dbp, txn, key, flags)
197         DB *dbp;
198         DB_TXN *txn;
199         DBT *key;
200         u_int32_t flags;
201 {
202         CURSOR *cp;
203         DBC *dbc;
204         db_recno_t recno;
205         int ret, t_ret;
206
207         DB_PANIC_CHECK(dbp);
208
209         /* Check for invalid flags. */
210         if ((ret = __db_delchk(dbp,
211             key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
212                 return (ret);
213
214         /* Acquire a cursor. */
215         if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
216                 return (ret);
217
218         DEBUG_LWRITE(dbc, txn, "ram_delete", key, NULL, flags);
219
220         /* Check the user's record number and fill in as necessary. */
221         if ((ret = __ram_getno(dbc, key, &recno, 0)) != 0)
222                 goto err;
223
224         /* Do the delete. */
225         cp = dbc->internal;
226         cp->recno = recno;
227         ret = __ram_i_delete(dbc);
228
229         /* Release the cursor. */
230 err:    if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
231                 ret = t_ret;
232
233         return (ret);
234 }
235
236 /*
237  * __ram_i_delete --
238  *      Internal version of recno delete, called by __ram_delete and
239  *      __ram_c_del.
240  */
241 static int
242 __ram_i_delete(dbc)
243         DBC *dbc;
244 {
245         BKEYDATA bk;
246         BTREE *t;
247         CURSOR *cp;
248         DB *dbp;
249         DBT hdr, data;
250         PAGE *h;
251         db_indx_t indx;
252         int exact, ret, stack;
253
254         dbp = dbc->dbp;
255         cp = dbc->internal;
256         t = dbp->internal;
257         stack = 0;
258
259         /*
260          * If this is CDB and this isn't a write cursor, then it's an error.
261          * If it is a write cursor, but we don't yet hold the write lock, then
262          * we need to upgrade to the write lock.
263          */
264         if (F_ISSET(dbp, DB_AM_CDB)) {
265                 /* Make sure it's a valid update cursor. */
266                 if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER))
267                         return (EINVAL);
268
269                 if (F_ISSET(dbc, DBC_RMW) &&
270                     (ret = lock_get(dbp->dbenv->lk_info, dbc->locker,
271                     DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
272                     &dbc->mylock)) != 0)
273                         return (EAGAIN);
274         }
275
276         /* Search the tree for the key; delete only deletes exact matches. */
277         if ((ret = __bam_rsearch(dbc, &cp->recno, S_DELETE, 1, &exact)) != 0)
278                 goto err;
279         if (!exact) {
280                 ret = DB_NOTFOUND;
281                 goto err;
282         }
283         stack = 1;
284
285         h = cp->csp->page;
286         indx = cp->csp->indx;
287
288         /*
289          * If re-numbering records, the on-page deleted flag can only mean
290          * that this record was implicitly created.  Applications aren't
291          * permitted to delete records they never created, return an error.
292          *
293          * If not re-numbering records, the on-page deleted flag means that
294          * this record was implicitly created, or, was deleted at some time.
295          * The former is an error because applications aren't permitted to
296          * delete records they never created, the latter is an error because
297          * if the record was "deleted", we could never have found it.
298          */
299         if (B_DISSET(GET_BKEYDATA(h, indx)->type)) {
300                 ret = DB_KEYEMPTY;
301                 goto err;
302         }
303
304         if (F_ISSET(dbp, DB_RE_RENUMBER)) {
305                 /* Delete the item, adjust the counts, adjust the cursors. */
306                 if ((ret = __bam_ditem(dbc, h, indx)) != 0)
307                         goto err;
308                 __bam_adjust(dbc, -1);
309                 __ram_ca(dbp, cp->recno, CA_DELETE);
310
311                 /*
312                  * If the page is empty, delete it.   The whole tree is locked
313                  * so there are no preparations to make.
314                  */
315                 if (NUM_ENT(h) == 0 && h->pgno != PGNO_ROOT) {
316                         stack = 0;
317                         ret = __bam_dpages(dbc);
318                 }
319         } else {
320                 /* Use a delete/put pair to replace the record with a marker. */
321                 if ((ret = __bam_ditem(dbc, h, indx)) != 0)
322                         goto err;
323
324                 B_TSET(bk.type, B_KEYDATA, 1);
325                 bk.len = 0;
326                 memset(&hdr, 0, sizeof(hdr));
327                 hdr.data = &bk;
328                 hdr.size = SSZA(BKEYDATA, data);
329                 memset(&data, 0, sizeof(data));
330                 data.data = (char *)"";
331                 data.size = 0;
332                 if ((ret = __db_pitem(dbc,
333                     h, indx, BKEYDATA_SIZE(0), &hdr, &data)) != 0)
334                         goto err;
335         }
336         F_SET(t->recno, RECNO_MODIFIED);
337
338 err:    if (stack)
339                 __bam_stkrel(dbc, 0);
340
341         /* If we upgraded the CDB lock upon entry; downgrade it now. */
342         if (F_ISSET(dbp, DB_AM_CDB) && F_ISSET(dbc, DBC_RMW))
343                 (void)__lock_downgrade(dbp->dbenv->lk_info, dbc->mylock,
344                     DB_LOCK_IWRITE, 0);
345         return (ret);
346 }
347
348 /*
349  * __ram_put --
350  *      Recno db->put function.
351  */
352 static int
353 __ram_put(dbp, txn, key, data, flags)
354         DB *dbp;
355         DB_TXN *txn;
356         DBT *key, *data;
357         u_int32_t flags;
358 {
359         DBC *dbc;
360         db_recno_t recno;
361         int ret, t_ret;
362
363         DB_PANIC_CHECK(dbp);
364
365         /* Check for invalid flags. */
366         if ((ret = __db_putchk(dbp,
367             key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), 0)) != 0)
368                 return (ret);
369
370         /* Allocate a cursor. */
371         if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
372                 return (ret);
373
374         DEBUG_LWRITE(dbc, txn, "ram_put", key, data, flags);
375
376         /*
377          * If we're appending to the tree, make sure we've read in all of
378          * the backing source file.  Otherwise, check the user's record
379          * number and fill in as necessary.
380          */
381         ret = flags == DB_APPEND ?
382             __ram_update(dbc, DB_MAX_RECORDS, 0) :
383             __ram_getno(dbc, key, &recno, 1);
384
385         /* Add the record. */
386         if (ret == 0)
387                 ret = __ram_add(dbc, &recno, data, flags, 0);
388
389         /* Discard the cursor. */
390         if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
391                 ret = t_ret;
392
393         /* Return the record number if we're appending to the tree. */
394         if (ret == 0 && flags == DB_APPEND)
395                 *(db_recno_t *)key->data = recno;
396
397         return (ret);
398 }
399
400 /*
401  * __ram_sync --
402  *      Recno db->sync function.
403  */
404 static int
405 __ram_sync(dbp, flags)
406         DB *dbp;
407         u_int32_t flags;
408 {
409         DBC *dbc;
410         int ret, t_ret;
411
412         /*
413          * Sync the underlying btree.
414          *
415          * !!!
416          * We don't need to do a panic check or flags check, the "real"
417          * sync function does all that for us.
418          */
419         if ((ret = __db_sync(dbp, flags)) != 0)
420                 return (ret);
421
422         /* Allocate a cursor. */
423         if ((ret = dbp->cursor(dbp, NULL, &dbc, 0)) != 0)
424                 return (ret);
425
426         DEBUG_LWRITE(dbc, NULL, "ram_sync", NULL, NULL, flags);
427
428         /* Copy back the backing source file. */
429         ret = __ram_writeback(dbc);
430
431         /* Discard the cursor. */
432         if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
433                 ret = t_ret;
434
435         return (ret);
436 }
437
438 /*
439  * __ram_close --
440  *      Recno db->close function.
441  *
442  * PUBLIC: int __ram_close __P((DB *));
443  */
444 int
445 __ram_close(dbp)
446         DB *dbp;
447 {
448         RECNO *rp;
449
450         rp = ((BTREE *)dbp->internal)->recno;
451
452         /* Close any underlying mmap region. */
453         if (rp->re_smap != NULL)
454                 (void)__db_unmapfile(rp->re_smap, rp->re_msize);
455
456         /* Close any backing source file descriptor. */
457         if (rp->re_fd != -1)
458                 (void)__os_close(rp->re_fd);
459
460         /* Free any backing source file name. */
461         if (rp->re_source != NULL)
462                 __os_freestr(rp->re_source);
463
464         /* Free allocated memory. */
465         __os_free(rp, sizeof(RECNO));
466         ((BTREE *)dbp->internal)->recno = NULL;
467
468         /* Close the underlying btree. */
469         return (__bam_close(dbp));
470 }
471
472 /*
473  * __ram_c_del --
474  *      Recno cursor->c_del function.
475  *
476  * PUBLIC: int __ram_c_del __P((DBC *, u_int32_t));
477  */
478 int
479 __ram_c_del(dbc, flags)
480         DBC *dbc;
481         u_int32_t flags;
482 {
483         CURSOR *cp;
484         DB *dbp;
485         int ret;
486
487         dbp = dbc->dbp;
488         cp = dbc->internal;
489
490         DB_PANIC_CHECK(dbp);
491
492         /* Check for invalid flags. */
493         if ((ret = __db_cdelchk(dbp, flags,
494             F_ISSET(dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0)
495                 return (ret);
496
497         DEBUG_LWRITE(dbc, dbc->txn, "ram_c_del", NULL, NULL, flags);
498
499         /*
500          * If we are running CDB, this had better be either a write
501          * cursor or an immediate writer.
502          */
503         if (F_ISSET(dbp, DB_AM_CDB))
504                 if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER))
505                         return (EINVAL);
506
507         /*
508          * The semantics of cursors during delete are as follows: if record
509          * numbers are mutable (DB_RE_RENUMBER is set), deleting a record
510          * causes the cursor to automatically point to the record immediately
511          * following.  In this case it is possible to use a single cursor for
512          * repeated delete operations, without intervening operations.
513          *
514          * If record numbers are not mutable, then records are replaced with
515          * a marker containing a delete flag.  If the record referenced by
516          * this cursor has already been deleted, we will detect that as part
517          * of the delete operation, and fail.
518          */
519         return (__ram_i_delete(dbc));
520 }
521
522 /*
523  * __ram_c_get --
524  *      Recno cursor->c_get function.
525  *
526  * PUBLIC: int __ram_c_get __P((DBC *, DBT *, DBT *, u_int32_t));
527  */
528 int
529 __ram_c_get(dbc, key, data, flags)
530         DBC *dbc;
531         DBT *key, *data;
532         u_int32_t flags;
533 {
534         CURSOR *cp, copy;
535         DB *dbp;
536         PAGE *h;
537         db_indx_t indx;
538         int exact, ret, stack, tmp_rmw;
539
540         dbp = dbc->dbp;
541         cp = dbc->internal;
542
543         DB_PANIC_CHECK(dbp);
544
545         /* Check for invalid flags. */
546         if ((ret = __db_cgetchk(dbc->dbp,
547             key, data, flags, cp->recno != RECNO_OOB)) != 0)
548                 return (ret);
549
550         /* Clear OR'd in additional bits so we can check for flag equality. */
551         tmp_rmw = 0;
552         if (LF_ISSET(DB_RMW)) {
553                 if (!F_ISSET(dbp, DB_AM_CDB)) {
554                         tmp_rmw = 1;
555                         F_SET(dbc, DBC_RMW);
556                 }
557                 LF_CLR(DB_RMW);
558         }
559
560         DEBUG_LREAD(dbc, dbc->txn, "ram_c_get",
561             flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
562
563         /* Initialize the cursor for a new retrieval. */
564         copy = *cp;
565
566 retry:  /* Update the record number. */
567         stack = 0;
568         switch (flags) {
569         case DB_CURRENT:
570                 /*
571                  * If record numbers are mutable: if we just deleted a record,
572                  * there is no action necessary, we return the record following
573                  * the deleted item by virtue of renumbering the tree.
574                  */
575                 break;
576         case DB_NEXT:
577                 /*
578                  * If record numbers are mutable: if we just deleted a record,
579                  * we have to avoid incrementing the record number so that we
580                  * return the right record by virtue of renumbering the tree.
581                  */
582                 if (CD_ISSET(dbp, cp))
583                         break;
584
585                 if (cp->recno != RECNO_OOB) {
586                         ++cp->recno;
587                         break;
588                 }
589                 /* FALLTHROUGH */
590         case DB_FIRST:
591                 flags = DB_NEXT;
592                 cp->recno = 1;
593                 break;
594         case DB_PREV:
595                 if (cp->recno != RECNO_OOB) {
596                         if (cp->recno == 1) {
597                                 ret = DB_NOTFOUND;
598                                 goto err;
599                         }
600                         --cp->recno;
601                         break;
602                 }
603                 /* FALLTHROUGH */
604         case DB_LAST:
605                 flags = DB_PREV;
606                 if (((ret = __ram_update(dbc,
607                     DB_MAX_RECORDS, 0)) != 0) && ret != DB_NOTFOUND)
608                         goto err;
609                 if ((ret = __bam_nrecs(dbc, &cp->recno)) != 0)
610                         goto err;
611                 if (cp->recno == 0) {
612                         ret = DB_NOTFOUND;
613                         goto err;
614                 }
615                 break;
616         case DB_SET:
617         case DB_SET_RANGE:
618                 if ((ret = __ram_getno(dbc, key, &cp->recno, 0)) != 0)
619                         goto err;
620                 break;
621         }
622
623         /* Return the key if the user didn't give us one. */
624         if (flags != DB_SET && flags != DB_SET_RANGE &&
625             (ret = __db_retcopy(key, &cp->recno, sizeof(cp->recno),
626             &dbc->rkey.data, &dbc->rkey.ulen, dbp->db_malloc)) != 0)
627                 goto err;
628
629         /* Search the tree for the record. */
630         if ((ret = __bam_rsearch(dbc, &cp->recno,
631             F_ISSET(dbc, DBC_RMW) ? S_FIND_WR : S_FIND, 1, &exact)) != 0)
632                 goto err;
633         stack = 1;
634         if (!exact) {
635                 ret = DB_NOTFOUND;
636                 goto err;
637         }
638         h = cp->csp->page;
639         indx = cp->csp->indx;
640
641         /*
642          * If re-numbering records, the on-page deleted flag means this record
643          * was implicitly created.  If not re-numbering records, the on-page
644          * deleted flag means this record was implicitly created, or, it was
645          * deleted at some time.  Regardless, we skip such records if doing
646          * cursor next/prev operations, and fail if the application requested
647          * them explicitly.
648          */
649         if (B_DISSET(GET_BKEYDATA(h, indx)->type)) {
650                 if (flags == DB_NEXT || flags == DB_PREV) {
651                         (void)__bam_stkrel(dbc, 0);
652                         goto retry;
653                 }
654                 ret = DB_KEYEMPTY;
655                 goto err;
656         }
657
658         /* Return the data item. */
659         if ((ret = __db_ret(dbp,
660             h, indx, data, &dbc->rdata.data, &dbc->rdata.ulen)) != 0)
661                 goto err;
662
663         /* The cursor was reset, no further delete adjustment is necessary. */
664         CD_CLR(dbp, cp);
665
666 err:    if (stack)
667                 (void)__bam_stkrel(dbc, 0);
668
669         /* Release temporary lock upgrade. */
670         if (tmp_rmw)
671                 F_CLR(dbc, DBC_RMW);
672
673         if (ret != 0)
674                 *cp = copy;
675
676         return (ret);
677 }
678
679 /*
680  * __ram_c_put --
681  *      Recno cursor->c_put function.
682  *
683  * PUBLIC: int __ram_c_put __P((DBC *, DBT *, DBT *, u_int32_t));
684  */
685 int
686 __ram_c_put(dbc, key, data, flags)
687         DBC *dbc;
688         DBT *key, *data;
689         u_int32_t flags;
690 {
691         CURSOR *cp, copy;
692         DB *dbp;
693         int exact, ret;
694         void *arg;
695
696         dbp = dbc->dbp;
697         cp = dbc->internal;
698
699         DB_PANIC_CHECK(dbp);
700
701         if ((ret = __db_cputchk(dbc->dbp, key, data, flags,
702             F_ISSET(dbc->dbp, DB_AM_RDONLY), cp->recno != RECNO_OOB)) != 0)
703                 return (ret);
704
705         DEBUG_LWRITE(dbc, dbc->txn, "ram_c_put", NULL, data, flags);
706
707         /*
708          * If we are running CDB, this had better be either a write
709          * cursor or an immediate writer.  If it's a regular writer,
710          * that means we have an IWRITE lock and we need to upgrade
711          * it to a write lock.
712          */
713         if (F_ISSET(dbp, DB_AM_CDB)) {
714                 if (!F_ISSET(dbc, DBC_RMW | DBC_WRITER))
715                         return (EINVAL);
716
717                 if (F_ISSET(dbc, DBC_RMW) &&
718                     (ret = lock_get(dbp->dbenv->lk_info, dbc->locker,
719                     DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
720                     &dbc->mylock)) != 0)
721                         return (EAGAIN);
722         }
723
724         /* Initialize the cursor for a new retrieval. */
725         copy = *cp;
726
727         /*
728          * To split, we need a valid key for the page.  Since it's a cursor,
729          * we have to build one.
730          *
731          * The split code discards all short-term locks and stack pages.
732          */
733         if (0) {
734 split:          arg = &cp->recno;
735                 if ((ret = __bam_split(dbc, arg)) != 0)
736                         goto err;
737         }
738
739         if ((ret = __bam_rsearch(dbc, &cp->recno, S_INSERT, 1, &exact)) != 0)
740                 goto err;
741         if (!exact) {
742                 ret = DB_NOTFOUND;
743                 goto err;
744         }
745         if ((ret = __bam_iitem(dbc, &cp->csp->page,
746             &cp->csp->indx, key, data, flags, 0)) == DB_NEEDSPLIT) {
747                 if ((ret = __bam_stkrel(dbc, 0)) != 0)
748                         goto err;
749                 goto split;
750         }
751         if ((ret = __bam_stkrel(dbc, 0)) != 0)
752                 goto err;
753
754         switch (flags) {
755         case DB_AFTER:
756                 /* Adjust the cursors. */
757                 __ram_ca(dbp, cp->recno, CA_IAFTER);
758
759                 /* Set this cursor to reference the new record. */
760                 cp->recno = copy.recno + 1;
761                 break;
762         case DB_BEFORE:
763                 /* Adjust the cursors. */
764                 __ram_ca(dbp, cp->recno, CA_IBEFORE);
765
766                 /* Set this cursor to reference the new record. */
767                 cp->recno = copy.recno;
768                 break;
769         }
770
771         /* The cursor was reset, no further delete adjustment is necessary. */
772         CD_CLR(dbp, cp);
773
774 err:    if (F_ISSET(dbp, DB_AM_CDB) && F_ISSET(dbc, DBC_RMW))
775                 (void)__lock_downgrade(dbp->dbenv->lk_info, dbc->mylock,
776                     DB_LOCK_IWRITE, 0);
777
778         if (ret != 0)
779                 *cp = copy;
780
781         return (ret);
782 }
783
784 /*
785  * __ram_ca --
786  *      Adjust cursors.
787  *
788  * PUBLIC: void __ram_ca __P((DB *, db_recno_t, ca_recno_arg));
789  */
790 void
791 __ram_ca(dbp, recno, op)
792         DB *dbp;
793         db_recno_t recno;
794         ca_recno_arg op;
795 {
796         CURSOR *cp;
797         DBC *dbc;
798
799         /*
800          * Adjust the cursors.  See the comment in __bam_ca_delete().
801          */
802         DB_THREAD_LOCK(dbp);
803         for (dbc = TAILQ_FIRST(&dbp->active_queue);
804             dbc != NULL; dbc = TAILQ_NEXT(dbc, links)) {
805                 cp = dbc->internal;
806                 switch (op) {
807                 case CA_DELETE:
808                         if (recno > cp->recno)
809                                 --cp->recno;
810                         if (recno == cp->recno)
811                                 CD_SET(dbp, cp);
812                         break;
813                 case CA_IAFTER:
814                         if (recno > cp->recno)
815                                 ++cp->recno;
816                         break;
817                 case CA_IBEFORE:
818                         if (recno >= cp->recno)
819                                 ++cp->recno;
820                         break;
821                 }
822         }
823         DB_THREAD_UNLOCK(dbp);
824 }
825
826 /*
827  * __ram_getno --
828  *      Check the user's record number, and make sure we've seen it.
829  *
830  * PUBLIC: int __ram_getno __P((DBC *, const DBT *, db_recno_t *, int));
831  */
832 int
833 __ram_getno(dbc, key, rep, can_create)
834         DBC *dbc;
835         const DBT *key;
836         db_recno_t *rep;
837         int can_create;
838 {
839         DB *dbp;
840         db_recno_t recno;
841
842         dbp = dbc->dbp;
843
844         /* Check the user's record number. */
845         if ((recno = *(db_recno_t *)key->data) == 0) {
846                 __db_err(dbp->dbenv, "illegal record number of 0");
847                 return (EINVAL);
848         }
849         if (rep != NULL)
850                 *rep = recno;
851
852         /*
853          * Btree can neither create records nor read them in.  Recno can
854          * do both, see if we can find the record.
855          */
856         return (dbp->type == DB_RECNO ?
857             __ram_update(dbc, recno, can_create) : 0);
858 }
859
860 /*
861  * __ram_update --
862  *      Ensure the tree has records up to and including the specified one.
863  */
864 static int
865 __ram_update(dbc, recno, can_create)
866         DBC *dbc;
867         db_recno_t recno;
868         int can_create;
869 {
870         BTREE *t;
871         DB *dbp;
872         RECNO *rp;
873         db_recno_t nrecs;
874         int ret;
875
876         dbp = dbc->dbp;
877         t = dbp->internal;
878         rp = t->recno;
879
880         /*
881          * If we can't create records and we've read the entire backing input
882          * file, we're done.
883          */
884         if (!can_create && F_ISSET(rp, RECNO_EOF))
885                 return (0);
886
887         /*
888          * If we haven't seen this record yet, try to get it from the original
889          * file.
890          */
891         if ((ret = __bam_nrecs(dbc, &nrecs)) != 0)
892                 return (ret);
893         if (!F_ISSET(rp, RECNO_EOF) && recno > nrecs) {
894                 if ((ret = rp->re_irec(dbc, recno)) != 0)
895                         return (ret);
896                 if ((ret = __bam_nrecs(dbc, &nrecs)) != 0)
897                         return (ret);
898         }
899
900         /*
901          * If we can create records, create empty ones up to the requested
902          * record.
903          */
904         if (!can_create || recno <= nrecs + 1)
905                 return (0);
906
907         dbc->rdata.dlen = 0;
908         dbc->rdata.doff = 0;
909         dbc->rdata.flags = 0;
910         if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
911                 if (dbc->rdata.ulen < rp->re_len) {
912                         if ((ret =
913                             __os_realloc(&dbc->rdata.data, rp->re_len)) != 0) {
914                                 dbc->rdata.ulen = 0;
915                                 dbc->rdata.data = NULL;
916                                 return (ret);
917                         }
918                         dbc->rdata.ulen = rp->re_len;
919                 }
920                 dbc->rdata.size = rp->re_len;
921                 memset(dbc->rdata.data, rp->re_pad, rp->re_len);
922         } else
923                 dbc->rdata.size = 0;
924
925         while (recno > ++nrecs)
926                 if ((ret = __ram_add(dbc,
927                     &nrecs, &dbc->rdata, 0, BI_DELETED)) != 0)
928                         return (ret);
929         return (0);
930 }
931
932 /*
933  * __ram_source --
934  *      Load information about the backing file.
935  */
936 static int
937 __ram_source(dbp, rp, fname)
938         DB *dbp;
939         RECNO *rp;
940         const char *fname;
941 {
942         size_t size;
943         u_int32_t bytes, mbytes, oflags;
944         int ret;
945
946         /*
947          * !!!
948          * The caller has full responsibility for cleaning up on error --
949          * (it has to anyway, in case it fails after this routine succeeds).
950          */
951         if ((ret = __db_appname(dbp->dbenv,
952             DB_APP_DATA, NULL, fname, 0, NULL, &rp->re_source)) != 0)
953                 return (ret);
954
955         oflags = F_ISSET(dbp, DB_AM_RDONLY) ? DB_RDONLY : 0;
956         if ((ret =
957             __db_open(rp->re_source, oflags, oflags, 0, &rp->re_fd)) != 0) {
958                 __db_err(dbp->dbenv, "%s: %s", rp->re_source, strerror(ret));
959                 return (ret);
960         }
961
962         /*
963          * XXX
964          * We'd like to test to see if the file is too big to mmap.  Since we
965          * don't know what size or type off_t's or size_t's are, or the largest
966          * unsigned integral type is, or what random insanity the local C
967          * compiler will perpetrate, doing the comparison in a portable way is
968          * flatly impossible.  Hope that mmap fails if the file is too large.
969          */
970         if ((ret = __os_ioinfo(rp->re_source,
971             rp->re_fd, &mbytes, &bytes, NULL)) != 0) {
972                 __db_err(dbp->dbenv, "%s: %s", rp->re_source, strerror(ret));
973                 return (ret);
974         }
975         if (mbytes == 0 && bytes == 0) {
976                 F_SET(rp, RECNO_EOF);
977                 return (0);
978         }
979
980         size = mbytes * MEGABYTE + bytes;
981         if ((ret = __db_mapfile(rp->re_source,
982             rp->re_fd, (size_t)size, 1, &rp->re_smap)) != 0)
983                 return (ret);
984         rp->re_cmap = rp->re_smap;
985         rp->re_emap = (u_int8_t *)rp->re_smap + (rp->re_msize = size);
986         rp->re_irec = F_ISSET(dbp, DB_RE_FIXEDLEN) ?  __ram_fmap : __ram_vmap;
987         return (0);
988 }
989
990 /*
991  * __ram_writeback --
992  *      Rewrite the backing file.
993  */
994 static int
995 __ram_writeback(dbc)
996         DBC *dbc;
997 {
998         DB *dbp;
999         DBT key, data;
1000         RECNO *rp;
1001         db_recno_t keyno;
1002         ssize_t nw;
1003         int fd, ret, t_ret;
1004         u_int8_t delim, *pad;
1005
1006         dbp = dbc->dbp;
1007         rp = ((BTREE *)dbp->internal)->recno;
1008
1009         /* If the file wasn't modified, we're done. */
1010         if (!F_ISSET(rp, RECNO_MODIFIED))
1011                 return (0);
1012
1013         /* If there's no backing source file, we're done. */
1014         if (rp->re_source == NULL) {
1015                 F_CLR(rp, RECNO_MODIFIED);
1016                 return (0);
1017         }
1018
1019         /*
1020          * Read any remaining records into the tree.
1021          *
1022          * !!!
1023          * This is why we can't support transactions when applications specify
1024          * backing (re_source) files.  At this point we have to read in the
1025          * rest of the records from the file so that we can write all of the
1026          * records back out again, which could modify a page for which we'd
1027          * have to log changes and which we don't have locked.  This could be
1028          * partially fixed by taking a snapshot of the entire file during the
1029          * db_open(), or, since db_open() isn't transaction protected, as part
1030          * of the first DB operation.  But, if a checkpoint occurs then, the
1031          * part of the log holding the copy of the file could be discarded, and
1032          * that would make it impossible to recover in the face of disaster.
1033          * This could all probably be fixed, but it would require transaction
1034          * protecting the backing source file, i.e. mpool would have to know
1035          * about it, and we don't want to go there.
1036          */
1037         if ((ret =
1038             __ram_update(dbc, DB_MAX_RECORDS, 0)) != 0 && ret != DB_NOTFOUND)
1039                 return (ret);
1040
1041         /*
1042          * !!!
1043          * Close any underlying mmap region.  This is required for Windows NT
1044          * (4.0, Service Pack 2) -- if the file is still mapped, the following
1045          * open will fail.
1046          */
1047         if (rp->re_smap != NULL) {
1048                 (void)__db_unmapfile(rp->re_smap, rp->re_msize);
1049                 rp->re_smap = NULL;
1050         }
1051
1052         /* Get rid of any backing file descriptor, just on GP's. */
1053         if (rp->re_fd != -1) {
1054                 (void)__os_close(rp->re_fd);
1055                 rp->re_fd = -1;
1056         }
1057
1058         /* Open the file, truncating it. */
1059         if ((ret = __db_open(rp->re_source,
1060             DB_SEQUENTIAL | DB_TRUNCATE,
1061             DB_SEQUENTIAL | DB_TRUNCATE, 0, &fd)) != 0) {
1062                 __db_err(dbp->dbenv, "%s: %s", rp->re_source, strerror(ret));
1063                 return (ret);
1064         }
1065
1066         /*
1067          * We step through the records, writing each one out.  Use the record
1068          * number and the dbp->get() function, instead of a cursor, so we find
1069          * and write out "deleted" or non-existent records.
1070          */
1071         memset(&key, 0, sizeof(key));
1072         memset(&data, 0, sizeof(data));
1073         key.size = sizeof(db_recno_t);
1074         key.data = &keyno;
1075
1076         /*
1077          * We'll need the delimiter if we're doing variable-length records,
1078          * and the pad character if we're doing fixed-length records.
1079          */
1080         delim = rp->re_delim;
1081         if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
1082                 if ((ret = __os_malloc(rp->re_len, NULL, &pad)) != 0)
1083                         goto err;
1084                 memset(pad, rp->re_pad, rp->re_len);
1085         } else
1086                 COMPQUIET(pad, NULL);
1087         for (keyno = 1;; ++keyno) {
1088                 switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
1089                 case 0:
1090                         if ((ret =
1091                             __os_write(fd, data.data, data.size, &nw)) != 0)
1092                                 goto err;
1093                         if (nw != (ssize_t)data.size) {
1094                                 ret = EIO;
1095                                 goto err;
1096                         }
1097                         break;
1098                 case DB_KEYEMPTY:
1099                         if (F_ISSET(dbp, DB_RE_FIXEDLEN)) {
1100                                 if ((ret =
1101                                     __os_write(fd, pad, rp->re_len, &nw)) != 0)
1102                                         goto err;
1103                                 if (nw != (ssize_t)rp->re_len) {
1104                                         ret = EIO;
1105                                         goto err;
1106                                 }
1107                         }
1108                         break;
1109                 case DB_NOTFOUND:
1110                         ret = 0;
1111                         goto done;
1112                 }
1113                 if (!F_ISSET(dbp, DB_RE_FIXEDLEN)) {
1114                         if ((ret = __os_write(fd, &delim, 1, &nw)) != 0)
1115                                 goto err;
1116                         if (nw != 1) {
1117                                 ret = EIO;
1118                                 goto err;
1119                         }
1120                 }
1121         }
1122
1123 err:
1124 done:   /* Close the file descriptor. */
1125         if ((t_ret = __os_close(fd)) != 0 || ret == 0)
1126                 ret = t_ret;
1127
1128         if (ret == 0)
1129                 F_CLR(rp, RECNO_MODIFIED);
1130         return (ret);
1131 }
1132
1133 /*
1134  * __ram_fmap --
1135  *      Get fixed length records from a file.
1136  */
1137 static int
1138 __ram_fmap(dbc, top)
1139         DBC *dbc;
1140         db_recno_t top;
1141 {
1142         DB *dbp;
1143         DBT data;
1144         RECNO *rp;
1145         db_recno_t recno;
1146         u_int32_t len;
1147         u_int8_t *sp, *ep, *p;
1148         int ret;
1149
1150         if ((ret = __bam_nrecs(dbc, &recno)) != 0)
1151                 return (ret);
1152
1153         dbp = dbc->dbp;
1154         rp = ((BTREE *)(dbp->internal))->recno;
1155
1156         if (dbc->rdata.ulen < rp->re_len) {
1157                 if ((ret = __os_realloc(&dbc->rdata.data, rp->re_len)) != 0) {
1158                         dbc->rdata.ulen = 0;
1159                         dbc->rdata.data = NULL;
1160                         return (ret);
1161                 }
1162                 dbc->rdata.ulen = rp->re_len;
1163         }
1164
1165         memset(&data, 0, sizeof(data));
1166         data.data = dbc->rdata.data;
1167         data.size = rp->re_len;
1168
1169         sp = (u_int8_t *)rp->re_cmap;
1170         ep = (u_int8_t *)rp->re_emap;
1171         while (recno < top) {
1172                 if (sp >= ep) {
1173                         F_SET(rp, RECNO_EOF);
1174                         return (DB_NOTFOUND);
1175                 }
1176                 len = rp->re_len;
1177                 for (p = dbc->rdata.data;
1178                     sp < ep && len > 0; *p++ = *sp++, --len)
1179                         ;
1180
1181                 /*
1182                  * Another process may have read this record from the input
1183                  * file and stored it into the database already, in which
1184                  * case we don't need to repeat that operation.  We detect
1185                  * this by checking if the last record we've read is greater
1186                  * or equal to the number of records in the database.
1187                  *
1188                  * XXX
1189                  * We should just do a seek, since the records are fixed
1190                  * length.
1191                  */
1192                 if (rp->re_last >= recno) {
1193                         if (len != 0)
1194                                 memset(p, rp->re_pad, len);
1195
1196                         ++recno;
1197                         if ((ret = __ram_add(dbc, &recno, &data, 0, 0)) != 0)
1198                                 return (ret);
1199                 }
1200                 ++rp->re_last;
1201         }
1202         rp->re_cmap = sp;
1203         return (0);
1204 }
1205
1206 /*
1207  * __ram_vmap --
1208  *      Get variable length records from a file.
1209  */
1210 static int
1211 __ram_vmap(dbc, top)
1212         DBC *dbc;
1213         db_recno_t top;
1214 {
1215         DBT data;
1216         RECNO *rp;
1217         db_recno_t recno;
1218         u_int8_t *sp, *ep;
1219         int delim, ret;
1220
1221         rp = ((BTREE *)(dbc->dbp->internal))->recno;
1222
1223         if ((ret = __bam_nrecs(dbc, &recno)) != 0)
1224                 return (ret);
1225
1226         memset(&data, 0, sizeof(data));
1227
1228         delim = rp->re_delim;
1229
1230         sp = (u_int8_t *)rp->re_cmap;
1231         ep = (u_int8_t *)rp->re_emap;
1232         while (recno < top) {
1233                 if (sp >= ep) {
1234                         F_SET(rp, RECNO_EOF);
1235                         return (DB_NOTFOUND);
1236                 }
1237                 for (data.data = sp; sp < ep && *sp != delim; ++sp)
1238                         ;
1239
1240                 /*
1241                  * Another process may have read this record from the input
1242                  * file and stored it into the database already, in which
1243                  * case we don't need to repeat that operation.  We detect
1244                  * this by checking if the last record we've read is greater
1245                  * or equal to the number of records in the database.
1246                  */
1247                 if (rp->re_last >= recno) {
1248                         data.size = sp - (u_int8_t *)data.data;
1249                         ++recno;
1250                         if ((ret = __ram_add(dbc, &recno, &data, 0, 0)) != 0)
1251                                 return (ret);
1252                 }
1253                 ++rp->re_last;
1254                 ++sp;
1255         }
1256         rp->re_cmap = sp;
1257         return (0);
1258 }
1259
1260 /*
1261  * __ram_add --
1262  *      Add records into the tree.
1263  */
1264 static int
1265 __ram_add(dbc, recnop, data, flags, bi_flags)
1266         DBC *dbc;
1267         db_recno_t *recnop;
1268         DBT *data;
1269         u_int32_t flags, bi_flags;
1270 {
1271         BKEYDATA *bk;
1272         CURSOR *cp;
1273         DB *dbp;
1274         PAGE *h;
1275         db_indx_t indx;
1276         int exact, isdeleted, ret, stack;
1277
1278         dbp = dbc->dbp;
1279         cp = dbc->internal;
1280
1281 retry:  /* Find the slot for insertion. */
1282         if ((ret = __bam_rsearch(dbc, recnop,
1283             S_INSERT | (flags == DB_APPEND ? S_APPEND : 0), 1, &exact)) != 0)
1284                 return (ret);
1285         h = cp->csp->page;
1286         indx = cp->csp->indx;
1287         stack = 1;
1288
1289         /*
1290          * If re-numbering records, the on-page deleted flag means this record
1291          * was implicitly created.  If not re-numbering records, the on-page
1292          * deleted flag means this record was implicitly created, or, it was
1293          * deleted at some time.
1294          *
1295          * If DB_NOOVERWRITE is set and the item already exists in the tree,
1296          * return an error unless the item was either marked for deletion or
1297          * only implicitly created.
1298          */
1299         isdeleted = 0;
1300         if (exact) {
1301                 bk = GET_BKEYDATA(h, indx);
1302                 if (B_DISSET(bk->type))
1303                         isdeleted = 1;
1304                 else
1305                         if (flags == DB_NOOVERWRITE) {
1306                                 ret = DB_KEYEXIST;
1307                                 goto err;
1308                         }
1309         }
1310
1311         /*
1312          * Select the arguments for __bam_iitem() and do the insert.  If the
1313          * key is an exact match, or we're replacing the data item with a
1314          * new data item, replace the current item.  If the key isn't an exact
1315          * match, we're inserting a new key/data pair, before the search
1316          * location.
1317          */
1318         switch (ret = __bam_iitem(dbc,
1319             &h, &indx, NULL, data, exact ? DB_CURRENT : DB_BEFORE, bi_flags)) {
1320         case 0:
1321                 /*
1322                  * Don't adjust anything.
1323                  *
1324                  * If we inserted a record, no cursors need adjusting because
1325                  * the only new record it's possible to insert is at the very
1326                  * end of the tree.  The necessary adjustments to the internal
1327                  * page counts were made by __bam_iitem().
1328                  *
1329                  * If we overwrote a record, no cursors need adjusting because
1330                  * future DBcursor->get calls will simply return the underlying
1331                  * record (there's no adjustment made for the DB_CURRENT flag
1332                  * when a cursor get operation immediately follows a cursor
1333                  * delete operation, and the normal adjustment for the DB_NEXT
1334                  * flag is still correct).
1335                  */
1336                 break;
1337         case DB_NEEDSPLIT:
1338                 /* Discard the stack of pages and split the page. */
1339                 (void)__bam_stkrel(dbc, 0);
1340                 stack = 0;
1341
1342                 if ((ret = __bam_split(dbc, recnop)) != 0)
1343                         goto err;
1344
1345                 goto retry;
1346                 /* NOTREACHED */
1347         default:
1348                 goto err;
1349         }
1350
1351
1352 err:    if (stack)
1353                 __bam_stkrel(dbc, 0);
1354
1355         return (ret);
1356 }