项目来自黑马程序员的黑马点评,个人认为是很好的SpringBoot+Redis教程项目。不过疑似有点烂大街了。
视频教程:黑马点评项目实战,掌握企业实战项目真实应用场景,一套精通redis缓存技术_哔哩哔哩_bilibili
我的实现:gitee.com/wkling/Dianping
CSDN上优质的文字教程:黑马点评项目学习笔记(15w字详解,堪称史上最详细,欢迎收藏)-CSDN博客
项目技术栈
- spring boot
- Redis
- MySQL
- vue
- ngnix
- mybatis-plus
技术点
- Redis缓存
- 缓存刷新机制(基于interceptor实现)
- 缓存一致性(缓存更新策略,超时剔除与主动更新)
- Redis全局唯一自增
- 缓存击穿与缓存穿透解决方法(逻辑过期或互斥锁、缓存空字符串)
- 超卖问题-添加乐观锁
- 一人一单—spring事务注解、代理对象、方法加锁
- Redis基于
setnx实现分布式锁,锁误删问题、锁原子性操作问题 - Redssion分布式锁框架
- Redis消息队列
- Lua脚本实现事务原子化
- Redis SortedSet实现点赞用户排名
- Feed流的滚动查询方式,基于SortedSet
踩过的坑
2025年08月15日 12:11:42:
问题:前端跑在8080端口,后端跑在8081端口,前端请求的URL也是8080端口的。
原因:nginx配置了前端代理,把所有8080/api的请求转发到8081,通过这种方式解决了跨域问题。
2025年08月15日 21:34:08:
问题:执行 LambdaQueryWrapper.eq(User::getPhone, phone)时报错Caused by: java.lang.reflect.InaccessibleObjectException
原因:Java 17的模块系统限制了反射访问 java.lang.invoke包,而 MyBatis-Plus 的 Lambda 表达式解析依赖此反射机制。降级java8解决。
2025年08月16日 12:13:56:
判空逻辑优化:shopJson.isEmpty()不能这样用,因为如果shopJson是空就会有异常,用StrUtil.isBlank(shopJson)
2025年08月16日 12:17:54:
手动new拦截器会导致自动注入失效:
interceptorRegistry.addInterceptor(new LoginInterceptor());
interceptorRegistry.addInterceptor(new RefreshTokenInterceptor());
后果:拦截器内部的@Autowired注解(如 StringRedisTemplate)不会生效,导致 NullPointerException
| 操作 | 结果 |
|---|---|
new LoginInterceptor() | 绕过Spring容器,导致: 1. @Autowired失效 2. AOP代理失效 3. 生命周期不受控 |
@Autowired注入 | Spring自动处理依赖,保证: 1. 依赖注入有效 2. 单例管理 3. 代理增强 |
功能实现

