睿阳知识库 睿阳知识库
首页
  • npm库配置
  • PC端

    • npm常用命令
    • vue问题记录
    • SEO基础知识及优化
    • 禁止别人调试我的前端页面代码
  • 移动端

    • 小程序
    • Risun.js使用说明
  • Java

    • Maven库配置
    • RSP开发框架
    • RSP框架插件
  • .NET

    • Nuget库配置
  • Python

    • Pypi库配置
  • 常见问题

    • Word转Pdf字体错乱
    • 使用Jacob进行Word导出PDF
  • 自动部署
  • 前端带路径
  • Linux

    • 应用部署
  • Windows

    • 应用部署
  • 视频监控
  • MySQL系列~
  • 应用高可用
  • 静态代码扫描
  • OpenSSH版本升级
  • 区块链~
  • 软件过程文档目录
  • 著作权申请须知及申报示例
  • 项目申报技巧
  • 项目竣工资料清单
  • 科技项目申报流程及注意事项
  • 初级职称申报
  • 产品需求文档基础知识
  • 产品经理需了解的技术知识
  • 墨刀原型设计指南
  • 文档规范
  • 文档规范
  • 投标工作总结(一)
  • 科技项目申报心得体会
  • 数字孪生
  • AI
  • RSP基础平台
  • RMCloud
  • 区块链
  • 网络态势感知
  • 国产化
  • 数据湖
  • 贡献度
  • 文档编写说明
  • Markdown教程
工作链接
首页
  • npm库配置
  • PC端

    • npm常用命令
    • vue问题记录
    • SEO基础知识及优化
    • 禁止别人调试我的前端页面代码
  • 移动端

    • 小程序
    • Risun.js使用说明
  • Java

    • Maven库配置
    • RSP开发框架
    • RSP框架插件
  • .NET

    • Nuget库配置
  • Python

    • Pypi库配置
  • 常见问题

    • Word转Pdf字体错乱
    • 使用Jacob进行Word导出PDF
  • 自动部署
  • 前端带路径
  • Linux

    • 应用部署
  • Windows

    • 应用部署
  • 视频监控
  • MySQL系列~
  • 应用高可用
  • 静态代码扫描
  • OpenSSH版本升级
  • 区块链~
  • 软件过程文档目录
  • 著作权申请须知及申报示例
  • 项目申报技巧
  • 项目竣工资料清单
  • 科技项目申报流程及注意事项
  • 初级职称申报
  • 产品需求文档基础知识
  • 产品经理需了解的技术知识
  • 墨刀原型设计指南
  • 文档规范
  • 文档规范
  • 投标工作总结(一)
  • 科技项目申报心得体会
  • 数字孪生
  • AI
  • RSP基础平台
  • RMCloud
  • 区块链
  • 网络态势感知
  • 国产化
  • 数据湖
  • 贡献度
  • 文档编写说明
  • Markdown教程
工作链接
  • Java

    • Maven库配置
    • RSP开发框架
    • RSP框架插件
    • 框架插件

      • 工作流
      • Word、PDF、Excel增强
      • 消息提醒
      • 资源打包成ZIP下载
      • 区分多个项目Redis缓存
      • 留言评论
      • 内部消息队列
        • 一. 下载源码
        • 二. 插件集成
          • 1. [项目]-mq
          • 2. [项目]-ui
          • 3. 执行SQL
          • 4. 测试(可选)
        • 三. 业务开发
        • 四. 异常补偿机制
        • 五. 技术说明
      • 文件预览增强
      • 在线Excel报表设计
      • 自定义高级查询
  • .NET

  • Python

  • 常见问题

  • 后端
  • Java
  • 框架插件
孙超
2023-02-07
目录

内部消息队列

框架添加了内部消息队列的功能,主版本RSP框架已包含此功能,老版本框架若要使用,请进行如下操作,集成功能:

# 一. 下载源码

下载 内部消息队列插件 (opens new window)。

# 二. 插件集成

innermq

# 1. [项目]-mq

将[project-mq]模块添加到您的项目中,按项目修改pom.xml

<artifactId>[按项目修改project]-mq</artifactId>

<!-- 通用工具-->
<dependency>
  <groupId>com.risun</groupId>
  <artifactId>[按项目修改project]-framework</artifactId>
</dependency>

# 2. [项目]-ui

  • 将mq目录复制到[项目]-ui/src/views/monitor下
  • 将innermq.js复制到[项目]-ui/src/api/monitor下

# 3. 执行SQL

在你的项目中,执行sql/innermq.sql

# 4. 测试(可选)

test/mq下的文件是测试代码,若要进行测试,需进行如下操作:

  • 修改[项目]-admin/pom.xml
<!-- 核心模块 -->
<dependency>
    <groupId>com.risun</groupId>
    <artifactId>[项目]-framework</artifactId>
