Boltdb学习笔记之二--数据结构

2022-03-17 00:00:00 数据 节点 返回 递归 分裂

在boltdb中,核心的数据结构当属B+树了。B+树是数据库或文件系统中常见的数据结构,它的特点是能够保证数据稳定有序,因为每个叶子节点的深度都相同,因此其插入和修改操作拥有较稳定的时间复杂度。

那么boltdb中B+树的节点是如何表示的呢?答案是node
node
对应page
在内存中的数据结构,node
序列化之后便是page
page
反序列化之后是node
。因为node
在内存中,且node
对应B+树的一个节点

在概念上,boltdb中的一个Bucket
对应一棵B+树。如下图所示

node

node
分为两种,branch node和leaf node。

branch node如下所示,需要注意的是,boltdb中的branch node与通常的B+树略有不同,通常B+树中branch node的key数量比子节点数量多1, 而boltdb中key和子节点是一一对应的,这里应该是出于简化设计的考虑。因此在boltdb的branch node中,子节点i
子树中的key应当介于key(i)
key(i+1)
之间,左闭右开。

leaf node如下所示。我们注意到叶子节点的数据除了key
value
之外,还有一个flags
,它的作用是为了区分普通数据和子Bucket
数据。当flags != 0
时,key为当前Bucket下的子Bucket名,value可反序列化为bucket
结构,其中记录着子Bucket对应B+树的根节点所在的page id。稍后在Bucket
一节中我们将会深入探讨。

node
代码如下:

type nodes []*node

// inode represents an internal node inside of a node.
// It can be used to point to elements in a page or point
// to an element which hasn't been added to a page yet.
type inode struct {
 flags uint32
 pgid  pgid
 key   []byte
 value []byte
}

type inodes []inode

// node represents an in-memory, deserialized page.
type node struct {
 bucket     *Bucket
 isLeaf     bool
 unbalanced bool
 spilled    bool
 key        []byte
 pgid       pgid
 parent     *node
 children   nodes
 inodes     inodes
}

node中的成员变量:

  • bucket
    : 当前节点所属的Bucket(直属bucket, 不包含祖先)
  • isLeaf
    : 当前节点是否为叶子节点
  • unbalanced
    : 表示该节点是否有可能被合并。如果从node
    中去掉了某对kv数据, 则unbalance从false变成true。合并策略见
  • spilled
    : 表示该节点是否已经被拆分。在一个写事务中,已拆分的节点不会再被拆分。当node
    中插入数据,导致node
    中数据大小超过一定阈值时,便会触发node
    的拆分,
  • pgid
    : 该node对应page的page id
  • parent
    : 当前节点的父节点
  • inodes
    : 本节点内的key/value数据对。对于叶子节点来说,key和value都存在。对于非叶子节点来说,只有key存在。
  • key
    inodes
    中所有key/value数据中小的key
  • children
    : 当前节点的所有子节点。如果当前节点是非叶子节点,则len(inodes) = len(children)
    ; 如果当前节点是叶子节点,则children为空

node
中的成员函数:

  • minKeys
    : node允许存在的少key数量(leaf node情况下为1。branch node情况下为2)
  • size
    : 返回node序列化成page之后的字节数,boltdb中以此判断当前节点是否需要分裂或合并
  • sizeLessThan
    : 判断node序列化之后大小是否小于给定大小。出于性能考虑,实现中做了提前返回,比直接调用node.size
    并判断是否小于给定大小要快
  • pageElementSize
    : 当前节点为leaf node时,返回leafPageElementSize
    , 即一个leafPageElement
    对象的长度;当前节点为branch node时,返回branchPageElementSize
    , 即branchPageElement
    对象的长度。
  • childAt
    : 返回第index
    个子节点。实现上,首先从inodes
    中拿到子节点的page id, 然后从bucket
    中根据page id进行索引,终返回子节点对应的node
    对象
  • childIndex
    : 输入child node, 从inodes
    数组中二分查找该child node的位置。注意这里之所以能使用二分查找是因为inodes
    中所有元素都是按照key进行升序排列的
  • numChildren
    : 返回当前节点的子节点数量
  • nextSibling
    : 当前节点的右兄弟节点,如果没有返回nil。实现上,首先调用node.childIndex
    判断当前节点在其父节点中的位置index
    ,然后返回父节点index+1
    位置的子节点
  • prevSibling
    : 当前节点的左兄弟节点,如果没有返回nil。实现上类似nextSibling

