
地信派实现用户活跃度排行榜
业务场景
排行榜是一个很常见的需求场景。
用户活跃度排行榜,主要基于redis的zset的数据结构来实现的。
zset
是Redis
提供的一个非常特别的数据结构,常用作排行榜等功能,以用户id
为value
,关注时间或者分数作为score
进行排序。
redis 127.0.0.1:6379> zadd redis.net.cn 0 redis
(integer) 1
redis 127.0.0.1:6379> zadd redis.net.cn 0 mongodb
(integer) 1
redis 127.0.0.1:6379> zadd redis.net.cn 0 rabitmq
(integer) 1
redis 127.0.0.1:6379> zadd redis.net.cn 0 rabitmq
(integer) 0
redis 127.0.0.1:6379> ZRANGEBYSCORE redis.net.cn 0 1000
1) "redis"
2) "mongodb"
3) "rabitmq"
方案设计
1 场景说明
地信派中,提供了一个用户的活跃度排行榜,分为日、月两个榜单。
活跃度计算方式:
用户访问一个新的页面 + 1分
对于一篇文章,用户点赞、收藏,+ 2 分。取消点赞、收藏,将之前加的分收回
文章评论,+ 3 分
发布一篇审核通过的文章,+ 10分
榜单:展示活跃度前30的用户
2 方案设计
存储单元
展示的排行榜中,应该具有以下信息
// 用来标识具体的用户
long userId;
// 用户在排行榜上的排名
long rank;
// 用户的历史最高分,也就是当前排行榜上的积分
long score;
数据结构
排行榜,一般都是连续的,借此我们可以使用一个合适的数据结构,LinkedList,好处在于排名变动时,不需要数组拷贝。
3 redis使用方案
主要使用redis的zset数据结构,带权重的集合。
set:确保元素唯一性,使用zset可以根据score进行排序
score权重:即用户的积分
filed:用户的唯一标识信息
排行榜实现
1 用户活跃度积分更新
需要实现一个更新用户活跃度的方法UserActivityRankServiceImpl,首先需要定义一个实体,覆盖活跃度相关的业务场景。
ActivityScoreBo
package com.github.paicoding.forum.service.rank.service.model;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* 用户活跃度参数传递实体
* 每个用户维护一个更新操作历史记录表
* key:activity_rank_{user_id}_{年月日}
* field:活跃度更新的key
* value:添加的活跃度
* 访问新页面 + 1分
* 对于一篇文章,点赞、收藏 + 2分;取消点赞、收藏,将之前的活跃分收回
* 文章评论 + 3分
* 发布一篇审核通过的文章 + 10分
*/
@Data
@Accessors(chain = true)
public class ActivityScoreBo {
/**
* 访问的页面
*/
private String path;
/**
* 访问的目标文章id
*/
private Long articleId;
/**
* 评论:true为添加评论,false为取消评论
*/
private Boolean rate;
/**
* 点赞: true为点赞,false为取消点赞
*/
private Boolean praise;
/**
* 收藏增:true为收藏,false为取消收藏
*/
private Boolean collect;
/**
* 发布文章增加活跃度
*/
private Boolean publishArticle;
/**
* 被关注的用户id
*/
private Long followedUserId;
/**
* 关注增加活跃度
*/
private Boolean follow;
}
每个用户每一天使用一张表记录。
业务流程
获取到Bo后,如何进行活跃度更新?
获取到Bo
对于增加活跃度
做一个幂等,防止重复添加(例如多次刷新一个页面,实际上是加一次积分)
如果幂等了,不用更新,直接返回;否则,执行更新
对于减少活跃度
判断之前有没有增加过活跃度,防止扣减为负数
之前没有扣减过,直接返回;否则,执行扣减
1.1 幂等策略
幂等性就是对于同一个操作,用户多次请求后,结果应该是一样的,不会因为重复请求导致数据不一致或者业务异常。
为了防止重复增加活跃度,如何做幂等呢?
直接将用户的加分项,直接记录下来,在具体执行加分操作时,进行幂等判断。
每一个用户每一天产生一个活跃度记录表。
使用zset数据结构
key:activity_ran_{userId}_{年月日}
value:
field(value):活跃度更新的key,如访问页面, path_/article/detail/1
score: 添加的积分
1.2 榜单积分更新
/**
* 分数更新
*
*
* @param key
* @param value
* @param score
* @return
*/
public static Double zIncrBy(String key, String value, Integer score) {
return template.execute(new RedisCallback<Double>() {
@Override
public Double doInRedis(RedisConnection connection) throws DataAccessException {
// 对score进行增量更新
// 对积分表,ZINCRBY key increment field
return connection.zIncrBy(keyBytes(key), score, valBytes(value));
}
});
}
1.3 具体实现
/**
* 添加活跃分
*
* @param userId 用于更新活跃积分的用户
* @param activityScore 触发活跃积分的时间类型
*/
@Override
public void addActivityScore(Long userId, ActivityScoreBo activityScore) {
if (userId == null) {
return;
}
// 1. 计算活跃度(正为加活跃,负为减活跃)
String field;
int score = 0;
if (activityScore.getPath() != null) {
// 页面访问 + 1分
field = "path_" + activityScore.getPath();
score = 1;
} else if (activityScore.getArticleId() != null) {
field = activityScore.getArticleId() + "_";
if (activityScore.getPraise() != null) {
// 点赞
field += "praise";
// 取消点赞 -2 分
score = BooleanUtils.isTrue(activityScore.getPraise()) ? 2 : -2;
} else if (activityScore.getCollect() != null) {
// 收藏
field += "collect";
score = BooleanUtils.isTrue(activityScore.getCollect()) ? 2 : -2;
} else if (activityScore.getRate() != null) {
// 评论回复
field += "rate";
score = BooleanUtils.isTrue(activityScore.getRate()) ? 3 : -3;
} else if (BooleanUtils.isTrue(activityScore.getPublishArticle())) {
// 发布文章
field += "publish";
score += 10;
}
} else if (activityScore.getFollowedUserId() != null) {
// 关注添加积分
field = activityScore.getFollowedUserId() + "_follow";
score = BooleanUtils.isTrue(activityScore.getFollow()) ? 2 : -2;
} else {
return;
}
// 日活和月活
final String todayRankKey = todayRankKey(); //eg.activity_rank_20250223
final String monthRankKey = monthRankKey(); //eg.activity_rank_202502
// 2. 幂等:判断之前是否有更新过相关的活跃度信息 activity_rank_userId年月日
final String userActionKey = ACTIVITY_SCORE_KEY + userId + DateUtil.format(DateTimeFormatter.ofPattern("yyyyMMdd"), System.currentTimeMillis());
// 获取下用户活跃度记录表
Integer ans = RedisClient.hGet(userActionKey, field, Integer.class);
if (ans == null) {
// 2.1 之前没有加分记录,执行具体的加分
if (score > 0) {
// 记录加分记录
RedisClient.hSet(userActionKey, field, score);
// 个人用户的操作记录,保存一个月的有效期,方便用户查询自己最近31天的活跃情况
RedisClient.expire(userActionKey, 31 * DateUtil.ONE_DAY_SECONDS);
// 更新当天和当月的活跃度排行榜
Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score);
RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score);
if (log.isDebugEnabled()) {
log.info("活跃度更新加分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns);
}
if (newAns <= score) {
// 由于上面只实现了日/月活跃度的增加,但是没有设置对应的有效期;为了避免持久保存导致redis占用较高;因此这里设定了缓存的有效期
// 日活跃榜单,保存31天;月活跃榜单,保存1年
// 为什么是 newAns <= score 才设置有效期呢?
// 因为 newAns 是用户当天的活跃度,如果发现和需要增加的活跃度 scopre 相等,则表明是今天的首次添加记录,此时设置有效期就比较符合预期了
// 但是请注意,下面的实现有两个缺陷:
// 1. 对于月的有效期,就变成了本月,每天的首次增加活跃度时,都会重新刷一下它的有效期,这样就和预期中的首次添加缓存时,设置有效期不符
// 2. 若先增加活跃度1,再减少活跃度1,然后再加活跃度1,同样会导致重新算了有效期
// 严谨一些的写法,应该是 先判断 key 的 ttl, 对于没有设置的才进行设置有效期,如下
Long ttl = RedisClient.ttl(todayRankKey);
if (!NumUtil.upZero(ttl)) {
RedisClient.expire(todayRankKey, 31 * DateUtil.ONE_DAY_SECONDS);
}
ttl = RedisClient.ttl(monthRankKey);
if (!NumUtil.upZero(ttl)) {
RedisClient.expire(monthRankKey, 12 * DateUtil.ONE_MONTH_SECONDS);
}
}
}
} else if (ans > 0) {
// 2.2 之前已经加过分,因此这次减分可以执行(用户活跃度记录表直接移除,排行榜减分即可)
if (score < 0) {
// 移除用户的活跃执行记录 --> 即移除用来做防重复添加活跃度的幂等键
Boolean oldHave = RedisClient.hDel(userActionKey, field);
if (BooleanUtils.isTrue(oldHave)) {
// 日排行榜记录表和月排行记录表减分
Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score);
RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score);
if (log.isDebugEnabled()) {
log.info("活跃度更新减分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns);
}
}
}
}
}
业务逻辑并不复杂,但是仍存在问题
事务问题:多次redis操作,存在事务问题
并发问题:没有做并发,幂等无法100%生效,仍然存在重发添加或者扣减活跃度的问题
1.4 触发活跃度更新
上述只提供了活跃度更新的方法,以及对应的存储方案,如何实现自动更新?
使用Event/Listener方式处理活跃度更新。
文章/用户相关的操作时间监听,并更新对应的活跃度
package com.github.paicoding.forum.service.rank.service.listener;
import com.github.paicoding.forum.api.model.context.ReqInfoContext;
import com.github.paicoding.forum.api.model.enums.ArticleEventEnum;
import com.github.paicoding.forum.api.model.event.ArticleMsgEvent;
import com.github.paicoding.forum.api.model.vo.notify.NotifyMsgEvent;
import com.github.paicoding.forum.service.article.repository.entity.ArticleDO;
import com.github.paicoding.forum.service.comment.repository.entity.CommentDO;
import com.github.paicoding.forum.service.rank.service.UserActivityRankService;
import com.github.paicoding.forum.service.rank.service.model.ActivityScoreBo;
import com.github.paicoding.forum.service.user.repository.entity.UserFootDO;
import com.github.paicoding.forum.service.user.repository.entity.UserRelationDO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 用户活跃相关的消息监听器
*
* @author YiHui
* @date 2023/8/19
*/
@Component
public class UserActivityListener {
@Autowired
private UserActivityRankService userActivityRankService;
/**
* 用户操作行为,增加对应的积分
*
* @param msgEvent
*/
@EventListener(classes = NotifyMsgEvent.class)
@Async
public void notifyMsgListener(NotifyMsgEvent msgEvent) {
switch (msgEvent.getNotifyType()) {
case COMMENT:
case REPLY:
CommentDO comment = (CommentDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setRate(true).setArticleId(comment.getArticleId()));
break;
case COLLECT:
UserFootDO foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(true).setArticleId(foot.getDocumentId()));
break;
case CANCEL_COLLECT:
foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(false).setArticleId(foot.getDocumentId()));
break;
case PRAISE:
foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(true).setArticleId(foot.getDocumentId()));
break;
case CANCEL_PRAISE:
foot = (UserFootDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(false).setArticleId(foot.getDocumentId()));
break;
case FOLLOW:
UserRelationDO relation = (UserRelationDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(true).setFollowedUserId(relation.getUserId()));
break;
case CANCEL_FOLLOW:
relation = (UserRelationDO) msgEvent.getContent();
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(false).setFollowedUserId(relation.getUserId()));
break;
default:
}
}
/**
* 发布文章,更新对应的积分
*
* @param event
*/
@Async
@EventListener(ArticleMsgEvent.class)
public void publishArticleListener(ArticleMsgEvent<ArticleDO> event) {
ArticleEventEnum type = event.getType();
if (type == ArticleEventEnum.ONLINE) {
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPublishArticle(true).setArticleId(event.getContent().getId()));
}
}
}
发布文章事件
/**
* 发布文章,更新对应的积分
*
* @param event
*/
@Async
@EventListener(ArticleMsgEvent.class)
public void publishArticleListener(ArticleMsgEvent<ArticleDO> event) {
ArticleEventEnum type = event.getType();
if (type == ArticleEventEnum.ONLINE) {
userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPublishArticle(true).setArticleId(event.getContent().getId()));
}
}
基于用户浏览行为的活跃度更新,使用Filter/Inteceptor实现
@Slf4j
@Component
public class GlobalViewInterceptor implements AsyncHandlerInterceptor {
@Autowired
private GlobalInitService globalInitService;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Permission permission = handlerMethod.getMethod().getAnnotation(Permission.class);
if (permission == null) {
permission = handlerMethod.getBeanType().getAnnotation(Permission.class);
}
if (permission == null || permission.role() == UserRole.ALL) {
if (ReqInfoContext.getReqInfo() != null ) {
// 用户活跃度更新
SpringUtil.getBean(UserActivityRankService.class).addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPath(ReqInfoContext.getReqInfo().getPath()));
}
return true;
}
2 排行榜查询
基本流程
从redis中获取topN的用户+评分
查询用户信息
根据用户评分进行排序,并更新每个用户的排名
@Override
public List<RankItemDTO> queryRankList(ActivityRankTimeEnum time, int size) {
// 获取集合名
String rankKey = time == ActivityRankTimeEnum.DAY ? todayRankKey() : monthRankKey();
// 1. 获取topN的活跃用户
List<ImmutablePair<String, Double>> rankList = RedisClient.zTopNScore(rankKey, size);
if (CollectionUtils.isEmpty(rankList)) {
return Collections.emptyList();
}
// 2. 查询用户对应的基本信息
// 构建userId -> 活跃评分的map映射,用于补齐用户信息
Map<Long, Integer> userScoreMap = rankList.stream().collect(Collectors.toMap(s -> Long.valueOf(s.getLeft()), s -> s.getRight().intValue()));
// 根据用户id批量查询用户信息
List<SimpleUserInfoDTO> users = userService.batchQuerySimpleUserInfo(userScoreMap.keySet());
// 3. 根据评分进行排序
List<RankItemDTO> rank = users.stream()
.map(user -> new RankItemDTO().setUser(user).setScore(userScoreMap.getOrDefault(user.getUserId(), 0)))
.sorted((o1, o2) -> Integer.compare(o2.getScore(), o1.getScore()))
.collect(Collectors.toList());
// 4. 补齐每个用户的排名
IntStream.range(0, rank.size()).forEach(i -> rank.get(i).setRank(i + 1));
return rank;
}
- 感谢你赐予我前进的力量