逻辑
客户端捕获键盘输入然后发送给服务端
服务器接收到消息,转发给各个客户端
服务端代码
/**
* 服务端
* @author suibibk@qq.com
*/
public class NettyServer {
public static void main(String[] args) throws Exception{
//1、创建一个线程组,接收客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2、创建一个线程组,处理网络操作
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3、创建服务端启动助手来配置参数
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//4、设置两个线程组
.channel(NioServerSocketChannel.class)//5、使用NioServerSocketChannel作为服务器端通道的实现
.option(ChannelOption.SO_BACKLOG, 128)//6、设置线程队列中等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true)//7、保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {//8、创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
sc.pipeline().addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
//9、往pipeline链中添加自定义的handler
sc.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("【服务器已经启动】");
ChannelFuture cf = b.bind(9999).sync();//10、绑定端口,bind方法是异步的,sync方法是同步阻塞的
//11、关闭通道,关闭线程组
cf.channel().closeFuture().sync();//关闭连接(异步非阻塞)
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 服务端处理业务类
* @author suibibk@qq.com
*
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter{
//内部会用ConcurrentMap来维护,线程安全
private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* channel处于就绪状态,客户端刚上线
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
String remoteAddress = channel.remoteAddress().toString();
//加入全局变量中
//channelGroup.writeAndFlush(Unpooled.copiedBuffer("【客户端】"+remoteAddress+"上线啦 "+format.format(new Date()),CharsetUtil.UTF_8));
channelGroup.writeAndFlush("【客户端】"+remoteAddress+"上线啦 "+format.format(new Date()));
channelGroup.add(channel);
//将当前channel加入到ChannelGroup
System.out.println("【客户端】"+remoteAddress+"上线啦");
}
/**
* channel离线
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
String remoteAddress = channel.remoteAddress().toString();
channelGroup.remove(channel);
channelGroup.writeAndFlush("【客户端】"+remoteAddress+"已下线 "+format.format(new Date()));
System.out.println("【客户端】"+remoteAddress+"已下线");
}
//读取数据事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
String remoteAddress = channel.remoteAddress().toString();
System.out.println("【客户端】"+remoteAddress+":"+msg);
channelGroup.forEach(ch -> {
if(ch==channel) {
ch.writeAndFlush("【自己】"+remoteAddress+":"+msg);
}else {
ch.writeAndFlush("【客户端】"+remoteAddress+":"+msg);
}
}
);
}
//异常发生事件
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//日志:远程主机强迫关闭了一个现有的连接。
//System.out.println(cause.getMessage());
ctx.close();
}
}
客户端代码
/**
* 网络客户端
* @author suibibk@qq.com
*
*/
public class NettyClient {
public static void main(String[] args)throws Exception {
//1、创建一个线程组
EventLoopGroup group = new NioEventLoopGroup();
//2、创建客户端启动助手,完成相关配置
Bootstrap b = new Bootstrap();
b.group(group)//3、设置线程组
.channel(NioSocketChannel.class)//4、设置客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {//创建一个初始化通道对象
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
sc.pipeline().addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
//6、在pipline中添加自定义的handler
sc.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("【客户端已启动】");
//7、启动客户端去连接服务器端 connect方法是异步的,sync方法是同步阻塞的
ChannelFuture cf =b.connect("127.0.0.1", 9999).sync();
System.out.println("---"+cf.channel().remoteAddress()+"------");
//循环监听用户键盘输入
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()) {
String msg = scanner.nextLine();
System.out.println("用户输入:"+msg);
//通过channel发送到服务器端
cf.channel().writeAndFlush(msg);
}
//8、关闭连接(异步非阻塞)
cf.channel().closeFuture().sync();
System.out.print("Client is end.....");
}
}
/**
* 客户端业务处理类
* @author suibibk@qq.com
*
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter{
//读取数据事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf buf = (ByteBuf)msg;
System.out.println(msg);
}
//异常发生事件
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//日志:远程主机强迫关闭了一个现有的连接。
//System.out.println(cause.getMessage());
ctx.close();
}
}
测试
启动服务器,然后启动三个客户端,输入消息回车,会看到如下日志
【服务器已经启动】
【客户端】/127.0.0.1:50576上线啦
【客户端】/127.0.0.1:50600上线啦
【客户端】/127.0.0.1:50631上线啦
【客户端】/127.0.0.1:50576:我是客户端1
【客户端】/127.0.0.1:50631:我是客户端2
【客户端已启动】
---/127.0.0.1:9999------
【客户端】/127.0.0.1:50600上线啦 2021-01-21 20:30:41
【客户端】/127.0.0.1:50631上线啦 2021-01-21 20:31:14
我是客户端1
用户输入:我是客户端1
【自己】/127.0.0.1:50576:我是客户端1
【客户端】/127.0.0.1:50631:我是客户端2
【客户端已启动】
---/127.0.0.1:9999------
【客户端】/127.0.0.1:50631上线啦 2021-01-21 20:31:14
【客户端】/127.0.0.1:50576:我是客户端1
【客户端】/127.0.0.1:50631:我是客户端2
【客户端已启动】
---/127.0.0.1:9999------
【客户端】/127.0.0.1:50576:我是客户端1
我是客户端2
用户输入:我是客户端2
【自己】/127.0.0.1:50631:我是客户端2
总结
其实netty开发框架模板是固定的,主要编写的类是两个处理引擎
NettyServerHandler
NettyClientHandler
期间可能会遇到一些错误,可以参考:netty(二)、channelGroup.writeAndFlush发送消息客户端接收不到的原因