From e223e5fe69ffde44f73c2f18888f8a3dbcd54ab6 Mon Sep 17 00:00:00 2001 From: navid Date: Mon, 24 Nov 2008 19:02:15 +0000 Subject: [PATCH] *** empty log message *** --- .../core/ExtendedTransaction.java | 64 +- .../core/TransactionalFile.java | 1136 ++++------------- .../TransactionalIO/core/WriteOperations.java | 8 +- 3 files changed, 245 insertions(+), 963 deletions(-) diff --git a/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/ExtendedTransaction.java b/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/ExtendedTransaction.java index 478eb1e0..d04a8720 100644 --- a/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/ExtendedTransaction.java +++ b/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/ExtendedTransaction.java @@ -253,24 +253,16 @@ public class ExtendedTransaction implements TransactionStatu { - public void addFile(TransactionalFile tf/*, TransactionLocalFileAttributes tmp*/) { + public void addFile(TransactionalFile tf, long offstenumber) { - - /* if (tf.appendmode) { - this.addtoFileAccessModeMap(tf.getInode(), FileAccessModesEum.APPEND); - } else if (tf.writemode) { - this.addtoFileAccessModeMap(tf.getInode(), FileAccessModesEum.READ_WRITE); - } else { - this.addtoFileAccessModeMap(tf.getInode(), FileAccessModesEum.READ); - }*/ - // System.out.println("dsadssasadssa"); - - tf.lockOffset(this); - - //tf.offsetlock.lock(); - TransactionLocalFileAttributes tmp = new TransactionLocalFileAttributes(tf.getCommitedoffset().getOffsetnumber()/*, tf.getInodestate().commitedfilesize.get()*/); - //this.heldoffsetlocks.remove(tf.offsetlock); - tf.offsetlock.unlock(); + + //tf.lockOffset(this); + + TransactionLocalFileAttributes tmp = new TransactionLocalFileAttributes(offstenumber/*, tf.getInodestate().commitedfilesize.get()*/); + + //tf.offsetlock.unlock(); + + Vector dummy; @@ -282,16 +274,9 @@ public class ExtendedTransaction implements TransactionStatu { AccessedFiles.put(tf.getInode(), dummy); } - - - // this.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Unlocked the offset lock " + tf.offsetlock + " for file " + tf.getInode() + " form descriptor " + tf.getSequenceNum()); dummy.add(tf); GlobaltoLocalMappings.put(tf, tmp); merge_for_writes_done.put(tf.getInode(), Boolean.TRUE); - - - - //} } @@ -409,31 +394,16 @@ public class ExtendedTransaction implements TransactionStatu { if (mode == BlockAccessModesEnum.READ){ - lock = block.getLock().readLock(); - - - } + lock = block.getLock().readLock(); + } else { - lock = block.getLock().writeLock(); - } while (this.getStatus() == Status.ACTIVE) { - //synchronized(block){ - - // if (lock.tryLock()) { lock.lock(); - // synchronized(benchmark.lock){ - // System.out.println(Thread.currentThread() + " Lock the block lock for " + lock +" number " + block.getBlocknumber()); - // } heldblocklocks.add(lock); - // block.setOwner(this); return true; - // } - - - //getContentionmanager().resolveConflict(this, block); } return false; @@ -540,11 +510,11 @@ public class ExtendedTransaction implements TransactionStatu { long end; //synchronized(value.getOwnertransactionalFile().getCommitedoffset()){ - start = value.getRange().getStart() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber(); - end = value.getRange().getEnd() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber(); - if (value.getBelongingto().isUnknown_inital_offset_for_write()){ - value.getBelongingto().setLocaloffset(value.getBelongingto().getLocaloffset() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber()); - value.getBelongingto().setUnknown_inital_offset_for_write(false); + start = value.getRange().getStart() - value.getTFA().getCopylocaloffset() + value.getOwnerTF().getCommitedoffset().getOffsetnumber(); + end = value.getRange().getEnd() - value.getTFA().getCopylocaloffset() + value.getOwnerTF().getCommitedoffset().getOffsetnumber(); + if (value.getTFA().isUnknown_inital_offset_for_write()){ + value.getTFA().setLocaloffset(value.getTFA().getLocaloffset() - value.getTFA().getCopylocaloffset() + value.getOwnerTF().getCommitedoffset().getOffsetnumber()); + value.getTFA().setUnknown_inital_offset_for_write(false); } //} @@ -700,7 +670,7 @@ public class ExtendedTransaction implements TransactionStatu { // writeop.getOwnertransactionalFile().file.seek(writeop.getRange().getStart()); // System.out.println(Thread.currentThread() + " range " + writeop.getRange().getStart()); // writeop.getOwnertransactionalFile().file.write(bytedata); - invokeNativepwrite(bytedata, writeop.getRange().getStart(), bytedata.length, writeop.getOwnertransactionalFile().file); + invokeNativepwrite(bytedata, writeop.getRange().getStart(), bytedata.length, writeop.getOwnerTF().file); // System.out.println(Thread.currentThread() + " " + bytedata); // } catch (IOException ex) { diff --git a/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/TransactionalFile.java b/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/TransactionalFile.java index 7f47f0d7..435fa3d9 100644 --- a/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/TransactionalFile.java +++ b/Robust/Transactions/TransactionalIO/src/TransactionalIO/core/TransactionalFile.java @@ -13,7 +13,6 @@ import TransactionalIO.benchmarks.benchmark; import TransactionalIO.core.ExtendedTransaction.Status; import TransactionalIO.interfaces.BlockAccessModesEnum; import TransactionalIO.interfaces.OffsetDependency; -import com.sun.org.apache.bcel.internal.generic.IFEQ; import java.io.File; import java.io.FileDescriptor; import java.io.FileNotFoundException; @@ -53,8 +52,7 @@ public class TransactionalFile implements Comparable{ private INode inode; private int sequenceNum = 0; public static int currenSeqNumforInode = 0; - /* public AtomicLong commitedoffset; - public AtomicLong commitedfilesize;*/ + public boolean to_be_created = false; public boolean writemode = false; public boolean appendmode = false; @@ -90,10 +88,6 @@ public class TransactionalFile implements Comparable{ sequenceNum = inodestate.seqNum; inodestate.seqNum++; - - - - if (mode.equals("rw")) { writemode = true; @@ -104,13 +98,7 @@ public class TransactionalFile implements Comparable{ if (inodestate != null) { synchronized (inodestate) { try { - // if (!(to_be_created)) { - // } else { - // adapter.commitedfilesize.set(0); - // } - if (!appendmode) { - //commitedoffset.setOffsetnumber(0); committedoffset = new GlobalOffset(0); } else { committedoffset = new GlobalOffset(file.length()); @@ -138,8 +126,6 @@ public class TransactionalFile implements Comparable{ public int invokeNativepwrite(byte buff[], long offset, int size, RandomAccessFile file) { try { - //System.out.println(buff.length); - // System.out.println(offset); return nativepwrite(buff, offset, buff.length, file.getFD()); } catch (IOException ex) { @@ -165,60 +151,12 @@ public class TransactionalFile implements Comparable{ return inodestate; } - /* public TransactionalFile(Adapter adapter, RandomAccessFile file) { - - this.adapter = adapter; - this.file = file; - decriptors = new Vector(); - } - - public void copyTransactionalFile(TransactionalFile tf){ - try { - int tmp = tf.commitedoffset.get(); - boolean flag = tf.to_be_created; - FileDescriptor fd = tf.file.getFD(); - Adapter ad = new Adapter(tf.adapter); - } catch (IOException ex) { - Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); - } - }*/ - - /* public BlockDataStructure getBlockDataStructure(int blocknumber) { - synchronized (inodestate.lockmap) { - if (inodestate.lockmap.containsKey(blocknumber)) { - - return ((BlockDataStructure) (inodestate.lockmap.get(Long.valueOf(blocknumber)))); - } else { - - BlockDataStructure tmp = new BlockDataStructure(getInode(), blocknumber); - inodestate.lockmap.put(Integer.valueOf(blocknumber), tmp); - return tmp; - } - } - - }*/ - - public INode getInode() { return inode; } - /* public boolean deleteBlockLock(int blocknumber){ - synchronized(adapter.lockmap){ - //adapter.lockmap.get(blocknumber) - if (adapter.lockmap.containsKey(blocknumber)){ - if (((BlockLock)(adapter.lockmap.get(blocknumber))).referncount == 0){ - adapter.lockmap.remove(adapter.lockmap.get(blocknumber)); - return true; - } - else - return false; - } - else { - return false; - } - } - }*/ + + public void close() { try { file.close(); @@ -227,7 +165,7 @@ public class TransactionalFile implements Comparable{ } } - public long getFilePointer(){ + public long getFilePointer(){ ExtendedTransaction me = Wrapper.getTransaction(); TransactionLocalFileAttributes tmp = null; @@ -237,89 +175,41 @@ public class TransactionalFile implements Comparable{ } if (!(me.getGlobaltoLocalMappings().containsKey(this))){ - - //if (!(me.getFilesAccesses().containsKey(this.inode))) { - tmp = new TransactionLocalFileAttributes(0);/*, tf.getInodestate().commitedfilesize.get();*/ - - Vector dummy; - if (me.getAccessedFiles().containsKey(this.getInode())){ - dummy = (Vector) me.getAccessedFiles().get(this.getInode()); - } - else{ - dummy = new Vector(); - me.getAccessedFiles().put(this.getInode(), dummy); - } - - - - // this.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Unlocked the offset lock " + tf.offsetlock + " for file " + tf.getInode() + " form descriptor " + tf.getSequenceNum()); - dummy.add(this); - me.getGlobaltoLocalMappings().put(this, tmp); - me.merge_for_writes_done.put(this.getInode(), Boolean.TRUE); - - // me.addFile(this); - - //me.addFile(this); + me.addFile(this, 0); } tmp = (TransactionLocalFileAttributes) me.getGlobaltoLocalMappings().get(this); if ((tmp.getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_1) || (tmp.getOffsetdependency() == OffsetDependency.NO_ACCESS)){ tmp.setOffsetdependency(OffsetDependency.READ_DEPENDENCY); - //System.out.println("sad"); - //synchronized(this.committedoffset) + long target; - lockOffset(me); - //{ + lockOffset(me); if (!(this.committedoffset.getOffsetReaders().contains(me))){ this.committedoffset.getOffsetReaders().add(me); - /*synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Added to Offset Readers for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"; - }*/ - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Added to Offset Readers for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"); - /* synchronized(benchmark.lock){ - System.out.println(Thread.currentThread().getName() + " Added to Offset Readers for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"); - }*/ } - tmp.setLocaloffset(tmp.getLocaloffset() + this.committedoffset.getOffsetnumber() - tmp.getCopylocaloffset()); target = this.committedoffset.getOffsetnumber() - tmp.getCopylocaloffset(); - - offsetlock.unlock(); - //me.getHeldoffsetlocks().remove(offsetlock); + Iterator it; if ((me.getWriteBuffer().get(inode)) != null) - { - + { it = ((Vector) (me.getWriteBuffer().get(inode))).iterator(); while (it.hasNext()){ WriteOperations wrp = (WriteOperations) it.next(); - if (wrp.getBelongingto() == tmp && wrp.isUnknownoffset()) + if (wrp.getTFA() == tmp && wrp.isUnknownoffset()) wrp.setUnknownoffset(false); - /*wrp.getRange().setStart(wrp.getOwnertransactionalFile().committedoffset.getOffsetnumber() - wrp.getBelongingto().getCopylocaloffset() + wrp.getRange().getStart()); - wrp.getRange().setEnd(wrp.getOwnertransactionalFile().committedoffset.getOffsetnumber() - wrp.getBelongingto().getCopylocaloffset() + wrp.getRange().getEnd());*/ wrp.getRange().setStart(target + wrp.getRange().getStart()); wrp.getRange().setEnd(target + wrp.getRange().getEnd()); } - } - - - //} + } } tmp.setUnknown_inital_offset_for_write(false); - - /* synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Read the offset value for the file " + this.inode +" from descriptor " + this.sequenceNum + "\n"; - } - me.msg += Thread.currentThread().getName() + " Read the offset value for the file " + this.inode +" from descriptor " + this.sequenceNum + "\n";*/ - /* synchronized(benchmark.lock){ - System.out.println("offset " + Thread.currentThread() + " " + tmp.getLocaloffset()); - }*/ return tmp.getLocaloffset(); } @@ -336,38 +226,11 @@ public class TransactionalFile implements Comparable{ } else { - // if (me.getStatus() != Status.ACTIVE) - // throw new AbortedException(); - TransactionLocalFileAttributes tmp = null; - //tf.offsetlock.lock(); - - //this.heldoffsetlocks.remove(tf.offsetlock); - //tf.offsetlock.unlock(); if (!(me.getGlobaltoLocalMappings().containsKey(this))){ - //if (!(me.getFilesAccesses().containsKey(this.inode))) { - tmp = new TransactionLocalFileAttributes(offset);/*, tf.getInodestate().commitedfilesize.get();*/ - - Vector dummy; - if (me.getAccessedFiles().containsKey(this.getInode())){ - dummy = (Vector) me.getAccessedFiles().get(this.getInode()); - } - else{ - dummy = new Vector(); - me.getAccessedFiles().put(this.getInode(), dummy); - } - - - - // this.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Unlocked the offset lock " + tf.offsetlock + " for file " + tf.getInode() + " form descriptor " + tf.getSequenceNum()); - dummy.add(this); - me.getGlobaltoLocalMappings().put(this, tmp); - me.merge_for_writes_done.put(this.getInode(), Boolean.TRUE); - - //me.addFile(this); + me.addFile(this, offset); } tmp = (TransactionLocalFileAttributes) me.getGlobaltoLocalMappings().get(this); - //tmp = ((TransactionLocalFileAttributes) (me.getFilesAccesses().get(this.getInode()))); if (tmp.getOffsetdependency() == OffsetDependency.NO_ACCESS) tmp.setOffsetdependency(OffsetDependency.NO_DEPENDENCY); @@ -376,97 +239,39 @@ public class TransactionalFile implements Comparable{ tmp.setOffsetdependency(OffsetDependency.WRITE_DEPENDENCY_2); tmp.setUnknown_inital_offset_for_write(false); - tmp.setLocaloffset(offset); - - /* synchronized(benchmark.lock){ - System.out.println(tmp.getLocaloffset()); - }*/ - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Seeked to the file" + this.inode +" from descriptor " + this.sequenceNum + "\n"); } } public int read(byte[] b) { - - if (appendmode) { - throw new PanicException("Cannot seek into a file opened in append mode"); - } - - boolean firsttime = false; + ExtendedTransaction me = Wrapper.getTransaction(); int size = b.length; int result = 0; - - - + if (me == null) { // not a transaction, but any I/O operation even though within a non-transaction is considered a single opertion transactiion return non_Transactional_Read(b); } - - //if (me.getStatus() != Status.ACTIVE) - // throw new AbortedException(); - + - if (me.getGlobaltoLocalMappings().containsKey(this)){ - - /*long target; - Vector locktracker = new Vector(); - TreeMap hm = me.getSortedFileAccessMap(me.getAccessedFiles());; - Vector vec = (Vector)hm.get(inode); - Iterator vecit = vec.iterator(); - while(vecit.hasNext()){ - TransactionalFile tr = (TransactionalFile)vecit.next(); - TransactionLocalFileAttributes tmp = (TransactionLocalFileAttributes) me.getGlobaltoLocalMappings().get(tr); - - if ((tmp.getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_1) || (tmp.offsetdependency == OffsetDependency.NO_ACCESS) || (tmp.getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_2)){ - tmp.setUnknown_inital_offset_for_write(false); - tmp.setOffsetdependency(OffsetDependency.READ_DEPENDENCY); - tr.lockOffset(me); - System.out.printtln(Thread.currentThread() + " kiri"); - if (!(tr.committedoffset.getOffsetReaders().contains(me))){ - tr.committedoffset.getOffsetReaders().add(me); - target = this.committedoffset.getOffsetnumber() - tmp.getCopylocaloffset(); - me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Added to Offset Readers for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"); - } - } - }*/ - - - - + if (me.getGlobaltoLocalMappings().containsKey(this)){ + TransactionLocalFileAttributes tmp = (TransactionLocalFileAttributes) me.getGlobaltoLocalMappings().get(this); tmp.setUnknown_inital_offset_for_write(false); + + //make this transaction read dependent on this descriptor if it is not so already if ((tmp.getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_1) || (tmp.offsetdependency == OffsetDependency.NO_ACCESS) || (tmp.getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_2)){ - //System.out.println(Thread.currentThread() + " here"); - //synchronized(this.committedoffset){ + lockOffset(me); - if (tmp.getOffsetdependency() != OffsetDependency.WRITE_DEPENDENCY_2){ - tmp.setLocaloffset(tmp.getLocaloffset() + this.committedoffset.getOffsetnumber() - tmp.getCopylocaloffset()); - } - - tmp.setOffsetdependency(OffsetDependency.READ_DEPENDENCY); - if (!(this.committedoffset.getOffsetReaders().contains(me))){ - this.committedoffset.getOffsetReaders().add(me); - /* synchronized(benchmark.lock){ - System.out.println("adding offset " + committedoffset + " " +Thread.currentThread()); - }*/ - /* synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Added to Offset Readers for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"; - }*/ - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Added to Offset Readers for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"); - /*synchronized(benchmark.lock){ - System.out.println(Thread.currentThread().getName() + " Added to Offset Readers for file " + this.inode + " from descriptor "+ this.sequenceNum); - }*/ - } - + makeDependentonOffet(me, tmp); offsetlock.unlock(); - // me.getHeldoffsetlocks().remove(offsetlock); - //} } + + // make all write operations by any transaction to this offset absolute and those transaction read + //dependent on the offset Iterator it; if (me.getWriteBuffer().get(inode) != null) - //if (!(((Vector)(me.getWriteBuffer().get(inode))).isEmpty())) { it = ((Vector) (me.getWriteBuffer().get(inode))).iterator(); while (it.hasNext()){ @@ -474,55 +279,34 @@ public class TransactionalFile implements Comparable{ WriteOperations wrp = (WriteOperations) it.next(); if (wrp.isUnknownoffset()){ wrp.setUnknownoffset(false); - //synchronized(wrp.getOwnertransactionalFile().committedoffset){ - wrp.getOwnertransactionalFile().lockOffset(me); - - wrp.getRange().setStart(wrp.getOwnertransactionalFile().committedoffset.getOffsetnumber() - wrp.getBelongingto().getCopylocaloffset() + wrp.getRange().getStart()); - wrp.getRange().setEnd(wrp.getOwnertransactionalFile().committedoffset.getOffsetnumber() - wrp.getBelongingto().getCopylocaloffset() + wrp.getRange().getEnd()); - if ((wrp.getBelongingto().getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_1) || (wrp.getBelongingto().offsetdependency == OffsetDependency.NO_ACCESS) || (wrp.getBelongingto().getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_2)){ - wrp.getBelongingto().setOffsetdependency(OffsetDependency.READ_DEPENDENCY); - wrp.getBelongingto().setUnknown_inital_offset_for_write(false); - if (!(wrp.getOwnertransactionalFile().committedoffset.getOffsetReaders().contains(me))) - wrp.getOwnertransactionalFile().committedoffset.getOffsetReaders().add(me); - wrp.getBelongingto().setLocaloffset(wrp.getBelongingto().getLocaloffset() + wrp.getOwnertransactionalFile().committedoffset.getOffsetnumber() - wrp.getBelongingto().getCopylocaloffset()); - /* synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Added to Offset Readers for file " + this.inode + " from descriptor "+ wrp.getOwnertransactionalFile().sequenceNum +"\n"; - }*/ - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Added to Offset Readers for file " + this.inode + " from descriptor "+ wrp.getOwnertransactionalFile().sequenceNum +"\n"); - } - - // me.getHeldoffsetlocks().remove(wrp.getOwnertransactionalFile().offsetlock); - wrp.getOwnertransactionalFile().offsetlock.unlock(); - - //} - + wrp.getOwnerTF().lockOffset(me); + makeWriteAbsolute(me, wrp); + wrp.getOwnerTF().offsetlock.unlock(); markAccessedBlocks(me, (int)wrp.getRange().getStart(), (int)(wrp.getRange().getEnd() - wrp.getRange().getStart()), BlockAccessModesEnum.WRITE); -// markWriteBlocks((int)wrp.getRange().getStart(), (int)(wrp.getRange().getEnd() - wrp.getRange().getStart())); + } } } /* if (!(me.isWritesmerged())){ - // synchronized(benchmark.lock){ - // System.out.println("ssssad " + Thread.currentThread() + " " + me.getWriteBuffer()); - // } mergeWrittenData(); }*/ - // System.out.println("ssssad " + Thread.currentThread() + " " + me.getWriteBuffer()); + + + // merge the write by this transation to this descriptor before start reading from it if ((Boolean)me.merge_for_writes_done.get(inode) == Boolean.FALSE){ - // synchronized(benchmark.lock){ - System.out.println("ssssad " + Thread.currentThread() + " " + me.getWriteBuffer()); mergeWrittenData(me); - //} } + + // find the intersections of the data o be read with the + // transactions local buffer if any at all long loffset = tmp.getLocaloffset(); markAccessedBlocks(me, loffset, size, BlockAccessModesEnum.READ); - Vector writebuffer; if ((me.getWriteBuffer().get(this.inode)) != null) writebuffer = (Vector) (me.getWriteBuffer().get(this.inode)); @@ -535,13 +319,9 @@ public class TransactionalFile implements Comparable{ Range[] intersectedrange = new Range[writebuffer.size()]; WriteOperations[] markedwriteop = new WriteOperations[writebuffer.size()]; - int counter = 0; - - - - - boolean flag = false; - //System.out.println("yani che>??"); + int number_of_intersections = 0; + boolean data_in_local_buffer = false; + it = writebuffer.iterator(); while (it.hasNext()) { @@ -549,51 +329,40 @@ public class TransactionalFile implements Comparable{ WriteOperations wrp = (WriteOperations) it.next(); writerange = wrp.getRange(); if (writerange.includes(readrange)) { - markedwriteop[counter] = wrp; - flag = true; + markedwriteop[number_of_intersections] = wrp; + data_in_local_buffer = true; break; } if (writerange.hasIntersection(readrange)) { - intersectedrange[counter] = readrange.intersection(writerange); - markedwriteop[counter] = wrp; + intersectedrange[number_of_intersections] = readrange.intersection(writerange); + markedwriteop[number_of_intersections] = wrp; - counter++; + number_of_intersections++; } } - // for block versioning mechanism - /*if (!(validateBlocksVersions(startblock, targetblock))) { ////check to see if version are still valid - - throw new AbortedException(); - - }*/ - if (flag) { - - result = readFromBuffer(b, tmp, markedwriteop[counter],writerange); - - /* synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Read " + this.inode + " from descriptor "+ this.sequenceNum +"\n"; - }*/ - - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Read " + this.inode + " from descriptor "+ this.sequenceNum +"\n"); + + if (data_in_local_buffer) { + // the to be read offset is written previously by the transaction itself + // so the read is done from localbuffer + result = readFromBuffer(b, tmp, markedwriteop[number_of_intersections],writerange); return result; } else{ - if (counter == 0) { - /* synchronized(benchmark.lock){ - System.out.println("here" +Thread.currentThread()); - }*/ - - // lockOffset(me); + if (number_of_intersections == 0) { + // the whole range to be read should be donefrom the file itself, result = readFromFile(me, b, tmp); } + + else { - - for (int i = 0; i < counter; i++) { + //some of the parts to read are in local buffer some should be done + //from the file + for (int i = 0; i < number_of_intersections; i++) { Byte[] data = markedwriteop[i].getData(); @@ -606,9 +375,8 @@ public class TransactionalFile implements Comparable{ result += Math.min(intersectedrange[i].getEnd(), readrange.getEnd()) - intersectedrange[i].getStart(); } - Range[] non_intersected_ranges = readrange.minus(intersectedrange, counter); + Range[] non_intersected_ranges = readrange.minus(intersectedrange, number_of_intersections); Vector occupiedblocks = new Vector(); - Vector heldlocks = new Vector(); for (int i = 0; i < non_intersected_ranges.length; i++) { int st = FileBlockManager.getCurrentFragmentIndexofTheFile(non_intersected_ranges[i].getStart()); int en = FileBlockManager.getCurrentFragmentIndexofTheFile(non_intersected_ranges[i].getEnd()); @@ -630,100 +398,40 @@ public class TransactionalFile implements Comparable{ while (me.getStatus() == Status.ACTIVE) { BlockDataStructure block = this.inodestate.getBlockDataStructure((Integer)(occupiedblocks.get(k)));//(BlockDataStructure) tmp.adapter.lockmap.get(Integer.valueOf(k))); - - //synchronized(block){ - - // if (block.getLock().readLock().tryLock()) { block.getLock().readLock().lock(); - /* synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Locked The Block Number " + block.getBlocknumber()+ " for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"; - }*/ - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Added The Block Number " + block.getBlocknumber()+ " for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"); - //synchronized (block){ if (!(block.getReaders().contains(me))){ - /* synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Added to Block Readers for Block Number " + block.getBlocknumber()+ " for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"; - }*/ - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Added to Block Readers for Block Number " + block.getBlocknumber()+ " for file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"); block.getReaders().add(me); } me.getHeldblocklocks().add(block.getLock().readLock()); - //heldlocks.add(block.getLock().readLock()); - //} break; - // } - //} else { - // me.getContentionmanager().resolveConflict(me, block); - // } - //} } if (me.getStatus() == Status.ABORTED) { - //unlockLocks(heldlocks); - //offsetlock.unlock(); - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Aborted in locking blocks in read\n"); - /* synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Aborted \n"; - }*/ - // Thread.currentThread().stop(); - throw new AbortedException(); - } - } - /* - int expvalue = ((Integer) tmp.getBlockversions().get(Integer.valueOf(k))).intValue(); - while (me.getStatus() == Status.ACTIVE) { - BlockDataStructure block = ((BlockDataStructure) tmp.adapter.lockmap.get(Integer.valueOf(k))); - if (block.getLock().tryLock()) { - heldlocks.add(block.getLock()); - if (!(block.getVersion().get() == expvalue)) { // for block versioning mechanism - me.abort(); - } - else { - break; - } - } - else { - me.getContentionManager().resolveConflict(me, block.getOwner()); - } - } - if (me.getStatus() == Status.ABORTED) { - unlockLocks(heldlocks); - offsetlock.unlock(); throw new AbortedException(); } } - }*/ + for (int i = 0; i < non_intersected_ranges.length; i++) { try { - synchronized(benchmark.lock){ - System.out.println("read start " + non_intersected_ranges[i].getStart()); - } + //invokeNativepread(b, non_intersected_ranges[i].getStart(), size); file.seek(non_intersected_ranges[i].getStart()); int tmpsize = file.read(b, (int) (non_intersected_ranges[i].getStart() - readrange.getStart()), (int) (non_intersected_ranges[i].getEnd() - non_intersected_ranges[i].getStart())); result += tmpsize; } catch (IOException ex) { - - //unlockLocks(heldlocks); - //offsetlock.unlock(); + Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); } } me.unlockAllLocks(); tmp.setLocaloffset(tmp.getLocaloffset() + result); - // unlockLocks(heldlocks); - // offsetlock.unlock(); } - /*synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Read from file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"; - }*/ - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Read from file " + this.inode + " from descriptor "+ this.sequenceNum +"\n"); + return result; } } else { // add to the readers list - System.out.println("form read???"); - me.addFile(this); + me.addFile(this, 0); return read(b); } @@ -741,56 +449,27 @@ public class TransactionalFile implements Comparable{ if (me == null) // not a transaction - { - + { non_Transactional_Write(data); return; } - //else if (me.getFilesAccesses().containsKey(this.getInode())) // - //{ - // if (me.getStatus() != Status.ACTIVE) - // throw new AbortedException(); - - if (me.getGlobaltoLocalMappings().containsKey(this)) // + if (me.getGlobaltoLocalMappings().containsKey(this)) { - - + Byte[] by = new Byte[size]; for (int i = 0; i < size; i++) { by[i] = Byte.valueOf(data[i]); } TransactionLocalFileAttributes tmp = ((TransactionLocalFileAttributes) (me.getGlobaltoLocalMappings().get(this))); - /*if (appendmode) { - newwriterange = new Range((((Range) (tm.firstKey())).getStart()), (((Range) (tm.firstKey())).getEnd()) + size); - Range range = new Range((((Range) (tm.firstKey())).getStart()), (((Range) (tm.firstKey())).getEnd())); - Byte[] appenddata = new Byte[(int) (newwriterange.getEnd() - newwriterange.getStart())]; - Byte[] tempor = new Byte[(int) (range.getEnd() - range.getStart())]; - System.arraycopy(tempor, 0, appenddata, 0, tempor.length); - System.arraycopy(by, 0, appenddata, tempor.length, by.length); - tm.remove(range); - tm.put(newwriterange, appenddata); - tmp.setLocaloffset(loffset + size); - tmp.setFilelength(tmp.getFilelength() + size); - - return; - }*/ Vector dummy; if (((Vector)(me.getWriteBuffer().get(this.inode))) != null){ dummy = new Vector((Vector)(me.getWriteBuffer().get(this.inode))); } else dummy = new Vector(); - /* synchronized(benchmark.lock){ - System.out.println(Thread.currentThread() + " gg " + tmp.getLocaloffset() + " " + (tmp.getLocaloffset()+by.length)); - }*/ - /*if (!(tmp.isUnknown_inital_offset_for_write())){ - lockOffset(me); - tmp.setLocaloffset(this.committedoffset.getOffsetnumber()); - offsetlock.unlock(); - }*/ - + dummy.add(new WriteOperations(by, new Range(tmp.getLocaloffset(), tmp.getLocaloffset() + by.length), tmp.isUnknown_inital_offset_for_write(), this, tmp)); me.getWriteBuffer().put(this.inode, dummy); @@ -800,131 +479,17 @@ public class TransactionalFile implements Comparable{ tmp.setLocaloffset(tmp.getLocaloffset() + by.length); me.merge_for_writes_done.put(inode, Boolean.FALSE); - //me.setWritesmerged(false); - - - if (!(tmp.isUnknown_inital_offset_for_write())){ markAccessedBlocks(me, loffset, size, BlockAccessModesEnum.WRITE); -// markWriteBlocks(loffset, size); } - /*{ - int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(loffset); - int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(loffset, size); - for (int i = startblock; i <= targetblock; i++) { - if (me.getAccessedBlocks().containsKey(Integer.valueOf(i))) { - if (((BlockAccessModesEnum) (me.getAccessedBlocks().get(Integer.valueOf(i)))) == BlockAccessModesEnum.READ) { - me.getAccessedBlocks().put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE); - } - } else { - me.getAccessedBlocks().put(Integer.valueOf(i), BlockAccessModesEnum.WRITE); - // tmp.getBlockversions().put(Integer.valueOf(i), Integer.valueOf(getBlockDataStructure(i).getVersion().get())); //For Block Versioning Mechanism - } - } - }*/ - + if (tmp.getOffsetdependency() == OffsetDependency.NO_ACCESS) tmp.offsetdependency = OffsetDependency.WRITE_DEPENDENCY_1; - - - - /*if ((tmp.access_from_absolute_offset) || !(tmp.relocatablewrite)) - { - int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(loffset); - int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(loffset, size); - for (int i = startblock; i <= targetblock; i++) { - if (tmp.getAccesedblocks().containsKey(Integer.valueOf(i))) { - if (((BlockAccessModesEnum) (tmp.getAccesedblocks().get(Integer.valueOf(i)))) == BlockAccessModesEnum.READ) { - tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE); - } - } else { - tmp.getAccesedblocks().put(Integer.valueOf(i), BlockAccessModesEnum.WRITE); - // tmp.getBlockversions().put(Integer.valueOf(i), Integer.valueOf(getBlockDataStructure(i).getVersion().get())); //For Block Versioning Mechanism - } - } - } - - if (tmp.access_from_absolute_offset){ - - mergeWrittenData(tmp.getNon_Speculative_Writtendata(), data, newwriterange); - tmp.setLocaloffset(loffset + size); - if (tmp.getLocaloffset() > tmp.getFilelength()) { - tmp.setFilelength(tmp.getLocaloffset()); - } - return; - } - - else // for comtimious determingin the accessed block is postpond till commit instant - { - Byte[] dd = new Byte[size]; - System.arraycopy(by, 0, dd, 0, size); - if (!(tmp.getSpeculative_Writtendata().isEmpty())){ - - Range lastrange = (Range) tmp.getSpeculative_Writtendata().lastKey(); - if (lastrange.getEnd() == newwriterange.getStart()){ - dd = new Byte[(int)(size + lastrange.getEnd() - lastrange.getStart())]; - System.arraycopy(((Byte[])tmp.getSpeculative_Writtendata().get(lastrange)), 0, dd, 0, (int)(lastrange.getEnd() - lastrange.getStart())); - System.arraycopy(by, 0, dd, (int)(lastrange.getEnd() - lastrange.getStart()), size); - newwriterange = new Range(lastrange.getStart(), size + lastrange.getEnd()); - tmp.getSpeculative_Writtendata().remove(lastrange); - } - - } - - tmp.getSpeculative_Writtendata().put(newwriterange, dd); - - tmp.setLocaloffset(loffset + size); - if (tmp.getLocaloffset() > tmp.getFilelength()) { - tmp.setFilelength(tmp.getLocaloffset()); - } - - return; - } - - */ - - /* - - if (tmp.accessmode == TransactionLocalFileAttributes.MODE.READ) - tmp.accessmode = TransactionLocalFileAttributes.MODE.READ_WRITE; - else if (tmp.accessmode == TransactionLocalFileAttributes.MODE.WRITE) - simpleWritetoBuffer(by, newwriterange, tm); - */ - // me.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Writes to " + this.inode + " from descriptor "+ this.sequenceNum +"\n"); } else { - // System.out.println("form write??"); - // me.addFile(this/*, TransactionLocalFileAttributes.MODE.WRITE*/); - //if (!(me.getGlobaltoLocalMappings().containsKey(this))){ - //if (!(me.getFilesAccesses().containsKey(this.inode))) { - TransactionLocalFileAttributes tmp = new TransactionLocalFileAttributes(0);/*, tf.getInodestate().commitedfilesize.get();*/ - - Vector dummy; - if (me.getAccessedFiles().containsKey(this.getInode())){ - dummy = (Vector) me.getAccessedFiles().get(this.getInode()); - } - else{ - dummy = new Vector(); - me.getAccessedFiles().put(this.getInode(), dummy); - } - - - - // this.msg.put(System.nanoTime(), Thread.currentThread().getName() + " Unlocked the offset lock " + tf.offsetlock + " for file " + tf.getInode() + " form descriptor " + tf.getSequenceNum()); - dummy.add(this); - me.getGlobaltoLocalMappings().put(this, tmp); - me.merge_for_writes_done.put(this.getInode(), Boolean.TRUE); - - //me.addFile(this); - //} - + me.addFile(this, 0); write(data); } - /* synchronized(benchmark.lock){ - benchmark.msg += Thread.currentThread().getName() + " Writes to " + this.inode + " from descriptor "+ this.sequenceNum +"\n"; - }*/ - - } @@ -950,95 +515,19 @@ public class TransactionalFile implements Comparable{ map.put(Integer.valueOf(i), mode); } } - /*int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(loffset); - int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(loffset, size); - for (int i = startblock; i <= targetblock; i++) { - if (me.getAccessedBlocks().containsKey(Integer.valueOf(i))) { - if (((BlockAccessModesEnum) (me.getAccessedBlocks().get(Integer.valueOf(i)))) == BlockAccessModesEnum.READ) { - me.getAccessedBlocks().put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE); - } - - - } else { - me.getAccessedBlocks().put(Integer.valueOf(i), BlockAccessModesEnum.WRITE); - // tmp.getBlockversions().put(Integer.valueOf(i), Integer.valueOf(getBlockDataStructure(i).getVersion().get())); //For Block Versioning Mechanism - } - }*/ + } -/* private void markWriteBlocks(long loffset, int size){ - ExtendedTransaction me = CustomThread.getTransaction(); - SortedSet tt; - if (me.writeBlocks.get(this.inode) != null) - tt = (SortedSet) me.writeBlocks.get(this.inode); - else { - tt = new TreeSet(); - me.writeBlocks.put(this.inode, tt); - } - int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(loffset); - int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(loffset, size); - for (int i = startblock; i <= targetblock; i++) { - tt.add(Integer.valueOf(i)); - } - }*/ - + // reads the data directly from file, private int readFromFile(ExtendedTransaction me, byte[] readdata, TransactionLocalFileAttributes tmp) { - - - //ExtendedTransaction me = Wrapper.getTransaction(); int st = FileBlockManager.getCurrentFragmentIndexofTheFile(tmp.getLocaloffset()); int end = FileBlockManager.getTargetFragmentIndexofTheFile(tmp.getLocaloffset(), readdata.length); - BlockDataStructure block = null; - boolean locked = false; + BlockDataStructure block = null; + boolean locked = false; for (int k = st; k <= end; k++) { - // int expvalue = ((Integer) tmp.getBlockversions().get(Integer.valueOf(k))).intValue(); // all comments here for versioning mechanism - while (me.getStatus() == Status.ACTIVE) { - //BlockDataStructure block = ((BlockDataStructure) this.inodestate.lockmap.get(Integer.valueOf(k))); - // BlockDataStructure block; - //if (me.getAccessedBlocks().get(inode)) - block = this.inodestate.getBlockDataStructure(Integer.valueOf(k)); - - block.getLock().readLock().lock(); - // me.getHeldblocklocks().add(block.getLock().readLock()); - if (!(block.getReaders().contains(me))){ - block.getReaders().add(me); - } - locked = true; - // if (!(block.getVersion().get() == expvalue)) { - // me.abort(); - // } else { - break; - // } - // } - /* else { - me.getContentionmanager().resolveConflict(me, block); - }*/ - - } - if (me.getStatus() == Status.ABORTED) { - int m; - if (locked){ - m = k+1; - } - else - m = k; - for (int i=st; i wrp"); - endsuffix = wrp2.getRange().getEnd(); - //suffixstart = (int) (intersectedrange[i].getEnd() - intersectedrange[i].getStart()); } else if (wrp.getRange().getEnd() > wrp2.getRange().getEnd()) { @@ -1298,17 +685,10 @@ public class TransactionalFile implements Comparable{ suffixdata = (Byte[]) (wrp.getData()); startsuffix = intersectedrange.getEnd() - wrp.getRange().getStart(); suffixsize = (int)(wrp.getRange().getEnd() - intersectedrange.getEnd()); - // System.out.println("wrp2 < wrp"); endsuffix = wrp.getRange().getEnd(); suffix = true; } - /* System.out.println("prefix start:" + startprefix); - System.out.println("suffix end:" + endsuffix); - System.out.println("suffix start:" + startsuffix); - System.out.println("intermediate start:" + intermediatetart); - - System.out.println("suffix size:" + suffixsize);*/ Byte[] data_to_insert; @@ -1332,115 +712,7 @@ public class TransactionalFile implements Comparable{ toberemoved.clear(); Collections.sort(vec); me.merge_for_writes_done.put(inode, Boolean.TRUE); - //me.setWritesmerged(true); - - /*} else if (prefix) { - data_to_insert = new Byte[(int) (to_be_merged_data_range.getStart() - start + size)]; - System.arraycopy(prefixdata, 0, data_to_insert, 0, (int) (to_be_merged_data_range.getStart() - start)); - System.arraycopy(by, 0, data_to_insert, (int) (to_be_merged_data_range.getStart() - start), size); - to_be_merged_data_range.setStart(start); - } else if (suffix) { - data_to_insert = new Byte[(int) (to_be_merged_data_range.getEnd() - end + size)]; - System.arraycopy(by, 0, data_to_insert, 0, size); - System.arraycopy(suffixdata, suffixstart, data_to_insert, size, (int) (end - to_be_merged_data_range.getEnd())); - to_be_merged_data_range.setEnd(end); - } else { - data_to_insert = new Byte[size]; - System.arraycopy(data_to_insert, (int) (to_be_merged_data_range.getStart() - start), by, 0, size); - }*/ - //target.put(to_be_merged_data_range, data_to_insert); - -/* - if (flag) { - int datasize = (int) (oldwriterange.getEnd() - oldwriterange.getStart()); - Byte[] original = (Byte[]) (wrp.getData()); - byte[] originaldata = new byte[datasize]; - - //for (int i = 0; i < data.length; i++) { - // originaldata[i] = original[i].byteValue(); - // } - System.arraycopy(data, 0, originaldata, (int) (to_be_merged_data_range.getStart() - oldwriterange.getStart()), size); - Byte[] to_be_inserted = new Byte[datasize]; - - for (int i = 0; i < datasize; i++) { - to_be_inserted[i] = Byte.valueOf(originaldata[i]); - } - target.put(oldwriterange, to_be_inserted); - return; - - } else if (counter == 0) { - target.put(to_be_merged_data_range, data); - return; - - } else { - - int suffixstart = 0; - long start = 0; - long end = 0; - Byte[] prefixdata = null; - Byte[] suffixdata = null; - boolean prefix = false; - boolean suffix = false; - - for (int i = 0; i < counter; i++) { - if (markedwriteranges[i].getStart() < to_be_merged_data_range.getStart()) { - prefixdata = new Byte[(int) (to_be_merged_data_range.getStart() - markedwriteranges[i].getStart())]; - prefixdata = (Byte[]) (target.get(markedwriteranges[i])); - start = markedwriteranges[i].getStart(); - //newdata = new Byte[size + newwriterange.getStart() - markedwriteranges[i].getStart()]; - //System.arraycopy(by, 0, newdata, newwriterange.getStart() - markedwriteranges[i].getStart(), size); - //System.arraycopy(originaldata, 0, newdata, 0, newwriterange.getStart() - markedwriteranges[i].getStart()); - - //newwriterange.setStart(markedwriteranges[i].getStart()); - prefix = true; - - - } else if (markedwriteranges[i].getEnd() > to_be_merged_data_range.getEnd()) { - - suffixdata = new Byte[(int) (markedwriteranges[i].getStart() - to_be_merged_data_range.getStart())]; - suffixdata = (Byte[]) (target.get(markedwriteranges[i])); - end = markedwriteranges[i].getEnd(); - - //Byte [] originaldata = (Byte [])(tmp.getWrittendata().get(markedwriteranges[i])); - //newdata = new Byte[size + newwriterange.getStart() - markedwriteranges[i].getStart()]; - //System.arraycopy(originaldata, 0, newdata, 0, newwriterange.getStart() - markedwriteranges[i].getStart()); - - //newwriterange.setStart(markedwriteranges[i].getStart()); - ///newwriterange.setEnd(markedwriteranges[i].getEnd()); - suffix = true; - suffixstart = (int) (intersectedrange[i].getEnd() - intersectedrange[i].getStart()); - //tm.remove(markedwriteranges[i]); - } - target.remove(markedwriteranges[i]); - - } - Byte[] data_to_insert; - - if ((prefix) && (suffix)) { - data_to_insert = new Byte[(int) (to_be_merged_data_range.getStart() - start + size - to_be_merged_data_range.getEnd() + end)]; - System.arraycopy(prefixdata, 0, data_to_insert, 0, (int) (to_be_merged_data_range.getStart() - start)); - System.arraycopy(by, 0, data_to_insert, (int) (to_be_merged_data_range.getStart() - start), size); - System.arraycopy(suffixdata, suffixstart, data_to_insert, (int) (size + to_be_merged_data_range.getStart() - start), (int) (end - to_be_merged_data_range.getEnd())); - to_be_merged_data_range.setStart(start); - to_be_merged_data_range.setEnd(end); - } else if (prefix) { - data_to_insert = new Byte[(int) (to_be_merged_data_range.getStart() - start + size)]; - System.arraycopy(prefixdata, 0, data_to_insert, 0, (int) (to_be_merged_data_range.getStart() - start)); - System.arraycopy(by, 0, data_to_insert, (int) (to_be_merged_data_range.getStart() - start), size); - to_be_merged_data_range.setStart(start); - } else if (suffix) { - data_to_insert = new Byte[(int) (to_be_merged_data_range.getEnd() - end + size)]; - System.arraycopy(by, 0, data_to_insert, 0, size); - System.arraycopy(suffixdata, suffixstart, data_to_insert, size, (int) (end - to_be_merged_data_range.getEnd())); - to_be_merged_data_range.setEnd(end); - } else { - data_to_insert = new Byte[size]; - System.arraycopy(data_to_insert, (int) (to_be_merged_data_range.getStart() - start), by, 0, size); - } - target.put(to_be_merged_data_range, data_to_insert); - return; - }*/ - + } public void non_Transactional_Write(byte[] data){ @@ -1452,39 +724,27 @@ public class TransactionalFile implements Comparable{ int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(committedoffset.getOffsetnumber(), data.length); for (int i = startblock; i <= targetblock; i++) { BlockDataStructure block =this.inodestate.getBlockDataStructure(i); - //if (block.getLock().writeLock().tryLock()) { block.getLock().writeLock().lock(); heldlocks.add(block.getLock().writeLock()); - //} else { - // unlockLocks(heldlocks); - // offsetlock.unlock(); - // flag = false; - // break; - //} } - - /*if (flag) { - throw new PanicException("The I/O operation could not be done to contention for the file"); - }*/ - - //else { - try { - file.seek(committedoffset.getOffsetnumber()); - file.write(data); - - committedoffset.setOffsetnumber(committedoffset.getOffsetnumber() +data.length); - - } catch (IOException ex) { - Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); - } finally { - unlockLocks(heldlocks); - offsetlock.unlock(); - } - //} + try { + + file.seek(committedoffset.getOffsetnumber()); + file.write(data); + + committedoffset.setOffsetnumber(committedoffset.getOffsetnumber() +data.length); + + } catch (IOException ex) { + Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); + } finally { + unlockLocks(heldlocks); + offsetlock.unlock(); + } } public int non_Transactional_Read(byte[] b){ + int size = -1; Vector heldlocks = new Vector(); boolean flag = true; @@ -1493,101 +753,46 @@ public class TransactionalFile implements Comparable{ int targetblock; startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(committedoffset.getOffsetnumber()); targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(committedoffset.getOffsetnumber(), size); - /* long offset = committedoffset.getOffsetnumber(); - committedoffset.setOffsetnumber(committedoffset.getOffsetnumber() +b.length); + + for (int i = startblock; i <= targetblock; i++) { + BlockDataStructure block = this.inodestate.getBlockDataStructure(i); + block.getLock().readLock().lock(); + heldlocks.add(block.getLock().readLock()); + + } + + size = invokeNativepread(b, committedoffset.getOffsetnumber(), b.length); + + committedoffset.setOffsetnumber(committedoffset.getOffsetnumber() +size); if (!(committedoffset.getOffsetReaders().isEmpty())){ Iterator it2 = committedoffset.getOffsetReaders().iterator(); // for visible readers strategy while ( it2.hasNext()) { ExtendedTransaction tr = (ExtendedTransaction) it2.next(); tr.abort(); - } - committedoffset.getOffsetReaders().clear(); - } - offsetlock.unlock();*/ - - for (int i = startblock; i <= targetblock; i++) { - BlockDataStructure block = this.inodestate.getBlockDataStructure(i); - //if (block.getLock().readLock().tryLock()) { - block.getLock().readLock().lock(); - heldlocks.add(block.getLock().readLock()); - /*} else { - unlockLocks(heldlocks); - offsetlock.unlock(); - flag = false; - break; - }*/ + } + committedoffset.getOffsetReaders().clear(); } - /*if (flag) { + + + unlockLocks(heldlocks); + offsetlock.unlock(); + if (size == 0) size = -1; - } else {*/ - - - - // try { - //ByteBuffer buffer = ByteBuffer.wrap(b); - //System.out.println(committedoffset.getOffsetnumber()); - //size = file.getChannel().read(buffer, offset); - //file.seek(committedoffset.getOffsetnumber()); - // size = file.read(b); - size = invokeNativepread(b, committedoffset.getOffsetnumber(), b.length); - - - committedoffset.setOffsetnumber(committedoffset.getOffsetnumber() +size); - if (!(committedoffset.getOffsetReaders().isEmpty())){ - Iterator it2 = committedoffset.getOffsetReaders().iterator(); // for visible readers strategy - while ( it2.hasNext()) - { - ExtendedTransaction tr = (ExtendedTransaction) it2.next(); - tr.abort(); - } - committedoffset.getOffsetReaders().clear(); - } - - // } catch (IOException ex) { - // Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); - // size = -1; - // } finally { - unlockLocks(heldlocks); - offsetlock.unlock(); - if (size == 0) - size = -1; - // } - // } return size; - } public void non_Transactional_Seek(long offset){ offsetlock.lock(); - //try { - //file.seek(offset); - //synchronized(adapter){ committedoffset.setOffsetnumber(offset); - // inodestate.commitedfilesize.set(offset); - //} - //} catch (IOException ex) { - /// Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); - //} finally { - offsetlock.unlock(); - //} + offsetlock.unlock(); } public long non_Transactional_getFilePointer(){ long offset = -1;; offsetlock.lock(); - - - // try { - - //synchronized(adapter){ offset = committedoffset.getOffsetnumber(); - //} - // } catch (IOException ex) { - // Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex); - // } finally { - offsetlock.unlock(); - // } + offsetlock.unlock(); return offset; } @@ -1604,5 +809,112 @@ public class TransactionalFile implements Comparable{ return 1; } } + + public void makeDependentonOffet(ExtendedTransaction me, TransactionLocalFileAttributes tmp){ + + if (tmp.getOffsetdependency() != OffsetDependency.WRITE_DEPENDENCY_2){ + tmp.setLocaloffset(tmp.getLocaloffset() + this.committedoffset.getOffsetnumber() - tmp.getCopylocaloffset()); + } + + tmp.setOffsetdependency(OffsetDependency.READ_DEPENDENCY); + if (!(this.committedoffset.getOffsetReaders().contains(me))){ + this.committedoffset.getOffsetReaders().add(me); + + } + } + + public void makeWriteAbsolute(ExtendedTransaction me, WriteOperations wrp){ + wrp.getRange().setStart(wrp.getOwnerTF().committedoffset.getOffsetnumber() - wrp.getTFA().getCopylocaloffset() + wrp.getRange().getStart()); + wrp.getRange().setEnd(wrp.getOwnerTF().committedoffset.getOffsetnumber() - wrp.getTFA().getCopylocaloffset() + wrp.getRange().getEnd()); + if ((wrp.getTFA().getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_1) || (wrp.getTFA().offsetdependency == OffsetDependency.NO_ACCESS) || (wrp.getTFA().getOffsetdependency() == OffsetDependency.WRITE_DEPENDENCY_2)){ + wrp.getTFA().setOffsetdependency(OffsetDependency.READ_DEPENDENCY); + wrp.getTFA().setUnknown_inital_offset_for_write(false); + if (!(wrp.getOwnerTF().committedoffset.getOffsetReaders().contains(me))) + wrp.getOwnerTF().committedoffset.getOffsetReaders().add(me); + wrp.getTFA().setLocaloffset(wrp.getTFA().getLocaloffset() + wrp.getOwnerTF().committedoffset.getOffsetnumber() - wrp.getTFA().getCopylocaloffset()); + } + + } + + + private void lockBlock(ExtendedTransaction me, int st, int k){ + BlockDataStructure block; + boolean locked = false; + while (me.getStatus() == Status.ACTIVE) { + block = this.inodestate.getBlockDataStructure(Integer.valueOf(k)); + + block.getLock().readLock().lock(); + if (!(block.getReaders().contains(me))){ + block.getReaders().add(me); + } + locked = true; + break; + } + + if (me.getStatus() == Status.ABORTED) { + int m; + if (locked){ + m = k+1; + } + else + m = k; + for (int i=st; i