3 public class FlexScheduler extends Thread {
//Creates a scheduler with explicit tuning parameters. The assignments below
//store abortThreshold, abortRatio and checkdepth in the fields of the same
//name; checkdepth later bounds the owner-chain walk done in enqueueEvent.
//NOTE(review): e, policy and p are presumably handled the same way as in the
//three-argument constructor - confirm.
11 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
13 this.abortThreshold=abortThreshold;
14 this.abortRatio=abortRatio;
15 this.checkdepth=checkdepth;
//Creates a scheduler for executor e using the given contention policy.
//Allocates all per-thread bookkeeping (one slot per e.numThreads()) and
//looks up the plot series used to record scheduling activity.
22 public FlexScheduler(Executor e, int policy, Plot p) {
//number of threads that still have to reach the current barrier
24 barriercount=e.numThreads();
//per-thread flag: set when the thread must abort at its next commit attempt
25 aborted=new boolean[e.numThreads()];
26 currentevents=new Event[e.numThreads()];
//object -> Set of Integer thread ids that have read / written it
27 rdobjmap=new Hashtable();
28 wrobjmap=new Hashtable();
//event queue drained by dosim
31 eq=new PriorityQueue();
32 backoff=new int[e.numThreads()];
33 retrycount=new int[e.numThreads()];
34 transferred=new int[e.numThreads()];
//object -> ObjectInfo, filled lazily by getmapping
35 objtoinfo=new Hashtable();
36 threadinfo=new ThreadInfo[e.numThreads()];
37 blocked=new boolean[e.numThreads()];
//every thread starts with the minimum backoff and a fresh ThreadInfo
39 for(int i=0;i<e.numThreads();i++) {
40 backoff[i]=BACKOFFSTART;
41 threadinfo[i]=new ThreadInfo(this);
//one plot series per kind of recorded activity
//NOTE(review): other methods test some of these series for null before use,
//so p (or the returned series) may be absent - confirm how that is guarded here
45 serCommit=p.getSeries("COMMIT");
46 serStart=p.getSeries("START");
47 serAbort=p.getSeries("ABORT");
48 serStall=p.getSeries("STALL");
49 serWake=p.getSeries("WAKE");
50 serAvoid=p.getSeries("AVOIDDEADLOCK");
//NOTE(review): presumably reports how many deadlocks were detected or avoided - confirm against the body
62 public int getDeadLockCount() {
66 //Initial backoff value of every thread; tryCommit resets a thread's backoff to this after it commits
67 public static final int BACKOFFSTART=1;
//Contention-management policy identifiers. The policy field is compared
//against these throughout the class:
//  LAZY, LOCK          - conflicting threads are flagged at commit time and
//                        abort when they themselves try to commit
//  COMMIT, LOCKCOMMIT  - conflicting threads are aborted immediately at commit time
//  LOCK, LOCKCOMMIT    - additionally use per-object ownership (see isLock)
//  all others          - conflicts are resolved at access time (see isEager
//                        and handleConflicts)
70 public static final int LAZY=0;
71 public static final int COMMIT=1;
72 public static final int ATTACK=2;
73 public static final int SUICIDE=3;
74 public static final int TIMESTAMP=4;
75 public static final int LOCK=5;
76 public static final int LOCKCOMMIT=6;
77 public static final int RANDOM=7;
78 public static final int KARMA=8;
79 public static final int POLITE=9;
80 public static final int ERUPTION=10;
81 public static final int THREAD=11;
82 public static final int ATTACKTIME=12;
83 public static final int ATTACKTHREAD=13;
//Returns the printable name of a policy identifier. Each returned string
//is spelled exactly like one of the policy constants.
//NOTE(review): the returns are presumably selected in constant order
//(LAZY first, ATTACKTHREAD last) - confirm, and check what is returned for
//an unknown policy value.
//NOTE(review): new String("...") allocates a copy on every call; returning
//the literal itself would be equivalent for callers that use equals().
85 public static String getName(int policy) {
88 return new String("LAZY");
90 return new String("COMMIT");
92 return new String("ATTACK");
94 return new String("SUICIDE");
96 return new String("TIMESTAMP");
98 return new String("LOCK");
100 return new String("LOCKCOMMIT");
102 return new String("RANDOM");
104 return new String("KARMA");
106 return new String("POLITE");
108 return new String("ERUPTION");
110 return new String("THREAD");
112 return new String("ATTACKTIME");
114 return new String("ATTACKTHREAD");
//Absolute time reported (relative to starttime) by getEarliestTime; -1 means not recorded yet
123 long earliesttime=-1;
//Most recently scheduled event of each thread, indexed by thread id
132 Event[] currentevents;
//Per-thread scheduler state (priority, aborted flag, stall status), indexed by thread id
138 ThreadInfo[] threadinfo;
//True for every policy that handleConflicts has a branch for, i.e. all
//policies except LAZY, COMMIT, LOCK and LOCKCOMMIT.
142 public boolean isEager() {
143 return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD||policy==ATTACKTIME||policy==ATTACKTHREAD;
//True for the priority-based policies (KARMA, ERUPTION); enqueueEvent uses
//this to bump a thread's priority when it first reads or writes an object.
146 public boolean countObjects() {
147 return policy==KARMA||policy==ERUPTION;
//True for the two lock-based policies (LOCK, LOCKCOMMIT), which make
//enqueueEvent track object ownership through ObjectInfo.
150 public boolean isLock() {
151 return policy==LOCK||policy==LOCKCOMMIT;
//NOTE(review): presumably the number of aborted transaction attempts - confirm against the body
154 public int getAborts() {
//NOTE(review): presumably the number of committed transactions - confirm against the body
158 public int getCommits() {
//Returns earliesttime measured from starttime. earliesttime is recorded
//once by tryCommit (only while it still holds its initial -1).
//NOTE(review): if it was never recorded the result is computed from -1 - confirm callers expect that
162 public long getEarliestTime() {
163 return earliesttime-starttime;
//Returns shorttesttime measured from starttime; dosim assigns
//shorttesttime from lasttime while running the simulation.
166 public long getTime() {
167 return shorttesttime-starttime;
//NOTE(review): presumably returns the accumulated stall time (stallcycles) - confirm against the body
170 public long getStallTime() {
//Returns the total backoff delay handed out so far; reschedule adds each
//backofftime it is given to backoffcycles.
174 public long getBackoffTime() {
175 return backoffcycles;
//Returns the total time charged to aborted work; accumulated by timewasted.
178 public long getAbortedTime() {
179 return abortedcycles;
182 //Computes wasted time: charges to abortedcycles the time that thread
//currthread's in-flight transaction has consumed up to currtime.
//  currthread - id of the thread whose current event is examined
//  currtime   - time at which the transaction is being abandoned
183 public void timewasted(int currthread, long currtime) {
//note: this local e (an Event) shadows the executor field e inside this method
184 Event e=currentevents[currthread];
185 Transaction trans=e.getTransaction();
186 int eIndex=e.getEvent();
187 long eTime=e.getTime();
//time remaining until the thread's pending event would have fired
188 long timeleft=eTime-currtime;
//NOTE(review): the next two statements are presumably applied only when the event is stalled - confirm
190 stallcycles-=timeleft; //this time is no longer stalled...back it out
191 timeleft=0;//if the event is stalled, we already waited this time...
//transaction time up to the pending event, minus the part that has not elapsed yet
193 long totaltime=trans.getTime(eIndex);
194 totaltime-=timeleft;//subtract off time to the next event
195 abortedcycles+=totaltime;
198 //Aborts thread currthread (callers also pass their own thread id) and
//restarts its current transaction from event 0 after a backoff delay.
//  currthread  - id of the thread being aborted
//  currtime    - time of the abort
//  backofftime - extra delay before the restart; also added to backoffcycles
199 public void reschedule(int currthread, long currtime, long backofftime) {
200 long time=currtime+backofftime;
201 backoffcycles+=backofftime;
//the pending event is marked invalid rather than being searched for in the queue
202 currentevents[currthread].makeInvalid();
203 if (threadinfo[currthread].isStalled()) {
204 //remove from waiter list
205 threadinfo[currthread].setStall(false);
206 getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
208 if (serAbort!=null) {
209 serAbort.addPoint(time, currthread);
211 Transaction trans=currentevents[currthread].getTransaction();
//drop the thread's read/write registrations and any object ownership
213 releaseObjects(trans, currthread, time);
//restart the same transaction (same transaction number) at its first event
214 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
215 currentevents[currthread]=nev;
219 //Stalls event ev: moves its time to time+delay.
//  ev    - event to postpone
//  time  - current time
//  delay - how long to postpone the event
//NOTE(review): presumably also marks the event stalled, accounts the delay
//and re-queues the event - confirm against the full body.
220 public void stall(Event ev, long time, long delay) {
222 ev.setTime(time+delay);
//Releases everything thread currthread holds on behalf of transaction trans:
//its entries in the read and write sets of every object the transaction
//touches and, where it owns an object, hands the object on to the waiters.
//  trans      - transaction whose objects are released
//  currthread - id of the releasing thread
//  time       - time at which woken waiters are resumed
227 private void releaseObjects(Transaction trans, int currthread, long time) {
229 for(int i=0;i<trans.numEvents();i++) {
//events without an object yield null here and are skipped by the checks below
230 ObjIndex object=trans.getObjIndex(i);
232 if (object!=null&&rdobjmap.containsKey(object)) {
233 ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
235 if (object!=null&&wrobjmap.containsKey(object)) {
236 ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
238 if (object!=null&&objtoinfo.containsKey(object)) {
239 ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
//only the current owner may pass the object on
240 if (oi.getOwner()==currentevents[currthread].getThread()) {
244 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
245 //requeue everyone who was waiting on us and start them back up
246 Event waiter=(Event)waitit.next();
248 waiter.setTime(time);
249 threadinfo[waiter.getThread()].setStall(false);
//NOTE(review): other series are null-checked before use elsewhere - confirm serWake is guarded here
251 serWake.addPoint(time,waiter.getThread());
//setOwner is called for each waiter visited, so the last one visited ends up as owner
252 oi.setOwner(waiter.getThread());
261 /* Initializes things and returns number of transactions */
//For every thread of the executor, builds the first event (event index 0)
//of its first transaction (transaction number 0).
262 public int startinitial() {
264 for(int i=0;i<e.numThreads();i++) {
265 Transaction trans=e.getThread(i).getTransaction(0);
//the first event fires at the transaction's own time offset for event 0
266 long time=trans.getTime(0);
267 Event ev=new Event(time, trans, 0, i, 0);
//running total of transactions over all threads
270 tcount+=e.getThread(i).numTransactions();
//Runs the simulation: seeds the queue via startinitial, then repeatedly
//takes the next event from eq until the queue is empty.
275 public void dosim() {
277 //start first transactions
278 int numtrans=startinitial();
279 System.out.println("Number of transactions="+numtrans);
281 while(!eq.isEmpty()) {
282 Event ev=(Event)eq.poll();
287 Transaction trans=ev.getTransaction();
289 int event=ev.getEvent();
290 long currtime=ev.getTime();
//starttime is still -1 until the first started transaction is seen
292 if (trans.started&&starttime==-1)
//the last event of a transaction is its commit point
295 if (trans.numEvents()==(event+1)) {
296 tryCommit(ev, trans);
//progress report every 100000 attempts
298 if ((tcount%100000)==0)
299 System.out.println("Attempted "+tcount+"transactions "+policy);
//NOTE(review): presumably the non-commit branch - every other event is processed and followed by its successor
301 enqueueEvent(ev, trans);
304 shorttesttime=lasttime;
//Returns the ObjectInfo registered for obj in objtoinfo, creating and
//registering a new one on first use.
309 private ObjectInfo getmapping(ObjIndex obj) {
310 if (!objtoinfo.containsKey(obj))
311 objtoinfo.put(obj, new ObjectInfo(this));
312 return (ObjectInfo)objtoinfo.get(obj);
//Handles the final event of a transaction: releases the thread's objects,
//records the commit, aborts or flags threads that conflict with the
//committed writes (LAZY/LOCK/COMMIT/LOCKCOMMIT), and schedules the thread's
//next transaction - or the same one again if this thread had been flagged
//as aborted.
//  ev    - the transaction's last event
//  trans - the transaction ev belongs to
315 public void tryCommit(Event ev, Transaction trans) {
316 //ready to commit this one
317 long currtime=ev.getTime();
//this also removes our own thread id from the read/write sets, so the
//abort sets built below contain other threads only
318 releaseObjects(trans, ev.getThread(), currtime);
320 //See if we have been flagged as aborted for the lazy case
321 boolean abort=aborted[ev.getThread()];
322 aborted[ev.getThread()]=false;
324 //if it is a transaction, increment commit count
//a single DELAY event is not counted as a transaction
325 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
327 if (serCommit!=null) {
328 serCommit.addPoint(ev.getTime(),ev.getThread());
331 //Reset our backoff counter
//priority, aborted flag, retry count and transferred priority are reset too
332 threadinfo[ev.getThread()].priority=0;
333 threadinfo[ev.getThread()].aborted=false;
334 backoff[ev.getThread()]=BACKOFFSTART;
335 retrycount[ev.getThread()]=0;
336 transferred[ev.getThread()]=0;
338 //abort the other threads
339 for(int i=0;i<trans.numEvents();i++) {
340 ObjIndex object=trans.getObjIndex(i);
341 int op=trans.getEvent(i);
342 //Mark commits to objects
343 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
//NOTE(review): this print looks like debugging output - confirm under which condition it runs
345 System.out.println(op);
347 getmapping(object).recordCommit();
349 //Check for threads we might cause to abort
//only our writes can invalidate other threads
350 if (op==Transaction.WRITE) {
351 HashSet abortset=new HashSet();
//every other reader of the object conflicts with our write
352 if (rdobjmap.containsKey(object)) {
353 for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
354 Integer threadid=(Integer)it.next();
355 abortset.add(threadid);
357 ObjectInfo oi=getmapping(object);
//and so does every other writer
362 if (wrobjmap.containsKey(object)) {
363 for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
364 Integer threadid=(Integer)it.next();
365 abortset.add(threadid);
//writers that are also readers were already handled in the reader loop above
366 if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
367 //if this object hasn't already caused this thread to
368 //abort, then flag it as an abort cause
369 ObjectInfo oi=getmapping(object);
374 for(Iterator abit=abortset.iterator();abit.hasNext();) {
375 Integer threadid=(Integer)abit.next();
376 if (policy==LAZY||policy==LOCK) {
377 //just flag to abort when it tries to commit
378 aborted[threadid]=true;
//NOTE(review): serAbort is null-checked in reschedule - confirm it is guarded here as well
380 serAbort.addPoint(currtime, threadid);
381 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
382 //abort it immediately
383 timewasted(threadid, currtime);
384 reschedule(threadid, currtime, 0);
//NOTE(review): presumably only when this thread itself was flagged as aborted - confirm
392 timewasted(ev.getThread(), currtime);
395 //add next transaction event...could be us if we aborted
396 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
397 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
398 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
399 if (serStart!=null) {
//single DELAY events are not plotted as transaction starts
400 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
401 serStart.addPoint(ev.getTime(),ev.getThread());
405 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
406 currentevents[ev.getThread()]=nev;
//record the time only once, while earliesttime still holds its initial -1
//NOTE(review): presumably reached when the thread has no transactions left - confirm
409 if (earliesttime==-1)
410 earliesttime=currtime;
//Collects the threads a read of obj by the given thread conflicts with:
//every other thread currently registered as a writer of obj.
//Callers treat a null result as no conflict.
//  thread - id of the reading thread (excluded from the result)
//  obj    - object being read
414 public Set rdConflictSet(int thread, ObjIndex obj) {
//no writers registered for this object at all
415 if (!wrobjmap.containsKey(obj))
417 HashSet conflictset=new HashSet();
418 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
419 Integer threadid=(Integer)it.next();
420 if (threadid.intValue()!=thread)
421 conflictset.add(threadid);
//an empty set is handled separately from a non-empty one
423 if (conflictset.isEmpty())
//Collects the threads a write of obj by the given thread conflicts with:
//every other thread registered as a reader or as a writer of obj.
//Callers treat a null result as no conflict.
//  thread - id of the writing thread (excluded from the result)
//  obj    - object being written
429 public Set wrConflictSet(int thread, ObjIndex obj) {
430 HashSet conflictset=new HashSet();
431 if (rdobjmap.containsKey(obj)) {
432 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
433 Integer threadid=(Integer)it.next();
434 if (threadid.intValue()!=thread)
435 conflictset.add(threadid);
//NOTE(review): unlike the reader lookup, no containsKey check is visible
//for wrobjmap; enqueueEvent registers obj in wrobjmap before calling this
//method - confirm no other caller relies on a guard
438 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
439 Integer threadid=(Integer)it.next();
440 if (threadid.intValue()!=thread)
441 conflictset.add(threadid);
//an empty set is handled separately from a non-empty one
443 if (conflictset.isEmpty())
449 //Takes as parameter -- the current transaction's access event ev (used for
450 //both reads and writes), the set of conflicting threads, and the current time.
451 //Returning false causes the current transaction not to continue being scheduled
//Resolves the conflict according to the policy field. Each branch either
//stalls ev, aborts ev's own thread, or aborts the threads in threadstokill;
//every abort is a timewasted call followed by reschedule.
454 public boolean handleConflicts(Event ev, Set threadstokill, long time) {
//RANDOM: a coin flip chooses between backing off ourselves and aborting the others
//NOTE(review): confirm which coin outcome selects which action
455 if (policy==RANDOM) {
456 boolean b=r.nextBoolean();
459 int thread=ev.getThread();
//double our backoff and stall for a random time below it
460 int dback=backoff[thread]*2;
462 backoff[thread]=dback;
463 stall(ev, time, r.nextInt(backoff[thread]));
466 //abort other transactions
467 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
468 Integer thread=(Integer)thit.next();
469 timewasted(thread, time);
470 reschedule(thread, time, 0);
//KARMA: compare our priority plus retries against the strongest opponent
475 } else if (policy==KARMA) {
//find the highest priority among the conflicting threads
477 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
478 Integer thread=(Integer)thit.next();
479 if (threadinfo[thread].priority>maxpriority)
480 maxpriority=threadinfo[thread].priority;
//an opponent outranks us: we wait, but each retry improves our standing
482 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
483 //stall for a little while
484 threadinfo[ev.getThread()].priority--;
485 retrycount[ev.getThread()]++;
486 int rtime=r.nextInt(3000);
487 stall(ev, time, rtime);
//abort every opponent, each with a doubled backoff and a random delay below it
491 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
492 Integer thread=(Integer)thit.next();
493 int dback=backoff[thread]*2;
495 backoff[thread]=dback;
496 int atime=r.nextInt(backoff[thread]);
497 timewasted(thread, time);
498 reschedule(thread, time, atime);
//ERUPTION: like KARMA, but a waiting thread passes its priority on to its opponents
503 } else if (policy==ERUPTION) {
505 //find the highest priority among the conflicting threads
506 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
507 Integer thread=(Integer)thit.next();
508 if (threadinfo[thread].priority>maxpriority)
509 maxpriority=threadinfo[thread].priority;
511 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
513 threadinfo[ev.getThread()].priority--;
514 //stall for a little while
515 int rtime=r.nextInt(3000);
516 stall(ev, time, rtime);
//give each opponent the part of our priority that was not transferred before
517 int ourpriority=threadinfo[ev.getThread()].priority;
518 ourpriority-=transferred[ev.getThread()];
519 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
520 Integer thread=(Integer)thit.next();
521 threadinfo[thread].priority+=ourpriority;
//remember how much has been transferred so it is not handed out twice
523 transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
524 retrycount[ev.getThread()]++;
//abort every opponent, each with a doubled backoff and a random delay below it
529 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
530 Integer thread=(Integer)thit.next();
531 int dback=backoff[thread]*2;
533 backoff[thread]=dback;
534 int atime=r.nextInt(backoff[thread]);
535 timewasted(thread, time);
536 reschedule(thread, time, atime);
//POLITE: back off with exponentially growing stalls, then eventually abort the others
541 } else if (policy==POLITE) {
542 int retry=(++retrycount[ev.getThread()]);
//NOTE(review): presumably taken once retry reaches a retry limit - confirm the limit
544 retrycount[ev.getThread()]=0;
545 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
546 Integer thread=(Integer)thit.next();
547 timewasted(thread, time);
548 reschedule(thread, time, 0);
//stall for a random time below 12*2^(retry-1)
554 int stalltime=(1<<(retry-1))*12;
557 stall(ev, time, r.nextInt(stalltime));
//ATTACK: always abort the opponents; each gets a random delay below its
//current backoff, which is then doubled
560 } else if (policy==ATTACK) {
561 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
562 Integer thread=(Integer)thit.next();
563 timewasted(thread, time);
564 reschedule(thread, time, r.nextInt(backoff[thread.intValue()]));
565 int dback=backoff[thread.intValue()]*2;
567 backoff[thread.intValue()]=dback;
//SUICIDE: always abort ourselves with a random delay below our backoff, then double it
571 } else if (policy==SUICIDE) {
572 timewasted(ev.getThread(), time);
573 reschedule(ev.getThread(), time, r.nextInt(backoff[ev.getThread()]));
574 int dback=backoff[ev.getThread()]*2;
576 backoff[ev.getThread()]=dback;
//TIMESTAMP: compare transaction time offsets; we abort ourselves when an
//opponent's offset is larger than ours, otherwise the opponents are aborted
579 } else if (policy==TIMESTAMP) {
//largest transaction time offset among the opponents' current events
582 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
583 Integer thread=(Integer)thit.next();
584 Event other=currentevents[thread.intValue()];
//eventnum is not used by any statement visible in this branch
585 int eventnum=other.getEvent();
586 long otime=other.getTransaction().getTime(other.getEvent());
587 if (otime>opponenttime)
590 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
592 timewasted(ev.getThread(), time);
593 reschedule(ev.getThread(), time, 0);
598 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
599 Integer thread=(Integer)thit.next();
600 timewasted(thread, time);
601 reschedule(thread, time, 0);
//THREAD: decide by thread id
606 } else if (policy==THREAD) {
//NOTE(review): tid is presumably derived from the opponents' ids (otid) - confirm the selection rule
609 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
610 Integer thread=(Integer)thit.next();
611 Event other=currentevents[thread.intValue()];
//eventnum is not used by any statement visible in this branch
612 int eventnum=other.getEvent();
613 long otid=thread.intValue();
//our id is larger than tid: we abort ourselves, otherwise the opponents are aborted
617 if (ev.getThread()>tid) {
619 timewasted(ev.getThread(), time);
620 reschedule(ev.getThread(), time, 0);
625 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
626 Integer thread=(Integer)thit.next();
627 timewasted(thread, time);
628 reschedule(thread, time, 0);
//ATTACKTIME: timebased is set as soon as we or any opponent carry the
//aborted flag; whoever gets aborted here is flagged as aborted
//NOTE(review): presumably timebased selects the time comparison and the
//final loop is the plain attack case - confirm
633 } else if (policy==ATTACKTIME) {
634 boolean timebased=false;
635 int tev=ev.getThread();
636 timebased|=threadinfo[tev].aborted;
637 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
638 Integer thread=(Integer)thit.next();
639 timebased|=threadinfo[thread.intValue()].aborted;
//largest transaction time offset among the opponents' current events
644 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
645 Integer thread=(Integer)thit.next();
646 Event other=currentevents[thread.intValue()];
647 int eventnum=other.getEvent();
648 long otime=other.getTransaction().getTime(other.getEvent());
649 if (otime>opponenttime)
//an opponent's offset is larger than ours: abort ourselves
652 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
654 timewasted(ev.getThread(), time);
655 reschedule(ev.getThread(), time, 0);
656 threadinfo[ev.getThread()].aborted=true;
//abort the opponents
661 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
662 Integer thread=(Integer)thit.next();
663 timewasted(thread, time);
664 reschedule(thread, time, 0);
665 threadinfo[thread.intValue()].aborted=true;
//abort the opponents
671 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
672 Integer thread=(Integer)thit.next();
673 timewasted(thread, time);
674 reschedule(thread, time, 0);
675 threadinfo[thread.intValue()].aborted=true;
//ATTACKTHREAD: threadbased is set as soon as we or any opponent carry the
//aborted flag; whoever gets aborted here is flagged as aborted
//NOTE(review): presumably threadbased selects the thread-id comparison and
//the final loop is the plain attack case - confirm
680 } else if (policy==ATTACKTHREAD) {
681 boolean threadbased=false;
682 int tev=ev.getThread();
683 threadbased|=threadinfo[tev].aborted;
684 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
685 Integer thread=(Integer)thit.next();
686 threadbased|=threadinfo[thread.intValue()].aborted;
//starting value for the comparison with the opponents' thread ids
//NOTE(review): presumably this ends up as the smallest opponent id, so 1000
//must exceed every thread id - confirm
689 long opponentthr=1000;
691 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
692 Integer thread=(Integer)thit.next();
693 Event other=currentevents[thread.intValue()];
694 int eventnum=other.getEvent();
695 long othr=thread.intValue();
696 if (othr<opponentthr)
//opponentthr is below our own id: abort ourselves
699 if (opponentthr<tev) {
701 timewasted(ev.getThread(), time);
702 reschedule(ev.getThread(), time, 0);
703 threadinfo[ev.getThread()].aborted=true;
//abort the opponents
708 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
709 Integer thread=(Integer)thit.next();
710 timewasted(thread, time);
711 reschedule(thread, time, 0);
712 threadinfo[thread.intValue()].aborted=true;
//abort the opponents
718 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
719 Integer thread=(Integer)thit.next();
720 timewasted(thread, time);
721 reschedule(thread, time, 0);
722 threadinfo[thread.intValue()].aborted=true;
733 //Handle current event (read, write, barrier, delay) in a transaction and
734 //enqueue the next one
//  ev    - event being processed
//  trans - transaction ev belongs to
//Steps: (1) for lock policies, acquire or wait for ownership of the object;
//(2) register the access and run conflict handling; (3) build the event
//for the next step of the transaction.
736 public void enqueueEvent(Event ev, Transaction trans) {
737 //look up what the current event does
738 int event=ev.getEvent();
739 long currtime=ev.getTime();
740 ObjIndex object=trans.getObjIndex(event);
741 int operation=trans.getEvent(event);
//lock policies: reads and writes need ownership of the object first
743 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
744 ObjectInfo oi=getmapping(object);
747 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
748 //we're going to wait
//assume a deadlock until the walk below shows otherwise
749 boolean deadlocked=true;
//walk the chain owner -> object that owner waits for, at most checkdepth steps
751 for(int i=0;i<checkdepth;i++) {
752 //check if stalling would close the loop
753 if (toi.getOwner()==ev.getThread())
755 //see if cycle is broken
756 if (!threadinfo[toi.getOwner()].isStalled()) {
760 //follow one more in depth
761 toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
765 //don't wait on stalled threads, we could deadlock
//mark ourselves as stalled on this object
766 threadinfo[ev.getThread()].setStall(true);
767 threadinfo[ev.getThread()].setObjIndex(object);
//NOTE(review): other series are null-checked before use elsewhere - confirm serStall and serAvoid are guarded
769 serStall.addPoint(ev.getTime(),ev.getThread());
774 serAvoid.addPoint(ev.getTime(),ev.getThread());
//take ownership of the object
779 oi.setOwner(ev.getThread());
784 //process the current event
785 if (operation==Transaction.READ) {
//register this thread as a reader of the object
787 if (!rdobjmap.containsKey(object))
788 rdobjmap.put(object,new HashSet());
//add() is true only the first time this thread reads the object
789 if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
791 if (countObjects()) {
792 threadinfo[ev.getThread()].priority++;
796 //do eager contention management
797 Set conflicts=rdConflictSet(ev.getThread(), object);
798 if (conflicts!=null) {
//we lost the conflict: undo the reader registration
799 if (!handleConflicts(ev, conflicts, currtime)) {
800 ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
805 } else if (operation==Transaction.WRITE) {
//register this thread as a writer of the object
807 if (!wrobjmap.containsKey(object))
808 wrobjmap.put(object,new HashSet());
//add() is true only the first time this thread writes the object
809 if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
810 if (countObjects()) {
811 threadinfo[ev.getThread()].priority++;
815 Set conflicts=wrConflictSet(ev.getThread(), object);
816 if (conflicts!=null) {
//we lost the conflict: undo the writer registration
817 if (!handleConflicts(ev, conflicts, currtime)) {
818 ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
823 } else if (operation==Transaction.BARRIER) {
//barriercount reaching 0 means every thread has arrived at the barrier
//NOTE(review): presumably barriercount is decremented just before this test - confirm
825 if (barriercount==0) {
826 for(int i=0;i<e.numThreads();i++) {
827 //enqueue the next event
828 Event bev=currentevents[i];
829 int bevent=bev.getEvent();
830 long bcurrtime=bev.getTime();
831 Transaction btrans=bev.getTransaction();
//every thread resumes relative to the time the last thread arrived (currtime)
832 long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
833 Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
834 currentevents[bev.getThread()]=nev;
//re-arm the barrier for its next use
837 barriercount=e.numThreads();
840 //wait until all threads in barrier
//the event was processed: clear the retry and priority-transfer bookkeeping
844 retrycount[ev.getThread()]=0;
845 transferred[ev.getThread()]=0;
846 //enqueue the next event
//the next event fires after the gap between the two events' time offsets
847 long deltatime=trans.getTime(event+1)-trans.getTime(event);
848 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
849 currentevents[ev.getThread()]=nev;
854 class Event implements Comparable {
863 public boolean isStalled() {
867 public void setStall() {
871 public void makeInvalid() {
875 public boolean isValid() {
879 public int getTransNum() {
883 public Transaction getTransaction() {
887 public int getEvent() {
891 public long getTime() {
895 public void setTime(long time) {
899 public int getThread() {
903 public Event(long time, Transaction t, int num, int threadid, int transnum) {
907 this.threadid=threadid;
908 this.transnum=transnum;
912 //break ties to allow commits to occur earliest
913 public int compareTo(Object o) {
915 long delta=time-e.time;
922 if (((getEvent()+1)==getTransaction().numEvents())&&
923 (e.getEvent()+1)!=e.getTransaction().numEvents())
925 if (((getEvent()+1)!=getTransaction().numEvents())&&
926 (e.getEvent()+1)==e.getTransaction().numEvents())