package com.zcxk.tcpserver.tcp; import com.zcxk.tcpserver.tcp.generator.ConcentratorDemo; import com.zcxk.tcpserver.tcp.generator.ProtocolGenerator; import com.zcxk.tcpserver.util.ConvertCode; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; @Component @Qualifier("tcpSocketHandler") @ChannelHandler.Sharable @Slf4j public class TcpSocketHandler extends ChannelInboundHandlerAdapter { //获取现有通道,一个通道channel就是一个socket链接在这里 public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("客户端" + ctx.channel().id().asLongText() + "连接服务器"); channels.add(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte [] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes);//复制内容到字节数组bytes String receiveStr = ConvertCode.receiveHexToString(bytes); log.info("服务端收到消息:" + receiveStr); Integer result = ProtocolGenerator.analysis(receiveStr); if(result == 1){ String re1 = ConcentratorDemo.login(); writeToClient(re1,ctx,"登录"); } if(result == 2){ String re = ConcentratorDemo.heartbeat(); writeToClient(re,ctx,"心跳"); } } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { String channelIdLongText = ctx.channel().id().asLongText(); log.info("客户端" + channelIdLongText + "断开"); channels.remove(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { String channelIdLongText = ctx.channel().id().asLongText(); log.info("客户端" + channelIdLongText + "异常"); cause.printStackTrace(); channels.remove(ctx.channel()); ctx.close(); } private void writeToClient(final String receiveStr, ChannelHandlerContext channel, final String mark) { try { ByteBuf bufff = Unpooled.buffer();//netty需要用ByteBuf传输 bufff.writeBytes(ConvertCode.hexString2Bytes(receiveStr));//对接需要16进制 channel.writeAndFlush(bufff).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { StringBuilder sb = new StringBuilder(""); if(!StringUtils.isEmpty(mark)){ sb.append("【").append(mark).append("】"); } if (future.isSuccess()) { log.info(sb.toString()+"回写成功"+receiveStr); } else { log.error(sb.toString()+"回写失败"+receiveStr); } } }); } catch (Exception e) { e.printStackTrace(); System.out.println("调用通用writeToClient()异常"+e.getMessage()); log.error("调用通用writeToClient()异常:",e); } } }