Changes to runtime for read/write lock implementation
authoradash <adash>
Fri, 5 Sep 2008 01:35:16 +0000 (01:35 +0000)
committeradash <adash>
Fri, 5 Sep 2008 01:35:16 +0000 (01:35 +0000)
and modifying benchmark for dw-10 with dw-14

12 files changed:
Robust/src/Benchmarks/Prefetch/Em3d/dsm/Em3d2.java
Robust/src/Benchmarks/Prefetch/MatrixMultiply/MatrixMultiplyN.java
Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCReadcommit.java
Robust/src/Benchmarks/Prefetch/MicroBenchmarks/MultiMCWritecommit.java
Robust/src/Benchmarks/Prefetch/SOR/dsm/JGFSORBench.java
Robust/src/Benchmarks/Prefetch/SOR/dsm/SORRunner.java
Robust/src/IR/Flat/BuildCode.java
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/signal.c
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/buildscript

index 20edbdf3926af52960cba86c48d0236abae6c476..e35e4731e7e7ed113fd123a8817f6d4dcbb62fb9 100644 (file)
@@ -61,7 +61,7 @@ public class Em3d extends Thread {
     Random random;
     String hname;
 
-    barr = new Barrier("128.195.175.79");
+    barr = new Barrier("128.195.175.84");
     atomic {
        iteration = numIter;
        degree = numDegree;
@@ -139,10 +139,10 @@ public class Em3d extends Thread {
     long start0 = System.currentTimeMillis();
     int numThreads = em.numThreads;
     int[] mid = new int[4];
-    mid[0] = (128<<24)|(195<<16)|(175<<8)|79;//dw-8
-    mid[1] = (128<<24)|(195<<16)|(175<<8)|80;//dw-9
-    mid[2] = (128<<24)|(195<<16)|(175<<8)|78;//dw-7
-    mid[3] = (128<<24)|(195<<16)|(175<<8)|73;//dw-5
+    mid[0] = (128<<24)|(195<<16)|(175<<8)|84;//dw-8
+    mid[1] = (128<<24)|(195<<16)|(175<<8)|85;//dw-9
+    mid[2] = (128<<24)|(195<<16)|(175<<8)|86;//dw-7
+    mid[3] = (128<<24)|(195<<16)|(175<<8)|87;//dw-5
 
     System.printString("DEBUG -> numThreads = " + numThreads+"\n");
     BarrierServer mybarr;
index 236592c6e760bd39ca0e14b101e71c1dc2fe97c9..59c837a43baa8cc93f9818d48f9dc90d275fad27 100644 (file)
@@ -42,10 +42,10 @@ public class MatrixMultiply extends Thread{
        }
        
        int[] mid = new int[4];
-       mid[0] = (128<<24)|(195<<16)|(175<<8)|79; //dw-8
-       mid[1] = (128<<24)|(195<<16)|(175<<8)|80; //dw-9
-       mid[2] = (128<<24)|(195<<16)|(175<<8)|78; //dw-7
-       mid[3] = (128<<24)|(195<<16)|(175<<8)|73; //dw-5
+       mid[0] = (128<<24)|(195<<16)|(175<<8)|84; //dw-10
+       mid[1] = (128<<24)|(195<<16)|(175<<8)|85; //dw-11
+       mid[2] = (128<<24)|(195<<16)|(175<<8)|86; //dw-12
+       mid[3] = (128<<24)|(195<<16)|(175<<8)|87; //dw-13
        int p, q, r;
        MatrixMultiply[] mm;
        MatrixMultiply tmp;
index 5954126de3697a1710ee0ae71a4d5eeeda3dc29c..ded2de86bc92d04cd2cb823cd77cd44f5b9de2e3 100644 (file)
@@ -25,7 +25,6 @@ public class ReadArrayObj extends Thread {
 
     Barrier.enterBarrier(barr);
 
-    /*
     //All machines reading data from array
     int val;
     for(int i=0; i<10000; i++) {
@@ -35,7 +34,6 @@ public class ReadArrayObj extends Thread {
         }
       }
     }
-    */
   }
 
   public static void main(String[] args) {
index ed7eb6e2e0cf8f9943067add83232ae72c957174..f2fd6003e100aab90005ba47c3e57f9cbc4c8fb1 100644 (file)
@@ -24,7 +24,6 @@ public class WriteArrayObj extends Thread {
     }
 
     Barrier.enterBarrier(barr);
-/*
     //Write into array elements
     Integer val;
     for(int j=0; j<10000; j++) {
@@ -35,7 +34,6 @@ public class WriteArrayObj extends Thread {
         }
       }
     }
-    */
   }
 
   public static void main(String[] args) {
index 35e7fdd4d4e76235cb1949bd7f885ccedb1f9167..c629c0466bbfa8735c4d6b14f701eaef8fc6aad5 100644 (file)
@@ -47,10 +47,10 @@ public class JGFSORBench {
     BarrierServer mybarr;
 
     int[] mid = new int[4];
-    mid[0] = (128<<24)|(195<<16)|(175<<8)|79;//dw-8
-    mid[1] = (128<<24)|(195<<16)|(175<<8)|80;//dw-9
-    mid[2] = (128<<24)|(195<<16)|(175<<8)|78;//dw-7
-    mid[3] = (128<<24)|(195<<16)|(175<<8)|73;//dw-5
+    mid[0] = (128<<24)|(195<<16)|(175<<8)|84;//dw-10
+    mid[1] = (128<<24)|(195<<16)|(175<<8)|85;//dw-11
+    mid[2] = (128<<24)|(195<<16)|(175<<8)|86;//dw-12
+    mid[3] = (128<<24)|(195<<16)|(175<<8)|87;//dw-13
 
     double[][] G;
     int num_iterations;
@@ -109,7 +109,7 @@ public class JGFSORBench {
   public int JGFvalidate(){
 
     double refval[];
-    refval = new double[3];
+    refval = new double[4];
     refval[0] = 0.498574406322512;
     refval[1] = 1.1234778980135105;
     refval[2] = 1.9954895063582696;
index 8c661af8d7a511c352e5928dd4d410b56954d77c..032cc6ed480327618af9b1918aea57d38165dd7e 100644 (file)
@@ -21,7 +21,7 @@
 
 class SORRunner extends Thread {
 
-  int id,num_iterations;
+  int id, num_iterations;
   double G[][],omega;
   int nthreads;
 
@@ -38,7 +38,7 @@ class SORRunner extends Thread {
     double omega_over_four, one_minus_omega;
     int numiterations;
     Barrier barr;
-    barr = new Barrier("128.195.175.79");
+    barr = new Barrier("128.195.175.84");
     int ilow, iupper, slice, tslice, ttslice, Mm1, Nm1;
 
     atomic {
index 89e549f19e4833f3d9b34f436293cd8102ec1f6a..318f3c0d4883cd80f116ffd48517dafe418939ec 100644 (file)
@@ -274,6 +274,7 @@ public class BuildCode {
     outmethod.println("printf(\"nmhashSearch= %d\\n\", nmhashSearch);");
     outmethod.println("printf(\"nprehashSearch= %d\\n\", nprehashSearch);");
     outmethod.println("printf(\"nRemoteReadSend= %d\\n\", nRemoteSend);");
+    outmethod.println("printf(\"nSoftAbort= %d\\n\", nSoftAbort);");
     outmethod.println("#endif\n");
     outmethod.println("}");
 
@@ -735,6 +736,7 @@ public class BuildCode {
     outclassdefs.print("extern int nmhashSearch;\n");
     outclassdefs.print("extern int nprehashSearch;\n");
     outclassdefs.print("extern int nRemoteSend;\n");
+    outclassdefs.print("extern int nSoftAbort;\n");
     outclassdefs.print("extern void handle();\n");
     outclassdefs.print("#endif\n");
     outclassdefs.print("int numprefetchsites = " + pa.prefetchsiteid + ";\n");
index 993d730efca4fbb03b6c685c758be1f69a1eed5a..6eb801b8988d8e68b19226fa33614f774a842854 100644 (file)
@@ -256,6 +256,10 @@ char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char
 char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
 int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
 void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid);
+void getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *,
+                             int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short);
+void getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *,
+                              int *, int *, char *, unsigned int, unsigned short);
 /* end server portion */
 
 /* Prototypes for transactions */
@@ -294,6 +298,8 @@ int getPrefetchResponse(int);
 unsigned short getObjType(unsigned int oid);
 int startRemoteThread(unsigned int oid, unsigned int mid);
 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs);
+void commitCountForObjRead(local_thread_data_array_t *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short);
+void commitCountForObjMod(local_thread_data_array_t *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short);
 
 /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
index c1e36c8ba624a1f0377dd00781595f4a54eebba2..b5314303171d12092644de04ba40b9fd6cd0d554 100644 (file)
@@ -362,12 +362,21 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     if (fixed->nummod > 0)
       free(modptr);
     /* Unlock objects that was locked due to this transaction */
+    int useWriteUnlock = 0;
     for(i = 0; i< transinfo->numlocked; i++) {
+      if(transinfo->objlocked[i] == -1) {
+       useWriteUnlock = 1;
+       continue;
+      }
       if((header = mhashSearch(transinfo->objlocked[i])) == NULL) {
-       printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);                              // find the header address
+       printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__); // find the header address
        return 1;
       }
-      UnLock(STATUSPTR(header));
+      if(useWriteUnlock) {
+       write_unlock(STATUSPTR(header));
+      } else {
+       read_unlock(STATUSPTR(header));
+      }
     }
 
     /* Send ack to Coordinator */
@@ -417,12 +426,11 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
   char control = 0, *ptr;
   unsigned int oid;
   unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
-  void *mobj;
   objheader_t *headptr;
 
   /* Counters and arrays to formulate decision on control message to be sent */
   oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
-  oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
+  oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod + 1, sizeof(unsigned int));
   oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
   int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
@@ -434,61 +442,28 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
 
   /* Process each oid in the machine pile/ group per thread */
   for (i = 0; i < fixed->numread + fixed->nummod; i++) {
-    if (i < fixed->numread) {            //Objs only read and not modified
-      int incr = sizeof(unsigned int) + sizeof(unsigned short);                  // Offset that points to next position in the objread array
+    if (i < fixed->numread) { //Objs only read and not modified
+      int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
       incr *= i;
       oid = *((unsigned int *)(objread + incr));
       incr += sizeof(unsigned int);
       version = *((unsigned short *)(objread + incr));
-    } else {            //Objs modified
+      getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
+                               &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
+    } else {  //Objs modified
+      if(i == fixed->numread) {
+       oidlocked[objlocked] = -1;
+       objlocked++;
+      }
       int tmpsize;
       headptr = (objheader_t *) ptr;
       oid = OID(headptr);
       version = headptr->version;
       GETSIZE(tmpsize, headptr);
       ptr += sizeof(objheader_t) + tmpsize;
-    }
-
-    /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
-
-    if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
-      /* Save the oids not found and number of oids not found for later use */
-      oidnotfound[objnotfound] = oid;
-      objnotfound++;
-    } else {     /* If Obj found in machine (i.e. has not moved) */
-      /* Check if Obj is locked by any previous transaction */
-      if (test_and_set(STATUSPTR(mobj))) {
-       //don't have lock
-       if (version == ((objheader_t *)mobj)->version) {          /* If locked then match versions */
-         v_matchlock++;
-       } else {    /* If versions don't match ...HARD ABORT */
-         v_nomatch++;
-         oidvernotmatch[objvernotmatch] = oid;
-         objvernotmatch++;
-         int size;
-         GETSIZE(size, mobj);
-         size += sizeof(objheader_t);
-         numBytes += size;
-         /* Send TRANS_DISAGREE to Coordinator */
-         control = TRANS_DISAGREE;
-       }
-      } else {    /* If Obj is not locked then lock object */
-       /* Save all object oids that are locked on this machine during this transaction request call */
-       oidlocked[objlocked] = OID(((objheader_t *)mobj));
-       objlocked++;
-       if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
-         v_matchnolock++;
-       } else {     /* If versions don't match ...HARD ABORT */
-         v_nomatch++;
-         oidvernotmatch[objvernotmatch] = oid;
-         objvernotmatch++;
-         int size;
-         GETSIZE(size, mobj);
-         size += sizeof(objheader_t);
-         numBytes += size;
-         control = TRANS_DISAGREE;
-       }
-      }
+      getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
+                              &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
+                              &numBytes, &control, oid, version);
     }
   }
 
@@ -507,12 +482,21 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
     }
 #endif
     if (objlocked > 0) {
+      int useWriteUnlock = 0;
       for(j = 0; j < objlocked; j++) {
+       if(oidlocked[j] == -1) {
+         useWriteUnlock = 1;
+         continue;
+       }
        if((headptr = mhashSearch(oidlocked[j])) == NULL) {
          printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
          return 0;
        }
-       UnLock(STATUSPTR(headptr));
+       if(useWriteUnlock) {
+         write_unlock(STATUSPTR(headptr));
+       } else {
+         read_unlock(STATUSPTR(headptr));
+       }
       }
       free(oidlocked);
     }
@@ -537,6 +521,101 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
 
   return control;
 }
+
+/* Update Commit info for objects that are read */
+void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
+                             unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
+                             int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
+                             char *control, unsigned int oid, unsigned short version) {
+  void *mobj;
+  /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+  if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
+    /* Save the oids not found and number of oids not found for later use */
+    oidnotfound[*objnotfound] = oid;
+    (*objnotfound)++;
+  } else {     /* If Obj found in machine (i.e. has not moved) */
+    /* Check if Obj is locked by any previous transaction */
+    if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
+      if (version == ((objheader_t *)mobj)->version) { /* match versions */
+       (*v_matchnolock)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       oidvernotmatch[*objvernotmatch] = oid;
+       (*objvernotmatch)++;
+       int size;
+       GETSIZE(size, mobj);
+       size += sizeof(objheader_t);
+       *numBytes += size;
+       /* Send TRANS_DISAGREE to Coordinator */
+       *control = TRANS_DISAGREE;
+      }
+      //Keep track of oid locked
+      oidlocked[*objlocked] = OID(((objheader_t *)mobj));
+      (*objlocked)++;
+    } else {  //we are locked
+      if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
+       (*v_matchlock)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       oidvernotmatch[*objvernotmatch] = oid;
+       (*objvernotmatch)++;
+       int size;
+       GETSIZE(size, mobj);
+       size += sizeof(objheader_t);
+       *numBytes += size;
+       *control = TRANS_DISAGREE;
+      }
+    }
+  }
+}
+
+/* Update Commit info for objects that are read */
+void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
+                              int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
+                              int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
+  void *mobj;
+  /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+  if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
+    /* Save the oids not found and number of oids not found for later use */
+    oidnotfound[*objnotfound] = oid;
+    (*objnotfound)++;
+  } else {     /* If Obj found in machine (i.e. has not moved) */
+    /* Check if Obj is locked by any previous transaction */
+    if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
+      if (version == ((objheader_t *)mobj)->version) { /* match versions */
+       (*v_matchnolock)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       oidvernotmatch[*objvernotmatch] = oid;
+       (*objvernotmatch)++;
+       int size;
+       GETSIZE(size, mobj);
+       size += sizeof(objheader_t);
+       *numBytes += size;
+       /* Send TRANS_DISAGREE to Coordinator */
+       *control = TRANS_DISAGREE;
+      }
+      //Keep track of oid locked
+      oidlocked[*objlocked] = OID(((objheader_t *)mobj));
+      (*objlocked)++;
+    } else { /* Some other transaction has aquired a write lock on this object */
+      if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+       (*v_matchlock)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       oidvernotmatch[*objvernotmatch] = oid;
+       (*objvernotmatch)++;
+       int size;
+       GETSIZE(size, mobj);
+       size += sizeof(objheader_t);
+       *numBytes += size;
+       *control = TRANS_DISAGREE;
+      }
+    }
+  }
+}
+
 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
  * to send to Coordinator based on the votes of oids involved in the transaction */
 char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