node的序列化: node -> page

node.write
: 将node对象序列化到page中

序列化步骤:

  • 根据node.isLeaf
    设置page.flags
  • 检查inodes
    数组长度是否超过上限0xFFFF
    ,超过则panic(因为B+树的分裂策略,这种情况不可能发生,除非有脏数据)
  • inodes
    中的数据复制到page.ptr
    所指向的缓冲区中。如果node是leaf node, 则page缓冲区中写入的是[]leafPageElement
    和所有key/value对。如果node是branch node, 则page缓冲区中写入的是[]branchPageElement
    和所有的key值。
 for i, item := range n.inodes {
  _assert(len(item.key) > "write: zero-length inode key")

  // Write the page element.
  if n.isLeaf {
   elem := p.leafPageElement(uint16(i))
   elem.pos = uint32(uintptr(unsafe.Pointer(&b[])) - uintptr(unsafe.Pointer(elem)))
   elem.flags = item.flags
   elem.ksize = uint32(len(item.key))
   elem.vsize = uint32(len(item.value))
  } else {
   elem := p.branchPageElement(uint16(i))
   elem.pos = uint32(uintptr(unsafe.Pointer(&b[])) - uintptr(unsafe.Pointer(elem)))
   elem.ksize = uint32(len(item.key))
   elem.pgid = item.pgid
   _assert(elem.pgid != p.id, "write: circular dependency occurred")
  }

        // If the length of key+value is larger than the max allocation size
  // then we need to reallocate the byte array pointer.
  //
  // See: https://github.com/boltdb/bolt/pull/335
  klen, vlen := len(item.key), len(item.value)
  if len(b) < klen+vlen {
   b = (*[maxAllocSize]byte)(unsafe.Pointer(&b[]))[:]
  }

  // Write data for the element to the end of the page.
  copy(b[:], item.key)
  b = b[klen:]
  copy(b[:], item.value)
  b = b[vlen:]
 }

node的反序列化:page -> node

node.read
: 从page对象初始化node,即page反序列化成node 反序列化步骤如下:

  • 复制page id到node中
  • 根据page.flags
    确定node.isLeaf
  • 将page缓冲区中的数据复制到node.inodes
    中。branch page还是leaf page两种情况下数据格式略有不同。
// read initializes the node from a page.
func (n *node) read(p *page) {
 n.pgid = p.id
 n.isLeaf = ((p.flags & leafPageFlag) != )
 n.inodes = make(inodes, int(p.count))

 for i := ; i < int(p.count); i++ {
  inode := &n.inodes[i]
  if n.isLeaf {
   elem := p.leafPageElement(uint16(i))
   inode.flags = elem.flags
   inode.key = elem.key()
   inode.value = elem.value()
  } else {
   elem := p.branchPageElement(uint16(i))
   inode.pgid = elem.pgid
   inode.key = elem.key()
  }
  _assert(len(inode.key) > "read: zero-length inode key")
 }

 // Save first key so we can find the node in the parent when we spill.
 if len(n.inodes) >  {
  n.key = n.inodes[].key
  _assert(len(n.key) > "read: zero-length node key")
 } else {
  n.key = nil
 }
}

插入数据

node.put
: 向当前节点中插入key/value
pageid
flags
这两个输入参数是相关的,当flags不为零时,表示这对kv是Bucket数据,此时pageid
是其bucket的root page
。当flags为零时,pageid
没有意义,此时这对kv就是普通数据。

实现上:

  • 首先从当前节点的inodes
    二分搜索 -key/value
    对插入位置index
  • 如果inodes
    中存在等值的key
    , 则覆盖其value
  • 如果inodes
    中不存在等值key
    , 则在index位置处插入一个inode
// put inserts a key/value.
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
    ...

 // Find insertion index.
 index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })

 // Add capacity and shift nodes if we don't have an exact match and need to insert.
 exact := (len(n.inodes) >  && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
 if !exact {
  n.inodes = append(n.inodes, inode{})
  copy(n.inodes[index+1:], n.inodes[index:])
 }

 inode := &n.inodes[index]
 inode.flags = flags
 inode.key = newKey
 inode.value = value
 inode.pgid = pgid
 _assert(len(inode.key) > "put: zero-length inode key")
}

删除数据

node.del
: 从当期节点中删除一对kv数据。后设置unbalanced为true, 表示本节点有可能触发合并

