code to do plots of transaction events..
[IRC.git] / Robust / TransSim / FlexScheduler.java
1 import java.util.*;
2
3 public class FlexScheduler {
4   Executor e;
5   int abortThreshold;
6   int abortRatio;
7   int deadlockcount;
8   int checkdepth;
9
10   public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
11     this(e, policy, p);
12     this.abortThreshold=abortThreshold;
13     this.abortRatio=abortRatio;
14     this.checkdepth=checkdepth;
15   }
16   
17   public FlexScheduler(Executor e, int policy, Plot p) {
18     this.e=e;
19     aborted=new boolean[e.numThreads()];
20     currentevents=new Event[e.numThreads()];
21     rdobjmap=new Hashtable();
22     wrobjmap=new Hashtable();
23     this.policy=policy;
24     r=new Random(100);
25     eq=new PriorityQueue();
26     backoff=new int[e.numThreads()];
27     objtoinfo=new Hashtable();
28     threadinfo=new ThreadInfo[e.numThreads()];
29     blocked=new boolean[e.numThreads()];
30
31     for(int i=0;i<e.numThreads();i++) {
32       backoff[i]=BACKOFFSTART;
33       threadinfo[i]=new ThreadInfo(this);
34     }
35     this.p=p;
36     if (p!=null) {
37       serCommit=p.getSeries("COMMIT");
38       serStart=p.getSeries("START");
39       serAbort=p.getSeries("ABORT");
40       serStall=p.getSeries("STALL");
41       serWake=p.getSeries("WAKE");
42       serAvoid=p.getSeries("AVOIDDEADLOCK");
43     }
44   }
45
46   Plot p;
47   Series serCommit;
48   Series serStart;
49   Series serAbort;
50   Series serStall;
51   Series serAvoid;
52   Series serWake;
53
54   public int getDeadLockCount() {
55     return deadlockcount;
56   }
57
58   //Where to start the backoff delay at
59   public static final int BACKOFFSTART=1;
60
61   //Commit options
62   public static final int LAZY=0;
63   public static final int COMMIT=1;
64   public static final int ATTACK=2;
65   public static final int POLITE=3;
66   public static final int KARMA=4;
67   public static final int LOCK=5;
68   public static final int LOCKCOMMIT=6;
69
70   PriorityQueue eq;
71   int policy;
72   boolean[] aborted;
73   int shorttesttime;
74   Hashtable rdobjmap;
75   Hashtable wrobjmap;
76   int abortcount;
77   int commitcount;
78   Event[] currentevents;
79   Random r;
80   int[] backoff;
81   Hashtable objtoinfo;
82   ThreadInfo[] threadinfo;
83   
84   boolean[] blocked;
85
86   public boolean isEager() {
87     return policy==ATTACK||policy==POLITE||policy==KARMA;
88   }
89
90   public boolean isLock() {
91     return policy==LOCK||policy==LOCKCOMMIT;
92   }
93
94   public int getAborts() {
95     return abortcount;
96   }
97
98   public int getCommits() {
99     return commitcount;
100   }
101
102   public int getTime() {
103     return shorttesttime;
104   }
105
106   //Aborts another thread...
107   public void reschedule(int currthread, int time) {
108     currentevents[currthread].makeInvalid();
109     if (threadinfo[currthread].isStalled()) {
110       //remove from waiter list
111       threadinfo[currthread].setStall(false);
112       getmapping(threadinfo[currthread].getObject()).getWaiters().remove(currentevents[currthread]);
113     }
114     if (serAbort!=null) {
115       serAbort.addPoint(time, currthread);
116     }
117     Transaction trans=currentevents[currthread].getTransaction();
118     
119     releaseObjects(trans, currthread, time);
120     Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
121     currentevents[currthread]=nev;
122     eq.add(nev);
123   }
124
125
126   private void releaseObjects(Transaction trans, int currthread, int time) {
127     //remove all events
128     for(int i=0;i<trans.numEvents();i++) {
129       int object=trans.getObject(i);
130       Integer obj=new Integer(object);
131       if (object!=-1&&rdobjmap.containsKey(obj)) {
132         ((Set)rdobjmap.get(obj)).remove(new Integer(currthread));
133       }
134       if (object!=-1&&wrobjmap.containsKey(obj)) {
135         ((Set)wrobjmap.get(obj)).remove(new Integer(currthread));
136       }
137       if (object!=-1&&objtoinfo.containsKey(obj)) {
138         ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
139         if (oi.getOwner()==currentevents[currthread].getThread()) {
140           oi.releaseOwner();
141           
142           //wake up one waiter
143           for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
144             //requeue everyone who was waiting on us and start them back up
145             Event waiter=(Event)waitit.next();
146             waitit.remove();
147             waiter.setTime(time);
148             threadinfo[waiter.getThread()].setStall(false);
149             if (serWake!=null)
150               serWake.addPoint(time,waiter.getThread());
151             oi.setOwner(waiter.getThread());
152             eq.add(waiter);
153             break;
154           }
155         }
156       }
157     }
158   }
159
160   public void startinitial() {
161     for(int i=0;i<e.numThreads();i++) {
162       Transaction trans=e.getThread(i).getTransaction(0);
163       int time=trans.getTime(0);
164       Event ev=new Event(time, trans, 0, i, 0);
165       currentevents[i]=ev;
166       eq.add(ev);
167     }
168   }
169
170   public void dosim() {
171     int lasttime=0;
172     //start first transactions
173     startinitial();
174
175     while(!eq.isEmpty()) {
176       Event ev=(Event)eq.poll();
177       if (!ev.isValid()) {
178         continue;
179       }
180
181       Transaction trans=ev.getTransaction();
182       int event=ev.getEvent();
183       int currtime=ev.getTime();
184       lasttime=currtime;
185
186       if (trans.numEvents()==(event+1)) {
187         tryCommit(ev, trans);
188       } else {
189         enqueueEvent(ev, trans);
190       }
191     }
192     shorttesttime=lasttime;
193     if (p!=null)
194       p.close();
195   }
196
197   private ObjectInfo getmapping(int object) {
198     Integer obj=new Integer(object);
199     if (!objtoinfo.containsKey(obj))
200       objtoinfo.put(obj, new ObjectInfo(this));
201     return (ObjectInfo)objtoinfo.get(obj);
202   }
203
204   public void tryCommit(Event ev, Transaction trans) {
205     //ready to commit this one
206     int currtime=ev.getTime();
207     releaseObjects(trans, ev.getThread(), currtime);
208     
209     //See if we have been flagged as aborted for the lazy case
210     boolean abort=aborted[ev.getThread()];
211     aborted[ev.getThread()]=false;
212     if (!abort) {
213       //if it is a transaction, increment comit count
214       if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
215         commitcount++;
216         if (serCommit!=null) {
217           serCommit.addPoint(ev.getTime(),ev.getThread());
218         }
219       }
220       //Reset our backoff counter
221       backoff[ev.getThread()]=BACKOFFSTART;
222
223       //abort the other threads
224       for(int i=0;i<trans.numEvents();i++) {
225         int object=trans.getObject(i);
226         int op=trans.getEvent(i);
227         //Mark commits to objects
228         if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
229           getmapping(object).recordCommit();
230         }
231         //Check for threads we might cause to abort
232         if (op==Transaction.WRITE) {
233           HashSet abortset=new HashSet();
234           Integer obj=new Integer(object);
235           if (rdobjmap.containsKey(obj)) {
236             for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
237               Integer threadid=(Integer)it.next();
238               abortset.add(threadid);
239               if (isLock()) {
240                 ObjectInfo oi=getmapping(object);
241                 oi.recordAbort();
242               }
243             }
244           }
245           if (wrobjmap.containsKey(obj)) {
246             for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
247               Integer threadid=(Integer)it.next();
248               abortset.add(threadid);
249               if (isLock()&&(!rdobjmap.containsKey(obj)||!((Set)rdobjmap.get(obj)).contains(threadid))) {
250                 //if this object hasn't already cause this thread to
251                 //abort, then flag it as an abort cause
252                 ObjectInfo oi=getmapping(object);
253                 oi.recordAbort();
254               }
255             }
256           }
257           for(Iterator abit=abortset.iterator();abit.hasNext();) {
258             Integer threadid=(Integer)abit.next();
259             if (policy==LAZY||policy==LOCK) {
260               //just flag to abort when it trie to commit
261               aborted[threadid]=true;
262               if (serAbort!=null)
263                 serAbort.addPoint(currtime, threadid);
264             } else if (policy==COMMIT||policy==LOCKCOMMIT) {
265               //abort it immediately
266               reschedule(threadid, currtime);
267               abortcount++;
268             }
269           }
270         }
271       }
272     } else {
273       abortcount++;
274     }
275     
276     //add next transaction event...could be us if we aborted
277     int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
278     if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
279       Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
280       if (serStart!=null) {
281         if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
282           serStart.addPoint(ev.getTime(),ev.getThread());
283         }
284       }
285
286       Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
287       currentevents[ev.getThread()]=nev;
288       eq.add(nev);
289     }
290   }
291
292   public Set rdConflictSet(int thread, int object) {
293     Integer obj=new Integer(object);
294     if (!wrobjmap.containsKey(obj))
295       return null;
296     HashSet conflictset=new HashSet();
297     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
298       Integer threadid=(Integer)it.next();
299       if (threadid.intValue()!=thread)
300         conflictset.add(threadid);
301     }
302     if (conflictset.isEmpty())
303       return null;
304     else
305       return conflictset;
306   }
307
308   public Set wrConflictSet(int thread, int object) {
309     Integer obj=new Integer(object);
310
311     HashSet conflictset=new HashSet();
312     if (rdobjmap.containsKey(obj)) {
313       for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
314         Integer threadid=(Integer)it.next();
315         if (threadid.intValue()!=thread)
316           conflictset.add(threadid);
317       }
318     }
319     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
320       Integer threadid=(Integer)it.next();
321       if (threadid.intValue()!=thread)
322         conflictset.add(threadid);
323     }
324     if (conflictset.isEmpty())
325       return null;
326     else
327       return conflictset;
328   }
329
330   //Takes as parameter -- current transaction read event ev, conflicting
331   //set of threads, and the current time
332   //Returning false causes current transaction not continue to be scheduled
333
334   public boolean handleConflicts(Event ev, Set threadstokill, int time) {
335     if (policy==ATTACK) {
336       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
337         Integer thread=(Integer)thit.next();
338         reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
339         backoff[thread.intValue()]*=2;
340         abortcount++;
341       }
342       return true;
343     } else if (policy==POLITE) {
344       reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
345       backoff[ev.getThread()]*=2;
346       abortcount++;
347       return false;
348     } else if (policy==KARMA) {
349       int opponenttime=0;
350
351       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
352         Integer thread=(Integer)thit.next();
353         Event other=currentevents[thread.intValue()];
354         int eventnum=other.getEvent();
355         int otime=other.getTransaction().getTime(other.getEvent());
356         if (otime>opponenttime)
357           opponenttime=otime;
358       }
359       if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
360         //kill ourself
361         reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
362         abortcount++;
363         return false;
364       } else {
365         //kill the opponents
366         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
367           Integer thread=(Integer)thit.next();
368           reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
369           abortcount++;
370         }
371         return true;    
372       }
373     }
374
375     //Not eager
376     return true;
377   }
378
379   //Handle current event (read, write, delay) in a transaction and
380   //enqueue the next one
381
382   public void enqueueEvent(Event ev, Transaction trans) {
383     //just enqueue next event
384     int event=ev.getEvent();
385     int currtime=ev.getTime();
386     int object=trans.getObject(event);
387     int operation=trans.getEvent(event);
388     Integer obj=new Integer(object);
389
390     if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
391       ObjectInfo oi=getmapping(object);
392       
393       if (oi.isRisky()) {
394         if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
395           //we're going to wait
396           boolean deadlocked=true;
397           ObjectInfo toi=oi;
398           for(int i=0;i<checkdepth;i++) {
399             //check if stalling would close the loop
400             if (toi.getOwner()==ev.getThread())
401               break;
402             //see if cycle is broken
403             if (!threadinfo[toi.getOwner()].isStalled()) {
404               deadlocked=false;
405               break;
406             }
407             //follow one more in depth
408             toi=getmapping(threadinfo[toi.getOwner()].getObject());
409           }
410           
411           if (!deadlocked) {
412             //don't wait on stalled threads, we could deadlock
413             threadinfo[ev.getThread()].setStall(true);
414             threadinfo[ev.getThread()].setObject(object);
415             if (serStall!=null)
416               serStall.addPoint(ev.getTime(),ev.getThread());
417             oi.addWaiter(ev);
418             return;
419           } else {
420             if (serAvoid!=null)
421               serAvoid.addPoint(ev.getTime(),ev.getThread());
422             deadlockcount++;
423           }
424         } else {
425           //we have object
426           oi.setOwner(ev.getThread());
427         }
428       }
429     }
430     
431     //process the current event
432     if (operation==Transaction.READ) {
433       //record read event
434       if (!rdobjmap.containsKey(obj))
435         rdobjmap.put(obj,new HashSet());
436       ((Set)rdobjmap.get(obj)).add(new Integer(ev.getThread()));
437       if (isEager()) {
438         //do eager contention management
439         Set conflicts=rdConflictSet(ev.getThread(), object);
440         if (conflicts!=null) {
441           if (!handleConflicts(ev, conflicts, currtime))
442             return;
443         }
444       }
445     } else if (operation==Transaction.WRITE) {
446       //record write event
447       if (!wrobjmap.containsKey(obj))
448         wrobjmap.put(obj,new HashSet());
449       ((Set)wrobjmap.get(obj)).add(new Integer(ev.getThread()));
450       if (isEager()) {
451         Set conflicts=wrConflictSet(ev.getThread(), object);
452         if (conflicts!=null) {
453           if (!handleConflicts(ev, conflicts, currtime))
454             return;
455         }
456       }
457     }
458     
459     //enqueue the next event
460     int deltatime=trans.getTime(event+1)-trans.getTime(event);
461     Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
462     currentevents[ev.getThread()]=nev;
463     eq.add(nev);
464   }
465
466   
467   class Event implements Comparable {
468     boolean valid;
469     int time;
470     int num;
471     Transaction t;
472     int threadid;
473     int transnum;
474
475     public void makeInvalid() {
476       valid=false;
477     }
478
479     public boolean isValid() {
480       return valid;
481     }
482
483     public int getTransNum() {
484       return transnum;
485     }
486
487     public Transaction getTransaction() {
488       return t;
489     }
490
491     public int getEvent() {
492       return num;
493     }
494
495     public int getTime() {
496       return time;
497     }
498     
499     public void setTime(int time) {
500       this.time=time;
501     }
502
503     public int getThread() {
504       return threadid;
505     }
506
507     public Event(int time, Transaction t, int num, int threadid, int transnum) {
508       this.time=time;
509       this.t=t;
510       this.num=num;
511       this.threadid=threadid;
512       this.transnum=transnum;
513       valid=true;
514     }
515
516     //break ties to allow commits to occur earliest
517     public int compareTo(Object o) {
518       Event e=(Event)o;
519       int delta=time-e.time;
520       if (delta!=0)
521         return delta;
522       if (((getEvent()+1)==getTransaction().numEvents())&&
523           (e.getEvent()+1)!=e.getTransaction().numEvents())
524         return -1;
525       if (((getEvent()+1)!=getTransaction().numEvents())&&
526           (e.getEvent()+1)==e.getTransaction().numEvents())
527         return 1;
528       return 0;
529     }
530   }
531 }