CaeMQ.java 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package com.miniframe.service.mq;
  2. import com.miniframe.aftercure.ActionMsg;
  3. import com.miniframe.core.ext.UtilTools;
  4. import com.miniframe.spring.mq.*;
  5. import com.miniframe.tools.XiJsonUtil;
  6. import com.miniframe.websocket.WebsocketEndPoint;
  7. import com.rabbitmq.client.Channel;
  8. import org.springframework.amqp.core.AcknowledgeMode;
  9. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  10. import org.springframework.stereotype.Component;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.Map;
  14. /**
  15. * MQ 创建时会根据enableDlx值决定是否创建 队列对应的DLX队列,用于队列超时后进入DLX。
  16. * MQ消息处理,获取队列名称
  17. * getQueueNameArray:当前实例 所有队列名称
  18. * getQueueNameArrayInDataExchange :当前实例 所有数据交换机上队列名称
  19. * getQueueNameArrayInDlxExchange :当前实例 所有DLX交换机上的队列名称
  20. * getQueueLocalNameArray : 当前实例 所有本地服务定义的交换机上队列名称(如传入参数 ,选择参数对应的队列)
  21. * getQueueLocalNameArrayInDataExchange :当前实例 所有本地服务定义的数据换机上队列名称
  22. * getQueueLocalNameArrayInDlxExchange :当前实例 所有本地服务定义的DLX交换机上队列名称
  23. * getQueueGlobalNameArray : 当前实例 所有全局定义的交换机上队列名称(如传入参数 ,选择参数对应的队列)
  24. * getQueueGlobalNameArrayInDataExchange :当前实例 所有全局定义的数据换机上队列名称
  25. * getQueueGlobalNameArrayInDlxExchange :当前实例 所有全局定义的DLX交换机上队列名称
  26. * getQueueNameWithIndex : 当前实例 按编号获取队列
  27. *
  28. * 动态设置
  29. * declareListenerInfo()函数 设置listenerInfo信息
  30. * declareListenerQueueLocal()函数 设置当前实例需要监听的本地服务队列,不设置不会监听,设置null,监听当前实例添加的全部本地服务队列
  31. * declareListenerQueueGlobal()函数 设置当前实例需要监听的全局队列,不设置不会监听,设置null,监听当前实例添加的全部全局队列
  32. * setLocalMqOp setGlobalMqOp 设置队列的处理函数 ,setDefOp 设置默认处理函数
  33. *
  34. * declareQueue 设置需要添加的队列信息 MFMqInfo
  35. * 发送数据
  36. * sendData不传name ,默认发送 index=0的队列
  37. */
  38. @Component
  39. @ConditionalOnProperty(prefix = "mf.mq", name = "enable", havingValue = "true", matchIfMissing = false)
  40. public class CaeMQ extends CaeViewMFMqDynamic {
  41. public CaeMQ(MFMqConfig mfMqConfig) {
  42. super(mfMqConfig);
  43. // setLocalMqOp(new CaeMqOp(),"caedyout");//设置监听
  44. setLocalMqOp(new CaeMqOp(),"caeout");//设置监听
  45. // setDefOp(new CaeMqOp());//设置默认监听
  46. }
  47. @Override
  48. public MFMqListenerInfo declareListenerInfo() {
  49. return new MFMqListenerInfo(AcknowledgeMode.MANUAL);
  50. }
  51. @Override
  52. public String[] declareListenerQueueLocal(){
  53. return new String[]{"caeout"};
  54. // return new String[]{};
  55. }
  56. @Override
  57. public List<MFMqInfo> declareQueue() {
  58. //TTLMS: 0,不设置x-message-ttl 参数
  59. //enableDlx : false 不申明dlx queue
  60. List<MFMqInfo> mfMqInfoList=new ArrayList<>();
  61. mfMqInfoList.add(new MFMqInfo("caein","caein",0,false,MFMqInfo.MFMQExchangeType.GLOBAL_DATA));
  62. mfMqInfoList.add(new MFMqInfo("caeout","caeout",0,false,MFMqInfo.MFMQExchangeType.GLOBAL_DATA));
  63. return mfMqInfoList;
  64. }
  65. /**
  66. * 向未处理信息队列 发送消息
  67. * @param data
  68. */
  69. public void sendCaein(String data){
  70. sendData("caein",data);
  71. }
  72. /**
  73. * 向未处理信息队列 发送消息
  74. * @param data
  75. */
  76. public void sendCaeout(String data){
  77. sendData("caeout",data);
  78. }
  79. /**
  80. * 返回消息队列 数据处理
  81. */
  82. public static class CaeMqOp implements IMFMqOp {
  83. @Override
  84. public boolean handle(String data, long deliveryTag, Map<String, Object> headers, Channel channel) throws Exception {
  85. // System.out.println("Queue : "+getAMQPConsumerQueue(headers)+", Data : "+ data);
  86. try {
  87. ActionMsg msg= XiJsonUtil.jsonToPojo(data, ActionMsg.class);
  88. WebsocketEndPoint point = (WebsocketEndPoint) UtilTools.getBean("websocketEndPoint");
  89. point.sendMessageToUser(msg.getProId(),data);
  90. }catch (Exception e){
  91. e.printStackTrace();
  92. }
  93. return true;
  94. }
  95. }
  96. }