changes.
[IRC.git] / Robust / Transactions / dstm2src / Thread.java
1 package dstm2;
2
3 /*
4  * Thread.java
5  *
6  * Copyright 2006 Sun Microsystems, Inc., 4150 Network Circle, Santa
7  * Clara, California 95054, U.S.A.  All rights reserved.  
8  * 
9  * Sun Microsystems, Inc. has intellectual property rights relating to
10  * technology embodied in the product that is described in this
11  * document.  In particular, and without limitation, these
12  * intellectual property rights may include one or more of the
13  * U.S. patents listed at http://www.sun.com/patents and one or more
14  * additional patents or pending patent applications in the U.S. and
15  * in other countries.
16  * 
17  * U.S. Government Rights - Commercial software.
18  * Government users are subject to the Sun Microsystems, Inc. standard
19  * license agreement and applicable provisions of the FAR and its
20  * supplements.  Use is subject to license terms.  Sun, Sun
21  * Microsystems, the Sun logo and Java are trademarks or registered
22  * trademarks of Sun Microsystems, Inc. in the U.S. and other
23  * countries.  
24  * 
25  * This product is covered and controlled by U.S. Export Control laws
26  * and may be subject to the export or import laws in other countries.
27  * Nuclear, missile, chemical biological weapons or nuclear maritime
28  * end uses or end users, whether direct or indirect, are strictly
29  * prohibited.  Export or reexport to countries subject to
30  * U.S. embargo or to entities identified on U.S. export exclusion
31  * lists, including, but not limited to, the denied persons and
32  * specially designated nationals lists is strictly prohibited.
33  */
34
35 //package dstm2;
36
37
38 import TransactionalIO.exceptions.AbortedException;
39 import TransactionalIO.exceptions.GracefulException;
40 import TransactionalIO.exceptions.PanicException;
41
42 import dstm2.factory.AtomicFactory;
43 import dstm2.factory.Factory;
44 import TransactionalIO.benchmarks.benchmark;
45 import TransactionalIO.core.TransactionalFile;
46 import TransactionalIO.core.Wrapper;
47 import java.lang.reflect.Constructor;
48 import java.lang.reflect.InvocationTargetException;
49 import java.util.ArrayList;
50 import java.util.Collections;
51 import java.util.HashMap;
52 import java.util.HashSet;
53 import java.util.LinkedList;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Set;
57 import java.util.concurrent.Callable;
58 import static dstm2.Defaults.*;
59 /**
60  * The basic unit of computation for the transactional memory.  This
61  * class extends <code>java.lang.Thread</code> by providing methods to
62  * begin, commit and abort transactions.
63  *
64  * Every <code>Thread</code> has a contention manager, created when
65  * the thread is created.  Before creating any <code>Thread</code>s,
66  * you must call <code>Thread.setContentionManager</code> to set the
67  * class of the contention manager that will be created.  The
68  * contention manager of a thread is notified (by invoking its
69  * notification methods) of the results of any methods involving the
70  * thread.  It is also consulted on whether a transaction should be
71  * begun.
72  *
73  * @see dstm2.ContentionManager
74  */
75 public class Thread extends java.lang.Thread {
76   /**
77    * Contention manager class.
78    */
79   protected static Class contentionManagerClass;
80   
81   /**
82    * Adapter class.
83    */
84   protected static Class<dstm2.factory.Adapter> adapterClass;
85   
86   /**
87    * Set to true when benchmark runs out of time.
88    **/
89   public static volatile boolean stop = false;
90   /**
91    * number of committed transactions for all threads
92    */
93   public static long totalCommitted = 0;
94   /**
95    * total number of transactions for all threads
96    */
97   public static long totalTotal = 0;
98   /**
99    * number of committed memory references for all threads
100    */
101   public static long totalCommittedMemRefs = 0;
102   /**
103    * total number of memory references for all threads
104    */
105   public static long totalTotalMemRefs = 0;
106   
107   static ThreadLocal<ThreadState> _threadState = new ThreadLocal<ThreadState>() {
108     protected synchronized ThreadState initialValue() {
109       return new ThreadState();
110     }
111   };
112   static ThreadLocal<Thread> _thread = new ThreadLocal<Thread>() {
113     protected synchronized Thread initialValue() {
114       return null;
115     }
116   };
117   
118   private static int MAX_NESTING_DEPTH = 1;
119   
120   private static Object lock = new Object();
121   
122   // Memo-ize factories so we don't have to recreate them.
123   private static Map<Class,Factory> factoryTable
124       = Collections.synchronizedMap(new HashMap<Class,Factory>());
125   
126   /**
127    * Create thread to run a method.
128    * @param target execute this object's <CODE>run()</CODE> method
129    */
130   public Thread(final Runnable target) {
131     super(new Runnable() {
132       public void run() {
133         ThreadState threadState = _threadState.get();
134         threadState.reset();
135         target.run();
136         // collect statistics
137         synchronized (lock){
138           totalCommitted += threadState.committed;
139           totalTotal += threadState.total;
140           totalCommittedMemRefs += threadState.committedMemRefs;
141           totalTotalMemRefs += threadState.totalMemRefs;
142         }
143       }
144     });
145   }
146   /**
147    * no-arg constructor
148    */
149   public Thread() {
150     super();
151   }
152   
153   /**
154    * Establishes a contention manager.  You must call this method
155    * before creating any <code>Thread</code>.
156    *
157    * @see dstm2.ContentionManager
158    * @param theClass class of desired contention manager.
159    */
160   public static void setContentionManagerClass(Class theClass) {
161     Class cm;
162     try {
163       cm = Class.forName("dstm2.ContentionManager");
164     } catch (ClassNotFoundException e) {
165       throw new PanicException(e);
166     }
167     try {
168       contentionManagerClass = theClass;
169     } catch (Exception e) {
170       throw new PanicException("The class " + theClass
171           + " does not implement dstm2.ContentionManager");
172     }
173   }
174   
175   /**
176    * set Adapter class for this thread
177    * @param adapterClassName adapter class as string
178    */
179   public static void setAdapterClass(String adapterClassName) {
180     try {
181       adapterClass = (Class<dstm2.factory.Adapter>)Class.forName(adapterClassName);
182     } catch (ClassNotFoundException ex) {
183       throw new PanicException("Adapter class not found: %s\n", adapterClassName);
184     }
185   }
186   
187   /**
188    * Tests whether the current transaction can still commit.  Does not
189    * actually end the transaction (either <code>commitTransaction</code> or
190    * <code>abortTransaction</code> must still be called).  The contention
191    * manager of the invoking thread is notified if the onValidate fails
192    * because a <code>TMObject</code> opened for reading was invalidated.
193    *
194    * @return whether the current transaction may commit successfully.
195    */
196   static public boolean validate() {
197     ThreadState threadState = _threadState.get();
198     return threadState.validate();
199   }
200   
201   /**
202    * Gets the current transaction, if any, of the invoking <code>Thread</code>.
203    *
204    * @return the current thread's current transaction; <code>null</code> if
205    *         there is no current transaction.
206    */
207   static public Transaction getTransaction() {
208     return _threadState.get().transaction;
209   }
210   
211   /**
212    * Gets the contention manager of the invoking <code>Thread</code>.
213    *
214    * @return the invoking thread's contention manager
215    */
216   static public ContentionManager getContentionManager() {
217     return _threadState.get().manager;
218   }
219   
220   /**
221    * Create a new factory instance.
222    * @param _class class to implement
223    * @return new factory
224    */
225   static public <T> Factory<T> makeFactory(Class<T> _class) {
226     try {
227       Factory<T> factory = (Factory<T>) factoryTable.get(_class);
228      
229       if (factory == null) {
230         factory =  new AtomicFactory<T>(_class, adapterClass);
231         factoryTable.put(_class, factory);
232       }
233       return factory;
234     } catch (Exception e) {
235       throw new PanicException(e);
236     }
237   }
238   
239   /**
240    * Execute a transaction
241    * @param xaction execute this object's <CODE>call()</CODE> method.
242    * @return result of <CODE>call()</CODE> method
243    */
244   public static <T> T doIt(Callable<T> xaction) {
245     ThreadState threadState = _threadState.get();
246     ContentionManager manager = threadState.manager;
247     T result = null;
248     boolean flag = false;
249     try {
250       while (true) {
251         threadState.beginTransaction();
252
253        /////For Integrating with IO////////// 
254         Wrapper.Initialize(Thread.getTransaction());
255        ////////////////////////////////////// 
256         try {
257           result = xaction.call();
258           
259       //  } catch (AbortedException d) {
260           /*  synchronized(benchmark.lock){
261                 System.out.println(Thread.currentThread() + " aborted in committing");
262             }*/
263
264        // } //catch (SnapshotException s) {
265           //threadState.abortTransaction();
266       //} 
267        // catch (Exception e) {
268       //    e.printStackTrace();
269        //   throw new PanicException("Unhandled exception " + e);
270        // }
271         threadState.totalMemRefs += threadState.transaction.memRefs;
272         threadState.transaction.attempts++;
273          /*synchronized(benchmark.lock){
274                     System.out.println(Thread.currentThread() + " ghabl az try");
275                 }*/
276        // try{
277             
278                 
279             //    if (!flag)
280                     Wrapper.prepareIOCommit();
281                 /* synchronized(benchmark.lock){
282                     System.out.println(Thread.currentThread() + " to try");
283                 }*/
284         ///////////////////////////////
285         
286                 if (threadState.commitTransaction()) {
287                     threadState.committedMemRefs += threadState.transaction.memRefs;
288                     
289                     Wrapper.commitIO();
290                     flag = true;
291                }
292         }
293         catch(AbortedException ex){
294             threadState.depth--;
295          ///   synchronized(benchmark.lock){
296             //    System.out.println(Thread.currentThread() + " aborted in committing");
297             //}
298          
299         }
300         catch (Exception e) {
301           e.printStackTrace();
302           throw new PanicException("Unhandled exception " + e);
303         }
304         finally{
305             
306             Wrapper.getTransaction().unlockAllLocks();
307             if  (flag == true)
308                 break;
309         }
310       
311         // transaction aborted
312       }
313       if (threadState.transaction != null) {
314         threadState.abortTransaction();
315       }
316     } finally {
317       threadState.transaction = null;
318       Wrapper.setTransaction(null);
319     }
320     // collect statistics
321     synchronized (lock){
322       totalTotalMemRefs = threadState.totalMemRefs;
323       totalCommittedMemRefs = threadState.committedMemRefs;
324       totalCommitted += threadState.committed;
325       totalTotal += threadState.total;
326       threadState.reset();  // set up for next iteration
327     }
328     throw new GracefulException();
329   }
330   /**
331    * Execute transaction
332    * @param xaction call this object's <CODE>run()</CODE> method
333    */
334   public static void doIt(final Runnable xaction) {
335     doIt(new Callable<Boolean>() {
336       public Boolean call() {
337         xaction.run();
338         return false;
339       };
340     });
341   }
342   
343   /**
344    * number of transactions committed by this thread
345    * @return number of transactions committed by this thread
346    */
347   public static long getCommitted() {
348     return totalCommitted;
349   }
350   
351   /**
352    * umber of transactions aborted by this thread
353    * @return number of aborted transactions
354    */
355   public static long getAborted() {
356     return totalTotal -  totalCommitted;
357   }
358   
359   /**
360    * number of transactions executed by this thread
361    * @return number of transactions
362    */
363   public static long getTotal() {
364     return totalTotal;
365   }
366   
367   /**
368    * Register a method to be called every time this thread validates any transaction.
369    * @param c abort if this object's <CODE>call()</CODE> method returns false
370    */
371   public static void onValidate(Callable<Boolean> c) {
372     _threadState.get().onValidate.add(c);
373   }
374   /**
375    * Register a method to be called every time the current transaction is validated.
376    * @param c abort if this object's <CODE>call()</CODE> method returns false
377    */
378   public static void onValidateOnce(Callable<Boolean> c) {
379     _threadState.get().onValidateOnce.add(c);
380   }
381   /**
382    * Register a method to be called every time this thread commits a transaction.
383    * @param r call this object's <CODE>run()</CODE> method
384    */
385   public static void onCommit(Runnable r) {
386     _threadState.get().onCommit.add(r);
387   }
388   /**
389    * Register a method to be called once if the current transaction commits.
390    * @param r call this object's <CODE>run()</CODE> method
391    */
392   public static void onCommitOnce(Runnable r) {
393     _threadState.get().onCommitOnce.add(r);
394   }
395   /**
396    * Register a method to be called every time this thread aborts a transaction.
397    * @param r call this objec't <CODE>run()</CODE> method
398    */
399   public static void onAbort(Runnable r) {
400     _threadState.get().onAbort.add(r);
401   }
402   /**
403    * Register a method to be called once if the current transaction aborts.
404    * @param r call this object's <CODE>run()</CODE> method
405    */
406   public static void onAbortOnce(Runnable r) {
407     _threadState.get().onAbortOnce.add(r);
408   }
409   /**
410    * get thread ID for debugging
411    * @return unique id
412    */
413   public static int getID() {
414     return _threadState.get().hashCode();
415   }
416   
417   /**
418    * reset thread statistics
419    */
420   public static void clear() {
421     totalTotal = 0;
422     totalCommitted = 0;
423     totalCommittedMemRefs = 0;
424     totalTotalMemRefs = 0;
425     stop = false;
426   }
427   
428   /**
429    * Class that holds thread's actual state
430    */
431   public static class ThreadState {
432     
433     int depth = 0;
434     ContentionManager manager;
435     
436     private long committed = 0;        // number of committed transactions
437     private long total = 0;            // total number of transactions
438     private long committedMemRefs = 0; // number of committed reads and writes
439     private long totalMemRefs = 0;     // total number of reads and writes
440     
441     Set<Callable<Boolean>> onValidate = new HashSet<Callable<Boolean>>();
442     Set<Runnable>          onCommit   = new HashSet<Runnable>();
443     Set<Runnable>          onAbort    = new HashSet<Runnable>();
444     Set<Callable<Boolean>> onValidateOnce = new HashSet<Callable<Boolean>>();
445     Set<Runnable>          onCommitOnce   = new HashSet<Runnable>();
446     Set<Runnable>          onAbortOnce    = new HashSet<Runnable>();
447     
448     Transaction transaction = null;
449     
450     /**
451      * Creates new ThreadState
452      */
453     public ThreadState() {
454       try {
455         manager = (ContentionManager)Thread.contentionManagerClass.newInstance();
456       } catch (NullPointerException e) {
457         throw new PanicException("No default contention manager class set.");
458       } catch (Exception e) {  // Some problem with instantiation
459         throw new PanicException(e);
460       }
461     }
462     
463     /**
464      * Resets any metering information (commits/aborts, etc).
465      */
466     public void reset() {
467       committed = 0;        // number of committed transactions
468       total = 0;            // total number of transactions
469       committedMemRefs = 0; // number of committed reads and writes
470       totalMemRefs = 0;     // total number of reads and writes
471     }
472     
473     /**
474      * used for debugging
475      * @return string representation of thread state
476      */
477     public String toString() {
478       return
479           "Thread" + hashCode() + "["+
480           "committed: " +  committed + "," +
481           "aborted: " + ( total -  committed) +
482           "]";
483     }
484     
485     /**
486      * Can this transaction still commit?
487      * This method may be called at any time, not just at transaction end,
488      * so we do not clear the onValidateOnce table.
489      * @return true iff transaction might still commit
490      */
491     public boolean validate() {
492       try {
493         // permanent
494         for (Callable<Boolean> v : onValidate) {
495           if (!v.call()) {
496             return false;
497           }
498         }
499         // temporary
500         for (Callable<Boolean> v : onValidateOnce) {
501           if (!v.call()) {
502             return false;
503           }
504         }
505         return transaction.validate();
506       } catch (Exception ex) {
507         return false;
508       }
509     }
510     
511     /**
512      * Call methods registered to be called on commit.
513      */
514     public void runCommitHandlers() {
515       try {
516         // permanent
517         for (Runnable r: onCommit) {
518           r.run();
519         }
520         // temporary
521         for (Runnable r: onCommitOnce) {
522           r.run();
523         }
524         onCommitOnce.clear();
525         onValidateOnce.clear();
526       } catch (Exception ex) {
527         throw new PanicException(ex);
528       }
529     }
530     
531     /**
532      * Starts a new transaction.  Cannot nest transactions deeper than
533      * <code>Thread.MAX_NESTING_DEPTH.</code> The contention manager of the
534      * invoking thread is notified when a transaction is begun.
535      */
536     public void beginTransaction() {
537       transaction = new Transaction();
538       if (depth == 0) {
539         total++;
540       }
541       // first thing to fix if we allow nested transactions
542       if (depth >= 1) {
543         throw new PanicException("beginTransaction: attempting to nest transactions too deeply.");
544       }
545       depth++;
546     }
547     
548     /**
549      * Attempts to commit the current transaction of the invoking
550      * <code>Thread</code>.  Always succeeds for nested
551      * transactions.  The contention manager of the invoking thread is
552      * notified of the result.  If the transaction does not commit
553      * because a <code>TMObject</code> opened for reading was
554      * invalidated, the contention manager is also notified of the
555      * inonValidate.
556      *
557      *
558      * @return whether commit succeeded.
559      */
560     public boolean commitTransaction() {
561       depth--;
562       if (depth < 0) {
563         throw new PanicException("commitTransaction invoked when no transaction active.");
564       }
565       if (depth > 0) {
566         throw new PanicException("commitTransaction invoked on nested transaction.");
567       }
568       if (depth == 0) {
569         if (validate() && transaction.commit()) {
570           committed++;
571           runCommitHandlers();
572           return true;
573         }
574         abortTransaction();
575         return false;
576       } else {
577         return true;
578       }
579     }
580     
581     /**
582      * Aborts the current transaction of the invoking <code>Thread</code>.
583      * Does not end transaction, but ensures it will never commit.
584      */
585     public void abortTransaction() {
586       runAbortHandlers();
587       transaction.abort();
588     }
589     
590     /**
591      * Call methods registered to be called on commit.
592      */
593     public void runAbortHandlers() {
594       try {
595         // permanent
596         for (Runnable r: onAbort) {
597           r.run();
598         }
599         // temporary
600         for (Runnable r: onAbortOnce) {
601           r.run();
602         }
603         onAbortOnce.clear();
604         onValidateOnce.clear();
605       } catch (Exception ex) {
606         throw new PanicException(ex);
607       }
608     }
609   }
610 }