add changes to release locks early on version mismatch and soft abort
authoradash <adash>
Wed, 21 Oct 2009 01:47:22 +0000 (01:47 +0000)
committeradash <adash>
Wed, 21 Oct 2009 01:47:22 +0000 (01:47 +0000)
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 7762784b269736b3886510ea5cbd3d2f905f93da..2e9c6e9d2e328ea47800bda119a8f35786aa32a4 100644 (file)
@@ -242,10 +242,14 @@ char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char
 char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
 int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
 void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid);
-void getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *,
+char getCommitCountForObjMod(unsigned int *, unsigned int *, unsigned int *, int *,
                              int *, int *, int *, int *, int *, int *, char *, unsigned int, unsigned short);
-void getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *,
+char getCommitCountForObjRead(unsigned int *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *,
                               int *, int *, char *, unsigned int, unsigned short);
+
+void procRestObjs(char *, char *, int , int, int, unsigned int *, unsigned int *, int *, int *, int *, int *);
+void processVerNoMatch(unsigned int *, unsigned int *, int *, int *, int *, int *, unsigned int, unsigned short);
 /* end server portion */
 
 /* Prototypes for transactions */
index fc95dbc38e02436bc90ba278fe7cb220315bf30d..4122ddc89f4c9972b3c2d7e14ed280e29bedb05b 100644 (file)
@@ -384,7 +384,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     if (fixed->nummod > 0)
       free(modptr);
     /* Unlock objects that was locked due to this transaction */
-    int useWriteUnlock = 0;
+    int useWriteUnlock = 0; //TODO verify is this piece of unlocking code ever used
     for(i = 0; i< transinfo->numlocked; i++) {
       if(transinfo->objlocked[i] == -1) {
        useWriteUnlock = 1;
@@ -459,6 +459,8 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
    * Object store holds the modified objects involved in the transaction request */
   ptr = (char *) modptr;
 
+  char retval;
+
   /* Process each oid in the machine pile/ group per thread */
   for (i = 0; i < fixed->numread + fixed->nummod; i++) {
     if (i < fixed->numread) { //Objs only read and not modified
@@ -467,11 +469,11 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
       oid = *((unsigned int *)(objread + incr));
       incr += sizeof(unsigned int);
       version = *((unsigned short *)(objread + incr));
-      getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
+      retval=getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
                                &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
     } else {  //Objs modified
       if(i == fixed->numread) {
-       oidlocked[objlocked++] = -1;
+        oidlocked[objlocked++] = -1;
       }
       int tmpsize;
       headptr = (objheader_t *) ptr;
@@ -479,10 +481,40 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
       version = headptr->version;
       GETSIZE(tmpsize, headptr);
       ptr += sizeof(objheader_t) + tmpsize;
-      getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
+      retval=getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
                               &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
                               &numBytes, &control, oid, version);
     }
+    if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
+      //unlock objects as soon versions mismatch or locks cannot be acquired)
+      if (objlocked > 0) {
+        int useWriteUnlock = 0;
+        for(j = 0; j < objlocked; j++) {
+          if(oidlocked[j] == -1) {
+            useWriteUnlock = 1;
+            continue;
+          }
+          if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+            printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+            return 0;
+          }
+          if(useWriteUnlock) {
+            write_unlock(STATUSPTR(headptr));
+          } else {
+            read_unlock(STATUSPTR(headptr));
+          }
+        }
+        if(v_nomatch > 0)
+          free(oidlocked);
+      }
+      objlocked=0;
+      break;
+    }
+  }
+  //go through rest of the objects for version mismatches
+  if(retval==TRANS_DISAGREE || retval==TRANS_SOFT_ABORT) {
+    i++;
+    procRestObjs(objread, ptr, i, fixed->numread, fixed->nummod, oidnotfound, oidvernotmatch, &objnotfound, &objvernotmatch, &v_nomatch, &numBytes);
   }
 
   /* send TRANS_DISAGREE and objs*/
@@ -499,25 +531,28 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
       offset += size;
     }
 #endif
