View Javadoc
1   /*
2    * junixsocket
3    *
4    * Copyright 2009-2024 Christian Kohlschütter
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.newsclub.net.unix;
19  
20  import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD;
21  import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD_WR;
22  import static org.newsclub.net.unix.NativeUnixSocket.SHUT_WR;
23  
24  import java.io.EOFException;
25  import java.io.FileDescriptor;
26  import java.io.IOException;
27  import java.io.InputStream;
28  import java.io.InterruptedIOException;
29  import java.io.OutputStream;
30  import java.net.InetAddress;
31  import java.net.SocketAddress;
32  import java.net.SocketException;
33  import java.net.SocketImpl;
34  import java.net.SocketOption;
35  import java.net.SocketOptions;
36  import java.net.SocketTimeoutException;
37  import java.nio.ByteBuffer;
38  import java.nio.channels.SelectionKey;
39  import java.util.Objects;
40  import java.util.Set;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  
44  import org.eclipse.jdt.annotation.NonNull;
45  import org.eclipse.jdt.annotation.Nullable;
46  import org.newsclub.net.unix.pool.MutableHolder;
47  import org.newsclub.net.unix.pool.ObjectPool.Lease;
48  
49  /**
50   * junixsocket-based {@link SocketImpl}.
51   *
52   * @author Christian Kohlschütter
53   * @param <A> The supported address type.
54   */
55  @SuppressWarnings({
56      "PMD.CyclomaticComplexity", "PMD.CouplingBetweenObjects",
57      "UnsafeFinalization" /* errorprone */})
58  public abstract class AFSocketImpl<A extends AFSocketAddress> extends SocketImplShim {
59    private static final int SHUTDOWN_RD_WR = (1 << SHUT_RD) | (1 << SHUT_WR);
60  
61    private final AFSocketStreamCore core;
62    final AncillaryDataSupport ancillaryDataSupport = new AncillaryDataSupport();
63  
64    private final AtomicBoolean bound = new AtomicBoolean(false);
65    private Boolean createType = null;
66    private final AtomicBoolean connected = new AtomicBoolean(false);
67  
68    private volatile boolean closedInputStream = false;
69    private volatile boolean closedOutputStream = false;
70  
71    private final AFInputStream in;
72    private final AFOutputStream out;
73  
74    private boolean reuseAddr = true;
75  
76    private final AtomicInteger socketTimeout = new AtomicInteger(0);
77    private final AFAddressFamily<A> addressFamily;
78  
79    private int shutdownState = 0;
80  
81    private AFSocketImplExtensions<A> implExtensions = null;
82  
83    private final AtomicBoolean closed = new AtomicBoolean(false);
84  
85    /**
86     * When the {@link AFSocketImpl} becomes unreachable (but not yet closed), we must ensure that the
87     * underlying socket and all related file descriptors are closed.
88     *
89     * @author Christian Kohlschütter
90     */
91    static final class AFSocketStreamCore extends AFSocketCore {
92      AFSocketStreamCore(AFSocketImpl<?> observed, FileDescriptor fd,
93          AncillaryDataSupport ancillaryDataSupport, AFAddressFamily<?> af) {
94        super(observed, fd, ancillaryDataSupport, af, false);
95      }
96  
97      void createSocket(FileDescriptor fdTarget, AFSocketType type) throws IOException {
98        NativeUnixSocket.createSocket(fdTarget, addressFamily().getDomain(), type.getId());
99      }
100 
101     /**
102      * Unblock other threads that are currently waiting on accept, simply by connecting to the
103      * socket.
104      */
105     @Override
106     protected void unblockAccepts() {
107       if (socketAddress == null || socketAddress.getBytes() == null || inode.get() < 0) {
108         return;
109       }
110       if (!hasPendingAccepts()) {
111         return;
112       }
113       try {
114         ThreadUtil.runOnSystemThread(this::unblockAccepts0);
115       } catch (InterruptedException e) {
116         // ignore
117       }
118     }
119 
120     private void unblockAccepts0() {
121       while (hasPendingAccepts()) {
122         try {
123           FileDescriptor tmpFd = new FileDescriptor();
124 
125           try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
126             createSocket(tmpFd, AFSocketType.SOCK_STREAM);
127             ByteBuffer ab = abLease.get();
128             NativeUnixSocket.connect(ab, ab.limit(), tmpFd, inode.get());
129           } catch (IOException e) {
130             // there's nothing more we can do to unlock these accepts
131             // (e.g., SocketException: No such file or directory)
132             return;
133           }
134           if (isShutdownOnClose()) {
135             try {
136               NativeUnixSocket.shutdown(tmpFd, SHUT_RD_WR);
137             } catch (Exception e) {
138               // ignore
139             }
140           }
141           try {
142             NativeUnixSocket.close(tmpFd);
143           } catch (Exception e) {
144             // ignore
145           }
146         } catch (RuntimeException e) {
147           // ignore
148         }
149 
150         // sleep a little to give the cleaners some CPU time to actually clean up
151         try {
152           Thread.sleep(5);
153         } catch (InterruptedException e) {
154           // ignore
155         }
156       }
157     }
158   }
159 
160   /**
161    * Creates a new {@link AFSocketImpl} instance.
162    *
163    * @param addressFamily The address family.
164    * @param fdObj The socket's {@link FileDescriptor}.
165    */
166   protected AFSocketImpl(AFAddressFamily<@NonNull A> addressFamily, FileDescriptor fdObj) {
167     super();
168     this.addressFamily = addressFamily;
169     this.address = InetAddress.getLoopbackAddress();
170     this.core = new AFSocketStreamCore(this, fdObj, ancillaryDataSupport, addressFamily);
171     this.fd = core.fd;
172     this.in = newInputStream();
173     this.out = newOutputStream();
174   }
175 
176   /**
177    * Creates a new {@link InputStream} for this socket.
178    *
179    * @return The new stream.
180    */
181   protected final AFInputStream newInputStream() {
182     return new AFInputStreamImpl();
183   }
184 
185   /**
186    * Creates a new {@link OutputStream} for this socket.
187    *
188    * @return The new stream.
189    */
190   protected final AFOutputStream newOutputStream() {
191     return new AFOutputStreamImpl();
192   }
193 
194   final FileDescriptor getFD() {
195     return fd;
196   }
197 
198   // CPD-OFF
199   final boolean isConnected() {
200     if (connected.get()) {
201       return true;
202     }
203     if (isClosed()) {
204       return false;
205     }
206     if (core.isConnected(false)) {
207       connected.set(true);
208       return true;
209     }
210     return false;
211   }
212 
213   final boolean isBound() {
214     if (bound.get()) {
215       return true;
216     }
217     if (isClosed()) {
218       return false;
219     }
220     if (core.isConnected(true)) {
221       bound.set(true);
222       return true;
223     }
224     return false;
225   }
226 
227   final AFSocketCore getCore() {
228     return core;
229   }
230 
231   boolean isClosed() {
232     return closed.get() || core.isClosed();
233   }
234   // CPD-ON
235 
236   @Override
237   protected final void accept(SocketImpl socket) throws IOException {
238     accept0(socket);
239   }
240 
241   @SuppressWarnings({
242       "Finally" /* errorprone */, //
243       "PMD.CognitiveComplexity", "PMD.NPathComplexity", "PMD.NcssCount"})
244   final boolean accept0(SocketImpl socket) throws IOException {
245     FileDescriptor fdesc = core.validFdOrException();
246     if (isClosed()) {
247       throw new SocketClosedException();
248     } else if (!isBound()) {
249       throw new NotBoundSocketException();
250     }
251 
252     AFSocketAddress socketAddress = core.socketAddress;
253     AFSocketAddress boundSocketAddress = getLocalSocketAddress();
254     if (boundSocketAddress != null) {
255       // Always resolve bound address from wildcard address, etc.
256       core.socketAddress = socketAddress = boundSocketAddress;
257     }
258 
259     if (socketAddress == null) {
260       throw new NotBoundSocketException();
261     }
262 
263     @SuppressWarnings("unchecked")
264     final AFSocketImpl<A> si = (AFSocketImpl<A>) socket;
265     core.incPendingAccepts();
266 
267     final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
268         .isVirtualBlocking();
269 
270     long now = virtualBlocking ? System.currentTimeMillis() : 0;
271     boolean park = false;
272     virtualThreadLoop : do {
273       if (virtualBlocking) {
274         if (park) {
275           VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_ACCEPT, now,
276               socketTimeout::get, this::close);
277         }
278       }
279 
280       try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
281         ByteBuffer ab = abLease.get();
282 
283         SocketException caught = null;
284         try {
285           boolean success;
286           if (virtualBlocking) {
287             core.configureVirtualBlocking(true);
288           }
289           try {
290             success = NativeUnixSocket.accept(ab, ab.limit(), fdesc, si.fd, core.inode.get(),
291                 socketTimeout.get());
292           } catch (SocketTimeoutException e) {
293             if (virtualBlocking) {
294               // try again
295               park = true;
296               continue virtualThreadLoop;
297             } else {
298               throw e;
299             }
300           } finally {
301             if (virtualBlocking) {
302               core.configureVirtualBlocking(false);
303             }
304           }
305 
306           if (virtualBlocking) {
307             if (success) {
308               // mark the accepted socket as blocking if necessary
309               NativeUnixSocket.configureBlocking(si.fd, core.isBlocking());
310             } else {
311               // try again
312               park = true;
313               continue virtualThreadLoop;
314             }
315           }
316         } catch (NotConnectedSocketException | SocketClosedException // NOPMD.ExceptionAsFlowControl
317             | BrokenPipeSocketException e) {
318           try {
319             close();
320           } catch (Exception e2) {
321             e.addSuppressed(e2);
322           }
323           throw e;
324         } catch (SocketException e) { // NOPMD.ExceptionAsFlowControl
325           caught = e;
326         } finally { // NOPMD.DoNotThrowExceptionInFinally
327           if (!isBound() || isClosed()) {
328             if (getCore().isShutdownOnClose()) {
329               try {
330                 NativeUnixSocket.shutdown(si.fd, SHUT_RD_WR);
331               } catch (Exception e) {
332                 // ignore
333               }
334             }
335             try {
336               NativeUnixSocket.close(si.fd);
337             } catch (Exception e) {
338               // ignore
339             }
340             if (caught != null) {
341               throw caught;
342             } else {
343               throw new BrokenPipeSocketException("Socket is closed");
344             }
345           } else if (caught != null) {
346             throw caught;
347           }
348         }
349       } finally {
350         core.decPendingAccepts();
351       }
352       break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
353     } while (true); // NOPMD.WhileLoopWithLiteralBoolean
354 
355     if (!si.fd.valid()) {
356       return false;
357     }
358 
359     si.setSocketAddress(socketAddress);
360     si.connected.set(true);
361 
362     return true;
363   }
364 
365   final void setSocketAddress(AFSocketAddress socketAddress) {
366     if (socketAddress == null) {
367       this.core.socketAddress = null;
368       this.address = null;
369       this.localport = -1;
370     } else {
371       this.core.socketAddress = socketAddress;
372       this.address = socketAddress.getAddress();
373       if (this.localport <= 0) {
374         this.localport = socketAddress.getPort();
375       }
376     }
377   }
378 
379   @Override
380   protected final int available() throws IOException {
381     FileDescriptor fdesc = core.validFdOrException();
382     try (Lease<MutableHolder<ByteBuffer>> lease = core.getPrivateDirectByteBuffer(0)) {
383       return NativeUnixSocket.available(fdesc, lease.get().get());
384     }
385   }
386 
387   final void bind(SocketAddress addr, int options) throws IOException {
388     if (addr == null) {
389       addr = addressFamily.nullBindAddress();
390       if (addr == null) {
391         throw new UnsupportedOperationException("Cannot bind to null address");
392       }
393     }
394 
395     if (addr == AFSocketAddress.INTERNAL_DUMMY_BIND) { // NOPMD
396       bound.set(true);
397       core.inode.set(0);
398       return;
399     }
400 
401     addr = AFSocketAddress.mapOrFail(addr, addressFamily.getSocketAddressClass());
402     bound.set(true);
403 
404     AFSocketAddress socketAddress = Objects.requireNonNull((AFSocketAddress) addr);
405 
406     this.setSocketAddress(socketAddress);
407     try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
408       ByteBuffer ab = abLease.get();
409       long inode = NativeUnixSocket.bind(ab, ab.limit(), fd, options);
410       core.inode.set(inode);
411     }
412     core.validFdOrException();
413   }
414 
415   @Override
416   @SuppressWarnings("hiding")
417   protected final void bind(InetAddress host, int port) throws IOException {
418     // ignored
419   }
420 
421   private void checkClose() throws IOException {
422     if (closedInputStream && closedOutputStream) {
423       close();
424     }
425   }
426 
427   @Override
428   protected final void close() throws IOException {
429     this.closed.set(true);
430     try {
431       shutdown();
432     } catch (NotConnectedSocketException | SocketClosedException e) {
433       // ignore
434     }
435 
436     core.runCleaner();
437   }
438 
439   @Override
440   @SuppressWarnings("hiding")
441   protected final void connect(String host, int port) throws IOException {
442     throw new SocketException("Cannot bind to this type of address: " + InetAddress.class);
443   }
444 
445   @Override
446   @SuppressWarnings("hiding")
447   protected final void connect(InetAddress address, int port) throws IOException {
448     throw new SocketException("Cannot bind to this type of address: " + InetAddress.class);
449   }
450 
451   @Override
452   protected final void connect(SocketAddress addr, int connectTimeout) throws IOException {
453     connect0(addr, connectTimeout);
454   }
455 
456   @SuppressWarnings({"PMD.CognitiveComplexity", "PMD.NPathComplexity", "PMD.NcssCount"})
457   final boolean connect0(SocketAddress addr, int connectTimeout) throws IOException {
458     if (addr == AFSocketAddress.INTERNAL_DUMMY_CONNECT) { // NOPMD
459       this.connected.set(true);
460       return true;
461     } else if (addr == AFSocketAddress.INTERNAL_DUMMY_DONT_CONNECT) { // NOPMD)
462       return false;
463     }
464 
465     addr = AFSocketAddress.mapOrFail(addr, addressFamily.getSocketAddressClass());
466     AFSocketAddress socketAddress = Objects.requireNonNull((AFSocketAddress) addr);
467 
468     final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
469         .isVirtualBlocking();
470     long now = virtualBlocking ? System.currentTimeMillis() : 0;
471 
472     /**
473      * If set, a two-phase connect is in flight, and the value holds the connect timeout.
474      */
475     AFSupplier<Integer> virtualConnectTimeout = null;
476 
477     if (virtualBlocking) {
478       core.configureVirtualBlocking(true);
479     }
480     boolean park = false;
481     try {
482       virtualThreadLoop : do {
483         try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
484           ByteBuffer ab = abLease.get();
485           boolean success = false;
486           boolean ignoreSpuriousTimeout = true;
487           do {
488             if (virtualBlocking) {
489               if (virtualConnectTimeout != null) {
490                 if (park) {
491                   VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fd, SelectionKey.OP_CONNECT,
492                       now, virtualConnectTimeout, this::close);
493                 }
494               } else {
495                 Thread.yield();
496               }
497             }
498 
499             if (virtualBlocking) {
500               core.configureVirtualBlocking(true);
501             }
502             try {
503               success = NativeUnixSocket.connect(ab, ab.limit(), fd, -2);
504               if (!success && virtualBlocking) {
505                 // try again (non-blocking timeout)
506                 if (virtualConnectTimeout == null) {
507                   virtualConnectTimeout = () -> connectTimeout;
508                 }
509                 park = true;
510                 continue virtualThreadLoop;
511               }
512               break;
513             } catch (SocketTimeoutException e) {
514               // Ignore spurious timeout once when SO_TIMEOUT==0
515               // seen on older Linux kernels with AF_VSOCK running in qemu
516               if (ignoreSpuriousTimeout) {
517                 Object o = getOption(SocketOptions.SO_TIMEOUT);
518                 if (o instanceof Integer) {
519                   if (((Integer) o) == 0) {
520                     ignoreSpuriousTimeout = false;
521                     continue;
522                   }
523                 } else if (o == null) {
524                   ignoreSpuriousTimeout = false;
525                   continue;
526                 }
527               }
528               throw e;
529             } catch (NotConnectedSocketException | SocketClosedException
530                 | BrokenPipeSocketException e) {
531               try {
532                 close();
533               } catch (Exception e2) {
534                 e.addSuppressed(e2);
535               }
536               throw e;
537             } catch (SocketException e) {
538               if (virtualBlocking) {
539                 Thread.yield();
540               }
541               throw e;
542             } finally {
543               if (virtualBlocking) {
544                 core.configureVirtualBlocking(false);
545               }
546             }
547           } while (ThreadUtil.checkNotInterruptedOrThrow());
548           if (success) {
549             setSocketAddress(socketAddress);
550             this.connected.set(true);
551           }
552           core.validFdOrException();
553           return success;
554         }
555       } while (true); // NOPMD.WhileLoopWithLiteralBoolean
556     } finally {
557       if (virtualBlocking) {
558         core.configureVirtualBlocking(true);
559       }
560     }
561   }
562 
563   @Override
564   protected final void create(boolean stream) throws IOException {
565     if (isClosed()) {
566       throw new SocketException("Already closed");
567     }
568     if (fd.valid()) {
569       if (createType != null) {
570         if (createType.booleanValue() != stream) { // NOPMD.UnnecessaryBoxing
571           throw new IllegalStateException("Already created with different mode");
572         }
573       } else {
574         createType = stream;
575       }
576       return;
577     }
578     createType = stream;
579     createSocket(fd, stream ? AFSocketType.SOCK_STREAM : AFSocketType.SOCK_DGRAM);
580   }
581 
582   @Override
583   protected final AFInputStream getInputStream() throws IOException {
584     if (!isConnected() && !isBound()) {
585       close();
586       throw new SocketClosedException("Not connected/not bound");
587     }
588     core.validFdOrException();
589     return in;
590   }
591 
592   @Override
593   protected final AFOutputStream getOutputStream() throws IOException {
594     if (!isClosed() && !isBound()) {
595       close();
596       throw new SocketClosedException("Not connected/not bound");
597     }
598     core.validFdOrException();
599     return out;
600   }
601 
602   @Override
603   protected final void listen(int backlog) throws IOException {
604     FileDescriptor fdesc = core.validFdOrException();
605     if (backlog <= 0) {
606       backlog = 50;
607     }
608     NativeUnixSocket.listen(fdesc, backlog);
609   }
610 
611   @Override
612   protected final boolean supportsUrgentData() {
613     return false;
614   }
615 
616   @Override
617   protected final void sendUrgentData(int data) throws IOException {
618     throw new UnsupportedOperationException();
619   }
620 
621   private final class AFInputStreamImpl extends AFInputStream {
622     private volatile boolean streamClosed = false;
623     private final AtomicBoolean eofReached = new AtomicBoolean(false);
624 
625     private final int defaultOpt = (core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING);
626 
627     @SuppressWarnings("PMD.CognitiveComplexity")
628     @Override
629     public int read(byte[] buf, int off, int len) throws IOException {
630       if (streamClosed) {
631         throw new SocketClosedException("This InputStream has already been closed.");
632       }
633       if (eofReached.get()) {
634         return -1;
635       }
636 
637       FileDescriptor fdesc = core.validFdOrException();
638       if (len == 0) {
639         return 0;
640       } else if (off < 0 || len < 0 || (len > buf.length - off)) {
641         throw new IndexOutOfBoundsException();
642       }
643 
644       final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
645           .isVirtualBlocking();
646       final long now;
647       final int opt;
648       if (virtualBlocking) {
649         now = System.currentTimeMillis();
650         opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
651       } else {
652         now = 0;
653         opt = defaultOpt;
654       }
655 
656       int read;
657 
658       boolean park = false;
659       virtualThreadLoop : do {
660         if (virtualBlocking) {
661           if (park) {
662             VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_READ, now,
663                 socketTimeout::get, this::forceCloseSocket);
664           }
665           core.configureVirtualBlocking(true);
666         }
667 
668         try {
669           read = NativeUnixSocket.read(fdesc, buf, off, len, opt, ancillaryDataSupport,
670               socketTimeout.get());
671           if (read == -2) {
672             if (virtualBlocking) {
673               // sleep again
674               park = true;
675               continue virtualThreadLoop;
676             } else {
677               read = 0;
678             }
679           }
680         } catch (SocketTimeoutException e) {
681           if (virtualBlocking) {
682             // sleep again
683             park = true;
684             continue virtualThreadLoop;
685           } else {
686             throw e;
687           }
688         } catch (EOFException e) {
689           eofReached.set(true);
690           throw e;
691         } finally {
692           if (virtualBlocking) {
693             core.configureVirtualBlocking(false);
694           }
695         }
696         break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
697       } while (true); // NOPMD.WhileLoopWithLiteralBoolean
698 
699       return read;
700     }
701 
702     @SuppressWarnings("PMD.CognitiveComplexity")
703     @Override
704     public int read() throws IOException {
705       FileDescriptor fdesc = core.validFdOrException();
706 
707       if (eofReached.get()) {
708         return -1;
709       }
710 
711       // CPD-OFF
712       final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
713           .isVirtualBlocking();
714       final long now;
715       final int opt;
716       if (virtualBlocking) {
717         now = System.currentTimeMillis();
718         opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
719       } else {
720         now = 0;
721         opt = defaultOpt;
722       }
723 
724       boolean park = false;
725       virtualThreadLoop : do {
726         if (virtualBlocking) {
727           if (park) {
728             VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_READ, now,
729                 socketTimeout::get, this::forceCloseSocket);
730           }
731           core.configureVirtualBlocking(true);
732         }
733 
734         try {
735           int byteRead = NativeUnixSocket.read(fdesc, null, 0, 1, opt, ancillaryDataSupport,
736               socketTimeout.get());
737           if (byteRead < 0) {
738             if (byteRead == -2) {
739               if (virtualBlocking) {
740                 // sleep again
741                 park = true;
742                 continue virtualThreadLoop;
743               } else {
744                 byteRead = -1;
745               }
746             }
747             eofReached.set(true);
748             return -1;
749           } else {
750             return byteRead;
751           }
752         } catch (SocketTimeoutException e) {
753           if (virtualBlocking) {
754             // sleep again
755             park = true;
756             continue virtualThreadLoop;
757           } else {
758             throw e;
759           }
760         } finally {
761           if (virtualBlocking) {
762             core.configureVirtualBlocking(false);
763           }
764         }
765       } while (true); // NOPMD.WhileLoopWithLiteralBoolean
766 
767       // CPD-ON
768     }
769 
770     private void forceCloseSocket() throws IOException {
771       closedOutputStream = true;
772       close();
773     }
774 
775     @Override
776     public synchronized void close() throws IOException {
777       if (streamClosed || isClosed()) {
778         return;
779       }
780       streamClosed = true;
781       FileDescriptor fdesc = core.validFd();
782       if (fdesc != null && getCore().isShutdownOnClose()) {
783         NativeUnixSocket.shutdown(fdesc, SHUT_RD);
784       }
785 
786       closedInputStream = true;
787       checkClose();
788     }
789 
790     @Override
791     public int available() throws IOException {
792       if (streamClosed) {
793         throw new SocketClosedException("This InputStream has already been closed.");
794       }
795 
796       return AFSocketImpl.this.available();
797     }
798 
799     @Override
800     public FileDescriptor getFileDescriptor() throws IOException {
801       return getFD();
802     }
803 
804   }
805 
806   private static boolean checkWriteInterruptedException(int bytesTransferred)
807       throws InterruptedIOException {
808     if (Thread.currentThread().isInterrupted()) {
809       InterruptedIOException ex = new InterruptedIOException("write");
810       ex.bytesTransferred = bytesTransferred;
811       throw ex;
812     }
813     return true;
814   }
815 
816   private final class AFOutputStreamImpl extends AFOutputStream {
817     private volatile boolean streamClosed = false;
818 
819     private final int defaultOpt = (core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING);
820 
821     @SuppressWarnings("PMD.CognitiveComplexity")
822     @Override
823     public void write(int oneByte) throws IOException {
824       FileDescriptor fdesc = core.validFdOrException();
825 
826       final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
827           .isVirtualBlocking();
828       final long now;
829       final int opt;
830       if (virtualBlocking) {
831         now = System.currentTimeMillis();
832         opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
833       } else {
834         now = 0;
835         opt = defaultOpt;
836       }
837 
838       boolean park = false;
839       virtualThreadLoop : do {
840         if (virtualBlocking) {
841           if (park) {
842             VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
843                 socketTimeout::get, this::forceCloseSocket);
844           }
845           core.configureVirtualBlocking(true);
846         }
847 
848         try {
849           int written;
850           do {
851             written = NativeUnixSocket.write(fdesc, null, oneByte, 1, opt, ancillaryDataSupport);
852             if (written != 0) {
853               break;
854             }
855             if (virtualBlocking) {
856               park = true;
857               continue virtualThreadLoop;
858             }
859           } while (checkWriteInterruptedException(0));
860         } catch (NotConnectedSocketException | SocketClosedException
861             | BrokenPipeSocketException e) {
862           try {
863             forceCloseSocket();
864           } catch (Exception e2) {
865             e.addSuppressed(e2);
866           }
867           throw e;
868         } catch (SocketTimeoutException e) {
869           if (virtualBlocking) {
870             // try again
871             park = true;
872             continue virtualThreadLoop;
873           } else {
874             throw e;
875           }
876         } finally {
877           if (virtualBlocking) {
878             core.configureVirtualBlocking(false);
879           }
880         }
881         break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
882       } while (true); // NOPMD.WhileLoopWithLiteralBoolean
883     }
884 
885     @SuppressWarnings({"PMD.CognitiveComplexity", "PMD.NPathComplexity"})
886     @Override
887     public void write(byte[] buf, int off, int len) throws IOException {
888       if (streamClosed) {
889         throw new SocketException("This OutputStream has already been closed.");
890       }
891       if (len < 0 || off < 0 || len > buf.length - off) {
892         throw new IndexOutOfBoundsException();
893       }
894       FileDescriptor fdesc = core.validFdOrException();
895 
896       // NOTE: writing messages with len == 0 should be permissible (unless ignored in native code)
897       // For certain sockets, empty messages can be used to probe if the remote connection is alive
898       if (len == 0 && !AFSocket.supports(AFSocketCapability.CAPABILITY_ZERO_LENGTH_SEND)) {
899         return;
900       }
901 
902       final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
903           .isVirtualBlocking();
904       final long now;
905       final int opt;
906       if (virtualBlocking) {
907         now = System.currentTimeMillis();
908         opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
909       } else {
910         now = 0;
911         opt = defaultOpt;
912       }
913 
914       int writtenTotal = 0;
915       do {
916         boolean park = false;
917         virtualThreadLoop : do {
918           if (virtualBlocking) {
919             if (park) {
920               VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
921                   socketTimeout::get, this::forceCloseSocket);
922             }
923             core.configureVirtualBlocking(true);
924           }
925 
926           final int written;
927           try {
928             written = NativeUnixSocket.write(fdesc, buf, off, len, opt, ancillaryDataSupport);
929             if (written == 0 && virtualBlocking) {
930               // try again
931               park = true;
932               continue virtualThreadLoop;
933             }
934             if (written < 0) {
935               if (len == 0) {
936                 // This exception is only useful to detect OS-level bugs that we need to
937                 // work-around
938                 // in native code.
939                 // throw new IOException("Error while writing zero-length byte array; try -D"
940                 // + AFSocket.PROP_LIBRARY_DISABLE_CAPABILITY_PREFIX
941                 // + AFSocketCapability.CAPABILITY_ZERO_LENGTH_SEND.name() + "=true");
942 
943                 // ignore
944                 return;
945               } else {
946                 throw new IOException("Unspecific error while writing");
947               }
948             }
949           } catch (NotConnectedSocketException | SocketClosedException
950               | BrokenPipeSocketException e) {
951             try {
952               forceCloseSocket();
953             } catch (Exception e2) {
954               e.addSuppressed(e2);
955             }
956             throw e;
957           } catch (SocketTimeoutException e) {
958             if (virtualBlocking) {
959               // try again
960               park = true;
961               continue virtualThreadLoop;
962             } else {
963               throw e;
964             }
965           } finally {
966             if (virtualBlocking) {
967               core.configureVirtualBlocking(false);
968             }
969           }
970 
971           len -= written;
972           off += written;
973           writtenTotal += written;
974           break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
975         } while (true); // NOPMD.WhileLoopWithLiteralBoolean
976 
977       } while (len > 0 && checkWriteInterruptedException(writtenTotal));
978     }
979 
980     private void forceCloseSocket() throws IOException {
981       closedInputStream = true;
982       close();
983     }
984 
985     @Override
986     public synchronized void close() throws IOException {
987       if (streamClosed || isClosed()) {
988         return;
989       }
990       streamClosed = true;
991       FileDescriptor fdesc = core.validFd();
992       if (fdesc != null && getCore().isShutdownOnClose()) {
993         NativeUnixSocket.shutdown(fdesc, SHUT_WR);
994       }
995       closedOutputStream = true;
996       checkClose();
997     }
998 
999     @Override
1000     public FileDescriptor getFileDescriptor() throws IOException {
1001       return getFD();
1002     }
1003   }
1004 
1005   @Override
1006   public final String toString() {
1007     return super.toString() + "[fd=" + fd + "; addr=" + this.core.socketAddress + "; connected="
1008         + connected + "; bound=" + bound + "]";
1009   }
1010 
1011   private static int expectInteger(Object value) throws SocketException {
1012     if (value == null) {
1013       throw (SocketException) new SocketException("Value must not be null").initCause(
1014           new NullPointerException());
1015     }
1016     try {
1017       return (Integer) value;
1018     } catch (final ClassCastException e) {
1019       throw (SocketException) new SocketException("Unsupported value: " + value).initCause(e);
1020     }
1021   }
1022 
1023   private static int expectBoolean(Object value) throws SocketException {
1024     if (value == null) {
1025       throw (SocketException) new SocketException("Value must not be null").initCause(
1026           new NullPointerException());
1027     }
1028     try {
1029       return ((Boolean) value) ? 1 : 0;
1030     } catch (final ClassCastException e) {
1031       throw (SocketException) new SocketException("Unsupported value: " + value).initCause(e);
1032     }
1033   }
1034 
1035   @Override
1036   public Object getOption(int optID) throws SocketException {
1037     return getOption0(optID);
1038   }
1039 
1040   private Object getOption0(int optID) throws SocketException {
1041     if (isClosed()) {
1042       throw new SocketException("Socket is closed");
1043     }
1044     if (optID == SocketOptions.SO_REUSEADDR) {
1045       return reuseAddr;
1046     }
1047 
1048     FileDescriptor fdesc = core.validFdOrException();
1049     return getOptionDefault(fdesc, optID, socketTimeout, addressFamily);
1050   }
1051 
1052   static final Object getOptionDefault(FileDescriptor fdesc, int optID, AtomicInteger acceptTimeout,
1053       AFAddressFamily<?> af) throws SocketException {
1054     try {
1055       switch (optID) {
1056         case SocketOptions.SO_KEEPALIVE:
1057           try {
1058             return (NativeUnixSocket.getSocketOptionInt(fdesc, optID) != 0);
1059           } catch (SocketException e) {
1060             // ignore
1061             return false;
1062           }
1063         case SocketOptions.TCP_NODELAY:
1064           return (NativeUnixSocket.getSocketOptionInt(fdesc, optID) != 0);
1065         case SocketOptions.SO_TIMEOUT:
1066           int v = Math.max(NativeUnixSocket.getSocketOptionInt(fdesc, 0x1005), NativeUnixSocket
1067               .getSocketOptionInt(fdesc, 0x1006));
1068           if (v == -1) {
1069             // special value, meaning: do not override infinite timeout from native code
1070             return 0;
1071           }
1072           return Math.max((acceptTimeout == null ? 0 : acceptTimeout.get()), v);
1073         case SocketOptions.SO_LINGER:
1074         case SocketOptions.SO_RCVBUF:
1075         case SocketOptions.SO_SNDBUF:
1076           return NativeUnixSocket.getSocketOptionInt(fdesc, optID);
1077         case SocketOptions.IP_TOS:
1078           return 0;
1079         case SocketOptions.SO_BINDADDR:
1080           return AFSocketAddress.getInetAddress(fdesc, false, af);
1081         case SocketOptions.SO_REUSEADDR:
1082           return false;
1083         default:
1084           throw new SocketException("Unsupported option: " + optID);
1085       }
1086     } catch (final SocketException e) {
1087       throw e;
1088     } catch (final Exception e) {
1089       throw (SocketException) new SocketException("Could not get option").initCause(e);
1090     }
1091   }
1092 
1093   @Override
1094   public void setOption(int optID, Object value) throws SocketException {
1095     setOption0(optID, value);
1096   }
1097 
1098   private void setOption0(int optID, Object value) throws SocketException {
1099     if (isClosed()) {
1100       throw new SocketException("Socket is closed");
1101     }
1102     if (optID == SocketOptions.SO_REUSEADDR) {
1103       reuseAddr = (expectBoolean(value) != 0);
1104       return;
1105     }
1106 
1107     FileDescriptor fdesc = core.validFdOrException();
1108     setOptionDefault(fdesc, optID, value, socketTimeout);
1109   }
1110 
1111   /**
1112    * Like {@link #getOption(int)}, but ignores exceptions for certain option IDs.
1113    *
1114    * @param optID The option ID.
1115    * @return The value.
1116    * @throws SocketException on error.
1117    */
1118   protected final Object getOptionLenient(int optID) throws SocketException {
1119     try {
1120       return getOption0(optID);
1121     } catch (SocketException e) {
1122       switch (optID) {
1123         case SocketOptions.TCP_NODELAY:
1124         case SocketOptions.SO_KEEPALIVE:
1125           return false;
1126         default:
1127           throw e;
1128       }
1129     }
1130   }
1131 
1132   /**
1133    * Like {@link #setOption(int, Object)}, but ignores exceptions for certain option IDs.
1134    *
1135    * @param optID The option ID.
1136    * @param value The value.
1137    * @throws SocketException on error.
1138    */
1139   protected final void setOptionLenient(int optID, Object value) throws SocketException {
1140     try {
1141       setOption0(optID, value);
1142     } catch (SocketException e) {
1143       switch (optID) {
1144         case SocketOptions.TCP_NODELAY:
1145           return;
1146         default:
1147           throw e;
1148       }
1149     }
1150   }
1151 
1152   static final void setOptionDefault(FileDescriptor fdesc, int optID, Object value,
1153       AtomicInteger acceptTimeout) throws SocketException {
1154     try {
1155       switch (optID) {
1156         case SocketOptions.SO_LINGER:
1157 
1158           if (value instanceof Boolean) {
1159             final boolean b = (Boolean) value;
1160             if (b) {
1161               throw new SocketException("Only accepting Boolean.FALSE here");
1162             }
1163             NativeUnixSocket.setSocketOptionInt(fdesc, optID, -1);
1164             return;
1165           }
1166           NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectInteger(value));
1167           return;
1168         case SocketOptions.SO_TIMEOUT: {
1169           int timeout = expectInteger(value);
1170           try {
1171             NativeUnixSocket.setSocketOptionInt(fdesc, 0x1005, timeout);
1172           } catch (InvalidArgumentSocketException e) {
1173             // Perhaps the socket is shut down?
1174           }
1175           try {
1176             NativeUnixSocket.setSocketOptionInt(fdesc, 0x1006, timeout);
1177           } catch (InvalidArgumentSocketException e) {
1178             // Perhaps the socket is shut down?
1179           }
1180           if (acceptTimeout != null) {
1181             acceptTimeout.set(timeout);
1182           }
1183           return;
1184         }
1185         case SocketOptions.SO_RCVBUF:
1186         case SocketOptions.SO_SNDBUF:
1187           NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectInteger(value));
1188           return;
1189         case SocketOptions.SO_KEEPALIVE:
1190           try {
1191             NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectBoolean(value));
1192           } catch (SocketException e) {
1193             // ignore
1194           }
1195           return;
1196         case SocketOptions.TCP_NODELAY:
1197           NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectBoolean(value));
1198           return;
1199         case SocketOptions.IP_TOS:
1200           // ignore
1201           return;
1202         case SocketOptions.SO_REUSEADDR:
1203           // ignore
1204           return;
1205         default:
1206           throw new SocketException("Unsupported option: " + optID);
1207       }
1208     } catch (final SocketException e) {
1209       throw e;
1210     } catch (final Exception e) {
1211       throw (SocketException) new SocketException("Error while setting option").initCause(e);
1212     }
1213   }
1214 
1215   /**
1216    * Shuts down both input and output at once. Equivalent to calling {@link #shutdownInput()} and
1217    * {@link #shutdownOutput()}.
1218    *
1219    * @throws IOException on error.
1220    */
1221   protected final void shutdown() throws IOException {
1222     FileDescriptor fdesc = core.validFd();
1223     if (fdesc != null) {
1224       NativeUnixSocket.shutdown(fdesc, SHUT_RD_WR);
1225       shutdownState = 0;
1226     }
1227   }
1228 
1229   @Override
1230   protected final void shutdownInput() throws IOException {
1231     FileDescriptor fdesc = core.validFd();
1232     if (fdesc != null) {
1233       NativeUnixSocket.shutdown(fdesc, SHUT_RD);
1234       shutdownState |= 1 << (SHUT_RD);
1235       if (shutdownState == SHUTDOWN_RD_WR) {
1236         NativeUnixSocket.shutdown(fdesc, SHUT_RD_WR);
1237         shutdownState = 0;
1238       }
1239     }
1240   }
1241 
1242   @Override
1243   protected final void shutdownOutput() throws IOException {
1244     FileDescriptor fdesc = core.validFd();
1245     if (fdesc != null) {
1246       NativeUnixSocket.shutdown(fdesc, SHUT_WR);
1247       shutdownState |= 1 << (SHUT_RD_WR);
1248       if (shutdownState == SHUTDOWN_RD_WR) {
1249         NativeUnixSocket.shutdown(fdesc, SHUT_RD_WR);
1250         shutdownState = 0;
1251       }
1252     }
1253   }
1254 
1255   final int getAncillaryReceiveBufferSize() {
1256     return ancillaryDataSupport.getAncillaryReceiveBufferSize();
1257   }
1258 
1259   final void setAncillaryReceiveBufferSize(int size) {
1260     ancillaryDataSupport.setAncillaryReceiveBufferSize(size);
1261   }
1262 
1263   final void ensureAncillaryReceiveBufferSize(int minSize) {
1264     ancillaryDataSupport.ensureAncillaryReceiveBufferSize(minSize);
1265   }
1266 
1267   AncillaryDataSupport getAncillaryDataSupport() {
1268     return ancillaryDataSupport;
1269   }
1270 
1271   final SocketAddress receive(ByteBuffer dst) throws IOException {
1272     return core.receive(dst, socketTimeout::get);
1273   }
1274 
1275   final int send(ByteBuffer src, SocketAddress target) throws IOException {
1276     return core.write(src, socketTimeout::get, target, 0);
1277   }
1278 
1279   final int read(ByteBuffer dst, ByteBuffer socketAddressBuffer) throws IOException {
1280     return core.read(dst, socketTimeout::get, socketAddressBuffer, 0);
1281   }
1282 
1283   final int write(ByteBuffer src) throws IOException {
1284     return core.write(src, socketTimeout::get);
1285   }
1286 
1287   @Override
1288   protected final FileDescriptor getFileDescriptor() {
1289     return core.fd;
1290   }
1291 
1292   final void updatePorts(int local, int remote) {
1293     this.localport = local;
1294     if (remote >= 0) {
1295       this.port = remote;
1296     }
1297   }
1298 
1299   final @Nullable A getLocalSocketAddress() {
1300     return AFSocketAddress.getSocketAddress(getFileDescriptor(), false, localport, addressFamily);
1301   }
1302 
1303   final @Nullable A getRemoteSocketAddress() {
1304     return AFSocketAddress.getSocketAddress(getFileDescriptor(), true, port, addressFamily);
1305   }
1306 
1307   final int getLocalPort1() {
1308     return localport;
1309   }
1310 
1311   final int getRemotePort() {
1312     return port;
1313   }
1314 
1315   @Override
1316   protected final InetAddress getInetAddress() {
1317     @Nullable
1318     A rsa = getRemoteSocketAddress();
1319     if (rsa == null) {
1320       return InetAddress.getLoopbackAddress();
1321     } else {
1322       return rsa.getInetAddress();
1323     }
1324   }
1325 
1326   final void createSocket(FileDescriptor fdTarget, AFSocketType type) throws IOException {
1327     NativeUnixSocket.createSocket(fdTarget, addressFamily.getDomain(), type.getId());
1328   }
1329 
1330   final AFAddressFamily<A> getAddressFamily() {
1331     return addressFamily;
1332   }
1333 
1334   @Override
1335   protected <T> void setOption(SocketOption<T> name, T value) throws IOException {
1336     if (name instanceof AFSocketOption<?>) {
1337       getCore().setOption((AFSocketOption<T>) name, value);
1338       return;
1339     }
1340     Integer optionId = SocketOptionsMapper.resolve(name);
1341     if (optionId == null) {
1342       super.setOption(name, value);
1343     } else {
1344       setOption(optionId, value);
1345     }
1346   }
1347 
1348   @SuppressWarnings("unchecked")
1349   @Override
1350   protected <T> T getOption(SocketOption<T> name) throws IOException {
1351     if (name instanceof AFSocketOption<?>) {
1352       return getCore().getOption((AFSocketOption<T>) name);
1353     }
1354     Integer optionId = SocketOptionsMapper.resolve(name);
1355     if (optionId == null) {
1356       return super.getOption(name);
1357     } else {
1358       return (T) getOption(optionId);
1359     }
1360   }
1361 
1362   @Override
1363   protected Set<SocketOption<?>> supportedOptions() {
1364     return SocketOptionsMapper.SUPPORTED_SOCKET_OPTIONS;
1365   }
1366 
1367   /**
1368    * Returns the internal helper instance for address-specific extensions.
1369    *
1370    * @return The helper instance.
1371    * @throws UnsupportedOperationException if such extensions are not supported for this address
1372    *           type.
1373    */
1374   protected final synchronized AFSocketImplExtensions<A> getImplExtensions() {
1375     if (implExtensions == null) {
1376       implExtensions = addressFamily.initImplExtensions(ancillaryDataSupport);
1377     }
1378     return implExtensions;
1379   }
1380 }