
NoSQL
NoSQL特点
1、方便扩展(数据之间没有关系,很好扩展!)
2、大数据量高性能(Redis一秒写8万次,读取11万,NoSQL的缓存记录级,是一种细粒度的缓存,性能会比较高)
3、数据类型是多样型的!(不需要事先设计数据库!随取随用!如果是数据量十分大的表,很多人就无法设计了)
4、传统RDBMS和NoSQL
传统的RDBMS - 结构化组织 - SQL - 数据和关系都存在单独的表中 - 操作操作,数据定义语言 - 严格的一致性 - 基础的事务
|
Nosql - 不仅仅是数据 - 没有固定的查询语言 - 键值对存储,列存储,文档存储,图形数据库(社交关系) - 最终一致性, - CAP定理 和 BASE(异地多活) - 高性能,高可用,高可扩
|
3V+V高
1.海量Volume
2.多样Variety
3.实时Velocity
1.高并发
2.高可拓
3.高性能
四大分类
列存储数据库
HBase
分布式文件系统
图关系数据库
不是存图片,是存关系(类似:社交)
四大数据库区别

Redis
Attempt
概述
Redis(Remote Dictionary Server),即远程字典服务
一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Vlue数据库,并提供多种语言的API
Redis功能
1、内存存储、持久化,内存中是断电即失、所以说持久化很重要(rdb、aof)
2、效率高,可以用于高速缓存
3、发布订阅系统、
4、地图信息分析
5、计时器、计数器(浏览量!)
特性
1、多样的数据类型
2、持久化
3、集群
4、事务
安装
Wdinow –》Github :Release 3.2.100 · microsoftarchive/redis · GitHub
Window
1、启动:redis-server.exe
2、使用Redis客户端(redis-cli.exe)连接redis

Linux
Linux下安装Redis详解
1、解压
tar -zxvf redis-7.0.12.tar.gz
|
2、进入解压后的文件
3、安装c++
4、配置所有的文件
5、redis的默认安装路径、/usr/local/bin

6、将redis的配置文件拷贝到 /usr/local/bin/自定义文件夹/,以后所有使用的配置文件使用其中的
mkdir chenconfig
cp /opt/redis-7.0.12/redis.conf chenconfig
|
7、修改配置文件为 默认启动
启动、连接、关闭
redis-server chenconfig/redis.conf
redis-cli -p 6379 ping
shutdown exit
ps -ef|grep redis
|
docker pull redis
mkdir -p /mydata/redis/conf/ mkdir -p /mydata/redis/data/ chmod -R 777 /mydata/ cd /mydata/redis/conf
wget http://download.redis.io/redis-stable/redis.conf
sudo docker run -p 6379:6379 --name redis -v /mydata/redis/conf:/etc/redis/redis.conf -v /mydata/redis/data:/data -d redis redis-server /etc/redis/redis.conf --appendonly yes --requirepass "密码"
docker start redis
docker exec -it redis redis-cli
auth 密码
cd /mydata/redis/conf vi redis.conf
appendonly yes
protected-mode no
daemonize yes
bind 127.0.0.1
requirepass 密码
|
benchmark 性能测试

redis-benchmark -h localhost -p 6379 -c 100 -n 100000
|

基础知识
@Autowired private StringRedisTemplate stringRedisTemplate;
|
redis默认有16个数据库,默认第0个数据库
select 数据库下标
dbszie / DBSIZE
keys * stringRedisTemplate.opsForValue().getOperations().keys("*")
flushdb
flushall
|
RedisConnection connection = stringRedisTemplate.getConnectionFactory().getConnection(); connection.flushDb(); connection.flushAll();
|
Redis是单线程的:官方表示,Redis;是基于内存操作,CPU不是Redis’性能瓶颈,Redis的瓶颈是根据机器的内存和网络带宽
五大数据类型

Redis-Key
keys *
set key value stringRedisTemplate.opsForValue().set("key","value");
get key stringRedisTemplate.opsForValue().get("key")
type key
exists key
move key dbNumbern
expire key time(s) stringRedisTemplate.opsForValue().getAndExpire("key", Duration.ofDays(Time))
ttl key
|
String(字符串)
mset Key Value Key Value
append key addValue
strlen key
incr key stringRedisTemplate.opsForValue().increment("key")
decr key
incrby key n
decrby key n
getrange key startIndex endIndex
SETRANGE key index string
setnx key string
setex key time string stringRedisTemplate.boundValueOps("moeny").set("99",5, TimeUnit.MINUTES);
set key value NX EX 300 SET productld:lock 0xx9p03001 NX EX 30000
set user:1 {name:chen,age:2}
|
String类以的使用场景:value除了是我们的字符串还可以是我们的数字!
List(列表)
基本类型:列表

