View Javadoc
1   /*
2    * junixsocket
3    *
4    * Copyright 2009-2024 Christian Kohlschütter
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.newsclub.net.unix.server;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.net.ServerSocket;
23  import java.net.Socket;
24  import java.net.SocketAddress;
25  import java.net.SocketException;
26  import java.net.SocketTimeoutException;
27  import java.util.Objects;
28  import java.util.concurrent.Callable;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.ForkJoinPool;
32  import java.util.concurrent.Future;
33  import java.util.concurrent.ScheduledExecutorService;
34  import java.util.concurrent.ScheduledFuture;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  
38  import org.eclipse.jdt.annotation.NonNull;
39  import org.newsclub.net.unix.AFServerSocket;
40  import org.newsclub.net.unix.AFSocketAddress;
41  
42  import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
43  import com.kohlschutter.annotations.compiletime.SuppressLint;
44  
45  /**
46   * A base implementation for a simple, multi-threaded socket server.
47   *
48   * @author Christian Kohlschütter
49   * @see AFSocketServer
50   * @param <A> The supported address type.
51   * @param <S> The supported {@link Socket} type.
52   * @param <V> The supported {@link ServerSocket} type.
53   */
54  public abstract class SocketServer<A extends SocketAddress, S extends Socket, V extends ServerSocket> {
55    private static final ScheduledExecutorService TIMEOUTS = Executors.newScheduledThreadPool(1);
56  
57    private final @NonNull A listenAddress;
58  
59    private int maxConcurrentConnections = Runtime.getRuntime().availableProcessors();
60    private int serverTimeout = 0; // by default, the server doesn't timeout.
61    private int socketTimeout = (int) TimeUnit.SECONDS.toMillis(60);
62    private int serverBusyTimeout = (int) TimeUnit.SECONDS.toMillis(1);
63  
64    private Thread listenThread = null;
65    private V serverSocket;
66    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
67    private final AtomicBoolean ready = new AtomicBoolean(false);
68  
69    private final Object connectionsMonitor = new Object();
70    private ForkJoinPool connectionPool;
71  
72    private ScheduledFuture<IOException> timeoutFuture;
73    private final V reuseSocket;
74  
75    /**
76     * Creates a server using the given, bound {@link ServerSocket}.
77     *
78     * @param serverSocket The server socket to use (must be bound).
79     */
80    @SuppressWarnings("all") // unchecked, null
81    public SocketServer(V serverSocket) {
82      this((A) Objects.requireNonNull(serverSocket).getLocalSocketAddress(), serverSocket);
83    }
84  
85    /**
86     * Creates a server using the given {@link SocketAddress}.
87     *
88     * @param listenAddress The address to bind the socket on.
89     */
90    @SuppressWarnings("null")
91    public SocketServer(A listenAddress) {
92      this(listenAddress, null);
93    }
94  
95    @SuppressWarnings("null")
96    private SocketServer(A listenAddress, V preboundSocket) {
97      Objects.requireNonNull(listenAddress, "listenAddress");
98      this.reuseSocket = preboundSocket;
99  
100     this.listenAddress = listenAddress;
101   }
102 
103   /**
104    * Returns the maximum number of concurrent connections.
105    *
106    * @return The maximum number of concurrent connections.
107    */
108   public int getMaxConcurrentConnections() {
109     return maxConcurrentConnections;
110   }
111 
112   /**
113    * Sets the maximum number of concurrent connections.
114    *
115    * @param maxConcurrentConnections The new maximum.
116    */
117   public void setMaxConcurrentConnections(int maxConcurrentConnections) {
118     if (isRunning()) {
119       throw new IllegalStateException("Already configured");
120     }
121     this.maxConcurrentConnections = maxConcurrentConnections;
122   }
123 
124   /**
125    * Returns the server timeout (in milliseconds).
126    *
127    * @return The server timeout in milliseconds (0 = no timeout).
128    */
129   public int getServerTimeout() {
130     return serverTimeout;
131   }
132 
133   /**
134    * Sets the server timeout (in milliseconds).
135    *
136    * @param timeout The new timeout in milliseconds (0 = no timeout).
137    */
138   public void setServerTimeout(int timeout) {
139     if (isRunning()) {
140       throw new IllegalStateException("Already configured");
141     }
142     this.serverTimeout = timeout;
143   }
144 
145   /**
146    * Returns the socket timeout (in milliseconds).
147    *
148    * @return The socket timeout in milliseconds (0 = no timeout).
149    */
150   public int getSocketTimeout() {
151     return socketTimeout;
152   }
153 
154   /**
155    * Sets the socket timeout (in milliseconds).
156    *
157    * @param timeout The new timeout in milliseconds (0 = no timeout).
158    */
159   public void setSocketTimeout(int timeout) {
160     this.socketTimeout = timeout;
161   }
162 
163   /**
164    * Returns the server-busy timeout (in milliseconds).
165    *
166    * @return The server-busy timeout in milliseconds (0 = no timeout).
167    */
168   public int getServerBusyTimeout() {
169     return serverBusyTimeout;
170   }
171 
172   /**
173    * Sets the server-busy timeout (in milliseconds).
174    *
175    * @param timeout The new timeout in milliseconds (0 = no timeout).
176    */
177   public void setServerBusyTimeout(int timeout) {
178     this.serverBusyTimeout = timeout;
179   }
180 
181   /**
182    * Checks if the server is running.
183    *
184    * @return {@code true} if the server is alive.
185    */
186   public boolean isRunning() {
187     synchronized (this) {
188       return (listenThread != null && listenThread.isAlive());
189     }
190   }
191 
192   /**
193    * Checks if the server is running and accepting new connections.
194    *
195    * @return {@code true} if the server is alive and ready to accept new connections.
196    */
197   public boolean isReady() {
198     return ready.get() && !stopRequested.get() && isRunning();
199   }
200 
201   /**
202    * Starts the server, and returns immediately.
203    *
204    * @see #startAndWaitToBecomeReady(long, TimeUnit)
205    */
206   public void start() {
207     synchronized (this) {
208       if (isRunning()) {
209         return;
210       }
211       if (connectionPool == null) {
212         connectionPool = new ForkJoinPool(maxConcurrentConnections,
213             ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
214       }
215 
216       Thread t = new Thread(SocketServer.this.toString() + " listening thread") {
217         @Override
218         public void run() {
219           try {
220             listen();
221           } catch (Exception e) {
222             onListenException(e);
223           } catch (Throwable e) { // NOPMD
224             onListenException(e);
225           }
226         }
227       };
228       t.start();
229 
230       listenThread = t;
231     }
232   }
233 
234   /**
235    * Starts the server and waits until it is ready or had to stop due to an error.
236    *
237    * @throws InterruptedException If the wait was interrupted.
238    */
239   public void startAndWaitToBecomeReady() throws InterruptedException {
240     synchronized (this) {
241       start();
242       while (!ready.get() && !stopRequested.get()) {
243         this.wait(1000);
244       }
245     }
246   }
247 
248   /**
249    * Starts the server and waits until it is ready or had to stop due to an error.
250    *
251    * @param duration The duration wait.
252    * @param unit The duration's time unit.
253    * @return {@code true} if the server is ready to serve requests.
254    * @throws InterruptedException If the wait was interrupted.
255    */
256   public boolean startAndWaitToBecomeReady(long duration, TimeUnit unit)
257       throws InterruptedException {
258     synchronized (this) {
259       start();
260       long timeStart = System.currentTimeMillis();
261       while (duration > 0) {
262         if (isReady()) {
263           return true;
264         }
265         this.wait(unit.toMillis(duration));
266         duration -= (System.currentTimeMillis() - timeStart);
267       }
268       return isReady();
269     }
270   }
271 
272   /**
273    * Returns a new server socket.
274    *
275    * @return The new socket (an {@link AFServerSocket} if the listen address is an
276    *         {@link AFSocketAddress}).
277    * @throws IOException on error.
278    */
279   protected abstract V newServerSocket() throws IOException;
280 
281   @SuppressWarnings("null")
282   private void listen() throws IOException {
283     V server = null;
284     try {
285       synchronized (this) {
286         if (reuseSocket != null) {
287           server = reuseSocket;
288         } else {
289           server = null;
290         }
291       }
292       if (server == null) {
293         server = newServerSocket();
294       }
295       synchronized (this) {
296         if (serverSocket != null) {
297           throw new IllegalStateException("The server is already listening");
298         }
299         serverSocket = server;
300       }
301       onServerStarting();
302 
303       if (!server.isBound()) {
304         server.bind(listenAddress);
305         onServerBound(listenAddress);
306       }
307       server.setSoTimeout(serverTimeout);
308 
309       acceptLoop(server);
310     } catch (SocketException e) {
311       onSocketExceptionDuringAccept(e);
312     } finally {
313       stop();
314       onServerStopped(server);
315     }
316   }
317 
318   @SuppressWarnings("PMD.CognitiveComplexity")
319   @SuppressFBWarnings("NN_NAKED_NOTIFY")
320   @SuppressLint("RESOURCE_LEAK")
321   private void acceptLoop(V server) throws IOException {
322     long busyStartTime = 0;
323     acceptLoop : while (!stopRequested.get() && !Thread.interrupted()) {
324       try {
325         while (!stopRequested.get() && connectionPool
326             .getActiveThreadCount() >= maxConcurrentConnections) {
327           if (busyStartTime == 0) {
328             busyStartTime = System.currentTimeMillis();
329           }
330           onServerBusy(busyStartTime);
331 
332           synchronized (connectionsMonitor) {
333             try {
334               connectionsMonitor.wait(serverBusyTimeout);
335             } catch (InterruptedException e) {
336               throw (InterruptedIOException) new InterruptedIOException(
337                   "Interrupted while waiting on server resources").initCause(e);
338             }
339           }
340         }
341         busyStartTime = 0;
342 
343         if (stopRequested.get() || server == null) {
344           break;
345         }
346 
347         synchronized (SocketServer.this) {
348           SocketServer.this.notifyAll();
349         }
350         ready.set(true);
351         onServerReady(connectionPool.getActiveThreadCount());
352 
353         final S socket;
354         try {
355           @SuppressWarnings("unchecked")
356           S theSocket = (S) server.accept();
357           socket = theSocket;
358         } catch (SocketException e) {
359           if (server.isClosed()) {
360             // already closed, ignore
361             break acceptLoop;
362           } else {
363             throw e;
364           }
365         }
366         try {
367           socket.setSoTimeout(socketTimeout);
368         } catch (SocketException e) {
369           // Connection closed before we could do anything
370           onSocketExceptionAfterAccept(socket, e);
371           socket.close();
372 
373           continue acceptLoop;
374         }
375 
376         onSubmitted(socket, submit(socket, connectionPool));
377       } catch (SocketTimeoutException e) {
378         if (!connectionPool.isQuiescent()) {
379           continue acceptLoop;
380         } else {
381           onServerShuttingDown();
382           connectionPool.shutdown();
383           break acceptLoop;
384         }
385       }
386     }
387   }
388 
389   /**
390    * Stops the server.
391    *
392    * @throws IOException If there was an error.
393    */
394   @SuppressWarnings("null")
395   @SuppressFBWarnings("NN_NAKED_NOTIFY")
396   public void stop() throws IOException {
397     stopRequested.set(true);
398     ready.set(false);
399 
400     synchronized (this) {
401       V theServerSocket = serverSocket;
402       serverSocket = null;
403       try {
404         if (theServerSocket == null) {
405           return;
406         }
407         ScheduledFuture<IOException> future = this.timeoutFuture;
408         if (future != null) {
409           future.cancel(false);
410           this.timeoutFuture = null;
411         }
412 
413         theServerSocket.close();
414       } finally {
415         SocketServer.this.notifyAll();
416       }
417     }
418   }
419 
420   private Future<?> submit(final S socket, ExecutorService executor) {
421     Objects.requireNonNull(socket);
422     return executor.submit(new Runnable() {
423       @Override
424       public void run() {
425         onBeforeServingSocket(socket);
426 
427         try { // NOPMD
428           doServeSocket(socket);
429         } catch (Exception e) { // NOPMD
430           onServingException(socket, e); // NOPMD
431         } catch (Throwable t) { // NOPMD
432           onServingException(socket, t); // NOPMD
433         } finally {
434           // Notify the server's accept thread that we handled the connection
435           synchronized (connectionsMonitor) {
436             connectionsMonitor.notifyAll();
437           }
438 
439           doSocketClose(socket);
440           onAfterServingSocket(socket);
441         }
442       }
443     });
444   }
445 
446   /**
447    * Called upon closing a socket after serving the connection.
448    * <p>
449    * The default implementation closes the socket directly, ignoring any {@link IOException}s. You
450    * may override this method to close the socket in a separate thread, for example.
451    *
452    * @param socket The socket to close.
453    */
454   @SuppressWarnings("null")
455   protected void doSocketClose(S socket) {
456     try {
457       socket.close();
458     } catch (IOException e) {
459       // ignore
460     }
461   }
462 
463   /**
464    * Requests that the server will be stopped after the given time delay. If the server is not
465    * started yet (and {@link #stop()} was not called yet, it will be started first.
466    *
467    * @param delay The delay.
468    * @param unit The time unit for the delay.
469    * @return A scheduled future that can be used to monitor progress / cancel the request. If there
470    *         was a problem with stopping, an IOException is returned as the value (not thrown). If
471    *         stop was already requested, {@code null} is returned.
472    */
473   public ScheduledFuture<IOException> startThenStopAfter(long delay, TimeUnit unit) {
474     if (stopRequested.get()) {
475       return null;
476     }
477     synchronized (this) {
478       start();
479       ScheduledFuture<?> existingFuture = this.timeoutFuture;
480       if (existingFuture != null) {
481         existingFuture.cancel(false);
482       }
483 
484       return (this.timeoutFuture = TIMEOUTS.schedule(new Callable<IOException>() {
485         @SuppressFBWarnings("THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION")
486         @Override
487         public IOException call() throws Exception {
488           try {
489             stop();
490             return null;
491           } catch (IOException e) {
492             return e;
493           }
494         }
495       }, delay, unit));
496     }
497   }
498 
499   /**
500    * Called when a socket is ready to be served.
501    *
502    * @param socket The socket to serve.
503    * @throws IOException If there was an error.
504    */
505   protected abstract void doServeSocket(S socket) throws IOException;
506 
507   /**
508    * Called when the server is starting up.
509    */
510   protected void onServerStarting() {
511   }
512 
513   /**
514    * Called when the server has been bound to a socket.
515    *
516    * This is not called when you instantiated the server with a pre-bound socket.
517    *
518    * @param address The bound address.
519    */
520   protected void onServerBound(A address) {
521   }
522 
523   /**
524    * Called when the server is ready to accept a new connection.
525    *
526    * @param activeCount The current number of active tasks (= serving sockets).
527    */
528   protected void onServerReady(int activeCount) {
529   }
530 
531   /**
532    * Called when the server is busy / not ready to accept a new connection.
533    *
534    * The frequency on how often this method is called when the server is busy is determined by
535    * {@link #getServerBusyTimeout()}.
536    *
537    * @param busyStartTime The time stamp since the server became busy.
538    */
539   protected void onServerBusy(long busyStartTime) {
540   }
541 
542   /**
543    * Called when the server has been stopped.
544    *
545    * @param socket The server's socket that stopped, or {@code null}.
546    */
547   protected void onServerStopped(V socket) {
548   }
549 
550   /**
551    * Called when a socket gets submitted into the process queue.
552    *
553    * @param socket The socket.
554    * @param submission The {@link Future} referencing the submission; it's "done" after the socket
555    *          has been served.
556    */
557   protected void onSubmitted(S socket, Future<?> submission) {
558   }
559 
560   /**
561    * Called when the server is shutting down.
562    */
563   protected void onServerShuttingDown() {
564   }
565 
566   /**
567    * Called when a {@link SocketException} was thrown during "accept".
568    *
569    * @param e The exception.
570    */
571   protected void onSocketExceptionDuringAccept(SocketException e) {
572   }
573 
574   /**
575    * Called when a {@link SocketException} was thrown during "accept".
576    *
577    * @param socket The socket.
578    * @param e The exception.
579    */
580   protected void onSocketExceptionAfterAccept(S socket, SocketException e) {
581   }
582 
583   /**
584    * Called before serving the socket.
585    *
586    * @param socket The socket.
587    */
588   protected void onBeforeServingSocket(S socket) {
589   }
590 
591   /**
592    * Called when an exception was thrown while serving a socket.
593    *
594    * @param socket The socket.
595    * @param e The exception.
596    * @deprecated Use {@link #onServingException(Socket, Throwable)}
597    * @see #onServingException(Socket, Throwable)
598    */
599   @Deprecated
600   protected void onServingException(S socket, Exception e) {
601     onServingException(socket, (Throwable) e);
602   }
603 
604   /**
605    * Called when a throwable was thrown while serving a socket.
606    *
607    * @param socket The socket.
608    * @param t The throwable.
609    */
610   protected void onServingException(S socket, Throwable t) {
611   }
612 
613   /**
614    * Called after the socket has been served.
615    *
616    * @param socket The socket.
617    */
618   protected void onAfterServingSocket(S socket) {
619   }
620 
621   /**
622    * Called when an exception was thrown while listening on the server socket.
623    *
624    * @param e The exception.
625    * @deprecated Use {@link #onListenException(Throwable)}
626    * @see #onListenException(Throwable)
627    */
628   @Deprecated
629   protected void onListenException(Exception e) {
630     onListenException((Throwable) e);
631   }
632 
633   /**
634    * Called when an exception was thrown while listening on the server socket.
635    *
636    * @param t The throwable.
637    */
638   protected void onListenException(Throwable t) {
639   }
640 
641   /**
642    * Returns the address the server listens to.
643    *
644    * @return The listen address.
645    */
646   protected @NonNull A getListenAddress() {
647     return listenAddress;
648   }
649 }