// TcpSocketHandler.java
package com.zcxk.tcpserver.tcp;

import com.zcxk.tcpserver.tcp.generator.ConcentratorDemo;
import com.zcxk.tcpserver.tcp.generator.ProtocolGenerator;
import com.zcxk.tcpserver.util.ConvertCode;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

  15. @Component
  16. @Qualifier("tcpSocketHandler")
  17. @ChannelHandler.Sharable
  18. @Slf4j
  19. public class TcpSocketHandler extends ChannelInboundHandlerAdapter {
  20. //获取现有通道,一个通道channel就是一个socket链接在这里
  21. public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
  22. @Override
  23. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  24. }
  25. @Override
  26. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  27. log.info("客户端" + ctx.channel().id().asLongText() + "连接服务器");
  28. channels.add(ctx.channel());
  29. }
  30. @Override
  31. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  32. ByteBuf buf = (ByteBuf)msg;
  33. byte [] bytes = new byte[buf.readableBytes()];
  34. buf.readBytes(bytes);//复制内容到字节数组bytes
  35. String receiveStr = ConvertCode.receiveHexToString(bytes);
  36. log.info("服务端收到消息:" + receiveStr);
  37. Integer result = ProtocolGenerator.analysis(receiveStr);
  38. if(result == 1){
  39. String re1 = ConcentratorDemo.login();
  40. writeToClient(re1,ctx,"登录");
  41. }
  42. if(result == 2){
  43. String re = ConcentratorDemo.heartbeat();
  44. writeToClient(re,ctx,"心跳");
  45. }
  46. }
  47. @Override
  48. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  49. String channelIdLongText = ctx.channel().id().asLongText();
  50. log.info("客户端" + channelIdLongText + "断开");
  51. channels.remove(ctx.channel());
  52. }
  53. @Override
  54. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  55. String channelIdLongText = ctx.channel().id().asLongText();
  56. log.info("客户端" + channelIdLongText + "异常");
  57. cause.printStackTrace();
  58. channels.remove(ctx.channel());
  59. ctx.close();
  60. }
  61. private void writeToClient(final String receiveStr, ChannelHandlerContext channel, final String mark) {
  62. try {
  63. ByteBuf bufff = Unpooled.buffer();//netty需要用ByteBuf传输
  64. bufff.writeBytes(ConvertCode.hexString2Bytes(receiveStr));//对接需要16进制
  65. channel.writeAndFlush(bufff).addListener(new ChannelFutureListener() {
  66. @Override
  67. public void operationComplete(ChannelFuture future) throws Exception {
  68. StringBuilder sb = new StringBuilder("");
  69. if(!StringUtils.isEmpty(mark)){
  70. sb.append("【").append(mark).append("】");
  71. }
  72. if (future.isSuccess()) {
  73. log.info(sb.toString()+"回写成功"+receiveStr);
  74. } else {
  75. log.error(sb.toString()+"回写失败"+receiveStr);
  76. }
  77. }
  78. });
  79. } catch (Exception e) {
  80. e.printStackTrace();
  81. System.out.println("调用通用writeToClient()异常"+e.getMessage());
  82. log.error("调用通用writeToClient()异常:",e);
  83. }
  84. }
  85. }