AFSocketServerConnector.java
/*
* junixsocket
*
* Copyright 2009-2024 Christian Kohlschütter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//
// based upon jetty-unixdomain-server
// original copyright message from jetty's UnixDomainServerConnector:
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.newsclub.net.unix.jetty;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketException;
import java.net.StandardSocketOptions;
import java.nio.channels.Channel;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.nio.file.Path;
import java.util.EventListener;
import java.util.Locale;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.newsclub.net.unix.AFServerSocketChannel;
import org.newsclub.net.unix.AFSocketAddress;
import org.newsclub.net.unix.AFUNIXSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
/**
* A {@link Connector} implementation for junixsocket server socket channels (Unix domains etc.)
*
* Based upon jetty's UnixDomainServerConnector.
*
* This implementation should work with jetty version 9.4.12 or newer.
*/
@ManagedObject
@SuppressWarnings("PMD.CouplingBetweenObjects")
public class AFSocketServerConnector extends AbstractConnector {
private static final Logger LOG = LoggerFactory.getLogger(AbstractConnector.class);
private final AtomicReference<Closeable> acceptor = new AtomicReference<>();
private final SelectorManager selectorManager;
private ServerSocketChannel serverChannel;
private AFSocketAddress listenSocketAddress;
private boolean inheritChannel;
private int acceptQueueSize;
private int acceptedReceiveBufferSize;
private int acceptedSendBufferSize;
private boolean mayStopServer = false;
private boolean mayStopServerForce = false;
private final Class<? extends EventListener> selectorManagerListenerClass;
private final Server server;
/**
* Creates a new {@link AFSocketServerConnector} using defaults for everything but the server and
* the connection factories.
*
* Delegates to the full constructor, passing {@code null} for the executor, scheduler and buffer
* pool, and {@code -1} for both the number of acceptors and the number of selectors.
*
* @param server The server this connector will be added to. Must not be null.
* @param factories The Connection Factories to use. If none are given, a new
* {@link HttpConnectionFactory} is used.
*/
public AFSocketServerConnector(Server server, ConnectionFactory... factories) {
this(server, null, null, null, -1, -1, factories);
}
/**
* Creates a new {@link AFSocketServerConnector} with explicit acceptor and selector counts.
*
* Delegates to the full constructor, passing {@code null} for the executor, scheduler and buffer
* pool.
*
* @param server The server this connector will be added to. Must not be null.
* @param acceptors the number of acceptor threads to use, or -1 for a default value. If 0, then
* no acceptor threads will be launched and some other mechanism will need to be used to
* accept new connections.
* @param selectors The number of selectors to use, or -1 for a default value.
* @param factories The Connection Factories to use. If none are given, a new
* {@link HttpConnectionFactory} is used.
*/
public AFSocketServerConnector(Server server, int acceptors, int selectors,
ConnectionFactory... factories) {
this(server, null, null, null, acceptors, selectors, factories);
}
/**
* Creates a new {@link AFSocketServerConnector}.
*
* @param server The server this connector will be added to. Must not be null.
* @param executor An executor for this connector or null to use the servers executor
* @param scheduler A scheduler for this connector or null to either a {@link Scheduler} set as a
* server bean or if none set, then a new {@link ScheduledExecutorScheduler} instance.
* @param pool A buffer pool for this connector or null to either a {@link ByteBufferPool} set as
* a server bean or none set, the new {code ArrayByteBufferPool} instance.
* @param acceptors the number of acceptor threads to use, or -1 for a default value. If 0, then
* no acceptor threads will be launched and some other mechanism will need to be used to
* accept new connections.
* @param selectors The number of selectors to use, or -1 for a default derived
* @param factories The Connection Factories to use.
*/
@SuppressFBWarnings("EI_EXPOSE_REP2")
@SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
public AFSocketServerConnector(Server server, Executor executor, Scheduler scheduler,
ByteBufferPool pool, int acceptors, int selectors, ConnectionFactory... factories) {
super(server, executor, scheduler, pool, acceptors, factories.length > 0 ? factories
: new ConnectionFactory[] {new HttpConnectionFactory()});
this.server = server;
this.selectorManager = newSelectorManager(getExecutor(), getScheduler(), selectors);
addBean(selectorManager, true);
this.selectorManagerListenerClass = findSelectorManagerListenerClass();
}
/**
 * Looks up jetty's {@code SelectorManager.SelectorManagerListener} type by name.
 *
 * The lookup is reflective so that this class does not hard-depend on that type being present.
 *
 * @return The listener class, or {@code null} if it cannot be found on the class path.
 */
@SuppressWarnings("unchecked")
private static Class<? extends EventListener> findSelectorManagerListenerClass() {
  final Class<?> listenerClass;
  try {
    listenerClass = Class.forName("org.eclipse.jetty.io.SelectorManager$SelectorManagerListener");
  } catch (ClassNotFoundException ignored) {
    // type not available; callers treat null as "no listener support"
    return null;
  }
  return (Class<? extends EventListener>) listenerClass;
}
/**
 * Creates the {@link SelectorManager} used by this connector.
 *
 * @param executor The executor to pass to the selector manager.
 * @param scheduler The scheduler to pass to the selector manager.
 * @param selectors The number of selectors to pass to the selector manager.
 * @return A new {@link AFSocketSelectorManager}.
 */
private SelectorManager newSelectorManager(Executor executor, Scheduler scheduler,
    int selectors) {
  final SelectorManager manager = new AFSocketSelectorManager(executor, scheduler, selectors);
  return manager;
}
/**
 * Returns the file system path of the listen address, provided it is a Unix-Domain address that
 * has a filename.
 *
 * Added for compatibility with jetty's {@code UnixDomainServerConnector}.
 *
 * @return The Unix-Domain path this connector listens to, or {@code null} if the listen address
 *         is not an {@link AFUNIXSocketAddress}, has no filename, or its file cannot be resolved.
 * @deprecated Use {@link #getListenSocketAddress()} instead.
 * @see #getListenSocketAddress()
 */
@ManagedAttribute("The Unix-Domain path this connector listens to")
@Deprecated
public Path getUnixDomainPath() {
  if (!(listenSocketAddress instanceof AFUNIXSocketAddress)) {
    return null;
  }
  final AFUNIXSocketAddress unixAddress = (AFUNIXSocketAddress) listenSocketAddress;
  if (!unixAddress.hasFilename()) {
    return null;
  }
  try {
    return unixAddress.getFile().toPath();
  } catch (FileNotFoundException ignored) {
    // no file behind this address; report "no path"
    return null;
  }
}
/**
 * Sets the listen address to the Unix-Domain address that corresponds to the given path.
 *
 * Added for compatibility with jetty's {@code UnixDomainServerConnector}.
 *
 * @param unixDomainPath The path.
 * @throws IllegalStateException if the path cannot be converted to an
 *           {@link AFUNIXSocketAddress}; the causing {@link SocketException} is attached.
 * @deprecated Use {@link #setListenSocketAddress(AFSocketAddress)} instead.
 * @see #setListenSocketAddress(AFSocketAddress)
 */
@Deprecated
public void setUnixDomainPath(Path unixDomainPath) {
  final AFSocketAddress address;
  try {
    address = AFUNIXSocketAddress.of(unixDomainPath);
  } catch (SocketException cause) {
    throw new IllegalStateException(cause);
  }
  this.listenSocketAddress = address;
}
/**
 * Returns the address this connector is configured to listen on.
 *
 * The stored instance is returned as-is (no copy is made).
 *
 * @return The socket address, or {@code null} if none set.
 */
@ManagedAttribute("The socket address this connector listens to")
@SuppressFBWarnings("EI_EXPOSE_REP")
public AFSocketAddress getListenSocketAddress() {
  return this.listenSocketAddress;
}
/**
* Sets the socket address this connector listens to.
*
* The given instance is stored as-is (no copy is made). It is used when the connector binds a new
* server socket channel, and its address family supplies the selector provider for the selector
* manager.
*
* @param addr The socket address, or {@code null}.
*/
@SuppressFBWarnings("EI_EXPOSE_REP2")
public void setListenSocketAddress(AFSocketAddress addr) {
this.listenSocketAddress = addr;
}
/**
 * Tells whether this connector should try to use the server channel inherited from the JVM
 * (see {@link System#inheritedChannel()}).
 *
 * @return {@code true} if so.
 */
@ManagedAttribute("Whether this connector uses a server channel inherited from the JVM")
public boolean isInheritChannel() {
  return this.inheritChannel;
}
/**
* Sets whether this connector uses a server channel inherited from the JVM.
*
* When enabled, {@link System#inheritedChannel()} is tried first when the server channel is
* opened; if that does not yield a {@link ServerSocketChannel}, a warning is logged and a new
* channel is bound to the listen address instead.
*
* @param inheritChannel {@code true} if so.
*/
public void setInheritChannel(boolean inheritChannel) {
this.inheritChannel = inheritChannel;
}
/**
 * Returns the configured backlog, i.e., the accept queue size passed when binding the server
 * socket.
 *
 * @return The backlog.
 */
@ManagedAttribute("The accept queue size (backlog) for the server socket")
public int getAcceptQueueSize() {
  return this.acceptQueueSize;
}
/**
* Sets the accept queue size (backlog) for the server socket.
*
* The value is passed as the backlog argument when this connector binds a new server socket
* channel.
*
* @param acceptQueueSize The backlog.
*/
public void setAcceptQueueSize(int acceptQueueSize) {
this.acceptQueueSize = acceptQueueSize;
}
/**
 * Returns the receive buffer size (SO_RCVBUF) that is applied to accepted sockets.
 *
 * @return The buffer size; values less than or equal to zero mean the option is not set.
 */
@ManagedAttribute("The SO_RCVBUF option for accepted sockets")
public int getAcceptedReceiveBufferSize() {
  return this.acceptedReceiveBufferSize;
}
/**
* Sets the SO_RCVBUF size for accepted sockets.
*
* The option is only applied to accepted channels when the value is greater than zero.
*
* @param acceptedReceiveBufferSize The buffer size.
*/
public void setAcceptedReceiveBufferSize(int acceptedReceiveBufferSize) {
this.acceptedReceiveBufferSize = acceptedReceiveBufferSize;
}
/**
 * Returns the send buffer size (SO_SNDBUF) that is applied to accepted sockets.
 *
 * @return The buffer size; values less than or equal to zero mean the option is not set.
 */
@ManagedAttribute("The SO_SNDBUF option for accepted sockets")
public int getAcceptedSendBufferSize() {
  return this.acceptedSendBufferSize;
}
/**
* Sets the SO_SNDBUF size for accepted sockets.
*
* The option is only applied to accepted channels when the value is greater than zero.
*
* @param acceptedSendBufferSize The buffer size.
*/
public void setAcceptedSendBufferSize(int acceptedSendBufferSize) {
this.acceptedSendBufferSize = acceptedSendBufferSize;
}
/**
 * Starts this connector: forwards registered selector-manager listeners (if that listener type
 * is available) to the selector manager, opens the server channel, registers it as a bean, and
 * then starts the superclass.
 *
 * @throws Exception on error.
 */
@Override
protected void doStart() throws Exception {
  final Class<? extends EventListener> listenerClass = selectorManagerListenerClass;
  if (listenerClass != null) {
    for (EventListener listener : getBeans(listenerClass)) {
      selectorManager.addEventListener(listener);
    }
  }

  // the channel must exist before the superclass starts
  final ServerSocketChannel opened = open();
  serverChannel = opened;
  addBean(opened);

  super.doStart();
}
/**
 * Stops this connector: stops the superclass first, then unregisters and closes the server
 * channel, and finally removes the selector-manager listeners that were added upon start.
 *
 * @throws Exception on error.
 */
@Override
protected void doStop() throws Exception {
  super.doStop();

  removeBean(serverChannel);
  close();

  final Class<? extends EventListener> listenerClass = selectorManagerListenerClass;
  if (listenerClass == null) {
    return;
  }
  for (EventListener listener : getBeans(listenerClass)) {
    selectorManager.removeEventListener(listener);
  }
}
/**
 * Accepts one connection from the server channel and hands it over for processing.
 *
 * Does nothing if there is currently no server channel. If accepting fails with an
 * {@link IOException}, this checks whether the server channel lost its address (channel closed,
 * no local address, or local address no longer valid). If so, and if
 * {@link #isMayStopServer()} permits it, a server-stop check is scheduled asynchronously. In any
 * failure case, the current thread is marked interrupted and the failure is rethrown wrapped in
 * a {@link ClosedByInterruptException}.
 *
 * @param acceptorID The acceptor ID (not used by this implementation).
 * @throws IOException on error.
 */
@Override
@SuppressWarnings("PMD.CognitiveComplexity")
protected void accept(int acceptorID) throws IOException {
  final ServerSocketChannel serverCh = this.serverChannel;
  if (serverCh == null) {
    return;
  }

  try {
    accepted(serverCh.accept());
  } catch (IOException cause) {
    final boolean addressLost;
    if (!serverCh.isOpen() || serverCh.getLocalAddress() == null) {
      addressLost = true;
    } else if (serverCh instanceof AFServerSocketChannel<?>) {
      addressLost = !((AFServerSocketChannel<?>) serverCh).isLocalSocketAddressValid();
    } else {
      addressLost = false;
    }

    if (addressLost && isMayStopServer()) {
      LOG.warn("Another server has taken over our address");
      // run the stop check on a different thread than the one that was accepting
      ForkJoinPool.commonPool().execute(this::checkServerStop);
    }

    Thread.currentThread().interrupt();
    final ClosedByInterruptException wrapped = new ClosedByInterruptException();
    wrapped.initCause(cause);
    throw wrapped;
  }
}
/**
 * Stops the server, unless another connector of the server is still running and stopping is not
 * forced (see {@link #isMayStopServerForce()}).
 *
 * Exceptions thrown while stopping the server are logged, not propagated.
 */
private void checkServerStop() {
  final Connector[] allConnectors = server.getConnectors();
  final boolean inspectOthers = allConnectors != null && !isMayStopServerForce();
  if (inspectOthers) {
    for (Connector other : allConnectors) {
      final boolean isOtherConnector = other != AFSocketServerConnector.this; // NOPMD.CompareObjectsWithEquals
      if (isOtherConnector && other.isRunning()) {
        return; // don't stop
      }
    }
  }

  LOG.warn("Server has no other connectors; shutting down: " + server); // NOPMD
  try {
    server.stop();
  } catch (Exception stopFailure) {
    LOG.warn("Exception upon stopping " + server, stopFailure); // NOPMD
  }
}
/**
* Prepares a newly accepted channel and hands it over to the selector manager.
*
* The channel is switched to non-blocking mode, configured via {@link #configure(SocketChannel)},
* and then passed to {@code selectorManager.accept(channel)}.
*
* @param channel The accepted channel.
* @throws IOException on error.
*/
private void accepted(SocketChannel channel) throws IOException {
channel.configureBlocking(false);
configure(channel);
selectorManager.accept(channel);
}
/**
 * Configures an incoming {@link SocketChannel}, setting socket options such as receive and send
 * buffer sizes.
 *
 * Each buffer size is only applied when the corresponding configured value is greater than zero.
 * {@code SO_REUSEADDR} is intentionally not set here: it only influences binding, so it has no
 * use on an already-connected, accepted channel, and channels that do not support the option
 * would reject it with an {@link UnsupportedOperationException}.
 *
 * @param channel The socket channel to configure.
 * @throws IOException on error.
 */
protected void configure(SocketChannel channel) throws IOException {
  int rcvBufSize = getAcceptedReceiveBufferSize();
  if (rcvBufSize > 0) {
    channel.setOption(StandardSocketOptions.SO_RCVBUF, rcvBufSize);
  }
  int sndBufSize = getAcceptedSendBufferSize();
  if (sndBufSize > 0) {
    channel.setOption(StandardSocketOptions.SO_SNDBUF, sndBufSize);
  }
}
/**
 * Returns the underlying transport of this connector, which is its server channel.
 *
 * @return The current {@link ServerSocketChannel}, or {@code null} if there is none (for
 *         example after the channel has been closed).
 */
@Override
@SuppressFBWarnings("EI_EXPOSE_REP")
public Object getTransport() {
  return this.serverChannel;
}
/**
 * Opens the server channel. If this connector has no acceptor threads
 * ({@code getAcceptors() == 0}), the channel is switched to non-blocking mode and registered
 * with the selector manager, which then takes care of accepting connections.
 *
 * @return The opened server channel.
 * @throws IOException on error.
 */
private ServerSocketChannel open() throws IOException {
  final ServerSocketChannel channel = openServerSocketChannel();
  final boolean selectorAccepts = getAcceptors() == 0;
  if (selectorAccepts) {
    channel.configureBlocking(false);
    final Closeable registration = selectorManager.acceptor(channel);
    acceptor.set(registration);
  }
  return channel;
}
/**
 * Closes the current server channel, if any, after clearing the field that references it.
 *
 * @throws IOException declared for callers; closing is delegated to {@code IO.close}.
 */
private void close() throws IOException {
  final ServerSocketChannel previous = this.serverChannel;
  // clear the reference first so that other code no longer sees the channel being closed
  this.serverChannel = null;
  IO.close(previous);
}
/**
 * Obtains the server channel to use: the channel inherited from the JVM if that is enabled and
 * it is a {@link ServerSocketChannel}; otherwise a new channel bound to the listen address.
 *
 * @return The server channel.
 * @throws IOException on error.
 */
private ServerSocketChannel openServerSocketChannel() throws IOException {
  if (isInheritChannel()) {
    final Channel inherited = System.inheritedChannel();
    if (inherited instanceof ServerSocketChannel) {
      return (ServerSocketChannel) inherited;
    }
    LOG.warn( // NOPMD.GuardLogStatement
        "Unable to use System.inheritedChannel() {}. Trying a new ServerSocketChannel at {}",
        inherited, getListenSocketAddress());
  }
  // no usable inherited channel: bind a fresh one
  return bindServerSocketChannel();
}
/**
 * Creates a new server socket channel for the configured listen address and binds it, using
 * {@link #getAcceptQueueSize()} as the backlog.
 *
 * If binding fails for any reason, the newly created channel is closed again so it is not
 * leaked.
 *
 * @return The bound server channel.
 * @throws IllegalStateException if no listen socket address has been set.
 * @throws IOException if the channel could not be created or bound; a bind failure is reported
 *           with a message naming the address, and the original exception as the cause.
 */
private ServerSocketChannel bindServerSocketChannel() throws IOException {
  AFSocketAddress socketAddress = listenSocketAddress;
  if (socketAddress == null) {
    throw new IllegalStateException("No listen socket address set for "
        + AFSocketServerConnector.class.getSimpleName());
  }
  AFServerSocketChannel<?> sc = socketAddress.getAddressFamily().newServerSocketChannel();
  boolean bound = false;
  try {
    sc.bind(socketAddress, getAcceptQueueSize());
    bound = true;
    return sc;
  } catch (IOException x) {
    // report the address we actually tried to bind to
    String message = String.format(Locale.ENGLISH, "Could not bind %s to %s",
        AFSocketServerConnector.class.getSimpleName(), socketAddress);
    throw new IOException(message, x);
  } finally {
    if (!bound) {
      // covers both IOException and unchecked failures from bind
      IO.close(sc);
    }
  }
}
/**
 * Enables or disables accepting of new connections.
 *
 * The selector-based acceptor is only used when this connector has no acceptor threads
 * ({@code getAcceptors() == 0}); that is the only case in which {@code open()} puts the server
 * channel into non-blocking mode and registers it with the selector manager. Consequently, when
 * acceptor threads are in use ({@code getAcceptors() > 0}), there is no selector registration to
 * manage here and the call is fully handled by the superclass.
 *
 * @param accepting {@code true} to (re-)register the server channel with the selector manager,
 *          {@code false} to close the current registration.
 */
@Override
public void setAccepting(boolean accepting) {
  super.setAccepting(accepting);
  if (getAcceptors() > 0) {
    // acceptor threads do the accepting; the channel is not registered with a selector
    return;
  }
  if (accepting) {
    if (acceptor.get() == null) {
      Closeable cl = selectorManager.acceptor(serverChannel);
      if (!this.acceptor.compareAndSet(null, cl)) {
        // somebody else registered concurrently; discard our registration
        IO.close(cl);
      }
    }
  } else {
    Closeable cl = this.acceptor.get();
    if (cl != null && this.acceptor.compareAndSet(cl, null)) {
      IO.close(cl);
    }
  }
}
/**
 * Returns a description of this connector consisting of the simple class name, the hash code in
 * hexadecimal form, and the listen socket address.
 *
 * @return The string representation.
 */
@Override
public String toString() {
  final String simpleName = getClass().getSimpleName();
  final int hash = hashCode();
  return String.format(Locale.ENGLISH, "%s@%h[%s]", simpleName, hash, listenSocketAddress);
}
/**
 * The {@link SelectorManager} of the enclosing connector. It opens selectors via the selector
 * provider of the listen address' address family, routes accepted channels back into the
 * connector, and creates end points and connections for them.
 */
private final class AFSocketSelectorManager extends SelectorManager {
  public AFSocketSelectorManager(Executor executor, Scheduler scheduler, int selectors) {
    super(executor, scheduler, selectors);
  }

