package com.qmtt.server;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;@Servicepublic class NettyServer { ???private static final Logger log = LoggerFactory.getLogger(NettyServer.class); ???EventLoopGroup bossGroup; ???EventLoopGroup workGroup; ???Channel channel; ???// public static void main(String[] args) { ???// new NettyServer().run(); ???// } ???@PostConstruct ???public void run() { ???????log.info("启动netty"); ???????bossGroup = new NioEventLoopGroup(); ???????workGroup = new NioEventLoopGroup(); ???????try { ???????????ServerBootstrap b = new ServerBootstrap(); ???????????b.group(bossGroup, workGroup); ???????????b.channel(NioServerSocketChannel.class); ???????????b.childHandler(new ChildChannelHandler()); ???????????channel = b.bind(7397).sync().channel(); ???????????// channel.closeFuture().sync(); ???????} catch (Exception e) { ???????????log.error("", e); ???????} finally { ???????????// bossGroup.shutdownGracefully(); ???????????// workGroup.shutdownGracefully(); ???????} ???} ???@PreDestroy ???public void stop() { ???????log.info("关闭netty"); ???????if (null == channel) { ???????????log.error("server channel is null"); ???????} ???????bossGroup.shutdownGracefully(); ???????workGroup.shutdownGracefully(); ???????channel.closeFuture().syncUninterruptibly(); ???????bossGroup = null; ???????workGroup = null; ???????channel = null; ???}}
package com.qmtt.server;

import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

import com.qmtt.tools.JsonUtils;
import com.qmtt.tools.SpringUtil;
import com.qmtt.websocket.GameFunction2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

/**
 * Per-connection WebSocket handler.
 *
 * <p>Performs the HTTP-to-WebSocket handshake, then parses incoming text frames as JSON and
 * forwards them to {@link GameFunction2} according to their {@code msgType}. Connections are
 * registered in a static map keyed by the {@code openid} carried in the message, which is what
 * the static {@code sendMsg} methods use to push messages to a given user.
 */
public class MyWebSocket2 extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(MyWebSocket2.class);

    /**
     * openid -> connection context. A concurrent map is used because the map is scanned in
     * {@link #channelInactive} while other connections may be registering themselves.
     */
    private static final ConcurrentMap<String, ChannelHandlerContext> webSocketMap =
            new ConcurrentHashMap<String, ChannelHandlerContext>();

    private WebSocketServerHandshaker handshaker;
    public GameFunction2 gameFunction;
    RedisTemplate redisTemplate;

    /** Looks up the collaborating Spring beans once the connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端与服务端连接开启");
        gameFunction = SpringUtil.getBean(GameFunction2.class);
        redisTemplate = (RedisTemplate) SpringUtil.getBean("redisTemplate");
    }

    /**
     * Unregisters the closing connection and notifies the game logic.
     *
     * <p>Only the entry that actually maps to {@code ctx} is removed. If this context was never
     * registered (or has since been replaced by a newer connection of the same user), nothing is
     * removed and {@code gameFunction.close} is not called.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端与服务端连接关闭");
        String key = null;
        for (Entry<String, ChannelHandlerContext> entry : webSocketMap.entrySet()) {
            if (entry.getValue().equals(ctx)) {
                key = entry.getKey();
                break;
            }
        }
        log.info("<{}>断开连接", key);
        if (key == null) {
            return;
        }
        // Conditional remove: do not evict a newer context registered under the same openid.
        webSocketMap.remove(key, ctx);
        gameFunction.close(key);
    }

    /** Routes an inbound message to the HTTP (handshake) or WebSocket frame handler. */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, ((FullHttpRequest) msg));
        } else if (msg instanceof WebSocketFrame) {
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    /** Flushes anything written during the read (e.g. the pong written without a flush). */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Handles a single WebSocket frame: close and ping control frames are answered directly,
     * non-text frames are ignored, and text frames containing {@code msgType} are dispatched.
     */
    @SuppressWarnings("rawtypes")
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // Close request: complete the closing handshake.
        if (frame instanceof CloseWebSocketFrame) {
            log.info("连接开闭");
            // retain(): the frame is handed to the handshaker while this handler's base class
            // releases the inbound message after messageReceived returns.
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Ping: reply with a pong carrying the same payload.
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Only text frames are supported; anything else is dropped.
        if (!(frame instanceof TextWebSocketFrame)) {
            log.info("不支持二进制消息");
            return;
        }
        String message = ((TextWebSocketFrame) frame).text();
        // Cheap pre-filter before parsing JSON.
        if (!message.contains("msgType")) {
            return;
        }
        log.info("服务端收到消息:{}", message);
        try {
            Map map = JsonUtils.json2map(message);
            dispatch(ctx, map, message);
        } catch (Exception e) {
            log.error("", e);
        }
    }

    /**
     * Dispatches a parsed message to the game logic by its {@code msgType}.
     *
     * @param ctx     connection the message arrived on
     * @param map     parsed JSON message; must contain {@code msgType} and {@code openid}
     * @param message raw message text, used only for logging
     */
    @SuppressWarnings("rawtypes")
    private void dispatch(ChannelHandlerContext ctx, Map map, String message) {
        Object rawType = map.get("msgType");
        Object rawOpenid = map.get("openid");
        if (rawType == null || rawOpenid == null) {
            log.warn("ignoring message without msgType/openid: {}", message);
            return;
        }
        String msgType = rawType.toString();
        String openid = rawOpenid.toString();

        // Start a game.
        if ("start".equals(msgType)) {
            webSocketMap.put(openid, ctx);
            gameFunction.joinGame(openid);
            return;
        }
        // Answer a question.
        if ("answer".equals(msgType)) {
            gameFunction.answer(map);
            return;
        }
        // Game over.
        if ("gameover".equals(msgType)) {
            gameFunction.gameover(map);
            return;
        }
        // Invitation sent; wait for the opponent.
        if ("wait".equals(msgType)) {
            webSocketMap.put(openid, ctx);
            gameFunction.waitEnter(openid);
            return;
        }
        // Invited opponent enters.
        if ("waitEnter".equals(msgType)) {
            webSocketMap.put(openid, ctx);
            String inviteOpenid = (String) map.get("inviteOpenid");
            // The inviter may already be playing or may have left; the game logic checks that.
            gameFunction.checkUserStatus(openid, inviteOpenid);
            return;
        }
        // NOTE(review): presumably starts the invited match — confirm against GameFunction2.
        if ("waitStart".equals(msgType)) {
            gameFunction.waitStart(openid);
            return;
        }
        // Play again.
        if ("playAgain".equals(msgType)) {
            gameFunction.playAgain(openid);
            return;
        }
    }

    /**
     * Performs the WebSocket handshake for an upgrade request; any other HTTP request is
     * answered with 400 Bad Request.
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        // The Upgrade header value is matched case-insensitively (e.g. "WebSocket").
        Object upgrade = req.headers().get("Upgrade");
        boolean isUpgrade = upgrade != null && "websocket".equalsIgnoreCase(upgrade.toString());
        if (!req.getDecoderResult().isSuccess() || !isUpgrade) {
            sendHttpResponse(ctx, req,
                    new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        // The WebSocket URL passed to the factory is left empty here; per the original author's
        // note, frame delivery does not depend on it. Routing by path can be done via the
        // request URI instead.
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    /**
     * Writes an HTTP response, using the status line as the body for non-200 responses, and
     * closes the connection when it is not keep-alive or the status is not 200.
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
            DefaultFullHttpResponse res) {
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Keep-alive is not supported for plain HTTP responses; always returns {@code false}. */
    private static boolean isKeepAlive(FullHttpRequest req) {
        return false;
    }

    /** Logs the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("", cause);
        ctx.close();
    }

    /**
     * Returns the live openid -> connection registry (not a copy).
     *
     * @return the shared, thread-safe connection map
     */
    public static Map<String, ChannelHandlerContext> getWebSocketMap() {
        return webSocketMap;
    }

    /**
     * Serialises {@code msg} to JSON and sends it to the connection registered under {@code id}.
     *
     * @param id  openid of the recipient
     * @param msg object to serialise
     * @return 1 if the message was handed to a connection, 0 if there is no such connection or
     *         an error occurred
     */
    public static int sendMsg(String id, Object msg) {
        try {
            String ret = JsonUtils.toJsonStringIgnoreNull(msg);
            ChannelHandlerContext socket = MyWebSocket2.getWebSocketMap().get(id);
            if (socket != null) {
                log.info("给<{}>发送消息:{}", id, ret);
                socket.writeAndFlush(new TextWebSocketFrame(ret));
                return 1;
            } else {
                log.info("<{}>连接不存在,不处理", id);
            }
        } catch (Exception ex) {
            log.error("", ex);
        }
        return 0;
    }

    /**
     * Sends a text message to the connection registered under {@code id}.
     *
     * @param id  openid of the recipient
     * @param msg text to send as-is
     * @return 1 if the message was handed to a connection, 0 if there is no such connection or
     *         an error occurred
     */
    public static int sendMsg(String id, String msg) {
        try {
            ChannelHandlerContext socket = MyWebSocket2.getWebSocketMap().get(id);
            if (socket != null) {
                log.info("给<{}>发送消息:{}", id, msg);
                socket.writeAndFlush(new TextWebSocketFrame(msg));
                return 1;
            } else {
                log.info("连接不存在,不处理");
            }
        } catch (Exception ex) {
            log.error("", ex);
        }
        return 0;
    }
}
package com.qmtt.server;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.stream.ChunkedWriteHandler;public class ChildChannelHandler extends ChannelInitializer<SocketChannel> { ???@Override ???protected void initChannel(SocketChannel e) throws Exception { ???????e.pipeline().addLast("http-codec", new HttpServerCodec()); ???????e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ???????e.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); ???????e.pipeline().addLast("handler", new MyWebSocket2()); ???}}
此代码为“诗词荣耀”的 WebSocket 实现,解决了使用 Tomcat 实现 WebSocket 时连接不稳定的问题。
Netty实现WebSocket
原文地址:https://www.cnblogs.com/wujf/p/9087394.html