View Javadoc
1   /*
2    * junixsocket
3    *
4    * Copyright 2009-2024 Christian Kohlschütter
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.newsclub.net.unix.rmi;
19  
20  import java.io.Closeable;
21  import java.io.Externalizable;
22  import java.io.IOException;
23  import java.io.ObjectInput;
24  import java.io.ObjectOutput;
25  import java.net.ServerSocket;
26  import java.net.Socket;
27  import java.rmi.NotBoundException;
28  import java.rmi.server.RMIClientSocketFactory;
29  import java.rmi.server.RMIServerSocketFactory;
30  import java.rmi.server.RMISocketFactory;
31  import java.util.HashMap;
32  import java.util.HashSet;
33  import java.util.Map;
34  import java.util.Set;
35  
36  import org.newsclub.net.unix.AFServerSocket;
37  import org.newsclub.net.unix.AFSocket;
38  import org.newsclub.net.unix.AFSocketAddress;
39  import org.newsclub.net.unix.StackTraceUtil;
40  import org.newsclub.net.unix.rmi.ShutdownHookSupport.ShutdownHook;
41  
42  import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
43  
44  /**
45   * An {@link RMISocketFactory} that supports {@link AFSocket}s.
46   *
47   * @author Christian Kohlschütter
48   */
49  public abstract class AFRMISocketFactory extends RMISocketFactory implements Externalizable,
50      Closeable {
51    private static final long serialVersionUID = 1L;
52  
53    private transient AFRMIService rmiService = null;
54  
55    private transient Externables externables;
56    private final transient Map<Integer, AFServerSocket<?>> openServerSockets = new HashMap<>();
57    private final transient Set<AFSocket<?>> openSockets = new HashSet<>();
58  
59    private static final class Externables {
60      private final AFNaming naming;
61      private final RMIClientSocketFactory defaultClientFactory;
62      private final RMIServerSocketFactory defaultServerFactory;
63  
64      private Externables(AFNaming naming, RMIClientSocketFactory defaultClientFactory,
65          RMIServerSocketFactory defaultServerFactory) {
66        this.naming = naming;
67        this.defaultClientFactory = defaultClientFactory;
68        this.defaultServerFactory = defaultServerFactory;
69      }
70    }
71  
72    /**
73     * Constructor required per definition.
74     *
75     * @see RMISocketFactory
76     */
77    public AFRMISocketFactory() {
78      this(null, null, null);
79    }
80  
81    /**
82     * Creates a new socket factory.
83     *
84     * @param naming The {@link AFNaming} instance to use.
85     * @param defaultClientFactory The default {@link RMIClientSocketFactory}.
86     * @param defaultServerFactory The default {@link RMIServerSocketFactory}.
87     */
88    @SuppressFBWarnings("EI_EXPOSE_REP2")
89    public AFRMISocketFactory(final AFNaming naming,
90        final RMIClientSocketFactory defaultClientFactory,
91        final RMIServerSocketFactory defaultServerFactory) {
92      super();
93      this.externables = new Externables(naming, defaultClientFactory, defaultServerFactory);
94  
95      closeUponRuntimeShutdown();
96    }
97  
98    // only to be called from the constructor
99    private void closeUponRuntimeShutdown() {
100     ShutdownHookSupport.addWeakShutdownHook(new ShutdownHook() {
101 
102       @Override
103       public void onRuntimeShutdown(Thread thread) {
104         try {
105           close();
106         } catch (IOException e) {
107           // ignore
108         }
109       }
110     });
111   }
112 
113   /**
114    * Creates a new socket address for the given RMI port.
115    *
116    * @param port The port.
117    * @return The socket address.
118    * @throws IOException on error.
119    */
120   protected abstract AFSocketAddress newSocketAddress(int port) throws IOException;
121 
122   /**
123    * Creates a new socket that is connected to the given socket address.
124    *
125    * @param addr The socket address.
126    * @return The connected socket.
127    * @throws IOException on error.
128    */
129   protected abstract AFSocket<?> newConnectedSocket(AFSocketAddress addr) throws IOException;
130 
131   private synchronized Externables getExternables() {
132     return externables;
133   }
134 
135   private synchronized void setExternable(Externables externable) {
136     this.externables = externable;
137   }
138 
139   @Override
140   public Socket createSocket(String host, int port) throws IOException {
141     final RMIClientSocketFactory cf = getExternables().defaultClientFactory;
142     if (cf != null && port < RMIPorts.AF_PORT_BASE) {
143       return cf.createSocket(host, port);
144     }
145 
146     final AFSocketAddress addr = newSocketAddress(port);
147     AFSocket<?> socket = newConnectedSocket(addr);
148 
149     synchronized (openSockets) {
150       openSockets.add(socket);
151     }
152     socket.addCloseable(() -> {
153       synchronized (openSockets) {
154         openSockets.remove(socket);
155       }
156     });
157     return socket;
158   }
159 
160   @Override
161   public void close() throws IOException {
162     synchronized (getExternables().naming) {
163       rmiService = null;
164       closeServerSockets();
165       closeSockets();
166     }
167   }
168 
169   private AFRMIService getRmiService() throws IOException {
170     AFNaming naming = getExternables().naming;
171     synchronized (naming) {
172       if (rmiService == null) {
173         try {
174           rmiService = naming.getRMIService();
175         } catch (NotBoundException e) {
176           throw (IOException) new IOException(e.getMessage()).initCause(e);
177         }
178       }
179       return rmiService;
180     }
181   }
182 
183   /**
184    * Returns a new free port.
185    *
186    * @return The new port.
187    * @throws IOException on error.
188    * @see #returnPort(int)
189    * @deprecated use {@link #newPortLease()}.
190    */
191   @Deprecated
192   protected int newPort() throws IOException {
193     return getRmiService().newPort();
194   }
195 
196   /**
197    * Returns a new free port.
198    *
199    * @return The new port, wrapped as a {@link PortLease}. Closing the lease will return the port.
200    * @throws IOException on error.
201    */
202   protected PortLease newPortLease() throws IOException {
203     AFRMIService service = getRmiService();
204     int port = service.newPort();
205     return new PortLease(port, service);
206   }
207 
208   /**
209    * Returns a port that was previously returned by {@link #newPort()}.
210    *
211    * Note that this may call may stall unnecessarily upon shutdown due to locking issues.
212    *
213    * @param port The port to return.
214    * @throws IOException on error.
215    * @deprecated use {@link #newPortLease()}
216    */
217   @Deprecated
218   protected void returnPort(int port) throws IOException {
219     try {
220       getRmiService().returnPort(port);
221     } catch (ShutdownException e) {
222       // ignore
223     } catch (IOException e) {
224       StackTraceUtil.printStackTrace(e);
225     }
226   }
227 
228   /**
229    * A lease on a registered port; closing the lease will return the port.
230    *
231    * @author Christian Kohlschütter
232    */
233   protected static final class PortLease implements Closeable {
234     private final int port;
235     private final AFRMIService rmiService;
236 
237     private PortLease(int port, AFRMIService rmiService) {
238       this.port = port;
239       this.rmiService = rmiService;
240     }
241 
242     /**
243      * Closes the lease, returning the port to the {@link AFRMIService} it was leased from.
244      */
245     @Override
246     public void close() throws IOException {
247       rmiService.returnPort(getPort());
248     }
249 
250     /**
251      * Returns the port number.
252      *
253      * @return the port number.
254      */
255     public int getPort() {
256       return port;
257     }
258 
259     /**
260      * Returns the service the port was leased from.
261      *
262      * @return The service.
263      */
264     public AFRMIService getRmiService() {
265       return rmiService;
266     }
267   }
268 
269   @Override
270   public ServerSocket createServerSocket(int port) throws IOException {
271     if (port == 0) {
272       PortLease portLease = newPortLease();
273       port = portLease.getPort();
274       final AFSocketAddress addr = newSocketAddress(port);
275       AFServerSocket<?> ass = addr.getAddressFamily().newServerSocket();
276       ass.addCloseable(portLease);
277       ass.setReuseAddress(true);
278       ass.setDeleteOnClose(true);
279       ass.bind(addr);
280 
281       if (port >= RMIPorts.AF_PORT_BASE) {
282         ass.addCloseable(new ServerSocketCloseable(ass, port));
283       }
284       return ass;
285     }
286 
287     final RMIServerSocketFactory sf = getExternables().defaultServerFactory;
288     if (sf != null && port < RMIPorts.AF_PORT_BASE) {
289       return sf.createServerSocket(port);
290     }
291 
292     final AFSocketAddress addr = newSocketAddress(port);
293     AFServerSocket<?> socket = addr.getAddressFamily().newServerSocket();
294     socket.setDeleteOnClose(true);
295     socket.setReuseAddress(true);
296     socket.bind(addr);
297     socket.addCloseable(new ServerSocketCloseable(socket, port));
298     return socket;
299   }
300 
301   private void closeServerSockets() throws IOException {
302     Map<Integer, AFServerSocket<?>> map;
303     synchronized (openServerSockets) {
304       map = new HashMap<>(openServerSockets);
305     }
306     IOException ex = null;
307     for (Map.Entry<Integer, AFServerSocket<?>> en : map.entrySet()) {
308       try {
309         en.getValue().close();
310       } catch (ShutdownException e) {
311         // ignore
312       } catch (IOException e) {
313         if (ex == null) {
314           ex = e;
315         } else {
316           ex.addSuppressed(e);
317         }
318       }
319     }
320     synchronized (openServerSockets) {
321       openServerSockets.clear();
322     }
323     if (ex != null) {
324       throw ex;
325     }
326   }
327 
328   private void closeSockets() {
329     Set<AFSocket<?>> set;
330     synchronized (openSockets) {
331       set = new HashSet<>(openSockets);
332     }
333     for (AFSocket<?> socket : set) {
334       try {
335         socket.close();
336       } catch (IOException e) {
337         // ignore
338       }
339     }
340     synchronized (openSockets) {
341       openSockets.clear();
342     }
343   }
344 
345   private final class ServerSocketCloseable implements Closeable {
346     private final int port;
347 
348     ServerSocketCloseable(AFServerSocket<?> socket, int port) {
349       this.port = port;
350       synchronized (openServerSockets) {
351         openServerSockets.put(port, socket);
352       }
353     }
354 
355     @Override
356     public void close() throws IOException {
357       synchronized (openServerSockets) {
358         openServerSockets.remove(port);
359       }
360     }
361   }
362 
363   @Override
364   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
365     setExternable(new Externables(readNamingInstance(in), //
366         (RMIClientSocketFactory) in.readObject(), //
367         (RMIServerSocketFactory) in.readObject()));
368   }
369 
370   @Override
371   public void writeExternal(ObjectOutput out) throws IOException {
372     Externables ext = getExternables();
373 
374     writeNamingInstance(out, ext.naming);
375     out.writeObject(ext.defaultClientFactory);
376     out.writeObject(ext.defaultServerFactory);
377   }
378 
379   /**
380    * Deserializes information necessary to instantiate the {@link AFNaming} instance.
381    *
382    * @param in The stream.
383    * @return The {@link AFNaming} instance.
384    * @throws IOException on error.
385    */
386   protected abstract AFNaming readNamingInstance(ObjectInput in) throws IOException;
387 
388   /**
389    * Serializes information necessary to instantiate the given {@link AFNaming} instance.
390    *
391    * @param out The stream.
392    * @param namingInstance The {@link AFNaming} instance.
393    * @throws IOException on error.
394    */
395   protected abstract void writeNamingInstance(ObjectOutput out, AFNaming namingInstance)
396       throws IOException;
397 
398   /**
399    * Checks if the given port refers to a local server port.
400    *
401    * @param port The port to check.
402    * @return {@code true} if the given port is a local server.
403    */
404   public boolean isLocalServer(int port) {
405     if (port < RMIPorts.AF_PORT_BASE) {
406       return false;
407     }
408     synchronized (openServerSockets) {
409       return openServerSockets.containsKey(port);
410     }
411   }
412 
413   /**
414    * The naming instance.
415    *
416    * @return The instance.
417    */
418   protected AFNaming getNaming() {
419     return getExternables().naming;
420   }
421 
422   /**
423    * Checks if this socket factory has some knowledge about the given port.
424    *
425    * @param port The port.
426    * @return {@code true} if registered.
427    */
428   abstract boolean hasRegisteredPort(int port);
429 }