NoSql
NoSQL特点
- 方便扩展(数据之间没有关系,很好扩展!)
- 大数据量高性能(Redis一秒写8万次,读取11完万,NoSQL的缓存记录级,是一种细粒度的缓存,性能会比较高!)
- 数据类型是多样型的!(不需要事先设计数据库 随去随用 如果是数据量十分大的表,很多人就无法设计!)
- 传统的RDBMS和NoSQL
传统的RDBMS
- 结构化组织
- SQL
- 数据和关系都存在单独的表中
- 操作,数据定义语言
- 严格的一致性
- 基础的书屋
- ......
NoSQL
- 不仅仅是数据
- 没有固定的查询语言
- 键值对存储,列存储,文档存储,图形数据库(社交关系)
- 最终一致性
- CAP定理和BASE理论(异地多活)
- 高性能 高可用 高可扩
- ....
了解:3V+3高
大数据时代3V:主要是描述问题
大数据时代3高:主要是对程序的要求
NoSQL的四大类
KV键值对:
- 新浪:redis
- 美团:redis+tair
- 阿里、百度:redis+memecache
文档行数据库(bson格式和json一样)
- MongoDB
- MongoDB是一个基于分布式文件存储的数据库,C++编写,主要用来处理大量的文档!
- MongoDB是一个介于关系型数据库和非关系型数据库中间的产品!MongoDB是非关系型数据库中功能最丰富,最像关系型数据库的!
- ConthDB
列存储数据库
- HBase
- 分布式文件系统
图形关系数据库
- 不是存图形,放的是关系 比如:朋友圈社交 广告推荐
- Neo4j,InfoGrid
Redis(Remote Dictionary Server )
Redis作用
- 内存存储、持久化,内存中是断电即失 所以说持久化很重要(rdb,aof)
- 效率高,可用于高速缓存
- 发布订阅系统
- 地图信息分析
- 计时器、计数器(浏览量)
- ...
特性:
- 多样的数据类型
- 持久化
- 集群
- 事务
- ...
测试性能
redis-benchmark是一个压力测试工具!
官方自带的性能测试工具
使用命令:
redis-benchmark + 命令参数
测试
#测试:100个并发 100000请求
redis-benchmark -h localhost -p 6379 -c 100 -n 100000
基础知识
redis默认有16个数据库
默认使用第0个
可以使用select进行数据库切换
keys * 查看数据库所有的key
flushdb 清除当前数据库
FLUSHALL 清除所有数据库
Redis是单线程的! Redis很快 Redis是基于内存操作的 CPU不是Redis性能瓶颈 Redis的瓶颈是根据机器的内存和网络带宽 既然可以单线程实现就使用单线程
Redis是C语言写的 官方提供数据为100000+QPS 完全不比同样使用key-vaule的Menencache差
Redis为什么单线程还那么快?
- 误区1:高性能的服务器一定是多线程的?
- 误区2:多线程(CPU上下文切换)一定比单线程效率高?
先去了解CPU>内存>硬盘的速度
核心:redis是将所有的数据全部放在内存中,所以说使用单线程去操作效率就是最高的,多线程(CPU上下文会切换:耗时的操作),对于内存系统来说,如果没有上下文切换效率就是最高的,多次读写都在一个CPU上,在内存情况下,就是最佳的方案!
Redis-Key
keys * 查看所有的key
set * 添加一个key
EXISTS * 判断当前key是否存在
move * * 移除当前库里的key 第一个*是指key 第二个*指库
EXPIRE * * 设置key的过期时间第一个*是指key 第二个*指时间 单位S
ttl * 查看当前key的剩余时间
get * 获得key
tpye * 查看key的类型
五大数据类型
String(字符串)
127.0.0.1:6379> set key1 v1 #设置值
OK
127.0.0.1:6379> get key1 #获取值
"v1"
127.0.0.1:6379> keys * #获取所有的key
1) "key1"
127.0.0.1:6379> exists key1 #判断某一个key是否存在
(integer) 1
127.0.0.1:6379> append key1 "hello" #追加字符串 如果当前key不存在 相当于setkey
(integer) 7
127.0.0.1:6379> strlen key1 #获取字符串长度
(integer) 7127.0.0.1:6379> incr views #自增1
(integer) 1
127.0.0.1:6379> decr views #自减1
(integer) 0127.0.0.1:6379> incrby views 10 #可以设置步长,指定增量
(integer) 9
127.0.0.1:6379> decrby views 5 #可以设置步长,指定减量
(integer) 4字符串范围 range
127.0.0.1:6379> set key1 "hello,hzx"
OK
127.0.0.1:6379> GETRANGE key1 0 3 #查看下标0~3的字符串
"hell"
127.0.0.1:6379> GETRANGE key1 0 -1 #查看所有的字符串
"hello,hzx"替换
127.0.0.1:6379> set key2 abcdefg
OK
127.0.0.1:6379> SETRANGE key2 1 xx #替换指定下标开始的字符串
(integer) 7
127.0.0.1:6379> get key2
"axxdefg"
# setex(set with expire) #设置过期时间
# setnx(set if not exist) #不存在设置(在分布式锁中常常使用)
127.0.0.1:6379> setex key3 30 hello #设置key3的值为hello 30秒后过期
OK
127.0.0.1:6379> ttl key3
(integer) 24
127.0.0.1:6379> get key3
"hello"
127.0.0.1:6379> setnx mykey "redis" #如果mykey不存在 创建mykey
(integer) 1
127.0.0.1:6379> ttl key3
(integer) -2
127.0.0.1:6379> setnx mykey "MongoDB" #如果mykey存在 创建失败
(integer) 0
127.0.0.1:6379> get mykey
"redis"#mset
#gset
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 #同时设置多个值
OK
127.0.0.1:6379> keys *
1) "k1"
2) "k3"
3) "k2"
127.0.0.1:6379> mget k1 k2 k3 #同时获取多个值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> msetnx k1 v1 k4 v4 #msetnx 是一个原子性操作 要么一起成功 要么一起失败
(integer) 0
127.0.0.1:6379> get k4
(nil)#对象
set user:1{name:zhangsan , age:3} #设置一个user:1对象值为json字符来保存以恶个对象!
#这里的key是一个巧妙的设计:user:{id}:{filed} ,如此设计在Redis中完全OK
127.0.0.1:6379> mset user:1:name zhangsan user:1:age 2
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsan"
2) "2"
getset #先get再set
127.0.0.1:6379> getset db redis #如果不存在值 则返回nil
(nil) #如果存在值 获取原来的值并设置新的值
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mongodb
"redis"
127.0.0.1:6379> get db
"mongodb"
数据结构是相同的
String类型的使用场景:value除了是字符串还可以是数字
- 计数器
- 统计多单位的数量
- 粉丝数
- 对象缓存存储
List(列表)
基本的数据类型,列表
在redis里面可以把list玩成 栈 队列 或者 阻塞队列
所有的list命令都是用l开头的
127.0.0.1:6379> lpush list one #将一个值或者多个值,插入到列表头部(左)
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lpush list three
(integer) 3
127.0.0.1:6379> lrange list 0 -1 #获取list中的值
1) "three"
2) "two"
3) "one"
127.0.0.1:6379> lrange list 0 1 #通过区间获取具体的值
1) "three"
2) "two"
127.0.0.1:6379> rpush list right #将一个值或者多个值,插入到列表尾部(右)
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
4) "right"LPOP RPOP
127.0.0.1:6379> lpop list #移除list第一个元素
"three"
127.0.0.1:6379> rpop list #移除list最后一个元素
"right"
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
LINDEX
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
127.0.0.1:6379> lindex list 1 #通过下标获取list的某一个值
"one"
127.0.0.1:6379> lindex list 0
"two"Llen
127.0.0.1:6379> lpush list 1 2 3
(integer) 3
127.0.0.1:6379> llen list #获取list长度
(integer) 3移除指定的值 LREM
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "3"
3) "2"
4) "1"
127.0.0.1:6379> lrem list 1 1 #第一个1代表移除几个 第二个1是值
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "3"
2) "3"
3) "2"
127.0.0.1:6379> lrange list 2 3 #移除 两个3 从上往下移除
1) "2"trim 修剪:list截断!
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "hello1"
(integer) 2
127.0.0.1:6379> rpush mylist "hello2"
(integer) 3
127.0.0.1:6379> rpush mylist "hello3"
(integer) 4
127.0.0.1:6379> ltrim mylist 1 2 #通过下标截取指定的长度 这个list已经被改变 截断了
OK 只剩下截取的元素
127.0.0.1:6379> lrange mylist 0 -1
1) "hello1"
2) "hello2"rpoplpush 移除list最后一元素将它移到新的列表中
127.0.0.1:6379> clear
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "hello1"
(integer) 2
127.0.0.1:6379> rpush mylist "hello2"
(integer) 3
127.0.0.1:6379> rpoplpush mylist myotherlist #移除list最后一元素将它移到新的列表中
"hello2"
127.0.0.1:6379> lrange mylist 0 -1 #查看原来list
1) "hello"
2) "hello1"
127.0.0.1:6379> lrange myotherlist 0 -1 #查看目标list
1) "hello2"lset 将列表中指定下标的值替换为另一个值 更新操作
127.0.0.1:6379> exists list #判断这个列表是否存在
(integer) 0
127.0.0.1:6379> lset list 0 item #如果不存在列表更新就会报错
(error) ERR no such key
127.0.0.1:6379> lpush list "value1"
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "value1"
127.0.0.1:6379> lset list 0 item #如果存在 更新当前下标值
OK
127.0.0.1:6379> lrange list 0 0
1) "item"
127.0.0.1:6379> lset list 1 item #如果不存在 就会报错
(error) ERR index out of rangelinsert #将某个具体的value插入到列表中某个元素的前面或者后面
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "world"
(integer) 2
127.0.0.1:6379> linsert mylist before "world" "other"
(integer) 3
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) "other"
3) "world"
127.0.0.1:6379> linsert mylist after "world" "new"
(integer) 4
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) "other"
3) "world"
4) "new"
List小结
- 实际上是一个链表 before node after left right都可以插入值
- 如果key不存在 创建新的链表
- 如果key存在 新增内容
- 如果移除了所有制 空链表 也代表不存在
- 在两边插入或者改动值 效率最高 中间元素 相对来说效率低一点
消息排队!消息队列 Lpush Rpop 栈(Lpush Lpop)
Set(集合)
set中的值是不能重复的
127.0.0.1:6379> sadd myset "hello" #set集合中添加元素
(integer) 1
127.0.0.1:6379> sadd myset "kuangshen"
(integer) 1
127.0.0.1:6379> sadd myset "lovekuangshen"
(integer) 1
127.0.0.1:6379> smembers myset #查看集合所有元素
1) "lovekuangshen"
2) "kuangshen"
3) "hello"
127.0.0.1:6379> sismember myset "hello" #查看set集合中是否存在该元素
(integer) 1
127.0.0.1:6379> sismember myset "world"
(integer) 0127.0.0.1:6379> scard myset #获取集合中元素的个数
(integer) 3127.0.0.1:6379> srem myset "hello" #移除集合中指定元素
(integer) 1
127.0.0.1:6379> smembers myset
1) "lovekuangshen"
2) "kuangshen"srandmember #随机抽选出指定个数元素
127.0.0.1:6379> srandmember myset #随机抽选出一个元素
"lovekuangshen2"
127.0.0.1:6379> srandmember myset
"kuangshen"
127.0.0.1:6379> srandmember myset
"kuangshen"
127.0.0.1:6379> srandmember myset
"lovekuangshen"
127.0.0.1:6379> srandmember myset 2 #随机抽选出两个元素
1) "lovekuangshen2"
2) "kuangshen"
127.0.0.1:6379> srandmember myset 22
1) "lovekuangshen2"
2) "lovekuangshen"
3) "kuangshen"
127.0.0.1:6379> srandmember myset 2
1) "lovekuangshen2"
2) "kuangshen"
127.0.0.1:6379> srandmember myset 2
1) "lovekuangshen2"
2) "lovekuangshen"随机删除key
127.0.0.1:6379> smembers myset
1) "lovekuangshen2"
2) "lovekuangshen"
3) "kuangshen"
127.0.0.1:6379> spop myset #随机删除一些set集合中的元素
"lovekuangshen"
127.0.0.1:6379> spop myset
"lovekuangshen2"
127.0.0.1:6379> smembers myset
1) "kuangshen"将一个指定的值 移动到另外一个指定的集合中
127.0.0.1:6379> sadd myset "hello"
(integer) 1
127.0.0.1:6379> sadd myset "wordl"
(integer) 1
127.0.0.1:6379> sadd myset "kuangshen"
(integer) 1
127.0.0.1:6379> sadd myset "set2"
(integer) 1
127.0.0.1:6379> smove myset myset2 "kuangshen" #将一个指定的值 移动到另外一个
(integer) 1 指定的集合中
127.0.0.1:6379> smembers myset
1) "wordl"
2) "set2"
3) "hello"
127.0.0.1:6379> smembers myset2
1) "kuangshen"微博 B站 共同关注!(并集)
数字集合类:
- 差集
- 交集
- 并集
127.0.0.1:6379> sadd key1 a
(integer) 1
127.0.0.1:6379> sadd key1 b
(integer) 1
127.0.0.1:6379> sadd key1 c
(integer) 1
127.0.0.1:6379> sadd key2 c
(integer) 1
127.0.0.1:6379> sadd key2 d
(integer) 1
127.0.0.1:6379> sadd key2 e
(integer) 1
127.0.0.1:6379> sdiff key1 key2 #差集
1) "a"
2) "b"
127.0.0.1:6379> sinter key1 key2 #交集 共同好友就是这样实现
1) "c"
127.0.0.1:6379> sunion key1 key2 #并集
1) "a"
2) "c"
3) "b"
4) "e"
5) "d"
微博 A用户将所有关注的人放在一个set集合中 将它的粉丝也放在一个集合中
共同关注 共同爱好 二度好友 推荐好友(六度分割理论)
Hash(哈希)
Map集合,key-value 这时候value是一个map集合 key-map!map又是<K,V> ,key-<key,value>
本质和String类型没有太大的区别
127.0.0.1:6379> hset myhash field1 kuangshen #set一个具体的key-val
(integer) 1
127.0.0.1:6379> hget myhash field1 #获取一个字段值
"kuangshen"
127.0.0.1:6379> hmset myhash field1 hello field2 world #set多个key-val
OK
127.0.0.1:6379> hmget myhash field1 field2 #获取多个字段值
1) "hello"
2) "world"
127.0.0.1:6379> hgetall myhash #获取全部字段值 以key-val展示
1) "field1" #key
2) "hello" #val
3) "field2" #key
4) "world" #val127.0.0.1:6379> hdel myhash field1 #删除指定的key字段 对应value值也就消失了
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "world"hlen 获取长度
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "world"
127.0.0.1:6379> hlen myhash #获取字段数量
(integer) 1127.0.0.1:6379> hexists myhash field1 #判断指定字段是否存在
(integer) 1
127.0.0.1:6379> hexists myhash field3
(integer) 0#只获得所有的key
#只获得所有的value
127.0.0.1:6379> hkeys myhash #只获得所有的key
1) "field2"
2) "field1"
127.0.0.1:6379> hvals myhash #只获得所有的value
1) "world"
2) "hello"incr decr 跟String类型基本相同
127.0.0.1:6379> hset myhash field3 5
(integer) 1
127.0.0.1:6379> hincrby myhash field3 1 #指定自增
(integer) 6
127.0.0.1:6379> hincrby myhash field3 -1
(integer) 5
127.0.0.1:6379> hsetnx myhash field4 hello #如果不存在则可以设置
(integer) 1
127.0.0.1:6379> hsetnx myhash field4 world #如果存在 则不可设置
(integer) 0
hash变更的数据user name age,尤其是用户信息之类的,经常变动的信息!hash更适合于对象的存储,String更加适合字符串的存储
Zset(有序集合)
在set的基础上,增加了一个值,set k1 v1 zset k1 score1 v1
127.0.0.1:6379> zadd myset 1 one #添加值
(integer) 1
127.0.0.1:6379> zadd myset 2 two
(integer) 1
127.0.0.1:6379> zadd myset 3 three
(integer) 1
127.0.0.1:6379> zrange myset 0 -1 #输出所有值
1) "one"
2) "two"
3) "three"排序如何实现
#ZRANGEBYSCORE key min max
127.0.0.1:6379> zadd salary 2500 xiaohong #添加三个用户
(integer) 1
127.0.0.1:6379> zadd salary 5000 zhangsan
(integer) 1
127.0.0.1:6379> zadd salary 500 kuangshen
(integer) 1
127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf #显示全部用户从小到大排序
1) "kuangshen"
2) "xiaohong"
3) "zhangsan"127.0.0.1:6379> ZREVRANGE salary 0 -1 #从大到小排序
1) "zhangsan"
2) "kuangshen"127.0.0.1:6379> ZRANGEBYSCORE salary -inf +inf withscores #附带成绩
1) "kuangshen"
2) "500"
3) "xiaohong"
4) "2500"
5) "zhangsan"
6) "5000"#移除rem中的元素
127.0.0.1:6379> zrange salary 0 -1
1) "kuangshen"
2) "xiaohong"
3) "zhangsan"
127.0.0.1:6379> zrem salary xiaohong #移除指定元素
(integer) 1
127.0.0.1:6379> zrange salary 0 -1
1) "kuangshen"
2) "zhangsan"127.0.0.1:6379> zcard salary #获取有序集合中的个数
(integer) 2127.0.0.1:6379> zadd myset 1 hello 2 world 3 kuangshen #多个添加
(integer) 3
127.0.0.1:6379> zcount myset 1 3 #获取区间内成员数量
(integer) 3
127.0.0.1:6379> zcount myset 1 2
(integer) 2
其余一些API,可以看看官方文档查看!
案例思路:比set过了排序 set能做的 它都可以 工资表排序
普通消息:1 重要消息:2 带权重进行判断
排行榜应用实现,取TopN测试!
三种特殊数据类型
geospatial地理位置
朋友的定位,附近的人,打车距离计算?
Redis的GEO在Redis3.2版本就推出了!这个功能可以推算地理位置的信息,两地之间的距离,方圆几里的人!
使用地理位置,需要地理位置数据可查询一些测试数据:城市经纬度查询-国内城市经度纬度在线查询工具
GEOADD
#geoadd 添加地理位置
#规则:两级无法直接添加,一般下载城市数据 直接使用Java程序导入
#有效的经度从-180度到180度。
有效的纬度从-85.05112878度到85.05112878度。
当坐标位置超出上述指定范围时,该命令将会返回一个错误。
参数key值(纬度,经度,名称)
127.0.0.1:6379> geoadd china:city 116.40 39.90 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1
127.0.0.1:6379> geoadd china:city 106.50 39.53 chongqing 114.05 22.52 shenzheng
(integer) 2
127.0.0.1:6379> geoadd china:city 120.16 30.24 hangzhou 108.96 34.26 xian
(integer) 2
GEOPOS
127.0.0.1:6379> geopos china:city beijing #获取指定城市的经度和纬度
1) 1) "116.39999896287918091"
2) "39.90000009167092543"
127.0.0.1:6379> geopos china:city chongqing xian
1) 1) "106.49999767541885376"
2) "39.52999923987874098"
2) 1) "108.96000176668167114"
2) "34.25999964418929977"
GEODIST
如果两个位置之间的其中一个不存在, 那么命令返回空值。
指定单位的参数 unit 必须是以下单位的其中一个:
- m 表示单位为米。
- km 表示单位为千米。
- mi 表示单位为英里。
- ft 表示单位为英尺。
如果用户没有显式地指定单位参数, 那么 GEODIST
默认使用米作为单位。
GEODIST
命令在计算距离时会假设地球为完美的球形, 在极限情况下, 这一假设最大会造成 0.5% 的误差。
127.0.0.1:6379> geodist china:city beijing xian #查看北京到西安的直线距离
"910056.5237"
127.0.0.1:6379> geodist china:city beijing xian km #单位km 默认:m
"910.0565"
GEORADIUS 以给定的经纬度为中心,找出某一半径内的元素
我附近的人(获取所有附近的人的地址,定位!)通过半径来查询!
范围可以使用以下其中一个单位:
- m 表示单位为米。
- km 表示单位为千米。
- mi 表示单位为英里。
- ft 表示单位为英尺。
在给定以下可选项时, 命令会返回额外的信息:
WITHDIST
: 在返回位置元素的同时, 将位置元素与中心之间的距离也一并返回。 距离的单位和用户给定的范围单位保持一致。WITHCOORD
: 将位置元素的经度和维度也一并返回。WITHHASH
: 以 52 位有符号整数的形式, 返回位置元素经过原始 geohash 编码的有序集合分值。 这个选项主要用于底层应用或者调试, 实际中的作用并不大。
命令默认返回未排序的位置元素。 通过以下两个参数, 用户可以指定被返回位置元素的排序方式:
ASC
: 根据中心的位置, 按照从近到远的方式返回位置元素。DESC
: 根据中心的位置, 按照从远到近的方式返回位置元素。
在默认情况下, GEORADIUS 命令会返回所有匹配的位置元素。 虽然用户可以使用 COUNT <count>
选项去获取前 N 个匹配元素, 但是因为命令在内部可能会需要对所有被匹配的元素进行处理, 所以在对一个非常大的区域进行搜索时, 即使只使用 COUNT
选项去获取少量元素, 命令的执行速度也可能会非常慢。 但是从另一方面来说, 使用 COUNT
选项去减少需要返回的元素数量, 对于减少带宽来说仍然是非常有用的。
127.0.0.1:6379> GEORADIUS china:city 110 30 1000 km #以110,30经纬度为中心,寻找方圆1000km的城市 所有的数据都应该录入 china:city
1) "xian"
2) "shenzheng"
3) "hangzhou"
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km
1) "xian"
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km withcoord #显示他人定位信息
1) 1) "xian"
2) 1) "108.96000176668167114"
2) "34.25999964418929977"
127.0.0.1:6379> GEORADIUS china:city 110 30 500 km withcoord withdist count 1 #筛选出指定的结果
1) 1) "xian"
2) "483.8340"
3) 1) "108.96000176668167114"
2) "34.25999964418929977"
GEORADIUSBYMEMBER
#找出位于指定元素周围的其他元素!
127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 1000 km
1) "beijing"
2) "xian"
3) "chongqing"
127.0.0.1:6379> GEORADIUSBYMEMBER china:city beijing 500 km
1) "beijing"
GEOHASH返回一个或多个位置元素的 Geohash 表示。
该命令将返回11个字符的Geohash字符串
#将二维的经纬度转化为一维的字符串,如果两个字符串越接近则距离越近
127.0.0.1:6379> geohash china:city beijing chongqing
1) "wx4fbxxfke0"
2) "wr58zknyvv0"
GEO底层的实现原理就是Zset!我们可以使用Zset命令可以操作GEO
127.0.0.1:6379> zrange china:city 0 -1 #查看地图中全部的元素
1) "xian"
2) "chongqing"
3) "shenzheng"
4) "hangzhou"
5) "shanghai"
6) "beijing"
127.0.0.1:6379> zrem china:city beijing #移除指定元素
(integer) 1
127.0.0.1:6379> zrange china:city 0 -1
1) "xian"
2) "chongqing"
3) "shenzheng"
4) "hangzhou"
5) "shanghai"
Hyperloglog
什么是基数?
A{1,3,5,7,8,7} B{1,3,5,7,8}
基数(不重复的元素) = 5,可以接受误差!
简介
Redis2.8.9版本就更新了Hyperloglog数据jieg!
Redis Hyperloglog 基数统计的算法!
优点:占用的内存是固定,2^64不同的元素的技术,只需要费12KB的内存!如果要从内存角度来比较的话Hyperloglog首选!
网页的UV(一个人访问一个网站多次,但是还是算作一个人)
传统的方式,set保存用户的id,然后就可以统计set中的元素数量作为标准判断!
这个方式如果保存大量的用户id,就会比较麻烦!我们的目的是为了技术,而不是保存用户id;
0.81%错误率!统计UV任务,可以忽略不计的!
测试使用
127.0.0.1:6379> PFADD mykey a b c d e f g h i j #创建第一组元素
(integer) 1
127.0.0.1:6379> PFCOUNT mykey #统计mykey元素的基数数量
(integer) 10
127.0.0.1:6379> PFADD mykey2 i j z x c v b n m #创建第二组元素
(integer) 1
127.0.0.1:6379> PFCOUNT mykey2
(integer) 9
127.0.0.1:6379> PFMERGE mykey3 mykey mykey2 #合并两组mykey mykey2 => mykey3 并集
OK
127.0.0.1:6379> PFCOUNT mykey3 #查看并集数量
(integer) 15
如果允许容错,那么一定可以使用Hyperloglog!
如果不允许容错,使用set或者自己的数据类型!
Bitmap
位存储
统计用户活跃 不活跃! 登录 未登录! 打卡 ! 两个状态的都可以使用Bitmap 0 1 !
Bitmap位图,数据结构!都是操作二进制位来进行记录,就只有0和1两个状态!
365天 = 365bit 1字节 = 8bit 大约需要46字节来存储一个人一年的打卡记录
使用bitmap来记录周一到周日的打卡!
周一:1 周二:0 周三:0 周四:1 .....
127.0.0.1:6379> setbit sign 0 1
(integer) 0
127.0.0.1:6379> setbit sign 1 0
(integer) 0
127.0.0.1:6379> setbit sign 2 0
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 1
(integer) 0
127.0.0.1:6379> setbit sign 5 0
(integer) 0
127.0.0.1:6379> setbit sign 6 0
(integer) 0
查看某一天是否有打卡!
127.0.0.1:6379> getbit sign 3
(integer) 1
127.0.0.1:6379> getbit sign 6
(integer) 0
统计操作!统计打卡天数
127.0.0.1:6379> bitcount sign #统计这周打卡记录,就可以看到是否全勤
(integer) 3
事务
MySQL:ACID!
要么同时成功,要么同时失败!(原子性)
Redis事务本质:一组命令的集合!一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行!
一次性、顺序性、排他性!执行一些列的命令!
队列 set set set 执行
Redis事务没有隔离级别的概念!
所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会执行!Exec
Redis单条命令是保证原子性的,但Redis的事务是不保证原子性的!
redis的事务:
- 开启事务(MULTI)
- 命令入队(……)
- 执行事务(EXEC)
正常执行事务!
127.0.0.1:6379> MULTI #开启事务
OK#命令入队
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> get k2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> exec #执行事务
1) OK
2) OK
3) "v2"
4) OK
放弃事务!
127.0.0.1:6379> MULTI #开启事务
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> DISCARD #取消事务
OK
127.0.0.1:6379> get k3 #事务队列中的命令都不会被执行!
(nil)
127.0.0.1:6379> get k1
(nil)
编译型异常(代码有问题!命令有错!),事务中所有的命令都不会被执行!
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> getset k3 #错误的命令
(error) ERR wrong number of arguments for 'getset' command
127.0.0.1:6379(TX)> set k4 v4
QUEUED
127.0.0.1:6379(TX)> set k5 v5
QUEUED
127.0.0.1:6379(TX)> EXEC #执行事务报错
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k5 #所有的命令都不会执行
(nil)
运行时异常(1/0),如果事务队列中存在语法性错误,那么执行命令的时候,其他命令是可以正常执行的,错误命令抛出异常!
127.0.0.1:6379> set k1 "v1"
OK
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> incr k1 #会执行的时候失败
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> get k3
QUEUED
127.0.0.1:6379(TX)> EXEC
1) (error) ERR value is not an integer or out of range #虽然第一条命令报错但是依旧执
2) OK 行成功
3) OK
4) "v3"
127.0.0.1:6379> get k2
"v2"
127.0.0.1:6379> get k3
"v3"
监控!Watch(面试常问!)
悲观锁:
- 很悲观,认为什么时候都会出现问题,无论做什么都加锁!
乐观锁:
- 很乐观,认为什么时候都不会出现问题,所以不会上锁!更新数据的时候判断一下在此期间是否有人修改数据
- 获取version
- 更新的时候比较version
Redis监视测试
正常执行成功!
127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money #监视money对象
OK
127.0.0.1:6379> MULTI #事务正常结束,数据期间没有发生变动,这个时候就正常执行
OK 成功!
127.0.0.1:6379(TX)> DECRBY money 20
QUEUED
127.0.0.1:6379(TX)> INCRBY out 20
QUEUED
127.0.0.1:6379(TX)> EXEC
1) (integer) 80
2) (integer) 20
测试多线程修改值,使用watch可以当做redis的乐观锁操作!
线程1
127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money #监视money
OK
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379(TX)> decrby money 10
QUEUED
127.0.0.1:6379(TX)> incrby out 10
QUEUED
127.0.0.1:6379(TX)> EXEC # 执行之前 另一个线程 修改了值 会导致事务执行失败
(nil)线程2
127.0.0.1:6379> get money
"100"
127.0.0.1:6379> set money 1000
OK
如果修改失败,获取最新的值就好
Jedis
使用Java来操作Reids
什么是jedis?是Redis官方推荐的Java连接开发工具!使用Java操作Redis中间件!如果你要使用Java操作redis,那么一定要对jedis十分熟悉!
知其然并知其所以然!
测试
1、导入对应的依赖
<!-- 导入jedis包 -->
<!-- https://mvnrepository/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.7.0</version>
</dependency>
<!-- fastjson -->
<!-- https://mvnrepository/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
2、编码测试:
- 连接数据库
- 操作命令
- 断开连接
连接:
public class TestPing {
public static void main(String[] args) {
//new jedis 对象
Jedis jedis = new Jedis("127.0.0.1",6379);
//jedis所有的命令都是redis所有的指令
System.out.println(jedis.ping());
}
}
输出:
Jedis常用API
String Set Zset Map List
所有的api命令,就是上面的指令,一个都没有变化
public class TestPing {
public static void main(String[] args) {
//new jedis 对象
Jedis jedis = new Jedis("127.0.0.1",6379);
//jedis所有的命令都是redis所有的指令
System.out.println(jedis.ping());
System.out.println("清空数据:"+jedis.flushDB());
System.out.println("判断某个键是否存在:"+jedis.exists("username"));
System.out.println("新增<'usename','kuangshen'>的键值对:"+jedis.set("username","kuangshen"));
System.out.println("新增<'password','password'>的键值对:"+jedis.set("password","password"));
System.out.println("系统中所有的键如下:");
Set<String> keys = jedis.keys("*");
System.out.println(keys);
System.out.println("删除键password:"+jedis.del("password"));
System.out.println("判断键password是否存在:"+jedis.exists("password"));
System.out.println("查看键username所存储的值的类型:"+jedis.type("username"));
System.out.println("随机返回key空间的一个:"+jedis.randomKey());
System.out.println("重命名key:"+jedis.rename("username", "name"));
System.out.println("取出改后的name:"+jedis.get("name"));
System.out.println("按索引查询:"+jedis.select(0));
System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB());
System.out.println("返回当前数据库中key的数目:"+jedis.dbSize());
System.out.println("删除所有数据库中的所有key:"+jedis.flushAll());
}
}
事务
public class TestTX {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1",6379);
jedis.flushAll();
JSONObject jsonObject= new JSONObject();
jsonObject.put("hello", "world");
jsonObject.put("name", "kuangshen");
//开启事务
Transaction multi = jedis.multi();
String g = jsonObject.toJSONString();
// jedis.watch(g); //乐观锁
try {
multi.set("user1", g);
multi.set("user2", g);
int i = 1/0;//代码抛出异常 事务执行失败
multi.exec();//执行事务
} catch (Exception e) {
// TODO: handle exception
multi.discard();//放弃事务
e.printStackTrace();
}finally {
System.out.println(jedis.get("user1"));
System.out.println(jedis.get("user2"));
jedis.close();//关闭连接
}
}
}
SpringBoot整合
SpringBoot操作数据:spring-data jpa jdbc mongodb redis!
SpringData也是和SpingBoot齐名的项目
说明:在springboot2.x之后,原来使用的jedis被替换为了lettuce
jedis:采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用jedis pool连接池!BIO
lettuce:采用netty,实例可以再多个线程中进行共享,不存在线程不安全的情况!可以减少线程数据,更像NIO模式
源码分析:
整合
1、导入依赖
<!-- 操作redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency
2、配置连接
#配置redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
3、测试
package com.example;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;
@SpringBootTest
class RedisSpringbootApplicationTests {
@Autowired
private RedisTemplate redisTemplate;
@Test
void contextLoads() {
//redisTemplate 操作不同的数据类型 api和指令是一样的
//redisTemplate.opsForValue(); 操作字符串 类似String
//redisTemplate.opsForList(); 操作list 类似list
//redisTemplate.opsForSet()
//redisTemplate.opsForHash()
//redisTemplate.opsForZSet()
//redisTemplate.opsForGeo()
//redisTemplate.opsForHyperLogLog()
//除了基本的操作 常用的方法都可以通过redisTemplate操作 比如事务 和基本的CRUD
//获取redis的连接对象
//RedisConnection redisConnection = redisTemplate.getConnectionFactory().getConnection();
//redisConnection.flushDb();
//redisConnection.flushAll();
redisTemplate.opsForValue().set("mykey", "hzx真帅");
System.out.println(redisTemplate.opsForValue().get("mykey"));
}
}
自己编写RedisTemplate
@Configuration
public class RedisConfig {
//自己编写RedisTemplate
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
自己编写工具类RedisUtil
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
//真实的开发中 都可以看到封装的工具类
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
redis操作不是很难 重要的是要去理解redis的思想和每种数据结构的用处和作用场景!
Redis.conf详解
单位
配置文件units单位对大小写不敏感
包含 可以包含别的配置文件 组合成一个
网络 配置
bind 127.0.0.1 #绑定的IP
protected-mode yes #保护模式
port 6379 #端口设置
通用GENERAL
daemonize yes #以守护进程的方式运行 默认是no 开启时为yes
pidfile /var/run/redis_6379.pid #如果以后台的方式运行 需要指定一个pid文件
#日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably) 生成环境
# warning (only very important / critical messages are logged)
loglevel noticelogfile "" #日志文件的位置名 如果为空 就是标准的输出
database 16 #数据库数量 默认16个数据库 最大16个
always-show-logo no #启动logo
快照
save 900 1 #如果900s内 如果至少有一个1 key进行修改 进行持久化操作
save 300 10 #如果300s内 如果有10key进行了修改 进行持久化操作
save 60 10000 #如果60s内 如果有10000key进行修改 进行持久化操作
#之后学习持久化 会自己定义测试
stop-writes-on-bgsave-error yes #持久化如果出错 是否继续工作
rdbcompression yes #是否压缩rdb文件 需要消耗一些CPU资源 (rdb文件就是持久化文件)
rdbchecksum yes #保存rdb文件的时候 进行错误的校验检查
dir ./ #rdb文件保存目录 默认当前目录下
持久化,在规定的时间内 执行了多少次操作 则会持久化到文件 .rdb .aof
redis是内存数据库 如果没有持久化 那么数据库断点及失
REPLICATION 复制 单机搭建多个reids实现
SECURITY安全
requirepass #默认没有密码 可以设置redis密码
127.0.0.1:6379> config get requirepass #获取redis密码
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass "hzx*1027" #设置密码
OK
127.0.0.1:6379> config set requirepass "hzx*1027" #设置密码后有可能所有命令没有权限
OK
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "hzx*1027"
127.0.0.1:6379> auth hzx*1027 #使用密码进行验证登录
OK
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "hzx*1027"
CLIENTS限制
maxclients 10000 #默认连接redis最大客户端数量是10000
MEMORY MANAGEMENT
maxmemory <bytes> #redis配置内存最大的容量
maxmemory-policy noeviction #内存上限处理策略(6种),默认是第六种
- volatile-lru:只对设置了过期时间的key进行LRU(默认值)
- allkeys-lru : 删除lru算法的key
- volatile-random:随机删除即将过期key
- allkeys-random:随机删除
- volatile-ttl : 删除即将过期的
- noeviction : 永不过期,返回错误
APPEND ONLY MODE APPEND ONLY模式 AOF配置
appendonly no #默认不开启AOF模式的,默认使用rdb方式持久化 在绝大部分情况下 rdb完全够用
appendfilename "appendonly.aof" #持久化aof文件名字 默认"appendonly.aof"
# appendfsync always #每次修改都会sync 消耗性能
appendfsync everysec #每秒执行一次sync(同步),可能会丢失这1s的数据(这秒宕机)
# appendfsync no #不执行 sync 这个时候操作系统自己同步数据 数度最快
Redis持久化
Redis是内存数据库,如果不将内存中的数据库保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以Redis提供了持久化功能!
RDB(Redis DataBase)
什么是RDB?
在主从复制中,rdb就是备用!放在从机上
在指定的时间间隔内将内存中数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时将快照文件直接读到内存里。
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丢失。默认使用RDB,一般情况下不需要修改这个配置!
生产环境下对dump.rdb进行备份。
rdb保存的文件是dump.rdb 都是配置文件中快照中进行配置的
生成rdb文件触发规则
- save的规则满足的情况下,会自动触发rdb规则
- 执行flushall命令,也会触发rab规则
- 退出redis,也会触发
备份就自动生成一个dump.rdb
如何恢复rdb文件?
- 只需要将rdb文件放在redis启动目录就可以,redis启动的时候会自动检测dump.rdb恢复其中的数据
- 查看需要存在的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin" #如果在这个目录下存在dump.rdb文件,启动就会自动恢复其中数据
几乎默认的配置就够用了
优点:
- 适合大规模的数据恢复!dump.rdb别乱删
- 对数据的完整性要求不高!
缺点:
- 需要一定的时间间隔进行操作!如果redis意外宕机,最后一次修改数据就没有了。
- fork进程的时候,会占用一定的内存空间。
AOF(Append Only File)
将我们的所有命令都记录下来,history,恢复的时候就把这个文件全部都执行一遍!
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据,换言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的恢复工作
Aof保存的是appendonly.aof文件
append
默认不开启 需要手动进行配置 只需要将appendonly 改为yes就开启了aof!
重启reids就可以生效了!
如果aof文件有错误,redis启动不起来,需要修复aof文件,redis提供了一个工具redis-check-aof
如果文件正常,重启就可以直接恢复!丢掉错误数据。
重写规则说明
aof默认文件无限追加,文件越来越大
如果aof文件大于64M 就会fork一个新的进程进行aof重写。
优点:
- 每次修改都同步,文件的完整性更加好
- 每秒同步一次,可能会丢失一秒的数据
- 不打开从不同步,效率最高
缺点:
- 相对于数据文件来说,aof远远大于rdb,修复的速度也比rdb慢
- aof(IO操作)运行效率也要比rdb慢,所以redis默认持久化是rdb
扩展:
- RDB持久化方式能够在指定的时间间隔内对你的数据进行快照存储
- AOF持久化方式记录每次对服务器写的操作,当服务器重启的时候会重新执行这些命令来恢复原始的数据,AOF命令以Redis协议追加保存每次写的操作到文件末尾,Redis还能对AOF文件进行后台重写,使得AOF文件的体积不至于过大。
- 只做缓存,如果你只希望你的数据在服务器运行的时候存在,你也可以不使用任何持久化
- 同时开启两种持久化方式 ·在这种情况下,当redis重启的时候会优先载入AOF文件来恢复原始的数据,因为通常情况下AOF文件保存的数据集要比RDB文件保存的数据集要完整。 ·RDB的数据不实时,同时使用两者时服务器重启也只会找AOF文件,那要不要使用AOF文件呢?建议不要,因为RDB更适合用于备份数据库(AOF在不断变化不好备份),快速重启,而且不会有AOF可能存在潜在的BUG,留着作为一个万一的手段。
- 性能建议
- 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次就够了,只保留save 900 1 这条规则。
- 如果Enable AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了,代价一是带来了持续的IO,二是AOF rewrite的最后将rewrite过程种产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上,默认超过原大小100%重写可以改到适当的数值。
- 如果不Enable AOF,仅靠Master-Slave Repllcation实现高可用性也可以,能省掉一大笔IO,也减少了rewrite时带来的系统波动。代价是如果Master/Slave同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个Master/Slave中的RDB文件,载入较新的那个,微博就是这种架构。
Redis发布订阅
通信 队列 发送者========订阅者
Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接受消息。
Redis客户端可以订阅任意数量的频道。
订阅/发布消息图:
(第一个:消息发布者,第二个:频道,第三个:消息订阅者)
下图展示了频道channel1,以及订阅这个频道的三个客户端--client2、client5和client1之间的关系:
当有新消息通过PUBLISH命令发送给频道channel1时,这个消息就会被发送给订阅它的三个客户端:
命令
这些命令被广泛用于构建即时通信应用,比如网络聊天室(chatroom)和实时广播、实时提醒等。
测试
订阅端:
127.0.0.1:6379> SUBSCRIBE kuangshenshuo #订阅一个频道
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "kuangshenshuo"
3) (integer) 1
#等待读取推送的信息
1) "message" #消息
2) "kuangshenshuo" #消息的频道
3) "hello" #消息内容
1) "message"
2) "kuangshenshuo"
3) "hello,redis"
发送端:
127.0.0.1:6379> PUBLISH kuangshenshuo
(error) ERR wrong number of arguments for 'publish' command
127.0.0.1:6379> PUBLISH kuangshenshuo "hello" #f发布者发布消息到频道
(integer) 1
127.0.0.1:6379> PUBLISH kuangshenshuo "hello,redis"
(integer) 1
原理
Redis是使用C实现的,通过分析Redis源码里的pubsub.c文件,了解发布和订阅机制的底层实现,借此加深对Redis的理解。
Reids通过PUBLISH、SUBSCRIBE和PSUBSCRIBE等命令实现发布和订阅功能。
通过SUBSCRIBE命令订阅某频道后,reids-server里维护了一个字典,字典的键就是一个个channel,而字典的值则是一个链表,链表中保存了所有订阅这个channel的客户端。SUBSCRIBE命令的关键,就是将客户端添加到给定channel的订阅链表中。
通过PUBLISH命令向订阅者发送消息,redis-server会使用给定的频道作为键,在它所维护的channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
Pub/Sub从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
使用场景:
- 实时消息系统
- 实时聊天(频道当作聊天室,消息回显给所有人)
- 订阅关注系统
稍微复杂的场景会使用消息中间件如MQ...
Redis主从复制
概念
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器。前者称为主节点(master/leader),后者称为从节点(slave/follower);数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave以读为主。
默认情况下,每台Redis服务器都是主节点;
且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。
主从复制的作用主要包括:
1、数据冗余︰主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
2、故障恢复∶当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。
3、负载均衡∶在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
4、高可用基石︰除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的(宕机),原因如下:
1、从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;
2、从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。
电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"。对于这种场景,我们可以使如下这种架构︰
主从复制,读写分离!80%的情况下都是在进行读操作!减缓服务器的压力!架构中经常使用!一主二从!
只要在公司中,主从复制就是必须要使用的,因为在真实的项目中不可能单机使用Redis!
环境配置
只配置从库,不用配置主库!
127.0.0.1:6379> INFO replication #查看当前库的信息
# Replication
role:master #角色
connected_slaves:0 #没有从机
master_failover_state:no-failover
master_replid:a070e17cb74c2b1b9a9172eaef13c1ff5b654407
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
复制3个配置文件,然后修改对应的信息
- 端口
- pid名字
- log文件名字
- dump.rdb名字
修改完毕之后,启动我们的3个redis服务器,可以通过进程信息查看
一主二从
默认情况下,每台Redis服务器都是主节点;我们一般情况下只用配置从机。
认老大!一主(6379)二从(6380、6381)
127.0.0.1:6380> SLAVEOF 127.0.0.1 6379 #SLAVEOF host port 找谁当老大
OK
127.0.0.1:6380> info replication
# Replication
role:slave #当前角色是从机
master_host:127.0.0.1 #可以看到主机信息
master_port:6379
master_link_status:up
master_last_io_seconds_ago:0
master_sync_in_progress:0
slave_repl_offset:28
slave_priority:100
slave_read_only:1
replica_announced:1
connected_slaves:0
master_failover_state:no-failover
master_replid:0cc051bd8d9e9601ae838a03b3d3fde324498532
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:28
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:28
#从主机查看
127.0.0.1:6379> INFO replication
# Replication
role:master
connected_slaves:2 #多了从机的配置
slave0:ip=127.0.0.1,port=6380,state=online,offset=224,lag=0 #从机配置
slave1:ip=127.0.0.1,port=6381,state=online,offset=224,lag=0
master_failover_state:no-failover
master_replid:0cc051bd8d9e9601ae838a03b3d3fde324498532
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:224
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:224
如果两个都配置完了,就会有两个从机。
真实的主从配置应该在配置文件中配置,这样的话是永久的,这里是命令配置是暂时的
细节
主机可以写,从机不能写只能读!主机中所有的信息和数据,都会自动被从机保存
主机写:
从机只能读取内容:
测试:主机断开连接,从机依旧连接到主机的,但是没有写操作,这个时候,主机如果回来了,从机依旧可以直接获取到主机写的信息!
如果是使用命令行,来配置的主从,从机这个时候如果重启,就会变为主机!只要变为从机,立马就会从主机中获取值!
复制原理
Slave启动成功连接到master后会发送一个sync同步命令
Master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave ,并完成一次完全同步。
全量复制:而slave服务在接收到数据库文件数据后,将具存盘并加载到内存中。
增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步
但是只要是重新连接master ,一次完全同步(全量复制)将被自动执行!我们的数据一定可以在从机中看到!
层层链路
上一个M连接下一个S
这时候也可以完成我们的主从复制
如果没有主机了,这个时候能不能选出一个主机出来?手动!
如果主机断开了连接,我们可以使用SLAVEOF no one 让自己变成主机!其他的节点就可以手动连接到最新的这个主机(手动)!如果这个时候以前的主机修复,就需要重新配置主机。
哨兵模式
(自动选取主机)
概述
主从切换技术的方法是︰当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内服务不可用。这不是一种推荐的方式,更多时候,我们优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel (哨兵)架构来解决这个问题。
谋朝篡位的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
这里的哨兵有两个作用:
- 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
- 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其他的从服务器,修改配置文件,让它们切换主机。
然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,我们可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。
测试
我们目前的状态是一主二从。
1、配置哨兵配置文件sentinel.conf
#sentinel monitor 被监控的名称 host port 1
sentinel monitor myredis 127.0.0.1 6379 1
后面的数字1,代表主机挂了,slave投票看谁接替成为主机,票数最多的,就会成为主机
2、启动哨兵
如果Master节点断开了,这个时候就会从从机中随机选择一个主机(这里有一个投票算法)
哨兵日志:
哨兵模式
如果主机此时回来了,只能归并到新的主机下,当作从机,这就是哨兵模式的规则。
优点:
- 哨兵集群,基于主从复制模式,所有的主从配置优点,它全有
- 主从可以切换,故障可以转移,系统的可用性就会更好
- 哨兵模式就是主从模式的升级,手动到自动,更加健壮
缺点:
- Redis不好在线扩容,集群容量一旦到达上限,在线扩容就十分麻烦
- 实现哨兵模式的配置其实很麻烦,里面有很多选择
哨兵模式全部配置
# Example sentinel.conf
# 哨兵sentinel实例运行的端口 默认26379 如果有哨兵集群,我们还需要配置每个哨兵端口
port 26379
# 哨兵sentinel的工作目录
dir /tmp
# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 1
# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000
# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,
# 这个数字越小,完成failover所需的时间就越长,
# 但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
# 可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1
# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000
# SCRIPTS EXECUTION
#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。
#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,
#这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,
#一个是事件的类型,
#一个是事件的描述。
#如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
#通知脚本
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh
# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>总是“failover”,
# <role>是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
Redis缓存穿透和雪崩(面试高频,工作常用)
Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。
另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。
缓存穿透
概念
缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中,于是都去请求了持久层数据库。这会给持久层数据库造成很大的压力,这时候就相当于出现了缓存穿透。
解决方案
布隆过滤器
布隆过滤器是一种数据结构,对所有可能查询的参数以hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力;
缓存空对象
当存储层不命中后,即使返回的空对象也将其缓存起来,同时设置一个过期时间,之后再访问这个数据将会从缓存中获取,保护了后端数据源;
但是这种方法会存在两个问题:
1、如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键
2、即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。
缓存击穿
概述
这里需要注意和缓存击穿的区别,缓存击穿,堤指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。
解决方案
设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点 key过期后产生的问题。
加互斥锁
分布式锁∶使用分布式锁,保证对于每个key同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
缓存雪崩
概述
缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis宕机!
产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商品时间比较集中的放入了缓存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都过期了。而对这批商品的访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。
其实集中过期,倒不是非常致命,比较致命的缓存雪崩,是缓存服务器某个节点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,这个时候,数据库也是可以顶住压力的。无非就是对数据库产生周期性的压力而已。而缓存服务节点的宕机,对数据库服务器造成的压力是不可预知的,很有可能瞬间就把数据库压垮。
解决方案
redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis ,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。(异地多活)
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。
关注狂神说Java 免费教学资料
更多推荐
Redis---学习笔记
发布评论