1 package com.ericsson.research.transport.ws.spi;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 import java.io.ByteArrayOutputStream;
37 import java.io.IOException;
38 import java.io.OutputStream;
39 import java.net.InetSocketAddress;
40 import java.nio.ByteBuffer;
41 import java.util.concurrent.ConcurrentLinkedQueue;
42
43 import com.ericsson.research.transport.ws.WSListener;
44 import com.ericsson.research.trap.nio.Nio;
45 import com.ericsson.research.trap.nio.Socket;
46 import com.ericsson.research.trap.nio.Socket.SocketHandler;
47
48 public class WSNioEndpoint extends WSAbstractProtocolWrapper implements SocketHandler
49 {
50
51 private Socket socket;
52 private NioOutputStream os;
53 private boolean closing;
54
55 public WSNioEndpoint(WSAbstractProtocol protocol, WSListener listener) throws IOException
56 {
57 super(protocol, listener);
58 if (protocol.securityContext != null)
59 this.socket = WSSecureSocketFactory.getSecureSocket(protocol.securityContext);
60 else
61 this.socket = Nio.factory().client();
62
63 socket.setHandler(this);
64 }
65
66 public WSNioEndpoint(Socket socket, WSAbstractProtocol protocol, WSListener listener)
67 {
68 super(protocol, listener);
69 if (socket == null)
70 throw new IllegalArgumentException("Socket cannot be null");
71 this.socket = socket;
72 socket.setHandler(this);
73 }
74
75 public synchronized void open() throws IOException
76 {
77 this.closing = false;
78 this.socket.open(this.protocol.host, this.protocol.port);
79 }
80
81 ConcurrentLinkedQueue<ByteBuffer> msgs = new ConcurrentLinkedQueue<ByteBuffer>();
82
83 class NioOutputStream extends ByteArrayOutputStream
84 {
85
86 public synchronized void flush() throws IOException
87 {
88 if (this.count == 0)
89 return;
90
91 if (WSNioEndpoint.this.socket == null)
92 throw new IOException("The socket has already been closed");
93
94 ByteBuffer buffer = ByteBuffer.allocate(count);
95 buffer.put(buf, 0, count);
96 buffer.flip();
97 msgs.add(buffer);
98
99 _flush();
100
101 this.reset();
102 }
103
104 public synchronized void close() throws IOException
105 {
106 this.flush();
107 super.close();
108 }
109
110 }
111
112 public void _flush()
113 {
114 for (;;)
115 {
116 ByteBuffer buf;
117 synchronized(msgs)
118 {
119 buf = msgs.peek();
120
121 if (buf == null)
122 return;
123
124 if (!buf.hasRemaining())
125 {
126 msgs.poll();
127 continue;
128 }
129 }
130 socket.send(buf);
131
132 if (buf.hasRemaining())
133 return;
134 }
135 }
136
137 public synchronized OutputStream getRawOutput() throws IOException
138 {
139 if (this.closing)
140 throw new IOException("The socket is already closing");
141 if (this.os == null)
142 this.os = new NioOutputStream();
143 return this.os;
144 }
145
146 public synchronized void forceClose()
147 {
148 if (this.closing)
149 return;
150 this.closing = true;
151 this.socket.close();
152 this.socket = null;
153 this.os = null;
154 }
155
156 public String toString()
157 {
158 return "Nio (" + this.protocol + ")";
159 }
160
161 public InetSocketAddress getLocalSocketAddress()
162 {
163 try
164 {
165 return this.socket.getLocalSocketAddress();
166 }
167 catch (IOException e)
168 {
169 throw new RuntimeException(e);
170 }
171 }
172
173 public InetSocketAddress getRemoteSocketAddress()
174 {
175 try
176 {
177 return this.socket.getRemoteSocketAddress();
178 }
179 catch (IOException e)
180 {
181 throw new RuntimeException(e);
182 }
183 }
184
185 @Override
186 public void sent(Socket sock)
187 {
188 _flush();
189 }
190
191 byte[] rcvBuf = new byte[4096];
192
193 @Override
194 public synchronized void received(ByteBuffer data, Socket sock)
195 {
196 while (data.hasRemaining())
197 {
198 int loopData = Math.min(rcvBuf.length, data.remaining());
199 data.get(rcvBuf, 0, loopData);
200 this.protocol.notifySocketData(rcvBuf, loopData);
201 }
202 }
203
204 @Override
205 public void opened(Socket sock)
206 {
207 this.protocol.notifyConnected();
208
209 }
210
211 @Override
212 public void closed(Socket sock)
213 {
214 this.protocol.notifyDisconnected();
215
216 }
217
218 @Override
219 public void error(Throwable exc, Socket sock)
220 {
221 if(this.listener != null)
222 this.listener.notifyError(exc);
223 this.forceClose();
224 }
225
226 }