📌Golang📌常用包📌amqp091-go.txt
"github.com/rabbitmq/amqp091-go"是一个由RabbitMQ核心团队维护的amqp0.9.1的Go客户端。

========== ========== ========== ========== ==========

建立连接:
func Dial(url string) (*Connection, error) 等价于 DialTLS(url string, nil)
func DialConfig(url string, config Config) (*Connection, error)
func DialTLS(url string, amqps *tls.Config) (*Connection, error)
func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error)
func Open(conn io.ReadWriteCloser, config Config) (*Connection, error)
url参数格式为:scheme://username:password@address,scheme为amqp或amqps,例如:amqp://guest:guest@localhost:5672
未指定Config参数时,服务器心跳间隔默认为10秒,初始读取截止时间设置为30秒。

func (c *Connection) Channel() (*Channel, error)
打开一个唯一的并发服务器通道来处理大量AMQP消息。

最佳实践建议:
每个进程为生产者持有一个从单一连接建立的单一信道,所有生产者共用;
为消费者持有一个连接,每组消费者分别从连接建立一个信道给同组消费者共用。

========== ========== ========== ========== ==========

交换机和队列的声明与绑定可在管理后台统一操作。

func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error

func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)

func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error

========== ========== ========== ========== ==========

type Publishing struct {
	Headers         Table
	ContentType     string    // MIME content type
	ContentEncoding string    // MIME content encoding
	DeliveryMode    uint8     // Transient(0/1)暂态,更高的吞吐量,但在重新启动时不会恢复消息。Persistent(2)持久
	Priority        uint8     // 优先级0~9
	CorrelationId   string    // correlation identifier
	ReplyTo         string    // address to to reply to (ex: RPC)
	Expiration      string    // message expiration spec
	MessageId       string    // 消息ID,需投递时赋值,不会自动赋值
	Timestamp       time.Time // 消息时间戳,需投递时赋值,否则为UTC0时
	Type            string    // message type name
	UserId          string    // creating user id - ex: "guest"
	AppId           string    // creating application id
	Body            []byte    // 消息体
}

func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
发送一条消息到交换机

func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
控制服务器在接收到ack之前将尝试在网络上为消费者保留多少消息或多少字节。

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
立即开始消费消息,消费者标识consumer应在信道上确保唯一,并且在需要停止获取消息调用ch.Cancel方法时传入该消费者标识。
consumer为空时,程序包会使用原子值生成进程内唯一标识。传入context的方法,当监听context取消时会自动对该消费者调用ch.Cancel方法。

func (ch *Channel) Cancel(consumer string, noWait bool) error
停止向消费者交付新的消息。

========== ========== ========== ========== ==========

type Delivery struct {
	Acknowledger Acknowledger // the channel from which this delivery arrived
	ConsumerTag string        // Valid only with Channel.Consume
	MessageCount uint32       // Valid only with Channel.Get
	DeliveryTag uint64
	Redelivered bool   // 首次消费时为false,requeue后为true
	Exchange    string // basic.publish exchange
	RoutingKey  string // basic.publish routing key
	// ... 更多字段同Publishing结构,值源于投递时赋值。
}

消费者的autoAck为false时手动确认消息,当autoAck为true时不应调用以下方法:

func (d Delivery) Ack(multiple bool) error
确认一条消息

func (d Delivery) Nack(multiple, requeue bool) error
func (d Delivery) Reject(requeue bool) error
否定确认消息

========== ========== ========== ========== ==========

package rabbitmq

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	amqp "github.com/rabbitmq/amqp091-go"
	"log"
	"sync"
)

type Config struct {
	Address       string
	Username      string
	Password      string
	Cert, Key, Ca string
}

// NewConnect 初始化单个连接 for consumer
func NewConnect(cfg *Config) *amqp.Connection {
	scheme := "amqp"
	var tlsConfig *tls.Config
	if cfg.Cert != "" && cfg.Key != "" && cfg.Ca != "" {
		certificate, err := tls.X509KeyPair([]byte(cfg.Cert), []byte(cfg.Key))
		if err != nil {
			log.Fatal(err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM([]byte(cfg.Ca)) {
			log.Fatal("failed to parse root certificate")
		}
		tlsConfig = &tls.Config{
			Certificates: []tls.Certificate{certificate},
			RootCAs:      pool,
		}
		scheme = "amqps"
	}
	dsn := scheme + "://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Address
	conn, err := amqp.DialTLS(dsn, tlsConfig)
	if err != nil {
		log.Fatal(err)
	}
	return conn
}

// NewChannel 初始化单个连接并创建单个信道 for producer
func NewChannel(cfg *Config) *amqp.Channel {
	conn := NewConnect(cfg)
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	return ch
}

/*
消费者方法简单封装,队列已在mq管理后台或其他地方创建。
初始化一个*Consumer,指定连接,队列名,并发数,逻辑方法
handler方法返回nil会自动ack消息,返回error会reject并判断是否requeue
调用*Consumer的Stop方法,会停止获取消息,并阻塞等待正在处理消息的handler方法完成
*/

type Consumer struct {
	ch   *amqp.Channel
	stop context.CancelFunc
	wg   *sync.WaitGroup
}

type HandlerFunc func(*amqp.Delivery) error

func NewConsumer(conn *amqp.Connection, qname string, concurrency int, handler HandlerFunc) *Consumer {
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	if err = ch.Qos(1, 0, false); err != nil {
		log.Fatal(err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	consumer := &Consumer{
		ch:   ch,
		stop: cancel,
		wg:   &sync.WaitGroup{},
	}
	for i := 0; i < concurrency; i++ {
		consumer.wg.Add(1)
		go func() {
			msgs, err := ch.ConsumeWithContext(ctx, qname, "", false, false, false, false, nil)
			if err != nil {
				log.Fatal(err)
			}
			for d := range msgs {
				if handler(&d) == nil { // 处理成功确认
					_ = d.Ack(false)
				} else { // 处理失败拒绝,为避免无限requeue,仅当首次出错或队列有delivery-limit时requeue
					requeue := !d.Redelivered || (d.Headers != nil && d.Headers["x-delivery-count"] != nil)
					_ = d.Reject(requeue)
				}
			}
			consumer.wg.Done()
		}()
	}
	return consumer
}

func (c *Consumer) Stop() {
	c.stop()
	c.wg.Wait()
	_ = c.ch.Close()
}

========== ========== ========== ========== ==========

func main() {
	conn := rabbitmq.NewConnect(&rabbitmq.Config{
		Username: "guest",
		Password: "guest",
		Address:  "localhost:5672",
	})
	consumer := rabbitmq.NewConsumer(conn, "qname", 4, Handle)
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-quit
	consumer.Stop()
}

func Handle(d *amqp.Delivery) error {
	log.Println(d.MessageId)
	log.Println(d.Headers)
	log.Println(d.Timestamp)
	log.Println(string(d.Body))
	return nil
}