package com.miniframe.service.mq; import com.miniframe.aftercure.ActionMsg; import com.miniframe.core.ext.UtilTools; import com.miniframe.spring.mq.*; import com.miniframe.tools.XiJsonUtil; import com.miniframe.websocket.WebsocketEndPoint; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * MQ 创建时会根据enableDlx值决定是否创建 队列对应的DLX队列,用于队列超时后进入DLX。 * MQ消息处理,获取队列名称 * getQueueNameArray:当前实例 所有队列名称 * getQueueNameArrayInDataExchange :当前实例 所有数据交换机上队列名称 * getQueueNameArrayInDlxExchange :当前实例 所有DLX交换机上的队列名称 * getQueueLocalNameArray : 当前实例 所有本地服务定义的交换机上队列名称(如传入参数 ,选择参数对应的队列) * getQueueLocalNameArrayInDataExchange :当前实例 所有本地服务定义的数据换机上队列名称 * getQueueLocalNameArrayInDlxExchange :当前实例 所有本地服务定义的DLX交换机上队列名称 * getQueueGlobalNameArray : 当前实例 所有全局定义的交换机上队列名称(如传入参数 ,选择参数对应的队列) * getQueueGlobalNameArrayInDataExchange :当前实例 所有全局定义的数据换机上队列名称 * getQueueGlobalNameArrayInDlxExchange :当前实例 所有全局定义的DLX交换机上队列名称 * getQueueNameWithIndex : 当前实例 按编号获取队列 * * 动态设置 * declareListenerInfo()函数 设置listenerInfo信息 * declareListenerQueueLocal()函数 设置当前实例需要监听的本地服务队列,不设置不会监听,设置null,监听当前实例添加的全部本地服务队列 * declareListenerQueueGlobal()函数 设置当前实例需要监听的全局队列,不设置不会监听,设置null,监听当前实例添加的全部全局队列 * setLocalMqOp setGlobalMqOp 设置队列的处理函数 ,setDefOp 设置默认处理函数 * * declareQueue 设置需要添加的队列信息 MFMqInfo * 发送数据 * sendData不传name ,默认发送 index=0的队列 */ @Component @ConditionalOnProperty(prefix = "mf.mq", name = "enable", havingValue = "true", matchIfMissing = false) public class CaeMQ extends CaeViewMFMqDynamic { public CaeMQ(MFMqConfig mfMqConfig) { super(mfMqConfig); // setLocalMqOp(new CaeMqOp(),"caedyout");//设置监听 setLocalMqOp(new CaeMqOp(),"caeout");//设置监听 // setDefOp(new CaeMqOp());//设置默认监听 } @Override public MFMqListenerInfo declareListenerInfo() { return new MFMqListenerInfo(AcknowledgeMode.MANUAL); } @Override public String[] declareListenerQueueLocal(){ return new String[]{"caeout"}; // return new String[]{}; } @Override public List declareQueue() { //TTLMS: 0,不设置x-message-ttl 参数 //enableDlx : false 不申明dlx queue List mfMqInfoList=new ArrayList<>(); mfMqInfoList.add(new MFMqInfo("caein","caein",0,false,MFMqInfo.MFMQExchangeType.GLOBAL_DATA)); mfMqInfoList.add(new MFMqInfo("caeout","caeout",0,false,MFMqInfo.MFMQExchangeType.GLOBAL_DATA)); return mfMqInfoList; } /** * 向未处理信息队列 发送消息 * @param data */ public void sendCaein(String data){ sendData("caein",data); } /** * 向未处理信息队列 发送消息 * @param data */ public void sendCaeout(String data){ sendData("caeout",data); } /** * 返回消息队列 数据处理 */ public static class CaeMqOp implements IMFMqOp { @Override public boolean handle(String data, long deliveryTag, Map headers, Channel channel) throws Exception { // System.out.println("Queue : "+getAMQPConsumerQueue(headers)+", Data : "+ data); try { ActionMsg msg= XiJsonUtil.jsonToPojo(data, ActionMsg.class); WebsocketEndPoint point = (WebsocketEndPoint) UtilTools.getBean("websocketEndPoint"); point.sendMessageToUser(msg.getProId(),data); }catch (Exception e){ e.printStackTrace(); } return true; } } }