25cfacc4d0bf075577196d373297f0890987bacc
[kopensolaris-gnu/glibc.git] / db2 / btree / bt_split.c
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 1996, 1997
5  *      Sleepycat Software.  All rights reserved.
6  */
7 /*
8  * Copyright (c) 1990, 1993, 1994, 1995, 1996
9  *      Keith Bostic.  All rights reserved.
10  */
11 /*
12  * Copyright (c) 1990, 1993, 1994, 1995
13  *      The Regents of the University of California.  All rights reserved.
14  *
15  * Redistribution and use in source and binary forms, with or without
16  * modification, are permitted provided that the following conditions
17  * are met:
18  * 1. Redistributions of source code must retain the above copyright
19  *    notice, this list of conditions and the following disclaimer.
20  * 2. Redistributions in binary form must reproduce the above copyright
21  *    notice, this list of conditions and the following disclaimer in the
22  *    documentation and/or other materials provided with the distribution.
23  * 3. All advertising materials mentioning features or use of this software
24  *    must display the following acknowledgement:
25  *      This product includes software developed by the University of
26  *      California, Berkeley and its contributors.
27  * 4. Neither the name of the University nor the names of its contributors
28  *    may be used to endorse or promote products derived from this software
29  *    without specific prior written permission.
30  *
31  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
32  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
33  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
34  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
35  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
36  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
37  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
38  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
39  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
40  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
41  * SUCH DAMAGE.
42  */
43
44 #include "config.h"
45
46 #ifndef lint
47 static const char sccsid[] = "@(#)bt_split.c    10.14 (Sleepycat) 9/3/97";
48 #endif /* not lint */
49
50 #ifndef NO_SYSTEM_INCLUDES
51 #include <sys/types.h>
52
53 #include <errno.h>
54 #include <limits.h>
55 #include <stdio.h>
56 #include <stdlib.h>
57 #include <string.h>
58 #endif
59
60 #include "db_int.h"
61 #include "db_page.h"
62 #include "btree.h"
63
64 static int __bam_page __P((DB *, EPG *, EPG *));
65 static int __bam_pinsert __P((DB *, EPG *, PAGE *, PAGE *));
66 static int __bam_psplit __P((DB *, EPG *, PAGE *, PAGE *, int));
67 static int __bam_root __P((DB *, EPG *));
68
69 /*
70  * __bam_split --
71  *      Split a page.
72  *
73  * PUBLIC: int __bam_split __P((DB *, void *));
74  */
75 int
76 __bam_split(dbp, arg)
77         DB *dbp;
78         void *arg;
79 {
80         BTREE *t;
81         enum { UP, DOWN } dir;
82         int exact, level, ret;
83
84         t = dbp->internal;
85
86         /*
87          * The locking protocol we use to avoid deadlock to acquire locks by
88          * walking down the tree, but we do it as lazily as possible, locking
89          * the root only as a last resort.  We expect all stack pages to have
90          * been discarded before we're called; we discard all short-term locks.
91          *
92          * When __bam_split is first called, we know that a leaf page was too
93          * full for an insert.  We don't know what leaf page it was, but we
94          * have the key/recno that caused the problem.  We call XX_search to
95          * reacquire the leaf page, but this time get both the leaf page and
96          * its parent, locked.  We then split the leaf page and see if the new
97          * internal key will fit into the parent page.  If it will, we're done.
98          *
99          * If it won't, we discard our current locks and repeat the process,
100          * only this time acquiring the parent page and its parent, locked.
101          * This process repeats until we succeed in the split, splitting the
102          * root page as the final resort.  The entire process then repeats,
103          * as necessary, until we split a leaf page.
104          *
105          * XXX
106          * A traditional method of speeding this up is to maintain a stack of
107          * the pages traversed in the original search.  You can detect if the
108          * stack is correct by storing the page's LSN when it was searched and
109          * comparing that LSN with the current one when it's locked during the
110          * split.  This would be an easy change for this code, but I have no
111          * numbers that indicate it's worthwhile.
112          */
113         for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
114                 /*
115                  * Acquire a page and its parent, locked.
116                  */
117                 if ((ret = (dbp->type == DB_BTREE ?
118                     __bam_search(dbp, arg, S_WRPAIR, level, NULL, &exact) :
119                     __bam_rsearch(dbp,
120                         (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
121                         return (ret);
122
123                 /* Split the page. */
124                 ret = t->bt_csp[0].page->pgno == PGNO_ROOT ?
125                     __bam_root(dbp, &t->bt_csp[0]) :
126                     __bam_page(dbp, &t->bt_csp[-1], &t->bt_csp[0]);
127
128                 switch (ret) {
129                 case 0:
130                         /* Once we've split the leaf page, we're done. */
131                         if (level == LEAFLEVEL)
132                                 return (0);
133
134                         /* Switch directions. */
135                         if (dir == UP)
136                                 dir = DOWN;
137                         break;
138                 case DB_NEEDSPLIT:
139                         /*
140                          * It's possible to fail to split repeatedly, as other
141                          * threads may be modifying the tree, or the page usage
142                          * is sufficiently bad that we don't get enough space
143                          * the first time.
144                          */
145                         if (dir == DOWN)
146                                 dir = UP;
147                         break;
148                 default:
149                         return (ret);
150                 }
151         }
152         /* NOTREACHED */
153 }
154
155 /*
156  * __bam_root --
157  *      Split the root page of a btree.
158  */
159 static int
160 __bam_root(dbp, cp)
161         DB *dbp;
162         EPG *cp;
163 {
164         BTREE *t;
165         PAGE *lp, *rp;
166         int ret;
167
168         t = dbp->internal;
169
170         /* Yeah, right. */
171         if (cp->page->level >= MAXBTREELEVEL)
172                 return (ENOSPC);
173
174         /* Create new left and right pages for the split. */
175         lp = rp = NULL;
176         if ((ret = __bam_new(dbp, TYPE(cp->page), &lp)) != 0 ||
177             (ret = __bam_new(dbp, TYPE(cp->page), &rp)) != 0)
178                 goto err;
179         P_INIT(lp, dbp->pgsize, lp->pgno,
180             PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
181             cp->page->level, TYPE(cp->page));
182         P_INIT(rp, dbp->pgsize, rp->pgno,
183             ISINTERNAL(cp->page) ?  PGNO_INVALID : lp->pgno, PGNO_INVALID,
184             cp->page->level, TYPE(cp->page));
185
186         /* Split the page. */
187         if ((ret = __bam_psplit(dbp, cp, lp, rp, 1)) != 0)
188                 goto err;
189
190         /* Log the change. */
191         if (DB_LOGGING(dbp)) {
192                 DBT __a;
193                 DB_LSN __lsn;
194                 memset(&__a, 0, sizeof(__a));
195                 __a.data = cp->page;
196                 __a.size = dbp->pgsize;
197                 ZERO_LSN(__lsn);
198                 if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbp->txn,
199                     &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
200                     PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &__lsn,
201                     &__a)) != 0)
202                         goto err;
203                 LSN(lp) = LSN(rp) = LSN(cp->page);
204         }
205
206         /* Clean up the new root page. */
207         if ((ret = (dbp->type == DB_RECNO ?
208             __ram_root(dbp, cp->page, lp, rp) :
209             __bam_broot(dbp, cp->page, lp, rp))) != 0)
210                 goto err;
211
212         /* Success -- write the real pages back to the store. */
213         (void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
214         (void)__BT_TLPUT(dbp, cp->lock);
215         (void)memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
216         (void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
217
218         ++t->lstat.bt_split;
219         ++t->lstat.bt_rootsplit;
220         return (0);
221
222 err:    if (lp != NULL)
223                 (void)__bam_free(dbp, lp);
224         if (rp != NULL)
225                 (void)__bam_free(dbp, rp);
226         (void)memp_fput(dbp->mpf, cp->page, 0);
227         (void)__BT_TLPUT(dbp, cp->lock);
228         return (ret);
229 }
230
231 /*
232  * __bam_page --
233  *      Split the non-root page of a btree.
234  */
235 static int
236 __bam_page(dbp, pp, cp)
237         DB *dbp;
238         EPG *pp, *cp;
239 {
240         BTREE *t;
241         DB_LOCK tplock;
242         PAGE *lp, *rp, *tp;
243         int ret;
244
245         t = dbp->internal;
246         lp = rp = tp = NULL;
247         ret = -1;
248
249         /* Create new right page for the split. */
250         if ((ret = __bam_new(dbp, TYPE(cp->page), &rp)) != 0)
251                 return (ret);
252         P_INIT(rp, dbp->pgsize, rp->pgno,
253             ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->pgno,
254             ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->next_pgno,
255             cp->page->level, TYPE(cp->page));
256
257         /* Create new left page for the split. */
258         if ((lp = (PAGE *)malloc(dbp->pgsize)) == NULL) {
259                 ret = ENOMEM;
260                 goto err;
261         }
262 #ifdef DEBUG
263         memset(lp, 0xff, dbp->pgsize);
264 #endif
265         P_INIT(lp, dbp->pgsize, cp->page->pgno,
266             ISINTERNAL(cp->page) ?  PGNO_INVALID : cp->page->prev_pgno,
267             ISINTERNAL(cp->page) ?  PGNO_INVALID : rp->pgno,
268             cp->page->level, TYPE(cp->page));
269         ZERO_LSN(lp->lsn);
270
271         /*
272          * Split right.
273          *
274          * Only the indices are sorted on the page, i.e., the key/data pairs
275          * aren't, so it's simpler to copy the data from the split page onto
276          * two new pages instead of copying half the data to the right page
277          * and compacting the left page in place.  Since the left page can't
278          * change, we swap the original and the allocated left page after the
279          * split.
280          */
281         if ((ret = __bam_psplit(dbp, cp, lp, rp, 0)) != 0)
282                 goto err;
283
284         /*
285          * Fix up the previous pointer of any leaf page following the split
286          * page.
287          *
288          * !!!
289          * There are interesting deadlock situations here as we write-lock a
290          * page that's not in our direct ancestry.  Consider a cursor walking
291          * through the leaf pages, that has the previous page read-locked and
292          * is waiting on a lock for the page we just split.  It will deadlock
293          * here.  If this is a problem, we can fail in the split; it's not a
294          * problem as the split will succeed after the cursor passes through
295          * the page we're splitting.
296          */
297         if (TYPE(cp->page) == P_LBTREE && rp->next_pgno != PGNO_INVALID) {
298                 if ((ret = __bam_lget(dbp,
299                     0, rp->next_pgno, DB_LOCK_WRITE, &tplock)) != 0)
300                         goto err;
301                 if ((ret = __bam_pget(dbp, &tp, &rp->next_pgno, 0)) != 0)
302                         goto err;
303         }
304
305         /* Insert the new pages into the parent page. */
306         if ((ret = __bam_pinsert(dbp, pp, lp, rp)) != 0)
307                 goto err;
308
309         /* Log the change. */
310         if (DB_LOGGING(dbp)) {
311                 DBT __a;
312                 DB_LSN __lsn;
313                 memset(&__a, 0, sizeof(__a));
314                 __a.data = cp->page;
315                 __a.size = dbp->pgsize;
316                 if (tp == NULL)
317                         ZERO_LSN(__lsn);
318                 if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbp->txn,
319                     &cp->page->lsn, 0, dbp->log_fileid, PGNO(cp->page),
320                     &LSN(cp->page), PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp),
321                     tp == NULL ? 0 : PGNO(tp),
322                     tp == NULL ? &__lsn : &LSN(tp), &__a)) != 0)
323                         goto err;
324
325                 LSN(lp) = LSN(rp) = LSN(cp->page);
326                 if (tp != NULL)
327                         LSN(tp) = LSN(cp->page);
328         }
329
330         /* Copy the allocated page into place. */
331         memcpy(cp->page, lp, LOFFSET(lp));
332         memcpy((u_int8_t *)cp->page + HOFFSET(lp),
333             (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
334         FREE(lp, dbp->pgsize);
335         lp = NULL;
336
337         /* Finish the next-page link. */
338         if (tp != NULL)
339                 tp->prev_pgno = rp->pgno;
340
341         /* Success -- write the real pages back to the store. */
342         (void)memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY);
343         (void)__BT_TLPUT(dbp, pp->lock);
344         (void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
345         (void)__BT_TLPUT(dbp, cp->lock);
346         (void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
347         if (tp != NULL) {
348                 (void)memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY);
349                 (void)__BT_TLPUT(dbp, tplock);
350         }
351         return (0);
352
353 err:    if (lp != NULL)
354                 FREE(lp, dbp->pgsize);
355         if (rp != NULL)
356                 (void)__bam_free(dbp, rp);
357         if (tp != NULL) {
358                 (void)memp_fput(dbp->mpf, tp, 0);
359                 (void)__BT_TLPUT(dbp, tplock);
360         }
361         (void)memp_fput(dbp->mpf, pp->page, 0);
362         (void)__BT_TLPUT(dbp, pp->lock);
363         (void)memp_fput(dbp->mpf, cp->page, 0);
364         (void)__BT_TLPUT(dbp, cp->lock);
365         return (ret);
366 }
367
368 /*
369  * __bam_broot --
370  *      Fix up the btree root page after it has been split.
371  *
372  * PUBLIC: int __bam_broot __P((DB *, PAGE *, PAGE *, PAGE *));
373  */
374 int
375 __bam_broot(dbp, rootp, lp, rp)
376         DB *dbp;
377         PAGE *rootp, *lp, *rp;
378 {
379         BINTERNAL bi, *child_bi;
380         BKEYDATA *child_bk;
381         DBT hdr, data;
382         int ret;
383
384         /*
385          * If the root page was a leaf page, change it into an internal page.
386          * We copy the key we split on (but not the key's data, in the case of
387          * a leaf page) to the new root page.
388          */
389         P_INIT(rootp, dbp->pgsize,
390             PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);
391
392         /*
393          * The btree comparison code guarantees that the left-most key on any
394          * level of the tree is never used, so it doesn't need to be filled in.
395          */
396         bi.len = 0;
397         B_TSET(bi.type, B_KEYDATA, 0);
398         bi.pgno = lp->pgno;
399         if (F_ISSET(dbp, DB_BT_RECNUM)) {
400                 bi.nrecs = __bam_total(lp);
401                 RE_NREC_SET(rootp, bi.nrecs);
402         }
403         memset(&hdr, 0, sizeof(hdr));
404         hdr.data = &bi;
405         hdr.size = SSZA(BINTERNAL, data);
406         memset(&data, 0, sizeof(data));
407         data.data = (char *)"";
408         data.size = 0;
409         if ((ret =
410             __db_pitem(dbp, rootp, 0, BINTERNAL_SIZE(0), &hdr, &data)) != 0)
411                 return (ret);
412
413         switch (TYPE(rp)) {
414         case P_IBTREE:
415                 /* Copy the first key of the child page onto the root page. */
416                 child_bi = GET_BINTERNAL(rp, 0);
417
418                 bi.len = child_bi->len;
419                 B_TSET(bi.type, child_bi->type, 0);
420                 bi.pgno = rp->pgno;
421                 if (F_ISSET(dbp, DB_BT_RECNUM)) {
422                         bi.nrecs = __bam_total(rp);
423                         RE_NREC_ADJ(rootp, bi.nrecs);
424                 }
425                 hdr.data = &bi;
426                 hdr.size = SSZA(BINTERNAL, data);
427                 data.data = child_bi->data;
428                 data.size = child_bi->len;
429                 if ((ret = __db_pitem(dbp, rootp, 1,
430                     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
431                         return (ret);
432
433                 /* Increment the overflow ref count. */
434                 if (B_TYPE(child_bi->type) == B_OVERFLOW && (ret =
435                     __db_ioff(dbp, ((BOVERFLOW *)(child_bi->data))->pgno)) != 0)
436                         return (ret);
437                 break;
438         case P_LBTREE:
439                 /* Copy the first key of the child page onto the root page. */
440                 child_bk = GET_BKEYDATA(rp, 0);
441                 switch (B_TYPE(child_bk->type)) {
442                 case B_KEYDATA:
443                         bi.len = child_bk->len;
444                         B_TSET(bi.type, child_bk->type, 0);
445                         bi.pgno = rp->pgno;
446                         if (F_ISSET(dbp, DB_BT_RECNUM)) {
447                                 bi.nrecs = __bam_total(rp);
448                                 RE_NREC_ADJ(rootp, bi.nrecs);
449                         }
450                         hdr.data = &bi;
451                         hdr.size = SSZA(BINTERNAL, data);
452                         data.data = child_bk->data;
453                         data.size = child_bk->len;
454                         if ((ret = __db_pitem(dbp, rootp, 1,
455                             BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
456                                 return (ret);
457                         break;
458                 case B_DUPLICATE:
459                 case B_OVERFLOW:
460                         bi.len = BOVERFLOW_SIZE;
461                         B_TSET(bi.type, child_bk->type, 0);
462                         bi.pgno = rp->pgno;
463                         if (F_ISSET(dbp, DB_BT_RECNUM)) {
464                                 bi.nrecs = __bam_total(rp);
465                                 RE_NREC_ADJ(rootp, bi.nrecs);
466                         }
467                         hdr.data = &bi;
468                         hdr.size = SSZA(BINTERNAL, data);
469                         data.data = child_bk;
470                         data.size = BOVERFLOW_SIZE;
471                         if ((ret = __db_pitem(dbp, rootp, 1,
472                             BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
473                                 return (ret);
474
475                         /* Increment the overflow ref count. */
476                         if (B_TYPE(child_bk->type) == B_OVERFLOW && (ret =
477                             __db_ioff(dbp, ((BOVERFLOW *)child_bk)->pgno)) != 0)
478                                 return (ret);
479                         break;
480                 default:
481                         return (__db_pgfmt(dbp, rp->pgno));
482                 }
483                 break;
484         default:
485                 return (__db_pgfmt(dbp, rp->pgno));
486         }
487         return (0);
488 }
489
490 /*
491  * __ram_root --
492  *      Fix up the recno root page after it has been split.
493  *
494  * PUBLIC: int __ram_root __P((DB *, PAGE *, PAGE *, PAGE *));
495  */
496 int
497 __ram_root(dbp, rootp, lp, rp)
498         DB *dbp;
499         PAGE *rootp, *lp, *rp;
500 {
501         DBT hdr;
502         RINTERNAL ri;
503         int ret;
504
505         /* Initialize the page. */
506         P_INIT(rootp, dbp->pgsize,
507             PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);
508
509         /* Initialize the header. */
510         memset(&hdr, 0, sizeof(hdr));
511         hdr.data = &ri;
512         hdr.size = RINTERNAL_SIZE;
513
514         /* Insert the left and right keys, set the header information. */
515         ri.pgno = lp->pgno;
516         ri.nrecs = __bam_total(lp);
517         if ((ret = __db_pitem(dbp, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
518                 return (ret);
519         RE_NREC_SET(rootp, ri.nrecs);
520         ri.pgno = rp->pgno;
521         ri.nrecs = __bam_total(rp);
522         if ((ret = __db_pitem(dbp, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
523                 return (ret);
524         RE_NREC_ADJ(rootp, ri.nrecs);
525         return (0);
526 }
527
528 /*
529  * __bam_pinsert --
530  *      Insert a new key into a parent page, completing the split.
531  */
532 static int
533 __bam_pinsert(dbp, parent, lchild, rchild)
534         DB *dbp;
535         EPG *parent;
536         PAGE *lchild, *rchild;
537 {
538         BINTERNAL bi, *child_bi;
539         BKEYDATA *child_bk, *tmp_bk;
540         BTREE *t;
541         DBT a, b, hdr, data;
542         PAGE *ppage;
543         RINTERNAL ri;
544         db_indx_t off;
545         db_recno_t nrecs;
546         u_int32_t n, nbytes, nksize;
547         int ret;
548
549         t = dbp->internal;
550         ppage = parent->page;
551
552         /* If handling record numbers, count records split to the right page. */
553         nrecs = dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM) ?
554             __bam_total(rchild) : 0;
555
556         /*
557          * Now we insert the new page's first key into the parent page, which
558          * completes the split.  The parent points to a PAGE and a page index
559          * offset, where the new key goes ONE AFTER the index, because we split
560          * to the right.
561          *
562          * XXX
563          * Some btree algorithms replace the key for the old page as well as
564          * the new page.  We don't, as there's no reason to believe that the
565          * first key on the old page is any better than the key we have, and,
566          * in the case of a key being placed at index 0 causing the split, the
567          * key is unavailable.
568          */
569         off = parent->indx + O_INDX;
570
571         /*
572          * Calculate the space needed on the parent page.
573          *
574          * Prefix trees: space hack used when inserting into BINTERNAL pages.
575          * Retain only what's needed to distinguish between the new entry and
576          * the LAST entry on the page to its left.  If the keys compare equal,
577          * retain the entire key.  We ignore overflow keys, and the entire key
578          * must be retained for the next-to-leftmost key on the leftmost page
579          * of each level, or the search will fail.  Applicable ONLY to internal
580          * pages that have leaf pages as children.  Further reduction of the
581          * key between pairs of internal pages loses too much information.
582          */
583         switch (TYPE(rchild)) {
584         case P_IBTREE:
585                 child_bi = GET_BINTERNAL(rchild, 0);
586                 nbytes = BINTERNAL_PSIZE(child_bi->len);
587
588                 if (P_FREESPACE(ppage) < nbytes)
589                         return (DB_NEEDSPLIT);
590
591                 /* Add a new record for the right page. */
592                 bi.len = child_bi->len;
593                 B_TSET(bi.type, child_bi->type, 0);
594                 bi.pgno = rchild->pgno;
595                 bi.nrecs = nrecs;
596                 memset(&hdr, 0, sizeof(hdr));
597                 hdr.data = &bi;
598                 hdr.size = SSZA(BINTERNAL, data);
599                 memset(&data, 0, sizeof(data));
600                 data.data = child_bi->data;
601                 data.size = child_bi->len;
602                 if ((ret = __db_pitem(dbp, ppage, off,
603                     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
604                         return (ret);
605
606                 /* Increment the overflow ref count. */
607                 if (B_TYPE(child_bi->type) == B_OVERFLOW && (ret =
608                     __db_ioff(dbp, ((BOVERFLOW *)(child_bi->data))->pgno)) != 0)
609                         return (ret);
610                 break;
611         case P_LBTREE:
612                 child_bk = GET_BKEYDATA(rchild, 0);
613                 switch (B_TYPE(child_bk->type)) {
614                 case B_KEYDATA:
615                         nbytes = BINTERNAL_PSIZE(child_bk->len);
616                         nksize = child_bk->len;
617                         if (t->bt_prefix == NULL)
618                                 goto noprefix;
619                         if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
620                                 goto noprefix;
621                         tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) - P_INDX);
622                         if (B_TYPE(tmp_bk->type) != B_KEYDATA)
623                                 goto noprefix;
624                         memset(&a, 0, sizeof(a));
625                         a.size = tmp_bk->len;
626                         a.data = tmp_bk->data;
627                         memset(&b, 0, sizeof(b));
628                         b.size = child_bk->len;
629                         b.data = child_bk->data;
630                         nksize = t->bt_prefix(&a, &b);
631                         if ((n = BINTERNAL_PSIZE(nksize)) < nbytes) {
632                                 t->lstat.bt_pfxsaved += nbytes - n;
633                                 nbytes = n;
634                         } else
635 noprefix:                       nksize = child_bk->len;
636
637                         if (P_FREESPACE(ppage) < nbytes)
638                                 return (DB_NEEDSPLIT);
639
640                         bi.len = nksize;
641                         B_TSET(bi.type, child_bk->type, 0);
642                         bi.pgno = rchild->pgno;
643                         bi.nrecs = nrecs;
644                         memset(&hdr, 0, sizeof(hdr));
645                         hdr.data = &bi;
646                         hdr.size = SSZA(BINTERNAL, data);
647                         memset(&data, 0, sizeof(data));
648                         data.data = child_bk->data;
649                         data.size = nksize;
650                         if ((ret = __db_pitem(dbp, ppage, off,
651                             BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
652                                 return (ret);
653                         break;
654                 case B_DUPLICATE:
655                 case B_OVERFLOW:
656                         nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);
657
658                         if (P_FREESPACE(ppage) < nbytes)
659                                 return (DB_NEEDSPLIT);
660
661                         bi.len = BOVERFLOW_SIZE;
662                         B_TSET(bi.type, child_bk->type, 0);
663                         bi.pgno = rchild->pgno;
664                         bi.nrecs = nrecs;
665                         memset(&hdr, 0, sizeof(hdr));
666                         hdr.data = &bi;
667                         hdr.size = SSZA(BINTERNAL, data);
668                         memset(&data, 0, sizeof(data));
669                         data.data = child_bk;
670                         data.size = BOVERFLOW_SIZE;
671                         if ((ret = __db_pitem(dbp, ppage, off,
672                             BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
673                                 return (ret);
674
675                         /* Increment the overflow ref count. */
676                         if (B_TYPE(child_bk->type) == B_OVERFLOW && (ret =
677                             __db_ioff(dbp, ((BOVERFLOW *)child_bk)->pgno)) != 0)
678                                 return (ret);
679                         break;
680                 default:
681                         return (__db_pgfmt(dbp, rchild->pgno));
682                 }
683                 break;
684         case P_IRECNO:
685         case P_LRECNO:
686                 nbytes = RINTERNAL_PSIZE;
687
688                 if (P_FREESPACE(ppage) < nbytes)
689                         return (DB_NEEDSPLIT);
690
691                 /* Add a new record for the right page. */
692                 memset(&hdr, 0, sizeof(hdr));
693                 hdr.data = &ri;
694                 hdr.size = RINTERNAL_SIZE;
695                 ri.pgno = rchild->pgno;
696                 ri.nrecs = nrecs;
697                 if ((ret = __db_pitem(dbp,
698                     ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
699                         return (ret);
700                 break;
701         default:
702                 return (__db_pgfmt(dbp, rchild->pgno));
703         }
704
705         /* Adjust the parent page's left page record count. */
706         if (dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) {
707                 /* Log the change. */
708                 if (DB_LOGGING(dbp) &&
709                     (ret = __bam_cadjust_log(dbp->dbenv->lg_info,
710                     dbp->txn, &LSN(ppage), 0, dbp->log_fileid,
711                     PGNO(ppage), &LSN(ppage), (u_int32_t)parent->indx,
712                     -(int32_t)nrecs, (int32_t)0)) != 0)
713                         return (ret);
714
715                 /* Update the left page count. */
716                 if (dbp->type == DB_RECNO)
717                         GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
718                 else
719                         GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
720         }
721
722         return (0);
723 }
724
725 /*
726  * __bam_psplit --
727  *      Do the real work of splitting the page.
728  */
729 static int
730 __bam_psplit(dbp, cp, lp, rp, cleft)
731         DB *dbp;
732         EPG *cp;
733         PAGE *lp, *rp;
734         int cleft;
735 {
736         BTREE *t;
737         PAGE *pp;
738         db_indx_t half, nbytes, off, splitp, top;
739         int adjust, cnt, isbigkey, ret;
740
741         t = dbp->internal;
742         pp = cp->page;
743         adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
744
745         /*
746          * If we're splitting the first (last) page on a level because we're
747          * inserting (appending) a key to it, it's likely that the data is
748          * sorted.  Moving a single item to the new page is less work and can
749          * push the fill factor higher than normal.  If we're wrong it's not
750          * a big deal, we'll just do the split the right way next time.
751          */
752         off = 0;
753         if (NEXT_PGNO(pp) == PGNO_INVALID &&
754             ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
755             (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
756                 off = NUM_ENT(cp->page) - adjust;
757         else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
758                 off = adjust;
759
760         ++t->lstat.bt_split;
761         if (off != 0) {
762                 ++t->lstat.bt_fastsplit;
763                 goto sort;
764         }
765
766         /*
767          * Split the data to the left and right pages.  Try not to split on
768          * an overflow key.  (Overflow keys on internal pages will slow down
769          * searches.)  Refuse to split in the middle of a set of duplicates.
770          *
771          * First, find the optimum place to split.
772          *
773          * It's possible to try and split past the last record on the page if
774          * there's a very large record at the end of the page.  Make sure this
775          * doesn't happen by bounding the check at the next-to-last entry on
776          * the page.
777          *
778          * Note, we try and split half the data present on the page.  This is
779          * because another process may have already split the page and left
780          * it half empty.  We don't try and skip the split -- we don't know
781          * how much space we're going to need on the page, and we may need up
782          * to half the page for a big item, so there's no easy test to decide
783          * if we need to split or not.  Besides, if two threads are inserting
784          * data into the same place in the database, we're probably going to
785          * need more space soon anyway.
786          */
787         top = NUM_ENT(pp) - adjust;
788         half = (dbp->pgsize - HOFFSET(pp)) / 2;
789         for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
790                 switch (TYPE(pp)) {
791                 case P_IBTREE:
792                         if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
793                                 nbytes +=
794                                    BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
795                         else
796                                 nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
797                         break;
798                 case P_LBTREE:
799                         if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
800                                 nbytes +=
801                                     BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
802                         else
803                                 nbytes += BOVERFLOW_SIZE;
804
805                         ++off;
806                         if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
807                                 nbytes +=
808                                     BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
809                         else
810                                 nbytes += BOVERFLOW_SIZE;
811                         break;
812                 case P_IRECNO:
813                         nbytes += RINTERNAL_SIZE;
814                         break;
815                 case P_LRECNO:
816                         nbytes += BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
817                         break;
818                 default:
819                         return (__db_pgfmt(dbp, pp->pgno));
820                 }
821 sort:   splitp = off;
822
823         /*
824          * Splitp is either at or just past the optimum split point.  If
825          * it's a big key, try and find something close by that's not.
826          */
827         if (TYPE(pp) == P_IBTREE)
828                 isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
829         else if (TYPE(pp) == P_LBTREE)
830                 isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
831         else
832                 isbigkey = 0;
833         if (isbigkey)
834                 for (cnt = 1; cnt <= 3; ++cnt) {
835                         off = splitp + cnt * adjust;
836                         if (off < (db_indx_t)NUM_ENT(pp) &&
837                             ((TYPE(pp) == P_IBTREE &&
838                             B_TYPE(GET_BINTERNAL(pp,off)->type) == B_KEYDATA) ||
839                             B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
840                                 splitp = off;
841                                 break;
842                         }
843                         if (splitp <= (db_indx_t)(cnt * adjust))
844                                 continue;
845                         off = splitp - cnt * adjust;
846                         if (TYPE(pp) == P_IBTREE ?
847                             B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
848                             B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
849                                 splitp = off;
850                                 break;
851                         }
852                 }
853
854         /*
855          * We can't split in the middle a set of duplicates.  We know that
856          * no duplicate set can take up more than about 25% of the page,
857          * because that's the point where we push it off onto a duplicate
858          * page set.  So, this loop can't be unbounded.
859          */
860         if (F_ISSET(dbp, DB_AM_DUP) && TYPE(pp) == P_LBTREE &&
861             pp->inp[splitp] == pp->inp[splitp - adjust])
862                 for (cnt = 1;; ++cnt) {
863                         off = splitp + cnt * adjust;
864                         if (off < NUM_ENT(pp) &&
865                             pp->inp[splitp] != pp->inp[off]) {
866                                 splitp = off;
867                                 break;
868                         }
869                         if (splitp <= (db_indx_t)(cnt * adjust))
870                                 continue;
871                         off = splitp - cnt * adjust;
872                         if (pp->inp[splitp] != pp->inp[off]) {
873                                 splitp = off + adjust;
874                                 break;
875                         }
876                 }
877
878
879         /* We're going to split at splitp. */
880         if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
881                 return (ret);
882         if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
883                 return (ret);
884
885         /* Adjust the cursors. */
886         __bam_ca_split(dbp, pp->pgno, lp->pgno, rp->pgno, splitp, cleft);
887         return (0);
888 }
889
890 /*
891  * __bam_copy --
892  *      Copy a set of records from one page to another.
893  *
894  * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
895  */
896 int
897 __bam_copy(dbp, pp, cp, nxt, stop)
898         DB *dbp;
899         PAGE *pp, *cp;
900         u_int32_t nxt, stop;
901 {
902         db_indx_t dup, nbytes, off;
903
904         /*
905          * Copy the rest of the data to the right page.  Nxt is the next
906          * offset placed on the target page.
907          */
908         for (dup = off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
909                 switch (TYPE(pp)) {
910                 case P_IBTREE:
911                         if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
912                                 nbytes =
913                                     BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
914                         else
915                                 nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
916                         break;
917                 case P_LBTREE:
918                         /*
919                          * If we're on a key and it's a duplicate, just copy
920                          * the offset.
921                          */
922                         if (off != 0 && (nxt % P_INDX) == 0 &&
923                             pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
924                                 cp->inp[off] = cp->inp[off - P_INDX];
925                                 continue;
926                         }
927                         /* FALLTHROUGH */
928                 case P_LRECNO:
929                         if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
930                                 nbytes =
931                                     BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
932                         else
933                                 nbytes = BOVERFLOW_SIZE;
934                         break;
935                 case P_IRECNO:
936                         nbytes = RINTERNAL_SIZE;
937                         break;
938                 default:
939                         return (__db_pgfmt(dbp, pp->pgno));
940                 }
941                 cp->inp[off] = HOFFSET(cp) -= nbytes;
942                 memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
943         }
944         return (0);
945 }