changes for prefetch objects on a transaction abort and updating the prefetch cache
authoradash <adash>
Sat, 31 May 2008 01:28:18 +0000 (01:28 +0000)
committeradash <adash>
Sat, 31 May 2008 01:28:18 +0000 (01:28 +0000)
when transactions commit

Robust/src/Runtime/DSTM/interface/dht.c
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index e0ef4445d97a377acf393a4dcf4a26013ee27c76..cccbcfcde45142f3f50572d1890ceee4f355e3b3 100644 (file)
@@ -39,7 +39,6 @@
 #include <netdb.h>
 #include <net/if.h>
 #include <linux/sockios.h>
-#include <sys/time.h>
 #include <sys/queue.h>
 #include "dht.h"
 #include "clookup.h" //this works for now, do we need anything better?
index 98f60f8637af078419b9474d1dc4224017ce4d4b..2bb46769c322c51e1ecc3e8483773d49d23b486b 100644 (file)
 #include "queue.h"
 #include "mcpileq.h"
 #include "threadnotify.h"
+#include <arpa/inet.h>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <sys/time.h>
 #include <netdb.h>
 #include <netinet/in.h>
 #include <sys/types.h>
@@ -182,9 +185,11 @@ typedef struct trans_req_data {
 typedef struct trans_commit_data{
   unsigned int *objlocked;     /* Pointer to array holding oids of objects locked inside a transaction */
   unsigned int *objnotfound;    /* Pointer to array holding oids of objects not found on the participant machine */
+  unsigned int *objvernotmatch;    /* Pointer to array holding oids whose version doesn't match on the participant machine */
   void *modptr;                        /* Pointer to the address in the mainobject store of the participant that holds all modified objects */
   int numlocked;               /* no of objects locked */
   int numnotfound;             /* no of objects not found */
+  int numvernotmatch;          /* no of objects whose version doesn't match */
 } trans_commit_data_t;
 
 
@@ -213,7 +218,7 @@ typedef struct local_thread_data_array {
 
 //Structure to store mid and socketid information
 typedef struct midSocketInfo {
-       unsigned int mid;               /* To communicate with mid use sockid in this data structure*/
+       unsigned int mid;               /* To communicate with mid use sockid in this data structure */
        int sockid;
 } midSocketInfo_t;
 
@@ -280,6 +285,10 @@ void sendPrefetchReq(prefetchpile_t*, int);
 int getPrefetchResponse(int);
 unsigned short getObjType(unsigned int oid);
 int startRemoteThread(unsigned int oid, unsigned int mid);
+int updatePrefetchCache(thread_data_array_t *, int, char);
+
+
+
 /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid);
index e1ec8c926cd2620ab51a761e2c2574061bd6907c..4a35c19f3b4fc749304415b59b4905dc0b8023fc 100644 (file)
@@ -143,35 +143,34 @@ void *dstmAccept(void *acceptfd) {
       /* Read oid requested and search if available */
       recv_data((int)acceptfd, &oid, sizeof(unsigned int));
       if((srcObj = mhashSearch(oid)) == NULL) {
-       printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
-       break;
+        printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+        break;
       }
       h = (objheader_t *) srcObj;
       GETSIZE(size, h);
       size += sizeof(objheader_t);
       sockid = (int) acceptfd;
-      
       if (h == NULL) {
-       ctrl = OBJECT_NOT_FOUND;
-       send_data(sockid, &ctrl, sizeof(char));
+        ctrl = OBJECT_NOT_FOUND;
+        send_data(sockid, &ctrl, sizeof(char));
       } else {
-       /* Type */
-       char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
-       *((int *)&msg[1])=size;
-       send_data(sockid, &msg, sizeof(msg));
-       send_data(sockid, h, size);
+        // Type 
+        char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+        *((int *)&msg[1])=size;
+        send_data(sockid, &msg, sizeof(msg));
+        send_data(sockid, h, size);
       }
       break;
-      
+
     case READ_MULT_REQUEST:
       break;
-      
+
     case MOVE_REQUEST:
       break;
-      
+
     case MOVE_MULT_REQUEST:
       break;
-      
+
     case TRANS_REQUEST:
       /* Read transaction request */
       transinfo.objlocked = NULL;
@@ -180,20 +179,20 @@ void *dstmAccept(void *acceptfd) {
       transinfo.numlocked = 0;
       transinfo.numnotfound = 0;
       if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
-       printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
-       pthread_exit(NULL);
+        printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
+        pthread_exit(NULL);
       }
       break;
     case TRANS_PREFETCH:
       if((val = prefetchReq((int)acceptfd)) != 0) {
-       printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
-       break;
+        printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
+        break;
       }
       break;
     case TRANS_PREFETCH_RESPONSE:
       if((val = getPrefetchResponse((int) acceptfd)) != 0) {
-       printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
-       break;
+        printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+        break;
       }
       break;
     case START_REMOTE_THREAD:
@@ -201,17 +200,17 @@ void *dstmAccept(void *acceptfd) {
       objType = getObjType(oid);
       startDSMthread(oid, objType);
       break;
-      
+
     case THREAD_NOTIFY_REQUEST:
       recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
       size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
-       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-       pthread_exit(NULL);
+        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+        pthread_exit(NULL);
       }
-      
+
       recv_data((int)acceptfd, buffer, size);
-      
+
       oidarry = calloc(numoid, sizeof(unsigned int)); 
       memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
       size = sizeof(unsigned int) * numoid;
@@ -223,18 +222,18 @@ void *dstmAccept(void *acceptfd) {
       threadid = *((unsigned int *)(buffer+size));
       processReqNotify(numoid, oidarry, versionarry, mid, threadid);
       free(buffer);
-      
+
       break;
 
     case THREAD_NOTIFY_RESPONSE:
       size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
       if((buffer = calloc(1,size)) == NULL) {
-       printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
-       pthread_exit(NULL);
+        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+        pthread_exit(NULL);
       }
-      
+
       recv_data((int)acceptfd, buffer, size);
-      
+
       oid = *((unsigned int *)buffer);
       size = sizeof(unsigned int);
       version = *((unsigned short *)(buffer+size));
@@ -252,56 +251,56 @@ void *dstmAccept(void *acceptfd) {
     }
   }
 
- closeconnection:
+closeconnection:
   /* Close connection */
   if (close((int)acceptfd) == -1)
     perror("close");
   pthread_exit(NULL);
 }
-  
+
 /* This function reads the information available in a transaction request
  * and makes a function call to process the request */
 int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
-       char *ptr;
-       void *modptr;
-       unsigned int *oidmod, oid;
-       fixed_data_t fixed;
-       objheader_t *headaddr;
-       int sum, i, size, n, val;
-
-       oidmod = NULL;
-
-       /* Read fixed_data_t data structure */ 
-       size = sizeof(fixed) - 1;
-       ptr = (char *)&fixed;;
-       fixed.control = TRANS_REQUEST;
-       recv_data((int)acceptfd, ptr+1, size);
-
-       /* Read list of mids */
-       int mcount = fixed.mcount;
-       size = mcount * sizeof(unsigned int);
-       unsigned int listmid[mcount];
-       ptr = (char *) listmid;
-       recv_data((int)acceptfd, ptr, size);
-       
-       /* Read oid and version tuples for those objects that are not modified in the transaction */
-       int numread = fixed.numread;
-       size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
-       char objread[size];
-       if(numread != 0) { //If pile contains more than one object to be read, 
-                         // keep reading all objects
-               recv_data((int)acceptfd, objread, size);        
-       }
-       
-       /* Read modified objects */
-       if(fixed.nummod != 0) {
-               if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
-                       printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
-                       return 1;
-               }
-               size = fixed.sum_bytes;
-               recv_data((int)acceptfd, modptr, size); 
-       }
+  char *ptr;
+  void *modptr;
+  unsigned int *oidmod, oid;
+  fixed_data_t fixed;
+  objheader_t *headaddr;
+  int sum, i, size, n, val;
+
+  oidmod = NULL;
+
+  /* Read fixed_data_t data structure */ 
+  size = sizeof(fixed) - 1;
+  ptr = (char *)&fixed;;
+  fixed.control = TRANS_REQUEST;
+  recv_data((int)acceptfd, ptr+1, size);
+
+  /* Read list of mids */
+  int mcount = fixed.mcount;
+  size = mcount * sizeof(unsigned int);
+  unsigned int listmid[mcount];
+  ptr = (char *) listmid;
+  recv_data((int)acceptfd, ptr, size);
+
+  /* Read oid and version tuples for those objects that are not modified in the transaction */
+  int numread = fixed.numread;
+  size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
+  char objread[size];
+  if(numread != 0) { //If pile contains more than one object to be read, 
+    // keep reading all objects
+    recv_data((int)acceptfd, objread, size);   
+  }
+
+  /* Read modified objects */
+  if(fixed.nummod != 0) {
+    if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
+      printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+      return 1;
+    }
+    size = fixed.sum_bytes;
+    recv_data((int)acceptfd, modptr, size);    
+  }
 
        /* Create an array of oids for modified objects */
        oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