实现上:

  • 首先从inodes
    中二分查找key
    ,对应位置为index
  • inodes
    中删除index
    位置的inode
    对象
// del removes a key from the node.
func (n *node) del(key []byte) {
 // Find index of key.
 index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })

 // Exit if the key isn't found.
 if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
  return
 }

 // Delete inode from the node.
 n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)

 // Mark the node as needing rebalancing.
 n.unbalanced = true
}

解引用

node.dereference
: 将所有inodes
数据复制到Heap
上,避免对DB
mmap
缓冲区的引用,避免因为boltdb重新mmap
造成原有mmap
缓冲区失效。这个方法是递归的,对当前节点执行dereference
会递归对所有子节点调用dereference

释放

node.free
: 将当前节点对应的page
freelist
中释放。

Bucket

在Boltdb学习笔记之〇--概述中我们提到,Bucket类似于关系型数据库中的表,区别在于Bucket支持嵌套,一个Bucket中可以包含很多子Bucket。而不管是父Bucket还是子Bucket,每个Bucket都对应一个B+树,B+树中存储着该Bucket的所有数据和children Bucket。因为Bucket与B+树一一对应,boltdb中所有B+树操作都能在Bucket中找到对应接口。因此接下来我们以Bucket为载体总结B+树相关操作。

以下代码定义了bucket
Bucket
两种类型。在bucket
中,root
表示该Bucket对应的B+树的根节点page id,sequence
是一个单调递增的id, 对应一个Bucket。bucket
可序列化为[]byte
类型,作为leaf page中的value值。同样的,在leaf page中,如果某条leafPageElement
数据中flags != 0
, 我们也可读取其value值并将其反序列成bucket
对象。

// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
    root     pgid   // page id of the bucket's root-level page
    sequence uint64 // monotonically incrementing, used by NextSequence()
}

Bucket
bucket
组成,除此之外还有其他一些成员,注意这些字段仅存在于内存中,无法序列化到db文件中。

// Bucket represents a collection of key/value pairs inside the database.
type Bucket struct {
    *bucket
    tx       *Tx                // the associated transaction
    buckets  map[string]*Bucket // subbucket cache
    page     *page              // inline page reference
    rootNode *node              // materialized node for the root page.
    nodes    map[pgid]*node     // node cache

    // Sets the threshold for filling nodes when they split. By default,
    // the bucket will fill to 50% but it can be useful to increase this
    // amount if you know that your write workloads are mostly append-only.
    //
    // This is non-persisted across transactions so it must be set in every Tx.
    FillPercent float64
}

  • tx
    Bucket
    所关联的事务id
  • buckets
    : 缓存当前Bucket
    下的子Bucket
  • page
    : 仅当Bucket
    inline
    时有效,即本Bucket
    下的所有数据都可容纳于一个page中。
  • rootNode
    : 缓存当前Bucket对应的B+树根节点
  • nodes
    : 缓存Bucket中B+树的节点。
  • FillPercent
    : 设置Bucket对应的B+树中的分裂阈值。默认值为50%。

B+树相关操作

查询数据

Bucket.Get
会在当前Bucket
中搜索- key
对应的value, 如果B+树中找不到key, 或者key对应的value是子Bucket数据,则返回nil值。否则返回key对应的value值。

// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
func (b *Bucket) Get(key []byte) []byte {
 k, v, flags := b.Cursor().seek(key)

 // Return nil if this is a bucket.
 if (flags & bucketLeafFlag) !=  {
  return nil
 }

 // If our target node isn't the same key as what's passed in then return nil.
 if !bytes.Equal(key, k) {
  return nil
 }
 return v
}

以上b.Curosr().seek(key)
即完成了B+树中key的查找, Curosr.seek
代码如下

// seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used.
func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
 _assert(c.bucket.tx.db != nil"tx closed")

 // Start from root page/node and traverse to correct page.
 c.stack = c.stack[:]
 c.search(seek, c.bucket.root)
 ref := &c.stack[len(c.stack)-1]

 // If the cursor is pointing to the end of page/node then return nil.
 if ref.index >= ref.count() {
  return nilnil
 }

 // If this is a bucket then return a nil value.
 return c.keyValue()
}

