GO语言使用redis stream队列demo

编程入门 行业动态 更新时间:2024-10-12 18:20:55

GO语言使用redis stream<a href=https://www.elefans.com/category/jswz/34/1771257.html style=队列demo"/>

GO语言使用redis stream队列demo

GO语言使用redis stream队列demo

package mainimport ("context""fmt""github/go-redis/redis/v8""os""os/signal""syscall""time"
)var client *redis.Client
var ctx context.Context
var key = "my_streamKey"       //key
var myConsumer = "my_consumer" //消费者
var group = "my_group"         // 消费者组的名称func main() {Close()Init()go NoAck()for {XAdd()time.Sleep(time.Second * 1)}}
func Close() {// 创建一个通道来接收信号sigCh := make(chan os.Signal, 1)// 监听指定的信号signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)// 启动一个 goroutine 来处理接收到的信号go func() {// 等待信号sig := <-sigChfmt.Println("接收到信号:", sig)// 在这里执行程序关闭前的清理操作// 在这里编写你的程序逻辑err := client.Close()if err != nil {fmt.Println("关闭redis err:", err)return}fmt.Println("关闭redis")// 退出程序os.Exit(0)}()
}
func Init() {// 创建Redis客户端client = redis.NewClient(&redis.Options{Addr:     "127.0.0.1:6379", // Redis服务器地址和端口Password: "123456",          // Redis服务器密码,如果有的话DB:       12,                   // Redis数据库索引PoolSize: 200,                  // 连接池大小})ctx = client.Context()groupInit()// 启动消费者go func() {for {streams, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{Group:    group,              // 消费者组的名称Consumer: myConsumer,         // 消费者的名称Streams:  []string{key, ">"}, // Stream的名称和IDCount:    100,                // 要读取的消息数量Block:    time.Second * 1,    // 阻塞时间,0表示不阻塞}).Result()if err != nil {//fmt.Println("XReadGroup error:", err)continue}for _, stream := range streams {streamName := stream.Streamfor _, message := range stream.Messages {messageID := message.IDmessageValues := message.Valuesfmt.Printf("Stream: %s, Message ID: %s, Values: %v\n", streamName, messageID, messageValues)// 标记消息已经被消费者读取err := client.XAck(ctx, key, group, messageID).Err()if err != nil {fmt.Println("XAck error:", err)return}fmt.Println("消息已经被标记为已读取")}}}}()}func XAdd() {// 发布消息到StreamstreamID, err := client.XAdd(ctx, &redis.XAddArgs{Stream: key, // Stream的名称Values: map[string]interface{}{"key1": "value1","key2": "value2",},}).Result()if err != nil {fmt.Println("XAdd error:", err)return}fmt.Println("Stream ID:", streamID)
}/*
*
读取未被ack的消息
*/
func NoAck() {fmt.Println("开始读取未被ack的消息")streams, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{Group:    group,              // 消费者组的名称Consumer: myConsumer,         // 消费者的名称Streams:  []string{key, ">"}, // Stream的名称和IDCount:    10000,              // 要读取的消息数量NoAck:    true,               // 设置为true,表示读取未被ack的消息Block:    time.Second * 1,    // 阻塞时间,0表示不阻塞}).Result()if err != nil {fmt.Println("没有读取到未被ack的消息 error:", err)return}for _, stream := range streams {streamName := stream.Streamfor _, message := range stream.Messages {messageID := message.IDmessageValues := message.Valuesfmt.Printf("Stream: %s, Message ID: %s, Values: %v\n", streamName, messageID, messageValues)// 标记消息已经被消费者读取err := client.XAck(ctx, key, group, messageID).Err()if err != nil {fmt.Println("XAck error:", err)return}fmt.Println("消息已经被标记为已读取")}}
}
func groupInit() {// 判断key是否存在existsKey, err := client.Exists(ctx, key).Result()if err != nil {fmt.Println("Exists error:", err)return}if existsKey == 1 {fmt.Println("Key存在")// 获取所有消费者组信息groups, err := client.XInfoGroups(ctx, key).Result()if err != nil {fmt.Println("XInfoGroups error:", err)return}// 判断目标消费者组是否存在exists := falsefor _, g := range groups {if g.Name == group {exists = truebreak}}if exists {fmt.Println("消费者组存在")} else {fmt.Println("消费者组不存在")// 创建StreamstreamCreated, err := client.XGroupCreateMkStream(ctx, key, group, "0").Result()if err != nil {fmt.Println("创建消费组 XGroupCreateMkStream error:", err)}fmt.Println("创建消费组:", streamCreated)}} else if existsKey == 0 {fmt.Println("Key不存在")// 创建StreamstreamCreated, err := client.XGroupCreateMkStream(ctx, key, group, "0").Result()if err != nil {fmt.Println("创建消费组 XGroupCreateMkStream error:", err)}fmt.Println("创建消费组:", streamCreated)} else {fmt.Println("Exists返回值异常")}}

更多推荐

GO语言使用redis stream队列demo

本文发布于:2023-11-17 12:16:32,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1643483.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:队列   语言   stream   redis   demo

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!