在redis里面,Iist可设置成为,栈、队列、阻塞队列!
所有的List命令都是用 l 来头的
lpush mylist value
rpush mylist value
lrange mylist starIndex endIndex
lpop mylist
rpop mylist
rpoplpush mylist mylist2
lindex mylist 1
llen mylist
lrem mylist 1 0
ltrim mylist 1 2
EXISTS mylist lset mylist 0 chen
linsert mylist before/after a b
|
总结:
- 实际上是一个链表,before Node after,left,right都可以插入值
- 如果key不存在,创建新的链表
- 如果key存在,新增内容
- 如果移除了所有值,空链表,也代表不存在!
- 在两边插入或者改动值,效率最高!中间元素,相对来说效率会低一点
set(无序集合)
sadd myset chen
smembers myset
sismembers myset chen
scard myset
srandmember myset
smove myset1 myset2 chen
sdiiff seta setb
sinter seta setb
sunion seta setb
|
Hash(哈希)
Map集合,key-map 时候这个值是一个map集合!
hset myhast chen 12
hmset myhash chen 1 peng 2
hget myhast chen
hmget myhash chen peng
hgetall myhash
hdel myhash chen
hlen myhash
hexists myhash peng
hkeys muhash
hvals muhash
hincrby myhash a 1
hsetnx myhash a 1
|
hash变更的数据user name age,尤其是是用户信息之类的,经常变动的信息!hash更适合于对象的存储,String更加适合字符串存储!
zset(有序集合)
- 在set的基础上新增位置选择,可实现排序(将参与排序的字段设置为位置号码)
zadd myzset 0 chen
zadd myzset 0 chen 1 peng
zrange myzset 0 -1
zadd money 100 a 200 b 300 c 5000 d 6000 e 7000 f zrangebyscore money -inf +inf zrevrange money 0 -1 zrangebyscore money -inf +inf withscores
zrem money f
zcard myzset
zcount chen 1 3
|
案例思路:st排序存储班级成绩表,工资表排序!
普通消息,1,重要消息2,带权重进行判断!
排行榜应用实现
三大特殊类型数据
geospatial 地理位置
- 底层还是 zser (可使用 zrem删除、zrange查看)
测试数据:经纬度查询定位
geoadd : 往集合添加元素
geoadd china:city 113.27079 23.13567 guangzhou
|
**geopos ** : 获取
geopos china:city guangzhou
|
geohash : 获取集合中的元素的经纬度返回的是哈希值:降为一维
geohash china:city shenzheng
|
geodist: 两点距离
geodist china:city guangzhou shenzheng 单位
|
georadius : 以当前经纬度为中心寻找某半径的元素
- 获取指定的数量:count x
- 获取之间距离:withdist
- 获取对方的经纬度:withcoord
georadius china:city 中心经纬度 半径长度 单位
georadius china:city 中心经纬度 半径长度 单位 count 数量 withdist withcoord
|
georadiusbymember :以列表中某一地点为中心寻找半径内的元素
georadiusbymember china:city shenzheng 1000 km
|
Hyperloglog 基数统计
优点:占用的内存是固定,2^64不同的元素的技术,只需要废12KB内存!
网页的UV(一个人访问一个网站多次,但是还是算作一个人)
PV:可重复
传统的方式,set保存用户的id,然后就可以统计set中的元素数量作为标准判断!
这个方式如果保存大量的用户d,就会比较麻烦!我们的目的是为了计数,而不是保存用户id;
pfadd mykey a b c d e f g
pfcount mykey
pfmerge mykey3 mykey1 mykey2
|
Bitmaps 位图场景
位存储
统计用户信息,活跃,不活跃!登录、未登录!打卡,365打卡!两个状态的,都可以使用Bitmaps!
Bitmaps位图,数据结构!都是操作二进制位来进行记录,就只有0和1两个状态
setbit : 记录某一天的状态 0 / 1
setbit sign 0 1 setbit sign 1 1
getbit sign 0
bitcount sign
|
事务
Rdis事务本质:一组命令的集合!一个事务中的所有命令都会被序列化,在事务执行过程的中,会按照顺序执行!
一次性、顺序性、排他性!执行一些列的命令!
Redis事务没有没有隔离级别的概念!
所有的命令在事务中,并没有直接被执行!只有发起执行命令的时候才会执行!Exec
==Redis.单条命令式保存原子性的1,但是事务不保证原子性!==
redis的事务:
- 开启事务(multi)
- 命令入队(…)
- 执行事务(exec)
正常执行事务
multi
127.0.0.1:6379(TX)> set k1 1 QUEUED 127.0.0.1:6379(TX)> set k2 2 QUEUED 127.0.0.1:6379(TX)> set k3 3 QUEUED
exec
|
放弃执行事务
注意:
Redis实现乐观锁
Watch : 监控
unwatch : 取消监控
watch money :OK multi :OK decrby money 30 :QUEUED incrby out 30 :QUEUED exec :(nil)
|
Jedis
Jedis是Redis官方推荐的java连接开发工具!使用java操作Redis中间件
1、依赖
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>4.4.3</version> </dependency>
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency>
|
2、编码测试
Jedis jedis = new Jedis("127.0.0.1",6379);
System.out.println(jedis.ping());
|
Jedis jedis = new Jedis("127.0.0.1",6379);
System.out.println(jedis.ping()); jedis.flushDB();
System.out.println("清空数据:"+jedis.flushDB()); System.out.println("判断某个键是否存在:"+jedis.exists("username")); System.out.println("新增<'username','chen'>的键值对:"+jedis.set("username'","chen")); System.out.println("新增<'password','password'>的键值对:"+jedis.set("password","password")); System.out.println("系统中所有的键如下:"); Set<String> keys = jedis.keys("*"); System.out.println(keys); System.out.println("password:"+jedis.del("password")); System.out.println("判断键password是否存在:"+jedis.exists("password")); System.out.println("查看键username所存储的值的类型:"+jedis.type("username")); System.out.println("随机返回key空间的一个:"+jedis.randomKey()); System.out.println("取出改后的name:"+jedis.get("name")); System.out.println("按索引查询:"+jedis.select(0)); System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB()); System.out.println("返回当前数据库中key的数目:"+jedis.dbSize()); System.out.println("删除所有数据库中的所有key:"+jedis.flushAll());
|
SpringBoot整合
Reids工具类
1、依赖项

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>
|
说明:在SpringBoot22.x之后,原来使用的jedis被替换为了lettuce
jedis:采用的直连,多个线程操作的话,是不安全的,如果想要避免不安全的,使用jedis pool连接池!
lettuce:采用netty,实例可以再多个线程中进行共享,不存在线程不安全的情况!
2、配置
- 源码 RedisAutoConfiguration
spring: redis: host: 192.168.238.3 port: 6379 lettuce: pool: #连接池 max-active: 8 #最大连接 max-idle: 8 #最大空闲连接 min-idle: 0 #最小空闲连接 max-wait: 100 #连接等待时间
|
3、测试
RedisTemplate
@Autowired private RedisTemplate redisTemplate;
@Test void contextLoads() {
redisTemplate.opsForValue().set("mykey","辰呀"); System.out.println(redisTemplate.opsForValue().get("mykey")); User user = new User("辰呀", 9); redisTemplate.opsForValue().set("user",user); System.out.println(redisTemplate.opsForValue().get("user"));
}
|
StringRedisTemplate
@Resource private StringRedisTemplate stringRedisTemplate;
private static final ObjectMapper mapper = new ObjectMapper();
User user = new User("辰呀", 9); String s = mapper.writeValueAsString(user);
stringRedisTemplate.opsForValue().set("user",s); String value = stringRedisTemplate.opsForValue().get("user");
mapper.readValue(value, User.class);
String key = KEY_SMS_CODE_REG + phone; stringRedisTemplate.boundValueOps(键).set(值,5, TimeUnit.MINUTES); Long incr = stringRedisTemplate.boundValueOps(键).increment();
stringRedisTemplate.hasKey(key) String querySmsCode = stringRedisTemplate.boundValueOps(key).get();
stringRedisTemplate.boundZSetOps(Zset的键).add(键,值);
|
4、序列化
- 解决写入乱码问题(传递是对象的时候需要对对象进行序列化,两种方法:①转为json ②实体类上实现序列化)
- 数据的存储和传输
String s = new ObjectMapper().writeValueAsString(user); redisTemplate.opsForValue().set("user",s);
public class User implements Serializable { }
|
5、自定义RedisTemplate
@Configuration public class RedisConfig {
@Bean @SuppressWarnings("all") public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<String,Object>(); template.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); objectJackson2JsonRedisSerializer.setObjectMapper(mapper); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer); template.setHashKeySerializer(stringRedisSerializer); template.setValueSerializer(objectJackson2JsonRedisSerializer); template.setHashKeySerializer(objectJackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; }
}
|
Redis.conf
cd /mydata/redis/conf vi redis.conf
protected-mode no
daemonize yes
bind 127.0.0.1
pidfile /var/run redis_6379.pid
loglevel notice logfile "文件名"
database 16
|
快照
持久化,在规定的时间内,执行了多少次操作,则会持久化到文件.rdb .aof
save 900 1 save 300 10
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dir ./
|
security : 安全
config get requirepass
requirepass root
config set requirepass "密码"
auth 密码
|
clients : 客户端
maxclients 10000
maxmemory <bytes>
maxmemory-policy noeviction
当 Redis 内存使用达到 maxmemory 时,需要选择设置好的 maxmemory-policy 进行对数据进行淘汰机制。
1.volatile-lru(least recently used):最近最少使用算法,从设置了过期时间的键key中选择空转时间最长的键值对清除掉;
2.volatile-lfu(least frequently used):最近最不经常使用算法,从设置了过期时间的键中选择某段时间之内使用频次最小的键值对清除掉;
3.volatile-ttl:从设置了过期时间的键中选择过期时间最早的键值对清除;
4.volatile-random:从设置了过期时间的键中,随机选择键进行清除;
5.allkeys-lru:最近最少使用算法,从所有的键中选择空转时间最长的键值对清除;
6.allkeys-lfu:最近最不经常使用算法,从所有的键中选择某段时间之内使用频次最少的键值对清除;
7.allkeys-random:所有的键中,随机选择键进行删除;
8.noeviction:不做任何的清理工作,在redis的内存超过限制之后,所有的写入操作都会返回错误;但是读操作都能正常的进行;
|
append only模式 aof配置
appendonly no appendfilename "appendonly.aof"
appendfsync always appendfsync everysec appendfsync no
|
Redis持久化
Rdis是内存数据库,如果不将内存中的数据库状态保存到磁盘,那么一旦服务器进程退出,服务器中的数据库状态也会消失。所以Redis提供了持久化功能!
RDB(Redis DataBase)

