go-nsq接收消息的处理:
与发送消息不同,consumer连接的是nsqlookupd地址
func (h *messageHandler) HandleMessage(m *go_nsq.Message) error {
if len(m.Body) == 0 {
return nil
}
log.Println(string(m.Body), m.ID, m.Attempts, m.NSQDAddress, m.Timestamp)
return nil
}
func main() {
log.SetFlags(log.Lshortfile | log.Ltime)
config := go_nsq.NewConfig()
consumer, err := go_nsq.NewConsumer("test", "test_channel22", config)
if err != nil {
log.Println(err)
}
defer consumer.Stop()
consumer.AddConcurrentHandlers(&messageHandler{}, 2)
err = consumer.ConnectToNSQLookupd("192.168.200.151:4161")
//err = consumer.ConnectToNSQD("192.168.200.151:4150")
if err != nil {
log.Fatal(err)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1)
<-sigChan
}
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
//...... 省略部分代码
if numLookupd == 1 {
r.queryLookupd() //lookupd查询producers
r.wg.Add(1)
go r.lookupdLoop() //定时获取producers,保证最终获取所有的producer
}
return nil
}
查看发送和接收到的数据包
返回json格式
consumer 连接topic所在的nsgd
发送版本号
收到消息
接收消息流程:
- consumer.ConnectToNSQLookupd("192.168.200.151:4161")
- r.queryLookupd()
- err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
- err = r.ConnectToNSQD(addr)
下面章节分析nsqd内部收到producer和consumer注册 后处理流程;以及nsq的问题和可能的增强方法