AFCore.java
/*
* junixsocket
*
* Copyright 2009-2024 Christian Kohlschütter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.newsclub.net.unix;
import java.io.FileDescriptor;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* The core functionality of file descriptor based I/O.
*
* @author Christian Kohlschütter
*/
class AFCore extends CleanableState {
private static final ThreadLocal<ByteBuffer> TL_BUFFER = new ThreadLocal<>();
private static final String PROP_TL_BUFFER_MAX_CAPACITY =
"org.newsclub.net.unix.thread-local-buffer.max-capacity"; // 0 means "no limit" (discouraged)
private static final int TL_BUFFER_MIN_CAPACITY = 8192; // 8 kb per thread
private static final int TL_BUFFER_MAX_CAPACITY = Integer.parseInt(System.getProperty(
PROP_TL_BUFFER_MAX_CAPACITY, Integer.toString(1 * 1024 * 1024))); // 1 MB per thread
private final AtomicBoolean closed = new AtomicBoolean(false);
final FileDescriptor fd;
final AncillaryDataSupport ancillaryDataSupport;
private final boolean datagramMode;
private boolean blocking = true;
private boolean cleanFd = true;
AFCore(Object observed, FileDescriptor fd, AncillaryDataSupport ancillaryDataSupport,
boolean datagramMode) {
super(observed);
this.datagramMode = datagramMode;
this.ancillaryDataSupport = ancillaryDataSupport;
this.fd = fd == null ? new FileDescriptor() : fd;
}
AFCore(Object observed, FileDescriptor fd) {
this(observed, fd, null, false);
}
@Override
protected final void doClean() {
if (fd != null && fd.valid() && cleanFd) {
try {
doClose();
} catch (IOException e) {
// ignore
}
}
if (ancillaryDataSupport != null) {
ancillaryDataSupport.close();
}
}
void disableCleanFd() {
this.cleanFd = false;
}
boolean isClosed() {
return closed.get();
}
void doClose() throws IOException {
if (closed.compareAndSet(false, true)) {
NativeUnixSocket.close(fd);
}
}
FileDescriptor validFdOrException() throws SocketException {
FileDescriptor fdesc = validFd();
if (fdesc == null) {
closed.set(true);
throw new SocketClosedException("Not open");
}
return fdesc;
}
synchronized FileDescriptor validFd() {
if (isClosed()) {
return null;
}
FileDescriptor descriptor = this.fd;
if (descriptor != null) {
if (descriptor.valid()) {
return descriptor;
}
}
return null;
}
int read(ByteBuffer dst) throws IOException {
return read(dst, null, 0);
}
int read(ByteBuffer dst, ByteBuffer socketAddressBuffer, int options) throws IOException {
int remaining = dst.remaining();
if (remaining == 0) {
return 0;
}
FileDescriptor fdesc = validFdOrException();
int dstPos = dst.position();
ByteBuffer buf;
int pos;
boolean direct = dst.isDirect();
if (direct) {
buf = dst;
pos = dstPos;
} else {
buf = getThreadLocalDirectByteBuffer(remaining);
remaining = Math.min(remaining, buf.remaining());
pos = buf.position();
}
if (!blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}
int count = NativeUnixSocket.receive(fdesc, buf, pos, remaining, socketAddressBuffer, options,
ancillaryDataSupport, 0);
if (count == -1) {
return count;
}
if (direct) {
if (count < 0) {
throw new IllegalStateException();
}
dst.position(pos + count);
} else {
int oldLimit = buf.limit();
if (count < oldLimit) {
buf.limit(count);
}
try {
while (buf.hasRemaining()) {
dst.put(buf);
}
} finally {
if (count < oldLimit) {
buf.limit(oldLimit);
}
}
}
return count;
}
int write(ByteBuffer src) throws IOException {
return write(src, null, 0);
}
int write(ByteBuffer src, SocketAddress target, int options) throws IOException {
int remaining = src.remaining();
if (remaining == 0) {
return 0;
}
FileDescriptor fdesc = validFdOrException();
final ByteBuffer addressTo;
final int addressToLen;
if (target == null) {
addressTo = null;
addressToLen = 0;
} else {
addressTo = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
addressToLen = AFSocketAddress.unwrapAddressDirectBufferInternal(addressTo, target);
}
// accept "send buffer overflow" as packet loss
// and don't retry (which may slow things down quite a bit)
if (!blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}
int pos = src.position();
boolean isDirect = src.isDirect();
ByteBuffer buf;
int bufPos;
if (isDirect) {
buf = src;
bufPos = pos;
} else {
buf = getThreadLocalDirectByteBuffer(remaining);
remaining = Math.min(remaining, buf.remaining());
bufPos = buf.position();
while (src.hasRemaining() && buf.hasRemaining()) {
buf.put(src);
}
buf.position(bufPos);
}
if (datagramMode) {
options |= NativeUnixSocket.OPT_DGRAM_MODE;
}
int written = NativeUnixSocket.send(fdesc, buf, bufPos, remaining, addressTo, addressToLen,
options, ancillaryDataSupport);
src.position(pos + written);
return written;
}
/**
* Returns a per-thread reusable byte buffer for a given capacity.
*
* If a thread-local buffer currently uses a smaller capacity, the buffer is replaced by a larger
* one. If the capacity exceeds a configurable maximum, a new direct buffer is allocated but not
* cached (i.e., the previously cached one is kept but not immediately returned to the caller).
*
* @param capacity The desired capacity.
* @return A byte buffer satisfying the requested capacity.
*/
ByteBuffer getThreadLocalDirectByteBuffer(int capacity) {
if (capacity > TL_BUFFER_MAX_CAPACITY && TL_BUFFER_MAX_CAPACITY > 0) {
// Capacity exceeds configurable maximum limit;
// allocate but do not cache direct buffer.
// This may incur a performance penalty at the cost of correctness when using such capacities.
return ByteBuffer.allocateDirect(capacity);
}
if (capacity < TL_BUFFER_MIN_CAPACITY) {
capacity = TL_BUFFER_MIN_CAPACITY;
}
ByteBuffer buffer = TL_BUFFER.get();
if (buffer == null || capacity > buffer.capacity()) {
buffer = ByteBuffer.allocateDirect(capacity);
TL_BUFFER.set(buffer);
}
buffer.clear();
return buffer;
}
void implConfigureBlocking(boolean block) throws IOException {
NativeUnixSocket.configureBlocking(validFdOrException(), block);
this.blocking = block;
}
boolean isBlocking() {
return blocking;
}
}