@@ -405,7 +404,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
        if (transinfo->objnotfound != NULL) {
                free(transinfo->objnotfound);
        }
-
        return 0;
 }
 
@@ -416,16 +414,17 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
        unsigned short version;
        char control = 0, *ptr;
        unsigned int oid;
-       unsigned int *oidnotfound, *oidlocked;
+       unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
        void *mobj;
        objheader_t *headptr;
 
        /* Counters and arrays to formulate decision on control message to be sent */
        oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
        oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
-       int objnotfound = 0, objlocked = 0;
+       oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int)); 
+       int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
        int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-
+    int numBytes = 0;
        /* modptr points to the beginning of the object store 
         * created at the Pariticipant. 
         * Object store holds the modified objects involved in the transaction request */ 
@@ -462,20 +461,14 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
               v_matchlock++;
             } else {/* If versions don't match ...HARD ABORT */
               v_nomatch++;
+              oidvernotmatch[objvernotmatch] = oid;
+              objvernotmatch++;
+              int size;
+              GETSIZE(size, mobj);
+              size += sizeof(objheader_t);
+              numBytes += size;
               /* Send TRANS_DISAGREE to Coordinator */
               control = TRANS_DISAGREE;
-              if (objlocked > 0) {
-                for(j = 0; j < objlocked; j++) {
-                  if((headptr = mhashSearch(oidlocked[j])) == NULL) {
-                    printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                    return 0;
-                  }
-                 UnLock(STATUSPTR(headptr));
-                }
-                free(oidlocked);
-              }
-              send_data(acceptfd, &control, sizeof(char));
-              return control;
             }
           } else {/* If Obj is not locked then lock object */
             /* Save all object oids that are locked on this machine during this transaction request call */
@@ -485,26 +478,50 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
               v_matchnolock++;
             } else { /* If versions don't match ...HARD ABORT */
               v_nomatch++;
+              oidvernotmatch[objvernotmatch] = oid;
+              objvernotmatch++;
+              int size;
+              GETSIZE(size, mobj);
+              size += sizeof(objheader_t);
+              numBytes += size;
               control = TRANS_DISAGREE;
-              if (objlocked > 0) {
-                for(j = 0; j < objlocked; j++) {
-                  if((headptr = mhashSearch(oidlocked[j])) == NULL) {
-                    printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
-                    return 0;
-                  }
-                 UnLock(STATUSPTR(headptr));
-                }
-                free(oidlocked);
-              }
-
-              /* Send TRANS_DISAGREE to Coordinator */
-              send_data(acceptfd, &control, sizeof(char));
-              return control;
             }
           }
         }
        }
        
