1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix;
19
20 import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD_WR;
21
22 import java.io.FileDescriptor;
23 import java.io.IOException;
24 import java.net.DatagramPacket;
25 import java.net.DatagramSocketImpl;
26 import java.net.InetAddress;
27 import java.net.NetworkInterface;
28 import java.net.SocketAddress;
29 import java.net.SocketException;
30 import java.net.SocketTimeoutException;
31 import java.nio.ByteBuffer;
32 import java.nio.channels.ClosedChannelException;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import java.util.concurrent.atomic.AtomicInteger;
35
36 import org.eclipse.jdt.annotation.NonNull;
37 import org.eclipse.jdt.annotation.Nullable;
38
39
40
41
42
43
44
45 @SuppressWarnings("PMD.CyclomaticComplexity")
46 public abstract class AFDatagramSocketImpl<A extends AFSocketAddress> extends
47 DatagramSocketImplShim {
48 private final AFSocketType socketType;
49 private final AFSocketCore core;
50 final AncillaryDataSupport ancillaryDataSupport = new AncillaryDataSupport();
51 private final AtomicBoolean connected = new AtomicBoolean(false);
52 private final AtomicBoolean bound = new AtomicBoolean(false);
53
54 private final AtomicInteger socketTimeout = new AtomicInteger(0);
55 private int localPort;
56 private int remotePort = 0;
57 private final AFAddressFamily<@NonNull A> addressFamily;
58 private AFSocketImplExtensions<A> implExtensions = null;
59
60
61
62
63
64
65
66
67
68 @SuppressWarnings("this-escape")
69 protected AFDatagramSocketImpl(AFAddressFamily<@NonNull A> addressFamily, FileDescriptor fd,
70 AFSocketType socketType) {
71 super();
72 this.addressFamily = addressFamily;
73
74 this.socketType = socketType;
75 this.core = new AFSocketCore(this, fd, ancillaryDataSupport, getAddressFamily(), true);
76 this.fd = core.fd;
77 }
78
79 @Override
80 protected final void create() throws SocketException {
81 if (isClosed()) {
82 throw new SocketException("Already closed");
83 } else if (fd.valid()) {
84 return;
85 }
86 try {
87 NativeUnixSocket.createSocket(fd, getAddressFamily().getDomain(), socketType.getId());
88 } catch (SocketException e) {
89 throw e;
90 } catch (IOException e) {
91 throw (SocketException) new SocketException(e.getMessage()).initCause(e);
92 }
93 }
94
95 @Override
96 protected final void close() {
97 core.runCleaner();
98 }
99
100 @Override
101 protected final void connect(InetAddress address, int port) throws SocketException {
102
103 }
104
105 final void connect(AFSocketAddress socketAddress) throws IOException {
106 if (socketAddress == AFSocketAddress.INTERNAL_DUMMY_CONNECT) {
107 return;
108 }
109 ByteBuffer ab = socketAddress.getNativeAddressDirectBuffer();
110 NativeUnixSocket.connect(ab, ab.limit(), fd, -1);
111 this.remotePort = socketAddress.getPort();
112 }
113
114 @Override
115 protected final void disconnect() {
116 try {
117 NativeUnixSocket.disconnect(fd);
118 connected.set(false);
119 this.remotePort = 0;
120 } catch (IOException e) {
121 StackTraceUtil.printStackTrace(e);
122 }
123 }
124
125 final AFSocketCore getCore() {
126 return core;
127 }
128
129 @Override
130 protected final FileDescriptor getFileDescriptor() {
131 return core.fd;
132 }
133
134 final boolean isClosed() {
135 return core.isClosed();
136 }
137
138 @Override
139 protected final void bind(int lport, InetAddress laddr) throws SocketException {
140
141 }
142
143 final void bind(AFSocketAddress socketAddress) throws SocketException {
144 if (socketAddress == AFSocketAddress.INTERNAL_DUMMY_BIND) {
145 return;
146 }
147 try {
148 ByteBuffer ab;
149 if (socketAddress == null) {
150 ab = AFSocketAddress.getNativeAddressDirectBuffer(0);
151 } else {
152 ab = socketAddress.getNativeAddressDirectBuffer();
153 }
154 NativeUnixSocket.bind(ab, ab.limit(), fd, NativeUnixSocket.OPT_DGRAM_MODE);
155 if (socketAddress == null) {
156 this.localPort = 0;
157 this.bound.set(false);
158 } else {
159 this.localPort = socketAddress.getPort();
160 }
161 } catch (SocketException e) {
162 throw e;
163 } catch (IOException e) {
164 throw (SocketException) new SocketException(e.getMessage()).initCause(e);
165 }
166 }
167
168 @Override
169 protected final void receive(DatagramPacket p) throws IOException {
170 recv(p, 0);
171 }
172
173 private void recv(DatagramPacket p, int options) throws IOException {
174 int len = p.getLength();
175 FileDescriptor fdesc = core.validFdOrException();
176
177 ByteBuffer datagramPacketBuffer = core.getThreadLocalDirectByteBuffer(len);
178 len = Math.min(len, datagramPacketBuffer.capacity());
179
180 options |= core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING;
181
182 ByteBuffer socketAddressBuffer = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
183 int count = NativeUnixSocket.receive(fdesc, datagramPacketBuffer, 0, len, socketAddressBuffer,
184 options, ancillaryDataSupport, socketTimeout.get());
185 if (count > len) {
186 throw new IllegalStateException("count > len: " + count + " > " + len);
187 } else if (count == -1) {
188 throw new SocketTimeoutException();
189 } else if (count < 0) {
190 throw new IllegalStateException("count: " + count + " < 0");
191 }
192 datagramPacketBuffer.limit(count);
193 datagramPacketBuffer.rewind();
194 datagramPacketBuffer.get(p.getData(), p.getOffset(), count);
195
196 p.setLength(count);
197
198 A addr = AFSocketAddress.ofInternal(socketAddressBuffer, getAddressFamily());
199 p.setAddress(addr == null ? null : addr.getInetAddress());
200 p.setPort(remotePort);
201 }
202
203 @Override
204 protected final void send(DatagramPacket p) throws IOException {
205 InetAddress addr = p.getAddress();
206 ByteBuffer sendToBuf = null;
207 int sendToBufLen = 0;
208 if (addr != null) {
209 byte[] addrBytes = AFInetAddress.unwrapAddress(addr, getAddressFamily());
210 if (addrBytes != null) {
211 sendToBuf = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
212 sendToBufLen = NativeUnixSocket.bytesToSockAddr(getAddressFamily().getDomain(), sendToBuf,
213 addrBytes);
214 sendToBuf.position(0);
215 if (sendToBufLen == -1) {
216 throw new SocketException("Unsupported domain");
217 }
218 }
219 }
220 FileDescriptor fdesc = core.validFdOrException();
221
222 int len = p.getLength();
223
224 ByteBuffer datagramPacketBuffer = core.getThreadLocalDirectByteBuffer(len);
225 datagramPacketBuffer.clear();
226 datagramPacketBuffer.put(p.getData(), p.getOffset(), p.getLength());
227 datagramPacketBuffer.flip();
228
229 NativeUnixSocket.send(fdesc, datagramPacketBuffer, 0, len, sendToBuf, sendToBufLen,
230
231 NativeUnixSocket.OPT_DGRAM_MODE, ancillaryDataSupport);
232 }
233
234 @Override
235 protected final int peek(InetAddress i) throws IOException {
236 throw new SocketException("Unsupported operation");
237 }
238
239 @Override
240 protected final int peekData(DatagramPacket p) throws IOException {
241 recv(p, NativeUnixSocket.OPT_PEEK);
242 return 0;
243 }
244
245 @Override
246 @Deprecated
247 protected final byte getTTL() throws IOException {
248 return (byte) (getTimeToLive() & 0xFF);
249 }
250
251 @Override
252 @Deprecated
253 protected final void setTTL(byte ttl) throws IOException {
254
255 }
256
257 @Override
258 protected final int getTimeToLive() throws IOException {
259 return 0;
260 }
261
262 @Override
263 protected final void setTimeToLive(int ttl) throws IOException {
264
265 }
266
267 @Override
268 protected final void join(InetAddress inetaddr) throws IOException {
269 throw new SocketException("Unsupported");
270 }
271
272 @Override
273 protected final void leave(InetAddress inetaddr) throws IOException {
274 throw new SocketException("Unsupported");
275 }
276
277 @Override
278 protected final void joinGroup(SocketAddress mcastaddr, NetworkInterface netIf)
279 throws IOException {
280 throw new SocketException("Unsupported");
281 }
282
283 @Override
284 protected final void leaveGroup(SocketAddress mcastaddr, NetworkInterface netIf)
285 throws IOException {
286 throw new SocketException("Unsupported");
287 }
288
289 @Override
290 public Object getOption(int optID) throws SocketException {
291 if (isClosed()) {
292 throw new SocketException("Socket is closed");
293 }
294
295 FileDescriptor fdesc = core.validFdOrException();
296 return AFSocketImpl.getOptionDefault(fdesc, optID, socketTimeout, getAddressFamily());
297 }
298
299 @Override
300 public void setOption(int optID, Object value) throws SocketException {
301 if (isClosed()) {
302 throw new SocketException("Socket is closed");
303 }
304
305 FileDescriptor fdesc = core.validFdOrException();
306 AFSocketImpl.setOptionDefault(fdesc, optID, value, socketTimeout);
307 }
308
309 @SuppressWarnings("unchecked")
310 final A receive(ByteBuffer dst) throws IOException {
311 try {
312 return (A) core.receive(dst);
313 } catch (SocketClosedException e) {
314 throw (ClosedChannelException) new ClosedChannelException().initCause(e);
315 }
316 }
317
318 final int send(ByteBuffer src, SocketAddress target) throws IOException {
319 try {
320 return core.write(src, target, 0);
321 } catch (SocketClosedException e) {
322 throw (ClosedChannelException) new ClosedChannelException().initCause(e);
323 }
324 }
325
326 final int read(ByteBuffer dst, ByteBuffer socketAddressBuffer) throws IOException {
327 try {
328 return core.read(dst, socketAddressBuffer, 0);
329 } catch (SocketClosedException e) {
330 throw (ClosedChannelException) new ClosedChannelException().initCause(e);
331 }
332 }
333
334 final int write(ByteBuffer src) throws IOException {
335 try {
336 return core.write(src);
337 } catch (SocketClosedException e) {
338 throw (ClosedChannelException) new ClosedChannelException().initCause(e);
339 }
340 }
341
342 final boolean isConnected() {
343 if (connected.get()) {
344 return true;
345 }
346 if (isClosed()) {
347 return false;
348 }
349 if (core.isConnected(false)) {
350 connected.set(true);
351 return true;
352 }
353 return false;
354 }
355
356 final boolean isBound() {
357 if (bound.get()) {
358 return true;
359 }
360 if (isClosed()) {
361 return false;
362 }
363 if (core.isConnected(true)) {
364 bound.set(true);
365 return true;
366 }
367 return false;
368 }
369
370 final void updatePorts(int local, int remote) {
371 this.localPort = local;
372 this.remotePort = remote;
373 }
374
375 final @Nullable A getLocalSocketAddress() {
376 return AFSocketAddress.getSocketAddress(getFileDescriptor(), false, localPort,
377 getAddressFamily());
378 }
379
380 final @Nullable A getRemoteSocketAddress() {
381 return AFSocketAddress.getSocketAddress(getFileDescriptor(), true, remotePort,
382 getAddressFamily());
383 }
384
385
386
387
388
389
390 protected final AFAddressFamily<@NonNull A> getAddressFamily() {
391 return addressFamily;
392 }
393
394
395
396
397
398
399
400
401 protected final synchronized AFSocketImplExtensions<A> getImplExtensions() {
402 if (implExtensions == null) {
403 implExtensions = addressFamily.initImplExtensions(ancillaryDataSupport);
404 }
405 return implExtensions;
406 }
407
408
409 @SuppressWarnings("Finally" )
410 final boolean accept0(AFDatagramSocketImpl<A> socket) throws IOException {
411 FileDescriptor fdesc = core.validFdOrException();
412 if (isClosed()) {
413 throw new SocketException("Socket is closed");
414 } else if (!isBound()) {
415 throw new SocketException("Socket is not bound");
416 }
417
418 AFSocketAddress socketAddress = core.socketAddress;
419 AFSocketAddress boundSocketAddress = getLocalSocketAddress();
420 if (boundSocketAddress != null) {
421
422 core.socketAddress = socketAddress = boundSocketAddress;
423 }
424
425 if (socketAddress == null) {
426 throw new SocketException("Socket is not bound");
427 }
428
429 final AFDatagramSocketImpl<A> si = socket;
430 core.incPendingAccepts();
431 try {
432 ByteBuffer ab = socketAddress.getNativeAddressDirectBuffer();
433
434 SocketException caught = null;
435 try {
436 if (!NativeUnixSocket.accept(ab, ab.limit(), fdesc, si.fd, core.inode.get(), socketTimeout
437 .get())) {
438 return false;
439 }
440 } catch (SocketException e) {
441 caught = e;
442 } finally {
443 if (!isBound() || isClosed()) {
444 if (getCore().isShutdownOnClose()) {
445 try {
446 NativeUnixSocket.shutdown(si.fd, SHUT_RD_WR);
447 } catch (Exception e) {
448
449 }
450 }
451 try {
452 NativeUnixSocket.close(si.fd);
453 } catch (Exception e) {
454
455 }
456 if (caught != null) {
457 throw caught;
458 } else {
459 throw new SocketClosedException("Socket is closed");
460 }
461 } else if (caught != null) {
462 throw caught;
463 }
464 }
465 } finally {
466 core.decPendingAccepts();
467 }
468 si.setSocketAddress(socketAddress);
469 si.connected.set(true);
470
471 return true;
472 }
473
474 final int getLocalPort1() {
475 return localPort;
476 }
477
478 final int getRemotePort() {
479 return remotePort;
480 }
481
482 final void setSocketAddress(AFSocketAddress socketAddress) {
483 if (socketAddress == null) {
484 this.core.socketAddress = null;
485 this.localPort = -1;
486 } else {
487 this.core.socketAddress = socketAddress;
488 if (this.localPort <= 0) {
489 this.localPort = socketAddress.getPort();
490 }
491 }
492 }
493 }