Cursor.seek
中调用了Cursor.search
, 该函数会从B+树根节点开始递归搜索- key = seek
的数据。如下所示,Cursor.search

  • 首先根据page id获取B+树根节点对应的page
    node
    对象(如果Bucket中已经缓存了该page, 则node
    对象有效,否则为空)
  • 然后对page类型进行校验,必须是branch page或leaf page类型。换言之,page类型不能是meta page或freelist page
  • 接着进行递归搜索:如果n
    有效(对应节点已被Bucket缓存),则执行Cursor.searchNode
    ;否则执行Cursor.searchPage
  • 递归搜索的边界条件是到达B+树叶子节点,此时调用Cursor.nsearch
    在叶子节点中查找key, 记录下个大于等于目标key的位置。
// search recursively performs a binary search against a given page/node until it finds a given key.
func (c *Cursor) search(key []byte, pgid pgid) {
 p, n := c.bucket.pageNode(pgid)
 if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) ==  {
  panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags))
 }
 e := elemRef{page: p, node: n}
 c.stack = append(c.stack, e)

 // If we're on a leaf page/node then find the specific node.
 if e.isLeaf() {
  c.nsearch(key)
  return
 }

 if n != nil {
  c.searchNode(key, n)
  return
 }
 c.searchPage(key, p)
}

Cursor.searchNode
里,在节点n
inodes
数组中二分查找key
,找到后一个小于等于key
值的inode
的位置,接着在该位置对应的子节点中继续查找key

func (c *Cursor) searchNode(key []byte, n *node) {
 var exact bool
 index := sort.Search(len(n.inodes), func(i int) bool {
  // TODO(benbjohnson): Optimize this range search. It's a bit hacky right now.
  // sort.Search() finds the lowest index where f() != -1 but we need the highest index.
  ret := bytes.Compare(n.inodes[i].key, key)
  if ret ==  {
   exact = true
  }
  return ret != -1
 })
 if !exact && index >  {
  index--
 }
 c.stack[len(c.stack)-1].index = index

 // Recursively search to the next page.
 c.search(key, n.inodes[index].pgid)
}

插入数据

接口Bucket.Put
用于在当前Bucket
对应的B+树中插入一对key/value数据。如果key已存在,则用新value覆盖老value值,如果key不存在,则新增key/value对。

查询数据
这一节我们已经见到了Cursor.seek
这个接口,它的功能从Bucket
中搜索目标key。如果目标key存在,返回对应的key, value和flags(用于区分value是普通数据还是子Bucket数据)。如果目标key不存在,则返回大于目标key的小key, 及其对应的value、flags

// Put sets the value for a key in the bucket.
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
func (b *Bucket) Put(key []byte, value []byte) error {
 if b.tx.db == nil {
  return ErrTxClosed
 } else if !b.Writable() {
  return ErrTxNotWritable
 } else if len(key) ==  {
  return ErrKeyRequired
 } else if len(key) > MaxKeySize {
  return ErrKeyTooLarge
 } else if int64(len(value)) > MaxValueSize {
  return ErrValueTooLarge
 }

 // Move cursor to correct position.
 c := b.Cursor()
 k, _, flags := c.seek(key)

 // Return an error if there is an existing key with a bucket value.
 if bytes.Equal(key, k) && (flags&bucketLeafFlag) !=  {
  return ErrIncompatibleValue
 }

 // Insert into node.
 key = cloneBytes(key)
 c.node().put(key, key, value, )

 return nil
}

所以以上代码中,当Cursor.seek
返回之后,首先校验flags
, 如果为子Bucket数据返回ErrIncompatibleValue
错误。然后将key/value数据插入到Cursor
对象c
当前所在的节点上。

删除数据

Bucket.Delete
从当前Bucket对应的B+树中删除目标key的数据。如果目标key不存在则什么也不做

实现上:

  • 校验:事务中db是否为空,当前Bucket是否可写
  • 调用Cursor.seek
    寻找目标key在B+树上的位置。
  • 在B+树叶子节点上删除目标key
// Delete removes a key from the bucket.
// If the key does not exist then nothing is done and a nil error is returned.
// Returns an error if the bucket was created from a read-only transaction.
func (b *Bucket) Delete(key []byte) error {
 if b.tx.db == nil {
  return ErrTxClosed
 } else if !b.Writable() {
  return ErrTxNotWritable
 }

 // Move cursor to correct position.
 c := b.Cursor()
 _, _, flags := c.seek(key)

 // Return an error if there is already existing bucket value.
 if (flags & bucketLeafFlag) !=  {
  return ErrIncompatibleValue
 }

 // Delete the node if we have a matching key.
 c.node().del(key)

 return nil
}

节点分裂

