View Javadoc
1   package com.ericsson.research.transport;
2   
3   /*
4    * ##_BEGIN_LICENSE_##
5    * Transport Abstraction Package (trap)
6    * ----------
7    * Copyright (C) 2014 Ericsson AB
8    * ----------
9    * Redistribution and use in source and binary forms, with or without modification,
10   * are permitted provided that the following conditions are met:
11   * 
12   * 1. Redistributions of source code must retain the above copyright notice, this
13   *    list of conditions and the following disclaimer.
14   * 
15   * 2. Redistributions in binary form must reproduce the above copyright notice,
16   *    this list of conditions and the following disclaimer in the documentation
17   *    and/or other materials provided with the distribution.
18   * 
19   * 3. Neither the name of the Ericsson AB nor the names of its contributors
20   *    may be used to endorse or promote products derived from this software without
21   *    specific prior written permission.
22   * 
23   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
24   * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
25   * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
26   * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
27   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28   * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
30   * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
31   * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
32   * OF THE POSSIBILITY OF SUCH DAMAGE.
33   * ##_END_LICENSE_##
34   */
35  
36  import java.io.IOError;
37  import java.io.IOException;
38  import java.lang.ref.WeakReference;
39  import java.net.InetSocketAddress;
40  import java.net.SocketAddress;
41  import java.nio.ByteBuffer;
42  import java.nio.channels.ClosedSelectorException;
43  import java.nio.channels.SelectionKey;
44  import java.nio.channels.Selector;
45  import java.nio.channels.ServerSocketChannel;
46  import java.nio.channels.SocketChannel;
47  import java.nio.channels.spi.SelectorProvider;
48  import java.nio.charset.CoderMalfunctionError;
49  import java.util.Collection;
50  import java.util.HashMap;
51  import java.util.Iterator;
52  import java.util.LinkedList;
53  import java.util.Map;
54  import java.util.concurrent.ConcurrentHashMap;
55  import java.util.concurrent.ConcurrentLinkedQueue;
56  import java.util.concurrent.LinkedBlockingQueue;
57  
58  public class NioManager implements Runnable
59  {
60      
61      private static final int                                         MAX_OUTGOING_SOCKETS                             = 15;
62      private static final String                                      KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP = "KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP";
63      private static final String                                      KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE             = "KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE";
64      private static final String                                      KEY_METADATA_LAST_BUFFER_REMAINING               = "KEY_METADATA_LAST_BUFFER_REMAINING";
65      private static final long                                        SOCKET_CLOSE_TIMEOUT                             = 5000;
66      
67      private static final long                                        SOCKET_WRITE_TIMEOUT                             = 15000;
68      private static final String                                      KEY_METADATA_LAST_WRITE_TIMESTAMP                = "KEY_METADATA_LAST_WRITE_TIMESTAMP";
69      
70      private static NioManager                                        instance;
71      
72      private final ByteBuffer                                         readBuffer                                       = ByteBuffer.allocate(8192);
73      private final Map<SelectionKey, LinkedBlockingQueue<ByteBuffer>> outputBuffers                                    = new ConcurrentHashMap<SelectionKey, LinkedBlockingQueue<ByteBuffer>>();
74      private final Map<SelectionKey, NioReference<NioEndpoint>>       sockets                                          = new ConcurrentHashMap<SelectionKey, NioReference<NioEndpoint>>();
75      private final Collection<NioWaitingSocket>                       waitingSockets                                   = new ConcurrentLinkedQueue<NioWaitingSocket>();
76      private final Collection<SelectionKey>                           closeKeys                                        = new ConcurrentLinkedQueue<SelectionKey>();
77      private final Collection<SelectionKey>                           writeKeys                                        = new ConcurrentLinkedQueue<SelectionKey>();
78      private final Map<SelectionKey, NioWaitingSocket>                connectingSockets                                = new ConcurrentHashMap<SelectionKey, NioWaitingSocket>();
79      
80      private Selector                                                 selector;
81      private Thread                                                   nioThread                                        = null;
82      
83      public static synchronized NioManager instance()
84      {
85          if (instance == null)
86              instance = new NioManager();
87          if (instance.nioThread == null)
88              instance.start();
89          return instance;
90      }
91      
92      private NioManager()
93      {
94      }
95      
96      public static void reset() throws IOException
97      {
98          if (instance != null)
99              instance.stop();
100         instance = null;
101     }
102     
103     public void start()
104     {
105         try
106         {
107             this.selector = SelectorProvider.provider().openSelector();
108         }
109         catch (IOException e)
110         {
111             e.printStackTrace();
112             return;
113         }
114         this.nioThread = new Thread(this);
115         this.nioThread.start();
116     }
117     
118     public void stop() throws IOException
119     {
120         this.selector.close();
121     }
122     
123     public void run()
124     {
125         for (;;)
126         {
127             try
128             {
129                 
130                 if (!this.writeKeys.isEmpty())
131                 {
132                     synchronized (this.writeKeys)
133                     {
134                         for (SelectionKey key : this.writeKeys)
135                             if (key.isValid())
136                                 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
137                         this.writeKeys.clear();
138                     }
139                 }
140                 
141                 synchronized (this.selector)
142                 {
143                     
144                     // So the iterators below don't tell the whole truth, and it is right that a closeKey can trigger a waitingSocket
145                     // Therefore, since I ignore the call to wakeup to not block the thread in some circumstances, I need to make
146                     // this check here.
147                     if (this.waitingSockets.isEmpty() && this.closeKeys.isEmpty())
148                     {
149                         if ((this.connectingSockets.size() == 0) && (this.writeKeys.size() == 0))
150                             this.selector.select(10000); // Once every ten seconds is acceptable to wake up just in case a race would cause a deadlock.
151                         else
152                             this.selector.select(1000); // If sockets are connecting, wake up more often to check their timeouts.
153                     }
154                     else
155                     {
156                         // Do a very short timeout, in order to process the next set of sockets.
157                         this.selector.select(10);
158                     }
159                     
160                     Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
161                     while (selectedKeys.hasNext())
162                     {
163                         SelectionKey key = selectedKeys.next();
164                         selectedKeys.remove();
165                         if (!key.isValid())
166                             continue;
167                         try
168                         {
169                             if (key.isReadable())
170                                 this.read(key);
171                             else if (key.isWritable())
172                                 this.write(key);
173                             else if (key.isAcceptable())
174                                 this.accept(key);
175                             else if (key.isConnectable())
176                                 this.connect(key);
177                         }
178                         catch (Throwable e)
179                         {
180                             
181                             // Some conditions are death conditions.
182                             if (e instanceof CoderMalfunctionError || e instanceof IOError || e instanceof ThreadDeath)
183                                 throw new RuntimeException(e);
184                             
185                             // For everything else, we shouldn't allow the NioManager to be killed.
186                             //e.printStackTrace();
187                             this.close(key);
188                             NioEndpoint ne = this.sockets.get(key).get();
189                             if (ne != null)
190                                 ne.notifyError(e instanceof Exception ? (Exception) e : new Exception(e));
191                         }
192                     }
193                     
194                 }
195                 
196                 // Add write ops, if applicable
197                 //              synchronized(outputBuffers)
198                 //              {
199                 //                  Iterator<SelectionKey> keys = outputBuffers.keySet().iterator();
200                 //                  while(keys.hasNext())
201                 //                  {
202                 //                      SelectionKey key = keys.next();
203                 //                      if (!key.isValid())
204                 //                          continue;
205                 //
206                 //                      ArrayList<ByteBuffer> ob = outputBuffers.get(key);
207                 //
208                 //                      if (!ob.isEmpty())
209                 //                          key.interestOps(// SelectionKey.OP_READ |  (NOTE: Not mixing read & write in first revision)
210                 //                                  SelectionKey.OP_WRITE);
211                 //                  }
212                 //              }
213                 
214                 // Add waiting socket, if applicable
215                 if (!this.waitingSockets.isEmpty())
216                 {
217                     synchronized (this.waitingSockets)
218                     {
219                         Iterator<NioWaitingSocket> it = this.waitingSockets.iterator();
220                         while (it.hasNext())
221                         {
222                             NioWaitingSocket waitingSocket = it.next();
223                             it.remove();
224                             boolean accept = (waitingSocket.getOps() & SelectionKey.OP_ACCEPT) != 0;
225                             if (accept && (this.connectingSockets.size() > MAX_OUTGOING_SOCKETS))
226                                 continue;
227                             NioEndpoint endpoint = waitingSocket.getSocket();
228                             try
229                             {
230                                 SelectionKey key = waitingSocket.createChannel().register(this.selector, waitingSocket.getOps());
231                                 endpoint.setNioManager(this, key);
232                                 this.outputBuffers.put(key, new LinkedBlockingQueue<ByteBuffer>());
233                                 this.sockets.put(key, new NioReference<NioEndpoint>(endpoint));
234                                 if (accept)
235                                     endpoint.notifyConnected();
236                                 else
237                                     this.connectingSockets.put(key, waitingSocket);
238                             }
239                             catch (Exception e)
240                             {
241                                 endpoint.notifyError(e);
242                             }
243                             finally
244                             {
245                                 synchronized (waitingSocket)
246                                 {
247                                     waitingSocket.setDone(true);
248                                     waitingSocket.notifyAll();
249                                 }
250                             }
251                         }
252                     }
253                 }
254                 
255                 if (!this.connectingSockets.isEmpty())
256                 {
257                     // Check timeouts on sockets that are connecting.
258                     Iterator<NioWaitingSocket> it = this.connectingSockets.values().iterator();
259                     while (it.hasNext())
260                     {
261                         NioWaitingSocket socket = it.next();
262                         if (System.currentTimeMillis() > socket.getTimeoutTime())
263                         {
264                             it.remove();
265                             socket.getChannel().close();
266                             // timeout = error, shall we say?
267                             socket.getSocket().notifyError(new IOException("Timed out while trying to connect"));
268                         }
269                     }
270                 }
271                 
272                 if (!this.closeKeys.isEmpty())
273                 {
274                     // Closed notifications will wait
275                     LinkedList<NioEndpoint> closedEndpoints = new LinkedList<NioEndpoint>();
276                     synchronized (this.closeKeys)
277                     {
278                         Iterator<SelectionKey> it = this.closeKeys.iterator();
279                         while (it.hasNext())
280                         {
281                             SelectionKey key = it.next();
282                             Collection<ByteBuffer> ob = this.outputBuffers.get(key);
283                             
284                             HashMap<String, Object> metadata = this.getMetadata(key);
285                             
286                             /*
287                              * This case covers the case where we are flushing data.
288                              * This is verified every second, but in some cases, sockets can get
289                              * stuck in this case. What we'll do is verify no data was sent over a certain period
290                              * and, if true, remove the socket anyway
291                              */
292                             if ((ob != null) && !ob.isEmpty() && key.channel().isOpen())
293                             {
294                                 
295                                 Long lastChange = (Long) metadata.get(KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP);
296                                 Integer lastSize = (Integer) metadata.get(KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE);
297                                 Integer lastBufRemaining = (Integer) metadata.get(KEY_METADATA_LAST_BUFFER_REMAINING);
298                                 
299                                 ByteBuffer first = ob.iterator().next();
300                                 
301                                 int size = ob.size();
302                                 int remaining = first.remaining();
303                                 long cTime = System.currentTimeMillis();
304                                 
305                                 if (lastChange == null)
306                                     lastChange = cTime;
307                                 
308                                 if (lastSize == null)
309                                     lastSize = size;
310                                 
311                                 if (lastBufRemaining == null)
312                                     lastBufRemaining = remaining;
313                                 
314                                 // Now for some verification
315                                 
316                                 long endTime = lastChange.longValue() + SOCKET_CLOSE_TIMEOUT;
317                                 boolean doContinue = true;
318                                 
319                                 // We're within the timeout window!!!
320                                 if (cTime >= endTime)
321                                 {
322                                     if (size == lastSize.intValue())
323                                     {
324                                         // Nest the ifs so it's easier to debug
325                                         
326                                         if (remaining == lastBufRemaining)
327                                         {
328                                             // Same number of buffers, same index, time is
329                                             doContinue = false;
330                                         }
331                                     }
332                                 }
333                                 
334                                 // Update the params. Technically, this means we'll only perform the comparison after 30 seconds,
335                                 // leading to an effective timeout time of 59 seconds (or 2*socket_close_timeout) but this way simplifies
336                                 // the calculations necessary
337                                 if (doContinue)
338                                 {
339                                     metadata.put(KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP, cTime);
340                                     metadata.put(KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE, size);
341                                     metadata.put(KEY_METADATA_LAST_BUFFER_REMAINING, remaining);
342                                     continue;
343                                 }
344                             }
345                             
346                             NioEndpoint socket = null;
347                             try
348                             {
349                                 socket = this.sockets.remove(key).get();
350                             }
351                             catch (Exception e)
352                             {
353                             }
354                             
355                             try
356                             {
357                                 key.channel().close();
358                             }
359                             catch (IOException e)
360                             {
361                                 e.printStackTrace();
362                             }
363                             
364                             key.cancel();
365                             
366                             synchronized (this.outputBuffers)
367                             {
368                                 this.outputBuffers.remove(key);
369                             }
370                             
371                             it.remove();
372                             
373                             if (socket != null)
374                                 closedEndpoints.add(socket);
375                         }
376                     }
377                     
378                     if (!this.writeKeys.isEmpty())
379                     {
380                         synchronized (this.writeKeys)
381                         {
382                             long now = System.currentTimeMillis();
383                             for (SelectionKey key : this.writeKeys)
384                             {
385                                 HashMap<String, Object> metadata = this.getMetadata(key);
386                                 Long lastWrite = (Long) metadata.get(KEY_METADATA_LAST_WRITE_TIMESTAMP);
387                                 
388                                 if (lastWrite == null)
389                                     lastWrite = now;
390                                 
391                                 if ((lastWrite.longValue() + SOCKET_WRITE_TIMEOUT) < now)
392                                     this.close(key);
393                             }
394                         }
395                     }
396                     
397                     // Fire off closed notifications AFTER the synchronized block
398                     for (NioEndpoint socket : closedEndpoints)
399                     {
400                         try
401                         {
402                             socket.notifyClosed();
403                         }
404                         catch (Exception e)
405                         {
406                             e.printStackTrace();
407                         }
408                     }
409                 }
410             }
411             catch (ClosedSelectorException e)
412             {
413                 // This just means we were forcefully closed
414                 this.selector = null;
415                 this.nioThread = null;
416                 this.outputBuffers.clear();
417                 this.sockets.clear();
418                 this.waitingSockets.clear();
419                 this.closeKeys.clear();
420                 this.writeKeys.clear();
421                 this.connectingSockets.clear();
422                 return;
423             }
424             catch (Exception e)
425             {
426                 e.printStackTrace();
427             }
428         }
429     }
430     
431     @SuppressWarnings("unchecked")
432     private HashMap<String, Object> getMetadata(SelectionKey key)
433     {
434         HashMap<String, Object> attachment = (HashMap<String, Object>) key.attachment();
435         
436         if (attachment == null)
437         {
438             attachment = new HashMap<String, Object>();
439             key.attach(attachment);
440         }
441         
442         return attachment;
443     }
444     
445     private void connect(SelectionKey key) throws IOException
446     {
447         IOException error = null;
448         try
449         {
450             if (!((SocketChannel) key.channel()).finishConnect())
451                 error = new IOException("Failed to connect");
452         }
453         catch (IOException e)
454         {
455             error = e;
456             // Try to recover from the error
457             if ("Connection reset by peer".equals(e.getMessage()))
458             {
459                 NioWaitingSocket nws = this.connectingSockets.get(key);
460                 if (nws != null)
461                 {
462                     if (nws.getRetries() > 0)
463                     {
464                         synchronized (this.waitingSockets)
465                         {
466                             this.waitingSockets.add(nws);
467                         }
468                         nws.setRetries(nws.getRetries() - 1);
469                         return;
470                     }
471                 }
472             }
473         }
474         this.connectingSockets.remove(key);
475         if (error == null)
476         {
477             NioEndpoint ne = this.sockets.get(key).get();
478             if (ne != null)
479                 ne.notifyConnected();
480             key.interestOps(SelectionKey.OP_READ);
481         }
482         else
483             throw error;
484     }
485     
486     private void write(SelectionKey key) throws IOException
487     {
488         SocketChannel socketChannel = (SocketChannel) key.channel();
489         try
490         {
491             LinkedBlockingQueue<ByteBuffer> queue = this.outputBuffers.get(key);
492             // Write until there's not more data ...
493             while (!queue.isEmpty())
494             {
495                 ByteBuffer buf = queue.peek();
496                 
497                 int wrote = socketChannel.write(buf);
498                 if ((wrote == 0) || (buf.remaining() > 0))
499                 {
500                     /*
501                         System.out.print("### = "+buf.remaining()+" ("+wrote+") - [");
502                         byte[] b = buf.array();
503                         for(int i=0;i<b.length;i++)
504                             System.out.print((char)b[i]);
505                         System.out.println("]");
506                      */
507                     // ... or the socket's buffer fills up
508                     break;
509                 }
510                 queue.poll();
511             }
512             if (queue.isEmpty())
513             {
514                 // We wrote away all data, so we're no longer interested
515                 // in writing on this socket. Switch back to waiting for
516                 // data.
517                 synchronized (this.writeKeys)
518                 {
519                     // Wrap in synchronized block prevents us from overwriting the result from the main run loop that is clearing writeKeys.
520                     if (queue.isEmpty())
521                         key.interestOps(SelectionKey.OP_READ);
522                 }
523             }
524             
525             HashMap<String, Object> metadata = this.getMetadata(key);
526             metadata.put(KEY_METADATA_LAST_WRITE_TIMESTAMP, Long.valueOf(System.currentTimeMillis()));
527         }
528         catch (IOException e)
529         {
530             this.outputBuffers.remove(key);
531             throw e;
532         }
533     }
534     
535     private void read(SelectionKey key) throws IOException
536     {
537         SocketChannel socketChannel = (SocketChannel) key.channel();
538         this.readBuffer.clear();
539         int numRead = socketChannel.read(this.readBuffer);
540         if (numRead <= -1)
541         {
542             // This key will never flush anything
543             LinkedBlockingQueue<ByteBuffer> queue = this.outputBuffers.get(key);
544             queue.clear();
545             this.close(key);
546         }
547         else
548         {
549             NioEndpoint socket = this.sockets.get(key).get();
550             if (socket != null)
551                 socket.receive(this.readBuffer.array(), numRead);
552         }
553     }
554     
555     protected void close(SelectionKey key)
556     {
557         if (key == null)
558             return;
559         synchronized (this.closeKeys)
560         {
561             this.closeKeys.add(key);
562         }
563         
564         if (Thread.currentThread() != this.nioThread)
565             this.selector.wakeup();
566     }
567     
568     private void accept(SelectionKey key) throws IOException
569     {
570         NioEndpoint endpoint = this.sockets.get(key).get();
571         
572         if (endpoint == null)
573         {
574             key.cancel();
575             return;
576         }
577         
578         NioEndpoint newPoint = endpoint.createAcceptChild();
579         SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
580         channel.configureBlocking(false);
581         SelectionKey channelKey = channel.register(this.selector, SelectionKey.OP_READ);
582         newPoint.setNioManager(this, channelKey);
583         this.sockets.put(channelKey, new NioReference<NioEndpoint>(newPoint));
584         this.outputBuffers.put(channelKey, new LinkedBlockingQueue<ByteBuffer>());
585         endpoint.notifyAccepted(newPoint);
586     }
587     
588     protected void send(SelectionKey key, byte[] data, int size) throws IOException
589     {
590         if (!key.isValid())
591             throw new IOException("Transport is no longer available");
592         if (this.closeKeys.contains(key))
593             throw new IOException("Cannot send to a closed socket.");
594         // Wrap the data in a buffer outside the synchronized to not block the main thread.
595         byte[] mData = new byte[size];
596         System.arraycopy(data, 0, mData, 0, size);
597         ByteBuffer byteBuffer = ByteBuffer.wrap(mData);
598         
599         this.outputBuffers.get(key).add(byteBuffer);
600         
601         synchronized (this.writeKeys)
602         {
603             this.writeKeys.add(key);
604         }
605         
606         if (Thread.currentThread() != this.nioThread)
607             this.selector.wakeup();
608     }
609     
610     public void open(NioEndpoint socket, InetSocketAddress remote)
611     {
612         this.open(socket, remote, false);
613     }
614     
615     public void open(NioEndpoint socket, InetSocketAddress remote, boolean wait)
616     {
617         NioWaitingSocket waitingSocket = new NioWaitingSocket(socket, remote, false, SelectionKey.OP_CONNECT);
618         synchronized (this.waitingSockets)
619         {
620             this.waitingSockets.add(waitingSocket);
621         }
622         
623         if (Thread.currentThread() != this.nioThread)
624             this.selector.wakeup();
625         if (wait && !this.nioThread.equals(Thread.currentThread()))
626         {
627             synchronized (waitingSocket)
628             {
629                 while (!waitingSocket.isDone())
630                 {
631                     try
632                     {
633                         this.selector.wakeup();
634                         waitingSocket.wait(1000);
635                     }
636                     catch (InterruptedException e)
637                     {
638                         e.printStackTrace();
639                         return;
640                     }
641                 }
642             }
643         }
644     }
645     
646     public void bind(NioEndpoint endpoint, SocketAddress local, boolean wait)
647     {
648         if (!endpoint.canAccept())
649             throw new IllegalArgumentException("Received endpoint of type " + endpoint.getClass() + " that cannot accept");
650         NioWaitingSocket waitingSocket = new NioWaitingSocket(endpoint, local, true, SelectionKey.OP_ACCEPT);
651         synchronized (this.waitingSockets)
652         {
653             this.waitingSockets.add(waitingSocket);
654             
655             if (Thread.currentThread() != this.nioThread)
656                 this.selector.wakeup();
657         }
658         if (wait && !this.nioThread.equals(Thread.currentThread()))
659         {
660             synchronized (waitingSocket)
661             {
662                 while (!waitingSocket.isDone())
663                 {
664                     try
665                     {
666                         waitingSocket.wait(1000);
667                         this.selector.wakeup();
668                     }
669                     catch (InterruptedException e)
670                     {
671                         e.printStackTrace();
672                     }
673                 }
674             }
675         }
676     }
677     
678     class NioReference<T extends NioEndpoint> extends WeakReference<T>
679     {
680         
681         public NioReference(T referent)
682         {
683             super(referent);
684         }
685         
686         public void clear()
687         {
688             T ref = this.get();
689             if (ref != null)
690                 NioManager.this.close(ref.getKey());
691             super.clear();
692         }
693         
694     }
695     
696 }