14. 线程池与异步任务
# 14. 线程池与异步任务
自从咱们的服务器引入了有序集合(sorted set)这种数据类型,就出现了一个小“bug”:删除键的时候有点问题。要是有序集合特别大,释放它的节点可能得花老长时间,在销毁键的过程中,服务器就像被施了定身咒一样“卡”住了。不过别担心,用多线程把析构操作从主线程里“拽”出来,就能轻松解决这个问题。
首先,咱们来认识一下“线程池(thread pool)”,从名字就能看出来,它就像一个线程的“大池子”。池子里的线程负责从任务队列里“捞”任务,然后执行。用pthread
的API实现一个多生产者多消费者的队列,其实也没那么难。(虽然在咱们这儿只有一个生产者。)
pthread
里有两个重要的“小帮手”:pthread_mutex_t
和pthread_cond_t
,它们分别叫互斥锁(mutex)和条件变量(condition variable)。要是你对它们还不太熟,建议读完这章之后,去恶补一下多线程的知识。(比如看看pthread
API的手册、翻翻操作系统的教材、找些线上课程啥的。)
下面简单给大家介绍一下这俩“小帮手”:
- 任务队列会被多个线程(生产者和消费者)访问,就像大家都想去抢同一个宝贝,所以很明显,得用互斥锁来“把门”,保证同一时间只有一个线程能进去“拿东西”。
- 消费者线程没事干的时候,就应该“睡大觉”,只有当队列里有任务了,才能把它们“叫醒”,这就是条件变量的活儿啦。
线程池的数据类型定义如下:
struct Work {
void (*f)(void * ) = NULL;
void *arg = NULL;
};
struct TheadPool {
std::vector<pthread_t> threads;
std::deque<Work> queue;
pthread_mutex_t mu;
pthread_cond_t not_empty;
};
2
3
4
5
6
7
8
9
10
11
thread_pool_init
函数负责初始化线程池,并且启动线程。pthread
类型的变量得用pthread_xxx_init
函数来初始化,pthread_create
函数则用来启动一个线程,目标函数是worker
。
void thread_pool_init(TheadPool *tp, size_t num_threads) {
assert(num_threads > 0);
int rv = pthread_mutex_init(&tp->mu, NULL);
assert(rv == 0);
rv = pthread_cond_init(&tp->not_empty, NULL);
assert(rv == 0);
tp->threads.resize(num_threads);
for (size_t i = 0; i < num_threads; ++i) {
int rv = pthread_create(&tp->threads[i], NULL, &worker, tp);
assert(rv == 0);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
消费者的代码如下:
static void *worker(void *arg) {
TheadPool *tp = (TheadPool * )arg;
while (true) {
pthread_mutex_lock(&tp->mu);
// 等待条件:队列不为空
while (tp->queue.empty()) {
pthread_cond_wait(&tp->not_empty, &tp->mu);
}
// 拿到任务啦
Work w = tp->queue.front();
tp->queue.pop_front();
pthread_mutex_unlock(&tp->mu);
// 开始干活
w.f(w.arg);
}
return NULL;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
生产者的代码如下:
void thread_pool_queue(TheadPool *tp, void (*f)(void * ), void *arg) {
Work w;
w.f = f;
w.arg = arg;
pthread_mutex_lock(&tp->mu);
tp->queue.push_back(w);
pthread_cond_signal(&tp->not_empty);
pthread_mutex_unlock(&tp->mu);
}
2
3
4
5
6
7
8
9
10
下面给大家解释一下:
- 不管是生产者还是消费者,访问队列的代码都被
pthread_mutex_lock
和pthread_mutex_unlock
“包”起来了,这就好比给队列加了个“门禁”,同一时间只有一个线程能进入队列。 - 消费者拿到互斥锁之后,得检查一下队列:
- 如果队列里有任务,就从队列里“捞”一个任务出来,然后释放互斥锁,开始干活。
- 要是队列空了,那就先释放互斥锁,然后“睡大觉”,等条件变量把它“叫醒”。这一步通过调用
pthread_cond_wait
就能实现。
- 生产者把任务放进队列之后,得调用
pthread_cond_signal
,把可能正在“呼呼大睡”的消费者叫醒。 - 消费者被
pthread_cond_wait
叫醒之后,会自动再次持有互斥锁。醒了之后,消费者还得再检查一下条件,如果队列还是空的(条件不满足),那就接着回去“睡大觉”。
关于条件变量的使用,还得再多说几句:pthread_cond_wait
函数一般都得放在一个循环里,用来检查条件是否满足。这是因为在被唤醒的消费者拿到互斥锁之前,其他消费者可能已经把条件给改变了;互斥锁可不是从发出信号的线程直接“移交”到被唤醒的线程手里的!要是你看到条件变量没用在循环里,那很可能是写错啦。
下面给大家梳理一下具体的操作顺序,帮助大家理解条件变量的使用:
- 生产者发出信号。
- 生产者释放互斥锁。
- 某个消费者拿到互斥锁,把队列里的任务都“清空”了。
- 有个消费者被生产者的信号叫醒,也拿到了互斥锁,结果发现队列是空的!
注意,pthread_cond_signal
不需要用互斥锁保护,在释放互斥锁之后再发出信号也是没问题的。
线程池搞定啦,现在把它加到咱们的服务器里:
// 全局变量
static struct {
HMap db;
// 所有客户端连接的映射,通过fd作为键
std::vector<Conn *> fd2conn;
// 空闲连接的定时器
DList idle_list;
// 生存时间(TTL)的定时器
std::vector<HeapItem> heap;
// 线程池
TheadPool tp;
} g_data;
2
3
4
5
6
7
8
9
10
11
12
在main
函数里:
// 一些初始化操作
dlist_init(&g_data.idle_list);
thread_pool_init(&g_data.tp, 4);
2
3
entry_del
函数也得改改:把销毁大的有序集合的操作放到线程池里。不过线程池只处理大的集合,毕竟多线程也是有“开销”的。
// 立即释放键
static void entry_destroy(Entry *ent) {
switch (ent->type) {
case T_ZSET:
zset_dispose(ent->zset);
delete ent->zset;
break;
}
delete ent;
}
static void entry_del_async(void *arg) {
entry_destroy((Entry * )arg);
}
// 从键空间分离后释放entry
static void entry_del(Entry *ent) {
entry_set_ttl(ent, -1);
const size_t k_large_container_size = 10000;
bool too_big = false;
switch (ent->type) {
case T_ZSET:
too_big = hm_size(&ent->zset->hmap) > k_large_container_size;
break;
}
if (too_big) {
thread_pool_queue(&g_data.tp, &entry_del_async, ent);
} else {
entry_destroy(ent);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 练习题
- 信号量(semaphore)常被当作多线程的原语,来替代条件变量和互斥锁。试着用信号量实现一个线程池。
- 下面这些有意思的练习题能帮你进一步理解这些原语:
- 用信号量实现互斥锁。(很简单)
- 用条件变量实现信号量。(比较容易)
- 只用互斥锁实现条件变量。(有点难度)
- 现在你知道这些原语在某种程度上是等价的,那为啥会更倾向于用其中一个呢?
14_server.cpp
avl.cpp
avl.h
common.h
hashtable.cpp
hashtable.h
heap.cpp
heap.h
list.h
thread_pool.cpp
thread_pool.h
zset.cpp
zset.h