@@ -609,12 +688,22 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
     free(modptr);
 
   /* Unlock locked objects */
+  int useWriteUnlock = 0;
   for(i = 0; i < numlocked; i++) {
+    if(oidlocked[i] == -1) {
+      useWriteUnlock = 1;
+      continue;
+    }
     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
       printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
       return 1;
     }
-    UnLock(STATUSPTR(header));
+
+    if(useWriteUnlock) {
+      write_unlock(STATUSPTR(header));
+    } else {
+      read_unlock(STATUSPTR(header));
+    }
   }
   //TODO Update location lookup table
 
@@ -762,8 +851,7 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short
     } else {
       /* Check to see if versions are same */
 checkversion:
-      if (test_and_set(STATUSPTR(header))==0) {
-       //have lock
+      if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
        newversion = header->version;
        if(newversion == *(versionarry + i)) {
          //Add to the notify list
@@ -771,9 +859,9 @@ checkversion:
            printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
            return;
          }
-         UnLock(STATUSPTR(header));
+         write_unlock(STATUSPTR(header));
        } else {
-         UnLock(STATUSPTR(header));
+         write_unlock(STATUSPTR(header));
          if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
            perror("processReqNotify():socket()");
            return;
index f88b06d8b37e6b543f067a689c8d02713ec05c03..9433e4fbe9bd4d28269d889807adabaff3c1cf87 100644 (file)
@@ -8,6 +8,7 @@ extern int nchashSearch;
 extern int nmhashSearch;
 extern int nprehashSearch;
 extern int nRemoteSend;
+extern int nSoftAbort;
 extern int numprefetchsites;
 void handle();
 extern pfcstats_t *evalPrefetch;
@@ -21,6 +22,7 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) {
   printf("nmhashSearch = %d\n", nmhashSearch);
   printf("nprehashSearch = %d\n", nprehashSearch);
   printf("nRemoteReadSend = %d\n", nRemoteSend);
+  printf("nSoftAbort = %d\n", nSoftAbort);
   //TODO Remove later
   /*
      int i;
index 3eaa411d3d43d527e193fc848f14c0d87ac03e04..36fdcf1ecd20221b2c866b006ec3d73ce0baa39a 100644 (file)
@@ -10,6 +10,7 @@
 #include "addUdpEnhance.h"
 #include "addPrefetchEnhance.h"
 #include "gCollect.h"
+#include "dsmlock.h"
 #ifdef COMPILER
 #include "thread.h"
 #endif
@@ -54,6 +55,7 @@ int nchashSearch = 0;
 int nmhashSearch = 0;
 int nprehashSearch = 0;
 int nRemoteSend = 0;
+int nSoftAbort = 0;
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
@@ -299,7 +301,7 @@ void randomdelay() {
 
   t = time(NULL);
   req.tv_sec = 0;
-  req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
+  req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
   nanosleep(&req, NULL);
   return;
 }
@@ -622,6 +624,9 @@ int transCommit(transrecord_t *record) {
       free(thread_data_array);
       free(ltdata);
       randomdelay();
+#ifdef TRANSSTATS
+      nSoftAbort++;
+#endif
     }
 
     /* Retry trans commit procedure during soft_abort case */
@@ -714,6 +719,7 @@ void *transRequest(void *threadarg) {
     pthread_mutex_lock(&prefetchcache_mutex);
     if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      pthread_mutex_unlock(&prefetchcache_mutex);
       pthread_exit(NULL);
     }
     pthread_mutex_unlock(&prefetchcache_mutex);
@@ -958,14 +964,14 @@ void *handleLocalReq(void *threadarg) {
   int numread, i;
   unsigned int oid;
   unsigned short version;
-  void *mobj;
-  objheader_t *headptr;
 
   localtdata = (local_thread_data_array_t *) threadarg;
 
   /* Counters and arrays to formulate decision on control message to be sent */
   oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
-  oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+  oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
+                                                                                                                                              //setting a divider of read locks
+                                                                                                                                              //and write locks
 
   numread = localtdata->tdata->buffer->f.numread;
   /* Process each oid in the machine pile/ group per thread */
@@ -975,8 +981,14 @@ void *handleLocalReq(void *threadarg) {
       incr *= i;
       oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
       version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
+      commitCountForObjRead(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
     } else { // Objects Modified
+      if(i == localtdata->tdata->buffer->f.numread) {
+       oidlocked[numoidlocked] = -1;
+       numoidlocked++;
+      }
       int tmpsize;
+      objheader_t *headptr;
       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
       if (headptr == NULL) {
        printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
@@ -984,42 +996,11 @@ void *handleLocalReq(void *threadarg) {
       }
       oid = OID(headptr);
       version = headptr->version;
+      commitCountForObjMod(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
     }
-    /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+  }
 
-    /* Save the oids not found and number of oids not found for later use */
-    if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
-      /* Save the oids not found and number of oids not found for later use */
-      oidnotfound[numoidnotfound] = oid;
-      numoidnotfound++;
-    } else { /* If Obj found in machine (i.e. has not moved) */
-      /* Check if Obj is locked by any previous transaction */
-      if (test_and_set(STATUSPTR(mobj))) {
-       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
-         v_matchlock++;
-       } else { /* If versions don't match ...HARD ABORT */
-         v_nomatch++;
-         /* Send TRANS_DISAGREE to Coordinator */
-         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-         break;
-       }
-      } else {
-       //we're locked
-       /* Save all object oids that are locked on this machine during this transaction request call */
-       oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
-       numoidlocked++;
-       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
-         v_matchnolock++;
-       } else { /* If versions don't match ...HARD ABORT */
-         v_nomatch++;
-         /* Send TRANS_DISAGREE to Coordinator */
-         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-         break;
-       }
-      }
-    }
-  } // End for
-    /* Condition to send TRANS_AGREE */
+  /* Condition to send TRANS_AGREE */
   if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
   }
