Changes to runtime for read/write lock implementation
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 3eaa411d3d43d527e193fc848f14c0d87ac03e04..36fdcf1ecd20221b2c866b006ec3d73ce0baa39a 100644 (file)
@@ -10,6 +10,7 @@
 #include "addUdpEnhance.h"
 #include "addPrefetchEnhance.h"
 #include "gCollect.h"
+#include "dsmlock.h"
 #ifdef COMPILER
 #include "thread.h"
 #endif
@@ -54,6 +55,7 @@ int nchashSearch = 0;
 int nmhashSearch = 0;
 int nprehashSearch = 0;
 int nRemoteSend = 0;
+int nSoftAbort = 0;
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
@@ -299,7 +301,7 @@ void randomdelay() {
 
   t = time(NULL);
   req.tv_sec = 0;
-  req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
+  req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
   nanosleep(&req, NULL);
   return;
 }
@@ -622,6 +624,9 @@ int transCommit(transrecord_t *record) {
       free(thread_data_array);
       free(ltdata);
       randomdelay();
+#ifdef TRANSSTATS
+      nSoftAbort++;
+#endif
     }
 
     /* Retry trans commit procedure during soft_abort case */
@@ -714,6 +719,7 @@ void *transRequest(void *threadarg) {
     pthread_mutex_lock(&prefetchcache_mutex);
     if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      pthread_mutex_unlock(&prefetchcache_mutex);
       pthread_exit(NULL);
     }
     pthread_mutex_unlock(&prefetchcache_mutex);
@@ -958,14 +964,14 @@ void *handleLocalReq(void *threadarg) {
   int numread, i;
   unsigned int oid;
   unsigned short version;
-  void *mobj;
-  objheader_t *headptr;
 
   localtdata = (local_thread_data_array_t *) threadarg;
 
   /* Counters and arrays to formulate decision on control message to be sent */
   oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
-  oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
+  oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
+                                                                                                                                              //setting a divider of read locks
+                                                                                                                                              //and write locks
 
   numread = localtdata->tdata->buffer->f.numread;
   /* Process each oid in the machine pile/ group per thread */
@@ -975,8 +981,14 @@ void *handleLocalReq(void *threadarg) {
       incr *= i;
       oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
       version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
+      commitCountForObjRead(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
     } else { // Objects Modified
+      if(i == localtdata->tdata->buffer->f.numread) {
+       oidlocked[numoidlocked] = -1;
+       numoidlocked++;
+      }
       int tmpsize;
+      objheader_t *headptr;
       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
       if (headptr == NULL) {
        printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
@@ -984,42 +996,11 @@ void *handleLocalReq(void *threadarg) {
       }
       oid = OID(headptr);
       version = headptr->version;
+      commitCountForObjMod(localtdata, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
     }
-    /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+  }
 
-    /* Save the oids not found and number of oids not found for later use */
-    if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
-      /* Save the oids not found and number of oids not found for later use */
-      oidnotfound[numoidnotfound] = oid;
-      numoidnotfound++;
-    } else { /* If Obj found in machine (i.e. has not moved) */
-      /* Check if Obj is locked by any previous transaction */
-      if (test_and_set(STATUSPTR(mobj))) {
-       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
-         v_matchlock++;
-       } else { /* If versions don't match ...HARD ABORT */
-         v_nomatch++;
-         /* Send TRANS_DISAGREE to Coordinator */
-         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-         break;
-       }
-      } else {
-       //we're locked
-       /* Save all object oids that are locked on this machine during this transaction request call */
-       oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
-       numoidlocked++;
-       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
-         v_matchnolock++;
-       } else { /* If versions don't match ...HARD ABORT */
-         v_nomatch++;
-         /* Send TRANS_DISAGREE to Coordinator */
-         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-         break;
-       }
-      }
-    }
-  } // End for
-    /* Condition to send TRANS_AGREE */
+  /* Condition to send TRANS_AGREE */
   if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
   }
