# Memcached Source Reading 5: Resource Initialization
Memcached's internals include a hash table, various statistics, worker threads, the network layer, connections, memory structures, and so on. All of these resources are initialized at startup, inside main(). Network and memory initialization are deferred to later posts; this post walks through the initialization of the hash table, the statistics, the worker threads, and the connection structures.
## 1 Hash table initialization
```c
//Initialize the hash table; the argument comes from the command line at startup
assoc_init(settings.hashpower_init);

//The hashsize macro: a table of 2^n buckets
#define hashsize(n) ((ub4)1<<(n))

/* The primary hash table. While the table is being expanded there is also a
 * secondary (old) table, hence "primary" in the name. It is a pointer to
 * pointers, i.e. effectively an array of bucket head pointers. */
static item** primary_hashtable = 0;

void assoc_init(const int hashtable_init) {
    if (hashtable_init) {
        //If an initial size was given on the command line, use it
        hashpower = hashtable_init;
    }
    //hashpower defaults to 16, so without an explicit setting we get 2^16 buckets
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }
    STATS_LOCK();   //lock the global stats to keep them consistent
    stats.hash_power_level = hashpower;
    stats.hash_bytes = hashsize(hashpower) * sizeof(void *);
    STATS_UNLOCK();
}
```
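To make the sizing concrete, here is a minimal standalone sketch (not memcached code) of what hashsize() produces with the default hashpower of 16. The hashmask() macro shown alongside it is the one assoc.c uses to map a hash value onto a bucket index:

```c
#include <stdio.h>
#include <stdint.h>

#define hashsize(n) ((uint64_t)1 << (n))
#define hashmask(n) (hashsize(n) - 1)   /* maps a hash value to a bucket index */

int main(void) {
    int hashpower = 16;   /* the default */
    /* 2^16 = 65536 buckets; 65536 * sizeof(void *) = 512 KB of head pointers
     * on a 64-bit host -- exactly the stats.hash_bytes value set above. */
    printf("buckets: %llu\n", (unsigned long long)hashsize(hashpower));
    printf("bytes:   %llu\n",
           (unsigned long long)(hashsize(hashpower) * sizeof(void *)));
    /* A key's bucket is its hash value masked down to the table size: */
    uint32_t hv = 0xdeadbeef;            /* stand-in for hash(key, nkey) */
    printf("bucket index: %llu\n",
           (unsigned long long)(hv & hashmask(hashpower)));
    return 0;
}
```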
## 2 Statistics initialization
Memcached keeps many global statistics so the usage of each resource can be inspected at any time. As we will see, every update to these statistics requires taking a lock, and the update rate is on the same order of magnitude as the number of operations Memcached serves, so to some extent these statistics do cost performance.
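Concretely, the lock is a single process-wide mutex: the STATS_LOCK()/STATS_UNLOCK() pair used in assoc_init above are thin wrappers around it. A minimal sketch of the pattern from thread.c:

```c
#include <pthread.h>

/* One process-wide mutex guards the global stats struct (see thread.c). */
static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;

void STATS_LOCK(void)   { pthread_mutex_lock(&stats_lock); }
void STATS_UNLOCK(void) { pthread_mutex_unlock(&stats_lock); }

/* Every update serializes on this one lock, e.g.:
 *   STATS_LOCK();
 *   stats.total_conns++;
 *   STATS_UNLOCK();
 * which is why the hottest counters are kept per-thread instead
 * (struct thread_stats, covered in the next section). */
```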
The stats struct is the abstraction over these statistics; the individual fields are self-explanatory and need no commentary.
```c
struct stats {
    pthread_mutex_t mutex;
    unsigned int  curr_items;
    unsigned int  total_items;
    uint64_t      curr_bytes;
    unsigned int  curr_conns;
    unsigned int  total_conns;
    uint64_t      rejected_conns;
    unsigned int  reserved_fds;
    unsigned int  conn_structs;
    uint64_t      get_cmds;
    uint64_t      set_cmds;
    uint64_t      touch_cmds;
    uint64_t      get_hits;
    uint64_t      get_misses;
    uint64_t      touch_hits;
    uint64_t      touch_misses;
    uint64_t      evictions;
    uint64_t      reclaimed;
    time_t        started;           /* when the process was started */
    bool          accepting_conns;   /* whether we are currently accepting */
    uint64_t      listen_disabled_num;
    unsigned int  hash_power_level;  /* Better hope it's not over 9000 */
    uint64_t      hash_bytes;        /* size used for hash tables */
    bool          hash_is_expanding; /* If the hash table is being expanded */
    uint64_t      expired_unfetched; /* items reclaimed but never touched */
    uint64_t      evicted_unfetched; /* items evicted but never touched */
    bool          slab_reassign_running; /* slab reassign in progress */
    uint64_t      slabs_moved;       /* times slabs were moved around */
};
```
Initializing the statistics simply means initializing this stats variable.
```c
//Definition of the global stats object
struct stats stats;

//Initialize the global object; it lives for the entire life of the process
static void stats_init(void) {
    stats.curr_items = stats.total_items = stats.curr_conns =
                       stats.total_conns = stats.conn_structs = 0;
    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses =
                     stats.evictions = stats.reclaimed = 0;
    stats.touch_cmds = stats.touch_misses = stats.touch_hits =
                       stats.rejected_conns = 0;
    stats.curr_bytes = stats.listen_disabled_num = 0;
    stats.hash_power_level = stats.hash_bytes = stats.hash_is_expanding = 0;
    stats.expired_unfetched = stats.evicted_unfetched = 0;
    stats.slabs_moved = 0;
    stats.accepting_conns = true; /* assuming we start in this state. */
    stats.slab_reassign_running = false;

    /* make the time we started always be 2 seconds before we really
       did, so time(0) - time.started is never zero. if so, things
       like 'settings.oldest_live' which act as booleans as well as
       values are now false in boolean context... */
    process_started = time(0) - 2;
    stats_prefix_init();
}
```
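To make the two-second backdating concrete, here is a minimal standalone illustration (not memcached code). The server works with a relative clock, seconds since process_started, and backdating guarantees a relative timestamp taken immediately after startup is already nonzero, so values derived from it stay true in boolean context:

```c
#include <stdio.h>
#include <time.h>

int main(void) {
    time_t process_started = time(0) - 2;   /* as in stats_init() */
    /* The relative clock used throughout the server: */
    unsigned int current_time = (unsigned int)(time(0) - process_started);
    /* current_time >= 2 from the very first tick, so a relative timestamp
     * recorded now can never be 0, i.e. never mistaken for "unset". */
    printf("current_time = %u\n", current_time);
    return 0;
}
```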
## 3 Worker thread initialization
Memcached uses the classic master-worker threading model: the main thread plays the role of master, while the worker threads are created via pthreads.
```c
//Called with the number of worker threads and the main libevent instance
thread_init(settings.num_threads, main_base);

//Worker thread initialization
void thread_init(int nthreads, struct event_base *main_base) {
    int i;
    int power;

    //Initialize the various locks and the condition variable
    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);
    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);
    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Memcached stripes (segments) the hash-bucket locks, scaling the stripe
     * count with the thread count: the table has 1<<16 buckets by default,
     * but there are only 1<<power locks. */
    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }

    item_lock_count = hashsize(power);

    //Allocate the 1<<power pthread_mutex_t locks into the item_locks array
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    //Initialize each lock (see the threads chapters of APUE for details)
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
    /* Create the thread-local key item_lock_type_key. It records the kind of
     * lock each thread must take on the primary hash table: while the table
     * is being expanded, the type switches to the global lock; otherwise the
     * striped (segment) locks are used. */
    pthread_key_create(&item_lock_type_key, NULL);
    pthread_mutex_init(&item_global_lock, NULL);

    //Allocate nthreads worker slots; LIBEVENT_THREAD is Memcached's wrapper around a worker thread
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    /* Initialize the dispatcher thread: its event base is main_base and its
     * thread id is that of the main thread */
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    //Initialize the workers; each worker talks to the main thread over a pipe
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {    //create the notification pipe
            perror("Can't create notify pipe");
            exit(1);
        }
        threads[i].notify_receive_fd = fds[0];  //read end: where the worker receives notifications
        threads[i].notify_send_fd = fds[1];     //write end: where the master sends notifications
        setup_thread(&threads[i]);              //register the worker with libevent
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;                //update the statistics
    }

    //Create the worker threads
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    //Wait until every worker thread has finished starting up
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}
```
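To see how the stripe locks are actually picked, here is a simplified sketch of item_lock() from thread.c, using the variables initialized above (the source wraps pthread_mutex_lock in a mutex_lock helper; the modulo mapping follows the 1.4.x code). A bucket's hash value selects one of the 1<<power stripe locks in granular mode, or falls back to item_global_lock while the table is expanding:

```c
/* Simplified sketch of item_lock(): many buckets share one lock, and the
 * thread-local lock type decides between granular and global locking. */
void item_lock(uint32_t hv) {
    uint8_t *lock_type = pthread_getspecific(item_lock_type_key);
    if (*lock_type == ITEM_LOCK_GRANULAR) {
        /* map the bucket's hash value onto one of the stripe locks */
        pthread_mutex_lock(
            &item_locks[(hv & hashmask(hashpower)) % item_lock_count]);
    } else {
        /* during hash-table expansion every thread takes one global lock */
        pthread_mutex_lock(&item_global_lock);
    }
}
```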
```c
//Memcached's wrapper around a worker thread
typedef struct {
    pthread_t thread_id;        //thread ID
    struct event_base *base;    //libevent is not thread-safe, so each worker owns its own event_base, used for both the notify pipe and the client sockets
    struct event notify_event;  //libevent event that watches the notify pipe
    int notify_receive_fd;      //read end of the notify pipe
    int notify_send_fd;         //write end of the notify pipe
    struct thread_stats stats;  //per-thread statistics
    struct conn_queue *new_conn_queue;  //per-thread work queue; connections accepted by the main thread are pushed onto it
    cache_t *suffix_cache;      //suffix cache
    uint8_t item_lock_type;     //type of lock this thread takes on the hash table: striped (granular) or global
} LIBEVENT_THREAD;

//Wrapper around the dispatcher (master) thread
typedef struct {
    pthread_t thread_id;        //thread id
    struct event_base *base;    //libevent instance
} LIBEVENT_DISPATCHER_THREAD;
```
```c
//Bind a worker thread to its libevent instance
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();    //create the libevent instance
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Create the read event for the notify pipe. Its callback,
     * thread_libevent_process, handles the actual work; we will look at it
     * in a later post. */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);    //attach the event to this thread's base

    //Register the event with libevent
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    //Create the queue that receives connections from the main thread
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);    //initialize the queue

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    //Create the thread's suffix cache (I have not figured out what this cache is actually for)
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
```
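For context on how the pipe is actually used (the worker-side callback is covered later): when the master accepts a connection, it pushes a work item onto a worker's queue and writes a single byte to that worker's notify_send_fd, which makes notify_event fire inside the worker's event loop. A simplified sketch of the dispatch side, modeled on dispatch_conn_new() in thread.c (CQ_ITEM is the queue entry type; cqi_new() takes one from the cqi_freelist initialized earlier):

```c
/* Simplified sketch: round-robin pick a worker, queue the new socket,
 * then wake the worker through its notify pipe. */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();                  /* work item from the freelist */
    int tid = (last_thread + 1) % settings.num_threads;
    LIBEVENT_THREAD *thread = threads + tid;    /* round-robin load balancing */
    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);      /* hand the socket to the worker */
    /* one byte on the pipe makes the worker's notify_event fire */
    if (write(thread->notify_send_fd, "", 1) != 1)
        perror("Writing to thread notify pipe");
}
```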
```c
//Create a worker thread
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t thread;
    pthread_attr_t attr;
    int ret;

    pthread_attr_init(&attr);   //initialize the thread attributes (POSIX threads)
    //Create the thread with pthread_create; the entry point passed in here is worker_libevent
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}
```
```c
//Worker thread entry point
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    //By default a thread takes the striped (granular) hash-table lock
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);  //store it in the thread-local slot

    //Tell the main thread this worker has finished initializing (via the condition variable)
    register_thread_initialized();

    //Start this worker's libevent event loop
    event_base_loop(me->base, 0);
    return NULL;
}
```
```c
//Block the main thread until all nthreads workers have registered
static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);  //sleep on init_cond until the remaining nthreads - init_count workers check in
    }
}

//Called by each worker: bump the counter and wake the waiting main thread
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
}
```
```c
//Statistics kept by each worker thread
struct thread_stats {
    pthread_mutex_t mutex;
    uint64_t get_cmds;
    uint64_t get_misses;
    uint64_t touch_cmds;
    uint64_t touch_misses;
    uint64_t delete_misses;
    uint64_t incr_misses;
    uint64_t decr_misses;
    uint64_t cas_misses;
    uint64_t bytes_read;
    uint64_t bytes_written;
    uint64_t flush_cmds;
    uint64_t conn_yields;   /* # of yields for connections (-R option)*/
    uint64_t auth_cmds;
    uint64_t auth_errors;
    struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
};

//Per-slab-class statistics
struct slab_stats {
    uint64_t set_cmds;
    uint64_t get_hits;
    uint64_t touch_hits;
    uint64_t delete_hits;
    uint64_t cas_hits;
    uint64_t cas_badval;
    uint64_t incr_hits;
    uint64_t decr_hits;
};
```
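Keeping these counters per thread avoids contending on the global stats_lock for every request; the trade-off is that serving a `stats` command has to sum across all threads, taking each thread's own mutex in turn. A simplified sketch modeled on threadlocal_stats_aggregate() in thread.c (only a few of the fields are shown; the real function copies every counter):

```c
/* Sum the per-thread counters into one thread_stats, locking each
 * thread's own mutex while its counters are read. */
void threadlocal_stats_aggregate(struct thread_stats *out) {
    memset(out, 0, sizeof(*out));   /* requires <string.h> */
    for (int i = 0; i < settings.num_threads; i++) {
        pthread_mutex_lock(&threads[i].stats.mutex);
        out->get_cmds   += threads[i].stats.get_cmds;
        out->get_misses += threads[i].stats.get_misses;
        out->bytes_read += threads[i].stats.bytes_read;
        /* ... every other counter is accumulated the same way ... */
        pthread_mutex_unlock(&threads[i].stats.mutex);
    }
}
```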
## 4 Connection initialization
```c
static conn **freeconns;    //the free-connection list

//Connection initialization
static void conn_init(void) {
    freetotal = 200;        //capacity of the free list
    freecurr = 0;           //index of the next free slot
    //Allocate space for 200 conn pointers
    if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
        fprintf(stderr, "Failed to allocate connection structures\n");
    }
    return;
}
```
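The list is then used as a simple LIFO stack of recycled conn structures: popping returns NULL when the stack is empty (in which case the caller allocates a fresh conn), and pushing doubles the array when it is full. A simplified sketch of the two helpers from memcached.c, with the mutex that guards them in the real code elided:

```c
/* Pop a recycled conn, or NULL if none is available. */
static conn *conn_from_freelist(void) {
    return (freecurr > 0) ? freeconns[--freecurr] : NULL;
}

/* Push a closed connection back; returns true only if it could not be
 * stored (the caller then frees the conn itself). */
static bool conn_add_to_freelist(conn *c) {
    if (freecurr >= freetotal) {            /* full: double the array */
        conn **tmp = realloc(freeconns, sizeof(conn *) * freetotal * 2);
        if (tmp == NULL)
            return true;
        freeconns = tmp;
        freetotal *= 2;
    }
    freeconns[freecurr++] = c;              /* push the recycled conn */
    return false;
}
```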