线程交互
# Memcached源码阅读十六 线程交互
Memcached
按之前的分析可以知道,其是典型的Master-Worker
线程模型,这种模型很典型,其工作模型是Master绑定端口,监听网络连接,接受网络连接之后,通过线程间通信来唤醒Worker线程,Worker线程已经连接的描述符执行读写操作,这种模型简化了整个通信模型,下面分析下这个过程。
case conn_listening:
addrlen = sizeof(addr);
//Master线程(main)进入状态机之后执行accept操作,这个操作也是非阻塞的。
if ((sfd = accept(c->sfd, (struct sockaddr *) &addr, &addrlen)) == -1)
{
//非阻塞模型,这个错误码继续等待
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
stop = true;
}
//连接超载
else if (errno == EMFILE)
{
if (settings.verbose > 0)
fprintf(stderr, "Too many open connections\n");
accept_new_conns(false);
stop = true;
}
else
{
perror("accept()");
stop = true;
}
break;
}
//已经accept成功,将accept之后的描述符设置为非阻塞的
if ((flags = fcntl(sfd, F_GETFL, 0)) < 0
|| fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0)
{
perror("setting O_NONBLOCK");
close(sfd);
break;
}
//判断是否超过最大连接数
if (settings.maxconns_fast
&& stats.curr_conns + stats.reserved_fd >= settings.maxconns - 1)
{
str = "ERROR Too many open connections\r\n";
res = write(sfd, str, strlen(str));
close(sfd);
STATS_LOCK();
stats.rejected_conns++;
STATS_UNLOCK();
}
else
{
//直线连接分发
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
}
stop = true;
break;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
这个是TCP的连接建立过程,由于UDP不需要建立连接,所以直接分发给Worker线程,让Worker线程进行读写操作,而TCP在建立连接之后,也执行连接分发(和UDP的一样),下面看看dispatch_conn_new
内部是如何进行链接分发的。
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport)
{
//创建一个连接队列
CQ_ITEM *item = cqi_new();
char buf[1];
//通过round-robin算法选择一个线程
int tid = (last_thread + 1) % settings.num_threads;
//thread数组存储了所有的工作线程
LIBEVENT_THREAD *thread = threads + tid;
//缓存这次的线程编号,下次待用
last_thread = tid;
//sfd表示accept之后的描述符
item->sfd = sfd;
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
//投递item信息到Worker线程的工作队列中
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
buf[0] = 'c';
//在Worker线程的notify_send_fd写入字符c,表示有连接
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
投递到子线程的连接队列之后,同时,通过往子线程的PIPE管道写入字符c来,下面我们看看子线程是如何处理的?
//子线程会在PIPE管道读上面建立libevent事件,事件回调函数是thread_libevent_process
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
//PIPE管道读取一个字节的数据
if (read(fd, buf, 1) != 1)
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
switch (buf[0])
{
case 'c':
//从连接队列读出Master线程投递的消息
item = cq_pop(me->new_conn_queue);
if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport, me->base);//创建连接
if (c == NULL) {
if (IS_UDP(item->transport)) {
fprintf(stderr, "Can't listen for events on UDP socket\n");
exit(1);
} else {
if (settings.verbose > 0) {
fprintf(stderr, "Can't listen for events on fd %d\n",
item->sfd);
}
close(item->sfd);
}
} else {
c->thread = me;
}
cqi_free(item);
}
break;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
之前分析过conn_new
的执行流程,conn_new里面会建立sfd的网络监听libevent事件,事件回调函数为event_handler
。
event_set(&c->event, sfd, event_flags, event_handler, (void *) c);
event_base_set(base, &c->event);
1
2
2
event_handler
的执行流程最终会进入到业务处理的状态机中,关于状态机,后续分析。
编辑 (opens new window)
上次更新: 2023/12/11, 22:32:09