Redis学习记录

编程入门 行业动态 更新时间:2024-10-27 13:25:59

原文链接:https://blog.csdn/qq_43509535/article/details/111880326

文章目录

  • NoSql概述
    • 为什么要用NoSql
    • NoSQL的四大分类
  • Redis入门
    • 概述
    • Linux环境安装redis
    • 测试性能
    • 基础知识
  • 五大数据类型
    • Redis-key
    • String
    • List
    • Set
    • Hash
    • Zset(有序集合)
  • 三种特殊数据类型
    • geospatial地理位置
    • Hyperloglog
    • Bitmap
  • 事务
  • Jedis
    • 常用API
    • 通过Jedis再次理解事物
  • SpringBoot整合Redis
    • RedisUtil
    • 实战记录
      • Spring缓存
      • Redis缓存
      • RedisConfig.java
  • Redis.conf详解
  • Redis持久化
    • RDB(Redis Database)
    • AOF(append only file)
  • Redis发布订阅
  • Redis主从复制
    • **概念**
    • 环境配置
    • 一主二从
    • 层层链路
    • 哨兵模式
  • Redis缓存穿透和雪崩
    • 缓存穿透
      • 布隆过滤器
      • 缓存空对象
    • 缓存击穿
    • 缓存雪崩

NoSql概述

为什么要用NoSql

1、单机MySQL的年代

90年代,一个基本的网站访问量一般不会太大,单个数据库完全足够!

那个时候,更多的去使用静态网页Html~~服务器根本没有太大的压力!

思考一下,这种情况下:整个网站的瓶颈是什么?

1、数据量如果太大、一个机器放不下了!

2、数据的索引(B+ Tree ) ,一个机器内存也放不下

3、访问量(读写混合),一个服务器承受不了~

只要你开始出现以上的三种情况之一,那么你就必须要晋级! |

2、Memcache(缓存) + MySQL + 垂直拆分,读写分离

网站80%的情况都是在读,每次都要去查询数据库的话就十分的麻烦!所以说我们希望减轻数据的压力,我们可以使用缓存来保证效率!

网站80%的情况都是在读,每次都要去查询数据库的话就十分的麻烦!所以说我们希望减轻数据的压力,我们可以使用缓存来保证效率!

发展过程︰优化数据结构和索引–>文件缓存(IO ) —> Memcached (当时最热门的技术)

3、分库分表 + 水平拆分 + MySQL集群

早些年MylSAM:表锁,十分影响效率!高并发下就会出现严重的锁问题

转战Innodb :行锁

慢慢的就开始使用分库分表来解决写的压力!

MySQL在哪个年代推出了表分区!这个并没有多少公司使用!MySQL的集群,很好满足哪个年代的所有需求!

4、如今最近的年代

2010–2020十年之间,世界已经发生了翻天覆地的变化;(定位,也是一种数据,音乐,热榜!)

MySQL等关系型数据库就不够用了!数据量很多,变化很快~!

MySQL有的使用它来存储一些比较大的文件,博客,图片!数据库表很大,效率就低了!如果有一种数据库来专门处理这种数据,

MySQL压力就变得十分小(研究如何处理这些问题!)大数据的IO压力下,表几乎没法更大!

目前一个基本的互联网项目

为什么要用NoSQL

用户的个人信息,社交网络,地理位置。用户自己产生的数据,用户日志等爆发式增长

这时候我们就需要使用NoSQL数据库的,NoSQL可以很好的处理以上情况。

什么是NoSQL

NoSQL= Not Only sQL(不仅仅是SQL )关系型数据库∶表格,行,列

泛指非关系型数据库的,随着web2.0互联网的诞生!传统的关系型数据库很难对付web2.0时代!尤其是超大规模的高并发的社区!暴露出来很多难以克服的问题,

NoSQL在当今大数据环境下发展的十分迅速,Redis是发展最快的,而且是我们当下必须要掌握的一个技术!

很多的数据类型用户的个人信息,社交网络,地理位置。这些数据类型的存储不需要一个固定的格式!不需要多余的操作就可以横向扩展的!Map<String,Object>使用键值对来控制!

NoSQL特点

解耦!

1、方便扩展(数据之间没有关系,很好扩展!)

2、大数据量高性能(Redis 一秒写8万次,读取11万,NoSQL的缓存记录级,是一种细粒度的缓存,性能会比较高!)

3、数据类型是多样型的!(不需要事先设计数据库!随取随用!如果是数据量十分大的表,很多人就无法设计了!)

4、传统RDBMSNoSQL

传统的 RDBMS
-结构化组织
-sQL
-数据和关系都存在单独的表中
-操作操作,数据定义语言
-严格的一致性
-基础的事务
-......
Nosql
-不仅仅是数据
-没有固定的查询语言
-键值对存储,列存储,文档存储,图形数据库(社交关系)
-最终一致性,
-CAP定理和BASE(异地多活)
-高性能,高可用,高可扩
-......

了解3V+3高

  • 大数据时代的3V:主要是描述问题的

    • 海量Volume
    • 多样Variety
    • 实时Velocity
  • 大数据时代的3高:主要是对程序的要求

    • 高并发
    • 高可拓
    • 高性能

NoSQL的四大分类

KV键值对

  • 新浪:Redis
  • 美团:redis+Tair
  • 阿里、百度:Redis+memecache

文档型数据库(bson格式,和json一样)

  • MongoDB
    • MongoDB是一个基于分布式文件存储的数据库,C++编写,主要用来处理大量的文档
    • MongoDB是一个介于关系型数据库和非关系型数据库中间的产品,MongoDB是非关系型数据库中功能最丰富的,最像关系型数据库。
  • ConchDB

列存储数据库

  • HBase
  • 分布式文件系统

图关系数据库

  • 他不是存图形,放的是关系,比如∶朋友圈社交网络,广告推荐
  • Neo4jInfoGrid

四种NoSQL对比

Redis入门

概述

redis是什么?

Redis ( Remote Dictionary Server ),远程字典服务

是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API,是当下最热门的NoSQL技术之一!也被人们称之为结构化数据库!

Redis能干嘛?

1、内存存储,持久化,内存中断电即失,所以说持久化很重要(rdbaof

2、效率高,可以用于高速缓存

3、发布订阅系统

4、地图信息分析

5、计时器、计数器(浏览量!)

6、……

特性

1、多样的数据类型

2、持久化

3、集群

4、事务

Linux环境安装redis

下载linux一次性开发工具

  • 默认的CentOS存储库包含一个名为“Development Tools”的软件包组,其中包括GNU编译器集合,GNU调试器以及编译软件所需的其他开发库和工具。
dnf -y group install "Development Tools"

检测gcc版本,保证高于5.3

gcc --version

将redis压缩包上传到服务器,程序建议放到opt目录下,这里我不移动了

解压压缩包

tar -zxvf redis-6.0.9.tar.gz

把需要的文件全部配置好

make

再次make确认以下

make install

redis的默认安装路径/usr/local/bin/

将redis配置文件拷贝一份放到这个文件夹下

mkdir phzconfig
cp /root/redis-6.0.9/redis.conf redis.conf phzconfig

  • 我们之后就可以用这个配置文件启动

redis默认不是后台启动的,需要修改配置文件

修改为yes

启动redis服务

#回到bin目录
cd /usr/local/bin
#启动服务
redis-server phzconfig/redis.conf
#使用redis客户端进行连接
redis-cli -p 6379

测试

停止redis服务

测试性能

redis-benchmark是官方自带的一个压力测试工具

#测试100个并发连接,100000个请求
redis-benchmark -h localhost -p 6379 -c 100 -n 100000

基础知识

redis默认有16个数据库

默认使用的是第0个

#切换数据库
SELECT 2
#查询数据库大小
DBSIZE
#查看当前数据库中所有的key
keys *
#清空所有数据库的值
FLUSHALL
#清空当前数据库的值
FLUSHDB
  • redis6以前是单线程的,但是6支持了多线程

Redis是很快的,官方表示,Redis是基于内存操作,CPU不是Redis性能瓶颈,Redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程来实现,就使用单线程了!

Redis是C语言写的,官方提供的数据为100000+的QPS,完全不比同样是使用key-vale的Memecache差!

  • Redis为什么单线程还这么快?

误区1︰高性能的服务器一定是多线程的?

误区2∶多线程(CPU上下文会切换!)一定比单线程效率高!

CPU>内存>硬盘
核心: redis是将所有的数据全部放在内存中的,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换∶耗时的操作),对于内存系统来说,如果没有上下文切换效率就是最高的!多次读写都是在一个CPU上的,在内存情况下,这个就是最佳的方案!

五大数据类型

Redis-key

#查看key对应value的类型
type name
#获取某个key的值
get name
#查看某个值的剩余存活时间(搭配过期时间)
#time to live
ttl name
#移动某一个key的值(1指的是目标数据库)
move name 1
#给某个值设置过期时间(单位秒)
EXPIRE name 10
#判断某个键是否存在(存在返回1,不存在返回0)
EXISTS name

String

#设置键值对
set name phz

#给某个值追加数据
APPEND name hello word

#获取某个key对应的值的长度
STRLEN age

#自增1
INCR num

#自减一
DECR num

#自定义步长增加
INCRBY num 20

#自定义步长减少
DECRBY num 20

#获取某个值的一部分(闭区间,0,1,2,3)
GETRANGE name 0 3

#给某个值替换其中的部分字段(其实字符下表和替换指定字符串,不用指定长度,替换字符串多长就替换多长)
SETRANGE name 0 haha

#setex (set with expire)设置过期时间
#setne (set if not exists)不存在再设置(在分布式锁中经常使用)
setex name 30 phz#设置name值30s到期,值为phz
setnx name p#如果name不存在再设置p,如果存在设置失败

#批量设置,这里会出现三个键值对
mset k1 v1 k2 v2 k3 v3

#当然也可以在设置的过程中进行判断是否存在
#下面这个例子中k1已经存在了,k4不存在,但并不会新建一个k4,因为这个设置值的过程是具有原子性的操作
msetnx k1 v0 k4 v4

#常规设置对象
set user:1 {name:phz,age:4}
#巧妙设计对象
mset user:1:name phz user:1:age 10 user:2:name p user:2:ag

#批量获取
mget user:1:name user:2:name

#获取马上赋值,如果没有这个db,返回的是(nil),但是这个值已经设置进去了,已经可以直接查询到,如果有这个db,会输出它的值,然后再对其重新赋值
getset db redis

String类似的使用场景: value除了是我们的字符串还可以是我们的数字

List

在redis里面,我们可以把list玩成,栈、队列、阻塞队列

所有的list命令都是以l开头的

#设置list集合
LPUSH list one
LPUSH list two
LPUSH list three

#获取指定区间的list集合中的值,取值是倒序,类似于栈,先进后出
LRANGE list 0 -1#查询全部,也是倒序

#也可以反向插入,类似于双向链表了
RPUSH list zero

#移除命令,当然也分为左移除和右移除
LPOP list#左边的
RPOP list#右边的

#通过下表获取list中某一个值
lindex list 0

#获取list长度
llen list

#移除list中指定的值
lrem list 1 one#如果存在多个相同值的数据,即多个one,那么这个1表示从上到下(从左到右),删除第一次出现的那个值,如果想两个都移除,就填2

#截断某一部分值(取出来)
ltrim list 0 1#把0和1号值取出来

#将list中的部分值拿出来放到新的ontherlist中
rpoplpush list otherlist

#更新list中指定位置的值,必须是存在的值
lset list 0 item

#在list某个位置插入值
linsert list before two three
linsert list after one zero

小结

1、他实际上是一个链表,before Node after , left , right都可以插入值,如果key不存在,创建新的链表

2、如果key存在,新增内容

3、如果移除了所有值,空链表,也代表不存在!

4、在两边插入或者改动值,效率最高!中间元素,相对来说效率会低一点~

5、消息排队!消息队列( Lpush Rpop ) ,栈( Lpush Lpop )

Set

set中的值是不能重复的

#给set赋值
sadd set hello

#获取set中所有的值(无序的)
SMEMBERS set

#判断某一个值是否再set中
SISMEMBER set hello

#获取set集合中元素个数
scard set

#移除set集合中某一个值
srem set hello 

