View Javadoc
1   /*
2    * junixsocket
3    *
4    * Copyright 2009-2024 Christian Kohlschütter
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.newsclub.net.unix;
19  
20  import java.io.FileDescriptor;
21  import java.io.IOException;
22  import java.net.SocketException;
23  import java.net.SocketTimeoutException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.ClosedSelectorException;
26  import java.nio.channels.SelectableChannel;
27  import java.nio.channels.SelectionKey;
28  import java.nio.channels.Selector;
29  import java.nio.channels.spi.AbstractSelectableChannel;
30  import java.nio.channels.spi.AbstractSelector;
31  import java.util.Collections;
32  import java.util.Iterator;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  final class AFSelector extends AbstractSelector {
39    private final AFPipe selectorPipe;
40    private final PollFd selectorPipePollFd;
41  
42    private final ByteBuffer pipeMsgWakeUp = ByteBuffer.allocate(1);
43    private final ByteBuffer pipeMsgReceiveBuffer = ByteBuffer.allocateDirect(256);
44  
45    private final Map<AFSelectionKey, Integer> keysRegistered = new ConcurrentHashMap<>();
46    private final Set<AFSelectionKey> keysRegisteredKeySet = keysRegistered.keySet();
47    private final Set<SelectionKey> keysRegisteredPublic = Collections.unmodifiableSet(
48        keysRegisteredKeySet);
49  
50    private final AtomicInteger selectCount = new AtomicInteger(0);
51  
52    @SuppressWarnings("PMD.LooseCoupling")
53    private final MapValueSet<SelectionKey, Integer> selectedKeysSet =
54        new MapValueSet<SelectionKey, Integer>(keysRegistered, selectCount::get, 0);
55    private final Set<SelectionKey> selectedKeysPublic = new UngrowableSet<>(selectedKeysSet);
56  
57    private PollFd pollFd = null;
58  
59    AFSelector(AFSelectorProvider<?> provider) throws IOException {
60      super(provider);
61  
62      this.selectorPipe = AFUNIXSelectorProvider.getInstance().openSelectablePipe();
63      this.selectorPipePollFd = new PollFd(selectorPipe.sourceFD());
64    }
65  
66    @Override
67    protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object att) {
68      AFSelectionKey key = new AFSelectionKey(this, ch, ops, att);
69      synchronized (this) {
70        pollFd = null;
71        selectedKeysSet.markRemoved(key);
72      }
73      return key;
74    }
75  
76    @Override
77    public Set<SelectionKey> keys() {
78      return keysRegisteredPublic;
79    }
80  
81    @Override
82    public Set<SelectionKey> selectedKeys() {
83      return selectedKeysPublic;
84    }
85  
86    @Override
87    public int selectNow() throws IOException {
88      return select0(0);
89    }
90  
91    @Override
92    public int select(long timeout) throws IOException {
93      if (timeout > Integer.MAX_VALUE) {
94        timeout = Integer.MAX_VALUE;
95      } else if (timeout < 0) {
96        throw new IllegalArgumentException("Timeout must not be negative");
97      }
98  
99      return select0((int) timeout);
100   }
101 
102   @Override
103   public int select() throws IOException {
104     try {
105       return select0(-1);
106     } catch (SocketTimeoutException e) {
107       return 0;
108     }
109   }
110 
111   @SuppressWarnings("PMD.CognitiveComplexity")
112   private int select0(int timeout) throws IOException {
113     PollFd pfd;
114 
115     int selectId = updateSelectCount();
116 
117     synchronized (this) {
118       if (!isOpen()) {
119         throw new ClosedSelectorException();
120       }
121 
122       pfd = pollFd = initPollFd(pollFd);
123     }
124     int num;
125     try {
126       begin();
127       num = NativeUnixSocket.poll(pfd, timeout);
128     } finally {
129       end();
130     }
131     synchronized (this) {
132       pfd = pollFd;
133       if (pfd != null) {
134         AFSelectionKey[] keys = pfd.keys;
135         if (keys != null) {
136           for (AFSelectionKey key : keys) {
137             if (key != null && key.hasOpInvalid()) {
138               SelectableChannel ch = key.channel();
139               if (ch != null && ch.isOpen()) {
140                 ch.close();
141               }
142             }
143           }
144         }
145       }
146       if (num > 0) {
147         consumeAllBytesAfterPoll();
148         setOpsReady(pfd, selectId); // updates keysSelected and numKeysSelected
149       }
150       return selectedKeysSet.size();
151     }
152   }
153 
154   private synchronized void consumeAllBytesAfterPoll() throws IOException {
155     if (pollFd == null) {
156       return;
157     }
158     if ((pollFd.rops[0] & SelectionKey.OP_READ) == 0) {
159       return;
160     }
161     int maxReceive;
162     int bytesReceived;
163 
164     int options = selectorPipe.getOptions();
165 
166     synchronized (pipeMsgReceiveBuffer) {
167       pipeMsgReceiveBuffer.clear();
168       maxReceive = pipeMsgReceiveBuffer.remaining();
169       bytesReceived = NativeUnixSocket.receive(pollFd.fds[0], pipeMsgReceiveBuffer, 0, maxReceive,
170           null, options, null, 1);
171     }
172 
173     if (bytesReceived == maxReceive && maxReceive > 0) {
174       // consume all pending bytes
175       int read;
176       do {
177         if ((read = NativeUnixSocket.poll(selectorPipePollFd, 0)) > 0) {
178           synchronized (pipeMsgReceiveBuffer) {
179             pipeMsgReceiveBuffer.clear();
180             read = NativeUnixSocket.receive(selectorPipePollFd.fds[0], pipeMsgReceiveBuffer, 0,
181                 maxReceive, null, options, null, 1);
182           }
183         }
184       } while (read == maxReceive && read > 0);
185     }
186   }
187 
188   private int updateSelectCount() {
189     int selectId = selectCount.incrementAndGet();
190     if (selectId == 0) {
191       // overflow (unlikely)
192       selectedKeysSet.markAllRemoved();
193       selectId = selectCount.incrementAndGet();
194     }
195     return selectId;
196   }
197 
198   private void setOpsReady(PollFd pfd, int selectId) {
199     if (pfd != null) {
200       for (int i = 1; i < pfd.rops.length; i++) {
201         int rops = pfd.rops[i];
202         AFSelectionKey key = pfd.keys[i];
203         key.setOpsReady(rops);
204         if (rops != 0 && keysRegistered.containsKey(key)) {
205           keysRegistered.put(key, selectId);
206         }
207       }
208     }
209   }
210 
211   @SuppressWarnings({"resource", "PMD.CognitiveComplexity"})
212   private PollFd initPollFd(PollFd existingPollFd) throws IOException {
213     synchronized (this) {
214       for (Iterator<AFSelectionKey> it = keysRegisteredKeySet.iterator(); it.hasNext();) {
215         AFSelectionKey key = it.next();
216         if (!key.getAFCore().fd.valid() || !key.isValid()) {
217           key.cancelNoRemove();
218           it.remove();
219           existingPollFd = null;
220         } else {
221           key.setOpsReady(0);
222         }
223       }
224 
225       if (existingPollFd != null && //
226           existingPollFd.keys != null && //
227           (existingPollFd.keys.length - 1) == keysRegistered.size()) {
228         boolean needsUpdate = false;
229         int i = 1;
230         for (AFSelectionKey key : keysRegisteredKeySet) {
231           if (existingPollFd.keys[i] != key || !key.isValid()) { // NOPMD
232             needsUpdate = true;
233             break;
234           }
235           existingPollFd.ops[i] = key.interestOps();
236 
237           i++;
238         }
239 
240         if (!needsUpdate) {
241           return existingPollFd;
242         }
243       }
244 
245       int keysToPoll = keysRegistered.size();
246       for (AFSelectionKey key : keysRegisteredKeySet) {
247         if (!key.isValid()) {
248           keysToPoll--;
249         }
250       }
251 
252       int size = keysToPoll + 1;
253       FileDescriptor[] fds = new FileDescriptor[size];
254       int[] ops = new int[size];
255 
256       AFSelectionKey[] keys = new AFSelectionKey[size];
257       fds[0] = selectorPipe.sourceFD();
258       ops[0] = SelectionKey.OP_READ;
259 
260       int i = 1;
261       for (AFSelectionKey key : keysRegisteredKeySet) {
262         if (!key.isValid()) {
263           continue;
264         }
265         keys[i] = key;
266         fds[i] = key.getAFCore().fd;
267         ops[i] = key.interestOps();
268         i++;
269       }
270       return new PollFd(keys, fds, ops);
271     }
272   }
273 
274   @Override
275   protected void implCloseSelector() throws IOException {
276     wakeup();
277     Set<SelectionKey> keys;
278     synchronized (this) {
279       keys = keys();
280       keysRegistered.clear();
281     }
282     for (SelectionKey key : keys) {
283       ((AFSelectionKey) key).cancelNoRemove();
284     }
285     selectorPipe.close();
286   }
287 
288   @Override
289   public Selector wakeup() {
290     if (isOpen()) {
291       try {
292         synchronized (pipeMsgWakeUp) {
293           pipeMsgWakeUp.clear();
294           try {
295             selectorPipe.sink().write(pipeMsgWakeUp);
296           } catch (SocketException e) {
297             if (selectorPipe.sinkFD().valid()) {
298               throw e;
299             } else {
300               // ignore (Broken pipe, etc)
301             }
302           }
303         }
304       } catch (IOException e) { // NOPMD.ExceptionAsFlowControl
305         // FIXME throw as runtimeexception?
306         StackTraceUtil.printStackTrace(e);
307       }
308     }
309     return this;
310   }
311 
312   synchronized void remove(AFSelectionKey key) {
313     selectedKeysSet.remove(key);
314     deregister(key);
315     pollFd = null;
316   }
317 
318   private void deregister(AFSelectionKey key) {
319     // super.deregister unnecessarily casts SelectionKey to AbstractSelectionKey, and
320     // ((AbstractSelectableChannel)key.channel()).removeKey(key); is not visible.
321     // so we have to resort to some JNI trickery...
322     try {
323       NativeUnixSocket.deregisterSelectionKey((AbstractSelectableChannel) key.channel(), key);
324     } catch (ClassCastException e) {
325       // because our key isn't an AbstractSelectableKey, internal invalidation fails
326       // but at that point, the key is deregistered
327     }
328   }
329 
330   static final class PollFd {
331     // accessed from native code
332     final FileDescriptor[] fds;
333     // accessed from native code
334     final int[] ops;
335     // accessed from native code
336     final int[] rops;
337 
338     final AFSelectionKey[] keys;
339 
340     PollFd(FileDescriptor pipeSourceFd) {
341       this(pipeSourceFd, SelectionKey.OP_READ);
342     }
343 
344     PollFd(FileDescriptor pipeSourceFd, int op) {
345       this.fds = new FileDescriptor[] {pipeSourceFd};
346       this.ops = new int[] {op};
347       this.rops = new int[1];
348       this.keys = null;
349     }
350 
351     @SuppressWarnings("PMD.ArrayIsStoredDirectly")
352     PollFd(AFSelectionKey[] keys, FileDescriptor[] fds, int[] ops) {
353       this.keys = keys;
354       if (fds.length != ops.length) {
355         throw new IllegalStateException();
356       }
357       this.fds = fds;
358       this.ops = ops;
359       this.rops = new int[ops.length];
360     }
361   }
362 }