+    /*
     if (objlocked > 0) {
       int useWriteUnlock = 0;
       for(j = 0; j < objlocked; j++) {
-       if(oidlocked[j] == -1) {
-         useWriteUnlock = 1;
-         continue;
-       }
-       if((headptr = mhashSearch(oidlocked[j])) == NULL) {
-         printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-         return 0;
-       }
-       if(useWriteUnlock) {
-         write_unlock(STATUSPTR(headptr));
-       } else {
-         read_unlock(STATUSPTR(headptr));
-       }
+        if(oidlocked[j] == -1) {
+          useWriteUnlock = 1;
+          continue;
+        }
+        if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+          printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+          return 0;
+        }
+        if(useWriteUnlock) {
+          write_unlock(STATUSPTR(headptr));
+        } else {
+          read_unlock(STATUSPTR(headptr));
+        }
       }
       free(oidlocked);
     }
+    */
+    //control=TRANS_DISAGREE;
     send_data(acceptfd, &control, sizeof(char));
 #ifdef CACHE
     send_data(acceptfd, &numBytes, sizeof(int));
@@ -540,7 +575,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
 }
 
 /* Update Commit info for objects that are read */
-void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
+char getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
                              unsigned int *oidvernotmatch, int *objnotfound, int *objlocked, int *objvernotmatch,
                              int *v_matchnolock, int *v_matchlock, int *v_nomatch, int *numBytes,
                              char *control, unsigned int oid, unsigned short version) {
@@ -551,11 +586,13 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
     /* Save the oids not found and number of oids not found for later use */
     oidnotfound[*objnotfound] = oid;
     (*objnotfound)++;
+    *control = TRANS_DISAGREE;
   } else {     /* If Obj found in machine (i.e. has not moved) */
     /* Check if Obj is locked by any previous transaction */
     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
       if (version == ((objheader_t *)mobj)->version) { /* match versions */
        (*v_matchnolock)++;
+    *control = TRANS_AGREE;
       } else { /* If versions don't match ...HARD ABORT */
        (*v_nomatch)++;
        oidvernotmatch[*objvernotmatch] = oid;
@@ -573,6 +610,7 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
     } else {  //we are locked
       if (version == ((objheader_t *)mobj)->version) {     /* Check if versions match */
        (*v_matchlock)++;
+      *control=TRANS_SOFT_ABORT;
       } else { /* If versions don't match ...HARD ABORT */
        (*v_nomatch)++;
        oidvernotmatch[*objvernotmatch] = oid;
@@ -586,10 +624,11 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
       }
     }
   }
+  return *control;
 }
 
 /* Update Commit info for objects that are read */
-void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
+char getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidvernotmatch,
                               int *objnotfound, int *objlocked, int * objvernotmatch, int *v_matchnolock, int *v_matchlock,
                               int *v_nomatch, int *numBytes, char *control, unsigned int oid, unsigned short version) {
   void *mobj;
@@ -598,11 +637,13 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
     /* Save the oids not found and number of oids not found for later use */
     oidnotfound[*objnotfound] = oid;
     (*objnotfound)++;
+       *control = TRANS_DISAGREE;
   } else {     /* If Obj found in machine (i.e. has not moved) */
     /* Check if Obj is locked by any previous transaction */
     if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
       if (version == ((objheader_t *)mobj)->version) { /* match versions */
        (*v_matchnolock)++;
+      *control=TRANS_AGREE;
       } else { /* If versions don't match ...HARD ABORT */
        (*v_nomatch)++;
        oidvernotmatch[(*objvernotmatch)++] = oid;
@@ -619,6 +660,7 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
     } else { /* Some other transaction has aquired a write lock on this object */
       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
        (*v_matchlock)++;
+       *control=TRANS_SOFT_ABORT;
       } else { /* If versions don't match ...HARD ABORT */
        (*v_nomatch)++;
        oidvernotmatch[*objvernotmatch] = oid;
@@ -632,83 +674,162 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
       }
     }
   }
+  return *control;
 }
 