#随机从set中选出一个值
SRANDMEMBER set

#随机移除set集合中的一个值
spop set

#将一个的值移动到指定的另外一个key中,如果这个新的key不存在则自动创建
smove set set2 hello

#假设set1有a,b,c;set2有c,d,e
#输出两个set中不同的元素(差集)
sdiff set1 set2#输出a,b
#输出两个set中相同的元素(交集)
sinter set1 set2#输出c
#输出两个set中所有的元素(并集)
sunion set1 set2#输出a,b,c,d,e

微博,A用户将所有关注的人放在一个set集合中!将它的粉丝也放在一个集合中共同关注,共同爱好,二度好友,推荐好友!(六度分割理论)

Hash

Map集合,key-<key-vlaue>

#给hash设置值
HSET hash name phz

#获取hash中指定键的值
HGET hash name

#同样支持同时设置多个值
HMSET hash age 20 email 1551402789@qq

#获取多个值
HMGET hash age email

#获取全部的值
HGETALL hash 

#删除某一个值
HDEL hash age

#获取hash中保存了多少集合
HLEN hash 

#只获取hash中的key
HKEYS hash

#只获取hash中的value
HVALS hash

#给某个值设置自增
HINCRBY hash age 2

#给某个值设置自减
HINCRBY hash age -1
HDECRBY hash age 2

#如果某个字段不存在设置,存在不设置
HSETNX hash name p

Zset(有序集合)

#自定义排序方式
zadd set 1 one 2 two 3 three
#例如
zadd salary 2500 zhangsan
zadd salary 5000 xiaohong
zadd salary 200 phz
#zrangebyscore salary -inf +inf(通过score排序,在值中处于负无穷大到正无穷大的进行排序,即所有元素,这里如果写0,-1将是空,默认是闭区间,如果要开区间,在对应数前面加一个“(”)
#ZREVRANGE salary 0 -1(这个地方参数就是下标而不是值)
zrange salary 0 -1 
1) "phz"
2) "zhangsan"
3) "xiaohong"

#移除某个元素
zrem salary phz

#获取集合中的个数
zcard salary

#判断某个区间中值的个数
zcount salary 3000 5000

三种特殊数据类型

geospatial地理位置

朋友的定位,附近的人,打车距离计算

Redis的Geo在Redis3.2版本就推出了!

这个功能可以推算地理位置的信息,两地之间的距离,方圆几里的人

可以查询一些测试数据:经纬度查询

#添加地理位置
#规则:地球南北两极无法添加,我们一般是下载城市数据通过java程序导入
GEOADD china:city 116.46 39.92 beijing
GEOADD china:city 121.48 31.22 shanghai
GEOADD china:city 106.54 29.59 chongqing
GEOADD china:city 114.07 22.62 shengzheng
GEOADD china:city 120.19 30.26 hangzhou 
GEOADD china:city 108.95 34.27 xian
# 这种错误就是因为经纬度参数不合法
127.0.0.1:6379> GEOADD china:city 39.92 116.46 beijing
(error) ERR invalid longitude,latitude pair 39.920000,116.460000
#获取指定位置的地理位置信息
GEOPOS china:city beijing chongqing hangzhou
#获取指定两个地点的距离,可指定单位(也可不写,默认m)
    #m表示单位为米。
    #km表示单位为千米。
    #mi表示单位为英里。
    #ft表示单位为英尺。
GEODIST china:city beijing chongqing km
#获取以给定经纬度为中心,找出某一半径内的元素(withdist显示出直线距离,withcoord显示经纬度,count 3 选出符合条件的指定个数的结果,前提是有这么多)
GEORADIUS china:city 110 30 1000 km withdist withcoord count 3
#获取指定成员为中心一定范围内的其他成员
GEORADIUSBYMEMBER china:city xian 1000 km
#该命令将返回11个字符的Geohash字符串!
GEOHASH china:city beijing
"wx4g455wfe0"(二维的经纬度转换为一维的字符串)

由于geospatial底层就是Zset实现的,所以通过Zset命令也可以操作

Hyperloglog

什么是基数?

A(1,3,5,7,8,7)——5

B(1,3,5,7,8)——5

基数(不重复的元素),可以接受误差

简介

Redis Hyperloglog 基数统计的算法

优点∶占用的内存是固定,2*64不同的元素的基数,只需要花费12KB内存,如果要从内存角度来比较的话Hyperloglog首选

网页的UV(一个人访问一个网站多次,但是还是算作一个人)

传统的方式,set 保存用户的id,然后就可以统计set 中的元素数量作为标准判断

这个方式如果保存大量的用户id,就会比较麻烦,我们的目的是为了计数,而不是保存用户id

使用Hyperloglog获取的基数,有0.81%错误率,统计UV任务,可以忽略不计

#往Hyperloglog中添加数据
PFadd key1 a b c d e f
PFadd key2 e f g h i j k 

#获取基数
PFcount key1

#合并两个key(并集,会过滤重复元素)
PFMERGE key3 key1 key2

如果允许容错,就使用Hyperloglog

如果不允许容错,就是用Set或者自己的数据类型

Bitmap

位存储

是否活跃,是否登录,365天打卡,等两个状态的,都可以使用Bitmaps ,这种数据结构,都是操作二进制位来进行记录,就只有0和1两个状态。

365天= 365 bit 1字节= 8bit 46个字节左右

#设置一周打卡值,七天
setbit sign 0 1
setbit sign 1 0
setbit sign 2 1
setbit sign 3 0
setbit sign 4 0
setbit sign 5 1
setbit sign 6 1

#获取某一天是否打开
getbit sign 3

#获取打卡天数(默认全部,后面可跟一个开始结束范围)
bitcount sign

事务

Redis事务本质∶一组命令的集合

一个事务中的所有命令都会被序列化,在事务执行过程的中,会按照顺序执行

一次性、顺序性、排他性,执行一系列的命令

----- 队列set set set执行------

Redis事务没有没有隔离级别的概念

所有的命令在事务中,并没有直接被执行,只有发起执行命令的时候才会执行

Redis单条命令式保存原子性的,但是事务不保证原子性

redis事务:

  • 开启事务(multi)
  • 命令入队(…)
  • 执行事务( exec)

正常执行事务

#执行事务
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> set name phz
QUEUED
127.0.0.1:6379> set age 20
QUEUED
127.0.0.1:6379> get name
QUEUED
127.0.0.1:6379> get age
QUEUED
127.0.0.1:6379> exec
1) OK
2) OK
3) "phz"
4) "20"

#取消事务
DISCARD

编译型异常(代码有问题!命令有错! ),事务中所有的命令都不会被执行!

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> set name phz
QUEUED
127.0.0.1:6379> set age 20
QUEUED
127.0.0.1:6379> sett email 123@163
(error) ERR unknown command `sett`, with args beginning with: `email`, `123@163`, 
127.0.0.1:6379> get name 
QUEUED
127.0.0.1:6379> get age
QUEUED
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.

运行时异常(1/0),如果事务队列中存在语法性,那么执行命令的时候,其他命令是可以正常执行的,错误命令抛出异常!

127.0.0.1:6379> set name phz
OK
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> INCR name
QUEUED
127.0.0.1:6379> set age 20
QUEUED
127.0.0.1:6379> get age
QUEUED
127.0.0.1:6379> exec
1) (error) ERR value is not an integer or out of range
2) OK
3) "20"

监控

悲观锁:

  • 很悲观,什么时候都有可能出现问题,无论做什么都会加锁

乐观锁:

  • 相对悲观锁而言,乐观锁假设认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回用户错误的信息,让用户决定如何去做。
  • 获取version,一般是通过为数据库表增加一个数字类型的 “version” 字段来实现,即为数据增加一个版本标识。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新,否则认为是过期数据。

redis的监控watch,相当于是redis的乐观锁

  • 正常情况

    127.0.0.1:6379> set money 200
    OK
    127.0.0.1:6379> set out 0
    OK
    127.0.0.1:6379> WATCH money#可以同时监视多个
    OK
    127.0.0.1:6379> MULTI
    OK
    127.0.0.1:6379> DECRBY money 20
    QUEUED
    127.0.0.1:6379> INCRBY out 20
    QUEUED
    127.0.0.1:6379> exec
    1) (integer) 180
    2) (integer) 20
    
  • 异常情况

    • 终端1
    127.0.0.1:6379> set money 200
    OK
    127.0.0.1:6379> set out 0
    OK
    127.0.0.1:6379> WATCH money
    OK
    127.0.0.1:6379> MULTI
    OK
    127.0.0.1:6379> INCRBY money 20
    QUEUED
    127.0.0.1:6379> DECRBY out 20
    QUEUED
    #这个时候不立即执行事务操作换到终端2上执行修改操作
    #执行完毕这边再提交事务
    127.0.0.1:6379> exec
    (nil)#表示修改失败
    
    • 终端2
    127.0.0.1:6379> get money
    "200"
    127.0.0.1:6379> set money 1000
    OK
    
#如果发现事务执行失败,放弃监视,也就是解锁,重新监视
unwatch#取消监视所有的key
watch money

Jedis

我们要使用Java来操作Redis

什么是jedis?是Redis官方推荐的java连接开发工具,使用]ava操作Redis的中间件。如果你要使用java操作redis,那么一定要对Jedis十分的熟悉!

  • 使用Jedis

第一步:导入依赖

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.3.0</version>
</dependency>

第二步,实例化Jedis对象

Jedis jedis = new Jedis("39.105.43.3", 6379);

连接远程服务器失败

解决方案

  • 第一步:

    找到redis的配置文件,修改requirepass,即访问redis密码

  • 第二步

  • 第三步

    重启redis

    redis-cli -p 6379 shutdown
    redis-server phzconfig/redis.conf
    redis-cli -p 6379
    

    提示输入密码

    auth 123456
    

  • 第四步

    打开安全组配置(如果服务器的防火墙处于打开状态,且没有给他添加授权端口可能会连接失败,要么关闭,要么添加端口)

第三步:给jedis设置访问密码

jedis.auth("123456");

第五步:测试连接

System.out.println(jedis.ping());

常用API

key

@Test
public void testPing() {
    Jedis jedis = new Jedis("39.105.43.3", 6379);
    jedis.auth("123456");

    System.out.println("清空数据:" + jedis.flushDB());
    System.out.println("判断某个键是否存在:" + jedis.exists("username"));
    System.out.println("新增<'username','phz'>的键值对:" + jedis.set("username", "phz"));
    System.out.println("新增<'password','password'>的键值对:" + jedis.set("password", "password"));
    System.out.print("系统中所有的键如下:");
    Set<String> keys = jedis.keys("*");
    System.out.println(keys);

    System.out.println("删除键password: " + jedis.del("password"));
    System.out.println("判断键password是否存在: " + jedis.exists("password"));
    System.out.println("查看键username所存储的值的类型:" + jedis.type("username"));
    System.out.println("随机返回key空间的一个:" + jedis.randomKey());
    System.out.println("重命名key:" + jedis.rename("username", "name"));
    System.out.println("取出改后的name: " + jedis.get("name"));
    System.out.println("按索引查询:" + jedis.select(0));
    System.out.println("删除当前选择数据库中的所有key:" + jedis.flushDB());
    System.out.println("返回当前数据库中key的数目:" + jedis.dbSize());
    System.out.println("删除所有数据库中的所有key:" + jedis.flushAll());
}
================================================执行结果================================================
清空数据:OK
判断某个键是否存在:false
新增<'username','phz'>的键值对:OK
新增<'password','password'>的键值对:OK
系统中所有的键如下:[password, username]
删除键password: 1
判断键password是否存在: false
查看键username所存储的值的类型:string
随机返回key空间的一个:username
重命名key:OK
取出改后的name: phz
按索引查询:OK
删除当前选择数据库中的所有key:OK
返回当前数据库中key的数目:0
删除所有数据库中的所有key:OK

String

