3 public class FlexScheduler extends Thread {
11 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
13 this.abortThreshold=abortThreshold;
14 this.abortRatio=abortRatio;
15 this.checkdepth=checkdepth;
22 public FlexScheduler(Executor e, int policy, Plot p) {
24 barriercount=e.numThreads();
25 aborted=new boolean[e.numThreads()];
26 currentevents=new Event[e.numThreads()];
27 rdobjmap=new Hashtable();
28 wrobjmap=new Hashtable();
31 eq=new PriorityQueue();
32 backoff=new int[e.numThreads()];
33 retrycount=new int[e.numThreads()];
34 transferred=new int[e.numThreads()];
35 objtoinfo=new Hashtable();
36 threadinfo=new ThreadInfo[e.numThreads()];
37 blocked=new boolean[e.numThreads()];
39 for(int i=0;i<e.numThreads();i++) {
40 backoff[i]=BACKOFFSTART;
41 threadinfo[i]=new ThreadInfo(this);
45 serCommit=p.getSeries("COMMIT");
46 serStart=p.getSeries("START");
47 serAbort=p.getSeries("ABORT");
48 serStall=p.getSeries("STALL");
49 serWake=p.getSeries("WAKE");
50 serAvoid=p.getSeries("AVOIDDEADLOCK");
62 public int getDeadLockCount() {
66 //Where to start the backoff delay at
67 public static final int BACKOFFSTART=1;
70 public static final int LAZY=0;
71 public static final int COMMIT=1;
72 public static final int ATTACK=2;
73 public static final int SUICIDE=3;
74 public static final int TIMESTAMP=4;
75 public static final int LOCK=5;
76 public static final int LOCKCOMMIT=6;
77 public static final int RANDOM=7;
78 public static final int KARMA=8;
79 public static final int POLITE=9;
80 public static final int ERUPTION=10;
81 public static final int THREAD=11;
82 public static final int ATTACKTIME=12;
83 public static final int ATTACKTHREAD=13;
85 public static String getName(int policy) {
88 return new String("LAZY");
90 return new String("COMMIT");
92 return new String("ATTACK");
94 return new String("SUICIDE");
96 return new String("TIMESTAMP");
98 return new String("LOCK");
100 return new String("LOCKCOMMIT");
102 return new String("RANDOM");
104 return new String("KARMA");
106 return new String("POLITE");
108 return new String("ERUPTION");
110 return new String("THREAD");
112 return new String("ATTACKTIME");
114 return new String("ATTACKTHREAD");
131 Event[] currentevents;
137 ThreadInfo[] threadinfo;
141 public boolean isEager() {
142 return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD||policy==ATTACKTIME||policy==ATTACKTHREAD;
145 public boolean countObjects() {
146 return policy==KARMA||policy==ERUPTION;
149 public boolean isLock() {
150 return policy==LOCK||policy==LOCKCOMMIT;
153 public int getAborts() {
157 public int getCommits() {
161 public long getTime() {
162 return shorttesttime-starttime;
165 public long getStallTime() {
169 public long getBackoffTime() {
170 return backoffcycles;
173 public long getAbortedTime() {
174 return abortedcycles;
177 //Computes wasted time
178 public void timewasted(int currthread, long currtime) {
179 Event e=currentevents[currthread];
180 Transaction trans=e.getTransaction();
181 int eIndex=e.getEvent();
182 long eTime=e.getTime();
183 long timeleft=eTime-currtime;
185 stallcycles-=timeleft; //this time is no longer stalled...back it out
186 timeleft=0;//if the event is stalled, we already waited this time...
188 long totaltime=trans.getTime(eIndex);
189 totaltime-=timeleft;//subtract off time to the next event
190 abortedcycles+=totaltime;
193 //Aborts another thread...
194 public void reschedule(int currthread, long currtime, long backofftime) {
195 long time=currtime+backofftime;
196 backoffcycles+=backofftime;
197 currentevents[currthread].makeInvalid();
198 if (threadinfo[currthread].isStalled()) {
199 //remove from waiter list
200 threadinfo[currthread].setStall(false);
201 getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
203 if (serAbort!=null) {
204 serAbort.addPoint(time, currthread);
206 Transaction trans=currentevents[currthread].getTransaction();
208 releaseObjects(trans, currthread, time);
209 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
210 currentevents[currthread]=nev;
214 //Aborts another thread...
215 public void stall(Event ev, long time, long delay) {
217 ev.setTime(time+delay);
222 private void releaseObjects(Transaction trans, int currthread, long time) {
224 for(int i=0;i<trans.numEvents();i++) {
225 ObjIndex object=trans.getObjIndex(i);
227 if (object!=null&&rdobjmap.containsKey(object)) {
228 ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
230 if (object!=null&&wrobjmap.containsKey(object)) {
231 ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
233 if (object!=null&&objtoinfo.containsKey(object)) {
234 ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
235 if (oi.getOwner()==currentevents[currthread].getThread()) {
239 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
240 //requeue everyone who was waiting on us and start them back up
241 Event waiter=(Event)waitit.next();
243 waiter.setTime(time);
244 threadinfo[waiter.getThread()].setStall(false);
246 serWake.addPoint(time,waiter.getThread());
247 oi.setOwner(waiter.getThread());
256 /* Initializes things and returns number of transactions */
257 public int startinitial() {
259 for(int i=0;i<e.numThreads();i++) {
260 Transaction trans=e.getThread(i).getTransaction(0);
261 long time=trans.getTime(0);
262 Event ev=new Event(time, trans, 0, i, 0);
265 tcount+=e.getThread(i).numTransactions();
270 public void dosim() {
272 //start first transactions
273 int numtrans=startinitial();
274 System.out.println("Number of transactions="+numtrans);
276 while(!eq.isEmpty()) {
277 Event ev=(Event)eq.poll();
282 Transaction trans=ev.getTransaction();
284 int event=ev.getEvent();
285 long currtime=ev.getTime();
287 if (trans.started&&starttime==-1)
290 if (trans.numEvents()==(event+1)) {
291 tryCommit(ev, trans);
293 if ((tcount%100000)==0)
294 System.out.println("Attempted "+tcount+"transactions "+policy);
296 enqueueEvent(ev, trans);
299 shorttesttime=lasttime;
304 private ObjectInfo getmapping(ObjIndex obj) {
305 if (!objtoinfo.containsKey(obj))
306 objtoinfo.put(obj, new ObjectInfo(this));
307 return (ObjectInfo)objtoinfo.get(obj);
310 public void tryCommit(Event ev, Transaction trans) {
311 //ready to commit this one
312 long currtime=ev.getTime();
313 releaseObjects(trans, ev.getThread(), currtime);
315 //See if we have been flagged as aborted for the lazy case
316 boolean abort=aborted[ev.getThread()];
317 aborted[ev.getThread()]=false;
319 //if it is a transaction, increment commit count
320 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
322 if (serCommit!=null) {
323 serCommit.addPoint(ev.getTime(),ev.getThread());
326 //Reset our backoff counter
327 threadinfo[ev.getThread()].priority=0;
328 threadinfo[ev.getThread()].aborted=false;
329 backoff[ev.getThread()]=BACKOFFSTART;
330 retrycount[ev.getThread()]=0;
331 transferred[ev.getThread()]=0;
333 //abort the other threads
334 for(int i=0;i<trans.numEvents();i++) {
335 ObjIndex object=trans.getObjIndex(i);
336 int op=trans.getEvent(i);
337 //Mark commits to objects
338 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
340 System.out.println(op);
342 getmapping(object).recordCommit();
344 //Check for threads we might cause to abort
345 if (op==Transaction.WRITE) {
346 HashSet abortset=new HashSet();
347 if (rdobjmap.containsKey(object)) {
348 for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
349 Integer threadid=(Integer)it.next();
350 abortset.add(threadid);
352 ObjectInfo oi=getmapping(object);
357 if (wrobjmap.containsKey(object)) {
358 for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
359 Integer threadid=(Integer)it.next();
360 abortset.add(threadid);
361 if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
362 //if this object hasn't already cause this thread to
363 //abort, then flag it as an abort cause
364 ObjectInfo oi=getmapping(object);
369 for(Iterator abit=abortset.iterator();abit.hasNext();) {
370 Integer threadid=(Integer)abit.next();
371 if (policy==LAZY||policy==LOCK) {
372 //just flag to abort when it trie to commit
373 aborted[threadid]=true;
375 serAbort.addPoint(currtime, threadid);
376 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
377 //abort it immediately
378 timewasted(threadid, currtime);
379 reschedule(threadid, currtime, 0);
387 timewasted(ev.getThread(), currtime);
390 //add next transaction event...could be us if we aborted
391 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
392 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
393 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
394 if (serStart!=null) {
395 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
396 serStart.addPoint(ev.getTime(),ev.getThread());
400 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
401 currentevents[ev.getThread()]=nev;
406 public Set rdConflictSet(int thread, ObjIndex obj) {
407 if (!wrobjmap.containsKey(obj))
409 HashSet conflictset=new HashSet();
410 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
411 Integer threadid=(Integer)it.next();
412 if (threadid.intValue()!=thread)
413 conflictset.add(threadid);
415 if (conflictset.isEmpty())
421 public Set wrConflictSet(int thread, ObjIndex obj) {
422 HashSet conflictset=new HashSet();
423 if (rdobjmap.containsKey(obj)) {
424 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
425 Integer threadid=(Integer)it.next();
426 if (threadid.intValue()!=thread)
427 conflictset.add(threadid);
430 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
431 Integer threadid=(Integer)it.next();
432 if (threadid.intValue()!=thread)
433 conflictset.add(threadid);
435 if (conflictset.isEmpty())
441 //Takes as parameter -- current transaction read event ev, conflicting
442 //set of threads, and the current time
443 //Returning false causes current transaction not continue to be scheduled
446 public boolean handleConflicts(Event ev, Set threadstokill, long time) {
447 if (policy==RANDOM) {
448 boolean b=r.nextBoolean();
451 int thread=ev.getThread();
452 int dback=backoff[thread]*2;
454 backoff[thread]=dback;
455 stall(ev, time, r.nextInt(backoff[thread]));
458 //abort other transactions
459 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
460 Integer thread=(Integer)thit.next();
461 timewasted(thread, time);
462 reschedule(thread, time, 0);
467 } else if (policy==KARMA) {
469 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
470 Integer thread=(Integer)thit.next();
471 if (threadinfo[thread].priority>maxpriority)
472 maxpriority=threadinfo[thread].priority;
474 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
475 //stall for a little while
476 threadinfo[ev.getThread()].priority--;
477 retrycount[ev.getThread()]++;
478 int rtime=r.nextInt(3000);
479 stall(ev, time, rtime);
483 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
484 Integer thread=(Integer)thit.next();
485 int dback=backoff[thread]*2;
487 backoff[thread]=dback;
488 int atime=r.nextInt(backoff[thread]);
489 timewasted(thread, time);
490 reschedule(thread, time, atime);
495 } else if (policy==ERUPTION) {
497 //abort other transactions
498 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
499 Integer thread=(Integer)thit.next();
500 if (threadinfo[thread].priority>maxpriority)
501 maxpriority=threadinfo[thread].priority;
503 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
505 threadinfo[ev.getThread()].priority--;
506 //stall for a little while
507 int rtime=r.nextInt(3000);
508 stall(ev, time, rtime);
509 int ourpriority=threadinfo[ev.getThread()].priority;
510 ourpriority-=transferred[ev.getThread()];
511 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
512 Integer thread=(Integer)thit.next();
513 threadinfo[thread].priority+=ourpriority;
515 transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
516 retrycount[ev.getThread()]++;
521 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
522 Integer thread=(Integer)thit.next();
523 int dback=backoff[thread]*2;
525 backoff[thread]=dback;
526 int atime=r.nextInt(backoff[thread]);
527 timewasted(thread, time);
528 reschedule(thread, time, atime);
533 } else if (policy==POLITE) {
534 int retry=(++retrycount[ev.getThread()]);
536 retrycount[ev.getThread()]=0;
537 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
538 Integer thread=(Integer)thit.next();
539 timewasted(thread, time);
540 reschedule(thread, time, 0);
546 int stalltime=(1<<(retry-1))*12;
549 stall(ev, time, r.nextInt(stalltime));
552 } else if (policy==ATTACK) {
553 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
554 Integer thread=(Integer)thit.next();
555 timewasted(thread, time);
556 reschedule(thread, time, r.nextInt(backoff[thread.intValue()]));
557 int dback=backoff[thread.intValue()]*2;
559 backoff[thread.intValue()]=dback;
563 } else if (policy==SUICIDE) {
564 timewasted(ev.getThread(), time);
565 reschedule(ev.getThread(), time, r.nextInt(backoff[ev.getThread()]));
566 int dback=backoff[ev.getThread()]*2;
568 backoff[ev.getThread()]=dback;
571 } else if (policy==TIMESTAMP) {
574 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
575 Integer thread=(Integer)thit.next();
576 Event other=currentevents[thread.intValue()];
577 int eventnum=other.getEvent();
578 long otime=other.getTransaction().getTime(other.getEvent());
579 if (otime>opponenttime)
582 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
584 timewasted(ev.getThread(), time);
585 reschedule(ev.getThread(), time, 0);
590 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
591 Integer thread=(Integer)thit.next();
592 timewasted(thread, time);
593 reschedule(thread, time, 0);
598 } else if (policy==THREAD) {
601 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
602 Integer thread=(Integer)thit.next();
603 Event other=currentevents[thread.intValue()];
604 int eventnum=other.getEvent();
605 long otid=thread.intValue();
609 if (ev.getThread()>tid) {
611 timewasted(ev.getThread(), time);
612 reschedule(ev.getThread(), time, 0);
617 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
618 Integer thread=(Integer)thit.next();
619 timewasted(thread, time);
620 reschedule(thread, time, 0);
625 } else if (policy==ATTACKTIME) {
626 boolean timebased=false;
627 int tev=ev.getThread();
628 timebased|=threadinfo[tev].aborted;
629 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
630 Integer thread=(Integer)thit.next();
631 timebased|=threadinfo[thread.intValue()].aborted;
636 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
637 Integer thread=(Integer)thit.next();
638 Event other=currentevents[thread.intValue()];
639 int eventnum=other.getEvent();
640 long otime=other.getTransaction().getTime(other.getEvent());
641 if (otime>opponenttime)
644 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
646 timewasted(ev.getThread(), time);
647 reschedule(ev.getThread(), time, 0);
648 threadinfo[ev.getThread()].aborted=true;
653 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
654 Integer thread=(Integer)thit.next();
655 timewasted(thread, time);
656 reschedule(thread, time, 0);
657 threadinfo[thread.intValue()].aborted=true;
663 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
664 Integer thread=(Integer)thit.next();
665 timewasted(thread, time);
666 reschedule(thread, time, 0);
667 threadinfo[thread.intValue()].aborted=true;
672 } else if (policy==ATTACKTHREAD) {
673 boolean threadbased=false;
674 int tev=ev.getThread();
675 threadbased|=threadinfo[tev].aborted;
676 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
677 Integer thread=(Integer)thit.next();
678 threadbased|=threadinfo[thread.intValue()].aborted;
681 long opponentthr=1000;
683 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
684 Integer thread=(Integer)thit.next();
685 Event other=currentevents[thread.intValue()];
686 int eventnum=other.getEvent();
687 long othr=thread.intValue();
688 if (othr<opponentthr)
691 if (opponentthr<tev) {
693 timewasted(ev.getThread(), time);
694 reschedule(ev.getThread(), time, 0);
695 threadinfo[ev.getThread()].aborted=true;
700 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
701 Integer thread=(Integer)thit.next();
702 timewasted(thread, time);
703 reschedule(thread, time, 0);
704 threadinfo[thread.intValue()].aborted=true;
710 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
711 Integer thread=(Integer)thit.next();
712 timewasted(thread, time);
713 reschedule(thread, time, 0);
714 threadinfo[thread.intValue()].aborted=true;
725 //Handle current event (read, write, delay) in a transaction and
726 //enqueue the next one
728 public void enqueueEvent(Event ev, Transaction trans) {
729 //just enqueue next event
730 int event=ev.getEvent();
731 long currtime=ev.getTime();
732 ObjIndex object=trans.getObjIndex(event);
733 int operation=trans.getEvent(event);
735 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
736 ObjectInfo oi=getmapping(object);
739 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
740 //we're going to wait
741 boolean deadlocked=true;
743 for(int i=0;i<checkdepth;i++) {
744 //check if stalling would close the loop
745 if (toi.getOwner()==ev.getThread())
747 //see if cycle is broken
748 if (!threadinfo[toi.getOwner()].isStalled()) {
752 //follow one more in depth
753 toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
757 //don't wait on stalled threads, we could deadlock
758 threadinfo[ev.getThread()].setStall(true);
759 threadinfo[ev.getThread()].setObjIndex(object);
761 serStall.addPoint(ev.getTime(),ev.getThread());
766 serAvoid.addPoint(ev.getTime(),ev.getThread());
771 oi.setOwner(ev.getThread());
776 //process the current event
777 if (operation==Transaction.READ) {
779 if (!rdobjmap.containsKey(object))
780 rdobjmap.put(object,new HashSet());
781 if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
783 if (countObjects()) {
784 threadinfo[ev.getThread()].priority++;
788 //do eager contention management
789 Set conflicts=rdConflictSet(ev.getThread(), object);
790 if (conflicts!=null) {
791 if (!handleConflicts(ev, conflicts, currtime)) {
792 ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
797 } else if (operation==Transaction.WRITE) {
799 if (!wrobjmap.containsKey(object))
800 wrobjmap.put(object,new HashSet());
801 if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
802 if (countObjects()) {
803 threadinfo[ev.getThread()].priority++;
807 Set conflicts=wrConflictSet(ev.getThread(), object);
808 if (conflicts!=null) {
809 if (!handleConflicts(ev, conflicts, currtime)) {
810 ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
815 } else if (operation==Transaction.BARRIER) {
817 if (barriercount==0) {
818 for(int i=0;i<e.numThreads();i++) {
819 //enqueue the next event
820 Event bev=currentevents[i];
821 int bevent=bev.getEvent();
822 long bcurrtime=bev.getTime();
823 Transaction btrans=bev.getTransaction();
824 long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
825 Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
826 currentevents[bev.getThread()]=nev;
829 barriercount=e.numThreads();
832 //wait until all threads in barrier
836 retrycount[ev.getThread()]=0;
837 transferred[ev.getThread()]=0;
838 //enqueue the next event
839 long deltatime=trans.getTime(event+1)-trans.getTime(event);
840 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
841 currentevents[ev.getThread()]=nev;
846 class Event implements Comparable {
855 public boolean isStalled() {
859 public void setStall() {
863 public void makeInvalid() {
867 public boolean isValid() {
871 public int getTransNum() {
875 public Transaction getTransaction() {
879 public int getEvent() {
883 public long getTime() {
887 public void setTime(long time) {
891 public int getThread() {
895 public Event(long time, Transaction t, int num, int threadid, int transnum) {
899 this.threadid=threadid;
900 this.transnum=transnum;
904 //break ties to allow commits to occur earliest
905 public int compareTo(Object o) {
907 long delta=time-e.time;
914 if (((getEvent()+1)==getTransaction().numEvents())&&
915 (e.getEvent()+1)!=e.getTransaction().numEvents())
917 if (((getEvent()+1)!=getTransaction().numEvents())&&
918 (e.getEvent()+1)==e.getTransaction().numEvents())