r=new Random(100);
eq=new PriorityQueue();
backoff=new int[e.numThreads()];
+ objtoinfo=new Hashtable();
+ threadinfo=new ThreadInfo[e.numThreads()];
+ blocked=new boolean[e.numThreads()];
+
for(int i=0;i<e.numThreads();i++) {
- backoff[i]=1;
+ backoff[i]=BACKOFFSTART;
+ threadinfo[i]=new ThreadInfo(this);
}
}
+ //Where to start the backoff delay at
+ public static final int BACKOFFSTART=1;
+
+ //Commit options
public static final int LAZY=0;
public static final int COMMIT=1;
public static final int ATTACK=2;
public static final int POLITE=3;
public static final int KARMA=4;
+ public static final int LOCK=5;
PriorityQueue eq;
int policy;
Event[] currentevents;
Random r;
int[] backoff;
+ Hashtable objtoinfo;
+ ThreadInfo[] threadinfo;
+
+ boolean[] blocked;
public boolean isEager() {
return policy==ATTACK||policy==POLITE||policy==KARMA;
}
+ public boolean isLock() {
+ return policy==LOCK;
+ }
+
public int getAborts() {
return abortcount;
}
//Remove everything we put in object sets
for(int i=0;i<trans.numEvents();i++) {
int object=trans.getObject(i);
- if (object!=-1&&rdobjmap.containsKey(new Integer(object))) {
- ((Set)rdobjmap.get(new Integer(object))).remove(new Integer(ev.getThread()));
+ Integer obj=new Integer(object);
+ if (object!=-1&&rdobjmap.containsKey(obj)) {
+ ((Set)rdobjmap.get(obj)).remove(new Integer(ev.getThread()));
}
- if (object!=-1&&wrobjmap.containsKey(new Integer(object))) {
- ((Set)wrobjmap.get(new Integer(object))).remove(new Integer(ev.getThread()));
+ if (object!=-1&&wrobjmap.containsKey(obj)) {
+ ((Set)wrobjmap.get(obj)).remove(new Integer(ev.getThread()));
+ }
+ if (object!=-1&&objtoinfo.containsKey(obj)) {
+ ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
+ if (oi.getOwner()==ev.getThread()) {
+ oi.releaseOwner();
+
+ //wake up one waiter
+ for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
+ //requeue everyone who was waiting on us and start them back up
+ Event waiter=(Event)waitit.next();
+ waitit.remove();
+ waiter.setTime(currtime);
+ threadinfo[waiter.getThread()].setStall(false);
+ eq.add(waiter);
+ break;
+ }
+
+ }
}
}
- //See if we have been flagged as aborted
+ //See if we have been flagged as aborted for the lazy case
boolean abort=aborted[ev.getThread()];
aborted[ev.getThread()]=false;
if (!abort) {
if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
commitcount++;
}
- backoff[ev.getThread()]=1;
+ //Reset our backoff counter
+ backoff[ev.getThread()]=BACKOFFSTART;
+
+
//abort the other threads
for(int i=0;i<trans.numEvents();i++) {
int object=trans.getObject(i);
}
for(Iterator abit=abortset.iterator();abit.hasNext();) {
Integer threadid=(Integer)abit.next();
- if (policy==LAZY) {
+ if (policy==LAZY||policy==LOCK) {
aborted[threadid]=true;
} else if (policy==COMMIT) {
reschedule(threadid, currtime);
//Takes as parameter -- current transaction read event ev, conflicting
//set of threads, and the current time
+ //Returning false causes current transaction not continue to be scheduled
public boolean handleConflicts(Event ev, Set threadstokill, int time) {
if (policy==ATTACK) {
if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
//kill ourself
reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
- backoff[ev.getThread()]*=2;
abortcount++;
return false;
} else {
for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
Integer thread=(Integer)thit.next();
reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
- backoff[thread.intValue()]*=2;
abortcount++;
}
return true;
return true;
}
+ //Handle current event (read, write, delay) in a transaction and
+ //enqueue the next one
+
public void enqueueEvent(Event ev, Transaction trans) {
//just enqueue next event
int event=ev.getEvent();
//process the current event
if (operation==Transaction.READ) {
Integer obj=new Integer(object);
+
+ //check for lock based approach
+ if (isLock()) {
+ if (!objtoinfo.containsKey(obj)) {
+ objtoinfo.put(obj, new ObjectInfo(this));
+ }
+ ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
+ if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
+ //we're going to wait
+ if (!threadinfo[oi.getOwner()].isStalled()) {
+ //don't wait on stalled threads, we could deadlock
+ threadinfo[ev.getThread()].setStall(true);
+ oi.addWaiter(ev);
+ return;
+ }
+ } else {
+ //we have object
+ oi.setOwner(ev.getThread());
+ }
+ }
+
+ //record read event
if (!rdobjmap.containsKey(obj))
rdobjmap.put(obj,new HashSet());
((Set)rdobjmap.get(obj)).add(new Integer(ev.getThread()));
if (isEager()) {
+ //do eager contention management
Set conflicts=rdConflictSet(ev.getThread(), object);
if (conflicts!=null)
if (!handleConflicts(ev, conflicts, currtime))
}
} else if (operation==Transaction.WRITE) {
Integer obj=new Integer(object);
+
+ //grab lock
+ if (isLock()) {
+ if (!objtoinfo.containsKey(obj)) {
+ objtoinfo.put(obj, new ObjectInfo(this));
+ }
+ ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
+ if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
+ //we're going to wait
+ if (!threadinfo[oi.getOwner()].isStalled()) {
+ //don't wait on stalled threads, we could deadlock
+ threadinfo[ev.getThread()].setStall(true);
+ oi.addWaiter(ev);
+ return;
+ }
+ } else {
+ //we have object
+ oi.setOwner(ev.getThread());
+ }
+ }
+
+ //record write event
if (!wrobjmap.containsKey(obj))
wrobjmap.put(obj,new HashSet());
((Set)wrobjmap.get(obj)).add(new Integer(ev.getThread()));
+
if (isEager()) {
Set conflicts=wrConflictSet(ev.getThread(), object);
if (conflicts!=null)
public int getTime() {
return time;
}
+
+ public void setTime(int time) {
+ this.time=time;
+ }
public int getThread() {
return threadid;
valid=true;
}
+ //break ties to allow commits to occur earliest
public int compareTo(Object o) {
Event e=(Event)o;
int delta=time-e.time;
return 0;
}
}
-
-
-
}
\ No newline at end of file