  /** Opens a selector from the provider that belongs to the listen address' address family. */
  @Override
  protected Selector newSelector() throws IOException {
    final SelectorProvider familyProvider = listenSocketAddress.getAddressFamily()
        .getSelectorProvider();
    return familyProvider.openSelector();
  }

  /** Forwards a channel accepted via the selector to the enclosing connector. */
  @Override
  protected void accepted(SelectableChannel channel) throws IOException {
    final SocketChannel socketChannel = (SocketChannel) channel;
    AFSocketServerConnector.this.accepted(socketChannel);
  }

  /** Creates the end point for a channel, using the connector's idle timeout. */
  @Override
  protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector,
      SelectionKey selectionKey) {
    final SocketChannel socketChannel = (SocketChannel) channel;
    final SocketChannelEndPoint result = new SocketChannelEndPoint(socketChannel, selector,
        selectionKey, getScheduler());
    result.setIdleTimeout(getIdleTimeout());
    return result;
  }

  /** Creates the connection for an end point via the connector's default connection factory. */
  @Override
  public Connection newConnection(SelectableChannel channel, EndPoint endpoint,
      Object attachment) {
    final ConnectionFactory defaultFactory = getDefaultConnectionFactory();
    return defaultFactory.newConnection(AFSocketServerConnector.this, endpoint);
  }