@Test
public void testString() {
    Jedis jedis = new Jedis("39.105.43.3", 6379);
    jedis.auth("123456");

    jedis.flushDB();
    System.out.println("=========增加数据==========");
    System.out.println(jedis.set("key1", "value1"));
    System.out.println(jedis.set("key2", "value2"));
    System.out.println(jedis.set("key3", "value3"));
    System.out.println("删除键key2 : " + jedis.del("key2"));
    System.out.println("获取键key2 : " + jedis.get("key2"));
    System.out.println("修改key1 : " + jedis.set("key1", "value1Changed"));
    System.out.println("获取key1的值 :" + jedis.get("key1"));
    System.out.println("在key3后面加入值 : " + jedis.append("key3", "End"));
    System.out.println("key3的值 : " + jedis.get("key3"));
    System.out.println("增加多个键值对 : " + jedis.mset("key01 ", "value01", "key02", "value02", "key03", "value03"));
    System.out.println(" 获取多个键值对 : " + jedis.mget("key01", "key02", "key03"));
    System.out.println("获取多个键值对 : " + jedis.mget("key01 ", " key02", "key03", "key04"));
    System.out.println("删除多个键值对 : " + jedis.del("key01", "key02"));
    System.out.println("获取多个键值对 : " + jedis.mget("key01 ", "key02", "key03"));

    jedis.flushDB();
    System.out.println("==========新增键值对防止覆盖原先值===========");
    System.out.println(jedis.setnx("key1", "value1"));
    System.out.println(jedis.setnx("key2", "value2"));
    System.out.println(jedis.setnx("key2", "value2-new"));
    System.out.println(jedis.get("key1"));
    System.out.println(jedis.get("key2"));

    System.out.println("=========新增键值对并设置有效时间===========");
    System.out.println(jedis.setex("key3", 2, "value3"));
    System.out.println(jedis.get("key3"));
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(jedis.get("key3"));

    System.out.println("===========获取原值,更新为新值=========");
    System.out.println(jedis.getSet("key2", "key2GetSet"));
    System.out.println(jedis.get("key2"));
    System.out.println("获得key2的值的字串:" + jedis.getrange("key2", 2,4));
}
================================================执行结果================================================
=========增加数据==========
OK
OK
OK
删除键key2 : 1
获取键key2 : null
修改key1 : OK
获取key1的值 :value1Changed
在key3后面加入值 : 9
key3的值 : value3End
增加多个键值对 : OK
 获取多个键值对 : [null, value02, value03]
获取多个键值对 : [value01, null, value03, null]
删除多个键值对 : 1
获取多个键值对 : [value01, null, value03]
==========新增键值对防止覆盖原先值===========
1
1
0
value1
value2
=========新增键值对并设置有效时间===========
OK
value3
null
===========获取原值,更新为新值=========
value2
key2GetSet
获得key2的值的字串:y2G

List

@Test
public void testList() {
    Jedis jedis = new Jedis("39.105.43.3", 6379);
    jedis.auth("123456");
    jedis.flushDB();
    System.out.println("========添加一个list=========");
    jedis.lpush("collections", "ArrayList", "Vector", "Stack", "HashMap", "WleakHashMap", "LinkedHlashNap");
    jedis.lpush("collections", "HashSet");
    jedis.lpush("collections", "TreeSet");
    jedis.lpush("col1ections", "TreeMap");
    System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1));//-1代表倒数第一个元素-2 代表倒数第二个元素
    System.out.println("collections区间e-3的元素:" + jedis.lrange("collections", 0, 3));
    System.out.println("==============================");
    //删除列表指定的值,第二个参数为删除的个数(有重复时),后add进去的值先被删,类似于出栈
    System.out.println("删除指定元素个数:" + jedis.lrem("collections", 2, "HashMap"));
    System.out.println(" collections的内容:" + jedis.lrange("collections", 0, -1));
    System.out.println("删除下表e-3区间之外的元素:" + jedis.ltrim("collections", 0, 3));
    System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1));
    System.out.println(" collections列表出栈(左端): " + jedis.lpop("collections"));
    System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1));
    System.out.println("collections添加元素,从列表右端,与lpush相对应。" + jedis.rpush("collections", "EnumMap"));
    System.out.println("collections的内容:" + jedis.lrange("collections", 0, -1));
    System.out.println("collections列表出栈(右端):" + jedis.rpop("collections"));
    System.out.println("collections的内容: " + jedis.lrange("collections", 0, -1));
    System.out.println("修改collections指定下标1的内容:" + jedis.lset("collections", 1, "LinkedArrayList"));
    System.out.println(" collections的内容:" + jedis.lrange("collections", 0, -1));
    System.out.println("==============================");
    System.out.println("collections的长度:" + jedis.llen("collections"));
    System.out.println("获取collections下标为2的元素:" + jedis.lindex("collections", 2));
    System.out.println("==============================");
    jedis.lpush("sortedList", "3", "6", "2", "0", "7", "4");
    System.out.println("sortedList排序前:" + jedis.lrange("sortedList", 0, -1));
    System.out.println(jedis.sort("sortedList"));
    System.out.println(" sortedList排序后: " + jedis.lrange("sortedList", 0, -1));
}
================================================执行结果================================================
========添加一个list=========
collections的内容:[TreeSet, HashSet, LinkedHlashNap, WleakHashMap, HashMap, Stack, Vector, ArrayList]
collections区间e-3的元素:[TreeSet, HashSet, LinkedHlashNap, WleakHashMap]
==============================
删除指定元素个数:1
 collections的内容:[TreeSet, HashSet, LinkedHlashNap, WleakHashMap, Stack, Vector, ArrayList]
删除下表e-3区间之外的元素:OK
collections的内容:[TreeSet, HashSet, LinkedHlashNap, WleakHashMap]
 collections列表出栈(左端): TreeSet
collections的内容:[HashSet, LinkedHlashNap, WleakHashMap]
collections添加元素,从列表右端,与lpush相对应。4
collections的内容:[HashSet, LinkedHlashNap, WleakHashMap, EnumMap]
collections列表出栈(右端):EnumMap
collections的内容: [HashSet, LinkedHlashNap, WleakHashMap]
修改collections指定下标1的内容:OK
 collections的内容:[HashSet, LinkedArrayList, WleakHashMap]
==============================
collections的长度:3
获取collections下标为2的元素:WleakHashMap
==============================
sortedList排序前:[4, 7, 0, 2, 6, 3]
[0, 2, 3, 4, 6, 7]
sortedList排序后: [4, 7, 0, 2, 6, 3]

Set

@Test
public void TestSet() {
    Jedis jedis = new Jedis("39.105.43.3", 6379);
    jedis.auth("123456");
    jedis.flushDB();
    System.out.println("========向集合中添加元素(不重复)=======");
    System.out.println(jedis.sadd("eleSet", "e1", "e2", " e4", " e3", " e0", " e8", " e7", "e5"));
    System.out.println(jedis.sadd("eleSet", "e6"));
    System.out.println(jedis.sadd("eleSet", "e6"));
    System.out.println("eleSet的所有元素为:" + jedis.smembers("eleSet"));
    System.out.println("删除一个元素e0:" + jedis.srem("eleSet", "e0"));
    System.out.println("eleSet的所有元素为: " + jedis.smembers("e1eSet"));
    System.out.println("删除两个元素e7和e6: " + jedis.srem("eleSet", "e7", " e6"));
    System.out.println("eleSet的所有元素为: " + jedis.smembers("eleSet"));
    System.out.println("随机的移除集合中的一个元素: " + jedis.spop("eleSet"));
    System.out.println("随机的移除集合中的一个元素: " + jedis.spop("eleSet"));
    System.out.println("eleSet的所有元素为: " + jedis.smembers("eleSet"));
    System.out.println("eleSet中包含元素的个数:" + jedis.scard("eleSet"));
    System.out.println("e3是否在eleSet中: " + jedis.sismember("eleSet", "e3"));
    System.out.println("e1是否在eleSet中: " + jedis.sismember("eleSet", "e1"));
    System.out.println("el是否在eleSet中: " + jedis.sismember("eleSet", "e5"));
    System.out.println("=====================");
    System.out.println(jedis.sadd("eleSet1", "e1", "e2", " e4", " e3", "e0", " e8", "e7", "e5"));
    System.out.println(jedis.sadd("eleSet2", "e1", "e2", " e4", "e3", "e0", "e8"));
    System.out.println("将e1eSet1中删除e1并存入eleSet3中:" + jedis.smove("eleSet1", "elset.", "e1"));//移到集合元素
    System.out.println("将e1eSet1中删除e2并存入eleSet3中: " + jedis.smove("eleSet1", "eleSet3", " e2"));
    System.out.println("eleSet1中的元素: " + jedis.smembers("eleSet1"));
    System.out.println("eleSet3中的元素:" + jedis.smembers("eleSet3"));
    System.out.println("===========集合运算==========");
    System.out.println("eleSet1中的元素:" + jedis.smembers("eleSet1"));
    System.out.println("eleSet2中的元素。" + jedis.smembers("eleSet2"));
    System.out.println("eleSet1和eleSet2的交集: " + jedis.sinter("eleSet1", "eleSet2"));
    System.out.println("eleSet1和eleSet2的并集: " + jedis.sunion("eleSet1", "eleSet2"));
    System.out.println("eleSet1和eleSet2的差集: " + jedis.sdiff("eleSet1", "eleSet2"));//eLeset1中有,eleSet2中没有
    jedis.sinterstore("eleSet4", "eleSet1", "eleSet2");//求交集并将交集保存到dsthey的集合
    System.out.println("eleset4中的元素:" + jedis.smembers("eleSet4"));
}
================================================执行结果================================================
========向集合中添加元素(不重复)=======
8
1
0
eleSet的所有元素为:[ e7,  e3,  e8, e2,  e4, e5, e1,  e0, e6]
删除一个元素e0:0
eleSet的所有元素为: []
删除两个元素e7和e6: 0
eleSet的所有元素为: [e5, e1,  e0,  e7,  e3,  e4, e2,  e8, e6]
随机的移除集合中的一个元素: e6
随机的移除集合中的一个元素: e2
eleSet的所有元素为: [ e0,  e7,  e3,  e4,  e8, e5, e1]
eleSet中包含元素的个数:7
e3是否在eleSet中: false
e1是否在eleSet中: true
el是否在eleSet中: true
=====================
8
6
将e1eSet1中删除e1并存入eleSet3中:1
将e1eSet1中删除e2并存入eleSet3中: 0
eleSet1中的元素: [e0, e7,  e3,  e8, e2,  e4, e5]
eleSet3中的元素:[]
===========集合运算==========
eleSet1中的元素:[e0, e7,  e3,  e8, e2,  e4, e5]
eleSet2中的元素。[e3, e1, e0, e8, e2,  e4]
eleSet1和eleSet2的交集: [e0, e2,  e4]
eleSet1和eleSet2的并集: [ e3, e8, e2,  e4,  e8, e5, e0, e7, e3, e1]
eleSet1和eleSet2的差集: [e5,  e3, e7,  e8]
eleset4中的元素:[e0,  e4, e2]

Hash

@Test
public void testHash() {
    Jedis jedis = new Jedis("39.105.43.3", 6379);
    jedis.auth("123456");
    jedis.flushDB();
    Map<String, String> map = new HashMap<>();
    map.put("key1", "value1");
    map.put("key2", "value2");
    map.put("key3", "value3");
    map.put("key4", "value4");
    //添加名称为hash(hey) 的hash元素
    jedis.hmset("hash", map);
    //向名称为hash的hash中添加key为key5. value为value5元素
    jedis.hset("hash", "key5", "value5");
    System.out.println("散列hash的所有键值对为:" + jedis.hgetAll("hash")); //return Map<String,String>
    System.out.println("散列hash的所有键为:" + jedis.hkeys("hash")); //return Set<String>
    System.out.println("散列hash的所有值为:" + jedis.hvals("hash")); //return List < String >
    System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:" + jedis.hincrBy("hash", "key6", 1));
    System.out.println("散列hash的所有键值对为: " + jedis.hgetAll("hash"));
    System.out.println("将key6保存的值加上一个整数,如果key6不存在则添加key6:" + jedis.hincrBy("hash", "key6", 1));
    System.out.println("散列hash的所有键值对为: " + jedis.hgetAll("hash"));
    System.out.println("删除一个或者多个键值对:" + jedis.hdel(" hash ", " key2 "));
    System.out.println("散列hash的所有键值对为:" + jedis.hgetAll(" hash "));
    System.out.println("散列hash中键值对的个数: " + jedis.hlen("hash"));
    System.out.println("判断hash中是否存在key2: " + jedis.hexists("hash", "key2"));
    System.out.println("判断hash中是否存在key3:" + jedis.hexists(" hash ", " key3 "));
    System.out.println("获取hash中的值:" + jedis.hmget(" hash ", " key3 "));
    System.out.println("获取hash中的值:" + jedis.hmget("hash", "key3", "key4"));
}
================================================执行结果================================================
散列hash的所有键值对为:{key1=value1, key2=value2, key5=value5, key3=value3, key4=value4}
散列hash的所有键为:[key1, key2, key5, key3, key4]
散列hash的所有值为:[value1, value4, value2, value3, value5]
将key6保存的值加上一个整数,如果key6不存在则添加key6:1
散列hash的所有键值对为: {key1=value1, key2=value2, key5=value5, key6=1, key3=value3, key4=value4}
将key6保存的值加上一个整数,如果key6不存在则添加key6:2
散列hash的所有键值对为: {key1=value1, key2=value2, key5=value5, key6=2, key3=value3, key4=value4}
删除一个或者多个键值对:0
散列hash的所有键值对为:{}
散列hash中键值对的个数: 6
判断hash中是否存在key2: true
判断hash中是否存在key3:false
获取hash中的值:[null]
获取hash中的值:[value3, value4]

