Browse Source

mq websocket 集成

hxx 2 years ago
parent
commit
2030a2ed44

+ 15 - 0
pom.xml

@@ -263,6 +263,17 @@
             <artifactId>dysmsapi20170525</artifactId>
             <version>2.0.9</version>
         </dependency>
+        <dependency>
+            <groupId>adi</groupId>
+            <artifactId>vtk</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <!--        websocket -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+
     </dependencies>
 
     <dependencyManagement>
@@ -275,6 +286,10 @@
                 <scope>import</scope>
             </dependency>
 
+
+
+
+
         </dependencies>
     </dependencyManagement>
 

+ 5 - 5
profiles/dev/application-dev.properties

@@ -111,13 +111,13 @@ mf.mail.nickname=\u670D\u52A1
 
 #=======================MQ\u914D\u7F6E================================
 # ip
-spring.rabbitmq.host=192.168.0.140
+spring.rabbitmq.host=192.168.0.43
 # \u7AEF\u53E3
-spring.rabbitmq.port=5677
+spring.rabbitmq.port=5672
 # \u7528\u6237\u540D
-spring.rabbitmq.username=xitech
+spring.rabbitmq.username=admin
 # \u5BC6\u7801
-spring.rabbitmq.password=xitech
+spring.rabbitmq.password=admin
 # \u914D\u7F6E\u865A\u62DF\u673A
 spring.rabbitmq.virtual-host=/
 
@@ -144,7 +144,7 @@ mf.mq.baseName=BLOCK
 mf.mq.global=
 
 #enable mq,\u9ED8\u8BA4\u4E3Afalse
-mf.mq.enable=false
+mf.mq.enable=true
 
 #======================================================
 #\u4F7F\u80FD aop\u65E5\u5FD7\u4E8B\u4EF6 ,\u9ED8\u8BA4\u4E3Atrue

+ 4 - 4
profiles/dev/userconfig/mq.properties

@@ -1,6 +1,6 @@
-host=192.168.0.140
-port=5677
-username=xitech
-passwd=xitech
+host=192.168.0.43
+port=5672
+username=admin
+passwd=admin
 virtualost=/
 queuenamebase=bvr-1

+ 89 - 0
src/main/java/com/miniframe/CellCenters.java

@@ -0,0 +1,89 @@
+package com.miniframe;
+
+import vtk.vtkActor;
+import vtk.vtkNativeLibrary;
+import vtk.vtkRenderWindow;
+import vtk.vtkRenderWindowInteractor;
+import vtk.vtkRenderer;
+import vtk.vtkImageData;
+import vtk.vtkCellCenters;
+import vtk.vtkDataSetMapper;
+
+
+public class CellCenters
+{
+    // -----------------------------------------------------------------
+    // Load VTK library and print which library was not properly loaded
+    static
+    {
+        if (!vtkNativeLibrary.LoadAllNativeLibraries())
+        {
+            for (vtkNativeLibrary lib : vtkNativeLibrary.values())
+            {
+                if (!lib.IsLoaded())
+                {
+                    System.out.println(lib.GetLibraryName() + " not loaded");
+                }
+            }
+        }
+        vtkNativeLibrary.DisableOutputWindow(null);
+    }
+    // -----------------------------------------------------------------
+
+    public static void main(String args[])
+    {
+        // Create an image data
+        vtkImageData imageData = new vtkImageData();
+
+        // Specify the size of the image data
+        imageData.SetDimensions(3,3,3); //尺寸
+        imageData.SetSpacing(1.0, 1.0, 1.0);//间距
+        imageData.SetOrigin(0.0, 0.0, 0.0);//原点
+
+        vtkCellCenters cellCentersFilter = new vtkCellCenters();//中心点过滤器
+        cellCentersFilter.SetInputData(imageData);
+        cellCentersFilter.VertexCellsOn();//顶点单元开启
+        cellCentersFilter.Update();
+
+        // Access the cell centers
+        for(int i = 0; i < cellCentersFilter.GetOutput().GetNumberOfPoints(); i++)
+        {
+            double p[] = new double[3];
+            cellCentersFilter.GetOutput().GetPoint(i, p);
+            System.out.print("Point " + " " + i + " : " + " " +  p[0] + " , " + p[1] + " , " + p[2] + "\n");
+        }
+
+        // Display the cell centers
+        vtkDataSetMapper centerMapper = new vtkDataSetMapper();
+        centerMapper.SetInputConnection(cellCentersFilter.GetOutputPort());
+
+        vtkActor centerActor = new vtkActor();
+        centerActor.SetMapper(centerMapper);
+
+        vtkDataSetMapper mapper = new vtkDataSetMapper();
+        mapper.SetInputData(imageData);
+
+        vtkActor actor = new vtkActor();
+        actor.SetMapper(mapper);
+        actor.GetProperty().SetRepresentationToWireframe();//只显示边
+
+        // Create the renderer, render window and interactor.
+        vtkRenderer ren = new vtkRenderer();
+        vtkRenderWindow renWin = new vtkRenderWindow();
+        renWin.AddRenderer(ren);
+        vtkRenderWindowInteractor iren = new vtkRenderWindowInteractor();
+        iren.SetRenderWindow(renWin);
+
+        // Visualize
+        ren.AddActor(actor);
+        ren.AddActor(centerActor);
+
+        renWin.SetWindowName("中心点过滤器");
+        renWin.SetSize(300, 300);
+        renWin.Render();
+
+        iren.Initialize();
+        iren.Start();
+    }
+}
+

