1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix;
19
20 import java.io.Closeable;
21 import java.io.FileDescriptor;
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.net.SocketTimeoutException;
25 import java.util.LinkedList;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.locks.LockSupport;
31
32 import org.eclipse.jdt.annotation.Nullable;
33 import org.newsclub.net.unix.AFSelector.PollFd;
34
35
36
37
38
39
40
41 final class VirtualThreadPollerNaive implements VirtualThreadPoller {
42 private static final int POLL_INTERVAL_MILLIS = 1_000;
43
44
45 private static final Map<FileDescriptor, PollJob> POLL_JOBS = new ConcurrentHashMap<>();
46
47 private static final InterruptedIOException POLL_INTERRUPTED_SENTINEL =
48 new InterruptedIOException();
49
50 private static final class PollJob {
51 private final List<Thread> waitingThreads = new LinkedList<>();
52 private final FileDescriptor fd;
53 private final int mode;
54 private final long now;
55 private final AFSupplier<Integer> timeout;
56
57 PollJob(FileDescriptor fd, int mode, long now, AFSupplier<Integer> timeout) {
58 this.fd = fd;
59 this.mode = mode;
60 this.now = now;
61 this.timeout = timeout;
62 }
63
64 @SuppressWarnings("PMD.CognitiveComplexity")
65 AFFuture<@Nullable IOException> trigger(Thread waitingThread) {
66 synchronized (fd) {
67 waitingThreads.add(waitingThread);
68 }
69 return AFFuture.supplyAsync(() -> {
70 try {
71 Thread thread = Thread.currentThread();
72 PollFd pfd = new PollFd(new FileDescriptor[] {fd}, new int[] {mode});
73 do {
74 if (thread.isInterrupted() || !fd.valid()) {
75 return POLL_INTERRUPTED_SENTINEL;
76 }
77 try {
78 NativeUnixSocket.poll(pfd, POLL_INTERVAL_MILLIS);
79 } catch (IOException e) {
80 return e;
81 }
82 if (thread.isInterrupted() || !fd.valid()) {
83 return POLL_INTERRUPTED_SENTINEL;
84 }
85 if (pfd.rops[0] != 0) {
86 break;
87 }
88
89 int timeoutMillis = timeout.get();
90 if (timeoutMillis > 0) {
91 if ((System.currentTimeMillis() - now) >= timeoutMillis) {
92
93 break;
94 }
95 }
96 } while (true);
97 } finally {
98 Thread threadToWake = null;
99 try {
100 synchronized (fd) {
101 threadToWake = waitingThreads.remove(0);
102 if (waitingThreads.isEmpty()) {
103 POLL_JOBS.remove(fd);
104 }
105 }
106 } finally {
107 if (threadToWake != null) {
108 LockSupport.unpark(threadToWake);
109 }
110 }
111 }
112
113 return null;
114 })::get;
115 }
116 }
117
118 @Override
119 public void parkThreadUntilReady(FileDescriptor fd, int mode, long now,
120 AFSupplier<Integer> timeout, Closeable closeOnInterrupt) throws IOException {
121 Thread virtualThread = Thread.currentThread();
122
123 PollJob job = Java7Util.computeIfAbsent(POLL_JOBS, fd, (k) -> new PollJob(fd, mode, now,
124 timeout));
125 AFFuture<@Nullable IOException> future = job.trigger(virtualThread);
126
127 LockSupport.park();
128 if (virtualThread.isInterrupted()) {
129 throw SocketClosedByInterruptException.newInstanceAndClose(closeOnInterrupt);
130 }
131
132 try {
133 IOException ex = future.get();
134 if (ex != null) {
135 if (ex == POLL_INTERRUPTED_SENTINEL) {
136 throw SocketClosedByInterruptException.newInstanceAndClose(closeOnInterrupt);
137 }
138 throw ex;
139 }
140 } catch (InterruptedException | ExecutionException e) {
141 throw SocketClosedByInterruptException.newInstanceAndClose(closeOnInterrupt);
142 }
143
144 int timeoutMillis = timeout.get();
145 if (timeoutMillis > 0) {
146 if ((System.currentTimeMillis() - now) >= timeoutMillis) {
147 throw new SocketTimeoutException();
148 }
149 }
150 }
151 }