8.3 探究 redis-cli 端的网络通信模型
我们接着探究一下 redis 源码自带的客户端 redis-cli 的网络通信模块。
我们使用 gdb 把 redis-cli 跑起来以后,原来打算按 Ctrl + C 让程序中断下来查看一下 redis-cli 跑起来有几个线程,但是实验之后发现,这样并不能让程序中断下来,反而会导致 redis-cli 这个进程退出。
退出的原因是 redis-cli 启动后会在一个 while 循环里面等待用户输入,这个逻辑位于 reply.c 函数中:
//redis-cli.c 1909行
static void repl(void) {
//...省略部分代码...
//redis-cli.c 1939行
while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
//...省略部分代码...
}
exit(0);
}
2
3
4
5
6
7
8
9
10
如果输入的不符合预期,例如按下 Ctrl+C,while 条件不成立导致 while 循环结束就会退出主进程。
我们换个方法来调试 redis-cli,先直接把 redis-cli 跑起来,然后查看下 redis-cli 的进程 id,并用 gdb attach 命令附加到 redis-cli 进程上。
[root@myaliyun src]# ps -ef | grep redis-cli
root 27559 25850 0 14:35 pts/2 00:00:00 ./redis-cli
root 27813 26804 0 14:39 pts/0 00:00:00 grep --color=auto redis-cli
[root@myaliyun src]# gdb attach 27559
...省略部分输出...
Attaching to process 27559
Reading symbols from /root/redis-6.0.3/src/redis-cli...
...省略部分输出...
(gdb) info threads
Id Target Id Frame
* 1 Thread 0x7f3d3c6cf740 (LWP 27559) "redis-cli" 0x00007f3d3bba16e0 in __read_nocancel () from /usr/lib64/libpthread.so.0
(gdb)
2
3
4
5
6
7
8
9
10
11
12
使用 info threads 查看线程数目,通过上面的输出,我们发现 redis-cli 只有一个主线程。既然只有一个主线程,那么我们可以断定 redis-cli 中的发给 redis-server 的命令肯定都是同步的,这里同步的意思是发送命令后一直等待服务器应答或者应答超时。
在 **redis-cli **的 main 函数(位于文件 redis-cli.c 中)有这样一段代码:
//redis-cli.c 8091行
/* Start interactive mode when no command is provided */
if (argc == 0 && !config.eval) {
/* Ignore SIGPIPE in interactive mode to force a reconnect */
signal(SIGPIPE, SIG_IGN);
/* Note that in repl mode we don't abort on connection error.
* A new attempt will be performed for every command send. */
cliConnect(0);
repl();
}
2
3
4
5
6
7
8
9
10
11
其中 cliConnect(0) 调用代码如下:
//redis-cli.c 859行
static int cliConnect(int force) {
if (context == NULL || force) {
if (context != NULL) {
redisFree(context);
}
if (config.hostsocket == NULL) {
context = redisConnect(config.hostip,config.hostport);
} else {
context = redisConnectUnix(config.hostsocket);
}
if (context->err) {
fprintf(stderr,"Could not connect to Redis at ");
if (config.hostsocket == NULL)
fprintf(stderr,"%s:%d: %s\n",config.hostip,config.hostport,context->errstr);
else
fprintf(stderr,"%s: %s\n",config.hostsocket,context->errstr);
redisFree(context);
context = NULL;
return REDIS_ERR;
}
/* Set aggressive KEEP_ALIVE socket option in the Redis context socket
* in order to prevent timeouts caused by the execution of long
* commands. At the same time this improves the detection of real
* errors. */
anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
/* Do AUTH and select the right DB. */
if (cliAuth() != REDIS_OK)
return REDIS_ERR;
if (cliSelect() != REDIS_OK)
return REDIS_ERR;
}
return REDIS_OK;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
这个函数做的工作可以分为三步:
context = redisConnect(config.hostip,config.hostport);
cliAuth()
cliSelect()
我们先来看第一步 redisConnect 函数,这个函数实际又调用 redisConnectWithOptions 函数,后者又调用 redisContextConnectBindTcp,redisContextConnectBindTcp 内部调用 _redisContextConnectTcp 函数。为了清晰的表达这种调用关系,我们在 _redisContextConnectTcp 处加个断点,然后使用 run 命令重启下,程序触发断点后,输入 bt 命令查看一下此时的调用堆栈:
(gdb) b _redisContextConnectTcp
Breakpoint 2 at 0x42e7c2: file net.c, line 342.
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /root/redis-6.0.3/src/redis-cli
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib64/libthread_db.so.1".
Breakpoint 2, _redisContextConnectTcp (c=0x44e050, addr=0x44e011 "127.0.0.1", port=6379, timeout=0x0, source_addr=0x0) at net.c:342
342 int blocking = (c->flags & REDIS_BLOCK);
(gdb) bt
#0 _redisContextConnectTcp (c=0x44e050, addr=0x44e011 "127.0.0.1", port=6379, timeout=0x0, source_addr=0x0) at net.c:342
#1 0x000000000042ef17 in redisContextConnectBindTcp (c=0x44e050, addr=0x44e011 "127.0.0.1", port=6379, timeout=0x0, source_addr=0x0) at net.c:513
#2 0x0000000000426202 in redisConnectWithOptions (options=0x7fffffffe1a0) at hiredis.c:767
#3 0x0000000000426317 in redisConnect (ip=0x44e011 "127.0.0.1", port=6379) at hiredis.c:792
#4 0x000000000040c585 in cliConnect (flags=0) at redis-cli.c:866
#5 0x00000000004213f8 in main (argc=0, argv=0x7fffffffe320) at redis-cli.c:8098
(gdb)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
_redisContextConnectTcp 函数是实际连接 redis-server 的地方,先调用 API getaddrinfo 解析传入进来的 ip 地址和端口号(笔者这里是 127.0.0.1 和 6379),然后创建 socket,并将socket 设置成非阻塞模式,接着调用 API connect 函数,由于 socket 是非阻塞模式,connect 函数会立即返回 -1。接着调用 redisContextWaitReady 函数,该函数中调用 API poll 检测连接的 socket 是否可写(POLLOUT),如果可写则表示连接 redis-server 成功。由于 _redisContextConnectTcp 代码较多,我们去掉一些无关的代码,整理出关键逻辑的伪码如下:
//net.c 335行
static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,
const struct timeval *timeout,
const char *source_addr) {
//...省略部分代码...
rv = getaddrinfo(c->tcp.host,_port,&hints,&servinfo)) != 0
s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1
redisSetBlocking(c,0) != REDIS_OK
connect(s,p->ai_addr,p->ai_addrlen)
redisContextWaitReady(c,timeout_msec) != REDIS_OK
return rv; // Need to return REDIS_OK if alright
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
redisContextWaitReady 函数的代码(位于 net.c 文件中)如下:
//net.c 241行
static int redisContextWaitReady(redisContext *c, long msec) {
struct pollfd wfd[1];
wfd[0].fd = c->fd;
wfd[0].events = POLLOUT;
if (errno == EINPROGRESS) {
int res;
if ((res = poll(wfd, 1, msec)) == -1) {
__redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
redisContextCloseFd(c);
return REDIS_ERR;
} else if (res == 0) {
errno = ETIMEDOUT;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
redisContextCloseFd(c);
return REDIS_ERR;
}
if (redisCheckSocketError(c) != REDIS_OK)
return REDIS_ERR;
return REDIS_OK;
}
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
redisContextCloseFd(c);
return REDIS_ERR;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
使用 b redisContextWaitReady 增加一个断点,然后使用 run 命令重新运行下 redis-cli,程序会停在我们设置的断点出,然后使用 bt 命令得到当前调用堆栈:
(gdb) b redisContextWaitReady
Breakpoint 4 at 0x42e460: file net.c, line 244.
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /root/redis-6.0.3/src/redis-cli
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib64/libthread_db.so.1".
Breakpoint 4, redisContextWaitReady (c=0x44e050, msec=-1) at net.c:244
244 wfd[0].fd = c->fd;
(gdb) bt
#0 redisContextWaitReady (c=0x44e050, msec=-1) at net.c:244
#1 0x000000000042edad in _redisContextConnectTcp (c=0x44e050, addr=0x44e011 "127.0.0.1", port=6379, timeout=0x0, source_addr=0x0) at net.c:475
#2 0x000000000042ef17 in redisContextConnectBindTcp (c=0x44e050, addr=0x44e011 "127.0.0.1", port=6379, timeout=0x0, source_addr=0x0) at net.c:513
#3 0x0000000000426202 in redisConnectWithOptions (options=0x7fffffffe1a0) at hiredis.c:767
#4 0x0000000000426317 in redisConnect (ip=0x44e011 "127.0.0.1", port=6379) at hiredis.c:792
#5 0x000000000040c585 in cliConnect (flags=0) at redis-cli.c:866
#6 0x00000000004213f8 in main (argc=0, argv=0x7fffffffe320) at redis-cli.c:8098
(gdb)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
连接 redis-server 成功以后,会接着调用上文中提到的 cliAuth 和 cliSelect 函数,这两个函数分别根据是否配置了 config.auth 和 config.dbnum 来给 redis-server 发送相关命令。由于我们这里没配置,所以这两个函数实际什么也不做。
583 static int cliSelect(void) {
(gdb) n
585 if (config.dbnum == 0) return REDIS_OK;
(gdb) p config.dbnum
$11 = 0
2
3
4
5
接着调用 repl 函数,在这个函数中是一个 while 循环,不断从命令行中获取用户输入:
//redis-cli.c 1909行
static void repl(void) {
//...省略无关代码...
while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
if (line[0] != '\0') {
argv = cliSplitArgs(line,&argc);
if (history) linenoiseHistoryAdd(line);
if (historyfile) linenoiseHistorySave(historyfile);
if (argv == NULL) {
printf("Invalid argument(s)\n");
linenoiseFree(line);
continue;
} else if (argc > 0) {
if (strcasecmp(argv[0],"quit") == 0 ||
strcasecmp(argv[0],"exit") == 0)
{
exit(0);
} else if (argv[0][0] == ':') {
cliSetPreferences(argv,argc,1);
continue;
} else if (strcasecmp(argv[0],"restart") == 0) {
if (config.eval) {
config.eval_ldb = 1;
config.output = OUTPUT_RAW;
return; /* Return to evalMode to restart the session. */
} else {
printf("Use 'restart' only in Lua debugging mode.");
}
} else if (argc == 3 && !strcasecmp(argv[0],"connect")) {
sdsfree(config.hostip);
config.hostip = sdsnew(argv[1]);
config.hostport = atoi(argv[2]);
cliRefreshPrompt();
cliConnect(1);
} else if (argc == 1 && !strcasecmp(argv[0],"clear")) {
linenoiseClearScreen();
} else {
long long start_time = mstime(), elapsed;
int repeat, skipargs = 0;
char *endptr;
repeat = strtol(argv[0], &endptr, 10);
if (argc > 1 && *endptr == '\0' && repeat) {
skipargs = 1;
} else {
repeat = 1;
}
issueCommandRepeat(argc-skipargs, argv+skipargs, repeat);
/* If our debugging session ended, show the EVAL final
* reply. */
if (config.eval_ldb_end) {
config.eval_ldb_end = 0;
cliReadReply(0);
printf("\n(Lua debugging session ended%s)\n\n",
config.eval_ldb_sync ? "" :
" -- dataset changes rolled back");
}
elapsed = mstime()-start_time;
if (elapsed >= 500 &&
config.output == OUTPUT_STANDARD)
{
printf("(%.2fs)\n",(double)elapsed/1000);
}
}
}
/* Free the argument vector */
sdsfreesplitres(argv,argc);
}
/* linenoise() returns malloc-ed lines like readline() */
linenoiseFree(line);
}
exit(0);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
得到用户输入的一行命令后,先保存到历史记录中(以便下一次按键盘上的上下箭头键再次输入),然后校验命令的合法性,如果是本地命令(不需要发送给服务器的命令,如 quit、exit)则直接执行,如果是远端命令,则调用 issueCommandRepeat 函数发送给服务器端:
//redis-cli.c 1820行
static int issueCommandRepeat(int argc, char **argv, long repeat) {
while (1) {
config.cluster_reissue_command = 0;
if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
cliConnect(1);
/* If we still cannot send the command print error.
* We'll try to reconnect the next time. */
if (cliSendCommand(argc,argv,repeat) != REDIS_OK) {
cliPrintContextError();
return REDIS_ERR;
}
}
/* Issue the command again if we got redirected in cluster mode */
if (config.cluster_mode && config.cluster_reissue_command) {
cliConnect(1);
} else {
break;
}
}
return REDIS_OK;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
实际发送命令的函数是 cliSendCommand,在 cliSendCommand 函数中又调用 cliReadReply 函数,后者又调用 redisGetReply 函数,在 redisGetReply 函数中又调用 redisBufferWrite 函数,在 redisBufferWrite 函数中最终调用系统 API write 将我们输入的命令发出去。
redisBufferWrite 函数定义如下:
//hiredis.c 903行
int redisBufferWrite(redisContext *c, int *done) {
int nwritten;
/* Return early when the context has seen an error. */
if (c->err)
return REDIS_ERR;
if (sdslen(c->obuf) > 0) {
nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
if (nwritten == -1) {
if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
/* Try again later */
} else {
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
} else if (nwritten > 0) {
if (nwritten == (signed)sdslen(c->obuf)) {
sdsfree(c->obuf);
c->obuf = sdsempty();
} else {
sdsrange(c->obuf,nwritten,-1);
}
}
}
if (done != NULL) *done = (sdslen(c->obuf) == 0);
return REDIS_OK;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
使用 b redisBufferWrite 增加一个断点,然后使用 run 命令将 redis-cli 重新运行起来,接着在 redis-cli 中输入 set hello world (hello 是 key, world 是 value)这一个简单的指令后,使用 bt 命令查看调用堆栈如下:
127.0.0.1:6379> set hello world
Breakpoint 5, redisBufferWrite (c=0x44e050, done=0x7fffffffe09c) at hiredis.c:906
906 if (c->err)
(gdb) bt
#0 redisBufferWrite (c=0x44e050, done=0x7fffffffe09c) at hiredis.c:906
#1 0x0000000000426942 in redisGetReply (c=0x44e050, reply=0x7fffffffe0c8) at hiredis.c:948
#2 0x000000000040d565 in cliReadReply (output_raw_strings=0) at redis-cli.c:1192
#3 0x000000000040ddbd in cliSendCommand (argc=3, argv=0x4a4800, repeat=0) at redis-cli.c:1361
#4 0x000000000040f94a in issueCommandRepeat (argc=3, argv=0x4a4800, repeat=1) at redis-cli.c:1823
#5 0x000000000041010a in repl () at redis-cli.c:2018
#6 0x00000000004213fd in main (argc=0, argv=0x7fffffffe320) at redis-cli.c:8099
(gdb)
2
3
4
5
6
7
8
9
10
11
12
13
当然,待发送的数据需要存储在一个全局静态变量 context 中,这是一个结构体,定义在 hiredis.h 文件中。
//hiredis.c 206行
/* Context for a connection to Redis */
typedef struct redisContext {
int err; /* Error flags, 0 when there is no error */
char errstr[128]; /* String representation of error when applicable */
int fd;
int flags;
char *obuf; /* Write buffer */
redisReader *reader; /* Protocol reader */
enum redisConnectionType connection_type;
struct timeval *timeout;
struct {
char *host;
char *source_addr;
int port;
} tcp;
struct {
char *path;
} unix_sock;
} redisContext;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
其中字段 obuf 指向的是一个 sds 类型的对象,这个对象用来存储当前需要发送的命令,这也同时解决了命令一次发不完需要暂时缓存下来的问题。
在 redisGetReply 函数中发完数据后立马调用 redisBufferRead 去收取服务器的应答。
//redis-cli.c 1186行
int redisGetReply(redisContext *c, void **reply) {
int wdone = 0;
void *aux = NULL;
/* Try to read pending replies */
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
/* For the blocking context, flush output buffer and read reply */
if (aux == NULL && c->flags & REDIS_BLOCK) {
/* Write until done */
do {
if (redisBufferWrite(c,&wdone) == REDIS_ERR)
return REDIS_ERR;
} while (!wdone);
/* Read until there is a reply */
do {
if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR;
if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
return REDIS_ERR;
} while (aux == NULL);
}
/* Set reply object */
if (reply != NULL) *reply = aux;
return REDIS_OK;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
拿到应答后就可以解析并显示在终端了。
总结起来,redis-cli 是一个实实在在的网络同步通信方式,只不过通信的 socket 仍然设置成非阻塞模式,这样有如下三个好处:
- 使用 connect 连接服务器时,connect 函数不会阻塞,可以立即返回,之后调用 poll 检测 socket 是否可写来判断是否连接成功。
- 在发数据时,如果因为对端 TCP 窗口太小发不出去,write函数也会立即返回,不会阻塞,此时可以将未发送的数据暂存,下次继续发送。
- 在收数据时,如果当前没有数据可读,则 read 函数也不会阻塞,程序也可以立即返回,继续响应用户的输入。
redis-cli 的代码不多,但是包含了很多常用的网络编程经典写法,如果读者想提高自己的网络编程能力,redis-cli 的代码是一份不错的学习素材。