短信登录
- 基于Session实现登录
- 集群的session共享问题
- 基于Redis实现共享session登录
流程如下:
- 提交手机号—前端校验—点击发送验证码
- 后端生成验证码,保存验证码到Session/Redis中
- 前端点击登录,发送请求,在实现的demo中 直接打印在后端控制台上模拟短信验证码功能
- 后端校验前端的验证码,前端通过cookie/url字段捎带上验证码,后端在session/Redis中基于token检验用户的验证码
- 验证码通过后,如果不存在用户就创建,存在就将用户信息存到Session/Redis里(Redis中,结构为{phone: code})
- 全程有一个拦截器
LoginInterceptor,拦截器默认排除/user/login请求,当访问其他url请求的时候,拦截器启动,并判断用户的登录状态(同样基于Session或Redis)
public Result sendCode(String phone, HttpSession session) {
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式不正确");
}
// 手机号合法,生成验证码,并保存到Redis中
String code = RandomUtil.randomNumbers(6);
stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, RedisConstants.LOGIN_CODE_TTL, TimeUnit.MINUTES);
// 发送验证码 此处用后端控制台模拟验证码的短信发送
log.info("验证码:{}", code);
return Result.ok();
}
用Redis解决Session集群共享问题
集群的session共享问题
多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。 所以需要用一台新的服务器跑Redis,存缓存,用Redis缓存来解决Session共享问题。
需要注意的点:
- 代码实现中,stringRedisTemplate需要保证K-V都是string类型(数据结构用的Redis Hash)
- radis需要设置有效期,有效期刷新可以在拦截器中实现
- Redis的数据结构采用HashMap,因为短信验证涉及多个字段(phone、验证码、用户id等)如果用String,String的value需要Json化,更麻烦
验证流程:
- 收到前端发的验证码,结构为{phone: code}
- 先验证code与服务端Redis存着的一致与否
- 再验证phone是否存在数据库中,不存在就在数据库中注册一个账号
- 之后,封装一个token返回给前端,token包括用户的id等数据
- 这个token同时也是服务端缓存用户数据的Redis数据的Key
public Result login(LoginFormDTO loginForm, HttpSession session) {
String phone = loginForm.getPhone();
String code = loginForm.getCode();
// 1、判断手机号是否合法
if (RegexUtils.isPhoneInvalid(phone)) {
return Result.fail("手机号格式不正确");
}
// 2、判断验证码是否正确
// 从redis中拿到缓存的验证码
String redisCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
if (code == null || !code.equals(redisCode)) {
return Result.fail("验证码不正确");
}
// 3、判断手机号是否是已存在的用户
LambdaQueryWrapper<User> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(User::getPhone, phone);
User user = this.getOne(lambdaQueryWrapper);
if (Objects.isNull(user)) {
// 用户不存在,需要注册
user = createUserWithPhone(phone);
}
// 4、保存用户信息到Redis中,便于后面逻辑的判断(比如登录判断、随时取用户信息,减少对数据库的查询)
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
// stringRedisTemplate要求key与value都为string类型 如果用原生的k-v结构: token-userMap, 会导致userDTO中的long类型字段id
// 由于不是string类型而报错, 需要修改userMap的映射对 使类型全为string
// 下面这段代码实现的就是把userDTO的三个字段类型都改成string封装成string
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create().setIgnoreNullValue(true).
setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
String token = UUID.randomUUID().toString(true);
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
log.info("用户tokenKey:" + tokenKey);
// 设置redis有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
return Result.ok(token);
}
双层登录拦截器
登录拦截器(Login Interceptor)的核心作用是 在用户访问需要身份验证的接口时,自动检查登录状态。避免出现直接通过URL就能访问一些需要身份鉴权的页面。
为什么需要双层拦截器
单拦截器的问题在于,单login拦截器会不拦截一些页面(如首页,登录页等),如果用户一直在这些页面停留,不会触发redis缓存刷新,那用户用着用着就发现Redis过期了。用第二个拦截器专门实现Redis刷新。

springboot中如何实现拦截器
- 写拦截器类,类需要实现
HandlerInterceptor接口,在类上要带上@Component注解,把他注册为Bean; 拦截器有3个生命周期:preHandle,在 控制器方法执行前被调用(路由匹配成功后)postHandle,在 控制器方法执行后调用afterCompletion,在 整个请求处理完成后(包括视图渲染或异常处理)调用,必定执行
- 在mvcConfig(实现Spring的
WebMvcConfigurer)中,重写addInterceptors接口,添加先前实现的拦截器类。
具体实现:
- 第一层是刷新拦截器,获取token,用
String token = request.getHeader("authorization");常见的身份验证Token会通过Authorization头部传递。 - Redis里面找一下这个token有value吗,找不到说明没有,直接response(401)。
- 找的到说明存在,把信息存在
ThreadLocal中,并且刷新token的有效期。 - 第二层拦截器较为简单,只需要拦截URL+二次鉴权就行了。
拦截器执行顺序
| preHandle | 按注册顺序执行(先Refresh后Login) | preHandle() |
| postHandle | 按注册逆序执行(先Login后Refresh) | postHandle() |
| afterCompletion | 按注册逆序执行(先Login后Refresh) | afterCompletion() |

