可选择ASCII码或者HEX

发布时间:2025-06-24 20:13:52  作者:北方职教升学中心  阅读量:054


六、
        如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题;
如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。可能遇到的问题及处理方式

        1.1  TCP粘包拆包

        TCP在接受数据的时候,有一个滑动窗口来控制接受数据的大小,这个滑动窗口你就可以理解为一个缓冲区的大小。可选择ASCII码或者HEX。将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息(推荐);

        4、

//可加入自己的数据解析器socketChannel.pipeline().addLast(new StringEncoder());socketChannel.pipeline().addLast(new StringDecoder());socketChannel.pipeline().addLast(new SimpleChannelInboundHandler()

通信时是在通道中写入数据,使用channel.writeAndFlush()方法,可同步或异步发送数据。缓冲区满了就会把数据发送。tcp客户端实现

package org.example;import com.google.common.util.concurrent.ThreadFactoryBuilder;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import lombok.Getter;import lombok.extern.log4j.Log4j2;import org.springframework.stereotype.Component;import java.net.InetSocketAddress;import java.util.HashMap;import java.util.Map;import java.util.concurrent.*;/** * tcp客户端 */@Log4j2@Componentpublic class TcpClient {    private Bootstrap bootstrap;    /**     * 第三方通道Map     * key: ip:port     */    @Getter    private Map<String, Channel> thirdChannelMap = new HashMap<>();    private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            4,            10,            30,            TimeUnit.SECONDS,            new ArrayBlockingQueue<>(100),            new ThreadPoolExecutor.DiscardPolicy());    public void init() {        // 准备netty tcp客户端        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("TCP-Client-%d").build();        NioEventLoopGroup eventExecutors = new NioEventLoopGroup(10, threadFactory);        bootstrap = new Bootstrap();        //设置相关参数 // 设置线程组        bootstrap.group(eventExecutors)                // 设置客户端通道的实现类                .channel(NioSocketChannel.class)                // 保持连接                .option(ChannelOption.TCP_NODELAY, true)                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)                .option(ChannelOption.SO_RCVBUF, 1024 * 64)                .option(ChannelOption.SO_SNDBUF, 1024 * 64)                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)                .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024)                .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024)                .option(ChannelOption.SO_KEEPALIVE, true)                .handler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel socketChannel) {                        socketChannel.pipeline().addLast(new StringEncoder());                        socketChannel.pipeline().addLast(new StringDecoder());                        // 加入自己业务的处理器                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {                            private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("query-thread-%d").build();                            private ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(10, threadFactory);                            @Override                            public void channelActive(ChannelHandlerContext ctx) throws Exception {                                InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();                                String ip = socketAddress.getAddress().getHostAddress();                                int port = socketAddress.getPort();                                String innerCode = ip + ":" + port;                                log.info("与第三方服务建立tcp连接成功: {}", innerCode);                                Channel channel = ctx.channel();                                channel.writeAndFlush("hello").sync();}                            @Override                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {                                InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();                                String ip = socketAddress.getAddress().getHostAddress();                                int port = socketAddress.getPort();                                if (null == msg) {                                    log.info("{}-通道无返回结果", socketAddress);                                    return;}                            }                            @Override                            public void channelInactive(ChannelHandlerContext ctx) throws Exception {                                InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();                                String ip = socketAddress.getAddress().getHostAddress();                                int port = socketAddress.getPort();                                String innerCode = ip + ":" + port;                                log.error("与第三方服务tcp连接断开: {}", innerCode);                                thirdChannelMap.remove(innerCode);                                // 重连                                connect(ip, port);}                            @Override                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {                                log.error("TCP通讯发生异常:{}", cause.getCause(), cause);}                        });                    }                });        log.info("Tcp客户端准备完成");        // 连接第三方主机        connect("192.168.126.1",501);    }    public void connect(String ip, int port) {        log.info("start connect with ip: {}, port: {}", ip, port);        bootstrap.connect(ip, port).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (future.isSuccess()) {                    String innerCode = ip + ":" + port;                    thirdChannelMap.put(innerCode, future.channel());                    log.info("connect [{}:{}] success", ip, port);} else {                    log.info("connect  [{}:{}] failed.......", ip, port);                    // 连接不成功,另起线程等待30秒后重新连接,防止阻塞netty线程                    threadPoolExecutor.execute(new Runnable() {                        @Override                        public void run() {                            try {                                log.info("wait a moment: {}s", 30);                                TimeUnit.SECONDS.sleep(30);} catch (InterruptedException e) {                                log.error(e);                            }                            connect(ip, port);                        }                    });                }            }        });    }}

在TCP客户端中添加断线重连机制,避免断线影响业务。tcp服务端的实现

package org.example.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import lombok.Data;import lombok.extern.log4j.Log4j2;import org.springframework.stereotype.Component;import java.net.InetSocketAddress;@Data@Log4j2@Componentpublic class TcpServer {    /**     * 启动TCP服务端     *     * @param ip   本地地址     * @param port 本服务端口     */    public boolean startService(String ip, int port) {        //负责接收客户端连接请求的group        EventLoopGroup bossGroup = new NioEventLoopGroup();        //负责和客户端数据交互的group        EventLoopGroup workerGroup = new NioEventLoopGroup();        InetSocketAddress addr = new InetSocketAddress(ip, port);        try {            ServerBootstrap bootstrap = new ServerBootstrap();            // 设置相关参数            bootstrap.group(bossGroup, workerGroup) // 设置线程组                    .channel(NioServerSocketChannel.class)                    .option(ChannelOption.SO_BACKLOG, 1024)                    .childOption(ChannelOption.SO_KEEPALIVE, true)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            //可加入自己的数据解析器                            socketChannel.pipeline().addLast(new StringEncoder());                            socketChannel.pipeline().addLast(new StringDecoder());                            socketChannel.pipeline().addLast(new SimpleChannelInboundHandler() {                                @Override                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {                                    log.info("接收第三方数据message为: {}", msg);//                                    responseHandler.handMsg((String) msg);                                    Channel channel = channelHandlerContext.channel();                                    channel.writeAndFlush("hello").sync();}                                /**                                 * 客户端连接触发                                 *                                 * @param ctx                                 * @throws Exception                                 */                                @Override                                public void channelActive(ChannelHandlerContext ctx) throws Exception {                                    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();                                    log.info("ISCS[{}] gets online", socketAddress);                                    super.channelActive(ctx);}                                /**                                 * 客户端断线触发                                 *                                 * @param ctx                                 * @throws Exception                                 */                                @Override                                public void channelInactive(ChannelHandlerContext ctx) throws Exception {                                    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();                                    log.info("ISCS[{}] gets offline", socketAddress);                                    super.channelInactive(ctx);}                                /**                                 * 客户端连接出现异常                                 *                                 * @param ctx                                 * @param cause                                 */                                @Override                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {                                    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();                                    log.error("connection with ISCS[{}] has error, message: {}", socketAddress, cause.getMessage(), cause);}                            });                        }                    });            bootstrap.bind(addr).sync(); //绑定端口,开始接收进来的请求            return true;        } catch (Exception e) {            log.error("创建TCP[{}:{}]服务时出现异常", ip, port, e);            return false;}    }}

 TCP服务端启动

package org.example.server;import lombok.extern.log4j.Log4j2;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.ApplicationArguments;import org.springframework.boot.ApplicationRunner;import org.springframework.context.annotation.DependsOn;import org.springframework.core.annotation.Order;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Log4j2@Component@Order(1)public class MainAction implements ApplicationRunner {        @Autowired    private TcpServer tcpService;    @Override    public void run(ApplicationArguments args) throws Exception {                // 创建网络TCP服务端        while(!tcpService.startService("192.168.126.1", 501)){            TimeUnit.SECONDS.sleep(30000);        }        log.info("TCP服务初始化成功");    }}

在TCP服务端中需要注意的点是在socketChannel.pipeline()可加入自己的解析器,在此处我加入的是String编解码。

Channel channel = channelHandlerContext.channel();channel.writeAndFlush("hello").sync();

四、数据包的大小是不固定的,有时候比缓冲区大有时候小。

        1.2  解决方法

        对于粘包和拆包问题,常见的解决方案有四种:

        1、发送方在每个包的末尾使用固定的分隔符,例如\r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的\r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包,这种方式相当于做特殊标记(不推荐);

        3、

一、

二、调试工具(NetAssist)

         官方下载路径:NetAssist(网络调试助手)官方下载_NetAssist(网络调试助手)最新版v4.3.25免费下载_3DM软件

        使用网络调试助手可快速测试连接,帮助大家更快上手。概述

        Netty一个高性能IO工具包,主要用于开发HTTP服务器,HTTPS服务器,WebSocket服务器,TCP服务器,UDP服务器和在JVM管道。用Netty开发的服务器可单独运行(即:在main()函数里调用),不需要部署在类似tomcat的容器里。通过自定义协议进行粘包和拆包的处理。

五、Netty使用单线程并发模型,并围绕非阻塞NIO设计,所以性能较高。

添加依赖
<dependency>            <groupId>io.netty</groupId>            <artifactId>netty-all</artifactId>            <version>4.1.2.Final</version>        </dependency>

三、发送方在发送数据包的时候,每个包都固定长度,比如1024个字节大小,如果客户端发送的数据长度不足1024个字节,则通过补充空格的方式补全到指定长度;

        2、