详解 (MAC端 + Linux端)"/>
NSQ使用详解 (MAC端 + Linux端)
关于nsq的功能和特性,本文中不再赘述。
对nsq完全没有了解的同学,可以读一下nsq官方文档,英文阅读能力有限的同学可以阅读翻译后的版本。但是翻译后的quick_start(快速开始)部分的代码有点问题。
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test'
应该改成
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
也就是pub写成了put。大家拿来练习的时候要注意。
一、nsq安装
mac的同学直接
$ brew install nsq
linux请到nsq下载目录中下载,并解压到自己的工作路径中。如:
~/nsq_1.1.0
下面需要运行nsqd等程序的时候,mac的同学可以直接键入
nsqd <--paramters-->
而linux的同学还需要键入工作路径,如
./~/nsq_1.1.0/bin/nsqd <--paramters-->,
当然你也可以设置系统路径来避免键入工作路径这个步骤。
二、实现官方文档上面的入门代码
代码原址在此
使用mac的同学可能会发现自己运行到第六步时
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
在这里会报错。这是由于nsq_to_file无法找到nsqd。无法找到的原因是我们在运行第三步
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
没有指定--broadcast-address,Mac会默认--broadcast-address=“”。我们需要自己指定--broadcast-address,在本机运行时可以直接为127.0.0.1,如quick_start中的代码可以运行成功。这里参考了一篇博客,感恩的心~
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
但是,如果你的nsqd和lookup是不在同一台机器上,你需要设置成你nsqd现在运行的机器的ip地址。这样等你访问admin的时候才能成功。
到这里,你已经能够完成nsq在单机上的运行了~
三、多个nsq的单机运行实例
大家可以参考这篇博客,但是你在具体运行这篇博客的代码时,当你运行第二个nsqd时
nsqd --lookupd-tcp-address=127.0.0.1:4160 --tcp-address=0.0.0.0:4152 --http-address=0.0.0.0:4153
会报错,--data-path已经被其他nsqd占用。在这里你需要自己为nsqd指定--data-path.
四、多机运行nsqd
我的开发环境为一台linux服务器(IP:10.224.8.168),一台mac主机(IP:10.95.48.67)
下面我会在服务器上运行:nsqlookupd,一个nsqd,consumser
mac主机上运行:一个nsqd,admin,producer
首先服务器上运行nsqlookupd
./nsqlookupd
服务器运行一个nsqd
./nsqd --lookupd-tcp-address=10.224.8.168:4160 --broadcast-address=10.224.8.168 --data-path=/tmp
本地主机运行一个nsqd
nsqd --lookupd-tcp-address=10.224.8.168:4160 --broadcast-address=10.95.48.67 --data-path=/tmp
本地主机运行nsqadmin
nsqadmin --lookupd-http-address=10.224.8.168:4161
本地主机运行producer,向两个nsqd发送数据
没有go-nsq包的先去get一个nsq包
go get github/nsqio/go-nsq
package main
import ("time""fmt""log""github/nsqio/go-nsq"
)
func main() {err := initConsumer("test1", "test-channel1", "10.224.8.168:4161")if err != nil {log.Fatal("init Consumer error")}err = initConsumer("test2","test-channel2","10.224.8.168:4161")if err != nil {log.Fatal("init Consumer error")}select {}
}
type nsqHandler struct {nsqConsumer *nsq.ConsumermessagesReceived int
}
//处理消息
func (nh *nsqHandler)HandleMessage(msg *nsq.Message) error{nh.messagesReceived++fmt.Printf("receive ID:%s,addr:%s,message:%s",msg.ID, msg.NSQDAddress, string(msg.Body))fmt.Println()return nil
}
func initConsumer(topic, channel, addr string) error {cfg := nsq.NewConfig()cfg.LookupdPollInterval = 3*time.Secondc,err := nsq.NewConsumer(topic,channel,cfg)if err != nil {log.Println("init Consumer NewConsumer error:",err)return err}handler := &nsqHandler{nsqConsumer:c}c.AddHandler(handler)err = c.ConnectToNSQLookupd(addr)if err != nil {log.Println("init Consumer ConnectToNSQLookupd error:",err)return err}return nil
}
服务器运行consumer,从两个nsqd中拉取信息
package main
import ("time""fmt""log""github/nsqio/go-nsq"
)
func main() {err := initConsumer("test1", "test-channel1", "10.224.8.168:4161")if err != nil {log.Fatal("init Consumer error")}err = initConsumer("test2","test-channel2","10.224.8.168:4161")if err != nil {log.Fatal("init Consumer error")}select {}
}
type nsqHandler struct {nsqConsumer *nsq.ConsumermessagesReceived int
}
//处理消息
func (nh *nsqHandler)HandleMessage(msg *nsq.Message) error{nh.messagesReceived++fmt.Printf("receive ID:%s,addr:%s,message:%s",msg.ID, msg.NSQDAddress, string(msg.Body))fmt.Println()return nil
}
func initConsumer(topic, channel, addr string) error {cfg := nsq.NewConfig()cfg.LookupdPollInterval = 3*time.Secondc,err := nsq.NewConsumer(topic,channel,cfg)if err != nil {log.Println("init Consumer NewConsumer error:",err)return err}handler := &nsqHandler{nsqConsumer:c}c.AddHandler(handler)err = c.ConnectToNSQLookupd(addr)if err != nil {log.Println("init Consumer ConnectToNSQLookupd error:",err)return err}return nil
}
最后上传两张图,供以后参考
更多推荐
NSQ使用详解 (MAC端 + Linux端)
发布评论