*** empty log message ***
[IRC.git] / Robust / Transactions / dstm2 / src / dstm2 / Thread.java
1 package dstm2;
2
3 /*
4  * Thread.java
5  *
6  * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
7  * Clara, California 95054, U.S.A.  All rights reserved.  
8  * 
9  * Sun Microsystems, Inc. has intellectual property rights relating to
10  * technology embodied in the product that is described in this
11  * document.  In particular, and without limitation, these
12  * intellectual property rights may include one or more of the
13  * U.S. patents listed at http://www.sun.com/patents and one or more
14  * additional patents or pending patent applications in the U.S. and
15  * in other countries.
16  * 
17  * U.S. Government Rights - Commercial software.
18  * Government users are subject to the Sun Microsystems, Inc. standard
19  * license agreement and applicable provisions of the FAR and its
20  * supplements.  Use is subject to license terms.  Sun, Sun
21  * Microsystems, the Sun logo and Java are trademarks or registered
22  * trademarks of Sun Microsystems, Inc. in the U.S. and other
23  * countries.  
24  * 
25  * This product is covered and controlled by U.S. Export Control laws
26  * and may be subject to the export or import laws in other countries.
27  * Nuclear, missile, chemical biological weapons or nuclear maritime
28  * end uses or end users, whether direct or indirect, are strictly
29  * prohibited.  Export or reexport to countries subject to
30  * U.S. embargo or to entities identified on U.S. export exclusion
31  * lists, including, but not limited to, the denied persons and
32  * specially designated nationals lists is strictly prohibited.
33  */
34
35 //package dstm2;
36
37
38 import TransactionalIO.exceptions.AbortedException;
39 import TransactionalIO.exceptions.GracefulException;
40 import TransactionalIO.exceptions.PanicException;
41
42 import dstm2.factory.AtomicFactory;
43 import dstm2.factory.Factory;
44 import TransactionalIO.benchmarks.benchmark;
45 import dstm2.SpecialLock;
46 import TransactionalIO.core.TransactionalFile;
47 import TransactionalIO.core.Wrapper;
48 import java.lang.reflect.Constructor;
49 import java.lang.reflect.InvocationTargetException;
50 import java.util.ArrayList;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.HashSet;
54 import java.util.LinkedList;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Set;
58 import java.util.concurrent.Callable;
59 import static dstm2.Defaults.*;
60 /**
61  * The basic unit of computation for the transactional memory.  This
62  * class extends <code>java.lang.Thread</code> by providing methods to
63  * begin, commit and abort transactions.
64  *
65  * Every <code>Thread</code> has a contention manager, created when
66  * the thread is created.  Before creating any <code>Thread</code>s,
67  * you must call <code>Thread.setContentionManager</code> to set the
68  * class of the contention manager that will be created.  The
69  * contention manager of a thread is notified (by invoking its
70  * notification methods) of the results of any methods involving the
71  * thread.  It is also consulted on whether a transaction should be
72  * begun.
73  *
74  * @see dstm2.ContentionManager
75  */
76 public class Thread extends java.lang.Thread{
77   /**
78    * Contention manager class.
79    */
80   protected static Class contentionManagerClass;
81   
82   /**
83    * Adapter class.
84    */
85   protected static Class<dstm2.factory.Adapter> adapterClass;
86   
87   /**
88    * Set to true when benchmark runs out of time.
89    **/
90   public static volatile boolean stop = false;
91   /**
92    * number of committed transactions for all threads
93    */
94   public static long totalCommitted = 0;
95   /**
96    * total number of transactions for all threads
97    */
98   public static long totalTotal = 0;
99   /**
100    * number of committed memory references for all threads
101    */
102   public static long totalCommittedMemRefs = 0;
103   /**
104    * total number of memory references for all threads
105    */
106   public static long totalTotalMemRefs = 0;
107   
108   static ThreadLocal<ThreadState> _threadState = new ThreadLocal<ThreadState>() {
109     protected synchronized ThreadState initialValue() {
110       return new ThreadState();
111     }
112   };
113   static ThreadLocal<Thread> _thread = new ThreadLocal<Thread>() {
114     protected synchronized Thread initialValue() {
115       return null;
116     }
117   };
118   
119   private static int MAX_NESTING_DEPTH = 1;
120   
121   private static Object lock = new Object();
122   
123   // Memo-ize factories so we don't have to recreate them.
124   private static Map<Class,Factory> factoryTable
125       = Collections.synchronizedMap(new HashMap<Class,Factory>());
126   
127   /**
128    * Create thread to run a method.
129    * @param target execute this object's <CODE>run()</CODE> method
130    */
131   public Thread(final Runnable target) {
132     super(new Runnable() {
133       public void run() {
134         ThreadState threadState = _threadState.get();
135         threadState.reset();
136         target.run();
137         // collect statistics
138         synchronized (lock){
139           totalCommitted += threadState.committed;
140           totalTotal += threadState.total;
141           totalCommittedMemRefs += threadState.committedMemRefs;
142           totalTotalMemRefs += threadState.totalMemRefs;
143         }
144       }
145     });
146   }
147   /**
148    * no-arg constructor
149    */
150   public Thread() {
151     super();
152   }
153   
154   /**
155    * Establishes a contention manager.  You must call this method
156    * before creating any <code>Thread</code>.
157    *
158    * @see dstm2.ContentionManager
159    * @param theClass class of desired contention manager.
160    */
161   public static void setContentionManagerClass(Class theClass) {
162     Class cm;
163     try {
164       cm = Class.forName("dstm2.ContentionManager");
165     } catch (ClassNotFoundException e) {
166       throw new PanicException(e);
167     }
168     try {
169       contentionManagerClass = theClass;
170     } catch (Exception e) {
171       throw new PanicException("The class " + theClass
172           + " does not implement dstm2.ContentionManager");
173     }
174   }
175   
176   /**
177    * set Adapter class for this thread
178    * @param adapterClassName adapter class as string
179    */
180   public static void setAdapterClass(String adapterClassName) {
181     try {
182       adapterClass = (Class<dstm2.factory.Adapter>)Class.forName(adapterClassName);
183     } catch (ClassNotFoundException ex) {
184       throw new PanicException("Adapter class not found: %s\n", adapterClassName);
185     }
186   }
187   
188   /**
189    * Tests whether the current transaction can still commit.  Does not
190    * actually end the transaction (either <code>commitTransaction</code> or
191    * <code>abortTransaction</code> must still be called).  The contention
192    * manager of the invoking thread is notified if the onValidate fails
193    * because a <code>TMObject</code> opened for reading was invalidated.
194    *
195    * @return whether the current transaction may commit successfully.
196    */
197   static public boolean validate() {
198     ThreadState threadState = _threadState.get();
199     return threadState.validate();
200   }
201   
202   /**
203    * Gets the current transaction, if any, of the invoking <code>Thread</code>.
204    *
205    * @return the current thread's current transaction; <code>null</code> if
206    *         there is no current transaction.
207    */
208   static public Transaction getTransaction() {
209     return _threadState.get().transaction;
210   }
211   
212   /**
213    * Gets the contention manager of the invoking <code>Thread</code>.
214    *
215    * @return the invoking thread's contention manager
216    */
217   static public ContentionManager getContentionManager() {
218     return _threadState.get().manager;
219   }
220   
221   /**
222    * Create a new factory instance.
223    * @param _class class to implement
224    * @return new factory
225    */
226   static public <T> Factory<T> makeFactory(Class<T> _class) {
227     try {
228       Factory<T> factory = (Factory<T>) factoryTable.get(_class);
229      
230       if (factory == null) {
231         factory =  new AtomicFactory<T>(_class, adapterClass);
232         factoryTable.put(_class, factory);
233       }
234       return factory;
235     } catch (Exception e) {
236       throw new PanicException(e);
237     }
238   }
239   
240   /**
241    * Execute a transaction
242    * @param xaction execute this object's <CODE>call()</CODE> method.
243    * @return result of <CODE>call()</CODE> method
244    */
245   public static <T> T doIt(Callable<T> xaction) {
246     ThreadState threadState = _threadState.get();
247     ContentionManager manager = threadState.manager;
248     T result = null;
249    // System.out.println(Thread.currentThread() + " astarted the transaction");
250     boolean flag = false;
251     try {
252       while (true) {
253         threadState.beginTransaction();
254                
255      //   System.out.println(Thread.currentThread() + " offically started the transaction");
256        /////For Integrating with IO////////// 
257         Wrapper.Initialize(Thread.getTransaction());
258         //System.out.println(Thread.currentThread() + " starting");
259       //  System.out.println(Thread.currentThread() + " even more offically started the transaction");
260        ////////////////////////////////////// 
261         try {
262             
263           result = xaction.call();
264         //  System.out.println(Thread.currentThread() + " starting2");
265       //     System.out.println(Thread.currentThread() + " aborted in committing");
266       //  } catch (AbortedException d) {
267           /*  synchronized(benchmark.lock){
268                 System.out.println(Thread.currentThread() + " aborted in committing");
269             }*/
270
271        // } //catch (SnapshotException s) {
272           //threadState.abortTransaction();
273       //} 
274        // catch (Exception e) {
275       //    e.printStackTrace();
276        //   throw new PanicException("Unhandled exception " + e);
277        // }
278          
279             threadState.totalMemRefs += threadState.transaction.memRefs;
280             threadState.transaction.attempts++;
281               
282             Wrapper.prepareIOCommit();
283          
284         ///////////////////////////////
285         
286                 if (threadState.commitTransaction()) {
287                     threadState.committedMemRefs += threadState.transaction.memRefs;
288                     
289                     
290                    //Wrapper.realseOffsets();
291                     
292                     Wrapper.commitIO();
293                     flag = true;
294                }
295         }
296         catch(AbortedException ex){
297             threadState.depth--;
298             
299             System.out.println(Thread.currentThread() + " aborted");
300             ex.printStackTrace();
301            // Wrapper.getTransaction().unlockAllLocks();
302         }
303         catch (Exception e) {
304           e.printStackTrace();
305           throw new PanicException("Unhandled exception " + e);
306         }
307         finally{
308             
309           //  System.out.println("here " + Thread.currentThread());
310             Wrapper.getTransaction().unlockAllLocks();
311                
312             if (Thread.getTransaction() == SpecialLock.getSpecialLock().getOwnerTransaction()){
313             //if (Thread.getTransaction().isIOTransaction()){
314                
315               //   System.out.println("herein " + Thread.currentThread());
316                 SpecialLock.getSpecialLock().unlock(Thread.getTransaction());
317                
318             //    System.out.println("here");
319           ///              
320             }
321             if  (flag == true){
322                System.out.println(Thread.currentThread() + " committed");
323                 break;
324             }
325         }
326       
327         // transaction aborted
328       }
329       if (threadState.transaction != null) {
330         threadState.abortTransaction();
331       }
332     } finally {
333       threadState.transaction = null;
334       Wrapper.setTransaction(null);
335     }
336     // collect statistics
337     synchronized (lock){
338       totalTotalMemRefs = threadState.totalMemRefs;
339       totalCommittedMemRefs = threadState.committedMemRefs;
340       totalCommitted += threadState.committed;
341       totalTotal += threadState.total;
342       threadState.reset();  // set up for next iteration
343     }
344    /* if (result == null)
345         throw new GracefulException();
346     else return result;*/
347     return result;
348   }
349   /**
350    * Execute transaction
351    * @param xaction call this object's <CODE>run()</CODE> method
352    */
353   public static void doIt(final Runnable xaction) {
354     doIt(new Callable<Boolean>() {
355       public Boolean call() {
356         xaction.run();
357         return false;
358       };
359     });
360   }
361   
362   /**
363    * number of transactions committed by this thread
364    * @return number of transactions committed by this thread
365    */
366   public static long getCommitted() {
367     return totalCommitted;
368   }
369   
370   /**
371    * umber of transactions aborted by this thread
372    * @return number of aborted transactions
373    */
374   public static long getAborted() {
375     return totalTotal -  totalCommitted;
376   }
377   
378   /**
379    * number of transactions executed by this thread
380    * @return number of transactions
381    */
382   public static long getTotal() {
383     return totalTotal;
384   }
385   
386   /**
387    * Register a method to be called every time this thread validates any transaction.
388    * @param c abort if this object's <CODE>call()</CODE> method returns false
389    */
390   public static void onValidate(Callable<Boolean> c) {
391     _threadState.get().onValidate.add(c);
392   }
393   /**
394    * Register a method to be called every time the current transaction is validated.
395    * @param c abort if this object's <CODE>call()</CODE> method returns false
396    */
397   public static void onValidateOnce(Callable<Boolean> c) {
398     _threadState.get().onValidateOnce.add(c);
399   }
400   /**
401    * Register a method to be called every time this thread commits a transaction.
402    * @param r call this object's <CODE>run()</CODE> method
403    */
404   public static void onCommit(Runnable r) {
405     _threadState.get().onCommit.add(r);
406   }
407   /**
408    * Register a method to be called once if the current transaction commits.
409    * @param r call this object's <CODE>run()</CODE> method
410    */
411   public static void onCommitOnce(Runnable r) {
412     _threadState.get().onCommitOnce.add(r);
413   }
414   /**
415    * Register a method to be called every time this thread aborts a transaction.
416    * @param r call this objec't <CODE>run()</CODE> method
417    */
418   public static void onAbort(Runnable r) {
419     _threadState.get().onAbort.add(r);
420   }
421   /**
422    * Register a method to be called once if the current transaction aborts.
423    * @param r call this object's <CODE>run()</CODE> method
424    */
425   public static void onAbortOnce(Runnable r) {
426     _threadState.get().onAbortOnce.add(r);
427   }
428   /**
429    * get thread ID for debugging
430    * @return unique id
431    */
432   public static int getID() {
433     return _threadState.get().hashCode();
434   }
435   
436   /**
437    * reset thread statistics
438    */
439   public static void clear() {
440     totalTotal = 0;
441     totalCommitted = 0;
442     totalCommittedMemRefs = 0;
443     totalTotalMemRefs = 0;
444     stop = false;
445   }
446   
447   /**
448    * Class that holds thread's actual state
449    */
450   public static class ThreadState {
451     
452     int depth = 0;
453     ContentionManager manager;
454     
455     private long committed = 0;        // number of committed transactions
456     private long total = 0;            // total number of transactions
457     private long committedMemRefs = 0; // number of committed reads and writes
458     private long totalMemRefs = 0;     // total number of reads and writes
459     
460     Set<Callable<Boolean>> onValidate = new HashSet<Callable<Boolean>>();
461     Set<Runnable>          onCommit   = new HashSet<Runnable>();
462     Set<Runnable>          onAbort    = new HashSet<Runnable>();
463     Set<Callable<Boolean>> onValidateOnce = new HashSet<Callable<Boolean>>();
464     Set<Runnable>          onCommitOnce   = new HashSet<Runnable>();
465     Set<Runnable>          onAbortOnce    = new HashSet<Runnable>();
466     
467     Transaction transaction = null;
468     
469     /**
470      * Creates new ThreadState
471      */
472     public ThreadState() {
473       try {
474         manager = (ContentionManager)Thread.contentionManagerClass.newInstance();
475       } catch (NullPointerException e) {
476         throw new PanicException("No default contention manager class set.");
477       } catch (Exception e) {  // Some problem with instantiation
478         throw new PanicException(e);
479       }
480     }
481     
482     /**
483      * Resets any metering information (commits/aborts, etc).
484      */
485     public void reset() {
486       committed = 0;        // number of committed transactions
487       total = 0;            // total number of transactions
488       committedMemRefs = 0; // number of committed reads and writes
489       totalMemRefs = 0;     // total number of reads and writes
490     }
491     
492     /**
493      * used for debugging
494      * @return string representation of thread state
495      */
496     public String toString() {
497       return
498           "Thread" + hashCode() + "["+
499           "committed: " +  committed + "," +
500           "aborted: " + ( total -  committed) +
501           "]";
502     }
503     
504     /**
505      * Can this transaction still commit?
506      * This method may be called at any time, not just at transaction end,
507      * so we do not clear the onValidateOnce table.
508      * @return true iff transaction might still commit
509      */
510     public boolean validate() {
511       try {
512         // permanent
513         for (Callable<Boolean> v : onValidate) {
514           if (!v.call()) {
515             return false;
516           }
517         }
518         // temporary
519         for (Callable<Boolean> v : onValidateOnce) {
520           if (!v.call()) {
521             return false;
522           }
523         }
524         return transaction.validate();
525       } catch (Exception ex) {
526         return false;
527       }
528     }
529     
530     /**
531      * Call methods registered to be called on commit.
532      */
533     public void runCommitHandlers() {
534       try {
535         // permanent
536         for (Runnable r: onCommit) {
537           r.run();
538         }
539         // temporary
540         for (Runnable r: onCommitOnce) {
541           r.run();
542         }
543         onCommitOnce.clear();
544         onValidateOnce.clear();
545       } catch (Exception ex) {
546         throw new PanicException(ex);
547       }
548     }
549     
550     /**
551      * Starts a new transaction.  Cannot nest transactions deeper than
552      * <code>Thread.MAX_NESTING_DEPTH.</code> The contention manager of the
553      * invoking thread is notified when a transaction is begun.
554      */
555     public void beginTransaction() {
556       transaction = new Transaction();
557       if (depth == 0) {
558         total++;
559       }
560       // first thing to fix if we allow nested transactions
561       if (depth >= 1) {
562         throw new PanicException("beginTransaction: attempting to nest transactions too deeply.");
563       }
564       depth++;
565     }
566     
567     /**
568      * Attempts to commit the current transaction of the invoking
569      * <code>Thread</code>.  Always succeeds for nested
570      * transactions.  The contention manager of the invoking thread is
571      * notified of the result.  If the transaction does not commit
572      * because a <code>TMObject</code> opened for reading was
573      * invalidated, the contention manager is also notified of the
574      * inonValidate.
575      *
576      *
577      * @return whether commit succeeded.
578      */
579     public boolean commitTransaction() {
580       depth--;
581       if (depth < 0) {
582         throw new PanicException("commitTransaction invoked when no transaction active.");
583       }
584       if (depth > 0) {
585         throw new PanicException("commitTransaction invoked on nested transaction.");
586       }
587       if (depth == 0) {
588         if (validate() && transaction.commit()) {
589           committed++;
590           runCommitHandlers();
591           return true;
592         }
593         abortTransaction();
594         return false;
595       } else {
596         return true;
597       }
598     }
599     
600     /**
601      * Aborts the current transaction of the invoking <code>Thread</code>.
602      * Does not end transaction, but ensures it will never commit.
603      */
604     public void abortTransaction() {
605       runAbortHandlers();
606       transaction.abort();
607     }
608     
609     /**
610      * Call methods registered to be called on commit.
611      */
612     public void runAbortHandlers() {
613       try {
614         // permanent
615         for (Runnable r: onAbort) {
616           r.run();
617         }
618         // temporary
619         for (Runnable r: onAbortOnce) {
620           r.run();
621         }
622         onAbortOnce.clear();
623         onValidateOnce.clear();
624       } catch (Exception ex) {
625         throw new PanicException(ex);
626       }
627     }
628   }
629 }