updates to simulator
[IRC.git] / Robust / TransSim / FlexScheduler.java
1 import java.util.*;
2
3 public class FlexScheduler extends Thread {
4   Executor e;
5   int abortThreshold;
6   int abortRatio;
7   int deadlockcount;
8   int checkdepth;
9   int barriercount;
10
11   public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
12     this(e, policy, p);
13     this.abortThreshold=abortThreshold;
14     this.abortRatio=abortRatio;
15     this.checkdepth=checkdepth;
16   }
17   
18   public void run() {
19     dosim();
20   }
21
22   public FlexScheduler(Executor e, int policy, Plot p) {
23     this.e=e;
24     barriercount=e.numThreads();
25     aborted=new boolean[e.numThreads()];
26     currentevents=new Event[e.numThreads()];
27     rdobjmap=new Hashtable();
28     wrobjmap=new Hashtable();
29     this.policy=policy;
30     r=new Random(100);
31     eq=new PriorityQueue();
32     backoff=new int[e.numThreads()];
33     objtoinfo=new Hashtable();
34     threadinfo=new ThreadInfo[e.numThreads()];
35     blocked=new boolean[e.numThreads()];
36
37     for(int i=0;i<e.numThreads();i++) {
38       backoff[i]=BACKOFFSTART;
39       threadinfo[i]=new ThreadInfo(this);
40     }
41     this.p=p;
42     if (p!=null) {
43       serCommit=p.getSeries("COMMIT");
44       serStart=p.getSeries("START");
45       serAbort=p.getSeries("ABORT");
46       serStall=p.getSeries("STALL");
47       serWake=p.getSeries("WAKE");
48       serAvoid=p.getSeries("AVOIDDEADLOCK");
49     }
50   }
51
52   Plot p;
53   Series serCommit;
54   Series serStart;
55   Series serAbort;
56   Series serStall;
57   Series serAvoid;
58   Series serWake;
59
60   public int getDeadLockCount() {
61     return deadlockcount;
62   }
63
64   //Where to start the backoff delay at
65   public static final int BACKOFFSTART=1;
66
67   //Commit options
68   public static final int LAZY=0;
69   public static final int COMMIT=1;
70   public static final int ATTACK=2;
71   public static final int POLITE=3;
72   public static final int KARMA=4;
73   public static final int LOCK=5;
74   public static final int LOCKCOMMIT=6;
75
76   PriorityQueue eq;
77   int policy;
78   boolean[] aborted;
79   long shorttesttime;
80   long starttime=-1;
81   Hashtable rdobjmap;
82   Hashtable wrobjmap;
83   int abortcount;
84   int commitcount;
85   Event[] currentevents;
86   Random r;
87   int[] backoff;
88   Hashtable objtoinfo;
89   ThreadInfo[] threadinfo;
90   
91   boolean[] blocked;
92
93   public boolean isEager() {
94     return policy==ATTACK||policy==POLITE||policy==KARMA;
95   }
96
97   public boolean isLock() {
98     return policy==LOCK||policy==LOCKCOMMIT;
99   }
100
101   public int getAborts() {
102     return abortcount;
103   }
104
105   public int getCommits() {
106     return commitcount;
107   }
108
109   public long getTime() {
110     return shorttesttime-starttime;
111   }
112
113   //Aborts another thread...
114   public void reschedule(int currthread, long time) {
115     currentevents[currthread].makeInvalid();
116     if (threadinfo[currthread].isStalled()) {
117       //remove from waiter list
118       threadinfo[currthread].setStall(false);
119       getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
120     }
121     if (serAbort!=null) {
122       serAbort.addPoint(time, currthread);
123     }
124     Transaction trans=currentevents[currthread].getTransaction();
125     
126     releaseObjects(trans, currthread, time);
127     Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
128     currentevents[currthread]=nev;
129     eq.add(nev);
130   }
131
132
133   private void releaseObjects(Transaction trans, int currthread, long time) {
134     //remove all events
135     for(int i=0;i<trans.numEvents();i++) {
136       ObjIndex object=trans.getObjIndex(i);
137
138       if (object!=null&&rdobjmap.containsKey(object)) {
139         ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
140       }
141       if (object!=null&&wrobjmap.containsKey(object)) {
142         ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
143       }
144       if (object!=null&&objtoinfo.containsKey(object)) {
145         ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
146         if (oi.getOwner()==currentevents[currthread].getThread()) {
147           oi.releaseOwner();
148           
149           //wake up one waiter
150           for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
151             //requeue everyone who was waiting on us and start them back up
152             Event waiter=(Event)waitit.next();
153             waitit.remove();
154             waiter.setTime(time);
155             threadinfo[waiter.getThread()].setStall(false);
156             if (serWake!=null)
157               serWake.addPoint(time,waiter.getThread());
158             oi.setOwner(waiter.getThread());
159             eq.add(waiter);
160             break;
161           }
162         }
163       }
164     }
165   }
166
167   /* Initializes things and returns number of transactions */
168   public int startinitial() {
169     int tcount=0;
170     for(int i=0;i<e.numThreads();i++) {
171       Transaction trans=e.getThread(i).getTransaction(0);
172       long time=trans.getTime(0);
173       Event ev=new Event(time, trans, 0, i, 0);
174       currentevents[i]=ev;
175       eq.add(ev);
176       tcount+=e.getThread(i).numTransactions();
177     }
178     return tcount;
179   }
180
181   public void dosim() {
182     long lasttime=0;
183     //start first transactions
184     int numtrans=startinitial();
185     System.out.println("Number of transactions="+numtrans);
186     int tcount=0;
187     while(!eq.isEmpty()) {
188       Event ev=(Event)eq.poll();
189       if (!ev.isValid()) {
190         continue;
191       }
192
193       Transaction trans=ev.getTransaction();
194
195       int event=ev.getEvent();
196       long currtime=ev.getTime();
197       lasttime=currtime;
198       if (trans.started&&starttime==-1)
199         starttime=currtime;
200
201       if (trans.numEvents()==(event+1)) {
202         tryCommit(ev, trans);
203         tcount++;
204         if ((tcount%100000)==0)
205           System.out.println("Attempted "+tcount+"transactions");
206       } else {
207         enqueueEvent(ev, trans);
208       }
209     }
210     shorttesttime=lasttime;
211     if (p!=null)
212       p.close();
213   }
214
215   private ObjectInfo getmapping(ObjIndex obj) {
216     if (!objtoinfo.containsKey(obj))
217       objtoinfo.put(obj, new ObjectInfo(this));
218     return (ObjectInfo)objtoinfo.get(obj);
219   }
220
221   public void tryCommit(Event ev, Transaction trans) {
222     //ready to commit this one
223     long currtime=ev.getTime();
224     releaseObjects(trans, ev.getThread(), currtime);
225     
226     //See if we have been flagged as aborted for the lazy case
227     boolean abort=aborted[ev.getThread()];
228     aborted[ev.getThread()]=false;
229     if (!abort) {
230       //if it is a transaction, increment comit count
231       if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
232         commitcount++;
233         if (serCommit!=null) {
234           serCommit.addPoint(ev.getTime(),ev.getThread());
235         }
236       }
237       //Reset our backoff counter
238       backoff[ev.getThread()]=BACKOFFSTART;
239
240       //abort the other threads
241       for(int i=0;i<trans.numEvents();i++) {
242         ObjIndex object=trans.getObjIndex(i);
243         int op=trans.getEvent(i);
244         //Mark commits to objects
245         if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
246           if (object==null) {
247             System.out.println(op);
248           }
249           getmapping(object).recordCommit();
250         }
251         //Check for threads we might cause to abort
252         if (op==Transaction.WRITE) {
253           HashSet abortset=new HashSet();
254           if (rdobjmap.containsKey(object)) {
255             for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
256               Integer threadid=(Integer)it.next();
257               abortset.add(threadid);
258               if (isLock()) {
259                 ObjectInfo oi=getmapping(object);
260                 oi.recordAbort();
261               }
262             }
263           }
264           if (wrobjmap.containsKey(object)) {
265             for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
266               Integer threadid=(Integer)it.next();
267               abortset.add(threadid);
268               if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
269                 //if this object hasn't already cause this thread to
270                 //abort, then flag it as an abort cause
271                 ObjectInfo oi=getmapping(object);
272                 oi.recordAbort();
273               }
274             }
275           }
276           for(Iterator abit=abortset.iterator();abit.hasNext();) {
277             Integer threadid=(Integer)abit.next();
278             if (policy==LAZY||policy==LOCK) {
279               //just flag to abort when it trie to commit
280               aborted[threadid]=true;
281               if (serAbort!=null)
282                 serAbort.addPoint(currtime, threadid);
283             } else if (policy==COMMIT||policy==LOCKCOMMIT) {
284               //abort it immediately
285               reschedule(threadid, currtime);
286               abortcount++;
287             }
288           }
289         }
290       }
291     } else {
292       abortcount++;
293     }
294     
295     //add next transaction event...could be us if we aborted
296     int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
297     if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
298       Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
299       if (serStart!=null) {
300         if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
301           serStart.addPoint(ev.getTime(),ev.getThread());
302         }
303       }
304
305       Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
306       currentevents[ev.getThread()]=nev;
307       eq.add(nev);
308     }
309   }
310
311   public Set rdConflictSet(int thread, ObjIndex obj) {
312     if (!wrobjmap.containsKey(obj))
313       return null;
314     HashSet conflictset=new HashSet();
315     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
316       Integer threadid=(Integer)it.next();
317       if (threadid.intValue()!=thread)
318         conflictset.add(threadid);
319     }
320     if (conflictset.isEmpty())
321       return null;
322     else
323       return conflictset;
324   }
325
326   public Set wrConflictSet(int thread, ObjIndex obj) {
327     HashSet conflictset=new HashSet();
328     if (rdobjmap.containsKey(obj)) {
329       for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
330         Integer threadid=(Integer)it.next();
331         if (threadid.intValue()!=thread)
332           conflictset.add(threadid);
333       }
334     }
335     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
336       Integer threadid=(Integer)it.next();
337       if (threadid.intValue()!=thread)
338         conflictset.add(threadid);
339     }
340     if (conflictset.isEmpty())
341       return null;
342     else
343       return conflictset;
344   }
345
346   //Takes as parameter -- current transaction read event ev, conflicting
347   //set of threads, and the current time
348   //Returning false causes current transaction not continue to be scheduled
349
350   public boolean handleConflicts(Event ev, Set threadstokill, long time) {
351     if (policy==ATTACK) {
352       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
353         Integer thread=(Integer)thit.next();
354         reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
355         int dback=backoff[thread.intValue()]*2;
356         if (dback>0)
357           backoff[thread.intValue()]=dback;
358         abortcount++;
359       }
360       return true;
361     } else if (policy==POLITE) {
362       reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
363       int dback=backoff[ev.getThread()]*2;
364       if (dback>0)
365         backoff[ev.getThread()]=dback;
366       abortcount++;
367       return false;
368     } else if (policy==KARMA) {
369       long opponenttime=0;
370
371       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
372         Integer thread=(Integer)thit.next();
373         Event other=currentevents[thread.intValue()];
374         int eventnum=other.getEvent();
375         long otime=other.getTransaction().getTime(other.getEvent());
376         if (otime>opponenttime)
377           opponenttime=otime;
378       }
379       if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
380         //kill ourself
381         reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
382         abortcount++;
383         return false;
384       } else {
385         //kill the opponents
386         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
387           Integer thread=(Integer)thit.next();
388           reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
389           abortcount++;
390         }
391         return true;    
392       }
393     }
394
395     //Not eager
396     return true;
397   }
398
399   //Handle current event (read, write, delay) in a transaction and
400   //enqueue the next one
401
402   public void enqueueEvent(Event ev, Transaction trans) {
403     //just enqueue next event
404     int event=ev.getEvent();
405     long currtime=ev.getTime();
406     ObjIndex object=trans.getObjIndex(event);
407     int operation=trans.getEvent(event);
408
409     if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
410       ObjectInfo oi=getmapping(object);
411       
412       if (oi.isRisky()) {
413         if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
414           //we're going to wait
415           boolean deadlocked=true;
416           ObjectInfo toi=oi;
417           for(int i=0;i<checkdepth;i++) {
418             //check if stalling would close the loop
419             if (toi.getOwner()==ev.getThread())
420               break;
421             //see if cycle is broken
422             if (!threadinfo[toi.getOwner()].isStalled()) {
423               deadlocked=false;
424               break;
425             }
426             //follow one more in depth
427             toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
428           }
429           
430           if (!deadlocked) {
431             //don't wait on stalled threads, we could deadlock
432             threadinfo[ev.getThread()].setStall(true);
433             threadinfo[ev.getThread()].setObjIndex(object);
434             if (serStall!=null)
435               serStall.addPoint(ev.getTime(),ev.getThread());
436             oi.addWaiter(ev);
437             return;
438           } else {
439             if (serAvoid!=null)
440               serAvoid.addPoint(ev.getTime(),ev.getThread());
441             deadlockcount++;
442           }
443         } else {
444           //we have object
445           oi.setOwner(ev.getThread());
446         }
447       }
448     }
449     
450     //process the current event
451     if (operation==Transaction.READ) {
452       //record read event
453       if (!rdobjmap.containsKey(object))
454         rdobjmap.put(object,new HashSet());
455       ((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()));
456       if (isEager()) {
457         //do eager contention management
458         Set conflicts=rdConflictSet(ev.getThread(), object);
459         if (conflicts!=null) {
460           if (!handleConflicts(ev, conflicts, currtime))
461             return;
462         }
463       }
464     } else if (operation==Transaction.WRITE) {
465       //record write event
466       if (!wrobjmap.containsKey(object))
467         wrobjmap.put(object,new HashSet());
468       ((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()));
469       if (isEager()) {
470         Set conflicts=wrConflictSet(ev.getThread(), object);
471         if (conflicts!=null) {
472           if (!handleConflicts(ev, conflicts, currtime))
473             return;
474         }
475       }
476     } else if (operation==Transaction.BARRIER) {
477       barriercount--;
478       if (barriercount==0) {
479         for(int i=0;i<e.numThreads();i++) {
480           //enqueue the next event
481           Event bev=currentevents[i];
482           int bevent=bev.getEvent();
483           long bcurrtime=bev.getTime();
484           Transaction btrans=bev.getTransaction();
485           long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
486           Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
487           currentevents[bev.getThread()]=nev;
488           eq.add(nev);
489         }
490         barriercount=e.numThreads();
491       } else {
492         //Do nothing
493         //wait until all threads in barrier
494       }
495       return;
496     }
497     
498     //enqueue the next event
499     long deltatime=trans.getTime(event+1)-trans.getTime(event);
500     Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
501     currentevents[ev.getThread()]=nev;
502     eq.add(nev);
503   }
504
505   
506   class Event implements Comparable {
507     boolean valid;
508     long time;
509     int num;
510     Transaction t;
511     int threadid;
512     int transnum;
513
514     public void makeInvalid() {
515       valid=false;
516     }
517
518     public boolean isValid() {
519       return valid;
520     }
521
522     public int getTransNum() {
523       return transnum;
524     }
525
526     public Transaction getTransaction() {
527       return t;
528     }
529
530     public int getEvent() {
531       return num;
532     }
533
534     public long getTime() {
535       return time;
536     }
537     
538     public void setTime(long time) {
539       this.time=time;
540     }
541
542     public int getThread() {
543       return threadid;
544     }
545
546     public Event(long time, Transaction t, int num, int threadid, int transnum) {
547       this.time=time;
548       this.t=t;
549       this.num=num;
550       this.threadid=threadid;
551       this.transnum=transnum;
552       valid=true;
553     }
554
555     //break ties to allow commits to occur earliest
556     public int compareTo(Object o) {
557       Event e=(Event)o;
558       long delta=time-e.time;
559       if (delta!=0) {
560         if (delta>0)
561           return 1;
562         else
563           return -1;
564       }
565       if (((getEvent()+1)==getTransaction().numEvents())&&
566           (e.getEvent()+1)!=e.getTransaction().numEvents())
567         return -1;
568       if (((getEvent()+1)!=getTransaction().numEvents())&&
569           (e.getEvent()+1)==e.getTransaction().numEvents())
570         return 1;
571       return 0;
572     }
573   }
574 }