Transaction simulation framework. The idea is to basically give us a way to explore...
[IRC.git] / Robust / TransSim / FlexScheduler.java
1 import java.util.*;
2
3 public class FlexScheduler {
4   Executor e;
5   
6   public FlexScheduler(Executor e, int policy) {
7     this.e=e;
8     aborted=new boolean[e.numThreads()];
9     currentevents=new Event[e.numThreads()];
10     rdobjmap=new Hashtable();
11     wrobjmap=new Hashtable();
12     this.policy=policy;
13     r=new Random(100);
14     eq=new PriorityQueue();
15     backoff=new int[e.numThreads()];
16     for(int i=0;i<e.numThreads();i++) {
17       backoff[i]=1;
18     }
19   }
20
21   public static final int LAZY=0;
22   public static final int COMMIT=1;
23   public static final int ATTACK=2;
24   public static final int POLITE=3;
25   public static final int KARMA=4;
26
27   PriorityQueue eq;
28   int policy;
29   boolean[] aborted;
30   int shorttesttime;
31   Hashtable rdobjmap;
32   Hashtable wrobjmap;
33   int abortcount;
34   int commitcount;
35   Event[] currentevents;
36   Random r;
37   int[] backoff;
38
39   public boolean isEager() {
40     return policy==ATTACK||policy==POLITE||policy==KARMA;
41   }
42
43   public int getAborts() {
44     return abortcount;
45   }
46
47   public int getCommits() {
48     return commitcount;
49   }
50
51   public int getTime() {
52     return shorttesttime;
53   }
54
55   public void reschedule(int currthread, int time) {
56     currentevents[currthread].makeInvalid();
57     Transaction trans=currentevents[currthread].getTransaction();
58
59     //remove all events
60     for(int i=0;i<trans.numEvents();i++) {
61       int object=trans.getObject(i);
62       if (object!=-1&&rdobjmap.containsKey(new Integer(object))) {
63         ((Set)rdobjmap.get(new Integer(object))).remove(new Integer(currthread));
64       }
65       if (object!=-1&&wrobjmap.containsKey(new Integer(object))) {
66         ((Set)wrobjmap.get(new Integer(object))).remove(new Integer(currthread));
67       }
68     }
69     
70     Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
71     currentevents[currthread]=nev;
72     eq.add(nev);
73   }
74
75   public void startinitial() {
76     for(int i=0;i<e.numThreads();i++) {
77       Transaction trans=e.getThread(i).getTransaction(0);
78       int time=trans.getTime(0);
79       Event ev=new Event(time, trans, 0, i, 0);
80       currentevents[i]=ev;
81       eq.add(ev);
82     }
83   }
84
85   public void dosim() {
86     int lasttime=0;
87     //start first transactions
88     startinitial();
89
90     while(!eq.isEmpty()) {
91       Event ev=(Event)eq.poll();
92       if (!ev.isValid())
93         continue;
94
95       Transaction trans=ev.getTransaction();
96       int event=ev.getEvent();
97       int currtime=ev.getTime();
98       lasttime=currtime;
99
100       if (trans.numEvents()==(event+1)) {
101         tryCommit(ev, trans);
102       } else {
103         enqueueEvent(ev, trans);
104       }
105     }
106     shorttesttime=lasttime;
107   }
108
109
110   public void tryCommit(Event ev, Transaction trans) {
111     //ready to commit this one
112     int currtime=ev.getTime();
113
114     //Remove everything we put in object sets
115     for(int i=0;i<trans.numEvents();i++) {
116       int object=trans.getObject(i);
117       if (object!=-1&&rdobjmap.containsKey(new Integer(object))) {
118         ((Set)rdobjmap.get(new Integer(object))).remove(new Integer(ev.getThread()));
119       }
120       if (object!=-1&&wrobjmap.containsKey(new Integer(object))) {
121         ((Set)wrobjmap.get(new Integer(object))).remove(new Integer(ev.getThread()));
122       }
123     }
124     
125     //See if we have been flagged as aborted
126     boolean abort=aborted[ev.getThread()];
127     aborted[ev.getThread()]=false;
128     if (!abort) {
129       if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
130         commitcount++;
131       }
132       backoff[ev.getThread()]=1;
133       //abort the other threads
134       for(int i=0;i<trans.numEvents();i++) {
135         int object=trans.getObject(i);
136         int op=trans.getEvent(i);
137         if (op==Transaction.WRITE) {
138           HashSet abortset=new HashSet();
139           if (rdobjmap.containsKey(new Integer(object))) {
140             for(Iterator it=((Set)rdobjmap.get(new Integer(object))).iterator();it.hasNext();) {
141               Integer threadid=(Integer)it.next();
142               abortset.add(threadid);
143             }
144           }
145           if (wrobjmap.containsKey(new Integer(object))) {
146             for(Iterator it=((Set)wrobjmap.get(new Integer(object))).iterator();it.hasNext();) {
147               Integer threadid=(Integer)it.next();
148               abortset.add(threadid);
149             }
150           }
151           for(Iterator abit=abortset.iterator();abit.hasNext();) {
152             Integer threadid=(Integer)abit.next();
153             if (policy==LAZY) {
154               aborted[threadid]=true;
155             } else if (policy==COMMIT) {
156               reschedule(threadid, currtime);
157               abortcount++;
158             }
159           }
160         }
161       }
162     } else {
163       abortcount++;
164     }
165     
166     //add next transaction event...could be us if we aborted
167     int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
168     if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
169       Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
170       Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
171       currentevents[ev.getThread()]=nev;
172       eq.add(nev);
173     }
174   }
175
176   public Set rdConflictSet(int thread, int object) {
177     Integer obj=new Integer(object);
178     if (!wrobjmap.containsKey(obj))
179       return null;
180     HashSet conflictset=new HashSet();
181     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
182       Integer threadid=(Integer)it.next();
183       if (threadid.intValue()!=thread)
184         conflictset.add(threadid);
185     }
186     if (conflictset.isEmpty())
187       return null;
188     else
189       return conflictset;
190   }
191
192   public Set wrConflictSet(int thread, int object) {
193     Integer obj=new Integer(object);
194     if (!rdobjmap.containsKey(obj))
195       return null;
196     HashSet conflictset=new HashSet();
197     for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
198       Integer threadid=(Integer)it.next();
199       if (threadid.intValue()!=thread)
200         conflictset.add(threadid);
201     }
202     if (conflictset.isEmpty())
203       return null;
204     else
205       return conflictset;
206   }
207
208   //Takes as parameter -- current transaction read event ev, conflicting
209   //set of threads, and the current time
210
211   public boolean handleConflicts(Event ev, Set threadstokill, int time) {
212     if (policy==ATTACK) {
213       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
214         Integer thread=(Integer)thit.next();
215         reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
216         backoff[thread.intValue()]*=2;
217         abortcount++;
218       }
219       return true;
220     } else if (policy==POLITE) {
221       reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
222       backoff[ev.getThread()]*=2;
223       abortcount++;
224       return false;
225     } else if (policy==KARMA) {
226       int opponenttime=0;
227
228       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
229         Integer thread=(Integer)thit.next();
230         Event other=currentevents[thread.intValue()];
231         int eventnum=other.getEvent();
232         int otime=other.getTransaction().getTime(other.getEvent());
233         if (otime>opponenttime)
234           opponenttime=otime;
235       }
236       if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
237         //kill ourself
238         reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
239         backoff[ev.getThread()]*=2;
240         abortcount++;
241         return false;
242       } else {
243         //kill the opponents
244         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
245           Integer thread=(Integer)thit.next();
246           reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
247           backoff[thread.intValue()]*=2;
248           abortcount++;
249         }
250         return true;    
251       }
252     }
253
254     //Not eager
255     return true;
256   }
257
258   public void enqueueEvent(Event ev, Transaction trans) {
259     //just enqueue next event
260     int event=ev.getEvent();
261     int currtime=ev.getTime();
262     int object=trans.getObject(event);
263     int operation=trans.getEvent(event);
264     //process the current event
265     if (operation==Transaction.READ) {
266       Integer obj=new Integer(object);
267       if (!rdobjmap.containsKey(obj))
268         rdobjmap.put(obj,new HashSet());
269       ((Set)rdobjmap.get(obj)).add(new Integer(ev.getThread()));
270       if (isEager()) {
271         Set conflicts=rdConflictSet(ev.getThread(), object);
272         if (conflicts!=null)
273           if (!handleConflicts(ev, conflicts, currtime))
274             return;
275       }
276     } else if (operation==Transaction.WRITE) {
277       Integer obj=new Integer(object);
278       if (!wrobjmap.containsKey(obj))
279         wrobjmap.put(obj,new HashSet());
280       ((Set)wrobjmap.get(obj)).add(new Integer(ev.getThread()));
281       if (isEager()) {
282         Set conflicts=wrConflictSet(ev.getThread(), object);
283         if (conflicts!=null)
284           if (!handleConflicts(ev, conflicts, currtime))
285             return;
286       }
287     }
288     
289     //enqueue the next event
290     int deltatime=trans.getTime(event+1)-trans.getTime(event);
291     Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
292     currentevents[ev.getThread()]=nev;
293     eq.add(nev);
294   }
295
296   
297   class Event implements Comparable {
298     boolean valid;
299     int time;
300     int num;
301     Transaction t;
302     int threadid;
303     int transnum;
304
305     public void makeInvalid() {
306       valid=false;
307     }
308
309     public boolean isValid() {
310       return valid;
311     }
312
313     public int getTransNum() {
314       return transnum;
315     }
316
317     public Transaction getTransaction() {
318       return t;
319     }
320
321     public int getEvent() {
322       return num;
323     }
324
325     public int getTime() {
326       return time;
327     }
328
329     public int getThread() {
330       return threadid;
331     }
332
333     public Event(int time, Transaction t, int num, int threadid, int transnum) {
334       this.time=time;
335       this.t=t;
336       this.num=num;
337       this.threadid=threadid;
338       this.transnum=transnum;
339       valid=true;
340     }
341
342     public int compareTo(Object o) {
343       Event e=(Event)o;
344       return time-e.time;
345     }
346   }
347
348
349
350 }