// updates to transsim for journal submission
// [IRC.git] / Robust / TransSim / FlexScheduler.java
import java.util.*;

3 public class FlexScheduler extends Thread {
4   Executor e;
5   int abortThreshold;
6   int abortRatio;
7   int deadlockcount;
8   int checkdepth;
9   int barriercount;
10
11   public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
12     this(e, policy, p);
13     this.abortThreshold=abortThreshold;
14     this.abortRatio=abortRatio;
15     this.checkdepth=checkdepth;
16   }
17
18   public void run() {
19     dosim();
20   }
21
22   public FlexScheduler(Executor e, int policy, Plot p) {
23     this.e=e;
24     barriercount=e.numThreads();
25     aborted=new boolean[e.numThreads()];
26     currentevents=new Event[e.numThreads()];
27     rdobjmap=new Hashtable();
28     wrobjmap=new Hashtable();
29     this.policy=policy;
30     r=new Random(100);
31     eq=new PriorityQueue();
32     backoff=new int[e.numThreads()];
33     retrycount=new int[e.numThreads()];
34     transferred=new int[e.numThreads()];
35     objtoinfo=new Hashtable();
36     threadinfo=new ThreadInfo[e.numThreads()];
37     blocked=new boolean[e.numThreads()];
38
39     for(int i=0;i<e.numThreads();i++) {
40       backoff[i]=BACKOFFSTART;
41       threadinfo[i]=new ThreadInfo(this);
42     }
43     this.p=p;
44     if (p!=null) {
45       serCommit=p.getSeries("COMMIT");
46       serStart=p.getSeries("START");
47       serAbort=p.getSeries("ABORT");
48       serStall=p.getSeries("STALL");
49       serWake=p.getSeries("WAKE");
50       serAvoid=p.getSeries("AVOIDDEADLOCK");
51     }
52   }
53
54   Plot p;
55   Series serCommit;
56   Series serStart;
57   Series serAbort;
58   Series serStall;
59   Series serAvoid;
60   Series serWake;
61
62   public int getDeadLockCount() {
63     return deadlockcount;
64   }
65
66   //Where to start the backoff delay at
67   public static final int BACKOFFSTART=1;
68
69   //Commit options
70   public static final int LAZY=0;
71   public static final int COMMIT=1;
72   public static final int ATTACK=2;
73   public static final int SUICIDE=3;
74   public static final int TIMESTAMP=4;
75   public static final int LOCK=5;
76   public static final int LOCKCOMMIT=6;
77   public static final int RANDOM=7;
78   public static final int KARMA=8;
79   public static final int POLITE=9;
80   public static final int ERUPTION=10;
81   public static final int THREAD=11;
82   public static final int ATTACKTIME=12;
83   public static final int ATTACKTHREAD=13;
84
85   PriorityQueue eq;
86   int policy;
87   boolean[] aborted;
88   long shorttesttime;
89   long starttime=-1;
90   Hashtable rdobjmap;
91   Hashtable wrobjmap;
92   int abortcount;
93   int commitcount;
94   long backoffcycles;
95   long stallcycles;
96   long abortedcycles;
97   Event[] currentevents;
98   Random r;
99   int[] backoff;
100   int[] retrycount;
101   int[] transferred;
102   Hashtable objtoinfo;
103   ThreadInfo[] threadinfo;
104   
105   boolean[] blocked;
106
107   public boolean isEager() {
108     return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD||policy==ATTACKTIME||policy==ATTACKTHREAD;
109   }
110
111   public boolean countObjects() {
112     return policy==KARMA||policy==ERUPTION;
113   }
114
115   public boolean isLock() {
116     return policy==LOCK||policy==LOCKCOMMIT;
117   }
118
119   public int getAborts() {
120     return abortcount;
121   }
122
123   public int getCommits() {
124     return commitcount;
125   }
126
127   public long getTime() {
128     return shorttesttime-starttime;
129   }
130
131   public long getStallTime() {
132     return stallcycles;
133   }
134
135   public long getBackoffTime() {
136     return backoffcycles;
137   }
138
139   //Aborts another thread...
140   public void reschedule(int currthread, long currtime, long backofftime) {
141     long time=currtime+backofftime;
142     backoffcycles+=backofftime;
143     currentevents[currthread].makeInvalid();
144     if (threadinfo[currthread].isStalled()) {
145       //remove from waiter list
146       threadinfo[currthread].setStall(false);
147       getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
148     }
149     if (serAbort!=null) {
150       serAbort.addPoint(time, currthread);
151     }
152     Transaction trans=currentevents[currthread].getTransaction();
153     
154     releaseObjects(trans, currthread, time);
155     Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
156     currentevents[currthread]=nev;
157     eq.add(nev);
158   }
159
160   //Aborts another thread...
161   public void stall(Event ev, long time, long delay) {
162     stallcycles+=delay;
163     ev.setTime(time+delay);
164     eq.add(ev);
165   }
166
167   private void releaseObjects(Transaction trans, int currthread, long time) {
168     //remove all events
169     for(int i=0;i<trans.numEvents();i++) {
170       ObjIndex object=trans.getObjIndex(i);
171
172       if (object!=null&&rdobjmap.containsKey(object)) {
173         ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
174       }
175       if (object!=null&&wrobjmap.containsKey(object)) {
176         ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
177       }
178       if (object!=null&&objtoinfo.containsKey(object)) {
179         ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
180         if (oi.getOwner()==currentevents[currthread].getThread()) {
181           oi.releaseOwner();
182           
183           //wake up one waiter
184           for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
185             //requeue everyone who was waiting on us and start them back up
186             Event waiter=(Event)waitit.next();
187             waitit.remove();
188             waiter.setTime(time);
189             threadinfo[waiter.getThread()].setStall(false);
190             if (serWake!=null)
191               serWake.addPoint(time,waiter.getThread());
192             oi.setOwner(waiter.getThread());
193             eq.add(waiter);
194             break;
195           }
196         }
197       }
198     }
199   }
200
201   /* Initializes things and returns number of transactions */
202   public int startinitial() {
203     int tcount=0;
204     for(int i=0;i<e.numThreads();i++) {
205       Transaction trans=e.getThread(i).getTransaction(0);
206       long time=trans.getTime(0);
207       Event ev=new Event(time, trans, 0, i, 0);
208       currentevents[i]=ev;
209       eq.add(ev);
210       tcount+=e.getThread(i).numTransactions();
211     }
212     return tcount;
213   }
214
215   public void dosim() {
216     long lasttime=0;
217     //start first transactions
218     int numtrans=startinitial();
219     System.out.println("Number of transactions="+numtrans);
220     int tcount=0;
221     while(!eq.isEmpty()) {
222       Event ev=(Event)eq.poll();
223       if (!ev.isValid()) {
224         continue;
225       }
226
227       Transaction trans=ev.getTransaction();
228
229       int event=ev.getEvent();
230       long currtime=ev.getTime();
231       lasttime=currtime;
232       if (trans.started&&starttime==-1)
233         starttime=currtime;
234
235       if (trans.numEvents()==(event+1)) {
236         tryCommit(ev, trans);
237         tcount++;
238         if ((tcount%100000)==0)
239           System.out.println("Attempted "+tcount+"transactions "+policy);
240       } else {
241         enqueueEvent(ev, trans);
242       }
243     }
244     shorttesttime=lasttime;
245     if (p!=null)
246       p.close();
247   }
248
249   private ObjectInfo getmapping(ObjIndex obj) {
250     if (!objtoinfo.containsKey(obj))
251       objtoinfo.put(obj, new ObjectInfo(this));
252     return (ObjectInfo)objtoinfo.get(obj);
253   }
254
255   public void tryCommit(Event ev, Transaction trans) {
256     //ready to commit this one
257     long currtime=ev.getTime();
258     releaseObjects(trans, ev.getThread(), currtime);
259     
260     //See if we have been flagged as aborted for the lazy case
261     boolean abort=aborted[ev.getThread()];
262     aborted[ev.getThread()]=false;
263     if (!abort) {
264       //if it is a transaction, increment commit count
265       if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
266         commitcount++;
267         if (serCommit!=null) {
268           serCommit.addPoint(ev.getTime(),ev.getThread());
269         }
270       }
271       //Reset our backoff counter
272       threadinfo[ev.getThread()].priority=0;
273       threadinfo[ev.getThread()].aborted=false;
274       backoff[ev.getThread()]=BACKOFFSTART;
275       retrycount[ev.getThread()]=0;
276       transferred[ev.getThread()]=0;
277
278       //abort the other threads
279       for(int i=0;i<trans.numEvents();i++) {
280         ObjIndex object=trans.getObjIndex(i);
281         int op=trans.getEvent(i);
282         //Mark commits to objects
283         if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
284           if (object==null) {
285             System.out.println(op);
286           }
287           getmapping(object).recordCommit();
288         }
289         //Check for threads we might cause to abort
290         if (op==Transaction.WRITE) {
291           HashSet abortset=new HashSet();
292           if (rdobjmap.containsKey(object)) {
293             for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
294               Integer threadid=(Integer)it.next();
295               abortset.add(threadid);
296               if (isLock()) {
297                 ObjectInfo oi=getmapping(object);
298                 oi.recordAbort();
299               }
300             }
301           }
302           if (wrobjmap.containsKey(object)) {
303             for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
304               Integer threadid=(Integer)it.next();
305               abortset.add(threadid);
306               if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
307                 //if this object hasn't already cause this thread to
308                 //abort, then flag it as an abort cause
309                 ObjectInfo oi=getmapping(object);
310                 oi.recordAbort();
311               }
312             }
313           }
314           for(Iterator abit=abortset.iterator();abit.hasNext();) {
315             Integer threadid=(Integer)abit.next();
316             if (policy==LAZY||policy==LOCK) {
317               //just flag to abort when it trie to commit
318               aborted[threadid]=true;
319               if (serAbort!=null)
320                 serAbort.addPoint(currtime, threadid);
321             } else if (policy==COMMIT||policy==LOCKCOMMIT) {
322               //abort it immediately
323               reschedule(threadid, currtime, 0);
324               abortcount++;
325             }
326           }
327         }
328       }
329     } else {
330       abortcount++;
331     }
332     
333     //add next transaction event...could be us if we aborted
334     int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
335     if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
336       Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
337       if (serStart!=null) {
338         if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
339           serStart.addPoint(ev.getTime(),ev.getThread());
340         }
341       }
342
343       Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
344       currentevents[ev.getThread()]=nev;
345       eq.add(nev);
346     }
347   }
348
349   public Set rdConflictSet(int thread, ObjIndex obj) {
350     if (!wrobjmap.containsKey(obj))
351       return null;
352     HashSet conflictset=new HashSet();
353     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
354       Integer threadid=(Integer)it.next();
355       if (threadid.intValue()!=thread)
356         conflictset.add(threadid);
357     }
358     if (conflictset.isEmpty())
359       return null;
360     else
361       return conflictset;
362   }
363
364   public Set wrConflictSet(int thread, ObjIndex obj) {
365     HashSet conflictset=new HashSet();
366     if (rdobjmap.containsKey(obj)) {
367       for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
368         Integer threadid=(Integer)it.next();
369         if (threadid.intValue()!=thread)
370           conflictset.add(threadid);
371       }
372     }
373     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
374       Integer threadid=(Integer)it.next();
375       if (threadid.intValue()!=thread)
376         conflictset.add(threadid);
377     }
378     if (conflictset.isEmpty())
379       return null;
380     else
381       return conflictset;
382   }
383
384   //Takes as parameter -- current transaction read event ev, conflicting
385   //set of threads, and the current time
386   //Returning false causes current transaction not continue to be scheduled
387
388
389   public boolean handleConflicts(Event ev, Set threadstokill, long time) {
390     if (policy==RANDOM) {
391       boolean b=r.nextBoolean();
392       if (b) {
393         //delay
394         int thread=ev.getThread();
395         int dback=backoff[thread]*2;
396         if (dback>0)
397           backoff[thread]=dback;
398         stall(ev, time, r.nextInt(backoff[thread]));
399         return false;
400       } else {
401         //abort other transactions
402         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
403           Integer thread=(Integer)thit.next();
404           reschedule(thread, time, 0);
405           abortcount++;
406         }
407         return true;
408       }
409     } else if (policy==KARMA) {
410       int maxpriority=0;
411       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
412         Integer thread=(Integer)thit.next();
413         if (threadinfo[thread].priority>maxpriority)
414           maxpriority=threadinfo[thread].priority;
415       }
416       if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
417         //stall for a little while
418         threadinfo[ev.getThread()].priority--;
419         retrycount[ev.getThread()]++;
420         int rtime=r.nextInt(3000);
421         stall(ev, time, rtime);
422         return false;
423       } else {
424         //we win
425         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
426           Integer thread=(Integer)thit.next();
427           int dback=backoff[thread]*2;
428           if (dback>0)
429             backoff[thread]=dback;
430           int atime=r.nextInt(backoff[thread]);
431           reschedule(thread, time, atime);
432           abortcount++;
433         }
434         return true;
435       }
436     } else if (policy==ERUPTION) {
437       int maxpriority=0;
438       //abort other transactions
439       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
440         Integer thread=(Integer)thit.next();
441         if (threadinfo[thread].priority>maxpriority)
442           maxpriority=threadinfo[thread].priority;
443       }
444       if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
445         //we lose
446         threadinfo[ev.getThread()].priority--;
447         //stall for a little while
448         int rtime=r.nextInt(3000);
449         stall(ev, time, rtime);
450         int ourpriority=threadinfo[ev.getThread()].priority;
451         ourpriority-=transferred[ev.getThread()];
452         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
453           Integer thread=(Integer)thit.next();
454           threadinfo[thread].priority+=ourpriority;
455         }
456         transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
457         retrycount[ev.getThread()]++;
458
459         return false;
460       } else {
461         //we win
462         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
463           Integer thread=(Integer)thit.next();
464           int dback=backoff[thread]*2;
465           if (dback>0)
466             backoff[thread]=dback;
467           int atime=r.nextInt(backoff[thread]);
468           reschedule(thread, time, atime);
469           abortcount++;
470         }
471         return true;
472       }
473     } else if (policy==POLITE) {
474       int retry=(++retrycount[ev.getThread()]);
475       if (retry>=22) {
476         retrycount[ev.getThread()]=0;
477         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
478           Integer thread=(Integer)thit.next();
479           reschedule(thread, time, 0);
480           abortcount++;
481         }
482         return true;
483       } else {
484         //otherwise stall
485         int stalltime=(1<<(retry-1))*12;
486         if (stalltime<0)
487           stalltime=1<<30;
488         stall(ev, time, r.nextInt(stalltime));
489         return false;
490       }
491     } else if (policy==ATTACK) {
492       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
493         Integer thread=(Integer)thit.next();
494         reschedule(thread, time, r.nextInt(backoff[thread.intValue()]));
495         int dback=backoff[thread.intValue()]*2;
496         if (dback>0)
497           backoff[thread.intValue()]=dback;
498         abortcount++;
499       }
500       return true;
501     } else if (policy==SUICIDE) {
502       reschedule(ev.getThread(), time, r.nextInt(backoff[ev.getThread()]));
503       int dback=backoff[ev.getThread()]*2;
504       if (dback>0)
505         backoff[ev.getThread()]=dback;
506       abortcount++;
507       return false;
508     } else if (policy==TIMESTAMP) {
509       long opponenttime=0;
510
511       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
512         Integer thread=(Integer)thit.next();
513         Event other=currentevents[thread.intValue()];
514         int eventnum=other.getEvent();
515         long otime=other.getTransaction().getTime(other.getEvent());
516         if (otime>opponenttime)
517           opponenttime=otime;
518       }
519       if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
520         //kill ourself
521         reschedule(ev.getThread(), time, 0);
522         abortcount++;
523         return false;
524       } else {
525         //kill the opponents
526         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
527           Integer thread=(Integer)thit.next();
528           reschedule(thread, time, 0);
529           abortcount++;
530         }
531         return true;    
532       }
533     } else if (policy==THREAD) {
534       long tid=1000;
535
536       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
537         Integer thread=(Integer)thit.next();
538         Event other=currentevents[thread.intValue()];
539         int eventnum=other.getEvent();
540         long otid=thread.intValue();
541         if (tid>otid)
542           tid=otid;
543       }
544       if (ev.getThread()>tid) {
545         //kill ourself
546         reschedule(ev.getThread(), time, 0);
547         abortcount++;
548         return false;
549       } else {
550         //kill the opponents
551         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
552           Integer thread=(Integer)thit.next();
553           reschedule(thread, time, 0);
554           abortcount++;
555         }
556         return true;    
557       }
558     } else if (policy==ATTACKTIME) {
559       boolean timebased=false;
560       int tev=ev.getThread();
561       timebased|=threadinfo[tev].aborted;
562       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
563         Integer thread=(Integer)thit.next();
564         timebased|=threadinfo[thread.intValue()].aborted;
565       }
566       if (timebased) {
567         long opponenttime=0;
568         
569         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
570           Integer thread=(Integer)thit.next();
571           Event other=currentevents[thread.intValue()];
572           int eventnum=other.getEvent();
573           long otime=other.getTransaction().getTime(other.getEvent());
574           if (otime>opponenttime)
575             opponenttime=otime;
576         }
577         if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
578           //kill ourself
579           reschedule(ev.getThread(), time, 0);
580           threadinfo[ev.getThread()].aborted=true;
581           abortcount++;
582           return false;
583         } else {
584           //kill the opponents
585           for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
586             Integer thread=(Integer)thit.next();
587             reschedule(thread, time, 0);
588             threadinfo[thread.intValue()].aborted=true;
589             abortcount++;
590           }
591           return true;  
592         }
593       } else {
594         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
595           Integer thread=(Integer)thit.next();
596           reschedule(thread, time, 0);
597           threadinfo[thread.intValue()].aborted=true;
598           abortcount++;
599         }
600         return true;
601       }
602     } else if (policy==ATTACKTHREAD) {
603       boolean threadbased=false;
604       int tev=ev.getThread();
605       threadbased|=threadinfo[tev].aborted;
606       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
607         Integer thread=(Integer)thit.next();
608         threadbased|=threadinfo[thread.intValue()].aborted;
609       }
610       if (threadbased) {
611         long opponentthr=1000;
612         
613         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
614           Integer thread=(Integer)thit.next();
615           Event other=currentevents[thread.intValue()];
616           int eventnum=other.getEvent();
617           long othr=thread.intValue();
618           if (othr<opponentthr)
619             opponentthr=othr;
620         }
621         if (opponentthr<tev) {
622           //kill ourself
623           reschedule(ev.getThread(), time, 0);
624           threadinfo[ev.getThread()].aborted=true;
625           abortcount++;
626           return false;
627         } else {
628           //kill the opponents
629           for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
630             Integer thread=(Integer)thit.next();
631             reschedule(thread, time, 0);
632             threadinfo[thread.intValue()].aborted=true;
633             abortcount++;
634           }
635           return true;  
636         }
637       } else {
638         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
639           Integer thread=(Integer)thit.next();
640           reschedule(thread, time, 0);
641           threadinfo[thread.intValue()].aborted=true;
642           abortcount++;
643         }
644         return true;
645       }
646     }
647
648     //Not eager
649     return true;
650   }
651
652   //Handle current event (read, write, delay) in a transaction and
653   //enqueue the next one
654
655   public void enqueueEvent(Event ev, Transaction trans) {
656     //just enqueue next event
657     int event=ev.getEvent();
658     long currtime=ev.getTime();
659     ObjIndex object=trans.getObjIndex(event);
660     int operation=trans.getEvent(event);
661
662     if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
663       ObjectInfo oi=getmapping(object);
664       
665       if (oi.isRisky()) {
666         if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
667           //we're going to wait
668           boolean deadlocked=true;
669           ObjectInfo toi=oi;
670           for(int i=0;i<checkdepth;i++) {
671             //check if stalling would close the loop
672             if (toi.getOwner()==ev.getThread())
673               break;
674             //see if cycle is broken
675             if (!threadinfo[toi.getOwner()].isStalled()) {
676               deadlocked=false;
677               break;
678             }
679             //follow one more in depth
680             toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
681           }
682           
683           if (!deadlocked) {
684             //don't wait on stalled threads, we could deadlock
685             threadinfo[ev.getThread()].setStall(true);
686             threadinfo[ev.getThread()].setObjIndex(object);
687             if (serStall!=null)
688               serStall.addPoint(ev.getTime(),ev.getThread());
689             oi.addWaiter(ev);
690             return;
691           } else {
692             if (serAvoid!=null)
693               serAvoid.addPoint(ev.getTime(),ev.getThread());
694             deadlockcount++;
695           }
696         } else {
697           //we have object
698           oi.setOwner(ev.getThread());
699         }
700       }
701     }
702     
703     //process the current event
704     if (operation==Transaction.READ) {
705       //record read event
706       if (!rdobjmap.containsKey(object))
707         rdobjmap.put(object,new HashSet());
708       if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
709         //added new object
710         if (countObjects()) {
711           threadinfo[ev.getThread()].priority++;
712         }
713       }
714       if (isEager()) {
715         //do eager contention management
716         Set conflicts=rdConflictSet(ev.getThread(), object);
717         if (conflicts!=null) {
718           if (!handleConflicts(ev, conflicts, currtime)) {
719             ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
720             return;
721           }
722         }
723       }
724     } else if (operation==Transaction.WRITE) {
725       //record write event
726       if (!wrobjmap.containsKey(object))
727         wrobjmap.put(object,new HashSet());
728       if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
729         if (countObjects()) {
730           threadinfo[ev.getThread()].priority++;
731         }
732       }
733       if (isEager()) {
734         Set conflicts=wrConflictSet(ev.getThread(), object);
735         if (conflicts!=null) {
736           if (!handleConflicts(ev, conflicts, currtime)) {
737             ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
738             return;
739           }
740         }
741       }
742     } else if (operation==Transaction.BARRIER) {
743       barriercount--;
744       if (barriercount==0) {
745         for(int i=0;i<e.numThreads();i++) {
746           //enqueue the next event
747           Event bev=currentevents[i];
748           int bevent=bev.getEvent();
749           long bcurrtime=bev.getTime();
750           Transaction btrans=bev.getTransaction();
751           long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
752           Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
753           currentevents[bev.getThread()]=nev;
754           eq.add(nev);
755         }
756         barriercount=e.numThreads();
757       } else {
758         //Do nothing
759         //wait until all threads in barrier
760       }
761       return;
762     }
763     retrycount[ev.getThread()]=0;
764     transferred[ev.getThread()]=0;
765     //enqueue the next event
766     long deltatime=trans.getTime(event+1)-trans.getTime(event);
767     Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
768     currentevents[ev.getThread()]=nev;
769     eq.add(nev);
770   }
771
772   
773   class Event implements Comparable {
774     boolean valid;
775     long time;
776     int num;
777     Transaction t;
778     int threadid;
779     int transnum;
780
781     public void makeInvalid() {
782       valid=false;
783     }
784
785     public boolean isValid() {
786       return valid;
787     }
788
789     public int getTransNum() {
790       return transnum;
791     }
792
793     public Transaction getTransaction() {
794       return t;
795     }
796
797     public int getEvent() {
798       return num;
799     }
800
801     public long getTime() {
802       return time;
803     }
804     
805     public void setTime(long time) {
806       this.time=time;
807     }
808
809     public int getThread() {
810       return threadid;
811     }
812
813     public Event(long time, Transaction t, int num, int threadid, int transnum) {
814       this.time=time;
815       this.t=t;
816       this.num=num;
817       this.threadid=threadid;
818       this.transnum=transnum;
819       valid=true;
820     }
821
822     //break ties to allow commits to occur earliest
823     public int compareTo(Object o) {
824       Event e=(Event)o;
825       long delta=time-e.time;
826       if (delta!=0) {
827         if (delta>0)
828           return 1;
829         else
830           return -1;
831       }
832       if (((getEvent()+1)==getTransaction().numEvents())&&
833           (e.getEvent()+1)!=e.getTransaction().numEvents())
834         return -1;
835       if (((getEvent()+1)!=getTransaction().numEvents())&&
836           (e.getEvent()+1)==e.getTransaction().numEvents())
837         return 1;
838       return 0;
839     }
840   }
841 }