-/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
- * to send to Coordinator based on the votes of oids involved in the transaction */
-char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
-                       int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
-                       unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
-  int val;
-  char control = 0;
-
-  /* Condition to send TRANS_AGREE */
-  if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
-    control = TRANS_AGREE;
-    /* Send control message */
-    send_data(acceptfd, &control, sizeof(char));
+void procRestObjs(char *objread, 
+                  char *objmod, 
+                  int index, 
+                  int numread, 
+                  int nummod, 
+                  unsigned int *oidnotfound, 
+                  unsigned int *oidvernotmatch,
+                  int *objnotfound, 
+                  int *objvernotmatch, 
+                  int *v_nomatch, 
+                  int *numBytes) {
+  int i;
+  unsigned int oid;
+  unsigned short version;
+
+  /* Process each oid in the machine pile/ group per thread */
+  //printf("DEBUG: index= %d, numread= %d, nummod= %d numread+nummod= %d\n", index,numread,nummod,numread+nummod);
+  for (i = index; i < numread+nummod; i++) {
+    //printf("DEBUG: i= %d\n", i);
+    //fflush(stdout);
+    if (i < numread) { //Objs only read and not modified
+      int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
+      incr *= i;
+      oid = *((unsigned int *)(objread + incr));
+      incr += sizeof(unsigned int);
+      version = *((unsigned short *)(objread + incr));
+    } else {  //Objs modified
+      objheader_t *headptr;
+      headptr = (objheader_t *) objmod;
+      oid = OID(headptr);
+      version = headptr->version;
+      int tmpsize;
+      GETSIZE(tmpsize, headptr);
+      objmod += sizeof(objheader_t) + tmpsize;
+    }
+    processVerNoMatch(oidnotfound,
+        oidvernotmatch,
+        objnotfound,
+        objvernotmatch,
+        v_nomatch,
+        numBytes,
+        oid, 
+        version);
   }
-  /* Condition to send TRANS_SOFT_ABORT */
-  if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
-    control = TRANS_SOFT_ABORT;
+  return;
+}
 
-    /* Send control message */
-    send_data(acceptfd, &control, sizeof(char));
+void processVerNoMatch(unsigned int *oidnotfound, 
+                      unsigned int *oidvernotmatch, 
+                      int *objnotfound, 
+                      int *objvernotmatch, 
+                      int *v_nomatch, 
+                      int *numBytes,
+                      unsigned int oid, 
+                      unsigned short version) {
+  void *mobj;
+  /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
 
-    /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
-    if(*(objnotfound) != 0) {
-      int msg[1];
-      msg[0] = *(objnotfound);
-      send_data(acceptfd, &msg, sizeof(int));
-      int size = sizeof(unsigned int)* *(objnotfound);
-      send_data(acceptfd, oidnotfound, size);
+  if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
+    /* Save the oids not found and number of oids not found for later use */
+    oidnotfound[*objnotfound] = oid;
+    (*objnotfound)++;
+  } else {     /* If Obj found in machine (i.e. has not moved) */
+    /* Check if Obj is locked by any previous transaction */
+    //if (!write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
+    if (version != ((objheader_t *)mobj)->version) { /* match versions */
+      (*v_nomatch)++;
+      oidvernotmatch[*objvernotmatch] = oid;
+         (*objvernotmatch)++;
+         int size;
+      GETSIZE(size, mobj);
+      size += sizeof(objheader_t);
+      *numBytes += size;
     }
   }
-
-  /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
-   * if Participant receives a TRANS_COMMIT */
-  transinfo->objlocked = oidlocked;
-  transinfo->objnotfound = oidnotfound;
-  transinfo->modptr = modptr;
-  transinfo->numlocked = *(objlocked);
-  transinfo->numnotfound = *(objnotfound);
-  return control;
 }
 
-/* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
- * addresses in lookup table and also changes version number
- * Sends an ACK back to Coordinator */
-int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
-  objheader_t *header;
-  objheader_t *newheader;
-  int i = 0, offset = 0;
-  char control;
-  int tmpsize;
-
-  /* Process each modified object saved in the mainobject store */
-  for(i = 0; i < nummod; i++) {
-    if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
-      printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-      return 1;
+  /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
+   * to send to Coordinator based on the votes of oids involved in the transaction */
+  char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
+      int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
+      unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
+    int val;
+    char control = 0;
+
+    /* Condition to send TRANS_AGREE */
+    if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
+      control = TRANS_AGREE;
+      /* Send control message */
+      send_data(acceptfd, &control, sizeof(char));
     }