随着用户不断向Bucket
中插入新数据,B+树的某些叶子节点可能会超过分裂阈值,从而触发分裂。

Bucket.spill
实现了当前Bucket
对应的B+树的分裂,以及所有子Bucket
对应的B+树的分裂(递归的)。其实现如下:

  • 首先遍历缓存中的所有子Bucket,对于每个子Bucket
    • 如果子Bucket小到可以inlineable
      , 则将子Bucket对应B+树中所有page释放,并将该子Bucket序列化到value中
    • 否则对子Bucket递归调用Bucket.spill
      ,并将更新后的Bucket header序列化到value中.
    • 在当前Bucket对应的B+树中更新子Bucket的value, key为子Bucket名
  • 然后对当前Bucket对应的B+树调用node.spill
    ,执行节点分裂
  • 更新当前Bucket对应的B+树的根节点(B+树节点分裂可能会生成新的根节点)
// spill writes all the nodes for this bucket to dirty pages.
func (b *Bucket) spill() error {
 // Spill all child buckets first.
 for name, child := range b.buckets {
  // If the child bucket is small enough and it has no child buckets then
  // write it inline into the parent bucket's page. Otherwise spill it
  // like a normal bucket and make the parent value a pointer to the page.
  var value []byte
  if child.inlineable() {
   child.free()
   value = child.write()
  } else {
   if err := child.spill(); err != nil {
    return err
   }

   // Update the child bucket header in this bucket.
   value = make([]byte, unsafe.Sizeof(bucket{}))
   var bucket = (*bucket)(unsafe.Pointer(&value[]))
   *bucket = *child.bucket
  }

  // Skip writing the bucket if there are no materialized nodes.
  if child.rootNode == nil {
   continue
  }

  // Update parent node.
  var c = b.Cursor()
  k, _, flags := c.seek([]byte(name))
  if !bytes.Equal([]byte(name), k) {
   panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k))
  }
  if flags&bucketLeafFlag ==  {
   panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
  }
  c.node().put([]byte(name), []byte(name), value, , bucketLeafFlag)
 }

 // Ignore if there's not a materialized root node.
 if b.rootNode == nil {
  return nil
 }

 // Spill nodes.
 if err := b.rootNode.spill(); err != nil {
  return err
 }
 b.rootNode = b.rootNode.root()

 // Update the root node for this bucket.
 if b.rootNode.pgid >= b.tx.meta.pgid {
  panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid))
 }
 b.root = b.rootNode.pgid

 return nil
}

从上面的分析可以看到B+树分裂的核心逻辑其实在node.spill
中,代码略长这里就不列出了,感兴趣的同学可查看源码。这里仅总结基本思路:

在对一个B+树节点执行分裂操作时:

  • 首先将当前节点的所有子节点按照node.key
    升序排序
  • 然后当前节点的所有子节点递归调用node.spill
    , 执行可能触发的分裂操作
  • 接着对当前节点调用node.split
    ,返回一组分裂后的新节点, 放置于当前节点的父节点下,用于替换当前节点
  • 后处理边界条件:如果当前节点是B+树根节点,判断node.spill
    中是否生成了新的根节点,如果是则对新根节点执行node.spill

node.split
中, 将当前节点根据一定的策略分裂成一组新节点, 策略如下:

  1. 如果当前节点中len(n.inodes) <= minKeysPerPage * 2
    (分裂后每个page中key的数量不超过minKeysPerPage
    ),则不必分裂
  2. 如果当前节点序列化之后的大小小于pageSize
    (该节点可被一个page容纳), 则不必分裂
  3. 根据fillPercent
    计算出node分裂的大小阈值threshold
    fillPercent * pageSize
  4. 根据threshold
    ,找到当前node的分裂点(超过阈值的临界inode的位置)
  5. 如果当前node没有parent,即自身就是B+树根节点, 则创建一个新节点作为当前节点的父节点
  6. 在当前节点分裂点处一分为二,生成两个新节点left
    right
    , 接着对right
    节点重复执行1-6步

节点合并

当boltdb中的B+树不断删除数据之后,导致一些节点中的数据越来越小,此时需要对低于一定阈值的节点进行递归合并。代码中由Bucket.reblance
实现.

在该函数中,首先对所有缓存的node调用node.rebalance
执行合并操作,然后对所有的子Bucket递归调用Bucket.rebalance