ThreadLocal应用与实现
ThreadLocal 提供了线程局部的变量,每个线程都有自己独立的变量副本,互不干扰。它实现了线程隔离,避免了多线程环境下的共享变量竞争问题。项目中,封装成UserHolder来使用。
ThreadLocal底层其实是一个Map,Key是ThreadLocal对象本身,Value就是set的值。Key是弱引用的,当没会被GC自动回收,Value是强引用的,无法被GC自动回收,所以需要自己手动remove不用的ThreadLocal值。
public class UserHolder {
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
public static void saveUser(UserDTO user){
tl.set(user);
}
public static UserDTO getUser(){
return tl.get();
}
// 移除线程绑定的对象而非线程本身
public static void removeUser(){
tl.remove();
}
}
数据查询缓存
缓存更新策略(为了解决数据一致性问题)
在修改数据库的同时,更新缓存。
缓存更新的实现一般是删掉缓存,然后再添加一个新的,比用更新命令更新缓存更高效
缓存与数据库的操作更新顺序,优先执行数据库更新(原则是优先执行速度慢的)。
为什么要先更新数据库再删除缓存?
- 先删除缓存,再更新数据库,如果这个时候有请求,会出现缓存穿透,全部去访问数据库了。
- 先更新数据库可能存在脏读,例如线程1更新数据库,更新缓存的这一刻,有线程2也更新了数据库,这样线程读的数据就是脏的,这种情况的概率特别低,因为需要满足线程1正在更新缓存且线程2更新数据库,更新缓存的操作是特别快的。
常用缓存读写策略
- 旁路缓存读写,写(更新)的时候先更新数据库,再删缓存;读的时候先读缓存,读不到再读数据库,读到了再把内容写到缓存里。最常用的策略,较适用于读多写少的场景。
- 读写穿透,把cache当主要对象。写的时候,先更新cache,之后再更新数据库;当读的时候,缓存里读的到就返回,读不到就请求给数据库,数据库返回给cache,cache再返回,较适用数据实时性要求较高、对一致性要求严格的场景
- 异步缓存写入,和读写穿透差不多,区别在于数据库同步的方式,读写穿透是同步的,异步缓存写入是异步的
缓存穿透问题

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库,造成数据库压力巨大。
解决方法一般采用缓存空值。
if(shopJson != null && shopJson.equals("")){
// 说明 返回的shopJson是一个用以解决缓存穿透的""字符串
return null;
}
还有一种方法是布隆过滤器:
布隆过滤器是一种空间效率极高(使用位数组bit array存储)的概率型数据结构,用于快速判断一个元素是否存在于某个集合中。
优点是空间占用小,缺点是其基于概率,仍有误判的风险(布隆过滤器说存在数据,实际上不一定存在)。
缓存击穿问题
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
常见的解决方案有两种: 逻辑过期(设置缓存TTL永不过期,缓存数据里面加个逻辑TTL,拿数据的时候判断逻辑TTL有没有过期,优点是性能高,缺点是内存占用大) 互斥锁(加锁访问数据库,性能差,容易出现死锁,但是内存占用小)
互斥锁方案,基于Redis的setnx指令实现,setnx指令能够实现:当且仅当 key不存在时,将 key的值设为 value,设置成功返回1,否则返回0,相关代码如下:
// 当有访问数据库的请求需要处理时 自旋锁尝试拿锁
while (!tryLock(mutexKey)) {
if (++spinCount > maxSpins) {
log.warn("获取锁失败,已达到最大自旋次数: {}", maxSpins);
return null; // 或者可以返回旧数据/降级处理
}
Thread.sleep(spinInterval); // 短暂休眠后继续尝试
}
private boolean tryLock(String key){
// 基于Redis的setnex实现互斥锁
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private boolean unlock(String key){
Boolean flag = stringRedisTemplate.delete(key);
return BooleanUtil.isTrue(flag);
}
测试一下,自己的PC上(AMD R7-7730U)QPS(query per second)400的时候,接口吞吐能达到339。

逻辑过期方案,要注意一定要先进行数据预热。就不压测了,因为自己的实现上也加了一层互斥锁访问数据库,效果肯定是没上一个方案好的。
// 通过自己的逻辑TTL判断是否过期 如果过期 需要缓存重建
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject)redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
// 判断是否过期
if(expireTime.isAfter(LocalDateTime.now())){
// 未过期 返回
return shop;
}
// 拿锁访问数据库
try {
// 自旋锁尝试拿锁
while (!tryLock(mutexKey)) {
if (++spinCount > maxSpins) {
log.warn("获取锁失败,已达到最大自旋次数: {}", maxSpins);
return null; // 或者可以返回旧数据/降级处理
}
Thread.sleep(spinInterval); // 短暂休眠后继续尝试
}
// 拿锁成功 使用线程进行缓存重建
CACHE_REBUILD_POOL.submit(() -> {
this.saveShop2Redis(id, 30L);
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
unlock(mutexKey);
}
// 数据预热、重建缓存
public void saveShop2Redis(Long id, Long expireSeconds){
Shop shop = this.getById(id);
log.info("查询到shop");
RedisData redisData = new RedisData();
redisData.setData(shop);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds));
stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(redisData));
}
缓存雪崩问题
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力
一般解决方法:给不同的Key的TTL添加随机值、给业务添加多级缓存。
本项目没有对预防缓存雪崩进行实现。
秒杀场景
分布式ID
为什么要实现全局唯一ID?
自增ID有些局限性:安全隐私问题,自增太规律的话,可能会被人推测出一些敏感信息;不方便分布式储存,一般来说,数据库的表,超过500万行就要考虑分表分库了。
分布式ID的实现方式
一般有:数据库自增、UUID、Redis自增。本项目,采用的实现方式是时间戳+Redis自增的序列号。
本项目的ID是64位的,timestamp基于标准时间gap确定,之后将timestamp左移32位后接上Redis缓存返回的自增序列号。
public long nextId(String keyPrefix){
// 1、生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2、生成序列号
// 以当天的时间戳为key,防止一直自增下去导致超时,这样每天的极限都是 2^{31}
String date = now.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
Long count = stringRedisTemplate.opsForValue().increment(ID_PREFIX + keyPrefix + ":" + date);
// 3、拼接并返回 位运算
// key是不变的 订单号是基于timestamp和Redis自增拼接的
return timestamp << COUNT_BITS | count;
}
乐观锁解决单体架构下的库存超卖问题
高并发场景下,如果什么也不做,会出现库存超卖问题。例子如下,线程1和线程2都查订单,都觉得库存还够,然后就都对订单进行了扣减。问题的本质是因为查库存-扣减订单的过程是非原子化导致的。
超卖问题的常见解决方案:
悲观锁,认为线程安全问题一定会发生,因此操作数据库之前都需要先获取锁,确保线程串行执行。常见的悲观锁有:synchronized、lock
乐观锁,认为线程安全问题不一定发生,因此不加锁,只会在更新数据库的时候去判断有没有其它线程对数据进行修改,如果没有修改则认为是安全的,直接更新数据库中的数据即可,如果修改了则说明不安全,直接抛异常或者等待重试。常见的实现方式有:版本号法(读取的时候看一眼版本号,要修改的时候再看一眼版本号, 版本号相同说明没被改过)、CAS操作
用CAS方法,解决乐观锁的超卖问题:
CAS法不加版本号,在set数据库的时候,再查一次数据,看看是否有变化,不变就说明没人改过,修改即可。下面代码的实现CAS方法的核心就是eq("stock", voucher.getStock()),不过这样会导致数据已修改就终止操作了,所以可以改进为:gt(SeckillVoucher::getStock, 0)
// 添加乐观锁 CAS法 gt(SeckillVoucher::getStock, 0)是eq("stock", voucher.getStock())的优化版
boolean success = seckillVoucherService.update().setSql("stock = stock - 1")
.eq("stock", voucher.getStock()).gt(SeckillVoucher::getStock, 0).update();
if(!success){
return Result.fail("更新数据库失败");
}
注意:这种方案,能够保证库存不低于0,但是不能保证一人一单。
要想实现单机统架构下的一人一单,直接考虑悲观锁。
分布式锁解决集群架构下的一人一单问题
通过加悲观锁,能够解决单机架构下的一人一单问题,但是在集群下就行不通了。在集群模式下,加锁只是对该JVM给当前这台服务器的请求的加锁,而集群是多台服务器,所以要使用分布式锁,满足集群模式下多进程可见并且互斥的锁。
简单的Redis分布式锁实现方式,基于SETNX指令,key为锁名称,value为每个线程自己的threadID。代码如下:
@Override
public boolean tryLock(long timeoutSec) {
String threadId = ID_PREFIX + Thread.currentThread().getId() + "";
// SET KEY_PREFIX:name name为锁名称
Boolean result = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
}
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
分布式锁的优化
锁误删问题
为了避免长时间持有锁导致的效率低下等问题,为Redis的锁添加了一个持有时间。但是假如某线程在持有锁的时候,进行了IO等耗时操作,然后Redis到期释放锁了,线程2拿到了锁执行任务,等线程1IO完毕之后,其以为自己还持有锁,就执行了放锁操作,结果实际上这个锁已经是线程二正在使用了。
最简单的优化方式就是,放锁的时候,判断一下是不是自己的锁。
@Override
public void unlock() {
// 判断锁的线程标识是否与当前线程一致
String currentThreadFlag = ID_PREFIX + Thread.currentThread().getId();
String redisThreadFlag = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if (currentThreadFlag != null || currentThreadFlag.equals(redisThreadFlag)) {
// 一致,说明当前的锁就是当前线程的锁,可以直接释放
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

Lua脚本解决分布式锁误删问题、实现原子化锁释放
判断锁是否是自己的到释放锁这一过程并不是原子性的,可能会出现判断完了后刚好JVM跑去执行GC 把线程阻塞住了,导致超时释放锁,同样会导致锁误删问题。解决方法是需要确保判断到释放的过程是原子性。使用Lua脚本。
// static包裹的静态代码块 作用是在类初始化的时候调用一次
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("lua/unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
// 放锁 放锁利用Lua脚本原子化操作
public void unlock() {
// 执行lua脚本
// stringRedisTemplate.execute第二个参数是一个集合
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId()
);
}
Lua脚本的流程:
- 获取锁中的线程标示
- 判断是否与指定的标示(当前线程标示)一致
- 如果一致则释放锁(删除)
- 如果不一致则什么都不做
-- 先获取线程标识 get key[1]
local id = redis.call('get', KEYS[1])
-- 比较缓存中的线程标识与当前线程标识是否一致
if (id == ARGV[1]) then
-- 一致,直接删除
return redis.call('del', KEYS[1])
end
-- 不一致,返回0
return 0
实现异步秒杀优化
为什么用异步秒杀?
串行执行的实现方法并发能力弱。
分成两个线程,将耗时较短的逻辑判断放到Redis中,实际上,只要满足库存充足且一人一单,一定是可以下单成功的,完全可以直接返回给用户成功的信息,然后后台开一个线程,后台线程去慢慢执行队列里的任务(操作数据库),这样能够提供系统的并发能力。

基于JVM阻塞队列实现
基于JVM自己的阻塞队列来实现异步操作,即新开的线程将任务放入JVM自己的阻塞队列中。实现细节如下:
Java的阻塞队列为:
private BlockingQueue<VoucherOrder> ordersTask = new ArrayBlockingQueue<>(1024 * 1024);
并且,需要在一开始,就注册一个线程处理任务。
// Spring Bean 初始化完成后
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
/**
* 线程任务: 不断从阻塞队列中获取订单
*/
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true){
try {
// 取任务
log.info("get task");
VoucherOrder voucherOrder = ordersTask.take();
// 执行任务
handleVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.error("异常", e);
throw new RuntimeException(e);
}
}
}
}
// 任务为将数据持久化到数据库中。
private void handleVoucherOrder(VoucherOrder voucherOrder){
//创建订单 不能再userHoder了 因为现在是多线程 ThradLocal下取不到主线程的东西
// Long userId = UserHolder.getUser().getId();
Long userId = voucherOrder.getUserId();
// 自己创建分布式锁对象
SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate,"order:" + userId);
boolean isLocked = lock.tryLock(1200);
if(!isLocked){
log.info("获取锁失败");
}
try{
log.info("threadProxy.createVoucherOrder_v2(voucherOrder);");
// 具体的持久化任务 交由createVoucherOrder_v2执行
threadProxy.createVoucherOrder_v2(voucherOrder);
}finally {
lock.unlock();
}
}
此处有个关键点,关于threadProxy的使用。AopContext.currentProxy()的作用是获取当前类的 Spring AOP 代理对象。具体来说,为什么子线程的任务handleVoucherOrder需要用threadProxy.createVoucherOrder_v2(voucherOrder),通过主线程的代理来调用处理方法,而不直接用this.来访问?
核心就在于:
Spring 事务的本质:当在方法上添加 @Transactional时,Spring 会为该 Bean 创建一个代理对象,在目标方法执行前后插入事务管理逻辑(如开启事务、提交/回滚)。如果直接用this.createVoucherOrder_v2来执行添加操作,会导致createVoucherOrder_v2函数的@Transactional失效,既事务失效。因为直接用this.createVoucherOrder_v2,this指向的是原始对象(VoucherOrderServiceImpl),而非 Spring 生成的代理对象。
最后,controller直接指向的serviceimpl如下,Lua脚本主要完成了库存检查与用户是否有购买资格的判断。
// 查优惠券等操作 放到lua脚本里面写
Long orderId = redisIDWorker.nextId("order");
Long userId = UserHolder.getUser().getId();
// 1、执行Lua脚本,判断用户是否具有秒杀资格
log.info("执行lua脚本 添加到redis消息队列");
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT_v2,
Collections.emptyList(),
voucherId.toString(), userId.toString(), orderId.toString());
int r = result.intValue();
// 如果r = 0 说明有购买资格 待处理事件已放入消息队列
if(r != 0){
// 脚本执行结果不为0 无购买资格
return Result.fail(r == 2 ? "不能重复下单" : "库存不足");
}
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setVoucherId(voucherId);
voucherOrder.setUserId(userId);
// 作为类的成员变量赋值,方便子线程使用主线程获得的完整代理对象
threadProxy = (IVoucherOrderService)AopContext.currentProxy();
基于Redis-Stream实现
JVM阻塞队列是基于内存的,掉电的话阻塞队列里的任务全消失了,而且jvm有JVM自己的内存限制。用Redis自带的队列来实现消息队列更合适。
Redis中能够实现MQ效果的主要由以下三种方式:

