/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 1996, 1997
 *	Sleepycat Software.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995, 1996
 *	Keith Bostic.  All rights reserved.
 */
/*
 * Copyright (c) 1990, 1993, 1994, 1995
 *	The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. All advertising materials mentioning features or use of this software
 *    must display the following acknowledgement:
 *	This product includes software developed by the University of
 *	California, Berkeley and its contributors.
 * 4. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
static const char sccsid[] = "@(#)bt_split.c	10.14 (Sleepycat) 9/3/97";

#ifndef NO_SYSTEM_INCLUDES
#include <sys/types.h>
#endif

static int __bam_page __P((DB *, EPG *, EPG *));
static int __bam_pinsert __P((DB *, EPG *, PAGE *, PAGE *));
static int __bam_psplit __P((DB *, EPG *, PAGE *, PAGE *, int));
static int __bam_root __P((DB *, EPG *));
/*
 * __bam_split --
 *	Split a page.
 *
 * PUBLIC: int __bam_split __P((DB *, void *));
 */
int
__bam_split(dbp, arg)
	DB *dbp;
	void *arg;
{
	BTREE *t;
	enum { UP, DOWN } dir;
	int exact, level, ret;

	t = dbp->internal;

	/*
	 * The locking protocol we use to avoid deadlock is to acquire locks
	 * by walking down the tree, but we do it as lazily as possible,
	 * locking the root only as a last resort.  We expect all stack pages
	 * to have been discarded before we're called; we discard all
	 * short-term locks.
	 *
	 * When __bam_split is first called, we know that a leaf page was too
	 * full for an insert.  We don't know what leaf page it was, but we
	 * have the key/recno that caused the problem.  We call XX_search to
	 * reacquire the leaf page, but this time get both the leaf page and
	 * its parent, locked.  We then split the leaf page and see if the new
	 * internal key will fit into the parent page.  If it will, we're done.
	 *
	 * If it won't, we discard our current locks and repeat the process,
	 * only this time acquiring the parent page and its parent, locked.
	 * This process repeats until we succeed in the split, splitting the
	 * root page as the final resort.  The entire process then repeats,
	 * as necessary, until we split a leaf page.
	 *
	 * A traditional method of speeding this up is to maintain a stack of
	 * the pages traversed in the original search.  You can detect if the
	 * stack is correct by storing the page's LSN when it was searched and
	 * comparing that LSN with the current one when it's locked during the
	 * split.  This would be an easy change for this code, but I have no
	 * numbers that indicate it's worthwhile.
	 */
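	/*
	 * A sketch of that stack-caching idea, for illustration only; the
	 * structure and names below are hypothetical, not part of this code:
	 *
	 *	struct cached_epg {
	 *		db_pgno_t pgno;		page visited by the search
	 *		DB_LSN lsn;		the page's LSN at search time
	 *	};
	 *
	 * Store one cached_epg per level during the original search.  When
	 * the split re-locks a page, compare the saved LSN with the page's
	 * current LSN; reuse the cached path on a match, and fall back to a
	 * full search on a mismatch.
	 */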
	for (dir = UP, level = LEAFLEVEL;;
	    dir == UP ? ++level : --level) {
		/* Acquire a page and its parent, locked. */
		if ((ret = (dbp->type == DB_BTREE ?
		    __bam_search(dbp, arg, S_WRPAIR, level, NULL, &exact) :
		    __bam_rsearch(dbp,
		    (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
			return (ret);

		/* Split the page. */
		ret = t->bt_csp[0].page->pgno == PGNO_ROOT ?
		    __bam_root(dbp, &t->bt_csp[0]) :
		    __bam_page(dbp, &t->bt_csp[-1], &t->bt_csp[0]);

		switch (ret) {
		case 0:
			/* Once we've split the leaf page, we're done. */
			if (level == LEAFLEVEL)
				return (0);

			/* Switch directions. */
			if (dir == UP)
				dir = DOWN;
			break;
		case DB_NEEDSPLIT:
			/*
			 * It's possible to fail to split repeatedly, as other
			 * threads may be modifying the tree, or the page usage
			 * is sufficiently bad that we don't get enough space
			 * the first time.
			 */
			if (dir == DOWN)
				dir = UP;
			break;
		default:
			return (ret);
		}
	}
	/* NOTREACHED */
}
/*
 * __bam_root --
 *	Split the root page of a btree.
 */
static int
__bam_root(dbp, cp)
	DB *dbp;
	EPG *cp;
{
	BTREE *t;
	PAGE *lp, *rp;
	int ret;

	t = dbp->internal;

	if (cp->page->level >= MAXBTREELEVEL)
		return (ENOSPC);
	/* Create new left and right pages for the split. */
	lp = rp = NULL;
	if ((ret = __bam_new(dbp, TYPE(cp->page), &lp)) != 0 ||
	    (ret = __bam_new(dbp, TYPE(cp->page), &rp)) != 0)
		goto err;
	P_INIT(lp, dbp->pgsize, lp->pgno,
	    PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : lp->pgno, PGNO_INVALID,
	    cp->page->level, TYPE(cp->page));

	/* Split the page. */
	if ((ret = __bam_psplit(dbp, cp, lp, rp, 1)) != 0)
		goto err;

	/* Log the change. */
	if (DB_LOGGING(dbp)) {
		DBT __a;
		DB_LSN __lsn;
		memset(&__a, 0, sizeof(__a));
		__a.data = cp->page;
		__a.size = dbp->pgsize;
		ZERO_LSN(__lsn);
		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbp->txn,
		    &LSN(cp->page), 0, dbp->log_fileid, PGNO(lp), &LSN(lp),
		    PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &__lsn,
		    &__a)) != 0)
			goto err;
		LSN(lp) = LSN(rp) = LSN(cp->page);
	}

	/* Clean up the new root page. */
	if ((ret = (dbp->type == DB_RECNO ?
	    __ram_root(dbp, cp->page, lp, rp) :
	    __bam_broot(dbp, cp->page, lp, rp))) != 0)
		goto err;

	/* Success -- write the real pages back to the store. */
	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbp, cp->lock);
	(void)memp_fput(dbp->mpf, lp, DB_MPOOL_DIRTY);
	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);

	++t->lstat.bt_rootsplit;
	return (0);

err:	if (lp != NULL)
		(void)__bam_free(dbp, lp);
	if (rp != NULL)
		(void)__bam_free(dbp, rp);
	(void)memp_fput(dbp->mpf, cp->page, 0);
	(void)__BT_TLPUT(dbp, cp->lock);
	return (ret);
}
/*
 * __bam_page --
 *	Split the non-root page of a btree.
 */
static int
__bam_page(dbp, pp, cp)
	DB *dbp;
	EPG *pp, *cp;
{
	DB_LOCK tplock;
	PAGE *lp, *rp, *tp;
	int ret;

	lp = rp = tp = NULL;

	/* Create new right page for the split. */
	if ((ret = __bam_new(dbp, TYPE(cp->page), &rp)) != 0)
		goto err;
	P_INIT(rp, dbp->pgsize, rp->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->next_pgno,
	    cp->page->level, TYPE(cp->page));

	/* Create new left page for the split. */
	if ((lp = (PAGE *)malloc(dbp->pgsize)) == NULL) {
		ret = ENOMEM;
		goto err;
	}
	/* Fill the in-memory page with garbage to catch uninitialized use. */
	memset(lp, 0xff, dbp->pgsize);
	P_INIT(lp, dbp->pgsize, cp->page->pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : cp->page->prev_pgno,
	    ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
	    cp->page->level, TYPE(cp->page));
	/*
	 * Only the indices are sorted on the page, i.e., the key/data pairs
	 * aren't, so it's simpler to copy the data from the split page onto
	 * two new pages instead of copying half the data to the right page
	 * and compacting the left page in place.  Since the left page can't
	 * change, we swap the original and the allocated left page after the
	 * split.
	 */
	if ((ret = __bam_psplit(dbp, cp, lp, rp, 0)) != 0)
		goto err;

	/*
	 * Fix up the previous pointer of any leaf page following the split
	 * page.
	 *
	 * There are interesting deadlock situations here as we write-lock a
	 * page that's not in our direct ancestry.  Consider a cursor walking
	 * through the leaf pages, that has the previous page read-locked and
	 * is waiting on a lock for the page we just split.  It will deadlock
	 * here.  If this is a problem, we can fail in the split; it's not a
	 * problem as the split will succeed after the cursor passes through
	 * the page we're splitting.
	 */
	if (TYPE(cp->page) == P_LBTREE && rp->next_pgno != PGNO_INVALID) {
		if ((ret = __bam_lget(dbp,
		    0, rp->next_pgno, DB_LOCK_WRITE, &tplock)) != 0)
			goto err;
		if ((ret = __bam_pget(dbp, &tp, &rp->next_pgno, 0)) != 0)
			goto err;
	}

	/* Insert the new pages into the parent page. */
	if ((ret = __bam_pinsert(dbp, pp, lp, rp)) != 0)
		goto err;

	/* Log the change. */
	if (DB_LOGGING(dbp)) {
		DBT __a;
		DB_LSN __lsn;
		memset(&__a, 0, sizeof(__a));
		__a.data = cp->page;
		__a.size = dbp->pgsize;
		ZERO_LSN(__lsn);
		if ((ret = __bam_split_log(dbp->dbenv->lg_info, dbp->txn,
		    &cp->page->lsn, 0, dbp->log_fileid, PGNO(cp->page),
		    &LSN(cp->page), PGNO(rp), &LSN(rp), (u_int32_t)NUM_ENT(lp),
		    tp == NULL ? 0 : PGNO(tp),
		    tp == NULL ? &__lsn : &LSN(tp), &__a)) != 0)
			goto err;
		LSN(lp) = LSN(rp) = LSN(cp->page);
		if (tp != NULL)
			LSN(tp) = LSN(cp->page);
	}
	/* Copy the allocated page into place. */
	memcpy(cp->page, lp, LOFFSET(lp));
	memcpy((u_int8_t *)cp->page + HOFFSET(lp),
	    (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
	FREE(lp, dbp->pgsize);
	lp = NULL;

	/* Finish the next-page link. */
	if (tp != NULL)
		tp->prev_pgno = rp->pgno;

	/* Success -- write the real pages back to the store. */
	(void)memp_fput(dbp->mpf, pp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbp, pp->lock);
	(void)memp_fput(dbp->mpf, cp->page, DB_MPOOL_DIRTY);
	(void)__BT_TLPUT(dbp, cp->lock);
	(void)memp_fput(dbp->mpf, rp, DB_MPOOL_DIRTY);
	if (tp != NULL) {
		(void)memp_fput(dbp->mpf, tp, DB_MPOOL_DIRTY);
		(void)__BT_TLPUT(dbp, tplock);
	}
	return (0);

err:	if (lp != NULL)
		FREE(lp, dbp->pgsize);
	if (rp != NULL)
		(void)__bam_free(dbp, rp);
	if (tp != NULL) {
		(void)memp_fput(dbp->mpf, tp, 0);
		(void)__BT_TLPUT(dbp, tplock);
	}
	(void)memp_fput(dbp->mpf, pp->page, 0);
	(void)__BT_TLPUT(dbp, pp->lock);
	(void)memp_fput(dbp->mpf, cp->page, 0);
	(void)__BT_TLPUT(dbp, cp->lock);
	return (ret);
}
/*
 * __bam_broot --
 *	Fix up the btree root page after it has been split.
 *
 * PUBLIC: int __bam_broot __P((DB *, PAGE *, PAGE *, PAGE *));
 */
int
__bam_broot(dbp, rootp, lp, rp)
	DB *dbp;
	PAGE *rootp, *lp, *rp;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk;
	DBT hdr, data;
	int ret;
	/*
	 * If the root page was a leaf page, change it into an internal page.
	 * We copy the key we split on (but not the key's data, in the case of
	 * a leaf page) to the new root page.
	 */
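	/*
	 * Schematically (page numbers illustrative):
	 *
	 *	before:	[root: leaf, PGNO_ROOT]
	 *	after:	[root: internal, PGNO_ROOT]
	 *		     /			\
	 *	    [lp: new page]	[rp: new page]
	 *
	 * The root keeps its page number; its old items have already been
	 * divided between lp and rp by __bam_psplit.
	 */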
	P_INIT(rootp, dbp->pgsize,
	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);

	/*
	 * The btree comparison code guarantees that the left-most key on any
	 * level of the tree is never used, so it doesn't need to be filled in.
	 */
	bi.len = 0;
	B_TSET(bi.type, B_KEYDATA, 0);
	bi.pgno = lp->pgno;
	if (F_ISSET(dbp, DB_BT_RECNUM)) {
		bi.nrecs = __bam_total(lp);
		RE_NREC_SET(rootp, bi.nrecs);
	}
	memset(&hdr, 0, sizeof(hdr));
	hdr.data = &bi;
	hdr.size = SSZA(BINTERNAL, data);
	memset(&data, 0, sizeof(data));
	data.data = (char *)"";
	data.size = 0;
	if ((ret =
	    __db_pitem(dbp, rootp, 0, BINTERNAL_SIZE(0), &hdr, &data)) != 0)
		return (ret);
	switch (TYPE(rp)) {
	case P_IBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bi = GET_BINTERNAL(rp, 0);

		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rp->pgno;
		if (F_ISSET(dbp, DB_BT_RECNUM)) {
			bi.nrecs = __bam_total(rp);
			RE_NREC_ADJ(rootp, bi.nrecs);
		}
		hdr.size = SSZA(BINTERNAL, data);
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbp, rootp, 1,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW && (ret =
		    __db_ioff(dbp, ((BOVERFLOW *)(child_bi->data))->pgno)) != 0)
			return (ret);
		break;
	case P_LBTREE:
		/* Copy the first key of the child page onto the root page. */
		child_bk = GET_BKEYDATA(rp, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			bi.len = child_bk->len;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(dbp, DB_BT_RECNUM)) {
				bi.nrecs = __bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk->data;
			data.size = child_bk->len;
			if ((ret = __db_pitem(dbp, rootp, 1,
			    BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rp->pgno;
			if (F_ISSET(dbp, DB_BT_RECNUM)) {
				bi.nrecs = __bam_total(rp);
				RE_NREC_ADJ(rootp, bi.nrecs);
			}
			hdr.size = SSZA(BINTERNAL, data);
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = __db_pitem(dbp, rootp, 1,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW && (ret =
			    __db_ioff(dbp, ((BOVERFLOW *)child_bk)->pgno)) != 0)
				return (ret);
			break;
		default:
			return (__db_pgfmt(dbp, rp->pgno));
		}
		break;
	default:
		return (__db_pgfmt(dbp, rp->pgno));
	}
	return (0);
}
/*
 * __ram_root --
 *	Fix up the recno root page after it has been split.
 *
 * PUBLIC: int __ram_root __P((DB *, PAGE *, PAGE *, PAGE *));
 */
int
__ram_root(dbp, rootp, lp, rp)
	DB *dbp;
	PAGE *rootp, *lp, *rp;
{
	DBT hdr;
	RINTERNAL ri;
	int ret;

	/* Initialize the page. */
	P_INIT(rootp, dbp->pgsize,
	    PGNO_ROOT, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);
	/* Initialize the header. */
	memset(&hdr, 0, sizeof(hdr));
	hdr.data = &ri;
	hdr.size = RINTERNAL_SIZE;

	/* Insert the left and right keys, set the header information. */
	ri.pgno = lp->pgno;
	ri.nrecs = __bam_total(lp);
	if ((ret = __db_pitem(dbp, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_SET(rootp, ri.nrecs);
	ri.pgno = rp->pgno;
	ri.nrecs = __bam_total(rp);
	if ((ret = __db_pitem(dbp, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
		return (ret);
	RE_NREC_ADJ(rootp, ri.nrecs);
	return (0);
}
/*
 * __bam_pinsert --
 *	Insert a new key into a parent page, completing the split.
 */
static int
__bam_pinsert(dbp, parent, lchild, rchild)
	DB *dbp;
	EPG *parent;
	PAGE *lchild, *rchild;
{
	BINTERNAL bi, *child_bi;
	BKEYDATA *child_bk, *tmp_bk;
	BTREE *t;
	DBT a, b, hdr, data;
	PAGE *ppage;
	RINTERNAL ri;
	db_indx_t off;
	db_recno_t nrecs;
	u_int32_t n, nbytes, nksize;
	int ret;

	t = dbp->internal;
	ppage = parent->page;

	/* If handling record numbers, count records split to the right page. */
	nrecs = dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM) ?
	    __bam_total(rchild) : 0;
	/*
	 * Now we insert the new page's first key into the parent page, which
	 * completes the split.  The parent points to a PAGE and a page index
	 * offset, where the new key goes ONE AFTER the index, because we
	 * split to the right.
	 *
	 * Some btree algorithms replace the key for the old page as well as
	 * the new page.  We don't, as there's no reason to believe that the
	 * first key on the old page is any better than the key we have, and,
	 * in the case of a key being placed at index 0 causing the split, the
	 * key is unavailable.
	 */
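	/*
	 * For example, with illustrative indices: if the entry for the page
	 * we split sits at parent index 2, the key for the new right page is
	 * inserted at parent index 3, immediately after it.
	 */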
	off = parent->indx + O_INDX;
	/*
	 * Calculate the space needed on the parent page.
	 *
	 * Prefix trees: space hack used when inserting into BINTERNAL pages.
	 * Retain only what's needed to distinguish between the new entry and
	 * the LAST entry on the page to its left.  If the keys compare equal,
	 * retain the entire key.  We ignore overflow keys, and the entire key
	 * must be retained for the next-to-leftmost key on the leftmost page
	 * of each level, or the search will fail.  Applicable ONLY to internal
	 * pages that have leaf pages as children.  Further reduction of the
	 * key between pairs of internal pages loses too much information.
	 */
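	/*
	 * For illustration, a minimal prefix function of the kind bt_prefix
	 * may point at; a sketch only, with hypothetical naming -- the DB
	 * handle supplies the real function:
	 *
	 *	size_t
	 *	sample_prefix(a, b)
	 *		const DBT *a, *b;
	 *	{
	 *		size_t cnt, len;
	 *		u_int8_t *p1, *p2;
	 *
	 *		cnt = 1;
	 *		len = a->size > b->size ? b->size : a->size;
	 *		for (p1 = a->data, p2 = b->data;
	 *		    len-- > 0; ++p1, ++p2, ++cnt)
	 *			if (*p1 != *p2)
	 *				return (cnt);
	 *		return (a->size < b->size ? a->size + 1 : a->size);
	 *	}
	 *
	 * It returns how many leading bytes of b must be kept so that b still
	 * sorts after a; if the keys compare equal, the entire key is kept.
	 */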
	switch (TYPE(rchild)) {
	case P_IBTREE:
		child_bi = GET_BINTERNAL(rchild, 0);
		nbytes = BINTERNAL_PSIZE(child_bi->len);

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);

		/* Add a new record for the right page. */
		bi.len = child_bi->len;
		B_TSET(bi.type, child_bi->type, 0);
		bi.pgno = rchild->pgno;
		bi.nrecs = nrecs;
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &bi;
		hdr.size = SSZA(BINTERNAL, data);
		memset(&data, 0, sizeof(data));
		data.data = child_bi->data;
		data.size = child_bi->len;
		if ((ret = __db_pitem(dbp, ppage, off,
		    BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
			return (ret);

		/* Increment the overflow ref count. */
		if (B_TYPE(child_bi->type) == B_OVERFLOW && (ret =
		    __db_ioff(dbp, ((BOVERFLOW *)(child_bi->data))->pgno)) != 0)
			return (ret);
		break;
	case P_LBTREE:
		child_bk = GET_BKEYDATA(rchild, 0);
		switch (B_TYPE(child_bk->type)) {
		case B_KEYDATA:
			nbytes = BINTERNAL_PSIZE(child_bk->len);
			nksize = child_bk->len;
			if (t->bt_prefix == NULL)
				goto noprefix;
			if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
				goto noprefix;
			tmp_bk = GET_BKEYDATA(lchild, NUM_ENT(lchild) - P_INDX);
			if (B_TYPE(tmp_bk->type) != B_KEYDATA)
				goto noprefix;
			memset(&a, 0, sizeof(a));
			a.size = tmp_bk->len;
			a.data = tmp_bk->data;
			memset(&b, 0, sizeof(b));
			b.size = child_bk->len;
			b.data = child_bk->data;
			nksize = t->bt_prefix(&a, &b);
			if ((n = BINTERNAL_PSIZE(nksize)) < nbytes) {
				t->lstat.bt_pfxsaved += nbytes - n;
				nbytes = n;
			} else
noprefix:			nksize = child_bk->len;
			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);

			bi.len = nksize;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk->data;
			data.size = nksize;
			if ((ret = __db_pitem(dbp, ppage, off,
			    BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
				return (ret);
			break;
		case B_DUPLICATE:
		case B_OVERFLOW:
			nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);

			if (P_FREESPACE(ppage) < nbytes)
				return (DB_NEEDSPLIT);

			bi.len = BOVERFLOW_SIZE;
			B_TSET(bi.type, child_bk->type, 0);
			bi.pgno = rchild->pgno;
			bi.nrecs = nrecs;
			memset(&hdr, 0, sizeof(hdr));
			hdr.data = &bi;
			hdr.size = SSZA(BINTERNAL, data);
			memset(&data, 0, sizeof(data));
			data.data = child_bk;
			data.size = BOVERFLOW_SIZE;
			if ((ret = __db_pitem(dbp, ppage, off,
			    BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
				return (ret);

			/* Increment the overflow ref count. */
			if (B_TYPE(child_bk->type) == B_OVERFLOW && (ret =
			    __db_ioff(dbp, ((BOVERFLOW *)child_bk)->pgno)) != 0)
				return (ret);
			break;
		default:
			return (__db_pgfmt(dbp, rchild->pgno));
		}
		break;
	case P_IRECNO:
	case P_LRECNO:
		nbytes = RINTERNAL_PSIZE;

		if (P_FREESPACE(ppage) < nbytes)
			return (DB_NEEDSPLIT);

		/* Add a new record for the right page. */
		memset(&hdr, 0, sizeof(hdr));
		hdr.data = &ri;
		hdr.size = RINTERNAL_SIZE;
		ri.pgno = rchild->pgno;
		ri.nrecs = nrecs;
		if ((ret = __db_pitem(dbp,
		    ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
			return (ret);
		break;
	default:
		return (__db_pgfmt(dbp, rchild->pgno));
	}
	/* Adjust the parent page's left page record count. */
	if (dbp->type == DB_RECNO || F_ISSET(dbp, DB_BT_RECNUM)) {
		/* Log the change. */
		if (DB_LOGGING(dbp) &&
		    (ret = __bam_cadjust_log(dbp->dbenv->lg_info,
		    dbp->txn, &LSN(ppage), 0, dbp->log_fileid,
		    PGNO(ppage), &LSN(ppage), (u_int32_t)parent->indx,
		    -(int32_t)nrecs, (int32_t)0)) != 0)
			return (ret);

		/* Update the left page count. */
		if (dbp->type == DB_RECNO)
			GET_RINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
		else
			GET_BINTERNAL(ppage, parent->indx)->nrecs -= nrecs;
	}

	return (0);
}
/*
 * __bam_psplit --
 *	Do the real work of splitting the page.
 */
static int
__bam_psplit(dbp, cp, lp, rp, cleft)
	DB *dbp;
	EPG *cp;
	PAGE *lp, *rp;
	int cleft;
{
	BTREE *t;
	PAGE *pp;
	db_indx_t half, nbytes, off, splitp, top;
	int adjust, cnt, isbigkey, ret;

	t = dbp->internal;
	pp = cp->page;
	adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
	/*
	 * If we're splitting the first (last) page on a level because we're
	 * inserting (appending) a key to it, it's likely that the data is
	 * sorted.  Moving a single item to the new page is less work and can
	 * push the fill factor higher than normal.  If we're wrong it's not
	 * a big deal, we'll just do the split the right way next time.
	 */
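	/*
	 * For example, an application loading keys in sorted order always
	 * inserts at the end of the right-most leaf page, so moving a single
	 * item to the new right page leaves the old page nearly full instead
	 * of the half-full page a normal split would produce.
	 */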
	off = 0;
	if (NEXT_PGNO(pp) == PGNO_INVALID &&
	    ((ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page) - 1) ||
	    (!ISINTERNAL(pp) && cp->indx == NUM_ENT(cp->page))))
		off = NUM_ENT(cp->page) - adjust;
	else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
		off = adjust;

	if (off != 0) {
		++t->lstat.bt_fastsplit;
		goto fast;
	}
	/*
	 * Split the data to the left and right pages.  Try not to split on
	 * an overflow key.  (Overflow keys on internal pages will slow down
	 * searches.)  Refuse to split in the middle of a set of duplicates.
	 *
	 * First, find the optimum place to split.
	 *
	 * It's possible to try and split past the last record on the page if
	 * there's a very large record at the end of the page.  Make sure this
	 * doesn't happen by bounding the check at the next-to-last entry on
	 * the page.
	 *
	 * Note, we try and split half the data present on the page.  This is
	 * because another process may have already split the page and left
	 * it half empty.  We don't try and skip the split -- we don't know
	 * how much space we're going to need on the page, and we may need up
	 * to half the page for a big item, so there's no easy test to decide
	 * if we need to split or not.  Besides, if two threads are inserting
	 * data into the same place in the database, we're probably going to
	 * need more space soon anyway.
	 */
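	/*
	 * Worked example, with made-up numbers: on an 8KB page holding 6KB
	 * of items, half is 3KB; the loop below adds up each item's on-page
	 * size until the running total reaches 3KB, and that index becomes
	 * the candidate split point.
	 */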
	top = NUM_ENT(pp) - adjust;
	half = (dbp->pgsize - HOFFSET(pp)) / 2;
	for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BINTERNAL_SIZE(GET_BINTERNAL(pp, off)->len);
			else
				nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;

			++off;
			if (B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)
				nbytes +=
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			else
				nbytes += BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes += RINTERNAL_SIZE;
			break;
		case P_LRECNO:
			nbytes += BKEYDATA_SIZE(GET_BKEYDATA(pp, off)->len);
			break;
		default:
			return (__db_pgfmt(dbp, pp->pgno));
		}
fast:	splitp = off;
	/*
	 * Splitp is either at or just past the optimum split point.  If
	 * it's a big key, try and find something close by that's not.
	 */
	if (TYPE(pp) == P_IBTREE)
		isbigkey = B_TYPE(GET_BINTERNAL(pp, off)->type) != B_KEYDATA;
	else if (TYPE(pp) == P_LBTREE)
		isbigkey = B_TYPE(GET_BKEYDATA(pp, off)->type) != B_KEYDATA;
	else
		isbigkey = 0;
	if (isbigkey)
		for (cnt = 1; cnt <= 3; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < (db_indx_t)NUM_ENT(pp) &&
			    ((TYPE(pp) == P_IBTREE &&
			    B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA) ||
			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA)) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (TYPE(pp) == P_IBTREE ?
			    B_TYPE(GET_BINTERNAL(pp, off)->type) == B_KEYDATA :
			    B_TYPE(GET_BKEYDATA(pp, off)->type) == B_KEYDATA) {
				splitp = off;
				break;
			}
		}
	/*
	 * We can't split in the middle of a set of duplicates.  We know that
	 * no duplicate set can take up more than about 25% of the page,
	 * because that's the point where we push it off onto a duplicate
	 * page set.  So, this loop can't be unbounded.
	 */
	if (F_ISSET(dbp, DB_AM_DUP) && TYPE(pp) == P_LBTREE &&
	    pp->inp[splitp] == pp->inp[splitp - adjust])
		for (cnt = 1;; ++cnt) {
			off = splitp + cnt * adjust;
			if (off < NUM_ENT(pp) &&
			    pp->inp[splitp] != pp->inp[off]) {
				splitp = off;
				break;
			}
			if (splitp <= (db_indx_t)(cnt * adjust))
				continue;
			off = splitp - cnt * adjust;
			if (pp->inp[splitp] != pp->inp[off]) {
				splitp = off + adjust;
				break;
			}
		}
	/* We're going to split at splitp. */
	if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
		return (ret);
	if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
		return (ret);

	/* Adjust the cursors. */
	__bam_ca_split(dbp, pp->pgno, lp->pgno, rp->pgno, splitp, cleft);
	return (0);
}
/*
 * __bam_copy --
 *	Copy a set of records from one page to another.
 *
 * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
 */
int
__bam_copy(dbp, pp, cp, nxt, stop)
	DB *dbp;
	PAGE *pp, *cp;
	u_int32_t nxt, stop;
{
	db_indx_t dup, nbytes, off;

	/*
	 * Copy the rest of the data to the right page.  Nxt is the next
	 * index to be copied from the source page; off is the next index
	 * to be filled on the target page.
	 */
	for (dup = off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
		switch (TYPE(pp)) {
		case P_IBTREE:
			if (B_TYPE(GET_BINTERNAL(pp, nxt)->type) == B_KEYDATA)
				nbytes =
				    BINTERNAL_SIZE(GET_BINTERNAL(pp, nxt)->len);
			else
				nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
			break;
		case P_LBTREE:
			/*
			 * If we're on a key and it's a duplicate, just copy
			 * the offset.
			 */
			if (off != 0 && (nxt % P_INDX) == 0 &&
			    pp->inp[nxt] == pp->inp[nxt - P_INDX]) {
				cp->inp[off] = cp->inp[off - P_INDX];
				continue;
			}
			/* FALLTHROUGH */
		case P_LRECNO:
			if (B_TYPE(GET_BKEYDATA(pp, nxt)->type) == B_KEYDATA)
				nbytes =
				    BKEYDATA_SIZE(GET_BKEYDATA(pp, nxt)->len);
			else
				nbytes = BOVERFLOW_SIZE;
			break;
		case P_IRECNO:
			nbytes = RINTERNAL_SIZE;
			break;
		default:
			return (__db_pgfmt(dbp, pp->pgno));
		}
		cp->inp[off] = HOFFSET(cp) -= nbytes;
		memcpy(P_ENTRY(cp, off), P_ENTRY(pp, nxt), nbytes);
	}
	return (0);
}