Thread local hack to make transRead faster...
authorbdemsky <bdemsky>
Fri, 27 Feb 2009 04:34:51 +0000 (04:34 +0000)
committerbdemsky <bdemsky>
Fri, 27 Feb 2009 04:34:51 +0000 (04:34 +0000)
1) make transrecords threadlocal so we don't pass them around anymore
2) inline the clookup into the code...call transread only if we miss on the transaction
cache

This doesn't help much for the current benchmark set, but should make a significant
difference if benchmarks read fields a lot.

14 files changed:
Robust/src/IR/Flat/BuildCode.java
Robust/src/Runtime/DSTM/interface/abortreaders.c
Robust/src/Runtime/DSTM/interface/abortreaders.h
Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c
Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.h
Robust/src/Runtime/DSTM/interface/clookup.c
Robust/src/Runtime/DSTM/interface/clookup.h
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/Runtime/DSTM/interface/trans.h [new file with mode: 0644]
Robust/src/Runtime/garbage.c
Robust/src/Runtime/runtime.c
Robust/src/Runtime/runtime.h
Robust/src/Runtime/thread.c

index 0d224a062d7667f0e709360caffb95eaff0414c3..0e69ab9155279f1616b63817d2e73278cda4f666 100644 (file)
@@ -1258,13 +1258,6 @@ public class BuildCode {
       printcomma=true;
     }
 
