*** empty log message ***
[IRC.git] / Robust / Transactions / dstm2 / src / dstm2 / Thread.java
1 package dstm2;
2
3 /*
4  * Thread.java
5  *
6  * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
7  * Clara, California 95054, U.S.A.  All rights reserved.  
8  * 
9  * Sun Microsystems, Inc. has intellectual property rights relating to
10  * technology embodied in the product that is described in this
11  * document.  In particular, and without limitation, these
12  * intellectual property rights may include one or more of the
13  * U.S. patents listed at http://www.sun.com/patents and one or more
14  * additional patents or pending patent applications in the U.S. and
15  * in other countries.
16  * 
17  * U.S. Government Rights - Commercial software.
18  * Government users are subject to the Sun Microsystems, Inc. standard
19  * license agreement and applicable provisions of the FAR and its
20  * supplements.  Use is subject to license terms.  Sun, Sun
21  * Microsystems, the Sun logo and Java are trademarks or registered
22  * trademarks of Sun Microsystems, Inc. in the U.S. and other
23  * countries.  
24  * 
25  * This product is covered and controlled by U.S. Export Control laws
26  * and may be subject to the export or import laws in other countries.
27  * Nuclear, missile, chemical biological weapons or nuclear maritime
28  * end uses or end users, whether direct or indirect, are strictly
29  * prohibited.  Export or reexport to countries subject to
30  * U.S. embargo or to entities identified on U.S. export exclusion
31  * lists, including, but not limited to, the denied persons and
32  * specially designated nationals lists is strictly prohibited.
33  */
34
35 //package dstm2;
36
37
38 import TransactionalIO.exceptions.AbortedException;
39 import TransactionalIO.exceptions.GracefulException;
40 import TransactionalIO.exceptions.PanicException;
41
42 import dstm2.factory.AtomicFactory;
43 import dstm2.factory.Factory;
44 import TransactionalIO.benchmarks.benchmark;
45 import dstm2.SpecialLock;
46 import TransactionalIO.core.TransactionalFile;
47 import TransactionalIO.core.Wrapper;
48 import java.lang.reflect.Constructor;
49 import java.lang.reflect.InvocationTargetException;
50 import java.util.ArrayList;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.HashSet;
54 import java.util.LinkedList;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.Set;
58 import java.util.concurrent.Callable;
59 import static dstm2.Defaults.*;
60 /**
61  * The basic unit of computation for the transactional memory.  This
62  * class extends <code>java.lang.Thread</code> by providing methods to
63  * begin, commit and abort transactions.
64  *
65  * Every <code>Thread</code> has a contention manager, created when
66  * the thread is created.  Before creating any <code>Thread</code>s,
67  * you must call <code>Thread.setContentionManager</code> to set the
68  * class of the contention manager that will be created.  The
69  * contention manager of a thread is notified (by invoking its
70  * notification methods) of the results of any methods involving the
71  * thread.  It is also consulted on whether a transaction should be
72  * begun.
73  *
74  * @see dstm2.ContentionManager
75  */
76 public class Thread extends java.lang.Thread{
77   /**
78    * Contention manager class.
79    */
80   protected static Class contentionManagerClass;
81   
82   /**
83    * Adapter class.
84    */
85   protected static Class<dstm2.factory.Adapter> adapterClass;
86   
87   /**
88    * Set to true when benchmark runs out of time.
89    **/
90   public static volatile boolean stop = false;
91   /**
92    * number of committed transactions for all threads
93    */
94   public static long totalCommitted = 0;
95   /**
96    * total number of transactions for all threads
97    */
98   public static long totalTotal = 0;
99   /**
100    * number of committed memory references for all threads
101    */
102   public static long totalCommittedMemRefs = 0;
103   /**
104    * total number of memory references for all threads
105    */
106   public static long totalTotalMemRefs = 0;
107   
108   static ThreadLocal<ThreadState> _threadState = new ThreadLocal<ThreadState>() {
109     protected synchronized ThreadState initialValue() {
110       return new ThreadState();
111     }
112   };
113   static ThreadLocal<Thread> _thread = new ThreadLocal<Thread>() {
114     protected synchronized Thread initialValue() {
115       return null;
116     }
117   };
118   
119   private static int MAX_NESTING_DEPTH = 1;
120   
121   private static Object lock = new Object();
122   
123   // Memo-ize factories so we don't have to recreate them.
124   private static Map<Class,Factory> factoryTable
125       = Collections.synchronizedMap(new HashMap<Class,Factory>());
126   
127   /**
128    * Create thread to run a method.
129    * @param target execute this object's <CODE>run()</CODE> method
130    */
131   public Thread(final Runnable target) {
132     super(new Runnable() {
133       public void run() {
134         ThreadState threadState = _threadState.get();
135         threadState.reset();
136         target.run();
137         // collect statistics
138         synchronized (lock){
139           totalCommitted += threadState.committed;
140           totalTotal += threadState.total;
141           totalCommittedMemRefs += threadState.committedMemRefs;
142           totalTotalMemRefs += threadState.totalMemRefs;
143         }
144       }
145     });
146   }
147   /**
148    * no-arg constructor
149    */
150   public Thread() {
151     super();
152   }
153   
154   /**
155    * Establishes a contention manager.  You must call this method
156    * before creating any <code>Thread</code>.
157    *
158    * @see dstm2.ContentionManager
159    * @param theClass class of desired contention manager.
160    */
161   public static void setContentionManagerClass(Class theClass) {
162     Class cm;
163     try {
164       cm = Class.forName("dstm2.ContentionManager");
165     } catch (ClassNotFoundException e) {
166       throw new PanicException(e);
167     }
168     try {
169       contentionManagerClass = theClass;
170     } catch (Exception e) {
171       throw new PanicException("The class " + theClass
172           + " does not implement dstm2.ContentionManager");
173     }
174   }
175   
176   /**
177    * set Adapter class for this thread
178    * @param adapterClassName adapter class as string
179    */
180   public static void setAdapterClass(String adapterClassName) {
181     try {
182       adapterClass = (Class<dstm2.factory.Adapter>)Class.forName(adapterClassName);
183     } catch (ClassNotFoundException ex) {
184       throw new PanicException("Adapter class not found: %s\n", adapterClassName);
185     }
186   }
187   
188   /**
189    * Tests whether the current transaction can still commit.  Does not
190    * actually end the transaction (either <code>commitTransaction</code> or
191    * <code>abortTransaction</code> must still be called).  The contention
192    * manager of the invoking thread is notified if the onValidate fails
193    * because a <code>TMObject</code> opened for reading was invalidated.
194    *
195    * @return whether the current transaction may commit successfully.
196    */
197   static public boolean validate() {
198     ThreadState threadState = _threadState.get();
199     return threadState.validate();
200   }
201   
202   /**
203    * Gets the current transaction, if any, of the invoking <code>Thread</code>.
204    *
205    * @return the current thread's current transaction; <code>null</code> if
206    *         there is no current transaction.
207    */
208   static public Transaction getTransaction() {
209     return _threadState.get().transaction;
210   }
211   
212   /**
213    * Gets the contention manager of the invoking <code>Thread</code>.
214    *
215    * @return the invoking thread's contention manager
216    */
217   static public ContentionManager getContentionManager() {
218     return _threadState.get().manager;
219   }
220   
221   /**
222    * Create a new factory instance.
223    * @param _class class to implement
224    * @return new factory
225    */
226   static public <T> Factory<T> makeFactory(Class<T> _class) {
227     try {
228       Factory<T> factory = (Factory<T>) factoryTable.get(_class);
229      
230       if (factory == null) {
231         factory =  new AtomicFactory<T>(_class, adapterClass);
232         factoryTable.put(_class, factory);
233       }
234       return factory;
235     } catch (Exception e) {
236       throw new PanicException(e);
237     }
238   }
239   
240   /**
241    * Execute a transaction
242    * @param xaction execute this object's <CODE>call()</CODE> method.
243    * @return result of <CODE>call()</CODE> method
244    */
245   public static <T> T doIt(Callable<T> xaction) {
246     ThreadState threadState = _threadState.get();
247     ContentionManager manager = threadState.manager;
248     T result = null;
249    // System.out.println(Thread.currentThread() + " astarted the transaction");
250     boolean flag = false;
251     try {
252       while (true) {
253         threadState.beginTransaction();
254      //   System.out.println(Thread.currentThread() + " offically started the transaction");
255        /////For Integrating with IO////////// 
256         Wrapper.Initialize(Thread.getTransaction());
257       //  System.out.println(Thread.currentThread() + " even more offically started the transaction");
258        ////////////////////////////////////// 
259         try {
260           result = xaction.call();
261       //     System.out.println(Thread.currentThread() + " aborted in committing");
262       //  } catch (AbortedException d) {
263           /*  synchronized(benchmark.lock){
264                 System.out.println(Thread.currentThread() + " aborted in committing");
265             }*/
266
267        // } //catch (SnapshotException s) {
268           //threadState.abortTransaction();
269       //} 
270        // catch (Exception e) {
271       //    e.printStackTrace();
272        //   throw new PanicException("Unhandled exception " + e);
273        // }
274             threadState.totalMemRefs += threadState.transaction.memRefs;
275             threadState.transaction.attempts++;
276      
277             Wrapper.prepareIOCommit();
278
279         ///////////////////////////////
280         
281                 if (threadState.commitTransaction()) {
282                     threadState.committedMemRefs += threadState.transaction.memRefs;
283                     
284                     Wrapper.commitIO();
285                     flag = true;
286                }
287         }
288         catch(AbortedException ex){
289             threadState.depth--;
290            // System.out.println(Thread.currentThread() + " aborted");
291            // Wrapper.getTransaction().unlockAllLocks();
292         }
293         catch (Exception e) {
294           e.printStackTrace();
295           throw new PanicException("Unhandled exception " + e);
296         }
297         finally{
298             
299           //  System.out.println("here " + Thread.currentThread());
300             Wrapper.getTransaction().unlockAllLocks();
301             if (Thread.getTransaction() == SpecialLock.getSpecialLock().getOwnerTransaction()){
302             //     System.out.println("herein " + Thread.currentThread());
303                 SpecialLock.getSpecialLock().unlock(Thread.getTransaction());
304             //    System.out.println("here");
305                         
306             }
307             if  (flag == true)
308                 break;
309         }
310       
311         // transaction aborted
312       }
313       if (threadState.transaction != null) {
314         threadState.abortTransaction();
315       }
316     } finally {
317       threadState.transaction = null;
318       Wrapper.setTransaction(null);
319     }
320     // collect statistics
321     synchronized (lock){
322       totalTotalMemRefs = threadState.totalMemRefs;
323       totalCommittedMemRefs = threadState.committedMemRefs;
324       totalCommitted += threadState.committed;
325       totalTotal += threadState.total;
326       threadState.reset();  // set up for next iteration
327     }
328    /* if (result == null)
329         throw new GracefulException();
330     else return result;*/
331     return result;
332   }
333   /**
334    * Execute transaction
335    * @param xaction call this object's <CODE>run()</CODE> method
336    */
337   public static void doIt(final Runnable xaction) {
338     doIt(new Callable<Boolean>() {
339       public Boolean call() {
340         xaction.run();
341         return false;
342       };
343     });
344   }
345   
346   /**
347    * number of transactions committed by this thread
348    * @return number of transactions committed by this thread
349    */
350   public static long getCommitted() {
351     return totalCommitted;
352   }
353   
354   /**
355    * umber of transactions aborted by this thread
356    * @return number of aborted transactions
357    */
358   public static long getAborted() {
359     return totalTotal -  totalCommitted;
360   }
361   
362   /**
363    * number of transactions executed by this thread
364    * @return number of transactions
365    */
366   public static long getTotal() {
367     return totalTotal;
368   }
369   
370   /**
371    * Register a method to be called every time this thread validates any transaction.
372    * @param c abort if this object's <CODE>call()</CODE> method returns false
373    */
374   public static void onValidate(Callable<Boolean> c) {
375     _threadState.get().onValidate.add(c);
376   }
377   /**
378    * Register a method to be called every time the current transaction is validated.
379    * @param c abort if this object's <CODE>call()</CODE> method returns false
380    */
381   public static void onValidateOnce(Callable<Boolean> c) {
382     _threadState.get().onValidateOnce.add(c);
383   }
384   /**
385    * Register a method to be called every time this thread commits a transaction.
386    * @param r call this object's <CODE>run()</CODE> method
387    */
388   public static void onCommit(Runnable r) {
389     _threadState.get().onCommit.add(r);
390   }
391   /**
392    * Register a method to be called once if the current transaction commits.
393    * @param r call this object's <CODE>run()</CODE> method
394    */
395   public static void onCommitOnce(Runnable r) {
396     _threadState.get().onCommitOnce.add(r);
397   }
398   /**
399    * Register a method to be called every time this thread aborts a transaction.
400    * @param r call this objec't <CODE>run()</CODE> method
401    */
402   public static void onAbort(Runnable r) {
403     _threadState.get().onAbort.add(r);
404   }
405   /**
406    * Register a method to be called once if the current transaction aborts.
407    * @param r call this object's <CODE>run()</CODE> method
408    */
409   public static void onAbortOnce(Runnable r) {
410     _threadState.get().onAbortOnce.add(r);
411   }
412   /**
413    * get thread ID for debugging
414    * @return unique id
415    */
416   public static int getID() {
417     return _threadState.get().hashCode();
418   }
419   
420   /**
421    * reset thread statistics
422    */
423   public static void clear() {
424     totalTotal = 0;
425     totalCommitted = 0;
426     totalCommittedMemRefs = 0;
427     totalTotalMemRefs = 0;
428     stop = false;
429   }
430   
431   /**
432    * Class that holds thread's actual state
433    */
434   public static class ThreadState {
435     
436     int depth = 0;
437     ContentionManager manager;
438     
439     private long committed = 0;        // number of committed transactions
440     private long total = 0;            // total number of transactions
441     private long committedMemRefs = 0; // number of committed reads and writes
442     private long totalMemRefs = 0;     // total number of reads and writes
443     
444     Set<Callable<Boolean>> onValidate = new HashSet<Callable<Boolean>>();
445     Set<Runnable>          onCommit   = new HashSet<Runnable>();
446     Set<Runnable>          onAbort    = new HashSet<Runnable>();
447     Set<Callable<Boolean>> onValidateOnce = new HashSet<Callable<Boolean>>();
448     Set<Runnable>          onCommitOnce   = new HashSet<Runnable>();
449     Set<Runnable>          onAbortOnce    = new HashSet<Runnable>();
450     
451     Transaction transaction = null;
452     
453     /**
454      * Creates new ThreadState
455      */
456     public ThreadState() {
457       try {
458         manager = (ContentionManager)Thread.contentionManagerClass.newInstance();
459       } catch (NullPointerException e) {
460         throw new PanicException("No default contention manager class set.");
461       } catch (Exception e) {  // Some problem with instantiation
462         throw new PanicException(e);
463       }
464     }
465     
466     /**
467      * Resets any metering information (commits/aborts, etc).
468      */
469     public void reset() {
470       committed = 0;        // number of committed transactions
471       total = 0;            // total number of transactions
472       committedMemRefs = 0; // number of committed reads and writes
473       totalMemRefs = 0;     // total number of reads and writes
474     }
475     
476     /**
477      * used for debugging
478      * @return string representation of thread state
479      */
480     public String toString() {
481       return
482           "Thread" + hashCode() + "["+
483           "committed: " +  committed + "," +
484           "aborted: " + ( total -  committed) +
485           "]";
486     }
487     
488     /**
489      * Can this transaction still commit?
490      * This method may be called at any time, not just at transaction end,
491      * so we do not clear the onValidateOnce table.
492      * @return true iff transaction might still commit
493      */
494     public boolean validate() {
495       try {
496         // permanent
497         for (Callable<Boolean> v : onValidate) {
498           if (!v.call()) {
499             return false;
500           }
501         }
502         // temporary
503         for (Callable<Boolean> v : onValidateOnce) {
504           if (!v.call()) {
505             return false;
506           }
507         }
508         return transaction.validate();
509       } catch (Exception ex) {
510         return false;
511       }
512     }
513     
514     /**
515      * Call methods registered to be called on commit.
516      */
517     public void runCommitHandlers() {
518       try {
519         // permanent
520         for (Runnable r: onCommit) {
521           r.run();
522         }
523         // temporary
524         for (Runnable r: onCommitOnce) {
525           r.run();
526         }
527         onCommitOnce.clear();
528         onValidateOnce.clear();
529       } catch (Exception ex) {
530         throw new PanicException(ex);
531       }
532     }
533     
534     /**
535      * Starts a new transaction.  Cannot nest transactions deeper than
536      * <code>Thread.MAX_NESTING_DEPTH.</code> The contention manager of the
537      * invoking thread is notified when a transaction is begun.
538      */
539     public void beginTransaction() {
540       transaction = new Transaction();
541       if (depth == 0) {
542         total++;
543       }
544       // first thing to fix if we allow nested transactions
545       if (depth >= 1) {
546         throw new PanicException("beginTransaction: attempting to nest transactions too deeply.");
547       }
548       depth++;
549     }
550     
551     /**
552      * Attempts to commit the current transaction of the invoking
553      * <code>Thread</code>.  Always succeeds for nested
554      * transactions.  The contention manager of the invoking thread is
555      * notified of the result.  If the transaction does not commit
556      * because a <code>TMObject</code> opened for reading was
557      * invalidated, the contention manager is also notified of the
558      * inonValidate.
559      *
560      *
561      * @return whether commit succeeded.
562      */
563     public boolean commitTransaction() {
564       depth--;
565       if (depth < 0) {
566         throw new PanicException("commitTransaction invoked when no transaction active.");
567       }
568       if (depth > 0) {
569         throw new PanicException("commitTransaction invoked on nested transaction.");
570       }
571       if (depth == 0) {
572         if (validate() && transaction.commit()) {
573           committed++;
574           runCommitHandlers();
575           return true;
576         }
577         abortTransaction();
578         return false;
579       } else {
580         return true;
581       }
582     }
583     
584     /**
585      * Aborts the current transaction of the invoking <code>Thread</code>.
586      * Does not end transaction, but ensures it will never commit.
587      */
588     public void abortTransaction() {
589       runAbortHandlers();
590       transaction.abort();
591     }
592     
593     /**
594      * Call methods registered to be called on commit.
595      */
596     public void runAbortHandlers() {
597       try {
598         // permanent
599         for (Runnable r: onAbort) {
600           r.run();
601         }
602         // temporary
603         for (Runnable r: onAbortOnce) {
604           r.run();
605         }
606         onAbortOnce.clear();
607         onValidateOnce.clear();
608       } catch (Exception ex) {
609         throw new PanicException(ex);
610       }
611     }
612   }
613 }