+ 90 - 0
src/main/java/com/miniframe/service/mq/CaeMQ.java

@@ -0,0 +1,90 @@
+package com.miniframe.service.mq;
+
+import com.miniframe.spring.mq.*;
+import com.rabbitmq.client.Channel;
+import org.springframework.amqp.core.AcknowledgeMode;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MQ 创建时会根据enableDlx值决定是否创建 队列对应的DLX队列,用于队列超时后进入DLX。
+ * MQ消息处理,获取队列名称
+ * getQueueNameArray:当前实例 所有队列名称
+ * getQueueNameArrayInDataExchange :当前实例 所有数据交换机上队列名称
+ * getQueueNameArrayInDlxExchange :当前实例 所有DLX交换机上的队列名称
+ * getQueueLocalNameArray : 当前实例 所有本地服务定义的交换机上队列名称(如传入参数 ,选择参数对应的队列)
+ * getQueueLocalNameArrayInDataExchange :当前实例 所有本地服务定义的数据换机上队列名称
+ * getQueueLocalNameArrayInDlxExchange :当前实例 所有本地服务定义的DLX交换机上队列名称
+ * getQueueGlobalNameArray : 当前实例 所有全局定义的交换机上队列名称(如传入参数 ,选择参数对应的队列)
+ * getQueueGlobalNameArrayInDataExchange :当前实例 所有全局定义的数据换机上队列名称
+ * getQueueGlobalNameArrayInDlxExchange :当前实例 所有全局定义的DLX交换机上队列名称
+ * getQueueNameWithIndex : 当前实例 按编号获取队列
+ *
+ * 动态设置
+ * declareListenerInfo()函数 设置listenerInfo信息
+ * declareListenerQueueLocal()函数 设置当前实例需要监听的本地服务队列,不设置不会监听,设置null,监听当前实例添加的全部本地服务队列
+ * declareListenerQueueGlobal()函数 设置当前实例需要监听的全局队列,不设置不会监听,设置null,监听当前实例添加的全部全局队列
+ * setLocalMqOp setGlobalMqOp 设置队列的处理函数 ,setDefOp 设置默认处理函数
+ *
+ * declareQueue 设置需要添加的队列信息 MFMqInfo
+ * 发送数据
+ * sendData不传name ,默认发送 index=0的队列
+ */
+
+@Component
+@ConditionalOnProperty(prefix = "mf.mq", name = "enable", havingValue = "true", matchIfMissing = false)
+public class CaeMQ extends AbstractMFMqDynamic {
+
+    public CaeMQ(MFMqConfig mfMqConfig) {
+        super(mfMqConfig);
+        setLocalMqOp(new CaeMqOp(),"caedyout");//设置监听
+//        setDefOp(new DefMqOp());//设置默认监听
+    }
+
+    @Override
+    public MFMqListenerInfo declareListenerInfo() {
+        return new MFMqListenerInfo(AcknowledgeMode.MANUAL);
+    }
+
+    @Override
+    public String[] declareListenerQueueLocal(){
+        return new String[]{"caedyout","caedyDlx"};
+    }
+
+
+    @Override
+    public List<MFMqInfo> declareQueue() {
+        //TTLMS: 0,不设置x-message-ttl 参数
+        //enableDlx : false 不申明dlx queue
+        List<MFMqInfo> mfMqInfoList=new ArrayList<>();
+        mfMqInfoList.add(new MFMqInfo("caedyin","caedy",30000l,true));
+        mfMqInfoList.add(new MFMqInfo("caedyout","caedy",30000l,true));
+        mfMqInfoList.add(new MFMqInfo("caedyDlx","caedy",MFMqInfo.MFMQExchangeType.LOCAL_DLX));
+        return mfMqInfoList;
+    }
+
+    /**
+     * 向未处理信息队列 发送消息
+     * @param data
+     */
+    public void sendDynamic(String data){
+        sendData("caedyin",data);
+    }
+
+
+    /**
+     * 返回消息队列 数据处理
+     */
+    public static class CaeMqOp implements IMFMqOp {
+        @Override
+        public boolean handle(String data, long deliveryTag, Map<String, Object> headers, Channel channel) throws Exception {
+            System.out.println("Queue : "+getAMQPConsumerQueue(headers)+", Data : "+ data);
+            return true;
+        }
+    }
+
+}

