AFSocketCore.java

/*
 * junixsocket
 *
 * Copyright 2009-2024 Christian Kohlschütter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.newsclub.net.unix;

import java.io.FileDescriptor;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.newsclub.net.unix.pool.ObjectPool.Lease;

/**
 * A shared core that is common for all AF* sockets (datagrams, streams).
 *
 * @author Christian Kohlschütter
 */
class AFSocketCore extends AFCore {
  private final AtomicInteger pendingAccepts = new AtomicInteger(0);
  private static final int SHUT_RD_WR = 2;

  /**
   * We keep track of the server's inode to detect when another server connects to our address.
   */
  final AtomicLong inode = new AtomicLong(-1);

  AFSocketAddress socketAddress;

  private final AFAddressFamily<?> af;
  private boolean shutdownOnClose = true;

  protected AFSocketCore(Object observed, FileDescriptor fd,
      AncillaryDataSupport ancillaryDataSupport, AFAddressFamily<?> af, boolean datagramMode) {
    super(observed, fd, ancillaryDataSupport, datagramMode);
    this.af = af;
  }

  protected AFAddressFamily<?> addressFamily() {
    return af;
  }

  @Override
  @SuppressWarnings("UnsafeFinalization" /* errorprone */)
  protected void doClose() throws IOException {
    if (isShutdownOnClose()) {
      NativeUnixSocket.shutdown(fd, SHUT_RD_WR);
      unblockAccepts();
    }

    super.doClose();
  }

  protected void unblockAccepts() {
    // see AFSocketImpl
  }

  AFSocketAddress receive(ByteBuffer dst, AFSupplier<Integer> socketTimeout) throws IOException {
    try (Lease<ByteBuffer> socketAddressBufferLease = AFSocketAddress.SOCKETADDRESS_BUFFER_TL
        .take()) {
      ByteBuffer socketAddressBuffer = socketAddressBufferLease.get();

      int read = read(dst, socketTimeout, socketAddressBuffer, 0);
      if (read > 0) {
        return AFSocketAddress.ofInternal(socketAddressBuffer, af);
      } else {
        return null;
      }
    }
  }

  boolean isConnected(boolean boundOk) {
    try {
      if (fd.valid()) {
        switch (NativeUnixSocket.socketStatus(fd)) {
          case NativeUnixSocket.SOCKETSTATUS_CONNECTED:
            return true;
          case NativeUnixSocket.SOCKETSTATUS_BOUND:
            if (boundOk) {
              return true;
            }
            break;
          default:
        }
      }
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
    return false;
  }

  @SuppressWarnings({"unchecked"})
  <T> T getOption(AFSocketOption<T> name) throws IOException {
    Class<T> type = name.type();
    if (Boolean.class.isAssignableFrom(type)) {
      return (T) (Object) (NativeUnixSocket.getSocketOption(fd, name.level(), name.optionName(),
          Integer.class) != 0);
    } else if (NamedInteger.HasOfValue.class.isAssignableFrom(type)) {
      @SuppressWarnings("all") // "null" creates another warning
      int v = NativeUnixSocket.getSocketOption(fd, name.level(), name.optionName(), Integer.class);
      try {
        return (T) type.getMethod("ofValue", int.class).invoke(null, v);
      } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
          | NoSuchMethodException | SecurityException e) {
        throw new IOException("Value casting problem", e);
      }
    } else {
      return NativeUnixSocket.getSocketOption(fd, name.level(), name.optionName(), type);
    }
  }

  <T> void setOption(AFSocketOption<T> name, T value) throws IOException {
    final Object val;
    if (value instanceof Boolean) {
      val = (((Boolean) value) ? 1 : 0);
    } else if (value instanceof NamedInteger) {
      val = ((NamedInteger) value).value();
    } else {
      val = value;
    }
    int level = name.level();
    int optionName = name.optionName();
    NativeUnixSocket.setSocketOption(fd, level, optionName, val);
    if (level == 271 && optionName == 135) {
      // AFTIPCSocketOptions.TIPC_GROUP_JOIN
      // unclear why, but sleeping for at least 1ms prevents issues with GROUP_JOIN
      try {
        Thread.sleep(1);
      } catch (InterruptedException e) {
        // ignore
      }
    }
  }

  protected void incPendingAccepts() throws SocketException {
    if (pendingAccepts.incrementAndGet() >= Integer.MAX_VALUE) {
      pendingAccepts.decrementAndGet();
      throw new SocketException("Too many pending accepts");
    }
  }

  protected void decPendingAccepts() {
    pendingAccepts.decrementAndGet();
  }

  protected boolean hasPendingAccepts() {
    return pendingAccepts.get() > 0;
  }

  boolean isShutdownOnClose() {
    return shutdownOnClose;
  }

  void setShutdownOnClose(boolean enabled) {
    this.shutdownOnClose = enabled;
  }
}