CaeMQ.java 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package com.miniframe.service.mq;
  2. import com.miniframe.aftercure.ActionMsg;
  3. import com.miniframe.core.ext.UtilTools;
  4. import com.miniframe.generate.appcode.Processing;
  5. import com.miniframe.model.system.AdiSolverConfigImg;
  6. import com.miniframe.model.system.dao.AdiSolverConfigImgMapper;
  7. import com.miniframe.spring.mq.IMFMqOp;
  8. import com.miniframe.spring.mq.MFMqConfig;
  9. import com.miniframe.spring.mq.MFMqInfo;
  10. import com.miniframe.spring.mq.MFMqListenerInfo;
  11. import com.miniframe.tools.XIDateTimeUtils;
  12. import com.miniframe.tools.XiJsonUtil;
  13. import com.miniframe.websocket.WebsocketEndPoint;
  14. import com.rabbitmq.client.Channel;
  15. import org.springframework.amqp.core.AcknowledgeMode;
  16. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  17. import org.springframework.stereotype.Component;
  18. import tk.mybatis.mapper.util.StringUtil;
  19. import java.util.ArrayList;
  20. import java.util.List;
  21. import java.util.Map;
  22. /**
  23. * MQ 创建时会根据enableDlx值决定是否创建 队列对应的DLX队列,用于队列超时后进入DLX。
  24. * MQ消息处理,获取队列名称
  25. * getQueueNameArray:当前实例 所有队列名称
  26. * getQueueNameArrayInDataExchange :当前实例 所有数据交换机上队列名称
  27. * getQueueNameArrayInDlxExchange :当前实例 所有DLX交换机上的队列名称
  28. * getQueueLocalNameArray : 当前实例 所有本地服务定义的交换机上队列名称(如传入参数 ,选择参数对应的队列)
  29. * getQueueLocalNameArrayInDataExchange :当前实例 所有本地服务定义的数据换机上队列名称
  30. * getQueueLocalNameArrayInDlxExchange :当前实例 所有本地服务定义的DLX交换机上队列名称
  31. * getQueueGlobalNameArray : 当前实例 所有全局定义的交换机上队列名称(如传入参数 ,选择参数对应的队列)
  32. * getQueueGlobalNameArrayInDataExchange :当前实例 所有全局定义的数据换机上队列名称
  33. * getQueueGlobalNameArrayInDlxExchange :当前实例 所有全局定义的DLX交换机上队列名称
  34. * getQueueNameWithIndex : 当前实例 按编号获取队列
  35. *
  36. * 动态设置
  37. * declareListenerInfo()函数 设置listenerInfo信息
  38. * declareListenerQueueLocal()函数 设置当前实例需要监听的本地服务队列,不设置不会监听,设置null,监听当前实例添加的全部本地服务队列
  39. * declareListenerQueueGlobal()函数 设置当前实例需要监听的全局队列,不设置不会监听,设置null,监听当前实例添加的全部全局队列
  40. * setLocalMqOp setGlobalMqOp 设置队列的处理函数 ,setDefOp 设置默认处理函数
  41. *
  42. * declareQueue 设置需要添加的队列信息 MFMqInfo
  43. * 发送数据
  44. * sendData不传name ,默认发送 index=0的队列
  45. */
  46. @Component
  47. @ConditionalOnProperty(prefix = "mf.mq", name = "enable", havingValue = "true", matchIfMissing = false)
  48. public class CaeMQ extends CaeViewMFMqDynamic {
  49. public CaeMQ(MFMqConfig mfMqConfig) {
  50. super(mfMqConfig);
  51. // setLocalMqOp(new CaeMqOp(),"caedyout");//设置监听
  52. setLocalMqOp(new CaeMqOp(),"caeout");//设置监听
  53. // setDefOp(new CaeMqOp());//设置默认监听
  54. }
  55. @Override
  56. public MFMqListenerInfo declareListenerInfo() {
  57. return new MFMqListenerInfo(AcknowledgeMode.MANUAL);
  58. }
  59. @Override
  60. public String[] declareListenerQueueLocal(){
  61. return new String[]{"caeout"};
  62. // return new String[]{};
  63. }
  64. @Override
  65. public List<MFMqInfo> declareQueue() {
  66. //TTLMS: 0,不设置x-message-ttl 参数
  67. //enableDlx : false 不申明dlx queue
  68. List<MFMqInfo> mfMqInfoList=new ArrayList<>();
  69. // mfMqInfoList.add(new MFMqInfo("caein","caein",30000l,true));
  70. // mfMqInfoList.add(new MFMqInfo("caeout","caeout",30000l,true));
  71. mfMqInfoList.add(new MFMqInfo("caein","caein",0,false,MFMqInfo.MFMQExchangeType.GLOBAL_DATA));
  72. mfMqInfoList.add(new MFMqInfo("caeout","caeout",0,false,MFMqInfo.MFMQExchangeType.GLOBAL_DATA));
  73. return mfMqInfoList;
  74. }
  75. /**+
  76. * 向未处理信息队列 发送消息
  77. * @param data
  78. */
  79. public void sendCaein(String data){
  80. sendData("caein",data);
  81. }
  82. /**
  83. * 向未处理信息队列 发送消息
  84. * @param data
  85. */
  86. public void sendCaeout(String data){
  87. sendData("caeout",data);
  88. }
  89. /**
  90. * 返回消息队列 数据处理
  91. */
  92. public static class CaeMqOp implements IMFMqOp {
  93. @Override
  94. public boolean handle(String data, long deliveryTag, Map<String, Object> headers, Channel channel) throws Exception {
  95. System.out.println("Queue : "+getAMQPConsumerQueue(headers)+", Data : "+ data);
  96. try {
  97. ActionMsg msg= XiJsonUtil.jsonToPojo(data, ActionMsg.class);
  98. WebsocketEndPoint point = (WebsocketEndPoint) UtilTools.getBean("websocketEndPoint");
  99. if(!StringUtil.isEmpty(msg.getImg())){
  100. if(msg.getAction().equals("animation")){//动画
  101. AdiSolverConfigImgMapper configImgMapper= UtilTools.getBean(AdiSolverConfigImgMapper.class);
  102. AdiSolverConfigImg img =new AdiSolverConfigImg();
  103. img.setId(UtilTools.getUUid());
  104. img.setPid(msg.getProId());
  105. img.setSolverConfigId(msg.getSolverConfigid());
  106. img.setCreateTime(XIDateTimeUtils.getNowDate());
  107. img.setStep(msg.getStep());
  108. img.setType(Processing.post.getIndex());
  109. img.setImg(msg.getImg());
  110. img.setAction(msg.getAction());
  111. img.setAnimationtype(msg.getAnimationType());
  112. img.setIsvolume(msg.getIsVolume());
  113. configImgMapper.insert(img);
  114. img.setImg("");
  115. point.sendMessageToUser(msg.getProId(),XiJsonUtil.objectToJson(img));//发送加载进度
  116. }else{
  117. AdiSolverConfigImgMapper configImgMapper= UtilTools.getBean(AdiSolverConfigImgMapper.class);
  118. AdiSolverConfigImg img =new AdiSolverConfigImg();
  119. img.setId(UtilTools.getUUid());
  120. img.setPid(msg.getProId());
  121. img.setSolverConfigId(msg.getSolverConfigid());
  122. img.setCreateTime(XIDateTimeUtils.getNowDate());
  123. img.setStep(msg.getStep());
  124. img.setType(Processing.pre.getIndex());
  125. img.setImg(msg.getImg());
  126. configImgMapper.insert(img);
  127. point.sendMessageToUser(msg.getProId(),data);
  128. }
  129. }else{
  130. point.sendMessageToUser(msg.getProId(),data);
  131. }
  132. }catch (Exception e){
  133. e.printStackTrace();
  134. }
  135. return true;
  136. }
  137. }
  138. }