View Javadoc
1   /*
2    * junixsocket
3    *
4    * Copyright 2009-2024 Christian Kohlschütter
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.newsclub.net.unix;
19  
20  import java.io.FileDescriptor;
21  import java.io.IOException;
22  import java.net.SocketAddress;
23  import java.net.SocketException;
24  import java.nio.ByteBuffer;
25  import java.util.concurrent.atomic.AtomicBoolean;
26  
27  /**
28   * The core functionality of file descriptor based I/O.
29   *
30   * @author Christian Kohlschütter
31   */
32  class AFCore extends CleanableState {
33    private static final ThreadLocal<ByteBuffer> TL_BUFFER = new ThreadLocal<>();
34  
35    private static final String PROP_TL_BUFFER_MAX_CAPACITY =
36        "org.newsclub.net.unix.thread-local-buffer.max-capacity"; // 0 means "no limit" (discouraged)
37  
38    private static final int TL_BUFFER_MIN_CAPACITY = 8192; // 8 kb per thread
39    private static final int TL_BUFFER_MAX_CAPACITY = Integer.parseInt(System.getProperty(
40        PROP_TL_BUFFER_MAX_CAPACITY, Integer.toString(1 * 1024 * 1024))); // 1 MB per thread
41  
42    private final AtomicBoolean closed = new AtomicBoolean(false);
43  
44    final FileDescriptor fd;
45    final AncillaryDataSupport ancillaryDataSupport;
46  
47    private final boolean datagramMode;
48  
49    private boolean blocking = true;
50    private boolean cleanFd = true;
51  
52    AFCore(Object observed, FileDescriptor fd, AncillaryDataSupport ancillaryDataSupport,
53        boolean datagramMode) {
54      super(observed);
55      this.datagramMode = datagramMode;
56      this.ancillaryDataSupport = ancillaryDataSupport;
57  
58      this.fd = fd == null ? new FileDescriptor() : fd;
59    }
60  
61    AFCore(Object observed, FileDescriptor fd) {
62      this(observed, fd, null, false);
63    }
64  
65    @Override
66    protected final void doClean() {
67      if (fd != null && fd.valid() && cleanFd) {
68        try {
69          doClose();
70        } catch (IOException e) {
71          // ignore
72        }
73      }
74      if (ancillaryDataSupport != null) {
75        ancillaryDataSupport.close();
76      }
77    }
78  
79    void disableCleanFd() {
80      this.cleanFd = false;
81    }
82  
83    boolean isClosed() {
84      return closed.get();
85    }
86  
87    void doClose() throws IOException {
88      if (closed.compareAndSet(false, true)) {
89        NativeUnixSocket.close(fd);
90      }
91    }
92  
93    FileDescriptor validFdOrException() throws SocketException {
94      FileDescriptor fdesc = validFd();
95      if (fdesc == null) {
96        closed.set(true);
97        throw new SocketClosedException("Not open");
98      }
99      return fdesc;
100   }
101 
102   synchronized FileDescriptor validFd() {
103     if (isClosed()) {
104       return null;
105     }
106     FileDescriptor descriptor = this.fd;
107     if (descriptor != null) {
108       if (descriptor.valid()) {
109         return descriptor;
110       }
111     }
112     return null;
113   }
114 
115   int read(ByteBuffer dst) throws IOException {
116     return read(dst, null, 0);
117   }
118 
119   int read(ByteBuffer dst, ByteBuffer socketAddressBuffer, int options) throws IOException {
120     int remaining = dst.remaining();
121     if (remaining == 0) {
122       return 0;
123     }
124     FileDescriptor fdesc = validFdOrException();
125 
126     int dstPos = dst.position();
127 
128     ByteBuffer buf;
129     int pos;
130 
131     boolean direct = dst.isDirect();
132     if (direct) {
133       buf = dst;
134       pos = dstPos;
135     } else {
136       buf = getThreadLocalDirectByteBuffer(remaining);
137       remaining = Math.min(remaining, buf.remaining());
138       pos = buf.position();
139     }
140 
141     if (!blocking) {
142       options |= NativeUnixSocket.OPT_NON_BLOCKING;
143     }
144 
145     int count = NativeUnixSocket.receive(fdesc, buf, pos, remaining, socketAddressBuffer, options,
146         ancillaryDataSupport, 0);
147     if (count == -1) {
148       return count;
149     }
150 
151     if (direct) {
152       if (count < 0) {
153         throw new IllegalStateException();
154       }
155       dst.position(pos + count);
156     } else {
157       int oldLimit = buf.limit();
158       if (count < oldLimit) {
159         buf.limit(count);
160       }
161       try {
162         while (buf.hasRemaining()) {
163           dst.put(buf);
164         }
165       } finally {
166         if (count < oldLimit) {
167           buf.limit(oldLimit);
168         }
169       }
170     }
171     return count;
172   }
173 
174   int write(ByteBuffer src) throws IOException {
175     return write(src, null, 0);
176   }
177 
178   int write(ByteBuffer src, SocketAddress target, int options) throws IOException {
179     int remaining = src.remaining();
180 
181     if (remaining == 0) {
182       return 0;
183     }
184 
185     FileDescriptor fdesc = validFdOrException();
186     final ByteBuffer addressTo;
187     final int addressToLen;
188     if (target == null) {
189       addressTo = null;
190       addressToLen = 0;
191     } else {
192       addressTo = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
193       addressToLen = AFSocketAddress.unwrapAddressDirectBufferInternal(addressTo, target);
194     }
195 
196     // accept "send buffer overflow" as packet loss
197     // and don't retry (which may slow things down quite a bit)
198     if (!blocking) {
199       options |= NativeUnixSocket.OPT_NON_BLOCKING;
200     }
201 
202     int pos = src.position();
203     boolean isDirect = src.isDirect();
204     ByteBuffer buf;
205     int bufPos;
206     if (isDirect) {
207       buf = src;
208       bufPos = pos;
209     } else {
210       buf = getThreadLocalDirectByteBuffer(remaining);
211       remaining = Math.min(remaining, buf.remaining());
212 
213       bufPos = buf.position();
214 
215       while (src.hasRemaining() && buf.hasRemaining()) {
216         buf.put(src);
217       }
218 
219       buf.position(bufPos);
220     }
221     if (datagramMode) {
222       options |= NativeUnixSocket.OPT_DGRAM_MODE;
223     }
224 
225     int written = NativeUnixSocket.send(fdesc, buf, bufPos, remaining, addressTo, addressToLen,
226         options, ancillaryDataSupport);
227     src.position(pos + written);
228 
229     return written;
230   }
231 
232   /**
233    * Returns a per-thread reusable byte buffer for a given capacity.
234    *
235    * If a thread-local buffer currently uses a smaller capacity, the buffer is replaced by a larger
236    * one. If the capacity exceeds a configurable maximum, a new direct buffer is allocated but not
237    * cached (i.e., the previously cached one is kept but not immediately returned to the caller).
238    *
239    * @param capacity The desired capacity.
240    * @return A byte buffer satisfying the requested capacity.
241    */
242   ByteBuffer getThreadLocalDirectByteBuffer(int capacity) {
243     if (capacity > TL_BUFFER_MAX_CAPACITY && TL_BUFFER_MAX_CAPACITY > 0) {
244       // Capacity exceeds configurable maximum limit;
245       // allocate but do not cache direct buffer.
246       // This may incur a performance penalty at the cost of correctness when using such capacities.
247       return ByteBuffer.allocateDirect(capacity);
248     }
249     if (capacity < TL_BUFFER_MIN_CAPACITY) {
250       capacity = TL_BUFFER_MIN_CAPACITY;
251     }
252     ByteBuffer buffer = TL_BUFFER.get();
253     if (buffer == null || capacity > buffer.capacity()) {
254       buffer = ByteBuffer.allocateDirect(capacity);
255       TL_BUFFER.set(buffer);
256     }
257     buffer.clear();
258     return buffer;
259   }
260 
261   void implConfigureBlocking(boolean block) throws IOException {
262     NativeUnixSocket.configureBlocking(validFdOrException(), block);
263     this.blocking = block;
264   }
265 
266   boolean isBlocking() {
267     return blocking;
268   }
269 }