3 public class FlexScheduler {
6 public FlexScheduler(Executor e, int policy) {
8 aborted=new boolean[e.numThreads()];
9 currentevents=new Event[e.numThreads()];
10 rdobjmap=new Hashtable();
11 wrobjmap=new Hashtable();
14 eq=new PriorityQueue();
15 backoff=new int[e.numThreads()];
16 for(int i=0;i<e.numThreads();i++) {
21 public static final int LAZY=0;
22 public static final int COMMIT=1;
23 public static final int ATTACK=2;
24 public static final int POLITE=3;
25 public static final int KARMA=4;
35 Event[] currentevents;
39 public boolean isEager() {
40 return policy==ATTACK||policy==POLITE||policy==KARMA;
43 public int getAborts() {
47 public int getCommits() {
51 public int getTime() {
55 public void reschedule(int currthread, int time) {
56 currentevents[currthread].makeInvalid();
57 Transaction trans=currentevents[currthread].getTransaction();
60 for(int i=0;i<trans.numEvents();i++) {
61 int object=trans.getObject(i);
62 if (object!=-1&&rdobjmap.containsKey(new Integer(object))) {
63 ((Set)rdobjmap.get(new Integer(object))).remove(new Integer(currthread));
65 if (object!=-1&&wrobjmap.containsKey(new Integer(object))) {
66 ((Set)wrobjmap.get(new Integer(object))).remove(new Integer(currthread));
70 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
71 currentevents[currthread]=nev;
75 public void startinitial() {
76 for(int i=0;i<e.numThreads();i++) {
77 Transaction trans=e.getThread(i).getTransaction(0);
78 int time=trans.getTime(0);
79 Event ev=new Event(time, trans, 0, i, 0);
87 //start first transactions
90 while(!eq.isEmpty()) {
91 Event ev=(Event)eq.poll();
95 Transaction trans=ev.getTransaction();
96 int event=ev.getEvent();
97 int currtime=ev.getTime();
100 if (trans.numEvents()==(event+1)) {
101 tryCommit(ev, trans);
103 enqueueEvent(ev, trans);
106 shorttesttime=lasttime;
110 public void tryCommit(Event ev, Transaction trans) {
111 //ready to commit this one
112 int currtime=ev.getTime();
114 //Remove everything we put in object sets
115 for(int i=0;i<trans.numEvents();i++) {
116 int object=trans.getObject(i);
117 if (object!=-1&&rdobjmap.containsKey(new Integer(object))) {
118 ((Set)rdobjmap.get(new Integer(object))).remove(new Integer(ev.getThread()));
120 if (object!=-1&&wrobjmap.containsKey(new Integer(object))) {
121 ((Set)wrobjmap.get(new Integer(object))).remove(new Integer(ev.getThread()));
125 //See if we have been flagged as aborted
126 boolean abort=aborted[ev.getThread()];
127 aborted[ev.getThread()]=false;
129 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
132 backoff[ev.getThread()]=1;
133 //abort the other threads
134 for(int i=0;i<trans.numEvents();i++) {
135 int object=trans.getObject(i);
136 int op=trans.getEvent(i);
137 if (op==Transaction.WRITE) {
138 HashSet abortset=new HashSet();
139 if (rdobjmap.containsKey(new Integer(object))) {
140 for(Iterator it=((Set)rdobjmap.get(new Integer(object))).iterator();it.hasNext();) {
141 Integer threadid=(Integer)it.next();
142 abortset.add(threadid);
145 if (wrobjmap.containsKey(new Integer(object))) {
146 for(Iterator it=((Set)wrobjmap.get(new Integer(object))).iterator();it.hasNext();) {
147 Integer threadid=(Integer)it.next();
148 abortset.add(threadid);
151 for(Iterator abit=abortset.iterator();abit.hasNext();) {
152 Integer threadid=(Integer)abit.next();
154 aborted[threadid]=true;
155 } else if (policy==COMMIT) {
156 reschedule(threadid, currtime);
166 //add next transaction event...could be us if we aborted
167 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
168 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
169 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
170 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
171 currentevents[ev.getThread()]=nev;
176 public Set rdConflictSet(int thread, int object) {
177 Integer obj=new Integer(object);
178 if (!wrobjmap.containsKey(obj))
180 HashSet conflictset=new HashSet();
181 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
182 Integer threadid=(Integer)it.next();
183 if (threadid.intValue()!=thread)
184 conflictset.add(threadid);
186 if (conflictset.isEmpty())
192 public Set wrConflictSet(int thread, int object) {
193 Integer obj=new Integer(object);
194 if (!rdobjmap.containsKey(obj))
196 HashSet conflictset=new HashSet();
197 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
198 Integer threadid=(Integer)it.next();
199 if (threadid.intValue()!=thread)
200 conflictset.add(threadid);
202 if (conflictset.isEmpty())
208 //Takes as parameter -- current transaction read event ev, conflicting
209 //set of threads, and the current time
211 public boolean handleConflicts(Event ev, Set threadstokill, int time) {
212 if (policy==ATTACK) {
213 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
214 Integer thread=(Integer)thit.next();
215 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
216 backoff[thread.intValue()]*=2;
220 } else if (policy==POLITE) {
221 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
222 backoff[ev.getThread()]*=2;
225 } else if (policy==KARMA) {
228 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
229 Integer thread=(Integer)thit.next();
230 Event other=currentevents[thread.intValue()];
231 int eventnum=other.getEvent();
232 int otime=other.getTransaction().getTime(other.getEvent());
233 if (otime>opponenttime)
236 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
238 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
239 backoff[ev.getThread()]*=2;
244 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
245 Integer thread=(Integer)thit.next();
246 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
247 backoff[thread.intValue()]*=2;
258 public void enqueueEvent(Event ev, Transaction trans) {
259 //just enqueue next event
260 int event=ev.getEvent();
261 int currtime=ev.getTime();
262 int object=trans.getObject(event);
263 int operation=trans.getEvent(event);
264 //process the current event
265 if (operation==Transaction.READ) {
266 Integer obj=new Integer(object);
267 if (!rdobjmap.containsKey(obj))
268 rdobjmap.put(obj,new HashSet());
269 ((Set)rdobjmap.get(obj)).add(new Integer(ev.getThread()));
271 Set conflicts=rdConflictSet(ev.getThread(), object);
273 if (!handleConflicts(ev, conflicts, currtime))
276 } else if (operation==Transaction.WRITE) {
277 Integer obj=new Integer(object);
278 if (!wrobjmap.containsKey(obj))
279 wrobjmap.put(obj,new HashSet());
280 ((Set)wrobjmap.get(obj)).add(new Integer(ev.getThread()));
282 Set conflicts=wrConflictSet(ev.getThread(), object);
284 if (!handleConflicts(ev, conflicts, currtime))
289 //enqueue the next event
290 int deltatime=trans.getTime(event+1)-trans.getTime(event);
291 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
292 currentevents[ev.getThread()]=nev;
297 class Event implements Comparable {
305 public void makeInvalid() {
309 public boolean isValid() {
313 public int getTransNum() {
317 public Transaction getTransaction() {
321 public int getEvent() {
325 public int getTime() {
329 public int getThread() {
333 public Event(int time, Transaction t, int num, int threadid, int transnum) {
337 this.threadid=threadid;
338 this.transnum=transnum;
342 public int compareTo(Object o) {