GO语言
01GO基础-001GO语言简介
01GO基础-002语言环境安装
01GO基础-003Go 语言结构
01GO基础-004Go 语言基础语法1
01GO基础-004Go 语言基础语法2
01GO基础-004Go 语言基础语法3
01GO基础-005Go 语言数据类型
01GO基础-006Go 语言变量
01GO基础-007Go 语言常量
01GO基础-008Go 语言运算符
01GO基础-009条件语句
01GO基础-010循环语句
01GO基础-011函数
01GO基础-012变量作用域
01GO基础-013数组
01GO基础-014指针
01GO基础-015结构体
01GO基础-016切片
01GO基础-017范围(Range)
01GO基础-018Map
01GO基础-019递归函数
01GO基础-020类型转换
01GO基础-021接口
01GO基础-022异常处理
01GO基础-023并发
01GO基础-024strings
01GO基础-025可变参数
01GO基础-026接口2
01GO基础-027异常处理2
01GO基础-028sync包详解
01GO基础-029Context
02GO进阶001包
02GO进阶002init()函数
02GO进阶003包的注意点
02GO进阶003使用go module导入本地包
02GO进阶004 time包
02GO进阶005 file操作
02GO进阶006 io操作
02GO进阶007 os包(文件 I/O、文件属性、目录与链接、创建和移除链接)
02GO进阶008复制文件
02GO进阶009断点续传
02GO进阶010 bufio包
02GO进阶011ioutil包
02GO进阶012遍历文件夹
02GO进阶013并发编程介绍
02GO进阶014Goroutine协程
02GO进阶015 GPM
02GO进阶016 runtime包
02GO进阶017 Channel
02GO进阶018 Goroutine池
02GO进阶019 定时器
02GO进阶020 select
02GO进阶021并发安全和锁
02GO进阶022sync
02GO进阶023原子操作
02GO进阶024 GMP原理与调度
02GO进阶025爬虫小案例
02GO进阶026 面向对象-匿名字段
02GO进阶026 面向对象-接口
02GO进阶027网络编程-互联网协议介绍
02GO进阶027网络编程-socket
02GO进阶027网络编程-http编程
02GO进阶027网络编程-websocket编程
02GO进阶028数据操作-MYSQL
02GO进阶028数据操作-REDIS
02GO进阶028数据操作-RTCD
02GO进阶028数据操作-ZOOKEEPER
02GO进阶028数据操作-KAFKA
02GO进阶028数据操作-RabbitMQ
02GO进阶028数据操作-ElasticSearch
02GO进阶028数据操作-NSQ
02GO进阶028数据操作-memcached
02GO进阶028数据操作-GORM
02GO进阶029beego框架-安装
02GO进阶029beego框架-快速入门
02GO进阶029beego框架-MVC架构介绍-controller设计-参数配置
02GO进阶029beego框架-MVC架构介绍-controller设计-路由设置
02GO进阶029beego框架-MVC架构介绍-controller设计-控制器函数
02GO进阶029beego框架-MVC架构介绍-controller设计-XSRF过滤
02GO进阶029beego框架-MVC架构介绍-controller设计-请求数据处理
02GO进阶029beego框架-MVC架构介绍-controller设计-Session控制
02GO进阶029beego框架-MVC架构介绍-controller设计-过滤器
02GO进阶029beego框架-MVC架构介绍-controller设计-Flash数据
02GO进阶029beego框架-MVC架构介绍-controller设计-URL构建
02GO进阶029beego框架-MVC架构介绍-controller设计-多种格式数据输出
02GO进阶029beego框架-MVC架构介绍-controller设计-表单数据验证
02GO进阶029beego框架-MVC架构介绍-controller设计-错误处理
02GO进阶029beego框架-MVC架构介绍-controller设计-日志处理
02GO进阶029beego框架-MVC架构介绍-model设计-概述
02GO进阶029beego框架-MVC架构介绍-model设计-CRUD操作
02GO进阶029beego框架-MVC架构介绍-model设计-高级查询
02GO进阶029beego框架-MVC架构介绍-model设计-原生SQL查询
02GO进阶029beego框架-MVC架构介绍-model设计-构造查询
02GO进阶029beego框架-MVC架构介绍-model设计-事务处理
02GO进阶029beego框架-MVC架构介绍-model设计-模型定义
02GO进阶029beego框架-MVC架构介绍-model设计-命令模式
02GO进阶029beego框架-MVC架构介绍-model设计-测试用例
02GO进阶029beego框架-MVC架构介绍-view设计-beego 模板语法指南
02GO进阶029beego框架-MVC架构介绍-view设计-模板处理
02GO进阶029beego框架-MVC架构介绍-view设计-其他
本文档使用 MrDoc 发布
-
+
首页
02GO进阶028数据操作-NSQ
## 安装 Mac安装nsq: 按照安装文档中的说明进行操作。 打开终端: 执行:$ brew install nsq 若:-bash: brew: command not found 执行:$ ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 然后执行:brew install nsq 在一个shell中,开始nsqlookupd: $ nsqlookupd 在另一个shell中,开始nsqd: $ nsqd --lookupd-tcp-address=127.0.0.1:4160 在另一个shell中,开始nsqadmin: $ nsqadmin --lookupd-http-address=127.0.0.1:4161 发布初始消息(也在集群中创建主题): $ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test' 最后,在另一个shell中,开始nsq_to_file: $ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161 发布更多消息nsqd: $ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test' $ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test' 验证事物按预期工作,在Web浏览器中打开http://127.0.0.1:4171/ 以查看nsqadminUI并查看统计信息。另外,检查test.*.log写入的日志文件()的内容/tmp。 这里的重要教训是nsq_to_file(客户端)未明确告知test 主题产生的位置,它从中检索此信息,nsqlookupd并且尽管有连接的时间,但不会丢失任何消息。 ## 生产者 运行Nsq服务集群 首先启动nsqlookud,在一个shell中,开始nsqlookupd: $ nsqlookupd 在另一个shell中,开始nsqd: $ nsqd --lookupd-tcp-address=127.0.0.1:4160 在另一个shell中,开始nsqadmin: $ nsqadmin --lookupd-http-address=127.0.0.1:4161 发布初始消息(也在集群中创建主题): $ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test' 最后,在另一个shell中,开始nsq_to_file: $ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161 验证事物按预期工作,在Web浏览器中打开http://127.0.0.1:4171/ 以查看nsqadminUI并查看统计信息。另外,检查test.*.log写入的日志文件()的内容/tmp。 链接nsq 并创建生产者: ```go package main import ( "fmt" nsq "github.com/nsqio/go-nsq" ) func main() { // 定义nsq生产者 var producer *nsq.Producer // 初始化生产者 // producer, err := nsq.NewProducer("地址:端口", nsq.*Config ) producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig()) if err != nil { panic(err) } err = producer.Ping() if nil != err { // 关闭生产者 producer.Stop() producer = nil } fmt.Println("ping nsq success") } ``` 生产者创建topic并写入nsq: ```go package main import ( "fmt" nsq "github.com/nsqio/go-nsq" ) func main() { // 定义nsq生产者 var producer *nsq.Producer // 初始化生产者 // producer, err := nsq.NewProducer("地址:端口", nsq.*Config ) producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig()) if err != nil { panic(err) } err = producer.Ping() if nil != err { // 关闭生产者 producer.Stop() producer = nil } // 生产者写入nsq,10条消息,topic = "test" topic := "test" for i := 0; i < 10; i++ { message := fmt.Sprintf("message:%d", i) if producer != nil && message != "" { //不能发布空串,否则会导致error err = producer.Publish(topic, []byte(message)) // 发布消息 if err != nil { fmt.Printf("producer.Publish,err : %v", err) } fmt.Println(message) } } fmt.Println("producer.Publish success") } ``` ## 消费者 ```go //Nsq接收测试 package main import ( "fmt" "time" "github.com/nsqio/go-nsq" ) // 消费者 type ConsumerT struct{} // 主函数 func main() { InitConsumer("test", "test-channel", "127.0.0.1:4161") for { time.Sleep(time.Second * 10) } } //处理消息 func (*ConsumerT) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) return nil } //初始化消费者 func InitConsumer(topic string, channel string, address string) { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second //设置重连时间 c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者 if err != nil { panic(err) } c.SetLogger(nil, 0) //屏蔽系统日志 c.AddHandler(&ConsumerT{}) // 添加消费者接口 //建立NSQLookupd连接 if err := c.ConnectToNSQLookupd(address); err != nil { panic(err) } //建立多个nsqd连接 // if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150", "127.0.0.1:4152"}); err != nil { // panic(err) // } // 建立一个nsqd连接 // if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil { // panic(err) // } } ```
admin
2024年12月25日 14:50
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码