本文共 4824 字,大约阅读时间需要 16 分钟。
1.消息ACK
如果不进行ACK,当消费端挂掉,比如channel关闭、connection关闭、TCPconnection关闭等都会使得消息丢失,而不进行重发。所以需要ACK,为了测试,关掉自动ACK选项,自己手动ACK,当接受到消息,sleep几秒再ACK
msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack 不进行自动ACK false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { // msgs 是一个channel,从中取东西 log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) // 统计d.Body中的"."的个数 t := time.Duration(dot_count) time.Sleep(t * time.Second) // 有几个点就sleep几秒 log.Printf("Done") d.Ack(false) // 手动ACK,如果不ACK的话,那么无法保证这个消息被处理,可能它已经丢失了(比如消息队列挂了) } }()
2.消息持久化
消息ACK保证了消息不会丢失,但是当rabbitMQ Server停止(不是consumer 挂掉)的时候,我们的所有消息都会丢失。
针对这种情况,我们先确保消息队列的持久化,设置消息队列的durable选项为true
q, err := ch.QueueDeclare( "task_queue", // name true, // durable 队列持久化 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue")
队列持久化了,接下来就是消息持久化,使用amqp.Persistent选项
body := bodyFrom(os.Args) err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing{ DeliveryMode: amqp.Persistent, // 消息持久化,虽然消息设置持久化了,但是并不能保证一定会 ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body)
这种方式并不能保证消息一定持久化到硬盘中,可以还未来的及写入硬盘
3.消息的公平分派
设置Qos,设置预取大小prefetch,当prefetch=1时,表示在没收到consumer的ACK消息之前,只会为其consumer分派一个消息
// 为了保证公平分发,不至于其中某个consumer一直处理,而其他不处理 err = ch.Qos( 1, // prefetch count 在server收到consumer的ACK之前,预取的数量。为1,表示在没收到consumer的ACK之前,只会为其分发一个消息 0, // prefetch size 大于0时,表示在收到consumer确认消息之前,将size个字节保留在网络中 false, // global true:Qos对同一个connection的所有channel有效; false:Qos对同一个channel上的所有consumer有效 )
4.producer端完整程序
package mainimport ( "log" "os" "strings" "github.com/streadway/amqp")func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) }}func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "task_queue", // name true, // durable 发送与接送两端队列都要持久化,队列持久化了,但是并不能保证消息也持久化 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := bodyFrom(os.Args) err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing{ DeliveryMode: amqp.Persistent, // 消息持久化,虽然消息设置持久化了,但是并不能保证一定会 ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body)}func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s}
5.consumer端完整代码
package mainimport ( "bytes" "log" "time" "github.com/streadway/amqp")func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) }}func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "task_queue", // name true, // durable 队列持久化 false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 为了保证公平分发,不至于其中某个consumer一直处理,而其他不处理 err = ch.Qos( 1, // prefetch count 在server收到consumer的ACK之前,预取的数量。为1,表示在没收到consumer的ACK之前,只会为其分发一个消息 0, // prefetch size 大于0时,表示在收到consumer确认消息之前,将size个字节保留在网络中 false, // global true:Qos对同一个connection的所有channel有效; false:Qos对同一个channel上的所有consumer有效 ) failOnError(err, "Failed to set QoS") msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack 不进行自动ACK false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { // msgs 是一个channel,从中取东西 log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) // 统计d.Body中的"."的个数 t := time.Duration(dot_count) time.Sleep(t * time.Second) // 有几个点就sleep几秒 log.Printf("Done") d.Ack(false) // 手动ACK,如果不ACK的话,那么无法保证这个消息被处理,可能它已经丢失了(比如消息队列挂了) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever}
6.运行
producer:
consumer:
consumer1 | consumer2 |