📌Golang📌常用包📌amqp091-go.txt
"github.com/rabbitmq/amqp091-go"是一个由RabbitMQ核心团队维护的amqp0.9.1的Go客户端。
========== ========== ========== ========== ==========
建立连接:
func Dial(url string) (*Connection, error) 等价于 DialTLS(url string, nil)
func DialConfig(url string, config Config) (*Connection, error)
func DialTLS(url string, amqps *tls.Config) (*Connection, error)
func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error)
func Open(conn io.ReadWriteCloser, config Config) (*Connection, error)
url参数格式为:scheme://username:password@address,scheme为amqp或amqps,例如:amqp://guest:guest@localhost:5672
未指定Config参数时,服务器心跳间隔默认为10秒,初始读取截止时间设置为30秒。
func (c *Connection) Channel() (*Channel, error)
打开一个唯一的并发服务器通道来处理大量AMQP消息。
最佳实践建议:
每个进程为生产者持有一个从单一连接建立的单一信道,所有生产者共用;
为消费者持有一个连接,每组消费者分别从连接建立一个信道给同组消费者共用。
========== ========== ========== ========== ==========
交换机和队列的声明与绑定可在管理后台统一操作。
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error
func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error)
func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error
func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error
========== ========== ========== ========== ==========
type Publishing struct {
Headers Table
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // Transient(0/1)暂态,更高的吞吐量,但在重新启动时不会恢复消息。Persistent(2)持久
Priority uint8 // 优先级0~9
CorrelationId string // correlation identifier
ReplyTo string // address to to reply to (ex: RPC)
Expiration string // message expiration spec
MessageId string // 消息ID,需投递时赋值,不会自动赋值
Timestamp time.Time // 消息时间戳,需投递时赋值,否则为UTC0时
Type string // message type name
UserId string // creating user id - ex: "guest"
AppId string // creating application id
Body []byte // 消息体
}
func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
发送一条消息到交换机
func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error
控制服务器在接收到ack之前将尝试在网络上为消费者保留多少消息或多少字节。
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
func (ch *Channel) ConsumeWithContext(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
立即开始消费消息,消费者标识consumer应在信道上确保唯一,并且在需要停止获取消息调用ch.Cancel方法时传入该消费者标识。
consumer为空时,程序包会使用原子值生成进程内唯一标识。传入context的方法,当监听context取消时会自动对该消费者调用ch.Cancel方法。
func (ch *Channel) Cancel(consumer string, noWait bool) error
停止向消费者交付新的消息。
========== ========== ========== ========== ==========
type Delivery struct {
Acknowledger Acknowledger // the channel from which this delivery arrived
ConsumerTag string // Valid only with Channel.Consume
MessageCount uint32 // Valid only with Channel.Get
DeliveryTag uint64
Redelivered bool // 首次消费时为false,requeue后为true
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key
// ... 更多字段同Publishing结构,值源于投递时赋值。
}
消费者的autoAck为false时手动确认消息,当autoAck为true时不应调用以下方法:
func (d Delivery) Ack(multiple bool) error
确认一条消息
func (d Delivery) Nack(multiple, requeue bool) error
func (d Delivery) Reject(requeue bool) error
否定确认消息
========== ========== ========== ========== ==========
package rabbitmq
import (
"context"
"crypto/tls"
"crypto/x509"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"sync"
)
type Config struct {
Address string
Username string
Password string
Cert, Key, Ca string
}
// NewConnect 初始化单个连接 for consumer
func NewConnect(cfg *Config) *amqp.Connection {
scheme := "amqp"
var tlsConfig *tls.Config
if cfg.Cert != "" && cfg.Key != "" && cfg.Ca != "" {
certificate, err := tls.X509KeyPair([]byte(cfg.Cert), []byte(cfg.Key))
if err != nil {
log.Fatal(err)
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM([]byte(cfg.Ca)) {
log.Fatal("failed to parse root certificate")
}
tlsConfig = &tls.Config{
Certificates: []tls.Certificate{certificate},
RootCAs: pool,
}
scheme = "amqps"
}
dsn := scheme + "://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Address
conn, err := amqp.DialTLS(dsn, tlsConfig)
if err != nil {
log.Fatal(err)
}
return conn
}
// NewChannel 初始化单个连接并创建单个信道 for producer
func NewChannel(cfg *Config) *amqp.Channel {
conn := NewConnect(cfg)
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
return ch
}
/*
消费者方法简单封装,队列已在mq管理后台或其他地方创建。
初始化一个*Consumer,指定连接,队列名,并发数,逻辑方法
handler方法返回nil会自动ack消息,返回error会reject并判断是否requeue
调用*Consumer的Stop方法,会停止获取消息,并阻塞等待正在处理消息的handler方法完成
*/
type Consumer struct {
ch *amqp.Channel
stop context.CancelFunc
wg *sync.WaitGroup
}
type HandlerFunc func(*amqp.Delivery) error
func NewConsumer(conn *amqp.Connection, qname string, concurrency int, handler HandlerFunc) *Consumer {
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
if err = ch.Qos(1, 0, false); err != nil {
log.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
consumer := &Consumer{
ch: ch,
stop: cancel,
wg: &sync.WaitGroup{},
}
for i := 0; i < concurrency; i++ {
consumer.wg.Add(1)
go func() {
msgs, err := ch.ConsumeWithContext(ctx, qname, "", false, false, false, false, nil)
if err != nil {
log.Fatal(err)
}
for d := range msgs {
if handler(&d) == nil { // 处理成功确认
_ = d.Ack(false)
} else { // 处理失败拒绝,为避免无限requeue,仅当首次出错或队列有delivery-limit时requeue
requeue := !d.Redelivered || (d.Headers != nil && d.Headers["x-delivery-count"] != nil)
_ = d.Reject(requeue)
}
}
consumer.wg.Done()
}()
}
return consumer
}
func (c *Consumer) Stop() {
c.stop()
c.wg.Wait()
_ = c.ch.Close()
}
========== ========== ========== ========== ==========
func main() {
conn := rabbitmq.NewConnect(&rabbitmq.Config{
Username: "guest",
Password: "guest",
Address: "localhost:5672",
})
consumer := rabbitmq.NewConsumer(conn, "qname", 4, Handle)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-quit
consumer.Stop()
}
func Handle(d *amqp.Delivery) error {
log.Println(d.MessageId)
log.Println(d.Headers)
log.Println(d.Timestamp)
log.Println(string(d.Body))
return nil
}