more changes
[IRC.git] / Robust / TransSim / FlexScheduler.java
1 import java.util.*;
2
3 public class FlexScheduler extends Thread {
4   Executor e;
5   int abortThreshold;
6   int abortRatio;
7   int deadlockcount;
8   int checkdepth;
9   int barriercount;
10
11   public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
12     this(e, policy, p);
13     this.abortThreshold=abortThreshold;
14     this.abortRatio=abortRatio;
15     this.checkdepth=checkdepth;
16   }
17
18   public void run() {
19     dosim();
20   }
21
22   public FlexScheduler(Executor e, int policy, Plot p) {
23     this.e=e;
24     barriercount=e.numThreads();
25     aborted=new boolean[e.numThreads()];
26     currentevents=new Event[e.numThreads()];
27     rdobjmap=new Hashtable();
28     wrobjmap=new Hashtable();
29     this.policy=policy;
30     r=new Random(100);
31     eq=new PriorityQueue();
32     backoff=new int[e.numThreads()];
33     retrycount=new int[e.numThreads()];
34     transferred=new int[e.numThreads()];
35     objtoinfo=new Hashtable();
36     threadinfo=new ThreadInfo[e.numThreads()];
37     blocked=new boolean[e.numThreads()];
38
39     for(int i=0;i<e.numThreads();i++) {
40       backoff[i]=BACKOFFSTART;
41       threadinfo[i]=new ThreadInfo(this);
42     }
43     this.p=p;
44     if (p!=null) {
45       serCommit=p.getSeries("COMMIT");
46       serStart=p.getSeries("START");
47       serAbort=p.getSeries("ABORT");
48       serStall=p.getSeries("STALL");
49       serWake=p.getSeries("WAKE");
50       serAvoid=p.getSeries("AVOIDDEADLOCK");
51     }
52   }
53
54   Plot p;
55   Series serCommit;
56   Series serStart;
57   Series serAbort;
58   Series serStall;
59   Series serAvoid;
60   Series serWake;
61
62   public int getDeadLockCount() {
63     return deadlockcount;
64   }
65
66   //Where to start the backoff delay at
67   public static final int BACKOFFSTART=1;
68
69   //Commit options
70   public static final int LAZY=0;
71   public static final int COMMIT=1;
72   public static final int ATTACK=2;
73   public static final int SUICIDE=3;
74   public static final int TIMESTAMP=4;
75   public static final int LOCK=5;
76   public static final int LOCKCOMMIT=6;
77   public static final int RANDOM=7;
78   public static final int KARMA=8;
79   public static final int POLITE=9;
80   public static final int ERUPTION=10;
81   public static final int THREAD=11;
82   public static final int ATTACKTIME=12;
83   public static final int ATTACKTHREAD=13;
84
85   public static String getName(int policy) {
86     switch (policy) {
87     case LAZY:
88       return new String("LAZY");
89     case COMMIT:
90       return new String("COMMIT");
91     case ATTACK:
92       return new String("ATTACK");
93     case SUICIDE:
94       return new String("SUICIDE");
95     case TIMESTAMP:
96       return new String("TIMESTAMP");
97     case LOCK:
98       return new String("LOCK");
99     case LOCKCOMMIT:
100       return new String("LOCKCOMMIT");
101     case RANDOM:
102       return new String("RANDOM");
103     case KARMA:
104       return new String("KARMA");
105     case POLITE:
106       return new String("POLITE");
107     case ERUPTION:
108       return new String("ERUPTION");
109     case THREAD:
110       return new String("THREAD");
111     case ATTACKTIME:
112       return new String("ATTACKTIME");
113     case ATTACKTHREAD:
114       return new String("ATTACKTHREAD");
115     }
116     return null;
117   }
118
119   PriorityQueue eq;
120   int policy;
121   boolean[] aborted;
122   long shorttesttime;
123   long starttime=-1;
124   Hashtable rdobjmap;
125   Hashtable wrobjmap;
126   int abortcount;
127   int commitcount;
128   long backoffcycles;
129   long stallcycles;
130   long abortedcycles;
131   Event[] currentevents;
132   Random r;
133   int[] backoff;
134   int[] retrycount;
135   int[] transferred;
136   Hashtable objtoinfo;
137   ThreadInfo[] threadinfo;
138   
139   boolean[] blocked;
140
141   public boolean isEager() {
142     return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD||policy==ATTACKTIME||policy==ATTACKTHREAD;
143   }
144
145   public boolean countObjects() {
146     return policy==KARMA||policy==ERUPTION;
147   }
148
149   public boolean isLock() {
150     return policy==LOCK||policy==LOCKCOMMIT;
151   }
152
153   public int getAborts() {
154     return abortcount;
155   }
156
157   public int getCommits() {
158     return commitcount;
159   }
160
161   public long getTime() {
162     return shorttesttime-starttime;
163   }
164
165   public long getStallTime() {
166     return stallcycles;
167   }
168
169   public long getBackoffTime() {
170     return backoffcycles;
171   }
172
173   public long getAbortedTime() {
174     return abortedcycles;
175   }
176
177   //Computes wasted time
178   public void timewasted(int currthread, long currtime) {
179     Event e=currentevents[currthread];
180     Transaction trans=e.getTransaction();
181     int eIndex=e.getEvent();
182     long eTime=e.getTime();
183     long timeleft=eTime-currtime;
184     long totaltime=0;
185     for(int i=0;i<=eIndex;i++)
186       totaltime+=trans.getTime(i);
187     totaltime-=timeleft;//subtract off time to the next event
188     abortedcycles+=totaltime;
189   }
190
191   //Aborts another thread...
192   public void reschedule(int currthread, long currtime, long backofftime) {
193     long time=currtime+backofftime;
194     backoffcycles+=backofftime;
195     currentevents[currthread].makeInvalid();
196     if (threadinfo[currthread].isStalled()) {
197       //remove from waiter list
198       threadinfo[currthread].setStall(false);
199       getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
200     }
201     if (serAbort!=null) {
202       serAbort.addPoint(time, currthread);
203     }
204     Transaction trans=currentevents[currthread].getTransaction();
205     
206     releaseObjects(trans, currthread, time);
207     Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
208     currentevents[currthread]=nev;
209     eq.add(nev);
210   }
211
212   //Aborts another thread...
213   public void stall(Event ev, long time, long delay) {
214     stallcycles+=delay;
215     ev.setTime(time+delay);
216     eq.add(ev);
217   }
218
219   private void releaseObjects(Transaction trans, int currthread, long time) {
220     //remove all events
221     for(int i=0;i<trans.numEvents();i++) {
222       ObjIndex object=trans.getObjIndex(i);
223
224       if (object!=null&&rdobjmap.containsKey(object)) {
225         ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
226       }
227       if (object!=null&&wrobjmap.containsKey(object)) {
228         ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
229       }
230       if (object!=null&&objtoinfo.containsKey(object)) {
231         ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
232         if (oi.getOwner()==currentevents[currthread].getThread()) {
233           oi.releaseOwner();
234           
235           //wake up one waiter
236           for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
237             //requeue everyone who was waiting on us and start them back up
238             Event waiter=(Event)waitit.next();
239             waitit.remove();
240             waiter.setTime(time);
241             threadinfo[waiter.getThread()].setStall(false);
242             if (serWake!=null)
243               serWake.addPoint(time,waiter.getThread());
244             oi.setOwner(waiter.getThread());
245             eq.add(waiter);
246             break;
247           }
248         }
249       }
250     }
251   }
252
253   /* Initializes things and returns number of transactions */
254   public int startinitial() {
255     int tcount=0;
256     for(int i=0;i<e.numThreads();i++) {
257       Transaction trans=e.getThread(i).getTransaction(0);
258       long time=trans.getTime(0);
259       Event ev=new Event(time, trans, 0, i, 0);
260       currentevents[i]=ev;
261       eq.add(ev);
262       tcount+=e.getThread(i).numTransactions();
263     }
264     return tcount;
265   }
266
267   public void dosim() {
268     long lasttime=0;
269     //start first transactions
270     int numtrans=startinitial();
271     System.out.println("Number of transactions="+numtrans);
272     int tcount=0;
273     while(!eq.isEmpty()) {
274       Event ev=(Event)eq.poll();
275       if (!ev.isValid()) {
276         continue;
277       }
278
279       Transaction trans=ev.getTransaction();
280
281       int event=ev.getEvent();
282       long currtime=ev.getTime();
283       lasttime=currtime;
284       if (trans.started&&starttime==-1)
285         starttime=currtime;
286
287       if (trans.numEvents()==(event+1)) {
288         tryCommit(ev, trans);
289         tcount++;
290         if ((tcount%100000)==0)
291           System.out.println("Attempted "+tcount+"transactions "+policy);
292       } else {
293         enqueueEvent(ev, trans);
294       }
295     }
296     shorttesttime=lasttime;
297     if (p!=null)
298       p.close();
299   }
300
301   private ObjectInfo getmapping(ObjIndex obj) {
302     if (!objtoinfo.containsKey(obj))
303       objtoinfo.put(obj, new ObjectInfo(this));
304     return (ObjectInfo)objtoinfo.get(obj);
305   }
306
307   public void tryCommit(Event ev, Transaction trans) {
308     //ready to commit this one
309     long currtime=ev.getTime();
310     releaseObjects(trans, ev.getThread(), currtime);
311     
312     //See if we have been flagged as aborted for the lazy case
313     boolean abort=aborted[ev.getThread()];
314     aborted[ev.getThread()]=false;
315     if (!abort) {
316       //if it is a transaction, increment commit count
317       if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
318         commitcount++;
319         if (serCommit!=null) {
320           serCommit.addPoint(ev.getTime(),ev.getThread());
321         }
322       }
323       //Reset our backoff counter
324       threadinfo[ev.getThread()].priority=0;
325       threadinfo[ev.getThread()].aborted=false;
326       backoff[ev.getThread()]=BACKOFFSTART;
327       retrycount[ev.getThread()]=0;
328       transferred[ev.getThread()]=0;
329
330       //abort the other threads
331       for(int i=0;i<trans.numEvents();i++) {
332         ObjIndex object=trans.getObjIndex(i);
333         int op=trans.getEvent(i);
334         //Mark commits to objects
335         if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
336           if (object==null) {
337             System.out.println(op);
338           }
339           getmapping(object).recordCommit();
340         }
341         //Check for threads we might cause to abort
342         if (op==Transaction.WRITE) {
343           HashSet abortset=new HashSet();
344           if (rdobjmap.containsKey(object)) {
345             for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
346               Integer threadid=(Integer)it.next();
347               abortset.add(threadid);
348               if (isLock()) {
349                 ObjectInfo oi=getmapping(object);
350                 oi.recordAbort();
351               }
352             }
353           }
354           if (wrobjmap.containsKey(object)) {
355             for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
356               Integer threadid=(Integer)it.next();
357               abortset.add(threadid);
358               if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
359                 //if this object hasn't already cause this thread to
360                 //abort, then flag it as an abort cause
361                 ObjectInfo oi=getmapping(object);
362                 oi.recordAbort();
363               }
364             }
365           }
366           for(Iterator abit=abortset.iterator();abit.hasNext();) {
367             Integer threadid=(Integer)abit.next();
368             if (policy==LAZY||policy==LOCK) {
369               //just flag to abort when it trie to commit
370               aborted[threadid]=true;
371               if (serAbort!=null)
372                 serAbort.addPoint(currtime, threadid);
373             } else if (policy==COMMIT||policy==LOCKCOMMIT) {
374               //abort it immediately
375               timewasted(threadid, currtime);
376               reschedule(threadid, currtime, 0);
377               abortcount++;
378             }
379           }
380         }
381       }
382     } else {
383       abortcount++;
384       timewasted(ev.getThread(), currtime);
385     }
386     
387     //add next transaction event...could be us if we aborted
388     int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
389     if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
390       Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
391       if (serStart!=null) {
392         if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
393           serStart.addPoint(ev.getTime(),ev.getThread());
394         }
395       }
396
397       Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
398       currentevents[ev.getThread()]=nev;
399       eq.add(nev);
400     }
401   }
402
403   public Set rdConflictSet(int thread, ObjIndex obj) {
404     if (!wrobjmap.containsKey(obj))
405       return null;
406     HashSet conflictset=new HashSet();
407     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
408       Integer threadid=(Integer)it.next();
409       if (threadid.intValue()!=thread)
410         conflictset.add(threadid);
411     }
412     if (conflictset.isEmpty())
413       return null;
414     else
415       return conflictset;
416   }
417
418   public Set wrConflictSet(int thread, ObjIndex obj) {
419     HashSet conflictset=new HashSet();
420     if (rdobjmap.containsKey(obj)) {
421       for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
422         Integer threadid=(Integer)it.next();
423         if (threadid.intValue()!=thread)
424           conflictset.add(threadid);
425       }
426     }
427     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
428       Integer threadid=(Integer)it.next();
429       if (threadid.intValue()!=thread)
430         conflictset.add(threadid);
431     }
432     if (conflictset.isEmpty())
433       return null;
434     else
435       return conflictset;
436   }
437
438   //Takes as parameter -- current transaction read event ev, conflicting
439   //set of threads, and the current time
440   //Returning false causes current transaction not continue to be scheduled
441
442
443   public boolean handleConflicts(Event ev, Set threadstokill, long time) {
444     if (policy==RANDOM) {
445       boolean b=r.nextBoolean();
446       if (b) {
447         //delay
448         int thread=ev.getThread();
449         int dback=backoff[thread]*2;
450         if (dback>0)
451           backoff[thread]=dback;
452         stall(ev, time, r.nextInt(backoff[thread]));
453         return false;
454       } else {
455         //abort other transactions
456         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
457           Integer thread=(Integer)thit.next();
458           timewasted(thread, time);
459           reschedule(thread, time, 0);
460           abortcount++;
461         }
462         return true;
463       }
464     } else if (policy==KARMA) {
465       int maxpriority=0;
466       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
467         Integer thread=(Integer)thit.next();
468         if (threadinfo[thread].priority>maxpriority)
469           maxpriority=threadinfo[thread].priority;
470       }
471       if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
472         //stall for a little while
473         threadinfo[ev.getThread()].priority--;
474         retrycount[ev.getThread()]++;
475         int rtime=r.nextInt(3000);
476         stall(ev, time, rtime);
477         return false;
478       } else {
479         //we win
480         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
481           Integer thread=(Integer)thit.next();
482           int dback=backoff[thread]*2;
483           if (dback>0)
484             backoff[thread]=dback;
485           int atime=r.nextInt(backoff[thread]);
486           timewasted(thread, time);
487           reschedule(thread, time, atime);
488           abortcount++;
489         }
490         return true;
491       }
492     } else if (policy==ERUPTION) {
493       int maxpriority=0;
494       //abort other transactions
495       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
496         Integer thread=(Integer)thit.next();
497         if (threadinfo[thread].priority>maxpriority)
498           maxpriority=threadinfo[thread].priority;
499       }
500       if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
501         //we lose
502         threadinfo[ev.getThread()].priority--;
503         //stall for a little while
504         int rtime=r.nextInt(3000);
505         stall(ev, time, rtime);
506         int ourpriority=threadinfo[ev.getThread()].priority;
507         ourpriority-=transferred[ev.getThread()];
508         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
509           Integer thread=(Integer)thit.next();
510           threadinfo[thread].priority+=ourpriority;
511         }
512         transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
513         retrycount[ev.getThread()]++;
514
515         return false;
516       } else {
517         //we win
518         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
519           Integer thread=(Integer)thit.next();
520           int dback=backoff[thread]*2;
521           if (dback>0)
522             backoff[thread]=dback;
523           int atime=r.nextInt(backoff[thread]);
524           timewasted(thread, time);
525           reschedule(thread, time, atime);
526           abortcount++;
527         }
528         return true;
529       }
530     } else if (policy==POLITE) {
531       int retry=(++retrycount[ev.getThread()]);
532       if (retry>=22) {
533         retrycount[ev.getThread()]=0;
534         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
535           Integer thread=(Integer)thit.next();
536           timewasted(thread, time);
537           reschedule(thread, time, 0);
538           abortcount++;
539         }
540         return true;
541       } else {
542         //otherwise stall
543         int stalltime=(1<<(retry-1))*12;
544         if (stalltime<0)
545           stalltime=1<<30;
546         stall(ev, time, r.nextInt(stalltime));
547         return false;
548       }
549     } else if (policy==ATTACK) {
550       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
551         Integer thread=(Integer)thit.next();
552         timewasted(thread, time);
553         reschedule(thread, time, r.nextInt(backoff[thread.intValue()]));
554         int dback=backoff[thread.intValue()]*2;
555         if (dback>0)
556           backoff[thread.intValue()]=dback;
557         abortcount++;
558       }
559       return true;
560     } else if (policy==SUICIDE) {
561       timewasted(ev.getThread(), time);
562       reschedule(ev.getThread(), time, r.nextInt(backoff[ev.getThread()]));
563       int dback=backoff[ev.getThread()]*2;
564       if (dback>0)
565         backoff[ev.getThread()]=dback;
566       abortcount++;
567       return false;
568     } else if (policy==TIMESTAMP) {
569       long opponenttime=0;
570
571       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
572         Integer thread=(Integer)thit.next();
573         Event other=currentevents[thread.intValue()];
574         int eventnum=other.getEvent();
575         long otime=other.getTransaction().getTime(other.getEvent());
576         if (otime>opponenttime)
577           opponenttime=otime;
578       }
579       if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
580         //kill ourself
581         timewasted(ev.getThread(), time);
582         reschedule(ev.getThread(), time, 0);
583         abortcount++;
584         return false;
585       } else {
586         //kill the opponents
587         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
588           Integer thread=(Integer)thit.next();
589           timewasted(thread, time);
590           reschedule(thread, time, 0);
591           abortcount++;
592         }
593         return true;    
594       }
595     } else if (policy==THREAD) {
596       long tid=1000;
597
598       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
599         Integer thread=(Integer)thit.next();
600         Event other=currentevents[thread.intValue()];
601         int eventnum=other.getEvent();
602         long otid=thread.intValue();
603         if (tid>otid)
604           tid=otid;
605       }
606       if (ev.getThread()>tid) {
607         //kill ourself
608         timewasted(ev.getThread(), time);
609         reschedule(ev.getThread(), time, 0);
610         abortcount++;
611         return false;
612       } else {
613         //kill the opponents
614         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
615           Integer thread=(Integer)thit.next();
616           timewasted(thread, time);
617           reschedule(thread, time, 0);
618           abortcount++;
619         }
620         return true;    
621       }
622     } else if (policy==ATTACKTIME) {
623       boolean timebased=false;
624       int tev=ev.getThread();
625       timebased|=threadinfo[tev].aborted;
626       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
627         Integer thread=(Integer)thit.next();
628         timebased|=threadinfo[thread.intValue()].aborted;
629       }
630       if (timebased) {
631         long opponenttime=0;
632         
633         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
634           Integer thread=(Integer)thit.next();
635           Event other=currentevents[thread.intValue()];
636           int eventnum=other.getEvent();
637           long otime=other.getTransaction().getTime(other.getEvent());
638           if (otime>opponenttime)
639             opponenttime=otime;
640         }
641         if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
642           //kill ourself
643           timewasted(ev.getThread(), time);
644           reschedule(ev.getThread(), time, 0);
645           threadinfo[ev.getThread()].aborted=true;
646           abortcount++;
647           return false;
648         } else {
649           //kill the opponents
650           for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
651             Integer thread=(Integer)thit.next();
652             timewasted(thread, time);
653             reschedule(thread, time, 0);
654             threadinfo[thread.intValue()].aborted=true;
655             abortcount++;
656           }
657           return true;  
658         }
659       } else {
660         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
661           Integer thread=(Integer)thit.next();
662           timewasted(thread, time);
663           reschedule(thread, time, 0);
664           threadinfo[thread.intValue()].aborted=true;
665           abortcount++;
666         }
667         return true;
668       }
669     } else if (policy==ATTACKTHREAD) {
670       boolean threadbased=false;
671       int tev=ev.getThread();
672       threadbased|=threadinfo[tev].aborted;
673       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
674         Integer thread=(Integer)thit.next();
675         threadbased|=threadinfo[thread.intValue()].aborted;
676       }
677       if (threadbased) {
678         long opponentthr=1000;
679         
680         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
681           Integer thread=(Integer)thit.next();
682           Event other=currentevents[thread.intValue()];
683           int eventnum=other.getEvent();
684           long othr=thread.intValue();
685           if (othr<opponentthr)
686             opponentthr=othr;
687         }
688         if (opponentthr<tev) {
689           //kill ourself
690           timewasted(ev.getThread(), time);
691           reschedule(ev.getThread(), time, 0);
692           threadinfo[ev.getThread()].aborted=true;
693           abortcount++;
694           return false;
695         } else {
696           //kill the opponents
697           for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
698             Integer thread=(Integer)thit.next();
699             timewasted(thread, time);
700             reschedule(thread, time, 0);
701             threadinfo[thread.intValue()].aborted=true;
702             abortcount++;
703           }
704           return true;  
705         }
706       } else {
707         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
708           Integer thread=(Integer)thit.next();
709           timewasted(thread, time);
710           reschedule(thread, time, 0);
711           threadinfo[thread.intValue()].aborted=true;
712           abortcount++;
713         }
714         return true;
715       }
716     }
717
718     //Not eager
719     return true;
720   }
721
722   //Handle current event (read, write, delay) in a transaction and
723   //enqueue the next one
724
725   public void enqueueEvent(Event ev, Transaction trans) {
726     //just enqueue next event
727     int event=ev.getEvent();
728     long currtime=ev.getTime();
729     ObjIndex object=trans.getObjIndex(event);
730     int operation=trans.getEvent(event);
731
732     if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
733       ObjectInfo oi=getmapping(object);
734       
735       if (oi.isRisky()) {
736         if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
737           //we're going to wait
738           boolean deadlocked=true;
739           ObjectInfo toi=oi;
740           for(int i=0;i<checkdepth;i++) {
741             //check if stalling would close the loop
742             if (toi.getOwner()==ev.getThread())
743               break;
744             //see if cycle is broken
745             if (!threadinfo[toi.getOwner()].isStalled()) {
746               deadlocked=false;
747               break;
748             }
749             //follow one more in depth
750             toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
751           }
752           
753           if (!deadlocked) {
754             //don't wait on stalled threads, we could deadlock
755             threadinfo[ev.getThread()].setStall(true);
756             threadinfo[ev.getThread()].setObjIndex(object);
757             if (serStall!=null)
758               serStall.addPoint(ev.getTime(),ev.getThread());
759             oi.addWaiter(ev);
760             return;
761           } else {
762             if (serAvoid!=null)
763               serAvoid.addPoint(ev.getTime(),ev.getThread());
764             deadlockcount++;
765           }
766         } else {
767           //we have object
768           oi.setOwner(ev.getThread());
769         }
770       }
771     }
772     
773     //process the current event
774     if (operation==Transaction.READ) {
775       //record read event
776       if (!rdobjmap.containsKey(object))
777         rdobjmap.put(object,new HashSet());
778       if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
779         //added new object
780         if (countObjects()) {
781           threadinfo[ev.getThread()].priority++;
782         }
783       }
784       if (isEager()) {
785         //do eager contention management
786         Set conflicts=rdConflictSet(ev.getThread(), object);
787         if (conflicts!=null) {
788           if (!handleConflicts(ev, conflicts, currtime)) {
789             ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
790             return;
791           }
792         }
793       }
794     } else if (operation==Transaction.WRITE) {
795       //record write event
796       if (!wrobjmap.containsKey(object))
797         wrobjmap.put(object,new HashSet());
798       if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
799         if (countObjects()) {
800           threadinfo[ev.getThread()].priority++;
801         }
802       }
803       if (isEager()) {
804         Set conflicts=wrConflictSet(ev.getThread(), object);
805         if (conflicts!=null) {
806           if (!handleConflicts(ev, conflicts, currtime)) {
807             ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
808             return;
809           }
810         }
811       }
812     } else if (operation==Transaction.BARRIER) {
813       barriercount--;
814       if (barriercount==0) {
815         for(int i=0;i<e.numThreads();i++) {
816           //enqueue the next event
817           Event bev=currentevents[i];
818           int bevent=bev.getEvent();
819           long bcurrtime=bev.getTime();
820           Transaction btrans=bev.getTransaction();
821           long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
822           Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
823           currentevents[bev.getThread()]=nev;
824           eq.add(nev);
825         }
826         barriercount=e.numThreads();
827       } else {
828         //Do nothing
829         //wait until all threads in barrier
830       }
831       return;
832     }
833     retrycount[ev.getThread()]=0;
834     transferred[ev.getThread()]=0;
835     //enqueue the next event
836     long deltatime=trans.getTime(event+1)-trans.getTime(event);
837     Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
838     currentevents[ev.getThread()]=nev;
839     eq.add(nev);
840   }
841
842   
843   class Event implements Comparable {
844     boolean valid;
845     long time;
846     int num;
847     Transaction t;
848     int threadid;
849     int transnum;
850
851     public void makeInvalid() {
852       valid=false;
853     }
854
855     public boolean isValid() {
856       return valid;
857     }
858
859     public int getTransNum() {
860       return transnum;
861     }
862
863     public Transaction getTransaction() {
864       return t;
865     }
866
867     public int getEvent() {
868       return num;
869     }
870
871     public long getTime() {
872       return time;
873     }
874     
875     public void setTime(long time) {
876       this.time=time;
877     }
878
879     public int getThread() {
880       return threadid;
881     }
882
883     public Event(long time, Transaction t, int num, int threadid, int transnum) {
884       this.time=time;
885       this.t=t;
886       this.num=num;
887       this.threadid=threadid;
888       this.transnum=transnum;
889       valid=true;
890     }
891
892     //break ties to allow commits to occur earliest
893     public int compareTo(Object o) {
894       Event e=(Event)o;
895       long delta=time-e.time;
896       if (delta!=0) {
897         if (delta>0)
898           return 1;
899         else
900           return -1;
901       }
902       if (((getEvent()+1)==getTransaction().numEvents())&&
903           (e.getEvent()+1)!=e.getTransaction().numEvents())
904         return -1;
905       if (((getEvent()+1)!=getTransaction().numEvents())&&
906           (e.getEvent()+1)==e.getTransaction().numEvents())
907         return 1;
908       return 0;
909     }
910   }
911 }