3 public class FlexScheduler extends Thread {
11 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
13 this.abortThreshold=abortThreshold;
14 this.abortRatio=abortRatio;
15 this.checkdepth=checkdepth;
22 public FlexScheduler(Executor e, int policy, Plot p) {
24 barriercount=e.numThreads();
25 aborted=new boolean[e.numThreads()];
26 currentevents=new Event[e.numThreads()];
27 rdobjmap=new Hashtable();
28 wrobjmap=new Hashtable();
31 eq=new PriorityQueue();
32 backoff=new int[e.numThreads()];
33 retrycount=new int[e.numThreads()];
34 transferred=new int[e.numThreads()];
35 objtoinfo=new Hashtable();
36 threadinfo=new ThreadInfo[e.numThreads()];
37 blocked=new boolean[e.numThreads()];
39 for(int i=0;i<e.numThreads();i++) {
40 backoff[i]=BACKOFFSTART;
41 threadinfo[i]=new ThreadInfo(this);
45 serCommit=p.getSeries("COMMIT");
46 serStart=p.getSeries("START");
47 serAbort=p.getSeries("ABORT");
48 serStall=p.getSeries("STALL");
49 serWake=p.getSeries("WAKE");
50 serAvoid=p.getSeries("AVOIDDEADLOCK");
62 public int getDeadLockCount() {
66 //Where to start the backoff delay at
67 public static final int BACKOFFSTART=1;
70 public static final int LAZY=0;
71 public static final int COMMIT=1;
72 public static final int ATTACK=2;
73 public static final int SUICIDE=3;
74 public static final int TIMESTAMP=4;
75 public static final int LOCK=5;
76 public static final int LOCKCOMMIT=6;
77 public static final int RANDOM=7;
78 public static final int KARMA=8;
79 public static final int POLITE=9;
80 public static final int ERUPTION=10;
81 public static final int THREAD=11;
82 public static final int ATTACKTIME=12;
83 public static final int ATTACKTHREAD=13;
97 Event[] currentevents;
103 ThreadInfo[] threadinfo;
107 public boolean isEager() {
108 return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD||policy==ATTACKTIME||policy==ATTACKTHREAD;
111 public boolean countObjects() {
112 return policy==KARMA||policy==ERUPTION;
115 public boolean isLock() {
116 return policy==LOCK||policy==LOCKCOMMIT;
119 public int getAborts() {
123 public int getCommits() {
127 public long getTime() {
128 return shorttesttime-starttime;
131 public long getStallTime() {
135 public long getBackoffTime() {
136 return backoffcycles;
139 //Aborts the transaction currently running on currthread and restarts it from its first event after backofftime
140 public void reschedule(int currthread, long currtime, long backofftime) {
141 long time=currtime+backofftime;
142 backoffcycles+=backofftime;
143 currentevents[currthread].makeInvalid();
144 if (threadinfo[currthread].isStalled()) {
145 //remove from waiter list
146 threadinfo[currthread].setStall(false);
147 getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
149 if (serAbort!=null) {
150 serAbort.addPoint(time, currthread);
152 Transaction trans=currentevents[currthread].getTransaction();
154 releaseObjects(trans, currthread, time);
155 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
156 currentevents[currthread]=nev;
160 //Stalls event ev by moving its time to time+delay
161 public void stall(Event ev, long time, long delay) {
163 ev.setTime(time+delay);
167 private void releaseObjects(Transaction trans, int currthread, long time) {
169 for(int i=0;i<trans.numEvents();i++) {
170 ObjIndex object=trans.getObjIndex(i);
172 if (object!=null&&rdobjmap.containsKey(object)) {
173 ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
175 if (object!=null&&wrobjmap.containsKey(object)) {
176 ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
178 if (object!=null&&objtoinfo.containsKey(object)) {
179 ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
180 if (oi.getOwner()==currentevents[currthread].getThread()) {
184 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
185 //requeue everyone who was waiting on us and start them back up
186 Event waiter=(Event)waitit.next();
188 waiter.setTime(time);
189 threadinfo[waiter.getThread()].setStall(false);
191 serWake.addPoint(time,waiter.getThread());
192 oi.setOwner(waiter.getThread());
201 /* Initializes things and returns number of transactions */
202 public int startinitial() {
204 for(int i=0;i<e.numThreads();i++) {
205 Transaction trans=e.getThread(i).getTransaction(0);
206 long time=trans.getTime(0);
207 Event ev=new Event(time, trans, 0, i, 0);
210 tcount+=e.getThread(i).numTransactions();
215 public void dosim() {
217 //start first transactions
218 int numtrans=startinitial();
219 System.out.println("Number of transactions="+numtrans);
221 while(!eq.isEmpty()) {
222 Event ev=(Event)eq.poll();
227 Transaction trans=ev.getTransaction();
229 int event=ev.getEvent();
230 long currtime=ev.getTime();
232 if (trans.started&&starttime==-1)
235 if (trans.numEvents()==(event+1)) {
236 tryCommit(ev, trans);
238 if ((tcount%100000)==0)
239 System.out.println("Attempted "+tcount+"transactions "+policy);
241 enqueueEvent(ev, trans);
244 shorttesttime=lasttime;
249 private ObjectInfo getmapping(ObjIndex obj) {
250 if (!objtoinfo.containsKey(obj))
251 objtoinfo.put(obj, new ObjectInfo(this));
252 return (ObjectInfo)objtoinfo.get(obj);
255 public void tryCommit(Event ev, Transaction trans) {
256 //ready to commit this one
257 long currtime=ev.getTime();
258 releaseObjects(trans, ev.getThread(), currtime);
260 //See if we have been flagged as aborted for the lazy case
261 boolean abort=aborted[ev.getThread()];
262 aborted[ev.getThread()]=false;
264 //if it is a transaction, increment commit count
265 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
267 if (serCommit!=null) {
268 serCommit.addPoint(ev.getTime(),ev.getThread());
271 //Reset our backoff counter
272 threadinfo[ev.getThread()].priority=0;
273 threadinfo[ev.getThread()].aborted=false;
274 backoff[ev.getThread()]=BACKOFFSTART;
275 retrycount[ev.getThread()]=0;
276 transferred[ev.getThread()]=0;
278 //abort the other threads
279 for(int i=0;i<trans.numEvents();i++) {
280 ObjIndex object=trans.getObjIndex(i);
281 int op=trans.getEvent(i);
282 //Mark commits to objects
283 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
285 System.out.println(op);
287 getmapping(object).recordCommit();
289 //Check for threads we might cause to abort
290 if (op==Transaction.WRITE) {
291 HashSet abortset=new HashSet();
292 if (rdobjmap.containsKey(object)) {
293 for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
294 Integer threadid=(Integer)it.next();
295 abortset.add(threadid);
297 ObjectInfo oi=getmapping(object);
302 if (wrobjmap.containsKey(object)) {
303 for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
304 Integer threadid=(Integer)it.next();
305 abortset.add(threadid);
306 if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
307 //if this object hasn't already caused this thread to
308 //abort, then flag it as an abort cause
309 ObjectInfo oi=getmapping(object);
314 for(Iterator abit=abortset.iterator();abit.hasNext();) {
315 Integer threadid=(Integer)abit.next();
316 if (policy==LAZY||policy==LOCK) {
317 //just flag it to abort when it tries to commit
318 aborted[threadid]=true;
320 serAbort.addPoint(currtime, threadid);
321 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
322 //abort it immediately
323 reschedule(threadid, currtime, 0);
333 //add next transaction event...could be us if we aborted
334 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
335 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
336 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
337 if (serStart!=null) {
338 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
339 serStart.addPoint(ev.getTime(),ev.getThread());
343 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
344 currentevents[ev.getThread()]=nev;
349 public Set rdConflictSet(int thread, ObjIndex obj) {
350 if (!wrobjmap.containsKey(obj))
352 HashSet conflictset=new HashSet();
353 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
354 Integer threadid=(Integer)it.next();
355 if (threadid.intValue()!=thread)
356 conflictset.add(threadid);
358 if (conflictset.isEmpty())
364 public Set wrConflictSet(int thread, ObjIndex obj) {
365 HashSet conflictset=new HashSet();
366 if (rdobjmap.containsKey(obj)) {
367 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
368 Integer threadid=(Integer)it.next();
369 if (threadid.intValue()!=thread)
370 conflictset.add(threadid);
373 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
374 Integer threadid=(Integer)it.next();
375 if (threadid.intValue()!=thread)
376 conflictset.add(threadid);
378 if (conflictset.isEmpty())
384 //Takes as parameters -- the current transaction's read or write event ev,
385 //the set of conflicting threads, and the current time.
386 //Returning false causes the current transaction not to continue being scheduled
389 public boolean handleConflicts(Event ev, Set threadstokill, long time) {
390 if (policy==RANDOM) {
391 boolean b=r.nextBoolean();
394 int thread=ev.getThread();
395 int dback=backoff[thread]*2;
397 backoff[thread]=dback;
398 stall(ev, time, r.nextInt(backoff[thread]));
401 //abort other transactions
402 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
403 Integer thread=(Integer)thit.next();
404 reschedule(thread, time, 0);
409 } else if (policy==KARMA) {
411 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
412 Integer thread=(Integer)thit.next();
413 if (threadinfo[thread].priority>maxpriority)
414 maxpriority=threadinfo[thread].priority;
416 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
417 //stall for a little while
418 threadinfo[ev.getThread()].priority--;
419 retrycount[ev.getThread()]++;
420 int rtime=r.nextInt(3000);
421 stall(ev, time, rtime);
425 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
426 Integer thread=(Integer)thit.next();
427 int dback=backoff[thread]*2;
429 backoff[thread]=dback;
430 int atime=r.nextInt(backoff[thread]);
431 reschedule(thread, time, atime);
436 } else if (policy==ERUPTION) {
438 //find the highest priority among the conflicting threads
439 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
440 Integer thread=(Integer)thit.next();
441 if (threadinfo[thread].priority>maxpriority)
442 maxpriority=threadinfo[thread].priority;
444 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
446 threadinfo[ev.getThread()].priority--;
447 //stall for a little while
448 int rtime=r.nextInt(3000);
449 stall(ev, time, rtime);
450 int ourpriority=threadinfo[ev.getThread()].priority;
451 ourpriority-=transferred[ev.getThread()];
452 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
453 Integer thread=(Integer)thit.next();
454 threadinfo[thread].priority+=ourpriority;
456 transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
457 retrycount[ev.getThread()]++;
462 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
463 Integer thread=(Integer)thit.next();
464 int dback=backoff[thread]*2;
466 backoff[thread]=dback;
467 int atime=r.nextInt(backoff[thread]);
468 reschedule(thread, time, atime);
473 } else if (policy==POLITE) {
474 int retry=(++retrycount[ev.getThread()]);
476 retrycount[ev.getThread()]=0;
477 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
478 Integer thread=(Integer)thit.next();
479 reschedule(thread, time, 0);
485 int stalltime=(1<<(retry-1))*12;
488 stall(ev, time, r.nextInt(stalltime));
491 } else if (policy==ATTACK) {
492 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
493 Integer thread=(Integer)thit.next();
494 reschedule(thread, time, r.nextInt(backoff[thread.intValue()]));
495 int dback=backoff[thread.intValue()]*2;
497 backoff[thread.intValue()]=dback;
501 } else if (policy==SUICIDE) {
502 reschedule(ev.getThread(), time, r.nextInt(backoff[ev.getThread()]));
503 int dback=backoff[ev.getThread()]*2;
505 backoff[ev.getThread()]=dback;
508 } else if (policy==TIMESTAMP) {
511 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
512 Integer thread=(Integer)thit.next();
513 Event other=currentevents[thread.intValue()];
514 int eventnum=other.getEvent();
515 long otime=other.getTransaction().getTime(other.getEvent());
516 if (otime>opponenttime)
519 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
521 reschedule(ev.getThread(), time, 0);
526 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
527 Integer thread=(Integer)thit.next();
528 reschedule(thread, time, 0);
533 } else if (policy==THREAD) {
536 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
537 Integer thread=(Integer)thit.next();
538 Event other=currentevents[thread.intValue()];
539 int eventnum=other.getEvent();
540 long otid=thread.intValue();
544 if (ev.getThread()>tid) {
546 reschedule(ev.getThread(), time, 0);
551 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
552 Integer thread=(Integer)thit.next();
553 reschedule(thread, time, 0);
558 } else if (policy==ATTACKTIME) {
559 boolean timebased=false;
560 int tev=ev.getThread();
561 timebased|=threadinfo[tev].aborted;
562 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
563 Integer thread=(Integer)thit.next();
564 timebased|=threadinfo[thread.intValue()].aborted;
569 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
570 Integer thread=(Integer)thit.next();
571 Event other=currentevents[thread.intValue()];
572 int eventnum=other.getEvent();
573 long otime=other.getTransaction().getTime(other.getEvent());
574 if (otime>opponenttime)
577 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
579 reschedule(ev.getThread(), time, 0);
580 threadinfo[ev.getThread()].aborted=true;
585 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
586 Integer thread=(Integer)thit.next();
587 reschedule(thread, time, 0);
588 threadinfo[thread.intValue()].aborted=true;
594 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
595 Integer thread=(Integer)thit.next();
596 reschedule(thread, time, 0);
597 threadinfo[thread.intValue()].aborted=true;
602 } else if (policy==ATTACKTHREAD) {
603 boolean threadbased=false;
604 int tev=ev.getThread();
605 threadbased|=threadinfo[tev].aborted;
606 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
607 Integer thread=(Integer)thit.next();
608 threadbased|=threadinfo[thread.intValue()].aborted;
611 long opponentthr=1000;
613 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
614 Integer thread=(Integer)thit.next();
615 Event other=currentevents[thread.intValue()];
616 int eventnum=other.getEvent();
617 long othr=thread.intValue();
618 if (othr<opponentthr)
621 if (opponentthr<tev) {
623 reschedule(ev.getThread(), time, 0);
624 threadinfo[ev.getThread()].aborted=true;
629 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
630 Integer thread=(Integer)thit.next();
631 reschedule(thread, time, 0);
632 threadinfo[thread.intValue()].aborted=true;
638 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
639 Integer thread=(Integer)thit.next();
640 reschedule(thread, time, 0);
641 threadinfo[thread.intValue()].aborted=true;
652 //Handle current event (read, write, delay) in a transaction and
653 //enqueue the next one
655 public void enqueueEvent(Event ev, Transaction trans) {
656 //just enqueue next event
657 int event=ev.getEvent();
658 long currtime=ev.getTime();
659 ObjIndex object=trans.getObjIndex(event);
660 int operation=trans.getEvent(event);
662 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
663 ObjectInfo oi=getmapping(object);
666 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
667 //we're going to wait
668 boolean deadlocked=true;
670 for(int i=0;i<checkdepth;i++) {
671 //check if stalling would close the loop
672 if (toi.getOwner()==ev.getThread())
674 //see if cycle is broken
675 if (!threadinfo[toi.getOwner()].isStalled()) {
679 //follow one more in depth
680 toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
684 //don't wait on stalled threads, we could deadlock
685 threadinfo[ev.getThread()].setStall(true);
686 threadinfo[ev.getThread()].setObjIndex(object);
688 serStall.addPoint(ev.getTime(),ev.getThread());
693 serAvoid.addPoint(ev.getTime(),ev.getThread());
698 oi.setOwner(ev.getThread());
703 //process the current event
704 if (operation==Transaction.READ) {
706 if (!rdobjmap.containsKey(object))
707 rdobjmap.put(object,new HashSet());
708 if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
710 if (countObjects()) {
711 threadinfo[ev.getThread()].priority++;
715 //do eager contention management
716 Set conflicts=rdConflictSet(ev.getThread(), object);
717 if (conflicts!=null) {
718 if (!handleConflicts(ev, conflicts, currtime)) {
719 ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
724 } else if (operation==Transaction.WRITE) {
726 if (!wrobjmap.containsKey(object))
727 wrobjmap.put(object,new HashSet());
728 if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
729 if (countObjects()) {
730 threadinfo[ev.getThread()].priority++;
734 Set conflicts=wrConflictSet(ev.getThread(), object);
735 if (conflicts!=null) {
736 if (!handleConflicts(ev, conflicts, currtime)) {
737 ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
742 } else if (operation==Transaction.BARRIER) {
744 if (barriercount==0) {
745 for(int i=0;i<e.numThreads();i++) {
746 //enqueue the next event
747 Event bev=currentevents[i];
748 int bevent=bev.getEvent();
749 long bcurrtime=bev.getTime();
750 Transaction btrans=bev.getTransaction();
751 long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
752 Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
753 currentevents[bev.getThread()]=nev;
756 barriercount=e.numThreads();
759 //wait until all threads in barrier
763 retrycount[ev.getThread()]=0;
764 transferred[ev.getThread()]=0;
765 //enqueue the next event
766 long deltatime=trans.getTime(event+1)-trans.getTime(event);
767 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
768 currentevents[ev.getThread()]=nev;
773 class Event implements Comparable {
781 public void makeInvalid() {
785 public boolean isValid() {
789 public int getTransNum() {
793 public Transaction getTransaction() {
797 public int getEvent() {
801 public long getTime() {
805 public void setTime(long time) {
809 public int getThread() {
813 public Event(long time, Transaction t, int num, int threadid, int transnum) {
817 this.threadid=threadid;
818 this.transnum=transnum;
822 //break ties to allow commits to occur earliest
823 public int compareTo(Object o) {
825 long delta=time-e.time;
832 if (((getEvent()+1)==getTransaction().numEvents())&&
833 (e.getEvent()+1)!=e.getTransaction().numEvents())
835 if (((getEvent()+1)!=getTransaction().numEvents())&&
836 (e.getEvent()+1)==e.getTransaction().numEvents())