1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix;
19
20 import java.io.FileDescriptor;
21 import java.io.IOException;
22 import java.net.SocketAddress;
23 import java.net.SocketException;
24 import java.nio.ByteBuffer;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27
28
29
30
31
32 class AFCore extends CleanableState {
33 private static final ThreadLocal<ByteBuffer> TL_BUFFER = new ThreadLocal<>();
34
35 private static final String PROP_TL_BUFFER_MAX_CAPACITY =
36 "org.newsclub.net.unix.thread-local-buffer.max-capacity";
37
38 private static final int TL_BUFFER_MIN_CAPACITY = 8192;
39 private static final int TL_BUFFER_MAX_CAPACITY = Integer.parseInt(System.getProperty(
40 PROP_TL_BUFFER_MAX_CAPACITY, Integer.toString(1 * 1024 * 1024)));
41
42 private final AtomicBoolean closed = new AtomicBoolean(false);
43
44 final FileDescriptor fd;
45 final AncillaryDataSupport ancillaryDataSupport;
46
47 private final boolean datagramMode;
48
49 private boolean blocking = true;
50 private boolean cleanFd = true;
51
52 AFCore(Object observed, FileDescriptor fd, AncillaryDataSupport ancillaryDataSupport,
53 boolean datagramMode) {
54 super(observed);
55 this.datagramMode = datagramMode;
56 this.ancillaryDataSupport = ancillaryDataSupport;
57
58 this.fd = fd == null ? new FileDescriptor() : fd;
59 }
60
61 AFCore(Object observed, FileDescriptor fd) {
62 this(observed, fd, null, false);
63 }
64
65 @Override
66 protected final void doClean() {
67 if (fd != null && fd.valid() && cleanFd) {
68 try {
69 doClose();
70 } catch (IOException e) {
71
72 }
73 }
74 if (ancillaryDataSupport != null) {
75 ancillaryDataSupport.close();
76 }
77 }
78
79 void disableCleanFd() {
80 this.cleanFd = false;
81 }
82
83 boolean isClosed() {
84 return closed.get();
85 }
86
87 void doClose() throws IOException {
88 if (closed.compareAndSet(false, true)) {
89 NativeUnixSocket.close(fd);
90 }
91 }
92
93 FileDescriptor validFdOrException() throws SocketException {
94 FileDescriptor fdesc = validFd();
95 if (fdesc == null) {
96 closed.set(true);
97 throw new SocketClosedException("Not open");
98 }
99 return fdesc;
100 }
101
102 synchronized FileDescriptor validFd() {
103 if (isClosed()) {
104 return null;
105 }
106 FileDescriptor descriptor = this.fd;
107 if (descriptor != null) {
108 if (descriptor.valid()) {
109 return descriptor;
110 }
111 }
112 return null;
113 }
114
115 int read(ByteBuffer dst) throws IOException {
116 return read(dst, null, 0);
117 }
118
119 int read(ByteBuffer dst, ByteBuffer socketAddressBuffer, int options) throws IOException {
120 int remaining = dst.remaining();
121 if (remaining == 0) {
122 return 0;
123 }
124 FileDescriptor fdesc = validFdOrException();
125
126 int dstPos = dst.position();
127
128 ByteBuffer buf;
129 int pos;
130
131 boolean direct = dst.isDirect();
132 if (direct) {
133 buf = dst;
134 pos = dstPos;
135 } else {
136 buf = getThreadLocalDirectByteBuffer(remaining);
137 remaining = Math.min(remaining, buf.remaining());
138 pos = buf.position();
139 }
140
141 if (!blocking) {
142 options |= NativeUnixSocket.OPT_NON_BLOCKING;
143 }
144
145 int count = NativeUnixSocket.receive(fdesc, buf, pos, remaining, socketAddressBuffer, options,
146 ancillaryDataSupport, 0);
147 if (count == -1) {
148 return count;
149 }
150
151 if (direct) {
152 if (count < 0) {
153 throw new IllegalStateException();
154 }
155 dst.position(pos + count);
156 } else {
157 int oldLimit = buf.limit();
158 if (count < oldLimit) {
159 buf.limit(count);
160 }
161 try {
162 while (buf.hasRemaining()) {
163 dst.put(buf);
164 }
165 } finally {
166 if (count < oldLimit) {
167 buf.limit(oldLimit);
168 }
169 }
170 }
171 return count;
172 }
173
174 int write(ByteBuffer src) throws IOException {
175 return write(src, null, 0);
176 }
177
178 int write(ByteBuffer src, SocketAddress target, int options) throws IOException {
179 int remaining = src.remaining();
180
181 if (remaining == 0) {
182 return 0;
183 }
184
185 FileDescriptor fdesc = validFdOrException();
186 final ByteBuffer addressTo;
187 final int addressToLen;
188 if (target == null) {
189 addressTo = null;
190 addressToLen = 0;
191 } else {
192 addressTo = AFSocketAddress.SOCKETADDRESS_BUFFER_TL.get();
193 addressToLen = AFSocketAddress.unwrapAddressDirectBufferInternal(addressTo, target);
194 }
195
196
197
198 if (!blocking) {
199 options |= NativeUnixSocket.OPT_NON_BLOCKING;
200 }
201
202 int pos = src.position();
203 boolean isDirect = src.isDirect();
204 ByteBuffer buf;
205 int bufPos;
206 if (isDirect) {
207 buf = src;
208 bufPos = pos;
209 } else {
210 buf = getThreadLocalDirectByteBuffer(remaining);
211 remaining = Math.min(remaining, buf.remaining());
212
213 bufPos = buf.position();
214
215 while (src.hasRemaining() && buf.hasRemaining()) {
216 buf.put(src);
217 }
218
219 buf.position(bufPos);
220 }
221 if (datagramMode) {
222 options |= NativeUnixSocket.OPT_DGRAM_MODE;
223 }
224
225 int written = NativeUnixSocket.send(fdesc, buf, bufPos, remaining, addressTo, addressToLen,
226 options, ancillaryDataSupport);
227 src.position(pos + written);
228
229 return written;
230 }
231
232
233
234
235
236
237
238
239
240
241
242 ByteBuffer getThreadLocalDirectByteBuffer(int capacity) {
243 if (capacity > TL_BUFFER_MAX_CAPACITY && TL_BUFFER_MAX_CAPACITY > 0) {
244
245
246
247 return ByteBuffer.allocateDirect(capacity);
248 }
249 if (capacity < TL_BUFFER_MIN_CAPACITY) {
250 capacity = TL_BUFFER_MIN_CAPACITY;
251 }
252 ByteBuffer buffer = TL_BUFFER.get();
253 if (buffer == null || capacity > buffer.capacity()) {
254 buffer = ByteBuffer.allocateDirect(capacity);
255 TL_BUFFER.set(buffer);
256 }
257 buffer.clear();
258 return buffer;
259 }
260
261 void implConfigureBlocking(boolean block) throws IOException {
262 NativeUnixSocket.configureBlocking(validFdOrException(), block);
263 this.blocking = block;
264 }
265
266 boolean isBlocking() {
267 return blocking;
268 }
269 }