123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- package com.zcxk.tcpserver.tcp;
- import com.zcxk.tcpserver.tcp.generator.ConcentratorDemo;
- import com.zcxk.tcpserver.tcp.generator.ProtocolGenerator;
- import com.zcxk.tcpserver.util.ConvertCode;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.Unpooled;
- import io.netty.channel.*;
- import io.netty.channel.group.ChannelGroup;
- import io.netty.channel.group.DefaultChannelGroup;
- import io.netty.util.concurrent.GlobalEventExecutor;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.stereotype.Component;
- @Component
- @Qualifier("tcpSocketHandler")
- @ChannelHandler.Sharable
- @Slf4j
- public class TcpSocketHandler extends ChannelInboundHandlerAdapter {
- //获取现有通道,一个通道channel就是一个socket链接在这里
- public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- }
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- log.info("客户端" + ctx.channel().id().asLongText() + "连接服务器");
- channels.add(ctx.channel());
- }
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ByteBuf buf = (ByteBuf)msg;
- byte [] bytes = new byte[buf.readableBytes()];
- buf.readBytes(bytes);//复制内容到字节数组bytes
- String receiveStr = ConvertCode.receiveHexToString(bytes);
- log.info("服务端收到消息:" + receiveStr);
- Integer result = ProtocolGenerator.analysis(receiveStr);
- if(result == 1){
- String re1 = ConcentratorDemo.login();
- writeToClient(re1,ctx,"登录");
- }
- if(result == 2){
- String re = ConcentratorDemo.heartbeat();
- writeToClient(re,ctx,"心跳");
- }
- }
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- String channelIdLongText = ctx.channel().id().asLongText();
- log.info("客户端" + channelIdLongText + "断开");
- channels.remove(ctx.channel());
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- String channelIdLongText = ctx.channel().id().asLongText();
- log.info("客户端" + channelIdLongText + "异常");
- cause.printStackTrace();
- channels.remove(ctx.channel());
- ctx.close();
- }
- private void writeToClient(final String receiveStr, ChannelHandlerContext channel, final String mark) {
- try {
- ByteBuf bufff = Unpooled.buffer();//netty需要用ByteBuf传输
- bufff.writeBytes(ConvertCode.hexString2Bytes(receiveStr));//对接需要16进制
- channel.writeAndFlush(bufff).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- StringBuilder sb = new StringBuilder("");
- if(!StringUtils.isEmpty(mark)){
- sb.append("【").append(mark).append("】");
- }
- if (future.isSuccess()) {
- log.info(sb.toString()+"回写成功"+receiveStr);
- } else {
- log.error(sb.toString()+"回写失败"+receiveStr);
- }
- }
- });
- } catch (Exception e) {
- e.printStackTrace();
- System.out.println("调用通用writeToClient()异常"+e.getMessage());
- log.error("调用通用writeToClient()异常:",e);
- }
- }
- }
|