业务场景

排行榜是一个很常见的需求场景。

用户活跃度排行榜,主要基于redis的zset的数据结构来实现的。

zsetRedis提供的一个非常特别的数据结构,常用作排行榜等功能,以用户idvalue,关注时间或者分数作为score进行排序。

redis 127.0.0.1:6379> zadd redis.net.cn 0 redis
(integer) 1
redis 127.0.0.1:6379> zadd redis.net.cn 0 mongodb
(integer) 1
redis 127.0.0.1:6379> zadd redis.net.cn 0 rabitmq
(integer) 1
redis 127.0.0.1:6379> zadd redis.net.cn 0 rabitmq
(integer) 0
redis 127.0.0.1:6379> ZRANGEBYSCORE redis.net.cn 0 1000
 
1) "redis"
2) "mongodb"
3) "rabitmq"

方案设计

1 场景说明

地信派中,提供了一个用户的活跃度排行榜,分为日、月两个榜单。

活跃度计算方式:

  • 用户访问一个新的页面 + 1分

  • 对于一篇文章,用户点赞、收藏,+ 2 分。取消点赞、收藏,将之前加的分收回

  • 文章评论,+ 3 分

  • 发布一篇审核通过的文章,+ 10分

榜单:展示活跃度前30的用户

2 方案设计

存储单元

展示的排行榜中,应该具有以下信息

// 用来标识具体的用户
long userId;
// 用户在排行榜上的排名
long rank;
// 用户的历史最高分,也就是当前排行榜上的积分
long score;

数据结构

排行榜,一般都是连续的,借此我们可以使用一个合适的数据结构,LinkedList,好处在于排名变动时,不需要数组拷贝。

3 redis使用方案

主要使用redis的zset数据结构,带权重的集合。

  • set:确保元素唯一性,使用zset可以根据score进行排序

  • score权重:即用户的积分

  • filed:用户的唯一标识信息

排行榜实现

1 用户活跃度积分更新

需要实现一个更新用户活跃度的方法UserActivityRankServiceImpl,首先需要定义一个实体,覆盖活跃度相关的业务场景。

ActivityScoreBo

package com.github.paicoding.forum.service.rank.service.model;

import lombok.Data;
import lombok.experimental.Accessors;

/**
 * 用户活跃度参数传递实体
 * 每个用户维护一个更新操作历史记录表
 * key:activity_rank_{user_id}_{年月日}
 * field:活跃度更新的key
 * value:添加的活跃度
 *  访问新页面 + 1分
 *  对于一篇文章,点赞、收藏 + 2分;取消点赞、收藏,将之前的活跃分收回
 *  文章评论 + 3分
 *  发布一篇审核通过的文章 + 10分
 */
@Data
@Accessors(chain = true)
public class ActivityScoreBo {
    /**
     * 访问的页面
     */
    private String path;

    /**
     * 访问的目标文章id
     */
    private Long articleId;

    /**
     * 评论:true为添加评论,false为取消评论
     */
    private Boolean rate;

    /**
     * 点赞: true为点赞,false为取消点赞
     */
    private Boolean praise;

    /**
     * 收藏增:true为收藏,false为取消收藏
     */
    private Boolean collect;

    /**
     * 发布文章增加活跃度
     */
    private Boolean publishArticle;

    /**
     * 被关注的用户id
     */
    private Long followedUserId;

    /**
     * 关注增加活跃度
     */
    private Boolean follow;
}

每个用户每一天使用一张表记录。

业务流程

获取到Bo后,如何进行活跃度更新?

  • 获取到Bo

  • 对于增加活跃度

    • 做一个幂等,防止重复添加(例如多次刷新一个页面,实际上是加一次积分)

    • 如果幂等了,不用更新,直接返回;否则,执行更新

  • 对于减少活跃度

    • 判断之前有没有增加过活跃度,防止扣减为负数

    • 之前没有扣减过,直接返回;否则,执行扣减

1.1 幂等策略

幂等性就是对于同一个操作,用户多次请求后,结果应该是一样的,不会因为重复请求导致数据不一致或者业务异常。


为了防止重复增加活跃度,如何做幂等呢?

