📌Golang📌常用包📌go-nsq.txt
"github.com/nsqio/go-nsq"是Go连接NSQ的官方包。
配置结构中的常用设置
type Config struct {
DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`
// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
// Maximum amount of time to backoff when processing fails. 0 == no backoff
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
// Unit of time for calculating consumer backoff
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
TlsConfig *tls.Config `opt:"tls_config"`
// Maximum number of messages to allow in flight (concurrency knob)
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
// Secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
// Use AuthSecret as 'Authorization: Bearer {AuthSecret}' on lookupd queries
LookupdAuthorization bool `opt:"skip_lookupd_authorization" default:"true"`
}
返回一个新的默认配置,配置项可以直接赋值设置
func NewConfig() *Config
返回指定地址的生产者
func NewProducer(addr string, config *Config) (*Producer, error)
此方法可用于验证新创建的Producer实例是否正确正确配置,而不是依赖生产者的懒惰“发布时连接”行为。
func (w *Producer) Ping() error
优雅停止,会阻塞直至完成
func (w *Producer) Stop()
同步投递消息的常用方法
func (w *Producer) Publish(topic string, body []byte) error
func (w *Producer) MultiPublish(topic string, body [][]byte) error
func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error
延时投递和REQ命令最长delay时间均为1hour
Message是包含id、正文和元数据的基本数据类型。
type Message struct {
ID [16]byte // 消息ID
Body []byte // 消息体
Timestamp int64 // 时间戳(ns)
Attempts uint16 // 尝试次数(从1开始)
// ...
}
Handler是消费者的消息处理接口,当返回值为nil时,消费者会自动执行FINishing;返回值非nil时,消费者会自动执行REQueing。
type Handler interface {
HandleMessage(message *Message) error
}
一个实现了Handler接口的方法类型
type HandlerFunc func(message *Message) error
禁止Handler程序自动返回响应值(基于返回值是否为nil的FIN/REQ)。
func (m *Message) DisableAutoResponse()
判断消息是否设置为自动响应
func (m *Message) IsAutoResponseDisabled() bool
判断消息是否已响应
func (m *Message) HasResponded() bool
发送FIN命令
func (m *Message) Finish()
发送TOUCH命令
func (m *Message) Touch()
延迟向发送此消息的nsqd发送REQ命令,delay为-1会根据尝试次数和默认延时计算。
func (m *Message) Requeue(delay time.Duration)
与Requeue类似但不会backoff
func (m *Message) RequeueWithoutBackoff(delay time.Duration)
创建一个指定topic和channel的消费者
func NewConsumer(topic string, channel string, config *Config) (*Consumer, error)
设置消费者的处理程序,concurrency为要生成的goroutine的数量。如果在连接到NSQD或NSQLookupd后调用会panic
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int)
func (r *Consumer) AddHandler(handler Handler) // r.AddConcurrentHandlers(handler, 1)
将nsqlookupd地址添加到此消费者实例的列表中
func (r *Consumer) ConnectToNSQLookupd(addr string) error
func (r *Consumer) ConnectToNSQLookupds(addresses []string) error
使用多个nsqd地址直接连接,用于连接本地实例。(通常使用ConnectToNSQLookupd[s])
func (r *Consumer) ConnectToNSQD(addr string) error
func (r *Consumer) ConnectToNSQDs(addresses []string) error
优雅停止,会阻塞直至完成
func (r *Consumer) Stop()
========== ========== ========== ========== ==========
package gnsq
import (
"github.com/nsqio/go-nsq"
"log"
)
func NewProducer(addr string) *nsq.Producer {
producer, err := nsq.NewProducer(addr, nsq.NewConfig())
if err != nil {
log.Fatal(err)
}
producer.SetLogger(silent{}, nsq.LogLevelDebug)
err = producer.Ping()
if err != nil {
log.Fatal(err)
}
return producer
}
func NewConsumer(addr, topic, channel string, concurrency int, handler nsq.HandlerFunc) *nsq.Consumer {
cfg := nsq.NewConfig()
cfg.MaxInFlight = concurrency
consumer, err := nsq.NewConsumer(topic, channel, cfg)
if err != nil {
log.Fatal(err)
}
consumer.SetLogger(silent{}, nsq.LogLevelDebug)
consumer.AddConcurrentHandlers(handler, concurrency)
err = consumer.ConnectToNSQLookupd(addr)
if err != nil {
log.Fatal(err)
}
return consumer
}
type silent struct{}
func (silent) Output(_ int, _ string) error {
return nil
}
========== ========== ========== ========== ==========
func main() {
consumer := gnsq.NewConsumer("127.0.0.1:4161", "topic", "channel", 4, Handle)
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
<-quit
consumer.Stop()
}
func Handle(msg *nsq.Message) error {
log.Println(string(msg.ID[:]))
log.Println(msg.Attempts)
log.Println(msg.Timestamp)
log.Println(string(msg.Body))
return nil
}