2 * Copyright (C) 2014, United States Government, as represented by the
3 * Administrator of the National Aeronautics and Space Administration.
6 * The Java Pathfinder core (jpf-core) platform is licensed under the
7 * Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0.
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 package gov.nasa.jpf.util;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.util.Arrays;
23 import java.util.concurrent.locks.ReentrantLock;
25 /* Note: This class fails after 8 exabytes (2^63 bytes) of data have been read. This should
26 * never be a problem. For example, assuming a 10 GHz clock and 1 cycle to read
27 * 4 bytes from L1 cache, it would take 7.3 years to read that much data.
29 public class SplitInputStream {
31 static final int INITIAL_BUFFER_SIZE = 1024; // Recommended to be a power of 2. If not, it will be rounded up to the next power of 2.
// Lock ordering: m_sourceLock must be acquired before m_dataLock, otherwise a deadlock can occur.
33 private final ReentrantLock m_sourceLock = new ReentrantLock();
34 private final ReentrantLock m_dataLock = new ReentrantLock();
35 private final InputStream m_source; // Protected by m_sourceLock.
36 private final Stream m_stream[]; // Not protected by a lock
37 private long m_write; // Stream positions are compared against this value; (m_write - position) is the number of unread bytes for a stream. Must hold m_dataLock to read. Must hold m_dataLock and m_sourceLock to write.
38 private int m_available; // Cached result of m_source.available(). Must hold m_dataLock to read. Must hold m_dataLock and m_sourceLock to write.
39 private int m_openStreams; // Initialized to streamCount; close() tests it against 0. Must hold m_dataLock to access.
40 private byte m_buffer[]; // Must hold m_dataLock to read. Must hold m_sourceLock to write. The written data doesn't become available until m_write is updated.
/**
 * Creates a splitter over {@code source} using the default initial buffer size,
 * {@link #INITIAL_BUFFER_SIZE}. Delegates to the three-argument constructor.
 *
 * @param source      the stream whose data is shared between the split streams
 * @param streamCount the number of streams to create
 */
