3 public class FlexScheduler extends Thread {
//Constructor that additionally takes abort and deadlock-check settings.
//The visible lines copy abortThreshold, abortRatio and checkdepth into fields;
//checkdepth bounds the owner-chain walk performed in enqueueEvent.
//NOTE(review): other lines of this body are not shown in this excerpt.
11 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
13 this.abortThreshold=abortThreshold;
14 this.abortRatio=abortRatio;
15 this.checkdepth=checkdepth;
//Constructor: sets up per-thread bookkeeping sized by e.numThreads(), the
//object maps, the event queue, and the plot series used for tracing.
22 public FlexScheduler(Executor e, int policy, Plot p) {
//barrier counter starts at the thread count (enqueueEvent resets it to the same value)
24 barriercount=e.numThreads();
25 aborted=new boolean[e.numThreads()];
26 currentevents=new Event[e.numThreads()];
//per-object sets of reading / writing thread ids (filled in enqueueEvent)
27 rdobjmap=new Hashtable();
28 wrobjmap=new Hashtable();
//pending events; dosim polls this queue until it is empty
31 eq=new PriorityQueue();
32 backoff=new int[e.numThreads()];
33 retrycount=new int[e.numThreads()];
34 transferred=new int[e.numThreads()];
35 objtoinfo=new Hashtable();
36 threadinfo=new ThreadInfo[e.numThreads()];
37 blocked=new boolean[e.numThreads()];
//every thread starts with the initial backoff value and its own ThreadInfo
39 for(int i=0;i<e.numThreads();i++) {
40 backoff[i]=BACKOFFSTART;
41 threadinfo[i]=new ThreadInfo(this);
//plot series looked up by name
//NOTE(review): reschedule and tryCommit null-check some of these series, so
//they may be absent -- confirm how p is guarded in the lines not shown here
45 serCommit=p.getSeries("COMMIT");
46 serStart=p.getSeries("START");
47 serAbort=p.getSeries("ABORT");
48 serStall=p.getSeries("STALL");
49 serWake=p.getSeries("WAKE");
50 serAvoid=p.getSeries("AVOIDDEADLOCK");
62 public int getDeadLockCount() {
66 //Initial per-thread backoff value; tryCommit restores it after a commit
67 public static final int BACKOFFSTART=1;
//Contention-management policy identifiers; the policy field is compared against these.
//isEager() groups ATTACK, SUICIDE, TIMESTAMP, RANDOM, KARMA, POLITE, ERUPTION and THREAD;
//isLock() groups LOCK and LOCKCOMMIT; LAZY and COMMIT belong to neither group.
70 public static final int LAZY=0;
71 public static final int COMMIT=1;
72 public static final int ATTACK=2;
73 public static final int SUICIDE=3;
74 public static final int TIMESTAMP=4;
75 public static final int LOCK=5;
76 public static final int LOCKCOMMIT=6;
77 public static final int RANDOM=7;
78 public static final int KARMA=8;
79 public static final int POLITE=9;
80 public static final int ERUPTION=10;
81 public static final int THREAD=11;
92 Event[] currentevents;
98 ThreadInfo[] threadinfo;
//Returns true when the configured policy is one of the policies that
//handleConflicts dispatches on (ATTACK, SUICIDE, TIMESTAMP, RANDOM, KARMA,
//POLITE, ERUPTION, THREAD).
102 public boolean isEager() {
103 return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD;
//Returns true for KARMA and ERUPTION. enqueueEvent uses this to decide whether
//a thread's priority is incremented when it first reads or writes an object.
106 public boolean countObjects() {
107 return policy==KARMA||policy==ERUPTION;
//Returns true for the two lock-based policies, LOCK and LOCKCOMMIT.
110 public boolean isLock() {
111 return policy==LOCK||policy==LOCKCOMMIT;
114 public int getAborts() {
118 public int getCommits() {
//Returns shorttesttime minus starttime. dosim assigns shorttesttime from
//lasttime after its event loop; presumably the result is the simulated
//elapsed time -- confirm against where starttime and lasttime are set.
122 public long getTime() {
123 return shorttesttime-starttime;
126 //Aborts the current transaction of thread currthread and restarts it from its first event.
//currthread is the index of the thread to abort (SUICIDE passes the calling
//thread itself); time is the simulation time at which the abort happens.
127 public void reschedule(int currthread, long time) {
//invalidate the pending event of the thread
//NOTE(review): presumably the main loop skips invalid events -- confirm in dosim
128 currentevents[currthread].makeInvalid();
129 if (threadinfo[currthread].isStalled()) {
130 //remove from waiter list
131 threadinfo[currthread].setStall(false);
132 getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
//trace the abort when the series is available
134 if (serAbort!=null) {
135 serAbort.addPoint(time, currthread);
137 Transaction trans=currentevents[currthread].getTransaction();
//drop the read/write registrations (and owned objects) of the aborted transaction
139 releaseObjects(trans, currthread, time);
//replacement event: event 0 of the same transaction and transaction number,
//timed at time plus the time of the first event of the transaction
140 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
141 currentevents[currthread]=nev;
145 //Stalls the thread behind event ev; callers pass the simulation time it is delayed to
146 public void stall(Event ev, long time) {
//Removes thread currthread from the read and write sets of every object
//touched by trans, and hands objects it owns over to waiting events.
//time is the simulation time given to woken waiters.
//NOTE(review): new Integer(int) is deprecated in current JDKs; Integer.valueOf is the usual replacement.
152 private void releaseObjects(Transaction trans, int currthread, long time) {
154 for(int i=0;i<trans.numEvents();i++) {
155 ObjIndex object=trans.getObjIndex(i);
//the thread id is boxed so that it matches the Integer elements added in enqueueEvent
157 if (object!=null&&rdobjmap.containsKey(object)) {
158 ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
160 if (object!=null&&wrobjmap.containsKey(object)) {
161 ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
163 if (object!=null&&objtoinfo.containsKey(object)) {
164 ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
//only act when the object is owned by the thread of the current event of currthread
165 if (oi.getOwner()==currentevents[currthread].getThread()) {
169 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
170 //requeue everyone who was waiting on us and start them back up
171 Event waiter=(Event)waitit.next();
//the waiter resumes at the release time and is no longer marked stalled
173 waiter.setTime(time);
174 threadinfo[waiter.getThread()].setStall(false);
176 serWake.addPoint(time,waiter.getThread());
//NOTE(review): ownership is assigned to the waiter here; lines between the
//visible statements are not shown, so confirm how multiple waiters are handled
177 oi.setOwner(waiter.getThread());
186 /* Initializes things and returns number of transactions */
//For every thread, builds the event for event 0 of transaction 0 at that
//event's time, and adds the thread's transaction count to tcount.
//NOTE(review): the lines that store or enqueue ev are not shown in this excerpt.
187 public int startinitial() {
189 for(int i=0;i<e.numThreads();i++) {
190 Transaction trans=e.getThread(i).getTransaction(0);
191 long time=trans.getTime(0);
192 Event ev=new Event(time, trans, 0, i, 0);
195 tcount+=e.getThread(i).numTransactions();
//Runs the simulation: seeds the initial events, then drains the event queue,
//dispatching each polled event to tryCommit or enqueueEvent.
200 public void dosim() {
202 //start first transactions
203 int numtrans=startinitial();
204 System.out.println("Number of transactions="+numtrans);
206 while(!eq.isEmpty()) {
207 Event ev=(Event)eq.poll();
212 Transaction trans=ev.getTransaction();
214 int event=ev.getEvent();
215 long currtime=ev.getTime();
//NOTE(review): starttime equal to -1 looks like a not-yet-set marker; the
//statement guarded by this test is not shown here
217 if (trans.started&&starttime==-1)
//the event is the last one of its transaction: attempt to commit
220 if (trans.numEvents()==(event+1)) {
221 tryCommit(ev, trans);
//periodic progress message
//NOTE(review): the printed text has no space between the count and the word transactions
223 if ((tcount%100000)==0)
224 System.out.println("Attempted "+tcount+"transactions "+policy);
//NOTE(review): presumably the else-branch for non-final events; the branch line is not shown here
226 enqueueEvent(ev, trans);
//value later read by getTime
229 shorttesttime=lasttime;
//Returns the ObjectInfo registered for obj in objtoinfo, creating and
//registering a new one first when obj has no entry yet.
234 private ObjectInfo getmapping(ObjIndex obj) {
235 if (!objtoinfo.containsKey(obj))
236 objtoinfo.put(obj, new ObjectInfo(this));
237 return (ObjectInfo)objtoinfo.get(obj);
//Handles the final event of a transaction: releases its objects, traces the
//commit, flags or aborts threads that conflict with its writes, and builds
//the next event of the thread.
//ev is the event being processed; trans is the transaction it belongs to.
240 public void tryCommit(Event ev, Transaction trans) {
241 //ready to commit this one
242 long currtime=ev.getTime();
//this also removes the committing thread from the read/write sets, so it
//should not show up in the abort sets built below
243 releaseObjects(trans, ev.getThread(), currtime);
245 //See if we have been flagged as aborted for the lazy case
246 boolean abort=aborted[ev.getThread()];
247 aborted[ev.getThread()]=false;
249 //if it is a transaction, increment commit count
//(a single-event transaction whose only event is DELAY does not pass this test)
250 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
252 if (serCommit!=null) {
253 serCommit.addPoint(ev.getTime(),ev.getThread());
256 //Reset our backoff counter, together with priority, retry count and transferred priority
257 threadinfo[ev.getThread()].priority=0;
258 backoff[ev.getThread()]=BACKOFFSTART;
259 retrycount[ev.getThread()]=0;
260 transferred[ev.getThread()]=0;
262 //abort the other threads
263 for(int i=0;i<trans.numEvents();i++) {
264 ObjIndex object=trans.getObjIndex(i);
265 int op=trans.getEvent(i);
266 //Mark commits to objects
267 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
//NOTE(review): the condition guarding this print is not shown here; it looks like diagnostic output
269 System.out.println(op);
271 getmapping(object).recordCommit();
273 //Check for threads we might cause to abort
274 if (op==Transaction.WRITE) {
//collects every thread currently registered as reader or writer of the written object
275 HashSet abortset=new HashSet();
276 if (rdobjmap.containsKey(object)) {
277 for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
278 Integer threadid=(Integer)it.next();
279 abortset.add(threadid);
281 ObjectInfo oi=getmapping(object);
286 if (wrobjmap.containsKey(object)) {
287 for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
288 Integer threadid=(Integer)it.next();
289 abortset.add(threadid);
//true only for writers that are not also in the read set of the object
290 if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
291 //if this object has not already caused this thread to
292 //abort, then flag it as an abort cause
293 ObjectInfo oi=getmapping(object);
298 for(Iterator abit=abortset.iterator();abit.hasNext();) {
299 Integer threadid=(Integer)abit.next();
300 if (policy==LAZY||policy==LOCK) {
301 //just flag it to abort when it tries to commit
302 aborted[threadid]=true;
304 serAbort.addPoint(currtime, threadid);
305 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
306 //abort it immediately
307 reschedule(threadid, currtime);
317 //add next transaction event...could be us if we aborted
//a flagged abort re-runs the same transaction number; otherwise move to the next one
318 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
319 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
320 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
//trace a start point unless the next transaction is a lone DELAY event
321 if (serStart!=null) {
322 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
323 serStart.addPoint(ev.getTime(),ev.getThread());
327 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
328 currentevents[ev.getThread()]=nev;
//Collects the threads, other than thread, that are registered as writers of
//obj and would therefore conflict with a read by thread.
//NOTE(review): the statements after the two guard tests are not shown here;
//the caller in enqueueEvent treats a null result as no conflict.
333 public Set rdConflictSet(int thread, ObjIndex obj) {
334 if (!wrobjmap.containsKey(obj))
336 HashSet conflictset=new HashSet();
337 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
338 Integer threadid=(Integer)it.next();
339 if (threadid.intValue()!=thread)
340 conflictset.add(threadid);
342 if (conflictset.isEmpty())
//Collects the threads, other than thread, that are registered as readers or
//writers of obj and would therefore conflict with a write by thread.
//NOTE(review): the statement after the isEmpty test is not shown here; the
//caller in enqueueEvent treats a null result as no conflict.
348 public Set wrConflictSet(int thread, ObjIndex obj) {
349 HashSet conflictset=new HashSet();
350 if (rdobjmap.containsKey(obj)) {
351 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
352 Integer threadid=(Integer)it.next();
353 if (threadid.intValue()!=thread)
354 conflictset.add(threadid);
//no containsKey guard is visible for the write map; enqueueEvent creates the
//write set for the object before it calls this method
357 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
358 Integer threadid=(Integer)it.next();
359 if (threadid.intValue()!=thread)
360 conflictset.add(threadid);
362 if (conflictset.isEmpty())
368 //Takes as parameters the current transaction event ev (read or write), the
369 //set of conflicting threads, and the current time.
370 //Returning false causes the current transaction not to continue to be scheduled;
//enqueueEvent then removes the calling thread from the read/write set it just joined.
//Each branch below implements one contention-management policy.
//NOTE(review): many lines of this method are not shown in this excerpt, so
//the comments describe only the visible statements.
375 public boolean handleConflicts(Event ev, Set threadstokill, long time) {
376 if (policy==RANDOM) {
//coin flip; presumably selects between stalling ourselves and aborting the others -- branch lines not shown
377 boolean b=r.nextBoolean();
380 int thread=ev.getThread();
//double our own backoff and stall for a random delay below it
381 int dback=backoff[thread]*2;
383 backoff[thread]=dback;
384 stall(ev, time+r.nextInt(backoff[thread]));
387 //abort other transactions
388 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
389 Integer thread=(Integer)thit.next();
390 reschedule(thread, time);
395 } else if (policy==KARMA) {
//find the highest priority among the conflicting threads
397 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
398 Integer thread=(Integer)thit.next();
399 if (threadinfo[thread].priority>maxpriority)
400 maxpriority=threadinfo[thread].priority;
//an opponent outranks our priority plus retry count: back off
402 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
403 //stall for a little while
404 threadinfo[ev.getThread()].priority--;
405 retrycount[ev.getThread()]++;
406 int rtime=r.nextInt(3000);
407 stall(ev, time+rtime);
//presumably the winning case: abort every opponent after doubling its backoff
412 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
413 Integer thread=(Integer)thit.next();
414 int dback=backoff[thread]*2;
416 backoff[thread]=dback;
417 int atime=r.nextInt(backoff[thread]);
418 reschedule(thread, time+atime);
424 } else if (policy==ERUPTION) {
426 //find the highest priority among the conflicting threads
427 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
428 Integer thread=(Integer)thit.next();
429 if (threadinfo[thread].priority>maxpriority)
430 maxpriority=threadinfo[thread].priority;
432 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
434 threadinfo[ev.getThread()].priority--;
435 //stall for a little while
436 int rtime=r.nextInt(3000);
437 stall(ev, time+rtime);
//add our priority, minus what was already transferred, to every opponent
439 int ourpriority=threadinfo[ev.getThread()].priority;
440 ourpriority-=transferred[ev.getThread()];
441 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
442 Integer thread=(Integer)thit.next();
443 threadinfo[thread].priority+=ourpriority;
//remember how much priority has been handed out so far
445 transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
446 retrycount[ev.getThread()]++;
//presumably the winning case: abort every opponent after doubling its backoff
451 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
452 Integer thread=(Integer)thit.next();
453 int dback=backoff[thread]*2;
455 backoff[thread]=dback;
456 int atime=r.nextInt(backoff[thread]);
457 reschedule(thread, time+atime);
463 } else if (policy==POLITE) {
464 int retry=(++retrycount[ev.getThread()]);
//NOTE(review): the retry limit test is not shown; the visible statements
//reset the retry count and abort the opponents at the current time
466 retrycount[ev.getThread()]=0;
467 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
468 Integer thread=(Integer)thit.next();
469 reschedule(thread, time);
//stall bound doubles with every retry: 12, 24, 48, ...
475 int stalltime=(1<<(retry-1))*12;
478 stall(ev, time+r.nextInt(stalltime));
481 } else if (policy==ATTACK) {
//abort every opponent after a random delay below its backoff, then double that backoff
482 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
483 Integer thread=(Integer)thit.next();
484 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
485 int dback=backoff[thread.intValue()]*2;
487 backoff[thread.intValue()]=dback;
491 } else if (policy==SUICIDE) {
//abort ourselves after a random delay below our backoff, then double that backoff
492 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
493 int dback=backoff[ev.getThread()]*2;
495 backoff[ev.getThread()]=dback;
498 } else if (policy==TIMESTAMP) {
//compare the transaction-relative time of the current event of each opponent
//against opponenttime (the statement under the test is not shown)
501 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
502 Integer thread=(Integer)thit.next();
503 Event other=currentevents[thread.intValue()];
504 int eventnum=other.getEvent();
505 long otime=other.getTransaction().getTime(other.getEvent());
506 if (otime>opponenttime)
//an opponent is further along than we are: abort ourselves
509 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
511 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
//presumably the other case: abort the opponents
516 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
517 Integer thread=(Integer)thit.next();
518 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
//NOTE(review): this repeats the policy==TIMESTAMP test of the branch above, so
//this branch appears unreachable and looks like a leftover copy -- confirm and remove
523 } else if (policy==TIMESTAMP) {
526 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
527 Integer thread=(Integer)thit.next();
528 Event other=currentevents[thread.intValue()];
529 int eventnum=other.getEvent();
530 long otime=other.getTransaction().getTime(other.getEvent());
531 if (otime>opponenttime)
534 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
536 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
541 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
542 Integer thread=(Integer)thit.next();
543 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
548 } else if (policy==THREAD) {
//resolves the conflict by comparing thread ids
//NOTE(review): the loop declares otid while the test below reads tid; the
//lines relating the two are not shown -- confirm
551 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
552 Integer thread=(Integer)thit.next();
553 Event other=currentevents[thread.intValue()];
554 int eventnum=other.getEvent();
555 long otid=thread.intValue();
//our thread id is larger than tid: abort ourselves
559 if (ev.getThread()>tid) {
561 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
//presumably the other case: abort the opponents
566 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
567 Integer thread=(Integer)thit.next();
568 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
580 //Handle current event (read, write, delay, barrier) in a transaction and
581 //enqueue the next one.
//ev is the event being processed and trans is its transaction.
//NOTE(review): several lines of this method are not shown in this excerpt.
583 public void enqueueEvent(Event ev, Transaction trans) {
584 //just enqueue next event
585 int event=ev.getEvent();
586 long currtime=ev.getTime();
587 ObjIndex object=trans.getObjIndex(event);
588 int operation=trans.getEvent(event);
//lock-based policies: take ownership of the object or wait for its owner
590 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
591 ObjectInfo oi=getmapping(object);
594 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
595 //we're going to wait
596 boolean deadlocked=true;
//walk the chain of owners, at most checkdepth steps, looking for a cycle back to us
598 for(int i=0;i<checkdepth;i++) {
599 //check if stalling would close the loop
600 if (toi.getOwner()==ev.getThread())
602 //see if cycle is broken
603 if (!threadinfo[toi.getOwner()].isStalled()) {
607 //follow one more in depth
608 toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
612 //don't wait on stalled threads, we could deadlock
//mark this thread as stalled on the object and trace the stall
613 threadinfo[ev.getThread()].setStall(true);
614 threadinfo[ev.getThread()].setObjIndex(object);
616 serStall.addPoint(ev.getTime(),ev.getThread());
//trace point for the avoided-deadlock case
621 serAvoid.addPoint(ev.getTime(),ev.getThread());
//presumably the not-owned case: this thread becomes the owner
626 oi.setOwner(ev.getThread());
631 //process the current event
632 if (operation==Transaction.READ) {
//register this thread in the read set of the object, creating the set on first use
634 if (!rdobjmap.containsKey(object))
635 rdobjmap.put(object,new HashSet());
//add returns true only when the thread was not already in the set
636 if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
638 if (countObjects()) {
639 threadinfo[ev.getThread()].priority++;
643 //do eager contention management
644 Set conflicts=rdConflictSet(ev.getThread(), object);
645 if (conflicts!=null) {
//the policy stopped this transaction: undo the read registration
646 if (!handleConflicts(ev, conflicts, currtime)) {
647 ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
652 } else if (operation==Transaction.WRITE) {
//register this thread in the write set of the object, creating the set on first use
654 if (!wrobjmap.containsKey(object))
655 wrobjmap.put(object,new HashSet());
656 if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
657 if (countObjects()) {
658 threadinfo[ev.getThread()].priority++;
662 Set conflicts=wrConflictSet(ev.getThread(), object);
663 if (conflicts!=null) {
//the policy stopped this transaction: undo the write registration
664 if (!handleConflicts(ev, conflicts, currtime)) {
665 ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
670 } else if (operation==Transaction.BARRIER) {
//NOTE(review): the statement that updates barriercount before this test is not shown
672 if (barriercount==0) {
//barrier complete: advance every thread to the event after its barrier,
//timed relative to the current time
673 for(int i=0;i<e.numThreads();i++) {
674 //enqueue the next event
675 Event bev=currentevents[i];
676 int bevent=bev.getEvent();
677 long bcurrtime=bev.getTime();
678 Transaction btrans=bev.getTransaction();
679 long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
680 Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
681 currentevents[bev.getThread()]=nev;
//rearm the barrier for the next round
684 barriercount=e.numThreads();
687 //wait until all threads in barrier
//the event went through: clear the retry and transfer counters of this thread
691 retrycount[ev.getThread()]=0;
692 transferred[ev.getThread()]=0;
693 //enqueue the next event
//the next event keeps the same spacing as in the transaction, shifted to the current time
694 long deltatime=trans.getTime(event+1)-trans.getTime(event);
695 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
696 currentevents[ev.getThread()]=nev;
701 class Event implements Comparable {
709 public void makeInvalid() {
713 public boolean isValid() {
717 public int getTransNum() {
721 public Transaction getTransaction() {
725 public int getEvent() {
729 public long getTime() {
733 public void setTime(long time) {
737 public int getThread() {
741 public Event(long time, Transaction t, int num, int threadid, int transnum) {
745 this.threadid=threadid;
746 this.transnum=transnum;
750 //break ties to allow commits to occur earliest
751 public int compareTo(Object o) {
753 long delta=time-e.time;
760 if (((getEvent()+1)==getTransaction().numEvents())&&
761 (e.getEvent()+1)!=e.getTransaction().numEvents())
763 if (((getEvent()+1)!=getTransaction().numEvents())&&
764 (e.getEvent()+1)==e.getTransaction().numEvents())