1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.newsclub.net.unix;
19
20 import java.io.Closeable;
21 import java.io.FileDescriptor;
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.Pipe;
25 import java.nio.channels.spi.SelectorProvider;
26
27 import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;
28
29
30
31
32
33
34 public final class AFPipe extends Pipe implements Closeable {
35 private final AFCore sourceCore;
36 private final AFCore sinkCore;
37 private final SourceChannel sourceChannel;
38 private final SinkChannel sinkChannel;
39 private final int options;
40
41 AFPipe(AFSelectorProvider<?> provider, boolean selectable) throws IOException {
42 super();
43
44 NativeUnixSocket.ensureSupported();
45
46 this.sourceCore = new AFCore(this, (FileDescriptor) null);
47 this.sinkCore = new AFCore(this, (FileDescriptor) null);
48
49 boolean isSocket = NativeUnixSocket.initPipe(sourceCore.fd, sinkCore.fd, selectable);
50 this.options = isSocket ? 0 : NativeUnixSocket.OPT_NON_SOCKET;
51
52 this.sourceChannel = new SourceChannel(provider);
53 this.sinkChannel = new SinkChannel(provider);
54 }
55
56 @SuppressFBWarnings("EI_EXPOSE_REP")
57 @Override
58 public SourceChannel source() {
59 return sourceChannel;
60 }
61
62 @SuppressFBWarnings("EI_EXPOSE_REP")
63 @Override
64 public SinkChannel sink() {
65 return sinkChannel;
66 }
67
68 FileDescriptor sourceFD() {
69 return sourceCore.fd;
70 }
71
72 FileDescriptor sinkFD() {
73 return sinkCore.fd;
74 }
75
76 @Override
77 public void close() throws IOException {
78 try {
79 source().close();
80 } finally {
81 sink().close();
82 }
83 }
84
85
86
87
88
89 public final class SourceChannel extends java.nio.channels.Pipe.SourceChannel implements
90 FileDescriptorAccess {
91 SourceChannel(SelectorProvider provider) {
92 super(provider);
93 }
94
95 @Override
96 public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
97 if (length == 0) {
98 return 0;
99 }
100 return read(dsts[offset]);
101 }
102
103 @Override
104 public long read(ByteBuffer[] dsts) throws IOException {
105 return read(dsts, 0, dsts.length);
106 }
107
108 @Override
109 public int read(ByteBuffer dst) throws IOException {
110 return sourceCore.read(dst, null, options);
111 }
112
113 @Override
114 protected void implConfigureBlocking(boolean block) throws IOException {
115 sourceCore.implConfigureBlocking(block);
116 }
117
118 @Override
119 protected void implCloseSelectableChannel() throws IOException {
120 sourceCore.close();
121 }
122
123 @Override
124 public FileDescriptor getFileDescriptor() throws IOException {
125 return sourceCore.fd;
126 }
127 }
128
129
130
131
132
133 public final class SinkChannel extends java.nio.channels.Pipe.SinkChannel implements
134 FileDescriptorAccess {
135 SinkChannel(SelectorProvider provider) {
136 super(provider);
137 }
138
139 @Override
140 public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
141 if (length == 0) {
142 return 0;
143 }
144 return write(srcs[offset]);
145 }
146
147 @Override
148 public long write(ByteBuffer[] srcs) throws IOException {
149 return write(srcs, 0, srcs.length);
150 }
151
152 @Override
153 public int write(ByteBuffer src) throws IOException {
154 return sinkCore.write(src, null, options);
155 }
156
157 @Override
158 protected void implConfigureBlocking(boolean block) throws IOException {
159 sinkCore.implConfigureBlocking(block);
160 }
161
162 @Override
163 protected void implCloseSelectableChannel() throws IOException {
164 sinkCore.close();
165 }
166
167 @Override
168 public FileDescriptor getFileDescriptor() throws IOException {
169 return sinkCore.fd;
170 }
171 }
172
173
174
175
176
177
178 int getOptions() {
179 return options;
180 }
181
182
183
184
185
186
187
188 public static AFPipe open() throws IOException {
189 return AFUNIXSelectorProvider.provider().openPipe();
190 }
191 }