3 public class FlexScheduler extends Thread {
11 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
13 this.abortThreshold=abortThreshold;
14 this.abortRatio=abortRatio;
15 this.checkdepth=checkdepth;
22 public FlexScheduler(Executor e, int policy, Plot p) {
24 barriercount=e.numThreads();
25 aborted=new boolean[e.numThreads()];
26 currentevents=new Event[e.numThreads()];
27 rdobjmap=new Hashtable();
28 wrobjmap=new Hashtable();
31 eq=new PriorityQueue();
32 backoff=new int[e.numThreads()];
33 retrycount=new int[e.numThreads()];
34 transferred=new int[e.numThreads()];
35 objtoinfo=new Hashtable();
36 threadinfo=new ThreadInfo[e.numThreads()];
37 blocked=new boolean[e.numThreads()];
39 for(int i=0;i<e.numThreads();i++) {
40 backoff[i]=BACKOFFSTART;
41 threadinfo[i]=new ThreadInfo(this);
45 serCommit=p.getSeries("COMMIT");
46 serStart=p.getSeries("START");
47 serAbort=p.getSeries("ABORT");
48 serStall=p.getSeries("STALL");
49 serWake=p.getSeries("WAKE");
50 serAvoid=p.getSeries("AVOIDDEADLOCK");
62 public int getDeadLockCount() {
66 //Where to start the backoff delay at
67 public static final int BACKOFFSTART=1;
70 public static final int LAZY=0;
71 public static final int COMMIT=1;
72 public static final int ATTACK=2;
73 public static final int SUICIDE=3;
74 public static final int TIMESTAMP=4;
75 public static final int LOCK=5;
76 public static final int LOCKCOMMIT=6;
77 public static final int RANDOM=7;
78 public static final int KARMA=8;
79 public static final int POLITE=9;
80 public static final int ERUPTION=10;
91 Event[] currentevents;
97 ThreadInfo[] threadinfo;
101 public boolean isEager() {
102 return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION;
105 public boolean countObjects() {
106 return policy==KARMA||policy==ERUPTION;
109 public boolean isLock() {
110 return policy==LOCK||policy==LOCKCOMMIT;
113 public int getAborts() {
117 public int getCommits() {
121 public long getTime() {
122 return shorttesttime-starttime;
125 //Aborts another thread...
126 public void reschedule(int currthread, long time) {
127 currentevents[currthread].makeInvalid();
128 if (threadinfo[currthread].isStalled()) {
129 //remove from waiter list
130 threadinfo[currthread].setStall(false);
131 getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
133 if (serAbort!=null) {
134 serAbort.addPoint(time, currthread);
136 Transaction trans=currentevents[currthread].getTransaction();
138 releaseObjects(trans, currthread, time);
139 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
140 currentevents[currthread]=nev;
144 //Aborts another thread...
145 public void stall(Event ev, long time) {
151 private void releaseObjects(Transaction trans, int currthread, long time) {
153 for(int i=0;i<trans.numEvents();i++) {
154 ObjIndex object=trans.getObjIndex(i);
156 if (object!=null&&rdobjmap.containsKey(object)) {
157 ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
159 if (object!=null&&wrobjmap.containsKey(object)) {
160 ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
162 if (object!=null&&objtoinfo.containsKey(object)) {
163 ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
164 if (oi.getOwner()==currentevents[currthread].getThread()) {
168 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
169 //requeue everyone who was waiting on us and start them back up
170 Event waiter=(Event)waitit.next();
172 waiter.setTime(time);
173 threadinfo[waiter.getThread()].setStall(false);
175 serWake.addPoint(time,waiter.getThread());
176 oi.setOwner(waiter.getThread());
185 /* Initializes things and returns number of transactions */
186 public int startinitial() {
188 for(int i=0;i<e.numThreads();i++) {
189 Transaction trans=e.getThread(i).getTransaction(0);
190 long time=trans.getTime(0);
191 Event ev=new Event(time, trans, 0, i, 0);
194 tcount+=e.getThread(i).numTransactions();
199 public void dosim() {
201 //start first transactions
202 int numtrans=startinitial();
203 System.out.println("Number of transactions="+numtrans);
205 while(!eq.isEmpty()) {
206 Event ev=(Event)eq.poll();
211 Transaction trans=ev.getTransaction();
213 int event=ev.getEvent();
214 long currtime=ev.getTime();
216 if (trans.started&&starttime==-1)
219 if (trans.numEvents()==(event+1)) {
220 tryCommit(ev, trans);
222 if ((tcount%100000)==0)
223 System.out.println("Attempted "+tcount+"transactions "+policy);
225 enqueueEvent(ev, trans);
228 shorttesttime=lasttime;
233 private ObjectInfo getmapping(ObjIndex obj) {
234 if (!objtoinfo.containsKey(obj))
235 objtoinfo.put(obj, new ObjectInfo(this));
236 return (ObjectInfo)objtoinfo.get(obj);
239 public void tryCommit(Event ev, Transaction trans) {
240 //ready to commit this one
241 long currtime=ev.getTime();
242 releaseObjects(trans, ev.getThread(), currtime);
244 //See if we have been flagged as aborted for the lazy case
245 boolean abort=aborted[ev.getThread()];
246 aborted[ev.getThread()]=false;
248 //if it is a transaction, increment commit count
249 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
251 if (serCommit!=null) {
252 serCommit.addPoint(ev.getTime(),ev.getThread());
255 //Reset our backoff counter
256 threadinfo[ev.getThread()].priority=0;
257 backoff[ev.getThread()]=BACKOFFSTART;
258 retrycount[ev.getThread()]=0;
259 transferred[ev.getThread()]=0;
261 //abort the other threads
262 for(int i=0;i<trans.numEvents();i++) {
263 ObjIndex object=trans.getObjIndex(i);
264 int op=trans.getEvent(i);
265 //Mark commits to objects
266 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
268 System.out.println(op);
270 getmapping(object).recordCommit();
272 //Check for threads we might cause to abort
273 if (op==Transaction.WRITE) {
274 HashSet abortset=new HashSet();
275 if (rdobjmap.containsKey(object)) {
276 for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
277 Integer threadid=(Integer)it.next();
278 abortset.add(threadid);
280 ObjectInfo oi=getmapping(object);
285 if (wrobjmap.containsKey(object)) {
286 for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
287 Integer threadid=(Integer)it.next();
288 abortset.add(threadid);
289 if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
290 //if this object hasn't already cause this thread to
291 //abort, then flag it as an abort cause
292 ObjectInfo oi=getmapping(object);
297 for(Iterator abit=abortset.iterator();abit.hasNext();) {
298 Integer threadid=(Integer)abit.next();
299 if (policy==LAZY||policy==LOCK) {
300 //just flag to abort when it trie to commit
301 aborted[threadid]=true;
303 serAbort.addPoint(currtime, threadid);
304 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
305 //abort it immediately
306 reschedule(threadid, currtime);
316 //add next transaction event...could be us if we aborted
317 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
318 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
319 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
320 if (serStart!=null) {
321 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
322 serStart.addPoint(ev.getTime(),ev.getThread());
326 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
327 currentevents[ev.getThread()]=nev;
332 public Set rdConflictSet(int thread, ObjIndex obj) {
333 if (!wrobjmap.containsKey(obj))
335 HashSet conflictset=new HashSet();
336 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
337 Integer threadid=(Integer)it.next();
338 if (threadid.intValue()!=thread)
339 conflictset.add(threadid);
341 if (conflictset.isEmpty())
347 public Set wrConflictSet(int thread, ObjIndex obj) {
348 HashSet conflictset=new HashSet();
349 if (rdobjmap.containsKey(obj)) {
350 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
351 Integer threadid=(Integer)it.next();
352 if (threadid.intValue()!=thread)
353 conflictset.add(threadid);
356 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
357 Integer threadid=(Integer)it.next();
358 if (threadid.intValue()!=thread)
359 conflictset.add(threadid);
361 if (conflictset.isEmpty())
367 //Takes as parameter -- current transaction read event ev, conflicting
368 //set of threads, and the current time
369 //Returning false causes current transaction not continue to be scheduled
371 public boolean handleConflicts(Event ev, Set threadstokill, long time) {
372 if (policy==RANDOM) {
373 boolean b=r.nextBoolean();
376 int thread=ev.getThread();
377 int dback=backoff[thread]*2;
379 backoff[thread]=dback;
380 stall(ev, time+r.nextInt(backoff[thread]));
383 //abort other transactions
384 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
385 Integer thread=(Integer)thit.next();
386 reschedule(thread, time);
391 } else if (policy==KARMA) {
393 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
394 Integer thread=(Integer)thit.next();
395 if (threadinfo[thread].priority>maxpriority)
396 maxpriority=threadinfo[thread].priority;
398 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
399 //stall for a little while
400 threadinfo[ev.getThread()].priority--;
401 retrycount[ev.getThread()]++;
402 stall(ev, time+r.nextInt(3000));
406 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
407 Integer thread=(Integer)thit.next();
408 int dback=backoff[thread]*2;
410 backoff[thread]=dback;
411 reschedule(thread, time+r.nextInt(backoff[thread]));
416 } else if (policy==ERUPTION) {
418 //abort other transactions
419 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
420 Integer thread=(Integer)thit.next();
421 if (threadinfo[thread].priority>maxpriority)
422 maxpriority=threadinfo[thread].priority;
424 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
426 threadinfo[ev.getThread()].priority--;
427 //stall for a little while
428 stall(ev, time+r.nextInt(3000));
429 int ourpriority=threadinfo[ev.getThread()].priority;
430 ourpriority-=transferred[ev.getThread()];
431 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
432 Integer thread=(Integer)thit.next();
433 threadinfo[thread].priority+=ourpriority;
435 transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
436 retrycount[ev.getThread()]++;
441 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
442 Integer thread=(Integer)thit.next();
443 int dback=backoff[thread]*2;
445 backoff[thread]=dback;
446 reschedule(thread, time+r.nextInt(backoff[thread]));
451 } else if (policy==POLITE) {
452 int retry=(++retrycount[ev.getThread()]);
454 retrycount[ev.getThread()]=0;
455 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
456 Integer thread=(Integer)thit.next();
457 reschedule(thread, time);
463 int stalltime=(1<<(retry-1))*12;
466 stall(ev, time+r.nextInt(stalltime));
469 } else if (policy==ATTACK) {
470 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
471 Integer thread=(Integer)thit.next();
472 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
473 int dback=backoff[thread.intValue()]*2;
475 backoff[thread.intValue()]=dback;
479 } else if (policy==SUICIDE) {
480 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
481 int dback=backoff[ev.getThread()]*2;
483 backoff[ev.getThread()]=dback;
486 } else if (policy==TIMESTAMP) {
489 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
490 Integer thread=(Integer)thit.next();
491 Event other=currentevents[thread.intValue()];
492 int eventnum=other.getEvent();
493 long otime=other.getTransaction().getTime(other.getEvent());
494 if (otime>opponenttime)
497 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
499 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
504 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
505 Integer thread=(Integer)thit.next();
506 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
517 //Handle current event (read, write, delay) in a transaction and
518 //enqueue the next one
520 public void enqueueEvent(Event ev, Transaction trans) {
521 //just enqueue next event
522 int event=ev.getEvent();
523 long currtime=ev.getTime();
524 ObjIndex object=trans.getObjIndex(event);
525 int operation=trans.getEvent(event);
527 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
528 ObjectInfo oi=getmapping(object);
531 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
532 //we're going to wait
533 boolean deadlocked=true;
535 for(int i=0;i<checkdepth;i++) {
536 //check if stalling would close the loop
537 if (toi.getOwner()==ev.getThread())
539 //see if cycle is broken
540 if (!threadinfo[toi.getOwner()].isStalled()) {
544 //follow one more in depth
545 toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
549 //don't wait on stalled threads, we could deadlock
550 threadinfo[ev.getThread()].setStall(true);
551 threadinfo[ev.getThread()].setObjIndex(object);
553 serStall.addPoint(ev.getTime(),ev.getThread());
558 serAvoid.addPoint(ev.getTime(),ev.getThread());
563 oi.setOwner(ev.getThread());
568 //process the current event
569 if (operation==Transaction.READ) {
571 if (!rdobjmap.containsKey(object))
572 rdobjmap.put(object,new HashSet());
573 if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
575 if (countObjects()) {
576 threadinfo[ev.getThread()].priority++;
580 //do eager contention management
581 Set conflicts=rdConflictSet(ev.getThread(), object);
582 if (conflicts!=null) {
583 if (!handleConflicts(ev, conflicts, currtime)) {
584 ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
589 } else if (operation==Transaction.WRITE) {
591 if (!wrobjmap.containsKey(object))
592 wrobjmap.put(object,new HashSet());
593 if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
594 if (countObjects()) {
595 threadinfo[ev.getThread()].priority++;
599 Set conflicts=wrConflictSet(ev.getThread(), object);
600 if (conflicts!=null) {
601 if (!handleConflicts(ev, conflicts, currtime)) {
602 ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
607 } else if (operation==Transaction.BARRIER) {
609 if (barriercount==0) {
610 for(int i=0;i<e.numThreads();i++) {
611 //enqueue the next event
612 Event bev=currentevents[i];
613 int bevent=bev.getEvent();
614 long bcurrtime=bev.getTime();
615 Transaction btrans=bev.getTransaction();
616 long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
617 Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
618 currentevents[bev.getThread()]=nev;
621 barriercount=e.numThreads();
624 //wait until all threads in barrier
628 retrycount[ev.getThread()]=0;
629 transferred[ev.getThread()]=0;
630 //enqueue the next event
631 long deltatime=trans.getTime(event+1)-trans.getTime(event);
632 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
633 currentevents[ev.getThread()]=nev;
638 class Event implements Comparable {
646 public void makeInvalid() {
650 public boolean isValid() {
654 public int getTransNum() {
658 public Transaction getTransaction() {
662 public int getEvent() {
666 public long getTime() {
670 public void setTime(long time) {
674 public int getThread() {
678 public Event(long time, Transaction t, int num, int threadid, int transnum) {
682 this.threadid=threadid;
683 this.transnum=transnum;
687 //break ties to allow commits to occur earliest
688 public int compareTo(Object o) {
690 long delta=time-e.time;
697 if (((getEvent()+1)==getTransaction().numEvents())&&
698 (e.getEvent()+1)!=e.getTransaction().numEvents())
700 if (((getEvent()+1)!=getTransaction().numEvents())&&
701 (e.getEvent()+1)==e.getTransaction().numEvents())