|
@@ -0,0 +1,259 @@
|
|
|
|
+package com.miniframe.service.mq;
|
|
|
|
+
|
|
|
|
+import com.miniframe.core.ext.UtilTools;
|
|
|
|
+import com.miniframe.spring.mq.*;
|
|
|
|
+
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
+import org.springframework.amqp.core.AcknowledgeMode;
|
|
|
|
+import org.springframework.amqp.core.Binding;
|
|
|
|
+import org.springframework.amqp.core.Message;
|
|
|
|
+import org.springframework.amqp.core.Queue;
|
|
|
|
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
|
|
|
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
|
|
|
+import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
|
|
|
|
+import org.springframework.amqp.support.converter.MessageConverter;
|
|
|
|
+
|
|
|
|
+import java.util.*;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+public abstract class CaeViewMFMqDynamic extends AbstractMFMq {
|
|
|
|
+ private static Logger log = LoggerFactory.getLogger(CaeViewMFMqDynamic.class);
|
|
|
|
+ private List<String> listenQueue = new ArrayList();
|
|
|
|
+ private SimpleMessageListenerContainer messageListenerContainer = null;
|
|
|
|
+ private MFMqListenerInfo mfMqListenerInfo;
|
|
|
|
+ private Map<String, IMFMqOp> imfMqOpMap = new HashMap();
|
|
|
|
+ private IMFMqOp defOp = null;
|
|
|
|
+
|
|
|
|
+ public MFMqListenerInfo declareListenerInfo() {
|
|
|
|
+ return getDefMqListenerInfo();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public String[] declareListenerQueueLocal() {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public String[] declareListenerQueueGlobal() {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public CaeViewMFMqDynamic(MFMqConfig mfMqConfig) {
|
|
|
|
+ super(mfMqConfig);
|
|
|
|
+ if (this.declareListenerInfo() != null) {
|
|
|
|
+ this.mfMqListenerInfo = this.declareListenerInfo();
|
|
|
|
+ } else {
|
|
|
|
+ this.mfMqListenerInfo = new MFMqListenerInfo();
|
|
|
|
+ this.mfMqListenerInfo.setPrefetchCount(mfMqConfig.getRabbitProperties().getListener().getSimple().getPrefetch());
|
|
|
|
+ this.mfMqListenerInfo.setAcknowledgeMode(mfMqConfig.getRabbitProperties().getListener().getSimple().getAcknowledgeMode());
|
|
|
|
+ this.mfMqListenerInfo.setConcurrentConsumers(mfMqConfig.getRabbitProperties().getListener().getSimple().getConcurrency());
|
|
|
|
+ this.mfMqListenerInfo.setMaxConcurrentConsumers(mfMqConfig.getRabbitProperties().getListener().getSimple().getMaxConcurrency());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ List<String> queueLocalList = null;
|
|
|
|
+ List<String> queueGlobalList = null;
|
|
|
|
+ if (this.declareListenerQueueLocal() != null) {
|
|
|
|
+ queueLocalList = Arrays.asList(this.declareListenerQueueLocal());
|
|
|
|
+ queueLocalList = (List)queueLocalList.stream().filter((str) -> {
|
|
|
|
+
|
|
|
|
+ return UtilTools.isNotNullAndBlank(str);
|
|
|
|
+ }).map((str) -> {
|
|
|
|
+ return this.toQueueLocalRealName(str);
|
|
|
|
+ }).distinct().collect(Collectors.toList());
|
|
|
|
+ } else {
|
|
|
|
+ queueLocalList = this.getQueueLocalNameList();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (this.declareListenerQueueGlobal() != null) {
|
|
|
|
+ queueGlobalList = Arrays.asList(this.declareListenerQueueGlobal());
|
|
|
|
+ queueGlobalList = (List)queueGlobalList.stream().filter((str) -> {
|
|
|
|
+ return UtilTools.isNotNullAndBlank(str);
|
|
|
|
+ }).distinct().collect(Collectors.toList());
|
|
|
|
+ } else {
|
|
|
|
+ queueGlobalList = this.getQueueGlobalNameList();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ this.listenQueue.addAll(queueLocalList);
|
|
|
|
+ this.listenQueue.addAll(queueGlobalList);
|
|
|
|
+ this.initContainer();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public String[] getListenQueueArray() {
|
|
|
|
+ String[] arr = (String[])this.listenQueue.toArray(new String[this.listenQueue.size()]);
|
|
|
|
+ return arr;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static MFMqListenerInfo getDefMqListenerInfo() {
|
|
|
|
+ return new MFMqListenerInfo();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void initContainer() {
|
|
|
|
+ this.messageListenerContainer = this.messageListenerContainer(this.getMfMqConfig().getConnectionFactory());
|
|
|
|
+ if (this.messageListenerContainer != null) {
|
|
|
|
+ this.start();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private SimpleMessageListenerContainer messageListenerContainer(CachingConnectionFactory connectionFactory) {
|
|
|
|
+ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
|
|
|
|
+ container.setQueueNames(this.getListenQueueArray());
|
|
|
|
+ container.setExposeListenerChannel(true);
|
|
|
|
+ container.setPrefetchCount(this.mfMqListenerInfo.getPrefetchCount());
|
|
|
|
+ container.setConcurrentConsumers(this.mfMqListenerInfo.getConcurrentConsumers());
|
|
|
|
+ container.setMaxConcurrentConsumers(this.mfMqListenerInfo.getMaxConcurrentConsumers());
|
|
|
|
+ container.setAcknowledgeMode(this.mfMqListenerInfo.getAcknowledgeMode());
|
|
|
|
+ container.setMessageListener(new CaeViewMFMqDynamic.MessageConsumerHandler());
|
|
|
|
+ return container;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void start() {
|
|
|
|
+ if (this.messageListenerContainer != null && !this.messageListenerContainer.isRunning()) {
|
|
|
|
+ this.messageListenerContainer.start();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public List<String> getQueueNameListInContainer() {
|
|
|
|
+ return (List)this.getQueueNameMapInContainer().keySet().stream().collect(Collectors.toList());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public Map<String, String> getQueueNameMapInContainer() {
|
|
|
|
+ Map<String, String> map = new HashMap();
|
|
|
|
+ if (this.messageListenerContainer != null) {
|
|
|
|
+ String[] queues = this.messageListenerContainer.getQueueNames();
|
|
|
|
+ String[] var3 = queues;
|
|
|
|
+ int var4 = queues.length;
|
|
|
|
+
|
|
|
|
+ for(int var5 = 0; var5 < var4; ++var5) {
|
|
|
|
+ String q = var3[var5];
|
|
|
|
+ map.put(q, q);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return map;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void addQueueName(String queueName) {
|
|
|
|
+ this.addQueueName(queueName, MFMqInfo.MFMQExchangeType.LOCAL_DATA, (IMFMqOp)null);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void addQueueName(String queueName, MFMqInfo.MFMQExchangeType exchangeType) {
|
|
|
|
+ this.addQueueName(queueName, exchangeType, (IMFMqOp)null);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void addQueueName(String queueName, MFMqInfo.MFMQExchangeType exchangeType, IMFMqOp imfMqOp) {
|
|
|
|
+ if (!exchangeType.isValid()) {
|
|
|
|
+ exchangeType = MFMqInfo.MFMQExchangeType.LOCAL_DATA;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String queueRealName = queueName;
|
|
|
|
+ if (exchangeType.isLocal()) {
|
|
|
|
+ queueRealName = this.toQueueLocalRealName(queueName);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!this.getQueueNameMapInContainer().containsKey(queueRealName)) {
|
|
|
|
+ Queue queue;
|
|
|
|
+ queue = this.getMfMqConfig().createQueue(queueRealName, exchangeType);
|
|
|
|
+ Binding binding = this.getMfMqConfig().createBindingToExchange(queue, queueName, exchangeType);
|
|
|
|
+ this.getMfMqConfig().getAmqpAdmin().declareQueue(queue);
|
|
|
|
+ this.getMfMqConfig().getAmqpAdmin().declareBinding(binding);
|
|
|
|
+ if (this.messageListenerContainer != null) {
|
|
|
|
+ try {
|
|
|
|
+ this.messageListenerContainer.addQueueNames(new String[]{queueRealName});
|
|
|
|
+ } catch (Exception var8) {
|
|
|
|
+ log.error("addQueueNames ", var8);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ this.imfMqOpMap.put(queueRealName, imfMqOp);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public IMFMqOp getLocalMqOp(String queueName) {
|
|
|
|
+ return (IMFMqOp)this.imfMqOpMap.get(this.toQueueLocalRealName(queueName));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void setLocalMqOp(IMFMqOp imfMqOp, String... queueName) {
|
|
|
|
+ String[] var3 = queueName;
|
|
|
|
+ int var4 = queueName.length;
|
|
|
|
+
|
|
|
|
+ for(int var5 = 0; var5 < var4; ++var5) {
|
|
|
|
+ String name = var3[var5];
|
|
|
|
+ this.imfMqOpMap.put(this.toQueueLocalRealName(name), imfMqOp);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public IMFMqOp getGlobalMqOp(String queueName) {
|
|
|
|
+ return (IMFMqOp)this.imfMqOpMap.get(queueName);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void setGlobalMqOp(IMFMqOp imfMqOp, String... queueName) {
|
|
|
|
+ String[] var3 = queueName;
|
|
|
|
+ int var4 = queueName.length;
|
|
|
|
+
|
|
|
|
+ for(int var5 = 0; var5 < var4; ++var5) {
|
|
|
|
+ String name = var3[var5];
|
|
|
|
+ this.imfMqOpMap.put(name, imfMqOp);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void setDefOp(IMFMqOp imfMqOp) {
|
|
|
|
+ this.defOp = imfMqOp;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public IMFMqOp getDefOp() {
|
|
|
|
+ return this.defOp;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private class MessageConsumerHandler implements ChannelAwareMessageListener {
|
|
|
|
+ private MessageConsumerHandler() {
|
|
|
|
+ }
|
|
|
|
+ @Override
|
|
|
|
+ public void onMessage(Message message, Channel channel) throws Exception {
|
|
|
|
+ String data = "";
|
|
|
|
+ data = new String(message.getBody());
|
|
|
|
+ Long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
|
|
|
+ Map<String, Object> headers = new HashMap();
|
|
|
|
+ headers.put("amqp_deliveryTag", message.getMessageProperties().getDeliveryTag());
|
|
|
|
+ headers.put("amqp_consumerQueue", message.getMessageProperties().getConsumerQueue());
|
|
|
|
+ headers.put("amqp_consumerTag", message.getMessageProperties().getConsumerTag());
|
|
|
|
+ headers.put("amqp_receivedExchange", message.getMessageProperties().getReceivedExchange());
|
|
|
|
+ headers.put("amqp_receivedRoutingKey", message.getMessageProperties().getReceivedRoutingKey());
|
|
|
|
+ try {
|
|
|
|
+ boolean isOp = true;
|
|
|
|
+ IMFMqOp imfMqOp = (IMFMqOp)CaeViewMFMqDynamic.this.imfMqOpMap.getOrDefault(message.getMessageProperties().getConsumerQueue(), (IMFMqOp)null);
|
|
|
|
+ if (imfMqOp == null) {
|
|
|
|
+ imfMqOp = CaeViewMFMqDynamic.this.defOp;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (imfMqOp != null) {
|
|
|
|
+ try {
|
|
|
|
+ isOp = imfMqOp.handle(data, deliveryTag, headers, channel);
|
|
|
|
+ } catch (Exception var10) {
|
|
|
|
+ isOp = false;
|
|
|
|
+ CaeViewMFMqDynamic.log.error("Handle :" + data, var10);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (CaeViewMFMqDynamic.this.mfMqListenerInfo.getAcknowledgeMode() == AcknowledgeMode.MANUAL) {
|
|
|
|
+ if (isOp) {
|
|
|
|
+ CaeViewMFMqDynamic.this.getMfMqConfig().consumerAck(channel, deliveryTag);
|
|
|
|
+ } else {
|
|
|
|
+ CaeViewMFMqDynamic.this.getMfMqConfig().consumerNAck(channel, deliveryTag, false, true);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ } catch (Exception var11) {
|
|
|
|
+ CaeViewMFMqDynamic.log.error("onMessage : ", var11);
|
|
|
|
+ throw var11;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+}
|