this.abortRatio=abortRatio;
this.checkdepth=checkdepth;
}
-
+
public void run() {
dosim();
}
eq=new PriorityQueue();
backoff=new int[e.numThreads()];
retrycount=new int[e.numThreads()];
+ transferred=new int[e.numThreads()];
objtoinfo=new Hashtable();
threadinfo=new ThreadInfo[e.numThreads()];
blocked=new boolean[e.numThreads()];
}
}
+ int lowid=0;
+ int countlow=0;
+
Plot p;
Series serCommit;
Series serStart;
public static final int KARMA=8;
public static final int POLITE=9;
public static final int ERUPTION=10;
+ public static final int THREAD=11;
+ public static final int ATTACKTIME=12;
+ public static final int ATTACKTHREAD=13;
+
+ public static String getName(int policy) {
+ switch (policy) {
+ case LAZY:
+ return new String("LAZY");
+ case COMMIT:
+ return new String("COMMIT");
+ case ATTACK:
+ return new String("ATTACK");
+ case SUICIDE:
+ return new String("TIMID");
+ case TIMESTAMP:
+ return new String("TIMESTAMP");
+ case LOCK:
+ return new String("LOCK");
+ case LOCKCOMMIT:
+ return new String("LOCKCOMMIT");
+ case RANDOM:
+ return new String("RANDOM");
+ case KARMA:
+ return new String("KARMA");
+ case POLITE:
+ return new String("POLITE");
+ case ERUPTION:
+ return new String("ERUPTION");
+ case THREAD:
+ return new String("THREAD");
+ case ATTACKTIME:
+ return new String("ATTACKTIME");
+ case ATTACKTHREAD:
+ return new String("ATTACKTHREAD");
+ }
+ return null;
+ }
PriorityQueue eq;
int policy;
boolean[] aborted;
long shorttesttime;
+ long earliesttime=-1;
long starttime=-1;
Hashtable rdobjmap;
Hashtable wrobjmap;
int abortcount;
int commitcount;
+ long backoffcycles;
+ long stallcycles;
+ long abortedcycles;
Event[] currentevents;
Random r;
int[] backoff;
int[] retrycount;
+ int[] transferred;
Hashtable objtoinfo;
ThreadInfo[] threadinfo;
boolean[] blocked;
public boolean isEager() {
- return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE;
+ return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD||policy==ATTACKTIME||policy==ATTACKTHREAD;
}
public boolean countObjects() {
return commitcount;
}
+ public long getEarliestTime() {
+ return earliesttime-starttime;
+ }
+
public long getTime() {
return shorttesttime-starttime;
}
+ public long getStallTime() {
+ return stallcycles;
+ }
+
+ public long getBackoffTime() {
+ return backoffcycles;
+ }
+
+ public long getAbortedTime() {
+ return abortedcycles;
+ }
+
+ //Computes wasted time
+ public void timewasted(int currthread, long currtime) {
+ Event e=currentevents[currthread];
+ Transaction trans=e.getTransaction();
+ int eIndex=e.getEvent();
+ long eTime=e.getTime();
+ long timeleft=eTime-currtime;
+ if (e.isStalled()) {
+ stallcycles-=timeleft; //this time is no longer stalled...back it out
+ timeleft=0;//if the event is stalled, we already waited this time...
+ }
+ long totaltime=trans.getTime(eIndex);
+ totaltime-=timeleft;//subtract off time to the next event
+ abortedcycles+=totaltime;
+ }
+
//Aborts another thread...
- public void reschedule(int currthread, long time) {
+ public void reschedule(int currthread, long currtime, long backofftime) {
+ long time=currtime+backofftime;
+ backoffcycles+=backofftime;
currentevents[currthread].makeInvalid();
if (threadinfo[currthread].isStalled()) {
//remove from waiter list
}
//Aborts another thread...
- public void stall(Event ev, long time) {
- ev.setTime(time);
+ public void stall(Event ev, long time, long delay) {
+ stallcycles+=delay;
+ ev.setTime(time+delay);
+ ev.setStall();
eq.add(ev);
}
-
private void releaseObjects(Transaction trans, int currthread, long time) {
//remove all events
for(int i=0;i<trans.numEvents();i++) {
tryCommit(ev, trans);
tcount++;
if ((tcount%100000)==0)
- System.out.println("Attempted "+tcount+"transactions");
+ System.out.println("Attempted "+tcount+"transactions "+policy);
} else {
enqueueEvent(ev, trans);
}
//ready to commit this one
long currtime=ev.getTime();
releaseObjects(trans, ev.getThread(), currtime);
+
+ if (ev.getThread()==lowid) {
+ countlow++;
+ if (countlow==4) {
+ countlow=0;
+ lowid++;
+ if (lowid==e.numThreads())
+ lowid=0;
+ }
+ }
//See if we have been flagged as aborted for the lazy case
boolean abort=aborted[ev.getThread()];
aborted[ev.getThread()]=false;
if (!abort) {
- //if it is a transaction, increment comit count
+ //if it is a transaction, increment commit count
if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
- threadinfo[ev.getThread()].priority=0;
commitcount++;
if (serCommit!=null) {
serCommit.addPoint(ev.getTime(),ev.getThread());
}
}
//Reset our backoff counter
+ threadinfo[ev.getThread()].priority=0;
+ threadinfo[ev.getThread()].aborted=false;
backoff[ev.getThread()]=BACKOFFSTART;
retrycount[ev.getThread()]=0;
+ transferred[ev.getThread()]=0;
//abort the other threads
for(int i=0;i<trans.numEvents();i++) {
serAbort.addPoint(currtime, threadid);
} else if (policy==COMMIT||policy==LOCKCOMMIT) {
//abort it immediately
- reschedule(threadid, currtime);
+ timewasted(threadid, currtime);
+ reschedule(threadid, currtime, 0);
abortcount++;
}
}
}
} else {
abortcount++;
+ timewasted(ev.getThread(), currtime);
}
//add next transaction event...could be us if we aborted
Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
currentevents[ev.getThread()]=nev;
eq.add(nev);
+ } else {
+ if (earliesttime==-1)
+ earliesttime=currtime;
}
}
return conflictset;
}
+
+ int normalize(int tid) {
+ int newtid=tid-lowid;
+ if (newtid<0)
+ newtid+=e.numThreads();
+ return newtid;
+ }
+
public Set wrConflictSet(int thread, ObjIndex obj) {
HashSet conflictset=new HashSet();
if (rdobjmap.containsKey(obj)) {
//set of threads, and the current time
//Returning false causes current transaction not continue to be scheduled
+
public boolean handleConflicts(Event ev, Set threadstokill, long time) {
if (policy==RANDOM) {
boolean b=r.nextBoolean();
if (b) {
//delay
- stall(ev, time+r.nextInt(200));
+ int thread=ev.getThread();
+ int dback=backoff[thread]*2;
+ if (dback>0)
+ backoff[thread]=dback;
+ stall(ev, time, r.nextInt(backoff[thread]));
return false;
} else {
//abort other transactions
for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
Integer thread=(Integer)thit.next();
- reschedule(thread, time);
+ timewasted(thread, time);
+ reschedule(thread, time, 0);
abortcount++;
}
return true;
}
} else if (policy==KARMA) {
int maxpriority=0;
- //abort other transactions
for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
Integer thread=(Integer)thit.next();
if (threadinfo[thread].priority>maxpriority)
maxpriority=threadinfo[thread].priority;
}
- if (maxpriority>threadinfo[ev.getThread()].priority) {
- //we lose
- threadinfo[ev.getThread()].priority++;
+ if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
//stall for a little while
- stall(ev, time+20);
+ threadinfo[ev.getThread()].priority--;
+ retrycount[ev.getThread()]++;
+ int rtime=r.nextInt(3000);
+ stall(ev, time, rtime);
return false;
} else {
//we win
for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
Integer thread=(Integer)thit.next();
- reschedule(thread, time);
+ int dback=backoff[thread]*2;
+ if (dback>0)
+ backoff[thread]=dback;
+ int atime=r.nextInt(backoff[thread]);
+ timewasted(thread, time);
+ reschedule(thread, time, atime);
abortcount++;
}
return true;
if (threadinfo[thread].priority>maxpriority)
maxpriority=threadinfo[thread].priority;
}
- if (maxpriority>threadinfo[ev.getThread()].priority) {
+ if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
//we lose
+ threadinfo[ev.getThread()].priority--;
//stall for a little while
- stall(ev, time);
+ int rtime=r.nextInt(3000);
+ stall(ev, time, rtime);
int ourpriority=threadinfo[ev.getThread()].priority;
+ ourpriority-=transferred[ev.getThread()];
for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
Integer thread=(Integer)thit.next();
- threadinfo[thread].priority+=ourpriority;;
+ threadinfo[thread].priority+=ourpriority;
}
+ transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
+ retrycount[ev.getThread()]++;
+
return false;
} else {
//we win
for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
Integer thread=(Integer)thit.next();
- reschedule(thread, time);
+ int dback=backoff[thread]*2;
+ if (dback>0)
+ backoff[thread]=dback;
+ int atime=r.nextInt(backoff[thread]);
+ timewasted(thread, time);
+ reschedule(thread, time, atime);
abortcount++;
}
return true;
}
} else if (policy==POLITE) {
- int retry=retrycount[ev.getThread()]++;
- if (retry==22) {
+ int retry=(++retrycount[ev.getThread()]);
+ if (retry>=22) {
+ retrycount[ev.getThread()]=0;
for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
Integer thread=(Integer)thit.next();
- reschedule(thread, time);
- int dback=backoff[thread.intValue()]*2;
- if (dback>0)
- backoff[thread.intValue()]=dback;
+ timewasted(thread, time);
+ reschedule(thread, time, 0);
abortcount++;
}
return true;
} else {
//otherwise stall
- stall(ev, time+r.nextInt((1<<retry)*12));
+ int stalltime=(1<<(retry-1))*12;
+ if (stalltime<0)
+ stalltime=1<<30;
+ stall(ev, time, r.nextInt(stalltime));
return false;
}
} else if (policy==ATTACK) {
for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
Integer thread=(Integer)thit.next();
- reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
+ timewasted(thread, time);
+ reschedule(thread, time, r.nextInt(backoff[thread.intValue()]));
int dback=backoff[thread.intValue()]*2;
if (dback>0)
backoff[thread.intValue()]=dback;
}
return true;
} else if (policy==SUICIDE) {
- reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
+ timewasted(ev.getThread(), time);
+ reschedule(ev.getThread(), time, r.nextInt(backoff[ev.getThread()]));
int dback=backoff[ev.getThread()]*2;
if (dback>0)
backoff[ev.getThread()]=dback;
}
if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
//kill ourself
- reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
+ timewasted(ev.getThread(), time);
+ reschedule(ev.getThread(), time, 0);
+ abortcount++;
+ return false;
+ } else {
+ //kill the opponents
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ timewasted(thread, time);
+ reschedule(thread, time, 0);
+ abortcount++;
+ }
+ return true;
+ }
+ } else if (policy==THREAD) {
+ long tid=1000;
+
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ Event other=currentevents[thread.intValue()];
+ int eventnum=other.getEvent();
+ long otid=normalize(thread.intValue());
+ if (tid>otid)
+ tid=otid;
+ }
+ if (normalize(ev.getThread())>tid) {
+ //kill ourself
+ timewasted(ev.getThread(), time);
+ reschedule(ev.getThread(), time, 0);
abortcount++;
return false;
} else {
//kill the opponents
for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
Integer thread=(Integer)thit.next();
- reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
+ timewasted(thread, time);
+ reschedule(thread, time, 0);
abortcount++;
}
return true;
}
+ } else if (policy==ATTACKTIME) {
+ boolean timebased=false;
+ int tev=ev.getThread();
+ timebased|=threadinfo[tev].aborted;
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ timebased|=threadinfo[thread.intValue()].aborted;
+ }
+ if (timebased) {
+ long opponenttime=0;
+
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ Event other=currentevents[thread.intValue()];
+ int eventnum=other.getEvent();
+ long otime=other.getTransaction().getTime(other.getEvent());
+ if (otime>opponenttime)
+ opponenttime=otime;
+ }
+ if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
+ //kill ourself
+ timewasted(ev.getThread(), time);
+ reschedule(ev.getThread(), time, 0);
+ threadinfo[ev.getThread()].aborted=true;
+ abortcount++;
+ return false;
+ } else {
+ //kill the opponents
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ timewasted(thread, time);
+ reschedule(thread, time, 0);
+ threadinfo[thread.intValue()].aborted=true;
+ abortcount++;
+ }
+ return true;
+ }
+ } else {
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ timewasted(thread, time);
+ reschedule(thread, time, 0);
+ threadinfo[thread.intValue()].aborted=true;
+ abortcount++;
+ }
+ return true;
+ }
+ } else if (policy==ATTACKTHREAD) {
+ boolean threadbased=false;
+ int tev=ev.getThread();
+ threadbased|=threadinfo[tev].aborted;
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ threadbased|=threadinfo[thread.intValue()].aborted;
+ }
+ if (threadbased) {
+ long opponentthr=1000;
+
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ Event other=currentevents[thread.intValue()];
+ int eventnum=other.getEvent();
+ long othr=thread.intValue();
+ if (othr<opponentthr)
+ opponentthr=othr;
+ }
+ if (opponentthr<tev) {
+ //kill ourself
+ timewasted(ev.getThread(), time);
+ reschedule(ev.getThread(), time, 0);
+ threadinfo[ev.getThread()].aborted=true;
+ abortcount++;
+ return false;
+ } else {
+ //kill the opponents
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ timewasted(thread, time);
+ reschedule(thread, time, 0);
+ threadinfo[thread.intValue()].aborted=true;
+ abortcount++;
+ }
+ return true;
+ }
+ } else {
+ for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
+ Integer thread=(Integer)thit.next();
+ timewasted(thread, time);
+ reschedule(thread, time, 0);
+ threadinfo[thread.intValue()].aborted=true;
+ abortcount++;
+ }
+ return true;
+ }
}
//Not eager
//do eager contention management
Set conflicts=rdConflictSet(ev.getThread(), object);
if (conflicts!=null) {
- if (!handleConflicts(ev, conflicts, currtime))
+ if (!handleConflicts(ev, conflicts, currtime)) {
+ ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
return;
+ }
}
}
} else if (operation==Transaction.WRITE) {
if (isEager()) {
Set conflicts=wrConflictSet(ev.getThread(), object);
if (conflicts!=null) {
- if (!handleConflicts(ev, conflicts, currtime))
+ if (!handleConflicts(ev, conflicts, currtime)) {
+ ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
return;
+ }
}
}
} else if (operation==Transaction.BARRIER) {
}
return;
}
-
+ retrycount[ev.getThread()]=0;
+ transferred[ev.getThread()]=0;
//enqueue the next event
long deltatime=trans.getTime(event+1)-trans.getTime(event);
Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
Transaction t;
int threadid;
int transnum;
+ boolean stalled;
+
+ public boolean isStalled() {
+ return stalled;
+ }
+
+ public void setStall() {
+ stalled=true;
+ }
public void makeInvalid() {
valid=false;