网络连接建立
# Memcached源码分析三 网络连接建立
接着上一篇继续分析,上一篇请参考 《Memcached源码阅读之网络监听的建立》,这篇主要分析TCP的连接建立(从前面的代码分析可以看出,这个过程是由主线程驱动的),UDP没有连接建立的过程,所以之间进行连接分发,我们后续分析,现在直接上代码进行讲解。
conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport,
struct event_base *base)
{
conn *c = conn_from_freelist();
//获取一个空闲连接,conn是Memcached内部对网络连接的一个封装
//如果没有空闲的连接
if (NULL == c)
{
if (!(c = (conn *)calloc(1, sizeof(conn))))//申请空间
{
fprintf(stderr, "calloc()\n");
return NULL;
}MEMCACHED_CONN_CREATE(c);
//进行一些初始化
c->rbuf = c->wbuf = 0;
c->ilist = 0;
c->suffixlist = 0;
c->iov = 0;
c->msglist = 0;
c->hdrbuf = 0;
c->rsize = read_buffer_size;
c->wsize = DATA_BUFFER_SIZE;
c->isize = ITEM_LIST_INITIAL;
c->suffixsize = SUFFIX_LIST_INITIAL;
c->iovsize = IOV_LIST_INITIAL;
c->msgsize = MSG_LIST_INITIAL;
c->hdrsize = 0;
//每个conn都自带读入和输出缓冲区,在进行网络收发数据时,特别方便
c->rbuf = (char *)malloc((size_t)c->rsize);
c->wbuf = (char *)malloc((size_t)c->wsize);
c->ilist = (item **)malloc(sizeof(item *) * c->isize);
c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize);
c->msglist = (struct msghdr *) malloc(
sizeof(struct msghdr) * c->msgsize);
if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0
|| c->msglist == 0 || c->suffixlist == 0)
{
conn_free(c);
fprintf(stderr, "malloc()\n");
return NULL;
}
STATS_LOCK();
//统计变量更新
stats.conn_structs++;
STATS_UNLOCK();
}
c->transport = transport;
c->protocol = settings.binding_protocol;
if (!settings.socketpath)
{
c->request_addr_size = sizeof(c->request_addr);
}
else
{
c->request_addr_size = 0;
}
//输出一些日志信息
if (settings.verbose > 1)
{
if (init_state == conn_listening)
{
fprintf(stderr, "<%d server listening (%s)\n", sfd,
prot_text(c->protocol));
}
else if (IS_UDP(transport))
{
fprintf(stderr, "<%d server listening (udp)\n", sfd);
}
else if (c->protocol == negotiating_prot)
{
fprintf(stderr, "<%d new auto-negotiating client connection\n",
sfd);
}
else if (c->protocol == ascii_prot)
{
fprintf(stderr, "<%d new ascii client connection.\n", sfd);
}
else if (c->protocol == binary_prot)
{
fprintf(stderr, "<%d new binary client connection.\n", sfd);
}
else
{
fprintf(stderr, "<%d new unknown (%d) client connection\n", sfd,
c->protocol);
assert(false);
}
}
c->sfd = sfd;
c->state = init_state;
c->rlbytes = 0;
c->cmd = -1;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;
c->rcurr = c->rbuf;
c->ritem = 0;
c->icurr = c->ilist;
c->suffixcurr = c->suffixlist;
c->ileft = 0;
c->suffixleft = 0;
c->iovused = 0;
c->msgcurr = 0;
c->msgused = 0;
c->write_and_go = init_state;
c->write_and_free = 0;
c->item = 0;
c->noreply = false;
//建立sfd描述符上面的event事件,事件回调函数为event_handler
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1)
{
//如果建立libevent事件失败,将创建的conn添加到空闲列表中
if (conn_add_to_freelist(c))
{
conn_free(c);
}
perror("event_add");
return NULL;
}
STATS_LOCK();
//统计信息更新
stats.curr_conns++;
stats.total_conns++;
STATS_UNLOCK();
MEMCACHED_CONN_ALLOCATE(c->sfd);
return c;
}
//获得conn
conn *conn_from_freelist()
{
conn *c;
pthread_mutex_lock(&conn_lock);//操作链表,加锁,保持同步
//freecurr为静态全局变量
if (freecurr > 0)
{
//freeconns是在Memcached启动时初始化的
c = freeconns[--freecurr];
}
else//没有conn
{
c = NULL;
}
pthread_mutex_unlock(&conn_lock);
return c;
}
//添加conn到空闲链表中
bool conn_add_to_freelist(conn *c)
{
bool ret = true;
pthread_mutex_lock(&conn_lock);
//freeconns还有空间
if (freecurr < freetotal)
{
freeconns[freecurr++] = c;//直接添加
ret = false;
}
else
{
//没有多余空间,进行扩容,按目前容量的2倍进行扩容
size_t newsize = freetotal * 2;
conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
if (new_freeconns)
{
freetotal = newsize;
freeconns = new_freeconns;
freeconns[freecurr++] = c;
ret = false;
}
}
pthread_mutex_unlock(&conn_lock);
return ret;
}
//libevent事件回调函数的处理,回调函数被调用时,表明Memcached监听的端口号有网络事件到了
void event_handler(const int fd, const short which, void *arg)
{
conn *c;
c = (conn *)arg;
assert(c != NULL);
c->which = which;
//这种情况应该很少出现
if (fd != c->sfd)
{
if (settings.verbose > 0)
fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
conn_close(c);
return;
}
//进入业务处理状态机
drive_machine(c);
return;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
编辑 (opens new window)
上次更新: 2023/12/11, 22:32:09