博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【rabbitMQ之二】rabbitMQ之工作队列(消息ACK、消息持久化、公平分派)-go语言
阅读量:4113 次
发布时间:2019-05-25

本文共 4824 字,大约阅读时间需要 16 分钟。

1.消息ACK

如果不进行ACK,当消费端挂掉,比如channel关闭、connection关闭、TCPconnection关闭等都会使得消息丢失,而不进行重发。所以需要ACK,为了测试,关掉自动ACK选项,自己手动ACK,当接受到消息,sleep几秒再ACK

msgs, err := ch.Consume(		q.Name, // queue		"",     // consumer		false,  // auto-ack   不进行自动ACK		false,  // exclusive		false,  // no-local		false,  // no-wait		nil,    // args	)	failOnError(err, "Failed to register a consumer")	forever := make(chan bool)	go func() {		for d := range msgs {  // msgs 是一个channel,从中取东西			log.Printf("Received a message: %s", d.Body)			dot_count := bytes.Count(d.Body, []byte("."))  // 统计d.Body中的"."的个数			t := time.Duration(dot_count)			time.Sleep(t * time.Second) // 有几个点就sleep几秒			log.Printf("Done")			d.Ack(false)  // 手动ACK,如果不ACK的话,那么无法保证这个消息被处理,可能它已经丢失了(比如消息队列挂了)		}	}()

2.消息持久化

消息ACK保证了消息不会丢失,但是当rabbitMQ Server停止(不是consumer 挂掉)的时候,我们的所有消息都会丢失。

针对这种情况,我们先确保消息队列的持久化,设置消息队列的durable选项为true

q, err := ch.QueueDeclare(		"task_queue", // name		true,         // durable 队列持久化		false,        // delete when unused		false,        // exclusive		false,        // no-wait		nil,          // arguments	)	failOnError(err, "Failed to declare a queue")

队列持久化了,接下来就是消息持久化,使用amqp.Persistent选项

body := bodyFrom(os.Args)	err = ch.Publish(		"",     // exchange		q.Name, // routing key		false,  // mandatory		false,		amqp.Publishing{			DeliveryMode: amqp.Persistent, // 消息持久化,虽然消息设置持久化了,但是并不能保证一定会			ContentType:  "text/plain",			Body:         []byte(body),		})	failOnError(err, "Failed to publish a message")	log.Printf(" [x] Sent %s", body)

这种方式并不能保证消息一定持久化到硬盘中,可以还未来的及写入硬盘

3.消息的公平分派

设置Qos,设置预取大小prefetch,当prefetch=1时,表示在没收到consumer的ACK消息之前,只会为其consumer分派一个消息

// 为了保证公平分发,不至于其中某个consumer一直处理,而其他不处理	err = ch.Qos(		1,     // prefetch count  在server收到consumer的ACK之前,预取的数量。为1,表示在没收到consumer的ACK之前,只会为其分发一个消息		0,     // prefetch size 大于0时,表示在收到consumer确认消息之前,将size个字节保留在网络中		false, // global  true:Qos对同一个connection的所有channel有效; false:Qos对同一个channel上的所有consumer有效	)

4.producer端完整程序

package mainimport (	"log"	"os"	"strings"	"github.com/streadway/amqp")func failOnError(err error, msg string) {	if err != nil {		log.Fatalf("%s: %s", msg, err)	}}func main() {	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")	failOnError(err, "Failed to connect to RabbitMQ")	defer conn.Close()	ch, err := conn.Channel()	failOnError(err, "Failed to open a channel")	defer ch.Close()	q, err := ch.QueueDeclare(		"task_queue", // name		true,         // durable  发送与接送两端队列都要持久化,队列持久化了,但是并不能保证消息也持久化		false,        // delete when unused		false,        // exclusive		false,        // no-wait		nil,          // arguments	)	failOnError(err, "Failed to declare a queue")	body := bodyFrom(os.Args)	err = ch.Publish(		"",     // exchange		q.Name, // routing key		false,  // mandatory		false,		amqp.Publishing{			DeliveryMode: amqp.Persistent, // 消息持久化,虽然消息设置持久化了,但是并不能保证一定会			ContentType:  "text/plain",			Body:         []byte(body),		})	failOnError(err, "Failed to publish a message")	log.Printf(" [x] Sent %s", body)}func bodyFrom(args []string) string {	var s string	if (len(args) < 2) || os.Args[1] == "" {		s = "hello"	} else {		s = strings.Join(args[1:], " ")	}	return s}

5.consumer端完整代码

package mainimport (	"bytes"	"log"	"time"	"github.com/streadway/amqp")func failOnError(err error, msg string) {	if err != nil {		log.Fatalf("%s: %s", msg, err)	}}func main() {	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")	failOnError(err, "Failed to connect to RabbitMQ")	defer conn.Close()	ch, err := conn.Channel()	failOnError(err, "Failed to open a channel")	defer ch.Close()	q, err := ch.QueueDeclare(		"task_queue", // name		true,         // durable 队列持久化		false,        // delete when unused		false,        // exclusive		false,        // no-wait		nil,          // arguments	)	failOnError(err, "Failed to declare a queue")	// 为了保证公平分发,不至于其中某个consumer一直处理,而其他不处理	err = ch.Qos(		1,     // prefetch count  在server收到consumer的ACK之前,预取的数量。为1,表示在没收到consumer的ACK之前,只会为其分发一个消息		0,     // prefetch size 大于0时,表示在收到consumer确认消息之前,将size个字节保留在网络中		false, // global  true:Qos对同一个connection的所有channel有效; false:Qos对同一个channel上的所有consumer有效	)	failOnError(err, "Failed to set QoS")	msgs, err := ch.Consume(		q.Name, // queue		"",     // consumer		false,  // auto-ack   不进行自动ACK		false,  // exclusive		false,  // no-local		false,  // no-wait		nil,    // args	)	failOnError(err, "Failed to register a consumer")	forever := make(chan bool)	go func() {		for d := range msgs {  // msgs 是一个channel,从中取东西			log.Printf("Received a message: %s", d.Body)			dot_count := bytes.Count(d.Body, []byte("."))  // 统计d.Body中的"."的个数			t := time.Duration(dot_count)			time.Sleep(t * time.Second) // 有几个点就sleep几秒			log.Printf("Done")			d.Ack(false)  // 手动ACK,如果不ACK的话,那么无法保证这个消息被处理,可能它已经丢失了(比如消息队列挂了)		}	}()	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")	<-forever}

6.运行

producer:

consumer:

consumer1 consumer2
你可能感兴趣的文章
C# 简单的矩阵运算
查看>>
gcc 常用选项详解
查看>>
c++输出文件流ofstream用法详解
查看>>
firewalld的基本使用
查看>>
Linux下SVN客户端使用教程
查看>>
Linux分区方案
查看>>
nc 命令详解
查看>>
如何使用 systemd 中的定时器
查看>>
git命令速查表
查看>>
linux进程监控和自动重启的简单实现
查看>>
OpenFeign学习(三):OpenFeign配置生成代理对象
查看>>
OpenFeign学习(四):OpenFeign的方法同步请求执行
查看>>
OpenFeign学习(五):OpenFeign请求结果处理及重试控制
查看>>
OpenFeign学习(六):OpenFign进行表单提交参数或传输文件
查看>>
OpenFeign学习(七):Spring Cloud OpenFeign的使用
查看>>
Ribbon 学习(二):Spring Cloud Ribbon 加载配置原理
查看>>
Ribbon 学习(三):RestTemplate 请求负载流程解析
查看>>
深入理解HashMap
查看>>
XML生成(一):DOM生成XML
查看>>
XML生成(三):JDOM生成
查看>>