View Javadoc
1   /*
2    * junixsocket
3    *
4    * Copyright 2009-2024 Christian Kohlschütter
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.newsclub.net.unix;
19  
20  import java.io.FileDescriptor;
21  import java.io.IOException;
22  import java.net.SocketAddress;
23  import java.net.SocketException;
24  import java.net.SocketTimeoutException;
25  import java.nio.ByteBuffer;
26  import java.nio.channels.AsynchronousCloseException;
27  import java.nio.channels.ClosedByInterruptException;
28  import java.nio.channels.ClosedChannelException;
29  import java.nio.channels.SelectionKey;
30  import java.util.Objects;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.eclipse.jdt.annotation.NonNull;
35  import org.newsclub.net.unix.pool.MutableHolder;
36  import org.newsclub.net.unix.pool.ObjectPool;
37  import org.newsclub.net.unix.pool.ObjectPool.Lease;
38  
39  import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
40  
41  /**
42   * The core functionality of file descriptor based I/O.
43   *
44   * @author Christian Kohlschütter
45   */
46  class AFCore extends CleanableState {
47    private static final ObjectPool<MutableHolder<ByteBuffer>> TL_BUFFER = ObjectPool
48        .newThreadLocalPool(() -> {
49          return new MutableHolder<>(null);
50        }, (o) -> {
51          ByteBuffer bb = o.get();
52          if (bb != null) {
53            bb.clear();
54          }
55          return true;
56        });
57  
58    private static final String PROP_TL_BUFFER_MAX_CAPACITY =
59        "org.newsclub.net.unix.thread-local-buffer.max-capacity"; // 0 means "no limit" (discouraged)
60  
61    private static final int TL_BUFFER_MIN_CAPACITY = 8192; // 8 kb per thread
62    private static final int TL_BUFFER_MAX_CAPACITY = Integer.parseInt(System.getProperty(
63        PROP_TL_BUFFER_MAX_CAPACITY, Integer.toString(1 * 1024 * 1024))); // 1 MB per thread
64  
65    private final AtomicBoolean closed = new AtomicBoolean(false);
66  
67    final FileDescriptor fd;
68    final AncillaryDataSupport ancillaryDataSupport;
69  
70    private final boolean datagramMode;
71  
72    private final AtomicInteger virtualBlockingLeases = new AtomicInteger(0);
73    private volatile boolean blocking = true;
74    private boolean cleanFd = true;
75  
76    AFCore(Object observed, FileDescriptor fd, AncillaryDataSupport ancillaryDataSupport,
77        boolean datagramMode) {
78      super(observed);
79      this.datagramMode = datagramMode;
80      this.ancillaryDataSupport = ancillaryDataSupport;
81  
82      this.fd = fd == null ? new FileDescriptor() : fd;
83    }
84  
85    AFCore(Object observed, FileDescriptor fd) {
86      this(observed, fd, null, false);
87    }
88  
89    @Override
90    protected final void doClean() {
91      if (fd != null && fd.valid() && cleanFd) {
92        try {
93          doClose();
94        } catch (IOException e) {
95          // ignore
96        }
97      }
98      if (ancillaryDataSupport != null) {
99        ancillaryDataSupport.close();
100     }
101   }
102 
103   void disableCleanFd() {
104     this.cleanFd = false;
105   }
106 
107   boolean isClosed() {
108     return closed.get();
109   }
110 
111   void doClose() throws IOException {
112     if (closed.compareAndSet(false, true)) {
113       NativeUnixSocket.close(fd);
114     }
115   }
116 
117   FileDescriptor validFdOrException() throws SocketException {
118     FileDescriptor fdesc = validFd();
119     if (fdesc == null) {
120       closed.set(true);
121       throw new SocketClosedException("Not open");
122     }
123     return fdesc;
124   }
125 
126   synchronized FileDescriptor validFd() {
127     if (isClosed()) {
128       return null;
129     }
130     FileDescriptor descriptor = this.fd;
131     if (descriptor != null) {
132       if (descriptor.valid()) {
133         return descriptor;
134       }
135     }
136     return null;
137   }
138 
139   int read(ByteBuffer dst, AFSupplier<Integer> timeout) throws IOException {
140     return read(dst, timeout, null, 0);
141   }
142 
143   @SuppressWarnings({
144       "PMD.NcssCount", "PMD.CognitiveComplexity", "PMD.CyclomaticComplexity",
145       "PMD.NPathComplexity"})
146   @SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
147   int read(ByteBuffer dst, AFSupplier<Integer> timeout, ByteBuffer socketAddressBuffer, int options)
148       throws IOException {
149     int remaining = dst.remaining();
150     if (remaining == 0) {
151       return 0;
152     }
153     FileDescriptor fdesc = validFdOrException();
154 
155     int dstPos = dst.position();
156 
157     ByteBuffer buf;
158     int pos;
159 
160     boolean direct = dst.isDirect();
161 
162     final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && isBlocking())
163         || isVirtualBlocking();
164     final long now;
165     if (virtualBlocking) {
166       now = System.currentTimeMillis();
167     } else {
168       now = 0;
169     }
170     if (virtualBlocking || !blocking) {
171       options |= NativeUnixSocket.OPT_NON_BLOCKING;
172     }
173 
174     boolean park = false;
175 
176     int count;
177     virtualThreadLoop : do {
178       if (virtualBlocking) {
179         if (park) {
180           VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
181               timeout, this::close);
182         }
183         configureVirtualBlocking(true);
184       }
185 
186       try (Lease<MutableHolder<ByteBuffer>> lease = direct ? null : getPrivateDirectByteBuffer(
187           remaining)) {
188         if (direct) {
189           buf = dst;
190           pos = dstPos;
191         } else {
192           buf = Objects.requireNonNull(Objects.requireNonNull(lease).get().get());
193           remaining = Math.min(remaining, buf.remaining());
194           pos = buf.position();
195           buf.limit(pos + remaining);
196         }
197 
198         try {
199           count = NativeUnixSocket.receive(fdesc, buf, pos, remaining, socketAddressBuffer, options,
200               ancillaryDataSupport, 0);
201           if (count == 0 && virtualBlocking) {
202             // try again
203             park = true;
204             continue virtualThreadLoop;
205           }
206         } catch (AsynchronousCloseException e) {
207           throw e;
208         } catch (ClosedChannelException e) {
209           if (isClosed()) {
210             throw e;
211           } else if (Thread.currentThread().isInterrupted()) {
212             throw (ClosedByInterruptException) new ClosedByInterruptException().initCause(e);
213           } else {
214             throw (AsynchronousCloseException) new AsynchronousCloseException().initCause(e);
215           }
216         } catch (SocketTimeoutException e) {
217           if (virtualBlocking) {
218             // try again
219             park = true;
220             continue virtualThreadLoop;
221           } else {
222             throw e;
223           }
224         }
225 
226         if (count == -1 || buf == null) {
227           return -1;
228         }
229 
230         if (direct) {
231           if (count < 0) {
232             throw new IllegalStateException();
233           }
234           dst.position(pos + count);
235         } else {
236           int oldLimit = buf.limit();
237           if (count < oldLimit) {
238             buf.limit(count);
239           }
240           try {
241             while (buf.hasRemaining()) {
242               dst.put(buf);
243             }
244           } finally {
245             if (count < oldLimit) {
246               buf.limit(oldLimit);
247             }
248           }
249         }
250       } finally {
251         if (virtualBlocking) {
252           configureVirtualBlocking(false);
253         }
254       }
255       break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
256     } while (true); // NOPMD.WhileLoopWithLiteralBoolean
257 
258     return count;
259   }
260 
261   int write(ByteBuffer src, AFSupplier<Integer> timeout) throws IOException {
262     return write(src, timeout, null, 0);
263   }
264 
265   @SuppressWarnings({
266       "PMD.NcssCount", "PMD.CognitiveComplexity", "PMD.CyclomaticComplexity",
267       "PMD.NPathComplexity"})
268   @SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
269   int write(ByteBuffer src, AFSupplier<Integer> timeout, SocketAddress target, int options)
270       throws IOException {
271     int remaining = src.remaining();
272 
273     if (remaining == 0) {
274       return 0;
275     }
276 
277     FileDescriptor fdesc = validFdOrException();
278     final ByteBuffer addressTo;
279     final int addressToLen;
280     try (Lease<ByteBuffer> addressToLease = target == null ? null
281         : AFSocketAddress.SOCKETADDRESS_BUFFER_TL.take()) {
282       if (addressToLease == null) {
283         addressTo = null;
284         addressToLen = 0;
285       } else {
286         addressTo = addressToLease.get();
287         addressToLen = AFSocketAddress.unwrapAddressDirectBufferInternal(addressTo, target);
288       }
289 
290       // accept "send buffer overflow" as packet loss
291       // and don't retry (which may slow things down quite a bit)
292 
293       int pos = src.position();
294       boolean isDirect = src.isDirect();
295       ByteBuffer buf;
296       int bufPos;
297 
298       final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && isBlocking())
299           || isVirtualBlocking();
300       final long now;
301       if (virtualBlocking) {
302         now = System.currentTimeMillis();
303       } else {
304         now = 0;
305       }
306       if (virtualBlocking || !blocking) {
307         options |= NativeUnixSocket.OPT_NON_BLOCKING;
308       }
309       if (datagramMode) {
310         options |= NativeUnixSocket.OPT_DGRAM_MODE;
311       }
312 
313       int written;
314 
315       boolean park = false;
316       virtualThreadLoop : do {
317         if (virtualBlocking) {
318           if (park) {
319             VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
320                 timeout, this::close);
321           }
322           configureVirtualBlocking(true);
323         }
324 
325         try (Lease<MutableHolder<ByteBuffer>> lease = isDirect ? null : getPrivateDirectByteBuffer(
326             remaining)) {
327           if (isDirect) {
328             buf = src;
329             bufPos = pos;
330           } else {
331             buf = Objects.requireNonNull(Objects.requireNonNull(lease).get().get());
332             remaining = Math.min(remaining, buf.remaining());
333 
334             bufPos = buf.position();
335 
336             while (src.hasRemaining() && buf.hasRemaining()) {
337               buf.put(src);
338             }
339 
340             buf.position(bufPos);
341           }
342 
343           written = NativeUnixSocket.send(fdesc, buf, bufPos, remaining, addressTo, addressToLen,
344               options, ancillaryDataSupport);
345           if (written == 0 && virtualBlocking) {
346             // try again
347             park = true;
348             continue virtualThreadLoop;
349           }
350         } catch (SocketTimeoutException e) {
351           if (virtualBlocking) {
352             // try again
353             park = true;
354             continue virtualThreadLoop;
355           } else {
356             throw e;
357           }
358         } finally {
359           if (virtualBlocking) {
360             configureVirtualBlocking(false);
361           }
362         }
363         break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
364       } while (true); // NOPMD.WhileLoopWithLiteralBoolean
365       src.position(pos + written);
366       return written;
367     }
368   }
369 
370   /**
371    * Returns a per-thread reusable byte buffer for a given capacity.
372    *
373    * If a thread-local buffer currently uses a smaller capacity, the buffer is replaced by a larger
374    * one. If the capacity exceeds a configurable maximum, a new direct buffer is allocated but not
375    * cached (i.e., the previously cached one is kept but not immediately returned to the caller).
376    *
377    * @param capacity The desired capacity.
378    * @return A byte buffer satisfying the requested capacity.
379    */
380   @SuppressWarnings("null")
381   Lease<MutableHolder<@NonNull ByteBuffer>> getPrivateDirectByteBuffer(int capacity) {
382     if (capacity > TL_BUFFER_MAX_CAPACITY && TL_BUFFER_MAX_CAPACITY > 0) {
383       // Capacity exceeds configurable maximum limit;
384       // allocate but do not cache direct buffer.
385       // This may incur a performance penalty at the cost of correctness when using such capacities.
386       return ObjectPool.unpooledLease(new MutableHolder<>(ByteBuffer.allocateDirect(capacity)));
387     }
388     if (capacity < TL_BUFFER_MIN_CAPACITY) {
389       capacity = TL_BUFFER_MIN_CAPACITY;
390     }
391     Lease<MutableHolder<ByteBuffer>> lease = TL_BUFFER.take();
392     MutableHolder<ByteBuffer> holder = lease.get();
393     ByteBuffer buffer = holder.get();
394     if (buffer == null || capacity > buffer.capacity()) {
395       buffer = ByteBuffer.allocateDirect(capacity);
396       holder.set(buffer);
397     }
398     buffer.clear();
399     return lease;
400   }
401 
402   void implConfigureBlocking(boolean block) throws IOException {
403     this.blocking = block;
404     if (block && isVirtualBlocking()) {
405       // do not actually change it here, defer it to when the virtual blocking counter goes to 0
406     } else {
407       NativeUnixSocket.configureBlocking(validFdOrException(), block);
408     }
409   }
410 
411   /**
412    * Increments/decrements the "virtual blocking" counter (calls must be in pairs/balanced using
413    * try-finally blocks).
414    *
415    * @param enabled {@code true} if increment, {@code false} if decrement.
416    * @throws SocketException on error.
417    * @throws IOException on error, including count overflow/underflow.
418    */
419   void configureVirtualBlocking(boolean enabled) throws SocketException, IOException {
420     int v;
421     if (enabled) {
422       if ((v = this.virtualBlockingLeases.incrementAndGet()) >= 1 && blocking) {
423         NativeUnixSocket.configureBlocking(validFdOrException(), false);
424       }
425       if (v >= Integer.MAX_VALUE) {
426         throw new IOException("blocking overflow");
427       }
428     } else {
429       if ((v = this.virtualBlockingLeases.decrementAndGet()) == 0 && blocking) {
430         NativeUnixSocket.configureBlocking(validFdOrException(), true);
431       }
432       if (v < 0) {
433         throw new IOException("blocking underflow");
434       }
435     }
436   }
437 
438   boolean isVirtualBlocking() {
439     return virtualBlockingLeases.get() > 0;
440   }
441 
442   boolean isBlocking() {
443     return blocking;
444   }
445 }