6 消息传递
# 6 消息传递
在上一章中,我们了解到消息传递是可扩展系统中通信的关键实现方式。它提供了可靠的异步通信,支持一对一通信(队列)以及一对多(发布/订阅)模型。
在本章中,我们将详细探讨消息传递,从理论概念入手,深入研究两个广泛使用的系统。我们将涵盖以下主题:
- 性能特征
- 基于代理的消息传递
- Apache Kafka深度剖析
- 无代理消息传递
- NSQ深度剖析
- 集成模式
# 性能特征
一个消息传递系统的性能可以从四个方面来评判:可扩展性、可用性、延迟和吞吐量。这些因素往往相互制约,架构师常常需要权衡,决定牺牲某一方面来提升其他方面:
- 可扩展性(Scalability):指系统在处理负载增加时,不会导致延迟或可用性这两个因素出现明显下降的能力。这里的负载可以指主题数量、消费者数量、生产者数量、每秒消息数量或平均消息大小等。
- 可用性(Availability):在分布式系统中,各个单元(服务器、磁盘、网络等)都可能出现各种问题。系统的可用性衡量的是系统对这些故障的抵御能力,以确保最终用户能够正常使用。
- 延迟(Latency):指消息从生产者到达消费者所需的时间。
- 吞吐量(Throughput):指消息传递系统每秒能够处理的消息数量。
延迟和吞吐量之间存在典型的权衡关系。为了优化吞吐量,我们可以对消息进行批量处理,但这会对延迟产生严重的负面影响。
# 基于代理的消息传递
代理(Broker)是消息传递系统中的中介组件。在这种系统中,客户端连接到代理,而不是直接相互连接。每当客户端想要发送和接收消息时,它们需要在代理上指定邮箱/主题/队列。生产者连接到代理并将消息发送到特定队列。消费者连接到代理,并指定它们想要从中读取消息的队列名称。代理主要有以下职责:
- 可靠地维护队列、生产者和消费者之间的映射关系:这包括以持久化格式存储消息。
- 处理消息生产:即存储生产者写入的消息。
- 处理消息消费:意味着确保消费者能够可靠地获取消息,并提供避免重复消息的机制。
- 路由和转换:消息代理可能会对每条消息进行转换或维护多个副本,以支持不同的拓扑模型,后续章节将对此进行描述。
# 队列模型
在标准队列模型中,生产者和消费者之间有一个先进先出的持久化缓冲区,如下图所示:
所有生产者和消费者在初始化时都连接到代理。消费者还需要通过注册订阅特定队列来表明对这些队列的兴趣。每当生产者向队列发送消息时,该消息会被传输到代理,并持久化存储在对应的队列中。这些消息会被传递给一个或多个已注册订阅该队列的消费者。如果在队列模型中有多个消费者注册,每条消息只会被传递给其中一个消费者,从而实现消息消费和处理的负载均衡机制。
消费者获取消息有两种方式:
- 拉取模式(Pull mode):消费者端的消息传递客户端定期轮询代理,检查是否有新消息。
- 推送模式(Push mode):消费者注册一个端点(例如,一个HTTPS URL),生产的消息会使用HTTPS POST等协议发送到该URL。
拉取模式有点浪费资源,因为每次拉取时可能并没有消息。另一方面,推送模式要求消费者有可供代理访问的端点,这在通过网络防火墙进行配置时比较困难。
# 发布/订阅模型
队列模型和发布/订阅模型的主要区别在于,在发布/订阅模型中,每个消费者都会收到消息的副本,而不是只有一个消费者能收到。通过下图可以更好地理解这一点:
如你所见,消费者1和消费者2都收到了消息m1和m2的副本。
当你希望多个消费者处理相同的消息时,这种模型非常有用,这在我们之前介绍的事件驱动架构(EDA)中通常是常见的需求。
你可能会想,这是否意味着在发布/订阅设置中无法实现负载均衡?它不会影响可扩展性吗?
通常,消息传递系统还会通过一种称为虚拟主题(这是ActiveMQ使用的特定术语,但大多数队列系统都具备类似功能)的机制,为主题提供负载均衡功能。该方案如下图所示:
可以看到,主题仍然进行发布/订阅式的消息路由,但每个消费者实际上都有一个订阅队列。消费者的多个实例可以获取不同的消息,从而实现消费者端的横向扩展和负载均衡。需要注意的是,订阅队列是消费者注册订阅主题时自动创建的,无需手动创建。
# 投递语义
在编写从队列中获取消息并执行某些操作的代码时,理解消息投递保证的语义非常重要。
# 确认机制
考虑这样一种情况:消费者获取了一条消息(m1),但在处理它之前,程序崩溃(或重启)了。因此,m1并没有被完全消费,但从代理的角度来看,它已经被投递了。通过下图可以更好地理解这种情况:
为了处理这种情况,代理通常提供允许消费者确认消息的功能。一旦消息被投递,代理会暂时将其从队列中移除,并存储到其他临时存储区。消费者应该为其消费的每条消息发送确认信息。当代理收到一条消息的确认信息时,会将该消息从临时存储区移除。此外,当向消费者发送消息时,代理会启动一个确认截止时间的定时器。如果这个时间到期,代理会认为消费者在处理消息时出现故障,并将消息从临时存储区移回队列。
这能实现可靠的消息消费吗?确认模型支持三种可靠的消息投递语义,我们现在来了解一下。
# 至少一次投递
在至少一次投递(At-least-once delivery)保证下,代理确保它会将每条所需的消息至少投递一次给消费者。大多数情况下,消息只会被接收一次,但有时可能会出现重复消息。下图描述了这种情况:
为了避免重复处理,消费者需要在自己这一端对消息进行去重。通常的做法是将消息ID存储在数据库中。当即将处理一条消息时,会查询这个数据库,以确保该消息之前未被处理过。另一方面,如果消息处理是幂等的,我们可以直接忽略重复处理。
# 至多一次投递
有时,避免发送重复消息至关重要,比如发送电子邮件时,你肯定不想给客户发送垃圾邮件。为了实现这一点,可以采用至多一次投递(At-most-once delivery)的投递语义。不使用确认机制是实现至多一次投递的一种方式。然而,由于大多数消息传递系统都有确认机制,另一种实现方式是在开始处理消息时就确认消息,如下图所示:
# 恰好一次交付
恰好一次(Exactly-once)语义是最理想的交付保证,但如果没有消息代理(broker)和消费者之间的某种协作,就无法实现。如果消息代理使用确认机制向生产者确认消息发布,那么每个消息都必须有唯一的ID,以便消息代理对重复消息进行去重。
在消费者端,可以使用去重机制来确保不会处理重复消息。
一些消息系统,如Kafka,支持 “从一个队列消费并发布到多个队列” 的原子语义。这对事件驱动架构(EDA)非常有帮助,我们将在深入探讨Apache Kafka的部分详细介绍。
# 弹性
本节描述的消息传递模型可能会出现很多问题:
- 消息代理可能会发生故障,导致存储在其上的所有消息丢失。为了应对这种情况,通常会将消息代理部署为冗余实例集群。作为写入提交的一部分,每条消息都会在一组机器上进行复制。如果一条消息被复制到n个消息代理上,这意味着系统可以容忍n - 1个实例发生故障。
- 生产者与消息代理之间的通信可能会失败,从而导致消息丢失。这通常通过确认机制来解决(如前文所述)。可以通过为消息设置序列号来避免产生重复消息。
- 消费者与消息代理之间的消息通信可能会失败,这也会导致消息丢失。因此,除非消费者明确确认已处理消息,否则消息不应从消息代理中删除。正如我们在 “确认” 部分看到的,进行确认时,可能会导致至少一次或至多一次的交付语义。
通过在消费者端进行去重以及消息代理和消费者之间的协作,可以实现恰好一次交付。
许多应用程序在从n个队列读取消息并写入m个队列时,还需要一些原子性保证。像Kafka这样的消息系统提供了事务结构来实现这一点。我们将在深入探讨Apache Kafka的部分详细了解这一内容。
# 高级消息队列协议(AMQP)
AMQP代表异步消息队列协议(Asynchronous Message Queuing Protocol),是消息系统的一个开放标准。它最初是为金融系统(交易和银行业务)设计的,因此非常注重可靠性、可扩展性和可管理性。
AMQP规范定义了以下一些结构:
- 它有一个网络层协议,用于描述消息代理、生产者和消费者之间交换的数据包(在AMQP术语中称为帧,frames)。该规范定义了九种类型的帧,涉及连接建立/拆除和流量控制等方面。
- 它有一个自描述编码方案,用于描述一系列数据类型。还包括注释,用于赋予实体额外的含义(例如,一个字符串可能会被注释为表示它是一个URL)。
在AMQP中,消息被发布到交换器(exchanges)。然后,消息会根据称为绑定(bindings)的规则被路由到不同的队列。消费者从队列中获取消息:
网络被认为是不可靠的,消费者可能无法处理消息。因此,AMQP模型引入了消息确认(message acknowledgements)的概念:当一条消息被交付给消费者后,消费者在处理完该消息后会向消息代理发送确认信息。这将触发消息代理从队列中删除该消息。确认通常会伴随着超时机制;如果一条消息在超时时间内未得到确认,它将被重新发送给消费者。
路由算法取决于交换器的类型。有多种方法可以实现队列(一对一)或发布/订阅(一对多)的行为。如果由于某种原因消息无法被路由,它将被放入一个名为死信队列(dead-letter queue)的特殊队列中。RabbitMQ是一个使用AMQP结构的开源项目。
RabbitMQ管理插件提供了一个HTTP API、基于浏览器的用户界面和命令行界面(CLI),用于管理和监控。它支持定义具有企业级安全性的复杂路由拓扑。AMQP的缺点是其吞吐量不如Kafka,并且为了维护路由元数据的辅助数据结构,会增加复杂性和额外的资源消耗。
在吞吐量方面,RabbitMQ每秒大约能处理20,000条消息。
# 深入剖析Apache Kafka
Apache Kafka是一个流消息传递平台,最初由领英(LinkedIn)开发,现在是Apache的顶级项目。它能够在消息代理集群上实现消息的无缝持久化分发,并且分发能力可以随着负载进行扩展。由于其更高的吞吐量、更简单的架构、负载均衡语义以及集成选项,它越来越多地被用于取代传统的消息代理,如AMQP。
# 概念
在Kafka中,主题(topic)是消息发布和消费的队列的正式名称。Kafka中的主题提供了前面描述的虚拟主题队列模型,即存在多个逻辑订阅者时,每个订阅者都会收到消息的副本,但一个逻辑订阅者可以有多个实例,并且每个订阅者实例会收到不同的消息。
主题被建模为一个分区日志,如下所示:
来源:http://kafka.apache.org/documentation.html#introduction
新消息会被追加到日志的一个分区中。日志分区是一个有序、不可变的消息列表。主题分区中的每条消息都通过列表中的偏移量(offset)来标识。分区有几个作用:
- 一个日志(主题)的规模可以超过单个机器(节点)的容量。单个分区需要存储在一台机器上,但整个主题可以分布在多台机器上。
- 主题分区为消费者提供了并行性和可扩展性。
Kafka只保证同一主题分区内消息的顺序,而不保证同一主题不同分区之间消息的顺序。在设计应用程序时,这是一个需要牢记的关键点。
每当生成一条新消息时,它会被持久化存储在为该主题分区指定的一组消息代理实例上,这些实例称为同步副本(In-Sync Replicas,ISRs)。每个ISR中有一个节点充当领导者(leader),零个或多个节点充当跟随者(followers)。领导者处理所有的读写请求,并将状态复制到跟随者上。领导者和跟随者之间会定期进行心跳检测,如果一个领导者被判定为故障,就会举行选举以选出新的领导者。需要注意的是,一个节点可以是一个主题分区的领导者,同时是其他主题分区的跟随者。这使得负载可以在集群的节点之间均匀分布。
消息会在消息代理上保留一段可配置的时间,这与它们是否被消费无关。例如,如果一个主题的保留策略设置为一周,那么在消息发布后的一周内,它都可供消费。之后,消息将被删除(存储空间被回收)。
与大多数其他消息系统不同,Kafka对消费者的消费情况只保留极少的信息。客户端可以记住它们在每个主题分区中的偏移量,并尝试随机访问日志中的消息。
Kafka还提供了一种机制,让消息代理根据消费者的明确指示记住其偏移量。这种设计降低了消息代理的复杂性,并能够高效地支持不同消费速度的多个消费者:
(来源:http://kafka.apache.org/documentation.html#introduction)
消费者A可以按照自己的速度消费消息,而不受消费者B消费速度的影响。
生产者在特定主题上发布消息。他们可以选择自己的分区器,为每条消息选择一个分区,也可以选择默认的(随机/轮询)分区器。通常,大多数Kafka客户端会在生产者端对消息进行批量处理,所以写入操作只是将消息存储在缓冲区中。这些消息会定期刷新到消息代理上。
如前文 “发布/订阅模型” 部分所述,Kafka主题提供了发布/订阅队列模型。因此,可以有多个逻辑消费者,每个消费者都会收到所有消息。然而,一个逻辑消费者(例如一个微服务)会有多个实例,理想情况下,我们希望在这些消费者实例之间负载均衡消息的消费和处理。Kafka通过一种称为消费者组(consumer groups)的结构实现了这一点。
每次消费者注册从一个主题获取消息时,它会发送一个标签(字符串)来描述这个逻辑消费者(例如服务名称)。Kafka消息代理将具有相同组名的每个实例视为属于同一个逻辑消费者,并且每个实例只会收到消息的一个子集。因此,消息将在消费者实例之间有效地实现负载均衡。
如前所述,主题分区是消息消费的并行单元。让我们看看这是如何实现的。当一个消费者实例注册从一个主题获取消息时,它有两个选择:
- 手动注册消费该主题的特定分区。
- 让Kafka自动在消费者实例之间分配主题分区。
第一个选项很简单,应用程序可以很好地控制谁处理哪些消息。然而,第二个选项,即组协调器(group coordinator)功能,是一个非常强大的工具,可以在分布式系统中实现可扩展性和弹性。在这种情况下,消费者无需指定具体要消费的分区,而是由消息代理自动将分区分配给消费者实例:
(来源:http://kafka.apache.org/documentation.html#introduction)
在上图中,主题有四个分区,分布在两台服务器上。消费者组A有两个实例,每个实例被分配了两个主题分区。另一方面,消费者组B有四个实例,每个实例被分配了一个主题分区。消息代理(组协调器功能)通过与消费者组实例之间的心跳机制来维护消费者组成员关系。当有新的消费者组实例加入时,主题分区会自动重新分配。如果一个实例发生故障,其分区将被分配给剩余的实例。
# 发布消息
Sarama(https://github.com/Shopify/sarama)是最广泛使用的纯Go语言编写的Apache Kafka客户端库。我们将使用它来演示与Kafka的各种交互。
为Kafka生成一条消息涉及多个步骤,包括确定消息应该发送到主题的哪个分区、找到该主题分区的领导者,以及将消息传输到该消息代理(领导者)实例。在Sarama中,生产者基于管道并发模式,每条消息由四个结构体处理。每个结构体都有一个goroutine,实现消息发布过程中的一个阶段,这些阶段通过通道连接。
生产者有两种选择,如下文所述。
# AsyncProducer接口
顾名思义,这是一个非阻塞API,它将消息路由到消息代理,并通过两个输出通道异步提供有关消息处理结果的信号。消息通过一个输入通道接收。AsyncProducer接口的代码如下:
type AsyncProducer interface {
// AsyncClose触发生产者的关闭操作。当Errors和Successes通道都关闭时,关闭操作完成。调用AsyncClose时,
// 你必须继续从这些通道读取数据,以处理任何正在传输中的消息的结果。
AsyncClose()
// Close关闭生产者,并等待所有缓冲的消息被刷新。在生产者对象超出作用域之前,你必须调用此函数,否则可能会导致内存泄漏。
// 在关闭底层客户端之前,你必须先调用此函数。
Close() error
// Input是用户写入希望发送的消息的输入通道。
Input() chan<- *ProducerMessage
// Successes是当Return.Successes启用时,返回给用户的成功输出通道。如果Return.Successes为true,你必须从此通道读取数据,
// 否则生产者将发生死锁。建议在单个select语句中同时发送和读取消息。
Successes() <-chan *ProducerMessage
// Errors是返回给用户的错误输出通道。你必须从此通道读取数据,否则当通道满时,生产者将发生死锁。或者,你可以在配置中
// 将Producer.Return.Errors设置为false,以防止返回错误。
Errors() <-chan *ProducerError
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Input()方法返回一个只写通道,该通道接受指向ProducerMessage结构体的指针。这是主要的接口,客户端通过这个通道写入消息(封装在ProducerMessage结构体中)。客户端在发布消息后,必须从Errors()通道读取数据。
为了回收生产者资源,我们可以在生产者上调用Close()或AsyncClose()以避免内存泄漏。
下面的代码展示了使用select语句对各种通道进行操作的AsyncProducer示例用法:
package main
import (
"fmt"
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"strconv"
"time"
)
// 我们要发送的示例消息
type Message struct {
Who string
TimeAsString string
}
func main() {
// 创建配置
config := sarama.NewConfig()
// 以下设置表示所需的可靠性级别
// 这里表示我们希望ISR中的所有消息代理都进行确认
config.Producer.RequiredAcks = sarama.WaitForAll
// 发送消息的最大重试次数(默认值为3)
config.Producer.Retry.Max = 5
// 你不需要列出所有的消息代理,只需提供几个种子节点,它们会告知客户端集群中的其他消息代理
brokers := []string{"localhost:9092"}
asyncProducer, err := sarama.NewAsyncProducer(brokers, config)
if err != nil {
// 无法连接
panic(err)
}
defer func() {
if err := asyncProducer.Close(); err != nil {
log.Fatalln(err)
}
}()
// 捕获SIGINT信号以跳出循环并进行清理
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
exitProgram := make(chan struct{})
// 简单的无限循环,用于发送当前时间
var nPublished, nErrors int
go func() {
for {
time.Sleep(5 * time.Second)
// 构造消息
body := Message{
Who: "aProcess",
TimeAsString: strconv.Itoa(int(time.Now().Unix())),
}
// 序列化消息
payload, _ := json.Marshal(body)
msg := &sarama.ProducerMessage{
Topic: "currentTime",
Key: sarama.StringEncoder("aKey"),
Value: sarama.ByteEncoder(payload),
}
select {
case producer.Input() <- msg:
nPublished++
fmt.Println("Produce message")
case err := <-producer.Errors():
nErrors++
fmt.Println("Failed to produce message:", err)
case <-signals:
exitProgram <- struct{}{}
}
log.Printf("Published: %d; Errors: %d\n", nPublished, nErrors)
}
}()
<-exitProgram // 等待程序被终止
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
你也可以使用单独的goroutine遍历Errors和Success通道,而不是使用select语句。
# 同步生产者(The Sync producer)
Samara还提供了一个阻塞式生产者接口,用于等待消息可靠地传递。它的用法与异步生产者(AsyncProducer)类似,但不是使用通道,而是调用阻塞式的SendMessage()
方法。
以下代码片段展示了其用法:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
// 配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Errors = true // 对于同步生产者,此选项必须为true
config.Producer.Return.Success = true // 对于同步生产者,此选项必须为true
// 连接到本地运行的Kafka代理
brokers := []string{"localhost:9092"}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
panic(err)
}
// 清理操作
defer func() {
if err := producer.Close(); err != nil {
panic(err)
}
}()
msg := &sarama.ProducerMessage{
Topic: "currentTime",
Value: sarama.StringEncoder(strconv.Itoa(int(time.Now().Unix()))),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
fmt.Printf("FAILED to publish message: %s\n", err)
} else {
fmt.Printf("message sent | partition(%d)/offset(%d)\n", partition, offset)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 消息消费(Consuming messages)
Sarama提供了一个底层的消息消费API,但也有更高级的库,这些库基于“深入探究Apache Kafka”部分描述的负载均衡机制提供组协调功能。这个库名为Sarama(https://github.com/bsm/sarama-cluster)。
该库要求Kafka版本为v0.9及以上,并遵循https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design中描述的协议。
主要接口是一个用于接收消息的通道。以下是一个示例程序:
package main
import (
"fmt"
cluster "github.com/bsm/sarama-cluster"
"log"
"os"
"os/signal"
)
func main() {
// 配置设置,启用错误和通知功能
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
// 指定代理坐标和感兴趣的主题
brokers := []string{"localhost:9092"}
topics := []string{"topic_a", "topic_b"}
// 连接并注册,指定消费者组名称
consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
if err != nil {
panic(err)
}
defer consumer.Close()
// 处理错误
go func() {
for err := range consumer.Errors() {
log.Printf("Error: %s\n", err.Error())
}
}()
// 处理通知
go func() {
for ntf := range consumer.Notifications() {
log.Printf("Rebalanced: %+v\n", ntf)
}
}()
// 处理消息
for msg := range consumer.Messages() {
fmt.Fprintf(os.Stdout, "%s-%d-%d-%s-%s\n",
msg.Topic,
msg.Partition,
msg.Offset,
msg.Key,
msg.Value) // <- 在此处实际处理消息
consumer.MarkOffset(msg, "") // 提交此消息的偏移量
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
for循环之前的大部分代码是设置代码。主要的消息消费操作发生在对consumer.Messages()
返回的通道进行迭代的过程中。在这个循环中,每当有新消息可用时,它会被封装在msg
对象中传递。处理完消息后,客户端可以使用consumer.MarkOffset()
调用将偏移量提交给Kafka,从而表明消费者(组)已经处理了该消息,并且不想再接收它。通常,会使用一个处理函数来处理消息,只有当处理函数执行成功后,才提交该消息。对于长时间运行的处理任务,我们可以将msg
对象本身传递给处理函数,然后在后续的下游操作中确认消息。consumer.MarkOffset()
的第二个参数表示提交时的消费者状态。这可以用作检查点,以便从特定点恢复消息消费。
Sarama有一些可调整的参数,以确保可以应用背压(back-pressure)机制,使消费者能够以自己适应的速率处理数据。其中一些参数如下:
Config.Consumer.MaxWaitTime
:定义了消费者在向代理请求更多消息之前应等待的时间。Config.ChannelBufferSize
:定义了缓冲通道(输入和输出)的大小。Config.Consumer.MaxProcessingTime
:定义了消费者处理消息所需的时间。
# 流处理(Stream processing)
Goka(https://github.com/lovoo/goka)是一个强大的Go语言流处理库,它使用Apache Kafka。它可用于构建诸如第5章“走向分布式”中提到的命令查询职责分离(CQRS)和事件溯源(event sourcing)架构等流处理模式。
在Goka中,流处理应用程序有三个主要构建块:发射器(emitters)、处理器(processors)和视图(views)。 发射器向Kafka生成键值对消息。例如,它可以是一个监听数据库变更日志的监听器,并将变更日志作为事件发送出去。处理器是一组回调函数,用于消费消息并对消息进行状态转换。处理器被分组集群化管理。与Kafka的消费者组重新平衡功能类似,Goka会在处理器组中的各个处理器之间分配主题分区。每个处理器组在一种称为组表(group table)的结构中维护状态。这是一个存储在Kafka中的分区键值表,并带有本地缓存。视图提供对组表的只读访问。
下图展示了这种抽象:
(来源:https://github.com/lovoo/goka)
以下是一个处理器示例,用于统计接收到的消息数量:
func process(ctx goka.Context, msg interface{}) {
var nMessages int
if val := ctx.Value(); val != nil {
nMessages = val.(int)
}
nMessages++
ctx.SetValue(nMessages)
}
2
3
4
5
6
7
8
传递的goka.Context
参数是最强大的结构,它支持保存组表并向其他处理器发送消息。更多详细信息可在https://github.com/lovoo/goka上查看,你还可以在https://github.com/lovoo/goka/tree/master/examples中查看更多示例。
# 无代理消息传递(Brokerless messaging)
基于代理的模型有几个优点:
- 连接的服务之间有清晰的隔离。生产者只需要知道代理的地址。
- 生产者和消费者的生命周期不必重叠。生产者可以向代理发送消息,然后下线,之后消费者上线并读取该消息。
然而,它也存在一些缺点:
- 代理可能成为瓶颈,所有消息都需要通过它进行传递,这可能会影响性能。
- 存在不必要的网络I/O。
例如,在一个典型的带有代理和四个处理器的事件驱动架构(EDA)中,我们会得到如下的通信模式:
有了中央代理,通信效率很难再提高。但是,如果允许处理器之间直接通信,我们可以实现更高效的通信,并降低端到端延迟:
如果我们让代理仅充当目录服务,而不是消息转发器,就可以在保留代理架构的一些优点(如隔离级别)的同时,获得对等架构的效率。
NSQ是一个消息传递平台(碰巧是用Go语言编写的),下一节将对其进行介绍。
# 深入探究NSQ
NSQ是一个实时分布式消息传递平台,它倡导“无代理消息传递”部分描述的去中心化拓扑结构。
# 概念
一个NSQ系统由以下组件构成:
- 虚拟主题结构(The virtual topic construct):主题是NSQ中生成的消息的目的地。每个主题有一个或多个通道,这些通道是每个消费者的队列,从而实现发布/订阅(Pub/Sub)行为。主题和通道在首次使用时会隐式创建。
nsqd
:这是主要组件,它负责接收、排队并将消息传递给客户端。它可以作为守护进程运行,也可以嵌入到Go应用程序中。它可以独立运行,但通常会与nsqlookupd
配置在一个集群中。这些实例为所有连接的生成消息的客户端托管多个主题,并为所有主题提供通道。主题、通道和nsqd
之间的关系如下图所示:(来源:http://nsq.io/overview/design.html)
nsqlookupd
:这是一个守护进程,它掌握整个集群的情况并管理拓扑结构。客户端向nsqlookupd
实例查询,以发现托管特定主题消息(即作为生产者)的不同nsqd
实例。它的主要工作是提供目录服务,使客户端(消费者)可以查找它们感兴趣的要从中消费消息的主题所对应的nsqd
实例的地址。
每个nsqd
实例都与nsqlookupd
保持一个长期的TCP连接,通过这个连接,nsqd
会推送自身的状态(如健康状况或主题信息)。nsqlookupd
会整理这些信息,然后为希望消费特定主题的客户端提供正确的nsqd
实例集。nsqd
和nsqlookupd
实例独立运行,它们之间没有对等通信。对于nsqlookupd
,通过运行多个独立实例来实现高可用性。
客户端会轮询所有配置好的nsqlookupd
实例,并整合响应结果。客户端直接连接到nsqd
实例,如下图所示:
(来源:http://nsq.io/overview/design.html )
需要注意的是,一个主题的消息可能存在于多个nsqd
实例中,客户端会连接到所有承载该主题消息的nsqd
实例。
当客户端连接到一个nsqd
实例时,交互过程如下:
- 客户端表明它已准备好接收消息。客户端不是发送二进制信号,而是发送一个称为RDY状态的值,该值表明客户端准备好接收多少条消息。
nsqd
发送一条消息,并将数据暂时移动到另一个本地存储(如果需要重传)。- 消息被消费后,如果是需要重新入队的客户端,会回复一个FIN(完成)数据包,然后生成该消息的
nsqd
实例的通道会丢弃这条消息。如果客户端出现错误,可以使用REQ(重新入队)数据包请求再次获取该消息。 nsqd
在发送消息后会启动一个超时机制,如果没有收到响应,会自动将消息重新入队:(来源:http://nsq.io/overview/design.html )
NSQ可以将消息存储在内存中,并根据可配置的水印将其刷新到磁盘。如果需要保证所有消息的持久性,可以进行相应配置,使消息始终写入磁盘。为实现高可用性,客户端可以向多个nsqd
实例写入消息(这假定消费者是幂等的)。
# 发布消息
Go-NSQ
是NSQ官方的Go语言客户端,可在https://github.com/nsqio/go-nsq 上获取。
下面是NewProducer
函数:
func NewProducer(addr string, config *Config) (*Producer, error)
它为指定的nsqd
地址返回一个Producer
实例。nsqd
实例与客户端Producer
实例之间是一对一的映射关系。TCP连接采用延迟管理方式(在发布消息时才会建立连接)。
配置包含一系列可调整的参数,通过NewConfig()
方法创建。
Producer
结构体有两个用于发布消息的方法:
- 同步发布:
func (w *Producer) Publish(topic string, body []byte) error
- 异步发布:
func (w *Producer) PublishAsync(topic string, body []byte, doneChan
chan *ProducerTransaction, args...interface{}) error
2
这个方法异步发送消息,调用后立即返回。doneChan
通道会接收到一个ProducerTransaction
实例,如果有错误会包含错误信息,args
主要用于内部管理。
下面的代码片段展示了如何发布消息:
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
)
func main() {
// 连接
pCfg := nsq.NewConfig()
producer, err := nsq.NewProducer("127.0.0.1:4160", pCfg)
if err != nil {
log.Fatalf("failed creating producer %s", err)
}
// 异步发布
destinationTopic := "my_topic"
responseChan := make(chan *ProducerTransaction)
err = producer.PublishAsync(destinationTopic, []byte("a_message"),
responseChan, "some_args")
// 检查状态
// 这里为了展示直接内联处理
status := <-responseChan
if status.Error != nil {
log.Printf("Error received %s \n", status.Error.Error())
} else {
log.Printf("Success Arg received : %s \n",
status.Args[0].(string)) // 应该是some_args
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 消费消息
NewConsumer
方法为给定的主题创建一个新的NSQ消费者实例。我们需要指定通道名称,并在配置中传递各种可调整参数。该方法的签名如下:
func NewConsumer(topic string, channel string, config *Config) (*Consumer,
error)
2
Consumer
实例会配备一个处理程序,该处理程序会通过不同的goroutine并发执行。这个处理程序会处理从指定主题/通道消费的消息。如果进行了相应配置,Consumer
实例还会轮询nsqlookupd
实例,以发现并管理与nsqd
实例的连接。这是通过Consumer
结构体的ConnectToNSQD()
方法实现的。
下面的代码片段展示了基本用法:
package main
import (
"github.com/nsqio/go-nsq"
"log"
)
type MyMessageHandler struct {
totalMessages int
}
func (h *MyMessageHandler) HandleMessage(message *Message) error {
h.totalMessages++
log.Printf("Message no %d received, body : %s \n", h.totalMessages,
string(message.Body))
}
func main() {
config := NewConfig()
topicName := "my_topic"
channelName := "my_chan"
cons, err := NewConsumer(topicName, channelName, config)
if err != nil {
log.Fatal(err)
}
cons.AddHandler(&MyMessageHandler{})
// 等待退出信号
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
// 停止消费者
cons.Stop()
<-cons.StopChan // 等待清理完成
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
你可能想知道消息的确认是如何进行的。默认情况下,当处理程序返回时,消息会被确认。然而,在处理程序中调用Message
结构体对象的DisableAutoResponse()
方法可以禁用自动确认。然后,我们可以在消息对象上调用Finish()
或Requeue()
方法来确认消息或请求重新传输消息。如下代码所示:
type MyMessageHandler struct{}
func (h *MyMessageHandler) HandleMessage(m *nsq.Message) error {
m.DisableAutoResponse()
delegateChannel <- m
return nil
}
go func() {
for m := range delegateChannel {
err := doSomeWork(m) // 一些耗时较长的任务
if err != nil {
m.Requeue(-1)
continue
}
m.Finish()
}
}()
cfg := nsq.NewConfig()
cfg.MaxInFlight = 1000 // 允许同时处理的最大消息数(并发控制参数)
topicName := "my_topic"
channelName := "my_chan"
cons, err := nsq.NewConsumer(topicName, channelName, cfg)
if err != nil {
log.Fatalf(err.Error())
}
// 下面的方法是AddHandler的替代方法,用于启用并发处理
// 第二个参数是用于处理的goroutine数量
cons.AddConcurrentHandlers(&MyMessageHandler{}, 20)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 集成模式
我们刚刚介绍的消息传递原语可以通过各种功能进行扩展,以实现复杂的架构模式。在第5章 “走向分布式” 中,我们了解了事件驱动架构范式。在本节中,我们将更详细地研究使用消息传递进行组件集成的各种模式。
虽然这些模式原则上可以通过持久化消息传递(如Kafka、NSQ等)来实现,但我们将使用Golang通道原语来演示集成模式。
# 请求 - 回复模式
在这种模式中,服务A(请求方)希望服务B(响应方)完成一些工作,并且期望得到请求的输出结果。考虑到服务B可能正在处理许多其他服务的请求,它该如何响应呢?
解决方案是让请求方将响应主题(在这种情况下是通道)告知响应方。这将响应方与请求方解耦。一个请求消息可能如下所示:
type Request struct {
someArg string
replyTo chan<- Response
}
type Response struct {
reply string
}
2
3
4
5
6
7
8
注意请求方随消息一起发送的replyTo
通道。请求方和响应方的代码如下:
func responder(c <-chan Request) {
for request := range c {
var resp Response
resp.reply = "reply-to-" + request.someArg
request.replyTo <- resp
}
}
func requestor(c chan<- Request) {
myChannel := make(chan Response)
for i := 0; i < 5; i++ {
c <- Request{fmt.Sprintf("message%d", i), myChannel}
resp := <-myChannel
fmt.Printf("request %d, response %s\n", i, resp.reply)
}
// 工作完成后清理
close(myChannel)
}
func main() {
requestChannel := make(chan Request)
go responder(requestChannel)
go requestor(requestChannel)
time.Sleep(time.Second * 10)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 关联标识符模式
在事件驱动架构设置中,消息可能会流经多个服务。在这种情况下,为每条消息分配一个唯一标识符非常重要,以便在服务代码中进行关联和调试。
Golang有多种提供GUID(全局唯一标识符)生成功能的库:
- 使用时间引入随机性并实现时间聚类。
- 用随机数据填充ID的其余部分。
- 以允许字典排序的方式将GUID编码为URL安全的字符串。
各种选项的总结如下表所示:
包 | 示例ID | 说明 |
---|---|---|
github.com/segmentio/ksuid | 0pPKHjWprnVxGH7dEsAoXX2YQvU | 4字节时间(秒) + 16字节随机数据 |
github.com/rs/xid | b50vl5e54p1000fo3gh0 | 4字节时间(秒) + 3字节机器ID + 2字节进程ID + 3字节随机数据 |
github.com/kjk/betterguid | -Kmdih_fs4ZZccpx2Hl1 | 8字节时间(毫秒) + 9字节随机数据 |
github.com/sony/sonyflake | 20f8707d6000108 | 基于Twitter生成推文ID的设计,简单但随机性最低。6字节时间(10毫秒) + 1字节序列号 + 2字节机器ID |
为便于调试,必须在日志中记录这些关联标识符以及日志需要传达的其他信息。
# 管道和过滤器模式
在许多需求场景中,单个事件会触发一个工作流(或一系列处理步骤),作为对该事件所需响应的一部分。例如,在票务支付场景中,我们可能需要验证支付信息、确认预订,并发送电子发票。每个单独的操作都是独立的,有时还会成为多个工作流的一部分。我们需要一种方式将这些处理器组合起来,以实现不同的工作流。
管道和过滤器架构模式旨在通过将工作流分解为一系列较小的、独立的处理器(称为过滤器)来提供解决方案。过滤器之间通过称为管道的消息通道连接。
每个过滤器都有一个非常简单的接口 —— 它从一个输入管道接收输入消息,并将输出写入到一个输出管道。它在内部封装了消息的处理过程,并且在初始化时可以将更多数据作为上下文的一部分。一个过滤器的输出管道连接到另一个过滤器的输入管道,从而组成工作流。
以下代码展示了一个计算 $y = x^2 + c$ 值的工作流示例。发射器过滤器是链的起点,它生成从0到给定值的数字。这些数字随后流入 xSquare
过滤器,该过滤器对值进行平方并将结果输出到另一个管道。然后,这些结果作为输入进入 addC
过滤器,该过滤器完成等式的最后一部分计算:
func emitter(till int) <-chan int {
out := make(chan int)
go func() {
for i := 0; i < till; i++ {
out <- i
}
close(out)
}()
return out
}
func xSquare(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for x := range in {
out <- x * x
}
close(out) // close forward
}()
return out
}
func addC(in <-chan int, c int) <-chan int {
out := make(chan int)
go func() {
for x := range in {
out <- x + c
}
close(out) // close forward
}()
return out
}
func main() {
// y = x*x + c
out := addC(
xSquare(emitter(3)),
5)
for y := range out {
fmt.Println(y)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
完成这段代码编写后,很容易将其扩展为计算 $y = x^4 + c$,如下所示:
// y = x*x*x*x + c
out1 := addC(
xSquare(xSquare(emitter(3))),
5)
for y := range out1 {
fmt.Println(y)
}
2
3
4
5
6
7
这段代码将输出:
5
6
21
2
3
# 基于内容的路由器模式
在管道和过滤器模式中,我们经常会遇到这样的用例:消息的目的地并不总是固定的,实际上取决于消息中的上下文。例如,在旅游网站中,对于不同酒店和航班的处理(查看),我们可能有不同的目标主题。基于内容的路由器模式会检查消息内容,并根据消息中包含的数据/元数据对消息进行路由。
路由函数可能是系统架构中的一个脆弱点,可能会成为存放各种杂项逻辑和频繁更改的地方。克服这一问题的一种方法是使用基于规则的引擎来决定消息路由。Govaluate(https://github.com/Knetic/govaluate)是一个很好的基于规则的评估框架,可用于此目的:
来源:http://camel.apache.org/dynamic-router.html
# 扇入模式
有时存在这样的需求:从多个源获取消息并进行一些处理。但无法保证在给定时间哪个源会准备好消息。如果我们在循环中处理所有源,那么对于没有消息的源,循环将会阻塞。虽然可以检查消息是否可用并设置超时,但这会使代码变得更加复杂。
理想的解决方案是将消息合并到一个扇入(fanIn)通道中,然后再用于处理。以下代码片段展示了这种模式:
func main() {
c := fanIn(emitter("Source1"), emitter("Source2"))
for i := 0; i < 10; i++ {
fmt.Println(<-c) // 显示FanIn通道的输出。
}
}
// 这个函数将多个源合并到一个Fan-In通道
func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string) // FanIn通道
// 为避免阻塞,在单独的协程中监听输入通道
go func() {
for {
c <- <-input1 // 将消息写入FanIn通道,这是一个阻塞调用。
}
}()
go func() {
for {
c <- <-input2 // 将消息写入FanIn通道,这是一个阻塞调用。
}
}()
return c
}
// 模拟数据源的函数
func emitter(name string) <-chan string {
c := make(chan string)
go func() {
for i := 0; ; i++ {
c <- fmt.Sprintf("[%s] says %d", name, i)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) // 休眠一段时间
}
}()
return c
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
我们也可以使用 select
关键字来实现扇入,实际上,这在Go语言中更符合习惯用法。使用 select
的扇入实现如下:
func fanInSelect(input1, input2 <-chan string) <-chan string {
out := make(chan string)
go func() {
for {
select {
case in := <-input1:
out <- in
case in := <-input2:
out <- in
}
}
}()
return out
}
2
3
4
5
6
7
8
9
10
11
12
13
14
扇入模式可以与请求 - 回复模式结合使用,以便在扇入通道的输入之间实现顺序控制。每个源在发送消息后,会在其自身的布尔通道上阻塞,该通道作为消息的一部分被传递到扇入通道。然后,在扇入处理器通过该通道发送信号解除阻塞之前,源实际上处于暂停状态。
# 扇出模式
除了对整个消息进行路由外,路由器还可以拆分消息,然后将它们发送给不同的组件进行处理。以下代码片段展示了这种模式:
type Message struct {
body string
key int
}
func main() {
evenPipe, oddPipe := fanOut(emitter())
sink("even", evenPipe)
sink("odd", oddPipe)
time.Sleep(10 * time.Second)
}
// 这个函数将输入消息扇出到不同的通道
func fanOut(input <-chan Message) (<-chan Message, <-chan Message) {
even := make(chan Message) // 扇出通道
odd := make(chan Message) // 扇出通道
// 启动扇出循环
go func() {
for {
msg := <-input
if msg.key%2 == 0 {
even <- msg
} else {
odd <- msg
}
}
}()
return even, odd
}
// 模拟数据源的函数
func emitter() <-chan Message {
c := make(chan Message)
go func() {
for i := 0; ; i++ {
c <- Message{fmt.Sprintf("Message[%d]", i), i}
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) // 休眠一段时间
}
}()
return c
}
func sink(name string, in <-chan Message) {
go func() {
for {
msg := <-in
fmt.Printf("[%s] says %s\n", name, msg.body)
}
}()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
该拓扑结构包含一个发射器,它连接到一个扇出组件,该扇出组件根据消息体中的键将输出多路复用到两个输出通道(此代码片段也是基于内容的路由器模式的一个示例)。
# 后台工作者模式
有时,消息处理的一部分不需要立即产生任何有用的输出。例如,当预订机票并完成支付后,向用户返回“机票预订成功!” 即可,同时通过电子邮件向他们发送详细行程;我们无需在发送电子邮件之前,让客户端在预订机票的API上阻塞。
后台工作者模式通过使一个组件能够将工作委托给在后台运行的其他组件,来解决这类情况。
worker.go
和 worker_test.go
文件(https://github.com/cookingkode/worker)是一个通用框架的示例,其中工作协程接受以下类型的工作:
type Work struct {
Key string
Args interface{}
}
2
3
4
该框架使用键将工作消息分发给一组后台工作者。
可以从一个处理函数中启动一个后台工作者,该处理函数将工作对象作为参数:
func sampleHandler(work *Work) {
fmt.Printf("Dummy Handler \t")
fmt.Printf("Work :: %v : %v\n", work.Key, work.Args)
}
2
3
4
然后,驱动程序可以启动工作者并分配工作:
func Driver() {
// 创建4个工作者
w := NewWorker(4, sampleHandler)
w.StartWork()
// 给工作者分配一些工作
dumbWork := &Work{Key: "hi", Args: "there",}
w.Push(dumbWork)
w.Push(dumbWork)
time.Sleep(6000 * time.Millisecond)
// 此时sampleHandler应该已经打印了两次
// 停止
w.StopWork()
}
2
3
4
5
6
7
8
9
10
11
12
13
# 总结
在本章中,我们学习了一些消息传递理论,以及Apache Kafka和NSQ。消息传递在构建微服务中起着关键作用。我们还通过实际代码描述了各种消息传递模式,如请求 - 回复、扇出、管道和过滤器模式。
在下一章中,我们将学习如何构建API。