2 * To change this template, choose Tools | Templates
3 * and open the template in the editor.
5 package TransactionalIO.core;
7 import TransactionalIO.exceptions.AbortedException;
8 import TransactionalIO.benchmarks.benchmark;
9 import TransactionalIO.benchmarks.customhandler;
10 import TransactionalIO.benchmarks.customhandler;
11 import TransactionalIO.interfaces.BlockAccessModesEnum;
12 import TransactionalIO.interfaces.ContentionManager;
13 import TransactionalIO.interfaces.TransactionStatu;
14 //import dstm2.file.managers.BaseManager;
15 import java.awt.event.ActionListener;
16 import java.beans.EventHandler;
17 import java.beans.PropertyChangeEvent;
18 import java.beans.PropertyChangeListener;
19 import java.beans.PropertyChangeSupport;
20 import java.io.FileDescriptor;
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Iterator;
27 import java.util.TreeMap;
28 import java.util.Vector;
29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock;
31 import java.util.concurrent.locks.ReentrantReadWriteLock;
32 import java.util.logging.Level;
33 import java.util.logging.Logger;
39 public class ExtendedTransaction implements TransactionStatu {
41 private static native int nativepwrite(byte buff[], long offset, int size, FileDescriptor fd);
45 System.load("/home/navid/libkooni.so");
47 private boolean flag = true;
48 public TransactionStatu memorystate;
49 private PropertyChangeSupport changes = new PropertyChangeSupport(this);
52 public TreeMap msg = new TreeMap();
53 public int numberofwrites;
54 public int numberofreads;
58 ABORTED, ACTIVE, COMMITTED
60 private boolean writesmerged = true;
61 private Vector heldlengthlocks;
62 //private Vector<ReentrantLock> heldoffsetlocks;
63 private Vector heldoffsetlocks;
64 //private Vector<ReentrantLock> heldblocklocks;
65 private Vector heldblocklocks;
66 //private HashMap<INode, Vector<TransactionalFile>> AccessedFiles;
67 private HashMap AccessedFiles;
68 //private HashMap<INode, HashMap<Integer, BlockAccessModesEnum> > accessedBlocks;
69 private HashMap accessedBlocks;
70 //private HashMap<TransactionalFile, TransactionLocalFileAttributes> LocaltoGlobalMappings;
71 private HashMap GlobaltoLocalMappings;
72 public HashMap merge_for_writes_done;
73 private HashMap writeBuffer;
74 private ContentionManager contentionmanager;
75 private volatile Status status;
79 public ReentrantLock[] toholoffsetlocks;
80 public int offsetcount = 0;
82 public Lock[] toholdblocklocks;
83 public int blockcount = 0;
85 public ExtendedTransaction() {
87 // id = Integer.valueOf(Thread.currentThread().getName().substring(7));
88 heldlengthlocks = new Vector();
89 heldblocklocks = new Vector();
90 heldoffsetlocks = new Vector();
91 AccessedFiles = new HashMap();
92 GlobaltoLocalMappings = new HashMap/*<TransactionalFile, TransactionLocalFileAttributes >*/();
93 writeBuffer = new HashMap();
94 status = Status.ACTIVE;
95 accessedBlocks = new HashMap();
96 merge_for_writes_done = new HashMap();
98 // setContentionmanager(new BaseManager());
99 // beginTransaction();
103 public ExtendedTransaction(TransactionStatu memorystate) {
106 this.memorystate = memorystate;
109 public static int invokeNativepwrite(byte buff[], long offset, int size, RandomAccessFile file) {
111 return nativepwrite(buff, offset, buff.length, file.getFD());
112 } catch (IOException ex) {
114 Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
120 public void beginTransaction() {
121 this.addPropertyChangeListener(new customhandler(Status.ABORTED));
124 public void abort() {
125 synchronized (this) {
126 this.status = Status.ABORTED;
127 if (this.memorystate != null && !(this.memorystate).isAborted()) {
128 this.memorystate.abortThisSystem();
133 public Status getStatus() {
137 public boolean isActive() {
138 return this.getStatus() == Status.ACTIVE;
141 public boolean isAborted() {
142 return this.getStatus() == Status.ABORTED;
145 public ContentionManager getContentionmanager() {
146 return contentionmanager;
149 public void setContentionmanager(ContentionManager contentionmanager) {
150 this.contentionmanager = contentionmanager;
153 public HashMap getWriteBuffer() {
157 public HashMap getAccessedFiles() {
158 return AccessedFiles;
161 public boolean isWritesmerged() {
165 public void setWritesmerged(boolean writesmerged) {
166 this.writesmerged = writesmerged;
169 public HashMap getGlobaltoLocalMappings() {
170 return GlobaltoLocalMappings;
173 public HashMap getAccessedBlocks() {
174 return accessedBlocks;
177 public ContentionManager getBlockContentionManager() {
178 return ManagerRepository.getBlockcm();
181 public ContentionManager getOffsetContentionManager() {
182 return ManagerRepository.getOffsetcm();
185 public TreeMap getSortedFileAccessMap(HashMap hmap) {
186 /*TreeMap sortedMap = new TreeMap(hmap);
188 return new TreeMap(hmap);
191 public void setStatus(Status st) {
192 Status oldst = getStatus();
194 this.changes.firePropertyChange("status", oldst, st);
197 public void addFile(TransactionalFile tf, long offsetnumber/*, TransactionLocalFileAttributes tmp*/) {
199 TransactionLocalFileAttributes tmp = new TransactionLocalFileAttributes(offsetnumber, tf.getInodestate().commitedfilesize.getLength());
202 if (AccessedFiles.containsKey(tf.getInode())) {
203 dummy = (Vector) AccessedFiles.get(tf.getInode());
205 dummy = new Vector();
206 AccessedFiles.put(tf.getInode(), dummy);
209 GlobaltoLocalMappings.put(tf, tmp);
210 merge_for_writes_done.put(tf.getInode(), Boolean.TRUE);
213 public boolean lockOffsets() { /// Locking offsets for File Descriptors
215 // toholoffsetlocks = new ReentrantLock[30];
217 TreeMap hm = getSortedFileAccessMap(AccessedFiles);
218 Iterator iter = hm.keySet().iterator();
220 while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
221 INode key = (INode) iter.next();
222 Vector vec = (Vector) AccessedFiles.get(key);
224 /* if (vec.size() == 1){
225 TransactionalFile tf = ((TransactionalFile)vec.firstElement());
226 tf.offsetlock.lock();
227 // toholoffsetlocks[offsetcount] = tf.offsetlock;
229 heldoffsetlocks.add(tf.offsetlock);
233 Collections.sort(vec);
234 Iterator it = vec.iterator();
235 while (it.hasNext() /*&& this.getStatus() == Status.ACTIVE*/) {
236 TransactionalFile value = (TransactionalFile) it.next();
237 value.offsetlock.lock();
238 // toholoffsetlocks[offsetcount] = value.offsetlock;
240 heldoffsetlocks.add(value.offsetlock);
242 if (((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(value)).lenght_read){
243 if (!(value.getInodestate().commitedfilesize.lengthlock.isHeldByCurrentThread())){
244 value.getInodestate().commitedfilesize.lengthlock.lock();
245 heldlengthlocks.add(value.getInodestate().commitedfilesize.lengthlock);
252 if (this.getStatus() != Status.ACTIVE) {
253 // for (int j=0; j<offsetcount; j++){
254 // heldoffsetlocks.add(toholoffsetlocks[j]);
264 public boolean lockBlock(BlockDataStructure block, BlockAccessModesEnum mode/*, GlobalINodeState adapter, BlockAccessModesEnum mode, int expvalue, INode inode, TransactionLocalFileAttributes tf*/) {
267 if (mode == BlockAccessModesEnum.READ) {
268 lock = block.getLock().readLock();
271 lock = block.getLock().writeLock();
275 // toholdblocklocks[blockcount] = lock;
277 heldblocklocks.add(lock);
282 public void prepareCommit() {
283 if (this.status != Status.ACTIVE) {
284 throw new AbortedException();
286 boolean offsetsok = true;
287 if (!lockOffsets()) {
288 throw new AbortedException();
291 // boolean lengthslock = true;
292 // if (!lockOffsets()) {
293 // throw new AbortedException();
297 ///////////////////////////
300 Map hm = getWriteBuffer();
302 Iterator iter = hm.keySet().iterator();
303 WriteOperations value;
304 Vector vec = new Vector();
305 while (iter.hasNext() && (this.getStatus() == Status.ACTIVE) && offsetsok) {
306 INode key = (INode) iter.next();
307 vec = (Vector) hm.get(key);
308 Collections.sort(vec);
309 Iterator it = vec.iterator();
310 while (it.hasNext()) {
312 value = (WriteOperations) it.next();
313 if (value.isUnknownoffset()) {
317 start = value.getRange().getStart() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber();
318 end = value.getRange().getEnd() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber();
319 if (value.getBelongingto().isUnknown_inital_offset_for_write()) {
320 value.getBelongingto().setLocaloffset(value.getBelongingto().getLocaloffset() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber());
321 value.getBelongingto().setUnknown_inital_offset_for_write(false);
324 int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(start);
325 int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(start, value.getRange().getEnd() - value.getRange().getStart());
328 if (this.getAccessedBlocks().get(key) != null) {
329 sset = (TreeMap) this.getAccessedBlocks().get(key);
331 sset = new TreeMap();
332 this.getAccessedBlocks().put(key, sset);
336 for (int i = startblock; i <= targetblock; i++) {
337 if (sset.containsKey(Integer.valueOf(i))) {
338 if (sset.get(Integer.valueOf(i)) != BlockAccessModesEnum.WRITE) {
339 sset.put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE);
342 sset.put(Integer.valueOf(i), BlockAccessModesEnum.WRITE);
346 value.getRange().setStart(start);
347 value.getRange().setEnd(end);
353 //toholdblocklocks = new Lock[100];
355 Iterator it = this.getAccessedBlocks().keySet().iterator();
356 BlockDataStructure[] blocks = new BlockDataStructure[100];
357 //if (this.getStatus() == Status.ACTIVE)
358 while (it.hasNext() /*&& (this.getStatus() == Status.ACTIVE)*/) {
359 INode inode = (INode) it.next();
360 GlobalINodeState inodestate = TransactionalFileWrapperFactory.getTateransactionalFileINodeState(inode);
361 TreeMap vec2 = (TreeMap) this.getAccessedBlocks().get(inode);
362 Iterator iter2 = vec2.keySet().iterator();
364 while (iter2.hasNext() && this.getStatus() == Status.ACTIVE) {
365 Integer num = (Integer) iter2.next();
366 BlockDataStructure blockobj = inodestate.getBlockDataStructure(num);
367 this.lockBlock(blockobj, (BlockAccessModesEnum) vec2.get(num));
372 if (this.getStatus() != Status.ACTIVE) {
373 // for (int i=0; i<blockcount; i++)
374 // heldblocklocks.add(toholdblocklocks[i]);
375 throw new AbortedException();
381 public void commitChanges() {
383 Map hm = getWriteBuffer();
384 Iterator iter = hm.keySet().iterator();
386 WriteOperations writeop;
388 while (iter.hasNext()) {
389 INode key = (INode) iter.next();
391 vec = (Vector) hm.get(key);
392 Collections.sort(vec);
394 while (it.hasNext()) {
396 writeop = (WriteOperations) it.next();
397 Byte[] data = new Byte[(int) (writeop.getRange().getEnd() - writeop.getRange().getStart())];
398 byte[] bytedata = new byte[(int) (writeop.getRange().getEnd() - writeop.getRange().getStart())];
399 data = (Byte[]) writeop.getData();
401 for (int i = 0; i < data.length; i++) {
402 bytedata[i] = data[i];
404 invokeNativepwrite(bytedata, writeop.getRange().getStart(), bytedata.length, writeop.getOwnertransactionalFile().file);
408 Iterator k = GlobaltoLocalMappings.keySet().iterator();
409 while (k.hasNext()) {
410 TransactionalFile trf = (TransactionalFile) (k.next());
411 trf.getCommitedoffset().setOffsetnumber(((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).getLocaloffset());
412 if (((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).getInitiallocallength() != ((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).getLocalsize()){
414 if (!(trf.getInodestate().commitedfilesize.lengthlock.isHeldByCurrentThread()))
415 trf.getInodestate().commitedfilesize.lengthlock.lock();
417 Iterator it2 = trf.getInodestate().commitedfilesize.getLengthReaders().iterator();
418 if (((TransactionLocalFileAttributes)getGlobaltoLocalMappings().get(trf)).getInitiallocallength() != ((TransactionLocalFileAttributes)getGlobaltoLocalMappings().get(trf)).getLocalsize())
420 while (it2.hasNext()) {
421 ExtendedTransaction tr = (ExtendedTransaction) it2.next();
426 trf.getInodestate().commitedfilesize.getLengthReaders().clear();
428 trf.getInodestate().commitedfilesize.setLength(trf.file.length());
430 if (trf.getInodestate().commitedfilesize.lengthlock.isHeldByCurrentThread()){
431 heldlengthlocks.remove(trf.getInodestate().commitedfilesize.lengthlock);
432 trf.getInodestate().commitedfilesize.lengthlock.unlock();
435 } catch (IOException ex) {
436 Logger.getLogger(ExtendedTransaction.class.getName()).log(Level.SEVERE, null, ex);
440 if (((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).lenght_read){
441 trf.getInodestate().commitedfilesize.getLengthReaders().remove(this);
442 heldlengthlocks.remove(trf.getInodestate().commitedfilesize.lengthlock);
443 trf.getInodestate().commitedfilesize.lengthlock.unlock();
448 /* for (int i =0; i<blockcount; i++){
449 toholdblocklocks[i].unlock();
451 for (int i =0; i<offsetcount; i++){
452 toholoffsetlocks[i].unlock();
456 public void unlockAllLocks() {
457 Iterator it = heldblocklocks.iterator();
459 while (it.hasNext()) {
461 Lock lock = (Lock) it.next();
464 heldblocklocks.clear();
466 it = heldoffsetlocks.iterator();
467 while (it.hasNext()) {
468 ReentrantLock lock = (ReentrantLock) it.next();
471 heldoffsetlocks.clear();
473 it = heldlengthlocks.iterator();
474 while (it.hasNext()) {
475 ReentrantLock lock = (ReentrantLock) it.next();
478 heldlengthlocks.clear();
481 public void abortAllReaders() {
482 TreeMap hm = getSortedFileAccessMap(AccessedFiles);
484 Iterator iter = hm.keySet().iterator();
485 TransactionalFile value;
486 while (iter.hasNext()) {
487 INode key = (INode) iter.next();
488 Vector vec = (Vector) AccessedFiles.get(key);
489 Iterator it = vec.iterator();
490 while (it.hasNext()) {
492 value = (TransactionalFile) it.next();
493 Iterator it2 = value.getCommitedoffset().getOffsetReaders().iterator(); // for visible readers strategy
495 while (it2.hasNext()) {
496 ExtendedTransaction tr = (ExtendedTransaction) it2.next();
501 value.getCommitedoffset().getOffsetReaders().clear();
508 if (accessedBlocks.get(key) != null) {
509 vec2 = (TreeMap) accessedBlocks.get(key);
511 vec2 = new TreeMap();
514 GlobalINodeState inodestate = TransactionalFileWrapperFactory.getTateransactionalFileINodeState(key);
515 Iterator it2 = vec2.keySet().iterator();
517 while (it2.hasNext()) {
519 Integer num = (Integer) it2.next();
520 if (vec2.get(num) != BlockAccessModesEnum.READ) {
521 BlockDataStructure blockobj = (BlockDataStructure) inodestate.getBlockDataStructure(num);
523 Iterator it4 = blockobj.getReaders().iterator(); // from here for visible readers strategy
525 while (it4.hasNext()) {
527 ExtendedTransaction tr = (ExtendedTransaction) it4.next();
532 blockobj.getReaders().clear();
542 public void addPropertyChangeListener(PropertyChangeListener listener) {
543 this.changes.addPropertyChangeListener("status", listener);
546 public void removePropertyChangeListener(PropertyChangeListener listener) {
547 this.changes.removePropertyChangeListener("status", listener);
550 public TransactionStatu getOtherSystem() {
554 public void setOtherSystem(TransactionStatu othersystem) {
555 memorystate = othersystem;
558 public Vector getHeldblocklocks() {
559 return heldblocklocks;
562 public void setHeldblocklocks(Vector heldblocklocks) {
563 this.heldblocklocks = heldblocklocks;
566 public Vector getHeldoffsetlocks() {
567 return heldoffsetlocks;
570 public Vector getHeldlengthlocks() {
571 return heldlengthlocks;
574 public void setHeldoffsetlocks(Vector heldoffsetlocks) {
575 this.heldoffsetlocks = heldoffsetlocks;
578 public void abortThisSystem() {
582 public boolean isCommitted() {
583 if (this.status == Status.COMMITTED) {
591 public boolean lockBlock(BlockDataStructure block, Adapter adapter, BlockAccessModesEnum mode, int expvalue) { // from here for visible readers strategy
592 while (this.getStatus() == Status.ACTIVE) {
593 if (lock.tryLock()) {
594 Thread.onAbortOnce(new Runnable() {
601 heldblocklocks.add(lock);
603 synchronized (adapter) {
604 block.setOwner(this);
605 // Iterator it = block.getReaders().iterator();
606 // while (it.hasNext())
608 // ExtendedTransaction tr = (ExtendedTransaction) it.next();
615 getBlockContentionManager().resolveConflict(this, block.getOwner());
620 public boolean lockBlock(BlockDataStructure block, Adapter adapter, BlockAccessModesEnum mode, int expvalue) { // versioning strat
621 while (this.getStatus() == Status.ACTIVE) {
622 if (lock.tryLock()) {
623 Thread.onAbortOnce(new Runnable() {
630 heldblocklocks.add(lock);
631 if (mode != BlockAccessModesEnum.WRITE) { egy
632 if (block.getVersion().get() != expvalue) {
637 synchronized (adapter) {
638 block.setOwner(this);
643 getContentionManager().resolveConflict(this, block.getOwner());
649 //expvalue = ((Integer) value.getBlockversions().get(it)).intValue(); //for versioning strategy
650 /*if (!(value.isValidatelocaloffset())) {
651 if (((BlockAccessModesEnum) (value.getAccesedblocks().get(blockno))) != BlockAccessModesEnum.WRITE) { //versioning strategy
653 /if (blockobj.getVersion().get() == expvalue) {
655 ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
666 ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
676 throw new AbortedException();