1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix;
19
import java.io.FileDescriptor;
import java.io.IOException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.AbstractSelector;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
37
38 final class AFSelector extends AbstractSelector {
39 private final AFPipe selectorPipe;
40 private final PollFd selectorPipePollFd;
41
42 private final ByteBuffer pipeMsgWakeUp = ByteBuffer.allocate(1);
43 private final ByteBuffer pipeMsgReceiveBuffer = ByteBuffer.allocateDirect(256);
44
45 private final Map<AFSelectionKey, Integer> keysRegistered = new ConcurrentHashMap<>();
46 private final Set<AFSelectionKey> keysRegisteredKeySet = keysRegistered.keySet();
47 private final Set<SelectionKey> keysRegisteredPublic = Collections.unmodifiableSet(
48 keysRegisteredKeySet);
49
50 private final AtomicInteger selectCount = new AtomicInteger(0);
51
52 @SuppressWarnings("PMD.LooseCoupling")
53 private final MapValueSet<SelectionKey, Integer> selectedKeysSet =
54 new MapValueSet<SelectionKey, Integer>(keysRegistered, selectCount::get, 0);
55 private final Set<SelectionKey> selectedKeysPublic = new UngrowableSet<>(selectedKeysSet);
56
57 private PollFd pollFd = null;
58
59 AFSelector(AFSelectorProvider<?> provider) throws IOException {
60 super(provider);
61
62 this.selectorPipe = AFUNIXSelectorProvider.getInstance().openSelectablePipe();
63 this.selectorPipePollFd = new PollFd(selectorPipe.sourceFD());
64 }
65
66 @Override
67 protected SelectionKey register(AbstractSelectableChannel ch, int ops, Object att) {
68 AFSelectionKey key = new AFSelectionKey(this, ch, ops, att);
69 synchronized (this) {
70 pollFd = null;
71 selectedKeysSet.markRemoved(key);
72 }
73 return key;
74 }
75
76 @Override
77 public Set<SelectionKey> keys() {
78 return keysRegisteredPublic;
79 }
80
81 @Override
82 public Set<SelectionKey> selectedKeys() {
83 return selectedKeysPublic;
84 }
85
86 @Override
87 public int selectNow() throws IOException {
88 return select0(0);
89 }
90
91 @Override
92 public int select(long timeout) throws IOException {
93 if (timeout > Integer.MAX_VALUE) {
94 timeout = Integer.MAX_VALUE;
95 } else if (timeout < 0) {
96 throw new IllegalArgumentException("Timeout must not be negative");
97 }
98
99 return select0((int) timeout);
100 }
101
102 @Override
103 public int select() throws IOException {
104 try {
105 return select0(-1);
106 } catch (SocketTimeoutException e) {
107 return 0;
108 }
109 }
110
111 @SuppressWarnings("PMD.CognitiveComplexity")
112 private int select0(int timeout) throws IOException {
113 PollFd pfd;
114
115 int selectId = updateSelectCount();
116
117 synchronized (this) {
118 if (!isOpen()) {
119 throw new ClosedSelectorException();
120 }
121
122 pfd = pollFd = initPollFd(pollFd);
123 }
124 int num;
125 try {
126 begin();
127 num = NativeUnixSocket.poll(pfd, timeout);
128 } finally {
129 end();
130 }
131 synchronized (this) {
132 pfd = pollFd;
133 if (pfd != null) {
134 AFSelectionKey[] keys = pfd.keys;
135 if (keys != null) {
136 for (AFSelectionKey key : keys) {
137 if (key != null && key.hasOpInvalid()) {
138 SelectableChannel ch = key.channel();
139 if (ch != null && ch.isOpen()) {
140 ch.close();
141 }
142 }
143 }
144 }
145 }
146 if (num > 0) {
147 consumeAllBytesAfterPoll();
148 setOpsReady(pfd, selectId);
149 }
150 return selectedKeysSet.size();
151 }
152 }
153
154 private synchronized void consumeAllBytesAfterPoll() throws IOException {
155 if (pollFd == null) {
156 return;
157 }
158 if ((pollFd.rops[0] & SelectionKey.OP_READ) == 0) {
159 return;
160 }
161 int maxReceive;
162 int bytesReceived;
163
164 int options = selectorPipe.getOptions();
165
166 synchronized (pipeMsgReceiveBuffer) {
167 pipeMsgReceiveBuffer.clear();
168 maxReceive = pipeMsgReceiveBuffer.remaining();
169 bytesReceived = NativeUnixSocket.receive(pollFd.fds[0], pipeMsgReceiveBuffer, 0, maxReceive,
170 null, options, null, 1);
171 }
172
173 if (bytesReceived == maxReceive && maxReceive > 0) {
174
175 int read;
176 do {
177 if ((read = NativeUnixSocket.poll(selectorPipePollFd, 0)) > 0) {
178 synchronized (pipeMsgReceiveBuffer) {
179 pipeMsgReceiveBuffer.clear();
180 read = NativeUnixSocket.receive(selectorPipePollFd.fds[0], pipeMsgReceiveBuffer, 0,
181 maxReceive, null, options, null, 1);
182 }
183 }
184 } while (read == maxReceive && read > 0);
185 }
186 }
187
188 private int updateSelectCount() {
189 int selectId = selectCount.incrementAndGet();
190 if (selectId == 0) {
191
192 selectedKeysSet.markAllRemoved();
193 selectId = selectCount.incrementAndGet();
194 }
195 return selectId;
196 }
197
198 private void setOpsReady(PollFd pfd, int selectId) {
199 if (pfd != null) {
200 for (int i = 1; i < pfd.rops.length; i++) {
201 int rops = pfd.rops[i];
202 AFSelectionKey key = pfd.keys[i];
203 key.setOpsReady(rops);
204 if (rops != 0 && keysRegistered.containsKey(key)) {
205 keysRegistered.put(key, selectId);
206 }
207 }
208 }
209 }
210
211 @SuppressWarnings({"resource", "PMD.CognitiveComplexity"})
212 private PollFd initPollFd(PollFd existingPollFd) throws IOException {
213 synchronized (this) {
214 for (Iterator<AFSelectionKey> it = keysRegisteredKeySet.iterator(); it.hasNext();) {
215 AFSelectionKey key = it.next();
216 if (!key.getAFCore().fd.valid() || !key.isValid()) {
217 key.cancelNoRemove();
218 it.remove();
219 existingPollFd = null;
220 } else {
221 key.setOpsReady(0);
222 }
223 }
224
225 if (existingPollFd != null &&
226 existingPollFd.keys != null &&
227 (existingPollFd.keys.length - 1) == keysRegistered.size()) {
228 boolean needsUpdate = false;
229 int i = 1;
230 for (AFSelectionKey key : keysRegisteredKeySet) {
231 if (existingPollFd.keys[i] != key || !key.isValid()) {
232 needsUpdate = true;
233 break;
234 }
235 existingPollFd.ops[i] = key.interestOps();
236
237 i++;
238 }
239
240 if (!needsUpdate) {
241 return existingPollFd;
242 }
243 }
244
245 int keysToPoll = keysRegistered.size();
246 for (AFSelectionKey key : keysRegisteredKeySet) {
247 if (!key.isValid()) {
248 keysToPoll--;
249 }
250 }
251
252 int size = keysToPoll + 1;
253 FileDescriptor[] fds = new FileDescriptor[size];
254 int[] ops = new int[size];
255
256 AFSelectionKey[] keys = new AFSelectionKey[size];
257 fds[0] = selectorPipe.sourceFD();
258 ops[0] = SelectionKey.OP_READ;
259
260 int i = 1;
261 for (AFSelectionKey key : keysRegisteredKeySet) {
262 if (!key.isValid()) {
263 continue;
264 }
265 keys[i] = key;
266 fds[i] = key.getAFCore().fd;
267 ops[i] = key.interestOps();
268 i++;
269 }
270 return new PollFd(keys, fds, ops);
271 }
272 }
273
274 @Override
275 protected void implCloseSelector() throws IOException {
276 wakeup();
277 Set<SelectionKey> keys;
278 synchronized (this) {
279 keys = keys();
280 keysRegistered.clear();
281 }
282 for (SelectionKey key : keys) {
283 ((AFSelectionKey) key).cancelNoRemove();
284 }
285 selectorPipe.close();
286 }
287
288 @Override
289 public Selector wakeup() {
290 if (isOpen()) {
291 try {
292 synchronized (pipeMsgWakeUp) {
293 pipeMsgWakeUp.clear();
294 try {
295 selectorPipe.sink().write(pipeMsgWakeUp);
296 } catch (SocketException e) {
297 if (selectorPipe.sinkFD().valid()) {
298 throw e;
299 } else {
300
301 }
302 }
303 }
304 } catch (IOException e) {
305
306 StackTraceUtil.printStackTrace(e);
307 }
308 }
309 return this;
310 }
311
312 synchronized void remove(AFSelectionKey key) {
313 selectedKeysSet.remove(key);
314 deregister(key);
315 pollFd = null;
316 }
317
318 private void deregister(AFSelectionKey key) {
319
320
321
322 try {
323 NativeUnixSocket.deregisterSelectionKey((AbstractSelectableChannel) key.channel(), key);
324 } catch (ClassCastException e) {
325
326
327 }
328 }
329
330 static final class PollFd {
331
332 final FileDescriptor[] fds;
333
334 final int[] ops;
335
336 final int[] rops;
337
338 final AFSelectionKey[] keys;
339
340 PollFd(FileDescriptor pipeSourceFd) {
341 this(pipeSourceFd, SelectionKey.OP_READ);
342 }
343
344 PollFd(FileDescriptor pipeSourceFd, int op) {
345 this.fds = new FileDescriptor[] {pipeSourceFd};
346 this.ops = new int[] {op};
347 this.rops = new int[1];
348 this.keys = null;
349 }
350
351 @SuppressWarnings("PMD.ArrayIsStoredDirectly")
352 PollFd(AFSelectionKey[] keys, FileDescriptor[] fds, int[] ops) {
353 this.keys = keys;
354 if (fds.length != ops.length) {
355 throw new IllegalStateException();
356 }
357 this.fds = fds;
358 this.ops = ops;
359 this.rops = new int[ops.length];
360 }
361 }
362 }