stream类型消息队列,使用的是消费者组模式(Consumer Group)
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列,具备以下特点
- 消息分流:队列中的消息会分留给组内的不同消费者,而不是重复消费者,从而加快消息处理的速度
- 消息标识:消费者会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后读取消息,确保每一个消息都会被消费
- 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list,当处理完成后,需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移除
代码实现和基于JVM的架构差不多,只是需要添加lua脚本:
-- 如果能够执行添加库存的操作,就将添加消息
redis.call('XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId);
线程中执行任务的代码为:
// 获取消息队列中的订单信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
// 2、判断消息获取是否成功
if (list == null || list.isEmpty()) {
// 2.1 消息获取失败,说明没有消息,进入下一次循环获取消息
continue;
}
// 获取成功可以下单
压力测试对比
压测对比三种情况(普通乐观锁实现的并发访问、基于JVM阻塞队列的异步优化、基于Redis-Stream实现的异步优化)



点赞排名、用户签到与UV统计
使用ZSet实现点赞排名
基于Redis的SortedSet结构实现。ZSet 有两种不同的实现,分别是 ziplist 和 skiplist。当有序集合对象同时满足以下两个条件时,使用 ziplist(所有数据(包括元素值、长度信息)存储在一块连续的内存空间中,省去了指针,直接通过偏移量访问相邻元素。):
ZSet 保存的键值对数量少于 128 个;
每个元素的长度小于 64 字节。
如果不满足上述两个条件,那么使用 skiplist 。
常用的命令:zadd key value score,添加value的scorezscore key value,获取value的score,没有就返回不存在
默认排序规则:
ZRANGE key 0 -1→ 返回所有元素,按score从小到大排序。升序ZREVRANGE key 0 -1→ 返回所有元素,按score从大到小排序。降序
为了实现点赞排名(优先显示最近的点赞用户,类似朋友圈),需要将时间戳作为score(score默认为升序)实现逻辑分为:点赞博客功能与查询点赞功能
点赞功能:判断用户是否点赞,如果没点赞就zadd,否则就remove.
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
boolean result;
if (score == null) {
// 1.1 用户未点赞,点赞数+1
result = this.update(new LambdaUpdateWrapper<Blog>()
.eq(Blog::getId, id)
.setSql("liked = liked + 1"));
if (result) {
// 数据库更新成功,更新缓存 zadd key value score
stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());
}
} else {
// 1.2 用户已点赞,点赞数-1
result = this.update(new LambdaUpdateWrapper<Blog>()
.eq(Blog::getId, id)
.setSql("liked = liked - 1"));
if (result) {
// 数据更新成功,更新缓存 zremove key value
stringRedisTemplate.opsForZSet().remove(key, userId.toString());
}
}
点赞展示功能:
// 查询Top5的点赞用户 zrange key 0 4
String key = BLOG_LIKED_KEY + id;
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
用户签到
用户签到功能,用bitmap来实现。Redis中有一种特殊的数据结构,叫Bitmap。
BitMap的操作命令有:
- SETBIT:向指定位置(offset)存入一个0或1
- GETBIT :获取指定位置(offset)的bit值
- BITCOUNT :统计BitMap中值为1的bit位的数量
- BITFIELD key GET type offset :读取指定位数的bit位
在 Redis 中,一个 Bitmap 的寻址上限是 2³²(约 42.9 亿)个位,这恰好需要 512MB 的内存来存储,可以说Bitmap最大上限叫啥512MB
原因:当使用 SETBIT命令时,实际上是在操作一个 String 值。下面的 offset(偏移量)参数的类型是 整数,在Redis的实现中,这个 offset被定义为 无符号 32 位整数。因此只能寻址 2³²
SETBIT key offset value
实现代码:
// 获取当前登录用户
Long userId = UserHolder.getUser().getId();
// 获取日期
LocalDateTime now = LocalDateTime.now();
// 拼接key
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userId + keySuffix;
// 获取今天是本月的第几天
int dayOfMonth = now.getDayOfMonth();
// 获取本月截止今天为止的所有的签到记录,返回的是一个List, result.get(0)获取到一个十进制的数字
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
UV统计
UV:全称Unique Visitor,独立访客量。1天内同一个用户多次访问该网站,只记录1次。
PV:全称Page View,页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。
UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖。
Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。
代码实现如下,保存100W条记录,内存占用增加几乎为0:
/**
* 测试 HyperLogLog 实现 UV 统计的误差
*/
@Test
public void testHyperLogLog() {
String[] values = new String[1000];
// 批量保存100w条用户记录,每一批1个记录
int j = 0;
for (int i = 0; i < 1000000; i++) {
j = i % 1000;
values[j] = "user_" + i;
if (j == 999) {
// 发送到Redis
stringRedisTemplate.opsForHyperLogLog().add("hl2", values);
}
}
// 统计数量
Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
System.out.println("count = " + count);
}
项目部署
- 前端:localhost:8080
- 后端:localhost:8081
Windows下启动:
- 启动前端:
start nginx,重新加载:nginx -s reload - 启动Redis:启动Redis目录下的
start.bat - 启动MySQL
- 启动后端:Idea直接启动项目