+ 4 - 2
src/main/java/com/miniframe/service/mq/XIMQDynamic.java

@@ -46,8 +46,8 @@ public class XIMQDynamic extends AbstractMFMqDynamic {
     public XIMQDynamic(MFMqConfig mfMqConfig) {
         super(mfMqConfig);
 
-        setLocalMqOp(new DynamicMqOp(),"Dynamic01","Dynamic02");
-        setDefOp(new DefMqOp());
+        setLocalMqOp(new DynamicMqOp(),"Dynamic01","Dynamic02");//设置监听
+        setDefOp(new DefMqOp());//设置默认监听
     }
 
     @Override
@@ -77,6 +77,8 @@ public class XIMQDynamic extends AbstractMFMqDynamic {
         mfMqInfoList.add(new MFMqInfo("DATADLX","data",MFMqInfo.MFMQExchangeType.GLOBAL_DLX));
 
 
+
+
         return mfMqInfoList;
     }
 

+ 43 - 0
src/main/java/com/miniframe/websocket/HandShakeInterceptor.java

@@ -0,0 +1,43 @@
+package com.miniframe.websocket;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.server.ServerHttpRequest;
+import org.springframework.http.server.ServerHttpResponse;
+import org.springframework.http.server.ServletServerHttpRequest;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
+
+/**
+ * 握手拦截器
+
+  WebSocket握手拦截器用来拦截和处理客户端和服务器端分别在握手前和握手后的事件
+ * @author Administrator
+ *
+ */
+public class HandShakeInterceptor extends HttpSessionHandshakeInterceptor {
+	private Logger logger =LoggerFactory.getLogger(HandShakeInterceptor.class);
+    @Override
+    public boolean beforeHandshake(ServerHttpRequest request,
+            ServerHttpResponse response, WebSocketHandler wsHandler,
+            Map<String, Object> attributes) throws Exception {
+        logger.info("GOMA ===> Before Handshake");
+        if (request instanceof ServletServerHttpRequest) {
+			ServletServerHttpRequest serverHttpRequest = (ServletServerHttpRequest) request;
+			// 获取参数
+			String projectId = serverHttpRequest.getServletRequest().getParameter("projectId");
+			attributes.put("currentUser", projectId);
+		}
+        return super.beforeHandshake(request, response, wsHandler, attributes);
+    }
+ 
+    @Override
+    public void afterHandshake(ServerHttpRequest request,
+            ServerHttpResponse response, WebSocketHandler wsHandler,
+            Exception ex) {
+    	logger.info("GOMA ===> After Handshake");
+		super.afterHandshake(request, response, wsHandler, ex);
+	}
+}

+ 34 - 0
src/main/java/com/miniframe/websocket/WebSocketConfig.java

@@ -0,0 +1,34 @@
+package com.miniframe.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+/**
+ * 
+ * @author Goma
+ *
+ */
+@Configuration
+@EnableWebSocket
+public class WebSocketConfig implements WebSocketConfigurer {
+ 
+	@Override
+	public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+		registry.addHandler(myhandler(), "/websocket").addInterceptors(myInterceptors()).setAllowedOrigins("*");
+		registry.addHandler(myhandler(), "/sockjs/websocket").addInterceptors(myInterceptors()).withSockJS();
+	}
+ 
+	@Bean
+	public WebSocketHandler myhandler() {
+		return new WebsocketEndPoint();
+	}
+ 
+	@Bean
+	public HandShakeInterceptor myInterceptors() {
+		return new HandShakeInterceptor();
+	}
+}

+ 153 - 0
src/main/java/com/miniframe/websocket/WebsocketEndPoint.java

