📌服务组件📌NSQ.txt
官方文档: https://nsq.io/overview/design.html
下载安装: https://nsq.io/deployment/installing.html
mac通过brew安装完不要执行:brew services start nsq
windows下载安装包解压后,可将bin目录添加到环境变量
特性担保:
消息默认情况下不持久化(如果将--mem-queue-size设置为0,所有的消息将会存储到磁盘)
消息最少会被投递一次
接收到的消息是无序的
消费者最终找出所有主题的生产者
NSQ由三个守护进程组成:
nsqd是接收、排队并向客户端发送消息的守护进程。
nsqlookupd是管理拓扑信息并提供最终一致的发现服务的守护进程。
nsqadmin是一个WebUI,用于实时检查集群(并执行各种管理任务)。
NSQ中的数据流被建模为流和消费者的树状结构。
topic是一个独特的数据流。channel是订阅给定主题的消费者的逻辑分组。
topic和channel命名规则为:1~64字符,字母、数字、下划线、短横线、句点。
启动nsqlookupd:
nsqlookupd
启动nsqadmin:
nsqadmin --lookupd-http-address=127.0.0.1:4161
启动nsqd:
nsqd --lookupd-tcp-address=127.0.0.1:4160
如nsqlookup不能接收到该节点需指定--broadcast-address=127.0.0.1
如不指定-data-path默认用户主目录,如不指定tcp端口和http端口默认是4150和4151
多节点运行需要预先创建多个节点目录,每个节点启动命令指定-data-path,-tcp-address,-http-address参数
关闭终端窗口后台运行,可将启动命令编写到统一的脚本,
linux/mac下创建.sh文件写入:
nsqlookupd &
nsqadmin --lookupd-http-address=127.0.0.1:4161 &
nsqd --lookupd-tcp-address=127.0.0.1:4160 &
windows下(可以不添加到环境变量,D:/Program/nsq为解压后的目录)创建.ps1文件写入:
Start-Process -WindowStyle Hidden D:/Program/nsq/bin/nsqlookupd
Start-Process -WindowStyle Hidden D:/Program/nsq/bin/nsqd -ArgumentList "--lookupd-tcp-address=127.0.0.1:4160"
Start-Process -WindowStyle Hidden D:/Program/nsq/bin/nsqadmin -ArgumentList "--lookupd-http-address=127.0.0.1:4161"
投递单条消息默认最大1MB,如需改变默认值,可以启动nsqd的时候附上参数-max-msg-size
开放端口号:
nsqd生产者TCP端口4150
nsqd生产者HTTP端口4151
nsqlookupd用于接收nsqd广播TCP端口4160
nsqlookupd用于接收客户端发送的管理和发现操作请求HTTP端口4161
nsqadmin用于管理页面HTTP端口4171
向nsqd发送消息(省略了" -X POST")
curl -d "{MESSAGE}" 'http://127.0.0.1:4151/pub?topic={TOPIC}'
curl -d "{MESSAGE}\n{MESSAGE}\n{MESSAGE}" 'http://127.0.0.1:4151/mpub?topic={TOPIC}'
nsqd的topic和channel的创建删除清空暂停操作
curl -X POST 'http://127.0.0.1:4151/topic/create?topic={TOPIC}'
curl -X POST 'http://127.0.0.1:4151/topic/delete?topic={TOPIC}'
curl -X POST 'http://127.0.0.1:4151/channel/create?topic={TOPIC}&channel={CHANNEL}'
curl -X POST 'http://127.0.0.1:4151/channel/delete?topic={TOPIC}&channel={CHANNEL}'
curl -X POST 'http://127.0.0.1:4151/topic/empty?topic={TOPIC}'
curl -X POST 'http://127.0.0.1:4151/channel/empty?topic={TOPIC}&channel={CHANNEL}'
curl -X POST 'http://127.0.0.1:4151/topic/pause?topic={TOPIC}'
curl -X POST 'http://127.0.0.1:4151/topic/unpause?topic={TOPIC}'
curl -X POST 'http://127.0.0.1:4151/channel/pause?topic={TOPIC}&channel={CHANNEL}'
curl -X POST 'http://127.0.0.1:4151/channel/unpause?topic={TOPIC}&channel={CHANNEL}'
获取nsqd信息
curl 'http://127.0.0.1:4151/ping' #返回"OK"字符
curl 'http://127.0.0.1:4151/info'
eg: {"version":"1.2.0","broadcast_address":"127.0.0.1","hostname":"k8s-node01","http_port":4151,"tcp_port":4150,"start_time":1656032884}
curl 'http://127.0.0.1:4151/stats'
可选参数:format=json(default=text),topic,channel,include_clients=false(default=true),include_mem=false(default=true)
获取nsqlookupd信息
curl 'http://127.0.0.1:4161/ping' #返回"OK"字符
curl 'http://127.0.0.1:4161/info' #返回版本信息
curl 'http://127.0.0.1:4161/topics'
eg: {"topics":["topic_name1","topic_name2"]}
curl 'http://127.0.0.1:4161/channels?topic={TOPIC}'
eg: {"channels":["channel_1","channel_2"]}
curl 'http://127.0.0.1:4161/lookup?topic={TOPIC}'
eg: {"channels":["channel_1","channel_2"],"producers":[{"remote_address":"127.0.0.1:53226","hostname":"VM-16-15-centos","broadcast_address":"127.0.0.1","tcp_port":4150,"http_port":4151,"version":"1.2.1"}]}
curl 'http://10.225.16.15:4161/nodes'
eg: {"producers":[{"remote_address":"127.0.0.1:53226","hostname":"VM-16-15-centos","broadcast_address":"127.0.0.1","tcp_port":4150,"http_port":4151,"version":"1.2.1","tombstones":[false,false],"topics":["topic_name1","topic_name2"]}]}
nsqlookupd同样可以创建和删除topic和channel
curl -X POST 'http://127.0.0.1:4161/topic/create?topic={TOPIC}'
curl -X POST 'http://127.0.0.1:4161/topic/delete?topic={TOPIC}'
curl -X POST 'http://127.0.0.1:4161/channel/create?topic={TOPIC}&channel={CHANNEL}'
curl -X POST 'http://127.0.0.1:4161/channel/delete?topic={TOPIC}&channel={CHANNEL}'
可在web浏览器中打开 http://127.0.0.1:4171/ 查看nsqadmin用户界面并查看统计信息。
nsqadmin提供的API
GET http://127.0.0.1:4171/api/topics
eg: {"topics":["topic_name1","topic_name2"]}
GET http://127.0.0.1:4171/api/topics/:topic
eg: {"depth":0,"memory_depth":0,"backend_depth":0,"message_count":4,"paused":false,...}
GET http://127.0.0.1:4171/api/topics/:topic/:channel
eg: {"depth":0,"memory_depth":0,"backend_depth":0,"in_flight_count":0,"deferred_count":0,"requeue_count":10,"timeout_count":0,"message_count":4,,"paused":false,...}
========== ========== ========== ========== ==========
nsqd是一个负责接收、排队、转发消息到客户端的守护进程;
nsqlookupd是一个管理集群(nsqd)拓扑信息并提供最终一致性的服务注册与发现的守护进程;
nsqadmin是一个管理界面,提供实时的集群中topic、channel和message的统计信息,还提供与这些实体相关的各种管理接口。
message即代表的是数据,它被生产者创建并发送到指定的topic,而消费者可以从指定的topic和channel接收并消费消息。
topic代表生产者投递消息的一个逻辑键值,它可以将消息进行分类。一个nsqd上可包含多个topic,topic可在其第一次接收到生产者投递的消息时创建。
channel代表消费者订阅某个nsqd上的topic的消息。每当生产者将消息发布到一个topic上,消息会被拷贝到与topic关联的所有的channel。
而且,多个消费者可以订阅同一个channel,channel会将其接收到的消息随机(即随机负载均衡)发送到与其关联的一个客户端。
同topic类似,channel也可不用提前创建,消费者在第一次订阅消息的时候会创建此channel(若其不存在)。
channel从topic接收的消息首先会在内存中排队,当达到内存队列长度上限,就被写到持久化存储。
========== ========== ========== ========== ==========
go-nsq客户端消费一个不存在的topic不会异常退出,只会打印"{TOPIC}_NOT_FOUND"错误,待该topic创建后可立即正常消费。
但是当有多个channel的时候,最先一批消息可能仅复制到某一个channel上,最好先在nsqadmin上提前创建topic和channel。
PauseTopic后,仍可正常往该topic投递消息,但是新消息暂时不会复制到channel。
UnPauseTopic后,topic堆积的消息会立即复制到各channel上。
PauseChannel后,客户端会暂停消费channel上的消息,投递到topic的消息仍然可以正常复制到channel上。
UnPauseChannel后,客户端会恢复消费channel上的消息,channel如被删除会被运行的客户端重新创建。