VirtualThreadPollerNaive.java
/*
* junixsocket
*
* Copyright 2009-2024 Christian Kohlschütter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.newsclub.net.unix;
import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.LockSupport;
import org.eclipse.jdt.annotation.Nullable;
import org.newsclub.net.unix.AFSelector.PollFd;
/**
* "Naive" implementation of {@link VirtualThreadPoller}, using
* {@link NativeUnixSocket#poll(PollFd, int)} on non-virtual threads.
*
* @author Christian Kohlschütter
*/
final class VirtualThreadPollerNaive implements VirtualThreadPoller {
private static final int POLL_INTERVAL_MILLIS = 1_000; // should remain at 1 second to simplify
// socket timeout handling
private static final Map<FileDescriptor, PollJob> POLL_JOBS = new ConcurrentHashMap<>();
private static final InterruptedIOException POLL_INTERRUPTED_SENTINEL =
new InterruptedIOException();
private static final class PollJob {
private final List<Thread> waitingThreads = new LinkedList<>();
private final FileDescriptor fd;
private final int mode;
private final long now;
private final AFSupplier<Integer> timeout;
PollJob(FileDescriptor fd, int mode, long now, AFSupplier<Integer> timeout) {
this.fd = fd;
this.mode = mode;
this.now = now;
this.timeout = timeout;
}
@SuppressWarnings("PMD.CognitiveComplexity")
AFFuture<@Nullable IOException> trigger(Thread waitingThread) {
synchronized (fd) {
waitingThreads.add(waitingThread);
}
return AFFuture.supplyAsync(() -> {
try {
Thread thread = Thread.currentThread();
PollFd pfd = new PollFd(new FileDescriptor[] {fd}, new int[] {mode});
do {
if (thread.isInterrupted() || !fd.valid()) {
return POLL_INTERRUPTED_SENTINEL;
}
try {
NativeUnixSocket.poll(pfd, POLL_INTERVAL_MILLIS);
} catch (IOException e) {
return e;
}
if (thread.isInterrupted() || !fd.valid()) {
return POLL_INTERRUPTED_SENTINEL;
}
if (pfd.rops[0] != 0) {
break;
}
int timeoutMillis = timeout.get();
if (timeoutMillis > 0) {
if ((System.currentTimeMillis() - now) >= timeoutMillis) {
// handle in calling thread
break;
}
}
} while (true); // NOPMD.WhileLoopWithLiteralBoolean
} finally {
Thread threadToWake = null;
try {
synchronized (fd) {
threadToWake = waitingThreads.remove(0);
if (waitingThreads.isEmpty()) {
POLL_JOBS.remove(fd);
}
}
} finally {
if (threadToWake != null) {
LockSupport.unpark(threadToWake);
}
}
}
return null;
})::get;
}
}
@Override
public void parkThreadUntilReady(FileDescriptor fd, int mode, long now,
AFSupplier<Integer> timeout, Closeable closeOnInterrupt) throws IOException {
Thread virtualThread = Thread.currentThread();
PollJob job = Java7Util.computeIfAbsent(POLL_JOBS, fd, (k) -> new PollJob(fd, mode, now,
timeout));
AFFuture<@Nullable IOException> future = job.trigger(virtualThread);
LockSupport.park();
if (virtualThread.isInterrupted()) {
throw SocketClosedByInterruptException.newInstanceAndClose(closeOnInterrupt);
}
try {
IOException ex = future.get();
if (ex != null) {
if (ex == POLL_INTERRUPTED_SENTINEL) {
throw SocketClosedByInterruptException.newInstanceAndClose(closeOnInterrupt);
}
throw ex;
}
} catch (InterruptedException | ExecutionException e) {
throw SocketClosedByInterruptException.newInstanceAndClose(closeOnInterrupt); // NOPMD.PreserveStackTrace
}
int timeoutMillis = timeout.get();
if (timeoutMillis > 0) {
if ((System.currentTimeMillis() - now) >= timeoutMillis) {
throw new SocketTimeoutException();
}
}
}
}