View Javadoc
1   /*
2    * junixsocket
3    *
4    * Copyright 2009-2024 Christian Kohlschütter
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.newsclub.net.unix;
19  
20  import static java.util.Objects.requireNonNull;
21  
22  import java.io.FileDescriptor;
23  import java.io.IOException;
24  import java.net.ProtocolFamily;
25  import java.net.SocketAddress;
26  import java.net.SocketOption;
27  import java.net.StandardProtocolFamily;
28  import java.nio.ByteBuffer;
29  import java.nio.channels.SocketChannel;
30  import java.nio.channels.spi.SelectorProvider;
31  import java.util.Objects;
32  import java.util.Set;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  
35  import org.eclipse.jdt.annotation.NonNull;
36  
37  import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
38  
39  /**
40   * A selectable channel for stream-oriented connecting sockets.
41   *
42   * @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
43   * @author Christian Kohlschütter
44   */
45  public abstract class AFSocketChannel<A extends AFSocketAddress> extends SocketChannel implements
46      AFSomeSocket, AFSocketExtensions, AFSomeSocketChannel {
47    private final @NonNull AFSocket<A> afSocket;
48    private final AtomicBoolean connectPending = new AtomicBoolean(false);
49  
50    /**
51     * Creates a new socket channel for the given socket, using the given {@link SelectorProvider}.
52     *
53     * @param socket The socket.
54     * @param sp The {@link SelectorProvider}.
55     */
56    @SuppressWarnings("all")
57    protected AFSocketChannel(AFSocket<A> socket, AFSelectorProvider<A> sp) {
58      super(sp);
59      this.afSocket = Objects.requireNonNull(socket);
60    }
61  
62    /**
63     * Returns the corresponding {@link AFSocket}.
64     *
65     * @return The corresponding socket.
66     */
67    protected final AFSocket<A> getAFSocket() {
68      return afSocket;
69    }
70  
71    /**
72     * A reference to a method that provides an {@link AFSocket} instance.
73     *
74     * @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
75     */
76    @FunctionalInterface
77    protected interface AFSocketSupplier<A extends AFSocketAddress> {
78      /**
79       * Returns a new {@link AFSocket} instance.
80       *
81       * @return The instance.
82       * @throws IOException on error.
83       */
84      AFSocket<A> newInstance() throws IOException;
85    }
86  
87    /**
88     * Opens a socket channel.
89     *
90     * @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
91     * @param supplier The AFSocketChannel constructor.
92     *
93     * @return The new channel
94     * @throws IOException on error.
95     */
96    protected static final <A extends AFSocketAddress> AFSocketChannel<A> open(
97        AFSocketSupplier<A> supplier) throws IOException {
98      return supplier.newInstance().getChannel();
99    }
100 
101   /**
102    * Opens a socket channel, connecting to the given socket address.
103    *
104    * @param <A> The concrete {@link AFSocketAddress} that is supported by this type.
105    * @param remote The socket address to connect to.
106    * @param supplier The AFSocketChannel constructor.
107    * @return The new channel
108    * @throws IOException on error.
109    */
110   protected static final <A extends AFSocketAddress> AFSocketChannel<A> open(
111       AFSocketSupplier<A> supplier, SocketAddress remote) throws IOException {
112     @SuppressWarnings("resource")
113     AFSocketChannel<A> sc = open(supplier);
114     try {
115       sc.connect(remote);
116     } catch (Throwable x) { // NOPMD
117       try {
118         sc.close();
119       } catch (Throwable suppressed) { // NOPMD
120         x.addSuppressed(suppressed);
121       }
122       throw x;
123     }
124     assert sc.isConnected();
125     return sc;
126   }
127 
128   @SuppressWarnings("unchecked")
129   @Override
130   public final <T> T getOption(SocketOption<T> name) throws IOException {
131     if (name instanceof AFSocketOption<?>) {
132       return getAFCore().getOption((AFSocketOption<T>) name);
133     }
134     Integer optionId = SocketOptionsMapper.resolve(name);
135     if (optionId == null) {
136       throw new UnsupportedOperationException("unsupported option");
137     } else {
138       return (T) afSocket.getAFImpl().getOption(optionId);
139     }
140   }
141 
142   @Override
143   public final <T> AFSocketChannel<A> setOption(SocketOption<T> name, T value) throws IOException {
144     if (name instanceof AFSocketOption<?>) {
145       getAFCore().setOption((AFSocketOption<T>) name, value);
146       return this;
147     }
148     Integer optionId = SocketOptionsMapper.resolve(name);
149     if (optionId == null) {
150       throw new UnsupportedOperationException("unsupported option");
151     } else {
152       afSocket.getAFImpl().setOption(optionId, value);
153     }
154     return this;
155   }
156 
157   @Override
158   public final Set<SocketOption<?>> supportedOptions() {
159     return SocketOptionsMapper.SUPPORTED_SOCKET_OPTIONS;
160   }
161 
162   @Override
163   public final AFSocketChannel<A> bind(SocketAddress local) throws IOException {
164     afSocket.bind(local);
165     return this;
166   }
167 
168   @Override
169   public final AFSocketChannel<A> shutdownInput() throws IOException {
170     afSocket.getAFImpl().shutdownInput();
171     return this;
172   }
173 
174   @Override
175   public final AFSocketChannel<A> shutdownOutput() throws IOException {
176     afSocket.getAFImpl().shutdownOutput();
177     return this;
178   }
179 
180   @Override
181   @SuppressFBWarnings("EI_EXPOSE_REP")
182   public final AFSocket<A> socket() {
183     return afSocket;
184   }
185 
186   @Override
187   public final boolean isConnected() {
188     boolean connected = afSocket.isConnected();
189     if (connected) {
190       connectPending.set(false);
191     }
192     return connected;
193   }
194 
195   @Override
196   public final boolean isConnectionPending() {
197     return connectPending.get();
198   }
199 
200   @Override
201   public final boolean connect(SocketAddress remote) throws IOException {
202     boolean complete = false;
203     Exception exception = null;
204     try {
205       begin();
206       boolean connected = afSocket.connect0(remote, 0);
207       if (!connected) {
208         connectPending.set(true);
209       }
210       complete = true;
211       return connected;
212     } catch (IOException e) {
213       throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException( // NOPMD.PreserveStackTrace
214           (exception = InterruptibleChannelUtil.handleException(this, e)));
215     } finally {
216       InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
217     }
218   }
219 
220   @Override
221   public final boolean finishConnect() throws IOException {
222     if (isConnected()) {
223       return true;
224     } else if (!isConnectionPending()) {
225       return false;
226     }
227 
228     boolean complete = false;
229     Exception exception = null;
230     try {
231       begin();
232       boolean connected = NativeUnixSocket.finishConnect(afSocket.getFileDescriptor())
233           || isConnected();
234       if (connected) {
235         connectPending.set(false);
236       }
237       complete = true;
238       return connected;
239     } catch (IOException e) {
240       throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException( // NOPMD.PreserveStackTrace
241           (exception = InterruptibleChannelUtil.handleException(this, e)));
242     } finally {
243       InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
244     }
245   }
246 
247   @Override
248   public final A getRemoteAddress() throws IOException {
249     return getRemoteSocketAddress();
250   }
251 
252   @Override
253   public final A getRemoteSocketAddress() {
254     return afSocket.getRemoteSocketAddress();
255   }
256 
257   @Override
258   public final int read(ByteBuffer dst) throws IOException {
259     boolean complete = false;
260     Exception exception = null;
261     try {
262       begin();
263       int read = afSocket.getAFImpl().read(dst, null);
264       complete = true;
265       return read;
266     } catch (IOException e) {
267       throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException( // NOPMD.PreserveStackTrace
268           (exception = InterruptibleChannelUtil.handleException(this, e)));
269     } finally {
270       InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
271     }
272   }
273 
274   @Override
275   public final long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
276     if (length == 0) {
277       return 0;
278     }
279     // FIXME support more than one buffer for scatter-gather access
280     return read(dsts[offset]);
281   }
282 
283   @Override
284   public final long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
285     if (length == 0) {
286       return 0;
287     }
288     // FIXME support more than one buffer for scatter-gather access
289     return write(srcs[offset]);
290   }
291 
292   @Override
293   public final int write(ByteBuffer src) throws IOException {
294     boolean complete = false;
295     Exception exception = null;
296     try {
297       begin();
298       int written = afSocket.getAFImpl().write(src);
299       complete = true;
300       return written;
301     } catch (IOException e) {
302       throw InterruptibleChannelUtil.ioExceptionOrThrowRuntimeException( // NOPMD.PreserveStackTrace
303           (exception = InterruptibleChannelUtil.handleException(this, e)));
304     } finally {
305       InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
306     }
307   }
308 
309   @Override
310   public final A getLocalAddress() throws IOException {
311     return getLocalSocketAddress();
312   }
313 
314   @Override
315   public final A getLocalSocketAddress() {
316     return afSocket.getLocalSocketAddress();
317   }
318 
319   @Override
320   protected final void implCloseSelectableChannel() throws IOException {
321     afSocket.close();
322   }
323 
324   @Override
325   protected final void implConfigureBlocking(boolean block) throws IOException {
326     getAFCore().implConfigureBlocking(block);
327   }
328 
329   @Override
330   public final int getAncillaryReceiveBufferSize() {
331     return afSocket.getAncillaryReceiveBufferSize();
332   }
333 
334   @Override
335   public final void setAncillaryReceiveBufferSize(int size) {
336     afSocket.setAncillaryReceiveBufferSize(size);
337   }
338 
339   @Override
340   public final void ensureAncillaryReceiveBufferSize(int minSize) {
341     afSocket.ensureAncillaryReceiveBufferSize(minSize);
342   }
343 
344   final AFSocketCore getAFCore() {
345     return afSocket.getAFImpl().getCore();
346   }
347 
348   @Override
349   public final FileDescriptor getFileDescriptor() throws IOException {
350     return afSocket.getFileDescriptor();
351   }
352 
353   @Override
354   public final String toString() {
355     return super.toString() + afSocket.toStringSuffix();
356   }
357 
358   @Override
359   public void setShutdownOnClose(boolean enabled) {
360     getAFCore().setShutdownOnClose(enabled);
361   }
362 
363   /**
364    * Opens a socket channel. The {@code family} parameter specifies the {@link ProtocolFamily
365    * protocol family} of the channel's socket.
366    * <p>
367    * If the {@link ProtocolFamily} is of an {@link AFProtocolFamily}, or {@code UNIX}, the
368    * corresponding junixsocket implementation is used. In all other cases, the call is delegated to
369    * {@link SocketChannel#open()}.
370    *
371    * @param family The protocol family.
372    * @return The new {@link SocketChannel}.
373    * @throws IOException on error.
374    */
375   public static SocketChannel open(ProtocolFamily family) throws IOException {
376     requireNonNull(family);
377 
378     if (family instanceof AFProtocolFamily) {
379       return ((AFProtocolFamily) family).openSocketChannel();
380     } else if ("UNIX".equals(family.name())) {
381       return AFUNIXSocketChannel.open();
382     } else if (family instanceof StandardProtocolFamily) {
383       return SocketChannel.open();
384     } else {
385       throw new UnsupportedOperationException("Protocol family not supported");
386     }
387   }
388 }