View Javadoc
1   /*
2    * junixsocket
3    *
4    * Copyright 2009-2024 Christian Kohlschütter
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.newsclub.net.unix;
19  
20  import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD_WR;
21  
22  import java.io.FileDescriptor;
23  import java.io.IOException;
24  import java.net.DatagramPacket;
25  import java.net.DatagramSocketImpl;
26  import java.net.InetAddress;
27  import java.net.NetworkInterface;
28  import java.net.SocketAddress;
29  import java.net.SocketException;
30  import java.net.SocketTimeoutException;
31  import java.nio.ByteBuffer;
32  import java.nio.channels.ClosedChannelException;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  import org.eclipse.jdt.annotation.NonNull;
37  import org.eclipse.jdt.annotation.Nullable;
38  
39  /**
40   * A {@link DatagramSocketImpl} implemented by junixsocket.
41   *
42   * @param <A> The associated address type.
43   * @author Christian Kohlschütter
44   */
45  @SuppressWarnings("PMD.CyclomaticComplexity")
46  public abstract class AFDatagramSocketImpl<A extends AFSocketAddress> extends
47      DatagramSocketImplShim {
48    private final AFSocketType socketType;
49    private final AFSocketCore core;
50    final AncillaryDataSupport ancillaryDataSupport = new AncillaryDataSupport();
51    private final AtomicBoolean connected = new AtomicBoolean(false);
52    private final AtomicBoolean bound = new AtomicBoolean(false);
53  
54    private final AtomicInteger socketTimeout = new AtomicInteger(0);
55    private int localPort;
56    private int remotePort = 0;
57    private final AFAddressFamily<@NonNull A> addressFamily;
58    private AFSocketImplExtensions<A> implExtensions = null;
59  
60    /**
61     * Constructs a new {@link AFDatagramSocketImpl} using the given {@link FileDescriptor} (or null
62     * to create a new one).
63     *
64     * @param addressFamily The address family.
65     * @param fd The file descriptor, or {@code null}.
66     * @param socketType The socket type.
67     */
68    @SuppressWarnings("this-escape")
69    protected AFDatagramSocketImpl(AFAddressFamily<@NonNull A> addressFamily, FileDescriptor fd,
70        AFSocketType socketType) {
71      super();
72      this.addressFamily = addressFamily;
73      // FIXME verify fd
74      this.socketType = socketType;
75      this.core = new AFSocketCore(this, fd, ancillaryDataSupport, getAddressFamily(), true);
76      this.fd = core.fd;
77    }
78  
79    @Override
80    protected final void create() throws SocketException {
81      if (isClosed()) {
82        throw new SocketException("Already closed");
83      } else if (fd.valid()) {
84        return;
85      }
86      try {
87        NativeUnixSocket.createSocket(fd, getAddressFamily().getDomain(), socketType.getId());
88      } catch (SocketException e) {
89        throw e;
90      } catch (IOException e) {
91        throw (SocketException) new SocketException(e.getMessage()).initCause(e);
92      }
93    }
94  
95    @Override
96    protected final void close() {
97      core.runCleaner();
98    }
99  
100   @Override
101   protected final void connect(InetAddress address, int port) throws SocketException {
102     // not used; see connect(AFSocketAddress)
103   }
104 
105   final void connect(AFSocketAddress socketAddress) throws IOException {
106     if (socketAddress == AFSocketAddress.INTERNAL_DUMMY_CONNECT) { // NOPMD
107       return;
108     }
109     ByteBuffer ab = socketAddress.getNativeAddressDirectBuffer();
110     NativeUnixSocket.connect(ab, ab.limit(), fd, -1);
111     this.remotePort = socketAddress.getPort();
112   }
113 
114   @Override
115   protected final void disconnect() {
116     try {
117       NativeUnixSocket.disconnect(fd);
118       connected.set(false);
119       this.remotePort = 0;
120     } catch (IOException e) {
121       StackTraceUtil.printStackTrace(e);
122     }
123   }
124 
125   final AFSocketCore getCore() {
126     return core;
127   }
128 
129   @Override
130   protected final FileDescriptor getFileDescriptor() {
131     return core.fd;
132   }
133 
134   final boolean isClosed() {
135     return core.isClosed();
136   }
137 
138   @Override
139   protected final void bind(int lport, InetAddress laddr) throws SocketException {
140     // not used; see bind(AFSocketAddress)
141   }
142 
143   final void bind(AFSocketAddress socketAddress) throws SocketException {
144     if (socketAddress == AFSocketAddress.INTERNAL_DUMMY_BIND) { // NOPMD
145       return;
146     }
147     try {
148       ByteBuffer ab;
149       if (socketAddress == null) {
150         ab = AFSocketAddress.getNativeAddressDirectBuffer(0);
151       } else {
152         ab = socketAddress.getNativeAddressDirectBuffer();
153       }
154       NativeUnixSocket.bind(ab, ab.limit(), fd, NativeUnixSocket.OPT_DGRAM_MODE);
155       if (socketAddress == null) {
156         this.localPort = 0;
157         this.bound.set(false);
158       } else {
159         this.localPort = socketAddress.getPort();
160       }
161     } catch (SocketException e) {
162       throw e;
163     } catch (IOException e) {
164       throw (SocketException) new SocketException(e.getMessage()).initCause(e);
165     }
166   }
167 
168   @Override
169   protected final void receive(DatagramPacket p) throws IOException {
170     recv(p, 0);
171   }
172 
173   private void recv(DatagramPacket p, int options) throws IOException {
174     int len = p.getLength();
175     FileDescriptor fdesc = core.validFdOrException();
176 
177     ByteBuffer datagramPacketBuffer = core.getThreadLocalDirectByteBuffer(len);
178     len = Math.min(len, datagramPacketBuffer.capacity());
179 
180     options |= core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING;
181 
182     ByteBuffer socketAddressBuffer = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
183     int count = NativeUnixSocket.receive(fdesc, datagramPacketBuffer, 0, len, socketAddressBuffer,
184         options, ancillaryDataSupport, socketTimeout.get());
185     if (count > len) {
186       throw new IllegalStateException("count > len: " + count + " > " + len);
187     } else if (count == -1) {
188       throw new SocketTimeoutException();
189     } else if (count < 0) {
190       throw new IllegalStateException("count: " + count + " < 0");
191     }
192     datagramPacketBuffer.limit(count);
193     datagramPacketBuffer.rewind();
194     datagramPacketBuffer.get(p.getData(), p.getOffset(), count);
195 
196     p.setLength(count);
197 
198     A addr = AFSocketAddress.ofInternal(socketAddressBuffer, getAddressFamily());
199     p.setAddress(addr == null ? null : addr.getInetAddress());
200     p.setPort(remotePort);
201   }
202 
203   @Override
204   protected final void send(DatagramPacket p) throws IOException {
205     InetAddress addr = p.getAddress();
206     ByteBuffer sendToBuf = null;
207     int sendToBufLen = 0;
208     if (addr != null) {
209       byte[] addrBytes = AFInetAddress.unwrapAddress(addr, getAddressFamily());
210       if (addrBytes != null) {
211         sendToBuf = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
212         sendToBufLen = NativeUnixSocket.bytesToSockAddr(getAddressFamily().getDomain(), sendToBuf,
213             addrBytes);
214         sendToBuf.position(0);
215         if (sendToBufLen == -1) {
216           throw new SocketException("Unsupported domain");
217         }
218       }
219     }
220     FileDescriptor fdesc = core.validFdOrException();
221 
222     int len = p.getLength();
223 
224     ByteBuffer datagramPacketBuffer = core.getThreadLocalDirectByteBuffer(len);
225     datagramPacketBuffer.clear();
226     datagramPacketBuffer.put(p.getData(), p.getOffset(), p.getLength());
227     datagramPacketBuffer.flip();
228 
229     NativeUnixSocket.send(fdesc, datagramPacketBuffer, 0, len, sendToBuf, sendToBufLen,
230         /* NativeUnixSocket.OPT_NON_BLOCKING | */
231         NativeUnixSocket.OPT_DGRAM_MODE, ancillaryDataSupport);
232   }
233 
234   @Override
235   protected final int peek(InetAddress i) throws IOException {
236     throw new SocketException("Unsupported operation");
237   }
238 
239   @Override
240   protected final int peekData(DatagramPacket p) throws IOException {
241     recv(p, NativeUnixSocket.OPT_PEEK);
242     return 0;
243   }
244 
245   @Override
246   @Deprecated
247   protected final byte getTTL() throws IOException {
248     return (byte) (getTimeToLive() & 0xFF);
249   }
250 
251   @Override
252   @Deprecated
253   protected final void setTTL(byte ttl) throws IOException {
254     // ignored
255   }
256 
257   @Override
258   protected final int getTimeToLive() throws IOException {
259     return 0;
260   }
261 
262   @Override
263   protected final void setTimeToLive(int ttl) throws IOException {
264     // ignored
265   }
266 
267   @Override
268   protected final void join(InetAddress inetaddr) throws IOException {
269     throw new SocketException("Unsupported");
270   }
271 
272   @Override
273   protected final void leave(InetAddress inetaddr) throws IOException {
274     throw new SocketException("Unsupported");
275   }
276 
277   @Override
278   protected final void joinGroup(SocketAddress mcastaddr, NetworkInterface netIf)
279       throws IOException {
280     throw new SocketException("Unsupported");
281   }
282 
283   @Override
284   protected final void leaveGroup(SocketAddress mcastaddr, NetworkInterface netIf)
285       throws IOException {
286     throw new SocketException("Unsupported");
287   }
288 
289   @Override
290   public Object getOption(int optID) throws SocketException {
291     if (isClosed()) {
292       throw new SocketException("Socket is closed");
293     }
294 
295     FileDescriptor fdesc = core.validFdOrException();
296     return AFSocketImpl.getOptionDefault(fdesc, optID, socketTimeout, getAddressFamily());
297   }
298 
299   @Override
300   public void setOption(int optID, Object value) throws SocketException {
301     if (isClosed()) {
302       throw new SocketException("Socket is closed");
303     }
304 
305     FileDescriptor fdesc = core.validFdOrException();
306     AFSocketImpl.setOptionDefault(fdesc, optID, value, socketTimeout);
307   }
308 
309   @SuppressWarnings("unchecked")
310   final A receive(ByteBuffer dst) throws IOException {
311     try {
312       return (A) core.receive(dst);
313     } catch (SocketClosedException e) {
314       throw (ClosedChannelException) new ClosedChannelException().initCause(e);
315     }
316   }
317 
318   final int send(ByteBuffer src, SocketAddress target) throws IOException {
319     try {
320       return core.write(src, target, 0);
321     } catch (SocketClosedException e) {
322       throw (ClosedChannelException) new ClosedChannelException().initCause(e);
323     }
324   }
325 
326   final int read(ByteBuffer dst, ByteBuffer socketAddressBuffer) throws IOException {
327     try {
328       return core.read(dst, socketAddressBuffer, 0);
329     } catch (SocketClosedException e) {
330       throw (ClosedChannelException) new ClosedChannelException().initCause(e);
331     }
332   }
333 
334   final int write(ByteBuffer src) throws IOException {
335     try {
336       return core.write(src);
337     } catch (SocketClosedException e) {
338       throw (ClosedChannelException) new ClosedChannelException().initCause(e);
339     }
340   }
341 
342   final boolean isConnected() {
343     if (connected.get()) {
344       return true;
345     }
346     if (isClosed()) {
347       return false;
348     }
349     if (core.isConnected(false)) {
350       connected.set(true);
351       return true;
352     }
353     return false;
354   }
355 
356   final boolean isBound() {
357     if (bound.get()) {
358       return true;
359     }
360     if (isClosed()) {
361       return false;
362     }
363     if (core.isConnected(true)) {
364       bound.set(true);
365       return true;
366     }
367     return false;
368   }
369 
370   final void updatePorts(int local, int remote) {
371     this.localPort = local;
372     this.remotePort = remote;
373   }
374 
375   final @Nullable A getLocalSocketAddress() {
376     return AFSocketAddress.getSocketAddress(getFileDescriptor(), false, localPort,
377         getAddressFamily());
378   }
379 
380   final @Nullable A getRemoteSocketAddress() {
381     return AFSocketAddress.getSocketAddress(getFileDescriptor(), true, remotePort,
382         getAddressFamily());
383   }
384 
385   /**
386    * Returns the address family supported by this implementation.
387    *
388    * @return The family.
389    */
390   protected final AFAddressFamily<@NonNull A> getAddressFamily() {
391     return addressFamily;
392   }
393 
394   /**
395    * Returns the internal helper instance for address-specific extensions.
396    *
397    * @return The helper instance.
398    * @throws UnsupportedOperationException if such extensions are not supported for this address
399    *           type.
400    */
401   protected final synchronized AFSocketImplExtensions<A> getImplExtensions() {
402     if (implExtensions == null) {
403       implExtensions = addressFamily.initImplExtensions(ancillaryDataSupport);
404     }
405     return implExtensions;
406   }
407 
408   // CPD-OFF
409   @SuppressWarnings("Finally" /* errorprone */)
410   final boolean accept0(AFDatagramSocketImpl<A> socket) throws IOException {
411     FileDescriptor fdesc = core.validFdOrException();
412     if (isClosed()) {
413       throw new SocketException("Socket is closed");
414     } else if (!isBound()) {
415       throw new SocketException("Socket is not bound");
416     }
417 
418     AFSocketAddress socketAddress = core.socketAddress;
419     AFSocketAddress boundSocketAddress = getLocalSocketAddress();
420     if (boundSocketAddress != null) {
421       // Always resolve bound address from wildcard address, etc.
422       core.socketAddress = socketAddress = boundSocketAddress;
423     }
424 
425     if (socketAddress == null) {
426       throw new SocketException("Socket is not bound");
427     }
428 
429     final AFDatagramSocketImpl<A> si = socket;
430     core.incPendingAccepts();
431     try {
432       ByteBuffer ab = socketAddress.getNativeAddressDirectBuffer();
433 
434       SocketException caught = null;
435       try {
436         if (!NativeUnixSocket.accept(ab, ab.limit(), fdesc, si.fd, core.inode.get(), socketTimeout
437             .get())) {
438           return false;
439         }
440       } catch (SocketException e) { // NOPMD.ExceptionAsFlowControl
441         caught = e;
442       } finally { // NOPMD.DoNotThrowExceptionInFinally
443         if (!isBound() || isClosed()) {
444           if (getCore().isShutdownOnClose()) {
445             try {
446               NativeUnixSocket.shutdown(si.fd, SHUT_RD_WR);
447             } catch (Exception e) {
448               // ignore
449             }
450           }
451           try {
452             NativeUnixSocket.close(si.fd);
453           } catch (Exception e) {
454             // ignore
455           }
456           if (caught != null) {
457             throw caught;
458           } else {
459             throw new SocketClosedException("Socket is closed");
460           }
461         } else if (caught != null) {
462           throw caught;
463         }
464       }
465     } finally {
466       core.decPendingAccepts();
467     }
468     si.setSocketAddress(socketAddress);
469     si.connected.set(true);
470 
471     return true;
472   }
473 
474   final int getLocalPort1() {
475     return localPort;
476   }
477 
478   final int getRemotePort() {
479     return remotePort;
480   }
481 
482   final void setSocketAddress(AFSocketAddress socketAddress) {
483     if (socketAddress == null) {
484       this.core.socketAddress = null;
485       this.localPort = -1;
486     } else {
487       this.core.socketAddress = socketAddress;
488       if (this.localPort <= 0) {
489         this.localPort = socketAddress.getPort();
490       }
491     }
492   }
493 }