Initial import
[jpf-core.git] / src / main / gov / nasa / jpf / util / SplitInputStream.java
1 /*
2  * Copyright (C) 2014, United States Government, as represented by the
3  * Administrator of the National Aeronautics and Space Administration.
4  * All rights reserved.
5  *
6  * The Java Pathfinder core (jpf-core) platform is licensed under the
7  * Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  * 
10  *        http://www.apache.org/licenses/LICENSE-2.0. 
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and 
16  * limitations under the License.
17  */
18 package gov.nasa.jpf.util;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.util.Arrays;
23 import java.util.concurrent.locks.ReentrantLock;
24
25 /* Note: This class fails after 8 petabytes of data has been read.  This should
26  * never be a problem.  For example, assuming a 10 Ghz clock and 1 cycle to read
27  * 4-bytes from L1 cache, it would take 7.3 years to read that much data.
28  */
29 public class SplitInputStream {
30
31   static final int INITIAL_BUFFER_SIZE = 1024; // Recommended to be a power of 2.  If not, it will be rounded up to the next power of 2.
32
33   private final ReentrantLock m_sourceLock = new ReentrantLock();
34   private final ReentrantLock m_dataLock = new ReentrantLock();
35   private final InputStream m_source;      // Protected by m_sourceGate lock
36   private final Stream m_stream[];    // Not protected by a lock
37   private long m_write;       // Must hold m_dataGate lock to read.  Must hold m_dataGate and m_sourceGate to write.
38   private int m_available;   // Must hold m_dataGate lock to read.  Must hold m_dataGate and m_sourceGate to write.
39   private int m_openStreams; // Must hold m_dataGate lock to access.
40   private byte m_buffer[];    // Must hold m_dataGate lock to read.  Must hold m_sourceGate lock to write.  The written data doesn't become available until m_write is updated.
41
42   public SplitInputStream(InputStream source, int streamCount) {
43     this(source, streamCount, INITIAL_BUFFER_SIZE);
44   }
45
46   public SplitInputStream(InputStream source, int streamCount, int initialSize) {
47     int i;
48
49     if (source == null) {
50       throw new NullPointerException("source == null");
51     }
52
53     if (streamCount <= 0) {
54       throw new IllegalArgumentException("streamCount <= 0 : " + streamCount);
55     }
56
57     if (initialSize <= 0) {
58       throw new IllegalArgumentException("initialSize <= 0 : " + initialSize);
59     }
60
61     m_source = source;
62     m_openStreams = streamCount;
63     m_stream = new Stream[streamCount];
64
65     for (i = streamCount; --i >= 0;) {
66       m_stream[i] = new Stream(i);
67     }
68
69     initialSize--;                     // Rounds initialSize up to the next power of 2
70     initialSize |= initialSize >> 1;
71     initialSize |= initialSize >> 2;
72     initialSize |= initialSize >> 4;
73     initialSize |= initialSize >> 8;
74     initialSize |= initialSize >> 16;
75     initialSize++;
76
77     m_buffer = new byte[initialSize];
78   }
79
80   public int getStreamCount() {
81     return (m_stream.length);
82   }
83
84   public InputStream getStream(int index) {
85     return (m_stream[index]);
86   }
87
88   private int read(int index) throws IOException {
89     long position;
90     int offset, result;
91
92     m_dataLock.lock();
93
94     try {
95       position = m_stream[index].getPosition();
96
97       if (position == m_write) {
98         if (!fill(index)) {
99           return (-1);
100         }
101
102         position = m_stream[index].getPosition();
103       }
104
105       offset = getBufferOffset(position);
106       result = m_buffer[offset] & 0x0FF;
107
108       m_stream[index].setPosition(position + 1);
109     } finally {
110       m_dataLock.unlock();
111     }
112
113     return (result);
114   }
115
116   private int read(int index, byte buffer[], int offset, int length) throws IOException {
117     long position;
118     int off;
119
120     if (buffer == null) {
121       throw new NullPointerException("buffer == null");
122     }
123
124     if (offset < 0) {
125       throw new IndexOutOfBoundsException("offset < 0 : " + offset);
126     }
127
128     if (length < 0) {
129       throw new IndexOutOfBoundsException("length < 0 : " + length);
130     }
131
132     if (offset + length > buffer.length) {
133       throw new IndexOutOfBoundsException("offset + length > buffer.length : " + offset + " + " + length + " > " + buffer.length);
134     }
135
136     if (length == 0) {
137       return (0);
138     }
139
140     m_dataLock.lock();
141
142     try {
143       position = m_stream[index].getPosition();
144
145       if (position == m_write) {
146         if (!fill(index)) {
147           return (-1);
148         }
149
150         position = m_stream[index].getPosition();
151       }
152
153       off = getBufferOffset(position);
154       length = (int) Math.min(length, m_write - position);
155       length = Math.min(length, m_buffer.length - off);
156
157       m_stream[index].setPosition(position + length);
158       System.arraycopy(m_buffer, off, buffer, offset, length);
159     } finally {
160       m_dataLock.unlock();
161     }
162
163     return (length);
164   }
165
166   private long skip(int index, long n) throws IOException {
167     long position;
168
169     if (n <= 0) {
170       return (0);
171     }
172
173     m_dataLock.lock();
174
175     try {
176       position = m_stream[index].getPosition();
177
178       if (position == m_write) {
179         if (!fill(index)) {
180           return (0);
181         }
182
183         position = m_stream[index].getPosition();
184       }
185
186       n = Math.min(n, m_write - position);
187
188       m_stream[index].setPosition(position + n);
189     } finally {
190       m_dataLock.unlock();
191     }
192
193     return (n);
194   }
195
196   private boolean fill(int index) throws IOException {
197     long minPosition, write;
198     int length, offsetPosition, offsetWrite;
199
200     try {
201       if (!doLock(index)) {
202         return (true);
203       }
204
205       minPosition = getMinPosition();
206
207       if (m_write - minPosition + 1 >= m_buffer.length) {
208         expand();
209       }
210
211       write = m_write;               // Capture the data in local variables so the calculations can take place outside m_dataLock.
212       length = m_buffer.length;
213       m_available = m_source.available();
214
215       m_dataLock.unlock();                 // Don't hold m_dataLock while blocked reading.  That way other Streams with data left to read can do so.
216
217       offsetWrite = getBufferOffset(write);
218       offsetPosition = getBufferOffset(minPosition); // If the minPosition advances while not holding the lock, no big deal.  It simply means less data will be read from m_source.
219       length = getReadLength(offsetPosition, offsetWrite, length);
220
221       do {
222         length = m_source.read(m_buffer, offsetWrite, length);
223       } while (length == 0); // Guarantee that at least 1 byte is read or end of file is reached.
224
225       if (length < 0) {
226         return (false);
227       }
228
229       m_dataLock.lock();
230
231       m_write += length;
232       m_available = m_source.available();
233     } finally {
234       m_sourceLock.unlock();
235
236       if (!m_dataLock.isHeldByCurrentThread()) // Restore the lock state when the method was called.
237       {
238         m_dataLock.lock();
239       }
240     }
241
242     return (true);
243   }
244
245   private boolean doLock(int index) {
246     long position;
247
248     /* m_sourceLock must be acquired before m_dataLock.  Otherwise, there will
249      * be a deadlock.  But, if we can tryLock() m_sourceLock while holding 
250      * m_dataLock, this will save CPU time.
251      */
252     if (m_sourceLock.tryLock()) {
253       return (true);
254     }
255
256     m_dataLock.unlock();
257     m_sourceLock.lock();
258     m_dataLock.lock();
259
260     position = m_stream[index].getPosition();
261
262     return (position == m_write);    // Does the thread still need to read data?
263   }
264
265   private long getMinPosition() {
266     long result, position;
267     int i;
268
269     result = Long.MAX_VALUE;
270
271     for (i = m_stream.length; --i >= 0;) {
272       if (!m_stream[i].isClosed()) {
273         position = m_stream[i].getPosition();
274         result = Math.min(result, position);
275       }
276     }
277
278     return (result);
279   }
280
281   private int getReadLength(int offsetPosition, int offsetWrite, int length) {
282     if (offsetPosition > offsetWrite) {
283       return (offsetPosition - offsetWrite - 1);
284     }
285
286     length -= offsetWrite;
287
288     if (offsetPosition == 0) {
289       length--;
290     }
291
292     return (length);
293   }
294
295   private void expand() {
296     int length;
297     byte buffer[];
298
299     buffer = m_buffer;
300     length = buffer.length;
301     m_buffer = Arrays.copyOf(buffer, 2 * length);
302
303       // Since we are doubling the length of the array, we simply have to duplicate the contents.
304     // This allows us to avoid figuring out which part of m_buffer is actually holding data and dealing with wrapping.
305     System.arraycopy(buffer, 0, m_buffer, length, length);
306   }
307
308   private int available(int index) throws IOException {
309     long result;
310     boolean sourceLock;
311
312     m_dataLock.lock();
313
314     sourceLock = m_sourceLock.tryLock();   // By putting this after locking m_dataLock, the only way tryLock() will fail is if a thread is reading from m_source.
315
316     try {
317       if (sourceLock) {
318         m_available = m_source.available();
319       }
320
321       result = m_available;
322       result += m_write - m_stream[index].getPosition();
323     } finally {
324       m_dataLock.unlock();
325
326       if (sourceLock) {
327         m_sourceLock.unlock();
328       }
329     }
330
331     if (result > Integer.MAX_VALUE) {
332       return (Integer.MAX_VALUE);
333     }
334
335     return ((int) result);
336   }
337
338   private void close() throws IOException {
339     boolean close;
340
341     m_dataLock.lock();
342
343     try {
344       m_openStreams--;
345
346       close = m_openStreams == 0;
347     } finally {
348       m_dataLock.unlock();
349     }
350
351     if (!close) {
352       return;
353     }
354
355     m_sourceLock.lock();
356
357     try {
358       m_source.close();
359     } finally {
360       m_sourceLock.unlock();
361     }
362   }
363
364   private int getBufferOffset(long position) {
365     return ((int) (position & (m_buffer.length - 1)));
366   }
367
368   private class Stream extends InputStream {
369
370     private long m_position;
371     private final int m_index;
372     private boolean m_closed;
373
374     private Stream(int index) {
375       m_index = index;
376     }
377
378     long getPosition() {
379       return (m_position);
380     }
381
382     void setPosition(long position) {
383       m_position = position;
384     }
385
386     synchronized boolean isClosed() {
387       return (m_closed);
388     }
389
390     @Override
391         public int read() throws IOException {
392       if (isClosed()) {
393         return (-1);
394       }
395
396       return (SplitInputStream.this.read(m_index));
397     }
398
399     @Override
400         public int read(byte buffer[], int offset, int length) throws IOException {
401       if (isClosed()) {
402         return (-1);
403       }
404
405       return (SplitInputStream.this.read(m_index, buffer, offset, length));
406     }
407
408     @Override
409         public long skip(long n) throws IOException {
410       if (isClosed()) {
411         return (0);
412       }
413
414       return (SplitInputStream.this.skip(m_index, n));
415     }
416
417     @Override
418         public int available() throws IOException {
419       if (isClosed()) {
420         return (0);
421       }
422
423       return (SplitInputStream.this.available(m_index));
424     }
425
426     @Override
427         public void close() throws IOException {
428       synchronized (this) {
429         if (m_closed) {
430           return;
431         }
432
433         m_closed = true;
434       }
435
436       SplitInputStream.this.close();
437     }
438   }
439 }