/*
 * junixsocket
 *
 * Copyright 2009-2024 Christian Kohlschütter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18  package org.newsclub.net.unix;
19  
20  import java.io.FileDescriptor;
21  import java.io.IOException;
22  import java.net.SocketException;
23  import java.net.SocketTimeoutException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ClosedSelectorException;
26  import java.nio.channels.SelectableChannel;
27  import java.nio.channels.SelectionKey;
28  import java.nio.channels.Selector;
29  import java.nio.channels.spi.AbstractSelectableChannel;
30  import java.nio.channels.spi.AbstractSelector;
31  import java.util.Collections;
32  import java.util.Iterator;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  final class AFSelector extends AbstractSelector {
  /**
   * Pipe used to interrupt a pending poll: {@link #wakeup()} writes to its sink, and its source
   * file descriptor is always placed at index 0 of the polled descriptor set.
   */
  private final AFPipe selectorPipe;
  /** Poll set containing only the pipe's source descriptor; used when draining wake-up bytes. */
  private final PollFd selectorPipePollFd;

  /** One-byte buffer written to the pipe sink by {@link #wakeup()}; also used as its lock. */
  private final ByteBuffer pipeMsgWakeUp = ByteBuffer.allocate(1);
  /** Direct buffer (256 bytes) that receives wake-up bytes drained from the pipe source. */
  private final ByteBuffer pipeMsgReceiveBuffer = ByteBuffer.allocateDirect(256);

  /**
   * All registered keys. The value is updated to the current select id whenever a poll reports
   * ready operations for the key (see {@code setOpsReady}).
   */
  private final Map<AFSelectionKey, Integer> keysRegistered = new ConcurrentHashMap<>();
  /** Live key-set view of {@link #keysRegistered}. */
  private final Set<AFSelectionKey> keysRegisteredKeySet = keysRegistered.keySet();
  /** Unmodifiable (but still live) view of the registered keys, returned by {@link #keys()}. */
  private final Set<SelectionKey> keysRegisteredPublic = Collections.unmodifiableSet(
      keysRegisteredKeySet);

  /** Counter incremented once per selection operation; its value serves as the select id. */
  private final AtomicInteger selectCount = new AtomicInteger(0);

  // NOTE(review): presumably exposes those entries of keysRegistered whose value equals the
  // current selectCount, with 0 acting as the "not selected" marker -- confirm against
  // MapValueSet.
  @SuppressWarnings("PMD.LooseCoupling")
  private final MapValueSet<SelectionKey, Integer> selectedKeysSet =
      new MapValueSet<SelectionKey, Integer>(keysRegistered, selectCount::get, 0);
  /**
   * Wrapper around {@link #selectedKeysSet} returned by {@link #selectedKeys()}.
   * NOTE(review): judging by its name, UngrowableSet permits removal but not addition -- confirm.
   */
  private final Set<SelectionKey> selectedKeysPublic = new UngrowableSet<>(selectedKeysSet);

  /**
   * Cached poll set for the current registrations. Reset to {@code null} by {@code register} and
   * {@code remove}; (re)built by {@code initPollFd} at the start of each selection operation.
   */
  private PollFd pollFd = null;