直接将用户的加分项,直接记录下来,在具体执行加分操作时,进行幂等判断。

每一个用户每一天产生一个活跃度记录表。

使用zset数据结构

  • key:activity_ran_{userId}_{年月日}

  • value:

    • field(value):活跃度更新的key,如访问页面, path_/article/detail/1

    • score: 添加的积分

1.2 榜单积分更新

   /**
     * 分数更新
     *
     *
     * @param key
     * @param value
     * @param score
     * @return
     */
    public static Double zIncrBy(String key, String value, Integer score) {
        return template.execute(new RedisCallback<Double>() {
            @Override
            public Double doInRedis(RedisConnection connection) throws DataAccessException {
                // 对score进行增量更新
                // 对积分表,ZINCRBY key increment field
                return connection.zIncrBy(keyBytes(key), score, valBytes(value));
            }
        });
    }

1.3 具体实现

    /**
     * 添加活跃分
     *
     * @param userId        用于更新活跃积分的用户
     * @param activityScore 触发活跃积分的时间类型
     */
    @Override
    public void addActivityScore(Long userId, ActivityScoreBo activityScore) {
        if (userId == null) {
            return;
        }

        // 1. 计算活跃度(正为加活跃,负为减活跃)
        String field;
        int score = 0;
        if (activityScore.getPath() != null) {
            // 页面访问 + 1分
            field = "path_" + activityScore.getPath();
            score = 1;
        } else if (activityScore.getArticleId() != null) {
            field = activityScore.getArticleId() + "_";
            if (activityScore.getPraise() != null) {
                // 点赞
                field += "praise";
                // 取消点赞 -2 分
                score = BooleanUtils.isTrue(activityScore.getPraise()) ? 2 : -2;
            } else if (activityScore.getCollect() != null) {
                // 收藏
                field += "collect";
                score = BooleanUtils.isTrue(activityScore.getCollect()) ? 2 : -2;
            } else if (activityScore.getRate() != null) {
                // 评论回复
                field += "rate";
                score = BooleanUtils.isTrue(activityScore.getRate()) ? 3 : -3;
            } else if (BooleanUtils.isTrue(activityScore.getPublishArticle())) {
                // 发布文章
                field += "publish";
                score += 10;
            }
        } else if (activityScore.getFollowedUserId() != null) {
            // 关注添加积分
            field = activityScore.getFollowedUserId() + "_follow";
            score = BooleanUtils.isTrue(activityScore.getFollow()) ? 2 : -2;
        } else {
            return;
        }
        // 日活和月活
        final String todayRankKey = todayRankKey(); //eg.activity_rank_20250223
        final String monthRankKey = monthRankKey(); //eg.activity_rank_202502
        // 2. 幂等:判断之前是否有更新过相关的活跃度信息 activity_rank_userId年月日
        final String userActionKey = ACTIVITY_SCORE_KEY + userId + DateUtil.format(DateTimeFormatter.ofPattern("yyyyMMdd"), System.currentTimeMillis());
        // 获取下用户活跃度记录表
        Integer ans = RedisClient.hGet(userActionKey, field, Integer.class);
        if (ans == null) {
            // 2.1 之前没有加分记录,执行具体的加分
            if (score > 0) {
                // 记录加分记录
                RedisClient.hSet(userActionKey, field, score);
                // 个人用户的操作记录,保存一个月的有效期,方便用户查询自己最近31天的活跃情况
                RedisClient.expire(userActionKey, 31 * DateUtil.ONE_DAY_SECONDS);

                // 更新当天和当月的活跃度排行榜
                Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score);
                RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score);
                if (log.isDebugEnabled()) {
                    log.info("活跃度更新加分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns);
                }
                if (newAns <= score) {
                    // 由于上面只实现了日/月活跃度的增加,但是没有设置对应的有效期;为了避免持久保存导致redis占用较高;因此这里设定了缓存的有效期
                    // 日活跃榜单,保存31天;月活跃榜单,保存1年
                    // 为什么是 newAns <= score 才设置有效期呢?
                    // 因为 newAns 是用户当天的活跃度,如果发现和需要增加的活跃度 scopre 相等,则表明是今天的首次添加记录,此时设置有效期就比较符合预期了
                    // 但是请注意,下面的实现有两个缺陷:
                    //  1. 对于月的有效期,就变成了本月,每天的首次增加活跃度时,都会重新刷一下它的有效期,这样就和预期中的首次添加缓存时,设置有效期不符
                    //  2. 若先增加活跃度1,再减少活跃度1,然后再加活跃度1,同样会导致重新算了有效期
                    // 严谨一些的写法,应该是 先判断 key 的 ttl, 对于没有设置的才进行设置有效期,如下
                    Long ttl = RedisClient.ttl(todayRankKey);
                    if (!NumUtil.upZero(ttl)) {
                        RedisClient.expire(todayRankKey, 31 * DateUtil.ONE_DAY_SECONDS);
                    }
                    ttl = RedisClient.ttl(monthRankKey);
                    if (!NumUtil.upZero(ttl)) {
                        RedisClient.expire(monthRankKey, 12 * DateUtil.ONE_MONTH_SECONDS);
                    }
                }
            }
        } else if (ans > 0) {
            // 2.2 之前已经加过分,因此这次减分可以执行(用户活跃度记录表直接移除,排行榜减分即可)
            if (score < 0) {
                // 移除用户的活跃执行记录 --> 即移除用来做防重复添加活跃度的幂等键
                Boolean oldHave = RedisClient.hDel(userActionKey, field);
                if (BooleanUtils.isTrue(oldHave)) {
                    // 日排行榜记录表和月排行记录表减分
                    Double newAns = RedisClient.zIncrBy(todayRankKey, String.valueOf(userId), score);
                    RedisClient.zIncrBy(monthRankKey, String.valueOf(userId), score);
                    if (log.isDebugEnabled()) {
                        log.info("活跃度更新减分! key#field = {}#{}, add = {}, newScore = {}", todayRankKey, userId, score, newAns);
                    }
                }
            }
        }
    }

