abortreaders option finished
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 27e61f7ebe1d5a55584534b6417b428aad8eef81..85564b8b05127ad1449518bbb7c49598ed723c7a 100644 (file)
@@ -320,7 +320,7 @@ void randomdelay() {
 }
 
 /* This function initializes things required in the transaction start*/
-transrecord_t *transStart() {
+__attribute__((malloc)) transrecord_t *transStart() {
   transrecord_t *tmp;
   if((tmp = calloc(1, sizeof(transrecord_t))) == NULL) {
     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
@@ -328,9 +328,9 @@ transrecord_t *transStart() {
   }
   tmp->cache = objstrCreate(1048576);
   tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
-#ifdef COMPILER
-  tmp->revertlist=NULL;
-#endif
+  //#ifdef COMPILER
+  //  tmp->revertlist=NULL; //Not necessary...already null
+  //#endif
   return tmp;
 }
 
@@ -354,7 +354,7 @@ INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
 
 /* This function finds the location of the objects involved in a transaction
  * and returns the pointer to the object if found in a remote location */
-objheader_t *transRead(transrecord_t *record, unsigned int oid) {
+__attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int oid) {
   unsigned int machinenumber;
   objheader_t *tmp, *objheader;
   objheader_t *objcopy;
@@ -395,6 +395,15 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 #endif
   } else 
   */
+
+#ifdef ABORTREADERS
+  if (trans->abort) {
+    //abort this transaction
+    longjmp(trans->aborttrans,1);
+  } else
+    addtransaction(oid,record);
+#endif
+
   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
 #ifdef TRANSSTATS
     nmhashSearch++;
@@ -732,6 +741,8 @@ int transCommit(transrecord_t *record) {
            free(listmid);
            return 1;
          }
+
+
          /* Invalidate objects in other machine cache */
          if(tosend[i].f.nummod > 0) {
            if((retval = invalidateObj(&(tosend[i]))) != 0) {
@@ -741,12 +752,31 @@ int transCommit(transrecord_t *record) {
              return 1;
            }
          }
+#ifdef ABORTREADERS
+         removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+         removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread, record);
+#endif
+       }
+#ifdef ABORTREADERS
+       else if (!treplyretry) {
+         removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
+         removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread,record);
        }
+#endif
 #endif
        send_data(sd, &finalResponse, sizeof(char));
       } else {
        /* Complete local processing */
        doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record);
+#ifdef ABORTREADERS
+       if(finalResponse == TRANS_COMMIT) {
+         removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+         removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread, record);
+       } else if (!treplyretry) {
+         removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
+         removethisreadtransaction(tosend[i].oidmod,tosend[i].f.numread,record);
+       }
+#endif
       }
     }
 
@@ -818,8 +848,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra
       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
     } else { // Objects Modified
       if(i == tdata->f.numread) {
-       oidlocked[numoidlocked] = -1;
-       numoidlocked++;
+       oidlocked[numoidlocked++] = -1;
       }
       int tmpsize;
       objheader_t *headptr;
@@ -875,6 +904,8 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da
       fflush(stdout);
       return;
     }
+  } else {
+    printf("ERROR...No Decision\n");
   }
 
   /* Free memory */
@@ -985,16 +1016,14 @@ void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigne
       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
        (*v_matchnolock)++;
        //Keep track of what is locked
-       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
-       (*numoidlocked)++;
+       oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
       } else { /* If versions don't match ...HARD ABORT */
        (*v_nomatch)++;
        /* Send TRANS_DISAGREE to Coordinator */
        *getReplyCtrl = TRANS_DISAGREE;
 
        //Keep track of what is locked
-       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
-       (*numoidlocked)++;
+       oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
        //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
        return;
       }
@@ -1028,15 +1057,13 @@ void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsign
       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
        (*v_matchnolock)++;
        //Keep track of what is locked
-       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
-       (*numoidlocked)++;
+       oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
       } else { /* If versions don't match ...HARD ABORT */
        (*v_nomatch)++;
        /* Send TRANS_DISAGREE to Coordinator */
        *getReplyCtrl = TRANS_DISAGREE;
        //Keep track of what is locked
-       oidlocked[*numoidlocked] = OID(((objheader_t *)mobj));
-       (*numoidlocked)++;
+       oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
        //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
        return;
       }
@@ -1110,7 +1137,15 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, tra
     }
     GETSIZE(tmpsize, header);
     char *tmptcptr = (char *) tcptr;
-    memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
+    {
+      struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
+      struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
+      dst->___cachedCode___=src->___cachedCode___;
+      dst->___cachedHash___=src->___cachedHash___;
+
+      memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
+    }
+
     header->version += 1;
     if(header->notifylist != NULL) {
       notifyAll(&header->notifylist, OID(header), header->version);
@@ -1648,29 +1683,21 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
     *((unsigned int *)(&msg[1])) = numoid;
     /* Send array of oids  */
     size = sizeof(unsigned int);
-    {
-      i = 0;
-      while(i < numoid) {
-       oid = oidarry[i];
-       *((unsigned int *)(&msg[1] + size)) = oid;
-       size += sizeof(unsigned int);
-       i++;
-      }
+
+    for(i = 0;i < numoid; i++) {
+      oid = oidarry[i];
+      *((unsigned int *)(&msg[1] + size)) = oid;
+      size += sizeof(unsigned int);
     }
 
     /* Send array of version  */
-    {
-      i = 0;
-      while(i < numoid) {
-       version = versionarry[i];
-       *((unsigned short *)(&msg[1] + size)) = version;
-       size += sizeof(unsigned short);
-       i++;
-      }
+    for(i = 0;i < numoid; i++) {
+      version = versionarry[i];
+      *((unsigned short *)(&msg[1] + size)) = version;
+      size += sizeof(unsigned short);
     }
 
-    *((unsigned int *)(&msg[1] + size)) = myIpAddr;
-    size += sizeof(unsigned int);
+    *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
     *((unsigned int *)(&msg[1] + size)) = threadid;
     pthread_mutex_lock(&(ndata->threadnotify));
     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
@@ -1716,7 +1743,9 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
          prehashRemove(oid);
        }
 #endif
+       pthread_mutex_lock(&(ndata->threadnotify));
        pthread_cond_signal(&(ndata->threadcond));
+       pthread_mutex_unlock(&(ndata->threadnotify));
       }
     }
   }
@@ -1771,6 +1800,9 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
 }
 
 void transAbort(transrecord_t *trans) {
+#ifdef ABORTREADERS
+  removetransactionhash(trans->lookupTable, trans);
+#endif
   objstrDelete(trans->cache);
   chashDelete(trans->lookupTable);
   free(trans);