📌Golang📌常用包📌go-nsq.txt
"github.com/nsqio/go-nsq"是Go连接NSQ的官方包。

配置结构中的常用设置
type Config struct {
	DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
	ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
	WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`

	// Maximum duration when REQueueing (for doubling of deferred requeue)
	MaxRequeueDelay     time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
	DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`

	// Maximum amount of time to backoff when processing fails. 0 == no backoff
	MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
	// Unit of time for calculating consumer backoff
	BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
	// Maximum number of times this consumer will attempt to process a message before giving up
	MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

	TlsConfig *tls.Config `opt:"tls_config"`

	// Maximum number of messages to allow in flight (concurrency knob)
	MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`

	// The server-side message timeout for messages delivered to this client
	MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

	// Secret for nsqd authentication (requires nsqd 0.2.29+)
	AuthSecret string `opt:"auth_secret"`
	// Use AuthSecret as 'Authorization: Bearer {AuthSecret}' on lookupd queries
	LookupdAuthorization bool `opt:"skip_lookupd_authorization" default:"true"`
}

返回一个新的默认配置,配置项可以直接赋值设置
func NewConfig() *Config 

返回指定地址的生产者
func NewProducer(addr string, config *Config) (*Producer, error)

此方法可用于验证新创建的Producer实例是否正确正确配置,而不是依赖生产者的懒惰“发布时连接”行为。
func (w *Producer) Ping() error

优雅停止,会阻塞直至完成
func (w *Producer) Stop()

同步投递消息的常用方法
func (w *Producer) Publish(topic string, body []byte) error
func (w *Producer) MultiPublish(topic string, body [][]byte) error
func (w *Producer) DeferredPublish(topic string, delay time.Duration, body []byte) error
延时投递和REQ命令最长delay时间均为1hour

Message是包含id、正文和元数据的基本数据类型。
type Message struct {
	ID        [16]byte  // 消息ID
	Body      []byte    // 消息体
	Timestamp int64     // 时间戳(ns)
	Attempts  uint16    // 尝试次数(从1开始)
	// ...
}

Handler是消费者的消息处理接口,当返回值为nil时,消费者会自动执行FINishing;返回值非nil时,消费者会自动执行REQueing。
type Handler interface {
	HandleMessage(message *Message) error
}

一个实现了Handler接口的方法类型
type HandlerFunc func(message *Message) error

禁止Handler程序自动返回响应值(基于返回值是否为nil的FIN/REQ)。
func (m *Message) DisableAutoResponse()

判断消息是否设置为自动响应
func (m *Message) IsAutoResponseDisabled() bool

判断消息是否已响应
func (m *Message) HasResponded() bool

发送FIN命令
func (m *Message) Finish()

发送TOUCH命令
func (m *Message) Touch()

延迟向发送此消息的nsqd发送REQ命令,delay为-1会根据尝试次数和默认延时计算。
func (m *Message) Requeue(delay time.Duration)

与Requeue类似但不会backoff
func (m *Message) RequeueWithoutBackoff(delay time.Duration)

创建一个指定topic和channel的消费者
func NewConsumer(topic string, channel string, config *Config) (*Consumer, error)

设置消费者的处理程序,concurrency为要生成的goroutine的数量。如果在连接到NSQD或NSQLookupd后调用会panic
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int)
func (r *Consumer) AddHandler(handler Handler) // r.AddConcurrentHandlers(handler, 1)

将nsqlookupd地址添加到此消费者实例的列表中
func (r *Consumer) ConnectToNSQLookupd(addr string) error
func (r *Consumer) ConnectToNSQLookupds(addresses []string) error

使用多个nsqd地址直接连接,用于连接本地实例。(通常使用ConnectToNSQLookupd[s])
func (r *Consumer) ConnectToNSQD(addr string) error
func (r *Consumer) ConnectToNSQDs(addresses []string) error

优雅停止,会阻塞直至完成
func (r *Consumer) Stop()

========== ========== ========== ========== ==========

package gnsq

import (
	"github.com/nsqio/go-nsq"
	"log"
)

func NewProducer(addr string) *nsq.Producer {
	producer, err := nsq.NewProducer(addr, nsq.NewConfig())
	if err != nil {
		log.Fatal(err)
	}
	producer.SetLogger(silent{}, nsq.LogLevelDebug)
	err = producer.Ping()
	if err != nil {
		log.Fatal(err)
	}
	return producer
}

func NewConsumer(addr, topic, channel string, concurrency int, handler nsq.HandlerFunc) *nsq.Consumer {
	cfg := nsq.NewConfig()
	cfg.MaxInFlight = concurrency
	consumer, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		log.Fatal(err)
	}
	consumer.SetLogger(silent{}, nsq.LogLevelDebug)
	consumer.AddConcurrentHandlers(handler, concurrency)
	err = consumer.ConnectToNSQLookupd(addr)
	if err != nil {
		log.Fatal(err)
	}
	return consumer
}

type silent struct{}

func (silent) Output(_ int, _ string) error {
	return nil
}

========== ========== ========== ========== ==========

func main() {
	consumer := gnsq.NewConsumer("127.0.0.1:4161", "topic", "channel", 4, Handle)
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	<-quit
	consumer.Stop()
}

func Handle(msg *nsq.Message) error {
	log.Println(string(msg.ID[:]))
	log.Println(msg.Attempts)
	log.Println(msg.Timestamp)
	log.Println(string(msg.Body))
	return nil
}