通过Jedis再次理解事物

@Test
public void testTransaction() {
    Jedis jedis = new Jedis("39.105.43.3", 6379);
    jedis.auth("123456");
    jedis.flushDB();
    JSONObject jsonobject = new JSONObject();
    jsonobject.put("he1lo", "wor1d");
    jsonobject.put("name", "phz");//开启事务
    Transaction multi = jedis.multi();
    String result = jsonobject.toJSONString();
    try {
        multi.set("user1", result);
        int i = 1/0;//代码抛出异常,执行失败
        multi.set("user2", result);
        multi.exec();//执行事务
    } catch (Exception e) {
        multi.discard();// 放弃事务
        e.printStackTrace();
    } finally {
        System.out.println(jedis.get("user1"));
        System.out.println(jedis.get("user2"));
        jedis.close();//关闭连接
    }
}

SpringBoot整合Redis

说明︰在SpringBoot2.x之后,原来使用的jedis被替换为了lettuce?

jedis :采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用jedis pool连接池!更像BIO模式

lettuce :采用netty ,实例可以在多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据了,更像NIO模式

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

源码分析

//如果不存在这个bean才生效,所以我们可以自己定义一个RedisTemplate来替换
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    //默认的RedisTemplate没有过多的配置,redis对象都需要被序列化
    //两个泛型都是Object类型,我们在使用的时候都需要强制转换<String,Object>
    RedisTemplate<Object, Object> template = new RedisTemplate();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
//由于String是redis中常用的数据类型,所以单独提取出来一个String的bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
    StringRedisTemplate template = new StringRedisTemplate();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}

如何配置redis?

如上图,所有redis配置类中定义的方法都可以被配置

spring.redis.host=39.105.43.3
spring.redis.port=6379
spring.redis.password=123456

配置完成以后我们就可以到测试类中探索一下springboot如何实现redis中的方法的

基本的数据操作可以直接用redisTemplate进行操作

对数据库的操作我们可以获取一个redis数据库连接对象

简单测试

@Test
void contextLoads() {
    RedisConnection connection = Objects.requireNonNull(redisTemplate.getConnectionFactory()).getConnection();
    connection.flushDb();
    redisTemplate.opsForValue().set("name","彭焕智");
    System.out.println(redisTemplate.opsForValue().get("name"));
}

但是我们观察服务端的key信息发现

前面有一堆看不懂的乱码,即我们设置的key为name,但是我们存储的时候却并不是name,这就是因为我们redis所有的操作都应该要序列化才行,

查看序列化源码发现默认使用的jdk序列化

所以我们就需要自己定义一个自己的RedisTemplate了,以下给出官方的RedisTemplate,我们直接仿写

@Bean
@ConditionalOnMissingBean(
    name = {"redisTemplate"}
)
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    RedisTemplate<Object, Object> template = new RedisTemplate();
    template.setConnectionFactory(redisConnectionFactory);
    return template;
}
/**
 * @author PengHuAnZhi
 * @createTime 2020/12/4 7:58
 * @projectName HaveFun
 * @className RedisConfig.java
 * @description TODO
 */
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    //固定模板,一般企业中可以直接使用
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        //实际开发<String, Object>用的很多,所以就这里指定
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        //序列化配置
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        Jackson2JsonRedisSerializer<Object> objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        objectJackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        template.setKeySerializer(objectJackson2JsonRedisSerializer);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        //key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //value采用String的序列化方式
        template.setValueSerializer(objectJackson2JsonRedisSerializer);
        //hash的也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        //hash的值采用jackson的序列化方式
        template.setHashValueSerializer(objectJackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    /**
     * 配置一个CacheManager才能使用@Cacheable等注解
     */
    @Bean
    public CacheManager cacheManager(RedisConnectionFactory connectionFactory) {
        Jackson2JsonRedisSerializer<Object> objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        //生成一个默认配置,通过config对象即可对缓存进行自定义配置
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
        config = config
                // 设置 key为string序列化
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
                // 设置value为json序列化
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(objectJackson2JsonRedisSerializer))
                // 不缓存空值
                .disableCachingNullValues()
                // 设置缓存的默认过期时间 60分钟
                .entryTtl(Duration.ofHours(1L));

        //特殊缓存空间应用不同的配置
        Map<String, RedisCacheConfiguration> map = new HashMap<>();
        //将轮播图地址设置永不过期,因为基本不会改动它
        map.put("activity_constant", config.entryTtl(Duration.ofMinutes(-1L)));//provider1缓存空间过期时间 30分钟

        //使用自定义的缓存配置初始化一个RedisCacheManager

        return RedisCacheManager.builder(connectionFactory)
                .cacheDefaults(config) //默认配置
                .withInitialCacheConfigurations(map) //特殊缓存
                .transactionAware() //事务
                .build();
    }
}

这个时候我们再进行操作

@Test
void test() throws JsonProcessingException {
    //真实开发环境中我们存数据不是以对象存储,而是以json串来存储,而且对象都需要实现序列化
    User user = new User("彭焕智", 21);
    String jsonUser = new ObjectMapper().writeValueAsString(user);
    redisTemplate.opsForValue().set("user",jsonUser);
    System.out.println(redisTemplate.opsForValue().get("user"));
}

RedisUtil

package com.phz;

import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.TimeUnit;


/**
 * Redis工具类
 *
 */
@Component
public class RedisUtil {

    @Resource
    private StringRedisTemplate redisTemplate;

