CppGuide社区 CppGuide社区
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
  • C++语言面试问题集锦
  • 🔥交易系统开发岗位求职与面试指南 (opens new window)
  • 第1章 高频C++11重难点知识解析
  • 第2章 Linux GDB高级调试指南
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 高性能网络通信协议设计精要
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 后端服务重要模块设计探索
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 源码分析系列

    • leveldb源码分析
    • libevent源码分析
    • Memcached源码分析
    • TeamTalk源码分析
    • 优质源码分享 (opens new window)
    • 🔥远程控制软件gh0st源码分析
  • 从零手写C++项目系列

    • 🔥C++游戏编程入门(零基础学C++)
    • 🔥使用C++17从零开发一个调试器 (opens new window)
    • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
    • 🔥使用C++从零写一个C语言编译器 (opens new window)
    • 🔥从零用C语言写一个Redis
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • Go语言特性

    • Go系统接口编程
    • 高效Go并发编程
    • Go性能调优
    • Go项目架构设计
  • Go项目实战

    • 🔥使用Go从零开发一个数据库
    • 🔥使用Go从零开发一个编译器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥用Go从零写一个编排器(类Kubernetes) (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
  • C++语言面试问题集锦
  • 🔥交易系统开发岗位求职与面试指南 (opens new window)
  • 第1章 高频C++11重难点知识解析
  • 第2章 Linux GDB高级调试指南
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 高性能网络通信协议设计精要
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 后端服务重要模块设计探索
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 源码分析系列

    • leveldb源码分析
    • libevent源码分析
    • Memcached源码分析
    • TeamTalk源码分析
    • 优质源码分享 (opens new window)
    • 🔥远程控制软件gh0st源码分析
  • 从零手写C++项目系列

    • 🔥C++游戏编程入门(零基础学C++)
    • 🔥使用C++17从零开发一个调试器 (opens new window)
    • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
    • 🔥使用C++从零写一个C语言编译器 (opens new window)
    • 🔥从零用C语言写一个Redis
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • Go语言特性

    • Go系统接口编程
    • 高效Go并发编程
    • Go性能调优
    • Go项目架构设计
  • Go项目实战

    • 🔥使用Go从零开发一个数据库
    • 🔥使用Go从零开发一个编译器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥使用Go从零开发一个解释器 (opens new window)
    • 🔥用Go从零写一个编排器(类Kubernetes) (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
  • 服务器资源调整
  • 初始化参数解析
  • 网络监听的建立
  • 网络连接建立
  • 内存初始化
  • 资源初始化
  • get过程
  • cas属性
  • 内存池
  • 连接队列
  • Hash表操作
  • LRU操作
  • set操作
  • do_item_alloc操作
  • item结构
  • Hash表扩容
  • 线程交互
  • 状态机
  • Memcached源码分析
zhangxf
2023-04-02

资源初始化

# Memcached源码阅读五 资源初始化

Memcached内部有hash表,各种统计信息,工作线程,网络,连接,内存结构等,在memcached启动时(执行main函数),会对这些资源进行初始化的,网络和内存的初始化操作放到后续分析,这次分析hash表,统计信息,工作线程,网络连接的初始化过程。

1 hash表的初始化

//hash表的初始化,传入的参数是启动时传入的
assoc_init(settings.hashpower_init);
//hashsize的实现
#define hashsize(n) ((ub4)1<<(n))
//主hash表结构定义,在hash表扩容时,会有次hash表,所以有主次hash表区分,该结构是指针的指针,也即相当于数组指针
static item** primary_hashtable = 0;

void assoc_init(const int hashtable_init) {
    if (hashtable_init) {
        //如果设置了初始化参数,则按设置的参数进行初始化
        hashpower = hashtable_init;
    }

    //hashpower的默认值为16,如果未设置新值,则按默认值进行初始化
    primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));
    if (! primary_hashtable) {
        fprintf(stderr, "Failed to init hashtable.\n");
        exit(EXIT_FAILURE);
    }

    STATS_LOCK();//全局统计信息加锁,保证数据同步
    stats.hash_power_level = hashpower;
    stats.hash_bytes = hashsize(hashpower) * sizeof(void *);
    STATS_UNLOCK();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

2 统计信息的初始化

Memcached内部有很多全局的统计信息,用于实时获取各个资源的使用情况,后面将会看到,所有对统计信息的更新都需要加锁,而这些信息的更新是和Memcached的操作次数同数量级的,所以,在一定程度来说,这些统计信息对性能有影响。

stats结构是对统计信息的一个抽象,各个字段都比较好理解,不做解释。

struct stats {
    pthread_mutex_t mutex;
    unsigned int    curr_items;
    unsigned int    total_items;
    uint64_t        curr_bytes;
    unsigned int    curr_conns;
    unsigned int    total_conns;
    uint64_t        rejected_conns;
    unsigned int    reserved_fds;
    unsigned int    conn_structs;
    uint64_t        get_cmds;
    uint64_t        set_cmds;
    uint64_t        touch_cmds;
    uint64_t        get_hits;
    uint64_t        get_misses;
    uint64_t        touch_hits;
    uint64_t        touch_misses;
    uint64_t        evictions;
    uint64_t        reclaimed;
    time_t          started;          /* when the process was started */
    bool            accepting_conns;  /* whether we are currently accepting */
    uint64_t        listen_disabled_num;
    unsigned int    hash_power_level; /* Better hope it's not over 9000 */
    uint64_t        hash_bytes;       /* size used for hash tables */
    bool            hash_is_expanding; /* If the hash table is being expanded */
    uint64_t        expired_unfetched; /* items reclaimed but never touched */
    uint64_t        evicted_unfetched; /* items evicted but never touched */
    bool            slab_reassign_running; /* slab reassign in progress */
    uint64_t        slabs_moved;       /* times slabs were moved around */
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

统计信息的初始化也就是对stats变量的一个初始化。

//全局对象的定义
struct stats stats;
//全局变量的初始化,该全局变量在memcached启动之后,一直使用
static void stats_init(void)
{
    stats.curr_items = stats.total_items = stats.curr_conns =
                       stats.total_conns = stats.conn_structs = 0;
    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses =
                     stats.evictions = stats.reclaimed = 0;
    stats.touch_cmds = stats.touch_misses = stats.touch_hits =
                       stats.rejected_conns = 0;
    stats.curr_bytes = stats.listen_disabled_num = 0;
    stats.hash_power_level = stats.hash_bytes = stats.hash_is_expanding = 0;
    stats.expired_unfetched = stats.evicted_unfetched = 0;
    stats.slabs_moved = 0;
    stats.accepting_conns = true; /* assuming we start in this state. */
    stats.slab_reassign_running = false;

    /* make the time we started always be 2 seconds before we really
     did, so time(0) - time.started is never zero.  if so, things
     like 'settings.oldest_live' which act as booleans as well as
     values are now false in boolean context... */
    process_started = time(0) - 2;
    stats_prefix_init();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

3 工作线程的初始化

Memcached采用了典型的Master-Worker的线程模式,Master就是由main线程来充当,而Worker线程则是通过Pthread创建的。

//传入线程个数和libevent的main_base实例
thread_init(settings.num_threads, main_base);


//工作线程初始化
void thread_init(int nthreads, struct event_base *main_base) {
    int         i;
    int         power;
    //初始化各种锁和条件变量
    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    //Memcached对hash桶的锁采用分段锁,按线程个数来分段,默认总共是1<<16个hash桶,而锁的数目是1<<power个 
    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }

    item_lock_count = hashsize(power);
    //申请1<<power个pthread_mutex_t锁,保存在item_locks数组。
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }

    //对这些锁进行初始化,这部分可参考APUE的线程部分
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    /*创建线程的局部变量,该局部变量的名称为item_lock_type_key,用于保存主hash表所持有的锁的类型
    主hash表在进行扩容时,该锁类型会变为全局的锁,否则(不在扩容过程中),则是局部锁*/
    pthread_key_create(&item_lock_type_key, NULL);
    pthread_mutex_init(&item_global_lock, NULL);

    //申请nthreds个工作线程,LIBEVENT_THREAD是Memcached内部对工作线程的一个封装
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    /*分发线程的初始化,分发线程的base为main_base
    线程id为main线程的线程id*/
    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();
    //工作线程的初始化,工作线程和主线程(main线程)是通过pipe管道进行通信的
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {//初始化pipe管道
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];//读管道绑定到工作线程的接收消息的描述符
        threads[i].notify_send_fd = fds[1];//写管道绑定到工作线程的发送消息的描述符

        setup_thread(&threads[i]);//添加工作线程到libevent中
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;//统计信息更新
    }

    //创建工作线程
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    //等待所有工作线程创建完毕
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

//Memcached内部工作线程的封装
typedef struct {
    pthread_t thread_id;       //线程ID
    struct event_base *base;   //libevent的不是线程安全的,每个工作线程持有一个libevent实例,用于pipe管道通信和socket通信
    struct event notify_event; //用于监听pipe管道的libevent事件
    int notify_receive_fd;      //接收pipe管道消息描述符
    int notify_send_fd;         //发送pipe管道消息描述符
    struct thread_stats stats;  //每个线程对应的统计信息
    struct conn_queue *new_conn_queue; //每个线程都有一个工作队列,主线程接受的连接,挂载到该消息队列中
    cache_t *suffix_cache;      //后缀cache
    uint8_t item_lock_type;     //线程操作hash表持有的锁类型,有局部锁和全局锁
} LIBEVENT_THREAD;

//分发线程的封装
typedef struct {
    pthread_t thread_id;        //线程id
    struct event_base *base;    //libevent实例
} LIBEVENT_DISPATCHER_THREAD;

//工作线程绑定到libevent实例
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();//创建libevent实例
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    //创建管道读的libevent事件,事件的回调函数处理具体的业务信息,关于回调函数的处理,后续分析
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);//设置libevent实例

    //添加事件到libevent中
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    //创建消息队列,用于接受主线程连接
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);//消息队列初始化

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    //创建线程的后缀cache,没搞懂这个cache有什么作用。
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
} 