42 public SplitInputStream(InputStream source, int streamCount) {
43 this(source, streamCount, INITIAL_BUFFER_SIZE);
/**
 * Creates a splitter over {@code source} with {@code streamCount} streams and a
 * buffer of at least {@code initialSize} bytes.
 *
 * @param source      the stream to read from
 * @param streamCount the number of streams to create; must be positive
 * @param initialSize the requested buffer size; must be positive and is rounded
 *                    up to a power of 2
 * @throws NullPointerException     with message "source == null"
 *                                  (NOTE(review): presumably when source is null -- confirm the guard)
 * @throws IllegalArgumentException if {@code streamCount <= 0} or {@code initialSize <= 0}
 */
46 public SplitInputStream(InputStream source, int streamCount, int initialSize) {
50 throw new NullPointerException("source == null");
53 if (streamCount <= 0) {
54 throw new IllegalArgumentException("streamCount <= 0 : " + streamCount);
57 if (initialSize <= 0) {
58 throw new IllegalArgumentException("initialSize <= 0 : " + initialSize);
// The open-stream counter starts at the number of streams created below.
62 m_openStreams = streamCount;
63 m_stream = new Stream[streamCount];
// Each Stream is told its own index into m_stream.
65 for (i = streamCount; --i >= 0;) {
66 m_stream[i] = new Stream(i);
// Bit smear: after the decrement, OR-ing in the shifted copies sets every bit below
// the highest set bit. getBufferOffset() masks with (m_buffer.length - 1), which is
// only a valid modulo when the length is a power of 2.
// NOTE(review): the smear yields (power of 2) - 1; presumably it is incremented before the allocation -- confirm.
69 initialSize--; // Rounds initialSize up to the next power of 2
70 initialSize |= initialSize >> 1;
71 initialSize |= initialSize >> 2;
72 initialSize |= initialSize >> 4;
73 initialSize |= initialSize >> 8;
74 initialSize |= initialSize >> 16;
77 m_buffer = new byte[initialSize];
/**
 * Returns the number of streams created by the constructor, i.e. the length of
 * the internal stream array.
 *
 * @return the stream count
 */
80 public int getStreamCount() {
81 return (m_stream.length);
/**
 * Returns the stream stored at {@code index}.
 * No explicit range check is made; an out-of-range index fails in the array access.
 *
 * @param index the index of the requested stream
 * @return the stream at {@code index}
 */
84 public InputStream getStream(int index) {
85 return (m_stream[index]);
/**
 * Reads one byte on behalf of the stream at {@code index}: looks up the stream's
 * position, fetches the byte from m_buffer at the matching buffer offset and
 * advances the stream's position by one.
 *
 * @param index the index into m_stream of the reading stream
 * @throws IOException declared by the signature
 *                     (NOTE(review): presumably raised while refilling the buffer -- confirm)
 */
88 private int read(int index) throws IOException {
95 position = m_stream[index].getPosition();
// position == m_write means this stream has no unread bytes left in the buffer.
97 if (position == m_write) {
// The position is fetched again after the handling of the empty case.
102 position = m_stream[index].getPosition();
105 offset = getBufferOffset(position);
// Mask to 0..255 so the byte is delivered as an unsigned value.
106 result = m_buffer[offset] & 0x0FF;
108 m_stream[index].setPosition(position + 1);
/**
 * Bulk read on behalf of the stream at {@code index}: copies buffered bytes into
 * {@code buffer} starting at {@code offset} and advances the stream's position by
 * the number of bytes copied.
 *
 * The copied amount is capped three ways: by the requested {@code length}, by the
 * unread bytes (m_write - position), and by the distance to the end of m_buffer,
 * so a single call never copies across the end of the buffer array.
 *
 * @param index  the index into m_stream of the reading stream
 * @param buffer the destination array
 * @param offset the first index in {@code buffer} to write to
 * @param length the maximum number of bytes to copy
 * @throws NullPointerException      if {@code buffer} is null
 * @throws IndexOutOfBoundsException with the messages "offset < 0", "length < 0", or when
 *                                   {@code offset + length > buffer.length}
 * @throws IOException               declared by the signature
 */
116 private int read(int index, byte buffer[], int offset, int length) throws IOException {
120 if (buffer == null) {
121 throw new NullPointerException("buffer == null");
125 throw new IndexOutOfBoundsException("offset < 0 : " + offset);
129 throw new IndexOutOfBoundsException("length < 0 : " + length);
132 if (offset + length > buffer.length) {
133 throw new IndexOutOfBoundsException("offset + length > buffer.length : " + offset + " + " + length + " > " + buffer.length);
143 position = m_stream[index].getPosition();
// position == m_write means this stream has no unread bytes left in the buffer.
145 if (position == m_write) {
150 position = m_stream[index].getPosition();
153 off = getBufferOffset(position);
// Never hand out more than has been written ...
154 length = (int) Math.min(length, m_write - position);
// ... and stop at the end of the array instead of wrapping around.
155 length = Math.min(length, m_buffer.length - off);
157 m_stream[index].setPosition(position + length);
158 System.arraycopy(m_buffer, off, buffer, offset, length);
/**
 * Skips bytes on behalf of the stream at {@code index} by advancing its position.
 * The skip distance is limited to the unread bytes (m_write - position).
 *
 * @param index the index into m_stream of the skipping stream
 * @param n     the requested number of bytes to skip
 * @throws IOException declared by the signature
 */
166 private long skip(int index, long n) throws IOException {
176 position = m_stream[index].getPosition();
// position == m_write means this stream has no unread bytes left in the buffer.
178 if (position == m_write) {
183 position = m_stream[index].getPosition();
// Cap the skip at what has been written so far.
186 n = Math.min(n, m_write - position);
188 m_stream[index].setPosition(position + n);
/**
 * Pulls more data from m_source into m_buffer on behalf of the stream at {@code index}.
 *
 * Steps shown here: take the locks via doLock(); find the smallest position among
 * the open streams; compare the buffered span against m_buffer.length; snapshot
 * m_write and the buffer length; release m_dataLock around the blocking source
 * read; refresh m_available; release m_sourceLock.
 *
 * @param index the index into m_stream of the stream that needs data
 * @return NOTE(review): the meaning of the boolean is not evident from this body -- confirm against callers
 * @throws IOException declared by the signature; m_source.read() and m_source.available() are called here
 */
196 private boolean fill(int index) throws IOException {
197 long minPosition, write;
198 int length, offsetPosition, offsetWrite;
// doLock() returns false when this stream no longer needs data.
201 if (!doLock(index)) {
205 minPosition = getMinPosition();
// True when the span between the slowest stream and m_write, plus one, reaches the buffer length.
// NOTE(review): presumably the buffer is grown with expand() in this case -- confirm.
207 if (m_write - minPosition + 1 >= m_buffer.length) {
211 write = m_write; // Capture the data in local variables so the calculations can take place outside m_dataLock.
212 length = m_buffer.length;
213 m_available = m_source.available();
215 m_dataLock.unlock(); // Don't hold m_dataLock while blocked reading. That way other Streams with data left to read can do so.
217 offsetWrite = getBufferOffset(write);
218 offsetPosition = getBufferOffset(minPosition); // If the minPosition advances while not holding the lock, no big deal. It simply means less data will be read from m_source.
219 length = getReadLength(offsetPosition, offsetWrite, length);
// Data is read straight into m_buffer at the write offset.
222 length = m_source.read(m_buffer, offsetWrite, length);
223 } while (length == 0); // Guarantee that at least 1 byte is read or end of file is reached.
232 m_available = m_source.available();
234 m_sourceLock.unlock();
236 if (!m_dataLock.isHeldByCurrentThread()) // Restore the lock state when the method was called.
/**
 * Acquires m_sourceLock for the caller, trying the non-blocking tryLock() first,
 * and then reports whether the stream at {@code index} still has nothing to read.
 *
 * @param index the index into m_stream of the stream that wants data
 * @return true if the stream's position still equals m_write, i.e. the caller
 *         still needs to read from the source
 */
245 private boolean doLock(int index) {
248 /* m_sourceLock must be acquired before m_dataLock. Otherwise, there will
249 * be a deadlock. But, if we can tryLock() m_sourceLock while holding
250 * m_dataLock, this will save CPU time.
252 if (m_sourceLock.tryLock()) {
260 position = m_stream[index].getPosition();
262 return (position == m_write); // Does the thread still need to read data?
/**
 * Computes the smallest position among the streams that are not closed.
 * The running minimum starts at Long.MAX_VALUE.
 * NOTE(review): presumably Long.MAX_VALUE is the result when every stream is closed -- confirm.
 *
 * @return the minimum position over all open streams
 */
265 private long getMinPosition() {
266 long result, position;
269 result = Long.MAX_VALUE;
271 for (i = m_stream.length; --i >= 0;) {
// Closed streams are ignored in the minimum.
272 if (!m_stream[i].isClosed()) {
273 position = m_stream[i].getPosition();
274 result = Math.min(result, position);
/**
 * Computes how many bytes may be read into the buffer in one contiguous chunk
 * starting at {@code offsetWrite}.
 *
 * @param offsetPosition the buffer offset of the slowest open stream
 * @param offsetWrite    the buffer offset where the next byte will be written
 * @param length         the buffer length
 * @return the number of bytes that may be read
 */
281 private int getReadLength(int offsetPosition, int offsetWrite, int length) {
// Reader is ahead of the writer in the array: fill the gap but leave one slot unused.
282 if (offsetPosition > offsetWrite) {
283 return (offsetPosition - offsetWrite - 1);
// Otherwise the room runs from the write offset to the end of the array.
286 length -= offsetWrite;
// NOTE(review): presumably one slot is kept free when the reader sits at offset 0 -- confirm.
288 if (offsetPosition == 0) {
/**
 * Doubles the size of m_buffer.
 *
 * Arrays.copyOf() fills the lower half of the new array with the old contents and
 * the arraycopy repeats them in the upper half. getBufferOffset() masks with
 * (m_buffer.length - 1), so after doubling an old offset can land in either half;
 * duplicating the data makes both halves valid.
 * NOTE(review): {@code buffer} is presumably the old m_buffer -- confirm.
 */
295 private void expand() {
300 length = buffer.length;
301 m_buffer = Arrays.copyOf(buffer, 2 * length);
303 // Since we are doubling the length of the array, we simply have to duplicate the contents.
304 // This allows us to avoid figuring out which part of m_buffer is actually holding data and dealing with wrapping.
305 System.arraycopy(buffer, 0, m_buffer, length, length);
/**
 * Estimates how many bytes the stream at {@code index} can read: the cached
 * m_available value plus the unread bytes for that stream (m_write - position),
 * capped at Integer.MAX_VALUE.
 *
 * @param index the index into m_stream of the querying stream
 * @return the estimate, never more than Integer.MAX_VALUE
 * @throws IOException declared by the signature; m_source.available() is called here
 */
308 private int available(int index) throws IOException {
314 sourceLock = m_sourceLock.tryLock(); // By putting this after locking m_dataLock, the only way tryLock() will fail is if a thread is reading from m_source.
// Refresh the cached count from the source.
// NOTE(review): presumably done only when tryLock() succeeded -- confirm.
318 m_available = m_source.available();
321 result = m_available;
322 result += m_write - m_stream[index].getPosition();
327 m_sourceLock.unlock();
// Cap the sum at the largest value the int return type can hold.
331 if (result > Integer.MAX_VALUE) {
332 return (Integer.MAX_VALUE);
335 return ((int) result);
/**
 * Invoked by Stream.close(). Records whether the open-stream counter is zero and
 * releases m_sourceLock.
 * NOTE(review): presumably m_source is closed once no stream remains open -- confirm.
 *
 * @throws IOException declared by the signature
 */
338 private void close() throws IOException {
346 close = m_openStreams == 0;
360 m_sourceLock.unlock();
/**
 * Maps a stream position to an index into m_buffer by masking with
 * (m_buffer.length - 1). This equals position modulo the buffer length only when
 * the length is a power of 2.
 *
 * @param position a stream position
 * @return the corresponding index into m_buffer
 */
364 private int getBufferOffset(long position) {
365 return ((int) (position & (m_buffer.length - 1)));
/**
 * One of the streams handed out by the enclosing SplitInputStream. It stores its
 * own read position, its index and a closed flag, and forwards read, skip and
 * available to the enclosing instance, passing m_index.
 */
368 private class Stream extends InputStream {
370 private long m_position;
371 private final int m_index;
372 private boolean m_closed;
374 private Stream(int index) {
// Stores the new read position; unlike isClosed(), this method is not synchronized.
382 void setPosition(long position) {
383 m_position = position;
// Synchronized on this Stream, the same monitor that close() uses.
386 synchronized boolean isClosed() {
// The public InputStream methods below delegate to the enclosing instance.
391 public int read() throws IOException {
396 return (SplitInputStream.this.read(m_index));
400 public int read(byte buffer[], int offset, int length) throws IOException {
405 return (SplitInputStream.this.read(m_index, buffer, offset, length));
409 public long skip(long n) throws IOException {
414 return (SplitInputStream.this.skip(m_index, n));
418 public int available() throws IOException {
423 return (SplitInputStream.this.available(m_index));
// Enters a section synchronized on this Stream and calls the enclosing close().
427 public void close() throws IOException {
428 synchronized (this) {
436 SplitInputStream.this.close();