save 900 1 save 300 10
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dir ./
|
rdb持久化触发规则
1、save的规则满足的情况下,会自动触发rdb规则
2、执行flushall命令,也会触发我们的rdb规则I
3、退出redis,也会产生rdb文件!
恢复rdb文件
- 只需要将rdb文件放在我们redis,启动目录就可以,redis启动的时候会自动检查dump.rdb恢复其中的数据
- 查看需要存放的位置,连接后查看
127.0.0.1:6379> config get dir 1) "dir" 2) "/data"
|
AOF(Append Only File)
将所有的命令全部都记录下来,history,恢复的时候就把这个文件命令全部执行一遍

appendonly no
redis-check-aof --fix appendonly.aof
|
appendonly no appendfilename "appendonly.aof"
appendfsync always appendfsync everysec appendfsync no
|
Redis发布订阅
订阅 subscribe
subscribe chen
psubscribe chen peng
|
发布 publish
Redis主从复制

环境配置
info replication 127.0.0.1:6379> info replication
role:master connected_slaves:0
|
一主二从
==默认情况下,每台Redis服务器都是主节点==,配置从机就好
- 配置从机后,主机会发送整个数据文件到从机,完成数据同步
- **全量复制 : ** 数据同步,连接上主机
- 增量复制 : 数据同步后主机每次更改都会同步过去
salveof 192.168.238.3 6379
|
- 只有主机可以写,从机只能读,主机内容会被同步到从机
主机宕机:
主机断开连接,从机依旧连接到主机的,但是没有写操作,这个时候,主机如果回来了,从机依旧可以直接获取到主机写的信息!
从机宕机
如果是使用命令行,来配置的主从,这个时候从机如果更启了,数据丢失,就会变回主机,但要设置回从机后,立马获取回主机的全部数据
两主一从
主机 – 从机(下从机的主机) – 从机
- 中间的依然是从机(solve)身份:无法写入,只能读
变为主机–主机宕机:其他节点手动连接到该新的主节点
- 主机重启后,该节点依然为主节点,只能重新配置为从节点
哨兵模式【*】

