changes for prefetch objects on a transaction abort and updating the prefetch cache
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index d60bea5f3fbcafa7bfbd4368a109420233d069bd..d078e0c42b993aa6dbe69b2964fded9779539a11 100644 (file)
@@ -653,6 +653,34 @@ void *transRequest(void *threadarg) {
   
   /* Read control message from Participant */
   recv_data(sd, &control, sizeof(char));
+  /* Recv Objects if participant sends TRANS_DISAGREE */
+  if(control == TRANS_DISAGREE) {
+    int length;
+    recv_data(sd, &length, sizeof(int));
+    void *newAddr;
+    pthread_mutex_lock(&prefetchcache_mutex);
+    if ((newAddr = objstrAlloc(prefetchcache, length)) == NULL) {
+      printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      close(sd);
+      pthread_exit(NULL);
+    }
+    pthread_mutex_unlock(&prefetchcache_mutex);
+    recv_data(sd, newAddr, length);
+    int offset = 0;
+    while(length != 0) {
+      unsigned int oidToPrefetch;
+      objheader_t * header;
+      header = (objheader_t *) (((char *)newAddr) + offset);
+      oidToPrefetch = OID(header);
+      int size = 0;
+      GETSIZE(size, header);
+      size += sizeof(objheader_t);
+      //make an entry in prefetch hash table
+      prehashInsert(oidToPrefetch, header);
+      length = length - size;
+      offset += size;
+    }
+  }
   recvcontrol = control;
   /* Update common data structure and increment count */
   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
@@ -672,7 +700,7 @@ void *transRequest(void *threadarg) {
   }
   pthread_mutex_unlock(tdata->lock);
   
-  /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
+  /* Send the final response such as TRANS_COMMIT or TRANS_ABORT 
    * to all participants in their respective socket */
   if (sendResponse(tdata, sd) == 0) { 
     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
@@ -728,23 +756,70 @@ void decideResponse(thread_data_array_t *tdata) {
     *(tdata->replyctrl) = TRANS_ABORT;
     *(tdata->replyretry) = 0;
     /* clear objects from prefetch cache */
-    for (i = 0; i < tdata->buffer->f.numread; i++)
+    for (i = 0; i < tdata->buffer->f.numread; i++) {
       prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
-    for (i = 0; i < tdata->buffer->f.nummod; i++)
+    }
+    for (i = 0; i < tdata->buffer->f.nummod; i++) {
       prehashRemove(tdata->buffer->oidmod[i]);
+    }
   } else if(transagree == tdata->buffer->f.mcount){
     /* Send Commit */
     *(tdata->replyctrl) = TRANS_COMMIT;
     *(tdata->replyretry) = 0;
+    /* update prefetch cache */
+    /* For objects read */
+    char oidType;
+    int retval;
+    oidType = 'R'; 
+    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
+      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
+    oidType = 'M'; 
+    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
+      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
     *(tdata->replyctrl) = TRANS_ABORT;
     *(tdata->replyretry) = 1;
   }
-  
   return;
 }
 
+/* This function updates the prefetch cache when commiting objects 
+ * based on the type of oid i.e. if oid is read or oid is modified
+ * Return -1 on error else returns 0 
+ */
+int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
+  int i;
+  for (i = 0; i < numoid; i++) {
+    //find address object 
+    objheader_t *header, *newAddr;
+    int size;
+    unsigned int oid;
+    if(oidType == 'R') {
+      oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)); 
+    } else {
+      oid = tdata->buffer->oidmod[i];
+    }
+    header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
+    //copy object into prefetch cache
+    GETSIZE(size, header);
+    pthread_mutex_lock(&prefetchcache_mutex);
+    if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
+      printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return -1;
+    }
+    pthread_mutex_unlock(&prefetchcache_mutex);
+    memcpy(newAddr, header, (size + sizeof(objheader_t)));
+    //make an entry in prefetch hash table
+    prehashInsert(oid, newAddr);
+  }
+  return 0;
+}
+
 /* This function sends the final response to remote machines per
  * thread in their respective socket id It returns a char that is only
  * needed to check the correctness of execution of this function
@@ -802,7 +877,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
     recv_data(sd, &size, sizeof(int));
     objcopy = objstrAlloc(record->cache, size);
     recv_data(sd, objcopy, size);
-    
     /* Insert into cache's lookup table */
     chashInsert(record->lookupTable, oid, objcopy); 
   }
@@ -821,7 +895,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
 void *handleLocalReq(void *threadarg) {
   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
   local_thread_data_array_t *localtdata;
-  int objnotfound = 0, objlocked = 0; 
+  int numoidnotfound = 0, numoidlocked = 0;
   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
   int numread, i;
   unsigned int oid;
@@ -858,8 +932,8 @@ void *handleLocalReq(void *threadarg) {
     /* Save the oids not found and number of oids not found for later use */
     if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
       /* Save the oids not found and number of oids not found for later use */
-      oidnotfound[objnotfound] = oid;
-      objnotfound++;
+      oidnotfound[numoidnotfound] = oid;
+      numoidnotfound++;
     } else { /* If Obj found in machine (i.e. has not moved) */
       /* Check if Obj is locked by any previous transaction */
       if (test_and_set(STATUSPTR(mobj))) {
@@ -873,8 +947,8 @@ void *handleLocalReq(void *threadarg) {
       } else {
        //we're locked
         /* Save all object oids that are locked on this machine during this transaction request call */
-        oidlocked[objlocked] = OID(((objheader_t *)mobj));
-        objlocked++;
+        oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
+        numoidlocked++;
         if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
           v_matchnolock++;
         } else { /* If versions don't match ...HARD ABORT */
@@ -890,7 +964,7 @@ void *handleLocalReq(void *threadarg) {
     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
   }
   /* Condition to send TRANS_SOFT_ABORT */
-  if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
+  if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
   }
   
@@ -899,8 +973,8 @@ void *handleLocalReq(void *threadarg) {
   localtdata->transinfo->objlocked = oidlocked;
   localtdata->transinfo->objnotfound = oidnotfound;
   localtdata->transinfo->modptr = NULL;
-  localtdata->transinfo->numlocked = objlocked;
-  localtdata->transinfo->numnotfound = objnotfound;
+  localtdata->transinfo->numlocked = numoidlocked;
+  localtdata->transinfo->numnotfound = numoidnotfound;
   /* Lock and update count */
   //Thread sleeps until all messages from pariticipants are received by coordinator
   pthread_mutex_lock(localtdata->tdata->lock);
@@ -952,7 +1026,7 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
     }
     UnLock(STATUSPTR(header));
   }
-  
+
   return 0;
 }