+       /* send TRANS_DISAGREE and objs*/
+    if(v_nomatch > 0) {
+      char *objs = calloc(1, numBytes);
+      int j, offset = 0;
+      for(j = 0; j<objvernotmatch; j++) {
+        objheader_t *header = mhashSearch(oidvernotmatch[j]);
+        int size = 0;
+        GETSIZE(size, header);
+        size += sizeof(objheader_t);
+        memcpy(objs+offset, header, size);
+        offset += size;
+      }
+      if (objlocked > 0) {
+        for(j = 0; j < objlocked; j++) {
+          if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+            printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+            return 0;
+          }
+          UnLock(STATUSPTR(headptr));
+        }
+        free(oidlocked);
+      }
+      send_data(acceptfd, &control, sizeof(char));
+      send_data(acceptfd, &numBytes, sizeof(int));
+      send_data(acceptfd, objs, numBytes);
+      transinfo->objvernotmatch = oidvernotmatch;
+      transinfo->numvernotmatch = objvernotmatch;
+      free(objs);
+      free(transinfo->objvernotmatch);
+      return control;
+    }
+
        /* Decide what control message to send to Coordinator */
        if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
                                        modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
@@ -513,7 +530,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
        }
        
        return control;
-
 }
 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
  * to send to Coordinator based on the votes of oids involved in the transaction */
