3 public class FlexScheduler extends Thread {
11 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
13 this.abortThreshold=abortThreshold;
14 this.abortRatio=abortRatio;
15 this.checkdepth=checkdepth;
22 public FlexScheduler(Executor e, int policy, Plot p) {
24 barriercount=e.numThreads();
25 aborted=new boolean[e.numThreads()];
26 currentevents=new Event[e.numThreads()];
27 rdobjmap=new Hashtable();
28 wrobjmap=new Hashtable();
31 eq=new PriorityQueue();
32 backoff=new int[e.numThreads()];
33 retrycount=new int[e.numThreads()];
34 transferred=new int[e.numThreads()];
35 objtoinfo=new Hashtable();
36 threadinfo=new ThreadInfo[e.numThreads()];
37 blocked=new boolean[e.numThreads()];
39 for(int i=0;i<e.numThreads();i++) {
40 backoff[i]=BACKOFFSTART;
41 threadinfo[i]=new ThreadInfo(this);
45 serCommit=p.getSeries("COMMIT");
46 serStart=p.getSeries("START");
47 serAbort=p.getSeries("ABORT");
48 serStall=p.getSeries("STALL");
49 serWake=p.getSeries("WAKE");
50 serAvoid=p.getSeries("AVOIDDEADLOCK");
62 public int getDeadLockCount() {
66 //Where to start the backoff delay at
67 public static final int BACKOFFSTART=1;
70 public static final int LAZY=0;
71 public static final int COMMIT=1;
72 public static final int ATTACK=2;
73 public static final int SUICIDE=3;
74 public static final int TIMESTAMP=4;
75 public static final int LOCK=5;
76 public static final int LOCKCOMMIT=6;
77 public static final int RANDOM=7;
78 public static final int KARMA=8;
79 public static final int POLITE=9;
80 public static final int ERUPTION=10;
81 public static final int THREAD=11;
82 public static final int ATTACKTIME=12;
83 public static final int ATTACKTHREAD=13;
85 public static String getName(int policy) {
88 return new String("LAZY");
90 return new String("COMMIT");
92 return new String("ATTACK");
94 return new String("SUICIDE");
96 return new String("TIMESTAMP");
98 return new String("LOCK");
100 return new String("LOCKCOMMIT");
102 return new String("RANDOM");
104 return new String("KARMA");
106 return new String("POLITE");
108 return new String("ERUPTION");
110 return new String("THREAD");
112 return new String("ATTACKTIME");
114 return new String("ATTACKTHREAD");
131 Event[] currentevents;
137 ThreadInfo[] threadinfo;
141 public boolean isEager() {
142 return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD||policy==ATTACKTIME||policy==ATTACKTHREAD;
145 public boolean countObjects() {
146 return policy==KARMA||policy==ERUPTION;
149 public boolean isLock() {
150 return policy==LOCK||policy==LOCKCOMMIT;
153 public int getAborts() {
157 public int getCommits() {
161 public long getTime() {
162 return shorttesttime-starttime;
165 public long getStallTime() {
169 public long getBackoffTime() {
170 return backoffcycles;
173 public long getAbortedTime() {
174 return abortedcycles;
177 //Computes wasted time
178 public void timewasted(int currthread, long currtime) {
179 Event e=currentevents[currthread];
180 Transaction trans=e.getTransaction();
181 int eIndex=e.getEvent();
182 long eTime=e.getTime();
183 long timeleft=eTime-currtime;
185 for(int i=0;i<=eIndex;i++)
186 totaltime+=trans.getTime(i);
187 totaltime-=timeleft;//subtract off time to the next event
188 abortedcycles+=totaltime;
191 //Aborts another thread...
192 public void reschedule(int currthread, long currtime, long backofftime) {
193 long time=currtime+backofftime;
194 backoffcycles+=backofftime;
195 currentevents[currthread].makeInvalid();
196 if (threadinfo[currthread].isStalled()) {
197 //remove from waiter list
198 threadinfo[currthread].setStall(false);
199 getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
201 if (serAbort!=null) {
202 serAbort.addPoint(time, currthread);
204 Transaction trans=currentevents[currthread].getTransaction();
206 releaseObjects(trans, currthread, time);
207 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
208 currentevents[currthread]=nev;
212 //Aborts another thread...
213 public void stall(Event ev, long time, long delay) {
215 ev.setTime(time+delay);
219 private void releaseObjects(Transaction trans, int currthread, long time) {
221 for(int i=0;i<trans.numEvents();i++) {
222 ObjIndex object=trans.getObjIndex(i);
224 if (object!=null&&rdobjmap.containsKey(object)) {
225 ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
227 if (object!=null&&wrobjmap.containsKey(object)) {
228 ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
230 if (object!=null&&objtoinfo.containsKey(object)) {
231 ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
232 if (oi.getOwner()==currentevents[currthread].getThread()) {
236 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
237 //requeue everyone who was waiting on us and start them back up
238 Event waiter=(Event)waitit.next();
240 waiter.setTime(time);
241 threadinfo[waiter.getThread()].setStall(false);
243 serWake.addPoint(time,waiter.getThread());
244 oi.setOwner(waiter.getThread());
253 /* Initializes things and returns number of transactions */
254 public int startinitial() {
256 for(int i=0;i<e.numThreads();i++) {
257 Transaction trans=e.getThread(i).getTransaction(0);
258 long time=trans.getTime(0);
259 Event ev=new Event(time, trans, 0, i, 0);
262 tcount+=e.getThread(i).numTransactions();
267 public void dosim() {
269 //start first transactions
270 int numtrans=startinitial();
271 System.out.println("Number of transactions="+numtrans);
273 while(!eq.isEmpty()) {
274 Event ev=(Event)eq.poll();
279 Transaction trans=ev.getTransaction();
281 int event=ev.getEvent();
282 long currtime=ev.getTime();
284 if (trans.started&&starttime==-1)
287 if (trans.numEvents()==(event+1)) {
288 tryCommit(ev, trans);
290 if ((tcount%100000)==0)
291 System.out.println("Attempted "+tcount+"transactions "+policy);
293 enqueueEvent(ev, trans);
296 shorttesttime=lasttime;
301 private ObjectInfo getmapping(ObjIndex obj) {
302 if (!objtoinfo.containsKey(obj))
303 objtoinfo.put(obj, new ObjectInfo(this));
304 return (ObjectInfo)objtoinfo.get(obj);
307 public void tryCommit(Event ev, Transaction trans) {
308 //ready to commit this one
309 long currtime=ev.getTime();
310 releaseObjects(trans, ev.getThread(), currtime);
312 //See if we have been flagged as aborted for the lazy case
313 boolean abort=aborted[ev.getThread()];
314 aborted[ev.getThread()]=false;
316 //if it is a transaction, increment commit count
317 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
319 if (serCommit!=null) {
320 serCommit.addPoint(ev.getTime(),ev.getThread());
323 //Reset our backoff counter
324 threadinfo[ev.getThread()].priority=0;
325 threadinfo[ev.getThread()].aborted=false;
326 backoff[ev.getThread()]=BACKOFFSTART;
327 retrycount[ev.getThread()]=0;
328 transferred[ev.getThread()]=0;
330 //abort the other threads
331 for(int i=0;i<trans.numEvents();i++) {
332 ObjIndex object=trans.getObjIndex(i);
333 int op=trans.getEvent(i);
334 //Mark commits to objects
335 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
337 System.out.println(op);
339 getmapping(object).recordCommit();
341 //Check for threads we might cause to abort
342 if (op==Transaction.WRITE) {
343 HashSet abortset=new HashSet();
344 if (rdobjmap.containsKey(object)) {
345 for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
346 Integer threadid=(Integer)it.next();
347 abortset.add(threadid);
349 ObjectInfo oi=getmapping(object);
354 if (wrobjmap.containsKey(object)) {
355 for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
356 Integer threadid=(Integer)it.next();
357 abortset.add(threadid);
358 if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
359 //if this object hasn't already cause this thread to
360 //abort, then flag it as an abort cause
361 ObjectInfo oi=getmapping(object);
366 for(Iterator abit=abortset.iterator();abit.hasNext();) {
367 Integer threadid=(Integer)abit.next();
368 if (policy==LAZY||policy==LOCK) {
369 //just flag to abort when it trie to commit
370 aborted[threadid]=true;
372 serAbort.addPoint(currtime, threadid);
373 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
374 //abort it immediately
375 timewasted(threadid, currtime);
376 reschedule(threadid, currtime, 0);
384 timewasted(ev.getThread(), currtime);
387 //add next transaction event...could be us if we aborted
388 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
389 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
390 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
391 if (serStart!=null) {
392 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
393 serStart.addPoint(ev.getTime(),ev.getThread());
397 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
398 currentevents[ev.getThread()]=nev;
403 public Set rdConflictSet(int thread, ObjIndex obj) {
404 if (!wrobjmap.containsKey(obj))
406 HashSet conflictset=new HashSet();
407 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
408 Integer threadid=(Integer)it.next();
409 if (threadid.intValue()!=thread)
410 conflictset.add(threadid);
412 if (conflictset.isEmpty())
418 public Set wrConflictSet(int thread, ObjIndex obj) {
419 HashSet conflictset=new HashSet();
420 if (rdobjmap.containsKey(obj)) {
421 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
422 Integer threadid=(Integer)it.next();
423 if (threadid.intValue()!=thread)
424 conflictset.add(threadid);
427 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
428 Integer threadid=(Integer)it.next();
429 if (threadid.intValue()!=thread)
430 conflictset.add(threadid);
432 if (conflictset.isEmpty())
438 //Takes as parameter -- current transaction read event ev, conflicting
439 //set of threads, and the current time
440 //Returning false causes current transaction not continue to be scheduled
443 public boolean handleConflicts(Event ev, Set threadstokill, long time) {
444 if (policy==RANDOM) {
445 boolean b=r.nextBoolean();
448 int thread=ev.getThread();
449 int dback=backoff[thread]*2;
451 backoff[thread]=dback;
452 stall(ev, time, r.nextInt(backoff[thread]));
455 //abort other transactions
456 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
457 Integer thread=(Integer)thit.next();
458 timewasted(thread, time);
459 reschedule(thread, time, 0);
464 } else if (policy==KARMA) {
466 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
467 Integer thread=(Integer)thit.next();
468 if (threadinfo[thread].priority>maxpriority)
469 maxpriority=threadinfo[thread].priority;
471 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
472 //stall for a little while
473 threadinfo[ev.getThread()].priority--;
474 retrycount[ev.getThread()]++;
475 int rtime=r.nextInt(3000);
476 stall(ev, time, rtime);
480 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
481 Integer thread=(Integer)thit.next();
482 int dback=backoff[thread]*2;
484 backoff[thread]=dback;
485 int atime=r.nextInt(backoff[thread]);
486 timewasted(thread, time);
487 reschedule(thread, time, atime);
492 } else if (policy==ERUPTION) {
494 //abort other transactions
495 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
496 Integer thread=(Integer)thit.next();
497 if (threadinfo[thread].priority>maxpriority)
498 maxpriority=threadinfo[thread].priority;
500 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
502 threadinfo[ev.getThread()].priority--;
503 //stall for a little while
504 int rtime=r.nextInt(3000);
505 stall(ev, time, rtime);
506 int ourpriority=threadinfo[ev.getThread()].priority;
507 ourpriority-=transferred[ev.getThread()];
508 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
509 Integer thread=(Integer)thit.next();
510 threadinfo[thread].priority+=ourpriority;
512 transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
513 retrycount[ev.getThread()]++;
518 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
519 Integer thread=(Integer)thit.next();
520 int dback=backoff[thread]*2;
522 backoff[thread]=dback;
523 int atime=r.nextInt(backoff[thread]);
524 timewasted(thread, time);
525 reschedule(thread, time, atime);
530 } else if (policy==POLITE) {
531 int retry=(++retrycount[ev.getThread()]);
533 retrycount[ev.getThread()]=0;
534 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
535 Integer thread=(Integer)thit.next();
536 timewasted(thread, time);
537 reschedule(thread, time, 0);
543 int stalltime=(1<<(retry-1))*12;
546 stall(ev, time, r.nextInt(stalltime));
549 } else if (policy==ATTACK) {
550 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
551 Integer thread=(Integer)thit.next();
552 timewasted(thread, time);
553 reschedule(thread, time, r.nextInt(backoff[thread.intValue()]));
554 int dback=backoff[thread.intValue()]*2;
556 backoff[thread.intValue()]=dback;
560 } else if (policy==SUICIDE) {
561 timewasted(ev.getThread(), time);
562 reschedule(ev.getThread(), time, r.nextInt(backoff[ev.getThread()]));
563 int dback=backoff[ev.getThread()]*2;
565 backoff[ev.getThread()]=dback;
568 } else if (policy==TIMESTAMP) {
571 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
572 Integer thread=(Integer)thit.next();
573 Event other=currentevents[thread.intValue()];
574 int eventnum=other.getEvent();
575 long otime=other.getTransaction().getTime(other.getEvent());
576 if (otime>opponenttime)
579 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
581 timewasted(ev.getThread(), time);
582 reschedule(ev.getThread(), time, 0);
587 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
588 Integer thread=(Integer)thit.next();
589 timewasted(thread, time);
590 reschedule(thread, time, 0);
595 } else if (policy==THREAD) {
598 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
599 Integer thread=(Integer)thit.next();
600 Event other=currentevents[thread.intValue()];
601 int eventnum=other.getEvent();
602 long otid=thread.intValue();
606 if (ev.getThread()>tid) {
608 timewasted(ev.getThread(), time);
609 reschedule(ev.getThread(), time, 0);
614 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
615 Integer thread=(Integer)thit.next();
616 timewasted(thread, time);
617 reschedule(thread, time, 0);
622 } else if (policy==ATTACKTIME) {
623 boolean timebased=false;
624 int tev=ev.getThread();
625 timebased|=threadinfo[tev].aborted;
626 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
627 Integer thread=(Integer)thit.next();
628 timebased|=threadinfo[thread.intValue()].aborted;
633 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
634 Integer thread=(Integer)thit.next();
635 Event other=currentevents[thread.intValue()];
636 int eventnum=other.getEvent();
637 long otime=other.getTransaction().getTime(other.getEvent());
638 if (otime>opponenttime)
641 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
643 timewasted(ev.getThread(), time);
644 reschedule(ev.getThread(), time, 0);
645 threadinfo[ev.getThread()].aborted=true;
650 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
651 Integer thread=(Integer)thit.next();
652 timewasted(thread, time);
653 reschedule(thread, time, 0);
654 threadinfo[thread.intValue()].aborted=true;
660 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
661 Integer thread=(Integer)thit.next();
662 timewasted(thread, time);
663 reschedule(thread, time, 0);
664 threadinfo[thread.intValue()].aborted=true;
669 } else if (policy==ATTACKTHREAD) {
670 boolean threadbased=false;
671 int tev=ev.getThread();
672 threadbased|=threadinfo[tev].aborted;
673 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
674 Integer thread=(Integer)thit.next();
675 threadbased|=threadinfo[thread.intValue()].aborted;
678 long opponentthr=1000;
680 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
681 Integer thread=(Integer)thit.next();
682 Event other=currentevents[thread.intValue()];
683 int eventnum=other.getEvent();
684 long othr=thread.intValue();
685 if (othr<opponentthr)
688 if (opponentthr<tev) {
690 timewasted(ev.getThread(), time);
691 reschedule(ev.getThread(), time, 0);
692 threadinfo[ev.getThread()].aborted=true;
697 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
698 Integer thread=(Integer)thit.next();
699 timewasted(thread, time);
700 reschedule(thread, time, 0);
701 threadinfo[thread.intValue()].aborted=true;
707 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
708 Integer thread=(Integer)thit.next();
709 timewasted(thread, time);
710 reschedule(thread, time, 0);
711 threadinfo[thread.intValue()].aborted=true;
722 //Handle current event (read, write, delay) in a transaction and
723 //enqueue the next one
725 public void enqueueEvent(Event ev, Transaction trans) {
726 //just enqueue next event
727 int event=ev.getEvent();
728 long currtime=ev.getTime();
729 ObjIndex object=trans.getObjIndex(event);
730 int operation=trans.getEvent(event);
732 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
733 ObjectInfo oi=getmapping(object);
736 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
737 //we're going to wait
738 boolean deadlocked=true;
740 for(int i=0;i<checkdepth;i++) {
741 //check if stalling would close the loop
742 if (toi.getOwner()==ev.getThread())
744 //see if cycle is broken
745 if (!threadinfo[toi.getOwner()].isStalled()) {
749 //follow one more in depth
750 toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
754 //don't wait on stalled threads, we could deadlock
755 threadinfo[ev.getThread()].setStall(true);
756 threadinfo[ev.getThread()].setObjIndex(object);
758 serStall.addPoint(ev.getTime(),ev.getThread());
763 serAvoid.addPoint(ev.getTime(),ev.getThread());
768 oi.setOwner(ev.getThread());
773 //process the current event
774 if (operation==Transaction.READ) {
776 if (!rdobjmap.containsKey(object))
777 rdobjmap.put(object,new HashSet());
778 if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
780 if (countObjects()) {
781 threadinfo[ev.getThread()].priority++;
785 //do eager contention management
786 Set conflicts=rdConflictSet(ev.getThread(), object);
787 if (conflicts!=null) {
788 if (!handleConflicts(ev, conflicts, currtime)) {
789 ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
794 } else if (operation==Transaction.WRITE) {
796 if (!wrobjmap.containsKey(object))
797 wrobjmap.put(object,new HashSet());
798 if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
799 if (countObjects()) {
800 threadinfo[ev.getThread()].priority++;
804 Set conflicts=wrConflictSet(ev.getThread(), object);
805 if (conflicts!=null) {
806 if (!handleConflicts(ev, conflicts, currtime)) {
807 ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
812 } else if (operation==Transaction.BARRIER) {
814 if (barriercount==0) {
815 for(int i=0;i<e.numThreads();i++) {
816 //enqueue the next event
817 Event bev=currentevents[i];
818 int bevent=bev.getEvent();
819 long bcurrtime=bev.getTime();
820 Transaction btrans=bev.getTransaction();
821 long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
822 Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
823 currentevents[bev.getThread()]=nev;
826 barriercount=e.numThreads();
829 //wait until all threads in barrier
833 retrycount[ev.getThread()]=0;
834 transferred[ev.getThread()]=0;
835 //enqueue the next event
836 long deltatime=trans.getTime(event+1)-trans.getTime(event);
837 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
838 currentevents[ev.getThread()]=nev;
843 class Event implements Comparable {
851 public void makeInvalid() {
855 public boolean isValid() {
859 public int getTransNum() {
863 public Transaction getTransaction() {
867 public int getEvent() {
871 public long getTime() {
875 public void setTime(long time) {
879 public int getThread() {
883 public Event(long time, Transaction t, int num, int threadid, int transnum) {
887 this.threadid=threadid;
888 this.transnum=transnum;
892 //break ties to allow commits to occur earliest
893 public int compareTo(Object o) {
895 long delta=time-e.time;
902 if (((getEvent()+1)==getTransaction().numEvents())&&
903 (e.getEvent()+1)!=e.getTransaction().numEvents())
905 if (((getEvent()+1)!=getTransaction().numEvents())&&
906 (e.getEvent()+1)==e.getTransaction().numEvents())