AFSocketChannel.java

/*
 * junixsocket
 *
 * Copyright 2009-2024 Christian Kohlschütter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.newsclub.net.unix;

import static java.util.Objects.requireNonNull;

import java.io.FileDescriptor;
import java.io.IOException;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jdt.annotation.NonNull;

import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;

/**
 * A selectable channel for stream-oriented connecting sockets.
 *
 * @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
 * @author Christian Kohlschütter
 */
public abstract class AFSocketChannel<A extends AFSocketAddress> extends SocketChannel implements
    AFSomeSocket, AFSocketExtensions, AFSomeSocketChannel {
  private final @NonNull AFSocket<A> afSocket;
  private final AtomicBoolean connectPending = new AtomicBoolean(false);

  /**
   * Creates a new socket channel for the given socket, using the given {@link SelectorProvider}.
   *
   * @param socket The socket.
   * @param sp The {@link SelectorProvider}.
   */
  @SuppressWarnings("all")
  protected AFSocketChannel(AFSocket<A> socket, AFSelectorProvider<A> sp) {
    super(sp);
    this.afSocket = Objects.requireNonNull(socket);
  }

  /**
   * Returns the corresponding {@link AFSocket}.
   *
   * @return The corresponding socket.
   */
  protected final AFSocket<A> getAFSocket() {
    return afSocket;
  }

  /**
   * A reference to a method that provides an {@link AFSocket} instance.
   *
   * @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
   */
  @FunctionalInterface
  protected interface AFSocketSupplier<A extends AFSocketAddress> {
    /**
     * Returns a new {@link AFSocket} instance.
     *
     * @return The instance.
     * @throws IOException on error.
     */
    AFSocket<A> newInstance() throws IOException;
  }

  /**
   * Opens a socket channel.
   *
   * @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
   * @param supplier The AFSocketChannel constructor.
   *
   * @return The new channel
   * @throws IOException on error.
   */
  protected static final <A extends AFSocketAddress> AFSocketChannel<A> open(
      AFSocketSupplier<A> supplier) throws IOException {
    return supplier.newInstance().getChannel();
  }

  /**
   * Opens a socket channel, connecting to the given socket address.
   *
   * @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
   * @param remote The socket address to connect to.
   * @param supplier The AFSocketChannel constructor.
   * @return The new channel
   * @throws IOException on error.
   */
  protected static final <A extends AFSocketAddress> AFSocketChannel<A> open(
      AFSocketSupplier<A> supplier, SocketAddress remote) throws IOException {
    @SuppressWarnings("resource")
    AFSocketChannel<A> sc = open(supplier);
    try {
      sc.connect(remote);
    } catch (Throwable x) { // NOPMD
      try {
        sc.close();
      } catch (Throwable suppressed) { // NOPMD
        x.addSuppressed(suppressed);
      }
      throw x;
    }
    assert sc.isConnected();
    return sc;
  }

  @SuppressWarnings("unchecked")
  @Override
  public final <T> T getOption(SocketOption<T> name) throws IOException {
    if (name instanceof AFSocketOption<?>) {
      return getAFCore().getOption((AFSocketOption<T>) name);
    }
    Integer optionId = SocketOptionsMapper.resolve(name);
    if (optionId == null) {
      throw new UnsupportedOperationException("unsupported option");
    } else {
      return (T) afSocket.getAFImpl().getOption(optionId);
    }
  }

  @Override
  public final <T> AFSocketChannel<A> setOption(SocketOption<T> name, T value) throws IOException {
    if (name instanceof AFSocketOption<?>) {
      getAFCore().setOption((AFSocketOption<T>) name, value);
      return this;
    }
    Integer optionId = SocketOptionsMapper.resolve(name);
    if (optionId == null) {
      throw new UnsupportedOperationException("unsupported option");
    } else {
      afSocket.getAFImpl().setOption(optionId, value);
    }
    return this;
  }

  @Override
  public final Set<SocketOption<?>> supportedOptions() {
    return SocketOptionsMapper.SUPPORTED_SOCKET_OPTIONS;
  }

  @Override
  public final AFSocketChannel<A> bind(SocketAddress local) throws IOException {
    afSocket.bind(local);
    return this;
  }

  @Override
  public final AFSocketChannel<A> shutdownInput() throws IOException {
    afSocket.getAFImpl().shutdownInput();
    return this;
  }

  @Override
  public final AFSocketChannel<A> shutdownOutput() throws IOException {
    afSocket.getAFImpl().shutdownOutput();
    return this;
  }

  @Override
  @SuppressFBWarnings("EI_EXPOSE_REP")
  public final AFSocket<A> socket() {
    return afSocket;
  }

  @Override
  public final boolean isConnected() {
    boolean connected = afSocket.isConnected();
    if (connected) {
      connectPending.set(false);
    }
    return connected;
  }

  @Override
  public final boolean isConnectionPending() {
    return connectPending.get();
  }

  @Override
  public final boolean connect(SocketAddress remote) throws IOException {
    boolean complete = false;
    Exception exception = null;
    try {
      begin();
      boolean connected = afSocket.connect0(remote, 0);
      if (!connected) {
        connectPending.set(true);
      }
      complete = true;
      return connected;
    } catch (IOException e) {
      throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException( // NOPMD.PreserveStackTrace
          (exception = InterruptibleChannelUtil.handleException(this, e)));
    } finally {
      InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
    }
  }

  @Override
  public final boolean finishConnect() throws IOException {
    if (isConnected()) {
      return true;
    } else if (!isConnectionPending()) {
      return false;
    }

    boolean complete = false;
    Exception exception = null;
    try {
      begin();
      boolean connected = NativeUnixSocket.finishConnect(afSocket.getFileDescriptor())
          || isConnected();
      if (connected) {
        connectPending.set(false);
      }
      complete = true;
      return connected;
    } catch (IOException e) {
      throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException( // NOPMD.PreserveStackTrace
          (exception = InterruptibleChannelUtil.handleException(this, e)));
    } finally {
      InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
    }
  }

  @Override
  public final A getRemoteAddress() throws IOException {
    return getRemoteSocketAddress();
  }

  @Override
  public final A getRemoteSocketAddress() {
    return afSocket.getRemoteSocketAddress();
  }

  @Override
  public final int read(ByteBuffer dst) throws IOException {
    boolean complete = false;
    Exception exception = null;
    try {
      begin();
      int read = afSocket.getAFImpl().read(dst, null);
      complete = true;
      return read;
    } catch (IOException e) {
      throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException( // NOPMD.PreserveStackTrace
          (exception = InterruptibleChannelUtil.handleException(this, e)));
    } finally {
      InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
    }
  }

  @Override
  public final long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
    if (length == 0) {
      return 0;
    }
    // FIXME support more than one buffer for scatter-gather access
    return read(dsts[offset]);
  }

  @Override
  public final long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
    if (length == 0) {
      return 0;
    }
    // FIXME support more than one buffer for scatter-gather access
    return write(srcs[offset]);
  }

  @Override
  public final int write(ByteBuffer src) throws IOException {
    boolean complete = false;
    Exception exception = null;
    try {
      begin();
      int written = afSocket.getAFImpl().write(src);
      complete = true;
      return written;
    } catch (IOException e) {
      throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException( // NOPMD.PreserveStackTrace
          (exception = InterruptibleChannelUtil.handleException(this, e)));
    } finally {
      InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
    }
  }

  @Override
  public final A getLocalAddress() throws IOException {
    return getLocalSocketAddress();
  }

  @Override
  public final A getLocalSocketAddress() {
    return afSocket.getLocalSocketAddress();
  }

  @Override
  protected final void implCloseSelectableChannel() throws IOException {
    afSocket.close();
  }

  @Override
  protected final void implConfigureBlocking(boolean block) throws IOException {
    getAFCore().implConfigureBlocking(block);
  }

  @Override
  public final int getAncillaryReceiveBufferSize() {
    return afSocket.getAncillaryReceiveBufferSize();
  }

  @Override
  public final void setAncillaryReceiveBufferSize(int size) {
    afSocket.setAncillaryReceiveBufferSize(size);
  }

  @Override
  public final void ensureAncillaryReceiveBufferSize(int minSize) {
    afSocket.ensureAncillaryReceiveBufferSize(minSize);
  }

  final AFSocketCore getAFCore() {
    return afSocket.getAFImpl().getCore();
  }

  @Override
  public final FileDescriptor getFileDescriptor() throws IOException {
    return afSocket.getFileDescriptor();
  }

  @Override
  public final String toString() {
    return super.toString() + afSocket.toStringSuffix();
  }

  @Override
  public void setShutdownOnClose(boolean enabled) {
    getAFCore().setShutdownOnClose(enabled);
  }

  /**
   * Opens a socket channel. The {@code family} parameter specifies the {@link ProtocolFamily
   * protocol family} of the channel's socket.
   * <p>
   * If the {@link ProtocolFamily} is of an {@link AFProtocolFamily}, or {@code UNIX}, the
   * corresponding junixsocket implementation is used. In all other cases, the call is delegated to
   * {@link SocketChannel#open()}.
   *
   * @param family The protocol family.
   * @return The new {@link SocketChannel}.
   * @throws IOException on error.
   */
  public static SocketChannel open(ProtocolFamily family) throws IOException {
    requireNonNull(family);

    if (family instanceof AFProtocolFamily) {
      return ((AFProtocolFamily) family).openSocketChannel();
    } else if ("UNIX".equals(family.name())) {
      return AFUNIXSocketChannel.open();
    } else if (family instanceof StandardProtocolFamily) {
      return SocketChannel.open();
    } else {
      throw new UnsupportedOperationException("Protocol family not supported");
    }
  }
}