亚洲最大看欧美片,亚洲图揄拍自拍另类图片,欧美精品v国产精品v呦,日本在线精品视频免费

  • 站長資訊網
    最全最豐富的資訊網站

    基于Java NIO的即時聊天服務器模型

    折騰了一個周,終于搞出來了一個雛形,相比于xmpp的xml,本人更喜歡json的簡潔,為了防止客戶端異常斷開等,準備采用心跳檢測的機制來判斷用戶是否在線,另外還有一種方法是學習例如Tomcat等Servlet中間件的方式,設置Session周期,定時清除過期Session。

      前不久自己動手寫了一個Android的聊天工具,跟服務器的交互還是基于HTTP方式的,在一般通訊上還算湊活,但是在即時聊天的時候就有點惡心了,客戶端開啟Service每隔3秒去詢問服務器是否有自己的新消息(當然3秒有點太快了),在心疼性能和流量的前提下,只能自己動手寫個服務器,傳統(tǒng)的Socket是阻塞的,這樣的話服務器對每個Socket都需要建立一個線程來操作,資源開銷很大,而且線程多了直接會影響服務端的性能(曾經測試開了3000多個線程就不讓創(chuàng)建了,所以并發(fā)數目也是有限制的),聽說從JDK1.5就多了個New
      IO,灰常不錯的樣子,找了找相關的資料,網上竟然全都是最最最簡單的一個demo,然后去CSDN發(fā)帖,基本上都是建議直接使用MINA框架的,這樣一來根本達不到學習NIO的目的,而且現(xiàn)在的技術也太快餐了,只知道使用前輩留下的東西,知其然不知其所以然。

      折騰了一個周,終于搞出來了一個雛形,相比于xmpp的xml,本人更喜歡json的簡潔,為了防止客戶端異常斷開等,準備采用心跳檢測的機制來判斷用戶是否在線,另外還有一種方法是學習例如Tomcat等Servlet中間件的方式,設置Session周期,定時清除過期Session。本Demo暫時實現(xiàn)了Session過期檢測,心跳檢測有空再搞,如果本例子在使用過程中有性能漏洞或者什么bug請及時通知我,謝謝。

      廢話不多說,關于NIO的SelectionKey、Selector、Channel網上的介紹例子都很多,直接上代碼:

      JsonParser

      Json的解析類,隨便封裝了下,使用的最近比較火的fastjson

                       
      1. public class JsonParser {
      2. private static JSONObject mJson;
      3. public synchronized static String get(String json,String key) {
      4. mJson = JSON.parseObject(json);
      5. return mJson.getString(key);
      6. }
      7. }
       

      Main

      入口,不解釋

                                                                                                       
      1. public class Main {
      2. public static void main(String… args) {
      3. new SeekServer().start();
      4. }
      5. }
       

      Log

                                                                                                                                                               
      1. public class Log {
      2. public static void i(Object obj) {
      3. System.out.println(obj);
      4. }
      5. public static void e(Object e) {
      6. System.err.println(e);
      7. }
      8. }
       

      SeekServer:

      服務器端的入口,請求的封裝和接收都在此類,端口暫時寫死在了代碼里,mSelector.select(TIME_OUT) > 0
      目的是為了當服務器空閑的時候(沒有任何讀寫甚至請求斷開事件),循環(huán)時有個間隔時間,不然基本上相當于while(true){//nothing}了,你懂的。

                                                                                                                                                                                                                                               
      1. public class SeekServer extends Thread{
      2. private final int ACCPET_PORT = 55555;
      3. private final int TIME_OUT = 1000;
      4. private Selector mSelector = null;
      5. private ServerSocketChannel mSocketChannel = null;
      6. private ServerSocket mServerSocket = null;
      7. private InetSocketAddress mAddress = null;
      8. public SeekServer() {
      9. long sign = System.currentTimeMillis();
      10. try {
      11. mSocketChannel = ServerSocketChannel.open();
      12. if(mSocketChannel == null) {
      13. System.out.println(“can’t open server socket channel”);
      14. }
      15. mServerSocket = mSocketChannel.socket();
      16. mAddress = new InetSocketAddress(ACCPET_PORT);
      17. mServerSocket.bind(mAddress);
      18. Log.i(“server bind port is “ + ACCPET_PORT);
      19. mSelector = Selector.open();
      20. mSocketChannel.configureBlocking(false);
      21. SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
      22. key.attach(new Acceptor());
      23. //檢測Session狀態(tài)
      24. Looper.getInstance().loop();
      25. //開始處理Session
      26. SessionProcessor.start();
      27. Log.i(“Seek server startup in “ + (System.currentTimeMillis() – sign) + “ms!”);
      28. } catch (ClosedChannelException e) {
      29. Log.e(e.getMessage());
      30. } catch (IOException e) {
      31. Log.e(e.getMessage());
      32. }
      33. }
      34. public void run() {
      35. Log.i(“server is listening…”);
      36. while(!Thread.interrupted()) {
      37. try {
      38. if(mSelector.select(TIME_OUT) > 0) {
      39. Set<SelectionKey> keys = mSelector.selectedKeys();
      40. Iterator<SelectionKey> iterator = keys.iterator();
      41. SelectionKey key = null;
      42. while(iterator.hasNext()) {
      43. key = iterator.next();
      44. Handler at = (Handler) key.attachment();
      45. if(at != null) {
      46. at.exec();
      47. }
      48. iterator.remove();
      49. }
      50. }
      51. } catch (IOException e) {
      52. Log.e(e.getMessage());
      53. }
      54. }
      55. }
      56. class Acceptor extends Handler{
      57. public void exec(){
      58. try {
      59. SocketChannel sc = mSocketChannel.accept();
      60. new Session(sc, mSelector);
      61. } catch (ClosedChannelException e) {
      62. Log.e(e);
      63. } catch (IOException e) {
      64. Log.e(e);
      65. }
      66. }
      67. }
      68. }
       

      Handler:

      只有一個抽象方法exec,Session將會繼承它。


      1. public abstract class Handler {
      2. public abstract void exec();
      3. }
       

      Session:

      封裝了用戶的請求和SelectionKey和SocketChannel,每次接收到新的請求時都重置它的最后活動時間,通過狀態(tài)mState=READING
      or SENDING 去執(zhí)行消息的接收與發(fā)送,當客戶端異常斷開時則從SessionManager清除該會話。


      1. public class Session extends Handler{
      2. private SocketChannel mChannel;
      3. private SelectionKey mKey;
      4. private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);
      5. private Charset charset = Charset.forName(“UTF-8”);
      6. private CharsetDecoder mDecoder = charset.newDecoder();
      7. private CharsetEncoder mEncoder = charset.newEncoder();
      8. private long lastPant;//最后活動時間
      9. private final int TIME_OUT = 1000 * 60 * 5; //Session超時時間
      10. private String key;
      11. private String sendData = “”;
      12. private String receiveData = null;
      13. public static final int READING = 0,SENDING = 1;
      14. int mState = READING;
      15. public Session(SocketChannel socket, Selector selector) throws IOException {
      16. this.mChannel = socket;
      17. mChannel = socket;
      18. mChannel.configureBlocking(false);
      19. mKey = mChannel.register(selector, 0);
      20. mKey.attach(this);
      21. mKey.interestOps(SelectionKey.OP_READ);
      22. selector.wakeup();
      23. lastPant = Calendar.getInstance().getTimeInMillis();
      24. }
      25. public String getReceiveData() {
      26. return receiveData;
      27. }
      28. public void clear() {
      29. receiveData = null;
      30. }
      31. public void setSendData(String sendData) {
      32. mState = SENDING;
      33. mKey.interestOps(SelectionKey.OP_WRITE);
      34. this.sendData = sendData + “n”;
      35. }
      36. public boolean isKeekAlive() {
      37. return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();
      38. }
      39. public void setAlive() {
      40. lastPant = Calendar.getInstance().getTimeInMillis();
      41. }
      42. /**
      43. * 注銷當前Session
      44. */
      45. public void distroy() {
      46. try {
      47. mChannel.close();
      48. mKey.cancel();
      49. } catch (IOException e) {}
      50. }
      51. @Override
      52. public synchronized void exec() {
      53. try {
      54. if(mState == READING) {
      55. read();
      56. }else if(mState == SENDING) {
      57. write();
      58. }
      59. } catch (IOException e) {
      60. SessionManager.remove(key);
      61. try {
      62. mChannel.close();
      63. } catch (IOException e1) {
      64. Log.e(e1);
      65. }
      66. mKey.cancel();
      67. }
      68. }
      69. public void read() throws IOException{
      70. mRreceiveBuffer.clear();
      71. int sign = mChannel.read(mRreceiveBuffer);
      72. if(sign == –1) { //客戶端連接關閉
      73. mChannel.close();
      74. mKey.cancel();
      75. }
      76. if(sign > 0) {
      77. mRreceiveBuffer.flip();
      78. receiveData = mDecoder.decode(mRreceiveBuffer).toString();
      79. setAlive();
      80. setSign();
      81. SessionManager.addSession(key, this);
      82. }
      83. }
      84. private void setSign() {
      85. //設置當前Session的Key
      86. key = JsonParser.get(receiveData,“imei”);
      87. //檢測消息類型是否為心跳包
      88. // String type = jo.getString(“type”);
      89. // if(type.equals(“HEART_BEAT”)) {
      90. // setAlive();
      91. // }
      92. }
      93. /**
      94. * 寫消息
      95. */
      96. public void write() {
      97. try {
      98. mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));
      99. sendData = null;
      100. mState = READING;
      101. mKey.interestOps(SelectionKey.OP_READ);
      102. } catch (CharacterCodingException e) {
      103. e.printStackTrace();
      104. } catch (IOException e) {
      105. try {
      106. mChannel.close();
      107. } catch (IOException e1) {
      108. Log.e(e1);
      109. }
      110. }
      111. }
      112. }
       

      SessionManager:

      將所有Session存放到ConcurrentHashMap,這里使用手機用戶的imei做key,ConcurrentHashMap因為是線程安全的,所以能很大程度上避免自己去實現(xiàn)同步的過程,
      封裝了一些操作Session的方法例如get,remove等。


      1. public class SessionManager {
      2. private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
      3. public static void addSession(String key,Session session) {
      4. sessions.put(key, session);
      5. }
      6. public static Session getSession(String key) {
      7. return sessions.get(key);
      8. }
      9. public static Set<String> getSessionKeys() {
      10. return sessions.keySet();
      11. }
      12. public static int getSessionCount() {
      13. return sessions.size();
      14. }
      15. public static void remove(String[] keys) {
      16. for(String key:keys) {
      17. if(sessions.containsKey(key)) {
      18. sessions.get(key).distroy();
      19. sessions.remove(key);
      20. }
      21. }
      22. }
      23. public static void remove(String key) {
      24. if(sessions.containsKey(key)) {
      25. sessions.get(key).distroy();
      26. sessions.remove(key);
      27. }
      28. }
      29. }
       

      SessionProcessor

      里面使用了JDK自帶的線程池,用來分發(fā)處理所有Session中當前需要處理的請求(線程池的初始化參數不是太熟,望有了解的童鞋能告訴我),內部類Process則是將Session再次封裝成SocketRequest和SocketResponse(看到這里是不是有點熟悉的感覺,對沒錯,JavaWeb里到處都是request和response)。


      1. public class SessionProcessor implements Runnable{
      2. private static Runnable processor = new SessionProcessor();
      3. private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());
      4. public static void start() {
      5. new Thread(processor).start();
      6. }
      7. @Override
      8. public void run() {
      9. while(true) {
      10. Session tmp = null;
      11. for(String key:SessionManager.getSessionKeys()) {
      12. tmp = SessionManager.getSession(key);
      13. //處理Session未處理的請求
      14. if(tmp.getReceiveData() != null) {
      15. pool.execute(new Process(tmp));
      16. }
      17. }
      18. try {
      19. Thread.sleep(10);
      20. } catch (InterruptedException e) {
      21. Log.e(e);
      22. }
      23. }
      24. }
      25. class Process implements Runnable {
      26. private SocketRequest request;
      27. private SocketResponse response;
      28. public Process(Session session) {
      29. //將Session封裝成Request和Response
      30. request = new SocketRequest(session);
      31. response = new SocketResponse(session);
      32. }
      33. @Override
      34. public void run() {
      35. new RequestTransform().transfer(request, response);
      36. }
      37. }
      38. }
       

      RequestTransform里的transfer方法利用反射對請求參數中的請求類別和請求動作來調用不同類的不同方法(UserHandler和MessageHandler)


      1. public class RequestTransform {
      2. public void transfer(SocketRequest request,SocketResponse response) {
      3. String action = request.getValue(“action”);
      4. String handlerName = request.getValue(“handler”);
      5. //根據Session的請求類型,讓不同的類方法去處理
      6. try {
      7. Class<?> c= Class.forName(“com.seek.server.handler.” + handlerName);
      8. Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};
      9. Method method=c.getMethod(action,arg);
      10. method.invoke(c.newInstance(), new Object[]{request,response});
      11. } catch (Exception e) {
      12. e.printStackTrace();
      13. }
      14. }
      15. }
       

      SocketRequest和SocketResponse


      1. public class SocketRequest {
      2. private Session mSession;
      3. private String mReceive;
      4. public SocketRequest(Session session) {
      5. mSession = session;
      6. mReceive = session.getReceiveData();
      7. mSession.clear();
      8. }
      9. public String getValue(String key) {
      10. return JsonParser.get(mReceive, key);
      11. }
      12. public String getQueryString() {
      13. return mReceive;
      14. }
      15. }
       

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               
      1. public class SocketResponse {
      2. private Session mSession;
      3. public SocketResponse(Session session) {
      4. mSession = session;
      5. }
      6. public void write(String msg) {
      7. mSession.setSendData(msg);
      8. }
      9. }
       

      最后則是兩個處理請求的Handler


      1. public class UserHandler {
      2. public void login(SocketRequest request,SocketResponse response) {
      3. System.out.println(request.getQueryString());
      4. //TODO: 處理用戶登錄
      5. response.write(“你肯定收到消息了”);
      6. }
      7. }
       

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       
      1. public class MessageHandler {
      2. public void send(SocketRequest request,SocketResponse response) {
      3. System.out.println(request.getQueryString());
      4. //消息發(fā)送
      5. String key = request.getValue(“imei”);
      6. Session session = SessionManager.getSession(key);
      7. new SocketResponse(session).write(request.getValue(“sms”));
      8. }
      9. }
       

      還有個監(jiān)測是否超時的類Looper,定期去刪除Session


      1. public class Looper extends Thread{
      2. private static Looper looper = new Looper();
      3. private static boolean isStart = false;
      4. private final int INTERVAL = 1000 * 60 * 5;
      5. private Looper(){}
      6. public static Looper getInstance() {
      7. return looper;
      8. }
      9. public void loop() {
      10. if(!isStart) {
      11. isStart = true;
      12. this.start();
      13. }
      14. }
      15. public void run() {
      16. Task task = new Task();
      17. while(true) {
      18. //Session過期檢測
      19. task.checkState();
      20. //心跳包檢測
      21. //task.sendAck();
      22. try {
      23. Thread.sleep(INTERVAL);
      24. } catch (InterruptedException e) {
      25. Log.e(e);
      26. }
      27. }
      28. }
      29. }
       


      1. public class Task {
      2. public void checkState() {
      3. Set<String> keys = SessionManager.getSessionKeys();
      4. if(keys.size() == 0) {
      5. return;
      6. }
      7. List<String> removes = new ArrayList<String>();
      8. Iterator<String> iterator = keys.iterator();
      9. String key = null;
      10. while(iterator.hasNext()) {
      11. key = iterator.next();
      12. if(!SessionManager.getSession(key).isKeekAlive()) {
      13. removes.add(key);
      14. }
      15. }
      16. if(removes.size() > 0) {
      17. Log.i(“sessions is time out,remove “ + removes.size() + “session”);
      18. }
      19. SessionManager.remove(removes.toArray(new String[removes.size()]));
      20. }
      21. public void sendAck() {
      22. Set<String> keys = SessionManager.getSessionKeys();
      23. if(keys.size() == 0) {
      24. return;
      25. }
      26. Iterator<String> iterator = keys.iterator();
      27. while(iterator.hasNext()) {
      28. iterator.next();
      29. //TODO 發(fā)送心跳包
      30. }
      31. }
      32. }
       

      注意,在Task和SessionProcessor類里都有對SessionManager的sessions做遍歷,文中使用的方法并不是很好,主要是效率問題,推薦使用遍歷Entry的方式來獲取Key和Value,因為一直在JavaWeb上折騰,所以會的童鞋看到Request和Response會挺親切,這個例子沒有經過任何安全和性能測試,如果需要放到生產環(huán)境上得話請先自行做測試-
      -!

      客戶端請求時的數據內容例如{handler:”UserHandler”,action:”login”,imei:”2364656512636″…….},這些約定就自己來定了。

      贊(0)
      分享到: 更多 (0)
      網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號