// rebalance attempts to balance all nodes.
func (b *Bucket) rebalance() {
 for _, n := range b.nodes {
  n.rebalance()
 }
 for _, child := range b.buckets {
  child.rebalance()
 }
}

从上面的分析看到,节点合并的核心逻辑在node.rebalance
中, 代码较长,这里仅总结其合并策略:

  1. 如果当前节点的unbalanced
    为false, 说明该节点从在本次事务中未删除任何数据,那么无需对其合并
  2. 如果当前节点大小大于pageSize
    的1/4, 且其中key的数量大于node.minKeys()
    , 那么无需对其合并
  3. 除了1和2两种情况下,说明当前节点需要合并,合并步骤如下:
  4. 如果当前节点是B+树的根节点。按照以下两种情况细分
    • 如果当前节点非leaf node且子节点个数为1,将子节点的所有子节点放置于当前节点下,并去除子节点。
    • 如果当前节点是leaf node,直接返回
  5. 如果当前节点的子节点个数为零,则将其从父节点的inodes
    children
    中删除,然后对父节点重新执行rebalance
  6. 如果当前节点是其父节点的个子节点,则将右兄弟节点合并到当前节点中, 并对父节点执行rebalance
  7. 如果当前节点不是其父节点的个子节点,则将当前节点合并到其左兄弟节点中, 并对父节点执行rebalance

Bucket相关操作

创建子Bucket

Bucket.CreateBucket
: 在当前Bucket下创建子Bucket.

  • 首先进行校验:db是否有效,当前Bucket是否可写,待新建Bucket的name是否为空
  • 如果同名的子Bucket已存在,返回ErrBucketExists
    错误
  • 如果同名的key/value数据已存在,返回ErrIncompatibleValue
    错误
  • 在对应的B+树叶子节点中插入子Bucket数据,并返回子Bucket

注意:CreateBucket
会产生一个dirty node和一个new Bucket对象,都是在内存中,在事务提交之前不会flush到磁盘,不会影响其他只读事务。

// CreateBucket creates a new bucket at the given key and returns the new bucket.
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
 if b.tx.db == nil {
  return nil, ErrTxClosed
 } else if !b.tx.writable {
  return nil, ErrTxNotWritable
 } else if len(key) ==  {
  return nil, ErrBucketNameRequired
 }

 // Move cursor to correct position.
 c := b.Cursor()
 k, _, flags := c.seek(key)

 // Return an error if there is an existing key.
 if bytes.Equal(key, k) {
  if (flags & bucketLeafFlag) !=  {
   return nil, ErrBucketExists
  }
  return nil, ErrIncompatibleValue
 }

 // Create empty, inline bucket.
 var bucket = Bucket{
  bucket:      &bucket{},
  rootNode:    &node{isLeaf: true},
  FillPercent: DefaultFillPercent,
 }
 var value = bucket.write()

 // Insert into node.
 key = cloneBytes(key)
 c.node().put(key, key, value, , bucketLeafFlag)

 // Since subbuckets are not allowed on inline buckets, we need to
 // dereference the inline page, if it exists. This will cause the bucket
 // to be treated as a regular, non-inline bucket for the rest of the tx.
 b.page = nil

 return b.Bucket(key), nil
}

获取子Bucket

Bucket.Bucket
实现了从当前Bucket中获取名为name
的子Bucket。首先从缓存Bucket.buckets
中查找子Bucket, 有则直接返回。否则从当前Bucket对应的B+树中查找该子Bucket, 如果存在则将子Bucket加入到缓存中,并返回;否则返回nil

// Bucket retrieves a nested bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) Bucket(name []byte) *Bucket {
 if b.buckets != nil {
  if child := b.buckets[string(name)]; child != nil {
   return child
  }
 }

 // Move cursor to key.
 c := b.Cursor()
 k, v, flags := c.seek(name)

 // Return nil if the key doesn't exist or it is not a bucket.
 if !bytes.Equal(name, k) || (flags&bucketLeafFlag) ==  {
  return nil
 }

 // Otherwise create a bucket and cache it.
 var child = b.openBucket(v)
 if b.buckets != nil {
  b.buckets[string(name)] = child
 }

 return child
}

删除子Bucket

Bucket.DeleteBucket
实现了在当前Bucket中删除指定name
的子Bucket. 由于子Bucket可能还嵌套包含自己的子Bucket, 因此在从当前Bucket对应的B+树中删除该子Bucket时,还需递归调用Bucket.DeleteBucket
将子Bucket中的子Bucket删除。

相关文章