4.B树:实践(第一部分)
# 04. B树:实践(第一部分)
本章用Go语言实现一个不可变的B + 树。实现过程较为精简,便于理解。
# 4.1 节点格式
我们的B树最终要持久化到磁盘,所以首先需要设计B树节点的存储格式。如果没有这种格式,我们就无法得知节点的大小,也不知道何时对节点进行分裂。
一个节点由以下部分组成:
- 固定大小的头部,包含节点类型(叶节点或内部节点)和键的数量。
- 指向子节点的指针列表(内部节点使用)。
- 指向每个键值对的偏移量列表。
- 压缩存储的键值对。
| type | nkeys | pointers | offsets | key-values
| 2B | 2B | nkeys * 8B | nkeys * 2B | . . .
2
这是键值对的格式,先存储长度,再存储数据。
| klen | vlen | key | val |
| 2B | 2B | . . . | . . . |
2
为简化操作,叶节点和内部节点采用相同的格式。
# 4.2 数据类型
既然我们最终要将B树存储到磁盘上,那么在内存中也使用字节数组作为数据结构不是很好吗?
type BNode struct {
data []byte // 可存储到磁盘
}
const (
BNODE_NODE = 1 // 不存储值的内部节点
BNODE_LEAF = 2 // 存储值的叶节点
)
2
3
4
5
6
7
8
并且我们不能使用内存指针,这里的指针是指向磁盘页的64位整数,而不是指向内存节点的指针。我们将添加一些回调函数来抽象这部分内容,以便数据结构代码能保持纯粹的数据结构特性。
type BTree struct {
// 指针(非零的页号)
root uint64
// 用于管理磁盘页的回调函数
get func(uint64) BNode // 解引用指针
new func(BNode) uint64 // 分配新页
del func(uint64) // 释放页
}
2
3
4
5
6
7
8
页大小定义为4K字节。8K或16K等更大的页大小也可行。
我们还对键和值的大小做了一些限制,确保单个键值对总能存储在单个页中。如果你需要支持更大的键或值,就必须为它们分配额外的页,这会增加复杂性。
const HEADER = 4
const BTREE_PAGE_SIZE = 4096
const BTREE_MAX_KEY_SIZE = 1000
const BTREE_MAX_VAL_SIZE = 3000
func init() {
node1max := HEADER + 8 + 2 + 4 + BTREE_MAX_KEY_SIZE + BTREE_MAX_VAL_SIZE
assert(node1max <= BTREE_PAGE_SIZE)
}
2
3
4
5
6
7
8
9
# 4.3 解码B树节点
由于节点只是一个字节数组,我们添加一些辅助函数来访问其内容。
// header
func (node BNode) btype() uint16 {
return binary.LittleEndian.Uint16(node.data)
}
func (node BNode) nkeys() uint16 {
return binary.LittleEndian.Uint16(node.data[2:4])
}
func (node BNode) setHeader(btype uint16, nkeys uint16) {
binary.LittleEndian.PutUint16(node.data[0:2], btype)
binary.LittleEndian.PutUint16(node.data[2:4], nkeys)
}
2
3
4
5
6
7
8
9
10
11
// pointers
func (node BNode) getPtr(idx uint16) uint64 {
assert(idx < node.nkeys())
pos := HEADER + 8*idx
return binary.LittleEndian.Uint64(node.data[pos:])
}
func (node BNode) setPtr(idx uint16, val uint64) {
assert(idx < node.nkeys())
pos := HEADER + 8*idx
binary.LittleEndian.PutUint64(node.data[pos:], val)
}
2
3
4
5
6
7
8
9
10
11
偏移量列表的一些细节:
- 偏移量是相对于第一个键值对位置的。
- 第一个键值对的偏移量始终为零,因此不会存储在列表中。
- 我们在偏移量列表中存储最后一个键值对末尾的偏移量,用于确定节点的大小。
// offset list
func offsetPos(node BNode, idx uint16) uint16 {
assert(1 <= idx && idx <= node.nkeys())
return HEADER + 8*node.nkeys() + 2*(idx-1)
}
func (node BNode) getOffset(idx uint16) uint16 {
if idx == 0 {
return 0
}
return binary.LittleEndian.Uint16(node.data[offsetPos(node, idx):])
}
func (node BNode) setOffset(idx uint16, offset uint16) {
binary.LittleEndian.PutUint16(node.data[offsetPos(node, idx):], offset)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
偏移量列表用于快速定位第n个键值对。
// key-values
func (node BNode) kvPos(idx uint16) uint16 {
assert(idx <= node.nkeys())
return HEADER + 8*node.nkeys() + 2*node.nkeys() + node.getOffset(idx)
}
func (node BNode) getKey(idx uint16) []byte {
assert(idx < node.nkeys())
pos := node.kvPos(idx)
klen := binary.LittleEndian.Uint16(node.data[pos:])
return node.data[pos+4:][:klen]
}
func (node BNode) getVal(idx uint16) []byte {
assert(idx < node.nkeys())
pos := node.kvPos(idx)
klen := binary.LittleEndian.Uint16(node.data[pos+0:])
vlen := binary.LittleEndian.Uint16(node.data[pos+2:])
return node.data[pos+4+klen:][:vlen]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
以及用于确定节点大小的函数:
// node size in bytes
func (node BNode) nbytes() uint16 {
return node.kvPos(node.nkeys())
}
2
3
4
# 4.4 B树插入操作
代码分为几个小步骤。
# 步骤1:查找键
要将键插入叶节点,我们需要在有序的键值列表中查找其位置。
// 返回第一个键范围与目标键有交集的子节点(kid[i] <= 键)
// TODO: 二分查找
func nodeLookupLE(node BNode, key []byte) uint16 {
nkeys := node.nkeys()
found := uint16(0)
// 第一个键是从父节点复制过来的,
// 因此它总是小于或等于目标键。
for i := uint16(1); i < nkeys; i++ {
cmp := bytes.Compare(node.getKey(i), key)
if cmp <= 0 {
found = i
}
if cmp >= 0 {
break
}
}
return found
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
该查找函数适用于叶节点和内部节点。注意,比较时会跳过第一个键,因为在父节点中已经对其进行过比较。
# 步骤2:更新叶节点
查找到插入位置后,我们需要创建一个包含新键的节点副本。
// 向叶节点添加新键
func leafInsert(
new BNode, old BNode, idx uint16,
key []byte, val []byte,
) {
new.setHeader(BNODE_LEAF, old.nkeys()+1)
nodeAppendRange(new, old, 0, 0, idx)
nodeAppendKV(new, idx, 0, key, val)
nodeAppendRange(new, old, idx+1, idx, old.nkeys()-idx)
}
2
3
4
5
6
7
8
9
10
nodeAppendRange
函数用于将键从旧节点复制到新节点。
// 将多个键值对复制到指定位置
func nodeAppendRange(
new BNode, old BNode,
dstNew uint16, srcOld uint16, n uint16,
) {
assert(srcOld+n <= old.nkeys())
assert(dstNew+n <= new.nkeys())
if n == 0 {
return
}
// 指针
for i := uint16(0); i < n; i++ {
new.setPtr(dstNew+i, old.getPtr(srcOld+i))
}
// 偏移量
dstBegin := new.getOffset(dstNew)
srcBegin := old.getOffset(srcOld)
for i := uint16(1); i <= n; i++ { // 注意:范围是 [1, n]
offset := dstBegin + old.getOffset(srcOld+i) - srcBegin
new.setOffset(dstNew+i, offset)
}
// 键值对
begin := old.kvPos(srcOld)
end := old.kvPos(srcOld + n)
copy(new.data[new.kvPos(dstNew):], old.data[begin:end])
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
nodeAppendKV
函数用于将一个键值对复制到新节点。
// 将一个键值对复制到指定位置
func nodeAppendKV(new BNode, idx uint16, ptr uint64, key []byte, val []byte) {
// 指针
new.setPtr(idx, ptr)
// 键值对
pos := new.kvPos(idx)
binary.LittleEndian.PutUint16(new.data[pos+0:], uint16(len(key)))
binary.LittleEndian.PutUint16(new.data[pos+2:], uint16(len(val)))
copy(new.data[pos+4:], key)
copy(new.data[pos+4+uint16(len(key)):], val)
// 下一个键的偏移量
new.setOffset(idx+1, new.getOffset(idx)+4+uint16((len(key)+len(val))))
}
2
3
4
5
6
7
8
9
10
11
12
13
# 步骤3:递归插入
插入键的主函数。
// 向节点插入一个键值对,结果可能会分裂为2个节点。
// 调用者负责释放输入节点,并对结果节点进行分裂和分配操作。
func treeInsert(tree *BTree, node BNode, key []byte, val []byte) BNode {
// 结果节点。
// 允许其大小超过1页,如果超页会进行分裂
new := BNode{data: make([]byte, 2*BTREE_PAGE_SIZE)}
// 在哪里插入键?
idx := nodeLookupLE(node, key)
// 根据节点类型进行操作
switch node.btype() {
case BNODE_LEAF:
// 叶节点,node.getKey(idx) <= 键
if bytes.Equal(key, node.getKey(idx)) {
// 找到键,更新它。
leafUpdate(new, node, idx, key, val)
} else {
// 在该位置之后插入。
leafInsert(new, node, idx+1, key, val)
}
case BNODE_NODE:
// 内部节点,将其插入子节点。
nodeInsert(tree, new, node, idx, key, val)
default:
panic("bad node!")
}
return new
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
leafUpdate
函数与leafInsert
函数类似。
# 步骤4:处理内部节点
接下来是处理内部节点的代码。
// treeInsert()的一部分:向内部节点插入键值对
func nodeInsert(
tree *BTree, new BNode, node BNode, idx uint16,
key []byte, val []byte,
) {
// 获取并释放子节点
kptr := node.getPtr(idx)
knode := tree.get(kptr)
tree.del(kptr)
// 递归插入到子节点
knode = treeInsert(tree, knode, key, val)
// 分裂结果
nsplit, splited := nodeSplit3(knode)
// 更新子节点链接
nodeReplaceKidN(tree, new, node, idx, splited[:nsplit]...)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 步骤5:分裂大节点
向节点插入键会增加其大小,可能导致超过页大小。在这种情况下,需要将节点分裂为多个较小的节点。
最大允许的键大小和值大小仅保证单个键值对总能存储在一个页中。在最坏的情况下,较大的节点会分裂为3个节点(中间是一个大的键值对)。
// 将超过允许大小的节点分裂为两个。
// 第二个节点总能存储在一个页中。
func nodeSplit2(left BNode, right BNode, old BNode) {
// 代码省略...
}
// 如果节点太大则进行分裂。结果是1 - 3个节点。
func nodeSplit3(old BNode) (uint16, [3]BNode) {
if old.nbytes() <= BTREE_PAGE_SIZE {
old.data = old.data[:BTREE_PAGE_SIZE]
return 1, [3]BNode{old}
}
left := BNode{make([]byte, 2*BTREE_PAGE_SIZE)} // 之后可能会分裂
right := BNode{make([]byte, BTREE_PAGE_SIZE)}
nodeSplit2(left, right, old)
if left.nbytes() <= BTREE_PAGE_SIZE {
left.data = left.data[:BTREE_PAGE_SIZE]
return 2, [3]BNode{left, right}
}
// 左节点仍然太大
leftleft := BNode{make([]byte, BTREE_PAGE_SIZE)}
middle := BNode{make([]byte, BTREE_PAGE_SIZE)}
nodeSplit2(leftleft, middle, left)
assert(leftleft.nbytes() <= BTREE_PAGE_SIZE)
return 3, [3]BNode{leftleft, middle, right}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 步骤6:更新内部节点
向节点插入键可能会产生1个、2个或3个节点。父节点必须相应地进行自我更新。更新内部节点的代码与更新叶节点的代码类似。
// 用多个链接替换一个链接
func nodeReplaceKidN(
tree *BTree, new BNode, old BNode, idx uint16,
kids ...BNode,
) {
inc := uint16(len(kids))
new.setHeader(BNODE_NODE, old.nkeys()+inc-1)
nodeAppendRange(new, old, 0, 0, idx)
for i, node := range kids {
nodeAppendKV(new, idx+uint16(i), tree.new(node), node.getKey(0), nil)
}
nodeAppendRange(new, old, idx+inc, idx+1, old.nkeys()-(idx+1))
}
2
3
4
5
6
7
8
9
10
11
12
13
我们已经完成了B树的插入操作。删除操作及剩余代码将在下一章介绍。