队列demo"/>
GO语言使用redis stream队列demo
GO语言使用redis stream队列demo
package mainimport ("context""fmt""github/go-redis/redis/v8""os""os/signal""syscall""time"
)var client *redis.Client
var ctx context.Context
var key = "my_streamKey" //key
var myConsumer = "my_consumer" //消费者
var group = "my_group" // 消费者组的名称func main() {Close()Init()go NoAck()for {XAdd()time.Sleep(time.Second * 1)}}
func Close() {// 创建一个通道来接收信号sigCh := make(chan os.Signal, 1)// 监听指定的信号signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)// 启动一个 goroutine 来处理接收到的信号go func() {// 等待信号sig := <-sigChfmt.Println("接收到信号:", sig)// 在这里执行程序关闭前的清理操作// 在这里编写你的程序逻辑err := client.Close()if err != nil {fmt.Println("关闭redis err:", err)return}fmt.Println("关闭redis")// 退出程序os.Exit(0)}()
}
func Init() {// 创建Redis客户端client = redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379", // Redis服务器地址和端口Password: "123456", // Redis服务器密码,如果有的话DB: 12, // Redis数据库索引PoolSize: 200, // 连接池大小})ctx = client.Context()groupInit()// 启动消费者go func() {for {streams, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, // 消费者组的名称Consumer: myConsumer, // 消费者的名称Streams: []string{key, ">"}, // Stream的名称和IDCount: 100, // 要读取的消息数量Block: time.Second * 1, // 阻塞时间,0表示不阻塞}).Result()if err != nil {//fmt.Println("XReadGroup error:", err)continue}for _, stream := range streams {streamName := stream.Streamfor _, message := range stream.Messages {messageID := message.IDmessageValues := message.Valuesfmt.Printf("Stream: %s, Message ID: %s, Values: %v\n", streamName, messageID, messageValues)// 标记消息已经被消费者读取err := client.XAck(ctx, key, group, messageID).Err()if err != nil {fmt.Println("XAck error:", err)return}fmt.Println("消息已经被标记为已读取")}}}}()}func XAdd() {// 发布消息到StreamstreamID, err := client.XAdd(ctx, &redis.XAddArgs{Stream: key, // Stream的名称Values: map[string]interface{}{"key1": "value1","key2": "value2",},}).Result()if err != nil {fmt.Println("XAdd error:", err)return}fmt.Println("Stream ID:", streamID)
}/*
*
读取未被ack的消息
*/
func NoAck() {fmt.Println("开始读取未被ack的消息")streams, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, // 消费者组的名称Consumer: myConsumer, // 消费者的名称Streams: []string{key, ">"}, // Stream的名称和IDCount: 10000, // 要读取的消息数量NoAck: true, // 设置为true,表示读取未被ack的消息Block: time.Second * 1, // 阻塞时间,0表示不阻塞}).Result()if err != nil {fmt.Println("没有读取到未被ack的消息 error:", err)return}for _, stream := range streams {streamName := stream.Streamfor _, message := range stream.Messages {messageID := message.IDmessageValues := message.Valuesfmt.Printf("Stream: %s, Message ID: %s, Values: %v\n", streamName, messageID, messageValues)// 标记消息已经被消费者读取err := client.XAck(ctx, key, group, messageID).Err()if err != nil {fmt.Println("XAck error:", err)return}fmt.Println("消息已经被标记为已读取")}}
}
func groupInit() {// 判断key是否存在existsKey, err := client.Exists(ctx, key).Result()if err != nil {fmt.Println("Exists error:", err)return}if existsKey == 1 {fmt.Println("Key存在")// 获取所有消费者组信息groups, err := client.XInfoGroups(ctx, key).Result()if err != nil {fmt.Println("XInfoGroups error:", err)return}// 判断目标消费者组是否存在exists := falsefor _, g := range groups {if g.Name == group {exists = truebreak}}if exists {fmt.Println("消费者组存在")} else {fmt.Println("消费者组不存在")// 创建StreamstreamCreated, err := client.XGroupCreateMkStream(ctx, key, group, "0").Result()if err != nil {fmt.Println("创建消费组 XGroupCreateMkStream error:", err)}fmt.Println("创建消费组:", streamCreated)}} else if existsKey == 0 {fmt.Println("Key不存在")// 创建StreamstreamCreated, err := client.XGroupCreateMkStream(ctx, key, group, "0").Result()if err != nil {fmt.Println("创建消费组 XGroupCreateMkStream error:", err)}fmt.Println("创建消费组:", streamCreated)} else {fmt.Println("Exists返回值异常")}}
更多推荐
GO语言使用redis stream队列demo
发布评论