防止单个哨兵下线

测试:一主二从
1、配置哨兵文件
vim sentinel.conf
sentinel monitor myreids 192.168.238.3 6379 1
|
2、启动哨兵
redis-sentinel 配置文件/sentinel.conf
|
如果Master节点断开了,这个时候就会从从机中随机选择一个服务器!(这里面有一个投票算法!)
如果主机恢复了,只能归并到新的主机下,当做从机
Redis缓存

缓存一致性

- 防止写操作过多,选择删除缓存,在查询的时候再更新缓存,实现延迟加载,不然每次更新数据库都更新缓存,无效写操作较多
- 保证原子性:单体系统,将缓存与数据库操作放住一个事务中;分布式系统:使用TCC分布式事务
- 线程安全问题:

- 调用者(增删查改)只在缓存中操作,维护一个线程异步将缓存中的操作一次性批处理持久化到数据库中,保存一致性

延迟双删
先删除redis,再更新数据库,延迟N秒后再删除一次redis。
缓存穿透和雪崩
缓存穿透(缓存中没有)

布隆过滤器
缓存空对象
缓存击穿(查询单点高并发)
概念
缓存击穿,是指一个ky非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个ky在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库
当某个ky在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数据,并且回写缓存,会导使数据库瞬间压力过大。
解决方案
1、设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以不会出现热点ky过期后产生的问题。
2、加互斥锁
分布式锁:使用分布式锁,保证对于每个ky同时只有一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
缓存雪崩
概述
缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis宕机!
访问查询,都落到了数据库上,对于数据库而言,就会产生周期性的压力波峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂掉的情况。