业务逻辑并不复杂,但是仍存在问题

  • 事务问题:多次redis操作,存在事务问题

  • 并发问题:没有做并发,幂等无法100%生效,仍然存在重发添加或者扣减活跃度的问题

1.4 触发活跃度更新

上述只提供了活跃度更新的方法,以及对应的存储方案,如何实现自动更新?

使用Event/Listener方式处理活跃度更新。

文章/用户相关的操作时间监听,并更新对应的活跃度

package com.github.paicoding.forum.service.rank.service.listener;

import com.github.paicoding.forum.api.model.context.ReqInfoContext;
import com.github.paicoding.forum.api.model.enums.ArticleEventEnum;
import com.github.paicoding.forum.api.model.event.ArticleMsgEvent;
import com.github.paicoding.forum.api.model.vo.notify.NotifyMsgEvent;
import com.github.paicoding.forum.service.article.repository.entity.ArticleDO;
import com.github.paicoding.forum.service.comment.repository.entity.CommentDO;
import com.github.paicoding.forum.service.rank.service.UserActivityRankService;
import com.github.paicoding.forum.service.rank.service.model.ActivityScoreBo;
import com.github.paicoding.forum.service.user.repository.entity.UserFootDO;
import com.github.paicoding.forum.service.user.repository.entity.UserRelationDO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * 用户活跃相关的消息监听器
 *
 * @author YiHui
 * @date 2023/8/19
 */
@Component
public class UserActivityListener {
    @Autowired
    private UserActivityRankService userActivityRankService;

