getlenght support added
[IRC.git] / Robust / Transactions / TransactionalIO / src / TransactionalIO / core / ExtendedTransaction.java
1 /*
2  * To change this template, choose Tools | Templates
3  * and open the template in the editor.
4  */
5 package TransactionalIO.core;
6
7 import TransactionalIO.exceptions.AbortedException;
8 import TransactionalIO.benchmarks.benchmark;
9 import TransactionalIO.benchmarks.customhandler;
10 import TransactionalIO.benchmarks.customhandler;
11 import TransactionalIO.interfaces.BlockAccessModesEnum;
12 import TransactionalIO.interfaces.ContentionManager;
13 import TransactionalIO.interfaces.TransactionStatu;
14 //import dstm2.file.managers.BaseManager;
15 import java.awt.event.ActionListener;
16 import java.beans.EventHandler;
17 import java.beans.PropertyChangeEvent;
18 import java.beans.PropertyChangeListener;
19 import java.beans.PropertyChangeSupport;
20 import java.io.FileDescriptor;
21 import java.io.IOException;
22 import java.io.RandomAccessFile;
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Iterator;
26 import java.util.Map;
27 import java.util.TreeMap;
28 import java.util.Vector;
29 import java.util.concurrent.locks.Lock;
30 import java.util.concurrent.locks.ReentrantLock;
31 import java.util.concurrent.locks.ReentrantReadWriteLock;
32 import java.util.logging.Level;
33 import java.util.logging.Logger;
34
35 /**
36  *
37  * @author navid
38  */
39 public class ExtendedTransaction implements TransactionStatu {
40
41     private static native int nativepwrite(byte buff[], long offset, int size, FileDescriptor fd);
42     
43
44     {
45         System.load("/home/navid/libkooni.so");
46     }
47     private boolean flag = true;
48     public TransactionStatu memorystate;
49     private PropertyChangeSupport changes = new PropertyChangeSupport(this);
50     public int starttime;
51     public int endtime;
52     public TreeMap msg = new TreeMap();
53     public int numberofwrites;
54     public int numberofreads;
55
56     public enum Status {
57
58         ABORTED, ACTIVE, COMMITTED
59     };
60     private boolean writesmerged = true;
61     private Vector heldlengthlocks;
62     //private Vector<ReentrantLock> heldoffsetlocks;    
63     private Vector heldoffsetlocks;
64     //private Vector<ReentrantLock> heldblocklocks;    
65     private Vector heldblocklocks;
66     //private HashMap<INode, Vector<TransactionalFile>> AccessedFiles;
67     private HashMap AccessedFiles;
68     //private HashMap<INode, HashMap<Integer, BlockAccessModesEnum> > accessedBlocks;
69     private HashMap accessedBlocks;
70     //private HashMap<TransactionalFile, TransactionLocalFileAttributes> LocaltoGlobalMappings;
71     private HashMap GlobaltoLocalMappings;
72     public HashMap merge_for_writes_done;
73     private HashMap writeBuffer;
74     private ContentionManager contentionmanager;
75     private volatile Status status;
76     private int id;
77     
78     
79     public ReentrantLock[] toholoffsetlocks;
80     public int offsetcount = 0;
81     
82     public Lock[] toholdblocklocks;
83     public int blockcount = 0;
84
85     public ExtendedTransaction() {
86         //  super();
87         // id = Integer.valueOf(Thread.currentThread().getName().substring(7));
88         heldlengthlocks = new Vector();
89         heldblocklocks = new Vector();
90         heldoffsetlocks = new Vector();
91         AccessedFiles = new HashMap();
92         GlobaltoLocalMappings = new HashMap/*<TransactionalFile, TransactionLocalFileAttributes >*/();
93         writeBuffer = new HashMap();
94         status = Status.ACTIVE;
95         accessedBlocks = new HashMap();
96         merge_for_writes_done = new HashMap();
97         writesmerged = true;
98     //   setContentionmanager(new BaseManager());
99     //    beginTransaction();
100
101     }
102
103     public ExtendedTransaction(TransactionStatu memorystate) {
104         this();
105
106         this.memorystate = memorystate;
107     }
108
109     public static int invokeNativepwrite(byte buff[], long offset, int size, RandomAccessFile file) {
110         try {
111             return nativepwrite(buff, offset, buff.length, file.getFD());
112         } catch (IOException ex) {
113
114             Logger.getLogger(TransactionalFile.class.getName()).log(Level.SEVERE, null, ex);
115             return -1;
116         }
117
118     }
119
120     public void beginTransaction() {
121         this.addPropertyChangeListener(new customhandler(Status.ABORTED));
122     }
123
124     public void abort() {
125         synchronized (this) {
126             this.status = Status.ABORTED;
127             if (this.memorystate != null && !(this.memorystate).isAborted()) {
128                 this.memorystate.abortThisSystem();
129             }
130         }
131     }
132
133     public Status getStatus() {
134         return status;
135     }
136
137     public boolean isActive() {
138         return this.getStatus() == Status.ACTIVE;
139     }
140
141     public boolean isAborted() {
142         return this.getStatus() == Status.ABORTED;
143     }
144
145     public ContentionManager getContentionmanager() {
146         return contentionmanager;
147     }
148
149     public void setContentionmanager(ContentionManager contentionmanager) {
150         this.contentionmanager = contentionmanager;
151     }
152
153     public HashMap getWriteBuffer() {
154         return writeBuffer;
155     }
156
157     public HashMap getAccessedFiles() {
158         return AccessedFiles;
159     }
160
161     public boolean isWritesmerged() {
162         return writesmerged;
163     }
164
165     public void setWritesmerged(boolean writesmerged) {
166         this.writesmerged = writesmerged;
167     }
168
169     public HashMap getGlobaltoLocalMappings() {
170         return GlobaltoLocalMappings;
171     }
172
173     public HashMap getAccessedBlocks() {
174         return accessedBlocks;
175     }
176
177     public ContentionManager getBlockContentionManager() {
178         return ManagerRepository.getBlockcm();
179     }
180
181     public ContentionManager getOffsetContentionManager() {
182         return ManagerRepository.getOffsetcm();
183     }
184
185     public TreeMap getSortedFileAccessMap(HashMap hmap) {
186         /*TreeMap sortedMap = new TreeMap(hmap);
187         return sortedMap;*/
188         return new TreeMap(hmap);
189     }
190
191     public void setStatus(Status st) {
192         Status oldst = getStatus();
193         this.status = st;
194         this.changes.firePropertyChange("status", oldst, st);
195     }
196
197     public void addFile(TransactionalFile tf, long offsetnumber/*, TransactionLocalFileAttributes tmp*/) {
198
199         TransactionLocalFileAttributes tmp = new TransactionLocalFileAttributes(offsetnumber, tf.getInodestate().commitedfilesize.getLength());
200         Vector dummy;
201
202         if (AccessedFiles.containsKey(tf.getInode())) {
203             dummy = (Vector) AccessedFiles.get(tf.getInode());
204         } else {
205             dummy = new Vector();
206             AccessedFiles.put(tf.getInode(), dummy);
207         }
208         dummy.add(tf);
209         GlobaltoLocalMappings.put(tf, tmp);
210         merge_for_writes_done.put(tf.getInode(), Boolean.TRUE);
211     }
212
213     public boolean lockOffsets() {   /// Locking offsets for File Descriptors
214
215       //  toholoffsetlocks = new ReentrantLock[30];
216         
217         TreeMap hm = getSortedFileAccessMap(AccessedFiles);
218         Iterator iter = hm.keySet().iterator();
219         offsetcount = 0;
220         while (iter.hasNext() && (this.getStatus() == Status.ACTIVE)) {
221             INode key = (INode) iter.next();
222             Vector vec = (Vector) AccessedFiles.get(key);
223             
224        /*     if (vec.size() == 1){
225                 TransactionalFile tf = ((TransactionalFile)vec.firstElement());
226                 tf.offsetlock.lock();
227              //   toholoffsetlocks[offsetcount] = tf.offsetlock;
228             //   offsetcount++;
229                 heldoffsetlocks.add(tf.offsetlock);
230                 continue;
231             }*/
232             
233             Collections.sort(vec);
234             Iterator it = vec.iterator();
235             while (it.hasNext() /*&& this.getStatus() == Status.ACTIVE*/) {
236                TransactionalFile value = (TransactionalFile) it.next();
237                value.offsetlock.lock();
238              //   toholoffsetlocks[offsetcount] = value.offsetlock;
239             //    offsetcount++;
240                 heldoffsetlocks.add(value.offsetlock);
241                 
242                 if (((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(value)).lenght_read){ 
243                     if (!(value.getInodestate().commitedfilesize.lengthlock.isHeldByCurrentThread())){
244                         value.getInodestate().commitedfilesize.lengthlock.lock();
245                         heldlengthlocks.add(value.getInodestate().commitedfilesize.lengthlock);
246                     }
247                 }
248                 break;
249             }
250         }
251
252         if (this.getStatus() != Status.ACTIVE) {
253          //   for (int j=0; j<offsetcount; j++){
254          //       heldoffsetlocks.add(toholoffsetlocks[j]);
255          //   }
256             return false;
257         }
258         return true;
259     }
260     
261     
262     
263
264     public boolean lockBlock(BlockDataStructure block, BlockAccessModesEnum mode/*, GlobalINodeState adapter, BlockAccessModesEnum mode, int expvalue, INode inode, TransactionLocalFileAttributes tf*/) {
265
266         Lock lock;
267         if (mode == BlockAccessModesEnum.READ) {
268             lock = block.getLock().readLock();
269         } else {
270
271             lock = block.getLock().writeLock();
272         }
273         
274         lock.lock();
275       //  toholdblocklocks[blockcount] = lock;
276       //  blockcount++;
277         heldblocklocks.add(lock);
278         return true;
279        
280     }
281
282     public void prepareCommit() {
283         if (this.status != Status.ACTIVE) {
284             throw new AbortedException();
285         }
286         boolean offsetsok = true;
287         if (!lockOffsets()) {
288             throw new AbortedException();
289         }
290         
291       //  boolean lengthslock = true;
292      //   if (!lockOffsets()) {
293      //       throw new AbortedException();
294      //   }
295
296
297         ///////////////////////////
298
299
300         Map hm = getWriteBuffer();
301
302         Iterator iter = hm.keySet().iterator();
303         WriteOperations value;
304         Vector vec = new Vector();
305         while (iter.hasNext() && (this.getStatus() == Status.ACTIVE) && offsetsok) {
306             INode key = (INode) iter.next();
307             vec = (Vector) hm.get(key);
308             Collections.sort(vec);
309             Iterator it = vec.iterator();
310             while (it.hasNext()) {
311
312                 value = (WriteOperations) it.next();
313                 if (value.isUnknownoffset()) {
314
315                     long start;
316                     long end;
317                     start = value.getRange().getStart() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber();
318                     end = value.getRange().getEnd() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber();
319                     if (value.getBelongingto().isUnknown_inital_offset_for_write()) {
320                         value.getBelongingto().setLocaloffset(value.getBelongingto().getLocaloffset() - value.getBelongingto().getCopylocaloffset() + value.getOwnertransactionalFile().getCommitedoffset().getOffsetnumber());
321                         value.getBelongingto().setUnknown_inital_offset_for_write(false);
322                     }
323
324                     int startblock = FileBlockManager.getCurrentFragmentIndexofTheFile(start);
325                     int targetblock = FileBlockManager.getTargetFragmentIndexofTheFile(start, value.getRange().getEnd() - value.getRange().getStart());
326
327                     TreeMap sset;
328                     if (this.getAccessedBlocks().get(key) != null) {
329                         sset = (TreeMap) this.getAccessedBlocks().get(key);
330                     } else {
331                         sset = new TreeMap();
332                         this.getAccessedBlocks().put(key, sset);
333                     }
334
335
336                     for (int i = startblock; i <= targetblock; i++) {
337                         if (sset.containsKey(Integer.valueOf(i))) {
338                             if (sset.get(Integer.valueOf(i)) != BlockAccessModesEnum.WRITE) {
339                                 sset.put(Integer.valueOf(i), BlockAccessModesEnum.READ_WRITE);
340                             }
341                         } else {
342                             sset.put(Integer.valueOf(i), BlockAccessModesEnum.WRITE);
343                         }
344                     }
345
346                     value.getRange().setStart(start);
347                     value.getRange().setEnd(end);
348                 }
349             }
350
351         }
352         
353         //toholdblocklocks = new Lock[100];
354         
355         Iterator it = this.getAccessedBlocks().keySet().iterator();
356         BlockDataStructure[] blocks = new BlockDataStructure[100];
357         //if (this.getStatus() == Status.ACTIVE)
358             while (it.hasNext() /*&& (this.getStatus() == Status.ACTIVE)*/) {
359                 INode inode = (INode) it.next();
360                 GlobalINodeState inodestate = TransactionalFileWrapperFactory.getTateransactionalFileINodeState(inode);
361                 TreeMap vec2 = (TreeMap) this.getAccessedBlocks().get(inode);
362                 Iterator iter2 = vec2.keySet().iterator();
363             
364                 while (iter2.hasNext() && this.getStatus() == Status.ACTIVE) {
365                     Integer num = (Integer) iter2.next();    
366                     BlockDataStructure blockobj = inodestate.getBlockDataStructure(num);
367                     this.lockBlock(blockobj, (BlockAccessModesEnum) vec2.get(num));
368                     
369                 }
370             }
371
372         if (this.getStatus() != Status.ACTIVE) {
373         //    for (int i=0; i<blockcount; i++)
374         //        heldblocklocks.add(toholdblocklocks[i]); 
375             throw new AbortedException();
376         }
377         abortAllReaders();
378
379     }
380
381     public void commitChanges() {
382
383         Map hm = getWriteBuffer();
384         Iterator iter = hm.keySet().iterator();
385         Iterator it;
386         WriteOperations writeop;
387         Vector vec;
388         while (iter.hasNext()) {
389             INode key = (INode) iter.next();
390
391             vec = (Vector) hm.get(key);
392             Collections.sort(vec);
393             it = vec.iterator();
394             while (it.hasNext()) {
395
396                 writeop = (WriteOperations) it.next();
397                 Byte[] data = new Byte[(int) (writeop.getRange().getEnd() - writeop.getRange().getStart())];
398                 byte[] bytedata = new byte[(int) (writeop.getRange().getEnd() - writeop.getRange().getStart())];
399                 data = (Byte[]) writeop.getData();
400
401                 for (int i = 0; i < data.length; i++) {
402                     bytedata[i] = data[i];
403                 }
404                 invokeNativepwrite(bytedata, writeop.getRange().getStart(), bytedata.length, writeop.getOwnertransactionalFile().file);
405             }
406         }
407
408         Iterator k = GlobaltoLocalMappings.keySet().iterator();
409         while (k.hasNext()) {
410             TransactionalFile trf = (TransactionalFile) (k.next());
411             trf.getCommitedoffset().setOffsetnumber(((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).getLocaloffset());
412             if (((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).getInitiallocallength() != ((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).getLocalsize()){
413                 try {
414                     if (!(trf.getInodestate().commitedfilesize.lengthlock.isHeldByCurrentThread()))
415                         trf.getInodestate().commitedfilesize.lengthlock.lock();
416                     
417                     Iterator it2 = trf.getInodestate().commitedfilesize.getLengthReaders().iterator();
418                     if (((TransactionLocalFileAttributes)getGlobaltoLocalMappings().get(trf)).getInitiallocallength() != ((TransactionLocalFileAttributes)getGlobaltoLocalMappings().get(trf)).getLocalsize())
419                     {
420                         while (it2.hasNext()) {
421                             ExtendedTransaction tr = (ExtendedTransaction) it2.next();
422                             if (tr != this) {
423                                 tr.abort();
424                             }
425                         }
426                         trf.getInodestate().commitedfilesize.getLengthReaders().clear();
427                     }
428                     trf.getInodestate().commitedfilesize.setLength(trf.file.length());
429                     
430                     if (trf.getInodestate().commitedfilesize.lengthlock.isHeldByCurrentThread()){
431                         heldlengthlocks.remove(trf.getInodestate().commitedfilesize.lengthlock);
432                         trf.getInodestate().commitedfilesize.lengthlock.unlock();
433                     }
434                     
435                 } catch (IOException ex) {
436                     Logger.getLogger(ExtendedTransaction.class.getName()).log(Level.SEVERE, null, ex);
437                 }
438             }
439             
440             if (((TransactionLocalFileAttributes) GlobaltoLocalMappings.get(trf)).lenght_read){
441                 trf.getInodestate().commitedfilesize.getLengthReaders().remove(this);
442                 heldlengthlocks.remove(trf.getInodestate().commitedfilesize.lengthlock);
443                 trf.getInodestate().commitedfilesize.lengthlock.unlock();
444             }
445         }
446         
447         
448       /*  for (int i =0; i<blockcount; i++){
449             toholdblocklocks[i].unlock();
450         }
451         for (int i =0; i<offsetcount; i++){
452             toholoffsetlocks[i].unlock();
453         }*/
454     }
455
456     public void unlockAllLocks() {
457         Iterator it = heldblocklocks.iterator();
458
459         while (it.hasNext()) {
460
461             Lock lock = (Lock) it.next();
462             lock.unlock();
463         }
464         heldblocklocks.clear();
465
466         it = heldoffsetlocks.iterator();
467         while (it.hasNext()) {
468             ReentrantLock lock = (ReentrantLock) it.next();
469             lock.unlock();
470         }
471         heldoffsetlocks.clear();
472         
473        it = heldlengthlocks.iterator(); 
474        while (it.hasNext()) {
475             ReentrantLock lock = (ReentrantLock) it.next();
476             lock.unlock();
477         }
478         heldlengthlocks.clear();
479     }
480
481     public void abortAllReaders() {
482         TreeMap hm = getSortedFileAccessMap(AccessedFiles);
483         //lock phase
484         Iterator iter = hm.keySet().iterator();
485         TransactionalFile value;
486         while (iter.hasNext()) {
487             INode key = (INode) iter.next();
488             Vector vec = (Vector) AccessedFiles.get(key);
489             Iterator it = vec.iterator();
490             while (it.hasNext()) {
491
492                 value = (TransactionalFile) it.next();
493                 Iterator it2 = value.getCommitedoffset().getOffsetReaders().iterator(); // for visible readers strategy
494
495                 while (it2.hasNext()) {
496                     ExtendedTransaction tr = (ExtendedTransaction) it2.next();
497                     if (tr != this) {
498                         tr.abort();
499                     }
500                 }
501                 value.getCommitedoffset().getOffsetReaders().clear();
502                 
503             
504                 
505             }
506
507             TreeMap vec2;
508             if (accessedBlocks.get(key) != null) {
509                 vec2 = (TreeMap) accessedBlocks.get(key);
510             } else {
511                 vec2 = new TreeMap();
512
513             }
514             GlobalINodeState inodestate = TransactionalFileWrapperFactory.getTateransactionalFileINodeState(key);
515             Iterator it2 = vec2.keySet().iterator();
516
517             while (it2.hasNext()) {
518
519                 Integer num = (Integer) it2.next();
520                 if (vec2.get(num) != BlockAccessModesEnum.READ) {
521                     BlockDataStructure blockobj = (BlockDataStructure) inodestate.getBlockDataStructure(num);
522                     //lockmap.get(num);
523                     Iterator it4 = blockobj.getReaders().iterator(); // from here for visible readers strategy
524
525                     while (it4.hasNext()) {
526
527                         ExtendedTransaction tr = (ExtendedTransaction) it4.next();
528                         if (this != tr) {
529                             tr.abort();
530                         }
531                     }
532                     blockobj.getReaders().clear();
533                 }
534             }
535
536
537         }
538     }
539     
540   
541
542     public void addPropertyChangeListener(PropertyChangeListener listener) {
543         this.changes.addPropertyChangeListener("status", listener);
544     }
545
546     public void removePropertyChangeListener(PropertyChangeListener listener) {
547         this.changes.removePropertyChangeListener("status", listener);
548     }
549
550     public TransactionStatu getOtherSystem() {
551         return memorystate;
552     }
553
554     public void setOtherSystem(TransactionStatu othersystem) {
555         memorystate = othersystem;
556     }
557
558     public Vector getHeldblocklocks() {
559         return heldblocklocks;
560     }
561
562     public void setHeldblocklocks(Vector heldblocklocks) {
563         this.heldblocklocks = heldblocklocks;
564     }
565
566     public Vector getHeldoffsetlocks() {
567         return heldoffsetlocks;
568     }
569     
570     public Vector getHeldlengthlocks() {
571         return heldlengthlocks;
572     }
573
574     public void setHeldoffsetlocks(Vector heldoffsetlocks) {
575         this.heldoffsetlocks = heldoffsetlocks;
576     }
577
578     public void abortThisSystem() {
579         abort();
580     }
581
582     public boolean isCommitted() {
583         if (this.status == Status.COMMITTED) {
584             return true;
585         }
586         return false;
587
588     }
589 }
590 /*
591 public boolean lockBlock(BlockDataStructure block, Adapter adapter, BlockAccessModesEnum mode, int expvalue) { // from here for visible readers strategy
592 while (this.getStatus() == Status.ACTIVE) {
593 if (lock.tryLock()) {
594 Thread.onAbortOnce(new Runnable() {
595
596 public void run() {
597 lock.unlock();
598 }
599 });
600
601 heldblocklocks.add(lock);
602
603 synchronized (adapter) {
604 block.setOwner(this);
605 //        Iterator it =  block.getReaders().iterator(); 
606 //        while (it.hasNext())
607 //        {
608 //            ExtendedTransaction tr = (ExtendedTransaction) it.next();
609 //            tr.abort();
610 //       }
611 }
612
613 return true;
614 } else {
615 getBlockContentionManager().resolveConflict(this, block.getOwner());
616 }
617 }
618 return false;*/
619 /*
620 public boolean lockBlock(BlockDataStructure block, Adapter adapter, BlockAccessModesEnum mode, int expvalue) { // versioning strat
621 while (this.getStatus() == Status.ACTIVE) {
622 if (lock.tryLock()) {
623 Thread.onAbortOnce(new Runnable() {
624
625 public void run() {
626 lock.unlock();
627 }
628 });
629
630 heldblocklocks.add(lock);
631 if (mode != BlockAccessModesEnum.WRITE) {   egy
632 if (block.getVersion().get() != expvalue) {
633 unlockAllLocks();
634 return false;
635 }
636 }
637 synchronized (adapter) {
638 block.setOwner(this);
639 }
640
641 return true;
642 } else {
643 getContentionManager().resolveConflict(this, block.getOwner());
644 }
645 }
646 return false;
647 }*/
648 // }
649 //expvalue = ((Integer) value.getBlockversions().get(it)).intValue(); //for versioning strategy
650             /*if (!(value.isValidatelocaloffset())) {
651 if (((BlockAccessModesEnum) (value.getAccesedblocks().get(blockno))) != BlockAccessModesEnum.WRITE) { //versioning strategy
652
653 /if (blockobj.getVersion().get() == expvalue) {
654
655 ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
656 if (ok == false) {
657 //        unlockAllLocks();
658 break;
659 }
660 } else {
661 ok = false;
662 break;
663 }
664 } else {
665
666 ok = this.lock(blockobj, value.adapter, (BlockAccessModesEnum) (value.getAccesedblocks().get(blockno)), expvalue);
667 if (ok == false) {
668 break;
669 }
670 }
671 }
672
673
674 if (!(ok)) {
675 unlockAllLocks();
676 throw new AbortedException();
677 }*/
678    
679
680
681