    public void setRedisTemplate(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public StringRedisTemplate getRedisTemplate() {
        return this.redisTemplate;
    }


    /** -------------------key相关操作--------------------- */

    /**
     * 删除key
     *
     * @param key
     */
    public void delete(String key) {
        redisTemplate.delete(key);
    }

    /**
     * 批量删除key
     *
     * @param keys
     */
    public void delete(Collection<String> keys) {
        redisTemplate.delete(keys);
    }

    /**
     * 序列化key
     *
     * @param key
     * @return
     */
    public byte[] dump(String key) {
        return redisTemplate.dump(key);
    }

    /**
     * 是否存在key
     *
     * @param key
     * @return
     */
    public Boolean hasKey(String key) {
        return redisTemplate.hasKey(key);
    }

    /**
     * 设置过期时间
     *
     * @param key
     * @param timeout
     * @param unit
     * @return
     */
    public Boolean expire(String key, long timeout, TimeUnit unit) {
        return redisTemplate.expire(key, timeout, unit);
    }

    /**
     * 设置过期时间
     *
     * @param key
     * @param date
     * @return
     */
    public Boolean expireAt(String key, Date date) {
        return redisTemplate.expireAt(key, date);
    }

    /**
     * 查找匹配的key
     *
     * @param pattern
     * @return
     */
    public Set<String> keys(String pattern) {
        return redisTemplate.keys(pattern);
    }

    /**
     * 将当前数据库的 key 移动到给定的数据库 db 当中
     *
     * @param key
     * @param dbIndex
     * @return
     */
    public Boolean move(String key, int dbIndex) {
        return redisTemplate.move(key, dbIndex);
    }

    /**
     * 移除 key 的过期时间,key 将持久保持
     *
     * @param key
     * @return
     */
    public Boolean persist(String key) {
        return redisTemplate.persist(key);
    }

    /**
     * 返回 key 的剩余的过期时间
     *
     * @param key
     * @param unit
     * @return
     */
    public Long getExpire(String key, TimeUnit unit) {
        return redisTemplate.getExpire(key, unit);
    }

    /**
     * 返回 key 的剩余的过期时间
     *
     * @param key
     * @return
     */
    public Long getExpire(String key) {
        return redisTemplate.getExpire(key);
    }

    /**
     * 从当前数据库中随机返回一个 key
     *
     * @return
     */
    public String randomKey() {
        return redisTemplate.randomKey();
    }

    /**
     * 修改 key 的名称
     *
     * @param oldKey
     * @param newKey
     */
    public void rename(String oldKey, String newKey) {
        redisTemplate.rename(oldKey, newKey);
    }

    /**
     * 仅当 newkey 不存在时,将 oldKey 改名为 newkey
     *
     * @param oldKey
     * @param newKey
     * @return
     */
    public Boolean renameIfAbsent(String oldKey, String newKey) {
        return redisTemplate.renameIfAbsent(oldKey, newKey);
    }

    /**
     * 返回 key 所储存的值的类型
     *
     * @param key
     * @return
     */
    public DataType type(String key) {
        return redisTemplate.type(key);
    }

    /** -------------------string相关操作--------------------- */

    /**
     * 设置指定 key 的值
     * @param key
     * @param value
     */
    public void set(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
    }

    /**
     * 获取指定 key 的值
     * @param key
     * @return
     */
    public String get(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    /**
     * 返回 key 中字符串值的子字符
     * @param key
     * @param start
     * @param end
     * @return
     */
    public String getRange(String key, long start, long end) {
        return redisTemplate.opsForValue().get(key, start, end);
    }

    /**
     * 将给定 key 的值设为 value ,并返回 key 的旧值(old value)
     *
     * @param key
     * @param value
     * @return
     */
    public String getAndSet(String key, String value) {
        return redisTemplate.opsForValue().getAndSet(key, value);
    }

    /**
     * 对 key 所储存的字符串值,获取指定偏移量上的位(bit)
     *
     * @param key
     * @param offset
     * @return
     */
    public Boolean getBit(String key, long offset) {
        return redisTemplate.opsForValue().getBit(key, offset);
    }

    /**
     * 批量获取
     *
     * @param keys
     * @return
     */
    public List<String> multiGet(Collection<String> keys) {
        return redisTemplate.opsForValue().multiGet(keys);
    }

    /**
     * 设置ASCII码, 字符串'a'的ASCII码是97, 转为二进制是'01100001', 此方法是将二进制第offset位值变为value
     *
     * @param key
     * @param offset
     *            位置
     * @param value
     *            值,true为1, false为0
     * @return
     */
    public boolean setBit(String key, long offset, boolean value) {
        return redisTemplate.opsForValue().setBit(key, offset, value);
    }

    /**
     * 将值 value 关联到 key ,并将 key 的过期时间设为 timeout
     *
     * @param key
     * @param value
     * @param timeout
     *            过期时间
     * @param unit
     *            时间单位, 天:TimeUnit.DAYS 小时:TimeUnit.HOURS 分钟:TimeUnit.MINUTES
     *            秒:TimeUnit.SECONDS 毫秒:TimeUnit.MILLISECONDS
     */
    public void setEx(String key, String value, long timeout, TimeUnit unit) {
        redisTemplate.opsForValue().set(key, value, timeout, unit);
    }

    /**
     * 只有在 key 不存在时设置 key 的值
     *
     * @param key
     * @param value
     * @return 之前已经存在返回false,不存在返回true
     */
    public boolean setIfAbsent(String key, String value) {
        return redisTemplate.opsForValue().setIfAbsent(key, value);
    }

    /**
     * 用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始
     *
     * @param key
     * @param value
     * @param offset
     *            从指定位置开始覆写
     */
    public void setRange(String key, String value, long offset) {
        redisTemplate.opsForValue().set(key, value, offset);
    }

    /**
     * 获取字符串的长度
     *
     * @param key
     * @return
     */
    public Long size(String key) {
        return redisTemplate.opsForValue().size(key);
    }

    /**
     * 批量添加
     *
     * @param maps
     */
    public void multiSet(Map<String, String> maps) {
        redisTemplate.opsForValue().multiSet(maps);
    }

    /**
     * 同时设置一个或多个 key-value 对,当且仅当所有给定 key 都不存在
     *
     * @param maps
     * @return 之前已经存在返回false,不存在返回true
     */
    public boolean multiSetIfAbsent(Map<String, String> maps) {
        return redisTemplate.opsForValue().multiSetIfAbsent(maps);
    }

    /**
     * 增加(自增长), 负数则为自减
     *
     * @param key
     * @param increment
     * @return
     */
    public Long incrBy(String key, long increment) {
        return redisTemplate.opsForValue().increment(key, increment);
    }

    /**
     *
     * @param key
     * @param increment
     * @return
     */
    public Double incrByFloat(String key, double increment) {
        return redisTemplate.opsForValue().increment(key, increment);
    }

    /**
     * 追加到末尾
     *
     * @param key
     * @param value
     * @return
     */
    public Integer append(String key, String value) {
        return redisTemplate.opsForValue().append(key, value);
    }

    /** -------------------hash相关操作------------------------- */

    /**
     * 获取存储在哈希表中指定字段的值
     *
     * @param key
     * @param field
     * @return
     */
    public Object hGet(String key, String field) {
        return redisTemplate.opsForHash().get(key, field);
    }

    /**
     * 获取所有给定字段的值
     *
     * @param key
     * @return
     */
    public Map<Object, Object> hGetAll(String key) {
        return redisTemplate.opsForHash().entries(key);
    }

    /**
     * 获取所有给定字段的值
     *
     * @param key
     * @param fields
     * @return
     */
    public List<Object> hMultiGet(String key, Collection<Object> fields) {
        return redisTemplate.opsForHash().multiGet(key, fields);
    }

    public void hPut(String key, String hashKey, String value) {
        redisTemplate.opsForHash().put(key, hashKey, value);
    }

    public void hPutAll(String key, Map<String, String> maps) {
        redisTemplate.opsForHash().putAll(key, maps);
    }

    /**
     * 仅当hashKey不存在时才设置
     *
     * @param key
     * @param hashKey
     * @param value
     * @return
     */
    public Boolean hPutIfAbsent(String key, String hashKey, String value) {
        return redisTemplate.opsForHash().putIfAbsent(key, hashKey, value);
    }

    /**
     * 删除一个或多个哈希表字段
     *
     * @param key
     * @param fields
     * @return
     */
    public Long hDelete(String key, Object... fields) {
        return redisTemplate.opsForHash().delete(key, fields);
    }

    /**
     * 查看哈希表 key 中,指定的字段是否存在
     *
     * @param key
     * @param field
     * @return
     */
    public boolean hExists(String key, String field) {
        return redisTemplate.opsForHash().hasKey(key, field);
    }

    /**
     * 为哈希表 key 中的指定字段的整数值加上增量 increment
     *
     * @param key
     * @param field
     * @param increment
     * @return
     */
    public Long hIncrBy(String key, Object field, long increment) {
        return redisTemplate.opsForHash().increment(key, field, increment);
    }

    /**
     * 为哈希表 key 中的指定字段的整数值加上增量 increment
     *
     * @param key
     * @param field
     * @param delta
     * @return
     */
    public Double hIncrByFloat(String key, Object field, double delta) {
        return redisTemplate.opsForHash().increment(key, field, delta);
    }

    /**
     * 获取所有哈希表中的字段
     *
     * @param key
     * @return
     */
    public Set<Object> hKeys(String key) {
        return redisTemplate.opsForHash().keys(key);
    }

    /**
     * 获取哈希表中字段的数量
     *
     * @param key
     * @return
     */
    public Long hSize(String key) {
        return redisTemplate.opsForHash().size(key);
    }

    /**
     * 获取哈希表中所有值
     *
     * @param key
     * @return
     */
    public List<Object> hValues(String key) {
        return redisTemplate.opsForHash().values(key);
    }

    /**
     * 迭代哈希表中的键值对
     *
     * @param key
     * @param options
     * @return
     */
    public Cursor<Map.Entry<Object, Object>> hScan(String key, ScanOptions options) {
        return redisTemplate.opsForHash().scan(key, options);
    }

    /** ------------------------list相关操作---------------------------- */

    /**
     * 通过索引获取列表中的元素
     *
     * @param key
     * @param index
     * @return
     */
    public String lIndex(String key, long index) {
        return redisTemplate.opsForList().index(key, index);
    }

    /**
     * 获取列表指定范围内的元素
     *
     * @param key
     * @param start
     *            开始位置, 0是开始位置
     * @param end
     *            结束位置, -1返回所有
     * @return
     */
    public List<String> lRange(String key, long start, long end) {
        return redisTemplate.opsForList().range(key, start, end);
    }

    /**
     * 存储在list头部
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPush(String key, String value) {
        return redisTemplate.opsForList().leftPush(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPushAll(String key, String... value) {
        return redisTemplate.opsForList().leftPushAll(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPushAll(String key, Collection<String> value) {
        return redisTemplate.opsForList().leftPushAll(key, value);
    }

    /**
     * 当list存在的时候才加入
     *
     * @param key
     * @param value
     * @return
     */
    public Long lLeftPushIfPresent(String key, String value) {
        return redisTemplate.opsForList().leftPushIfPresent(key, value);
    }

    /**
     * 如果pivot存在,再pivot前面添加
     *
     * @param key
     * @param pivot
     * @param value
     * @return
     */
    public Long lLeftPush(String key, String pivot, String value) {
        return redisTemplate.opsForList().leftPush(key, pivot, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPush(String key, String value) {
        return redisTemplate.opsForList().rightPush(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPushAll(String key, String... value) {
        return redisTemplate.opsForList().rightPushAll(key, value);
    }

    /**
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPushAll(String key, Collection<String> value) {
        return redisTemplate.opsForList().rightPushAll(key, value);
    }

    /**
     * 为已存在的列表添加值
     *
     * @param key
     * @param value
     * @return
     */
    public Long lRightPushIfPresent(String key, String value) {
        return redisTemplate.opsForList().rightPushIfPresent(key, value);
    }

    /**
     * 在pivot元素的右边添加值
     *
     * @param key
     * @param pivot
     * @param value
     * @return
     */
    public Long lRightPush(String key, String pivot, String value) {
        return redisTemplate.opsForList().rightPush(key, pivot, value);
    }

    /**
     * 通过索引设置列表元素的值
     *
     * @param key
     * @param index
     *            位置
     * @param value
     */
    public void lSet(String key, long index, String value) {
        redisTemplate.opsForList().set(key, index, value);
    }

    /**
     * 移出并获取列表的第一个元素
     *
     * @param key
     * @return 删除的元素
     */
    public String lLeftPop(String key) {
        return redisTemplate.opsForList().leftPop(key);
    }

    /**
     * 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
     *
     * @param key
     * @param timeout
     *            等待时间
     * @param unit
     *            时间单位
     * @return
     */
    public String lBLeftPop(String key, long timeout, TimeUnit unit) {
        return redisTemplate.opsForList().leftPop(key, timeout, unit);
    }

    /**
     * 移除并获取列表最后一个元素
     *
     * @param key
     * @return 删除的元素
     */
    public String lRightPop(String key) {
        return redisTemplate.opsForList().rightPop(key);
    }

    /**
     * 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
     *
     * @param key
     * @param timeout
     *            等待时间
     * @param unit
     *            时间单位
     * @return
     */
    public String lBRightPop(String key, long timeout, TimeUnit unit) {
        return redisTemplate.opsForList().rightPop(key, timeout, unit);
    }

    /**
     * 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
     *
     * @param sourceKey
     * @param destinationKey
     * @return
     */
    public String lRightPopAndLeftPush(String sourceKey, String destinationKey) {
        return redisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
                destinationKey);
    }

    /**
     * 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止
     *
     * @param sourceKey
     * @param destinationKey
     * @param timeout
     * @param unit
     * @return
     */
    public String lBRightPopAndLeftPush(String sourceKey, String destinationKey,
                                        long timeout, TimeUnit unit) {
        return redisTemplate.opsForList().rightPopAndLeftPush(sourceKey,
                destinationKey, timeout, unit);
    }

    /**
     * 删除集合中值等于value得元素
     *
     * @param key
     * @param index
     *            index=0, 删除所有值等于value的元素; index>0, 从头部开始删除第一个值等于value的元素;
     *            index<0, 从尾部开始删除第一个值等于value的元素;
     * @param value
     * @return
     */
    public Long lRemove(String key, long index, String value) {
        return redisTemplate.opsForList().remove(key, index, value);
    }

    /**
     * 裁剪list
     *
     * @param key
     * @param start
     * @param end
     */
    public void lTrim(String key, long start, long end) {
        redisTemplate.opsForList().trim(key, start, end);
    }

    /**
     * 获取列表长度
     *
     * @param key
     * @return
     */
    public Long lLen(String key) {
        return redisTemplate.opsForList().size(key);
    }

    /** --------------------set相关操作-------------------------- */

    /**
     * set添加元素
     *
     * @param key
     * @param values
     * @return
     */
    public Long sAdd(String key, String... values) {
        return redisTemplate.opsForSet().add(key, values);
    }

    /**
     * set移除元素
     *
     * @param key
     * @param values
     * @return
     */
    public Long sRemove(String key, Object... values) {
        return redisTemplate.opsForSet().remove(key, values);
    }

    /**
     * 移除并返回集合的一个随机元素
     *
     * @param key
     * @return
     */
    public String sPop(String key) {
        return redisTemplate.opsForSet().pop(key);
    }

    /**
     * 将元素value从一个集合移到另一个集合
     *
     * @param key
     * @param value
     * @param destKey
     * @return
     */
    public Boolean sMove(String key, String value, String destKey) {
        return redisTemplate.opsForSet().move(key, value, destKey);
    }

    /**
     * 获取集合的大小
     *
     * @param key
     * @return
     */
    public Long sSize(String key) {
        return redisTemplate.opsForSet().size(key);
    }

    /**
     * 判断集合是否包含value
     *
     * @param key
     * @param value
     * @return
     */
    public Boolean sIsMember(String key, Object value) {
        return redisTemplate.opsForSet().isMember(key, value);
    }

    /**
     * 获取两个集合的交集
     *
     * @param key
     * @param otherKey
     * @return
     */
    public Set<String> sIntersect(String key, String otherKey) {
        return redisTemplate.opsForSet().intersect(key, otherKey);
    }

    /**
     * 获取key集合与多个集合的交集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<String> sIntersect(String key, Collection<String> otherKeys) {
        return redisTemplate.opsForSet().intersect(key, otherKeys);
    }

    /**
     * key集合与otherKey集合的交集存储到destKey集合中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long sIntersectAndStore(String key, String otherKey, String destKey) {
        return redisTemplate.opsForSet().intersectAndStore(key, otherKey,
                destKey);
    }

    /**
     * key集合与多个集合的交集存储到destKey集合中
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long sIntersectAndStore(String key, Collection<String> otherKeys,
                                   String destKey) {
        return redisTemplate.opsForSet().intersectAndStore(key, otherKeys,
                destKey);
    }

    /**
     * 获取两个集合的并集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<String> sUnion(String key, String otherKeys) {
        return redisTemplate.opsForSet().union(key, otherKeys);
    }

    /**
     * 获取key集合与多个集合的并集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<String> sUnion(String key, Collection<String> otherKeys) {
        return redisTemplate.opsForSet().union(key, otherKeys);
    }

    /**
     * key集合与otherKey集合的并集存储到destKey中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long sUnionAndStore(String key, String otherKey, String destKey) {
        return redisTemplate.opsForSet().unionAndStore(key, otherKey, destKey);
    }

    /**
     * key集合与多个集合的并集存储到destKey中
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long sUnionAndStore(String key, Collection<String> otherKeys,
                               String destKey) {
        return redisTemplate.opsForSet().unionAndStore(key, otherKeys, destKey);
    }

    /**
     * 获取两个集合的差集
     *
     * @param key
     * @param otherKey
     * @return
     */
    public Set<String> sDifference(String key, String otherKey) {
        return redisTemplate.opsForSet().difference(key, otherKey);
    }

    /**
     * 获取key集合与多个集合的差集
     *
     * @param key
     * @param otherKeys
     * @return
     */
    public Set<String> sDifference(String key, Collection<String> otherKeys) {
        return redisTemplate.opsForSet().difference(key, otherKeys);
    }

    /**
     * key集合与otherKey集合的差集存储到destKey中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long sDifference(String key, String otherKey, String destKey) {
        return redisTemplate.opsForSet().differenceAndStore(key, otherKey,
                destKey);
    }

    /**
     * key集合与多个集合的差集存储到destKey中
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long sDifference(String key, Collection<String> otherKeys,
                            String destKey) {
        return redisTemplate.opsForSet().differenceAndStore(key, otherKeys,
                destKey);
    }

    /**
     * 获取集合所有元素
     *
     * @param key
     * @return
     */
    public Set<String> setMembers(String key) {
        return redisTemplate.opsForSet().members(key);
    }

    /**
     * 随机获取集合中的一个元素
     *
     * @param key
     * @return
     */
    public String sRandomMember(String key) {
        return redisTemplate.opsForSet().randomMember(key);
    }

    /**
     * 随机获取集合中count个元素
     *
     * @param key
     * @param count
     * @return
     */
    public List<String> sRandomMembers(String key, long count) {
        return redisTemplate.opsForSet().randomMembers(key, count);
    }

    /**
     * 随机获取集合中count个元素并且去除重复的
     *
     * @param key
     * @param count
     * @return
     */
    public Set<String> sDistinctRandomMembers(String key, long count) {
        return redisTemplate.opsForSet().distinctRandomMembers(key, count);
    }

    /**
     *
     * @param key
     * @param options
     * @return
     */
    public Cursor<String> sScan(String key, ScanOptions options) {
        return redisTemplate.opsForSet().scan(key, options);
    }

    /**------------------zSet相关操作--------------------------------*/

    /**
     * 添加元素,有序集合是按照元素的score值由小到大排列
     *
     * @param key
     * @param value
     * @param score
     * @return
     */
    public Boolean zAdd(String key, String value, double score) {
        return redisTemplate.opsForZSet().add(key, value, score);
    }

    /**
     *
     * @param key
     * @param values
     * @return
     */
    public Long zAdd(String key, Set<TypedTuple<String>> values) {
        return redisTemplate.opsForZSet().add(key, values);
    }

    /**
     *
     * @param key
     * @param values
     * @return
     */
    public Long zRemove(String key, Object... values) {
        return redisTemplate.opsForZSet().remove(key, values);
    }

    /**
     * 增加元素的score值,并返回增加后的值
     *
     * @param key
     * @param value
     * @param delta
     * @return
     */
    public Double zIncrementScore(String key, String value, double delta) {
        return redisTemplate.opsForZSet().incrementScore(key, value, delta);
    }

    /**
     * 返回元素在集合的排名,有序集合是按照元素的score值由小到大排列
     *
     * @param key
     * @param value
     * @return 0表示第一位
     */
    public Long zRank(String key, Object value) {
        return redisTemplate.opsForZSet().rank(key, value);
    }

    /**
     * 返回元素在集合的排名,按元素的score值由大到小排列
     *
     * @param key
     * @param value
     * @return
     */
    public Long zReverseRank(String key, Object value) {
        return redisTemplate.opsForZSet().reverseRank(key, value);
    }

    /**
     * 获取集合的元素, 从小到大排序
     *
     * @param key
     * @param start
     *            开始位置
     * @param end
     *            结束位置, -1查询所有
     * @return
     */
    public Set<String> zRange(String key, long start, long end) {
        return redisTemplate.opsForZSet().range(key, start, end);
    }

    /**
     * 获取集合元素, 并且把score值也获取
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Set<TypedTuple<String>> zRangeWithScores(String key, long start,
                                                    long end) {
        return redisTemplate.opsForZSet().rangeWithScores(key, start, end);
    }

    /**
     * 根据Score值查询集合元素
     *
     * @param key
     * @param min
     *            最小值
     * @param max
     *            最大值
     * @return
     */
    public Set<String> zRangeByScore(String key, double min, double max) {
        return redisTemplate.opsForZSet().rangeByScore(key, min, max);
    }

    /**
     * 根据Score值查询集合元素, 从小到大排序
     *
     * @param key
     * @param min
     *            最小值
     * @param max
     *            最大值
     * @return
     */
    public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
                                                           double min, double max) {
        return redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max);
    }

    /**
     *
     * @param key
     * @param min
     * @param max
     * @param start
     * @param end
     * @return
     */
    public Set<TypedTuple<String>> zRangeByScoreWithScores(String key,
                                                           double min, double max, long start, long end) {
        return redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max,
                start, end);
    }

    /**
     * 获取集合的元素, 从大到小排序
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Set<String> zReverseRange(String key, long start, long end) {
        return redisTemplate.opsForZSet().reverseRange(key, start, end);
    }

    /**
     * 获取集合的元素, 从大到小排序, 并返回score值
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Set<TypedTuple<String>> zReverseRangeWithScores(String key,
                                                           long start, long end) {
        return redisTemplate.opsForZSet().reverseRangeWithScores(key, start,
                end);
    }

    /**
     * 根据Score值查询集合元素, 从大到小排序
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Set<String> zReverseRangeByScore(String key, double min,
                                            double max) {
        return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max);
    }

    /**
     * 根据Score值查询集合元素, 从大到小排序
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Set<TypedTuple<String>> zReverseRangeByScoreWithScores(
            String key, double min, double max) {
        return redisTemplate.opsForZSet().reverseRangeByScoreWithScores(key,
                min, max);
    }

    /**
     *
     * @param key
     * @param min
     * @param max
     * @param start
     * @param end
     * @return
     */
    public Set<String> zReverseRangeByScore(String key, double min,
                                            double max, long start, long end) {
        return redisTemplate.opsForZSet().reverseRangeByScore(key, min, max,
                start, end);
    }

    /**
     * 根据score值获取集合元素数量
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Long zCount(String key, double min, double max) {
        return redisTemplate.opsForZSet().count(key, min, max);
    }

    /**
     * 获取集合大小
     *
     * @param key
     * @return
     */
    public Long zSize(String key) {
        return redisTemplate.opsForZSet().size(key);
    }

    /**
     * 获取集合大小
     *
     * @param key
     * @return
     */
    public Long zZCard(String key) {
        return redisTemplate.opsForZSet().zCard(key);
    }

    /**
     * 获取集合中value元素的score值
     *
     * @param key
     * @param value
     * @return
     */
    public Double zScore(String key, Object value) {
        return redisTemplate.opsForZSet().score(key, value);
    }

    /**
     * 移除指定索引位置的成员
     *
     * @param key
     * @param start
     * @param end
     * @return
     */
    public Long zRemoveRange(String key, long start, long end) {
        return redisTemplate.opsForZSet().removeRange(key, start, end);
    }

    /**
     * 根据指定的score值的范围来移除成员
     *
     * @param key
     * @param min
     * @param max
     * @return
     */
    public Long zRemoveRangeByScore(String key, double min, double max) {
        return redisTemplate.opsForZSet().removeRangeByScore(key, min, max);
    }

    /**
     * 获取key和otherKey的并集并存储在destKey中
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long zUnionAndStore(String key, String otherKey, String destKey) {
        return redisTemplate.opsForZSet().unionAndStore(key, otherKey, destKey);
    }

    /**
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long zUnionAndStore(String key, Collection<String> otherKeys,
                               String destKey) {
        return redisTemplate.opsForZSet()
                .unionAndStore(key, otherKeys, destKey);
    }

    /**
     * 交集
     *
     * @param key
     * @param otherKey
     * @param destKey
     * @return
     */
    public Long zIntersectAndStore(String key, String otherKey,
                                   String destKey) {
        return redisTemplate.opsForZSet().intersectAndStore(key, otherKey,
                destKey);
    }

    /**
     * 交集
     *
     * @param key
     * @param otherKeys
     * @param destKey
     * @return
     */
    public Long zIntersectAndStore(String key, Collection<String> otherKeys,
                                   String destKey) {
        return redisTemplate.opsForZSet().intersectAndStore(key, otherKeys,
                destKey);
    }

    /**
     *
     * @param key
     * @param options
     * @return
     */
    public Cursor<TypedTuple<String>> zScan(String key, ScanOptions options) {
        return redisTemplate.opsForZSet().scan(key, options);
    }

}

实战记录

Spring缓存

实际上是存在spring容器中的

第一步,在springboot全局启动类上加一个开去缓存注解

//开启缓存
@EnableCaching
@SpringBootApplication
public class HavefunApplication {
        SpringApplication.run(HavefunApplication.class, args);
    }
}

空参数的缓存

在需要查询返回值的一个方法上加启动缓存的注解

@Cacheable(value = "test",key = "'testkey'")
public String[] getRotationChartPictures() {
    System.out.println("进来了");
    return new String[]{ValueConfig.SERVER_URL + "localPictures/1.png",
                        ValueConfig.SERVER_URL + "localPictures/2.png",
                        ValueConfig.SERVER_URL + "localPictures/3.png",
                        ValueConfig.SERVER_URL + "localPictures/4.png",};
}

测试的时候发现,除了第一次使用这个方法,第二次及以后都不在进入这个方法,为什么呢?

第一次访问这个方法,先去缓存中查找有没有testkey的记录,发现没有,就进入getRotationChartPictures方法,执行完毕返回后,将返回值标记为testkey,并存入test缓存区,下一次请求,发现缓存区有testkey,就不再进入方法,直接从缓存区找。

但是会出现一个问题:当我们取的数据是从数据库中查出来的,如果这个数据库中的数据被修改了,那这个地方取出来的缓存永远都不会更新,所以我们就需要在有可能修改该数据的方法上面加上清除缓存的注解

//也可以偷懒,不写key,但是这样就是清除所有test缓存区中的数据,还得写上allEntries = true
@CacheEvict(value = "test",key = "'testkey'")

单参数的缓存

如果返回值方法上面有参数怎么办?如果第一次查询id为1的数据,第二次查询却是2怎么办,还能使用缓存吗??很简单,方式如下

@Cacheable(value = "test",key = "'testkey'+#id")

同样的,如果我们对这个方法所返回的数据有更新,我们还是需要去清理缓存,我们在清理缓存的时候,必须考虑所有可能和这个返回数据有关系的地方,都需要进行删除,但是我们不同的方法上面所使用的缓存key是不一样的,所以要同时清理很多的key的时候,我们只能依靠直接清理一个缓存区来解决,这就要求了所有和这个数据有关联的方法所产生的key的缓存区都需要相同,不能随意开启不同的缓存区

这个时候清理缓存就需要用到新的参数

@CacheEvict(value = "test",AllEntries = true)

多参数的缓存

@Cacheable(value = "test",key = "'testkey'+#id+','+#name")

无论是单参还是多参,总之都是字符串的拼接,只是需要注意变量前面需要加上井号,唯一目的就只是作为不同的缓存key的区分,比如下面有一个key23单参缓存,然后还有一个双参缓存2和3,如果用key23就冲突了,所以加一个逗号表示key2,3,当然也可以加其他的符号

Redis缓存

同样需要引入redis依赖

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.4.0</version>
</dependency>

运行项目报错

Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.impl.GenericObjectPoolConfig
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_261]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_261]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_261]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_261]
	... 48 common frames omitted