@@ -0,0 +1,153 @@
+package com.miniframe.websocket;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketMessage;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+
+/**
+ * 这个类处理来之浏览器(客户端)的WebSocket请求。在这个例子中,我们创建一个叫WebSocketEndPoint的类,并让它集成TextWebsocketHandler类。
+ * 然后重写父类方法handlerTextMessage(),每当客户端发送信息过来,都会由这个函数接收并处理。
+ * 当然这里还可以重写其他方法,如afterConnectionEstablished、afterConnectionClosed、handleTransportError
+ * 等等
+ * 
+ * @author Administrator
+ *
+ */
+@Service
+public class WebsocketEndPoint extends TextWebSocketHandler {
+
+	@Autowired
+
+	private final static Map<String, WebSocketSession> userMap = new ConcurrentHashMap<String, WebSocketSession>();
+
+	/**
+	 * 关闭websocket时调用该方法
+	 * 
+	 * @see org.springframework.web.socket.WebSocketHandler#afterConnectionClosed(org.springframework.web.socket.WebSocketSession,
+	 *      org.springframework.web.socket.CloseStatus)
+	 */
+	@Override
+	public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+		String projectId = this.getProjectId(session);
+		if (!StringUtils.isEmpty(projectId)) {
+			userMap.remove(projectId);
+			System.err.println("该" + projectId + "用户已成功关闭");
+		} else {
+			System.err.println("关闭时,获取用户id为空");
+		}
+
+	}
+
+	/**
+	 * 建立websocket连接时调用该方法
+	 * 
+	 * org.springframework.web.socket.WebSocketHandler#afterConnectionEstablished(org.springframework.web.socket.WebSocketSession)
+	 */
+	@Override
+	public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+		String projectId = this.getProjectId(session);
+		if (!StringUtils.isEmpty(projectId)) {
+			userMap.put(projectId, session);
+			session.sendMessage(new TextMessage("建立服务端连接成功!"));
+		}
+
+	}
+
+	/**
+	 * 客户端调用websocket.send时候,会调用该方法,进行数据通信
+	 * 
+	 * org.springframework.web.socket.WebSocketHandler#handleMessage(org.springframework.web.socket.WebSocketSession,
+	 * org.springframework.web.socket.WebSocketMessage)
+	 */
+	@Override
+	public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
+		String msg = message.getPayload().toString();
+		String projectId = this.getProjectId(session);
+		System.err.println("该" + projectId + "用户发送的消息是:" + msg);
+		message = new TextMessage("服务端已经接收到消息,msg=" + msg);
+		session.sendMessage(message);
+	}
+
+	/**
+	 * 传输过程出现异常时,调用该方法
+	 * 
+	 * org.springframework.web.socket.WebSocketHandler#handleTransportError(org.springframework.web.socket.WebSocketSession,
+	 * java.lang.Throwable)
+	 */
+	@Override
+	public void handleTransportError(WebSocketSession session, Throwable e) throws Exception {
+		WebSocketMessage<String> message = new TextMessage("异常信息:" + e.getMessage());
+		session.sendMessage(message);
+	}
+
+	/**
+	 * org.springframework.web.socket.WebSocketHandler#supportsPartialMessages()
+	 */
+	@Override
+	public boolean supportsPartialMessages() {
+
+		return false;
+	}
+
+	/**
+	 * sendMessageToUser:发给指定用户
+	 * 
+	 */
+	public void sendMessageToUser(String projectId, String contents) {
+		WebSocketSession session = userMap.get(projectId);
+		if (session != null && session.isOpen()) {
+			try {
+				TextMessage message = new TextMessage(contents);
+				session.sendMessage(message);
+			} catch (IOException e) {
+				//e.printStackTrace();
+			}
+		}
+	}
+
+	/**
+	 * sendMessageToAllUsers:发给所有的用户
+	 * 
+	 */
+	public void sendMessageToAllUsers(String contents) {
+		Set<String> projectIds = userMap.keySet();
+		for (String projectId : projectIds) {
+			this.sendMessageToUser(projectId, contents);
+		}
+	}
+
+	/**
+	 * getvdioId:获取用户id
+	 * 
+	 * @author liuchao
+	 * @param session
+	 * @return
+	 * @since JDK 1.7
+	 */
+	private String getProjectId(WebSocketSession session) {
+		try {
+			String projectId = (String) session.getAttributes().get("currentUser");
+			return projectId;
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+		return null;
+	}
+}

+ 17 - 0
src/test/java/com/miniframe/MqTest.java

@@ -0,0 +1,17 @@
+package com.miniframe;
+
+import com.miniframe.service.mq.XIMQDynamic;
+import com.miniframe.spring.mq.MFMqUtils;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest(classes = Application.class)
+public class MqTest {
+
+
+    @Test
+    void test(){
+        MFMqUtils.get(XIMQDynamic.class).sendData("asdfasdf");
+    }
+}