1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix;
19
20 import static java.util.Objects.requireNonNull;
21
22 import java.io.FileDescriptor;
23 import java.io.IOException;
24 import java.net.ProtocolFamily;
25 import java.net.SocketAddress;
26 import java.net.SocketOption;
27 import java.net.StandardProtocolFamily;
28 import java.nio.ByteBuffer;
29 import java.nio.channels.SocketChannel;
30 import java.nio.channels.spi.SelectorProvider;
31 import java.util.Objects;
32 import java.util.Set;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 import org.eclipse.jdt.annotation.NonNull;
36
37 import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
38
39
40
41
42
43
44
45 public abstract class AFSocketChannel<A extends AFSocketAddress> extends SocketChannel implements
46 AFSomeSocket, AFSocketExtensions, AFSomeSocketChannel {
47 private final @NonNull AFSocket<A> afSocket;
48 private final AtomicBoolean connectPending = new AtomicBoolean(false);
49
50
51
52
53
54
55
/**
 * Creates a new socket channel for the given socket, using the given {@link SelectorProvider}.
 *
 * @param socket The socket this channel wraps; must not be {@code null}.
 * @param sp The selector provider passed on to the {@link SocketChannel} superclass.
 * @throws NullPointerException if {@code socket} is {@code null}.
 */
@SuppressWarnings("all")
protected AFSocketChannel(AFSocket<A> socket, AFSelectorProvider<A> sp) {
  super(sp);
  this.afSocket = requireNonNull(socket);
}
61
62
63
64
65
66
67 protected final AFSocket<A> getAFSocket() {
68 return afSocket;
69 }
70
71
72
73
74
75
76 @FunctionalInterface
77 protected interface AFSocketSupplier<A extends AFSocketAddress> {
78
79
80
81
82
83
84 AFSocket<A> newInstance() throws IOException;
85 }
86
87
88
89
90
91
92
93
94
95
96 protected static final <A extends AFSocketAddress> AFSocketChannel<A> open(
97 AFSocketSupplier<A> supplier) throws IOException {
98 return supplier.newInstance().getChannel();
99 }
100
101
102
103
104
105
106
107
108
109
110 protected static final <A extends AFSocketAddress> AFSocketChannel<A> open(
111 AFSocketSupplier<A> supplier, SocketAddress remote) throws IOException {
112 @SuppressWarnings("resource")
113 AFSocketChannel<A> sc = open(supplier);
114 try {
115 sc.connect(remote);
116 } catch (Throwable x) {
117 try {
118 sc.close();
119 } catch (Throwable suppressed) {
120 x.addSuppressed(suppressed);
121 }
122 throw x;
123 }
124 assert sc.isConnected();
125 return sc;
126 }
127
128 @SuppressWarnings("unchecked")
129 @Override
130 public final <T> T getOption(SocketOption<T> name) throws IOException {
131 if (name instanceof AFSocketOption<?>) {
132 return getAFCore().getOption((AFSocketOption<T>) name);
133 }
134 Integer optionId = SocketOptionsMapper.resolve(name);
135 if (optionId == null) {
136 throw new UnsupportedOperationException("unsupported option");
137 } else {
138 return (T) afSocket.getAFImpl().getOption(optionId);
139 }
140 }
141
142 @Override
143 public final <T> AFSocketChannel<A> setOption(SocketOption<T> name, T value) throws IOException {
144 if (name instanceof AFSocketOption<?>) {
145 getAFCore().setOption((AFSocketOption<T>) name, value);
146 return this;
147 }
148 Integer optionId = SocketOptionsMapper.resolve(name);
149 if (optionId == null) {
150 throw new UnsupportedOperationException("unsupported option");
151 } else {
152 afSocket.getAFImpl().setOption(optionId, value);
153 }
154 return this;
155 }
156
157 @Override
158 public final Set<SocketOption<?>> supportedOptions() {
159 return SocketOptionsMapper.SUPPORTED_SOCKET_OPTIONS;
160 }
161
162 @Override
163 public final AFSocketChannel<A> bind(SocketAddress local) throws IOException {
164 afSocket.bind(local);
165 return this;
166 }
167
168 @Override
169 public final AFSocketChannel<A> shutdownInput() throws IOException {
170 afSocket.getAFImpl().shutdownInput();
171 return this;
172 }
173
174 @Override
175 public final AFSocketChannel<A> shutdownOutput() throws IOException {
176 afSocket.getAFImpl().shutdownOutput();
177 return this;
178 }
179
180 @Override
181 @SuppressFBWarnings("EI_EXPOSE_REP")
182 public final AFSocket<A> socket() {
183 return afSocket;
184 }
185
186 @Override
187 public final boolean isConnected() {
188 boolean connected = afSocket.isConnected();
189 if (connected) {
190 connectPending.set(false);
191 }
192 return connected;
193 }
194
195 @Override
196 public final boolean isConnectionPending() {
197 return connectPending.get();
198 }
199
200 @Override
201 public final boolean connect(SocketAddress remote) throws IOException {
202 boolean complete = false;
203 Exception exception = null;
204 try {
205 begin();
206 boolean connected = afSocket.connect0(remote, 0);
207 if (!connected) {
208 connectPending.set(true);
209 }
210 complete = true;
211 return connected;
212 } catch (IOException e) {
213 throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException(
214 (exception = InterruptibleChannelUtil.handleException(this, e)));
215 } finally {
216 InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
217 }
218 }
219
220 @Override
221 public final boolean finishConnect() throws IOException {
222 if (isConnected()) {
223 return true;
224 } else if (!isConnectionPending()) {
225 return false;
226 }
227
228 boolean complete = false;
229 Exception exception = null;
230 try {
231 begin();
232 boolean connected = NativeUnixSocket.finishConnect(afSocket.getFileDescriptor())
233 || isConnected();
234 if (connected) {
235 connectPending.set(false);
236 }
237 complete = true;
238 return connected;
239 } catch (IOException e) {
240 throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException(
241 (exception = InterruptibleChannelUtil.handleException(this, e)));
242 } finally {
243 InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
244 }
245 }
246
247 @Override
248 public final A getRemoteAddress() throws IOException {
249 return getRemoteSocketAddress();
250 }
251
252 @Override
253 public final A getRemoteSocketAddress() {
254 return afSocket.getRemoteSocketAddress();
255 }
256
257 @Override
258 public final int read(ByteBuffer dst) throws IOException {
259 boolean complete = false;
260 Exception exception = null;
261 try {
262 begin();
263 int read = afSocket.getAFImpl().read(dst, null);
264 complete = true;
265 return read;
266 } catch (IOException e) {
267 throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException(
268 (exception = InterruptibleChannelUtil.handleException(this, e)));
269 } finally {
270 InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
271 }
272 }
273
274 @Override
275 public final long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
276 if (length == 0) {
277 return 0;
278 }
279
280 return read(dsts[offset]);
281 }
282
283 @Override
284 public final long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
285 if (length == 0) {
286 return 0;
287 }
288
289 return write(srcs[offset]);
290 }
291
292 @Override
293 public final int write(ByteBuffer src) throws IOException {
294 boolean complete = false;
295 Exception exception = null;
296 try {
297 begin();
298 int written = afSocket.getAFImpl().write(src);
299 complete = true;
300 return written;
301 } catch (IOException e) {
302 throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException(
303 (exception = InterruptibleChannelUtil.handleException(this, e)));
304 } finally {
305 InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
306 }
307 }
308
309 @Override
310 public final A getLocalAddress() throws IOException {
311 return getLocalSocketAddress();
312 }
313
314 @Override
315 public final A getLocalSocketAddress() {
316 return afSocket.getLocalSocketAddress();
317 }
318
319 @Override
320 protected final void implCloseSelectableChannel() throws IOException {
321 afSocket.close();
322 }
323
324 @Override
325 protected final void implConfigureBlocking(boolean block) throws IOException {
326 getAFCore().implConfigureBlocking(block);
327 }
328
329 @Override
330 public final int getAncillaryReceiveBufferSize() {
331 return afSocket.getAncillaryReceiveBufferSize();
332 }
333
334 @Override
335 public final void setAncillaryReceiveBufferSize(int size) {
336 afSocket.setAncillaryReceiveBufferSize(size);
337 }
338
339 @Override
340 public final void ensureAncillaryReceiveBufferSize(int minSize) {
341 afSocket.ensureAncillaryReceiveBufferSize(minSize);
342 }
343
344 final AFSocketCore getAFCore() {
345 return afSocket.getAFImpl().getCore();
346 }
347
348 @Override
349 public final FileDescriptor getFileDescriptor() throws IOException {
350 return afSocket.getFileDescriptor();
351 }
352
353 @Override
354 public final String toString() {
355 return super.toString() + afSocket.toStringSuffix();
356 }
357
358 @Override
359 public void setShutdownOnClose(boolean enabled) {
360 getAFCore().setShutdownOnClose(enabled);
361 }
362
363
364
365
366
367
368
369
370
371
372
373
374
375 public static SocketChannel open(ProtocolFamily family) throws IOException {
376 requireNonNull(family);
377
378 if (family instanceof AFProtocolFamily) {
379 return ((AFProtocolFamily) family).openSocketChannel();
380 } else if ("UNIX".equals(family.name())) {
381 return AFUNIXSocketChannel.open();
382 } else if (family instanceof StandardProtocolFamily) {
383 return SocketChannel.open();
384 } else {
385 throw new UnsupportedOperationException("Protocol family not supported");
386 }
387 }
388 }