引入commons-pool2依赖

<!-- https://mvnrepository/artifact/org.apachemons/commons-pool2 -->
<dependency>
    <groupId>org.apachemons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.9.0</version>
</dependency>

我们之前的spring缓存注解不需要任何修改,可以直接运行,测试访问

使用redisTemplate实现下单30min不支付自动取消

redisTemplate.opsForValue().set(id,"一些描述信息,无所谓,也可以直接存订单的详细信息",Duration.ofMinutes(30L));

获取订单剩余时间

Long time = redisTemplate.getExpire("key");

RedisConfig.java

/**
 * @author PengHuAnZhi
 * @createTime 2020/12/4 7:58
 * @projectName HaveFun
 * @className RedisConfig.java
 * @description TODO
 */
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    //固定模板,一般企业中可以直接使用
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        //实际开发<String, Object>用的很多,所以就这里指定
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        //序列化配置
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
        Jackson2JsonRedisSerializer<Object> objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        objectJackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        template.setKeySerializer(objectJackson2JsonRedisSerializer);

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        //key采用String的序列化方式
        template.setKeySerializer(stringRedisSerializer);
        //value采用String的序列化方式
        template.setValueSerializer(objectJackson2JsonRedisSerializer);
        //hash的也采用String的序列化方式
        template.setHashKeySerializer(stringRedisSerializer);
        //hash的值采用jackson的序列化方式
        template.setHashValueSerializer(objectJackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    /**
     * 配置一个CacheManager才能使用@Cacheable等注解
     */
    @Bean
    public CacheManager cacheManager(RedisConnectionFactory connectionFactory) {
        Jackson2JsonRedisSerializer<Object> objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        //生成一个默认配置,通过config对象即可对缓存进行自定义配置
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
        config = config
                // 设置 key为string序列化
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()))
                // 设置value为json序列化
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(objectJackson2JsonRedisSerializer))
                // 不缓存空值
                .disableCachingNullValues()
                // 设置缓存的默认过期时间 60分钟
                .entryTtl(Duration.ofHours(1L));

        //特殊缓存空间应用不同的配置
        Map<String, RedisCacheConfiguration> map = new HashMap<>();
        //将轮播图地址设置永不过期,因为基本不会改动它
        map.put("activity_constant", config.entryTtl(Duration.ofMinutes(-1L)));//provider1缓存空间过期时间 30分钟

        //使用自定义的缓存配置初始化一个RedisCacheManager

        return RedisCacheManager.builder(connectionFactory)
                .cacheDefaults(config) //默认配置
                .withInitialCacheConfigurations(map) //特殊缓存
                .transactionAware() //事务
                .build();
    }
}