解决方案:
redisi高可用
既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。(异地多活!)
限流降级
在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个ky只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的ky,设置不同的过期时间,让缓存失效的时间点尽量均匀。
分布式锁
使用setnx,存在在创建失败,服务进行创建,如果能创建则默认为获取到锁,创建失败则获取锁失败,业务完成需要释放锁(删除key 、设置过期时间)
set key value NX EX 300 SET productld:lock 0xx9p03001 NX EX 30000
|



Redisson


1、依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.22.1</version> </dependency>
|
2、配置类【推荐】

@Configuration public class RedisConfig { @Bean public RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer().setAddress("redis://47.115.222.113:6379"); return Redisson.create(config); } }
|
3、获取锁 Rlock
多了设置等待时间。实现重试 Rlock 是 使用Hash类型

RLock lock = redissonClient.getLock("lock:order" + userID); boolean isLock = lock.tryLock(1, 30, TimeUnit.SECONDS); if (isLock){ try { System.out.println("执行业务"); }finally { lock.unlock(); } }
|
可重入锁
同一线程:线程持有一把锁,但线程可以多次获取同一把锁,而不会被锁本身所阻塞。这是可重入锁的一个主要特性。
- 可重入原理:判断是不是同一个线程,是的话给锁 ,需要记录当前线程标识和重入次数



Rlock底层就算是这lua脚本,所以我们直接使用RLock即可
private RLock lock;
@BeforeEach void setUp(){ lock=redissonClient.getLock("order"); }
@Test void method1(){ boolean isLock = lock.tryLock(); if (!isLock){ log.error("获取锁失败.....1"); return; } try { log.info("获取锁成功......1"); method2(); log.info("开始执行业务.....1"); }finally { log.warn("准备释放锁.....1"); lock.unlock(); } }
@Test void method2(){ boolean isLock = lock.tryLock(); if (!isLock){ log.error("获取锁失败.....2"); return; } try { log.info("获取锁成功......2"); log.info("开始执行业务.....2"); }finally { log.warn("准备释放锁.....2"); lock.unlock(); } }
|
超时释放
- 使用watchdog,不可以自己设置ttl,使用默认30s,如果ttl到期,业务还没有完成,自动续加30/3=10秒,一直续加到业务完成释放锁就删除

实践
延时任务
