1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix;
19
20 import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD;
21 import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD_WR;
22 import static org.newsclub.net.unix.NativeUnixSocket.SHUT_WR;
23
24 import java.io.EOFException;
25 import java.io.FileDescriptor;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.InterruptedIOException;
29 import java.io.OutputStream;
30 import java.net.InetAddress;
31 import java.net.SocketAddress;
32 import java.net.SocketException;
33 import java.net.SocketImpl;
34 import java.net.SocketOption;
35 import java.net.SocketOptions;
36 import java.net.SocketTimeoutException;
37 import java.nio.ByteBuffer;
38 import java.nio.channels.SelectionKey;
39 import java.util.Objects;
40 import java.util.Set;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43
44 import org.eclipse.jdt.annotation.NonNull;
45 import org.eclipse.jdt.annotation.Nullable;
46 import org.newsclub.net.unix.pool.MutableHolder;
47 import org.newsclub.net.unix.pool.ObjectPool.Lease;
48
49
50
51
52
53
54
55 @SuppressWarnings({
56 "PMD.CyclomaticComplexity", "PMD.CouplingBetweenObjects",
57 "UnsafeFinalization" })
58 public abstract class AFSocketImpl<A extends AFSocketAddress> extends SocketImplShim {
59 private static final int SHUTDOWN_RD_WR = (1 << SHUT_RD) | (1 << SHUT_WR);
60
61 private final AFSocketStreamCore core;
62 final AncillaryDataSupport ancillaryDataSupport = new AncillaryDataSupport();
63
64 private final AtomicBoolean bound = new AtomicBoolean(false);
65 private Boolean createType = null;
66 private final AtomicBoolean connected = new AtomicBoolean(false);
67
68 private volatile boolean closedInputStream = false;
69 private volatile boolean closedOutputStream = false;
70
71 private final AFInputStream in;
72 private final AFOutputStream out;
73
74 private boolean reuseAddr = true;
75
76 private final AtomicInteger socketTimeout = new AtomicInteger(0);
77 private final AFAddressFamily<A> addressFamily;
78
79 private int shutdownState = 0;
80
81 private AFSocketImplExtensions<A> implExtensions = null;
82
83 private final AtomicBoolean closed = new AtomicBoolean(false);
84
85
86
87
88
89
90
91 static final class AFSocketStreamCore extends AFSocketCore {
92 AFSocketStreamCore(AFSocketImpl<?> observed, FileDescriptor fd,
93 AncillaryDataSupport ancillaryDataSupport, AFAddressFamily<?> af) {
94 super(observed, fd, ancillaryDataSupport, af, false);
95 }
96
97 void createSocket(FileDescriptor fdTarget, AFSocketType type) throws IOException {
98 NativeUnixSocket.createSocket(fdTarget, addressFamily().getDomain(), type.getId());
99 }
100
101
102
103
104
105 @Override
106 protected void unblockAccepts() {
107 if (socketAddress == null || socketAddress.getBytes() == null || inode.get() < 0) {
108 return;
109 }
110 if (!hasPendingAccepts()) {
111 return;
112 }
113 try {
114 ThreadUtil.runOnSystemThread(this::unblockAccepts0);
115 } catch (InterruptedException e) {
116
117 }
118 }
119
120 private void unblockAccepts0() {
121 while (hasPendingAccepts()) {
122 try {
123 FileDescriptor tmpFd = new FileDescriptor();
124
125 try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
126 createSocket(tmpFd, AFSocketType.SOCK_STREAM);
127 ByteBuffer ab = abLease.get();
128 NativeUnixSocket.connect(ab, ab.limit(), tmpFd, inode.get());
129 } catch (IOException e) {
130
131
132 return;
133 }
134 if (isShutdownOnClose()) {
135 try {
136 NativeUnixSocket.shutdown(tmpFd, SHUT_RD_WR);
137 } catch (Exception e) {
138
139 }
140 }
141 try {
142 NativeUnixSocket.close(tmpFd);
143 } catch (Exception e) {
144
145 }
146 } catch (RuntimeException e) {
147
148 }
149
150
151 try {
152 Thread.sleep(5);
153 } catch (InterruptedException e) {
154
155 }
156 }
157 }
158 }
159
160
161
162
163
164
165
166 protected AFSocketImpl(AFAddressFamily<@NonNull A> addressFamily, FileDescriptor fdObj) {
167 super();
168 this.addressFamily = addressFamily;
169 this.address = InetAddress.getLoopbackAddress();
170 this.core = new AFSocketStreamCore(this, fdObj, ancillaryDataSupport, addressFamily);
171 this.fd = core.fd;
172 this.in = newInputStream();
173 this.out = newOutputStream();
174 }
175
176
177
178
179
180
181 protected final AFInputStream newInputStream() {
182 return new AFInputStreamImpl();
183 }
184
185
186
187
188
189
190 protected final AFOutputStream newOutputStream() {
191 return new AFOutputStreamImpl();
192 }
193
194 final FileDescriptor getFD() {
195 return fd;
196 }
197
198
199 final boolean isConnected() {
200 if (connected.get()) {
201 return true;
202 }
203 if (isClosed()) {
204 return false;
205 }
206 if (core.isConnected(false)) {
207 connected.set(true);
208 return true;
209 }
210 return false;
211 }
212
213 final boolean isBound() {
214 if (bound.get()) {
215 return true;
216 }
217 if (isClosed()) {
218 return false;
219 }
220 if (core.isConnected(true)) {
221 bound.set(true);
222 return true;
223 }
224 return false;
225 }
226
227 final AFSocketCore getCore() {
228 return core;
229 }
230
231 boolean isClosed() {
232 return closed.get() || core.isClosed();
233 }
234
235
236 @Override
237 protected final void accept(SocketImpl socket) throws IOException {
238 accept0(socket);
239 }
240
241 @SuppressWarnings({
242 "Finally" ,
243 "PMD.CognitiveComplexity", "PMD.NPathComplexity", "PMD.NcssCount"})
244 final boolean accept0(SocketImpl socket) throws IOException {
245 FileDescriptor fdesc = core.validFdOrException();
246 if (isClosed()) {
247 throw new SocketClosedException();
248 } else if (!isBound()) {
249 throw new NotBoundSocketException();
250 }
251
252 AFSocketAddress socketAddress = core.socketAddress;
253 AFSocketAddress boundSocketAddress = getLocalSocketAddress();
254 if (boundSocketAddress != null) {
255
256 core.socketAddress = socketAddress = boundSocketAddress;
257 }
258
259 if (socketAddress == null) {
260 throw new NotBoundSocketException();
261 }
262
263 @SuppressWarnings("unchecked")
264 final AFSocketImpl<A> si = (AFSocketImpl<A>) socket;
265 core.incPendingAccepts();
266
267 final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
268 .isVirtualBlocking();
269
270 long now = virtualBlocking ? System.currentTimeMillis() : 0;
271 boolean park = false;
272 virtualThreadLoop : do {
273 if (virtualBlocking) {
274 if (park) {
275 VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_ACCEPT, now,
276 socketTimeout::get, this::close);
277 }
278 }
279
280 try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
281 ByteBuffer ab = abLease.get();
282
283 SocketException caught = null;
284 try {
285 boolean success;
286 if (virtualBlocking) {
287 core.configureVirtualBlocking(true);
288 }
289 try {
290 success = NativeUnixSocket.accept(ab, ab.limit(), fdesc, si.fd, core.inode.get(),
291 socketTimeout.get());
292 } catch (SocketTimeoutException e) {
293 if (virtualBlocking) {
294
295 park = true;
296 continue virtualThreadLoop;
297 } else {
298 throw e;
299 }
300 } finally {
301 if (virtualBlocking) {
302 core.configureVirtualBlocking(false);
303 }
304 }
305
306 if (virtualBlocking) {
307 if (success) {
308
309 NativeUnixSocket.configureBlocking(si.fd, core.isBlocking());
310 } else {
311
312 park = true;
313 continue virtualThreadLoop;
314 }
315 }
316 } catch (NotConnectedSocketException | SocketClosedException
317 | BrokenPipeSocketException e) {
318 try {
319 close();
320 } catch (Exception e2) {
321 e.addSuppressed(e2);
322 }
323 throw e;
324 } catch (SocketException e) {
325 caught = e;
326 } finally {
327 if (!isBound() || isClosed()) {
328 if (getCore().isShutdownOnClose()) {
329 try {
330 NativeUnixSocket.shutdown(si.fd, SHUT_RD_WR);
331 } catch (Exception e) {
332
333 }
334 }
335 try {
336 NativeUnixSocket.close(si.fd);
337 } catch (Exception e) {
338
339 }
340 if (caught != null) {
341 throw caught;
342 } else {
343 throw new BrokenPipeSocketException("Socket is closed");
344 }
345 } else if (caught != null) {
346 throw caught;
347 }
348 }
349 } finally {
350 core.decPendingAccepts();
351 }
352 break;
353 } while (true);
354
355 if (!si.fd.valid()) {
356 return false;
357 }
358
359 si.setSocketAddress(socketAddress);
360 si.connected.set(true);
361
362 return true;
363 }
364
365 final void setSocketAddress(AFSocketAddress socketAddress) {
366 if (socketAddress == null) {
367 this.core.socketAddress = null;
368 this.address = null;
369 this.localport = -1;
370 } else {
371 this.core.socketAddress = socketAddress;
372 this.address = socketAddress.getAddress();
373 if (this.localport <= 0) {
374 this.localport = socketAddress.getPort();
375 }
376 }
377 }
378
379 @Override
380 protected final int available() throws IOException {
381 FileDescriptor fdesc = core.validFdOrException();
382 try (Lease<MutableHolder<ByteBuffer>> lease = core.getPrivateDirectByteBuffer(0)) {
383 return NativeUnixSocket.available(fdesc, lease.get().get());
384 }
385 }
386
387 final void bind(SocketAddress addr, int options) throws IOException {
388 if (addr == null) {
389 addr = addressFamily.nullBindAddress();
390 if (addr == null) {
391 throw new UnsupportedOperationException("Cannot bind to null address");
392 }
393 }
394
395 if (addr == AFSocketAddress.INTERNAL_DUMMY_BIND) {
396 bound.set(true);
397 core.inode.set(0);
398 return;
399 }
400
401 addr = AFSocketAddress.mapOrFail(addr, addressFamily.getSocketAddressClass());
402 bound.set(true);
403
404 AFSocketAddress socketAddress = Objects.requireNonNull((AFSocketAddress) addr);
405
406 this.setSocketAddress(socketAddress);
407 try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
408 ByteBuffer ab = abLease.get();
409 long inode = NativeUnixSocket.bind(ab, ab.limit(), fd, options);
410 core.inode.set(inode);
411 }
412 core.validFdOrException();
413 }
414
415 @Override
416 @SuppressWarnings("hiding")
417 protected final void bind(InetAddress host, int port) throws IOException {
418
419 }
420
421 private void checkClose() throws IOException {
422 if (closedInputStream && closedOutputStream) {
423 close();
424 }
425 }
426
427 @Override
428 protected final void close() throws IOException {
429 this.closed.set(true);
430 try {
431 shutdown();
432 } catch (NotConnectedSocketException | SocketClosedException e) {
433
434 }
435
436 core.runCleaner();
437 }
438
439 @Override
440 @SuppressWarnings("hiding")
441 protected final void connect(String host, int port) throws IOException {
442 throw new SocketException("Cannot bind to this type of address: " + InetAddress.class);
443 }
444
445 @Override
446 @SuppressWarnings("hiding")
447 protected final void connect(InetAddress address, int port) throws IOException {
448 throw new SocketException("Cannot bind to this type of address: " + InetAddress.class);
449 }
450
451 @Override
452 protected final void connect(SocketAddress addr, int connectTimeout) throws IOException {
453 connect0(addr, connectTimeout);
454 }
455
456 @SuppressWarnings({"PMD.CognitiveComplexity", "PMD.NPathComplexity", "PMD.NcssCount"})
457 final boolean connect0(SocketAddress addr, int connectTimeout) throws IOException {
458 if (addr == AFSocketAddress.INTERNAL_DUMMY_CONNECT) {
459 this.connected.set(true);
460 return true;
461 } else if (addr == AFSocketAddress.INTERNAL_DUMMY_DONT_CONNECT) {
462 return false;
463 }
464
465 addr = AFSocketAddress.mapOrFail(addr, addressFamily.getSocketAddressClass());
466 AFSocketAddress socketAddress = Objects.requireNonNull((AFSocketAddress) addr);
467
468 final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
469 .isVirtualBlocking();
470 long now = virtualBlocking ? System.currentTimeMillis() : 0;
471
472
473
474
475 AFSupplier<Integer> virtualConnectTimeout = null;
476
477 if (virtualBlocking) {
478 core.configureVirtualBlocking(true);
479 }
480 boolean park = false;
481 try {
482 virtualThreadLoop : do {
483 try (Lease<ByteBuffer> abLease = socketAddress.getNativeAddressDirectBuffer()) {
484 ByteBuffer ab = abLease.get();
485 boolean success = false;
486 boolean ignoreSpuriousTimeout = true;
487 do {
488 if (virtualBlocking) {
489 if (virtualConnectTimeout != null) {
490 if (park) {
491 VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fd, SelectionKey.OP_CONNECT,
492 now, virtualConnectTimeout, this::close);
493 }
494 } else {
495 Thread.yield();
496 }
497 }
498
499 if (virtualBlocking) {
500 core.configureVirtualBlocking(true);
501 }
502 try {
503 success = NativeUnixSocket.connect(ab, ab.limit(), fd, -2);
504 if (!success && virtualBlocking) {
505
506 if (virtualConnectTimeout == null) {
507 virtualConnectTimeout = () -> connectTimeout;
508 }
509 park = true;
510 continue virtualThreadLoop;
511 }
512 break;
513 } catch (SocketTimeoutException e) {
514
515
516 if (ignoreSpuriousTimeout) {
517 Object o = getOption(SocketOptions.SO_TIMEOUT);
518 if (o instanceof Integer) {
519 if (((Integer) o) == 0) {
520 ignoreSpuriousTimeout = false;
521 continue;
522 }
523 } else if (o == null) {
524 ignoreSpuriousTimeout = false;
525 continue;
526 }
527 }
528 throw e;
529 } catch (NotConnectedSocketException | SocketClosedException
530 | BrokenPipeSocketException e) {
531 try {
532 close();
533 } catch (Exception e2) {
534 e.addSuppressed(e2);
535 }
536 throw e;
537 } catch (SocketException e) {
538 if (virtualBlocking) {
539 Thread.yield();
540 }
541 throw e;
542 } finally {
543 if (virtualBlocking) {
544 core.configureVirtualBlocking(false);
545 }
546 }
547 } while (ThreadUtil.checkNotInterruptedOrThrow());
548 if (success) {
549 setSocketAddress(socketAddress);
550 this.connected.set(true);
551 }
552 core.validFdOrException();
553 return success;
554 }
555 } while (true);
556 } finally {
557 if (virtualBlocking) {
558 core.configureVirtualBlocking(true);
559 }
560 }
561 }
562
563 @Override
564 protected final void create(boolean stream) throws IOException {
565 if (isClosed()) {
566 throw new SocketException("Already closed");
567 }
568 if (fd.valid()) {
569 if (createType != null) {
570 if (createType.booleanValue() != stream) {
571 throw new IllegalStateException("Already created with different mode");
572 }
573 } else {
574 createType = stream;
575 }
576 return;
577 }
578 createType = stream;
579 createSocket(fd, stream ? AFSocketType.SOCK_STREAM : AFSocketType.SOCK_DGRAM);
580 }
581
582 @Override
583 protected final AFInputStream getInputStream() throws IOException {
584 if (!isConnected() && !isBound()) {
585 close();
586 throw new SocketClosedException("Not connected/not bound");
587 }
588 core.validFdOrException();
589 return in;
590 }
591
592 @Override
593 protected final AFOutputStream getOutputStream() throws IOException {
594 if (!isClosed() && !isBound()) {
595 close();
596 throw new SocketClosedException("Not connected/not bound");
597 }
598 core.validFdOrException();
599 return out;
600 }
601
602 @Override
603 protected final void listen(int backlog) throws IOException {
604 FileDescriptor fdesc = core.validFdOrException();
605 if (backlog <= 0) {
606 backlog = 50;
607 }
608 NativeUnixSocket.listen(fdesc, backlog);
609 }
610
611 @Override
612 protected final boolean supportsUrgentData() {
613 return false;
614 }
615
616 @Override
617 protected final void sendUrgentData(int data) throws IOException {
618 throw new UnsupportedOperationException();
619 }
620
621 private final class AFInputStreamImpl extends AFInputStream {
622 private volatile boolean streamClosed = false;
623 private final AtomicBoolean eofReached = new AtomicBoolean(false);
624
625 private final int defaultOpt = (core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING);
626
627 @SuppressWarnings("PMD.CognitiveComplexity")
628 @Override
629 public int read(byte[] buf, int off, int len) throws IOException {
630 if (streamClosed) {
631 throw new SocketClosedException("This InputStream has already been closed.");
632 }
633 if (eofReached.get()) {
634 return -1;
635 }
636
637 FileDescriptor fdesc = core.validFdOrException();
638 if (len == 0) {
639 return 0;
640 } else if (off < 0 || len < 0 || (len > buf.length - off)) {
641 throw new IndexOutOfBoundsException();
642 }
643
644 final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
645 .isVirtualBlocking();
646 final long now;
647 final int opt;
648 if (virtualBlocking) {
649 now = System.currentTimeMillis();
650 opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
651 } else {
652 now = 0;
653 opt = defaultOpt;
654 }
655
656 int read;
657
658 boolean park = false;
659 virtualThreadLoop : do {
660 if (virtualBlocking) {
661 if (park) {
662 VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_READ, now,
663 socketTimeout::get, this::forceCloseSocket);
664 }
665 core.configureVirtualBlocking(true);
666 }
667
668 try {
669 read = NativeUnixSocket.read(fdesc, buf, off, len, opt, ancillaryDataSupport,
670 socketTimeout.get());
671 if (read == -2) {
672 if (virtualBlocking) {
673
674 park = true;
675 continue virtualThreadLoop;
676 } else {
677 read = 0;
678 }
679 }
680 } catch (SocketTimeoutException e) {
681 if (virtualBlocking) {
682
683 park = true;
684 continue virtualThreadLoop;
685 } else {
686 throw e;
687 }
688 } catch (EOFException e) {
689 eofReached.set(true);
690 throw e;
691 } finally {
692 if (virtualBlocking) {
693 core.configureVirtualBlocking(false);
694 }
695 }
696 break;
697 } while (true);
698
699 return read;
700 }
701
702 @SuppressWarnings("PMD.CognitiveComplexity")
703 @Override
704 public int read() throws IOException {
705 FileDescriptor fdesc = core.validFdOrException();
706
707 if (eofReached.get()) {
708 return -1;
709 }
710
711
712 final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
713 .isVirtualBlocking();
714 final long now;
715 final int opt;
716 if (virtualBlocking) {
717 now = System.currentTimeMillis();
718 opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
719 } else {
720 now = 0;
721 opt = defaultOpt;
722 }
723
724 boolean park = false;
725 virtualThreadLoop : do {
726 if (virtualBlocking) {
727 if (park) {
728 VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_READ, now,
729 socketTimeout::get, this::forceCloseSocket);
730 }
731 core.configureVirtualBlocking(true);
732 }
733
734 try {
735 int byteRead = NativeUnixSocket.read(fdesc, null, 0, 1, opt, ancillaryDataSupport,
736 socketTimeout.get());
737 if (byteRead < 0) {
738 if (byteRead == -2) {
739 if (virtualBlocking) {
740
741 park = true;
742 continue virtualThreadLoop;
743 } else {
744 byteRead = -1;
745 }
746 }
747 eofReached.set(true);
748 return -1;
749 } else {
750 return byteRead;
751 }
752 } catch (SocketTimeoutException e) {
753 if (virtualBlocking) {
754
755 park = true;
756 continue virtualThreadLoop;
757 } else {
758 throw e;
759 }
760 } finally {
761 if (virtualBlocking) {
762 core.configureVirtualBlocking(false);
763 }
764 }
765 } while (true);
766
767
768 }
769
770 private void forceCloseSocket() throws IOException {
771 closedOutputStream = true;
772 close();
773 }
774
775 @Override
776 public synchronized void close() throws IOException {
777 if (streamClosed || isClosed()) {
778 return;
779 }
780 streamClosed = true;
781 FileDescriptor fdesc = core.validFd();
782 if (fdesc != null && getCore().isShutdownOnClose()) {
783 NativeUnixSocket.shutdown(fdesc, SHUT_RD);
784 }
785
786 closedInputStream = true;
787 checkClose();
788 }
789
790 @Override
791 public int available() throws IOException {
792 if (streamClosed) {
793 throw new SocketClosedException("This InputStream has already been closed.");
794 }
795
796 return AFSocketImpl.this.available();
797 }
798
799 @Override
800 public FileDescriptor getFileDescriptor() throws IOException {
801 return getFD();
802 }
803
804 }
805
806 private static boolean checkWriteInterruptedException(int bytesTransferred)
807 throws InterruptedIOException {
808 if (Thread.currentThread().isInterrupted()) {
809 InterruptedIOException ex = new InterruptedIOException("write");
810 ex.bytesTransferred = bytesTransferred;
811 throw ex;
812 }
813 return true;
814 }
815
816 private final class AFOutputStreamImpl extends AFOutputStream {
817 private volatile boolean streamClosed = false;
818
819 private final int defaultOpt = (core.isBlocking() ? 0 : NativeUnixSocket.OPT_NON_BLOCKING);
820
821 @SuppressWarnings("PMD.CognitiveComplexity")
822 @Override
823 public void write(int oneByte) throws IOException {
824 FileDescriptor fdesc = core.validFdOrException();
825
826 final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
827 .isVirtualBlocking();
828 final long now;
829 final int opt;
830 if (virtualBlocking) {
831 now = System.currentTimeMillis();
832 opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
833 } else {
834 now = 0;
835 opt = defaultOpt;
836 }
837
838 boolean park = false;
839 virtualThreadLoop : do {
840 if (virtualBlocking) {
841 if (park) {
842 VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
843 socketTimeout::get, this::forceCloseSocket);
844 }
845 core.configureVirtualBlocking(true);
846 }
847
848 try {
849 int written;
850 do {
851 written = NativeUnixSocket.write(fdesc, null, oneByte, 1, opt, ancillaryDataSupport);
852 if (written != 0) {
853 break;
854 }
855 if (virtualBlocking) {
856 park = true;
857 continue virtualThreadLoop;
858 }
859 } while (checkWriteInterruptedException(0));
860 } catch (NotConnectedSocketException | SocketClosedException
861 | BrokenPipeSocketException e) {
862 try {
863 forceCloseSocket();
864 } catch (Exception e2) {
865 e.addSuppressed(e2);
866 }
867 throw e;
868 } catch (SocketTimeoutException e) {
869 if (virtualBlocking) {
870
871 park = true;
872 continue virtualThreadLoop;
873 } else {
874 throw e;
875 }
876 } finally {
877 if (virtualBlocking) {
878 core.configureVirtualBlocking(false);
879 }
880 }
881 break;
882 } while (true);
883 }
884
885 @SuppressWarnings({"PMD.CognitiveComplexity", "PMD.NPathComplexity"})
886 @Override
887 public void write(byte[] buf, int off, int len) throws IOException {
888 if (streamClosed) {
889 throw new SocketException("This OutputStream has already been closed.");
890 }
891 if (len < 0 || off < 0 || len > buf.length - off) {
892 throw new IndexOutOfBoundsException();
893 }
894 FileDescriptor fdesc = core.validFdOrException();
895
896
897
898 if (len == 0 && !AFSocket.supports(AFSocketCapability.CAPABILITY_ZERO_LENGTH_SEND)) {
899 return;
900 }
901
902 final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && core.isBlocking()) || core
903 .isVirtualBlocking();
904 final long now;
905 final int opt;
906 if (virtualBlocking) {
907 now = System.currentTimeMillis();
908 opt = defaultOpt | NativeUnixSocket.OPT_NON_BLOCKING;
909 } else {
910 now = 0;
911 opt = defaultOpt;
912 }
913
914 int writtenTotal = 0;
915 do {
916 boolean park = false;
917 virtualThreadLoop : do {
918 if (virtualBlocking) {
919 if (park) {
920 VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
921 socketTimeout::get, this::forceCloseSocket);
922 }
923 core.configureVirtualBlocking(true);
924 }
925
926 final int written;
927 try {
928 written = NativeUnixSocket.write(fdesc, buf, off, len, opt, ancillaryDataSupport);
929 if (written == 0 && virtualBlocking) {
930
931 park = true;
932 continue virtualThreadLoop;
933 }
934 if (written < 0) {
935 if (len == 0) {
936
937
938
939
940
941
942
943
944 return;
945 } else {
946 throw new IOException("Unspecific error while writing");
947 }
948 }
949 } catch (NotConnectedSocketException | SocketClosedException
950 | BrokenPipeSocketException e) {
951 try {
952 forceCloseSocket();
953 } catch (Exception e2) {
954 e.addSuppressed(e2);
955 }
956 throw e;
957 } catch (SocketTimeoutException e) {
958 if (virtualBlocking) {
959
960 park = true;
961 continue virtualThreadLoop;
962 } else {
963 throw e;
964 }
965 } finally {
966 if (virtualBlocking) {
967 core.configureVirtualBlocking(false);
968 }
969 }
970
971 len -= written;
972 off += written;
973 writtenTotal += written;
974 break;
975 } while (true);
976
977 } while (len > 0 && checkWriteInterruptedException(writtenTotal));
978 }
979
980 private void forceCloseSocket() throws IOException {
981 closedInputStream = true;
982 close();
983 }
984
985 @Override
986 public synchronized void close() throws IOException {
987 if (streamClosed || isClosed()) {
988 return;
989 }
990 streamClosed = true;
991 FileDescriptor fdesc = core.validFd();
992 if (fdesc != null && getCore().isShutdownOnClose()) {
993 NativeUnixSocket.shutdown(fdesc, SHUT_WR);
994 }
995 closedOutputStream = true;
996 checkClose();
997 }
998
999 @Override
1000 public FileDescriptor getFileDescriptor() throws IOException {
1001 return getFD();
1002 }
1003 }
1004
1005 @Override
1006 public final String toString() {
1007 return super.toString() + "[fd=" + fd + "; addr=" + this.core.socketAddress + "; connected="
1008 + connected + "; bound=" + bound + "]";
1009 }
1010
1011 private static int expectInteger(Object value) throws SocketException {
1012 if (value == null) {
1013 throw (SocketException) new SocketException("Value must not be null").initCause(
1014 new NullPointerException());
1015 }
1016 try {
1017 return (Integer) value;
1018 } catch (final ClassCastException e) {
1019 throw (SocketException) new SocketException("Unsupported value: " + value).initCause(e);
1020 }
1021 }
1022
1023 private static int expectBoolean(Object value) throws SocketException {
1024 if (value == null) {
1025 throw (SocketException) new SocketException("Value must not be null").initCause(
1026 new NullPointerException());
1027 }
1028 try {
1029 return ((Boolean) value) ? 1 : 0;
1030 } catch (final ClassCastException e) {
1031 throw (SocketException) new SocketException("Unsupported value: " + value).initCause(e);
1032 }
1033 }
1034
1035 @Override
1036 public Object getOption(int optID) throws SocketException {
1037 return getOption0(optID);
1038 }
1039
1040 private Object getOption0(int optID) throws SocketException {
1041 if (isClosed()) {
1042 throw new SocketException("Socket is closed");
1043 }
1044 if (optID == SocketOptions.SO_REUSEADDR) {
1045 return reuseAddr;
1046 }
1047
1048 FileDescriptor fdesc = core.validFdOrException();
1049 return getOptionDefault(fdesc, optID, socketTimeout, addressFamily);
1050 }
1051
1052 static final Object getOptionDefault(FileDescriptor fdesc, int optID, AtomicInteger acceptTimeout,
1053 AFAddressFamily<?> af) throws SocketException {
1054 try {
1055 switch (optID) {
1056 case SocketOptions.SO_KEEPALIVE:
1057 try {
1058 return (NativeUnixSocket.getSocketOptionInt(fdesc, optID) != 0);
1059 } catch (SocketException e) {
1060
1061 return false;
1062 }
1063 case SocketOptions.TCP_NODELAY:
1064 return (NativeUnixSocket.getSocketOptionInt(fdesc, optID) != 0);
1065 case SocketOptions.SO_TIMEOUT:
1066 int v = Math.max(NativeUnixSocket.getSocketOptionInt(fdesc, 0x1005), NativeUnixSocket
1067 .getSocketOptionInt(fdesc, 0x1006));
1068 if (v == -1) {
1069
1070 return 0;
1071 }
1072 return Math.max((acceptTimeout == null ? 0 : acceptTimeout.get()), v);
1073 case SocketOptions.SO_LINGER:
1074 case SocketOptions.SO_RCVBUF:
1075 case SocketOptions.SO_SNDBUF:
1076 return NativeUnixSocket.getSocketOptionInt(fdesc, optID);
1077 case SocketOptions.IP_TOS:
1078 return 0;
1079 case SocketOptions.SO_BINDADDR:
1080 return AFSocketAddress.getInetAddress(fdesc, false, af);
1081 case SocketOptions.SO_REUSEADDR:
1082 return false;
1083 default:
1084 throw new SocketException("Unsupported option: " + optID);
1085 }
1086 } catch (final SocketException e) {
1087 throw e;
1088 } catch (final Exception e) {
1089 throw (SocketException) new SocketException("Could not get option").initCause(e);
1090 }
1091 }
1092
1093 @Override
1094 public void setOption(int optID, Object value) throws SocketException {
1095 setOption0(optID, value);
1096 }
1097
1098 private void setOption0(int optID, Object value) throws SocketException {
1099 if (isClosed()) {
1100 throw new SocketException("Socket is closed");
1101 }
1102 if (optID == SocketOptions.SO_REUSEADDR) {
1103 reuseAddr = (expectBoolean(value) != 0);
1104 return;
1105 }
1106
1107 FileDescriptor fdesc = core.validFdOrException();
1108 setOptionDefault(fdesc, optID, value, socketTimeout);
1109 }
1110
1111
1112
1113
1114
1115
1116
1117
1118 protected final Object getOptionLenient(int optID) throws SocketException {
1119 try {
1120 return getOption0(optID);
1121 } catch (SocketException e) {
1122 switch (optID) {
1123 case SocketOptions.TCP_NODELAY:
1124 case SocketOptions.SO_KEEPALIVE:
1125 return false;
1126 default:
1127 throw e;
1128 }
1129 }
1130 }
1131
1132
1133
1134
1135
1136
1137
1138
1139 protected final void setOptionLenient(int optID, Object value) throws SocketException {
1140 try {
1141 setOption0(optID, value);
1142 } catch (SocketException e) {
1143 switch (optID) {
1144 case SocketOptions.TCP_NODELAY:
1145 return;
1146 default:
1147 throw e;
1148 }
1149 }
1150 }
1151
1152 static final void setOptionDefault(FileDescriptor fdesc, int optID, Object value,
1153 AtomicInteger acceptTimeout) throws SocketException {
1154 try {
1155 switch (optID) {
1156 case SocketOptions.SO_LINGER:
1157
1158 if (value instanceof Boolean) {
1159 final boolean b = (Boolean) value;
1160 if (b) {
1161 throw new SocketException("Only accepting Boolean.FALSE here");
1162 }
1163 NativeUnixSocket.setSocketOptionInt(fdesc, optID, -1);
1164 return;
1165 }
1166 NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectInteger(value));
1167 return;
1168 case SocketOptions.SO_TIMEOUT: {
1169 int timeout = expectInteger(value);
1170 try {
1171 NativeUnixSocket.setSocketOptionInt(fdesc, 0x1005, timeout);
1172 } catch (InvalidArgumentSocketException e) {
1173
1174 }
1175 try {
1176 NativeUnixSocket.setSocketOptionInt(fdesc, 0x1006, timeout);
1177 } catch (InvalidArgumentSocketException e) {
1178
1179 }
1180 if (acceptTimeout != null) {
1181 acceptTimeout.set(timeout);
1182 }
1183 return;
1184 }
1185 case SocketOptions.SO_RCVBUF:
1186 case SocketOptions.SO_SNDBUF:
1187 NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectInteger(value));
1188 return;
1189 case SocketOptions.SO_KEEPALIVE:
1190 try {
1191 NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectBoolean(value));
1192 } catch (SocketException e) {
1193
1194 }
1195 return;
1196 case SocketOptions.TCP_NODELAY:
1197 NativeUnixSocket.setSocketOptionInt(fdesc, optID, expectBoolean(value));
1198 return;
1199 case SocketOptions.IP_TOS:
1200
1201 return;
1202 case SocketOptions.SO_REUSEADDR:
1203
1204 return;
1205 default:
1206 throw new SocketException("Unsupported option: " + optID);
1207 }
1208 } catch (final SocketException e) {
1209 throw e;
1210 } catch (final Exception e) {
1211 throw (SocketException) new SocketException("Error while setting option").initCause(e);
1212 }
1213 }
1214
1215
1216
1217
1218
1219
1220
1221 protected final void shutdown() throws IOException {
1222 FileDescriptor fdesc = core.validFd();
1223 if (fdesc != null) {
1224 NativeUnixSocket.shutdown(fdesc, SHUT_RD_WR);
1225 shutdownState = 0;
1226 }
1227 }
1228
1229 @Override
1230 protected final void shutdownInput() throws IOException {
1231 FileDescriptor fdesc = core.validFd();
1232 if (fdesc != null) {
1233 NativeUnixSocket.shutdown(fdesc, SHUT_RD);
1234 shutdownState |= 1 << (SHUT_RD);
1235 if (shutdownState == SHUTDOWN_RD_WR) {
1236 NativeUnixSocket.shutdown(fdesc, SHUT_RD_WR);
1237 shutdownState = 0;
1238 }
1239 }
1240 }
1241
1242 @Override
1243 protected final void shutdownOutput() throws IOException {
1244 FileDescriptor fdesc = core.validFd();
1245 if (fdesc != null) {
1246 NativeUnixSocket.shutdown(fdesc, SHUT_WR);
1247 shutdownState |= 1 << (SHUT_RD_WR);
1248 if (shutdownState == SHUTDOWN_RD_WR) {
1249 NativeUnixSocket.shutdown(fdesc, SHUT_RD_WR);
1250 shutdownState = 0;
1251 }
1252 }
1253 }
1254
1255 final int getAncillaryReceiveBufferSize() {
1256 return ancillaryDataSupport.getAncillaryReceiveBufferSize();
1257 }
1258
1259 final void setAncillaryReceiveBufferSize(int size) {
1260 ancillaryDataSupport.setAncillaryReceiveBufferSize(size);
1261 }
1262
1263 final void ensureAncillaryReceiveBufferSize(int minSize) {
1264 ancillaryDataSupport.ensureAncillaryReceiveBufferSize(minSize);
1265 }
1266
1267 AncillaryDataSupport getAncillaryDataSupport() {
1268 return ancillaryDataSupport;
1269 }
1270
1271 final SocketAddress receive(ByteBuffer dst) throws IOException {
1272 return core.receive(dst, socketTimeout::get);
1273 }
1274
1275 final int send(ByteBuffer src, SocketAddress target) throws IOException {
1276 return core.write(src, socketTimeout::get, target, 0);
1277 }
1278
1279 final int read(ByteBuffer dst, ByteBuffer socketAddressBuffer) throws IOException {
1280 return core.read(dst, socketTimeout::get, socketAddressBuffer, 0);
1281 }
1282
1283 final int write(ByteBuffer src) throws IOException {
1284 return core.write(src, socketTimeout::get);
1285 }
1286
1287 @Override
1288 protected final FileDescriptor getFileDescriptor() {
1289 return core.fd;
1290 }
1291
1292 final void updatePorts(int local, int remote) {
1293 this.localport = local;
1294 if (remote >= 0) {
1295 this.port = remote;
1296 }
1297 }
1298
1299 final @Nullable A getLocalSocketAddress() {
1300 return AFSocketAddress.getSocketAddress(getFileDescriptor(), false, localport, addressFamily);
1301 }
1302
1303 final @Nullable A getRemoteSocketAddress() {
1304 return AFSocketAddress.getSocketAddress(getFileDescriptor(), true, port, addressFamily);
1305 }
1306
1307 final int getLocalPort1() {
1308 return localport;
1309 }
1310
1311 final int getRemotePort() {
1312 return port;
1313 }
1314
1315 @Override
1316 protected final InetAddress getInetAddress() {
1317 @Nullable
1318 A rsa = getRemoteSocketAddress();
1319 if (rsa == null) {
1320 return InetAddress.getLoopbackAddress();
1321 } else {
1322 return rsa.getInetAddress();
1323 }
1324 }
1325
1326 final void createSocket(FileDescriptor fdTarget, AFSocketType type) throws IOException {
1327 NativeUnixSocket.createSocket(fdTarget, addressFamily.getDomain(), type.getId());
1328 }
1329
1330 final AFAddressFamily<A> getAddressFamily() {
1331 return addressFamily;
1332 }
1333
1334 @Override
1335 protected <T> void setOption(SocketOption<T> name, T value) throws IOException {
1336 if (name instanceof AFSocketOption<?>) {
1337 getCore().setOption((AFSocketOption<T>) name, value);
1338 return;
1339 }
1340 Integer optionId = SocketOptionsMapper.resolve(name);
1341 if (optionId == null) {
1342 super.setOption(name, value);
1343 } else {
1344 setOption(optionId, value);
1345 }
1346 }
1347
1348 @SuppressWarnings("unchecked")
1349 @Override
1350 protected <T> T getOption(SocketOption<T> name) throws IOException {
1351 if (name instanceof AFSocketOption<?>) {
1352 return getCore().getOption((AFSocketOption<T>) name);
1353 }
1354 Integer optionId = SocketOptionsMapper.resolve(name);
1355 if (optionId == null) {
1356 return super.getOption(name);
1357 } else {
1358 return (T) getOption(optionId);
1359 }
1360 }
1361
1362 @Override
1363 protected Set<SocketOption<?>> supportedOptions() {
1364 return SocketOptionsMapper.SUPPORTED_SOCKET_OPTIONS;
1365 }
1366
1367
1368
1369
1370
1371
1372
1373
1374 protected final synchronized AFSocketImplExtensions<A> getImplExtensions() {
1375 if (implExtensions == null) {
1376 implExtensions = addressFamily.initImplExtensions(ancillaryDataSupport);
1377 }
1378 return implExtensions;
1379 }
1380 }