Redis.conf详解

redis在启动的时候,就是通过配置文件来启动的

单位,大小写不敏感

# 1k => 1000 bytes
# 1kb => 1024 bytes
# 1m => 1000000 bytes
# 1mb => 1024*1024 bytes
# 1g => 1000000000 bytes
# 1gb => 1024*1024*1024 bytes
#
# units are case insensitive so 1GB 1Gb 1gB are all the same.

还可以包含其他配置文件

# include /path/to/local.conf
# include /path/to/other.conf

网络

  • 我们在配置idea远程连接的时候,就把这里的bind本地连接给注释了
#bind 127.0.0.1
  • 保护模式,端口号,tcp连接队列
protected-mode yes
port 6379
tcp-backlog 511

通用配置

  • 我们在安装redis的时候这里就把daemonize 设置为了yes,就是可以后台运行,即守护进程运行
daemonize yes
  • 当我们开启了守护进程运行,我们就必须指定一个pid文件
pidfile /var/run/redis_6379.pid
  • loglevel表示日志级别,具体选项都在上方注释
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel notice
  • 日志生成的位置
logfile ""
  • 默认16个数据库
databases 16
  • 是否总是显示log
always-show-logo yes
  • 快照

    持久化,在规定的时间内,执行了多少次操作,则会持久化到文件.rdb或. aof

    redis是内存数据库,如果没有持久化,那么数据断电即失

#900s内,如果有一个key被修改过,我们就进行持久化操作
save 900 1
#300s内,如果有十个key被修改过,我们就进行持久化操作
save 300 10
#60s内,如果有一万个key被修改过,我们就进行持久化操作
save 60 10000
  • 如果持久化出错,是否还需要继续工作
stop-writes-on-bgsave-error yes
  • 是否需要压缩rdb文件,需要消耗一些cpu资源
rdbcompression yes
  • 保存rdb文件的时候,是否要进行一些错误的校验,自动修复
rdbchecksum yes
  • 持久化生成的rdb文件保存的目录
dir ./
  • 主从复制的一些配置,后面详解,这里不讲
################################# REPLICATION #################################
  • 安全等
################################## SECURITY ###################################

#设置密码
requirepass 123456
#命令行设置密码
config set requirepass "123456"
#清除密码
config set requirepass ""
#登录
auth 123456
  • 客户端限制
################################### CLIENTS ####################################
#最多同时有10000个客户端连接
# maxclients 10000
  • 内存限制
############################## MEMORY MANAGEMENT ################################
#redis配置最大的内存容量 
# maxmemory <bytes>

#内存满了以后的处理策略,移除一些过期的key
# maxmemory-policy noeviction
1、volatile-lru:只对设置了过期时间的key进行LRU(默认值) 

2、allkeys-lru : 删除lru算法的key   

3、volatile-random:随机删除即将过期key   

4、allkeys-random:随机删除   

5、volatile-ttl : 删除即将过期的   

6、noeviction : 永不过期,返回错误
  • AOF模式
############################## APPEND ONLY MODE ###############################

#默认不开启aof模式,默认使用rdb持久化方式,在大部分情况下,rdb完全够用了	  	
appendonly no
#持久化的文件的名字
appendfilename "appendonly.aof"

# appendfsync always#每次修改都会同步,消耗性能
appendfsync everysec#每秒同步一次,可能会丢失这一秒的数据
# appendfsync no    #不同步,这个时候操作系统自己同步数据,速度最快,但不推荐

Redis持久化

Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以Redis提供了持久化功能

RDB(Redis Database)

什么是RDB

在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。Redis会单独创建( fork )一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。

我们的rdb文件在配置文件中都有定义

dump.rdb

测试

关闭服务再次测试

  • 有dump.rdb文件的时候
127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> shutdown
not connected> exit
[root@iZ2ze5wj5w33v3gyd9kv1bZ bin]# redis-server phzconfig/redis.conf 
[root@iZ2ze5wj5w33v3gyd9kv1bZ bin]# redis-cli -p 6379
127.0.0.1:6379> auth 123456
OK
127.0.0.1:6379> get k1
"v1"
  • 没有dump.rdb文件的时候
127.0.0.1:6379> set key1 v1
OK
127.0.0.1:6379> SHUTDOWN
not connected> exit
[root@iZ2ze5wj5w33v3gyd9kv1bZ bin]# rm -rf dump.rdb 
[root@iZ2ze5wj5w33v3gyd9kv1bZ bin]# redis-server phzconfig/redis.conf 
[root@iZ2ze5wj5w33v3gyd9kv1bZ bin]# redis-cli -p 6379
127.0.0.1:6379> auth 123456
OK
127.0.0.1:6379> get key1
(nil)

当我们执行了flush命令,save命令,退出redis,都会自动生成一个dump.rdb文件

如何恢复rdb文件!

只需要将rdb文件放在我们redis启动目录就可以,redis启动的时候会自动检查dump.rdb恢复其中的数据!

#获取存放路径
config get dir

优点:
1、适合大规模的数据恢复!

2、对数据的完整性要不高!

缺点:

1、需要一定的时间间隔进程操作!如果redis意外宕机了,这个最后一次修改数据就没有的了!

2、fork进程的时候,会占用一定的内容空间!!

AOF(append only file)

将我们的所有命令都记录下来,类似history,恢复的时候就把这个文件全部在执行一遍

以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
Aof保存的是appendonly.aof 文件

重写规则

aof默认就是文件的无限追加,文件会越来越大

如果aof文件大于64m,太大了! fork一个新的进程来将我们的文件进行重写!

默认是不开启的,需要开启就需要在配置文件中的这个no改为yes

#重新启动,使配置文件生效
shutdown

我们就发现在同级目录出现了这个appendonly.aof文件

写入一个命令,再次打开该文件

当我们在关机后,人为修改这个aof文件,或者rdb文件,我们的数据就会被破坏,我们redis就会启动不起来,我们就需要修复这个文件,我们就可以使用redis给我们提供的修复工具redis-check-aof或者redis-check-rdb

#执行修复
redis-check-aof --fix appendonly.aof

扩展

1、RDB持久化方式能够在指定的时间间隔内对你的数据进行快照存储

2、AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。

3、只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化

4、同时开启两种持久化方式

  • 在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为在通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。
  • RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要只使用AOF呢?作者建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能潜在的Bug,留着作为一个万一的手段。

5、性能建议

  • 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 9001这条规则。

  • 如果Enable AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite 的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite 的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。

  • 如果不Enable AOF,仅靠Master-Slave Replcation实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统皮动。代价是如果Master/Slave同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个MasterlSlave 中的RDB文件,载入较新的那个,微博就是这种架构。

Redis发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式︰发送者(pub)发送消息,订阅者(sub)接收消息。微信,微博、关注系统
Redis客户端可以订阅任意数量的频道。
订阅/发布消息图︰
第一个:消息发送者,第二个:频道第三个:消息订阅者!

下图展示了频道channel1,以及订阅这个频道的三个客户端―—client2、 client5和client1之间的关系∶

当有新消息通过PUBLISH 命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端

命令

  • 监听某个频道
SUBSCRIBE phz

  • 向某个频道发送信息
PUBLISH phz "hello phz"

原理

  • Redis是使用C实现的,通过SUBSCRIBE 命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个个channel,而字典的值则是一个链表,链表中保存了所有订阅这个channel 的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定channel的订阅链表中。通过PUBLISH命令向订阅者发送消息,redis-server会使用给定的频道作为键,在它所维护的channe字典中查找记录了订阅者这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
  • Pub/Sub 从字面上理解就是发布( Publish )与订阅( Subscribe ),在Redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。

Redis主从复制

概念

