1 package com.ericsson.research.transport;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 import java.io.IOError;
37 import java.io.IOException;
38 import java.lang.ref.WeakReference;
39 import java.net.InetSocketAddress;
40 import java.net.SocketAddress;
41 import java.nio.ByteBuffer;
42 import java.nio.channels.ClosedSelectorException;
43 import java.nio.channels.SelectionKey;
44 import java.nio.channels.Selector;
45 import java.nio.channels.ServerSocketChannel;
46 import java.nio.channels.SocketChannel;
47 import java.nio.channels.spi.SelectorProvider;
48 import java.nio.charset.CoderMalfunctionError;
49 import java.util.Collection;
50 import java.util.HashMap;
51 import java.util.Iterator;
52 import java.util.LinkedList;
53 import java.util.Map;
54 import java.util.concurrent.ConcurrentHashMap;
55 import java.util.concurrent.ConcurrentLinkedQueue;
56 import java.util.concurrent.LinkedBlockingQueue;
57
58 public class NioManager implements Runnable
59 {
60
61 private static final int MAX_OUTGOING_SOCKETS = 15;
62 private static final String KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP = "KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP";
63 private static final String KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE = "KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE";
64 private static final String KEY_METADATA_LAST_BUFFER_REMAINING = "KEY_METADATA_LAST_BUFFER_REMAINING";
65 private static final long SOCKET_CLOSE_TIMEOUT = 5000;
66
67 private static final long SOCKET_WRITE_TIMEOUT = 15000;
68 private static final String KEY_METADATA_LAST_WRITE_TIMESTAMP = "KEY_METADATA_LAST_WRITE_TIMESTAMP";
69
70 private static NioManager instance;
71
72 private final ByteBuffer readBuffer = ByteBuffer.allocate(8192);
73 private final Map<SelectionKey, LinkedBlockingQueue<ByteBuffer>> outputBuffers = new ConcurrentHashMap<SelectionKey, LinkedBlockingQueue<ByteBuffer>>();
74 private final Map<SelectionKey, NioReference<NioEndpoint>> sockets = new ConcurrentHashMap<SelectionKey, NioReference<NioEndpoint>>();
75 private final Collection<NioWaitingSocket> waitingSockets = new ConcurrentLinkedQueue<NioWaitingSocket>();
76 private final Collection<SelectionKey> closeKeys = new ConcurrentLinkedQueue<SelectionKey>();
77 private final Collection<SelectionKey> writeKeys = new ConcurrentLinkedQueue<SelectionKey>();
78 private final Map<SelectionKey, NioWaitingSocket> connectingSockets = new ConcurrentHashMap<SelectionKey, NioWaitingSocket>();
79
80 private Selector selector;
81 private Thread nioThread = null;
82
83 public static synchronized NioManager instance()
84 {
85 if (instance == null)
86 instance = new NioManager();
87 if (instance.nioThread == null)
88 instance.start();
89 return instance;
90 }
91
92 private NioManager()
93 {
94 }
95
96 public static void reset() throws IOException
97 {
98 if (instance != null)
99 instance.stop();
100 instance = null;
101 }
102
103 public void start()
104 {
105 try
106 {
107 this.selector = SelectorProvider.provider().openSelector();
108 }
109 catch (IOException e)
110 {
111 e.printStackTrace();
112 return;
113 }
114 this.nioThread = new Thread(this);
115 this.nioThread.start();
116 }
117
118 public void stop() throws IOException
119 {
120 this.selector.close();
121 }
122
123 public void run()
124 {
125 for (;;)
126 {
127 try
128 {
129
130 if (!this.writeKeys.isEmpty())
131 {
132 synchronized (this.writeKeys)
133 {
134 for (SelectionKey key : this.writeKeys)
135 if (key.isValid())
136 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
137 this.writeKeys.clear();
138 }
139 }
140
141 synchronized (this.selector)
142 {
143
144
145
146
147 if (this.waitingSockets.isEmpty() && this.closeKeys.isEmpty())
148 {
149 if ((this.connectingSockets.size() == 0) && (this.writeKeys.size() == 0))
150 this.selector.select(10000);
151 else
152 this.selector.select(1000);
153 }
154 else
155 {
156
157 this.selector.select(10);
158 }
159
160 Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
161 while (selectedKeys.hasNext())
162 {
163 SelectionKey key = selectedKeys.next();
164 selectedKeys.remove();
165 if (!key.isValid())
166 continue;
167 try
168 {
169 if (key.isReadable())
170 this.read(key);
171 else if (key.isWritable())
172 this.write(key);
173 else if (key.isAcceptable())
174 this.accept(key);
175 else if (key.isConnectable())
176 this.connect(key);
177 }
178 catch (Throwable e)
179 {
180
181
182 if (e instanceof CoderMalfunctionError || e instanceof IOError || e instanceof ThreadDeath)
183 throw new RuntimeException(e);
184
185
186
187 this.close(key);
188 NioEndpoint ne = this.sockets.get(key).get();
189 if (ne != null)
190 ne.notifyError(e instanceof Exception ? (Exception) e : new Exception(e));
191 }
192 }
193
194 }
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215 if (!this.waitingSockets.isEmpty())
216 {
217 synchronized (this.waitingSockets)
218 {
219 Iterator<NioWaitingSocket> it = this.waitingSockets.iterator();
220 while (it.hasNext())
221 {
222 NioWaitingSocket waitingSocket = it.next();
223 it.remove();
224 boolean accept = (waitingSocket.getOps() & SelectionKey.OP_ACCEPT) != 0;
225 if (accept && (this.connectingSockets.size() > MAX_OUTGOING_SOCKETS))
226 continue;
227 NioEndpoint endpoint = waitingSocket.getSocket();
228 try
229 {
230 SelectionKey key = waitingSocket.createChannel().register(this.selector, waitingSocket.getOps());
231 endpoint.setNioManager(this, key);
232 this.outputBuffers.put(key, new LinkedBlockingQueue<ByteBuffer>());
233 this.sockets.put(key, new NioReference<NioEndpoint>(endpoint));
234 if (accept)
235 endpoint.notifyConnected();
236 else
237 this.connectingSockets.put(key, waitingSocket);
238 }
239 catch (Exception e)
240 {
241 endpoint.notifyError(e);
242 }
243 finally
244 {
245 synchronized (waitingSocket)
246 {
247 waitingSocket.setDone(true);
248 waitingSocket.notifyAll();
249 }
250 }
251 }
252 }
253 }
254
255 if (!this.connectingSockets.isEmpty())
256 {
257
258 Iterator<NioWaitingSocket> it = this.connectingSockets.values().iterator();
259 while (it.hasNext())
260 {
261 NioWaitingSocket socket = it.next();
262 if (System.currentTimeMillis() > socket.getTimeoutTime())
263 {
264 it.remove();
265 socket.getChannel().close();
266
267 socket.getSocket().notifyError(new IOException("Timed out while trying to connect"));
268 }
269 }
270 }
271
272 if (!this.closeKeys.isEmpty())
273 {
274
275 LinkedList<NioEndpoint> closedEndpoints = new LinkedList<NioEndpoint>();
276 synchronized (this.closeKeys)
277 {
278 Iterator<SelectionKey> it = this.closeKeys.iterator();
279 while (it.hasNext())
280 {
281 SelectionKey key = it.next();
282 Collection<ByteBuffer> ob = this.outputBuffers.get(key);
283
284 HashMap<String, Object> metadata = this.getMetadata(key);
285
286
287
288
289
290
291
292 if ((ob != null) && !ob.isEmpty() && key.channel().isOpen())
293 {
294
295 Long lastChange = (Long) metadata.get(KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP);
296 Integer lastSize = (Integer) metadata.get(KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE);
297 Integer lastBufRemaining = (Integer) metadata.get(KEY_METADATA_LAST_BUFFER_REMAINING);
298
299 ByteBuffer first = ob.iterator().next();
300
301 int size = ob.size();
302 int remaining = first.remaining();
303 long cTime = System.currentTimeMillis();
304
305 if (lastChange == null)
306 lastChange = cTime;
307
308 if (lastSize == null)
309 lastSize = size;
310
311 if (lastBufRemaining == null)
312 lastBufRemaining = remaining;
313
314
315
316 long endTime = lastChange.longValue() + SOCKET_CLOSE_TIMEOUT;
317 boolean doContinue = true;
318
319
320 if (cTime >= endTime)
321 {
322 if (size == lastSize.intValue())
323 {
324
325
326 if (remaining == lastBufRemaining)
327 {
328
329 doContinue = false;
330 }
331 }
332 }
333
334
335
336
337 if (doContinue)
338 {
339 metadata.put(KEY_METADATA_LAST_OUTPUT_BUFFER_CHANGE_TIMESTAMP, cTime);
340 metadata.put(KEY_METADATA_LAST_OUTPUT_BUFFER_SIZE, size);
341 metadata.put(KEY_METADATA_LAST_BUFFER_REMAINING, remaining);
342 continue;
343 }
344 }
345
346 NioEndpoint socket = null;
347 try
348 {
349 socket = this.sockets.remove(key).get();
350 }
351 catch (Exception e)
352 {
353 }
354
355 try
356 {
357 key.channel().close();
358 }
359 catch (IOException e)
360 {
361 e.printStackTrace();
362 }
363
364 key.cancel();
365
366 synchronized (this.outputBuffers)
367 {
368 this.outputBuffers.remove(key);
369 }
370
371 it.remove();
372
373 if (socket != null)
374 closedEndpoints.add(socket);
375 }
376 }
377
378 if (!this.writeKeys.isEmpty())
379 {
380 synchronized (this.writeKeys)
381 {
382 long now = System.currentTimeMillis();
383 for (SelectionKey key : this.writeKeys)
384 {
385 HashMap<String, Object> metadata = this.getMetadata(key);
386 Long lastWrite = (Long) metadata.get(KEY_METADATA_LAST_WRITE_TIMESTAMP);
387
388 if (lastWrite == null)
389 lastWrite = now;
390
391 if ((lastWrite.longValue() + SOCKET_WRITE_TIMEOUT) < now)
392 this.close(key);
393 }
394 }
395 }
396
397
398 for (NioEndpoint socket : closedEndpoints)
399 {
400 try
401 {
402 socket.notifyClosed();
403 }
404 catch (Exception e)
405 {
406 e.printStackTrace();
407 }
408 }
409 }
410 }
411 catch (ClosedSelectorException e)
412 {
413
414 this.selector = null;
415 this.nioThread = null;
416 this.outputBuffers.clear();
417 this.sockets.clear();
418 this.waitingSockets.clear();
419 this.closeKeys.clear();
420 this.writeKeys.clear();
421 this.connectingSockets.clear();
422 return;
423 }
424 catch (Exception e)
425 {
426 e.printStackTrace();
427 }
428 }
429 }
430
431 @SuppressWarnings("unchecked")
432 private HashMap<String, Object> getMetadata(SelectionKey key)
433 {
434 HashMap<String, Object> attachment = (HashMap<String, Object>) key.attachment();
435
436 if (attachment == null)
437 {
438 attachment = new HashMap<String, Object>();
439 key.attach(attachment);
440 }
441
442 return attachment;
443 }
444
445 private void connect(SelectionKey key) throws IOException
446 {
447 IOException error = null;
448 try
449 {
450 if (!((SocketChannel) key.channel()).finishConnect())
451 error = new IOException("Failed to connect");
452 }
453 catch (IOException e)
454 {
455 error = e;
456
457 if ("Connection reset by peer".equals(e.getMessage()))
458 {
459 NioWaitingSocket nws = this.connectingSockets.get(key);
460 if (nws != null)
461 {
462 if (nws.getRetries() > 0)
463 {
464 synchronized (this.waitingSockets)
465 {
466 this.waitingSockets.add(nws);
467 }
468 nws.setRetries(nws.getRetries() - 1);
469 return;
470 }
471 }
472 }
473 }
474 this.connectingSockets.remove(key);
475 if (error == null)
476 {
477 NioEndpoint ne = this.sockets.get(key).get();
478 if (ne != null)
479 ne.notifyConnected();
480 key.interestOps(SelectionKey.OP_READ);
481 }
482 else
483 throw error;
484 }
485
486 private void write(SelectionKey key) throws IOException
487 {
488 SocketChannel socketChannel = (SocketChannel) key.channel();
489 try
490 {
491 LinkedBlockingQueue<ByteBuffer> queue = this.outputBuffers.get(key);
492
493 while (!queue.isEmpty())
494 {
495 ByteBuffer buf = queue.peek();
496
497 int wrote = socketChannel.write(buf);
498 if ((wrote == 0) || (buf.remaining() > 0))
499 {
500
501
502
503
504
505
506
507
508 break;
509 }
510 queue.poll();
511 }
512 if (queue.isEmpty())
513 {
514
515
516
517 synchronized (this.writeKeys)
518 {
519
520 if (queue.isEmpty())
521 key.interestOps(SelectionKey.OP_READ);
522 }
523 }
524
525 HashMap<String, Object> metadata = this.getMetadata(key);
526 metadata.put(KEY_METADATA_LAST_WRITE_TIMESTAMP, Long.valueOf(System.currentTimeMillis()));
527 }
528 catch (IOException e)
529 {
530 this.outputBuffers.remove(key);
531 throw e;
532 }
533 }
534
535 private void read(SelectionKey key) throws IOException
536 {
537 SocketChannel socketChannel = (SocketChannel) key.channel();
538 this.readBuffer.clear();
539 int numRead = socketChannel.read(this.readBuffer);
540 if (numRead <= -1)
541 {
542
543 LinkedBlockingQueue<ByteBuffer> queue = this.outputBuffers.get(key);
544 queue.clear();
545 this.close(key);
546 }
547 else
548 {
549 NioEndpoint socket = this.sockets.get(key).get();
550 if (socket != null)
551 socket.receive(this.readBuffer.array(), numRead);
552 }
553 }
554
555 protected void close(SelectionKey key)
556 {
557 if (key == null)
558 return;
559 synchronized (this.closeKeys)
560 {
561 this.closeKeys.add(key);
562 }
563
564 if (Thread.currentThread() != this.nioThread)
565 this.selector.wakeup();
566 }
567
568 private void accept(SelectionKey key) throws IOException
569 {
570 NioEndpoint endpoint = this.sockets.get(key).get();
571
572 if (endpoint == null)
573 {
574 key.cancel();
575 return;
576 }
577
578 NioEndpoint newPoint = endpoint.createAcceptChild();
579 SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
580 channel.configureBlocking(false);
581 SelectionKey channelKey = channel.register(this.selector, SelectionKey.OP_READ);
582 newPoint.setNioManager(this, channelKey);
583 this.sockets.put(channelKey, new NioReference<NioEndpoint>(newPoint));
584 this.outputBuffers.put(channelKey, new LinkedBlockingQueue<ByteBuffer>());
585 endpoint.notifyAccepted(newPoint);
586 }
587
588 protected void send(SelectionKey key, byte[] data, int size) throws IOException
589 {
590 if (!key.isValid())
591 throw new IOException("Transport is no longer available");
592 if (this.closeKeys.contains(key))
593 throw new IOException("Cannot send to a closed socket.");
594
595 byte[] mData = new byte[size];
596 System.arraycopy(data, 0, mData, 0, size);
597 ByteBuffer byteBuffer = ByteBuffer.wrap(mData);
598
599 this.outputBuffers.get(key).add(byteBuffer);
600
601 synchronized (this.writeKeys)
602 {
603 this.writeKeys.add(key);
604 }
605
606 if (Thread.currentThread() != this.nioThread)
607 this.selector.wakeup();
608 }
609
610 public void open(NioEndpoint socket, InetSocketAddress remote)
611 {
612 this.open(socket, remote, false);
613 }
614
615 public void open(NioEndpoint socket, InetSocketAddress remote, boolean wait)
616 {
617 NioWaitingSocket waitingSocket = new NioWaitingSocket(socket, remote, false, SelectionKey.OP_CONNECT);
618 synchronized (this.waitingSockets)
619 {
620 this.waitingSockets.add(waitingSocket);
621 }
622
623 if (Thread.currentThread() != this.nioThread)
624 this.selector.wakeup();
625 if (wait && !this.nioThread.equals(Thread.currentThread()))
626 {
627 synchronized (waitingSocket)
628 {
629 while (!waitingSocket.isDone())
630 {
631 try
632 {
633 this.selector.wakeup();
634 waitingSocket.wait(1000);
635 }
636 catch (InterruptedException e)
637 {
638 e.printStackTrace();
639 return;
640 }
641 }
642 }
643 }
644 }
645
646 public void bind(NioEndpoint endpoint, SocketAddress local, boolean wait)
647 {
648 if (!endpoint.canAccept())
649 throw new IllegalArgumentException("Received endpoint of type " + endpoint.getClass() + " that cannot accept");
650 NioWaitingSocket waitingSocket = new NioWaitingSocket(endpoint, local, true, SelectionKey.OP_ACCEPT);
651 synchronized (this.waitingSockets)
652 {
653 this.waitingSockets.add(waitingSocket);
654
655 if (Thread.currentThread() != this.nioThread)
656 this.selector.wakeup();
657 }
658 if (wait && !this.nioThread.equals(Thread.currentThread()))
659 {
660 synchronized (waitingSocket)
661 {
662 while (!waitingSocket.isDone())
663 {
664 try
665 {
666 waitingSocket.wait(1000);
667 this.selector.wakeup();
668 }
669 catch (InterruptedException e)
670 {
671 e.printStackTrace();
672 }
673 }
674 }
675 }
676 }
677
678 class NioReference<T extends NioEndpoint> extends WeakReference<T>
679 {
680
681 public NioReference(T referent)
682 {
683 super(referent);
684 }
685
686 public void clear()
687 {
688 T ref = this.get();
689 if (ref != null)
690 NioManager.this.close(ref.getKey());
691 super.clear();
692 }
693
694 }
695
696 }