View Javadoc
1   package com.ericsson.research.transport.ws.spi;
2   
3   /*
4    * ##_BEGIN_LICENSE_##
5    * Transport Abstraction Package (trap)
6    * ----------
7    * Copyright (C) 2014 Ericsson AB
8    * ----------
9    * Redistribution and use in source and binary forms, with or without modification,
10   * are permitted provided that the following conditions are met:
11   * 
12   * 1. Redistributions of source code must retain the above copyright notice, this
13   *    list of conditions and the following disclaimer.
14   * 
15   * 2. Redistributions in binary form must reproduce the above copyright notice,
16   *    this list of conditions and the following disclaimer in the documentation
17   *    and/or other materials provided with the distribution.
18   * 
19   * 3. Neither the name of the Ericsson AB nor the names of its contributors
20   *    may be used to endorse or promote products derived from this software without
21   *    specific prior written permission.
22   * 
23   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
24   * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
25   * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
26   * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
27   * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
28   * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29   * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
30   * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
31   * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
32   * OF THE POSSIBILITY OF SUCH DAMAGE.
33   * ##_END_LICENSE_##
34   */
35  
36  import java.io.ByteArrayOutputStream;
37  import java.io.IOException;
38  import java.io.OutputStream;
39  import java.net.InetSocketAddress;
40  import java.nio.ByteBuffer;
41  import java.util.concurrent.ConcurrentLinkedQueue;
42  
43  import com.ericsson.research.transport.ws.WSListener;
44  import com.ericsson.research.trap.nio.Nio;
45  import com.ericsson.research.trap.nio.Socket;
46  import com.ericsson.research.trap.nio.Socket.SocketHandler;
47  
48  public class WSNioEndpoint extends WSAbstractProtocolWrapper implements SocketHandler
49  {
50  
51  	private Socket	        socket;
52  	private NioOutputStream	os;
53  	private boolean	        closing;
54  
55  	public WSNioEndpoint(WSAbstractProtocol protocol, WSListener listener) throws IOException
56  	{
57  		super(protocol, listener);
58  		if (protocol.securityContext != null)
59  			this.socket = WSSecureSocketFactory.getSecureSocket(protocol.securityContext);
60  		else
61  			this.socket = Nio.factory().client();
62  
63  		socket.setHandler(this);
64  	}
65  
66  	public WSNioEndpoint(Socket socket, WSAbstractProtocol protocol, WSListener listener)
67  	{
68  		super(protocol, listener);
69  		if (socket == null)
70  			throw new IllegalArgumentException("Socket cannot be null");
71  		this.socket = socket;
72  		socket.setHandler(this);
73  	}
74  
75  	public synchronized void open() throws IOException
76  	{
77  		this.closing = false;
78  		this.socket.open(this.protocol.host, this.protocol.port);
79  	}
80  
81  	ConcurrentLinkedQueue<ByteBuffer>	msgs	= new ConcurrentLinkedQueue<ByteBuffer>();
82  
83  	class NioOutputStream extends ByteArrayOutputStream
84  	{
85  
86  		public synchronized void flush() throws IOException
87  		{
88  			if (this.count == 0)
89  				return;
90  
91  			if (WSNioEndpoint.this.socket == null)
92  				throw new IOException("The socket has already been closed");
93  
94  			ByteBuffer buffer = ByteBuffer.allocate(count);
95  			buffer.put(buf, 0, count);
96  			buffer.flip();
97  			msgs.add(buffer);
98  
99  			_flush();
100 
101 			this.reset();
102 		}
103 
104 		public synchronized void close() throws IOException
105 		{
106 			this.flush();
107 			super.close();
108 		}
109 
110 	}
111 
112 	public void _flush()
113 	{
114 			for (;;)
115 			{
116 				ByteBuffer buf;
117 				synchronized(msgs) 
118 				{
119 					buf = msgs.peek();
120 	
121 					if (buf == null)
122 						return;
123 	
124 					if (!buf.hasRemaining())
125 					{
126 						msgs.poll();
127 						continue;
128 					}
129 				}
130 				socket.send(buf);
131 
132 				if (buf.hasRemaining())
133 					return;
134 			}
135 	}
136 
137 	public synchronized OutputStream getRawOutput() throws IOException
138 	{
139 		if (this.closing)
140 			throw new IOException("The socket is already closing");
141 		if (this.os == null)
142 			this.os = new NioOutputStream();
143 		return this.os;
144 	}
145 
146 	public synchronized void forceClose()
147 	{
148 		if (this.closing)
149 			return;
150 		this.closing = true;
151 		this.socket.close();
152 		this.socket = null;
153 		this.os = null;
154 	}
155 
156 	public String toString()
157 	{
158 		return "Nio (" + this.protocol + ")";
159 	}
160 
161 	public InetSocketAddress getLocalSocketAddress()
162 	{
163 		try
164         {
165 	        return this.socket.getLocalSocketAddress();
166         }
167         catch (IOException e)
168         {
169 	        throw new RuntimeException(e);
170         }
171 	}
172 
173 	public InetSocketAddress getRemoteSocketAddress()
174 	{
175 		try
176         {
177 	        return this.socket.getRemoteSocketAddress();
178         }
179         catch (IOException e)
180         {
181 	        throw new RuntimeException(e);
182         }
183 	}
184 
185 	@Override
186 	public void sent(Socket sock)
187 	{
188 		_flush();
189 	}
190 
191 	byte[]	rcvBuf	= new byte[4096];
192 
193 	@Override
194 	public synchronized void received(ByteBuffer data, Socket sock)
195 	{
196 		while (data.hasRemaining())
197 		{
198 			int loopData = Math.min(rcvBuf.length, data.remaining());
199 			data.get(rcvBuf, 0, loopData);
200 			this.protocol.notifySocketData(rcvBuf, loopData);
201 		}
202 	}
203 
204 	@Override
205 	public void opened(Socket sock)
206 	{
207 		this.protocol.notifyConnected();
208 
209 	}
210 
211 	@Override
212 	public void closed(Socket sock)
213 	{
214 		this.protocol.notifyDisconnected();
215 
216 	}
217 
218 	@Override
219 	public void error(Throwable exc, Socket sock)
220 	{
221 		if(this.listener != null)
222 			this.listener.notifyError(exc);
223 		this.forceClose();
224 	}
225 
226 }