主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(masterlk/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave以读为主。

默认情况下,每台Redis服务器都是主节点;且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。主从复制的作用主要包括:

1、数据冗余∶主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。

2、故障恢复∶当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。3、负载均衡∶在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。

4、高可用基石︰除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。

一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的,原因如下︰

1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大﹔

2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。

电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"。对于这种场景,我们可以使如下这种架构∶

主从复制,读写分离。80%的情况下都是在进行读操作,减缓服务器的压力!架构中经常使用,一主二从

环境配置

只配置从库,不配置主库,因为每台redis服务本身默认就是一个主库

首先打开四个终端,在其中一个终端开启一个redis服务,查看该服务的一些信息

info replication
127.0.0.1:6379> info replication
# Replication
role:master#角色
connected_slaves:0#从机个数

关闭redis服务,将redis配置文件复制三个

修改每一个配置文件,拿redis-79为例,其他的80,81都要进行对应的修改

对应开启三个服务

如果开启成功就会看到三个log文件,和三个进程信息

一主二从

开启的三台redis服务默认都是主机

配置一主(79)二从(80,81)

#参数为主机的信息
SLAVEOF 127.0.0.1 6379

如果主机设置了密码,那么从机的配置文件中需要将主机的密码配置好,被坑了。。。

两者都修改完成后,shutdown重启以下,然后再重新配置一主二从,然后就会在主机上看见我们的两个从机了

正式的主从配置应该从配置文件修改,那样就是永久的,这里使用命令只是暂时的

配置完成后,主机可以写,从机就不能写了,只能读 ,主机中所有的数据都会被复制到从机中去

如果尝试在从机中写:

  • 如果主机突然断电,宕机,从机不受影响,没有配置哨兵,两个从机还是从机,配置信息依然生效,只是没有写操作了,当主机恢复,进行操作仍然还是可以共享到主机的数据
  • 如果从机突然断电,宕机,主机在从机断线以后中写入的数据,从机就拿不到了,而且通过命令行配置的主从复制,重连后配置就失效了,不能够重新获取到主机的数据,需要重新配置,如果是配置文件配置的主从复制,则不需要再手动配置,重连后依然生效,同样的。从机只要连接上了主机,之前是否共享过的数据都可以直接复制给从机

复制的原理

Slave启动成功连接到master后会发送一个sync命令

Master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。

全量复制:slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步

只要是重新连接master,一次完全同步(全量复制)将被自动执行

层层链路

将从机80作为81的主机,79写入的数据,81依然能收到,但是这个80依然无法写入

当79这个主机断开连接以后,就会造成群龙无首的局面,这是作为从机,可以手动配置自己当主机,这个时候79回来了,自己却没有了从机,变成了光杆司令

SLAVEOF no one

哨兵模式

以下内容均是最基础的配置,云服务器配置不高,开不了更多的哨兵线程,就只开启了一个

自动选取老大

以上配置实际生产环境几乎不用

主从切换技术的方法是︰当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel (哨兵)架构来解决这个问题。

谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。

这里的哨兵有两个作用

  • 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
  • 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。

然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控各个哨兵之间还会进行监控,这样就形成了多哨兵模式。

假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover(重新选举)过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下

线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操

作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线

测试:目前状态仍然是一主二从,79主,80,81从

在配置文件夹下,即配置文件同级目录新建sentinel.conf文件

#sentinel monitor 被监控名称 host port 1
sentinel monitor myredis 127.0.0.1 6379 1

后面的这个数字1,代表主机挂了,slave从机投票看让谁接替成为主机,票数最多的,就会成为主机

启动哨兵

redis-sentinel phzconfig/sentinel.conf

这个时候发现没有从机的相关信息,又被坑了,主机有密码,配置文件还得加上参数

#myredis与配置哨兵起的名字一致
sentinel auth-pass myredis 123456

重新开启哨兵

这个时候我们关闭6379

稍等片刻,重新选举主机

重新登录原来的主机,发现被谋反了,只能作为从机了

所以哨兵模式就是如果Master节点断开了,这个时候就会从从机中随机选择一个服务器!(这里面有一个投票算法)

优点:

1、哨兵集群,基于主从复制模式,所有的主从配置优点,它全有2、主从可以切换,故障可以转移,系统的可用性就会更好

3、哨兵模式就是主从模式的升级,手动到自动,更加健壮!

缺点︰

1、Redis不好啊在线扩容的,集群容量一旦到达上限,在线扩容就十分麻烦!

2、实现哨兵模式的配置其实是很麻烦的,里面有很多选择!

官方完整配置

# Example sentinel.conf
# port <sentinel-port>
# The port that this sentinel instance will run on
# sentinel实例运行的端口
port 26379
# sentinel announce-ip <ip>
# sentinel announce-port <port>
#
# The above two configuration directives are useful in environments where,
# because of NAT, Sentinel is reachable from outside via a non-local address.
#
# When announce-ip is provided, the Sentinel will claim the specified IP address
# in HELLO messages used to gossip its presence, instead of auto-detecting the
# local address as it usually does.
#
# Similarly when announce-port is provided and is valid and non-zero, Sentinel
# will announce the specified TCP port.
#
# The two options don't need to be used together, if only announce-ip is
# provided, the Sentinel will announce the specified IP and the server port
# as specified by the "port" option. If only announce-port is provided, the
# Sentinel will announce the auto-detected local IP and the specified port.
#
# Example:
#
# sentinel announce-ip 1.2.3.4
# dir <working-directory>
# Every long running process should have a well-defined working directory.
# For Redis Sentinel to chdir to /tmp at startup is the simplest thing
# for the process to don't interferer with administrative tasks such as
# unmounting filesystems.
dir /tmp
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
# master-name : master Redis Server名称
# ip : master Redis Server的IP地址
# redis-port : master Redis Server的端口号
# quorum : 主实例判断为失效至少需要 quorum 个 Sentinel 进程的同意,只要同意 Sentinel 的数量不达标,自动failover就不会执行
#
# Tells Sentinel to monitor this master, and to consider it in O_DOWN
# (Objectively Down) state only if at least <quorum> sentinels agree.
#
# Note that whatever is the ODOWN quorum, a Sentinel will require to
# be elected by the majority of the known Sentinels in order to
# start a failover, so no failover can be performed in minority.
#
# Slaves are auto-discovered, so you don't need to specify slaves in
# any way. Sentinel itself will rewrite this configuration file adding
# the slaves using additional configuration options.
# Also note that the configuration file is rewritten when a
# slave is promoted to master.
#
# Note: master name should not include special characters or spaces.
# The valid charset is A-z 0-9 and the three characters ".-_".
# 
sentinel monitor mymaster 127.0.0.1 6379 2
# sentinel auth-pass <master-name> <password>
#
# Set the password to use to authenticate with the master and slaves.
# Useful if there is a password set in the Redis instances to monitor.
#
# Note that the master password is also used for slaves, so it is not
# possible to set a different password in masters and slaves instances
# if you want to be able to monitor these instances with Sentinel.
#
# However you can have Redis instances without the authentication enabled
# mixed with Redis instances requiring the authentication (as long as the
# password set is the same for all the instances requiring the password) as
# the AUTH command will have no effect in Redis instances with authentication
# switched off.
#
# Example:
#
# sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
# sentinel down-after-milliseconds <master-name> <milliseconds>
#
# Number of milliseconds the master (or any attached slave or sentinel) should
# be unreachable (as in, not acceptable reply to PING, continuously, for the
# specified period) in order to consider it in S_DOWN state (Subjectively
# Down).
# 选项指定了 Sentinel 认为Redis实例已经失效所需的毫秒数。当实例超过该时间没有返回PING,或者直接返回错误, 那么 Sentinel 将这个实例标记为主观下线(subjectively down,简称 SDOWN )
#
# Default is 30 seconds.
sentinel down-after-milliseconds mymaster 30000
# sentinel parallel-syncs <master-name> <numslaves>
#
# How many slaves we can reconfigure to point to the new slave simultaneously
# during the failover. Use a low number if you use the slaves to serve query
# to avoid that all the slaves will be unreachable at about the same
# time while performing the synchronization with the master.
# 选项指定了在执行故障转移时, 最多可以有多少个从Redis实例在同步新的主实例, 在从Redis实例较多的情况下这个数字越小,同步的时间越长,完成故障转移所需的时间就越长。
sentinel parallel-syncs mymaster 1
# sentinel failover-timeout <master-name> <milliseconds>
#
# Specifies the failover timeout in milliseconds. It is used in many ways:
#
# - The time needed to re-start a failover after a previous failover was
#   already tried against the same master by a given Sentinel, is two
#   times the failover timeout.
#
# - The time needed for a slave replicating to a wrong master according
#   to a Sentinel current configuration, to be forced to replicate
#   with the right master, is exactly the failover timeout (counting since
#   the moment a Sentinel detected the misconfiguration).
#
# - The time needed to cancel a failover that is already in progress but
#   did not produced any configuration change (SLAVEOF NO ONE yet not
#   acknowledged by the promoted slave).
#
# - The maximum time a failover in progress waits for all the slaves to be
#   reconfigured as slaves of the new master. However even after this time
#   the slaves will be reconfigured by the Sentinels anyway, but not with
#   the exact parallel-syncs progression as specified.
# 如果在该时间(ms)内未能完成failover操作,则认为该failover失败
#
# Default is 3 minutes.
sentinel failover-timeout mymaster 180000
# SCRIPTS EXECUTION
#
# sentinel notification-script and sentinel reconfig-script are used in order
# to configure scripts that are called to notify the system administrator
# or to reconfigure clients after a failover. The scripts are executed
# with the following rules for error handling:
#
# If script exits with "1" the execution is retried later (up to a maximum
# number of times currently set to 10).
#
# If script exits with "2" (or an higher value) the script execution is
# not retried.
#
# If script terminates because it receives a signal the behavior is the same
# as exit code 1.
#
# A script has a maximum running time of 60 seconds. After this limit is
# reached the script is terminated with a SIGKILL and the execution retried.
# NOTIFICATION SCRIPT
#
# sentinel notification-script <master-name> <script-path>
# 
# Call the specified notification script for any sentinel event that is
# generated in the WARNING level (for instance -sdown, -odown, and so forth).
# This script should notify the system administrator via email, SMS, or any
# other messaging system, that there is something wrong with the monitored
# Redis systems.
#
# The script is called with just two arguments: the first is the event type
# and the second the event description.
#
# The script must exist and be executable in order for sentinel to start if
# this option is provided.
# 指定sentinel检测到该监控的redis实例指向的实例异常时,调用的报警脚本。该配置项可选,但是很常用。
#
# Example:
#
# sentinel notification-script mymaster /var/redis/notify.sh
# CLIENTS RECONFIGURATION SCRIPT
#
# sentinel client-reconfig-script <master-name> <script-path>
#
# When the master changed because of a failover a script can be called in
# order to perform application-specific tasks to notify the clients that the
# configuration has changed and the master is at a different address.
# 
# The following arguments are passed to the script:
#
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
#
# <state> is currently always "failover"
# <role> is either "leader" or "observer"
# 
# The arguments from-ip, from-port, to-ip, to-port are used to communicate
# the old address of the master and the new address of the elected slave
# (now a master).
#
# This script should be resistant to multiple invocations.
#
# Example:
#
# sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

常用配置

#Example sentine1.conf

#哨兵sentine1实例运行的端口默认26379
port 26379

#哨兵sentine1的工作目录
dir /tmpl

#哨兵sentine1监控的redis主节点的 ip port
# master-name可以自己命名的主节点名字只能由字母A-z、数字0-9、这三个字符".-_"组成。
# quorum配置多少个sentine1哨兵统一认为master主节点失联那么这时客观上认为主节点失联了
# sentine7 monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2

#当在Redis实例中开启了requirepass foobared 授权密码这样所有连接Redis实例的客户端都要提供密码
#设置哨兵sentine1连接主从的密码注意必须为主从设置一样的验证密码
# sentine7 auth-pass <master-name> <password>
sentine1 auth-pass mymaster MySUPER--secret-O123password

#指定多少毫秒之后主节点没有应答哨兵sentine1此时哨兵主观上认为主节点下线默认30秒
# sentine7 down-after-mi77iseconds <master-name> <mi77iseconds>
sentine7 down-after-mi77iseconds mymaster 30000

# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行同步,这个数字越小,完成failover所需的时间就越长,但是如果这个数字越大,就意味着越多的slave因为rep7ication而不可用。可以通过将这个值设为1来保证每次只有一个slave处于不能处理命令请求的状态。
#sentinel para7le1-syncs <master-name> <nums1aves>
sentine1 para1le7-syncs mymaster 1

#故障转移的超时时间failover-timeout可以用在以下这些方面:
#1.同一个sentine1对同一个master两次fai1over之间的间隔时间。
#2.当一个slave从一个错误的master那里同步数据开始计算时间。直到s7ave被纠正为向正确的master那里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failoveri时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不para7le1-syncs所配置的规则来了
#默认三分钟
#sentine7 failover-timeout <master-name> <mi77iseconds>
sentine1 failover-timeout mymaster 180000

# SCRIPTS EXECUTION
#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
#对于脚本的运行结果有以下规则;
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。
#通知型脚本:当sentine7有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,这时这个脚本应该通过邮件,SNS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,一个是事件的类型,一个是事件的描述。如果sentine1.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentine1无法正常启动成功。
#通知脚本
# sentine7 notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh


#客户端重新配置主节点参数脚本
#当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
#以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
#目前<state>总是“fai1over",
# <role>是“leader”或者"observer”中的一个。
#参数 from-ip,from-port,to-ip,to-port是用来和旧的master和新的master(即旧的s1ave)通信的
#这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh

Redis缓存穿透和雪崩

Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。

另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。

缓存穿透

(查不到)

缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中(秒杀!),于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。

布隆过滤器

布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;

缓存空对象

当存储层不命中后,即使返回的空对象也将其缓存起来,同时会设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;

但是这种方法会存在两个问题︰

1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键;

2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。

缓存击穿

(查太多,量太大,缓存过期)

概述

这里需要注意和缓存击穿的区别,缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞

当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。

解决方案

设置热点数据永不过期

从缓存层面来看,没有设置过期时间,所以不会出现热点 key过期后产生的问题。加互斥锁

分布式锁

使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。

缓存雪崩

概念

缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis宕机

产生雪崩的原因之一,比如在双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

解决方案

redis高可用

这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis ,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。(异地多活)

限流降级

这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热

数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。

更多推荐

Redis学习记录

本文发布于:2023-06-14 08:19:00,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1454650.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:Redis

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!