    /**
     * 用户操作行为,增加对应的积分
     *
     * @param msgEvent
     */
    @EventListener(classes = NotifyMsgEvent.class)
    @Async
    public void notifyMsgListener(NotifyMsgEvent msgEvent) {
        switch (msgEvent.getNotifyType()) {
            case COMMENT:
            case REPLY:
                CommentDO comment = (CommentDO) msgEvent.getContent();
                userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setRate(true).setArticleId(comment.getArticleId()));
                break;
            case COLLECT:
                UserFootDO foot = (UserFootDO) msgEvent.getContent();
                userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(true).setArticleId(foot.getDocumentId()));
                break;
            case CANCEL_COLLECT:
                foot = (UserFootDO) msgEvent.getContent();
                userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setCollect(false).setArticleId(foot.getDocumentId()));
                break;
            case PRAISE:
                foot = (UserFootDO) msgEvent.getContent();
                userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(true).setArticleId(foot.getDocumentId()));
                break;
            case CANCEL_PRAISE:
                foot = (UserFootDO) msgEvent.getContent();
                userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPraise(false).setArticleId(foot.getDocumentId()));
                break;
            case FOLLOW:
                UserRelationDO relation = (UserRelationDO) msgEvent.getContent();
                userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(true).setFollowedUserId(relation.getUserId()));
                break;
            case CANCEL_FOLLOW:
                relation = (UserRelationDO) msgEvent.getContent();
                userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setFollow(false).setFollowedUserId(relation.getUserId()));
                break;
            default:
        }
    }

    /**
     * 发布文章,更新对应的积分
     *
     * @param event
     */
    @Async
    @EventListener(ArticleMsgEvent.class)
    public void publishArticleListener(ArticleMsgEvent<ArticleDO> event) {
        ArticleEventEnum type = event.getType();
        if (type == ArticleEventEnum.ONLINE) {
            userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPublishArticle(true).setArticleId(event.getContent().getId()));
        }
    }

}

发布文章事件

    /**
     * 发布文章,更新对应的积分
     *
     * @param event
     */
    @Async
    @EventListener(ArticleMsgEvent.class)
    public void publishArticleListener(ArticleMsgEvent<ArticleDO> event) {
        ArticleEventEnum type = event.getType();
        if (type == ArticleEventEnum.ONLINE) {
            userActivityRankService.addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPublishArticle(true).setArticleId(event.getContent().getId()));
        }
    }

基于用户浏览行为的活跃度更新,使用Filter/Inteceptor实现

@Slf4j
@Component
public class GlobalViewInterceptor implements AsyncHandlerInterceptor {
    @Autowired
    private GlobalInitService globalInitService;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        if (handler instanceof HandlerMethod) {
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Permission permission = handlerMethod.getMethod().getAnnotation(Permission.class);
            if (permission == null) {
                permission = handlerMethod.getBeanType().getAnnotation(Permission.class);
            }

            if (permission == null || permission.role() == UserRole.ALL) {
                if (ReqInfoContext.getReqInfo() != null ) {
                    // 用户活跃度更新
                    SpringUtil.getBean(UserActivityRankService.class).addActivityScore(ReqInfoContext.getReqInfo().getUserId(), new ActivityScoreBo().setPath(ReqInfoContext.getReqInfo().getPath()));
                }
                return true;
            }

2 排行榜查询

基本流程

  • 从redis中获取topN的用户+评分

  • 查询用户信息

  • 根据用户评分进行排序,并更新每个用户的排名

    @Override
    public List<RankItemDTO> queryRankList(ActivityRankTimeEnum time, int size) {
        // 获取集合名
        String rankKey = time == ActivityRankTimeEnum.DAY ? todayRankKey() : monthRankKey();
        // 1. 获取topN的活跃用户
        List<ImmutablePair<String, Double>> rankList = RedisClient.zTopNScore(rankKey, size);
        if (CollectionUtils.isEmpty(rankList)) {
            return Collections.emptyList();
        }

        // 2. 查询用户对应的基本信息
        // 构建userId -> 活跃评分的map映射,用于补齐用户信息
        Map<Long, Integer> userScoreMap = rankList.stream().collect(Collectors.toMap(s -> Long.valueOf(s.getLeft()), s -> s.getRight().intValue()));
        // 根据用户id批量查询用户信息
        List<SimpleUserInfoDTO> users = userService.batchQuerySimpleUserInfo(userScoreMap.keySet());

        // 3. 根据评分进行排序
        List<RankItemDTO> rank = users.stream()
                .map(user -> new RankItemDTO().setUser(user).setScore(userScoreMap.getOrDefault(user.getUserId(), 0)))
                .sorted((o1, o2) -> Integer.compare(o2.getScore(), o1.getScore()))
                .collect(Collectors.toList());

        // 4. 补齐每个用户的排名
        IntStream.range(0, rank.size()).forEach(i -> rank.get(i).setRank(i + 1));
        return rank;
    }