11.原子事务
# 11. 原子事务
事务允许以原子性方式执行多个更新操作(要么全部执行,要么全部不执行)。在上一章中,更新一行数据可能会导致多个键值对(KV)更新(由于二级索引的存在),这些更新不具备原子性,如果被中断(不具备抗崩溃能力),可能会导致数据损坏。实现事务可以解决这个问题。
目前,我们只考虑顺序执行,并发问题将在下一章讨论。
# 11.1 键值对事务接口
第一步是添加键值对事务类型。
// 键值对事务
type KVTX struct {
// 后续补充……
}
1
2
3
4
2
3
4
结束事务有两种方式:
- 通过提交事务来持久化更改。
- 或者通过中止事务来回滚操作。
// 开始一个事务
func (kv *KV) Begin(tx *KVTX)
// 结束一个事务:提交更新
func (kv *KV) Commit(tx *KVTX) error
// 结束一个事务:回滚
func (kv *KV) Abort(tx *KVTX)
1
2
3
4
5
6
2
3
4
5
6
读取和更新键值对存储的方法被移到了事务类型中。请注意,这些方法不再会失败,因为它们不执行输入/输出(IO)操作,IO操作由提交事务时执行,提交操作可能会失败。
// 键值对操作
func (tx *KVTX) Get(key []byte) ([]byte, bool) {
return tx.db.tree.Get(key)
}
func (tx *KVTX) Seek(key []byte, cmp int) *BIter {
return tx.db.tree.Seek(key, cmp)
}
func (tx *KVTX) Update(req *InsertReq) bool {
tx.db.tree.InsertEx(req)
return req.Added
}
func (tx *KVTX) Del(req *DeleteReq) bool {
return tx.db.tree.DeleteEx(req)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
# 11.2 数据库事务接口
类似地,我们也会为数据库(DB)添加事务类型,它是对键值对事务类型的包装。
// 数据库事务
type DBTX struct {
kv KVTX
db *DB
}
func (db *DB) Begin(tx *DBTX) {
tx.db = db
db.kv.Begin(&tx.kv)
}
func (db *DB) Commit(tx *DBTX) error {
return db.kv.Commit(&tx.kv)
}
func (db *DB) Abort(tx *DBTX) {
db.kv.Abort(&tx.kv)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
读取和更新方法也被移到了事务类型中。
func (tx *DBTX) TableNew(tdef *TableDef) error
func (tx *DBTX) Get(table string, rec *Record) (bool, error)
func (tx *DBTX) Set(table string, rec Record, mode int) (bool, error)
func (tx *DBTX) Delete(table string, rec Record) (bool, error)
func (tx *DBTX) Scan(table string, req *Scanner) error
1
2
3
4
5
2
3
4
5
对数据库代码的修改主要是改变函数的参数,代码清单中省略这些修改内容。
# 11.3 实现键值对事务
事务类型保存了内存数据结构的副本:树的根节点指针和空闲列表头指针。
// 键值对事务
type KVTX struct {
db *KV
// 用于回滚
tree struct {
root uint64
}
free struct {
head uint64
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
这用于事务回滚。回滚事务只需将指针指向之前的树的根节点,即使在写入B树数据时出现IO错误,也能轻松完成。
// 开始一个事务
func (kv *KV) Begin(tx *KVTX) {
tx.db = kv
tx.tree.root = kv.tree.root
tx.free.head = kv.free.head
}
1
2
3
4
5
6
2
3
4
5
6
// 回滚树和其他内存数据结构。
func rollbackTX(tx *KVTX) {
kv := tx.db
kv.tree.root = tx.tree.root
kv.free.head = tx.free.head
kv.page.nfree = 0
kv.page.nappend = 0
kv.page.updates = map[uint64][]byte{}
}
// 结束一个事务:回滚
func (kv *KV) Abort(tx *KVTX) {
rollbackTX(tx)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
提交事务与我们之前持久化数据的方式没有太大区别,只是在提交的第一阶段出现错误时需要回滚。
// 结束一个事务:提交更新
func (kv *KV) Commit(tx *KVTX) error {
if kv.tree.root == tx.tree.root {
return nil // 没有更新?
}
// 阶段1:将页数据持久化到磁盘。
if err := writePages(kv); err != nil {
rollbackTX(tx)
return err
}
// 页数据必须在主页面之前到达磁盘。这里使用fsync作为屏障。
if err := kv.fp.Sync(); err != nil {
rollbackTX(tx)
return fmt.Errorf("fsync: %w", err)
}
// 此时事务已可见。
kv.page.flushed += uint64(kv.page.nappend)
kv.page.nfree = 0
kv.page.nappend = 0
kv.page.updates = map[uint64][]byte{}
// 阶段2:更新主页面以指向新的树。
// 注意:如果阶段2失败,无法将树回滚到旧版本。
// 因为无法得知主页面的状态。从旧根节点更新可能会导致数据损坏。
if err := masterStore(kv); err != nil {
return err
}
if err := kv.fp.Sync(); err != nil {
return fmt.Errorf("fsync: %w", err)
}
return nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
本章的改动不多,因为我们省略了一个重要方面——并发,这将在下一章探讨。