10.二级索引
# 10. 二级索引
在本章中,我们将为数据库添加额外的索引(也称为二级索引)。查询将不再局限于主键。
# 10.1 索引定义
Indexes
和 IndexPrefixes
字段被添加到表定义中。与表本身一样,每个索引在键值存储(KV store)中都被分配了一个键前缀。
// 表定义
type TableDef struct {
// 用户定义
Name string
Types []uint32 // 列类型
Cols []string // 列名
PKeys int // 前PKeys列是主键
Indexes [][]string
// 为不同表/索引自动分配的B树键前缀
Prefix uint32
IndexPrefixes []uint32
}
2
3
4
5
6
7
8
9
10
11
12
要通过索引查找行,索引必须包含主键的副本。我们将通过在索引后附加主键列来实现这一点;这也使得索引键唯一,这是B树查找代码所要求的。
func checkIndexKeys(tdef *TableDef, index []string) ([]string, error) {
icols := map[string]bool{}
for _, c := range index {
// 检查索引列
// 省略...
icols[c] = true
}
// 将主键添加到索引中
for _, c := range tdef.Cols[:tdef.PKeys] {
if!icols[c] {
index = append(index, c)
}
}
assert(len(index) < len(tdef.Cols))
return index, nil
}
func colIndex(tdef *TableDef, col string) int {
for i, c := range tdef.Cols {
if c == col {
return i
}
}
return -1
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
在创建新表之前,会检查索引并附加主键。
func tableDefCheck(tdef *TableDef) error {
// 验证表定义
// 省略...
// 验证索引
for i, index := range tdef.Indexes {
index, err := checkIndexKeys(tdef, index)
if err != nil {
return err
}
tdef.Indexes[i] = index
}
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
创建新表时会分配多个键前缀。
// 创建新表
func (db *DB) TableNew(tdef *TableDef) error {
if err := tableDefCheck(tdef); err != nil {
return err
}
// 检查表是否已存在
// 省略...
// 分配新的前缀
tdef.Prefix = /* 省略... */
for i := range tdef.Indexes {
prefix := tdef.Prefix + 1 + uint32(i)
tdef.IndexPrefixes = append(tdef.IndexPrefixes, prefix)
}
// 更新下一个前缀
ntree := 1 + uint32(len(tdef.Indexes))
binary.LittleEndian.PutUint32(meta.Get("val").Str, tdef.Prefix+ntree)
_, err := dbUpdate(db, TDEF_META, *meta, 0)
if err != nil {
return err
}
// 存储定义
// 省略...
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 10.2 维护索引
更新行之后,我们需要从索引中删除旧行。B树接口被修改为返回更新前的值。
type InsertReq struct {
tree *BTree
// 输出
Added bool // 添加了新键
Updated bool // 添加了新键或旧键被更改
Old []byte // 更新前的值
// 输入
Key []byte
Val []byte
Mode int
}
type DeleteReq struct {
tree *BTree
// 输入
Key []byte
// 输出
Old []byte
}
func (tree *BTree) InsertEx(req *InsertReq)
func (tree *BTree) DeleteEx(req *DeleteReq)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
以下是在索引中添加或删除记录的函数。这里我们遇到一个问题:使用二级索引更新表涉及键值存储中的多个键,这些操作应该原子性地完成。我们将在后面的章节中解决这个问题。
const (
INDEX_ADD = 1
INDEX_DEL = 2
)
// 在添加或删除记录后维护索引
func indexOp(db *DB, tdef *TableDef, rec Record, op int) {
key := make([]byte, 0, 256)
irec := make([]Value, len(tdef.Cols))
for i, index := range tdef.Indexes {
// 索引键
for j, c := range index {
irec[j] = *rec.Get(c)
}
// 更新键值存储
key = encodeKey(key[:0], tdef.IndexPrefixes[i], irec[:len(index)])
done, err := false, error(nil)
switch op {
case INDEX_ADD:
done, err = db.kv.Update(&InsertReq{Key: key})
case INDEX_DEL:
done, err = db.kv.Del(&DeleteReq{Key: key})
default:
panic("what?")
}
assert(err == nil) // XXX:将在后面的章节中修复这个问题
assert(done)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
更新或删除行之后维护索引:
// 向表中添加一行
func dbUpdate(db *DB, tdef *TableDef, rec Record, mode int) (bool, error) {
// 省略...
req := InsertReq{Key: key, Val: val, Mode: mode}
added, err := db.kv.Update(&req)
if err != nil ||!req.Updated || len(tdef.Indexes) == 0 {
return added, err
}
// 维护索引
if req.Updated &&!req.Added {
decodeValues(req.Old, values[tdef.PKeys:]) // 获取旧行
indexOp(db, tdef, Record{tdef.Cols, values}, INDEX_DEL)
}
if req.Updated {
indexOp(db, tdef, rec, INDEX_ADD)
}
return added, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 通过主键删除记录
func dbDelete(db *DB, tdef *TableDef, rec Record) (bool, error) {
// 省略...
deleted, err := db.kv.Del(&req)
if err != nil ||!deleted || len(tdef.Indexes) == 0 {
return deleted, err
}
// 维护索引
if deleted {
// 同理...
}
return true, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 10.3 使用二级索引
# 步骤1:选择索引
我们还将使用索引前缀来实现范围查询。例如,对于索引 [a, b, c]
(该索引包含前缀 [a]
),我们可以进行 x < a AND a < y
这样的查询。选择索引的过程,其实就是根据输入前缀来匹配列。在考虑二级索引之前,会优先考虑主键。
func findIndex(tdef *TableDef, keys []string) (int, error) {
pk := tdef.Cols[:tdef.PKeys]
if isPrefix(pk, keys) {
// 使用主键。
// 也适用于无键的全表扫描。
return -1, nil
}
// 寻找合适的索引
winner := -2
for i, index := range tdef.Indexes {
if!isPrefix(index, keys) {
continue
}
if winner == -2 || len(index) < len(tdef.Indexes[winner]) {
winner = i
}
}
if winner == -2 {
return -2, fmt.Errorf("no index found")
}
return winner, nil
}
func isPrefix(long []string, short []string) bool {
if len(long) < len(short) {
return false
}
for i, c := range short {
if long[i] != c {
return false
}
}
return true
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 步骤2:编码索引前缀
如果输入键使用的是索引前缀而不是完整索引,我们可能需要对额外的列进行编码。例如,对于使用索引 [a, b]
进行 v1 < a
的查询,我们不能直接将 [v1] < key
作为底层B树的查询条件,因为任何形如 [v1, v2]
的键都满足 [v1] < [v1, v2]
,但这可能不符合 v1 < a
的条件。
在这种情况下,我们可以使用 [v1, MAX] < key
,这里的 MAX
是列 b
的最大可能值。以下是为部分查询键编码额外列的函数。
// 范围键可以是索引键的前缀,
// 我们可能需要对缺失的列进行编码,以使比较操作有效。
func encodeKeyPartial(
out []byte, prefix uint32, values []Value,
tdef *TableDef, keys []string, cmp int,
) []byte {
out = encodeKey(out, prefix, values)
// 根据比较运算符,将缺失的列编码为最小值或最大值。
// 1. 空字符串比所有可能的值编码都小,
// 因此对于CMP_LT和CMP_GE,我们无需添加任何内容。
// 2. 最大编码为全0xff字节。
max := cmp == CMP_GT || cmp == CMP_LE
loop:
for i := len(values); max && i < len(keys); i++ {
switch tdef.Types[colIndex(tdef, keys[i])] {
case TYPE_BYTES:
out = append(out, 0xff)
break loop // 在此处停止,因为没有字符串编码以0xff开头
case TYPE_INT64:
out = append(out, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff)
default:
panic("what?")
}
}
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
对于 int64
类型,最大值被编码为全 0xff
字节。问题在于字符串没有最大值。我们可以将 "\xff"
用作 “伪最大字符串值” 的编码,并修改常规字符串编码,使其不以 "\xff"
开头。
如果字符串的第一个字节是 "\xff"
或 "\xfe"
,则用 "\xfe"
字节对其进行转义。这样,所有字符串编码都小于 "\xff"
。
// 1. 字符串编码为以null结尾的字符串,
// 转义null字节,使字符串中不包含null字节。
// 2. "\xff" 表示键比较中的最高序,
// 如果第一个字节是0xff,也对其进行转义。
func escapeString(in []byte) []byte {
// 省略……
pos := 0
if len(in) > 0 && in[0] >= 0xfe {
out[0] = 0xfe
out[1] = in[0]
pos += 2
in = in[1:]
}
// 省略……
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 步骤3:通过索引获取行数据
索引键包含所有主键列,这样我们就可以找到完整的行数据。现在,Scanner
类型能够识别所选的索引。
// 范围查询的迭代器
type Scanner struct {
// 省略……
db *DB
tdef *TableDef
indexNo int // -1:使用主键;>= 0:使用索引
iter *BIter // 底层B树迭代器
keyEnd []byte // 编码后的Key2
}
2
3
4
5
6
7
8
9
// 获取当前行
func (sc *Scanner) Deref(rec *Record) {
assert(sc.Valid())
tdef := sc.tdef
rec.Cols = tdef.Cols
rec.Vals = rec.Vals[:0]
key, val := sc.iter.Deref()
if sc.indexNo < 0 {
// 主键,解码键值对
// 省略……
} else {
// 二级索引
// 索引不使用键值对存储的“值”部分
assert(len(val) == 0)
// 首先解码主键
index := tdef.Indexes[sc.indexNo]
ival := make([]Value, len(index))
for i, c := range index {
ival[i].Type = tdef.Types[colIndex(tdef, c)]
}
decodeValues(key[4:], ival)
icol := Record{index, ival}
// 通过主键获取行数据
rec.Cols = tdef.Cols[:tdef.PKeys]
for _, c := range rec.Cols {
rec.Vals = append(rec.Vals, *icol.Get(c))
}
// 待办事项:如果索引包含所有列,则跳过此步骤
ok, err := dbGet(sc.db, tdef, rec)
assert(ok && err == nil)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 步骤4:整合所有部分
dbScan
函数经过修改,可以使用二级索引。现在,通过索引前缀进行的范围查询也能正常工作。并且,它还可以在没有任何键的情况下扫描整个表(如果没有指定列,则选择主键)。
func dbScan(db *DB, tdef *TableDef, req *Scanner) error {
// 合理性检查
// 省略……
// 选择索引
indexNo, err := findIndex(tdef, req.Key1.Cols)
if err != nil {
return err
}
index, prefix := tdef.Cols[:tdef.PKeys], tdef.Prefix
if indexNo >= 0 {
index, prefix = tdef.Indexes[indexNo], tdef.IndexPrefixes[indexNo]
}
req.db = db
req.tdef = tdef
req.indexNo = indexNo
// 定位到起始键
keyStart := encodeKeyPartial(
nil, prefix, req.Key1.Vals, tdef, index, req.Cmp1)
req.keyEnd = encodeKeyPartial(
nil, prefix, req.Key2.Vals, tdef, index, req.Cmp2)
req.iter = db.kv.tree.Seek(keyStart, req.Cmp1)
return nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 步骤5:恭喜
我们已经实现了关系型数据库的一些主要功能:表、范围查询和二级索引。我们可以开始为数据库添加更多功能和查询语言。不过,仍有一些重要方面缺失:事务和并发,这些将在后续章节中探讨。