/*
 * junixsocket
 *
 * Copyright 2009-2024 Christian Kohlschütter
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.newsclub.net.unix;

import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.LockSupport;

import org.eclipse.jdt.annotation.Nullable;
import org.newsclub.net.unix.AFSelector.PollFd;

35  /**
36   * "Naive" implementation of {@link VirtualThreadPoller}, using
37   * {@link NativeUnixSocket#poll(PollFd, int)} on non-virtual threads.
38   *
39   * @author Christian Kohlschütter
40   */
41  final class VirtualThreadPollerNaive implements VirtualThreadPoller {
42    private static final int POLL_INTERVAL_MILLIS = 1_000; // should remain at 1 second to simplify
43                                                           // socket timeout handling
44  
45    private static final Map<FileDescriptor, PollJob> POLL_JOBS = new ConcurrentHashMap<>();
46  
47    private static final InterruptedIOException POLL_INTERRUPTED_SENTINEL =
48        new InterruptedIOException();
49  
50    private static final class PollJob {
51      private final List<Thread> waitingThreads = new LinkedList<>();
52      private final FileDescriptor fd;
53      private final int mode;
54      private final long now;
55      private final AFSupplier<Integer> timeout;
56  
57      PollJob(FileDescriptor fd, int mode, long now, AFSupplier<Integer> timeout) {
58        this.fd = fd;
59        this.mode = mode;
60        this.now = now;
61        this.timeout = timeout;
62      }
63  
64      @SuppressWarnings("PMD.CognitiveComplexity")
65      AFFuture<@Nullable IOException> trigger(Thread waitingThread) {
66        synchronized (fd) {
67          waitingThreads.add(waitingThread);
68        }
69        return AFFuture.supplyAsync(() -> {
70          try {
71            Thread thread = Thread.currentThread();
72            PollFd pfd = new PollFd(new FileDescriptor[] {fd}, new int[] {mode});
73            do {
74              if (thread.isInterrupted() || !fd.valid()) {
75                return POLL_INTERRUPTED_SENTINEL;
76              }
77              try {
78                NativeUnixSocket.poll(pfd, POLL_INTERVAL_MILLIS);
79              } catch (IOException e) {
80                return e;
81              }
82              if (thread.isInterrupted() || !fd.valid()) {
83                return POLL_INTERRUPTED_SENTINEL;
84              }
85              if (pfd.rops[0] != 0) {
86                break;
87              }
88  
89              int timeoutMillis = timeout.get();
90              if (timeoutMillis > 0) {
91                if ((System.currentTimeMillis() - now) >= timeoutMillis) {
92                  // handle in calling thread
93                  break;
94                }
95              }
96            } while (true); // NOPMD.WhileLoopWithLiteralBoolean
97          } finally {
98            Thread threadToWake = null;
99            try {
100             synchronized (fd) {
101               threadToWake = waitingThreads.remove(0);
102               if (waitingThreads.isEmpty()) {
103                 POLL_JOBS.remove(fd);
104               }
105             }
106           } finally {
107             if (threadToWake != null) {
108               LockSupport.unpark(threadToWake);
109             }
110           }
111         }
112 
113         return null;
114       })::get;
115     }
116   }
117 
118   @Override
119   public void parkThreadUntilReady(FileDescriptor fd, int mode, long now,
120       AFSupplier<Integer> timeout, Closeable closeOnInterrupt) throws IOException {
121     Thread virtualThread = Thread.currentThread();
122 
123     PollJob job = Java7Util.computeIfAbsent(POLL_JOBS, fd, (k) -> new PollJob(fd, mode, now,
124         timeout));
125     AFFuture<@Nullable IOException> future = job.trigger(virtualThread);
126 
127     LockSupport.park();
128     if (virtualThread.isInterrupted()) {
129       throw SocketClosedByInterruptException.newInstanceAndClose(closeOnInterrupt);
130     }
131 
132     try {
133       IOException ex = future.get();
134       if (ex != null) {
135         if (ex == POLL_INTERRUPTED_SENTINEL) {
136           throw SocketClosedByInterruptException.newInstanceAndClose(closeOnInterrupt);
137         }
138         throw ex;
139       }
140     } catch (InterruptedException | ExecutionException e) {
141       throw SocketClosedByInterruptException.newInstanceAndClose(closeOnInterrupt); // NOPMD.PreserveStackTrace
142     }
143 
144     int timeoutMillis = timeout.get();
145     if (timeoutMillis > 0) {
146       if ((System.currentTimeMillis() - now) >= timeoutMillis) {
147         throw new SocketTimeoutException();
148       }
149     }
150   }
151 }