@@ -1035,6 +1016,7 @@ void *handleLocalReq(void *threadarg) {
   localtdata->transinfo->modptr = NULL;
   localtdata->transinfo->numlocked = numoidlocked;
   localtdata->transinfo->numnotfound = numoidnotfound;
+
   /* Lock and update count */
   //Thread sleeps until all messages from pariticipants are received by coordinator
   pthread_mutex_lock(localtdata->tdata->lock);
@@ -1048,6 +1030,7 @@ void *handleLocalReq(void *threadarg) {
     pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
   }
   pthread_mutex_unlock(localtdata->tdata->lock);
+
   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) {
     if(transAbortProcess(localtdata) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
@@ -1079,6 +1062,88 @@ void *handleLocalReq(void *threadarg) {
     free(localtdata->transinfo->objnotfound);
   }
   pthread_exit(NULL);
+
+}
+
+/*  Commit info for objects modified */
+void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+                          int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
+  void *mobj;
+  /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+  /* Save the oids not found and number of oids not found for later use */
+  if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+    /* Save the oids not found and number of oids not found for later use */
+    oidnotfound[*numoidnotfound] = oid;
+    (*numoidnotfound)++;
+  } else { /* If Obj found in machine (i.e. has not moved) */
+    /* Check if Obj is locked by any previous transaction */
+    if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
+      if (version == ((objheader_t *)mobj)->version) {      /* match versions */
+       (*v_matchnolock)++;
+       //Keep track of what is locked
+       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+       (*numoidlocked)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       /* Send TRANS_DISAGREE to Coordinator */
+       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+       //Keep track of what is locked
+       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+       (*numoidlocked)++;
+       return;
+      }
+    } else { //A lock is acquired some place else
+      if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+       (*v_matchlock)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       /* Send TRANS_DISAGREE to Coordinator */
+       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+       return;
+      }
+    }
+  }
+}
+
+/*  Commit info for objects modified */
+void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
+  void *mobj;
+  /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+  /* Save the oids not found and number of oids not found for later use */
+  if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+    /* Save the oids not found and number of oids not found for later use */
+    oidnotfound[*numoidnotfound] = oid;
+    (*numoidnotfound)++;
+  } else { /* If Obj found in machine (i.e. has not moved) */
+    /* Check if Obj is locked by any previous transaction */
+    if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
+      if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
+       (*v_matchnolock)++;
+       //Keep track of what is locked
+       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+       (*numoidlocked)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       /* Send TRANS_DISAGREE to Coordinator */
+       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+       //Keep track of what is locked
+       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+       (*numoidlocked)++;
+       return;
+      }
+    } else { //Has reached max number of readers or some other transaction
+      //has acquired a lock on this object
+      if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+       (*v_matchlock)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       /* Send TRANS_DISAGREE to Coordinator */
+       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+       return;
+      }
+    }
+  }
 }
 
 /* This function completes the ABORT process if the transaction is aborting */