//创建工作线程
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);//Posix线程部分,线程属性初始化
    //通过pthread_create创建线程,线程处理函数是通过外部传入的处理函数为worker_libevent
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

//线程处理函数
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;
    //默认的hash表的锁为局部锁
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);//设定线程的属性
    //用于控制工作线程初始化,通过条件变量来控制
    register_thread_initialized();
    //工作线程的libevent实例启动
    event_base_loop(me->base, 0);
    return NULL;
}

//阻塞工作线程
static void wait_for_thread_registration(int nthreads) {
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);//在条件变量init_cond上面阻塞,阻塞个数为nthreads-init_count
    }
}

//唤醒工作线程
static void register_thread_initialized(void) {
    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);
}

//每个线程持有的统计信息
struct thread_stats {
    pthread_mutex_t   mutex;
    uint64_t          get_cmds;
    uint64_t          get_misses;
    uint64_t          touch_cmds;
    uint64_t          touch_misses;
    uint64_t          delete_misses;
    uint64_t          incr_misses;
    uint64_t          decr_misses;
    uint64_t          cas_misses;
    uint64_t          bytes_read;
    uint64_t          bytes_written;
    uint64_t          flush_cmds;
    uint64_t          conn_yields; /* # of yields for connections (-R option)*/
    uint64_t          auth_cmds;
    uint64_t          auth_errors;
    struct slab_stats slab_stats[MAX_NUMBER_OF_SLAB_CLASSES];
}; 

//每个slab的统计信息
struct slab_stats {
    uint64_t  set_cmds;
    uint64_t  get_hits;
    uint64_t  touch_hits;
    uint64_t  delete_hits;
    uint64_t  cas_hits;
    uint64_t  cas_badval;
    uint64_t  incr_hits;
    uint64_t  decr_hits;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221

4 连接的初始化

static conn **freeconns;//空闲连接列表
//连接初始化
static void conn_init(void)
{
    freetotal = 200;//空闲连接总数
    freecurr = 0;//当前空闲的索引
        //申请200个空间
        if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL)
    {
        fprintf(stderr, "Failed to allocate connection structures\n");
    }
    return;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
编辑 (opens new window)
上次更新: 2023/12/11, 22:32:09
内存初始化
get过程

← 内存初始化 get过程→

最近更新
01
第二章 关键字static及其不同用法
03-27
02
第一章 auto与类型推导
03-27
03
C++语言面试问题集锦 目录与说明
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式