博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty5入门学习笔记004-使用Netty传输POJO对象(上)
阅读量:7074 次
发布时间:2019-06-28

本文共 15587 字,大约阅读时间需要 51 分钟。

hot3.png

使用Netty传输POJO对象,重点在于对象的序列化,序列化后的对象可以通过TCP流进行网络传输,结合Netty提供的对象编解码器,可以做到远程传输对象。

下面我们来看一个例子:模拟订票

首先Java序列化的POJO对象需要实现java.io.Serializable接口。

说明:还有很多种序列化的方式要比JDK自带的序列化要好 体积小利于保存和传输 例如google的protobuf和jboss的Marshalling 

230109_k7WO_374.png

火车车次和余票量POJO:

package bookticket;import java.io.Serializable;/** * 火车pojo对象 * @author xwalker */public class Train implements Serializable {	private static final long serialVersionUID = 1510326612440404416L;	private String number;//火车车次	private int ticketCounts;//余票数量	public Train(String number,int ticketCounts){		this.number=number;		this.ticketCounts=ticketCounts;	}	public String getNumber() {		return number;	}	public void setNumber(String number) {		this.number = number;	}	public int getTicketCounts() {		return ticketCounts;	}	public void setTicketCounts(int ticketCounts) {		this.ticketCounts = ticketCounts;	}}

 

车票POJO:

package bookticket;import java.io.Serializable;import java.util.Date;/** * 订票POJO对象 * @author xwalker */public class Ticket implements Serializable {	private static final long serialVersionUID = 4228051882802183587L;	private String trainNumber;//火车车次	private int carriageNumber;//车厢编号	private String seatNumber;//座位编号	private String number;//车票编号	private User user;//订票用户	private Date bookTime;//订票时间	private Date startTime;//开车时间	public String getNumber() {		return number;	}	public void setNumber(String number) {		this.number = number;	}	public Date getBookTime() {		return bookTime;	}	public void setBookTime(Date bookTime) {		this.bookTime = bookTime;	}	public Date getStartTime() {		return startTime;	}	public void setStartTime(Date startTime) {		this.startTime = startTime;	}	public User getUser() {		return user;	}	public void setUser(User user) {		this.user = user;	}	public String getTrainNumber() {		return trainNumber;	}	public void setTrainNumber(String trainNumber) {		this.trainNumber = trainNumber;	}	public int getCarriageNumber() {		return carriageNumber;	}	public void setCarriageNumber(int carriageNumber) {		this.carriageNumber = carriageNumber;	}	public String getSeatNumber() {		return seatNumber;	}	public void setSeatNumber(String seatNumber) {		this.seatNumber = seatNumber;	}}

 

用户POJO:

package bookticket;import java.io.Serializable;/** * 用户POJO对象 * @author xwalker */public class User implements Serializable {	private static final long serialVersionUID = -3845514510571408376L;	private String userId;//身份证	private String userName;//姓名	private String phone;//电话	private String email;//邮箱	public String getUserId() {		return userId;	}	public void setUserId(String userId) {		this.userId = userId;	}	public String getUserName() {		return userName;	}	public void setUserName(String userName) {		this.userName = userName;	}	public String getPhone() {		return phone;	}	public void setPhone(String phone) {		this.phone = phone;	}	public String getEmail() {		return email;	}	public void setEmail(String email) {		this.email = email;	}}

 

请求指令集:通讯使用的固定指令集 服务器和客户端统一

package bookticket;/** * 指令集 * @author xwalker * */public class Code {	public static final int CODE_SEARCH=1;//查询车票余量	public static final int CODE_BOOK=2;//订票	public static final int CODE_NONE=-1;//错误指令 无法处理}

 

客户端发送的请求信息:客户端发送一条请求信息 根据code属性确定此消息需要服务器做出如何响应 依据Code.java中定义的查票还是订票等

package bookticket;import java.io.Serializable;import java.util.Date;/** * 订票人发送查询余票和订票使用的请求信息 * @author xwalker * */public class BookRequestMsg implements Serializable {	private static final long serialVersionUID = -7335293929249462183L;	private User user;//发送订票信息用户	private String trainNumber;//火车车次	private int code;//查询命令	private Date startTime;//开车时间	public User getUser() {		return user;	}	public void setUser(User user) {		this.user = user;	}	public String getTrainNumber() {		return trainNumber;	}	public void setTrainNumber(String trainNumber) {		this.trainNumber = trainNumber;	}	public Date getStartTime() {		return startTime;	}	public void setStartTime(Date startTime) {		this.startTime = startTime;	}	public int getCode() {		return code;	}	public void setCode(int code) {		this.code = code;	}}

 

服务器接收订票和查票后处理完业务反馈客户端的信息:

package bookticket;import java.io.Serializable;import java.util.Date;/** * 订票成功与否反馈信息 * @author xwalker */public class BookResponseMsg implements Serializable {	private static final long serialVersionUID = -4984721370227929766L;	private boolean success;//是否操作成功	private User user;//请求用户	private String msg;//反馈信息	private int code;//请求指令	private Train train;//火车车次	private Date startTime;//出发时间	private Ticket ticket;//订票成功后具体出票票据	public boolean getSuccess() {		return success;	}	public void setSuccess(boolean success) {		this.success = success;	}	public String getMsg() {		return msg;	}	public void setMsg(String msg) {		this.msg = msg;	}	public Ticket getTicket() {		return ticket;	}	public void setTicket(Ticket ticket) {		this.ticket = ticket;	}	public int getCode() {		return code;	}	public void setCode(int code) {		this.code = code;	}	public Train getTrain() {		return train;	}	public void setTrain(Train train) {		this.train = train;	}	public Date getStartTime() {		return startTime;	}	public void setStartTime(Date startTime) {		this.startTime = startTime;	}	public User getUser() {		return user;	}	public void setUser(User user) {		this.user = user;	}	}

 

订票服务器:主要是配置对象解码器 netty会自动将序列化的pojo对象编码 解码 无需自己额外处理 只需要依据配置即可

package bookticket;import java.util.ArrayList;import java.util.List;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;/** * 订票服务器端 * @author xwalker * */public class BookTicketServer {	public static List
 trains; /**  * 初始化 构造车次和车票余数  */ public BookTicketServer() { trains=new ArrayList
(); trains.add(new Train("G242",500)); trains.add(new Train("G243",200)); trains.add(new Train("D1025",100)); trains.add(new Train("D1235",0)); } public void bind(int port) throws Exception{ //配置NIO线程组 EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workerGroup=new NioEventLoopGroup(); try{ //服务器辅助启动类配置 ServerBootstrap b=new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加对象解码器 负责对序列化POJO对象进行解码 设置对象序列化最大长度为1M 防止内存溢出 //设置线程安全的WeakReferenceMap对类加载器进行缓存 支持多线程并发访问  防止内存溢出  ch.pipeline().addLast(new ObjectDecoder(1024*1024,ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); //添加对象编码器 在服务器对外发送消息的时候自动将实现序列化的POJO对象编码 ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new BookTicketServerhandler()); } }); //绑定端口 同步等待绑定成功 ChannelFuture f=b.bind(port).sync(); //等到服务端监听端口关闭 f.channel().closeFuture().sync(); }finally{ //优雅释放线程资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port =8000; new BookTicketServer().bind(port); }}

 

服务器端网络IO处理器,查票订票业务处理和反馈:根据客户端请求信息中的code指令 确定是查票还是订票

package bookticket;import java.util.Date;import java.util.Random;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;/** * 订票server端处理器 * @author xwalker * */public class BookTicketServerhandler extends ChannelHandlerAdapter {	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg)			throws Exception {		BookRequestMsg requestMsg=(BookRequestMsg) msg;		BookResponseMsg responseMsg=null;		switch (requestMsg.getCode()) {		case Code.CODE_SEARCH://查询余票			for(Train train:BookTicketServer.trains){				//找到车次与请求车次相同的 返回车次余票				if(requestMsg.getTrainNumber().equals(train.getNumber())){					responseMsg=new BookResponseMsg();					responseMsg.setUser(requestMsg.getUser());					responseMsg.setCode(Code.CODE_SEARCH);					responseMsg.setSuccess(true);					responseMsg.setTrain(train);					responseMsg.setStartTime(requestMsg.getStartTime());					responseMsg.setMsg("火车【"+train.getNumber()+"】余票数量为【"+train.getTicketCounts()+"】");					break;				}			}			if(responseMsg==null){				responseMsg=new BookResponseMsg();				responseMsg.setUser(requestMsg.getUser());				responseMsg.setCode(Code.CODE_SEARCH);				responseMsg.setSuccess(false);				responseMsg.setMsg("火车【"+requestMsg.getTrainNumber()+"】的信息不存在!");			}			break;		case Code.CODE_BOOK://确认订票			for(Train train:BookTicketServer.trains){				//找到车次与请求车次相同的 返回车次余票				if(requestMsg.getTrainNumber().equals(train.getNumber())){					responseMsg=new BookResponseMsg();					responseMsg.setUser(requestMsg.getUser());					responseMsg.setSuccess(true);					responseMsg.setCode(Code.CODE_BOOK);					responseMsg.setMsg("恭喜您,订票成功!");					Ticket ticket=new Ticket();					ticket.setBookTime(new Date());					ticket.setUser(requestMsg.getUser());					ticket.setStartTime(requestMsg.getStartTime());					ticket.setNumber(train.getNumber()+System.currentTimeMillis());//生成车票编号					ticket.setCarriageNumber(new Random().nextInt(15));//随机车厢					ticket.setUser(requestMsg.getUser());//设置订票人信息					String[] seat=new String[]{"A","B","C","D","E"};					Random seatRandom=new Random();					ticket.setSeatNumber(seat[seatRandom.nextInt(5)]+seatRandom.nextInt(100));					ticket.setTrainNumber(train.getNumber());					train.setTicketCounts(train.getTicketCounts()-1);//余票减去一张					responseMsg.setTrain(train);					responseMsg.setTicket(ticket);					break;				}			}			if(responseMsg==null){				responseMsg=new BookResponseMsg();				responseMsg.setUser(requestMsg.getUser());				responseMsg.setCode(Code.CODE_BOOK);				responseMsg.setSuccess(false);				responseMsg.setMsg("火车【"+requestMsg.getTrainNumber()+"】的信息不存在!");			}			break;		default://无法处理				responseMsg=new BookResponseMsg();				responseMsg.setUser(requestMsg.getUser());				responseMsg.setCode(Code.CODE_NONE);				responseMsg.setSuccess(false);				responseMsg.setMsg("指令无法处理!");			break;		}				ctx.writeAndFlush(responseMsg);	}	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)			throws Exception {		cause.printStackTrace();		ctx.close();	}}

 

客户端:客户端也需要配置对象编码 解码器

package bookticket;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;/** * 订票客户端 * @author xwalker */public class BookTicketClient {	public void connect(int port,String host) throws Exception{		//配置客户端线程组		EventLoopGroup group=new NioEventLoopGroup();		try{			//配置客户端启动辅助类			Bootstrap b=new Bootstrap();			b.group(group).channel(NioSocketChannel.class)			.option(ChannelOption.TCP_NODELAY, true)			.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加POJO对象解码器 禁止缓存类加载器 ch.pipeline().addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); //设置发送消息编码器 ch.pipeline().addLast(new ObjectEncoder()); //设置网络IO处理器 ch.pipeline().addLast(new BookTicketClientHandler()); } }); //发起异步服务器连接请求 同步等待成功 ChannelFuture f=b.connect(host,port).sync(); //等到客户端链路关闭 f.channel().closeFuture().sync(); }finally{ //优雅释放线程资源 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception{ new BookTicketClient().connect(8000, "127.0.0.1"); }}

 

客户端处理网络IO处理器 发送查票和订票请求:链路创建成功后需要 模拟发送两个车次的查票指令 其中一车有票 一车无票 有票的信息反馈回来后继续发送订票指令 成功订票后输出车票信息

package bookticket;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import java.util.Calendar;/** * 客户端处理器 *  * @author xwalker */public class BookTicketClientHandler extends ChannelHandlerAdapter {	private User user;	public BookTicketClientHandler() {		user=new User();		user.setUserName("xwalker");		user.setPhone("187667*****");		user.setEmail("909854136@qq.com");		user.setUserId("3705231988********");	}	/**	 * 链路链接成功	 */	@Override	public void channelActive(ChannelHandlerContext ctx) throws Exception {				// 链接成功后发送查询某车次余票的请求		Calendar c = Calendar.getInstance();		c.set(Calendar.YEAR, 2015);		c.set(Calendar.MONTH, 1);		c.set(Calendar.DATE, 2);		c.set(Calendar.HOUR, 11);		c.set(Calendar.MINUTE, 30);		// G242查询余票		BookRequestMsg requestMsg1 = new BookRequestMsg();		requestMsg1.setCode(Code.CODE_SEARCH);		requestMsg1.setStartTime(c.getTime());		requestMsg1.setTrainNumber("G242");//设置查询车次		requestMsg1.setUser(user);//设置当前登陆用户		ctx.write(requestMsg1);		// D1235查询余票		BookRequestMsg requestMsg2 = new BookRequestMsg();		requestMsg2.setCode(Code.CODE_SEARCH);		requestMsg2.setStartTime(c.getTime());		requestMsg2.setTrainNumber("D1235");//设置查询车次		requestMsg2.setUser(user);		ctx.write(requestMsg2);		ctx.flush();	}	@Override	public void channelRead(ChannelHandlerContext ctx, Object msg)			throws Exception {		BookResponseMsg responseMsg = (BookResponseMsg) msg;		switch (responseMsg.getCode()) {		case Code.CODE_SEARCH://收到查询结果			System.out.println("==========火车【"+responseMsg.getTrain().getNumber()+"】余票查询结果:【"+(responseMsg.getSuccess()?"成功":"失败")+"】=========");			System.out.println(responseMsg.getMsg());			//查询发现有余票的话 需要发送订票指令			if(responseMsg.getTrain().getTicketCounts()>0){				//构造查询有余票的火车的订票指令				BookRequestMsg requestMsg = new BookRequestMsg();				requestMsg.setCode(Code.CODE_BOOK);				requestMsg.setUser(user);				requestMsg.setStartTime(responseMsg.getStartTime());				requestMsg.setTrainNumber(responseMsg.getTrain().getNumber());				ctx.writeAndFlush(requestMsg);			}else{				System.out.println("火车【"+responseMsg.getTrain().getNumber()+"】没有余票,不能订票了!");			}			break;		case Code.CODE_BOOK://收到订票结果			System.out.println("==========火车【"+responseMsg.getTrain().getNumber()+"】订票结果:【"+(responseMsg.getSuccess()?"成功":"失败")+"】=========");			System.out.println(responseMsg.getMsg());			System.out.println("========车票详情========");			Ticket ticket=responseMsg.getTicket();			System.out.println("车票票号:【"+ticket.getNumber()+"】");			System.out.println("火车车次:【"+ticket.getTrainNumber()+"】");			System.out.println("火车车厢:【"+ticket.getCarriageNumber()+"】");			System.out.println("车厢座位:【"+ticket.getSeatNumber()+"】");			System.out.println("预定时间:【"+ticket.getBookTime()+"】");			System.out.println("出发时间:【"+ticket.getStartTime()+"】");			System.out.println("乘客信息:【"+ticket.getUser().getUserName()+"】");			break;		default:			System.out.println("==========操作错误结果=========");			System.out.println(responseMsg.getMsg());			break;		}	}	@Override	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {		ctx.flush();	}	@Override	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)			throws Exception {		cause.printStackTrace();		ctx.close();	}}

 

最后测试结果:

231237_rj5B_374.png

 

转载于:https://my.oschina.net/imhoodoo/blog/361076

你可能感兴趣的文章
Oracle创建表空间
查看>>
java使用Executor(执行器)管理线程
查看>>
iOS开发Swift篇—(十)方法
查看>>
[翻译] UPCardsCarousel
查看>>
win8 开发之旅(19) --足球游戏揭秘6
查看>>
翻页效果
查看>>
UIGestureRecognizerState
查看>>
Lua非常有用的工具——递归打印表数据
查看>>
(九十四)函数和二维数组
查看>>
Android ListView监听上下滑动(判断是否显示返回顶部按钮)
查看>>
跟着实例学习ZooKeeper的用法: Barrier
查看>>
JAVA生成二维码(zxing)
查看>>
BUG系列
查看>>
成为优秀Java程序员的10大技巧
查看>>
一个16年毕业生所经历的php面试
查看>>
AAC架构系列一(初识)
查看>>
react-native-echarts 在手机上 图表出现滚动条解决方法
查看>>
微信中的video属性设置
查看>>
JavaScript 笔记02
查看>>
新形式下触电新闻如何打造内容安全领域新标杆
查看>>