@@ -1090,12 +1155,21 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
   numlocked = localtdata->transinfo->numlocked;
   objlocked = localtdata->transinfo->objlocked;
 
+  int useWriteUnlock = 0;
   for (i = 0; i < numlocked; i++) {
+    if(objlocked[i] == -1) {
+      useWriteUnlock = 1;
+      continue;
+    }
     if((header = mhashSearch(objlocked[i])) == NULL) {
       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
       return 1;
     }
-    UnLock(STATUSPTR(header));
+    if(!useWriteUnlock) {
+      read_unlock(STATUSPTR(header));
+    } else {
+      write_unlock(STATUSPTR(header));
+    }
   }
 
   return 0;
@@ -1148,19 +1222,29 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
       return 1;
     }
     pthread_mutex_unlock(&mainobjstore_mutex);
+    /* Initialize read and write locks */
+    initdsmlocks(STATUSPTR(header));
     memcpy(ptrcreate, header, tmpsize);
     mhashInsert(oidcreated[i], ptrcreate);
     lhashInsert(oidcreated[i], myIpAddr);
   }
   /* Unlock locked objects */
+  int useWriteUnlock = 0;
   for(i = 0; i < numlocked; i++) {
+    if(oidlocked[i] == -1) {
+      useWriteUnlock = 1;
+      continue;
+    }
     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
       return 1;
     }
-    UnLock(STATUSPTR(header));
+    if(!useWriteUnlock) {
+      read_unlock(STATUSPTR(header));
+    } else {
+      write_unlock(STATUSPTR(header));
+    }
   }
-
   return 0;
 }
 
@@ -1847,5 +1931,6 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
   /* Clear Flags */
   STATUS(headeraddr) =0;
 
+
   return pile;
 }
index 30c516d8510a46134a3115725ac6ed778cfdf90b..f24420c131bec7399ca8c2ea70767ca7fd6bd465 100755 (executable)
@@ -360,7 +360,7 @@ if $TRANSSTATSFLAG
 then
 EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DTRANSSTATS -DCOMPILER -DDSTM -I$DSMRUNTIME"
 fi
-FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c $DSMRUNTIME/addUdpEnhance.c $DSMRUNTIME/signal.c $DSMRUNTIME/gCollect.c $DSMRUNTIME/addPrefetchEnhance.c"
+FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c $DSMRUNTIME/addUdpEnhance.c $DSMRUNTIME/signal.c $DSMRUNTIME/gCollect.c $DSMRUNTIME/addPrefetchEnhance.c $DSMRUNTIME/dsmlock.c"
 fi
 
 if $RECOVERFLAG