@@ -536,7 +552,7 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
                /* Send control message */
                send_data(acceptfd, &control, sizeof(char));
        
-               /* Send number of oids not found and the missing oids if objects are missing in the machine */
+               /*  FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
                if(*(objnotfound) != 0) { 
                        int msg[1];
                        msg[0] = *(objnotfound);
index d60bea5f3fbcafa7bfbd4368a109420233d069bd..d078e0c42b993aa6dbe69b2964fded9779539a11 100644 (file)
@@ -653,6 +653,34 @@ void *transRequest(void *threadarg) {
   
   /* Read control message from Participant */
   recv_data(sd, &control, sizeof(char));
+  /* Recv Objects if participant sends TRANS_DISAGREE */
+  if(control == TRANS_DISAGREE) {
+    int length;
+    recv_data(sd, &length, sizeof(int));
+    void *newAddr;
+    pthread_mutex_lock(&prefetchcache_mutex);
+    if ((newAddr = objstrAlloc(prefetchcache, length)) == NULL) {
+      printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      close(sd);
+      pthread_exit(NULL);
+    }
+    pthread_mutex_unlock(&prefetchcache_mutex);
+    recv_data(sd, newAddr, length);
+    int offset = 0;
+    while(length != 0) {
+      unsigned int oidToPrefetch;
+      objheader_t * header;
+      header = (objheader_t *) (((char *)newAddr) + offset);
+      oidToPrefetch = OID(header);
+      int size = 0;
+      GETSIZE(size, header);
+      size += sizeof(objheader_t);
+      //make an entry in prefetch hash table
+      prehashInsert(oidToPrefetch, header);
+      length = length - size;
+      offset += size;
+    }
+  }
   recvcontrol = control;
   /* Update common data structure and increment count */
   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
@@ -672,7 +700,7 @@ void *transRequest(void *threadarg) {
   }
   pthread_mutex_unlock(tdata->lock);
   
-  /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
+  /* Send the final response such as TRANS_COMMIT or TRANS_ABORT 
    * to all participants in their respective socket */
   if (sendResponse(tdata, sd) == 0) { 
     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
@@ -728,23 +756,70 @@ void decideResponse(thread_data_array_t *tdata) {
     *(tdata->replyctrl) = TRANS_ABORT;
     *(tdata->replyretry) = 0;
     /* clear objects from prefetch cache */
-    for (i = 0; i < tdata->buffer->f.numread; i++)
+    for (i = 0; i < tdata->buffer->f.numread; i++) {
       prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
-    for (i = 0; i < tdata->buffer->f.nummod; i++)
+    }
+    for (i = 0; i < tdata->buffer->f.nummod; i++) {
       prehashRemove(tdata->buffer->oidmod[i]);
+    }
   } else if(transagree == tdata->buffer->f.mcount){
     /* Send Commit */
     *(tdata->replyctrl) = TRANS_COMMIT;
     *(tdata->replyretry) = 0;
+    /* update prefetch cache */
+    /* For objects read */
+    char oidType;
+    int retval;
+    oidType = 'R'; 
+    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
+      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
+    oidType = 'M'; 
+    if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
+      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
     *(tdata->replyctrl) = TRANS_ABORT;
     *(tdata->replyretry) = 1;
   }
-  
   return;
 }
 
