1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix;
19
20 import java.io.FileDescriptor;
21 import java.io.IOException;
22 import java.net.SocketAddress;
23 import java.net.SocketException;
24 import java.net.SocketTimeoutException;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.AsynchronousCloseException;
27 import java.nio.channels.ClosedByInterruptException;
28 import java.nio.channels.ClosedChannelException;
29 import java.nio.channels.SelectionKey;
30 import java.util.Objects;
31 import java.util.concurrent.atomic.AtomicBoolean;
32 import java.util.concurrent.atomic.AtomicInteger;
33
34 import org.eclipse.jdt.annotation.NonNull;
35 import org.newsclub.net.unix.pool.MutableHolder;
36 import org.newsclub.net.unix.pool.ObjectPool;
37 import org.newsclub.net.unix.pool.ObjectPool.Lease;
38
39 import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
40
41
42
43
44
45
46 class AFCore extends CleanableState {
47 private static final ObjectPool<MutableHolder<ByteBuffer>> TL_BUFFER = ObjectPool
48 .newThreadLocalPool(() -> {
49 return new MutableHolder<>(null);
50 }, (o) -> {
51 ByteBuffer bb = o.get();
52 if (bb != null) {
53 bb.clear();
54 }
55 return true;
56 });
57
58 private static final String PROP_TL_BUFFER_MAX_CAPACITY =
59 "org.newsclub.net.unix.thread-local-buffer.max-capacity";
60
61 private static final int TL_BUFFER_MIN_CAPACITY = 8192;
62 private static final int TL_BUFFER_MAX_CAPACITY = Integer.parseInt(System.getProperty(
63 PROP_TL_BUFFER_MAX_CAPACITY, Integer.toString(1 * 1024 * 1024)));
64
65 private final AtomicBoolean closed = new AtomicBoolean(false);
66
67 final FileDescriptor fd;
68 final AncillaryDataSupport ancillaryDataSupport;
69
70 private final boolean datagramMode;
71
72 private final AtomicInteger virtualBlockingLeases = new AtomicInteger(0);
73 private volatile boolean blocking = true;
74 private boolean cleanFd = true;
75
76 AFCore(Object observed, FileDescriptor fd, AncillaryDataSupport ancillaryDataSupport,
77 boolean datagramMode) {
78 super(observed);
79 this.datagramMode = datagramMode;
80 this.ancillaryDataSupport = ancillaryDataSupport;
81
82 this.fd = fd == null ? new FileDescriptor() : fd;
83 }
84
85 AFCore(Object observed, FileDescriptor fd) {
86 this(observed, fd, null, false);
87 }
88
89 @Override
90 protected final void doClean() {
91 if (fd != null && fd.valid() && cleanFd) {
92 try {
93 doClose();
94 } catch (IOException e) {
95
96 }
97 }
98 if (ancillaryDataSupport != null) {
99 ancillaryDataSupport.close();
100 }
101 }
102
103 void disableCleanFd() {
104 this.cleanFd = false;
105 }
106
107 boolean isClosed() {
108 return closed.get();
109 }
110
111 void doClose() throws IOException {
112 if (closed.compareAndSet(false, true)) {
113 NativeUnixSocket.close(fd);
114 }
115 }
116
117 FileDescriptor validFdOrException() throws SocketException {
118 FileDescriptor fdesc = validFd();
119 if (fdesc == null) {
120 closed.set(true);
121 throw new SocketClosedException("Not open");
122 }
123 return fdesc;
124 }
125
126 synchronized FileDescriptor validFd() {
127 if (isClosed()) {
128 return null;
129 }
130 FileDescriptor descriptor = this.fd;
131 if (descriptor != null) {
132 if (descriptor.valid()) {
133 return descriptor;
134 }
135 }
136 return null;
137 }
138
139 int read(ByteBuffer dst, AFSupplier<Integer> timeout) throws IOException {
140 return read(dst, timeout, null, 0);
141 }
142
143 @SuppressWarnings({
144 "PMD.NcssCount", "PMD.CognitiveComplexity", "PMD.CyclomaticComplexity",
145 "PMD.NPathComplexity"})
146 @SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
147 int read(ByteBuffer dst, AFSupplier<Integer> timeout, ByteBuffer socketAddressBuffer, int options)
148 throws IOException {
149 int remaining = dst.remaining();
150 if (remaining == 0) {
151 return 0;
152 }
153 FileDescriptor fdesc = validFdOrException();
154
155 int dstPos = dst.position();
156
157 ByteBuffer buf;
158 int pos;
159
160 boolean direct = dst.isDirect();
161
162 final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && isBlocking())
163 || isVirtualBlocking();
164 final long now;
165 if (virtualBlocking) {
166 now = System.currentTimeMillis();
167 } else {
168 now = 0;
169 }
170 if (virtualBlocking || !blocking) {
171 options |= NativeUnixSocket.OPT_NON_BLOCKING;
172 }
173
174 boolean park = false;
175
176 int count;
177 virtualThreadLoop : do {
178 if (virtualBlocking) {
179 if (park) {
180 VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
181 timeout, this::close);
182 }
183 configureVirtualBlocking(true);
184 }
185
186 try (Lease<MutableHolder<ByteBuffer>> lease = direct ? null : getPrivateDirectByteBuffer(
187 remaining)) {
188 if (direct) {
189 buf = dst;
190 pos = dstPos;
191 } else {
192 buf = Objects.requireNonNull(Objects.requireNonNull(lease).get().get());
193 remaining = Math.min(remaining, buf.remaining());
194 pos = buf.position();
195 buf.limit(pos + remaining);
196 }
197
198 try {
199 count = NativeUnixSocket.receive(fdesc, buf, pos, remaining, socketAddressBuffer, options,
200 ancillaryDataSupport, 0);
201 if (count == 0 && virtualBlocking) {
202
203 park = true;
204 continue virtualThreadLoop;
205 }
206 } catch (AsynchronousCloseException e) {
207 throw e;
208 } catch (ClosedChannelException e) {
209 if (isClosed()) {
210 throw e;
211 } else if (Thread.currentThread().isInterrupted()) {
212 throw (ClosedByInterruptException) new ClosedByInterruptException().initCause(e);
213 } else {
214 throw (AsynchronousCloseException) new AsynchronousCloseException().initCause(e);
215 }
216 } catch (SocketTimeoutException e) {
217 if (virtualBlocking) {
218
219 park = true;
220 continue virtualThreadLoop;
221 } else {
222 throw e;
223 }
224 }
225
226 if (count == -1 || buf == null) {
227 return -1;
228 }
229
230 if (direct) {
231 if (count < 0) {
232 throw new IllegalStateException();
233 }
234 dst.position(pos + count);
235 } else {
236 int oldLimit = buf.limit();
237 if (count < oldLimit) {
238 buf.limit(count);
239 }
240 try {
241 while (buf.hasRemaining()) {
242 dst.put(buf);
243 }
244 } finally {
245 if (count < oldLimit) {
246 buf.limit(oldLimit);
247 }
248 }
249 }
250 } finally {
251 if (virtualBlocking) {
252 configureVirtualBlocking(false);
253 }
254 }
255 break;
256 } while (true);
257
258 return count;
259 }
260
261 int write(ByteBuffer src, AFSupplier<Integer> timeout) throws IOException {
262 return write(src, timeout, null, 0);
263 }
264
265 @SuppressWarnings({
266 "PMD.NcssCount", "PMD.CognitiveComplexity", "PMD.CyclomaticComplexity",
267 "PMD.NPathComplexity"})
268 @SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
269 int write(ByteBuffer src, AFSupplier<Integer> timeout, SocketAddress target, int options)
270 throws IOException {
271 int remaining = src.remaining();
272
273 if (remaining == 0) {
274 return 0;
275 }
276
277 FileDescriptor fdesc = validFdOrException();
278 final ByteBuffer addressTo;
279 final int addressToLen;
280 try (Lease<ByteBuffer> addressToLease = target == null ? null
281 : AFSocketAddress.SOCKETADDRESS_BUFFER_TL.take()) {
282 if (addressToLease == null) {
283 addressTo = null;
284 addressToLen = 0;
285 } else {
286 addressTo = addressToLease.get();
287 addressToLen = AFSocketAddress.unwrapAddressDirectBufferInternal(addressTo, target);
288 }
289
290
291
292
293 int pos = src.position();
294 boolean isDirect = src.isDirect();
295 ByteBuffer buf;
296 int bufPos;
297
298 final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && isBlocking())
299 || isVirtualBlocking();
300 final long now;
301 if (virtualBlocking) {
302 now = System.currentTimeMillis();
303 } else {
304 now = 0;
305 }
306 if (virtualBlocking || !blocking) {
307 options |= NativeUnixSocket.OPT_NON_BLOCKING;
308 }
309 if (datagramMode) {
310 options |= NativeUnixSocket.OPT_DGRAM_MODE;
311 }
312
313 int written;
314
315 boolean park = false;
316 virtualThreadLoop : do {
317 if (virtualBlocking) {
318 if (park) {
319 VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
320 timeout, this::close);
321 }
322 configureVirtualBlocking(true);
323 }
324
325 try (Lease<MutableHolder<ByteBuffer>> lease = isDirect ? null : getPrivateDirectByteBuffer(
326 remaining)) {
327 if (isDirect) {
328 buf = src;
329 bufPos = pos;
330 } else {
331 buf = Objects.requireNonNull(Objects.requireNonNull(lease).get().get());
332 remaining = Math.min(remaining, buf.remaining());
333
334 bufPos = buf.position();
335
336 while (src.hasRemaining() && buf.hasRemaining()) {
337 buf.put(src);
338 }
339
340 buf.position(bufPos);
341 }
342
343 written = NativeUnixSocket.send(fdesc, buf, bufPos, remaining, addressTo, addressToLen,
344 options, ancillaryDataSupport);
345 if (written == 0 && virtualBlocking) {
346
347 park = true;
348 continue virtualThreadLoop;
349 }
350 } catch (SocketTimeoutException e) {
351 if (virtualBlocking) {
352
353 park = true;
354 continue virtualThreadLoop;
355 } else {
356 throw e;
357 }
358 } finally {
359 if (virtualBlocking) {
360 configureVirtualBlocking(false);
361 }
362 }
363 break;
364 } while (true);
365 src.position(pos + written);
366 return written;
367 }
368 }
369
370
371
372
373
374
375
376
377
378
379
380 @SuppressWarnings("null")
381 Lease<MutableHolder<@NonNull ByteBuffer>> getPrivateDirectByteBuffer(int capacity) {
382 if (capacity > TL_BUFFER_MAX_CAPACITY && TL_BUFFER_MAX_CAPACITY > 0) {
383
384
385
386 return ObjectPool.unpooledLease(new MutableHolder<>(ByteBuffer.allocateDirect(capacity)));
387 }
388 if (capacity < TL_BUFFER_MIN_CAPACITY) {
389 capacity = TL_BUFFER_MIN_CAPACITY;
390 }
391 Lease<MutableHolder<ByteBuffer>> lease = TL_BUFFER.take();
392 MutableHolder<ByteBuffer> holder = lease.get();
393 ByteBuffer buffer = holder.get();
394 if (buffer == null || capacity > buffer.capacity()) {
395 buffer = ByteBuffer.allocateDirect(capacity);
396 holder.set(buffer);
397 }
398 buffer.clear();
399 return lease;
400 }
401
402 void implConfigureBlocking(boolean block) throws IOException {
403 this.blocking = block;
404 if (block && isVirtualBlocking()) {
405
406 } else {
407 NativeUnixSocket.configureBlocking(validFdOrException(), block);
408 }
409 }
410
411
412
413
414
415
416
417
418
419 void configureVirtualBlocking(boolean enabled) throws SocketException, IOException {
420 int v;
421 if (enabled) {
422 if ((v = this.virtualBlockingLeases.incrementAndGet()) >= 1 && blocking) {
423 NativeUnixSocket.configureBlocking(validFdOrException(), false);
424 }
425 if (v >= Integer.MAX_VALUE) {
426 throw new IOException("blocking overflow");
427 }
428 } else {
429 if ((v = this.virtualBlockingLeases.decrementAndGet()) == 0 && blocking) {
430 NativeUnixSocket.configureBlocking(validFdOrException(), true);
431 }
432 if (v < 0) {
433 throw new IOException("blocking underflow");
434 }
435 }
436 }
437
438 boolean isVirtualBlocking() {
439 return virtualBlockingLeases.get() > 0;
440 }
441
442 boolean isBlocking() {
443 return blocking;
444 }
445 }