-    GETSIZE(tmpsize,header);
-
-    {
-      struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
-      struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
-      dst->type=src->type;
-      dst->___cachedCode___=src->___cachedCode___;
-      dst->___cachedHash___=src->___cachedHash___;
-      memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
-    }
-    header->version += 1;
-    /* If threads are waiting on this object to be updated, notify them */
-    if(header->notifylist != NULL) {
-      notifyAll(&header->notifylist, OID(header), header->version);
+    /* Condition to send TRANS_SOFT_ABORT */
+    if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
+    //if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
+      control = TRANS_SOFT_ABORT;
+
+      /* Send control message */
+      send_data(acceptfd, &control, sizeof(char));
+
+      /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
+      if(*(objnotfound) != 0) {
+        int msg[1];
+        msg[0] = *(objnotfound);
+        send_data(acceptfd, &msg, sizeof(int));
+        int size = sizeof(unsigned int)* *(objnotfound);
+        send_data(acceptfd, oidnotfound, size);
+      }
     }
-    offset += sizeof(objheader_t) + tmpsize;
+
+    /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
+     * if Participant receives a TRANS_COMMIT */
+    transinfo->objlocked = oidlocked;
+    transinfo->objnotfound = oidnotfound;
+    transinfo->modptr = modptr;
+    transinfo->numlocked = *(objlocked);
+    transinfo->numnotfound = *(objnotfound);
+    return control;
   }
 
+  /* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
+   * addresses in lookup table and also changes version number
+   * Sends an ACK back to Coordinator */
+  int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
+    objheader_t *header;
+    objheader_t *newheader;
+    int i = 0, offset = 0;
+    char control;
+    int tmpsize;
+
+    /* Process each modified object saved in the mainobject store */
+    for(i = 0; i < nummod; i++) {
+      if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
+        printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+        return 1;
+      }
+      GETSIZE(tmpsize,header);
+
+      {
+        struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
+        struct ___Object___ *src=(struct ___Object___*)((char*)modptr+sizeof(objheader_t)+offset);
+        dst->type=src->type;
+        dst->___cachedCode___=src->___cachedCode___;
+        dst->___cachedHash___=src->___cachedHash___;
+        memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
+      }
+      header->version += 1;
+      /* If threads are waiting on this object to be updated, notify them */
+      if(header->notifylist != NULL) {
+        notifyAll(&header->notifylist, OID(header), header->version);
+      }
+      offset += sizeof(objheader_t) + tmpsize;
+    }
+
   if (nummod > 0)
     free(modptr);
 
index 81cd2081cbd95bbafb5a6bb20e2250255f0bc799..9007bc7e975311bade05455c467dca8977778cc3 100644 (file)
@@ -79,6 +79,7 @@ void printhex(unsigned char *, int);
 plistnode_t *createPiles();
 plistnode_t *sortPiles(plistnode_t *pileptr);
 
+//#define LOGEVENTS
 #ifdef LOGEVENTS
 char bigarray[16*1024*1024];
 int bigindex=0;
@@ -666,6 +667,7 @@ __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
       } else {
         prehashInsert(oid, headerObj);
       }
+      LOGEVENT('B');
 #endif
       return &objcopy[1];
 #else
@@ -773,6 +775,7 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
       } else {
         prehashInsert(oid, headerObj);
       }
+      LOGEVENT('B');
 #endif
       return &objcopy[1];
 #else
@@ -1049,6 +1052,7 @@ int transCommit() {
            } else {
              prehashInsert(oidToPrefetch, header);
            }
+        LOGEVENT('E');
            length = length - size;
            offset += size;
          }
@@ -1135,6 +1139,7 @@ int transCommit() {
   if(finalResponse == TRANS_ABORT) {
     //printf("Aborting trans\n");
 #ifdef TRANSSTATS
+    LOGEVENT('A');
     numTransAbort++;
 #endif
     /* Free Resources */
@@ -1143,6 +1148,7 @@ int transCommit() {
     return TRANS_ABORT;
   } else if(finalResponse == TRANS_COMMIT) {
 #ifdef TRANSSTATS
+    LOGEVENT('C');
     numTransCommit++;
 #endif
     /* Free Resources */