AFDatagramSocket.java
/*
* junixsocket
*
* Copyright 2009-2024 Christian Kohlschütter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.newsclub.net.unix;
import java.io.FileDescriptor;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketImpl;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.IllegalBlockingModeException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jdt.annotation.Nullable;
import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
/**
* A {@link DatagramSocket} implementation that works with junixsocket.
*
* @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
* @author Christian Kohlschütter
*/
public abstract class AFDatagramSocket<A extends AFSocketAddress> extends DatagramSocketShim
implements AFSomeSocket, AFSocketExtensions {
private static final InetSocketAddress WILDCARD_ADDRESS = new InetSocketAddress(0);
private final AFDatagramSocketImpl<A> impl;
private final AncillaryDataSupport ancillaryDataSupport;
private final AtomicBoolean created = new AtomicBoolean(false);
private final AtomicBoolean deleteOnClose = new AtomicBoolean(true);
@SuppressWarnings("this-escape")
private final AFDatagramChannel<A> channel = newChannel();
/**
* Creates a new {@link AFDatagramSocket} instance.
*
* @param impl The corresponding {@link SocketImpl} class.
*/
protected AFDatagramSocket(final AFDatagramSocketImpl<A> impl) {
super(impl);
this.impl = impl;
this.ancillaryDataSupport = impl.ancillaryDataSupport;
}
/**
* Creates a new {@link DatagramChannel} that is associated with this socket.
*
* @return The channel.
*/
protected abstract AFDatagramChannel<A> newChannel();
/**
* Returns the {@code AncillaryDataSupport} instance.
*
* @return The instance.
*/
final AncillaryDataSupport getAncillaryDataSupport() {
return ancillaryDataSupport;
}
/**
* A reference to the constructor of an {@link AFDatagramSocket} subclass.
*
* @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
*/
@FunctionalInterface
public interface Constructor<A extends AFSocketAddress> {
/**
* Constructs a new {@link DatagramSocket} instance.
*
* @param fd The file descriptor.
* @return The new instance.
* @throws IOException on error.
*/
AFDatagramSocket<A> newSocket(FileDescriptor fd) throws IOException;
}
/**
* Returns the {@link AFSocketAddress} type supported by this socket.
*
* @return The supported {@link AFSocketAddress}.
*/
protected final Class<? extends AFSocketAddress> socketAddressClass() {
return impl.getAddressFamily().getSocketAddressClass();
}
/**
* Returns a new {@link AFDatagramSocket} instance.
*
* @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
* @param constructor The supplying constructor.
* @return The new instance.
* @throws IOException on error.
*/
protected static final <A extends AFSocketAddress> AFDatagramSocket<A> newInstance(
Constructor<A> constructor) throws IOException {
return constructor.newSocket(null);
}
/**
* Creates a new {@link AFDatagramSocket}.
*
* @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
* @param constructor The supplying constructor.
* @param fdObj The file descriptor.
* @param localPort The local port.
* @param remotePort The remote port.
* @return The new instance.
* @throws IOException on error.
*/
protected static final <A extends AFSocketAddress> AFDatagramSocket<A> newInstance(
Constructor<A> constructor, FileDescriptor fdObj, int localPort, int remotePort)
throws IOException {
if (fdObj == null) {
return newInstance(constructor);
}
if (!fdObj.valid()) {
throw new SocketException("Invalid file descriptor");
}
int status = NativeUnixSocket.socketStatus(fdObj);
if (status == NativeUnixSocket.SOCKETSTATUS_INVALID) {
throw new SocketException("Not a valid socket");
}
AFDatagramSocket<A> socket = constructor.newSocket(fdObj);
socket.getAFImpl().updatePorts(localPort, remotePort);
switch (status) {
case NativeUnixSocket.SOCKETSTATUS_CONNECTED:
socket.internalDummyConnect();
break;
case NativeUnixSocket.SOCKETSTATUS_BOUND:
socket.internalDummyBind();
break;
case NativeUnixSocket.SOCKETSTATUS_UNKNOWN:
break;
default:
throw new IllegalStateException("Invalid socketStatus response: " + status);
}
return socket;
}
@Override
public final void connect(InetAddress address, int port) {
throw new IllegalArgumentException("Cannot connect to InetAddress");
}
/**
* Reads the next received packet without actually removing it from the queue.
*
* In other words, once a packet is received, calling this method multiple times in a row will not
* have further effects on the packet contents.
*
* This call still blocks until at least one packet has been received and added to the queue.
*
* @param p The packet.
* @throws IOException on error.
*/
public final void peek(DatagramPacket p) throws IOException {
synchronized (p) {
if (isClosed()) {
throw new SocketException("Socket is closed");
}
getAFImpl().peekData(p);
}
}
@Override
public final void send(DatagramPacket p) throws IOException {
synchronized (p) {
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (!isBound()) {
internalDummyBind();
}
getAFImpl().send(p);
}
}
final void internalDummyConnect() throws SocketException {
super.connect(AFSocketAddress.INTERNAL_DUMMY_DONT_CONNECT);
}
final void internalDummyBind() throws SocketException {
bind(AFSocketAddress.INTERNAL_DUMMY_BIND);
}
@Override
public final synchronized void connect(SocketAddress addr) throws SocketException {
if (!isBound()) {
internalDummyBind();
}
internalDummyConnect();
try {
getAFImpl().connect(AFSocketAddress.preprocessSocketAddress(socketAddressClass(), addr,
null));
} catch (SocketException e) {
throw e;
} catch (IOException e) {
throw (SocketException) new SocketException(e.getMessage()).initCause(e);
}
}
@Override
public final synchronized @Nullable A getRemoteSocketAddress() {
return getAFImpl().getRemoteSocketAddress();
}
@Override
public final boolean isConnected() {
return super.isConnected() || impl.isConnected();
}
@Override
public final boolean isBound() {
return super.isBound() || impl.isBound();
}
@Override
public final void close() {
// IMPORTANT This method must not be synchronized on "this",
// otherwise we can't unblock a pending read
if (isClosed()) {
return;
}
getAFImpl().close();
boolean wasBound = isBound();
if (wasBound && deleteOnClose.get()) {
InetAddress addr = getLocalAddress();
if (AFInetAddress.isSupportedAddress(addr, addressFamily())) {
try {
AFSocketAddress socketAddress = AFSocketAddress.unwrap(addr, 0, addressFamily());
if (socketAddress != null && socketAddress.hasFilename()) {
if (!socketAddress.getFile().delete()) {
// ignore
}
}
} catch (IOException e) {
// ignore
}
}
}
super.close();
}
@Override
@SuppressWarnings("PMD.CognitiveComplexity")
public final synchronized void bind(SocketAddress addr) throws SocketException {
boolean isBound = isBound();
if (isBound) {
if (addr == AFSocketAddress.INTERNAL_DUMMY_BIND) { // NOPMD
return;
}
// getAFImpl().bind(null); // try unbind (may not succeed)
}
if (isClosed()) {
throw new SocketException("Socket is closed");
}
if (!isBound) {
try {
super.bind(AFSocketAddress.INTERNAL_DUMMY_BIND);
} catch (AlreadyBoundException e) {
// ignore
} catch (SocketException e) {
String message = e.getMessage();
if (message != null && message.contains("already bound")) {
// ignore (Java 14 or older)
} else {
throw e;
}
}
}
boolean isWildcardBind = WILDCARD_ADDRESS.equals(addr);
AFSocketAddress epoint = (addr == null || isWildcardBind) ? null : AFSocketAddress
.preprocessSocketAddress(socketAddressClass(), addr, null);
if (epoint instanceof SentinelSocketAddress) {
return;
}
try {
getAFImpl().bind(epoint);
} catch (SocketException e) {
if (isWildcardBind) {
// permit errors on wildcard bind
} else {
getAFImpl().close();
throw e;
}
}
}
@Override
public final @Nullable A getLocalSocketAddress() {
if (isClosed()) {
return null;
}
if (!isBound()) {
return null;
}
return getAFImpl().getLocalSocketAddress();
}
/**
* Checks if this {@link AFDatagramSocket}'s bound filename should be removed upon
* {@link #close()}.
*
* Deletion is not guaranteed, especially when not supported (e.g., addresses in the abstract
* namespace).
*
* @return {@code true} if an attempt is made to delete the socket file upon {@link #close()}.
*/
public final boolean isDeleteOnClose() {
return deleteOnClose.get();
}
/**
* Enables/disables deleting this {@link AFDatagramSocket}'s bound filename upon {@link #close()}.
*
* Deletion is not guaranteed, especially when not supported (e.g., addresses in the abstract
* namespace).
*
* @param b Enabled if {@code true}.
*/
public final void setDeleteOnClose(boolean b) {
deleteOnClose.set(b);
}
final AFDatagramSocketImpl<A> getAFImpl() {
if (created.compareAndSet(false, true)) {
try {
getSoTimeout(); // trigger create via java.net.Socket
} catch (SocketException e) {
// ignore
}
}
return impl;
}
final AFDatagramSocketImpl<A> getAFImpl(boolean create) {
if (create) {
return getAFImpl();
} else {
return impl;
}
}
@Override
public final int getAncillaryReceiveBufferSize() {
return ancillaryDataSupport.getAncillaryReceiveBufferSize();
}
@Override
public final void setAncillaryReceiveBufferSize(int size) {
ancillaryDataSupport.setAncillaryReceiveBufferSize(size);
}
@Override
public final void ensureAncillaryReceiveBufferSize(int minSize) {
ancillaryDataSupport.ensureAncillaryReceiveBufferSize(minSize);
}
@Override
public final boolean isClosed() {
return super.isClosed() || getAFImpl().isClosed();
}
@SuppressFBWarnings("EI_EXPOSE_REP")
@Override
public AFDatagramChannel<A> getChannel() {
return channel;
}
@Override
public final FileDescriptor getFileDescriptor() throws IOException {
return getAFImpl().getFileDescriptor();
}
@Override
public final void receive(DatagramPacket p) throws IOException {
getAFImpl().receive(p);
}
/**
* Returns the address family supported by this implementation.
*
* @return The family.
*/
protected final AFAddressFamily<A> addressFamily() {
return getAFImpl().getAddressFamily();
}
/**
* Returns the internal helper instance for address-specific extensions.
*
* @return The helper instance.
* @throws UnsupportedOperationException if such extensions are not supported for this address
* type.
*/
protected AFSocketImplExtensions<A> getImplExtensions() {
return getAFImpl(false).getImplExtensions();
}
/**
* Returns the value of a junixsocket socket option.
*
* @param <T> The type of the socket option value.
* @param name The socket option.
* @return The value of the socket option.
* @throws IOException on error.
*/
@Override
public <T> T getOption(AFSocketOption<T> name) throws IOException {
return getAFImpl().getCore().getOption(name);
}
/**
* Sets the value of a socket option.
*
* @param <T> The type of the socket option value.
* @param name The socket option.
* @param value The value of the socket option.
* @return this DatagramSocket.
* @throws IOException on error.
*/
@Override
public <T> DatagramSocket setOption(AFSocketOption<T> name, T value) throws IOException {
getAFImpl().getCore().setOption(name, value);
return this;
}
/**
* Accepts a connection to this socket. Note that 1., the socket must be in {@code listen} state
* by calling {@link #bind(SocketAddress)}, followed by {@link #listen(int)}, and 2., the socket
* type must allow listen/accept. This is true for {@link AFSocketType#SOCK_SEQPACKET} AF_UNIX
* sockets, for example.
*
* @return The accepted datagram socket.
* @throws IOException on error.
* @see #listen(int)
*/
public AFDatagramSocket<A> accept() throws IOException {
return accept1(true);
}
/**
* Sets this socket into "listen" state, which allows subsequent calls to {@link #accept()}
* receive any connection attempt. Note that 1., the socket must be bound to a local address using
* {@link #bind(SocketAddress)}, and 2., the socket type must allow listen/accept. This is true
* for {@link AFSocketType#SOCK_SEQPACKET} AF_UNIX sockets, for example.
*
* @param backlog The backlog, or {@code 0} for default.
* @throws IOException on error.
*/
public final void listen(int backlog) throws IOException {
FileDescriptor fdesc = getAFImpl().getCore().validFdOrException();
if (backlog <= 0) {
backlog = 50;
}
NativeUnixSocket.listen(fdesc, backlog);
}
/**
* Returns a new {@link AFDatagramSocket} instance to be used for {@link #accept()}, i.e., no
* {@link FileDescriptor} is associated.
*
* @return The new instance.
* @throws IOException on error.
*/
protected abstract AFDatagramSocket<A> newDatagramSocketInstance() throws IOException;
// CPD-OFF
AFDatagramSocket<A> accept1(boolean throwOnFail) throws IOException {
AFDatagramSocket<A> as = newDatagramSocketInstance();
boolean success = getAFImpl().accept0(as.getAFImpl(false));
if (isClosed()) {
// We may have connected to the socket to unblock it
throw new SocketClosedException("Socket is closed");
}
if (!success) {
if (throwOnFail) {
if (getChannel().isBlocking()) {
// unexpected
return null;
} else {
// non-blocking socket, nothing to accept
throw new IllegalBlockingModeException();
}
} else {
return null;
}
}
as.getAFImpl(true); // trigger create
as.connect(AFSocketAddress.INTERNAL_DUMMY_CONNECT);
as.getAFImpl().updatePorts(getAFImpl().getLocalPort1(), getAFImpl().getRemotePort());
return as;
}
@Override
public void setShutdownOnClose(boolean enabled) {
getAFImpl().getCore().setShutdownOnClose(enabled);
}
}