debugging in progress
authorjihoonl <jihoonl>
Wed, 29 Jun 2011 06:31:18 +0000 (06:31 +0000)
committerjihoonl <jihoonl>
Wed, 29 Jun 2011 06:31:18 +0000 (06:31 +0000)
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index c455ec84a3e4abab2ae7106230d0caea8f34cba6..eb0836ee5ac4ed405e872c63f5cd461f7d8b84be 100644 (file)
@@ -529,7 +529,6 @@ void *dstmAccept(void *acceptfd) {
           else {
             printf("Got new Leader! : %d\n",epoch_num);
             pthread_mutex_lock(&recovery_mutex);
-            currentEpoch = epoch_num;
             okCommit = TRANS_BEFORE;
             leader_index = new_leader_index;
             pthread_mutex_unlock(&recovery_mutex);
@@ -932,7 +931,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
 
 //  printf("%s -> transID : %u\n",__func__,fixed->transid);
   if(inspectEpoch(fixed->epoch_num,"procesClient1") < 0) {
-    printf("%s-> Higher Epoch current epoch = %u epoch %u\n",__func__,currentEpoch,fixed->epoch_num);
+//    printf("%s-> Higher Epoch current epoch = %u epoch %u\n",__func__,currentEpoch,fixed->epoch_num);
     control = RESPOND_HIGHER_EPOCH;
     send_data((int)acceptfd,&control,sizeof(char));
   }
@@ -947,6 +946,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
        int timeout1 = recv_data((int)acceptfd, &control, sizeof(char));
   int timeout2 = recv_data((int)acceptfd, &epoch_num, sizeof(unsigned int));
 
+//  printf("%s -> Received for transID : %u\n",__func__,fixed->transid);
   if(timeout1 < 0 || timeout2 < 0) {  // timeout. failed to receiving data from coordinator
     control = DECISION_LOST;
   }
@@ -958,7 +958,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
   // check if it is allowed to commit
   tNode->decision = control;
   do {
-    tNode->status = TRANS_INPROGRESS
+    tNode->status = TRANS_BEFORE
     if(okCommit != TRANS_BEFORE) {
       if(inspectEpoch(tNode->epoch_num,"processCleint2") > 0) {
         tNode->status = TRANS_INPROGRESS;
@@ -976,11 +976,10 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     }
     else {
       tNode->status = TRANS_WAIT;
-//      printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
-//      sleep(3);
+      printf("%s -> Waiting!! \ttransID : %u decision : %d status : %d \n",__func__,tNode->transid,tNode->decision,tNode->status);
+      sleep(3);
       randomdelay();
     }
-    
   }while(tNode->status != TRANS_AFTER);
 
 //  printf("%s -> trans ID : %u is cleared\n",__func__,tNode->transid);
@@ -1882,7 +1881,7 @@ int stopTransactions(int TRANS_FLAG,unsigned int epoch_num)
       while(walker)
       {
         // locking
-        while(walker->status != TRANS_WAIT && tlistSearch(transList,walker->transid) != NULL) {
+        while((walker->status != TRYING_TO_COMMIT && walker->status != TRANS_WAIT) && tlistSearch(transList,walker->transid) != NULL) {
 //          printf("%s -> BEFORE transid : %u - decision %d Status : %d \n",__func__,walker->transid,walker->decision,walker->status);
           if(inspectEpoch(epoch_num,"stopTrans_Before") < 0) {
 //            printf("%s -> Higher Epoch is seen, walker->epoch = %u currentEpoch = %u\n",__func__,epoch_num,currentEpoch);
@@ -1919,7 +1918,7 @@ int stopTransactions(int TRANS_FLAG,unsigned int epoch_num)
         return -1;
       }
 
-//      sleep(5);
+ //     sleep(5);
       randomdelay();
     }while(size != 0);
   }
index 5b5d0d6147c9bd295d85b7e030c8af5e5b3f79a6..6b07b3d0f8adad28067a2091d1cc18068cefcbd1 100644 (file)
@@ -19,7 +19,6 @@
 #include "abortreaders.h"
 #endif
 #include "trans.h"
-#include "mlp_lock.h"
 
 #ifdef RECOVERY
 #include <unistd.h>
@@ -113,6 +112,8 @@ char ip[16];      // for debugging purpose
 extern tlist_t* transList;
 extern pthread_mutex_t translist_mutex;
 extern pthread_mutex_t clearNotifyList_mutex;
+pthread_mutex_t oidlock;
+pthread_mutex_t tidlock;
 
 
 unsigned int currentEpoch;
@@ -269,7 +270,7 @@ GDBRECV1:
           goto GDBRECV1;
 #endif
 
-#ifdef DEBUG
+#ifndef DEBUG
         printf("%s -> Unexpected ERROR!\n",__func__);
         printf("%s-> errno = %d %s\n", __func__, errno, strerror(errno));
 #endif
@@ -281,7 +282,7 @@ GDBRECV1:
       //Case: numbytes==0
       //machine has failed -- this case probably doesn't occur in reality
       //
-#ifdef DEBUG
+#ifndef DEBUG
       printf("%s -> SHOULD NOT BE HERE\n",__func__);
 #endif
       return -1;
@@ -573,6 +574,10 @@ void transInit() {
   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
   pthread_mutex_init(&notifymutex, NULL);
   pthread_mutex_init(&atomicObjLock, NULL);
+#ifdef RECOVERY                      
+  pthread_mutex_init(&oidlock,NULL);
+  pthread_mutex_init(&tidlock,NULL);
+#endif
 
 
 #ifdef CACHE
@@ -977,6 +982,7 @@ remoteread:
 
 /* This function creates objects in the transaction record */
 objheader_t *transCreateObj(unsigned int size) {
+  pthread_mutex_lock(&oidlock);
   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
   OID(tmp) = getNewOID();
   tmp->notifylist = NULL;
@@ -984,6 +990,7 @@ objheader_t *transCreateObj(unsigned int size) {
   tmp->isBackup = 0;
   STATUS(tmp) = NEW;
   t_chashInsert(OID(tmp), tmp);
+  pthread_mutex_unlock(&oidlock);
 #ifdef COMPILER
   return &tmp[1]; //want space after object header
 #else
@@ -1382,7 +1389,14 @@ int transCommit() {
                        pile = pile->next;
                } //end of pile processing
    
-               /* Recv Ctrl msgs from all machines */
+
+   pthread_mutex_lock(&translist_mutex);                                           
+   transList = tlistInsertNode(transList,transID,-3,TRYING_TO_COMMIT,epoch_num);   
+   tNode = tlistSearch(transList,transID);                                         
+   pthread_mutex_unlock(&translist_mutex);                                         
+    
+    
+    /* Recv Ctrl msgs from all machines */
 #ifdef DEBUG
                printf("%s-> Finished sending transaction read/mod objects transID = %u\n",__func__,transID);
 #endif
@@ -1393,7 +1407,7 @@ int transCommit() {
                        if(sd != 0) {
                                char control;
         int timeout;            // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
-//        printf("%s -> Waiting for mid : %s transID = %u\n",__func__,midtoIPString(midlist[i]),transID);
+//        printf("%s -> Waiting for mid : %s transID = %u sd = %d\n",__func__,midtoIPString(midlist[i]),transID,sd);
         timeout = recv_data(sd, &control, sizeof(char));
 
 //        printf("%s -> Received mid : %s control %d timeout = %d\n",__func__,midtoIPString(midlist[i]),control,timeout);
@@ -1457,13 +1471,14 @@ int transCommit() {
       printf("%s -> Received Higher epoch\n",__func__);
       finalResponse = TRANS_ABORT;
       treplyretry = 0;
+//      sleep(5);
     }
 #endif
-//    printf("%s -> transID = %u Passed this point\n",__func__,transID);
-    pthread_mutex_lock(&translist_mutex);
-    transList = tlistInsertNode(transList,transID,-3,TRYING_TO_COMMIT,epoch_num);
-    tNode = tlistSearch(transList,transID);
-    pthread_mutex_unlock(&translist_mutex);
+ //   printf("%s -> transID = %u Passed this point\n",__func__,transID);
+
+
+
+
 
 #ifdef CACHE
     if (finalResponse == TRANS_COMMIT) {
@@ -1489,7 +1504,6 @@ int transCommit() {
       tNode->status = TRANS_AFTER;
     }
     else { 
-      tNode->status = TRYING_TO_COMMIT;
       if(inspectEpoch(epoch_num,"TRANS_COMMIT2") > 0) {
         treplyretry = 1; 
       }
@@ -1589,6 +1603,7 @@ void commitMessages(unsigned int epoch_num,int* socklist,unsigned int deadsd,int
            }
 #endif
 #endif
+//      printf("%s -> Trans Id = %u Sending to sd = %d\n",__func__,tosend[i].f.transid,sd);
       send_data(sd,&finalResponse,sizeof(char));
       send_data(sd,&epoch_num,sizeof(unsigned int));
      } else {
@@ -1867,7 +1882,7 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
   int flag = 0;
 
 #ifdef RECOVERYSTATS
-//  printf("Recovery Start\n");
+  printf("Recovery Start dead = %s\n",midtoIPString(deadHost));
   long long st;
   long long fi;
   unsigned int dupeSize = 0;  // to calculate the size of backed up data
@@ -1884,13 +1899,13 @@ void restoreDuplicationState(unsigned int deadHost,unsigned int epoch_num)
       if((flag = pingMachines(epoch_num,sdlist,&tList)) < 0) break;
 
       pthread_mutex_lock(&translist_mutex);
-//      tlistPrint(tList);
+      tlistPrint(tList);
       pthread_mutex_unlock(&translist_mutex);
 //      getchar();
-//      printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num);
+      printf("%s -> I'm currently leader num : %d releaseing new lists\n\n",__func__,epoch_num);
       if((flag = releaseNewLists(epoch_num,sdlist,tList)) < 0) break;
   //    getchar();
-//      printf("%s -> I'm currently leader num : %d duplicate objects\n\n",__func__,epoch_num);
+      printf("%s -> I'm currently leader num : %d duplicate objects\n\n",__func__,epoch_num);
       // transfer lost objects
       if((flag= duplicateLostObjects(epoch_num,sdlist)) < 0) break;
 
@@ -2018,19 +2033,19 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
     if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
       continue;
   
-//    printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i]));
+    printf("%s -> sending request_trans_wait to %s\n",__func__,midtoIPString(hostIpAddrs[i]));
     request = REQUEST_TRANS_WAIT;
     send_data(sdlist[i],&request, sizeof(char));
     send_data(sdlist[i],&epoch_num,sizeof(unsigned int));
     send_data(sdlist[i],&myIndexInHostArray,sizeof(unsigned int));
   }
 
-//  printf("%s -> Stop transaction\n",__func__);
+  printf("%s -> Stop transaction\n",__func__);
   /* stop all local transactions */
   if(stopTransactions(TRANS_BEFORE,epoch_num) < 0)
     return -1;
 
-//  printf("After Stop transaction\n");
+  printf("After Stop transaction\n");
 
   // grab leader's transaction list first
   tlist_node_t* walker = transList->head;
@@ -2042,15 +2057,15 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
     walker = walker->next;
   }
 
-//  printf("%s -> Local Transactions\n",__func__);
-//  tlistPrint(currentTransactionList);
+  printf("%s -> Local Transactions\n",__func__);
+  tlistPrint(currentTransactionList);
 
   for(i = 0; i < numHostsInSystem; i++)
   {
     if(sdlist[i] == -1 || hostIpAddrs[i] == myIpAddr)
       continue;
 
-//    printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i]));
+    printf("%s -> receving from %s\n",__func__,midtoIPString(hostIpAddrs[i]));
     if(recv_data(sdlist[i],&response,sizeof(char)) < 0)
     {
       printf("Here\n");
@@ -2062,11 +2077,11 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
 
     if(response == RESPOND_TRANS_WAIT) 
     {
-//      printf("%s -> RESPOND_TRANS_WAIT\n",__func__);
+      printf("%s -> RESPOND_TRANS_WAIT\n",__func__);
       int timeout1 = computeLiveHosts(sdlist[i]);
-//      printf("%s -> received host list\n",__func__);
+      printf("%s -> received host list\n",__func__);
       int timeout2 = makeTransactionLists(&currentTransactionList,sdlist[i],epoch_num);
-//      printf("%s -> received transaction list\n",__func__);
+      printf("%s -> received transaction list\n",__func__);
       // receive live host list       // receive transaction list
       if(timeout1 < 0 || timeout2 < 0) {
         pthread_mutex_lock(&translist_mutex);
@@ -2074,7 +2089,7 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
         pthread_mutex_unlock(&translist_mutex);
         return -2;
       }
//     tlistPrint(currentTransactionList);
+      tlistPrint(currentTransactionList);
     }
     else if(response == RESPOND_HIGHER_EPOCH)
     {
@@ -2102,7 +2117,7 @@ int pingMachines(unsigned int epoch_num,int* sdlist,tlist_t** tList)
   }
   *tList = currentTransactionList;
 
-//  printf("%s -> Exit\n",__func__);
+  printf("%s -> Exit\n",__func__);
   return 0;
 }
 
@@ -2242,7 +2257,7 @@ int makeTransactionLists(tlist_t** tlist,int sd,unsigned int epoch_num)
       tlist_node_t* tNode = &transArray[j];
       tNode->status = TRANS_OK;
 
-//      printf("%s -> transid = %u decision = %d\n",__func__,transArray[j].transid,transArray[j].decision);
+      printf("%s -> transid = %u decision = %d\n",__func__,transArray[j].transid,transArray[j].decision);
       *tlist = tlistInsertNode2(*tlist,&(transArray[j]),epoch_num);
     }
     else {
@@ -2308,11 +2323,11 @@ int inspectEpoch(unsigned int epoch_num,const char* f)
   pthread_mutex_lock(&recovery_mutex);
   if(epoch_num < currentEpoch) {
     flag = -1;
-  }/*
-  else if(epoch_num > currentEpoch) {
-//    printf("%s -> current epoch %u is changed to epoch num = %u\n",f,currentEpoch,epoch_num);
-//    currentEpoch = epoch_num;
-  }*/
+  }
+  else if((epoch_num > currentEpoch) && strcmp(f,"REQUEST_TRANS_WAIT")==0) {
+    printf("%s -> current epoch %u is changed to epoch num = %u\n",f,currentEpoch,epoch_num);
+    currentEpoch = epoch_num;
+  }
   pthread_mutex_unlock(&recovery_mutex);
 
   return flag;
@@ -3059,29 +3074,23 @@ int startRemoteThread(unsigned int oid, unsigned int mid) {
 //TODO: when reusing oids, make sure they are not already in use!
 static unsigned int id = 0xFFFFFFFF;
 unsigned int getNewOID(void) {
-  do {
-    unsigned int origid=id;
-    unsigned int newid=id+2;
-    if (newid> oidMax || newid < oidMin) {
-      newid=oidMin | 1;
-    }
-    if (CAS32(&id, origid, newid)==origid)
-      return newid;
-  } while(1);
+  id += 2;
+  if (id > oidMax || id < oidMin) {
+    id = (oidMin | 1);
+  }
+  return id;
 }
 
 #ifdef RECOVERY
 static unsigned int tid = 0xFFFFFFFF;
 unsigned int getNewTransID(void) {
-  do {
-    unsigned int origtid=tid;
-    unsigned int newtid=tid+2;
-    if (newtid>transIDMax || newtid < transIDMin) {
-      newtid=transIDMin | 1;
-    }
-    if (CAS32(&tid, origtid, newtid)==origtid)
-      return newtid;
-  } while(1);
+  pthread_mutex_lock(&tidlock);
+  tid+=2;
+  if (tid > transIDMax || tid < transIDMin) {
+    tid = (transIDMin | 1);
+  }
+  pthread_mutex_unlock(&tidlock);
+  return tid;
 }
 #endif