// Event-driven simulator for transactions running on the threads of an
// Executor. Timed Event objects are drawn from a priority queue (see dosim)
// and each one is either advanced to the transaction's next event or
// committed. How conflicts between threads are resolved depends on the
// integer policy: one of LAZY, COMMIT, ATTACK, POLITE, KARMA, LOCK, LOCKCOMMIT.
// NOTE(review): only part of the class is visible in this listing; most field
// declarations are not shown.
3 public class FlexScheduler {
// Constructor that additionally records the abort threshold, abort ratio and
// deadlock check depth.
// e, policy, p: same meaning as in the three-argument constructor
// abortThreshold, abortRatio: stored on the scheduler; not read in the visible code
// checkdepth: upper bound on how many owners enqueueEvent follows when it
//             checks whether stalling would close a wait cycle
// NOTE(review): the line between the signature and the assignments is not
// visible here; presumably it delegates to the three-argument constructor —
// confirm.
10 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
12 this.abortThreshold=abortThreshold;
13 this.abortRatio=abortRatio;
14 this.checkdepth=checkdepth;
// Base constructor: allocates the per-thread and per-object bookkeeping for
// the threads of Executor e and looks up the plot series used for tracing.
// e: supplies the thread count and each thread's transactions
// policy: one of the policy constants (LAZY ... LOCKCOMMIT)
// p: plot from which the named series are obtained
// NOTE(review): several lines of this constructor are not visible in this
// listing (e.g. where e and policy are stored, and whatever surrounds the
// getSeries calls). Other methods null-check serAbort, serCommit and serStart,
// so the series are apparently optional — confirm.
17 public FlexScheduler(Executor e, int policy, Plot p) {
// aborted[t]: set when another thread's commit conflicts with t under
// LAZY/LOCK; read and cleared by tryCommit
19 aborted=new boolean[e.numThreads()];
// currentevents[t]: the most recent Event created for thread t
20 currentevents=new Event[e.numThreads()];
// object id (Integer) -> Set of thread ids (Integer) that have read / written it
21 rdobjmap=new Hashtable();
22 wrobjmap=new Hashtable();
// pending events; dosim polls from this queue until it is empty
25 eq=new PriorityQueue();
// backoff[t]: bound passed to r.nextInt when t is rescheduled by handleConflicts
26 backoff=new int[e.numThreads()];
// object id (Integer) -> ObjectInfo, filled on demand by getmapping
27 objtoinfo=new Hashtable();
28 threadinfo=new ThreadInfo[e.numThreads()];
// blocked is allocated here but not used in the visible code
29 blocked=new boolean[e.numThreads()];
31 for(int i=0;i<e.numThreads();i++) {
32 backoff[i]=BACKOFFSTART;
33 threadinfo[i]=new ThreadInfo(this);
// One plot series per kind of traced occurrence
37 serCommit=p.getSeries("COMMIT");
38 serStart=p.getSeries("START");
39 serAbort=p.getSeries("ABORT");
40 serStall=p.getSeries("STALL");
41 serWake=p.getSeries("WAKE");
42 serAvoid=p.getSeries("AVOIDDEADLOCK");
54 public int getDeadLockCount() {
58 //Initial value of each thread's backoff bound; backoff[t] starts here and is reset to it when t commits
59 public static final int BACKOFFSTART=1;
// Policy identifiers compared against the policy field.
// LAZY: a committing writer only flags conflicting threads (aborted[t]=true);
// a flagged thread repeats its transaction when it reaches its own commit.
62 public static final int LAZY=0;
// COMMIT: a committing writer reschedules conflicting threads immediately.
63 public static final int COMMIT=1;
// ATTACK: eager (isEager); on a conflict handleConflicts reschedules the
// conflicting threads.
64 public static final int ATTACK=2;
// POLITE: eager; on a conflict handleConflicts reschedules the requesting
// thread itself.
65 public static final int POLITE=3;
// KARMA: eager; handleConflicts compares transaction times of the requester
// and the conflicting threads to decide which side is rescheduled.
66 public static final int KARMA=4;
// LOCK: lock-based (isLock); treated like LAZY when a writer commits.
67 public static final int LOCK=5;
// LOCKCOMMIT: lock-based (isLock); treated like COMMIT when a writer commits.
68 public static final int LOCKCOMMIT=6;
// Indexed by thread number: the most recent Event created for that thread
78 Event[] currentevents;
// Indexed by thread number: stall state (isStalled/setStall) and the object
// the thread is waiting on (getObject/setObject)
82 ThreadInfo[] threadinfo;
// Returns true when the policy is one of ATTACK, POLITE or KARMA, i.e. the
// three policies that handleConflicts has a branch for.
86 public boolean isEager() {
87 return policy==ATTACK||policy==POLITE||policy==KARMA;
// Returns true when the policy is LOCK or LOCKCOMMIT. In the visible code this
// gates the object-ownership handling in enqueueEvent and the recordCommit
// calls in tryCommit.
90 public boolean isLock() {
91 return policy==LOCK||policy==LOCKCOMMIT;
94 public int getAborts() {
98 public int getCommits() {
// Returns shorttesttime, which dosim assigns from lasttime.
102 public int getTime() {
103 return shorttesttime;
106 //Aborts the transaction in flight on thread currthread and restarts it from its first event.
// The target is not always "another" thread: under POLITE (and one KARMA
// branch) handleConflicts passes the requesting thread itself.
// currthread: index of the thread whose transaction is aborted
// time: simulation time of the abort; the replacement event is created at
//       time+trans.getTime(0)
107 public void reschedule(int currthread, int time) {
// Mark the thread's outstanding event invalid
108 currentevents[currthread].makeInvalid();
109 if (threadinfo[currthread].isStalled()) {
110 //remove from waiter list
111 threadinfo[currthread].setStall(false);
112 getmapping(threadinfo[currthread].getObject()).getWaiters().remove(currentevents[currthread]);
// Trace the abort when an ABORT series is available
114 if (serAbort!=null) {
115 serAbort.addPoint(time, currthread);
117 Transaction trans=currentevents[currthread].getTransaction();
// Drop the thread from the read/write sets and hand over any objects it owns
119 releaseObjects(trans, currthread, time);
// Same transaction and transaction number, event index reset to 0
120 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
121 currentevents[currthread]=nev;
// NOTE(review): the remaining lines of this method are not visible in this
// listing; presumably nev is added to the event queue — confirm.
// Removes thread currthread from the bookkeeping of every object that trans
// touches, and wakes threads waiting on objects it owns.
// trans: the transaction being committed or aborted
// currthread: index of the thread running trans
// time: simulation time given to woken waiters
// NOTE(review): some lines of this method are not visible in this listing, so
// the exact nesting of the statements below cannot be confirmed here.
126 private void releaseObjects(Transaction trans, int currthread, int time) {
128 for(int i=0;i<trans.numEvents();i++) {
129 int object=trans.getObject(i);
130 Integer obj=new Integer(object);
// object id -1 is skipped by all three checks below
131 if (object!=-1&&rdobjmap.containsKey(obj)) {
132 ((Set)rdobjmap.get(obj)).remove(new Integer(currthread));
134 if (object!=-1&&wrobjmap.containsKey(obj)) {
135 ((Set)wrobjmap.get(obj)).remove(new Integer(currthread));
137 if (object!=-1&&objtoinfo.containsKey(obj)) {
138 ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
// Only act when the object's recorded owner is the thread of this thread's current event
139 if (oi.getOwner()==currentevents[currthread].getThread()) {
143 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
144 //requeue everyone who was waiting on us and start them back up
145 Event waiter=(Event)waitit.next();
// Waiter resumes at the release time and is no longer stalled
147 waiter.setTime(time);
148 threadinfo[waiter.getThread()].setStall(false);
150 serWake.addPoint(time,waiter.getThread());
// Ownership is assigned to the waiter; with several waiters the last one
// visited ends up as the recorded owner
151 oi.setOwner(waiter.getThread());
// Builds the first event of every thread: event 0 of the thread's transaction
// 0, at that transaction's first event time.
// NOTE(review): the lines after the Event is constructed are not visible in
// this listing; presumably the event is stored in currentevents and queued —
// confirm.
160 public void startinitial() {
161 for(int i=0;i<e.numThreads();i++) {
162 Transaction trans=e.getThread(i).getTransaction(0);
163 int time=trans.getTime(0);
// Arguments: time, transaction, event index 0, thread index, transaction number 0
164 Event ev=new Event(time, trans, 0, i, 0);
// Main simulation loop: repeatedly polls the next Event from the queue and
// either commits its transaction (when the event is the transaction's last)
// or processes it and schedules the following event.
// NOTE(review): several lines are not visible in this listing, including
// whatever follows the "start first transactions" comment, anything between
// the poll and the use of the event, and where lasttime is updated.
170 public void dosim() {
172 //start first transactions
175 while(!eq.isEmpty()) {
176 Event ev=(Event)eq.poll();
181 Transaction trans=ev.getTransaction();
182 int event=ev.getEvent();
183 int currtime=ev.getTime();
// event is a 0-based index, so event+1==numEvents() identifies the last event
186 if (trans.numEvents()==(event+1)) {
187 tryCommit(ev, trans);
189 enqueueEvent(ev, trans);
// Value later returned by getTime()
192 shorttesttime=lasttime;
// Returns the ObjectInfo for the given object id, creating and registering a
// new one in objtoinfo on first use.
// object: object id used as the (boxed) key
197 private ObjectInfo getmapping(int object) {
198 Integer obj=new Integer(object);
199 if (!objtoinfo.containsKey(obj))
200 objtoinfo.put(obj, new ObjectInfo(this));
201 return (ObjectInfo)objtoinfo.get(obj);
// Handles the last event of a transaction: releases the thread's objects,
// flags or aborts threads that conflict with its writes (depending on policy),
// and sets up the thread's next transaction, or the same one again if this
// thread had been flagged as aborted.
// ev: the event being processed (last event of trans)
// trans: the transaction that ev belongs to
// NOTE(review): a number of lines of this method are not visible in this
// listing, so some statements below may sit inside conditions that cannot be
// seen here.
204 public void tryCommit(Event ev, Transaction trans) {
205 //ready to commit this one
206 int currtime=ev.getTime();
// This also removes the committing thread from the read/write sets, so the
// sets walked further down no longer contain it
207 releaseObjects(trans, ev.getThread(), currtime);
209 //See if we have been flagged as aborted for the lazy case
210 boolean abort=aborted[ev.getThread()];
211 aborted[ev.getThread()]=false;
213 //if it is a transaction, increment commit count
// A lone DELAY event is not treated as a transaction
214 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
216 if (serCommit!=null) {
217 serCommit.addPoint(ev.getTime(),ev.getThread());
220 //Reset our backoff counter
221 backoff[ev.getThread()]=BACKOFFSTART;
223 //abort the other threads
224 for(int i=0;i<trans.numEvents();i++) {
225 int object=trans.getObject(i);
226 int op=trans.getEvent(i);
227 //Mark commits to objects
228 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
229 getmapping(object).recordCommit();
231 //Check for threads we might cause to abort
// Only writes by the committer put other threads in the abort set
232 if (op==Transaction.WRITE) {
233 HashSet abortset=new HashSet();
234 Integer obj=new Integer(object);
// Every thread that has read the object ...
235 if (rdobjmap.containsKey(obj)) {
236 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
237 Integer threadid=(Integer)it.next();
238 abortset.add(threadid);
240 ObjectInfo oi=getmapping(object);
// ... and every thread that has written it
245 if (wrobjmap.containsKey(obj)) {
246 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
247 Integer threadid=(Integer)it.next();
248 abortset.add(threadid);
// Writer that is not also registered as a reader of this object
249 if (isLock()&&(!rdobjmap.containsKey(obj)||!((Set)rdobjmap.get(obj)).contains(threadid))) {
250 //if this object hasn't already caused this thread to
251 //abort, then flag it as an abort cause
252 ObjectInfo oi=getmapping(object);
257 for(Iterator abit=abortset.iterator();abit.hasNext();) {
258 Integer threadid=(Integer)abit.next();
259 if (policy==LAZY||policy==LOCK) {
260 //just flag to abort when it tries to commit
261 aborted[threadid]=true;
// The ABORT point is plotted at the time of flagging (currtime)
263 serAbort.addPoint(currtime, threadid);
264 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
265 //abort it immediately
266 reschedule(threadid, currtime);
276 //add next transaction event...could be us if we aborted
// If flagged as aborted, repeat the same transaction number; otherwise advance
277 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
// Nothing further is set up once the thread has run out of transactions
278 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
279 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
280 if (serStart!=null) {
// START is only traced for real transactions, not for a lone DELAY
281 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
282 serStart.addPoint(ev.getTime(),ev.getThread());
// First event of the next transaction, offset from the commit time
286 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
287 currentevents[ev.getThread()]=nev;
// Collects the threads, other than the given one, that have written the given
// object, i.e. the threads a read by this thread conflicts with.
// thread: index of the reading thread (excluded from the result)
// object: id of the object being read
// NOTE(review): the return statements are not visible in this listing.
// Callers test the result against null, so presumably null is returned when
// there are no writers or the set is empty — confirm.
292 public Set rdConflictSet(int thread, int object) {
293 Integer obj=new Integer(object);
294 if (!wrobjmap.containsKey(obj))
296 HashSet conflictset=new HashSet();
297 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
298 Integer threadid=(Integer)it.next();
299 if (threadid.intValue()!=thread)
300 conflictset.add(threadid);
302 if (conflictset.isEmpty())
// Collects the threads, other than the given one, that have read or written
// the given object, i.e. the threads a write by this thread conflicts with.
// thread: index of the writing thread (excluded from the result)
// object: id of the object being written
// NOTE(review): the return statements are not visible in this listing.
// Callers test the result against null, so presumably null is returned for an
// empty set — confirm.
308 public Set wrConflictSet(int thread, int object) {
309 Integer obj=new Integer(object);
311 HashSet conflictset=new HashSet();
312 if (rdobjmap.containsKey(obj)) {
313 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
314 Integer threadid=(Integer)it.next();
315 if (threadid.intValue()!=thread)
316 conflictset.add(threadid);
// NOTE(review): no containsKey check on wrobjmap is visible before this
// lookup. The visible caller (enqueueEvent) adds the writer to wrobjmap before
// calling, so the entry exists on that path — confirm for any other caller.
319 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
320 Integer threadid=(Integer)it.next();
321 if (threadid.intValue()!=thread)
322 conflictset.add(threadid);
324 if (conflictset.isEmpty())
330 //Takes as parameters the current transaction's read/write event ev, the set
331 //of conflicting threads, and the current time.
332 //Returning false causes the current transaction not to continue being scheduled.
// ev: event of the thread that ran into the conflict
// threadstokill: Set of Integer thread ids that conflict with ev's access
// time: current simulation time; reschedules use time plus a random delay
// NOTE(review): the return statements and some KARMA lines are not visible in
// this listing.
// NOTE(review): backoff entries are doubled on every reschedule and only reset
// on commit. If r is a java.util.Random, nextInt requires a positive bound,
// and repeated doubling of an int eventually overflows to a non-positive value
// — confirm whether a cap is needed.
334 public boolean handleConflicts(Event ev, Set threadstokill, int time) {
// ATTACK: abort every conflicting thread after a random delay below its
// backoff bound, then double that bound
335 if (policy==ATTACK) {
336 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
337 Integer thread=(Integer)thit.next();
338 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
339 backoff[thread.intValue()]*=2;
// POLITE: abort the requesting thread itself and double its backoff bound
343 } else if (policy==POLITE) {
344 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
345 backoff[ev.getThread()]*=2;
// KARMA: compare the scheduled time of each opponent's current event with
// that of the requester's event
348 } else if (policy==KARMA) {
351 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
352 Integer thread=(Integer)thit.next();
353 Event other=currentevents[thread.intValue()];
354 int eventnum=other.getEvent();
355 int otime=other.getTransaction().getTime(other.getEvent());
356 if (otime>opponenttime)
// An opponent is further along than the requester: the requester is rescheduled
359 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
361 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
// Presumably the opposite case: the opponents are rescheduled instead
366 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
367 Integer thread=(Integer)thit.next();
368 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
379 //Handle current event (read, write, delay) in a transaction and
380 //enqueue the next one
// ev: the event being processed (not the last event of trans; dosim routes
//     the last event to tryCommit)
// trans: the transaction that ev belongs to
// NOTE(review): many lines of this method are not visible in this listing
// (loop exits, branch boundaries, early returns), so the nesting described in
// the comments below is partly inferred — confirm against the full file.
382 public void enqueueEvent(Event ev, Transaction trans) {
383 //just enqueue next event
384 int event=ev.getEvent();
385 int currtime=ev.getTime();
386 int object=trans.getObject(event);
387 int operation=trans.getEvent(event);
388 Integer obj=new Integer(object);
// Lock-based policies: reads and writes go through per-object ownership
390 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
391 ObjectInfo oi=getmapping(object);
// Object is held by a different thread
394 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
395 //we're going to wait
396 boolean deadlocked=true;
// Follow the chain "owner -> object that owner is waiting on" for at most
// checkdepth steps
398 for(int i=0;i<checkdepth;i++) {
399 //check if stalling would close the loop
400 if (toi.getOwner()==ev.getThread())
402 //see if cycle is broken
403 if (!threadinfo[toi.getOwner()].isStalled()) {
407 //follow one more in depth
408 toi=getmapping(threadinfo[toi.getOwner()].getObject());
412 //don't wait on stalled threads, we could deadlock
// Record that this thread is stalled and which object it waits on
413 threadinfo[ev.getThread()].setStall(true);
414 threadinfo[ev.getThread()].setObject(object);
416 serStall.addPoint(ev.getTime(),ev.getThread());
// Traced on the series obtained under the name "AVOIDDEADLOCK"
421 serAvoid.addPoint(ev.getTime(),ev.getThread());
// This thread becomes the recorded owner of the object
426 oi.setOwner(ev.getThread());
431 //process the current event
432 if (operation==Transaction.READ) {
// Register this thread as a reader of the object
434 if (!rdobjmap.containsKey(obj))
435 rdobjmap.put(obj,new HashSet());
436 ((Set)rdobjmap.get(obj)).add(new Integer(ev.getThread()));
438 //do eager contention management
// A read conflicts with other threads that wrote the object
439 Set conflicts=rdConflictSet(ev.getThread(), object);
440 if (conflicts!=null) {
441 if (!handleConflicts(ev, conflicts, currtime))
445 } else if (operation==Transaction.WRITE) {
// Register this thread as a writer of the object
447 if (!wrobjmap.containsKey(obj))
448 wrobjmap.put(obj,new HashSet());
449 ((Set)wrobjmap.get(obj)).add(new Integer(ev.getThread()));
// A write conflicts with other threads that read or wrote the object
451 Set conflicts=wrConflictSet(ev.getThread(), object);
452 if (conflicts!=null) {
453 if (!handleConflicts(ev, conflicts, currtime))
459 //enqueue the next event
// The next event keeps its distance from this one in the transaction's own
// timeline, shifted to the current simulation time
460 int deltatime=trans.getTime(event+1)-trans.getTime(event);
461 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
462 currentevents[ev.getThread()]=nev;
467 class Event implements Comparable {
475 public void makeInvalid() {
479 public boolean isValid() {
483 public int getTransNum() {
487 public Transaction getTransaction() {
491 public int getEvent() {
495 public int getTime() {
499 public void setTime(int time) {
503 public int getThread() {
// Creates an event.
// time: simulation time of the event
// t: the transaction the event belongs to
// num: index of the event within t, as passed by FlexScheduler (0 for a
//      (re)started transaction, event+1 to advance)
// threadid: index of the thread running t
// transnum: index of t among its thread's transactions
// NOTE(review): the assignments of time, t and num are not visible in this
// listing.
507 public Event(int time, Transaction t, int num, int threadid, int transnum) {
511 this.threadid=threadid;
512 this.transnum=transnum;
516 //break ties to allow commits to occur earliest
517 public int compareTo(Object o) {
519 int delta=time-e.time;
522 if (((getEvent()+1)==getTransaction().numEvents())&&
523 (e.getEvent()+1)!=e.getTransaction().numEvents())
525 if (((getEvent()+1)!=getTransaction().numEvents())&&
526 (e.getEvent()+1)==e.getTransaction().numEvents())