可选择ASCII码或者HEX
发布时间:2025-06-24 20:13:52 作者:北方职教升学中心 阅读量:054
六、
如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题;
如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。可能遇到的问题及处理方式
1.1 TCP粘包拆包
TCP在接受数据的时候,有一个滑动窗口来控制接受数据的大小,这个滑动窗口你就可以理解为一个缓冲区的大小。可选择ASCII码或者HEX。将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息(推荐);
4、
//可加入自己的数据解析器socketChannel.pipeline().addLast(new StringEncoder());socketChannel.pipeline().addLast(new StringDecoder());socketChannel.pipeline().addLast(new SimpleChannelInboundHandler()
通信时是在通道中写入数据,使用channel.writeAndFlush()方法,可同步或异步发送数据。缓冲区满了就会把数据发送。tcp客户端实现
package org.example;import com.google.common.util.concurrent.ThreadFactoryBuilder;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import lombok.Getter;import lombok.extern.log4j.Log4j2;import org.springframework.stereotype.Component;import java.net.InetSocketAddress;import java.util.HashMap;import java.util.Map;import java.util.concurrent.*;/** * tcp客户端 */@Log4j2@Componentpublic class TcpClient { private Bootstrap bootstrap; /** * 第三方通道Map * key: ip:port */ @Getter private Map<String, Channel> thirdChannelMap = new HashMap<>(); private ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 4, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.DiscardPolicy()); public void init() { // 准备netty tcp客户端 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("TCP-Client-%d").build(); NioEventLoopGroup eventExecutors = new NioEventLoopGroup(10, threadFactory); bootstrap = new Bootstrap(); //设置相关参数 // 设置线程组 bootstrap.group(eventExecutors) // 设置客户端通道的实现类 .channel(NioSocketChannel.class) // 保持连接 .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .option(ChannelOption.SO_RCVBUF, 1024 * 64) .option(ChannelOption.SO_SNDBUF, 1024 * 64) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024) .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) { socketChannel.pipeline().addLast(new StringEncoder()); socketChannel.pipeline().addLast(new StringDecoder()); // 加入自己业务的处理器 socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() { private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("query-thread-%d").build(); private ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(10, threadFactory); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = socketAddress.getAddress().getHostAddress(); int port = socketAddress.getPort(); String innerCode = ip + ":" + port; log.info("与第三方服务建立tcp连接成功: {}", innerCode); Channel channel = ctx.channel(); channel.writeAndFlush("hello").sync();} @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = socketAddress.getAddress().getHostAddress(); int port = socketAddress.getPort(); if (null == msg) { log.info("{}-通道无返回结果", socketAddress); return;} } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); String ip = socketAddress.getAddress().getHostAddress(); int port = socketAddress.getPort(); String innerCode = ip + ":" + port; log.error("与第三方服务tcp连接断开: {}", innerCode); thirdChannelMap.remove(innerCode); // 重连 connect(ip, port);} @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("TCP通讯发生异常:{}", cause.getCause(), cause);} }); } }); log.info("Tcp客户端准备完成"); // 连接第三方主机 connect("192.168.126.1",501); } public void connect(String ip, int port) { log.info("start connect with ip: {}, port: {}", ip, port); bootstrap.connect(ip, port).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { String innerCode = ip + ":" + port; thirdChannelMap.put(innerCode, future.channel()); log.info("connect [{}:{}] success", ip, port);} else { log.info("connect [{}:{}] failed.......", ip, port); // 连接不成功,另起线程等待30秒后重新连接,防止阻塞netty线程 threadPoolExecutor.execute(new Runnable() { @Override public void run() { try { log.info("wait a moment: {}s", 30); TimeUnit.SECONDS.sleep(30);} catch (InterruptedException e) { log.error(e); } connect(ip, port); } }); } } }); }}
在TCP客户端中添加断线重连机制,避免断线影响业务。tcp服务端的实现
package org.example.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import lombok.Data;import lombok.extern.log4j.Log4j2;import org.springframework.stereotype.Component;import java.net.InetSocketAddress;@Data@Log4j2@Componentpublic class TcpServer { /** * 启动TCP服务端 * * @param ip 本地地址 * @param port 本服务端口 */ public boolean startService(String ip, int port) { //负责接收客户端连接请求的group EventLoopGroup bossGroup = new NioEventLoopGroup(); //负责和客户端数据交互的group EventLoopGroup workerGroup = new NioEventLoopGroup(); InetSocketAddress addr = new InetSocketAddress(ip, port); try { ServerBootstrap bootstrap = new ServerBootstrap(); // 设置相关参数 bootstrap.group(bossGroup, workerGroup) // 设置线程组 .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //可加入自己的数据解析器 socketChannel.pipeline().addLast(new StringEncoder()); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception { log.info("接收第三方数据message为: {}", msg);// responseHandler.handMsg((String) msg); Channel channel = channelHandlerContext.channel(); channel.writeAndFlush("hello").sync();} /** * 客户端连接触发 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); log.info("ISCS[{}] gets online", socketAddress); super.channelActive(ctx);} /** * 客户端断线触发 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); log.info("ISCS[{}] gets offline", socketAddress); super.channelInactive(ctx);} /** * 客户端连接出现异常 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); log.error("connection with ISCS[{}] has error, message: {}", socketAddress, cause.getMessage(), cause);} }); } }); bootstrap.bind(addr).sync(); //绑定端口,开始接收进来的请求 return true; } catch (Exception e) { log.error("创建TCP[{}:{}]服务时出现异常", ip, port, e); return false;} }}
TCP服务端启动
package org.example.server;import lombok.extern.log4j.Log4j2;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.ApplicationArguments;import org.springframework.boot.ApplicationRunner;import org.springframework.context.annotation.DependsOn;import org.springframework.core.annotation.Order;import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;@Log4j2@Component@Order(1)public class MainAction implements ApplicationRunner { @Autowired private TcpServer tcpService; @Override public void run(ApplicationArguments args) throws Exception { // 创建网络TCP服务端 while(!tcpService.startService("192.168.126.1", 501)){ TimeUnit.SECONDS.sleep(30000); } log.info("TCP服务初始化成功"); }}
在TCP服务端中需要注意的点是在socketChannel.pipeline()可加入自己的解析器,在此处我加入的是String编解码。
Channel channel = channelHandlerContext.channel();channel.writeAndFlush("hello").sync();
四、数据包的大小是不固定的,有时候比缓冲区大有时候小。 1.2 解决方法
对于粘包和拆包问题,常见的解决方案有四种:
1、发送方在每个包的末尾使用固定的分隔符,例如\r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的\r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包,这种方式相当于做特殊标记(不推荐);
3、
一、二、调试工具(NetAssist)
官方下载路径:NetAssist(网络调试助手)官方下载_NetAssist(网络调试助手)最新版v4.3.25免费下载_3DM软件
使用网络调试助手可快速测试连接,帮助大家更快上手。概述
Netty一个高性能IO工具包,主要用于开发HTTP服务器,HTTPS服务器,WebSocket服务器,TCP服务器,UDP服务器和在JVM管道。用Netty开发的服务器可单独运行(即:在main()函数里调用),不需要部署在类似tomcat的容器里。通过自定义协议进行粘包和拆包的处理。
五、Netty使用单线程并发模型,并围绕非阻塞NIO设计,所以性能较高。
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.2.Final</version> </dependency>
三、发送方在发送数据包的时候,每个包都固定长度,比如1024个字节大小,如果客户端发送的数据长度不足1024个字节,则通过补充空格的方式补全到指定长度;
2、