1 package com.ericsson.research.transport;
2
3 /*
4 * ##_BEGIN_LICENSE_##
5 * Transport Abstraction Package (trap)
6 * ----------
7 * Copyright (C) 2014 Ericsson AB
8 * ----------
9 * Redistribution and use in source and binary forms, with or without modification,
10 * are permitted provided that the following conditions are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright notice, this
13 * list of conditions and the following disclaimer.
14 *
15 * 2. Redistributions in binary form must reproduce the above copyright notice,
16 * this list of conditions and the following disclaimer in the documentation
17 * and/or other materials provided with the distribution.
18 *
19 * 3. Neither the name of the Ericsson AB nor the names of its contributors
20 * may be used to endorse or promote products derived from this software without
21 * specific prior written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
24 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
25 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
26 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
27 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
30 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
31 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
32 * OF THE POSSIBILITY OF SUCH DAMAGE.
33 * ##_END_LICENSE_##
34 */
35
36 import java.io.IOError;
37 import java.io.IOException;
38 import java.lang.ref.WeakReference;
39 import java.net.InetSocketAddress;
40 import java.net.SocketAddress;
41 import java.nio.ByteBuffer;
42 import java.nio.channels.ClosedSelectorException;
43 import java.nio.channels.SelectionKey;
44 import java.nio.channels.Selector;
45 import java.nio.channels.ServerSocketChannel;
46 import java.nio.channels.SocketChannel;
47 import java.nio.channels.spi.SelectorProvider;
48 import java.nio.charset.CoderMalfunctionError;
49 import java.util.Collection;
50 import java.util.HashMap;
51 import java.util.Iterator;
52 import java.util.LinkedList;
53 import java.util.Map;
54 import java.util.concurrent.ConcurrentHashMap;
55 import java.util.concurrent.ConcurrentLinkedQueue;
56 import java.util.concurrent.LinkedBlockingQueue;
57
58 public class NioManager implements Runnable
59 {
60
61 private static final int MAX_OUTGOING_SOCKETS = 15;
62 private static final String KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP = "KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP";
63 private static final String KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE = "KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE";
64 private static final String KEY_METADATA_LAST_BUFFER_REMAINING = "KEY_METADATA_LAST_BUFFER_REMAINING";
65 private static final long SOCKET_CLOSE_TIMEOUT = 5000;
66
67 private static final long SOCKET_WRITE_TIMEOUT = 15000;
68 private static final String KEY_METADATA_LAST_WRITE_TIMESTAMP = "KEY_METADATA_LAST_WRITE_TIMESTAMP";
69
70 private static NioManager instance;
71
72 private final ByteBuffer readBuffer = ByteBuffer.allocate(8192);
73 private final Map<SelectionKey, LinkedBlockingQueue<ByteBuffer>> outputBuffers = new ConcurrentHashMap<SelectionKey, LinkedBlockingQueue<ByteBuffer>>();
74 private final Map<SelectionKey, NioReference<NioEndpoint>> sockets = new ConcurrentHashMap<SelectionKey, NioReference<NioEndpoint>>();
75 private final Collection<NioWaitingSocket> waitingSockets = new ConcurrentLinkedQueue<NioWaitingSocket>();
76 private final Collection<SelectionKey> closeKeys = new ConcurrentLinkedQueue<SelectionKey>();
77 private final Collection<SelectionKey> writeKeys = new ConcurrentLinkedQueue<SelectionKey>();
78 private final Map<SelectionKey, NioWaitingSocket> connectingSockets = new ConcurrentHashMap<SelectionKey, NioWaitingSocket>();
79
80 private Selector selector;
81 private Thread nioThread = null;
82
83 public static synchronized NioManager instance()
84 {
85 if (instance == null)
86 instance = new NioManager();
87 if (instance.nioThread == null)
88 instance.start();
89 return instance;
90 }
91
/**
 * Private: instances are only created from within this class (see {@code instance()}).
 */
private NioManager()
{
    super();
}
95
96 public static void reset() throws IOException
97 {
98 if (instance != null)
99 instance.stop();
100 instance = null;
101 }
102
103 public void start()
104 {
105 try
106 {
107 this.selector = SelectorProvider.provider().openSelector();
108 }
109 catch (IOException e)
110 {
111 e.printStackTrace();
112 return;
113 }
114 this.nioThread = new Thread(this);
115 this.nioThread.start();
116 }
117
118 public void stop() throws IOException
119 {
120 this.selector.close();
121 }
122
123 public void run()
124 {
125 for (;;)
126 {
127 try
128 {
129
130 if (!this.writeKeys.isEmpty())
131 {
132 synchronized (this.writeKeys)
133 {
134 for (SelectionKey key : this.writeKeys)
135 if (key.isValid())
136 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
137 this.writeKeys.clear();
138 }
139 }
140
141 synchronized (this.selector)
142 {
143
144 // So the iterators below don't tell the whole truth, and it is right that a closeKey can trigger a waitingSocket
145 // Therefore, since I ignore the call to wakeup to not block the thread in some circumstances, I need to make
146 // this check here.
147 if (this.waitingSockets.isEmpty() && this.closeKeys.isEmpty())
148 {
149 if ((this.connectingSockets.size() == 0) && (this.writeKeys.size() == 0))
150 this.selector.select(10000); // Once every ten seconds is acceptable to wake up just in case a race would cause a deadlock.
151 else
152 this.selector.select(1000); // If sockets are connecting, wake up more often to check their timeouts.
153 }
154 else
155 {
156 // Do a very short timeout, in order to process the next set of sockets.
157 this.selector.select(10);
158 }
159
160 Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
161 while (selectedKeys.hasNext())
162 {
163 SelectionKey key = selectedKeys.next();
164 selectedKeys.remove();
165 if (!key.isValid())
166 continue;
167 try
168 {
169 if (key.isReadable())
170 this.read(key);
171 else if (key.isWritable())
172 this.write(key);
173 else if (key.isAcceptable())
174 this.accept(key);
175 else if (key.isConnectable())
176 this.connect(key);
177 }
178 catch (Throwable e)
179 {
180
181 // Some conditions are death conditions.
182 if (e instanceof CoderMalfunctionError || e instanceof IOError || e instanceof ThreadDeath)
183 throw new RuntimeException(e);
184
185 // For everything else, we shouldn't allow the NioManager to be killed.
186 //e.printStackTrace();
187 this.close(key);
188 NioEndpoint ne = this.sockets.get(key).get();
189 if (ne != null)
190 ne.notifyError(e instanceof Exception ? (Exception) e : new Exception(e));
191 }
192 }
193
194 }
195
196 // Add write ops, if applicable
197 // synchronized(outputBuffers)
198 // {
199 // Iterator<SelectionKey> keys = outputBuffers.keySet().iterator();
200 // while(keys.hasNext())
201 // {
202 // SelectionKey key = keys.next();
203 // if (!key.isValid())
204 // continue;
205 //
206 // ArrayList<ByteBuffer> ob = outputBuffers.get(key);
207 //
208 // if (!ob.isEmpty())
209 // key.interestOps(// SelectionKey.OP_READ | (NOTE: Not mixing read & write in first revision)
210 // SelectionKey.OP_WRITE);
211 // }
212 // }
213
214 // Add waiting socket, if applicable
215 if (!this.waitingSockets.isEmpty())
216 {
217 synchronized (this.waitingSockets)
218 {
219 Iterator<NioWaitingSocket> it = this.waitingSockets.iterator();
220 while (it.hasNext())
221 {
222 NioWaitingSocket waitingSocket = it.next();
223 it.remove();
224 boolean accept = (waitingSocket.getOps() & SelectionKey.OP_ACCEPT) != 0;
225 if (accept && (this.connectingSockets.size() > MAX_OUTGOING_SOCKETS))
226 continue;
227 NioEndpoint endpoint = waitingSocket.getSocket();
228 try
229 {
230 SelectionKey key = waitingSocket.createChannel().register(this.selector, waitingSocket.getOps());
231 endpoint.setNioManager(this, key);
232 this.outputBuffers.put(key, new LinkedBlockingQueue<ByteBuffer>());
233 this.sockets.put(key, new NioReference<NioEndpoint>(endpoint));
234 if (accept)
235 endpoint.notifyConnected();
236 else
237 this.connectingSockets.put(key, waitingSocket);
238 }
239 catch (Exception e)
240 {
241 endpoint.notifyError(e);
242 }
243 finally
244 {
245 synchronized (waitingSocket)
246 {
247 waitingSocket.setDone(true);
248 waitingSocket.notifyAll();
249 }
250 }
251 }
252 }
253 }
254
255 if (!this.connectingSockets.isEmpty())
256 {
257 // Check timeouts on sockets that are connecting.
258 Iterator<NioWaitingSocket> it = this.connectingSockets.values().iterator();
259 while (it.hasNext())
260 {
261 NioWaitingSocket socket = it.next();
262 if (System.currentTimeMillis() > socket.getTimeoutTime())
263 {
264 it.remove();
265 socket.getChannel().close();
266 // timeout = error, shall we say?
267 socket.getSocket().notifyError(new IOException("Timed out while trying to connect"));
268 }
269 }
270 }
271
272 if (!this.closeKeys.isEmpty())
273 {
274 // Closed notifications will wait
275 LinkedList<NioEndpoint> closedEndpoints = new LinkedList<NioEndpoint>();
276 synchronized (this.closeKeys)
277 {
278 Iterator<SelectionKey> it = this.closeKeys.iterator();
279 while (it.hasNext())
280 {
281 SelectionKey key = it.next();
282 Collection<ByteBuffer> ob = this.outputBuffers.get(key);
283
284 HashMap<String, Object> metadata = this.getMetadata(key);
285
286 /*
287 * This case covers the case where we are flushing data.
288 * This is verified every second, but in some cases, sockets can get
289 * stuck in this case. What we'll do is verify no data was sent over a certain period
290 * and, if true, remove the socket anyway
291 */
292 if ((ob != null) && !ob.isEmpty() && key.channel().isOpen())
293 {
294
295 Long lastChange = (Long) metadata.get(KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP);
296 Integer lastSize = (Integer) metadata.get(KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE);
297 Integer lastBufRemaining = (Integer) metadata.get(KEY_METADATA_LAST_BUFFER_REMAINING);
298
299 ByteBuffer first = ob.iterator().next();
300
301 int size = ob.size();
302 int remaining = first.remaining();
303 long cTime = System.currentTimeMillis();
304
305 if (lastChange == null)
306 lastChange = cTime;
307
308 if (lastSize == null)
309 lastSize = size;
310
311 if (lastBufRemaining == null)
312 lastBufRemaining = remaining;
313
314 // Now for some verification
315
316 long endTime = lastChange.longValue() + SOCKET_CLOSE_TIMEOUT;
317 boolean doContinue = true;
318
319 // We're within the timeout window!!!
320 if (cTime >= endTime)
321 {
322 if (size == lastSize.intValue())
323 {
324 // Nest the ifs so it's easier to debug
325
326 if (remaining == lastBufRemaining)
327 {
328 // Same number of buffers, same index, time is
329 doContinue = false;
330 }
331 }
332 }
333
334 // Update the params. Technically, this means we'll only perform the comparison after 30 seconds,
335 // leading to an effective timeout time of 59 seconds (or 2*socket_close_timeout) but this way simplifies
336 // the calculations necessary
337 if (doContinue)
338 {
339 metadata.put(KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP, cTime);
340 metadata.put(KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE, size);
341 metadata.put(KEY_METADATA_LAST_BUFFER_REMAINING, remaining);
342 continue;
343 }
344 }
345
346 NioEndpoint socket = null;
347 try
348 {
349 socket = this.sockets.remove(key).get();
350 }
351 catch (Exception e)
352 {
353 }
354
355 try
356 {
357 key.channel().close();
358 }
359 catch (IOException e)
360 {
361 e.printStackTrace();
362 }
363
364 key.cancel();
365
366 synchronized (this.outputBuffers)
367 {
368 this.outputBuffers.remove(key);
369 }
370
371 it.remove();
372
373 if (socket != null)
374 closedEndpoints.add(socket);
375 }
376 }
377
378 if (!this.writeKeys.isEmpty())
379 {
380 synchronized (this.writeKeys)
381 {
382 long now = System.currentTimeMillis();
383 for (SelectionKey key : this.writeKeys)
384 {
385 HashMap<String, Object> metadata = this.getMetadata(key);
386 Long lastWrite = (Long) metadata.get(KEY_METADATA_LAST_WRITE_TIMESTAMP);
387
388 if (lastWrite == null)
389 lastWrite = now;
390
391 if ((lastWrite.longValue() + SOCKET_WRITE_TIMEOUT) < now)
392 this.close(key);
393 }
394 }
395 }
396
397 // Fire off closed notifications AFTER the synchronized block
398 for (NioEndpoint socket : closedEndpoints)
399 {
400 try
401 {
402 socket.notifyClosed();
403 }
404 catch (Exception e)
405 {
406 e.printStackTrace();
407 }
408 }
409 }
410 }
411 catch (ClosedSelectorException e)
412 {
413 // This just means we were forcefully closed
414 this.selector = null;
415 this.nioThread = null;
416 this.outputBuffers.clear();
417 this.sockets.clear();
418 this.waitingSockets.clear();
419 this.closeKeys.clear();
420 this.writeKeys.clear();
421 this.connectingSockets.clear();
422 return;
423 }
424 catch (Exception e)
425 {
426 e.printStackTrace();
427 }
428 }
429 }
430
431 @SuppressWarnings("unchecked")
432 private HashMap<String, Object> getMetadata(SelectionKey key)
433 {
434 HashMap<String, Object> attachment = (HashMap<String, Object>) key.attachment();
435
436 if (attachment == null)
437 {
438 attachment = new HashMap<String, Object>();
439 key.attach(attachment);
440 }
441
442 return attachment;
443 }
444
445 private void connect(SelectionKey key) throws IOException
446 {
447 IOException error = null;
448 try
449 {
450 if (!((SocketChannel) key.channel()).finishConnect())
451 error = new IOException("Failed to connect");
452 }
453 catch (IOException e)
454 {
455 error = e;
456 // Try to recover from the error
457 if ("Connection reset by peer".equals(e.getMessage()))
458 {
459 NioWaitingSocket nws = this.connectingSockets.get(key);
460 if (nws != null)
461 {
462 if (nws.getRetries() > 0)
463 {
464 synchronized (this.waitingSockets)
465 {
466 this.waitingSockets.add(nws);
467 }
468 nws.setRetries(nws.getRetries() - 1);
469 return;
470 }
471 }
472 }
473 }
474 this.connectingSockets.remove(key);
475 if (error == null)
476 {
477 NioEndpoint ne = this.sockets.get(key).get();
478 if (ne != null)
479 ne.notifyConnected();
480 key.interestOps(SelectionKey.OP_READ);
481 }
482 else
483 throw error;
484 }
485
486 private void write(SelectionKey key) throws IOException
487 {
488 SocketChannel socketChannel = (SocketChannel) key.channel();
489 try
490 {
491 LinkedBlockingQueue<ByteBuffer> queue = this.outputBuffers.get(key);
492 // Write until there's not more data ...
493 while (!queue.isEmpty())
494 {
495 ByteBuffer buf = queue.peek();
496
497 int wrote = socketChannel.write(buf);
498 if ((wrote == 0) || (buf.remaining() > 0))
499 {
500 /*
501 System.out.print("### = "+buf.remaining()+" ("+wrote+") - [");
502 byte[] b = buf.array();
503 for(int i=0;i<b.length;i++)
504 System.out.print((char)b[i]);
505 System.out.println("]");
506 */
507 // ... or the socket's buffer fills up
508 break;
509 }
510 queue.poll();
511 }
512 if (queue.isEmpty())
513 {
514 // We wrote away all data, so we're no longer interested
515 // in writing on this socket. Switch back to waiting for
516 // data.
517 synchronized (this.writeKeys)
518 {
519 // Wrap in synchronized block prevents us from overwriting the result from the main run loop that is clearing writeKeys.
520 if (queue.isEmpty())
521 key.interestOps(SelectionKey.OP_READ);
522 }
523 }
524
525 HashMap<String, Object> metadata = this.getMetadata(key);
526 metadata.put(KEY_METADATA_LAST_WRITE_TIMESTAMP, Long.valueOf(System.currentTimeMillis()));
527 }
528 catch (IOException e)
529 {
530 this.outputBuffers.remove(key);
531 throw e;
532 }
533 }
534
535 private void read(SelectionKey key) throws IOException
536 {
537 SocketChannel socketChannel = (SocketChannel) key.channel();
538 this.readBuffer.clear();
539 int numRead = socketChannel.read(this.readBuffer);
540 if (numRead <= -1)
541 {
542 // This key will never flush anything
543 LinkedBlockingQueue<ByteBuffer> queue = this.outputBuffers.get(key);
544 queue.clear();
545 this.close(key);
546 }
547 else
548 {
549 NioEndpoint socket = this.sockets.get(key).get();
550 if (socket != null)
551 socket.receive(this.readBuffer.array(), numRead);
552 }
553 }
554
555 protected void close(SelectionKey key)
556 {
557 if (key == null)
558 return;
559 synchronized (this.closeKeys)
560 {
561 this.closeKeys.add(key);
562 }
563
564 if (Thread.currentThread() != this.nioThread)
565 this.selector.wakeup();
566 }
567
568 private void accept(SelectionKey key) throws IOException
569 {
570 NioEndpoint endpoint = this.sockets.get(key).get();
571
572 if (endpoint == null)
573 {
574 key.cancel();
575 return;
576 }
577
578 NioEndpoint newPoint = endpoint.createAcceptChild();
579 SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
580 channel.configureBlocking(false);
581 SelectionKey channelKey = channel.register(this.selector, SelectionKey.OP_READ);
582 newPoint.setNioManager(this, channelKey);
583 this.sockets.put(channelKey, new NioReference<NioEndpoint>(newPoint));
584 this.outputBuffers.put(channelKey, new LinkedBlockingQueue<ByteBuffer>());
585 endpoint.notifyAccepted(newPoint);
586 }
587
588 protected void send(SelectionKey key, byte[] data, int size) throws IOException
589 {
590 if (!key.isValid())
591 throw new IOException("Transport is no longer available");
592 if (this.closeKeys.contains(key))
593 throw new IOException("Cannot send to a closed socket.");
594 // Wrap the data in a buffer outside the synchronized to not block the main thread.
595 byte[] mData = new byte[size];
596 System.arraycopy(data, 0, mData, 0, size);
597 ByteBuffer byteBuffer = ByteBuffer.wrap(mData);
598
599 this.outputBuffers.get(key).add(byteBuffer);
600
601 synchronized (this.writeKeys)
602 {
603 this.writeKeys.add(key);
604 }
605
606 if (Thread.currentThread() != this.nioThread)
607 this.selector.wakeup();
608 }
609
610 public void open(NioEndpoint socket, InetSocketAddress remote)
611 {
612 this.open(socket, remote, false);
613 }
614
615 public void open(NioEndpoint socket, InetSocketAddress remote, boolean wait)
616 {
617 NioWaitingSocket waitingSocket = new NioWaitingSocket(socket, remote, false, SelectionKey.OP_CONNECT);
618 synchronized (this.waitingSockets)
619 {
620 this.waitingSockets.add(waitingSocket);
621 }
622
623 if (Thread.currentThread() != this.nioThread)
624 this.selector.wakeup();
625 if (wait && !this.nioThread.equals(Thread.currentThread()))
626 {
627 synchronized (waitingSocket)
628 {
629 while (!waitingSocket.isDone())
630 {
631 try
632 {
633 this.selector.wakeup();
634 waitingSocket.wait(1000);
635 }
636 catch (InterruptedException e)
637 {
638 e.printStackTrace();
639 return;
640 }
641 }
642 }
643 }
644 }
645
646 public void bind(NioEndpoint endpoint, SocketAddress local, boolean wait)
647 {
648 if (!endpoint.canAccept())
649 throw new IllegalArgumentException("Received endpoint of type " + endpoint.getClass() + " that cannot accept");
650 NioWaitingSocket waitingSocket = new NioWaitingSocket(endpoint, local, true, SelectionKey.OP_ACCEPT);
651 synchronized (this.waitingSockets)
652 {
653 this.waitingSockets.add(waitingSocket);
654
655 if (Thread.currentThread() != this.nioThread)
656 this.selector.wakeup();
657 }
658 if (wait && !this.nioThread.equals(Thread.currentThread()))
659 {
660 synchronized (waitingSocket)
661 {
662 while (!waitingSocket.isDone())
663 {
664 try
665 {
666 waitingSocket.wait(1000);
667 this.selector.wakeup();
668 }
669 catch (InterruptedException e)
670 {
671 e.printStackTrace();
672 }
673 }
674 }
675 }
676 }
677
678 class NioReference<T extends NioEndpoint> extends WeakReference<T>
679 {
680
681 public NioReference(T referent)
682 {
683 super(referent);
684 }
685
686 public void clear()
687 {
688 T ref = this.get();
689 if (ref != null)
690 NioManager.this.close(ref.getKey());
691 super.clear();
692 }
693
694 }
695
696 }