逻辑
客户端捕获键盘输入然后发送给服务端
服务器接收到消息,转发给各个客户端
服务端代码
/*** 服务端* @author suibibk@qq.com*/public class NettyServer {public static void main(String[] args) throws Exception{//1、创建一个线程组,接收客户端连接EventLoopGroup bossGroup = new NioEventLoopGroup();//2、创建一个线程组,处理网络操作EventLoopGroup workerGroup = new NioEventLoopGroup();//3、创建服务端启动助手来配置参数ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)//4、设置两个线程组.channel(NioServerSocketChannel.class)//5、使用NioServerSocketChannel作为服务器端通道的实现.option(ChannelOption.SO_BACKLOG, 128)//6、设置线程队列中等待连接的个数.childOption(ChannelOption.SO_KEEPALIVE, true)//7、保持活动连接状态.childHandler(new ChannelInitializer<SocketChannel>() {//8、创建一个通道初始化对象@Overrideprotected void initChannel(SocketChannel sc) throws Exception {sc.pipeline().addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器sc.pipeline().addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器//9、往pipeline链中添加自定义的handlersc.pipeline().addLast(new NettyServerHandler());}});System.out.println("【服务器已经启动】");ChannelFuture cf = b.bind(9999).sync();//10、绑定端口,bind方法是异步的,sync方法是同步阻塞的//11、关闭通道,关闭线程组cf.channel().closeFuture().sync();//关闭连接(异步非阻塞)bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
/*** 服务端处理业务类* @author suibibk@qq.com**/public class NettyServerHandler extends ChannelInboundHandlerAdapter{//内部会用ConcurrentMap来维护,线程安全private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** channel处于就绪状态,客户端刚上线*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();String remoteAddress = channel.remoteAddress().toString();//加入全局变量中//channelGroup.writeAndFlush(Unpooled.copiedBuffer("【客户端】"+remoteAddress+"上线啦 "+format.format(new Date()),CharsetUtil.UTF_8));channelGroup.writeAndFlush("【客户端】"+remoteAddress+"上线啦 "+format.format(new Date()));channelGroup.add(channel);//将当前channel加入到ChannelGroupSystem.out.println("【客户端】"+remoteAddress+"上线啦");}/*** channel离线*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();String remoteAddress = channel.remoteAddress().toString();channelGroup.remove(channel);channelGroup.writeAndFlush("【客户端】"+remoteAddress+"已下线 "+format.format(new Date()));System.out.println("【客户端】"+remoteAddress+"已下线");}//读取数据事件@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Channel channel = ctx.channel();String remoteAddress = channel.remoteAddress().toString();System.out.println("【客户端】"+remoteAddress+":"+msg);channelGroup.forEach(ch -> {if(ch==channel) {ch.writeAndFlush("【自己】"+remoteAddress+":"+msg);}else {ch.writeAndFlush("【客户端】"+remoteAddress+":"+msg);}});}//异常发生事件@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//日志:远程主机强迫关闭了一个现有的连接。//System.out.println(cause.getMessage());ctx.close();}}
客户端代码
/*** 网络客户端* @author suibibk@qq.com**/public class NettyClient {public static void main(String[] args)throws Exception {//1、创建一个线程组EventLoopGroup group = new NioEventLoopGroup();//2、创建客户端启动助手,完成相关配置Bootstrap b = new Bootstrap();b.group(group)//3、设置线程组.channel(NioSocketChannel.class)//4、设置客户端通道的实现类.handler(new ChannelInitializer<SocketChannel>() {//创建一个初始化通道对象@Overrideprotected void initChannel(SocketChannel sc) throws Exception {sc.pipeline().addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器sc.pipeline().addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器//6、在pipline中添加自定义的handlersc.pipeline().addLast(new NettyClientHandler());}});System.out.println("【客户端已启动】");//7、启动客户端去连接服务器端 connect方法是异步的,sync方法是同步阻塞的ChannelFuture cf =b.connect("127.0.0.1", 9999).sync();System.out.println("---"+cf.channel().remoteAddress()+"------");//循环监听用户键盘输入Scanner scanner = new Scanner(System.in);while(scanner.hasNextLine()) {String msg = scanner.nextLine();System.out.println("用户输入:"+msg);//通过channel发送到服务器端cf.channel().writeAndFlush(msg);}//8、关闭连接(异步非阻塞)cf.channel().closeFuture().sync();System.out.print("Client is end.....");}}
/*** 客户端业务处理类* @author suibibk@qq.com**/public class NettyClientHandler extends ChannelInboundHandlerAdapter{//读取数据事件@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// ByteBuf buf = (ByteBuf)msg;System.out.println(msg);}//异常发生事件@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {//日志:远程主机强迫关闭了一个现有的连接。//System.out.println(cause.getMessage());ctx.close();}}
测试
启动服务器,然后启动三个客户端,输入消息回车,会看到如下日志
【服务器已经启动】【客户端】/127.0.0.1:50576上线啦【客户端】/127.0.0.1:50600上线啦【客户端】/127.0.0.1:50631上线啦【客户端】/127.0.0.1:50576:我是客户端1【客户端】/127.0.0.1:50631:我是客户端2
【客户端已启动】---/127.0.0.1:9999------【客户端】/127.0.0.1:50600上线啦 2021-01-21 20:30:41【客户端】/127.0.0.1:50631上线啦 2021-01-21 20:31:14我是客户端1用户输入:我是客户端1【自己】/127.0.0.1:50576:我是客户端1【客户端】/127.0.0.1:50631:我是客户端2
【客户端已启动】---/127.0.0.1:9999------【客户端】/127.0.0.1:50631上线啦 2021-01-21 20:31:14【客户端】/127.0.0.1:50576:我是客户端1【客户端】/127.0.0.1:50631:我是客户端2
【客户端已启动】---/127.0.0.1:9999------【客户端】/127.0.0.1:50576:我是客户端1我是客户端2用户输入:我是客户端2【自己】/127.0.0.1:50631:我是客户端2
总结
其实netty开发框架模板是固定的,主要编写的类是两个处理引擎
NettyServerHandlerNettyClientHandler
期间可能会遇到一些错误,可以参考:netty(二)、channelGroup.writeAndFlush发送消息客户端接收不到的原因