  /** Notifies the selector manager first, then the enclosing connector. */
  @Override
  protected void endPointOpened(EndPoint endpoint) {
    super.endPointOpened(endpoint);
    onEndPointOpened(endpoint);
  }

  /** Notifies the enclosing connector first, then the selector manager. */
  @Override
  protected void endPointClosed(EndPoint endpoint) {
    onEndPointClosed(endpoint);
    super.endPointClosed(endpoint);
  }
}
/**
 * Tells whether this connector is allowed to stop the server once it can no longer serve and no
 * other connectors are available.
 *
 * @return {@code true} if so.
 */
@ManagedAttribute("Whether this connector may stop the server when it's no longer able to"
    + " serve and no other connectors are available")
public boolean isMayStopServer() {
  return this.mayStopServer;
}
/**
* Sets if this connector may stop the server when it's no longer able to serve and no other
* connectors are available.
*
* This only changes the {@code mayStopServer} flag; the separate "force" flag (see
* {@link #setMayStopServerForce(boolean)}) is left untouched.
*
* @param mayStopServer {@code true} if so.
*/
public void setMayStopServer(boolean mayStopServer) {
this.mayStopServer = mayStopServer;
}
/**
 * Tells whether this connector is allowed to stop the server once it can no longer serve,
 * regardless of whether other connectors are available.
 *
 * @return {@code true} if so.
 */
@ManagedAttribute("Whether this connector may stop the server when it's no longer able to"
    + " serve, even if other connectors are available")
public boolean isMayStopServerForce() {
  return this.mayStopServerForce;
}
/**
* Sets if this connector may stop the server when it's no longer able to serve, even if other
* connectors are available.
*
* @param b {@code true} if so (which then also implies {@code setMayStopServer(true)}). Passing
* {@code false} only clears the "force" flag; it does not reset the {@code mayStopServer} flag.
*/
public void setMayStopServerForce(boolean b) {
if (b) {
// a stop check is only scheduled when isMayStopServer() is true, so forcing implies permitting
setMayStopServer(true);
}
this.mayStopServerForce = b;
}
}