@@ -1035,6 +1016,7 @@ void *handleLocalReq(void *threadarg) {
   localtdata->transinfo->modptr = NULL;
   localtdata->transinfo->numlocked = numoidlocked;
   localtdata->transinfo->numnotfound = numoidnotfound;
+
   /* Lock and update count */
   //Thread sleeps until all messages from pariticipants are received by coordinator
   pthread_mutex_lock(localtdata->tdata->lock);
@@ -1048,6 +1030,7 @@ void *handleLocalReq(void *threadarg) {
     pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
   }
   pthread_mutex_unlock(localtdata->tdata->lock);
+
   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) {
     if(transAbortProcess(localtdata) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
@@ -1079,6 +1062,88 @@ void *handleLocalReq(void *threadarg) {
     free(localtdata->transinfo->objnotfound);
   }
   pthread_exit(NULL);
+
+}
+
+/*  Commit info for objects modified */
+void commitCountForObjMod(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+                          int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
+  void *mobj;
+  /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+  /* Save the oids not found and number of oids not found for later use */
+  if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+    /* Save the oids not found and number of oids not found for later use */
+    oidnotfound[*numoidnotfound] = oid;
+    (*numoidnotfound)++;
+  } else { /* If Obj found in machine (i.e. has not moved) */
+    /* Check if Obj is locked by any previous transaction */
+    if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
+      if (version == ((objheader_t *)mobj)->version) {      /* match versions */
+       (*v_matchnolock)++;
+       //Keep track of what is locked
+       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+       (*numoidlocked)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       /* Send TRANS_DISAGREE to Coordinator */
+       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+       //Keep track of what is locked
+       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+       (*numoidlocked)++;
+       return;
+      }
+    } else { //A lock is acquired some place else
+      if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+       (*v_matchlock)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       /* Send TRANS_DISAGREE to Coordinator */
+       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+       return;
+      }
+    }
+  }
+}
+
+/*  Commit info for objects modified */
+void commitCountForObjRead(local_thread_data_array_t *localtdata, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
+                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
+  void *mobj;
+  /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+  /* Save the oids not found and number of oids not found for later use */
+  if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
+    /* Save the oids not found and number of oids not found for later use */
+    oidnotfound[*numoidnotfound] = oid;
+    (*numoidnotfound)++;
+  } else { /* If Obj found in machine (i.e. has not moved) */
+    /* Check if Obj is locked by any previous transaction */
+    if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
+      if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
+       (*v_matchnolock)++;
+       //Keep track of what is locked
+       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+       (*numoidlocked)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       /* Send TRANS_DISAGREE to Coordinator */
+       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+       //Keep track of what is locked
+       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
+       (*numoidlocked)++;
+       return;
+      }
+    } else { //Has reached max number of readers or some other transaction
+      //has acquired a lock on this object
+      if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+       (*v_matchlock)++;
+      } else { /* If versions don't match ...HARD ABORT */
+       (*v_nomatch)++;
+       /* Send TRANS_DISAGREE to Coordinator */
+       localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+       return;
+      }
+    }
+  }
 }
 
 /* This function completes the ABORT process if the transaction is aborting */
@@ -1090,12 +1155,21 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
   numlocked = localtdata->transinfo->numlocked;
   objlocked = localtdata->transinfo->objlocked;
 
+  int useWriteUnlock = 0;
   for (i = 0; i < numlocked; i++) {
+    if(objlocked[i] == -1) {
+      useWriteUnlock = 1;
+      continue;
+    }
     if((header = mhashSearch(objlocked[i])) == NULL) {
       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
       return 1;
     }
-    UnLock(STATUSPTR(header));
+    if(!useWriteUnlock) {
+      read_unlock(STATUSPTR(header));
+    } else {
+      write_unlock(STATUSPTR(header));
+    }
   }
 
   return 0;
@@ -1148,19 +1222,29 @@ int transComProcess(local_thread_data_array_t  *localtdata) {
       return 1;
     }
     pthread_mutex_unlock(&mainobjstore_mutex);
+    /* Initialize read and write locks */
+    initdsmlocks(STATUSPTR(header));
     memcpy(ptrcreate, header, tmpsize);
     mhashInsert(oidcreated[i], ptrcreate);
     lhashInsert(oidcreated[i], myIpAddr);
   }
   /* Unlock locked objects */
+  int useWriteUnlock = 0;
   for(i = 0; i < numlocked; i++) {
+    if(oidlocked[i] == -1) {
+      useWriteUnlock = 1;
+      continue;
+    }
     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
       return 1;
     }
-    UnLock(STATUSPTR(header));
+    if(!useWriteUnlock) {
+      read_unlock(STATUSPTR(header));
+    } else {
+      write_unlock(STATUSPTR(header));
+    }
   }
-
   return 0;
 }
 
@@ -1847,5 +1931,6 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
   /* Clear Flags */
   STATUS(headeraddr) =0;
 
+
   return pile;
 }