+/* This function updates the prefetch cache when commiting objects 
+ * based on the type of oid i.e. if oid is read or oid is modified
+ * Return -1 on error else returns 0 
+ */
+int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
+  int i;
+  for (i = 0; i < numoid; i++) {
+    //find address object 
+    objheader_t *header, *newAddr;
+    int size;
+    unsigned int oid;
+    if(oidType == 'R') {
+      oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)); 
+    } else {
+      oid = tdata->buffer->oidmod[i];
+    }
+    header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
+    //copy object into prefetch cache
+    GETSIZE(size, header);
+    pthread_mutex_lock(&prefetchcache_mutex);
+    if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
+      printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return -1;
+    }
+    pthread_mutex_unlock(&prefetchcache_mutex);
+    memcpy(newAddr, header, (size + sizeof(objheader_t)));
+    //make an entry in prefetch hash table
+    prehashInsert(oid, newAddr);
+  }
+  return 0;
+}
+
 /* This function sends the final response to remote machines per
  * thread in their respective socket id It returns a char that is only
  * needed to check the correctness of execution of this function
@@ -802,7 +877,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
     recv_data(sd, &size, sizeof(int));
     objcopy = objstrAlloc(record->cache, size);
     recv_data(sd, objcopy, size);
-    
     /* Insert into cache's lookup table */
     chashInsert(record->lookupTable, oid, objcopy); 
   }
@@ -821,7 +895,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
 void *handleLocalReq(void *threadarg) {
   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
   local_thread_data_array_t *localtdata;
-  int objnotfound = 0, objlocked = 0; 
+  int numoidnotfound = 0, numoidlocked = 0;
   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
   int numread, i;
   unsigned int oid;
@@ -858,8 +932,8 @@ void *handleLocalReq(void *threadarg) {
     /* Save the oids not found and number of oids not found for later use */
     if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
       /* Save the oids not found and number of oids not found for later use */
-      oidnotfound[objnotfound] = oid;
-      objnotfound++;
+      oidnotfound[numoidnotfound] = oid;
+      numoidnotfound++;
     } else { /* If Obj found in machine (i.e. has not moved) */
       /* Check if Obj is locked by any previous transaction */
       if (test_and_set(STATUSPTR(mobj))) {
@@ -873,8 +947,8 @@ void *handleLocalReq(void *threadarg) {
       } else {
        //we're locked
         /* Save all object oids that are locked on this machine during this transaction request call */
-        oidlocked[objlocked] = OID(((objheader_t *)mobj));
-        objlocked++;
+        oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
+        numoidlocked++;
         if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
           v_matchnolock++;
         } else { /* If versions don't match ...HARD ABORT */
@@ -890,7 +964,7 @@ void *handleLocalReq(void *threadarg) {
     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
   }
   /* Condition to send TRANS_SOFT_ABORT */
-  if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
+  if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
   }
   
@@ -899,8 +973,8 @@ void *handleLocalReq(void *threadarg) {
   localtdata->transinfo->objlocked = oidlocked;
   localtdata->transinfo->objnotfound = oidnotfound;
   localtdata->transinfo->modptr = NULL;
-  localtdata->transinfo->numlocked = objlocked;
-  localtdata->transinfo->numnotfound = objnotfound;
+  localtdata->transinfo->numlocked = numoidlocked;
+  localtdata->transinfo->numnotfound = numoidnotfound;
   /* Lock and update count */
   //Thread sleeps until all messages from pariticipants are received by coordinator
   pthread_mutex_lock(localtdata->tdata->lock);
@@ -952,7 +1026,7 @@ int transAbortProcess(local_thread_data_array_t  *localtdata) {
     }
     UnLock(STATUSPTR(header));
   }
-  
+
   return 0;
 }