View Javadoc
1   /*
2    * junixsocket
3    *
4    * Copyright 2009-2024 Christian Kohlschütter
5    *
6    * Licensed under the Apache License, Version 2.0 (the "License");
7    * you may not use this file except in compliance with the License.
8    * You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.newsclub.net.unix;
19  
20  import java.io.FileDescriptor;
21  import java.io.IOException;
22  import java.lang.reflect.InvocationTargetException;
23  import java.net.SocketException;
24  import java.nio.ByteBuffer;
25  import java.util.concurrent.atomic.AtomicInteger;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import org.newsclub.net.unix.pool.ObjectPool.Lease;
29  
30  /**
31   * A shared core that is common for all AF* sockets (datagrams, streams).
32   *
33   * @author Christian Kohlschütter
34   */
35  class AFSocketCore extends AFCore {
36    private final AtomicInteger pendingAccepts = new AtomicInteger(0);
37    private static final int SHUT_RD_WR = 2;
38  
39    /**
40     * We keep track of the server's inode to detect when another server connects to our address.
41     */
42    final AtomicLong inode = new AtomicLong(-1);
43  
44    AFSocketAddress socketAddress;
45  
46    private final AFAddressFamily<?> af;
47    private boolean shutdownOnClose = true;
48  
49    protected AFSocketCore(Object observed, FileDescriptor fd,
50        AncillaryDataSupport ancillaryDataSupport, AFAddressFamily<?> af, boolean datagramMode) {
51      super(observed, fd, ancillaryDataSupport, datagramMode);
52      this.af = af;
53    }
54  
55    protected AFAddressFamily<?> addressFamily() {
56      return af;
57    }
58  
59    @Override
60    @SuppressWarnings("UnsafeFinalization" /* errorprone */)
61    protected void doClose() throws IOException {
62      if (isShutdownOnClose()) {
63        NativeUnixSocket.shutdown(fd, SHUT_RD_WR);
64        unblockAccepts();
65      }
66  
67      super.doClose();
68    }
69  
70    protected void unblockAccepts() {
71      // see AFSocketImpl
72    }
73  
74    AFSocketAddress receive(ByteBuffer dst, AFSupplier<Integer> socketTimeout) throws IOException {
75      try (Lease<ByteBuffer> socketAddressBufferLease = AFSocketAddress.SOCKETADDRESS_BUFFER_TL
76          .take()) {
77        ByteBuffer socketAddressBuffer = socketAddressBufferLease.get();
78  
79        int read = read(dst, socketTimeout, socketAddressBuffer, 0);
80        if (read > 0) {
81          return AFSocketAddress.ofInternal(socketAddressBuffer, af);
82        } else {
83          return null;
84        }
85      }
86    }
87  
88    boolean isConnected(boolean boundOk) {
89      try {
90        if (fd.valid()) {
91          switch (NativeUnixSocket.socketStatus(fd)) {
92            case NativeUnixSocket.SOCKETSTATUS_CONNECTED:
93              return true;
94            case NativeUnixSocket.SOCKETSTATUS_BOUND:
95              if (boundOk) {
96                return true;
97              }
98              break;
99            default:
100         }
101       }
102     } catch (IOException e) {
103       throw new IllegalStateException(e);
104     }
105     return false;
106   }
107 
108   @SuppressWarnings({"unchecked"})
109   <T> T getOption(AFSocketOption<T> name) throws IOException {
110     Class<T> type = name.type();
111     if (Boolean.class.isAssignableFrom(type)) {
112       return (T) (Object) (NativeUnixSocket.getSocketOption(fd, name.level(), name.optionName(),
113           Integer.class) != 0);
114     } else if (NamedInteger.HasOfValue.class.isAssignableFrom(type)) {
115       @SuppressWarnings("all") // "null" creates another warning
116       int v = NativeUnixSocket.getSocketOption(fd, name.level(), name.optionName(), Integer.class);
117       try {
118         return (T) type.getMethod("ofValue", int.class).invoke(null, v);
119       } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException
120           | NoSuchMethodException | SecurityException e) {
121         throw new IOException("Value casting problem", e);
122       }
123     } else {
124       return NativeUnixSocket.getSocketOption(fd, name.level(), name.optionName(), type);
125     }
126   }
127 
128   <T> void setOption(AFSocketOption<T> name, T value) throws IOException {
129     final Object val;
130     if (value instanceof Boolean) {
131       val = (((Boolean) value) ? 1 : 0);
132     } else if (value instanceof NamedInteger) {
133       val = ((NamedInteger) value).value();
134     } else {
135       val = value;
136     }
137     int level = name.level();
138     int optionName = name.optionName();
139     NativeUnixSocket.setSocketOption(fd, level, optionName, val);
140     if (level == 271 && optionName == 135) {
141       // AFTIPCSocketOptions.TIPC_GROUP_JOIN
142       // unclear why, but sleeping for at least 1ms prevents issues with GROUP_JOIN
143       try {
144         Thread.sleep(1);
145       } catch (InterruptedException e) {
146         // ignore
147       }
148     }
149   }
150 
151   protected void incPendingAccepts() throws SocketException {
152     if (pendingAccepts.incrementAndGet() >= Integer.MAX_VALUE) {
153       pendingAccepts.decrementAndGet();
154       throw new SocketException("Too many pending accepts");
155     }
156   }
157 
158   protected void decPendingAccepts() {
159     pendingAccepts.decrementAndGet();
160   }
161 
162   protected boolean hasPendingAccepts() {
163     return pendingAccepts.get() > 0;
164   }
165 
166   boolean isShutdownOnClose() {
167     return shutdownOnClose;
168   }
169 
170   void setShutdownOnClose(boolean enabled) {
171     this.shutdownOnClose = enabled;
172   }
173 }