在一个标准群集场景中,节点通过一个数据包发送到协定好的多播IP地址:Port上,建立起通信。比如使用TCP插头。
【使用Servlet模拟群集场景】
【1.连接上@ServerEndPoint】
【节点做的事】
//ws://localhost:8080/cluster/clusterNodeSocket/clusterNode1/query
URI uri = new URI("ws", "localhost:8080", path, null, null);
//连接上websocket
this.session = ContainerProvider.getWebSocketContainer()
???????.connectToServer(this, uri);
【Server做的事】
2 ????public void onOpen(Session session, @PathParam("nodeId") String nodeId) 3 ????{ 8 ????????ClusterMessage message = new ClusterMessage(nodeId, "Joined the cluster.");11 ????????????//通知所有节点 有新的节点加入 ?因为这是在onOpen发生的,也就是终端连接上的代表加入12 ????????byte[] bytes = ClusterNodeEndpoint.toByteArray(message);13 ????????for(Session node : ClusterNodeEndpoint.nodes)14 ????????????????//发送ByteBuffer15 ????????????????node.getBasicRemote().sendBinary(ByteBuffer.wrap(bytes));22 ????????ClusterNodeEndpoint.nodes.add(session);23 ????}
【2.Servlet负责路由请求和接收消息、Server负责传递给其他节点消息】
【节点处理get请求】
2 ????protected void doGet(HttpServletRequest request, HttpServletResponse response) 3 ????????????throws ServletException, IOException 4 ????{ 6 ????????//构造Message准备发给节点 7 ????????ClusterMessage message = new ClusterMessage(this.nodeId, 8 ????????????????"request:{ip:\"" + request.getRemoteAddr() + 9 ????????????????"\",queryString:\"" + request.getQueryString() + "\"}");10 11 ????????//使用序列化机制发送消息12 ????????try(OutputStream output = this.session.getBasicRemote().getSendStream();13 ????????????ObjectOutputStream stream = new ObjectOutputStream(output))14 ????????{15 ????????????stream.writeObject(message);16 ????????}17 ????????response.getWriter().append("OK");18 ????}
【节点接收消息】
1 ?@OnMessage 2 ????public void onMessage(InputStream input) 3 ????{ 4 ????????try(ObjectInputStream stream = new ObjectInputStream(input)) 5 ????????{ 6 ????????????ClusterMessage message = (ClusterMessage)stream.readObject(); 7 ????????????System.out.println("INFO (Node " + this.nodeId + 8 ????????????????????"): Message received from cluster; node = " + 9 ????????????????????message.getNodeId() + ", message = " + message.getMessage());10 ????????}11 ????????catch(IOException | ClassNotFoundException e)12 ????????{13 ????????????e.printStackTrace();14 ????????}15 ????}
【Server传递给其他节点消息】
1 @OnMessage 2 ????public void onMessage(Session session, byte[] message) 3 ????{ 4 ????????try 5 ????????{ 6 ????????????for(Session node : ClusterNodeEndpoint.nodes) 7 ????????????{ 8 ????????????????//向其他节点发送消息(消息来自当前节点) 9 ????????????????if(node != session)11 ????????????????????node.getBasicRemote().sendBinary(ByteBuffer.wrap(message));12 ????????????}13 ????????}14 ????????catch(IOException e)15 ????????{16 ????????????System.err.println("ERROR: Exception when handling message on server");17 ????????????e.printStackTrace();18 ????????}19 ????}
使用WebSocket帮助应用程序群集节点间通信
原文地址:https://www.cnblogs.com/chenhui7373/p/8654592.html