AFSocketImpl.java

/*
 * junixsocket
 *
 * Copyright 2009-2024 Christian Kohlschütter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.newsclub.net.unix;

import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD;
import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD_WR;
import static org.newsclub.net.unix.NativeUnixSocket.SHUT_WR;

import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketImpl;
import java.net.SocketOption;
import java.net.SocketOptions;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.newsclub.net.unix.pool.MutableHolder;
import org.newsclub.net.unix.pool.ObjectPool.Lease;

/**
 * junixsocket-based {@link SocketImpl}.
 *
 * @author Christian Kohlschütter
 * @param <A> The supported address type.
 */
@SuppressWarnings({
    "PMD.CyclomaticComplexity", "PMD.CouplingBetweenObjects",
    "UnsafeFinalization" /* errorprone */})
public abstract class AFSocketImpl<A extends AFSocketAddress> extends SocketImplShim {
  /**
   * Bit mask with the bits at positions {@code SHUT_RD} and {@code SHUT_WR} set.
   */
  private static final int SHUTDOWN_RD_WR = (1 << SHUT_RD) | (1 << SHUT_WR);

  /** The socket core; created in the constructor, it supplies the file descriptor. */
  private final AFSocketStreamCore core;
  /** Ancillary data state; handed to the core and to the native read/write calls. */
  final AncillaryDataSupport ancillaryDataSupport = new AncillaryDataSupport();

  /** Set by the bind methods, and cached by {@code isBound()} once the core reports it. */
  private final AtomicBoolean bound = new AtomicBoolean(false);
  /** The {@code stream} argument passed to {@code create(boolean)}; null until then. */
  private Boolean createType = null;
  /** Set upon a successful connect/accept, and cached by {@code isConnected()}. */
  private final AtomicBoolean connected = new AtomicBoolean(false);

  // Once both flags are true, checkClose() closes the entire socket.
  private volatile boolean closedInputStream = false;
  private volatile boolean closedOutputStream = false;

  /** The single input stream instance returned by {@code getInputStream()}. */
  private final AFInputStream in;
  /** The single output stream instance returned by {@code getOutputStream()}. */
  private final AFOutputStream out;

  /** Value reported for {@code SO_REUSEADDR} by {@code getOption0}. */
  private boolean reuseAddr = true;

  /** Timeout value passed to native accept/read calls and to the virtual thread poller. */
  private final AtomicInteger socketTimeout = new AtomicInteger(0);
  /** The address family this implementation was constructed for. */
  private final AFAddressFamily<A> addressFamily;

  // NOTE(review): not referenced in the visible part of this file; presumably tracks which
  // directions have been shut down -- confirm against shutdown().
  private int shutdownState = 0;

  // NOTE(review): not referenced in the visible part of this file.
  private AFSocketImplExtensions<A> implExtensions = null;

  /** Set by {@code close()}; consulted by {@code isClosed()} in addition to the core's state. */
  private final AtomicBoolean closed = new AtomicBoolean(false);

  /**
   * When the {@link AFSocketImpl} becomes unreachable (but not yet closed), we must ensure that the
   * underlying socket and all related file descriptors are closed.
   *
   * This core also knows how to wake up threads that are blocked in a native accept call, by
   * connecting a short-lived helper socket to the bound address.
   *
   * @author Christian Kohlschütter
   */
  static final class AFSocketStreamCore extends AFSocketCore {
    AFSocketStreamCore(AFSocketImpl<?> observed, FileDescriptor fd,
        AncillaryDataSupport ancillaryDataSupport, AFAddressFamily<?> af) {
      super(observed, fd, ancillaryDataSupport, af, false);
    }

    /**
     * Creates a native socket of the given type for this core's address family.
     *
     * @param fdTarget The descriptor object that receives the new native socket.
     * @param type The socket type.
     * @throws IOException on error.
     */
    void createSocket(FileDescriptor fdTarget, AFSocketType type) throws IOException {
      NativeUnixSocket.createSocket(fdTarget, addressFamily().getDomain(), type.getId());
    }

    /**
     * Unblock other threads that are currently waiting on accept, simply by connecting to the
     * socket.
     */
    @Override
    protected void unblockAccepts() {
      if (socketAddress == null || socketAddress.getBytes() == null || inode.get() < 0) {
        // no usable address to connect to
        return;
      }
      if (!hasPendingAccepts()) {
        return;
      }
      try {
        ThreadUtil.runOnSystemThread(this::unblockAccepts0);
      } catch (InterruptedException e) {
        // ignore
      }
    }

    /**
     * Repeatedly connects a temporary socket to our own address until no accept is pending
     * anymore, or until connecting fails.
     */
    private void unblockAccepts0() {
      while (hasPendingAccepts()) {
        try {
          FileDescriptor tmpFd = new FileDescriptor();

          try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
            createSocket(tmpFd, AFSocketType.SOCK_STREAM);
            ByteBuffer ab = abLease.get();
            NativeUnixSocket.connect(ab, ab.limit(), tmpFd, inode.get());
          } catch (IOException e) {
            // there's nothing more we can do to unlock these accepts
            // (e.g., SocketException: No such file or directory)

            // ... but the temporary socket may already have been created; don't leak it
            closeQuietly(tmpFd);
            return;
          }
          if (isShutdownOnClose()) {
            try {
              NativeUnixSocket.shutdown(tmpFd, SHUT_RD_WR);
            } catch (Exception e) {
              // ignore
            }
          }
          closeQuietly(tmpFd);
        } catch (RuntimeException e) {
          // ignore
        }

        // sleep a little to give the cleaners some CPU time to actually clean up
        try {
          Thread.sleep(5);
        } catch (InterruptedException e) {
          // ignore
        }
      }
    }