-    if (state.DSM&&lb.isAtomic()&&!md.getModifiers().isNative()) {
-      if (printcomma)
-       headersout.print(", ");
-      headersout.print("transrecord_t * trans");
-      printcomma=true;
-    }
-
     /*  Output parameter list*/
     for(int i=0; i<objectparams.numPrimitives(); i++) {
       TempDescriptor temp=objectparams.getPrimitive(i);
@@ -1358,9 +1351,6 @@ public class BuildCode {
     ParamsObject objectparams=(ParamsObject)paramstable.get(lb!=null ? lb : md!=null ? md : task);
     generateHeader(fm, lb, md!=null ? md : task,output);
     TempObject objecttemp=(TempObject) tempstable.get(lb!=null ? lb : md!=null ? md : task);
-    if (state.DSM&&lb.getHasAtomic()) {
-      output.println("transrecord_t * trans;");
-    }
 
     if (GENERATEPRECISEGC) {
       if (md!=null&&state.DSM)
@@ -1397,7 +1387,7 @@ public class BuildCode {
 
     if ((state.THREAD||state.DSM)&&GENERATEPRECISEGC) {
       if (state.DSM&&lb.isAtomic())
-       output.println("if (needtocollect) checkcollect2(&"+localsprefix+",trans);");
+       output.println("if (needtocollect) checkcollect2(&"+localsprefix+");");
       else
        output.println("if (needtocollect) checkcollect(&"+localsprefix+");");
     }
@@ -1587,7 +1577,7 @@ public class BuildCode {
     case FKind.FlatBackEdge:
       if ((state.THREAD||state.DSM)&&GENERATEPRECISEGC) {
        if(state.DSM&&locality.getAtomic(lb).get(fn).intValue()>0) {
-         output.println("if (needtocollect) checkcollect2(&"+localsprefix+",trans);");
+         output.println("if (needtocollect) checkcollect2(&"+localsprefix+");");
        } else
          output.println("if (needtocollect) checkcollect(&"+localsprefix+");");
       } else
@@ -1794,7 +1784,7 @@ public class BuildCode {
       return;
     /* Have to generate flat globalconv */
     if (fgcn.getMakePtr()) {
-      output.println(generateTemp(fm, fgcn.getSrc(),lb)+"=(void *)transRead(trans, (unsigned int) "+generateTemp(fm, fgcn.getSrc(),lb)+");");
+      output.println("TRANSREAD("+generateTemp(fm, fgcn.getSrc(),lb)+", (unsigned int) "+generateTemp(fm, fgcn.getSrc(),lb)+");");
     } else {
       /* Need to convert to OID */
       if (fgcn.doConvert()) {
@@ -1853,11 +1843,10 @@ public class BuildCode {
     /******* Tell the runtime to start the transaction *******/
 
     output.println("transstart"+faen.getIdentifier()+":");
-    output.println("trans=transStart();");
+    output.println("transStart();");
     
     if (state.ABORTREADERS) {
-      output.println("if (_setjmp(trans->aborttrans)) {");
-      output.println("  free(trans);");
+      output.println("if (_setjmp(aborttrans)) {");
       output.println("  goto transretry"+faen.getIdentifier()+"; }");
     }
   }
@@ -1868,8 +1857,8 @@ public class BuildCode {
       return;
     //store the revert list before we lose the transaction object
     String revertptr=generateTemp(fm, reverttable.get(lb),lb);
-    output.println(revertptr+"=trans->revertlist;");
-    output.println("if (transCommit(trans)) {");
+    output.println(revertptr+"=revertlist;");
+    output.println("if (transCommit()) {");
     /* Transaction aborts if it returns true */
     output.println("goto transretry"+faen.getAtomicEnter().getIdentifier()+";");
     output.println("} else {");
@@ -1999,13 +1988,6 @@ public class BuildCode {
          output.print(temp.getType().getSafeSymbol());
       }
 
-      if (state.DSM&&locality.getBinding(lb,fc).isAtomic()&&!fc.getMethod().getModifiers().isNative()) {
-       LocalityBinding fclb=locality.getBinding(lb, fc);
-       if (printcomma)
-         output.print(", ");
-       output.print("transrecord_t *");
-       printcomma=true;
-      }
 
       if (state.DSM) {
        LocalityBinding fclb=locality.getBinding(lb, fc);
@@ -2021,13 +2003,6 @@ public class BuildCode {
       needcomma=true;
     }
 
-    if (state.DSM&&locality.getBinding(lb,fc).isAtomic()&&!fc.getMethod().getModifiers().isNative()) {
-      if (needcomma)
-       output.print(",");
-      output.print("trans");
-      needcomma=true;
-    }
-
     if (!GENERATEPRECISEGC) {
       if (fc.getThis()!=null) {
        TypeDescriptor ptd=md.getThis().getType();
@@ -2098,7 +2073,7 @@ public class BuildCode {
          //} else {
          output.println(dst+"="+ src +"->"+field+ ";");
          //output.println("if ("+dst+"&0x1) {");
-         output.println(dst+"=(void *) transRead(trans, (unsigned int) "+dst+");");
+         output.println("TRANSREAD("+dst+", (unsigned int) "+dst+");");
          //output.println(src+"->"+field+"="+src+"->"+field+";");
          //output.println("}");
          //}
@@ -2113,7 +2088,7 @@ public class BuildCode {
          String dst=generateTemp(fm, ffn.getDst(),lb);
          output.println(dst+"="+ src +"->"+field+ ";");
          if (locality.getAtomic(lb).get(ffn).intValue()>0)
-           output.println(dst+"=(void *) transRead(trans, (unsigned int) "+dst+");");
+           output.println("TRANSREAD("+dst+", (unsigned int) "+dst+");");
        } else
          output.println(generateTemp(fm, ffn.getDst(),lb)+"="+ generateTemp(fm,ffn.getSrc(),lb)+"->"+ ffn.getField().getSafeSymbol()+";");
       } else if (status==LocalityAnalysis.EITHER) {
@@ -2158,13 +2133,13 @@ public class BuildCode {
        output.println("if(!"+dst+"->"+localcopystr+") {");
        /* Link object into list */
        String revertptr=generateTemp(fm, reverttable.get(lb),lb);
-       output.println(revertptr+"=trans->revertlist;");
+       output.println(revertptr+"=revertlist;");
        if (GENERATEPRECISEGC)
          output.println("COPY_OBJ((struct garbagelist *)&"+localsprefix+",(struct ___Object___ *)"+dst+");");
        else
          output.println("COPY_OBJ("+dst+");");
        output.println(dst+"->"+nextobjstr+"="+revertptr+";");
-       output.println("trans->revertlist=(struct ___Object___ *)"+dst+";");
+       output.println("revertlist=(struct ___Object___ *)"+dst+";");
        output.println("}");
        if (srcglobal)
          output.println(dst+"->"+ fsfn.getField().getSafeSymbol()+"=srcoid;");
@@ -2221,7 +2196,7 @@ public class BuildCode {
 
        if (elementtype.isPtr()) {
          output.println(dst +"=(("+ type+"*)(((char *) &("+ generateTemp(fm,fen.getSrc(),lb)+"->___length___))+sizeof(int)))["+generateTemp(fm, fen.getIndex(),lb)+"];");
-         output.println(dst+"=(void *) transRead(trans, (unsigned int) "+dst+");");
+         output.println("TRANSREAD("+dst+", "+dst+");");
        } else {
          output.println(dst +"=(("+ type+"*)(((char *) &("+ generateTemp(fm,fen.getSrc(),lb)+"->___length___))+sizeof(int)))["+generateTemp(fm, fen.getIndex(),lb)+"];");
        }
@@ -2274,13 +2249,13 @@ public class BuildCode {
        output.println("if(!"+dst+"->"+localcopystr+") {");
        /* Link object into list */
        String revertptr=generateTemp(fm, reverttable.get(lb),lb);
-       output.println(revertptr+"=trans->revertlist;");
+       output.println(revertptr+"=revertlist;");
        if (GENERATEPRECISEGC)
          output.println("COPY_OBJ((struct garbagelist *)&"+localsprefix+",(struct ___Object___ *)"+dst+");");
        else
          output.println("COPY_OBJ("+dst+");");
        output.println(dst+"->"+nextobjstr+"="+revertptr+";");
-       output.println("trans->revertlist=(struct ___Object___ *)"+dst+";");
+       output.println("revertlist=(struct ___Object___ *)"+dst+";");
        output.println("}");
       } else throw new Error("Unknown array type");
       if (srcglobal) {
@@ -2313,12 +2288,12 @@ public class BuildCode {
     if (state.DSM && locality.getAtomic(lb).get(fn).intValue()>0&&!fn.isGlobal()) {
       //Stash pointer in case of GC
       String revertptr=generateTemp(fm, reverttable.get(lb),lb);
-      output.println(revertptr+"=trans->revertlist;");
+      output.println(revertptr+"=revertlist;");
     }
     if (fn.getType().isArray()) {
       int arrayid=state.getArrayNumber(fn.getType())+state.numClasses();
       if (fn.isGlobal()) {
-       output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newarrayglobal(trans, "+arrayid+", "+generateTemp(fm, fn.getSize(),lb)+");");
+       output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newarrayglobal("+arrayid+", "+generateTemp(fm, fn.getSize(),lb)+");");
       } else if (GENERATEPRECISEGC) {
        output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newarray(&"+localsprefix+", "+arrayid+", "+generateTemp(fm, fn.getSize(),lb)+");");
       } else {
@@ -2326,7 +2301,7 @@ public class BuildCode {
       }
     } else {
       if (fn.isGlobal()) {
-       output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newglobal(trans, "+fn.getType().getClassDesc().getId()+");");
+       output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_newglobal("+fn.getType().getClassDesc().getId()+");");
       } else if (GENERATEPRECISEGC) {
        output.println(generateTemp(fm,fn.getDst(),lb)+"=allocate_new(&"+localsprefix+", "+fn.getType().getClassDesc().getId()+");");
       } else {
@@ -2338,7 +2313,7 @@ public class BuildCode {
       String dst=generateTemp(fm,fn.getDst(),lb);
       output.println(dst+"->___localcopy___=(struct ___Object___*)1;");
       output.println(dst+"->"+nextobjstr+"="+revertptr+";");
-      output.println("trans->revertlist=(struct ___Object___ *)"+dst+";");
+      output.println("revertlist=(struct ___Object___ *)"+dst+";");
     }
     if (state.FASTCHECK) {
       String dst=generateTemp(fm,fn.getDst(),lb);
@@ -2410,13 +2385,13 @@ public class BuildCode {
        if (state.DSM && locality.getAtomic(lb).get(fln).intValue()>0) {
          //Stash pointer in case of GC
          String revertptr=generateTemp(fm, reverttable.get(lb),lb);
-         output.println(revertptr+"=trans->revertlist;");
+         output.println(revertptr+"=revertlist;");
        }
        output.println(generateTemp(fm, fln.getDst(),lb)+"=NewString(&"+localsprefix+", \""+FlatLiteralNode.escapeString((String)fln.getValue())+"\","+((String)fln.getValue()).length()+");");
        if (state.DSM && locality.getAtomic(lb).get(fln).intValue()>0) {
          //Stash pointer in case of GC
          String revertptr=generateTemp(fm, reverttable.get(lb),lb);
-         output.println("trans->revertlist="+revertptr+";");
+         output.println("revertlist="+revertptr+";");
        }
       } else {
        output.println(generateTemp(fm, fln.getDst(),lb)+"=NewString(\""+FlatLiteralNode.escapeString((String)fln.getValue())+"\","+((String)fln.getValue()).length()+");");
@@ -2493,13 +2468,6 @@ public class BuildCode {
       printcomma=true;
     }
 
-    if (state.DSM&&lb.isAtomic()) {
-      if (printcomma)
-       output.print(", ");
-      output.print("transrecord_t * trans");
-      printcomma=true;
-    }
-
     if (md!=null) {
       /* Method */
       for(int i=0; i<objectparams.numPrimitives(); i++) {
index 860655b5e3f61a1b9b833b3979ef4ab3e8b82374..a1bcd360556dcc19b271f4452d64f7075a14ef3a 100644 (file)
@@ -11,7 +11,7 @@ void initreaderlist() {
   freelist=NULL;
 }
 
-void addtransaction(unsigned int oid, struct transrecord * trans) {
+void addtransaction(unsigned int oid) {
   struct readerlist * rl;
   int i;
   if (pthread_mutex_trylock(&aborttablelock)!=0)
@@ -38,7 +38,7 @@ void addtransaction(unsigned int oid, struct transrecord * trans) {
   rl->numreaders++;
   for(i=0;i<READERSIZE;i++) {
     if (rl->array[i]==NULL) {
-      rl->array[i]=trans;
+      rl->array[i]=&t_abort;
       pthread_mutex_unlock(&aborttablelock);
       return;
     }
@@ -60,9 +60,9 @@ void removetransaction(unsigned int oidarray[], unsigned int numoids) {
       int count=rl->numreaders;
       int j;
       for(j=0;count;j++) {
-       struct transrecord *trans=rl->array[j];
-       if (trans!=NULL) {
-         trans->abort=1;//It's okay to set our own abort flag...it is
+       int *t_abort=rl->array[j];
+       if (t_abort!=NULL) {
+         *t_abort=1;//It's okay to set our own abort flag...it is
                         //too late to abort us
          count--;
        }
@@ -76,7 +76,7 @@ void removetransaction(unsigned int oidarray[], unsigned int numoids) {
   pthread_mutex_unlock(&aborttablelock);
 }
 
-void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids, struct transrecord * trans) {
+void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids) {
   int i,j;
   pthread_mutex_lock(&aborttablelock);
   for(i=0;i<numoids;i++) {
@@ -86,7 +86,7 @@ void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids,
     oidverread+=(sizeof(unsigned int)+sizeof(unsigned short));
     while(rl!=NULL) {
       for(j=0;j<READERSIZE;j++) {
-       if (rl->array[j]==trans) {
+       if (rl->array[j]==&t_abort) {
          rl->array[j]=NULL;
          if ((--rl->numreaders)==0) {
            if (first==rl) {
@@ -113,12 +113,11 @@ void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids,
   pthread_mutex_unlock(&aborttablelock);
 }
 
-void removetransactionhash(chashtable_t *table, struct transrecord *trans) {
-  chashlistnode_t *ptr=table->table;
-  unsigned int size=table->size;
+void removetransactionhash() {
+  chashlistnode_t *ptr=c_table;
   int i,j;
   pthread_mutex_lock(&aborttablelock);
-  for(i=0;i<size;i++) {
+  for(i=0;i<c_size;i++) {
     chashlistnode_t *curr=&ptr[i];
     do {
       unsigned int oid=curr->key;
@@ -128,7 +127,7 @@ void removetransactionhash(chashtable_t *table, struct transrecord *trans) {
       struct readerlist *first=rl;
       while(rl!=NULL) {
        for(j=0;j<READERSIZE;j++) {
-         if (rl->array[j]==trans) {
+         if (rl->array[j]==&t_abort) {
            rl->array[j]=NULL;
            if ((--rl->numreaders)==0) {
              if (first==rl) {
@@ -157,7 +156,7 @@ void removetransactionhash(chashtable_t *table, struct transrecord *trans) {
 }
 
 
-void removethistransaction(unsigned int oidarray[], unsigned int numoids, struct transrecord * trans) {
+void removethistransaction(unsigned int oidarray[], unsigned int numoids) {
   int i,j;
   pthread_mutex_lock(&aborttablelock);
   for(i=0;i<numoids;i++) {
@@ -167,7 +166,7 @@ void removethistransaction(unsigned int oidarray[], unsigned int numoids, struct
     struct readerlist *first=rl;
     while(rl!=NULL) {
       for(j=0;j<READERSIZE;j++) {
-       if (rl->array[j]==trans) {
+       if (rl->array[j]==&t_abort) {
          rl->array[j]=NULL;
          if ((--rl->numreaders)==0) {
            if (first==rl) {
index a3801cc230fca006c4cc3331140aee04e639e345..90fe2279d0249369a3f3ef66f02980bb0a918e9c 100644 (file)
@@ -5,15 +5,15 @@
 #define READERSIZE 8
 
 struct readerlist {
-  struct transrecord *array[READERSIZE];
+  int *array[READERSIZE];
   int numreaders;
   struct readerlist * next;
 };
 
 void initreaderlist();
-void addtransaction(unsigned int oid, struct transrecord * trans);
+void addtransaction(unsigned int oid);
 void removetransaction(unsigned int oidarray[], unsigned int numoids);
-void removethistransaction(unsigned int oidarray[], unsigned int numoids, struct transrecord * trans);
-void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids, struct transrecord * trans);
-void removetransactionhash(chashtable_t *table, struct transrecord *trans);
+void removethistransaction(unsigned int oidarray[], unsigned int numoids);
+void removethisreadtransaction(unsigned char* oidverread, unsigned int numoids);
+void removetransactionhash();
 #endif
index fedcb919d27eb04c23cbc37182245803d3d59c0a..ca7c845a0dec9c2235e665ef0e3a52cf58e5673b 100644 (file)
@@ -60,10 +60,9 @@ void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
 #if 1
 /* This function clears from prefetch cache those
  * entries that caused a transaction abort */
-void cleanPCache(transrecord_t *record) {
-  transrecord_t *rec = record;
-  unsigned int size = rec->lookupTable->size;
-  chashlistnode_t *ptr = rec->lookupTable->table;
+void cleanPCache() {
+  unsigned int size = c_size;
+  chashlistnode_t *ptr = c_table;
   int i;
   for(i = 0; i < size; i++) {
     chashlistnode_t *curr = &ptr[i]; //for each entry in the cache lookupTable
@@ -83,10 +82,9 @@ void cleanPCache(transrecord_t *record) {
 #else
 /* This function clears from prefetch cache those
  * entries that caused a transaction abort */
-void cleanPCache(transrecord_t *record) {
-  transrecord_t *rec = record;
-  unsigned int size = rec->lookupTable->size;
-  struct chashentry *ptr = rec->lookupTable->table;
+void cleanPCache() {
+  unsigned int size = c_size;
+  struct chashentry *ptr = c_table;
   int i;
   for(i = 0; i < size; i++) {
     struct chashentry *curr = &ptr[i]; //for each entry in the cache lookupTable
@@ -106,19 +104,19 @@ void cleanPCache(transrecord_t *record) {
  * entries from the transaction cache when a
  * transaction commits
  * Return -1 on error else returns 0 */
-int updatePrefetchCache(trans_req_data_t *tdata, transrecord_t *rec) {
+int updatePrefetchCache(trans_req_data_t *tdata) {
   int retval;
   char oidType;
   oidType = 'R';
   if(tdata->f.numread > 0) {
-    if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), rec, oidType)) != 0) {
+    if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) {
       printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
       return -1;
     }
   }
   if(tdata->f.nummod > 0) {
     oidType = 'M';
-    if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, rec, oidType)) != 0) {
+    if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, oidType)) != 0) {
       printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
       return -1;
     }
@@ -126,7 +124,7 @@ int updatePrefetchCache(trans_req_data_t *tdata, transrecord_t *rec) {
   return 0;
 }
 
-int copyToCache(int numoid, unsigned int *oidarray, transrecord_t *rec, char oidType) {
+int copyToCache(int numoid, unsigned int *oidarray, char oidType) {
   int i;
   for (i = 0; i < numoid; i++) {
     unsigned int oid;
@@ -139,7 +137,7 @@ int copyToCache(int numoid, unsigned int *oidarray, transrecord_t *rec, char oid
     }
     pthread_mutex_lock(&prefetchcache_mutex);
     objheader_t * header;
-    if((header = (objheader_t *) chashSearch(rec->lookupTable, oid)) == NULL) {
+    if((header = (objheader_t *) t_chashSearch(oid)) == NULL) {
       printf("%s() obj %x is no longer in transaction cache at %s , %d\n", __func__, oid,__FILE__, __LINE__);
       fflush(stdout);
       return -1;
index 93dd853ead907ca3c9b7fd095763c1241a10b9a2..44c87049fedbb4e2e7f29bda53c84a1883b1d2fe 100644 (file)
@@ -17,8 +17,8 @@ int getRetryCount(int siteid);
 int getUselessCount(int siteid);
 char getOperationMode(int);
 void handleDynPrefetching(int, int, int);
-void cleanPCache(transrecord_t *);
-int updatePrefetchCache(trans_req_data_t *, transrecord_t *);
-int copyToCache(int, unsigned int *, transrecord_t *rec, char);
+void cleanPCache();
+int updatePrefetchCache(trans_req_data_t *);
+int copyToCache(int, unsigned int *, char);
 
 #endif
index 05d9401e2ec9d7b8a9155911025ff0ceb7528283..28dda70287a7fd091607f5465fc1f2ce5cccb83a 100644 (file)
@@ -1,5 +1,27 @@
 #include "clookup.h"
-#define INLINE    inline __attribute__((always_inline))
+
+__thread chashlistnode_t *c_table;
+__thread unsigned int c_size;
+__thread unsigned int c_mask;
+__thread unsigned int c_numelements;
+__thread unsigned int c_threshold;
+__thread double c_loadfactor;
+
+void t_chashCreate(unsigned int size, double loadfactor) {
+  chashtable_t *ctable;
+  chashlistnode_t *nodes;
+  int i;
+
+  // Allocate space for the hash table
+  
+
+  c_table = calloc(size, sizeof(chashlistnode_t));
+  c_loadfactor = loadfactor;
+  c_size = size;
+  c_threshold=size*loadfactor;
+  c_mask = (size << 1)-1;
+  c_numelements = 0; // Initial number of elements in the hash
+}
 
 chashtable_t *chashCreate(unsigned int size, double loadfactor) {
   chashtable_t *ctable;
@@ -38,6 +60,7 @@ static INLINE unsigned int chashFunction(chashtable_t *table, unsigned int key)
 void chashInsert(chashtable_t *table, unsigned int key, void *val) {
   chashlistnode_t *ptr;
 
+
   if(table->numelements > (table->threshold)) {
     //Resize
     unsigned int newsize = table->size << 1;
@@ -74,6 +97,47 @@ INLINE void * chashSearch(chashtable_t *table, unsigned int key) {
   return NULL;
 }
 
+//Store objects and their pointers into hash
+void t_chashInsert(unsigned int key, void *val) {
+  chashlistnode_t *ptr;
+
+
+  if(c_numelements > (c_threshold)) {
+    //Resize
+    unsigned int newsize = c_size << 1;
+    t_chashResize(newsize);
+  }
+
+  ptr = &c_table[(key&c_mask)>>1];
+  c_numelements++;
+
+  if(ptr->key==0) {
+    ptr->key=key;
+    ptr->val=val;
+  } else { // Insert in the beginning of linked list
+    chashlistnode_t * node = calloc(1, sizeof(chashlistnode_t));
+    node->key = key;
+    node->val = val;
+    node->next = ptr->next;
+    ptr->next=node;
+  }
+}
+
+// Search for an address for a given oid
+INLINE void * t_chashSearch(unsigned int key) {
+  //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE
+  chashlistnode_t *node = &c_table[(key & c_mask)>>1];
+
+  do {
+    if(node->key == key) {
+      return node->val;
+    }
+    node = node->next;
+  } while(node != NULL);
+
+  return NULL;
+}
+
 unsigned int chashRemove(chashtable_t *table, unsigned int key) {
   return chashRemove2(table, key)==NULL;
 
@@ -179,6 +243,70 @@ unsigned int chashResize(chashtable_t *table, unsigned int newsize) {
   return 0;
 }
 
+unsigned int t_chashResize(unsigned int newsize) {
+  chashlistnode_t *node, *ptr, *curr;    // curr and next keep track of the current and the next chashlistnodes in a linked list
+  unsigned int oldsize;
+  int isfirst;    // Keeps track of the first element in the chashlistnode_t for each bin in hashtable
+  unsigned int i,index;
+  unsigned int mask;
+  
+  ptr = c_table;
+  oldsize = c_size;
+
+  if((node = calloc(newsize, sizeof(chashlistnode_t))) == NULL) {
+    printf("Calloc error %s %d\n", __FILE__, __LINE__);
+    return 1;
+  }
+
+  c_table = node;          //Update the global hashtable upon resize()
+  c_size = newsize;
+  c_threshold = newsize * c_loadfactor;
+  mask=c_mask = (newsize << 1)-1;
+
+  for(i = 0; i < oldsize; i++) {                        //Outer loop for each bin in hash table
+    curr = &ptr[i];
+    isfirst = 1;
+    do {                      //Inner loop to go through linked lists
+      unsigned int key;
+      chashlistnode_t *tmp,*next;
+      
+      if ((key=curr->key) == 0) {             //Exit inner loop if there the first element is 0
+       break;                  //key = val =0 for element if not present within the hash table
+      }
+      next = curr->next;
+      index = (key & mask) >>1;
+      tmp=&node[index];
+      // Insert into the new table
+      if(tmp->key == 0) {
+       tmp->key = curr->key;
+       tmp->val = curr->val;
+       if (!isfirst) {
+         free(curr);
+       }
+      }/*
+        NOTE:  Add this case if you change this...
+        This case currently never happens because of the way things rehash....
+        else if (isfirst) {
+       chashlistnode_t *newnode= calloc(1, sizeof(chashlistnode_t));
+       newnode->key = curr->key;
+       newnode->val = curr->val;
+       newnode->next = tmp->next;
+       tmp->next=newnode;
+       } */
+      else {
+       curr->next=tmp->next;
+       tmp->next=curr;
+      }
+
+      isfirst = 0;
+      curr = next;
+    } while(curr!=NULL);
+  }
+
+  free(ptr);            //Free the memory of the old hash table
+  return 0;
+}
+
 //Delete the entire hash table
 void chashDelete(chashtable_t *ctable) {
   int i;
@@ -195,3 +323,19 @@ void chashDelete(chashtable_t *ctable) {
   free(ptr);
   free(ctable);
 }
+
+//Delete the entire hash table
+void t_chashDelete() {
+  int i;
+  chashlistnode_t *ptr = c_table;
+
+  for(i=0 ; i<c_size ; i++) {
+    chashlistnode_t * curr = ptr[i].next;
+    while(curr!=NULL) {
+      chashlistnode_t * next = curr->next;
+      free(curr);
+      curr=next;
+    }
+  }
+  free(ptr);
+}
index c3348d210e7ed8cc10340f17430ccb0c2b837572..2382208e3ea7e614530577217ccee2406c5d1ba1 100644 (file)
@@ -7,6 +7,9 @@
 #define CLOADFACTOR 0.25
 #define CHASH_SIZE 1024
 
+#define INLINE    inline __attribute__((always_inline))
+
+
 typedef struct chashlistnode {
   unsigned int key;
   void *val;       //this can be cast to another type or used to point to a larger structure
@@ -22,6 +25,13 @@ typedef struct chashtable {
   double loadfactor;
 } chashtable_t;
 
+
+void t_chashCreate(unsigned int size, double loadfactor);
+void t_chashInsert(unsigned int key, void *val);
+void * t_chashSearch(unsigned int key);
+unsigned int t_chashResize(unsigned int newsize);
+void t_chashDelete();
+
 /* Prototypes for hash*/
 chashtable_t *chashCreate(unsigned int size, double loadfactor);
 static unsigned int chashFunction(chashtable_t *table, unsigned int key);
@@ -33,5 +43,11 @@ unsigned int chashResize(chashtable_t *table, unsigned int newsize);
 void chashDelete(chashtable_t *table);
 /* end hash */
 
-#endif
+extern __thread chashlistnode_t *c_table;
+extern __thread unsigned int c_size;
+extern __thread unsigned int c_mask;
+extern __thread unsigned int c_numelements;
+extern __thread unsigned int c_threshold;
+extern __thread double c_loadfactor;
 
+#endif
index 28dc122fa3e3bd1ccd799ee8c39fd1a7c2e6c32e..a7439836a26638469af8007a68de7def8c6cb70c 100644 (file)
@@ -163,18 +163,6 @@ typedef struct oidmidpair {
   unsigned int mid;
 } oidmidpair_t;
 
-typedef struct transrecord {
-  objstr_t *cache;
-  chashtable_t *lookupTable;
-#ifdef COMPILER
-  struct ___Object___ * revertlist;
-#endif
-#ifdef ABORTREADERS
-  int abort;
-  jmp_buf aborttrans;
-#endif
-} transrecord_t;
-
 // Structure is a shared structure that keeps track of responses from the participants
 typedef struct thread_response {
   char rcv_status;
@@ -214,33 +202,6 @@ typedef struct trans_commit_data {
 
 
 #define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id);
-/* Structure for passing multiple arguments to a thread
- * spawned to process each transaction on a machine */
-typedef struct thread_data_array {
-  int thread_id;
-  int mid;
-  trans_req_data_t *buffer;     /* Holds trans request information sent to a participant, based on threadid */
-  thread_response_t *recvmsg;   /* Shared datastructure to keep track of the participants response to a trans request */
-  pthread_cond_t *threshold;    /* Condition var to wake up a thread */
-  pthread_mutex_t *lock;        /* Lock for counting participants response */
-  int *count;                   /* Shared variable to count responses from all participants to the TRANS_REQUEST protocol */
-  char *replyctrl;              /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */
-  char *replyretry;             /* Shared variable that keep track if coordinator needs retry */
-  transrecord_t *rec;           /* Shared variable transaction record send to all thread data */
-} thread_data_array_t;
-
-
-//Structure for passing arguments to the local m/c thread
-typedef struct local_thread_data_array {
-  thread_data_array_t *tdata;           /* Holds all the arguments send to a thread that is spawned when transaction commits */
-  trans_commit_data_t *transinfo;       /* Holds information of objects locked and not found in the participant */
-} local_thread_data_array_t;
-
-//Structure to store mid and socketid information
-typedef struct midSocketInfo {
-  unsigned int mid;                     /* To communicate with mid use sockid in this data structure */
-  int sockid;
-} midSocketInfo_t;
 
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
@@ -286,18 +247,31 @@ void addHost(unsigned int);
 void mapObjMethod(unsigned short);
 
 void randomdelay();
-__attribute__((malloc)) transrecord_t *transStart();
-__attribute__((pure)) objheader_t *transRead(transrecord_t *, unsigned int);
-objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid header
-int transCommit(transrecord_t *record); //return 0 if successful
+void transStart();
+#define TRANSREAD(x,y) { \
+  unsigned int inputvalue;\
+if ((inputvalue=(unsigned int)y)==0) x=NULL;\
+else { \
+chashlistnode_t * cnodetmp=&c_table[(inputvalue&c_mask)>>1];   \
+do { \
+  if (cnodetmp->key==inputvalue) {x=(void *)&((objheader_t*)cnodetmp->val)[1];break;} \
+cnodetmp=cnodetmp->next;\
+if (cnodetmp==NULL) {x=(void *)transRead2(inputvalue);break;}  \
+} while(1);\
+}}
+
+__attribute__((pure)) objheader_t *transRead(unsigned int);
+__attribute__((pure)) objheader_t *transRead2(unsigned int);
+objheader_t *transCreateObj(unsigned int); //returns oid header
+int transCommit(); //return 0 if successful
 void *transRequest(void *);     //the C routine that the thread will execute when TRANS_REQUEST begins
-char decideResponse(char *, char *, transrecord_t *, int); // Coordinator decides what response to send to the participant
-void *getRemoteObj(transrecord_t *, unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine
-void handleLocalReq(trans_req_data_t *, trans_commit_data_t *, transrecord_t *, char *);
-int transComProcess(trans_req_data_t *, trans_commit_data_t *, transrecord_t *);
-void doLocalProcess(char, trans_req_data_t *tdata, trans_commit_data_t *, transrecord_t *);
+char decideResponse(char *, char *,  int); // Coordinator decides what response to send to the participant
+void *getRemoteObj(unsigned int, unsigned int); // returns object header from main object store after object is copied into it from remote machine
+void handleLocalReq(trans_req_data_t *, trans_commit_data_t *, char *);
+int transComProcess(trans_req_data_t *, trans_commit_data_t *);
+void doLocalProcess(char, trans_req_data_t *tdata, trans_commit_data_t *);
 int transAbortProcess(trans_commit_data_t *);
-void transAbort(transrecord_t *trans);
+void transAbort();
 void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size);
 void prefetch(int, int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
@@ -305,7 +279,7 @@ void *mcqProcess(void *);
 prefetchpile_t *foundLocal(char *); // returns node with prefetch elements(oids, offsets)
 int lookupObject(unsigned int * oid, short offset);
 int checkoid(unsigned int oid);
-int transPrefetchProcess(transrecord_t *, int **, short);
+int transPrefetchProcess(int **, short);
 void sendPrefetchReq(prefetchpile_t*, int);
 void sendPrefetchReqnew(prefetchpile_t*, int);
 int getPrefetchResponse(int);
@@ -335,4 +309,6 @@ void swap(double *e1, double *e2);
 double avgofthreads(int siteid, int threadid);
 
 /* end transactions */
+
+#include "trans.h"
 #endif
index e0a16ae72da0bab6ebd91a0dc08631b453261bc6..42aef3188b626778d13434c6c8763da241088c87 100644 (file)
 #ifdef ABORTREADERS
 #include "abortreaders.h"
 #endif
+#include "trans.h"
 
 #define NUM_THREADS 1
 #define CONFIG_FILENAME "dstm.conf"
 
+/* Thread transaction variables */
+
+__thread objstr_t *t_cache;
+__thread struct ___Object___ *revertlist;
+#ifdef ABORTREADERS
+__thread int t_abort;
+__thread jmp_buf aborttrans;
+#endif
+
 
 /* Global Variables */
 extern int classsize[];
@@ -63,7 +73,7 @@ int bytesSent = 0;
 int bytesRecv = 0;
 
 void printhex(unsigned char *, int);
-plistnode_t *createPiles(transrecord_t *);
+plistnode_t *createPiles();
 plistnode_t *sortPiles(plistnode_t *pileptr);
 
 /*******************************
@@ -323,18 +333,13 @@ void randomdelay() {
 }
 
 /* This function initializes things required in the transaction start*/
-__attribute__((malloc)) transrecord_t *transStart() {
-  transrecord_t *tmp;
-  if((tmp = calloc(1, sizeof(transrecord_t))) == NULL) {
-    printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
-    return NULL;
-  }
-  tmp->cache = objstrCreate(1048576);
-  tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
-  //#ifdef COMPILER
-  //  tmp->revertlist=NULL; //Not necessary...already null
-  //#endif
-  return tmp;
+void transStart() {
+  t_cache = objstrCreate(1048576);
+  t_chashCreate(CHASH_SIZE, CLOADFACTOR);
+  revertlist=NULL;
+#ifdef ABORTREADERS
+  t_abort=0;
+#endif
 }
 
 // Search for an address for a given oid                                                                               
@@ -355,22 +360,24 @@ INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
   }*/
 
 
+
+
 /* This function finds the location of the objects involved in a transaction
  * and returns the pointer to the object if found in a remote location */
-__attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int oid) {
+__attribute__((pure)) objheader_t *transRead(unsigned int oid) {
   unsigned int machinenumber;
   objheader_t *tmp, *objheader;
   objheader_t *objcopy;
   int size;
   void *buf;
   chashlistnode_t *node;
-  chashtable_t *table=record->lookupTable;
 
   if(oid == 0) {
     return NULL;
   }
   
-  node= &table->table[(oid & table->mask)>>1];
+
+  node= &c_table[(oid & c_mask)>>1];
   do {
     if(node->key == oid) {
 #ifdef TRANSSTATS
@@ -400,15 +407,96 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int
   */
 
 #ifdef ABORTREADERS
-  if (record->abort) {
+  if (t_abort) {
+    //abort this transaction
+    //printf("ABORTING\n");
+    removetransactionhash();
+    objstrDelete(t_cache);
+    t_chashDelete();
+    _longjmp(aborttrans,1);
+  } else
+    addtransaction(oid);
+#endif
+
+  if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
+#ifdef TRANSSTATS
+    nmhashSearch++;
+#endif
+    /* Look up in machine lookup table  and copy  into cache*/
+    GETSIZE(size, objheader);
+    size += sizeof(objheader_t);
+    objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+    memcpy(objcopy, objheader, size);
+    /* Insert into cache's lookup table */
+    STATUS(objcopy)=0;
+    t_chashInsert(OID(objheader), objcopy);
+#ifdef COMPILER
+    return &objcopy[1];
+#else
+    return objcopy;
+#endif
+  } else {
+#ifdef CACHE
+    if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+#ifdef TRANSSTATS
+      nprehashSearch++;
+#endif
+      /* Look up in prefetch cache */
+      GETSIZE(size, tmp);
+      size+=sizeof(objheader_t);
+      objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+      memcpy(objcopy, tmp, size);
+      /* Insert into cache's lookup table */
+      t_chashInsert(OID(tmp), objcopy);
+#ifdef COMPILER
+      return &objcopy[1];
+#else
+      return objcopy;
+#endif
+    }
+#endif
+    /* Get the object from the remote location */
+    if((machinenumber = lhashSearch(oid)) == 0) {
+      printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
+      return NULL;
+    }
+    objcopy = getRemoteObj(machinenumber, oid);
+
+    if(objcopy == NULL) {
+      printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
+      return NULL;
+    } else {
+#ifdef TRANSSTATS
+      nRemoteSend++;
+#endif
+#ifdef COMPILER
+      return &objcopy[1];
+#else
+      return objcopy;
+#endif
+    }
+  }
+}
+
+
+/* This function finds the location of the objects involved in a transaction
+ * and returns the pointer to the object if found in a remote location */
+__attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
+  unsigned int machinenumber;
+  objheader_t *tmp, *objheader;
+  objheader_t *objcopy;
+  int size;
+
+#ifdef ABORTREADERS
+  if (t_abort) {
     //abort this transaction
     //printf("ABORTING\n");
-    removetransactionhash(record->lookupTable, record);
-    objstrDelete(record->cache);
-    chashDelete(record->lookupTable);
-    _longjmp(record->aborttrans,1);
+    removetransactionhash();
+    objstrDelete(t_cache);
+    t_chashDelete();
+    _longjmp(aborttrans,1);
   } else
-    addtransaction(oid,record);
+    addtransaction(oid);
 #endif
 
   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
@@ -418,11 +506,11 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int
     /* Look up in machine lookup table  and copy  into cache*/
     GETSIZE(size, objheader);
     size += sizeof(objheader_t);
-    objcopy = (objheader_t *) objstrAlloc(&record->cache, size);
+    objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
     memcpy(objcopy, objheader, size);
     /* Insert into cache's lookup table */
     STATUS(objcopy)=0;
-    chashInsert(record->lookupTable, OID(objheader), objcopy);
+    t_chashInsert(OID(objheader), objcopy);
 #ifdef COMPILER
     return &objcopy[1];
 #else
@@ -437,10 +525,10 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int
       /* Look up in prefetch cache */
       GETSIZE(size, tmp);
       size+=sizeof(objheader_t);
-      objcopy = (objheader_t *) objstrAlloc(&record->cache, size);
+      objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
       memcpy(objcopy, tmp, size);
       /* Insert into cache's lookup table */
-      chashInsert(record->lookupTable, OID(tmp), objcopy);
+      t_chashInsert(OID(tmp), objcopy);
 #ifdef COMPILER
       return &objcopy[1];
 #else
@@ -453,7 +541,7 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int
       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
       return NULL;
     }
-    objcopy = getRemoteObj(record, machinenumber, oid);
+    objcopy = getRemoteObj(machinenumber, oid);
 
     if(objcopy == NULL) {
       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
@@ -472,13 +560,13 @@ __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int
 }
 
 /* This function creates objects in the transaction record */
-objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
-  objheader_t *tmp = (objheader_t *) objstrAlloc(&record->cache, (sizeof(objheader_t) + size));
+objheader_t *transCreateObj(unsigned int size) {
+  objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
   OID(tmp) = getNewOID();
   tmp->version = 1;
   tmp->rcount = 1;
   STATUS(tmp) = NEW;
-  chashInsert(record->lookupTable, OID(tmp), tmp);
+  t_chashInsert(OID(tmp), tmp);
 
 #ifdef COMPILER
   return &tmp[1]; //want space after object header
@@ -490,14 +578,14 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
 #if 1
 /* This function creates machine piles based on all machines involved in a
  * transaction commit request */
-plistnode_t *createPiles(transrecord_t *record) {
+plistnode_t *createPiles() {
   int i;
   plistnode_t *pile = NULL;
   unsigned int machinenum;
   objheader_t *headeraddr;
-  chashlistnode_t * ptr = record->lookupTable->table;
+  chashlistnode_t * ptr = c_table;
   /* Represents number of bins in the chash table */
-  unsigned int size = record->lookupTable->size;
+  unsigned int size = c_size;
 
   for(i = 0; i < size ; i++) {
     chashlistnode_t * curr = &ptr[i];
@@ -517,7 +605,7 @@ plistnode_t *createPiles(transrecord_t *record) {
       }
 
       //Make machine groups
-      pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
+      pile = pInsert(pile, headeraddr, machinenum, c_numelements);
       curr = curr->next;
     }
   }
@@ -526,14 +614,14 @@ plistnode_t *createPiles(transrecord_t *record) {
 #else
 /* This function creates machine piles based on all machines involved in a
  * transaction commit request */
-plistnode_t *createPiles(transrecord_t *record) {
+plistnode_t *createPiles() {
   int i;
   plistnode_t *pile = NULL;
   unsigned int machinenum;
   objheader_t *headeraddr;
-  struct chashentry * ptr = record->lookupTable->table;
+  struct chashentry * ptr = c_table;
   /* Represents number of bins in the chash table */
-  unsigned int size = record->lookupTable->size;
+  unsigned int size = c_size;
 
   for(i = 0; i < size ; i++) {
     struct chashentry * curr = & ptr[i];
@@ -552,7 +640,7 @@ plistnode_t *createPiles(transrecord_t *record) {
     }
 
     //Make machine groups
-    pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
+    pile = pInsert(pile, headeraddr, machinenum, c_numelements);
   }
   return pile;
 }
@@ -563,7 +651,7 @@ plistnode_t *createPiles(transrecord_t *record) {
  * and creates new piles by calling the createPiles(),
  * Sends a transrequest() to each remote machines for objects found remotely
  * and calls handleLocalReq() to process objects found locally */
-int transCommit(transrecord_t *record) {
+int transCommit() {
   unsigned int tot_bytes_mod, *listmid;
   plistnode_t *pile, *pile_ptr;
   int trecvcount;
@@ -573,13 +661,12 @@ int transCommit(transrecord_t *record) {
   char finalResponse;
 
 #ifdef ABORTREADERS
-  if (record->abort) {
+  if (t_abort) {
     //abort this transaction
     printf("ABORTING TRANSACTION AT COMMIT\n");
-    removetransactionhash(record->lookupTable, record);
-    objstrDelete(record->cache);
-    chashDelete(record->lookupTable);
-    free(record);
+    removetransactionhash();
+    objstrDelete(t_cache);
+    t_chashDelete();
     return 1;
   }
 #endif
@@ -592,7 +679,7 @@ int transCommit(transrecord_t *record) {
     /* Look through all the objects in the transaction record and make piles
      * for each machine involved in the transaction*/
     if (firsttime) {
-      pile_ptr = pile = createPiles(record);
+      pile_ptr = pile = createPiles();
       pile_ptr = pile = sortPiles(pile);
     } else {
       pile = pile_ptr;
@@ -669,7 +756,7 @@ int transCommit(transrecord_t *record) {
        for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
          int size;
          objheader_t *headeraddr;
-         if((headeraddr = chashSearch(record->lookupTable, tosend[sockindex].oidmod[i])) == NULL) {
+         if((headeraddr = t_chashSearch(tosend[sockindex].oidmod[i])) == NULL) {
            printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
            free(modptr);
            free(listmid);
@@ -684,7 +771,7 @@ int transCommit(transrecord_t *record) {
        send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
        free(modptr);
       } else { //handle request locally
-       handleLocalReq(&tosend[sockindex], &transinfo, record, &getReplyCtrl[sockindex]);
+       handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
       }
       sockindex++;
       pile = pile->next;
@@ -740,7 +827,7 @@ int transCommit(transrecord_t *record) {
       }
     }
     /* Decide the final response */
-    if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, record, pilecount)) == 0) {
+    if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
       free(tosend);
       free(listmid);
@@ -755,7 +842,7 @@ int transCommit(transrecord_t *record) {
        if(finalResponse == TRANS_COMMIT) {
          int retval;
          /* Update prefetch cache */
-         if((retval = updatePrefetchCache(&(tosend[i]), record)) != 0) {
+         if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
            printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
            free(tosend);
            free(listmid);
@@ -774,27 +861,27 @@ int transCommit(transrecord_t *record) {
          }
 #ifdef ABORTREADERS
          removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
-         removethisreadtransaction(tosend[i].objread, tosend[i].f.numread, record);
+         removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
 #endif
        }
 #ifdef ABORTREADERS
        else if (!treplyretry) {
-         removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
-         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread,record);
+         removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
        }
 #endif
 #endif
        send_data(sd, &finalResponse, sizeof(char));
       } else {
        /* Complete local processing */
-       doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record);
+       doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
 #ifdef ABORTREADERS
        if(finalResponse == TRANS_COMMIT) {
          removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
-         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread, record);
+         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
        } else if (!treplyretry) {
-         removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
-         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread,record);
+         removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+         removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
        }
 #endif
       }
@@ -821,18 +908,16 @@ int transCommit(transrecord_t *record) {
     numTransAbort++;
 #endif
     /* Free Resources */
-    objstrDelete(record->cache);
-    chashDelete(record->lookupTable);
-    free(record);
+    objstrDelete(t_cache);
+    t_chashDelete();
     return TRANS_ABORT;
   } else if(finalResponse == TRANS_COMMIT) {
 #ifdef TRANSSTATS
     numTransCommit++;
 #endif
     /* Free Resources */
-    objstrDelete(record->cache);
-    chashDelete(record->lookupTable);
-    free(record);
+    objstrDelete(t_cache);
+    t_chashDelete();
     return 0;
   } else {
     //TODO Add other cases
@@ -845,7 +930,7 @@ int transCommit(transrecord_t *record) {
 /* This function handles the local objects involved in a transaction
  * commiting process.  It also makes a decision if this local machine
  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
-void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec, char *getReplyCtrl) {
+void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, char *getReplyCtrl) {
   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
   int numoidnotfound = 0, numoidlocked = 0;
   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
@@ -872,7 +957,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra
       }
       int tmpsize;
       objheader_t *headptr;
-      headptr = (objheader_t *) chashSearch(rec->lookupTable, tdata->oidmod[i-numread]);
+      headptr = (objheader_t *) t_chashSearch(tdata->oidmod[i-numread]);
       if (headptr == NULL) {
        printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
        return;
@@ -901,7 +986,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra
   }
 }
 
-void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *record) {
+void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
   if(finalResponse == TRANS_ABORT) {
     if(transAbortProcess(transinfo) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
@@ -919,7 +1004,7 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da
       }
     }
 #endif
-    if(transComProcess(tdata, transinfo, record) != 0) {
+    if(transComProcess(tdata, transinfo) != 0) {
       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
       fflush(stdout);
       return;
@@ -939,7 +1024,7 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da
 
 /* This function decides the reponse that needs to be sent to
  * all Participant machines after the TRANS_REQUEST protocol */
-char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record, int pilecount) {
+char decideResponse(char *getReplyCtrl, char *treplyretry, int pilecount) {
   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
                                                                   message to send */
   for (i = 0 ; i < pilecount; i++) {
@@ -970,7 +1055,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record
     return TRANS_ABORT;
 #ifdef CACHE
     /* clear objects from prefetch cache */
-    cleanPCache(record);
+    cleanPCache();
 #endif
   } else if(transagree == pilecount) {
     /* Send Commit */
@@ -989,7 +1074,7 @@ char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record
  * available and copies the object and its header to the local
  * cache. */
 
-void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
+void *getRemoteObj(unsigned int mnum, unsigned int oid) {
   int size, val;
   struct sockaddr_in serv_addr;
   char machineip[16];
@@ -1011,11 +1096,11 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
   } else {
     /* Read object if found into local cache */
     recv_data(sd, &size, sizeof(int));
-    objcopy = objstrAlloc(&record->cache, size);
+    objcopy = objstrAlloc(&t_cache, size);
     recv_data(sd, objcopy, size);
     STATUS(objcopy)=0;
     /* Insert into cache's lookup table */
-    chashInsert(record->lookupTable, oid, objcopy);
+    t_chashInsert(oid, objcopy);
   }
 
   return objcopy;
@@ -1133,7 +1218,7 @@ int transAbortProcess(trans_commit_data_t *transinfo) {
 }
 
 /*This function completes the COMMIT process if the transaction is commiting*/
-int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec) {
+int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
   objheader_t *header, *tcptr;
   int i, nummod, tmpsize, numcreated, numlocked;
   unsigned int *oidmod, *oidcreated, *oidlocked;
@@ -1152,7 +1237,7 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra
       return 1;
     }
     /* Copy from transaction cache -> main object store */
-    if ((tcptr = ((objheader_t *) chashSearch(rec->lookupTable, oidmod[i]))) == NULL) {
+    if ((tcptr = ((objheader_t *) t_chashSearch(oidmod[i]))) == NULL) {
       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
       return 1;
     }
@@ -1174,7 +1259,7 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra
   }
   /* If object is newly created inside transaction then commit it */
   for (i = 0; i < numcreated; i++) {
-    if ((header = ((objheader_t *) chashSearch(rec->lookupTable, oidcreated[i]))) == NULL) {
+    if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
       return 1;
     }
@@ -1820,13 +1905,12 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
   return status;
 }
 
-void transAbort(transrecord_t *trans) {
+void transAbort() {
 #ifdef ABORTREADERS
-  removetransactionhash(trans->lookupTable, trans);
+  removetransactionhash();
 #endif
-  objstrDelete(trans->cache);
-  chashDelete(trans->lookupTable);
-  free(trans);
+  objstrDelete(t_cache);
+  t_chashDelete();
 }
 
 /* This function inserts necessary information into
diff --git a/Robust/src/Runtime/DSTM/interface/trans.h b/Robust/src/Runtime/DSTM/interface/trans.h
new file mode 100644 (file)
index 0000000..2eee1d3
--- /dev/null
@@ -0,0 +1,11 @@
+#ifndef TRANS_H
+#define TRANS_H
+
+extern __thread objstr_t *t_cache;
+extern __thread struct ___Object___ *revertlist;
+#ifdef ABORTREADERS
+extern __thread int t_abort;
+extern __thread jmp_buf aborttrans;
+#endif
+
+#endif
index 74a8bdd84ffce67ca0239576b01019650e819ee4..6fdbc27f4bfac2cb45149be726a1f0a599689773 100644 (file)
@@ -426,13 +426,13 @@ void checkcollect(void * ptr) {
 }
 
 #ifdef DSTM
-void checkcollect2(void * ptr, transrecord_t *trans) {
-  int ptrarray[]={1, (int)ptr, (int) trans->revertlist};
+void checkcollect2(void * ptr) {
+  int ptrarray[]={1, (int)ptr, (int) revertlist};
   struct listitem * tmp=stopforgc((struct garbagelist *)ptrarray);
   pthread_mutex_lock(&gclock); // Wait for GC
   restartaftergc(tmp);
   pthread_mutex_unlock(&gclock);
-  trans->revertlist=(struct ___Object___*)ptrarray[2];
+  revertlist=(struct ___Object___*)ptrarray[2];
 }
 #endif
 
index 95f4d54cee79bf928117db30cf927cdb9224ea5d..665eaf2dc58ed8397b495502553d2fc61c49e862 100644 (file)
@@ -161,8 +161,8 @@ void CALL02(___System______rangePrefetch____L___Object_____AR_S, struct ___Objec
 /* Object allocation function */
 
 #ifdef DSTM
-__attribute__((malloc)) void * allocate_newglobal(transrecord_t *trans, int type) {
-  struct ___Object___ * v=(struct ___Object___ *) transCreateObj(trans, classsize[type]);
+__attribute__((malloc)) void * allocate_newglobal(int type) {
+  struct ___Object___ * v=(struct ___Object___ *) transCreateObj(classsize[type]);
   v->type=type;
 #ifdef THREADS
   v->tid=0;
@@ -174,8 +174,8 @@ __attribute__((malloc)) void * allocate_newglobal(transrecord_t *trans, int type
 
 /* Array allocation function */
 
-__attribute__((malloc)) struct ArrayObject * allocate_newarrayglobal(transrecord_t *trans, int type, int length) {
-  struct ArrayObject * v=(struct ArrayObject *)transCreateObj(trans, sizeof(struct ArrayObject)+length*classsize[type]);
+__attribute__((malloc)) struct ArrayObject * allocate_newarrayglobal(int type, int length) {
+  struct ArrayObject * v=(struct ArrayObject *)transCreateObj(sizeof(struct ArrayObject)+length*classsize[type]);
   if (length<0) {
     printf("ERROR: negative array\n");
     return NULL;
index 20a1d5fb6aedde24e5c8f03676b45ff8b2bfd1dc..fa1afc2db4014f96be5d6cf2e7aa38544022c98c 100644 (file)
@@ -27,8 +27,8 @@ extern void * curr_heaptop;
 #endif
 
 #ifdef DSTM
-__attribute__((malloc)) void * allocate_newglobal(transrecord_t *, int type);
-__attribute__((malloc)) struct ArrayObject * allocate_newarrayglobal(transrecord_t *, int type, int length);
+__attribute__((malloc)) void * allocate_newglobal(int type);
+__attribute__((malloc)) struct ArrayObject * allocate_newarrayglobal(int type, int length);
 #endif
 
 #ifdef PRECISE_GC
index aa5fbadc35492da2e57e65df23341845528a8d80..25362e6eba0a4c62959343ca32d0d8cdd7675d24 100644 (file)
@@ -29,7 +29,6 @@ pthread_key_t oidval;
 void threadexit() {
   objheader_t* ptr;
   void *value;
-  transrecord_t * trans;
   unsigned int oidvalue;
 
 #ifdef THREADS
@@ -57,12 +56,12 @@ void threadexit() {
   goto transstart;
 transstart:
   {
-    transrecord_t * trans = transStart();
-    ptr = transRead(trans, oidvalue);
+    transStart();
+    ptr = transRead(oidvalue);
     struct ___Thread___ *p = (struct ___Thread___ *) ptr;
     p->___threadDone___ = 1;
     *((unsigned int *)&((struct ___Object___ *) p)->___localcopy___) |=DIRTY;
-    if(transCommit(trans) != 0) {
+    if(transCommit() != 0) {
       goto transstart;
     }
   }
@@ -149,12 +148,11 @@ void CALL00(___Thread______yield____) {
 void CALL01(___Thread______join____, struct ___Thread___ * ___this___) {
   unsigned int *oidarray;
   unsigned short *versionarray, version;
-  transrecord_t *trans;
   objheader_t *ptr;
   /* Add transaction to check if thread finished for join operation */
 transstart:
-  trans = transStart();
-  ptr = transRead(trans, (unsigned int) VAR(___this___));
+  transStart();
+  ptr = transRead((unsigned int) VAR(___this___));
   struct ___Thread___ *p = (struct ___Thread___ *) ptr;
 #ifdef THREADJOINDEBUG
   printf("Start join process for Oid = %x\n", (unsigned int) VAR(___this___));
@@ -163,7 +161,7 @@ transstart:
 #ifdef THREADJOINDEBUG
     printf("Thread oid = %x is done\n", (unsigned int) VAR(___this___));
 #endif
-    transAbort(trans);
+    transAbort();
     return;
   } else {
 
@@ -191,7 +189,7 @@ transstart:
 #endif
     free(oidarray);
     free(versionarray);
-    transAbort(trans);
+    transAbort();
     goto transstart;
   }
   return;
@@ -242,7 +240,6 @@ void globalDestructor(void *value) {
 
 void initDSMthread(int *ptr) {
   objheader_t *tmp;
-  transrecord_t * trans;
   void *threadData;
   int oid=ptr[0];
   int type=ptr[1];
@@ -264,11 +261,11 @@ void initDSMthread(int *ptr) {
   goto transstart;
 transstart:
   {
-    transrecord_t * trans = transStart();
-    tmp  = transRead(trans, (unsigned int) oid);
+    transStart();
+    tmp  = transRead((unsigned int) oid);
     ((struct ___Thread___ *)tmp)->___threadDone___ = 1;
     *((unsigned int *)&((struct ___Object___ *) tmp)->___localcopy___) |=DIRTY;
-    if(transCommit(trans)!= 0) {
+    if(transCommit()!= 0) {
       goto transstart;
     }
   }