1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix;
19
20 import java.io.FileDescriptor;
21 import java.io.IOException;
22 import java.net.SocketException;
23 import java.net.SocketTimeoutException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.ClosedSelectorException;
26 import java.nio.channels.SelectableChannel;
27 import java.nio.channels.SelectionKey;
28 import java.nio.channels.Selector;
29 import java.nio.channels.spi.AbstractSelectableChannel;
30 import java.nio.channels.spi.AbstractSelector;
31 import java.util.Collections;
32 import java.util.Iterator;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 final class AFSelector extends AbstractSelector {
39 private final AFPipe selectorPipe;
40 private final PollFd selectorPipePollFd;
41
42 private final ByteBuffer pipeMsgWakeUp = ByteBuffer.allocate(1);
43 private final ByteBuffer pipeMsgReceiveBuffer = ByteBuffer.allocateDirect(256);
44
45 private final Map<AFSelectionKey, Integer> keysRegistered = new ConcurrentHashMap<>();
46 private final Set<AFSelectionKey> keysRegisteredKeySet = keysRegistered.keySet();
47 private final Set<SelectionKey> keysRegisteredPublic = Collections.unmodifiableSet(
48 keysRegisteredKeySet);
49
50 private final AtomicInteger selectCount = new AtomicInteger(0);
51
52 @SuppressWarnings("PMD.LooseCoupling")
53 private final MapValueSet<SelectionKey, Integer> selectedKeysSet =
54 new MapValueSet<SelectionKey, Integer>(keysRegistered, selectCount::get, 0);
55 private final Set<SelectionKey> selectedKeysPublic = new UngrowableSet<>(selectedKeysSet);
56
57 private PollFd pollFd = null;
58
59 AFSelector(AFSelectorProvider<?> provider) throws IOException {
60 super(provider);
61
62 this.selectorPipe = AFUNIXSelectorProvider.getInstance().openSelectablePipe();
63 this.selectorPipePollFd = new PollFd(selectorPipe.sourceFD());
64 }
65
66 @Override
67 protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object att) {
68 AFSelectionKey key = new AFSelectionKey(this, ch, ops, att);
69 synchronized (this) {
70 pollFd = null;
71 selectedKeysSet.markRemoved(key);
72 }
73 return key;
74 }
75
76 @Override
77 public Set<SelectionKey> keys() {
78 return keysRegisteredPublic;
79 }
80
81 @Override
82 public Set<SelectionKey> selectedKeys() {
83 return selectedKeysPublic;
84 }
85
86 @Override
87 public int selectNow() throws IOException {
88 return select0(0);
89 }
90
91 @Override
92 public int select(long timeout) throws IOException {
93 if (timeout > Integer.MAX_VALUE) {
94 timeout = Integer.MAX_VALUE;
95 } else if (timeout < 0) {
96 throw new IllegalArgumentException("Timeout must not be negative");
97 }
98
99 return select0((int) timeout);
100 }
101
102 @Override
103 public int select() throws IOException {
104 try {
105 return select0(-1);
106 } catch (SocketTimeoutException e) {
107 return 0;
108 }
109 }
110
111 @SuppressWarnings("PMD.CognitiveComplexity")
112 private int select0(int timeout) throws IOException {
113 PollFd pfd;
114
115 int selectId = updateSelectCount();
116
117 synchronized (this) {
118 if (!isOpen()) {
119 throw new ClosedSelectorException();
120 }
121
122 pfd = pollFd = initPollFd(pollFd);
123 }
124 int num;
125 try {
126 begin();
127 num = NativeUnixSocket.poll(pfd, timeout);
128 } finally {
129 end();
130 }
131 synchronized (this) {
132 pfd = pollFd;
133 if (pfd != null) {
134 AFSelectionKey[] keys = pfd.keys;
135 if (keys != null) {
136 for (AFSelectionKey key : keys) {
137 if (key != null && key.hasOpInvalid()) {
138 SelectableChannel ch = key.channel();
139 if (ch != null && ch.isOpen()) {
140 ch.close();
141 }
142 }
143 }
144 }
145 }
146 if (num > 0) {
147 consumeAllBytesAfterPoll();
148 setOpsReady(pfd, selectId);
149 }
150 return selectedKeysSet.size();
151 }
152 }
153
154 private synchronized void consumeAllBytesAfterPoll() throws IOException {
155 if (pollFd == null) {
156 return;
157 }
158 if ((pollFd.rops[0] & SelectionKey.OP_READ) == 0) {
159 return;
160 }
161 int maxReceive;
162 int bytesReceived;
163
164 int options = selectorPipe.getOptions();
165
166 synchronized (pipeMsgReceiveBuffer) {
167 pipeMsgReceiveBuffer.clear();
168 maxReceive = pipeMsgReceiveBuffer.remaining();
169 bytesReceived = receive(maxReceive, options);
170 }
171
172 if (bytesReceived == maxReceive && maxReceive > 0) {
173
174 int read;
175 do {
176 if ((read = NativeUnixSocket.poll(selectorPipePollFd, 0)) > 0) {
177 synchronized (pipeMsgReceiveBuffer) {
178 pipeMsgReceiveBuffer.clear();
179 read = receive(maxReceive, options);
180 }
181 }
182 } while (read == maxReceive && read > 0);
183 }
184 }
185
186 @SuppressWarnings("PMD.CognitiveComplexity")
187 private int receive(int maxReceive, int options) throws IOException {
188 final boolean virtualBlocking = ThreadUtil.isVirtualThread();
189 final long now;
190 if (virtualBlocking) {
191 now = System.currentTimeMillis();
192 options |= NativeUnixSocket.OPT_NON_BLOCKING;
193 } else {
194 now = 0;
195 }
196
197 FileDescriptor fdesc = selectorPipePollFd.fds[0];
198
199 boolean park = false;
200 int count;
201 virtualThreadLoop : do {
202 if (virtualBlocking) {
203 if (park) {
204 VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
205 AFPipe.DUMMY_TIMEOUT, this::close);
206 }
207 NativeUnixSocket.configureBlocking(fdesc, false);
208 }
209 try {
210 count = NativeUnixSocket.receive(fdesc, pipeMsgReceiveBuffer, 0, maxReceive, null, options,
211 null, 1);
212 if (count == 0 && virtualBlocking) {
213
214 park = true;
215 continue virtualThreadLoop;
216 }
217 } catch (SocketTimeoutException e) {
218 if (virtualBlocking) {
219
220 park = true;
221 continue virtualThreadLoop;
222 } else {
223 throw e;
224 }
225 } finally {
226 if (virtualBlocking) {
227 NativeUnixSocket.configureBlocking(fdesc, true);
228 }
229 }
230 break;
231 } while (true);
232 return count;
233 }
234
235 private int updateSelectCount() {
236 int selectId = selectCount.incrementAndGet();
237 if (selectId == 0) {
238
239 selectedKeysSet.markAllRemoved();
240 selectId = selectCount.incrementAndGet();
241 }
242 return selectId;
243 }
244
245 private void setOpsReady(PollFd pfd, int selectId) {
246 if (pfd != null) {
247 for (int i = 1; i < pfd.rops.length; i++) {
248 int rops = pfd.rops[i];
249 AFSelectionKey key = pfd.keys[i];
250 key.setOpsReady(rops);
251 if (rops != 0 && keysRegistered.containsKey(key)) {
252 keysRegistered.put(key, selectId);
253 }
254 }
255 }
256 }
257
258 @SuppressWarnings({"resource", "PMD.CognitiveComplexity"})
259 private PollFd initPollFd(PollFd existingPollFd) throws IOException {
260 synchronized (this) {
261 for (Iterator<AFSelectionKey> it = keysRegisteredKeySet.iterator(); it.hasNext();) {
262 AFSelectionKey key = it.next();
263 if (!key.getAFCore().fd.valid() || !key.isValid()) {
264 key.cancelNoRemove();
265 it.remove();
266 existingPollFd = null;
267 } else {
268 key.setOpsReady(0);
269 }
270 }
271
272 if (existingPollFd != null &&
273 existingPollFd.keys != null &&
274 (existingPollFd.keys.length - 1) == keysRegistered.size()) {
275 boolean needsUpdate = false;
276 int i = 1;
277 for (AFSelectionKey key : keysRegisteredKeySet) {
278 if (existingPollFd.keys[i] != key || !key.isValid()) {
279 needsUpdate = true;
280 break;
281 }
282 existingPollFd.ops[i] = key.interestOps();
283
284 i++;
285 }
286
287 if (!needsUpdate) {
288 return existingPollFd;
289 }
290 }
291
292 int keysToPoll = keysRegistered.size();
293 for (AFSelectionKey key : keysRegisteredKeySet) {
294 if (!key.isValid()) {
295 keysToPoll--;
296 }
297 }
298
299 int size = keysToPoll + 1;
300 FileDescriptor[] fds = new FileDescriptor[size];
301 int[] ops = new int[size];
302
303 AFSelectionKey[] keys = new AFSelectionKey[size];
304 fds[0] = selectorPipe.sourceFD();
305 ops[0] = SelectionKey.OP_READ;
306
307 int i = 1;
308 for (AFSelectionKey key : keysRegisteredKeySet) {
309 if (!key.isValid()) {
310 continue;
311 }
312 keys[i] = key;
313 fds[i] = key.getAFCore().fd;
314 ops[i] = key.interestOps();
315 i++;
316 }
317 return new PollFd(keys, fds, ops);
318 }
319 }
320
321 @Override
322 protected void implCloseSelector() throws IOException {
323 wakeup();
324 Set<SelectionKey> keys;
325 synchronized (this) {
326 keys = keys();
327 keysRegistered.clear();
328 }
329 for (SelectionKey key : keys) {
330 ((AFSelectionKey) key).cancelNoRemove();
331 }
332 selectorPipe.close();
333 }
334
335 @Override
336 public Selector wakeup() {
337 if (isOpen()) {
338 try {
339 synchronized (pipeMsgWakeUp) {
340 pipeMsgWakeUp.clear();
341 try {
342 selectorPipe.sink().write(pipeMsgWakeUp);
343 } catch (SocketException e) {
344 if (selectorPipe.sinkFD().valid()) {
345 throw e;
346 } else {
347
348 }
349 }
350 }
351 } catch (IOException e) {
352
353 StackTraceUtil.printStackTrace(e);
354 }
355 }
356 return this;
357 }
358
359 synchronized void remove(AFSelectionKey key) {
360 selectedKeysSet.remove(key);
361 deregister(key);
362 pollFd = null;
363 }
364
365 private void deregister(AFSelectionKey key) {
366
367
368
369 try {
370 NativeUnixSocket.deregisterSelectionKey((AbstractSelectableChannel) key.channel(), key);
371 } catch (ClassCastException e) {
372
373
374 }
375 }
376
377 static final class PollFd {
378
379 final FileDescriptor[] fds;
380
381 final int[] ops;
382
383 final int[] rops;
384
385 final AFSelectionKey[] keys;
386
387 PollFd(FileDescriptor pipeSourceFd) {
388 this(pipeSourceFd, SelectionKey.OP_READ);
389 }
390
391 PollFd(FileDescriptor pipeSourceFd, int op) {
392 this.fds = new FileDescriptor[] {pipeSourceFd};
393 this.ops = new int[] {op};
394 this.rops = new int[1];
395 this.keys = null;
396 }
397
398 PollFd(FileDescriptor[] fds, int[] ops) {
399 this(null, fds, ops);
400 }
401
402 @SuppressWarnings("PMD.ArrayIsStoredDirectly")
403 PollFd(AFSelectionKey[] keys, FileDescriptor[] fds, int[] ops) {
404 this.keys = keys;
405 if (fds.length != ops.length) {
406 throw new IllegalStateException();
407 }
408 this.fds = fds;
409 this.ops = ops;
410 this.rops = new int[ops.length];
411 }
412 }
413 }