58  
59    AFSelector(AFSelectorProvider<?> provider) throws IOException {
60      super(provider);
61  
62      this.selectorPipe = AFUNIXSelectorProvider.getInstance().openSelectablePipe();
63      this.selectorPipePollFd = new PollFd(selectorPipe.sourceFD());
64    }
65  
66    @Override
67    protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object att) {
68      AFSelectionKey key = new AFSelectionKey(this, ch, ops, att);
69      synchronized (this) {
70        pollFd = null;
71        selectedKeysSet.markRemoved(key);
72      }
73      return key;
74    }
75  
76    @Override
77    public Set<SelectionKey> keys() {
78      return keysRegisteredPublic;
79    }
80  
81    @Override
82    public Set<SelectionKey> selectedKeys() {
83      return selectedKeysPublic;
84    }
85  
86    @Override
87    public int selectNow() throws IOException {
88      return select0(0);
89    }
90  
91    @Override
92    public int select(long timeout) throws IOException {
93      if (timeout > Integer.MAX_VALUE) {
94        timeout = Integer.MAX_VALUE;
95      } else if (timeout < 0) {
96        throw new IllegalArgumentException("Timeout must not be negative");
97      }
98  
99      return select0((int) timeout);
100   }
101 
102   @Override
103   public int select() throws IOException {
104     try {
105       return select0(-1);
106     } catch (SocketTimeoutException e) {
107       return 0;
108     }
109   }
110 
111   @SuppressWarnings("PMD.CognitiveComplexity")
112   private int select0(int timeout) throws IOException {
113     PollFd pfd;
114 
115     int selectId = updateSelectCount();
116 
117     synchronized (this) {
118       if (!isOpen()) {
119         throw new ClosedSelectorException();
120       }
121 
122       pfd = pollFd = initPollFd(pollFd);
123     }
124     int num;
125     try {
126       begin();
127       num = NativeUnixSocket.poll(pfd, timeout);
128     } finally {
129       end();
130     }
131     synchronized (this) {
132       pfd = pollFd;
133       if (pfd != null) {
134         AFSelectionKey[] keys = pfd.keys;
135         if (keys != null) {
136           for (AFSelectionKey key : keys) {
137             if (key != null && key.hasOpInvalid()) {
138               SelectableChannel ch = key.channel();
139               if (ch != null && ch.isOpen()) {
140                 ch.close();
141               }
142             }
143           }
144         }
145       }
146       if (num > 0) {
147         consumeAllBytesAfterPoll();
148         setOpsReady(pfd, selectId); // updates keysSelected and numKeysSelected
149       }
150       return selectedKeysSet.size();
151     }
152   }
153 
154   private synchronized void consumeAllBytesAfterPoll() throws IOException {
155     if (pollFd == null) {
156       return;
157     }
158     if ((pollFd.rops[0] & SelectionKey.OP_READ) == 0) {
159       return;
160     }
161     int maxReceive;
162     int bytesReceived;
163 
164     int options = selectorPipe.getOptions();
165 
166     synchronized (pipeMsgReceiveBuffer) {
167       pipeMsgReceiveBuffer.clear();
168       maxReceive = pipeMsgReceiveBuffer.remaining();
169       bytesReceived = receive(maxReceive, options);
170     }
171 
172     if (bytesReceived == maxReceive && maxReceive > 0) {
173       // consume all pending bytes
174       int read;
175       do {
176         if ((read = NativeUnixSocket.poll(selectorPipePollFd, 0)) > 0) {
177           synchronized (pipeMsgReceiveBuffer) {
178             pipeMsgReceiveBuffer.clear();
179             read = receive(maxReceive, options);
180           }
181         }
182       } while (read == maxReceive && read > 0);
183     }
184   }
185 
186   @SuppressWarnings("PMD.CognitiveComplexity")
187   private int receive(int maxReceive, int options) throws IOException {
188     final boolean virtualBlocking = ThreadUtil.isVirtualThread();
189     final long now;
190     if (virtualBlocking) {
191       now = System.currentTimeMillis();
192       options |= NativeUnixSocket.OPT_NON_BLOCKING;
193     } else {
194       now = 0;
195     }
196 
197     FileDescriptor fdesc = selectorPipePollFd.fds[0];
198 
199     boolean park = false;
200     int count;
201     virtualThreadLoop : do {
202       if (virtualBlocking) {
203         if (park) {
204           VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
205               AFPipe.DUMMY_TIMEOUT, this::close);
206         }
207         NativeUnixSocket.configureBlocking(fdesc, false);
208       }
209       try {
210         count = NativeUnixSocket.receive(fdesc, pipeMsgReceiveBuffer, 0, maxReceive, null, options,
211             null, 1);
212         if (count == 0 && virtualBlocking) {
213           // try again
214           park = true;
215           continue virtualThreadLoop;
216         }
217       } catch (SocketTimeoutException e) {
218         if (virtualBlocking) {
219           // try again
220           park = true;
221           continue virtualThreadLoop;
222         } else {
223           throw e;
224         }
225       } finally {
226         if (virtualBlocking) {
227           NativeUnixSocket.configureBlocking(fdesc, true);
228         }
229       }
230       break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
231     } while (true); // NOPMD.WhileLoopWithLiteralBoolean
232     return count;
233   }
234 
235   private int updateSelectCount() {
236     int selectId = selectCount.incrementAndGet();
237     if (selectId == 0) {
238       // overflow (unlikely)
239       selectedKeysSet.markAllRemoved();
240       selectId = selectCount.incrementAndGet();
241     }
242     return selectId;
243   }
244 
245   private void setOpsReady(PollFd pfd, int selectId) {
246     if (pfd != null) {
247       for (int i = 1; i < pfd.rops.length; i++) {
248         int rops = pfd.rops[i];
249         AFSelectionKey key = pfd.keys[i];
250         key.setOpsReady(rops);
251         if (rops != 0 && keysRegistered.containsKey(key)) {
252           keysRegistered.put(key, selectId);
253         }
254       }
255     }
256   }
257 
258   @SuppressWarnings({"resource", "PMD.CognitiveComplexity"})
259   private PollFd initPollFd(PollFd existingPollFd) throws IOException {
260     synchronized (this) {
261       for (Iterator<AFSelectionKey> it = keysRegisteredKeySet.iterator(); it.hasNext();) {
262         AFSelectionKey key = it.next();
263         if (!key.getAFCore().fd.valid() || !key.isValid()) {
264           key.cancelNoRemove();
265           it.remove();
266           existingPollFd = null;
267         } else {
268           key.setOpsReady(0);
269         }
270       }
271 
272       if (existingPollFd != null && //
273           existingPollFd.keys != null && //
274           (existingPollFd.keys.length - 1) == keysRegistered.size()) {
275         boolean needsUpdate = false;
276         int i = 1;
277         for (AFSelectionKey key : keysRegisteredKeySet) {
278           if (existingPollFd.keys[i] != key || !key.isValid()) { // NOPMD
279             needsUpdate = true;
280             break;
281           }
282           existingPollFd.ops[i] = key.interestOps();
283 
284           i++;
285         }
286 
287         if (!needsUpdate) {
288           return existingPollFd;
289         }
290       }
291 
292       int keysToPoll = keysRegistered.size();
293       for (AFSelectionKey key : keysRegisteredKeySet) {
294         if (!key.isValid()) {
295           keysToPoll--;
296         }
297       }
298 
299       int size = keysToPoll + 1;
300       FileDescriptor[] fds = new FileDescriptor[size];
301       int[] ops = new int[size];
302 
303       AFSelectionKey[] keys = new AFSelectionKey[size];
304       fds[0] = selectorPipe.sourceFD();
305       ops[0] = SelectionKey.OP_READ;
306 
307       int i = 1;
308       for (AFSelectionKey key : keysRegisteredKeySet) {
309         if (!key.isValid()) {
310           continue;
311         }
312         keys[i] = key;
313         fds[i] = key.getAFCore().fd;
314         ops[i] = key.interestOps();
315         i++;
316       }
317       return new PollFd(keys, fds, ops);
318     }
319   }
320 
321   @Override
322   protected void implCloseSelector() throws IOException {
323     wakeup();
324     Set<SelectionKey> keys;
325     synchronized (this) {
326       keys = keys();
327       keysRegistered.clear();
328     }
329     for (SelectionKey key : keys) {
330       ((AFSelectionKey) key).cancelNoRemove();
331     }
332     selectorPipe.close();
333   }
334 
335   @Override
336   public Selector wakeup() {
337     if (isOpen()) {
338       try {
339         synchronized (pipeMsgWakeUp) {
340           pipeMsgWakeUp.clear();
341           try {
342             selectorPipe.sink().write(pipeMsgWakeUp);
343           } catch (SocketException e) {
344             if (selectorPipe.sinkFD().valid()) {
345               throw e;
346             } else {
347               // ignore (Broken pipe, etc)
348             }
349           }
350         }
351       } catch (IOException e) { // NOPMD.ExceptionAsFlowControl
352         // FIXME throw as runtimeexception?
353         StackTraceUtil.printStackTrace(e);
354       }
355     }
356     return this;
357   }
358 
359   synchronized void remove(AFSelectionKey key) {
360     selectedKeysSet.remove(key);
361     deregister(key);
362     pollFd = null;
363   }
364 
365   private void deregister(AFSelectionKey key) {
366     // super.deregister unnecessarily casts SelectionKey to AbstractSelectionKey, and
367     // ((AbstractSelectableChannel)key.channel()).removeKey(key); is not visible.
368     // so we have to resort to some JNI trickery...
369     try {
370       NativeUnixSocket.deregisterSelectionKey((AbstractSelectableChannel) key.channel(), key);
371     } catch (ClassCastException e) {
372       // because our key isn't an AbstractSelectableKey, internal invalidation fails
373       // but at that point, the key is deregistered
374     }
375   }
376 
377   static final class PollFd {
378     // accessed from native code
379     final FileDescriptor[] fds;
380     // accessed from native code
381     final int[] ops;
382     // accessed from native code
383     final int[] rops;
384 
385     final AFSelectionKey[] keys;
386 
387     PollFd(FileDescriptor pipeSourceFd) {
388       this(pipeSourceFd, SelectionKey.OP_READ);
389     }
390 
391     PollFd(FileDescriptor pipeSourceFd, int op) {
392       this.fds = new FileDescriptor[] {pipeSourceFd};
393       this.ops = new int[] {op};
394       this.rops = new int[1];
395       this.keys = null;
396     }
397 
398     PollFd(FileDescriptor[] fds, int[] ops) {
399       this(null, fds, ops);
400     }
401 
402     @SuppressWarnings("PMD.ArrayIsStoredDirectly")
403     PollFd(AFSelectionKey[] keys, FileDescriptor[] fds, int[] ops) {
404       this.keys = keys;
405       if (fds.length != ops.length) {
406         throw new IllegalStateException();
407       }
408       this.fds = fds;
409       this.ops = ops;
410       this.rops = new int[ops.length];
411     }
412   }
413 }