bug fixes and extensions
[IRC.git] / Robust / TransSim / FlexScheduler.java
1 import java.util.*;
2
3 public class FlexScheduler extends Thread {
4   Executor e;
5   int abortThreshold;
6   int abortRatio;
7   int deadlockcount;
8   int checkdepth;
9   int barriercount;
10
11   public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
12     this(e, policy, p);
13     this.abortThreshold=abortThreshold;
14     this.abortRatio=abortRatio;
15     this.checkdepth=checkdepth;
16   }
17
18   public void run() {
19     dosim();
20   }
21
22   public FlexScheduler(Executor e, int policy, Plot p) {
23     this.e=e;
24     barriercount=e.numThreads();
25     aborted=new boolean[e.numThreads()];
26     currentevents=new Event[e.numThreads()];
27     rdobjmap=new Hashtable();
28     wrobjmap=new Hashtable();
29     this.policy=policy;
30     r=new Random(100);
31     eq=new PriorityQueue();
32     backoff=new int[e.numThreads()];
33     retrycount=new int[e.numThreads()];
34     transferred=new int[e.numThreads()];
35     objtoinfo=new Hashtable();
36     threadinfo=new ThreadInfo[e.numThreads()];
37     blocked=new boolean[e.numThreads()];
38
39     for(int i=0;i<e.numThreads();i++) {
40       backoff[i]=BACKOFFSTART;
41       threadinfo[i]=new ThreadInfo(this);
42     }
43     this.p=p;
44     if (p!=null) {
45       serCommit=p.getSeries("COMMIT");
46       serStart=p.getSeries("START");
47       serAbort=p.getSeries("ABORT");
48       serStall=p.getSeries("STALL");
49       serWake=p.getSeries("WAKE");
50       serAvoid=p.getSeries("AVOIDDEADLOCK");
51     }
52   }
53
54   Plot p;
55   Series serCommit;
56   Series serStart;
57   Series serAbort;
58   Series serStall;
59   Series serAvoid;
60   Series serWake;
61
62   public int getDeadLockCount() {
63     return deadlockcount;
64   }
65
66   //Where to start the backoff delay at
67   public static final int BACKOFFSTART=1;
68
69   //Commit options
70   public static final int LAZY=0;
71   public static final int COMMIT=1;
72   public static final int ATTACK=2;
73   public static final int SUICIDE=3;
74   public static final int TIMESTAMP=4;
75   public static final int LOCK=5;
76   public static final int LOCKCOMMIT=6;
77   public static final int RANDOM=7;
78   public static final int KARMA=8;
79   public static final int POLITE=9;
80   public static final int ERUPTION=10;
81   public static final int THREAD=11;
82   public static final int ATTACKTIME=12;
83   public static final int ATTACKTHREAD=13;
84
85   public static String getName(int policy) {
86     switch (policy) {
87     case LAZY:
88       return new String("LAZY");
89     case COMMIT:
90       return new String("COMMIT");
91     case ATTACK:
92       return new String("ATTACK");
93     case SUICIDE:
94       return new String("SUICIDE");
95     case TIMESTAMP:
96       return new String("TIMESTAMP");
97     case LOCK:
98       return new String("LOCK");
99     case LOCKCOMMIT:
100       return new String("LOCKCOMMIT");
101     case RANDOM:
102       return new String("RANDOM");
103     case KARMA:
104       return new String("KARMA");
105     case POLITE:
106       return new String("POLITE");
107     case ERUPTION:
108       return new String("ERUPTION");
109     case THREAD:
110       return new String("THREAD");
111     case ATTACKTIME:
112       return new String("ATTACKTIME");
113     case ATTACKTHREAD:
114       return new String("ATTACKTHREAD");
115     }
116     return null;
117   }
118
119   PriorityQueue eq;
120   int policy;
121   boolean[] aborted;
122   long shorttesttime;
123   long earliesttime=-1;
124   long starttime=-1;
125   Hashtable rdobjmap;
126   Hashtable wrobjmap;
127   int abortcount;
128   int commitcount;
129   long backoffcycles;
130   long stallcycles;
131   long abortedcycles;
132   Event[] currentevents;
133   Random r;
134   int[] backoff;
135   int[] retrycount;
136   int[] transferred;
137   Hashtable objtoinfo;
138   ThreadInfo[] threadinfo;
139   
140   boolean[] blocked;
141
142   public boolean isEager() {
143     return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD||policy==ATTACKTIME||policy==ATTACKTHREAD;
144   }
145
146   public boolean countObjects() {
147     return policy==KARMA||policy==ERUPTION;
148   }
149
150   public boolean isLock() {
151     return policy==LOCK||policy==LOCKCOMMIT;
152   }
153
154   public int getAborts() {
155     return abortcount;
156   }
157
158   public int getCommits() {
159     return commitcount;
160   }
161
162   public long getEarliestTime() {
163     return earliesttime-starttime;
164   }
165
166   public long getTime() {
167     return shorttesttime-starttime;
168   }
169
170   public long getStallTime() {
171     return stallcycles;
172   }
173
174   public long getBackoffTime() {
175     return backoffcycles;
176   }
177
178   public long getAbortedTime() {
179     return abortedcycles;
180   }
181
182   //Computes wasted time
183   public void timewasted(int currthread, long currtime) {
184     Event e=currentevents[currthread];
185     Transaction trans=e.getTransaction();
186     int eIndex=e.getEvent();
187     long eTime=e.getTime();
188     long timeleft=eTime-currtime;
189     if (e.isStalled()) {
190       stallcycles-=timeleft; //this time is no longer stalled...back it out
191       timeleft=0;//if the event is stalled, we already waited this time...
192     }
193     long totaltime=trans.getTime(eIndex);
194     totaltime-=timeleft;//subtract off time to the next event
195     abortedcycles+=totaltime;
196   }
197
198   //Aborts another thread...
199   public void reschedule(int currthread, long currtime, long backofftime) {
200     long time=currtime+backofftime;
201     backoffcycles+=backofftime;
202     currentevents[currthread].makeInvalid();
203     if (threadinfo[currthread].isStalled()) {
204       //remove from waiter list
205       threadinfo[currthread].setStall(false);
206       getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
207     }
208     if (serAbort!=null) {
209       serAbort.addPoint(time, currthread);
210     }
211     Transaction trans=currentevents[currthread].getTransaction();
212     
213     releaseObjects(trans, currthread, time);
214     Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
215     currentevents[currthread]=nev;
216     eq.add(nev);
217   }
218
219   //Aborts another thread...
220   public void stall(Event ev, long time, long delay) {
221     stallcycles+=delay;
222     ev.setTime(time+delay);
223     ev.setStall();
224     eq.add(ev);
225   }
226
227   private void releaseObjects(Transaction trans, int currthread, long time) {
228     //remove all events
229     for(int i=0;i<trans.numEvents();i++) {
230       ObjIndex object=trans.getObjIndex(i);
231
232       if (object!=null&&rdobjmap.containsKey(object)) {
233         ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
234       }
235       if (object!=null&&wrobjmap.containsKey(object)) {
236         ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
237       }
238       if (object!=null&&objtoinfo.containsKey(object)) {
239         ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
240         if (oi.getOwner()==currentevents[currthread].getThread()) {
241           oi.releaseOwner();
242           
243           //wake up one waiter
244           for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
245             //requeue everyone who was waiting on us and start them back up
246             Event waiter=(Event)waitit.next();
247             waitit.remove();
248             waiter.setTime(time);
249             threadinfo[waiter.getThread()].setStall(false);
250             if (serWake!=null)
251               serWake.addPoint(time,waiter.getThread());
252             oi.setOwner(waiter.getThread());
253             eq.add(waiter);
254             break;
255           }
256         }
257       }
258     }
259   }
260
261   /* Initializes things and returns number of transactions */
262   public int startinitial() {
263     int tcount=0;
264     for(int i=0;i<e.numThreads();i++) {
265       Transaction trans=e.getThread(i).getTransaction(0);
266       long time=trans.getTime(0);
267       Event ev=new Event(time, trans, 0, i, 0);
268       currentevents[i]=ev;
269       eq.add(ev);
270       tcount+=e.getThread(i).numTransactions();
271     }
272     return tcount;
273   }
274
275   public void dosim() {
276     long lasttime=0;
277     //start first transactions
278     int numtrans=startinitial();
279     System.out.println("Number of transactions="+numtrans);
280     int tcount=0;
281     while(!eq.isEmpty()) {
282       Event ev=(Event)eq.poll();
283       if (!ev.isValid()) {
284         continue;
285       }
286
287       Transaction trans=ev.getTransaction();
288
289       int event=ev.getEvent();
290       long currtime=ev.getTime();
291       lasttime=currtime;
292       if (trans.started&&starttime==-1)
293         starttime=currtime;
294
295       if (trans.numEvents()==(event+1)) {
296         tryCommit(ev, trans);
297         tcount++;
298         if ((tcount%100000)==0)
299           System.out.println("Attempted "+tcount+"transactions "+policy);
300       } else {
301         enqueueEvent(ev, trans);
302       }
303     }
304     shorttesttime=lasttime;
305     if (p!=null)
306       p.close();
307   }
308
309   private ObjectInfo getmapping(ObjIndex obj) {
310     if (!objtoinfo.containsKey(obj))
311       objtoinfo.put(obj, new ObjectInfo(this));
312     return (ObjectInfo)objtoinfo.get(obj);
313   }
314
315   public void tryCommit(Event ev, Transaction trans) {
316     //ready to commit this one
317     long currtime=ev.getTime();
318     releaseObjects(trans, ev.getThread(), currtime);
319     
320     //See if we have been flagged as aborted for the lazy case
321     boolean abort=aborted[ev.getThread()];
322     aborted[ev.getThread()]=false;
323     if (!abort) {
324       //if it is a transaction, increment commit count
325       if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
326         commitcount++;
327         if (serCommit!=null) {
328           serCommit.addPoint(ev.getTime(),ev.getThread());
329         }
330       }
331       //Reset our backoff counter
332       threadinfo[ev.getThread()].priority=0;
333       threadinfo[ev.getThread()].aborted=false;
334       backoff[ev.getThread()]=BACKOFFSTART;
335       retrycount[ev.getThread()]=0;
336       transferred[ev.getThread()]=0;
337
338       //abort the other threads
339       for(int i=0;i<trans.numEvents();i++) {
340         ObjIndex object=trans.getObjIndex(i);
341         int op=trans.getEvent(i);
342         //Mark commits to objects
343         if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
344           if (object==null) {
345             System.out.println(op);
346           }
347           getmapping(object).recordCommit();
348         }
349         //Check for threads we might cause to abort
350         if (op==Transaction.WRITE) {
351           HashSet abortset=new HashSet();
352           if (rdobjmap.containsKey(object)) {
353             for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
354               Integer threadid=(Integer)it.next();
355               abortset.add(threadid);
356               if (isLock()) {
357                 ObjectInfo oi=getmapping(object);
358                 oi.recordAbort();
359               }
360             }
361           }
362           if (wrobjmap.containsKey(object)) {
363             for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
364               Integer threadid=(Integer)it.next();
365               abortset.add(threadid);
366               if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
367                 //if this object hasn't already cause this thread to
368                 //abort, then flag it as an abort cause
369                 ObjectInfo oi=getmapping(object);
370                 oi.recordAbort();
371               }
372             }
373           }
374           for(Iterator abit=abortset.iterator();abit.hasNext();) {
375             Integer threadid=(Integer)abit.next();
376             if (policy==LAZY||policy==LOCK) {
377               //just flag to abort when it trie to commit
378               aborted[threadid]=true;
379               if (serAbort!=null)
380                 serAbort.addPoint(currtime, threadid);
381             } else if (policy==COMMIT||policy==LOCKCOMMIT) {
382               //abort it immediately
383               timewasted(threadid, currtime);
384               reschedule(threadid, currtime, 0);
385               abortcount++;
386             }
387           }
388         }
389       }
390     } else {
391       abortcount++;
392       timewasted(ev.getThread(), currtime);
393     }
394     
395     //add next transaction event...could be us if we aborted
396     int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
397     if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
398       Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
399       if (serStart!=null) {
400         if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
401           serStart.addPoint(ev.getTime(),ev.getThread());
402         }
403       }
404
405       Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
406       currentevents[ev.getThread()]=nev;
407       eq.add(nev);
408     } else {
409       if (earliesttime==-1)
410         earliesttime=currtime;
411     }
412   }
413
414   public Set rdConflictSet(int thread, ObjIndex obj) {
415     if (!wrobjmap.containsKey(obj))
416       return null;
417     HashSet conflictset=new HashSet();
418     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
419       Integer threadid=(Integer)it.next();
420       if (threadid.intValue()!=thread)
421         conflictset.add(threadid);
422     }
423     if (conflictset.isEmpty())
424       return null;
425     else
426       return conflictset;
427   }
428
429   public Set wrConflictSet(int thread, ObjIndex obj) {
430     HashSet conflictset=new HashSet();
431     if (rdobjmap.containsKey(obj)) {
432       for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
433         Integer threadid=(Integer)it.next();
434         if (threadid.intValue()!=thread)
435           conflictset.add(threadid);
436       }
437     }
438     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
439       Integer threadid=(Integer)it.next();
440       if (threadid.intValue()!=thread)
441         conflictset.add(threadid);
442     }
443     if (conflictset.isEmpty())
444       return null;
445     else
446       return conflictset;
447   }
448
449   //Takes as parameter -- current transaction read event ev, conflicting
450   //set of threads, and the current time
451   //Returning false causes current transaction not continue to be scheduled
452
453
454   public boolean handleConflicts(Event ev, Set threadstokill, long time) {
455     if (policy==RANDOM) {
456       boolean b=r.nextBoolean();
457       if (b) {
458         //delay
459         int thread=ev.getThread();
460         int dback=backoff[thread]*2;
461         if (dback>0)
462           backoff[thread]=dback;
463         stall(ev, time, r.nextInt(backoff[thread]));
464         return false;
465       } else {
466         //abort other transactions
467         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
468           Integer thread=(Integer)thit.next();
469           timewasted(thread, time);
470           reschedule(thread, time, 0);
471           abortcount++;
472         }
473         return true;
474       }
475     } else if (policy==KARMA) {
476       int maxpriority=0;
477       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
478         Integer thread=(Integer)thit.next();
479         if (threadinfo[thread].priority>maxpriority)
480           maxpriority=threadinfo[thread].priority;
481       }
482       if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
483         //stall for a little while
484         threadinfo[ev.getThread()].priority--;
485         retrycount[ev.getThread()]++;
486         int rtime=r.nextInt(3000);
487         stall(ev, time, rtime);
488         return false;
489       } else {
490         //we win
491         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
492           Integer thread=(Integer)thit.next();
493           int dback=backoff[thread]*2;
494           if (dback>0)
495             backoff[thread]=dback;
496           int atime=r.nextInt(backoff[thread]);
497           timewasted(thread, time);
498           reschedule(thread, time, atime);
499           abortcount++;
500         }
501         return true;
502       }
503     } else if (policy==ERUPTION) {
504       int maxpriority=0;
505       //abort other transactions
506       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
507         Integer thread=(Integer)thit.next();
508         if (threadinfo[thread].priority>maxpriority)
509           maxpriority=threadinfo[thread].priority;
510       }
511       if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
512         //we lose
513         threadinfo[ev.getThread()].priority--;
514         //stall for a little while
515         int rtime=r.nextInt(3000);
516         stall(ev, time, rtime);
517         int ourpriority=threadinfo[ev.getThread()].priority;
518         ourpriority-=transferred[ev.getThread()];
519         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
520           Integer thread=(Integer)thit.next();
521           threadinfo[thread].priority+=ourpriority;
522         }
523         transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
524         retrycount[ev.getThread()]++;
525
526         return false;
527       } else {
528         //we win
529         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
530           Integer thread=(Integer)thit.next();
531           int dback=backoff[thread]*2;
532           if (dback>0)
533             backoff[thread]=dback;
534           int atime=r.nextInt(backoff[thread]);
535           timewasted(thread, time);
536           reschedule(thread, time, atime);
537           abortcount++;
538         }
539         return true;
540       }
541     } else if (policy==POLITE) {
542       int retry=(++retrycount[ev.getThread()]);
543       if (retry>=22) {
544         retrycount[ev.getThread()]=0;
545         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
546           Integer thread=(Integer)thit.next();
547           timewasted(thread, time);
548           reschedule(thread, time, 0);
549           abortcount++;
550         }
551         return true;
552       } else {
553         //otherwise stall
554         int stalltime=(1<<(retry-1))*12;
555         if (stalltime<0)
556           stalltime=1<<30;
557         stall(ev, time, r.nextInt(stalltime));
558         return false;
559       }
560     } else if (policy==ATTACK) {
561       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
562         Integer thread=(Integer)thit.next();
563         timewasted(thread, time);
564         reschedule(thread, time, r.nextInt(backoff[thread.intValue()]));
565         int dback=backoff[thread.intValue()]*2;
566         if (dback>0)
567           backoff[thread.intValue()]=dback;
568         abortcount++;
569       }
570       return true;
571     } else if (policy==SUICIDE) {
572       timewasted(ev.getThread(), time);
573       reschedule(ev.getThread(), time, r.nextInt(backoff[ev.getThread()]));
574       int dback=backoff[ev.getThread()]*2;
575       if (dback>0)
576         backoff[ev.getThread()]=dback;
577       abortcount++;
578       return false;
579     } else if (policy==TIMESTAMP) {
580       long opponenttime=0;
581
582       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
583         Integer thread=(Integer)thit.next();
584         Event other=currentevents[thread.intValue()];
585         int eventnum=other.getEvent();
586         long otime=other.getTransaction().getTime(other.getEvent());
587         if (otime>opponenttime)
588           opponenttime=otime;
589       }
590       if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
591         //kill ourself
592         timewasted(ev.getThread(), time);
593         reschedule(ev.getThread(), time, 0);
594         abortcount++;
595         return false;
596       } else {
597         //kill the opponents
598         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
599           Integer thread=(Integer)thit.next();
600           timewasted(thread, time);
601           reschedule(thread, time, 0);
602           abortcount++;
603         }
604         return true;    
605       }
606     } else if (policy==THREAD) {
607       long tid=1000;
608
609       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
610         Integer thread=(Integer)thit.next();
611         Event other=currentevents[thread.intValue()];
612         int eventnum=other.getEvent();
613         long otid=thread.intValue();
614         if (tid>otid)
615           tid=otid;
616       }
617       if (ev.getThread()>tid) {
618         //kill ourself
619         timewasted(ev.getThread(), time);
620         reschedule(ev.getThread(), time, 0);
621         abortcount++;
622         return false;
623       } else {
624         //kill the opponents
625         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
626           Integer thread=(Integer)thit.next();
627           timewasted(thread, time);
628           reschedule(thread, time, 0);
629           abortcount++;
630         }
631         return true;    
632       }
633     } else if (policy==ATTACKTIME) {
634       boolean timebased=false;
635       int tev=ev.getThread();
636       timebased|=threadinfo[tev].aborted;
637       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
638         Integer thread=(Integer)thit.next();
639         timebased|=threadinfo[thread.intValue()].aborted;
640       }
641       if (timebased) {
642         long opponenttime=0;
643         
644         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
645           Integer thread=(Integer)thit.next();
646           Event other=currentevents[thread.intValue()];
647           int eventnum=other.getEvent();
648           long otime=other.getTransaction().getTime(other.getEvent());
649           if (otime>opponenttime)
650             opponenttime=otime;
651         }
652         if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
653           //kill ourself
654           timewasted(ev.getThread(), time);
655           reschedule(ev.getThread(), time, 0);
656           threadinfo[ev.getThread()].aborted=true;
657           abortcount++;
658           return false;
659         } else {
660           //kill the opponents
661           for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
662             Integer thread=(Integer)thit.next();
663             timewasted(thread, time);
664             reschedule(thread, time, 0);
665             threadinfo[thread.intValue()].aborted=true;
666             abortcount++;
667           }
668           return true;  
669         }
670       } else {
671         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
672           Integer thread=(Integer)thit.next();
673           timewasted(thread, time);
674           reschedule(thread, time, 0);
675           threadinfo[thread.intValue()].aborted=true;
676           abortcount++;
677         }
678         return true;
679       }
680     } else if (policy==ATTACKTHREAD) {
681       boolean threadbased=false;
682       int tev=ev.getThread();
683       threadbased|=threadinfo[tev].aborted;
684       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
685         Integer thread=(Integer)thit.next();
686         threadbased|=threadinfo[thread.intValue()].aborted;
687       }
688       if (threadbased) {
689         long opponentthr=1000;
690         
691         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
692           Integer thread=(Integer)thit.next();
693           Event other=currentevents[thread.intValue()];
694           int eventnum=other.getEvent();
695           long othr=thread.intValue();
696           if (othr<opponentthr)
697             opponentthr=othr;
698         }
699         if (opponentthr<tev) {
700           //kill ourself
701           timewasted(ev.getThread(), time);
702           reschedule(ev.getThread(), time, 0);
703           threadinfo[ev.getThread()].aborted=true;
704           abortcount++;
705           return false;
706         } else {
707           //kill the opponents
708           for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
709             Integer thread=(Integer)thit.next();
710             timewasted(thread, time);
711             reschedule(thread, time, 0);
712             threadinfo[thread.intValue()].aborted=true;
713             abortcount++;
714           }
715           return true;  
716         }
717       } else {
718         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
719           Integer thread=(Integer)thit.next();
720           timewasted(thread, time);
721           reschedule(thread, time, 0);
722           threadinfo[thread.intValue()].aborted=true;
723           abortcount++;
724         }
725         return true;
726       }
727     }
728
729     //Not eager
730     return true;
731   }
732
733   //Handle current event (read, write, delay) in a transaction and
734   //enqueue the next one
735
736   public void enqueueEvent(Event ev, Transaction trans) {
737     //just enqueue next event
738     int event=ev.getEvent();
739     long currtime=ev.getTime();
740     ObjIndex object=trans.getObjIndex(event);
741     int operation=trans.getEvent(event);
742
743     if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
744       ObjectInfo oi=getmapping(object);
745       
746       if (oi.isRisky()) {
747         if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
748           //we're going to wait
749           boolean deadlocked=true;
750           ObjectInfo toi=oi;
751           for(int i=0;i<checkdepth;i++) {
752             //check if stalling would close the loop
753             if (toi.getOwner()==ev.getThread())
754               break;
755             //see if cycle is broken
756             if (!threadinfo[toi.getOwner()].isStalled()) {
757               deadlocked=false;
758               break;
759             }
760             //follow one more in depth
761             toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
762           }
763           
764           if (!deadlocked) {
765             //don't wait on stalled threads, we could deadlock
766             threadinfo[ev.getThread()].setStall(true);
767             threadinfo[ev.getThread()].setObjIndex(object);
768             if (serStall!=null)
769               serStall.addPoint(ev.getTime(),ev.getThread());
770             oi.addWaiter(ev);
771             return;
772           } else {
773             if (serAvoid!=null)
774               serAvoid.addPoint(ev.getTime(),ev.getThread());
775             deadlockcount++;
776           }
777         } else {
778           //we have object
779           oi.setOwner(ev.getThread());
780         }
781       }
782     }
783     
784     //process the current event
785     if (operation==Transaction.READ) {
786       //record read event
787       if (!rdobjmap.containsKey(object))
788         rdobjmap.put(object,new HashSet());
789       if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
790         //added new object
791         if (countObjects()) {
792           threadinfo[ev.getThread()].priority++;
793         }
794       }
795       if (isEager()) {
796         //do eager contention management
797         Set conflicts=rdConflictSet(ev.getThread(), object);
798         if (conflicts!=null) {
799           if (!handleConflicts(ev, conflicts, currtime)) {
800             ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
801             return;
802           }
803         }
804       }
805     } else if (operation==Transaction.WRITE) {
806       //record write event
807       if (!wrobjmap.containsKey(object))
808         wrobjmap.put(object,new HashSet());
809       if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
810         if (countObjects()) {
811           threadinfo[ev.getThread()].priority++;
812         }
813       }
814       if (isEager()) {
815         Set conflicts=wrConflictSet(ev.getThread(), object);
816         if (conflicts!=null) {
817           if (!handleConflicts(ev, conflicts, currtime)) {
818             ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
819             return;
820           }
821         }
822       }
823     } else if (operation==Transaction.BARRIER) {
824       barriercount--;
825       if (barriercount==0) {
826         for(int i=0;i<e.numThreads();i++) {
827           //enqueue the next event
828           Event bev=currentevents[i];
829           int bevent=bev.getEvent();
830           long bcurrtime=bev.getTime();
831           Transaction btrans=bev.getTransaction();
832           long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
833           Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
834           currentevents[bev.getThread()]=nev;
835           eq.add(nev);
836         }
837         barriercount=e.numThreads();
838       } else {
839         //Do nothing
840         //wait until all threads in barrier
841       }
842       return;
843     }
844     retrycount[ev.getThread()]=0;
845     transferred[ev.getThread()]=0;
846     //enqueue the next event
847     long deltatime=trans.getTime(event+1)-trans.getTime(event);
848     Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
849     currentevents[ev.getThread()]=nev;
850     eq.add(nev);
851   }
852
853   
854   class Event implements Comparable {
855     boolean valid;
856     long time;
857     int num;
858     Transaction t;
859     int threadid;
860     int transnum;
861     boolean stalled;
862
863     public boolean isStalled() {
864       return stalled;
865     }
866
867     public void setStall() {
868       stalled=true;
869     }
870
871     public void makeInvalid() {
872       valid=false;
873     }
874
875     public boolean isValid() {
876       return valid;
877     }
878
879     public int getTransNum() {
880       return transnum;
881     }
882
883     public Transaction getTransaction() {
884       return t;
885     }
886
887     public int getEvent() {
888       return num;
889     }
890
891     public long getTime() {
892       return time;
893     }
894     
895     public void setTime(long time) {
896       this.time=time;
897     }
898
899     public int getThread() {
900       return threadid;
901     }
902
903     public Event(long time, Transaction t, int num, int threadid, int transnum) {
904       this.time=time;
905       this.t=t;
906       this.num=num;
907       this.threadid=threadid;
908       this.transnum=transnum;
909       valid=true;
910     }
911
912     //break ties to allow commits to occur earliest
913     public int compareTo(Object o) {
914       Event e=(Event)o;
915       long delta=time-e.time;
916       if (delta!=0) {
917         if (delta>0)
918           return 1;
919         else
920           return -1;
921       }
922       if (((getEvent()+1)==getTransaction().numEvents())&&
923           (e.getEvent()+1)!=e.getTransaction().numEvents())
924         return -1;
925       if (((getEvent()+1)!=getTransaction().numEvents())&&
926           (e.getEvent()+1)==e.getTransaction().numEvents())
927         return 1;
928       return 0;
929     }
930   }
931 }