123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package com.miniframe.service.mq;
- import com.miniframe.aftercure.ActionMsg;
- import com.miniframe.core.ext.UtilTools;
- import com.miniframe.spring.mq.*;
- import com.miniframe.tools.XiJsonUtil;
- import com.miniframe.websocket.WebsocketEndPoint;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.AcknowledgeMode;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.stereotype.Component;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- /**
- * MQ 创建时会根据enableDlx值决定是否创建 队列对应的DLX队列,用于队列超时后进入DLX。
- * MQ消息处理,获取队列名称
- * getQueueNameArray:当前实例 所有队列名称
- * getQueueNameArrayInDataExchange :当前实例 所有数据交换机上队列名称
- * getQueueNameArrayInDlxExchange :当前实例 所有DLX交换机上的队列名称
- * getQueueLocalNameArray : 当前实例 所有本地服务定义的交换机上队列名称(如传入参数 ,选择参数对应的队列)
- * getQueueLocalNameArrayInDataExchange :当前实例 所有本地服务定义的数据换机上队列名称
- * getQueueLocalNameArrayInDlxExchange :当前实例 所有本地服务定义的DLX交换机上队列名称
- * getQueueGlobalNameArray : 当前实例 所有全局定义的交换机上队列名称(如传入参数 ,选择参数对应的队列)
- * getQueueGlobalNameArrayInDataExchange :当前实例 所有全局定义的数据换机上队列名称
- * getQueueGlobalNameArrayInDlxExchange :当前实例 所有全局定义的DLX交换机上队列名称
- * getQueueNameWithIndex : 当前实例 按编号获取队列
- *
- * 动态设置
- * declareListenerInfo()函数 设置listenerInfo信息
- * declareListenerQueueLocal()函数 设置当前实例需要监听的本地服务队列,不设置不会监听,设置null,监听当前实例添加的全部本地服务队列
- * declareListenerQueueGlobal()函数 设置当前实例需要监听的全局队列,不设置不会监听,设置null,监听当前实例添加的全部全局队列
- * setLocalMqOp setGlobalMqOp 设置队列的处理函数 ,setDefOp 设置默认处理函数
- *
- * declareQueue 设置需要添加的队列信息 MFMqInfo
- * 发送数据
- * sendData不传name ,默认发送 index=0的队列
- */
- @Component
- @ConditionalOnProperty(prefix = "mf.mq", name = "enable", havingValue = "true", matchIfMissing = false)
- public class CaeMQ extends CaeViewMFMqDynamic {
- public CaeMQ(MFMqConfig mfMqConfig) {
- super(mfMqConfig);
- // setLocalMqOp(new CaeMqOp(),"caedyout");//设置监听
- setLocalMqOp(new CaeMqOp(),"caeout");//设置监听
- // setDefOp(new CaeMqOp());//设置默认监听
- }
- @Override
- public MFMqListenerInfo declareListenerInfo() {
- return new MFMqListenerInfo(AcknowledgeMode.MANUAL);
- }
- @Override
- public String[] declareListenerQueueLocal(){
- return new String[]{"caeout"};
- // return new String[]{};
- }
- @Override
- public List<MFMqInfo> declareQueue() {
- //TTLMS: 0,不设置x-message-ttl 参数
- //enableDlx : false 不申明dlx queue
- List<MFMqInfo> mfMqInfoList=new ArrayList<>();
- mfMqInfoList.add(new MFMqInfo("caein","caein",0,false,MFMqInfo.MFMQExchangeType.GLOBAL_DATA));
- mfMqInfoList.add(new MFMqInfo("caeout","caeout",0,false,MFMqInfo.MFMQExchangeType.GLOBAL_DATA));
- return mfMqInfoList;
- }
- /**
- * 向未处理信息队列 发送消息
- * @param data
- */
- public void sendCaein(String data){
- sendData("caein",data);
- }
- /**
- * 向未处理信息队列 发送消息
- * @param data
- */
- public void sendCaeout(String data){
- sendData("caeout",data);
- }
- /**
- * 返回消息队列 数据处理
- */
- public static class CaeMqOp implements IMFMqOp {
- @Override
- public boolean handle(String data, long deliveryTag, Map<String, Object> headers, Channel channel) throws Exception {
- // System.out.println("Queue : "+getAMQPConsumerQueue(headers)+", Data : "+ data);
- try {
- ActionMsg msg= XiJsonUtil.jsonToPojo(data, ActionMsg.class);
- WebsocketEndPoint point = (WebsocketEndPoint) UtilTools.getBean("websocketEndPoint");
- point.sendMessageToUser(msg.getProId(),data);
- }catch (Exception e){
- e.printStackTrace();
- }
- return true;
- }
- }
- }
|