1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix.server;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.net.ServerSocket;
23 import java.net.Socket;
24 import java.net.SocketAddress;
25 import java.net.SocketException;
26 import java.net.SocketTimeoutException;
27 import java.util.Objects;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ForkJoinPool;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.ScheduledExecutorService;
34 import java.util.concurrent.ScheduledFuture;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37
38 import org.eclipse.jdt.annotation.NonNull;
39 import org.newsclub.net.unix.AFServerSocket;
40 import org.newsclub.net.unix.AFSocketAddress;
41
42 import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
43 import com.kohlschutter.annotations.compiletime.SuppressLint;
44
45
46
47
48
49
50
51
52
53
54 public abstract class SocketServer<A extends SocketAddress, S extends Socket, V extends ServerSocket> {
55 private static final ScheduledExecutorService TIMEOUTS = Executors.newScheduledThreadPool(1);
56
57 private final @NonNull A listenAddress;
58
59 private int maxConcurrentConnections = Runtime.getRuntime().availableProcessors();
60 private int serverTimeout = 0;
61 private int socketTimeout = (int) TimeUnit.SECONDS.toMillis(60);
62 private int serverBusyTimeout = (int) TimeUnit.SECONDS.toMillis(1);
63
64 private Thread listenThread = null;
65 private V serverSocket;
66 private final AtomicBoolean stopRequested = new AtomicBoolean(false);
67 private final AtomicBoolean ready = new AtomicBoolean(false);
68
69 private final Object connectionsMonitor = new Object();
70 private ForkJoinPool connectionPool;
71
72 private ScheduledFuture<IOException> timeoutFuture;
73 private final V reuseSocket;
74
75
76
77
78
79
80 @SuppressWarnings("all")
81 public SocketServer(V serverSocket) {
82 this((A) Objects.requireNonNull(serverSocket).getLocalSocketAddress(), serverSocket);
83 }
84
85
86
87
88
89
90 @SuppressWarnings("null")
91 public SocketServer(A listenAddress) {
92 this(listenAddress, null);
93 }
94
95 @SuppressWarnings("null")
96 private SocketServer(A listenAddress, V preboundSocket) {
97 Objects.requireNonNull(listenAddress, "listenAddress");
98 this.reuseSocket = preboundSocket;
99
100 this.listenAddress = listenAddress;
101 }
102
103
104
105
106
107
108 public int getMaxConcurrentConnections() {
109 return maxConcurrentConnections;
110 }
111
112
113
114
115
116
117 public void setMaxConcurrentConnections(int maxConcurrentConnections) {
118 if (isRunning()) {
119 throw new IllegalStateException("Already configured");
120 }
121 this.maxConcurrentConnections = maxConcurrentConnections;
122 }
123
124
125
126
127
128
129 public int getServerTimeout() {
130 return serverTimeout;
131 }
132
133
134
135
136
137
138 public void setServerTimeout(int timeout) {
139 if (isRunning()) {
140 throw new IllegalStateException("Already configured");
141 }
142 this.serverTimeout = timeout;
143 }
144
145
146
147
148
149
150 public int getSocketTimeout() {
151 return socketTimeout;
152 }
153
154
155
156
157
158
159 public void setSocketTimeout(int timeout) {
160 this.socketTimeout = timeout;
161 }
162
163
164
165
166
167
168 public int getServerBusyTimeout() {
169 return serverBusyTimeout;
170 }
171
172
173
174
175
176
177 public void setServerBusyTimeout(int timeout) {
178 this.serverBusyTimeout = timeout;
179 }
180
181
182
183
184
185
186 public boolean isRunning() {
187 synchronized (this) {
188 return (listenThread != null && listenThread.isAlive());
189 }
190 }
191
192
193
194
195
196
197 public boolean isReady() {
198 return ready.get() && !stopRequested.get() && isRunning();
199 }
200
201
202
203
204
205
206 public void start() {
207 synchronized (this) {
208 if (isRunning()) {
209 return;
210 }
211 if (connectionPool == null) {
212 connectionPool = new ForkJoinPool(maxConcurrentConnections,
213 ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
214 }
215
216 Thread t = new Thread(SocketServer.this.toString() + " listening thread") {
217 @Override
218 public void run() {
219 try {
220 listen();
221 } catch (Exception e) {
222 onListenException(e);
223 } catch (Throwable e) {
224 onListenException(e);
225 }
226 }
227 };
228 t.start();
229
230 listenThread = t;
231 }
232 }
233
234
235
236
237
238
239 public void startAndWaitToBecomeReady() throws InterruptedException {
240 synchronized (this) {
241 start();
242 while (!ready.get() && !stopRequested.get()) {
243 this.wait(1000);
244 }
245 }
246 }
247
248
249
250
251
252
253
254
255
256 public boolean startAndWaitToBecomeReady(long duration, TimeUnit unit)
257 throws InterruptedException {
258 synchronized (this) {
259 start();
260 long timeStart = System.currentTimeMillis();
261 while (duration > 0) {
262 if (isReady()) {
263 return true;
264 }
265 this.wait(unit.toMillis(duration));
266 duration -= (System.currentTimeMillis() - timeStart);
267 }
268 return isReady();
269 }
270 }
271
272
273
274
275
276
277
278
279 protected abstract V newServerSocket() throws IOException;
280
281 @SuppressWarnings("null")
282 private void listen() throws IOException {
283 V server = null;
284 try {
285 synchronized (this) {
286 if (reuseSocket != null) {
287 server = reuseSocket;
288 } else {
289 server = null;
290 }
291 }
292 if (server == null) {
293 server = newServerSocket();
294 }
295 synchronized (this) {
296 if (serverSocket != null) {
297 throw new IllegalStateException("The server is already listening");
298 }
299 serverSocket = server;
300 }
301 onServerStarting();
302
303 if (!server.isBound()) {
304 server.bind(listenAddress);
305 onServerBound(listenAddress);
306 }
307 server.setSoTimeout(serverTimeout);
308
309 acceptLoop(server);
310 } catch (SocketException e) {
311 onSocketExceptionDuringAccept(e);
312 } finally {
313 stop();
314 onServerStopped(server);
315 }
316 }
317
318 @SuppressWarnings("PMD.CognitiveComplexity")
319 @SuppressFBWarnings("NN_NAKED_NOTIFY")
320 @SuppressLint("RESOURCE_LEAK")
321 private void acceptLoop(V server) throws IOException {
322 long busyStartTime = 0;
323 acceptLoop : while (!stopRequested.get() && !Thread.interrupted()) {
324 try {
325 while (!stopRequested.get() && connectionPool
326 .getActiveThreadCount() >= maxConcurrentConnections) {
327 if (busyStartTime == 0) {
328 busyStartTime = System.currentTimeMillis();
329 }
330 onServerBusy(busyStartTime);
331
332 synchronized (connectionsMonitor) {
333 try {
334 connectionsMonitor.wait(serverBusyTimeout);
335 } catch (InterruptedException e) {
336 throw (InterruptedIOException) new InterruptedIOException(
337 "Interrupted while waiting on server resources").initCause(e);
338 }
339 }
340 }
341 busyStartTime = 0;
342
343 if (stopRequested.get() || server == null) {
344 break;
345 }
346
347 synchronized (SocketServer.this) {
348 SocketServer.this.notifyAll();
349 }
350 ready.set(true);
351 onServerReady(connectionPool.getActiveThreadCount());
352
353 final S socket;
354 try {
355 @SuppressWarnings("unchecked")
356 S theSocket = (S) server.accept();
357 socket = theSocket;
358 } catch (SocketException e) {
359 if (server.isClosed()) {
360
361 break acceptLoop;
362 } else {
363 throw e;
364 }
365 }
366 try {
367 socket.setSoTimeout(socketTimeout);
368 } catch (SocketException e) {
369
370 onSocketExceptionAfterAccept(socket, e);
371 socket.close();
372
373 continue acceptLoop;
374 }
375
376 onSubmitted(socket, submit(socket, connectionPool));
377 } catch (SocketTimeoutException e) {
378 if (!connectionPool.isQuiescent()) {
379 continue acceptLoop;
380 } else {
381 onServerShuttingDown();
382 connectionPool.shutdown();
383 break acceptLoop;
384 }
385 }
386 }
387 }
388
389
390
391
392
393
394 @SuppressWarnings("null")
395 @SuppressFBWarnings("NN_NAKED_NOTIFY")
396 public void stop() throws IOException {
397 stopRequested.set(true);
398 ready.set(false);
399
400 synchronized (this) {
401 V theServerSocket = serverSocket;
402 serverSocket = null;
403 try {
404 if (theServerSocket == null) {
405 return;
406 }
407 ScheduledFuture<IOException> future = this.timeoutFuture;
408 if (future != null) {
409 future.cancel(false);
410 this.timeoutFuture = null;
411 }
412
413 theServerSocket.close();
414 } finally {
415 SocketServer.this.notifyAll();
416 }
417 }
418 }
419
420 private Future<?> submit(final S socket, ExecutorService executor) {
421 Objects.requireNonNull(socket);
422 return executor.submit(new Runnable() {
423 @Override
424 public void run() {
425 onBeforeServingSocket(socket);
426
427 try {
428 doServeSocket(socket);
429 } catch (Exception e) {
430 onServingException(socket, e);
431 } catch (Throwable t) {
432 onServingException(socket, t);
433 } finally {
434
435 synchronized (connectionsMonitor) {
436 connectionsMonitor.notifyAll();
437 }
438
439 doSocketClose(socket);
440 onAfterServingSocket(socket);
441 }
442 }
443 });
444 }
445
446
447
448
449
450
451
452
453
454 @SuppressWarnings("null")
455 protected void doSocketClose(S socket) {
456 try {
457 socket.close();
458 } catch (IOException e) {
459
460 }
461 }
462
463
464
465
466
467
468
469
470
471
472
473 public ScheduledFuture<IOException> startThenStopAfter(long delay, TimeUnit unit) {
474 if (stopRequested.get()) {
475 return null;
476 }
477 synchronized (this) {
478 start();
479 ScheduledFuture<?> existingFuture = this.timeoutFuture;
480 if (existingFuture != null) {
481 existingFuture.cancel(false);
482 }
483
484 return (this.timeoutFuture = TIMEOUTS.schedule(new Callable<IOException>() {
485 @SuppressFBWarnings("THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION")
486 @Override
487 public IOException call() throws Exception {
488 try {
489 stop();
490 return null;
491 } catch (IOException e) {
492 return e;
493 }
494 }
495 }, delay, unit));
496 }
497 }
498
499
500
501
502
503
504
505 protected abstract void doServeSocket(S socket) throws IOException;
506
507
508
509
510 protected void onServerStarting() {
511 }
512
513
514
515
516
517
518
519
520 protected void onServerBound(A address) {
521 }
522
523
524
525
526
527
528 protected void onServerReady(int activeCount) {
529 }
530
531
532
533
534
535
536
537
538
539 protected void onServerBusy(long busyStartTime) {
540 }
541
542
543
544
545
546
547 protected void onServerStopped(V socket) {
548 }
549
550
551
552
553
554
555
556
557 protected void onSubmitted(S socket, Future<?> submission) {
558 }
559
560
561
562
563 protected void onServerShuttingDown() {
564 }
565
566
567
568
569
570
571 protected void onSocketExceptionDuringAccept(SocketException e) {
572 }
573
574
575
576
577
578
579
580 protected void onSocketExceptionAfterAccept(S socket, SocketException e) {
581 }
582
583
584
585
586
587
588 protected void onBeforeServingSocket(S socket) {
589 }
590
591
592
593
594
595
596
597
598
599 @Deprecated
600 protected void onServingException(S socket, Exception e) {
601 onServingException(socket, (Throwable) e);
602 }
603
604
605
606
607
608
609
610 protected void onServingException(S socket, Throwable t) {
611 }
612
613
614
615
616
617
618 protected void onAfterServingSocket(S socket) {
619 }
620
621
622
623
624
625
626
627
628 @Deprecated
629 protected void onListenException(Exception e) {
630 onListenException((Throwable) e);
631 }
632
633
634
635
636
637
638 protected void onListenException(Throwable t) {
639 }
640
641
642
643
644
645
646 protected @NonNull A getListenAddress() {
647 return listenAddress;
648 }
649 }