</dependency>

// 修改为
<!-- 消息队列 -->
<dependency>
    <groupId>com.risun</groupId>
    <artifactId>[项目]-mq</artifactId>
</dependency>
  • 将test/mq下的文件复制到[项目]-admin/src/test/java/com/risun目录下

# 三. 业务开发

使用内部消息队列来处理业务需求,需要编写如下代码。这里以创建订单后,给客户发送短信、邮件为例。

假设已有业务类:Order.java OrderService.java SmsService.java MailService.java

需要编写如下类:OrderInnerMQService.java SmsConsumerHandler.java MailConsumerHandler.java

SmsConsumerHandler.java (短信消费者)
/**
 * 短信消费者
 */
public class SmsConsumerHandler implements IndependentComsumer<Order> {

    private final SmsService smsService;

    public SmsConsumerHandler(SmsService smsService) {
        this.smsService = smsService;
    }

    @Override
    public void onEvent(InnerMQEvent<Order> event, long sequence, boolean endOfBatch) throws Exception {
        // 必须写上这一句
        event.setConsumerClass(SmsConsumerHandler.class.getName());
        // 短信Service发送短息
        smsService.sendSms(event);
    }
}

MailConsumerHandler.java (邮件消费者)
/**
 * 邮件消费者
 */
public class MailConsumerHandler implements IndependentComsumer<Order> {
  
    private final MailService mailService;

    public MailConsumerHandler(MailService mailService) {
      this.mailService = mailService;
    }

    @Override
    public void onEvent(InnerMQEvent<Order> event, long sequence, boolean endOfBatch) throws Exception {
        // 必须写上这一句
        event.setConsumerClass(MailConsumerHandler.class.getName());
        // 邮件Service发送邮件
        mailService.sendMail(event);
    }
}

OrderInnerMQService.java (订单消息队列服务类)
/**
 * 订单消息队列服务类(发布消息以及设置消费者处理方式)
 */
@Service("orderInnerMQService")
public class OrderInnerMQService extends InnerMQService<Order> {
  
    @Autowired
    private SmsService smsService;

    @Autowired
    private MailService mailService;

    @Override
    protected void bizConsume() {
        // 指定短信消费者和邮件消费者来处理订单
        innerMQ.handleEventsWith(new SmsConsumerHandler(smsService), new MailConsumerHandler(mailService));
    }
}

OrderService.java (已存在,填写消息队列逻辑)
/**
 * 订单服务类
 */
public class OrderService {

    @Autowired
    @Qualifier("orderInnerMQService")
    private InnerMQService orderInnerMQService;
  
    /**
     * 创建订单
     */
    public void createOrder() {
        // 创建订单
        Order order = 创建好的订单
        // 发布订单消息
        InnerMQEvent<Order> event = new InnerMQEvent<>();
        event.setPayload(order);
        orderInnerMQService.publish(event);
    }
}

# 四. 异常补偿机制

内部消息队列中,所有对消息的操作都是在内存中进行的,所以在业务逻辑中处理消息出现异常时,该消息就无法再被消费处理,这样有时会导致业务逻辑的不完整。

例如:客户消费后,系统会创建订单、增加客户积分。创建订单时发布消息,增加客户积分接收消息进行处理。当增加客户积分操作出现异常时,该客户的积分就没有增加。

为了解决这个问题,框架采用事后补偿机制来确保一次交易的一致性,业务消费消息出现异常时,消息会持久化到 sys_inner_mq_exception 表中,业务通过该表进行业务处理。

# 五. 技术说明

内部消息队列功能是基于开源并发框架Disruptor (opens new window)实现的。其作用和阻塞队列(BlockingQueue)类似,都是在相同进程内、不同线程间传递数据(例如消息、事件),另外disruptor也有自己的一些特色:

  • 以广播的形式发布事件,并且消费者之间存在依赖关系;
  • 为事件提前分配内存;
  • 无锁算法;
disruptor
  • 消费模式,包括独立消费和共同消费两种。

    例如:每产生一个订单都要发邮件和短信通知买家,如果产生了十个订单,要发十封邮件和十条短信,此时,邮件系统和短信系统是各自独立的,他们各自消费这十个订单的事件。 假设邮件系统处理能力差,为了提升处理能力,部署了两台邮件服务器,因此是这两台邮件服务器共同处理十个订单事件,合起来一共发送了十封邮件,一号邮件服务器和二号邮件服务器是共同消费。

consumer
留言评论
文件预览增强

← 留言评论 文件预览增强→

最近更新
01
vue问题记录
10-11
02
RSP进度
10-09
03
贡献度
09-28
更多文章>
Copyright © 2014-2025 甘肃睿阳科技有限公司 陇ICP备15001783-1号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式