- 浏览: 61535 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
u014549257:
...
Apache Mina: StreamIoHandler传输文件处理 -
至尊包:
想问一下,这个官网的列子如果要兼容3.0以下的版本要怎么处理? ...
Swipe Views (水平分页)
本例子根据mina自带的例子:sumup改写。
1. 基本原理:
1) 客户端向服务端发送AddMessage对象时,先根据AddMessageEncoder编码, 当服务端接收到AddMessage后,根据自定义的AddMessageDecode解码数据。
2) 服务端数据解码后,生成回复对象ResultMessage,并对该对象通过ResultMessageEncoder进行编码,并发送到客户端。 客户端接收ResultMessage后,根据ResultMessageDecoder解码,并将数据显示出来。
3) AddMessageEncoder,ResultMessageEncoder,AddMessageDecoder,ResultMessageDecoder自定义编码和解码的方法。
2. 代码:
1) 创建AddMessage和ResultMessage对象
AbstractMessage.java
AddMessage.java
ResultMessage.java
2) 创建AddMessageEncoder和ResultMessageEncoder
AddMessageEncoder.java
ResultMessageEncoder.java
3) 创建AddMessageDecoder和ResultMessageDecoder
AddMessageDecoder.java
ResultMessageDecoder.java
4) 创建CodecFactory,注册AddMessageEncoder, ResultMessageEncoder, AddMessageDecoder, ResultMessageDecoder.
5) 创建server端服务和业务处理ServerIoHandler
Server.java
ServerIoHandler.java
6) 创建Client端连接和业务处理ClientIoHandler
Client.java
ClientIoHandler.java
7. Constants.java
1. 基本原理:
1) 客户端向服务端发送AddMessage对象时,先根据AddMessageEncoder编码, 当服务端接收到AddMessage后,根据自定义的AddMessageDecode解码数据。
2) 服务端数据解码后,生成回复对象ResultMessage,并对该对象通过ResultMessageEncoder进行编码,并发送到客户端。 客户端接收ResultMessage后,根据ResultMessageDecoder解码,并将数据显示出来。
3) AddMessageEncoder,ResultMessageEncoder,AddMessageDecoder,ResultMessageDecoder自定义编码和解码的方法。
2. 代码:
1) 创建AddMessage和ResultMessage对象
AbstractMessage.java
public abstract class AbstractMessage implements Serializable { private int sequence; public int getSequence() { return sequence; } public void setSequence(int sequence) { this.sequence = sequence; } }
AddMessage.java
public class AddMessage extends AbstractMessage { private static final long serialVersionUID = -735205238699949292L; private int value; public AddMessage(){ } public int getValue() { return value; } public void setValue(int value) { this.value = value; } @Override public String toString() { return "AddMessage [value=" + value + ", getSequence()=" + getSequence() + "]"; } }
ResultMessage.java
public class ResultMessage extends AbstractMessage { private static final long serialVersionUID = 7431899532938146290L; private boolean ok; private int value; public ResultMessage(){ } public boolean isOk() { return ok; } public void setOk(boolean ok) { this.ok = ok; } public int getValue() { return value; } public void setValue(int value) { this.value = value; } @Override public String toString() { return "ResultMessage [ok=" + ok + ", value=" + value + ", getSequence()=" + getSequence() + "]"; } }
2) 创建AddMessageEncoder和ResultMessageEncoder
AddMessageEncoder.java
public class AddMessageEncoder<T extends AddMessage> implements MessageEncoder<T> { @Override public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception { IoBuffer buf = IoBuffer.allocate(16); buf.setAutoExpand(true); // Enable auto-expand for easier encoding // Encode a header buf.putInt(message.getSequence()); buf.putInt(message.getValue()); buf.flip(); out.write(buf); } }
ResultMessageEncoder.java
public class ResultMessageEncoder<T extends ResultMessage> implements MessageEncoder<T> { @Override public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception { IoBuffer buf = IoBuffer.allocate(16); buf.setAutoExpand(true); // Enable auto-expand for easier encoding if(message.isOk()){ buf.putShort((short) Constants.RESULT_OK); buf.putInt(message.getSequence()); buf.putInt(message.getValue()); }else{ buf.putShort((short)Constants.RESULT_ERROR); buf.putInt(message.getSequence()); buf.putInt(0); } buf.flip(); out.write(buf); } }
3) 创建AddMessageDecoder和ResultMessageDecoder
AddMessageDecoder.java
public class AddMessageDecoder implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 1) { return MessageDecoderResult.NEED_DATA; } // Return NOT_OK if not matches. return MessageDecoderResult.OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (in.remaining() < 1) { return MessageDecoderResult.NEED_DATA; } int sequence = in.getInt(); int value = in.getInt(); AddMessage m = new AddMessage(); m.setSequence(sequence); m.setValue(value); out.write(m); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub } }
ResultMessageDecoder.java
public class ResultMessageDecoder implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 1) { return MessageDecoderResult.NEED_DATA; } int code = in.getShort(); if(code==Constants.RESULT_OK){ return MessageDecoderResult.OK; } // Return NOT_OK if not matches. return MessageDecoderResult.NOT_OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (in.remaining() < 1) { return MessageDecoderResult.NEED_DATA; } int code = in.getShort(); int sequence = in.getInt(); int value = in.getInt(); ResultMessage m = new ResultMessage(); if(code==Constants.RESULT_OK){ m.setOk(true); m.setSequence(sequence); m.setValue(value); }else{ m.setOk(false); } out.write(m); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub } }
4) 创建CodecFactory,注册AddMessageEncoder, ResultMessageEncoder, AddMessageDecoder, ResultMessageDecoder.
public class DemoProtocolCodecFactory extends DemuxingProtocolCodecFactory { public DemoProtocolCodecFactory(boolean isServer){ if(isServer){ super.addMessageEncoder(ResultMessage.class, ResultMessageEncoder.class); super.addMessageDecoder(AddMessageDecoder.class); }else{ super.addMessageEncoder(AddMessage.class, AddMessageEncoder.class); super.addMessageDecoder(ResultMessageDecoder.class); } } }
5) 创建server端服务和业务处理ServerIoHandler
Server.java
public class Server { public void init() throws IOException{ IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast("logger", new LoggingFilter()); acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new DemoProtocolCodecFactory(Constants.IS_SERVER))); acceptor.setHandler(new ServerIoHandler()); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); acceptor.setDefaultLocalAddress(new InetSocketAddress(Constants.PORT)); acceptor.bind();// 启动监听 } public static void main(String[] args) throws IOException { Server server = new Server(); server.init(); } }
ServerIoHandler.java
public class ServerIoHandler extends IoHandlerAdapter { private static final String SUM_KEY = "sum"; @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { System.out.println(cause.getMessage()); cause.printStackTrace(); session.close(true); } @Override public void messageReceived(IoSession session, Object message) throws Exception { AddMessage add = (AddMessage) message; int sum = ((Integer)session.getAttribute(SUM_KEY)).intValue(); int value = add.getValue(); long total = (long) sum + value; if(total>Integer.MAX_VALUE || total<Integer.MIN_VALUE){ ResultMessage result = new ResultMessage(); result.setSequence(add.getSequence()); result.setOk(false); session.write(result); }else{ sum = (int) total; session.setAttribute(SUM_KEY, sum); ResultMessage result = new ResultMessage(); result.setSequence(add.getSequence()); result.setOk(true); result.setValue(sum); session.write(result); } //System.out.println("total=" + total); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { session.close(true); } @Override public void sessionOpened(IoSession session) throws Exception { session.setAttribute(SUM_KEY, new Integer(0)); } }
6) 创建Client端连接和业务处理ClientIoHandler
Client.java
public class Client { public void init() throws InterruptedException{ NioSocketConnector connector = new NioSocketConnector(); // Configure the service. connector.setConnectTimeoutMillis(Constants.CONNECT_TIMEOUT); connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new DemoProtocolCodecFactory(!Constants.IS_SERVER))); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.setHandler(new ClientIoHandler()); IoSession session; for (;;) { try { ConnectFuture future = connector.connect(new InetSocketAddress(Constants.HOSTNAME, Constants.PORT)); future.awaitUninterruptibly(); session = future.getSession(); break; } catch (RuntimeIoException e) { System.err.println("Failed to connect."); e.printStackTrace(); Thread.sleep(5000); } } // wait until the summation is done session.getCloseFuture().awaitUninterruptibly(); connector.dispose(); } /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { Client client = new Client(); client.init(); } }
ClientIoHandler.java
public class ClientIoHandler extends IoHandlerAdapter { private List<Integer> values = new ArrayList<Integer>(); @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { session.close(true); } @Override public void messageReceived(IoSession session, Object message) throws Exception { ResultMessage rm = (ResultMessage) message; if (rm.isOk()) { if (rm.getSequence() == values.size() - 1) { System.out.println("the sum is " + rm.getValue()); session.close(true); //finished = true; } } else { System.out.println("Server error, disconnecting..."); session.close(true); //finished = true; } } @Override public void sessionOpened(IoSession session) throws Exception { init(); for (int i = 0; i < values.size(); i++) { int _value = ((Integer) values.get(i)).intValue(); AddMessage m = new AddMessage(); m.setSequence(i); m.setValue(_value); session.write(m); } } private void init(){ for(int i=0;i<3;i++){ int _value = i*100 + 1; values.add(new Integer(_value)); } } }
7. Constants.java
public class Constants { public final static int PORT = 9123; public static final String HOSTNAME = "localhost"; public static final long CONNECT_TIMEOUT = 30*1000L; // 30 seconds public final static boolean IS_SERVER = true; public static final int RESULT_OK = 0; public static final int RESULT_ERROR = 1; }
- minaServer.rar (60 KB)
- 下载次数: 4
相关推荐
Apache MINA是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。 当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序(只在最新的预览版...
Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)
apache-mina-2.0.16.zip
深入理解Apache_Mina_(1)----_Mina的几个类 深入理解Apache_Mina_(2)----_与IoFilter相关的几个类 深入理解Apache_Mina_(3)----_与IoHandler相关的几个类 深入理解Apache_Mina_(4)----_IoFilter和IoHandler的区别和...
这个是我学的apache mina2.0学习笔记的实例
hive 开发UDF 使用maven工程 引发jar包缺失 hive 开发UDF 使用maven工程 引发jar包缺失
Apache MINA 线程模型配置 Mina配置
收集整理的Apache Mina chm pdf教程和帮助文档
Apache MINA是一个网络应用程序框架,用来帮助用户简单地开发高性能和高可靠性的网络应用程序。 apache-mina-2.0.7-bin.zip,apache-mina-2.0.7-src.zip,log4j-1.2.17.zip,slf4j-api-1.6.6.jar,slf4j-api-1.6.6-...
apache mina的入门完整学习资料,附加中文参考手册。
apache mina 简单示例apache mina 简单示例apache mina 简单示例apache mina 简单示例apache mina 简单示例apache mina 简单示例
mina:Apache MINA的镜像
Apache MINA2实用手册 Apache MINA2实用手册
apache mina 框架 实例 自定义协议包 自定义编码器 解码器 服务端 客户端
一个Apache MINA使用案例源代码ApacheMina
apache mina实例免费下载,有很多实例代码简单易懂欢迎大家下载!
Apache MINA 2.0 用户指南
本资源包含两个 pdf 文档,一本根据官方最新文档 (http://mina.apache.org/mina-project/userguide/user-guide-toc.html) 整理的 mina_2.0_user_guide_en.pdf,一个中文翻译的 mina_2.0_user_guide_cn.pdf。...
最新的Apache Mina v2.0.8 API手册,chm格式,2014年9月下旬制作。