    /**
     * Closes the given temporary descriptor if it refers to an open socket, ignoring any error.
     *
     * @param fdesc The descriptor to close.
     */
    private static void closeQuietly(FileDescriptor fdesc) {
      if (!fdesc.valid()) {
        // socket creation did not get far enough; nothing to release
        return;
      }
      try {
        NativeUnixSocket.close(fdesc);
      } catch (Exception e) {
        // ignore
      }
    }
  }

  /**
   * Creates a new {@link AFSocketImpl} instance.
   *
   * @param addressFamily The address family.
   * @param fdObj The socket's {@link FileDescriptor}.
   */
  protected AFSocketImpl(AFAddressFamily<@NonNull A> addressFamily, FileDescriptor fdObj) {
    super();
    this.addressFamily = addressFamily;
    this.address = InetAddress.getLoopbackAddress();
    this.core = new AFSocketStreamCore(this, fdObj, ancillaryDataSupport, addressFamily);
    this.fd = core.fd;
    this.in = newInputStream();
    this.out = newOutputStream();
  }

  /**
   * Instantiates the {@link InputStream} implementation that reads from this socket.
   *
   * @return A new stream instance.
   */
  protected final AFInputStream newInputStream() {
    final AFInputStream stream = new AFInputStreamImpl();
    return stream;
  }

  /**
   * Instantiates the {@link OutputStream} implementation that writes to this socket.
   *
   * @return A new stream instance.
   */
  protected final AFOutputStream newOutputStream() {
    final AFOutputStream stream = new AFOutputStreamImpl();
    return stream;
  }

  /**
   * Returns the {@link FileDescriptor} object used by this socket implementation.
   *
   * @return The file descriptor.
   */
  final FileDescriptor getFD() {
    return this.fd;
  }

  // CPD-OFF
  /**
   * Checks whether this socket is connected.
   *
   * A positive answer is cached. Otherwise, unless the socket is closed, the core is asked
   * and a positive result is remembered.
   *
   * @return {@code true} if connected.
   */
  final boolean isConnected() {
    boolean result = connected.get();
    if (!result && !isClosed() && core.isConnected(false)) {
      connected.set(true);
      result = true;
    }
    return result;
  }

  /**
   * Checks whether this socket is bound.
   *
   * A positive answer is cached. Otherwise, unless the socket is closed, the core is asked
   * and a positive result is remembered.
   *
   * @return {@code true} if bound.
   */
  final boolean isBound() {
    boolean result = bound.get();
    if (!result && !isClosed() && core.isConnected(true)) {
      bound.set(true);
      result = true;
    }
    return result;
  }

  /**
   * Returns the core object backing this socket implementation.
   *
   * @return The core.
   */
  final AFSocketCore getCore() {
    return this.core;
  }

  /**
   * Checks whether this socket has been closed, either via {@link #close()} or at the core level.
   *
   * @return {@code true} if closed.
   */
  boolean isClosed() {
    if (closed.get()) {
      return true;
    }
    return core.isClosed();
  }
  // CPD-ON

  /**
   * Accepts a connection into the given socket implementation; delegates to
   * {@link #accept0(SocketImpl)} and discards its boolean result.
   */
  @Override
  protected final void accept(SocketImpl socket) throws IOException {
    accept0(socket);
  }

  /**
   * Accepts a connection on this (bound) socket into the given socket implementation.
   *
   * In "virtual blocking" mode (a virtual thread on a blocking socket, or when the core reports
   * virtual blocking), the native accept is retried: whenever it times out or reports no
   * success, the thread is parked until the descriptor is ready for accept, and the call is
   * attempted again.
   *
   * @param socket The socket implementation receiving the connection; must be an
   *          {@link AFSocketImpl}.
   * @return {@code true} if a connection was accepted, {@code false} if the target descriptor is
   *         not valid after the native call returned.
   * @throws SocketClosedException if this socket is closed.
   * @throws NotBoundSocketException if this socket is not bound.
   * @throws IOException on any other error.
   */
  @SuppressWarnings({
      "Finally" /* errorprone */, //
      "PMD.CognitiveComplexity", "PMD.NPathComplexity", "PMD.NcssCount"})
  final boolean accept0(SocketImpl socket) throws IOException {
    FileDescriptor fdesc = core.validFdOrException();
    if (isClosed()) {
      throw new SocketClosedException();
    } else if (!isBound()) {
      throw new NotBoundSocketException();
    }

    AFSocketAddress socketAddress = core.socketAddress;
    AFSocketAddress boundSocketAddress = getLocalSocketAddress();
    if (boundSocketAddress != null) {
      // Always resolve bound address from wildcard address, etc.
      core.socketAddress = socketAddress = boundSocketAddress;
    }

    if (socketAddress == null) {
      throw new NotBoundSocketException();
    }

    @SuppressWarnings("unchecked")
    final AFSocketImpl<A> si = (AFSocketImpl<A>) socket;

    final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
        .isVirtualBlocking();

    long now = virtualBlocking ? System.currentTimeMillis() : 0;
    boolean park = false;
    virtualThreadLoop : do {
      if (virtualBlocking) {
        if (park) {
          VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_ACCEPT, now,
              socketTimeout::get, this::close);
        }
      }

      // Register as pending for this attempt only: the matching decrement lives in the finally
      // block of the try statement below, which also runs on every retry ("continue"), so each
      // increment is paired with exactly one decrement.
      core.incPendingAccepts();
      try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
        ByteBuffer ab = abLease.get();

        SocketException caught = null;
        try {
          boolean success;
          if (virtualBlocking) {
            core.configureVirtualBlocking(true);
          }
          try {
            success = NativeUnixSocket.accept(ab, ab.limit(), fdesc, si.fd, core.inode.get(),
                socketTimeout.get());
          } catch (SocketTimeoutException e) {
            if (virtualBlocking) {
              // try again
              park = true;
              continue virtualThreadLoop;
            } else {
              throw e;
            }
          } finally {
            if (virtualBlocking) {
              core.configureVirtualBlocking(false);
            }
          }

          if (virtualBlocking) {
            if (success) {
              // mark the accepted socket as blocking if necessary
              NativeUnixSocket.configureBlocking(si.fd, core.isBlocking());
            } else {
              // try again
              park = true;
              continue virtualThreadLoop;
            }
          }
        } catch (NotConnectedSocketException | SocketClosedException // NOPMD.ExceptionAsFlowControl
            | BrokenPipeSocketException e) {
          // the listening socket is unusable; close it and report the original error
          try {
            close();
          } catch (Exception e2) {
            e.addSuppressed(e2);
          }
          throw e;
        } catch (SocketException e) { // NOPMD.ExceptionAsFlowControl
          // defer: the finally block decides how to report this
          caught = e;
        } finally { // NOPMD.DoNotThrowExceptionInFinally
          if (!isBound() || isClosed()) {
            // This socket went away in the meantime: discard whatever was accepted.
            if (getCore().isShutdownOnClose()) {
              try {
                NativeUnixSocket.shutdown(si.fd, SHUT_RD_WR);
              } catch (Exception e) {
                // ignore
              }
            }
            try {
              NativeUnixSocket.close(si.fd);
            } catch (Exception e) {
              // ignore
            }
            if (caught != null) {
              throw caught;
            } else {
              throw new BrokenPipeSocketException("Socket is closed");
            }
          } else if (caught != null) {
            throw caught;
          }
        }
      } finally {
        core.decPendingAccepts();
      }
      break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
    } while (true); // NOPMD.WhileLoopWithLiteralBoolean

    if (!si.fd.valid()) {
      return false;
    }

    si.setSocketAddress(socketAddress);
    si.connected.set(true);

    return true;
  }

  /**
   * Updates the address associated with this socket.
   *
   * A {@code null} argument clears the core's address, the inherited {@code address} field and
   * resets {@code localport} to -1. Otherwise, the address is stored, and {@code localport} is
   * taken from the address only if it is not already positive.
   *
   * @param socketAddress The new address, or {@code null}.
   */
  final void setSocketAddress(AFSocketAddress socketAddress) {
    this.core.socketAddress = socketAddress;

    if (socketAddress == null) {
      this.address = null;
      this.localport = -1;
      return;
    }

    this.address = socketAddress.getAddress();
    if (this.localport <= 0) {
      this.localport = socketAddress.getPort();
    }
  }

  /**
   * Asks the native layer for the number of bytes that are available for reading.
   *
   * @return The value reported by the native call.
   * @throws IOException on error, for example if the file descriptor is not valid.
   */
  @Override
  protected final int available() throws IOException {
    final FileDescriptor validFd = core.validFdOrException();
    try (Lease<MutableHolder<ByteBuffer>> bufferLease = core.getPrivateDirectByteBuffer(0)) {
      return NativeUnixSocket.available(validFd, bufferLease.get().get());
    }
  }

  /**
   * Binds this socket to the given address.
   *
   * A {@code null} address is replaced by the address family's "null bind" address, if there is
   * one. The internal dummy bind address merely marks the socket as bound without calling into
   * native code.
   *
   * @param addr The address to bind to, or {@code null}.
   * @param options Options passed through to the native bind call.
   * @throws UnsupportedOperationException if {@code addr} is {@code null} and the address family
   *           has no null bind address.
   * @throws IOException on error.
   */
  final void bind(SocketAddress addr, int options) throws IOException {
    SocketAddress target = addr;
    if (target == null) {
      target = addressFamily.nullBindAddress();
      if (target == null) {
        throw new UnsupportedOperationException("Cannot bind to null address");
      }
    }

    final boolean dummyBind = (target == AFSocketAddress.INTERNAL_DUMMY_BIND); // NOPMD
    if (dummyBind) {
      bound.set(true);
      core.inode.set(0);
      return;
    }

    target = AFSocketAddress.mapOrFail(target, addressFamily.getSocketAddressClass());
    bound.set(true);

    final AFSocketAddress bindAddress = Objects.requireNonNull((AFSocketAddress) target);
    setSocketAddress(bindAddress);

    try (Lease<ByteBuffer> addressLease = bindAddress.getNativeAddressDirectBuffer()) {
      final ByteBuffer nativeAddress = addressLease.get();
      final long boundInode = NativeUnixSocket.bind(nativeAddress, nativeAddress.limit(), fd,
          options);
      core.inode.set(boundInode);
    }
    core.validFdOrException();
  }

  /**
   * Binding to an {@link InetAddress} is a deliberate no-op for this implementation; see
   * {@link #bind(SocketAddress, int)} for the binding logic.
   */
  @Override
  @SuppressWarnings("hiding")
  protected final void bind(InetAddress host, int port) throws IOException {
    // ignored
  }

  /**
   * Closes the socket once both the input and the output stream have been marked closed.
   *
   * @throws IOException if closing fails.
   */
  private void checkClose() throws IOException {
    final boolean bothStreamsClosed = closedInputStream && closedOutputStream;
    if (bothStreamsClosed) {
      close();
    }
  }

  /**
   * Closes this socket: marks it closed, attempts a shutdown, then runs the core's cleaner.
   *
   * @throws IOException if the shutdown fails for a reason other than the socket already being
   *           disconnected or closed.
   */
  @Override
  protected final void close() throws IOException {
    closed.set(true);

    try {
      shutdown();
    } catch (NotConnectedSocketException | SocketClosedException ignored) {
      // the socket is already disconnected/closed; nothing left to shut down
    }

    core.runCleaner();
  }

  /**
   * Unsupported: this implementation cannot connect to a host name/port pair.
   *
   * @throws SocketException always.
   */
  @Override
  @SuppressWarnings("hiding")
  protected final void connect(String host, int port) throws IOException {
    // This is a connect operation; the message previously (misleadingly) mentioned "bind".
    throw new SocketException("Cannot connect to this type of address: " + InetAddress.class);
  }

  /**
   * Unsupported: this implementation cannot connect to an {@link InetAddress}.
   *
   * @throws SocketException always.
   */
  @Override
  @SuppressWarnings("hiding")
  protected final void connect(InetAddress address, int port) throws IOException {
    // This is a connect operation; the message previously (misleadingly) mentioned "bind".
    throw new SocketException("Cannot connect to this type of address: " + InetAddress.class);
  }

  /**
   * Connects to the given address; delegates to {@link #connect0(SocketAddress, int)} and
   * discards its boolean result.
   */
  @Override
  protected final void connect(SocketAddress addr, int connectTimeout) throws IOException {
    connect0(addr, connectTimeout);
  }

  /**
   * Connects this socket to the given address.
   *
   * The two internal dummy addresses short-circuit: one marks the socket as connected without
   * any native call, the other returns {@code false} right away. Any other address is mapped to
   * this address family's address type and handed to the native connect call.
   *
   * In "virtual blocking" mode, an unsuccessful native connect is retried after parking the
   * thread until the descriptor is ready for connect.
   *
   * @param addr The address to connect to.
   * @param connectTimeout The connect timeout; used as the parking timeout in virtual blocking
   *          mode.
   * @return {@code true} if the native connect reported success (or the dummy connect address was
   *         given), {@code false} otherwise.
   * @throws IOException on error.
   */
  @SuppressWarnings({"PMD.CognitiveComplexity", "PMD.NPathComplexity", "PMD.NcssCount"})
  final boolean connect0(SocketAddress addr, int connectTimeout) throws IOException {
    if (addr == AFSocketAddress.INTERNAL_DUMMY_CONNECT) { // NOPMD
      this.connected.set(true);
      return true;
    } else if (addr == AFSocketAddress.INTERNAL_DUMMY_DONT_CONNECT) { // NOPMD)
      return false;
    }

    addr = AFSocketAddress.mapOrFail(addr, addressFamily.getSocketAddressClass());
    AFSocketAddress socketAddress = Objects.requireNonNull((AFSocketAddress) addr);

    final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
        .isVirtualBlocking();
    // reference time for the parking timeout; unused (0) when not virtual blocking
    long now = virtualBlocking ? System.currentTimeMillis() : 0;

    /**
     * If set, a two-phase connect is in flight, and the value holds the connect timeout.
     */
    AFSupplier<Integer> virtualConnectTimeout = null;

    if (virtualBlocking) {
      core.configureVirtualBlocking(true);
    }
    boolean park = false;
    try {
      // Outer loop: re-entered (with a fresh address buffer lease) for each virtual-blocking retry.
      virtualThreadLoop : do {
        try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
          ByteBuffer ab = abLease.get();
          boolean success = false;
          boolean ignoreSpuriousTimeout = true;
          // Inner loop: only repeated (via "continue") after a tolerated spurious timeout.
          do {
            if (virtualBlocking) {
              if (virtualConnectTimeout != null) {
                if (park) {
                  VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fd, SelectionKey.OP_CONNECT,
                      now, virtualConnectTimeout, this::close);
                }
              } else {
                Thread.yield();
              }
            }

            if (virtualBlocking) {
              core.configureVirtualBlocking(true);
            }
            try {
              success = NativeUnixSocket.connect(ab, ab.limit(), fd, -2);
              if (!success && virtualBlocking) {
                // try again (non-blocking timeout)
                if (virtualConnectTimeout == null) {
                  virtualConnectTimeout = () -> connectTimeout;
                }
                park = true;
                continue virtualThreadLoop;
              }
              break;
            } catch (SocketTimeoutException e) {
              // Ignore spurious timeout once when SO_TIMEOUT==0
              // seen on older Linux kernels with AF_VSOCK running in qemu
              if (ignoreSpuriousTimeout) {
                Object o = getOption(SocketOptions.SO_TIMEOUT);
                if (o instanceof Integer) {
                  if (((Integer) o) == 0) {
                    ignoreSpuriousTimeout = false;
                    continue;
                  }
                } else if (o == null) {
                  ignoreSpuriousTimeout = false;
                  continue;
                }
              }
              throw e;
            } catch (NotConnectedSocketException | SocketClosedException
                | BrokenPipeSocketException e) {
              // the socket is unusable; close it and report the original error
              try {
                close();
              } catch (Exception e2) {
                e.addSuppressed(e2);
              }
              throw e;
            } catch (SocketException e) {
              if (virtualBlocking) {
                Thread.yield();
              }
              throw e;
            } finally {
              if (virtualBlocking) {
                core.configureVirtualBlocking(false);
              }
            }
          } while (ThreadUtil.checkNotInterruptedOrThrow());
          if (success) {
            setSocketAddress(socketAddress);
            this.connected.set(true);
          }
          core.validFdOrException();
          return success;
        }
      } while (true); // NOPMD.WhileLoopWithLiteralBoolean
    } finally {
      // NOTE(review): this passes "true", just like the call before the loop, whereas the
      // per-attempt finally above passes "false" -- confirm against
      // AFSocketCore#configureVirtualBlocking that this is the intended final state.
      if (virtualBlocking) {
        core.configureVirtualBlocking(true);
      }
    }
  }

  /**
   * Creates the native socket, unless the file descriptor is already valid.
   *
   * The requested mode is remembered; asking for a different mode on an already-created socket
   * is rejected.
   *
   * @param stream {@code true} for a stream socket, {@code false} for a datagram socket.
   * @throws SocketException if this socket has already been closed.
   * @throws IllegalStateException if the socket was already created with the other mode.
   * @throws IOException on error.
   */
  @Override
  protected final void create(boolean stream) throws IOException {
    if (isClosed()) {
      throw new SocketException("Already closed");
    }

    if (!fd.valid()) {
      // no native socket yet: record the mode and create one
      createType = stream;
      createSocket(fd, stream ? AFSocketType.SOCK_STREAM : AFSocketType.SOCK_DGRAM);
      return;
    }

    // a native socket already exists: just make sure the mode is consistent
    if (createType == null) {
      createType = stream;
    } else if (createType.booleanValue() != stream) { // NOPMD.UnnecessaryBoxing
      throw new IllegalStateException("Already created with different mode");
    }
  }

  /**
   * Returns the input stream of this socket.
   *
   * If the socket is neither connected nor bound, it is closed and an exception is thrown.
   *
   * @return The input stream.
   * @throws SocketClosedException if the socket is neither connected nor bound.
   * @throws IOException on error, for example if the file descriptor is not valid.
   */
  @Override
  protected final AFInputStream getInputStream() throws IOException {
    final boolean usable = isConnected() || isBound();
    if (!usable) {
      close();
      throw new SocketClosedException("Not connected/not bound");
    }
    core.validFdOrException();
    return in;
  }

  /**
   * Returns the output stream of this socket.
   *
   * If the socket is neither connected nor bound, it is closed and an exception is thrown. This
   * is the same precondition that {@link #getInputStream()} applies.
   *
   * @return The output stream.
   * @throws SocketClosedException if the socket is neither connected nor bound.
   * @throws IOException on error, for example if the file descriptor is not valid.
   */
  @Override
  protected final AFOutputStream getOutputStream() throws IOException {
    // Check "connected", not "closed": a connected socket that is not bound must still be able
    // to obtain its output stream (and must not be closed as a side effect of asking for it).
    if (!isConnected() && !isBound()) {
      close();
      throw new SocketClosedException("Not connected/not bound");
    }
    core.validFdOrException();
    return out;
  }

  /**
   * Puts the socket into listening state.
   *
   * @param backlog The requested backlog; values less than or equal to zero are replaced by 50.
   * @throws IOException on error, for example if the file descriptor is not valid.
   */
  @Override
  protected final void listen(int backlog) throws IOException {
    final FileDescriptor validFd = core.validFdOrException();
    final int effectiveBacklog = (backlog > 0) ? backlog : 50;
    NativeUnixSocket.listen(validFd, effectiveBacklog);
  }

  /**
   * Urgent data is not supported by this implementation.
   *
   * @return Always {@code false}.
   */
  @Override
  protected final boolean supportsUrgentData() {
    return false;
  }

  /**
   * Urgent data is not supported by this implementation.
   *
   * @throws UnsupportedOperationException always.
   */
  @Override
  protected final void sendUrgentData(int data) throws IOException {
    throw new UnsupportedOperationException();
  }

  /**
   * The {@link InputStream} of the enclosing socket; reads go through
   * {@code NativeUnixSocket.read} on the core's file descriptor.
   *
   * In "virtual blocking" mode (a virtual thread on a blocking socket, or when the core reports
   * virtual blocking), reads are issued with the non-blocking option and the thread is parked
   * until the descriptor is readable whenever no data is available yet.
   */
  private final class AFInputStreamImpl extends AFInputStream {
    /** Set once {@link #close()} has been called on this stream. */
    private volatile boolean streamClosed = false;
    /** Set once end-of-stream has been detected; subsequent reads return -1 right away. */
    private final AtomicBoolean eofReached = new AtomicBoolean(false);

    /** Native read options, derived from the core's blocking state at construction time. */
    private final int defaultOpt = (core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING);

    /**
     * Reads up to {@code len} bytes into the given array.
     *
     * @return The value returned by the native read; -1 once end-of-stream was reached before;
     *         0 if {@code len} is 0, or if the native call returned -2 outside of virtual
     *         blocking mode.
     * @throws SocketClosedException if this stream has been closed.
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit the array.
     * @throws IOException on error.
     */
    @SuppressWarnings("PMD.CognitiveComplexity")
    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
      if (streamClosed) {
        throw new SocketClosedException("This InputStream has already been closed.");
      }
      if (eofReached.get()) {
        return -1;
      }

      // note: the descriptor is validated even for zero-length reads
      FileDescriptor fdesc = core.validFdOrException();
      if (len == 0) {
        return 0;
      } else if (off < 0 || len < 0 || (len > buf.length - off)) {
        throw new IndexOutOfBoundsException();
      }

      final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
          .isVirtualBlocking();
      final long now;
      final int opt;
      if (virtualBlocking) {
        // read without blocking natively; waiting is done by parking the thread instead
        now = System.currentTimeMillis();
        opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
      } else {
        now = 0;
        opt = defaultOpt;
      }

      int read;

      boolean park = false;
      virtualThreadLoop : do {
        if (virtualBlocking) {
          if (park) {
            VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_READ, now,
                socketTimeout::get, this::forceCloseSocket);
          }
          core.configureVirtualBlocking(true);
        }

        try {
          read = NativeUnixSocket.read(fdesc, buf, off, len, opt, ancillaryDataSupport,
              socketTimeout.get());
          // -2 is handled as "nothing read yet": retry when virtual blocking, else report 0
          if (read == -2) {
            if (virtualBlocking) {
              // sleep again
              park = true;
              continue virtualThreadLoop;
            } else {
              read = 0;
            }
          }
        } catch (SocketTimeoutException e) {
          if (virtualBlocking) {
            // sleep again
            park = true;
            continue virtualThreadLoop;
          } else {
            throw e;
          }
        } catch (EOFException e) {
          // remember the end-of-stream condition for subsequent calls, then propagate
          eofReached.set(true);
          throw e;
        } finally {
          // runs on every exit of the try block, including the "continue" retries
          if (virtualBlocking) {
            core.configureVirtualBlocking(false);
          }
        }
        break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
      } while (true); // NOPMD.WhileLoopWithLiteralBoolean

      return read;
    }

    /**
     * Reads a single byte.
     *
     * @return The value returned by the native read if it is not negative; otherwise -1. Any
     *         negative native result that is not retried marks this stream as having reached
     *         end-of-stream.
     * @throws IOException on error.
     */
    @SuppressWarnings("PMD.CognitiveComplexity")
    @Override
    public int read() throws IOException {
      FileDescriptor fdesc = core.validFdOrException();

      if (eofReached.get()) {
        return -1;
      }

      // CPD-OFF
      final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
          .isVirtualBlocking();
      final long now;
      final int opt;
      if (virtualBlocking) {
        now = System.currentTimeMillis();
        opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
      } else {
        now = 0;
        opt = defaultOpt;
      }

      boolean park = false;
      virtualThreadLoop : do {
        if (virtualBlocking) {
          if (park) {
            VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_READ, now,
                socketTimeout::get, this::forceCloseSocket);
          }
          core.configureVirtualBlocking(true);
        }

        try {
          // a null buffer with length 1: the native call returns the byte value itself
          int byteRead = NativeUnixSocket.read(fdesc, null, 0, 1, opt, ancillaryDataSupport,
              socketTimeout.get());
          if (byteRead < 0) {
            if (byteRead == -2) {
              if (virtualBlocking) {
                // sleep again
                park = true;
                continue virtualThreadLoop;
              } else {
                byteRead = -1;
              }
            }
            eofReached.set(true);
            return -1;
          } else {
            return byteRead;
          }
        } catch (SocketTimeoutException e) {
          if (virtualBlocking) {
            // sleep again
            park = true;
            continue virtualThreadLoop;
          } else {
            throw e;
          }
        } finally {
          // runs on every exit of the try block, including the "continue" retries
          if (virtualBlocking) {
            core.configureVirtualBlocking(false);
          }
        }
      } while (true); // NOPMD.WhileLoopWithLiteralBoolean

      // CPD-ON
    }

    /**
     * Closes the whole socket: marks the output side as closed as well, so that closing this
     * stream makes {@code checkClose()} close the socket.
     */
    private void forceCloseSocket() throws IOException {
      closedOutputStream = true;
      close();
    }

    /**
     * Closes this stream; a no-op if the stream or the socket is already closed. Shuts down the
     * read side if the core is configured to shut down on close, then closes the socket if the
     * output stream has been closed, too.
     */
    @Override
    public synchronized void close() throws IOException {
      if (streamClosed || isClosed()) {
        return;
      }
      streamClosed = true;
      FileDescriptor fdesc = core.validFd();
      if (fdesc != null && getCore().isShutdownOnClose()) {
        NativeUnixSocket.shutdown(fdesc, SHUT_RD);
      }

      closedInputStream = true;
      checkClose();
    }

    /**
     * Returns the socket's {@code available()} value.
     *
     * @throws SocketClosedException if this stream has been closed.
     */
    @Override
    public int available() throws IOException {
      if (streamClosed) {
        throw new SocketClosedException("This InputStream has already been closed.");
      }

      return AFSocketImpl.this.available();
    }

    /** Returns the enclosing socket's file descriptor. */
    @Override
    public FileDescriptor getFileDescriptor() throws IOException {
      return getFD();
    }

  }

  /**
   * Verifies that the current thread has not been interrupted while writing.
   *
   * The thread's interrupt status is only queried, not cleared.
   *
   * @param bytesTransferred The number of bytes written so far; reported in the exception.
   * @return Always {@code true} (so the method can serve as a loop condition).
   * @throws InterruptedIOException if the current thread is interrupted.
   */
  private static boolean checkWriteInterruptedException(int bytesTransferred)
      throws InterruptedIOException {
    if (!Thread.currentThread().isInterrupted()) {
      return true;
    }

    final InterruptedIOException interrupted = new InterruptedIOException("write");
    interrupted.bytesTransferred = bytesTransferred;
    throw interrupted;
  }

  /**
   * The {@link OutputStream} of the enclosing socket; writes go through
   * {@code NativeUnixSocket.write} on the core's file descriptor.
   *
   * In "virtual blocking" mode (a virtual thread on a blocking socket, or when the core reports
   * virtual blocking), writes are issued with the non-blocking option and the thread is parked
   * until the descriptor is writable whenever nothing could be written.
   */
  private final class AFOutputStreamImpl extends AFOutputStream {
    /** Set once {@link #close()} has been called on this stream. */
    private volatile boolean streamClosed = false;

    /** Native write options, derived from the core's blocking state at construction time. */
    private final int defaultOpt = (core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING);

    /**
     * Writes a single byte, retrying until the native call reports a non-zero result.
     *
     * @throws InterruptedIOException if the thread is interrupted between retries.
     * @throws IOException on error; not-connected, closed and broken-pipe conditions close the
     *           socket before being propagated.
     */
    @SuppressWarnings("PMD.CognitiveComplexity")
    @Override
    public void write(int oneByte) throws IOException {
      FileDescriptor fdesc = core.validFdOrException();

      final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
          .isVirtualBlocking();
      final long now;
      final int opt;
      if (virtualBlocking) {
        now = System.currentTimeMillis();
        opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
      } else {
        now = 0;
        opt = defaultOpt;
      }

      boolean park = false;
      virtualThreadLoop : do {
        if (virtualBlocking) {
          if (park) {
            VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
                socketTimeout::get, this::forceCloseSocket);
          }
          core.configureVirtualBlocking(true);
        }

        try {
          int written;
          do {
            // a null buffer with length 1: the byte is passed in place of the offset argument
            written = NativeUnixSocket.write(fdesc, null, oneByte, 1, opt, ancillaryDataSupport);
            if (written != 0) {
              break;
            }
            if (virtualBlocking) {
              park = true;
              continue virtualThreadLoop;
            }
          } while (checkWriteInterruptedException(0));
        } catch (NotConnectedSocketException | SocketClosedException
            | BrokenPipeSocketException e) {
          // the connection is gone; close the socket and report the original error
          try {
            forceCloseSocket();
          } catch (Exception e2) {
            e.addSuppressed(e2);
          }
          throw e;
        } catch (SocketTimeoutException e) {
          if (virtualBlocking) {
            // try again
            park = true;
            continue virtualThreadLoop;
          } else {
            throw e;
          }
        } finally {
          // runs on every exit of the try block, including the "continue" retries
          if (virtualBlocking) {
            core.configureVirtualBlocking(false);
          }
        }
        break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
      } while (true); // NOPMD.WhileLoopWithLiteralBoolean
    }

    /**
     * Writes {@code len} bytes from the given array, looping until everything has been written.
     *
     * @throws SocketException if this stream has been closed.
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not fit the array.
     * @throws InterruptedIOException if the thread is interrupted between partial writes; the
     *           exception carries the number of bytes written so far.
     * @throws IOException on error; not-connected, closed and broken-pipe conditions close the
     *           socket before being propagated.
     */
    @SuppressWarnings({"PMD.CognitiveComplexity", "PMD.NPathComplexity"})
    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
      if (streamClosed) {
        throw new SocketException("This OutputStream has already been closed.");
      }
      if (len < 0 || off < 0 || len > buf.length - off) {
        throw new IndexOutOfBoundsException();
      }
      FileDescriptor fdesc = core.validFdOrException();

      // NOTE: writing messages with len == 0 should be permissible (unless ignored in native code)
      // For certain sockets, empty messages can be used to probe if the remote connection is alive
      if (len == 0 && !AFSocket.supports(AFSocketCapability.CAPABILITY_ZERO_LENGTH_SEND)) {
        return;
      }

      final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
          .isVirtualBlocking();
      final long now;
      final int opt;
      if (virtualBlocking) {
        now = System.currentTimeMillis();
        opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
      } else {
        now = 0;
        opt = defaultOpt;
      }

      int writtenTotal = 0;
      // Outer loop: one iteration per (partial) write, until all bytes are written.
      do {
        boolean park = false;
        // Inner loop: retries a single write in virtual blocking mode.
        virtualThreadLoop : do {
          if (virtualBlocking) {
            if (park) {
              VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
                  socketTimeout::get, this::forceCloseSocket);
            }
            core.configureVirtualBlocking(true);
          }

          final int written;
          try {
            written = NativeUnixSocket.write(fdesc, buf, off, len, opt, ancillaryDataSupport);
            if (written == 0 && virtualBlocking) {
              // try again
              park = true;
              continue virtualThreadLoop;
            }
            if (written < 0) {
              if (len == 0) {
                // This exception is only useful to detect OS-level bugs that we need to
                // work-around
                // in native code.
                // throw new IOException("Error while writing zero-length byte array; try -D"
                // + AFSocket.PROP_LIBRARY_DISABLE_CAPABILITY_PREFIX
                // + AFSocketCapability.CAPABILITY_ZERO_LENGTH_SEND.name() + "=true");

                // ignore
                return;
              } else {
                throw new IOException("Unspecific error while writing");
              }
            }
          } catch (NotConnectedSocketException | SocketClosedException
              | BrokenPipeSocketException e) {
            // the connection is gone; close the socket and report the original error
            try {
              forceCloseSocket();
            } catch (Exception e2) {
              e.addSuppressed(e2);
            }
            throw e;
          } catch (SocketTimeoutException e) {
            if (virtualBlocking) {
              // try again
              park = true;
              continue virtualThreadLoop;
            } else {
              throw e;
            }
          } finally {
            // runs on every exit of the try block, including the "continue" retries
            if (virtualBlocking) {
              core.configureVirtualBlocking(false);
            }
          }

          // advance past the bytes that were written in this round
          len -= written;
          off += written;
          writtenTotal += written;
          break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
        } while (true); // NOPMD.WhileLoopWithLiteralBoolean

      } while (len > 0 && checkWriteInterruptedException(writtenTotal));
    }

    /**
     * Closes the whole socket: marks the input side as closed as well, so that closing this
     * stream makes {@code checkClose()} close the socket.
     */
    private void forceCloseSocket() throws IOException {
      closedInputStream = true;
      close();
    }

    /**
     * Closes this stream; a no-op if the stream or the socket is already closed. Shuts down the
     * write side if the core is configured to shut down on close, then closes the socket if the
     * input stream has been closed, too.
     */
    @Override
    public synchronized void close() throws IOException {
      if (streamClosed || isClosed()) {
        return;
      }
      streamClosed = true;
      FileDescriptor fdesc = core.validFd();
      if (fdesc != null && getCore().isShutdownOnClose()) {
        NativeUnixSocket.shutdown(fdesc, SHUT_WR);
      }
      closedOutputStream = true;
      checkClose();
    }

    /** Returns the enclosing socket's file descriptor. */
    @Override
    public FileDescriptor getFileDescriptor() throws IOException {
      return getFD();
    }
  }

  /**
   * Describes this socket implementation, including its file descriptor, address and
   * connected/bound flags.
   */
  @Override
  public final String toString() {
    final StringBuilder description = new StringBuilder(super.toString());
    description.append("[fd=").append(fd);
    description.append("; addr=").append(this.core.socketAddress);
    description.append("; connected=").append(connected);
    description.append("; bound=").append(bound);
    description.append("]");
    return description.toString();
  }

  /**
   * Converts an option value to {@code int}, expecting it to be an {@link Integer}.
   *
   * @param value The option value.
   * @return The integer value.
   * @throws SocketException if the value is {@code null} or not an {@link Integer}; the cause is
   *           a {@link NullPointerException} or {@link ClassCastException}, respectively.
   */
  private static int expectInteger(Object value) throws SocketException {
    if (value != null) {
      try {
        return (Integer) value;
      } catch (final ClassCastException e) {
        final SocketException unsupported = new SocketException("Unsupported value: " + value);
        unsupported.initCause(e);
        throw unsupported;
      }
    }

    final SocketException missing = new SocketException("Value must not be null");
    missing.initCause(new NullPointerException());
    throw missing;
  }

  /**
   * Converts an option value to 1 or 0, expecting it to be a {@link Boolean}.
   *
   * @param value The option value.
   * @return 1 for {@code true}, 0 for {@code false}.
   * @throws SocketException if the value is {@code null} or not a {@link Boolean}; the cause is
   *           a {@link NullPointerException} or {@link ClassCastException}, respectively.
   */
  private static int expectBoolean(Object value) throws SocketException {
    if (value != null) {
      try {
        final boolean flag = (Boolean) value;
        return flag ? 1 : 0;
      } catch (final ClassCastException e) {
        final SocketException unsupported = new SocketException("Unsupported value: " + value);
        unsupported.initCause(e);
        throw unsupported;
      }
    }

    final SocketException missing = new SocketException("Value must not be null");
    missing.initCause(new NullPointerException());
    throw missing;
  }

  /**
   * Returns the value of the given socket option; delegates to {@code getOption0}.
   *
   * @param optID The option identifier (see {@link SocketOptions}).
   * @return The option value.
   * @throws SocketException on error.
   */
  @Override
  public Object getOption(int optID) throws SocketException {
    return getOption0(optID);
  }

  /**
   * Retrieves a socket option value. {@link SocketOptions#SO_REUSEADDR} is answered from the
   * locally stored {@code reuseAddr} value; everything else is looked up through
   * {@code getOptionDefault} using a valid file descriptor.
   *
   * @param optID The option ID.
   * @return The value.
   * @throws SocketException if the socket is closed, or on other errors.
   */
  private Object getOption0(int optID) throws SocketException {
    if (isClosed()) {
      throw new SocketException("Socket is closed");
    }

    if (optID != SocketOptions.SO_REUSEADDR) {
      final FileDescriptor validFd = core.validFdOrException();
      return getOptionDefault(validFd, optID, socketTimeout, addressFamily);
    }

    // answered without consulting the native socket
    return reuseAddr;
  }

  /**
   * Default implementation for retrieving a socket option from a native file descriptor.
   *
   * @param fdesc The file descriptor to query.
   * @param optID The option ID.
   * @param acceptTimeout An optional timeout holder that is taken into account for
   *          {@link SocketOptions#SO_TIMEOUT}; may be {@code null}.
   * @param af The address family, used when resolving {@link SocketOptions#SO_BINDADDR}.
   * @return The option value.
   * @throws SocketException if the option is unsupported, or the lookup fails.
   */
  static final Object getOptionDefault(FileDescriptor fdesc, int optID, AtomicInteger acceptTimeout,
      AFAddressFamily<?> af) throws SocketException {
    try {
      switch (optID) {
        case SocketOptions.SO_KEEPALIVE: {
          boolean keepAlive;
          try {
            keepAlive = NativeUnixSocket.getSocketOptionInt(fdesc, optID) != 0;
          } catch (SocketException ignored) {
            // a failing lookup is reported as "not enabled"
            keepAlive = false;
          }
          return keepAlive;
        }
        case SocketOptions.TCP_NODELAY: {
          final int noDelay = NativeUnixSocket.getSocketOptionInt(fdesc, optID);
          return noDelay != 0;
        }
        case SocketOptions.SO_TIMEOUT: {
          // Two native option IDs are queried (presumably the send and receive timeouts --
          // TODO confirm against the native code); the larger value wins.
          final int opt1005 = NativeUnixSocket.getSocketOptionInt(fdesc, 0x1005);
          final int opt1006 = NativeUnixSocket.getSocketOptionInt(fdesc, 0x1006);
          final int nativeTimeout = Math.max(opt1005, opt1006);
          if (nativeTimeout == -1) {
            // special value, meaning: do not override infinite timeout from native code
            return 0;
          }
          final int storedTimeout = (acceptTimeout == null) ? 0 : acceptTimeout.get();
          return Math.max(storedTimeout, nativeTimeout);
        }
        case SocketOptions.SO_LINGER:
        case SocketOptions.SO_RCVBUF:
        case SocketOptions.SO_SNDBUF:
          return NativeUnixSocket.getSocketOptionInt(fdesc, optID);
        case SocketOptions.IP_TOS:
          return 0;
        case SocketOptions.SO_BINDADDR:
          return AFSocketAddress.getInetAddress(fdesc, false, af);
        case SocketOptions.SO_REUSEADDR:
          return false;
        default:
          throw new SocketException("Unsupported option: " + optID);
      }
    } catch (final SocketException se) {
      // already the right type; pass through unchanged
      throw se;
    } catch (final Exception other) {
      final SocketException wrapped = new SocketException("Could not get option");
      wrapped.initCause(other);
      throw wrapped;
    }
  }

  /**
   * Sets the socket option with the given ID; delegates to {@code setOption0(int, Object)}.
   *
   * @param optID The option ID.
   * @param value The value.
   * @throws SocketException on error.
   */
  @Override
  public void setOption(int optID, Object value) throws SocketException {
    setOption0(optID, value);
  }

  /**
   * Sets a socket option. {@link SocketOptions#SO_REUSEADDR} is only stored locally in
   * {@code reuseAddr}; everything else is applied through {@code setOptionDefault} using a valid
   * file descriptor.
   *
   * @param optID The option ID.
   * @param value The value.
   * @throws SocketException if the socket is closed, or on other errors.
   */
  private void setOption0(int optID, Object value) throws SocketException {
    if (isClosed()) {
      throw new SocketException("Socket is closed");
    }

    if (optID != SocketOptions.SO_REUSEADDR) {
      final FileDescriptor validFd = core.validFdOrException();
      setOptionDefault(validFd, optID, value, socketTimeout);
      return;
    }

    // kept locally; not forwarded to the native socket
    final int flag = expectBoolean(value);
    reuseAddr = (flag != 0);
  }

  /**
   * Lenient variant of {@link #getOption(int)}: for {@link SocketOptions#TCP_NODELAY} and
   * {@link SocketOptions#SO_KEEPALIVE}, a {@link SocketException} is swallowed and {@code false}
   * is returned instead.
   *
   * @param optID The option ID.
   * @return The value.
   * @throws SocketException on error (for all other option IDs).
   */
  protected final Object getOptionLenient(int optID) throws SocketException {
    try {
      return getOption0(optID);
    } catch (SocketException failure) {
      final boolean tolerated = optID == SocketOptions.TCP_NODELAY
          || optID == SocketOptions.SO_KEEPALIVE;
      if (!tolerated) {
        throw failure;
      }
      return false;
    }
  }

  /**
   * Lenient variant of {@link #setOption(int, Object)}: for {@link SocketOptions#TCP_NODELAY}, a
   * {@link SocketException} is silently swallowed.
   *
   * @param optID The option ID.
   * @param value The value.
   * @throws SocketException on error (for all other option IDs).
   */
  protected final void setOptionLenient(int optID, Object value) throws SocketException {
    try {
      setOption0(optID, value);
    } catch (SocketException failure) {
      if (optID != SocketOptions.TCP_NODELAY) {
        throw failure;
      }
      // TCP_NODELAY: failure is deliberately ignored
    }
  }

  /**
   * Default implementation for applying a socket option to a native file descriptor.
   *
   * @param fdesc The file descriptor to modify.
   * @param optID The option ID.
   * @param value The value; the expected type ({@link Integer} or {@link Boolean}) depends on
   *          the option.
   * @param acceptTimeout An optional holder that receives the new value for
   *          {@link SocketOptions#SO_TIMEOUT}; may be {@code null}.
   * @throws SocketException if the option or value is unsupported, or applying it fails.
   */
  static final void setOptionDefault(FileDescriptor fdesc, int optID, Object value,
      AtomicInteger acceptTimeout) throws SocketException {
    try {
      switch (optID) {
        case SocketOptions.SO_LINGER: {
          final int linger;
          if (value instanceof Boolean) {
            // Boolean.FALSE is passed to native code as -1; Boolean.TRUE is rejected
            if (((Boolean) value).booleanValue()) {
              throw new SocketException("Only accepting Boolean.FALSE here");
            }
            linger = -1;
          } else {
            linger = expectInteger(value);
          }
          NativeUnixSocket.setSocketOptionInt(fdesc, optID, linger);
          return;
        }
        case SocketOptions.SO_TIMEOUT: {
          final int timeoutValue = expectInteger(value);
          // Two native option IDs are updated (presumably the send and receive timeouts --
          // TODO confirm against the native code); each one is attempted independently.
          for (final int nativeOptId : new int[] {0x1005, 0x1006}) {
            try {
              NativeUnixSocket.setSocketOptionInt(fdesc, nativeOptId, timeoutValue);
            } catch (InvalidArgumentSocketException ignored) {
              // Perhaps the socket is shut down?
            }
          }
          if (acceptTimeout != null) {
            acceptTimeout.set(timeoutValue);
          }
          return;
        }
        case SocketOptions.SO_RCVBUF:
        case SocketOptions.SO_SNDBUF:
          NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectInteger(value));
          return;
        case SocketOptions.SO_KEEPALIVE:
          try {
            NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectBoolean(value));
          } catch (SocketException ignored) {
            // best effort: any SocketException raised here is ignored
          }
          return;
        case SocketOptions.TCP_NODELAY:
          NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectBoolean(value));
          return;
        case SocketOptions.IP_TOS:
        case SocketOptions.SO_REUSEADDR:
          // accepted, but ignored
          return;
        default:
          throw new SocketException("Unsupported option: " + optID);
      }
    } catch (final SocketException se) {
      // already the right type; pass through unchanged
      throw se;
    } catch (final Exception other) {
      final SocketException wrapped = new SocketException("Error while setting option");
      wrapped.initCause(other);
      throw wrapped;
    }
  }

  /**
   * Shuts down input and output in a single step, as if both {@link #shutdownInput()} and
   * {@link #shutdownOutput()} had been called. Does nothing if there is no valid file descriptor.
   *
   * @throws IOException on error.
   */
  protected final void shutdown() throws IOException {
    final FileDescriptor validFd = core.validFd();
    if (validFd == null) {
      return;
    }

    NativeUnixSocket.shutdown(validFd, SHUT_RD_WR);
    shutdownState = 0;
  }

  /**
   * Shuts down the input side of the socket. Does nothing if there is no valid file descriptor.
   *
   * The shutdown is recorded in {@code shutdownState}; once that state equals
   * {@code SHUTDOWN_RD_WR}, a combined {@code SHUT_RD_WR} shutdown is issued as well and the
   * state is reset.
   *
   * @throws IOException on error.
   */
  @Override
  protected final void shutdownInput() throws IOException {
    final FileDescriptor validFd = core.validFd();
    if (validFd == null) {
      return;
    }

    NativeUnixSocket.shutdown(validFd, SHUT_RD);
    shutdownState |= 1 << (SHUT_RD);

    if (shutdownState != SHUTDOWN_RD_WR) {
      return;
    }

    NativeUnixSocket.shutdown(validFd, SHUT_RD_WR);
    shutdownState = 0;
  }

  /**
   * Shuts down the output side of the socket. Does nothing if there is no valid file descriptor.
   *
   * The shutdown is recorded in {@code shutdownState} using the {@code SHUT_WR} bit, mirroring
   * how {@link #shutdownInput()} records the {@code SHUT_RD} bit. Once that state equals
   * {@code SHUTDOWN_RD_WR}, a combined {@code SHUT_RD_WR} shutdown is issued as well and the
   * state is reset.
   *
   * @throws IOException on error.
   */
  @Override
  protected final void shutdownOutput() throws IOException {
    FileDescriptor fdesc = core.validFd();
    if (fdesc != null) {
      NativeUnixSocket.shutdown(fdesc, SHUT_WR);
      // Record the direction that was actually shut down (SHUT_WR, not SHUT_RD_WR); otherwise
      // the combined-shutdown check below could not match after shutdownInput() + shutdownOutput().
      // NOTE(review): assumes SHUTDOWN_RD_WR == (1 << SHUT_RD) | (1 << SHUT_WR) -- confirm
      // against its declaration.
      shutdownState |= 1 << (SHUT_WR);
      if (shutdownState == SHUTDOWN_RD_WR) {
        NativeUnixSocket.shutdown(fdesc, SHUT_RD_WR);
        shutdownState = 0;
      }
    }
  }

  /**
   * Returns the ancillary receive buffer size, as reported by {@code ancillaryDataSupport}.
   *
   * @return The size.
   */
  final int getAncillaryReceiveBufferSize() {
    return ancillaryDataSupport.getAncillaryReceiveBufferSize();
  }

  /**
   * Sets the ancillary receive buffer size on {@code ancillaryDataSupport}.
   *
   * @param size The new size.
   */
  final void setAncillaryReceiveBufferSize(int size) {
    ancillaryDataSupport.setAncillaryReceiveBufferSize(size);
  }

  /**
   * Asks {@code ancillaryDataSupport} to ensure the given minimum ancillary receive buffer size.
   *
   * @param minSize The minimum size.
   */
  final void ensureAncillaryReceiveBufferSize(int minSize) {
    ancillaryDataSupport.ensureAncillaryReceiveBufferSize(minSize);
  }

  /**
   * Returns the {@link AncillaryDataSupport} instance used by this socket implementation.
   *
   * @return The instance.
   */
  AncillaryDataSupport getAncillaryDataSupport() {
    return ancillaryDataSupport;
  }

  /**
   * Delegates to {@code core.receive}, supplying the current {@code socketTimeout} value.
   *
   * @param dst The destination buffer.
   * @return The address returned by {@code core.receive} (presumably the sender's address --
   *         TODO confirm).
   * @throws IOException on error.
   */
  final SocketAddress receive(ByteBuffer dst) throws IOException {
    return core.receive(dst, socketTimeout::get);
  }

  /**
   * Delegates to {@code core.write} with an explicit target address, supplying the current
   * {@code socketTimeout} value and {@code 0} as the last argument.
   *
   * @param src The source buffer.
   * @param target The target address.
   * @return The value returned by {@code core.write} (presumably the number of bytes written --
   *         TODO confirm).
   * @throws IOException on error.
   */
  final int send(ByteBuffer src, SocketAddress target) throws IOException {
    return core.write(src, socketTimeout::get, target, 0);
  }

  /**
   * Delegates to {@code core.read}, supplying the current {@code socketTimeout} value and
   * {@code 0} as the last argument.
   *
   * @param dst The destination buffer.
   * @param socketAddressBuffer A buffer handed to {@code core.read} alongside {@code dst}
   *          (presumably receiving the sender's address -- TODO confirm).
   * @return The value returned by {@code core.read} (presumably the number of bytes read --
   *         TODO confirm).
   * @throws IOException on error.
   */
  final int read(ByteBuffer dst, ByteBuffer socketAddressBuffer) throws IOException {
    return core.read(dst, socketTimeout::get, socketAddressBuffer, 0);
  }

  /**
   * Delegates to {@code core.write}, supplying the current {@code socketTimeout} value.
   *
   * @param src The source buffer.
   * @return The value returned by {@code core.write} (presumably the number of bytes written --
   *         TODO confirm).
   * @throws IOException on error.
   */
  final int write(ByteBuffer src) throws IOException {
    return core.write(src, socketTimeout::get);
  }

  /**
   * Returns the file descriptor held by {@code core}.
   *
   * @return The file descriptor.
   */
  @Override
  protected final FileDescriptor getFileDescriptor() {
    return core.fd;
  }

  /**
   * Updates the stored port numbers. The local port is always overwritten; the remote port is
   * only overwritten when the given value is non-negative.
   *
   * @param local The new local port.
   * @param remote The new remote port; negative values leave the current remote port untouched.
   */
  final void updatePorts(int local, int remote) {
    this.localport = local;

    if (remote < 0) {
      return;
    }
    this.port = remote;
  }

  /**
   * Returns the local socket address, resolved via {@code AFSocketAddress.getSocketAddress} from
   * this socket's file descriptor, the stored local port and the address family.
   *
   * @return The address, or {@code null}.
   */
  final @Nullable A getLocalSocketAddress() {
    return AFSocketAddress.getSocketAddress(getFileDescriptor(), false, localport, addressFamily);
  }

  /**
   * Returns the remote socket address, resolved via {@code AFSocketAddress.getSocketAddress} from
   * this socket's file descriptor, the stored remote port and the address family.
   *
   * @return The address, or {@code null}.
   */
  final @Nullable A getRemoteSocketAddress() {
    return AFSocketAddress.getSocketAddress(getFileDescriptor(), true, port, addressFamily);
  }

  /**
   * Returns the stored local port.
   *
   * @return The value of {@code localport}.
   */
  final int getLocalPort1() {
    return localport;
  }

  /**
   * Returns the stored remote port.
   *
   * @return The value of {@code port}.
   */
  final int getRemotePort() {
    return port;
  }

  /**
   * Returns the {@link InetAddress} of the remote socket address, or the loopback address if no
   * remote socket address is available.
   *
   * @return The address.
   */
  @Override
  protected final InetAddress getInetAddress() {
    @Nullable
    A remote = getRemoteSocketAddress();
    if (remote != null) {
      return remote.getInetAddress();
    }
    // no remote address known: fall back to loopback
    return InetAddress.getLoopbackAddress();
  }

  /**
   * Creates a native socket for the given file descriptor, using the domain of this
   * implementation's address family and the ID of the given socket type.
   *
   * @param fdTarget The file descriptor passed to {@code NativeUnixSocket.createSocket}.
   * @param type The socket type.
   * @throws IOException on error.
   */
  final void createSocket(FileDescriptor fdTarget, AFSocketType type) throws IOException {
    NativeUnixSocket.createSocket(fdTarget, addressFamily.getDomain(), type.getId());
  }

  /**
   * Returns the address family of this socket implementation.
   *
   * @return The address family.
   */
  final AFAddressFamily<A> getAddressFamily() {
    return addressFamily;
  }

  /**
   * Sets a socket option given as a {@link SocketOption}.
   *
   * {@code AFSocketOption}s are handled by the core. Other options are resolved to an integer
   * option ID via {@code SocketOptionsMapper}; if no ID is known, the request is passed on to the
   * superclass.
   *
   * @param <T> The value type.
   * @param name The option.
   * @param value The value.
   * @throws IOException on error.
   */
  @Override
  protected <T> void setOption(SocketOption<T> name, T value) throws IOException {
    if (name instanceof AFSocketOption<?>) {
      getCore().setOption((AFSocketOption<T>) name, value);
    } else {
      final Integer resolvedId = SocketOptionsMapper.resolve(name);
      if (resolvedId != null) {
        setOption(resolvedId, value);
      } else {
        super.setOption(name, value);
      }
    }
  }

  /**
   * Retrieves a socket option given as a {@link SocketOption}.
   *
   * {@code AFSocketOption}s are handled by the core. Other options are resolved to an integer
   * option ID via {@code SocketOptionsMapper}; if no ID is known, the request is passed on to the
   * superclass.
   *
   * @param <T> The value type.
   * @param name The option.
   * @return The value.
   * @throws IOException on error.
   */
  @SuppressWarnings("unchecked")
  @Override
  protected <T> T getOption(SocketOption<T> name) throws IOException {
    if (name instanceof AFSocketOption<?>) {
      return getCore().getOption((AFSocketOption<T>) name);
    }

    final Integer resolvedId = SocketOptionsMapper.resolve(name);
    if (resolvedId != null) {
      return (T) getOption(resolvedId);
    }
    return super.getOption(name);
  }

  /**
   * Returns the set of supported socket options, as defined by
   * {@code SocketOptionsMapper.SUPPORTED_SOCKET_OPTIONS}.
   *
   * @return The supported options.
   */
  @Override
  protected Set<SocketOption<?>> supportedOptions() {
    return SocketOptionsMapper.SUPPORTED_SOCKET_OPTIONS;
  }

  /**
   * Returns the internal helper instance for address-specific extensions. The instance is
   * created through the address family on first use and then reused.
   *
   * @return The helper instance.
   * @throws UnsupportedOperationException if such extensions are not supported for this address
   *           type.
   */
  protected final synchronized AFSocketImplExtensions<A> getImplExtensions() {
    if (implExtensions != null) {
      return implExtensions;
    }
    implExtensions = addressFamily.initImplExtensions(ancillaryDataSupport);
    return implExtensions;
  }
}