recovery
author jihoonl <jihoonl>
Tue, 22 Sep 2009 00:19:08 +0000 (00:19 +0000)
committer jihoonl <jihoonl>
Tue, 22 Sep 2009 00:19:08 +0000 (00:19 +0000)
20 files changed:
Robust/src/Runtime/DSTM/interface_recovery/dstm.h
Robust/src/Runtime/DSTM/interface_recovery/dstmserver.c
Robust/src/Runtime/DSTM/interface_recovery/ip.c
Robust/src/Runtime/DSTM/interface_recovery/mlookup.c
Robust/src/Runtime/DSTM/interface_recovery/mlookup.h
Robust/src/Runtime/DSTM/interface_recovery/plookup.c
Robust/src/Runtime/DSTM/interface_recovery/queue.c
Robust/src/Runtime/DSTM/interface_recovery/signal.c
Robust/src/Runtime/DSTM/interface_recovery/singleTMCommit.c
Robust/src/Runtime/DSTM/interface_recovery/sockpool.c
Robust/src/Runtime/DSTM/interface_recovery/stmlock.c
Robust/src/Runtime/DSTM/interface_recovery/stmlock.h
Robust/src/Runtime/DSTM/interface_recovery/stmlookup.c
Robust/src/Runtime/DSTM/interface_recovery/stmlookup.h
Robust/src/Runtime/DSTM/interface_recovery/threadnotify.c
Robust/src/Runtime/DSTM/interface_recovery/threadnotify.h
Robust/src/Runtime/DSTM/interface_recovery/tlookup.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface_recovery/tlookup.h [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface_recovery/tm.h
Robust/src/Runtime/DSTM/interface_recovery/trans.c

index e7a80c532b2832f03eecbc60a399de935bfb5da2..e88c414208703856f2c8b8935d9b4decc09f0537 100644 (file)
@@ -5,6 +5,11 @@
 #define MSG_NOSIGNAL 0
 #endif
 
+#ifdef RECOVERY
+#define WAIT_TIME 3
+#endif
+
+
 /***********************************************************
  *       Macros
  **********************************************************/
@@ -45,6 +50,8 @@
 #define THREAD_NOTIFY_RESPONSE          25
 #define TRANS_UNSUCESSFUL               26
 #define CLOSE_CONNECTION                                                       27
+#define ASK_COMMIT                      28
+#define CLEAR_NOTIFY_LIST               29
 /*******************************
  * Duplication Messages
  *****************************/
@@ -221,13 +228,22 @@ typedef struct trans_commit_data {
   int numvernotmatch;           /* no of objects whose version doesn't match */
 } trans_commit_data_t;
 
+#ifdef RECOVERY
+  int leaderFixing;
+  pthread_mutex_t leaderFixing_mutex;
+  pthread_mutex_t liveHosts_mutex;
+#endif
+
+
+
 
 #define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id);
 
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
-void send_data(int fd, void *buf, int buflen);
-void recv_data(int fd, void *buf, int buflen);
+int send_data(int fd, void *buf, int buflen);
+int recv_data(int fd, void *buf, int buflen);
+int recv_data_block(int fd, void *buf, int buflen); // wrapper function for recv_data to check dead machine.
 int recv_data_errorcode(int fd, void *buf, int buflen);
 
 /* Prototypes for object header */
@@ -240,30 +256,52 @@ void objstrDelete(objstr_t *store); //traverse and free entire list
 void *objstrAlloc(objstr_t **store, unsigned int size); //size in bytes
 void clearObjStore(); // TODO:currently only clears the prefetch cache object store
 /* end object store */
+unsigned int getNewTransID(void);
 
+#ifdef RECOVERY
 /* Prototypes for duplications */
-void updateLiveHosts();
+unsigned int updateLiveHosts();
 int updateLiveHostsCommit();
 void setLocateObjHosts();
+void setReLocateObjHosts();
 void printHostsStatus();
 int allHostsLive();
-void duplicateLostObjects(unsigned int mid);
-void duplicateLocalBackupObjects();
-void duplicateLocalOriginalObjects();
-int readDuplicateObjs(int);
-void restoreDuplicationState(unsigned int deadHost);
 unsigned int getPrimaryMachine(unsigned int mid);
 unsigned int getBackupMachine(unsigned int mid);
+unsigned int getDuplicatedPrimaryMachine(unsigned int mid);
 int getNumLiveHostsInSystem();
-unsigned int getNewTransID(void);
+int getMyStatus();
+void* startAsking();
+unsigned int checkIfAnyMachineDead(int*);
+void clearDeadThreadsNotification();
 /* end duplication */
 
+/* for recovery */
+void reqClearNotifyList(unsigned int oid);
+void clearNotifyList(unsigned int oid);
+void duplicateLostObjects(unsigned int mid);
+void duplicateLocalBackupObjects();
+void duplicateLocalOriginalObjects();
+void restoreDuplicationState(unsigned int deadHost);
+int readDuplicateObjs(int);
+
+/* Paxo's algorithm */
+int paxos();
+int paxosPrepare();
+int paxosAccept();
+void paxosLearn();
+
+#endif
+
 /* Prototypes for server portion */
 void *dstmListen(void *);
 int startlistening();
 void *dstmAccept(void *);
+
 int readClientReq(trans_commit_data_t *, int);
 int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
+char checkDecision(unsigned int);
+char receiveDecisionFromBackup(unsigned int,int,unsigned int*);
 char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
 char decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
 int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
@@ -329,16 +367,15 @@ void commitCountForObjRead(char *, unsigned int *, unsigned int *, int *, int *,
 void commitCountForObjMod(char *, unsigned int *, unsigned int *, int *, int *, int *, int *, int *, unsigned int, unsigned short);
 
 /* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
-int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
+#ifdef RECOVERY
+  int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid,int mid);
+#else
+  int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
+#endif
+
 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid);
 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version);
 
-/* Paxo's algorithm */
-int paxos();
-int paxosPrepare();
-int paxosAccept();
-void paxosLearn();
-
 /* Internal functions from signal.c */
 int getthreadid();
 double getMax(double *array, int size);
index b5b6a58a60b84def56302be7aa22291f2d4c66a2..7dd1b782505cddb1405198ed0c51c040a17c0ed9 100644 (file)
@@ -17,6 +17,7 @@
 #ifdef RECOVERY
 #include <unistd.h>
 #include <signal.h>
+#include "tlookup.h"
 #endif
 
 #define BACKLOG 10 //max pending connections
@@ -26,28 +27,30 @@ extern int classsize[];
 extern int numHostsInSystem;
 extern pthread_mutex_t notifymutex;
 
-extern int *liveHosts;
-extern unsigned int *locateObjHosts;
-pthread_mutex_t liveHosts_mutex;
-pthread_mutex_t leaderFixing_mutex;
+extern unsigned int myIpAddr;
+extern unsigned int *hostIpAddrs;
 
+#ifdef RECOVERY
+extern unsigned int *locateObjHosts;
+extern int *liveHosts;
 extern int liveHostsValid;
 extern int numLiveHostsInSystem;
-extern __thread int timeoutFlag;
-extern __thread int timeoutFlag;
-int testcount = 0;
+int clearNotifyListFlag;
+#endif
 
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
 pthread_mutex_t lockObjHeader;
+pthread_mutex_t clearNotifyList_mutex;
 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
 
 sockPoolHashTable_t *transPResponseSocketPool;
 extern sockPoolHashTable_t *transRequestSockPool;
+extern sockPoolHashTable_t *transReadSockPool;
 
 int failFlag = 0; //debug
-int leaderFixing;
 
+#ifdef RECOVERY
 /******************************
  * Global variables for Paxos
  ******************************/
@@ -59,6 +62,7 @@ extern int leader;
 extern int paxosRound;
 /* This function initializes the main objects store and creates the
  * global machine and location lookup table */
+#endif
 
 int dstmInit(void) {
   mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
@@ -68,8 +72,11 @@ int dstmInit(void) {
   pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
   pthread_mutex_init(&lockObjHeader,NULL);
 
+#ifdef RECOVERY
        pthread_mutex_init(&liveHosts_mutex, NULL);
        pthread_mutex_init(&leaderFixing_mutex, NULL);
+  pthread_mutex_init(&clearNotifyList_mutex,NULL);
+#endif
 
   if (mhashCreate(MHASH_SIZE, MLOADFACTOR))
     return 1;             //failure
@@ -77,6 +84,11 @@ int dstmInit(void) {
   if (lhashCreate(HASH_SIZE, LOADFACTOR))
     return 1;             //failure
 
+#ifdef RECOVERY
+  if (thashCreate(THASH_SIZE, LOADFACTOR))
+    return 1;
+#endif
+
   if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
     return 1;             //failure
 
@@ -138,12 +150,31 @@ void *dstmListen(void *lfd) {
   socklen_t addrlength = sizeof(struct sockaddr);
   pthread_t thread_dstm_accept;
 
+#ifdef RECOVERY
+  int firsttime = 1;
+  pthread_t thread_dstm_asking;
+#endif
+#ifdef DEBUG
   printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
+#endif
   while(1) {
     int retval;
     int flag=1;
-               if(failFlag) while(1);
     acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+
+#ifdef RECOVERY
+    if(firsttime) {
+      do {
+        retval = pthread_create(&thread_dstm_asking, NULL, startAsking, NULL);
+      }while(retval!=0);
+      firsttime=0;
+      pthread_detach(thread_dstm_asking);
+    }
+#endif
+#ifdef debug
+    printf("%s -> fd accepted\n",__func__);
+#endif
+
     setsockopt(acceptfd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
     do {
        retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
@@ -151,6 +182,96 @@ void *dstmListen(void *lfd) {
     pthread_detach(thread_dstm_accept);
   }
 }
+
+#ifdef RECOVERY
+/* Liveness-monitor thread body (started once from dstmListen): opens one
+ * pooled socket per host, then loops forever; each time
+ * checkIfAnyMachineDead() reports a host, runs restoreDuplicationState()
+ * for it and retires its socket. Returns NULL only if calloc fails. */
+void* startAsking()
+{
+  unsigned int deadMachineIndex;
+  int i;
+  int *socklist;
+  int sd;
+#ifdef DEBUG
+  printf("%s -> Entering\n",__func__);
+#endif
+
+  socklist = calloc(numHostsInSystem, sizeof(int));
+  if(socklist == NULL) {
+    printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+    return NULL;
+  }
+  for(i = 0; i < numHostsInSystem; i++) {
+    if((sd = getSockWithLock(transRequestSockPool,hostIpAddrs[i])) < 0) {
+      printf("%s -> Cannot create socket connection to [%s]\n",__func__,midtoIPString(hostIpAddrs[i]));
+      sd = -1;  // -1 marks a host that has no socket to poll
+    }
+    socklist[i] = sd;
+  }
+
+  while(1) {
+    deadMachineIndex = checkIfAnyMachineDead(socklist);
+    // The index is unsigned, so guard the array accesses with a range check.
+    if(deadMachineIndex < (unsigned int)numHostsInSystem) {
+#ifdef DEBUG
+      printf("%s -> Dead Machine : %s\n",__func__, midtoIPString(hostIpAddrs[deadMachineIndex]));
+#endif
+      restoreDuplicationState(hostIpAddrs[deadMachineIndex]);
+      freeSockWithLock(transRequestSockPool, hostIpAddrs[deadMachineIndex], socklist[deadMachineIndex]);
+      socklist[deadMachineIndex] = -1;  // stop polling the dead host
+    }
+  }
+  return NULL;  // not reached: the loop above never exits
+}
+
+
+/* Polls every host that has a socket in socklist: sends RESPOND_LIVE and
+ * reads a one-byte reply. Returns the index of the first host whose
+ * recv_data() fails (< 0) or whose reply is not LIVE. Otherwise calls
+ * clearDeadThreadsNotification(), sleeps numLiveHostsInSystem seconds and
+ * polls again, so it only returns once some host is considered dead. */
+unsigned int checkIfAnyMachineDead(int* socklist)
+{
+  int i;
+  char control = RESPOND_LIVE;
+  char response;
+#ifdef DEBUG
+  printf("%s -> Entering\n",__func__);
+#endif
+
+  while(1) {
+    for(i = 0; i < numHostsInSystem; i++) {
+#ifdef DEBUG
+      printf("%s -> socklist[%d] = %d\n",__func__,i,socklist[i]);
+#endif
+      // Negative entries mark hosts with no socket; 0 is a valid descriptor.
+      if(socklist[i] < 0) {
+        continue;
+      }
+      send_data(socklist[i], &control, sizeof(char));
+      if(recv_data(socklist[i], &response, sizeof(char)) < 0) {
+#ifdef DEBUG
+        printf("%s -> Machine dead detecteed\n",__func__);
+#endif
+        return i;  // receive failed
+      }
+      if(response != LIVE) {
+#ifdef DEBUG
+        printf("%s -> Machine dead detected\n",__func__);
+#endif
+        return i;  // replied, but not with LIVE
+      }
+    }
+
+    clearDeadThreadsNotification();
+    sleep(numLiveHostsInSystem);  // pause before the next polling round
+  }
+}
+#endif
+
+
 /* This function accepts a new connection request, decodes the control message in the connection
  * and accordingly calls other functions to process new requests */
 void *dstmAccept(void *acceptfd) {
@@ -167,32 +288,35 @@ void *dstmAccept(void *acceptfd) {
   unsigned short objType, *versionarry, version;
        unsigned int *oidarry, numoid, mid, threadid;
   int n, v;
+  unsigned int transIDreceived;
+  char decision;
+  struct sockaddr_in remoteAddr;
 
+#ifdef DEBUG
        printf("%s-> Entering dstmAccept\n", __func__); fflush(stdout);
+#endif
        /* Receive control messages from other machines */
        while(1) {
                int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
-               /*      if(timeoutFlag || timeoutFlag)  {
-               //is there any way to force a context switch?
-               printf("recv_data_errorcode: exiting, timeoutFlag:%d, timeoutFlag:%d\n", failedMachineFlag, timeoutFlag);
-               exit(0);
-               }*/
-               if(failFlag) {
-                       while(1) { 
-                               sleep(10);
-                       }
-               }
+    dupeptr = NULL;
 
                if (ret==0)
                        break;
                if (ret==-1) {
+#ifdef DEBUG
                        printf("DEBUG -> RECV Error!.. retrying\n");
-                       exit(0);
+#endif
+       //              exit(0);
                        break;
                }
+#ifdef DEBUG
                printf("%s-> dstmAccept control = %d\n", __func__, (int)control);
+#endif
                switch(control) {
                        case READ_REQUEST:
+#ifdef DEBUG
+        printf("control -> READ_REQUEST\n");
+#endif
                                /* Read oid requested and search if available */
                                recv_data((int)acceptfd, &oid, sizeof(unsigned int));
                                while((srcObj = mhashSearch(oid)) == NULL) {
@@ -208,35 +332,12 @@ void *dstmAccept(void *acceptfd) {
                                if (h == NULL) {
                                        ctrl = OBJECT_NOT_FOUND;
                                        send_data(sockid, &ctrl, sizeof(char));
-                                       if(timeoutFlag || timeoutFlag) {
-                                               printf("send_data: remote machine dead, line:%d\n", __LINE__);
-                                               timeoutFlag = 0;
-                                               exit(1);
-                                       }
                                } else {
                                        // Type
                                        char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
                                        *((int *)&msg[1])=size;
-                                       printf("*****testcount:%d\n", testcount);
-                                       printf("oid:%u, h->version:%d\n", OID(h), h->version);
-                                       //if(OID(h) == 1 && ((h->version == 20 && liveHosts[0]) || (h->version == 15000  && liveHosts[2])))
-                                       if(testcount == 1000)
-                                       {
-                                               printf("Pretending to fail\n");
-            failFlag = 1;//sleep(5);
-                                               while(1) {
-                                                       sleep(10);
-                                               }//exit(0);
-                                       }
-                                       else
-                                               testcount++;
                                        send_data(sockid, &msg, sizeof(msg));
                                        send_data(sockid, h, size);
-                                       if(timeoutFlag || timeoutFlag) {
-                                               printf("send_data: remote machine dead, line:%d\n", __LINE__);
-                                               timeoutFlag = 0;
-                                               exit(1);
-                                       }
                                }
                                break;
 
@@ -250,6 +351,9 @@ void *dstmAccept(void *acceptfd) {
                                break;
 
                        case TRANS_REQUEST:
+#ifdef DEBUG
+        printf("control -> TRANS_REQUEST\n");
+#endif
                                /* Read transaction request */
                                transinfo.objlocked = NULL;
                                transinfo.objnotfound = NULL;
@@ -261,8 +365,21 @@ void *dstmAccept(void *acceptfd) {
                                        pthread_exit(NULL);
                                }
                                break;
+#ifdef RECOVERY
+      case ASK_COMMIT :
+
+        recv_data((int)acceptfd, &transIDreceived, sizeof(unsigned int));
+
+        decision = checkDecision(transIDreceived);
+
+        send_data((int)acceptfd,&decision,sizeof(char));
 
+        break;
+#endif
                        case TRANS_PREFETCH:
+#ifdef DEBUG
+        printf("control -> TRANS_PREFETCH\n");
+#endif
 #ifdef RANGEPREFETCH
                                if((val = rangePrefetchReq((int)acceptfd)) != 0) {
                                        printf("Error: In rangePrefetchReq() %s, %d\n", __FILE__, __LINE__);
@@ -277,6 +394,9 @@ void *dstmAccept(void *acceptfd) {
                                break;
 
                        case TRANS_PREFETCH_RESPONSE:
+#ifdef DEBUG
+                printf("control -> TRANS_PREFETCH_RESPONSE\n");
+#endif
 #ifdef RANGEPREFETCH
                                if((val = getRangePrefetchResponse((int)acceptfd)) != 0) {
                                        printf("Error: In getRangePrefetchRespose() %s, %d\n", __FILE__, __LINE__);
@@ -291,21 +411,27 @@ void *dstmAccept(void *acceptfd) {
                                break;
 
                        case START_REMOTE_THREAD:
+#ifdef DEBUG
+        printf("control -> START_REMOTE_THREAD\n");
+#endif
                                recv_data((int)acceptfd, &oid, sizeof(unsigned int));
                                objType = getObjType(oid);
-                               printf("%s-> Call startDSMthread\n", __func__);
                                startDSMthread(oid, objType);
-                               printf("%s-> Finish startDSMthread\n", __func__);
                                break;
 
-                       case THREAD_NOTIFY_REQUEST:
+      case THREAD_NOTIFY_REQUEST:
+#ifdef DEBUG
+        printf("control -> THREAD_NOTIFY_REQUEST FD : %d\n",acceptfd);
+#endif
+        numoid = 0;
                                recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
+      
                                size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
                                if((buffer = calloc(1,size)) == NULL) {
                                        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
                                        pthread_exit(NULL);
                                }
-
+                
                                recv_data((int)acceptfd, buffer, size);
 
                                oidarry = calloc(numoid, sizeof(unsigned int));
@@ -323,6 +449,9 @@ void *dstmAccept(void *acceptfd) {
                                break;
 
                        case THREAD_NOTIFY_RESPONSE:
+#ifdef DEBUG
+        printf("control -> THREAD_NOTIFY_RESPONSE\n");
+#endif
                                size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
                                if((buffer = calloc(1,size)) == NULL) {
                                        printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
@@ -340,60 +469,95 @@ void *dstmAccept(void *acceptfd) {
                                threadNotify(oid,version,threadid);
                                free(buffer);
                                break;
-
+#ifdef RECOVERY
+      case CLEAR_NOTIFY_LIST:
+#ifdef DEBUG
+        printf("control -> CLEAR_NOTIFY_LIST\n");
+#endif
+        /* Payload is a single oid. clearNotifyList() is called for it only
+         * while clearNotifyListFlag is still 0; the flag is set to 1 under
+         * clearNotifyList_mutex before the call. */
+        size = sizeof(unsigned int);
+        if((buffer = calloc(1,size)) == NULL) {
+          printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+          pthread_exit(NULL);
+        }
+        recv_data((int)acceptfd,buffer, size);
+        oid = *((unsigned int *)buffer);
+        pthread_mutex_lock(&clearNotifyList_mutex);
+        if(clearNotifyListFlag == 0) {
+          clearNotifyListFlag = 1;
+          pthread_mutex_unlock(&clearNotifyList_mutex);
+          clearNotifyList(oid);
+        }
+        else {
+          pthread_mutex_unlock(&clearNotifyList_mutex);
+        }
+        free(buffer);
+        break;
+#endif
                        case CLOSE_CONNECTION:
+#ifdef DEBUG
+        printf("control -> CLOSE_CONNECTION\n");
+#endif
                                goto closeconnection;
 
+#ifdef RECOVERY
                        case RESPOND_LIVE:
+#ifdef DEBUG
+        printf("control -> RESPOND_LIVE\n");
+#endif
                                liveHostsValid = 0;
                                ctrl = LIVE;
                                send_data((int)acceptfd, &ctrl, sizeof(ctrl));
-                               if(timeoutFlag) {
-                                       printf("send_data: remote machine dead, line:%d\n", __LINE__);
-                                       timeoutFlag = 0;
-                                       exit(1);
-                               }
+#ifdef DEBUG
                                printf("%s (RESPOND_LIVE)-> Sending LIVE!\n", __func__);
+#endif
                                break;
-
+#endif
+#ifdef RECOVERY
                        case REMOTE_RESTORE_DUPLICATED_STATE:
-                               printf("%s (REMOTE_RESTORE_DUPLICATED_STATE)-> Starting process\n", __func__);  
+#ifdef DEBUG
+        printf("control -> REMOTE_RESTORE_DUPLICATED_STATE\n");
+#endif
                                recv_data((int)acceptfd, &mid, sizeof(unsigned int));
-                               ctrl = DUPLICATION_COMPLETE;
-                               send_data((int)acceptfd, &ctrl, sizeof(char));  
-                               if(!liveHosts[findHost(mid)]) 
+                               if(!liveHosts[findHost(mid)]) {
+#ifdef DEBUG
+          printf("%s (REMOTE_RESTORE_DUPLICATED_STATE) -> already fixed\n",__func__);
+#endif
                                        break;
-                               //ctrl = LIVE;
-                               //send_data((int)acceptfd, &ctrl, sizeof(char));        
+        }
                                pthread_mutex_lock(&leaderFixing_mutex);
                                if(!leaderFixing) {
                                        leaderFixing = 1;
                                        pthread_mutex_unlock(&leaderFixing_mutex);
                                        // begin fixing
                                        updateLiveHosts();
-                                       if(!liveHosts[findHost(mid)]) { //confirmed dead
-                                               duplicateLostObjects(mid);
-                                       }
-                                       if(updateLiveHostsCommit() != 0) {
-                                               printf("error updateLiveHostsCommit()\n");
-                                               exit(1);
-                                       }
-                                       // finish fixing
+                                       duplicateLostObjects(mid);
+                               if(updateLiveHostsCommit() != 0) {
+                                       printf("error updateLiveHostsCommit()\n");
+                                       exit(1);
+                               }
+
+        // finish fixing
                                pthread_mutex_lock(&leaderFixing_mutex);
-                                       leaderFixing = 0;
-                                       pthread_mutex_unlock(&leaderFixing_mutex);
-                                       //ctrl = DUPLICATION_COMPLETE;
-                                       //send_data((int)acceptfd, &ctrl, sizeof(char));        
+                               leaderFixing = 0;
+                               pthread_mutex_unlock(&leaderFixing_mutex);
                                }
-                               else {                  
+                               else {
                                        pthread_mutex_unlock(&leaderFixing_mutex);
-                                       //while(leaderFixing);
+#ifdef DEBUG
+          printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__);
+#endif
+          sleep(WAIT_TIME);
                                }
                                break;
-
+#endif
+#ifdef RECOVERY
                        case UPDATE_LIVE_HOSTS:
-                               // update livehosts.
-                               printf("%s (UPDATE_LIVE_HOSTS)-> Attempt to update live machines\n", __func__); 
+#ifdef DEBUG
+        printf("control -> UPDATE_LIVE_HOSTS\n");
+#endif
                                // copy back
                                pthread_mutex_lock(&liveHosts_mutex);
                          recv_data((int)acceptfd, liveHosts, sizeof(int)*numHostsInSystem);
@@ -401,13 +565,20 @@ void *dstmAccept(void *acceptfd) {
                                pthread_mutex_unlock(&liveHosts_mutex);
                                liveHostsValid = 1;
                                numLiveHostsInSystem = getNumLiveHostsInSystem();
+#ifdef DEBUG
                                printHostsStatus();
                          printf("%s (UPDATE_LIVE_HOSTS)-> Finished\n", __func__);      
+#endif
                                //exit(0);
                                break;
+#endif
 
+#ifdef RECOVERY
                        case DUPLICATE_ORIGINAL:
+#ifdef DEBUG
+        printf("control -> DUPLICATE_ORIGINAL\n");
                                printf("%s (DUPLICATE_ORIGINAL)-> Attempt to duplicate original objects\n", __func__);  
+#endif
                                //object store stuffffff
                                recv_data((int)acceptfd, &mid, sizeof(unsigned int));
                                tempsize = mhashGetDuplicate(&dupeptr, 0);
@@ -415,65 +586,125 @@ void *dstmAccept(void *acceptfd) {
                                //send control and dupes after
                                ctrl = RECEIVE_DUPES;
 
-                               if((sd = getSockWithLock(transRequestSockPool, mid)) < 0) {
-                                       printf("DUPLICATE_ORIGINAL: socket create error\n");
-                                       //usleep(1000);
-                               }
-                               printf("sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", sd, tempsize, *((unsigned int *)(dupeptr)));
+        if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+          perror("ORIGINAL : ");
+          exit(0);
+        }
+
+        bzero(&remoteAddr, sizeof(remoteAddr));
+        remoteAddr.sin_family = AF_INET;
+        remoteAddr.sin_port = htons(LISTEN_PORT);
+        remoteAddr.sin_addr.s_addr = htonl(mid);
+
+        if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {
+          printf("ORIGINAL ERROR : %s\n",strerror(errno));
+          exit(0);
+        }
+        else {
+                               send_data(sd, &ctrl, sizeof(char));
+                               send_data(sd, dupeptr, tempsize);
+
+                               recv_data(sd, &response, sizeof(char));
+#ifdef DEBUG
+          printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
+#endif
+                               if(response != DUPLICATION_COMPLETE) {
+#ifdef DEBUG
+           printf("%s(DUPLICATION_ORIGINAL) -> DUPLICATION FAIL\n",__func__);
+#endif
+                                 //fail message
+           exit(0);
+                               }
 
-                               send_data(sd, &ctrl, sizeof(char));
-                               send_data(sd, dupeptr, tempsize);
-                               
-                               recv_data(sd, &response, sizeof(char));
-                               if(response != DUPLICATION_COMPLETE) {
-                                       //fail message
-                               }
-                               ctrl = DUPLICATION_COMPLETE;
+          close(sd);
+        }
+        free(dupeptr);
+
+        ctrl = DUPLICATION_COMPLETE;
                                send_data((int)acceptfd, &ctrl, sizeof(char));
+#ifndef DEBUG
                                printf("%s (DUPLICATE_ORIGINAL)-> Finished\n", __func__);       
-                        freeSockWithLock(transRequestSockPool, mid, sd);
+#endif
                                break;
 
                        case DUPLICATE_BACKUP:
+#ifndef DEBUG
+        printf("control -> DUPLICATE_BACKUP\n");
                                printf("%s (DUPLICATE_BACKUP)-> Attempt to duplicate backup objects\n", __func__);
+#endif
                                //object store stuffffff
                                recv_data((int)acceptfd, &mid, sizeof(unsigned int));
+
+
                                tempsize = mhashGetDuplicate(&dupeptr, 1);
 
-                               printf("tempsize:%d, dupeptrfirstvalue:%d\n", tempsize, *((unsigned int *)(dupeptr)));
                                //send control and dupes after
                                ctrl = RECEIVE_DUPES;
-                               if((sd = getSockWithLock(transRequestSockPool, mid)) < 0) {
-                                       printf("DUPLICATE_BACKUP: socket create error\n");
-                                       //usleep(1000);
-                               }
-                               
-                               printf("sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", sd, tempsize, *((unsigned int *)(dupeptr)));
-                               send_data(sd, &ctrl, sizeof(char));
-                               send_data(sd, dupeptr, tempsize);
-                               recv_data(sd, &response, sizeof(char));
-                               if(response != DUPLICATION_COMPLETE) {
-                                       //fail message
-                               }
+
+        if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+          perror("BACKUP : ");
+          exit(0);
+        }
+
+         bzero(&remoteAddr, sizeof(remoteAddr));                                       
+         remoteAddr.sin_family = AF_INET;                                              
+         remoteAddr.sin_port = htons(LISTEN_PORT);                                     
+         remoteAddr.sin_addr.s_addr = htonl(mid);                                      
+                                                                                       
+         if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr))<0) {       
+           printf("BACKUP ERROR : %s\n",strerror(errno));
+           exit(0);
+         }                                                                             
+         else {                                                                        
+          send_data(sd, &ctrl, sizeof(char));
+                               send_data(sd, dupeptr, tempsize);
+          
+          recv_data(sd, &response, sizeof(char));
+#ifdef DEBUG
+          printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
+#endif
+                               if(response != DUPLICATION_COMPLETE) {
+#ifndef DEBUG
+            printf("%s(DUPLICATION_BACKUP) -> DUPLICATION FAIL\n",__func__);
+#endif
+            exit(0);
+          }
+
+          close(sd);
+         }
+
+        free(dupeptr);
+
                                ctrl = DUPLICATION_COMPLETE;
                                send_data((int)acceptfd, &ctrl, sizeof(char));
+#ifndef DEBUG
                                printf("%s (DUPLICATE_BACKUP)-> Finished\n", __func__); 
+#endif
                                
-                        freeSockWithLock(transRequestSockPool, mid, sd);
                                break;
 
                        case RECEIVE_DUPES:
-                               if((val = readDuplicateObjs((int)acceptfd)) != 0) {
+#ifndef DEBUG
+        printf("control -> RECEIVE_DUPES sd : %d\n",(int)acceptfd);
+#endif
+                               if((readDuplicateObjs((int)acceptfd)) != 0) {
                                        printf("Error: In readDuplicateObjs() %s, %d\n", __FILE__, __LINE__);
                                        pthread_exit(NULL);
                                }
+
                                ctrl = DUPLICATION_COMPLETE;
                                send_data((int)acceptfd, &ctrl, sizeof(char));
+#ifndef DEBUG
+        printf("%s (RECEIVE_DUPES) -> Finished\n",__func__);
+#endif
                                break;
-
+#endif
+#ifdef RECOVERY
                        case PAXOS_PREPARE:
+#ifdef DEBUG
+        printf("control -> PAXOS_PREPARE\n");
+#endif
                                recv_data((int)acceptfd, &val, sizeof(int));
-                               printf("%s (PAXOS_PREPARE)-> prop n:%d, n_h:%d\n", __func__, val, n_h);
                                if (val <= n_h) {
                                        control = PAXOS_PREPARE_REJECT;
                                        send_data((int)acceptfd, &control, sizeof(char));
@@ -481,7 +712,7 @@ void *dstmAccept(void *acceptfd) {
                                else {
                                        n_h = val;
                                        control = PAXOS_PREPARE_OK;
-                               printf("%s (PAXOS_PREPARE)-> n_h now:%d, sending OK\n", __func__, n_h);
+                    
                                        send_data((int)acceptfd, &control, sizeof(char));
                                        send_data((int)acceptfd, &n_a, sizeof(int));
                                        send_data((int)acceptfd, &v_a, sizeof(int));
@@ -489,6 +720,9 @@ void *dstmAccept(void *acceptfd) {
                                break;
 
                        case PAXOS_ACCEPT:
+#ifdef DEBUG
+        printf("control -> PAXOS_ACCEPT\n");
+#endif
                                recv_data((int)acceptfd, &n, sizeof(int));
                                recv_data((int)acceptfd, &v, sizeof(int));
                                if (n < n_h) {
@@ -505,21 +739,31 @@ void *dstmAccept(void *acceptfd) {
                                break;
 
                        case PAXOS_LEARN:
+#ifdef DEBUG
+        printf("control -> PAXOS_LEARN\n");
+#endif
                                recv_data((int)acceptfd, &v, sizeof(int));
                                leader = v_a;
                                paxosRound++;
+#ifdef DEBUG
                                printf("%s (PAXOS_LEARN)-> This is my leader!: [%s]\n", __func__, midtoIPString(leader));
+#endif
                                break;
 
                        case DELETE_LEADER:
+#ifdef DEBUG
+        printf("control -> DELETE_LEADER\n");
+#endif
                                v_a = 0;
                                break;
-
+#endif
                        default:
                                printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
                }
        }
+#ifdef DEBUG
        printf("%s-> Exiting\n", __func__); fflush(stdout);
+#endif
 closeconnection:
        /* Close connection */
        if (close((int)acceptfd) == -1)
@@ -533,12 +777,13 @@ int readDuplicateObjs(int acceptfd) {
        void *dupeptr, *ptrcreate, *ptr;
        objheader_t *header;
 
+#ifdef DEBUG
        printf("%s-> Start\n", __func__);
+#endif
        recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
        recv_data((int)acceptfd, &size, sizeof(int));   
        // do i need array of oids?
        // answer: no! now get to work
-       printf("%s-> numoid:%d, size:%d\n", __func__, numoid, size);
        if(numoid != 0) {
                if ((dupeptr = calloc(1, size)) == NULL) {
                        printf("calloc error for duplicated objects %s, %d\n", __FILE__, __LINE__);
@@ -552,26 +797,64 @@ int readDuplicateObjs(int acceptfd) {
                        oid = OID(header);
                        GETSIZE(tmpsize, header);
                        tmpsize += sizeof(objheader_t);
+
+#ifdef DEBUG
                        printf("%s-> oid being received/backed:%u, version:%d, type:%d\n", __func__, oid, header->version, TYPE(header));
                        printf("STATUSPTR(header):%u, STATUS:%d\n", STATUSPTR(header), STATUS(header));
-                       pthread_mutex_lock(&mainobjstore_mutex);
-                       if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
-                               printf("Error: readDuplicateObjs() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
-                               pthread_mutex_unlock(&mainobjstore_mutex);
-                               return 1;
-                       }
-                       pthread_mutex_unlock(&mainobjstore_mutex);
-           memcpy(ptrcreate, header, tmpsize);
+#endif
 
-                       mhashInsert(oid, ptrcreate);
-                       ptr += tmpsize;
-               }
+      if(mhashSearch(oid) != NULL) {
+#ifdef DEBUG
+        printf("%s -> oid : %d is already in there\n",__func__,oid);
+#endif
 
+        if(header->notifylist != NULL) {
+          unsigned int *listSize = (ptr + tmpsize);
+          tmpsize += sizeof(unsigned int);
+          tmpsize += sizeof(threadlist_t) * (*listSize);
+        }
+      }
+      else {
+                       pthread_mutex_lock(&mainobjstore_mutex);
+                       if ((ptrcreate = objstrAlloc(&mainobjstore, tmpsize)) == NULL) {
+                               printf("Error: readDuplicateObjs() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
+                               pthread_mutex_unlock(&mainobjstore_mutex);
+                                 return 1;
+                       }
+                       pthread_mutex_unlock(&mainobjstore_mutex);
+             memcpy(ptrcreate, header, tmpsize);
+
+        objheader_t* oPtr = (objheader_t*)ptrcreate;
+
+        if(oPtr->notifylist != NULL) {
+          oPtr->notifylist = NULL;  // reset for new list
+          threadlist_t *listNode;
+          unsigned int* listSize = (ptr + tmpsize);  // get number of notifylist
+          unsigned int j;
+
+          tmpsize += sizeof(unsigned int);   // skip number of notifylist 
+          listNode = (threadlist_t*)(ptr + tmpsize); // get first element of address
+          for(j = 0; j< *listSize; j++) {      // retreive all threadlist
+            oPtr->notifylist = insNode(oPtr->notifylist,listNode[j].threadid,listNode[j].mid);
+          
+          }
+          tmpsize += sizeof(threadlist_t) * (*listSize);
+
+        }
+               mhashInsert(oid, ptrcreate);
+      }
+               ptr += tmpsize;
+               }
+#ifdef DEBUG
                printf("%s-> End\n", __func__);
+#endif
+    free(dupeptr);
                return 0;
        }
        else {
+#ifdef DEBUG
                printf("%s-> No objects duplicated\n", __func__);
+#endif
                return 0;
        }
 }
@@ -585,23 +868,28 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   fixed_data_t fixed;
   objheader_t *headaddr;
   int sum, i, size, n, val;
+  int timeout;
 
   oidmod = NULL;
+#ifdef DEBUG
        printf("%s-> Entering\n", __func__);
+#endif
 
   /* Read fixed_data_t data structure */
   size = sizeof(fixed) - 1;
   ptr = (char *)&fixed;
   fixed.control = TRANS_REQUEST;
-  recv_data((int)acceptfd, ptr+1, size);
+  timeout = recv_data((int)acceptfd, ptr+1, size);
 
   /* Read list of mids */
   int mcount = fixed.mcount;
   size = mcount * sizeof(unsigned int);
   unsigned int listmid[mcount];
   ptr = (char *) listmid;
-  recv_data((int)acceptfd, ptr, size);
+  timeout = recv_data((int)acceptfd, ptr, size);
 
+  if(timeout < 0)   // coordinator failed
+    return 0;
 
   /* Read oid and version tuples for those objects that are not modified in the transaction */
   int numread = fixed.numread;
@@ -609,7 +897,7 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   char objread[size];
   if(numread != 0) { //If pile contains more than one object to be read,
     // keep reading all objects
-    recv_data((int)acceptfd, objread, size);
+    timeout = recv_data((int)acceptfd, objread, size);
   }
 
   /* Read modified objects */
@@ -619,9 +907,12 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
       return 1;
     }
     size = fixed.sum_bytes;
-    recv_data((int)acceptfd, modptr, size);
+    timeout = recv_data((int)acceptfd, modptr, size);
   }
 
+  if(timeout < 0) // coordinator failed
+    return 0;
+
   /* Create an array of oids for modified objects */
   oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
   if (oidmod == NULL) {
@@ -637,9 +928,9 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
     GETSIZE(tmpsize, headaddr);
     ptr += sizeof(objheader_t) + tmpsize;
   }
-
-       printf("%s-> num oid read = %d, oids modified = %d, size = %d\n", __func__, fixed.numread,  fixed.nummod, size); fflush(stdout);
-// sleep(1); 
+#ifdef DEBUG
+       printf("%s-> num oid read = %d, oids modified = %d, size = %d\n", __func__, fixed.numread,  fixed.nummod, size);
+#endif
   /*Process the information read */
   if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
     printf("Error: In processClientReq() %s, %d\n", __FILE__, __LINE__);
@@ -654,7 +945,9 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
   if(oidmod != NULL) {
     free(oidmod);
   }
+#ifdef DEBUG
        printf("%s-> Exiting\n", __func__);
+#endif
 
   return 0;
 }
@@ -666,11 +959,18 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                      unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
 
   char control, sendctrl, retval;
+  
   objheader_t *tmp_header;
   void *header;
   int i = 0, val;
-
+  unsigned int transID;
+#ifdef DEBUG
        printf("%s-> Entering\n", __func__);
+#endif
+
+  /* receives transaction id */
+  recv_data((int)acceptfd, &transID, sizeof(unsigned int));
+
   /* Send reply to the Coordinator */
   if((retval = handleTransReq(fixed, transinfo, listmid, objread, modptr,acceptfd)) == 0 ) {
     printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
@@ -678,9 +978,27 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     return 1;
   }
 
-       recv_data((int)acceptfd, &control, sizeof(char));
+       int timeout = recv_data((int)acceptfd, &control, sizeof(char));
        /* Process the new control message */
-       switch(control) {
+#ifdef DEBUG
+  printf("%s -> timeout = %d   control = %d\n",__func__,timeout,control); 
+#endif
+  
+#ifdef RECOVERY
+  if(timeout < 0) {  // timeout. failed to receiving data from coordinator
+#ifdef DEBUG
+    printf("%s -> timeout!! assumes coordinator is dead\n",__func__);
+#endif
+    control = receiveDecisionFromBackup(transID,fixed->mcount,listmid);
+#ifdef DEBUG
+    printf("%s -> received Decision %d\n",__func__,control);
+#endif
+  }    
+  
+  /* insert received control into thash for another transaction*/
+  thashInsert(transID, control);
+#endif
+  switch(control) {
                case TRANS_ABORT:
                        if (fixed->nummod > 0)
                                free(modptr);
@@ -718,6 +1036,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                                printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
                                return 1;
                        }
+
+
                        break;
 
                case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
@@ -736,11 +1056,29 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
   if (transinfo->objnotfound != NULL) {
     free(transinfo->objnotfound);
   }
+#ifdef DEBUG
        printf("%s-> Exiting, line:%d\n", __func__, __LINE__);
+#endif
 
   return 0;
 }
 
+#ifdef RECOVERY
+/* Look up the control byte that thashSearch() holds for transID.
+ * Yields that byte unchanged, or -1 when the lookup result is 0.
+ */
+char checkDecision(unsigned int transID)
+{
+#ifdef DEBUG
+  printf("%s -> transID :  %u\n",__func__,transID);
+#endif
+
+  char decision = thashSearch(transID);
+
+  return (decision == 0) ? -1 : decision;
+}
+#endif
+
 /* This function increments counters while running a voting decision on all objects involved
  * in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
 char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
@@ -771,6 +1109,9 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
       oid = *((unsigned int *)(objread + incr));
       incr += sizeof(unsigned int);
       version = *((unsigned short *)(objread + incr));
+#ifdef DEBUG
+      printf("%s -> oid : %u    version : %d\n",__func__,oid,version);
+#endif
       getCommitCountForObjRead(oidnotfound, oidlocked, oidvernotmatch, &objnotfound, &objlocked, &objvernotmatch,
                                &v_matchnolock, &v_matchlock, &v_nomatch, &numBytes, &control, oid, version);
     } else {  //Objs modified
@@ -783,6 +1124,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
       version = headptr->version;
       GETSIZE(tmpsize, headptr);
       ptr += sizeof(objheader_t) + tmpsize;
+
       getCommitCountForObjMod(oidnotfound, oidlocked, oidvernotmatch, &objnotfound,
                               &objlocked, &objvernotmatch, &v_matchnolock, &v_matchlock, &v_nomatch,
                               &numBytes, &control, oid, version);
@@ -822,18 +1164,16 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
                        }
                        free(oidlocked);
                }
-               printf("control = %d, file = %s, line = %d\n", (int)control, __FILE__, __LINE__);
+    
+
+#ifdef DEBUG
+               printf("%s -> control = %d, file = %s, line = %d\n", __func__,(int)control, __FILE__, __LINE__);
+#endif
 
                send_data(acceptfd, &control, sizeof(char));
 #ifdef CACHE
                send_data(acceptfd, &numBytes, sizeof(int));
                send_data(acceptfd, objs, numBytes);
-if(timeoutFlag || timeoutFlag) {
-printf("send_data: remote machine dead, line:%d\n", __LINE__);
-timeoutFlag = 0;
-timeoutFlag = 0;
-exit(1);
-}
 
                transinfo->objvernotmatch = oidvernotmatch;
                transinfo->numvernotmatch = objvernotmatch;
@@ -863,25 +1203,28 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
 #ifdef RECOVERY
   if(version == 1) {
                (*v_matchnolock)++;
-               printf("*backup object* oid:%u\n", oid);
+#ifdef DEBUG
+               printf("%s -> *backup object* oid:%u\n", __func__,oid);
+#endif
                return;
        }
 #endif
 
        if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
+#ifdef DEBUG
                printf("Obj not found: %s() oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
                fflush(stdout);
+#endif
                /* Save the oids not found and number of oids not found for later use */
                oidnotfound[*objnotfound] = oid;
                (*objnotfound)++;
        } else {     /* If Obj found in machine (i.e. has not moved) */
-               printf("Obj found: %s() oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
-               fflush(stdout);
                /* Check if Obj is locked by any previous transaction */
                if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
-
-               printf("****%s->Trying to acquire 'remote' writelock for oid:%d, version:%d\n", __func__, oid, version);
+#ifdef DEBUG
+                 printf("****%s->Trying to acquire 'remote' writelock for oid:%d, version:%d\n", __func__, oid, version);
                        printf("this version: %d, mlookup version: %d\n", version, ((objheader_t *)mobj)->version);
+#endif
                        if (version == ((objheader_t *)mobj)->version) { /* match versions */
                                (*v_matchnolock)++;
                        } else { /* If versions don't match ...HARD ABORT */
@@ -914,7 +1257,9 @@ void getCommitCountForObjMod(unsigned int *oidnotfound, unsigned int *oidlocked,
                        }
                }
   }
-       printf("oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n", oid, *v_matchnolock, *v_matchlock, *v_nomatch);
+#ifdef DEBUG
+       printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__,oid, *v_matchnolock, *v_matchlock, *v_nomatch);
+#endif
 }
 
 /* Update Commit info for objects that are read */
@@ -924,6 +1269,9 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
   void *mobj;
   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
   //printf("version number: %d\n", version);
+#ifdef DEBUG
+  printf("%s -> Entering\n",__func__);
+#endif
 #ifdef RECOVERY
   if(version == 1) {
                (*v_matchnolock)++;
@@ -931,49 +1279,59 @@ void getCommitCountForObjRead(unsigned int *oidnotfound, unsigned int *oidlocked
                return;
        }
 #endif
+
        if ((mobj = mhashSearch(oid)) == NULL) {    /* Obj not found */
-       printf("Obj not found: %s() file:%s oid = %d, type = %d\t\n", __func__, __FILE__, OID(mobj), TYPE((objheader_t *)mobj));
-       fflush(stdout);
+#ifdef DEBUG
+    printf("%s -> Obj not found!\n",__func__);
+         printf("%s -> Obj not found: oid = %d, type = %d\t\n", __func__,OID(mobj), TYPE((objheader_t *)mobj));
+       fflush(stdout);
+#endif
     /* Save the oids not found and number of oids not found for later use */
     oidnotfound[*objnotfound] = oid;
     (*objnotfound)++;
   } else {     /* If Obj found in machine (i.e. has not moved) */
-       printf("Obj found: %s() file:%s oid = %d, type = %d\t\n", __func__, __FILE__, OID(mobj), TYPE((objheader_t *)mobj));
-       fflush(stdout);
+#ifdef DEBUG
+    printf("%s -> Obj found!!\n",__func__);
+       printf("%s -> Obj found: oid = %d, type = %d\t\n", __func__, OID(mobj), TYPE((objheader_t *)mobj));
+         fflush(stdout);
+#endif
+    
     /* Check if Obj is locked by any previous transaction */
     if (read_trylock(STATUSPTR(mobj))) { //Can further acquire read locks
       if (version == ((objheader_t *)mobj)->version) { /* match versions */
-       (*v_matchnolock)++;
+       (*v_matchnolock)++;
       } else { /* If versions don't match ...HARD ABORT */
-       (*v_nomatch)++;
-       oidvernotmatch[(*objvernotmatch)++] = oid;
-       int size;
-       GETSIZE(size, mobj);
-       size += sizeof(objheader_t);
-       *numBytes += size;
-       /* Send TRANS_DISAGREE to Coordinator */
-       *control = TRANS_DISAGREE;
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
+       (*v_nomatch)++;
+       oidvernotmatch[(*objvernotmatch)++] = oid;
+       int size;
+       GETSIZE(size, mobj);
+       size += sizeof(objheader_t);
+       *numBytes += size;
+      
+        /* Send TRANS_DISAGREE to Coordinator */
+       *control = TRANS_DISAGREE;
       }
+
       //Keep track of oid locked
       oidlocked[(*objlocked)++] = OID(((objheader_t *)mobj));
     } else { /* Some other transaction has aquired a write lock on this object */
       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
-       (*v_matchlock)++;
+       (*v_matchlock)++;
       } else { /* If versions don't match ...HARD ABORT */
-       (*v_nomatch)++;
-       oidvernotmatch[*objvernotmatch] = oid;
-       (*objvernotmatch)++;
-       int size;
-       GETSIZE(size, mobj);
-       size += sizeof(objheader_t);
-       *numBytes += size;
-       *control = TRANS_DISAGREE;
-       //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
+       (*v_nomatch)++;
+       oidvernotmatch[*objvernotmatch] = oid;
+       (*objvernotmatch)++;
+       int size;
+       GETSIZE(size, mobj);
+       size += sizeof(objheader_t);
+       *numBytes += size;
+       *control = TRANS_DISAGREE;
       }
     }
   }
-       printf("oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n", oid, *v_matchnolock, *v_matchlock, *v_nomatch);
+#ifdef DEBUG
+       printf("%s -> oid: %u, v_matchnolock: %d, v_matchlock: %d, v_nomatch: %d\n",__func__, oid, *v_matchnolock, *v_matchlock, *v_nomatch);
+#endif
 }
 
 /* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
@@ -988,22 +1346,21 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
   if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
     control = TRANS_AGREE;
     /* Send control message */
-               printf("control = %d, file = %s, line = %d\n", (int)control, __FILE__, __LINE__);
+#ifdef DEBUG
+               printf("%s -> control = %s\n", __func__,"TRANS_AGREE");
+#endif
     send_data(acceptfd, &control, sizeof(char));
-if(timeoutFlag || timeoutFlag) {
-printf("send_data: remote machine dead, line:%d\n", __LINE__);
-timeoutFlag = 0;
-timeoutFlag = 0;
-exit(1);
-}
-
-               printf("finished sending control\n");
+    
+#ifdef DEBUG
+               printf("%s -> finished sending control\n",__func__);
+#endif
   }
   /* Condition to send TRANS_SOFT_ABORT */
-  if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
+  else if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
     control = TRANS_SOFT_ABORT;
-
-               printf("control = %d, file = %s, line = %d\n", (int)control, __FILE__, __LINE__);
+#ifdef DEBUG
+               printf("%s -> control = %s\n", __func__,"TRANS_SOFT_ABORT");
+#endif
     /* Send control message */
     send_data(acceptfd, &control, sizeof(char));
 
@@ -1037,8 +1394,10 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
   char control;
   int tmpsize;
   void *ptrcreate;
+#ifdef DEBUG
        printf("DEBUG-> Entering transCommitProcess, dstmserver.c\n");
        printf("nummod: %d, numlocked: %d\n", nummod, numlocked);
+#endif
 
   /* Process each modified object saved in the mainobject store */
   for(i = 0; i < nummod; i++) {
@@ -1047,11 +1406,15 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
       printf("Error: mhashsearch returns NULL at dstmserver.c %d\n", __LINE__);
                        return 1;
 #else
+#ifdef DEBUG
                        printf("DEBUG->*backup* i:%d, nummod:%d\n", i, nummod);
+#endif
                        header = (objheader_t *)(modptr+offset);
                        header->version += 1;
                        header->isBackup = 1;
+#ifdef DEBUG
       printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
+#endif
                        GETSIZE(tmpsize, header);
                        tmpsize += sizeof(objheader_t);
                        pthread_mutex_lock(&mainobjstore_mutex);
@@ -1082,10 +1445,24 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
     }
     header->version += 1;
+#ifdef DEBUG
     printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
+#endif
     /* If threads are waiting on this object to be updated, notify them */
     if(header->notifylist != NULL) {
-      notifyAll(&header->notifylist, OID(header), header->version);
+#ifdef DEBUG
+      printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist);
+#endif
+
+#ifdef RECOVERY
+      if(header->isBackup != 0)
+        notifyAll(&header->notifylist, OID(header), header->version);
+      else
+        clearNotifyList(OID(header));
+#else
+        notifyAll(&header->notifylist, OID(header), header->version);
+#endif
+
     }
     offset += sizeof(objheader_t) + tmpsize;
   }
@@ -1105,7 +1482,9 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
       return 1;
     }
                
+#ifdef DEBUG
                printf("header oid:%d, version:%d, useWriteUnlock:%d\n", OID(header), header->version, useWriteUnlock);
+#endif
     if(useWriteUnlock) {
       write_unlock(STATUSPTR(header));
     } else {
@@ -1229,7 +1608,9 @@ int prefetchReq(int acceptfd) {
 }
 
 void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
+#ifdef DEBUG
                printf("control = %d, file = %s, line = %d\n", (int)control, __FILE__, __LINE__);
+#endif
   send_data(sd, control, sizeof(char));
   /* Send the buffer with its size */
   int length = *(size);
@@ -1256,46 +1637,48 @@ void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short
       /* Check to see if versions are same */
 checkversion:
       if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
-       newversion = header->version;
-       if(newversion == *(versionarry + i)) {
-         //Add to the notify list
-         if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
-           printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
-           return;
-         }
-         write_unlock(STATUSPTR(header));
-       } else {
-         write_unlock(STATUSPTR(header));
-         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-           perror("processReqNotify():socket()");
-           return;
-         }
-         bzero(&remoteAddr, sizeof(remoteAddr));
-         remoteAddr.sin_family = AF_INET;
-         remoteAddr.sin_port = htons(LISTEN_PORT);
-         remoteAddr.sin_addr.s_addr = htonl(mid);
-
-         if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
-           printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
-                  inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-           close(sd);
-           return;
-         } else {
-           //Send Update notification
-           msg[0] = THREAD_NOTIFY_RESPONSE;
-           *((unsigned int *)&msg[1]) = oid;
-           size = sizeof(unsigned int);
-           *((unsigned short *)(&msg[1]+size)) = newversion;
-           size += sizeof(unsigned short);
-           *((unsigned int *)(&msg[1]+size)) = threadid;
-           size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
-           send_data(sd, msg, size);
-         }
-         close(sd);
-       }
+       newversion = header->version;
+         
+        if(newversion == *(versionarry + i)) {
+               //Add to the notify list
+             if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
+                 printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
+               return;
+             }
+           write_unlock(STATUSPTR(header));
+       } 
+        else {
+          write_unlock(STATUSPTR(header));
+         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+                 perror("processReqNotify():socket()");
+                 return;
+           }
+             bzero(&remoteAddr, sizeof(remoteAddr));
+           remoteAddr.sin_family = AF_INET;
+             remoteAddr.sin_port = htons(LISTEN_PORT);
+           remoteAddr.sin_addr.s_addr = htonl(mid);
+  
+         if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+               printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
+            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+             close(sd);
+             return;
+             } else {
+               //Send Update notification
+             msg[0] = THREAD_NOTIFY_RESPONSE;
+             *((unsigned int *)&msg[1]) = oid;
+           size = sizeof(unsigned int);
+                 *((unsigned short *)(&msg[1]+size)) = newversion;
+            size += sizeof(unsigned short);
+             *((unsigned int *)(&msg[1]+size)) = threadid;
+             size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
+           send_data(sd, msg, size);
+                 }
+         close(sd);
+             }
       } else {
-       randomdelay();
-       goto checkversion;
+           randomdelay();
+         goto checkversion;
       }
     }
     i++;
@@ -1303,3 +1686,40 @@ checkversion:
   free(oidarry);
   free(versionarry);
 }
+
+#ifdef RECOVERY
+/* Free every node on oid's notify list and reset the list head to NULL,
+ * then clear clearNotifyListFlag under clearNotifyList_mutex.
+ * If mhashSearch() finds no object, the list walk is skipped (no header to
+ * dereference) but the flag is still cleared. */
+void clearNotifyList(unsigned int oid)
+{
+#ifdef DEBUG
+  printf("%s -> Entering\n",__func__);
+#endif
+
+  objheader_t* header;
+  threadlist_t* t;
+  threadlist_t* tmp;
+
+  if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
+    printf("%s -> mhashSearch returned NULL!!\n",__func__);
+  }
+  else {
+    t = header->notifylist;
+    while(t) {
+      tmp = t;
+      t = t->next;
+      free(tmp);
+    }
+    header->notifylist = NULL;
+  }
+
+  pthread_mutex_lock(&clearNotifyList_mutex);
+  clearNotifyListFlag = 0;
+  pthread_mutex_unlock(&clearNotifyList_mutex);
+#ifdef DEBUG
+  printf("%s -> finished\n",__func__);
+#endif
+}
+#endif
index 1f84a9b3499a3cb1c23755d136ff4bbc11941d9d..d5c6b45a806720637eee2530692a74b4bd6930be 100644 (file)
@@ -29,9 +29,9 @@ void midtoIP(unsigned int mid, char *ptr) {
   i.c = (mid & 0x0000ff00) >> 8;
   i.d = mid & 0x000000ff;
   sprintf(ptr, "%d.%d.%d.%d", i.a, i.b, i.c, i.d);
-/*#ifdef DEBUG
+#ifdef DEBUG
   printf("DEBUG-> midtoIP() mid = %d.%d.%d.%d\n", i.a, i.b, i.c, i.d);
-#endif*/
+#endif
   return;
 }
 
index bb316449fedf8c693d662b86c09c8eb333bd2041..fa4cef88fff6715c08b8902b2f0d54022ca7db38 100644 (file)
@@ -190,9 +190,10 @@ unsigned int *mhashGetKeys(unsigned int *numKeys) {
   for (i = 0; i < mlookup.size; i++) {
     if (mlookup.table[i].key != 0) {
       curr = &mlookup.table[i];
+
       while (curr != NULL) {
-       keys[keyindex++] = curr->key;
-       curr = curr->next;
+       keys[keyindex++] = curr->key;
+       curr = curr->next;
       }
     }
   }
@@ -204,10 +205,14 @@ unsigned int *mhashGetKeys(unsigned int *numKeys) {
   return keys;
 }
 
+#ifdef RECOVERY
 int mhashGetDuplicate(void **dupeptr, int backup) { //how big?
+#ifdef DEBUG
        printf("%s-> Start\n", __func__); 
+#endif
        unsigned int numdupe = 0;
-       
+       void* dPtr;
+
 //     ok let's do this;
        unsigned int oidsdupe[mlookup.size];
   int size = 0, tempsize = 0, i = 0;
@@ -218,6 +223,9 @@ int mhashGetDuplicate(void **dupeptr, int backup) { //how big?
 //     track sizes, oids, and num
        pthread_mutex_lock(&mlookup.locktable); 
 
+  size =0;
+  tempsize =0;
+
        for(i = 0; i < mlookup.size; i++) {
                if (mlookup.table[i].key != 0) {
                        node = &mlookup.table[i];
@@ -227,6 +235,11 @@ int mhashGetDuplicate(void **dupeptr, int backup) { //how big?
                                        oidsdupe[numdupe++] = OID(header);
                                        GETSIZE(tempsize, header);
                                        size += tempsize + sizeof(objheader_t);
+
+          if(header->notifylist != NULL) {
+            //      number of nodes     +       actual size of array
+            size += (sizeof(unsigned int) + (getListSize(header->notifylist) * sizeof(threadlist_t)));
+          }
                                }
                                node = node->next;
                        }
@@ -234,47 +247,128 @@ int mhashGetDuplicate(void **dupeptr, int backup) { //how big?
        }
 
   pthread_mutex_unlock(&mlookup.locktable);
-       printf("%s-> size:%d, numdupe:%d\n", __func__, size, numdupe);
 
        //i got sizes, oids, and num now
+  //
 
-       if(((*dupeptr) = calloc(1, sizeof(unsigned int)+sizeof(int)+size)) == NULL) {
-               printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+       if((dPtr =(void*) malloc(sizeof(unsigned int)+sizeof(int)+ size)) == NULL) {
+               printf("malloc error for modified objects %s, %d\n", __FILE__, __LINE__);
                return;
        }
 
 //     for each oid in oiddupe[] get object and format
-       void *ptr = *(dupeptr);
-       *((unsigned int *)(ptr)) = numdupe;
+
+  *dupeptr = dPtr;
+       void* ptr = dPtr;
+  *((unsigned int *)(ptr)) = numdupe;
        ptr += sizeof(unsigned int);
-       *((int *)(ptr)) = size;
+  *((int *)(ptr)) = size;
        ptr += sizeof(int);
+  void* ttt = *dupeptr;
+
        for(i = 0; i < numdupe; i++) {
-               printf("%s-> oid being backed:%u\n", __func__, oidsdupe[i]);
     header = mhashSearch(oidsdupe[i]);
-               printf("new header oid:%d, version:%d\n", OID(header), header->version);
-                       printf("STATUSPTR(header):%u, STATUS:%d\n", STATUSPTR(header), STATUS(header));
-/*                     if(write_trylock(STATUSPTR(header))) {
-                               printf("this object is not locked\n");
-
-      write_unlock(STATUSPTR(header));
-                       }
-                       else 
-                       printf("its locked\n");*/
-       
 
                GETSIZE(tempsize, header);
                tempsize += sizeof(objheader_t);
                memcpy(ptr, header, tempsize); //*ptr = header maybe wont work, use memcopy instead probably
+
+               if(header->isBackup && backup) {
+      ((objheader_t*)ptr)->isBackup = 0;
+    }else if(!(header->isBackup) && !backup) {
+      ((objheader_t*)ptr)->isBackup = 1;
+    }
+    else {
+      printf("%s -> ERROR\n",__func__);
+      exit(0);
+    }
+
                ptr += tempsize;
-       }
 
-/*   printf("dupeptrfirstvalue:%d\n", *((unsigned int *)(dupeptr)));
-   dupeptr += sizeof(unsigned int); 
-   printf("dupeptrfirstvalue:%d\n", *((int *)(dupeptr)));*/
+    if(header->notifylist != NULL) {
+      unsigned int listSize;
+      /* get duplicate array of threadlist */
+      threadlist_t *threadArray;
+      listSize = convertToArray(header->notifylist,&threadArray);
 
+      memcpy(ptr, &listSize,sizeof(unsigned int));
+      ptr += sizeof(unsigned int);
 
-       printf("%s-> End\n", __func__); 
+      memcpy(ptr, threadArray, (sizeof(threadlist_t) * listSize));
+      ptr += (sizeof(threadlist_t) * listSize);  
+      free(threadArray);
+    }
+       }
+#ifdef DEBUG
+       printf("%s-> End\n", __func__);
+#endif
+  //          number of oid       size    + data array 
        return (sizeof(unsigned int) + sizeof(int) + size);
 }
 
+/**
+ * mhashGetThreadObjects
+ * Walks every key returned by mhashGetKeys() and, for each object that is
+ * marked isBackup and has a non-empty notifylist, drains that list: one
+ * (oid, mid, threadid) triple is recorded per list node and the node is freed.
+ *
+ * @param oidArray       out: heap array of object ids, one per drained node
+ * @param midArray       out: heap array of machine ids, parallel to oidArray
+ * @param threadidArray  out: heap array of thread ids, parallel to oidArray
+ * @return number of triples collected. The three out-pointers are written
+ *         only when the result is > 0; the arrays are then owned by the caller.
+ *
+ * The result buffers grow on the heap as needed. A single object can carry
+ * several notify nodes, so the number of triples is not bounded by the number
+ * of objects in the table. If growing the buffers fails, draining stops, the
+ * not-yet-copied nodes stay on the object's notifylist, and the triples
+ * collected so far are returned.
+ *
+ * NOTE(review): the header is looked up before mlookup.locktable is taken,
+ * presumably because mhashSearch() acquires that lock itself -- confirm that
+ * the object cannot be freed between the lookup and the lock.
+ **/
+int mhashGetThreadObjects(unsigned int** oidArray,unsigned int** midArray,unsigned int** threadidArray)
+{
+  printf("%s-> Start\n", __func__);
+  unsigned int *oidArr = NULL;
+  unsigned int *midArr = NULL;
+  unsigned int *threadidArr = NULL;
+  unsigned int capacity = 0;   // slots currently allocated in each array
+  unsigned int *hashkeys;
+  unsigned int numKeys = 0;
+  objheader_t *header;
+  unsigned int i;
+  int size = 0;                // triples collected so far
+  int outofmem = 0;
+  threadlist_t *t;
+  threadlist_t *tmp;
+
+  hashkeys = mhashGetKeys(&numKeys);
+  printf("%s -> numKeys : %u\n",__func__,numKeys);
+
+  // nothing to walk if the key snapshot could not be produced
+  if(hashkeys == NULL)
+    numKeys = 0;
+
+  for(i = 0; i < numKeys && !outofmem; i++) {
+    header = (objheader_t*)mhashSearch(hashkeys[i]);
+    // the key may have been removed since the snapshot was taken
+    if(header == NULL)
+      continue;
+
+    pthread_mutex_lock(&mlookup.locktable);
+
+    if(header->isBackup && header->notifylist != NULL) {
+      t = header->notifylist;
+
+      while(t) {
+        if((unsigned int)size == capacity) {
+          // grow all three parallel arrays together
+          unsigned int newcap = (capacity == 0) ? 16 : capacity * 2;
+          unsigned int *newoid = realloc(oidArr, newcap * sizeof(unsigned int));
+          if(newoid != NULL)
+            oidArr = newoid;
+          unsigned int *newmid = realloc(midArr, newcap * sizeof(unsigned int));
+          if(newmid != NULL)
+            midArr = newmid;
+          unsigned int *newtid = realloc(threadidArr, newcap * sizeof(unsigned int));
+          if(newtid != NULL)
+            threadidArr = newtid;
+
+          if(newoid == NULL || newmid == NULL || newtid == NULL) {
+            printf("realloc error for notify arrays %s, %d\n", __FILE__, __LINE__);
+            outofmem = 1;
+            break;
+          }
+          capacity = newcap;
+        }
+
+        oidArr[size] = OID(header);
+        midArr[size] = t->mid;
+        threadidArr[size++] = t->threadid;
+        tmp = t;
+        t = t->next;
+        free(tmp);
+      }
+
+      // NULL when fully drained; otherwise the nodes that were not copied
+      header->notifylist = t;
+    }
+    pthread_mutex_unlock(&mlookup.locktable);
+  }
+
+  free(hashkeys);
+
+  printf("%s -> end copying    Size : %d\n",__func__,size);
+
+  if(size > 0) {
+    *oidArray = oidArr;
+    *midArray = midArr;
+    *threadidArray = threadidArr;
+  } else {
+    // nothing collected: leave the out-pointers untouched
+    free(oidArr);
+    free(midArr);
+    free(threadidArr);
+  }
+
+  printf("%s -> End\n",__func__);
+
+  return size;
+
+}
+#endif
index 2133b498d4252b5254e8d6bc4f20e33fca39c438..d6158e5374d317737d2de2d9c60f3648ae163da2 100644 (file)
@@ -37,6 +37,11 @@ unsigned int mhashRemove(unsigned int key); //returns -1 if not found
 unsigned int mhashResize(unsigned int newsize);
 unsigned int *mhashGetKeys(unsigned int *numKeys);
 
+#ifdef RECOVERY
+int mhashGetDuplicate(void** dupeptr,int backup);
+int mhashGetThreadObjects(unsigned int** oidArray,unsigned int** midArray,unsigned int** threadidArray);
+#endif
+
 void mhashPrint();
 
 #endif
index 4b353b067d020e6110c3842b99e1468517a8888f..3eee2cbd0abf66c75a2c5f9b00366f2f0bbb57f2 100644 (file)
@@ -1,5 +1,4 @@
 #include "plookup.h"
-#include "ip.h"
 extern int classsize[];
 
 //NOTE: "pile" ptr points to the head of the linked list of the machine pile data structures
@@ -58,7 +57,6 @@ int pListMid(plistnode_t *pile, unsigned int *list) {
   int i = 0;
   plistnode_t *tmp;
   tmp = pile;
-       char ip[16];
   while (tmp != NULL) {
     list[i] = tmp->mid;
     i++;
index c892a030266b67d02f095c6995e1139c7c7e2739..fcb58191864383ea2b65ceacde1c56ab08ae47f8 100644 (file)
@@ -58,10 +58,10 @@ void movehead(int size) {
 void * gettail() {
   while(tailoffset==headoffset) {
     //Sleep
-    pthread_mutex_lock(&qlock);
-    if (tailoffset==headoffset)
-      pthread_cond_wait(&qcond, &qlock);
-    pthread_mutex_unlock(&qlock);
+    //    pthread_mutex_lock(&qlock);
+    //    if (tailoffset==headoffset)
+    //      pthread_cond_wait(&qcond, &qlock);
+    //    pthread_mutex_unlock(&qlock);
   }
   if (*((int *)(memory+tailoffset))==-1) {
     tailoffset=0; //do loop
index 094e9c9df3e77443a51aa62be06bd0195831c71f..85445905b5703e3c82ee26bd95c2f8bad391e4ec 100644 (file)
@@ -14,10 +14,13 @@ extern int bytesSent;
 extern int bytesRecv;
 extern int totalObjSize;
 extern unsigned int myIpAddr;
+extern int getResponse;
+extern int sendRemoteReq;
 
 void handle();
 extern pfcstats_t *evalPrefetch;
 
+/*
 void transStatsHandler(int sig, siginfo_t* info, void *context) {
 #ifdef TRANSSTATS
   FILE *fp;
@@ -42,6 +45,28 @@ void transStatsHandler(int sig, siginfo_t* info, void *context) {
   exit(0);
 #endif
 }
+*/
+
+/**
+ * transStatsHandler
+ * When TRANSSTATS is defined, prints the transaction counters (commits,
+ * aborts, hash-search counts, remote sends, soft aborts, byte counts,
+ * sendRemoteReq and getResponse) to stdout and then terminates the process
+ * with exit(0). Without TRANSSTATS the body is empty and the call returns.
+ *
+ * The sig, info and context parameters are not used by the body.
+ *
+ * NOTE(review): the signature matches a sigaction-style handler; if it is
+ * installed as one, printf() and exit() are not async-signal-safe -- confirm
+ * this is acceptable for a shutdown-only path.
+ **/
+void transStatsHandler(int sig, siginfo_t* info, void *context) {
+#ifdef TRANSSTATS
+  printf("******  Transaction Stats   ******\n");
+  printf("myIpAddr = %x\n", myIpAddr);
+  printf("numTransCommit = %d\n", numTransCommit);
+  printf("numTransAbort = %d\n", numTransAbort);
+  printf("nchashSearch = %d\n", nchashSearch);
+  printf("nmhashSearch = %d\n", nmhashSearch);
+  printf("nprehashSearch = %d\n", nprehashSearch);
+  // the label says nRemoteReadSend but the counter printed is nRemoteSend
+  printf("nRemoteReadSend = %d\n", nRemoteSend);
+  printf("nSoftAbort = %d\n", nSoftAbort);
+  printf("bytesSent = %d\n", bytesSent);
+  printf("bytesRecv = %d\n", bytesRecv);
+  printf("totalObjSize= %d\n", totalObjSize);
+  printf("sendRemoteReq= %d\n", sendRemoteReq);
+  printf("getResponse= %d\n", getResponse);
+  printf("**********************************\n");
+  exit(0);
+#endif
+}
 
 void handle() {
 #ifdef TRANSSTATS
index 57610e216f7189a61097fb2b8b774f477b2d835e..c209fb4daf0f00855c76f99a0fb25f89b6eb8ac4 100644 (file)
@@ -37,11 +37,23 @@ int nSoftAbortAbort = 0;
 #endif
 
 #ifdef STMSTATS
+/**
+ * timeInMS
+ * Returns the current gettimeofday() time as tv_sec * 1000000 + tv_usec,
+ * reduced to the width of int.
+ *
+ * NOTE: despite the name, the unit of the result is microseconds, not
+ * milliseconds. The full value does not fit in an int, so the result wraps
+ * and is only meaningful for differences over short intervals.
+ **/
+int timeInMS ()
+{
+  struct timeval t;
+
+  gettimeofday(&t, NULL);
+  // Do the arithmetic in unsigned: tv_sec * 1000000 exceeds INT_MAX, and
+  // overflowing a signed int is undefined behaviour. Unsigned wrap-around is
+  // well defined and keeps the same low-order bits.
+  return (int)((unsigned int)t.tv_sec * 1000000u + (unsigned int)t.tv_usec);
+}
 /* Thread variable for locking/unlocking */
 __thread threadrec_t *trec;
 __thread struct objlist * lockedobjs;
-/** Global lock **/
-int typesCausingAbort[TOTALNUMCLASSANDARRAY];
+__thread int t_objnumcount=0;
+
+/* Collect stats for object classes causing abort */
+objtypestat_t typesCausingAbort[TOTALNUMCLASSANDARRAY];
 /******Keep track of objects and types causing aborts******/
 /* TODO uncomment for later use
 #define DEBUGSTMSTAT(args...) { \
@@ -49,6 +61,20 @@ int typesCausingAbort[TOTALNUMCLASSANDARRAY];
   fflush(stdout); \
 }
 */
+
+/**
+ * getTransSize
+ * Inline function that updates the per-class abort statistics for the class
+ * of the given object.
+ *
+ * The class's numabort is always incremented. If the class's slot in
+ * isObjTypeTraverse is not yet 1, its numaccess is also increased by
+ * c_numelements and its numtrans by one. The slot is then set to 1.
+ *
+ * @param header             header of the object whose class is charged
+ * @param isObjTypeTraverse  per-class flags, indexed by TYPE(header)
+ **/
+INLINE void getTransSize(objheader_t *header , int *isObjTypeTraverse) {
+  objtypestat_t *stat = &typesCausingAbort[TYPE(header)];
+  int *seen = &isObjTypeTraverse[TYPE(header)];
+
+  stat->numabort++;
+  if(*seen != 1) {
+    stat->numaccess += c_numelements;
+    // open question from the author: should this count be kept per object
+    stat->numtrans++;
+  }
+  *seen = 1;
+}
 #define DEBUGSTMSTAT(args...)
 #else
 #define DEBUGSTMSTAT(args...)
@@ -57,15 +83,42 @@ int typesCausingAbort[TOTALNUMCLASSANDARRAY];
 #ifdef STMDEBUG
 #define DEBUGSTM(x...) printf(x);
 #else
-#define DEBUGSTM(x...)
+#define DEBUGSTM(x...);
 #endif
 
-#ifdef FASTMEMCPY
-void * A_memcpy (void * dest, const void * src, size_t count);
+#ifdef STATDEBUG
+#define DEBUGSTATS(x...) printf(x);
 #else
-#define A_memcpy memcpy
+#define DEBUGSTATS(x...);
 #endif
 
+//#ifdef FASTMEMCPY
+//void * A_memcpy (void * dest, const void * src, size_t count);
+//#else
+//#define A_memcpy memcpy
+//#endif
+
+/**
+ * A_memcpy
+ * Copies count bytes from src to dest: first in INTPTR-sized words while at
+ * least one whole word remains, then the leftover bytes one at a time.
+ *
+ * @param dest   destination buffer, at least count bytes
+ * @param src    source buffer, at least count bytes
+ * @param count  number of bytes to copy
+ * @return dest, following the memcpy() convention
+ *
+ * NOTE(review): the word loop reads and writes through INTPTR pointers, so
+ * presumably both buffers are expected to be INTPTR-aligned -- confirm
+ * against the callers.
+ **/
+void * A_memcpy (void * dest, const void * src, size_t count) {
+  size_t off=0;
+  INTPTR *desti=(INTPTR *)dest;
+  const INTPTR *srci=(const INTPTR *)src;
+
+  //word copy
+  while(count>=sizeof(INTPTR)) {
+    desti[off]=srci[off];
+    off+=1;
+    count-=sizeof(INTPTR);
+  }
+  //convert the word index into a byte offset for the tail
+  off*=sizeof(INTPTR);
+  //byte copy
+  while(count>0) {
+    ((char *)dest)[off]=((const char *)src)[off];
+    off++;
+    count--;
+  }
+  //the function is declared to return a pointer; hand back dest like memcpy
+  return dest;
+}
+
+
 extern void * curr_heapbase;
 extern void * curr_heapptr;
 extern void * curr_heaptop;
@@ -79,8 +132,13 @@ objlockstate_t *objlockscope;
  * Increments the abort count for each object
  **/
 void ABORTCOUNT(objheader_t * x) {
-  x->abortCount++;  
-  if (x->abortCount > MAXABORTS && (x->riskyflag != 1)) {       
+  int avgTransSize = typesCausingAbort[TYPE(x)].numaccess / typesCausingAbort[TYPE(x)].numtrans; 
+  float transAbortProbForObj = (PERCENT_ALLOWED_ABORT*FACTOR)/(float) avgTransSize;
+  float ObjAbortProb = x->abortCount/(float) (x->accessCount);
+  DEBUGSTM("ABORTSTATS: oid= %x, type= %2d, transAbortProb= %2.2f, ObjAbortProb= %2.2f, Typenumaccess= %3d, avgtranssize = %2d, ObjabortCount= %2d, ObjaccessCount= %3d\n", OID(x), TYPE(x), transAbortProbForObj, ObjAbortProb, typesCausingAbort[TYPE(x)].numaccess, avgTransSize, x->abortCount, x->accessCount);
+  /* Condition for locking objects */
+  if (((ObjAbortProb*100) >= transAbortProbForObj) && (x->riskyflag != 1)) {    
+    DEBUGSTATS("AFTER LOCK ABORTSTATS: oid= %x, type= %2d, transAbortProb= %2.2f, ObjAbortProb= %2.2f, Typenumaccess= %3d, avgtranssize = %2d, ObjabortCount= %2d, ObjaccessCount= %3d\n", OID(x), TYPE(x), transAbortProbForObj, ObjAbortProb, typesCausingAbort[TYPE(x)].numaccess, avgTransSize, x->abortCount, x->accessCount);
     //makes riskflag sticky
     pthread_mutex_lock(&lockedobjstore); 
     if (objlockscope->offset<MAXOBJLIST) { 
@@ -134,6 +192,9 @@ void objstrReset() {
     t_cache=next;
   }
   t_cache->top=t_cache+1;
+#ifdef STMSTATS
+  t_objnumcount=0;
+#endif
 }
 
 //free entire list, starting at store
@@ -249,13 +310,15 @@ void *objstrAlloc(unsigned int size) {
   }
 }
 
+
 /* =============================================================
  * transRead
  * -finds the objects either in main heap
  * -copies the object into the transaction cache
  * =============================================================
  */
-__attribute__ ((pure)) void *transRead(void * oid, void *gl) {
+//__attribute__ ((pure)) 
+void *transRead(void * oid, void *gl) {
   objheader_t *tmp, *objheader;
   objheader_t *objcopy;
   int size;
@@ -273,6 +336,12 @@ __attribute__ ((pure)) void *transRead(void * oid, void *gl) {
   }
 #endif
   A_memcpy(objcopy, header, size);
+#ifdef STMSTATS
+  /* keep track of the object's access sequence in a transaction */
+  objheader_t *tmpheader = objcopy;
+  tmpheader->accessCount = ++t_objnumcount;
+#endif
+
   /* Insert into cache's lookup table */
   STATUS(objcopy)=0;
   if (((unsigned INTPTR)oid)<((unsigned INTPTR ) curr_heapbase)|| ((unsigned INTPTR)oid) >((unsigned INTPTR) curr_heapptr))
@@ -316,6 +385,9 @@ void freelockedobjs() {
 int transCommit(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params) {
 #else
 int transCommit() {
+#endif
+#ifdef TRANSSTATS
+  numTransCommit++;
 #endif
   int softaborted=0;
   do {
@@ -345,6 +417,9 @@ int transCommit() {
 #endif
       objstrReset();
       t_chashreset();
+#ifdef READSET
+      rd_t_chashreset();
+#endif
 #ifdef DELAYCOMP
       dc_t_chashreset();
       ptrstack.count=0;
@@ -355,7 +430,7 @@ int transCommit() {
     }
     if(finalResponse == TRANS_COMMIT) {
 #ifdef TRANSSTATS
-      numTransCommit++;
+      //numTransCommit++;
       if (softaborted) {
        nSoftAbortCommit++;
       }
@@ -366,6 +441,9 @@ int transCommit() {
 #endif
       objstrReset();
       t_chashreset();
+#ifdef READSET
+      rd_t_chashreset();
+#endif
 #ifdef DELAYCOMP
       dc_t_chashreset();
       ptrstack.count=0;
@@ -374,6 +452,7 @@ int transCommit() {
 #endif
       return 0;
     }
+
     /* wait a random amount of time before retrying to commit transaction*/
     if(finalResponse == TRANS_SOFT_ABORT) {
 #ifdef TRANSSTATS
@@ -392,6 +471,9 @@ int transCommit() {
 #endif
        objstrReset();
        t_chashreset();
+#ifdef READSET
+       rd_t_chashreset();
+#endif
 #ifdef DELAYCOMP
        dc_t_chashreset();
        ptrstack.count=0;
@@ -412,6 +494,7 @@ int transCommit() {
 #define freearrays   if (c_numelements>=200) { \
     free(oidrdlocked); \
     free(oidrdversion); \
+    free(oidrdage); \
   } \
   if (t_numelements>=200) { \
     free(oidwrlocked); \
@@ -420,9 +503,46 @@ int transCommit() {
 #define freearrays   if (c_numelements>=200) { \
     free(oidrdlocked); \
     free(oidrdversion); \
+    free(oidrdage); \
     free(oidwrlocked); \
   }
 #endif
+
+/*
+ * allocarrays
+ * Statement macro that points oidrdlocked, oidrdversion, oidrdage and
+ * oidwrlocked either at the caller's local arrays (rdlocked, rdversion,
+ * rdage, wrlocked) or at freshly malloc'ed buffers.
+ *
+ * The local arrays are used while the element count is below 200; otherwise
+ * the buffers come from malloc. This is the counterpart of freearrays, which
+ * frees them under the matching >=200 test.
+ *
+ * With DELAYCOMP the macro also declares t_numelements
+ * (c_numelements+dc_c_numelements), and the write-lock array is sized and
+ * chosen by that total instead of by c_numelements.
+ *
+ * Every read-side heap buffer is sized c_numelements*sizeof(void*), including
+ * the int arrays. The malloc results are not checked.
+ *
+ * The macro expands to bare statements (and, under DELAYCOMP, a declaration),
+ * so it must be used at block scope where all of the names above are visible.
+ */
+#ifdef DELAYCOMP
+#define allocarrays int t_numelements=c_numelements+dc_c_numelements; \
+  if (t_numelements<200) { \
+    oidwrlocked=wrlocked; \
+  } else { \
+    oidwrlocked=malloc(t_numelements*sizeof(void *)); \
+  } \
+  if (c_numelements<200) { \
+    oidrdlocked=rdlocked; \
+    oidrdversion=rdversion; \
+    oidrdage=rdage; \
+  } else { \
+    int size=c_numelements*sizeof(void*); \
+    oidrdlocked=malloc(size); \
+    oidrdversion=malloc(size); \
+    oidrdage=malloc(size); \
+  }
+#else
+#define allocarrays if (c_numelements<200) { \
+    oidrdlocked=rdlocked; \
+    oidrdversion=rdversion; \
+    oidrdage=rdage; \
+    oidwrlocked=wrlocked; \
+  } else { \
+    int size=c_numelements*sizeof(void*); \
+    oidrdlocked=malloc(size); \
+    oidrdversion=malloc(size); \
+    oidwrlocked=malloc(size); \
+    oidrdage=malloc(size); \
+  }
+#endif
+
+
+
+
 /* ==================================================
  * traverseCache
  * - goes through the transaction cache and
@@ -439,39 +559,21 @@ int traverseCache() {
   int numoidwrlocked=0;
   void * rdlocked[200];
   int rdversion[200];
+  int rdage[200];
   void * wrlocked[200];
   int softabort=0;
   int i;
   void ** oidrdlocked;
   void ** oidwrlocked;
+  int * oidrdage;
   int * oidrdversion;
-#ifdef DELAYCOMP
-  int t_numelements=c_numelements+dc_c_numelements;
-  if (t_numelements<200) {
-    oidwrlocked=wrlocked;
-  } else {
-    oidwrlocked=malloc(t_numelements*sizeof(void *));
-  }
-  if (c_numelements<200) {
-    oidrdlocked=rdlocked;
-    oidrdversion=rdversion;
-  } else {
-    int size=c_numelements*sizeof(void*);
-    oidrdlocked=malloc(size);
-    oidrdversion=malloc(size);
-  }
-#else
-  if (c_numelements<200) {
-    oidrdlocked=rdlocked;
-    oidrdversion=rdversion;
-    oidwrlocked=wrlocked;
-  } else {
-    int size=c_numelements*sizeof(void*);
-    oidrdlocked=malloc(size);
-    oidrdversion=malloc(size);
-    oidwrlocked=malloc(size);
-  }
-#endif
+  allocarrays;
+  int objtypetraverse[TOTALNUMCLASSANDARRAY];
+  int ObjSeqId;
+
+  for(i=0; i<TOTALNUMCLASSANDARRAY; i++)
+    objtypetraverse[i] = 0;
+
   chashlistnode_t *ptr = c_table;
   /* Represents number of bins in the chash table */
   unsigned int size = c_size;
@@ -482,8 +584,8 @@ int traverseCache() {
       //if the first bin in hash table is empty
       if(curr->key == NULL)
        break;
-      objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
-      objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
+      objheader_t * headeraddr=&((objheader_t *) curr->val)[-1]; //cached object
+      objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t)); //real object
       unsigned int version = headeraddr->version;
 
       if(STATUS(headeraddr) & DIRTY) {
@@ -496,9 +598,13 @@ int traverseCache() {
            oidwrlocked[numoidwrlocked++] = header;
            transAbortProcess(oidwrlocked, numoidwrlocked);
 #ifdef STMSTATS
-           ABORTCOUNT(header);
-           (typesCausingAbort[TYPE(header)])++;
-        getTotalAbortCount(i+1, size, (void *)(curr->next), numoidrdlocked, oidrdlocked, oidrdversion);
+        header->abortCount++;
+        ObjSeqId = headeraddr->accessCount;
+           (typesCausingAbort[TYPE(header)]).numabort++;
+           (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+        (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+        objtypetraverse[TYPE(header)]=1;
+        getTotalAbortCount(i+1, size, (void *)(curr->next), numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse);
 #endif
            DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
            DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
@@ -507,23 +613,24 @@ int traverseCache() {
            return TRANS_SOFT_ABORT;
              else 
            return TRANS_ABORT;
-          
          }
        } else {
-#ifdef DELAYCOMP
-      //TODO: check to see if we already have lock
-#endif
          if(version == header->version) {
            /* versions match */
            softabort=1;
          }
          transAbortProcess(oidwrlocked, numoidwrlocked);
 #ifdef STMSTATS
-         ABORTCOUNT(header);
-         (typesCausingAbort[TYPE(header)])++;
+      header->abortCount++;
+      ObjSeqId = headeraddr->accessCount;
+      (typesCausingAbort[TYPE(header)]).numabort++;
+      (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+      (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+      objtypetraverse[TYPE(header)]=1;
+         //(typesCausingAbort[TYPE(header)])++;
 #endif
 #if defined(STMSTATS)||defined(SOFTABORT)
-      if(getTotalAbortCount(i+1, size, (void *)(curr->next), numoidrdlocked, oidrdlocked, oidrdversion))
+      if(getTotalAbortCount(i+1, size, (void *)(curr->next), numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse))
            softabort=0;
 #endif
          DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
@@ -536,15 +643,21 @@ int traverseCache() {
       
        }
       } else {
+#ifdef STMSTATS
        oidrdversion[numoidrdlocked]=version;
-       oidrdlocked[numoidrdlocked++] = header;
+       oidrdlocked[numoidrdlocked]=header;
+    oidrdage[numoidrdlocked++]=headeraddr->accessCount;
+#else
+    oidrdversion[numoidrdlocked]=version;
+    oidrdlocked[numoidrdlocked++]=header;
+#endif
       }
       curr = curr->next;
     }
   } //end of for
 
 #ifdef DELAYCOMP
-  //acquire other locks
+  //acquire access set locks
   unsigned int numoidwrtotal=numoidwrlocked;
 
   chashlistnode_t *dc_curr = dc_c_list;
@@ -574,11 +687,15 @@ int traverseCache() {
       //have to abort to avoid deadlock
       transAbortProcess(oidwrlocked, numoidwrtotal);
 #ifdef STMSTATS
-      ABORTCOUNT(header);
-      (typesCausingAbort[TYPE(header)])++;
+      ObjSeqId = headeraddr->accessCount;
+      header->abortCount++;
+      (typesCausingAbort[TYPE(header)]).numabort++;
+      (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+      (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+      objtypetraverse[TYPE(header)]=1;
 #endif
 #if defined(STMSTATS)||defined(SOFTABORT)
-      if(getTotalAbortCount(i+1, size, (void *)(curr->next), numoidrdlocked, oidrdlocked, oidrdversion))
+      if(getTotalAbortCount(i+1, size, (void *)(curr->next), numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse))
        softabort=0;
 #endif
       DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
@@ -601,6 +718,7 @@ int traverseCache() {
     objheader_t *header=oidrdlocked[i];
     unsigned int version=oidrdversion[i];
     if(header->lock>0) { //not write locked
+      CFENCE;
       if(version != header->version) { /* versions do not match */
 #ifdef DELAYCOMP
        transAbortProcess(oidwrlocked, numoidwrtotal);
@@ -608,11 +726,15 @@ int traverseCache() {
        transAbortProcess(oidwrlocked, numoidwrlocked);
 #endif
 #ifdef STMSTATS
-       ABORTCOUNT(header);
-       (typesCausingAbort[TYPE(header)])++;
-       getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion);
-#endif
-       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+    ObjSeqId = oidrdage[i];
+    header->abortCount++;
+    (typesCausingAbort[TYPE(header)]).numabort++;
+    (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+    (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+    objtypetraverse[TYPE(header)]=1;
+       getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse);
+#endif
+       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
        DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
        freearrays;
        return TRANS_ABORT;
@@ -624,11 +746,15 @@ int traverseCache() {
       if (version!=header->version) {
        transAbortProcess(oidwrlocked, numoidwrtotal);
 #ifdef STMSTATS
-       ABORTCOUNT(header);
-       (typesCausingAbort[TYPE(header)])++;
-       getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion);
-#endif
-       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+    ObjSeqId = oidrdage[i];
+    header->abortCount++;
+    (typesCausingAbort[TYPE(header)]).numabort++;
+    (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+    (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+    objtypetraverse[TYPE(header)]=1;
+       getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse);
+#endif
+       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u, oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
        DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
        freearrays;
        return TRANS_ABORT;
@@ -645,23 +771,115 @@ int traverseCache() {
       transAbortProcess(oidwrlocked, numoidwrlocked);
 #endif
 #ifdef STMSTATS
-      ABORTCOUNT(header);
-      (typesCausingAbort[TYPE(header)])++;
+      ObjSeqId = oidrdage[i];
+      header->abortCount++;
+      (typesCausingAbort[TYPE(header)]).numabort++;
+      (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+      (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+      objtypetraverse[TYPE(header)]=1;
 #endif
 #if defined(STMSTATS)||defined(SOFTABORT)
-      if(getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion))
+      if(getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse))
        softabort=0;
 #endif
-      DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+      DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
       DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
       freearrays;
       if (softabort)
        return TRANS_SOFT_ABORT;
       else 
        return TRANS_ABORT;
-      
     }
   }
+
+#ifdef READSET
+  /* Validate the auxiliary read set (rd_c_list).
+   * For each entry the version recorded at read time is compared with the
+   * object's current version:
+   *  - object not locked and version changed  -> abort
+   *  - object locked: continue only if the version still matches and the key
+   *    is found in the delaycomp table (DELAYCOMP) or as a DIRTY entry of the
+   *    normal cache table; otherwise abort.
+   * On abort the write locks taken so far are released via transAbortProcess
+   * and the arrays set up by allocarrays are released via freearrays. */
+  rdchashlistnode_t *rd_curr = rd_c_list;
+  /* Walk the linked list of auxiliary read-set entries */
+  while(likely(rd_curr != NULL)) {
+    unsigned int version=rd_curr->version;
+    objheader_t *header=(objheader_t *)(((char *)rd_curr->key)-sizeof(objheader_t));
+    if(header->lock>0) { //object is not locked
+      if (version!=header->version) {
+       //have to abort
+#ifdef DELAYCOMP
+       transAbortProcess(oidwrlocked, numoidwrtotal);
+#else
+       transAbortProcess(oidwrlocked, numoidwrlocked);
+#endif
+#ifdef STMSTATS
+       //ABORTCOUNT(header);
+       //typesCausingAbort holds objtypestat_t records, so count the abort
+       //in the numabort field (a bare ++ on the struct does not compile)
+       (typesCausingAbort[TYPE(header)]).numabort++;
+#endif
+#if defined(STMSTATS)||defined(SOFTABORT)
+       //if(getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion))
+       //  softabort=0;
+#endif
+       DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
+       DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
+       freearrays;
+       if (softabort)
+         return TRANS_SOFT_ABORT;
+       else
+         return TRANS_ABORT;
+      }
+    } else {
+      //maybe we already have lock
+      if (version==header->version) {
+       void * key=rd_curr->key;
+#ifdef DELAYCOMP
+       //check to see if it is in the delaycomp table
+       {
+         chashlistnode_t *node = &dc_c_table[(((unsigned INTPTR)key) & dc_c_mask)>>4];
+         do {
+           if(node->key == key)
+             goto nextloopread;
+           node = node->next;
+         } while(node != NULL);
+       }
+#endif
+       //check normal table
+       {
+         chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
+         do {
+           if(node->key == key) {
+             objheader_t * headeraddr=&((objheader_t *) node->val)[-1];
+             if(STATUS(headeraddr) & DIRTY) {
+               goto nextloopread;
+             }
+           }
+           node = node->next;
+         } while(node != NULL);
+       }
+      }
+#ifdef DELAYCOMP
+      //have to abort to avoid deadlock
+      transAbortProcess(oidwrlocked, numoidwrtotal);
+#else
+      transAbortProcess(oidwrlocked, numoidwrlocked);
+#endif
+
+#ifdef STMSTATS
+      //ABORTCOUNT(header);
+      //count the abort in the per-class record (see note above)
+      (typesCausingAbort[TYPE(header)]).numabort++;
+#endif
+#if defined(STMSTATS)||defined(SOFTABORT)
+      //if(getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion))
+       //softabort=0;
+#endif
+      DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
+      DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
+      freearrays;
+      if (softabort)
+       return TRANS_SOFT_ABORT;
+      else
+       return TRANS_ABORT;
+    }
+  nextloopread:
+    rd_curr = rd_curr->lnext;
+  }
+#endif
   
   /* Decide the final response */
 #ifdef DELAYCOMP
@@ -691,39 +909,20 @@ int alttraverseCache() {
   int numoidwrlocked=0;
   void * rdlocked[200];
   int rdversion[200];
+  int rdage[200];
   void * wrlocked[200];
   int softabort=0;
   int i;
   void ** oidrdlocked;
   int * oidrdversion;
+  int * oidrdage;
   void ** oidwrlocked;
-#ifdef DELAYCOMP
-  int t_numelements=c_numelements+dc_c_numelements;
-  if (t_numelements<200) {
-    oidwrlocked=wrlocked;
-  } else {
-    oidwrlocked=malloc(t_numelements*sizeof(void *));
-  }
-  if (c_numelements<200) {
-    oidrdlocked=rdlocked;
-    oidrdversion=rdversion;
-  } else {
-    int size=c_numelements*sizeof(void*);
-    oidrdlocked=malloc(size);
-    oidrdversion=malloc(size);
-  }
-#else
-  if (c_numelements<200) {
-    oidrdlocked=rdlocked;
-    oidrdversion=rdversion;
-    oidwrlocked=wrlocked;
-  } else {
-    int size=c_numelements*sizeof(void*);
-    oidrdlocked=malloc(size);
-    oidrdversion=malloc(size);
-    oidwrlocked=malloc(size);
-  }
-#endif
+  allocarrays;
+  int ObjSeqId;
+  int objtypetraverse[TOTALNUMCLASSANDARRAY];
+
+  for(i=0; i<TOTALNUMCLASSANDARRAY; i++)
+    objtypetraverse[i] = 0;
   chashlistnode_t *curr = c_list;
   /* Inner loop to traverse the linked list of the cache lookupTable */
   while(likely(curr != NULL)) {
@@ -742,11 +941,15 @@ int alttraverseCache() {
          oidwrlocked[numoidwrlocked++] = header;
          transAbortProcess(oidwrlocked, numoidwrlocked);
 #ifdef STMSTATS
-         ABORTCOUNT(header);
-         (typesCausingAbort[TYPE(header)])++;
-      getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion);
-#endif
-         DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+      header->abortCount++;
+      ObjSeqId = headeraddr->accessCount; 
+      (typesCausingAbort[TYPE(header)]).numabort++;
+      (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+      (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+      objtypetraverse[TYPE(header)]=1;
+      getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse);
+#endif
+         DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
          DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
          freearrays;
          return TRANS_ABORT;
@@ -758,14 +961,18 @@ int alttraverseCache() {
        }
        transAbortProcess(oidwrlocked, numoidwrlocked);
 #ifdef STMSTATS
-       ABORTCOUNT(header);
-       (typesCausingAbort[TYPE(header)])++;
+    header->abortCount++;
+    ObjSeqId = headeraddr->accessCount; 
+    (typesCausingAbort[TYPE(header)]).numabort++;
+    (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+    (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+    objtypetraverse[TYPE(header)]=1;
 #endif
 #if defined(STMSTATS)||defined(SOFTABORT)
-    if(getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion))
+    if(getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse))
          softabort=0;
 #endif
-       DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+       DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
        DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
        freearrays;
        if (softabort)
@@ -790,7 +997,7 @@ int alttraverseCache() {
     //if the first bin in hash table is empty
     objheader_t * headeraddr=&((objheader_t *) dc_curr->val)[-1];
     objheader_t *header=(objheader_t *)(((char *)dc_curr->key)-sizeof(objheader_t));
-    if(write_trylock(&header->lock)) { //can aquire write lock    
+    if(write_trylock(&header->lock)) { //can aquire write lock
       oidwrlocked[numoidwrtotal++] = header;
     } else {
       //maybe we already have lock
@@ -799,7 +1006,7 @@ int alttraverseCache() {
       
       do {
        if(node->key == key) {
-         objheader_t * headeraddr=&((objheader_t *) node->val)[-1];      
+         objheader_t * headeraddr=&((objheader_t *) node->val)[-1];
          if(STATUS(headeraddr) & DIRTY) {
            goto nextloop;
          }
@@ -810,14 +1017,18 @@ int alttraverseCache() {
       //have to abort to avoid deadlock
       transAbortProcess(oidwrlocked, numoidwrtotal);
 #ifdef STMSTATS
-      ABORTCOUNT(header);
-      (typesCausingAbort[TYPE(header)])++;
+      header->abortCount++;
+      ObjSeqId = headeraddr->accessCount; 
+      (typesCausingAbort[TYPE(header)]).numabort++;
+      (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+      (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+      objtypetraverse[TYPE(header)]=1;
 #endif
 #if defined(STMSTATS)||defined(SOFTABORT)
-      if(getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion))
+      if(getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse))
        softabort=0;
 #endif
-      DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+      DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
       DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
       freearrays;
       if (softabort)
@@ -835,7 +1046,8 @@ int alttraverseCache() {
   for(i=0; i<numoidrdlocked; i++) {
     objheader_t * header=oidrdlocked[i];
     unsigned int version=oidrdversion[i];
-    if(header->lock>=0) {
+    if(header->lock>0) {
+      CFENCE;
       if(version != header->version) {
 #ifdef DELAYCOMP
        transAbortProcess(oidwrlocked, numoidwrtotal);
@@ -843,17 +1055,35 @@ int alttraverseCache() {
        transAbortProcess(oidwrlocked, numoidwrlocked);
 #endif
 #ifdef STMSTATS
-       ABORTCOUNT(header);
-       (typesCausingAbort[TYPE(header)])++;
-       getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion);
-#endif
-       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+    ObjSeqId = oidrdage[i];
+    header->abortCount++;
+    (typesCausingAbort[TYPE(header)]).numabort++;
+    (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+    (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+    objtypetraverse[TYPE(header)]=1;
+       getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse);
+#endif
+       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
        DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
        freearrays;
        return TRANS_ABORT;
       }
 #ifdef DELAYCOMP
-      //TODO: check to see if we already have lock
+    } else if (dc_t_chashSearch(((char *)header)+sizeof(objheader_t))!=NULL) {
+      //couldn't get lock because we already have it
+      //check if it is the right version number
+      if (version!=header->version) {
+       transAbortProcess(oidwrlocked, numoidwrtotal);
+#ifdef STMSTATS
+    ObjSeqId = oidrdage[i];
+    header->abortCount++;
+       getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse);
+#endif
+       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u oid: %x\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version, OID(header));
+       DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
+       freearrays;
+       return TRANS_ABORT;
+      }
 #endif
     } else { /* cannot aquire lock */
       if(version == header->version) {
@@ -865,11 +1095,15 @@ int alttraverseCache() {
       transAbortProcess(oidwrlocked, numoidwrlocked);
 #endif
 #ifdef STMSTATS
-      ABORTCOUNT(header);
-      (typesCausingAbort[TYPE(header)])++;
+      ObjSeqId = oidrdage[i];
+      header->abortCount++;
+      (typesCausingAbort[TYPE(header)]).numabort++;
+      (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+      (typesCausingAbort[TYPE(header)]).numtrans+=1; 
+      objtypetraverse[TYPE(header)]=1;
 #endif
 #if defined(STMSTATS)||defined(SOFTABORT)
-      if(getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion))
+      if(getReadAbortCount(i+1, numoidrdlocked, oidrdlocked, oidrdversion, oidrdage, ObjSeqId, header, objtypetraverse))
        softabort=0;
 #endif
       DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
@@ -882,6 +1116,93 @@ int alttraverseCache() {
     }
   }
 
+#ifdef READSET
+  //need to validate auxiliary readset
+  rdchashlistnode_t *rd_curr = rd_c_list;
+  /* Walk every entry on the read-set list (entries are linked through lnext) */
+  while(likely(rd_curr != NULL)) {
+    //version stored in the read-set entry; compared with the object's current version below
+    int version=rd_curr->version;
+    objheader_t *header=(objheader_t *)(((char *)rd_curr->key)-sizeof(objheader_t));
+    if(header->lock>0) { //object is not locked
+      if (version!=header->version) {
+       //have to abort
+#ifdef DELAYCOMP
+       transAbortProcess(oidwrlocked, numoidwrtotal);
+#else
+       transAbortProcess(oidwrlocked, numoidwrlocked);
+#endif
+#ifdef STMSTATS
+       //ABORTCOUNT(header);
+       (typesCausingAbort[TYPE(header)]).numabort++; //entries are structs; a bare ++ does not compile
+#endif
+#if defined(STMSTATS)||defined(SOFTABORT)
+       //if(getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion))
+       //  softabort=0;
+#endif
+       DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+       DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
+       freearrays;
+       if (softabort)
+         return TRANS_SOFT_ABORT;
+       else
+         return TRANS_ABORT;
+      }
+    } else {
+      if (version==header->version) {
+       void * key=rd_curr->key;
+#ifdef DELAYCOMP
+       //check to see if it is in the delaycomp table
+       {
+         chashlistnode_t *node = &dc_c_table[(((unsigned INTPTR)key) & dc_c_mask)>>4];
+         do {
+           if(node->key == key)
+             goto nextloopread;
+           node = node->next;
+         } while(node != NULL);
+       }
+#endif
+       //check normal table
+       {
+         chashlistnode_t *node = &c_table[(((unsigned INTPTR)key) & c_mask)>>4];
+         do {
+           if(node->key == key) {
+             objheader_t * headeraddr=&((objheader_t *) node->val)[-1];
+             if(STATUS(headeraddr) & DIRTY) {
+               goto nextloopread;
+             }
+           }
+           node = node->next;
+         } while(node != NULL);
+       }
+      }
+#ifdef DELAYCOMP
+       //have to abort to avoid deadlock
+       transAbortProcess(oidwrlocked, numoidwrtotal);
+#else
+       transAbortProcess(oidwrlocked, numoidwrlocked);
+#endif
+#ifdef STMSTATS
+      //ABORTCOUNT(header);
+      (typesCausingAbort[TYPE(header)]).numabort++; //entries are structs; a bare ++ does not compile
+#endif
+#if defined(STMSTATS)||defined(SOFTABORT)
+     // if(getTotalAbortCount2((void *) curr->next, numoidrdlocked, oidrdlocked, oidrdversion))
+       //softabort=0;
+#endif
+      DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
+      DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
+      freearrays;
+      if (softabort)
+       return TRANS_SOFT_ABORT;
+      else
+       return TRANS_ABORT;
+    }
+  nextloopread:
+    rd_curr = rd_curr->lnext;
+  }
+#endif
+
   /* Decide the final response */
 #ifdef DELAYCOMP
   transCommitProcess(oidwrlocked, numoidwrlocked, numoidwrtotal, commitmethod, primitives, locals, params);
@@ -959,12 +1280,8 @@ void transAbortProcess(void **oidwrlocked, int numoidwrlocked) {
     dst->___cachedCode___=src->___cachedCode___;
     dst->___cachedHash___=src->___cachedHash___;
     A_memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
-    __asm__ __volatile__("": : :"memory");
-#ifndef DELAYCOMP
-    header->version++;
-#endif
   }
-  __asm__ __volatile__("": : :"memory");
+  CFENCE;
 
 #ifdef DELAYCOMP
   //  call commit method
@@ -981,9 +1298,7 @@ void transAbortProcess(void **oidwrlocked, int numoidwrlocked) {
   for(i=numoidwrlocked-1; i>=0; i--) {
 #endif
     header = (objheader_t *)oidwrlocked[i];
-#ifdef DELAYCOMP
     header->version++;
-#endif
     write_unlock(&header->lock);
   }
 
@@ -1011,12 +1326,16 @@ void transAbortProcess(void **oidwrlocked, int numoidwrlocked) {
  *        : numoidrdlocked : number of objects read that are locked
  *        : oidrdlocked : array of objects read and currently locked
  *        : oidrdversion : array of versions of object read
+ *        : oidrdage : array of ages of objects read in a transaction cache
+ *        : ObjSeqId : sequence Id/age to start the comparison with
  * =========================================================================================
  **/
-int getTotalAbortCount(int start, int stop, void *startptr, int numoidrdlocked, void *oidrdlocked, int *oidrdversion) {
+int getTotalAbortCount(int start, int stop, void *startptr, int numoidrdlocked, 
+    void *oidrdlocked, int *oidrdversion, int *oidrdage, int ObjSeqId, objheader_t *header, int *isObjTypeTraverse) {
   int i;
   int hardabort=0;
   int isFirstTime=0;
+  objheader_t *ObjToBeLocked=header;
   chashlistnode_t *curr = (chashlistnode_t *) startptr;
   chashlistnode_t *ptr = c_table;
   /* First go through all objects left in the cache that have not been covered yet */
@@ -1033,8 +1352,12 @@ int getTotalAbortCount(int start, int stop, void *startptr, int numoidrdlocked,
       /* versions do not match */
       if(version != header->version) {
 #ifdef STMSTATS
-        ABORTCOUNT(header);
-        (typesCausingAbort[TYPE(header)])++;
+        header->abortCount++;
+        if(ObjSeqId > headeraddr->accessCount) {
+          ObjSeqId = headeraddr->accessCount;
+          ObjToBeLocked = header;
+        }
+        getTransSize(header, isObjTypeTraverse);
 #endif
         hardabort=1;
       }
@@ -1046,18 +1369,28 @@ int getTotalAbortCount(int start, int stop, void *startptr, int numoidrdlocked,
   /* Then go through all objects that are read and are currently present in the readLockedArray */
   if(numoidrdlocked>0) {
     for(i=0; i<numoidrdlocked; i++) {
-      objheader_t *header = ((void **)oidrdlocked)[i];
+      objheader_t *header = ((void **) oidrdlocked)[i];
+      int OidAge = oidrdage[i];
       unsigned int version = oidrdversion[i];
       if(version != header->version) { /* versions do not match */
 #ifdef STMSTATS
-        ABORTCOUNT(header);
-        (typesCausingAbort[TYPE(header)])++;
+        header->abortCount++;
+        if(ObjSeqId > OidAge) {
+          ObjSeqId = OidAge;
+          ObjToBeLocked = header;
+        }
+        getTransSize(header, isObjTypeTraverse);
 #endif
         hardabort=1;
       }
     }
   }
 
+  /* Acquire lock on the oldest object accessed in the transaction cache */
+  if(ObjToBeLocked != NULL) {
+    ABORTCOUNT(ObjToBeLocked);
+  }
+
   return hardabort;
 }
 
@@ -1067,11 +1400,16 @@ int getTotalAbortCount(int start, int stop, void *startptr, int numoidrdlocked,
  *        : numoidrdlocked : number of objects read that are locked
  *        : oidrdlocked : array of objects read and currently locked
  *        : oidrdversion : array of versions of object read
+ *        : oidrdage : array of ages of objects read in a transaction cache
+ *        : ObjSeqId : sequence Id/age to start the comparison with
  * =========================================================================================
  **/
-int getTotalAbortCount2(void *startptr, int numoidrdlocked, void *oidrdlocked, int *oidrdversion) {
+int getTotalAbortCount2(void *startptr, int numoidrdlocked, void *oidrdlocked, 
+    int *oidrdversion, int *oidrdage, int ObjSeqId, objheader_t *header, int *isObjTypeTraverse) {
   int hardabort=0;
   chashlistnode_t *curr = (chashlistnode_t *) startptr;
+  objheader_t *ObjToBeLocked=header;
+
   /* Inner loop to traverse the linked list of the cache lookupTable */
   while(curr != NULL) {
     objheader_t *headeraddr=&((objheader_t *) curr->val)[-1];
@@ -1080,8 +1418,12 @@ int getTotalAbortCount2(void *startptr, int numoidrdlocked, void *oidrdlocked, i
     /* versions do not match */
     if(version != header->version) {
 #ifdef STMSTATS
-      ABORTCOUNT(header);
-      (typesCausingAbort[TYPE(header)])++;
+      header->abortCount++;
+      if(ObjSeqId > headeraddr->accessCount) {
+        ObjSeqId = headeraddr->accessCount;
+        ObjToBeLocked = header;
+      }
+      getTransSize(header, isObjTypeTraverse);
 #endif
       hardabort=1;
     }
@@ -1094,16 +1436,25 @@ int getTotalAbortCount2(void *startptr, int numoidrdlocked, void *oidrdlocked, i
     for(i=0; i<numoidrdlocked; i++) {
       objheader_t *header = ((void **)oidrdlocked)[i];
       unsigned int version = oidrdversion[i];
+      int OidAge = oidrdage[i];
       if(version != header->version) { /* versions do not match */
 #ifdef STMSTATS
-        ABORTCOUNT(header);
-        (typesCausingAbort[TYPE(header)])++;
+        header->abortCount++;
+        if(ObjSeqId > OidAge) {
+          ObjSeqId = OidAge;
+          ObjToBeLocked = header;
+        }
+        getTransSize(header, isObjTypeTraverse);
 #endif
         hardabort=1;
       }
     }
   }
 
+  if(ObjToBeLocked!=NULL) {
+    ABORTCOUNT(ObjToBeLocked);
+  }
+
   return hardabort;
 }
 
@@ -1113,25 +1464,41 @@ int getTotalAbortCount2(void *startptr, int numoidrdlocked, void *oidrdlocked, i
  * params: int start, int stop are indexes to readLocked array
  *         void  *oidrdlocked = readLocked array
  *         int *oidrdversion = version array
+ *        : oidrdage : array of ages of objects read in a transaction cache
+ *        : ObjSeqId : sequence Id/age to start the comparison with
  **/
-int getReadAbortCount(int start, int stop, void *oidrdlocked, int *oidrdversion) {
+int getReadAbortCount(int start, int stop, void *oidrdlocked, int *oidrdversion, 
+    int *oidrdage, int ObjSeqId, objheader_t *header, int *isObjTypeTraverse) {
   int i;
   int hardabort=0;
+  objheader_t *ObjToBeLocked=header;
+
   /* Go through oids read that are locked */
   for(i = start; i < stop; i++) {
     objheader_t *header = ((void **)oidrdlocked)[i];
     unsigned int version = oidrdversion[i];
+    int OidAge = oidrdage[i];
     if(version != header->version) { /* versions do not match */
 #ifdef STMSTATS
-      ABORTCOUNT(header);
-      (typesCausingAbort[TYPE(header)])++;
+      header->abortCount++;
+      if(ObjSeqId > OidAge) {
+        ObjSeqId = OidAge;
+        ObjToBeLocked = header;
+      }
+      getTransSize(header, isObjTypeTraverse);
 #endif
       hardabort=1;
     }
   }
+
+  if(ObjToBeLocked != NULL) {
+    ABORTCOUNT(ObjToBeLocked);
+  }
+
   return hardabort;
 }
 
+
 /**
  * needLock
  * params: Object header, ptr to garbage collector
@@ -1150,7 +1517,7 @@ objheader_t * needLock(objheader_t *header, void *gl) {
   } else { //failed to get lock
     trec->blocked=1;
     //memory barrier
-    __asm__ __volatile__("":::"memory");
+    CFENCE;
     //see if other thread is blocked
     if(ptr->blocked == 1) {
       //it might be block, so ignore lock and clear our blocked flag
@@ -1189,4 +1556,18 @@ objheader_t * needLock(objheader_t *header, void *gl) {
   return header;
 }
 
+/**
+ * Inline function to get the transaction size per object type for those
+ * objects that cause aborts
+ *
+ **/
+/*
+INLINE void getTransSize(objheader_t *header , int *isObjTypeTraverse) {
+  (typesCausingAbort[TYPE(header)]).numabort++;
+  if(isObjTypeTraverse[TYPE(header)] != 1)
+    (typesCausingAbort[TYPE(header)]).numaccess+=c_numelements;
+  isObjTypeTraverse[TYPE(header)]=1;
+}
+*/
+
 #endif
index d73e4e988a33e9d6cc449d828e15c13fd65aa854..c39f142896206050eb5161d61a3431d640769b3a 100644 (file)
@@ -1,23 +1,19 @@
 #include "sockpool.h"
 #include <netinet/tcp.h>
 
-#ifdef RECOVERY
-#define TIMEOUT_TIME 3
-#endif
-
 #if defined(__i386__)
 inline int test_and_set(volatile unsigned int *addr) {
-       int oldval;
-       /* Note: the "xchg" instruction does not need a "lock" prefix */
-       __asm__ __volatile__ ("xchgl %0, %1"
+  int oldval;
+  /* Note: the "xchg" instruction does not need a "lock" prefix */
+  __asm__ __volatile__ ("xchgl %0, %1"
                        : "=r" (oldval), "=m" (*(addr))
                        : "0" (1), "m" (*(addr)));
-       return oldval;
+  return oldval;
 }
 inline void UnLock(volatile unsigned int *addr) {
-       int oldval;
-       /* Note: the "xchg" instruction does not need a "lock" prefix */
-       __asm__ __volatile__ ("xchgl %0, %1"
+  int oldval;
+  /* Note: the "xchg" instruction does not need a "lock" prefix */
+  __asm__ __volatile__ ("xchgl %0, %1"
                        : "=r" (oldval), "=m" (*(addr))
                        : "0" (0), "m" (*(addr)));
 }
@@ -28,15 +24,15 @@ inline void UnLock(volatile unsigned int *addr) {
 #define MAXSPINS 4
 
 inline void Lock(volatile unsigned int *s) {
-       while(test_and_set(s)) {
-               int i=0;
-               while(*s) {
-                       if (i++>MAXSPINS) {
-                               sched_yield();
-                               i=0;
-                       }
-               }
-       }
+  while(test_and_set(s)) {
+    int i=0;
+    while(*s) {
+      if (i++>MAXSPINS) {
+       sched_yield();
+       i=0;
+      }
+    }
+  }
 }
 
 sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int size) {
@@ -67,17 +63,7 @@ int createNewSocket(unsigned int mid) {
   if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
     printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
     return -1;
-       }
-#ifdef RECOVERY        
-#ifdef DEBUG
-       printf("%s-> Setting timeouts for sd:%d\n", __func__, sd);
-#endif
-       struct timeval tv;
-       tv.tv_sec = TIMEOUT_TIME;
-       tv.tv_usec = 0;
-       setsockopt(sd, SOL_SOCKET, SO_SNDTIMEO, (struct timeval *)&tv, sizeof(tv));
-       setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (struct timeval *)&tv, sizeof(tv));
-#endif
+  }
   setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(flag));
   struct sockaddr_in remoteAddr;
   bzero(&remoteAddr, sizeof(remoteAddr));
@@ -119,7 +105,6 @@ int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
     inusenode->sd = sd;
     inusenode->mid = mid;
     insToListWithLock(sockhash, inusenode);
-               printf("returning sd:%d\n", sd);
     return sd;
   } else {
     return -1;
@@ -188,7 +173,6 @@ int getSock2WithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
   while(*ptr!=NULL) {
     if (mid == (*ptr)->mid) {
       UnLock(&sockhash->mylock);
-                       printf("RETURNING SD\n");
       return (*ptr)->sd;
     }
     ptr=&((*ptr)->next);
@@ -199,7 +183,6 @@ int getSock2WithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
     inusenode->sd = sd;
     inusenode->mid = mid;
     addSockWithLock(sockhash, inusenode);
-         printf("RETURNING NEW SD\n");
     return sd;
   } else {
     return -1;
index 15ef3ab94399c39c47b10e4784194d46a0814012..971f90262c7f16d04798f541a2f9d62150a4d4f6 100644 (file)
@@ -6,6 +6,7 @@ inline void initdsmlocks(volatile unsigned int *addr) {
   (*addr) = RW_LOCK_BIAS;
 }
 
+/*
 int write_trylock(volatile unsigned int *lock) {
   int retval=0;
   __asm__ __volatile__("xchgl %0,%1"
@@ -14,8 +15,9 @@ int write_trylock(volatile unsigned int *lock) {
               : "memory");
   return retval;
 }
+*/
 
 void write_unlock(volatile unsigned int *lock) {
-  __asm __volatile__("movl $1, %0" : "+m" (*__xg(lock))::"memory");
+  __asm __volatile__("movl $1, %0" : "+m" (*lock)::"memory");
 }
 
index 4c79da49e514b6c5c1a6382e72d7b964f99033c5..a396d8a530b9ef75e613f7b19b105b50b06f77a1 100644 (file)
@@ -3,6 +3,7 @@
 
 #define RW_LOCK_BIAS                 1
 #define LOCK_UNLOCKED          { LOCK_BIAS }
+#define CFENCE   asm volatile("":::"memory");
 
 struct __xchg_dummy {
        unsigned long a[100];
@@ -11,23 +12,24 @@ struct __xchg_dummy {
 #define __xg(x) ((struct __xchg_dummy *)(x))
 
 void initdsmlocks(volatile unsigned int *addr);
-int write_trylock(volatile unsigned int *lock);
+//int write_trylock(volatile unsigned int *lock);
 void write_unlock(volatile unsigned int *lock);
 
 /*
 static inline void initdsmlocks(volatile unsigned int *addr) {
   (*addr) = RW_LOCK_BIAS;
 }
-
+*/
 static inline int write_trylock(volatile unsigned int *lock) {
   int retval=0;
   __asm__ __volatile__("xchgl %0,%1"
                       : "=r"(retval)
-                      : "m"(*__xg(lock)), "0"(retval)
+                      : "m"(*lock), "0"(retval)
                       : "memory");
   return retval;
 }
 
+/*
 static inline void write_unlock(volatile unsigned int *lock) {
   __asm __volatile__("movl $1, %0" : "+m" (*__xg(lock))::"memory");
 }
index 7c7afe33c229ca231db05bd0a44c4203a3f31b67..12345be618cec1cf3cf323b6ceaad17620686914 100644 (file)
@@ -10,6 +10,197 @@ __thread unsigned int c_threshold;
 __thread double c_loadfactor;
 __thread cliststruct_t *c_structs;
 
+#ifdef READSET
+__thread rdchashlistnode_t *rd_c_table;
+__thread rdchashlistnode_t *rd_c_list;
+__thread unsigned int rd_c_size;
+__thread unsigned INTPTR rd_c_mask;
+__thread unsigned int rd_c_numelements;
+__thread unsigned int rd_c_threshold;
+__thread double rd_c_loadfactor;
+__thread rdcliststruct_t *rd_c_structs;
+
+void rd_t_chashCreate(unsigned int size, double loadfactor) {
+  chashtable_t *ctable;
+  chashlistnode_t *nodes;
+  int i;
+
+  // Allocate space for the hash table
+  rd_c_table = calloc(size, sizeof(rdchashlistnode_t));
+  rd_c_loadfactor = loadfactor;
+  rd_c_size = size;
+  rd_c_threshold=size*loadfactor;
+  rd_c_mask = (size << 4)-1;
+  rd_c_structs=calloc(1, sizeof(rdcliststruct_t));
+  rd_c_numelements = 0; // Initial number of elements in the hash
+  rd_c_list=NULL;
+}
+
+void rd_t_chashreset() {
+  rdchashlistnode_t *ptr = rd_c_table;
+  int i;
+
+  if (rd_c_numelements<(rd_c_size>>4)) {
+    rdchashlistnode_t *top=&ptr[rd_c_size];
+    rdchashlistnode_t *tmpptr=rd_c_list;
+    while(tmpptr!=NULL) {
+      rdchashlistnode_t *next=tmpptr->lnext;
+      if (tmpptr>=ptr&&tmpptr<top) {
+       //zero in list
+       tmpptr->key=NULL;
+       tmpptr->next=NULL;
+      }
+      tmpptr=next;
+    }
+  } else {
+    bzero(rd_c_table, sizeof(rdchashlistnode_t)*rd_c_size);
+  }
+  while(rd_c_structs->next!=NULL) {
+    rdcliststruct_t *next=rd_c_structs->next;
+    free(rd_c_structs);
+    rd_c_structs=next;
+  }
+  rd_c_structs->num = 0;
+  rd_c_numelements = 0;
+  rd_c_list=NULL;
+}
+
+//Store objects and their pointers into hash
+void rd_t_chashInsertOnce(void * key, unsigned int version) {
+  rdchashlistnode_t *ptr;
+
+  if (key==NULL)
+    return;
+
+  if(rd_c_numelements > (rd_c_threshold)) {
+    //Resize
+    unsigned int newsize = rd_c_size << 1;
+    rd_t_chashResize(newsize);
+  }
+
+  ptr = &rd_c_table[(((unsigned INTPTR)key)&rd_c_mask)>>4];
+
+  if(ptr->key==0) {
+    ptr->key=key;
+    ptr->version=version;
+    ptr->lnext=rd_c_list;
+    rd_c_list=ptr;
+    rd_c_numelements++;
+  } else { // Insert in the beginning of linked list
+    rdchashlistnode_t * node;
+    rdchashlistnode_t *search=ptr;
+    
+    //make sure it isn't here
+    do {
+      if(search->key == key) {
+       return;
+      }
+      search=search->next;
+    } while(search != NULL);
+
+    rd_c_numelements++;    
+    if (rd_c_structs->num<NUMCLIST) {
+      node=&rd_c_structs->array[rd_c_structs->num];
+      rd_c_structs->num++;
+    } else {
+      //get new list
+      rdcliststruct_t *tcl=calloc(1,sizeof(rdcliststruct_t));
+      tcl->next=rd_c_structs;
+      rd_c_structs=tcl;
+      node=&tcl->array[0];
+      tcl->num=1;
+    }
+    node->key = key;
+    node->version = version;
+    node->next = ptr->next;
+    ptr->next=node;
+    node->lnext=rd_c_list;
+    rd_c_list=node;
+  }
+}
+// Rehash the read-set table into newsize bins. Returns 0 on success, 1 if calloc fails.
+unsigned int rd_t_chashResize(unsigned int newsize) {
+  rdchashlistnode_t *node, *ptr, *curr;    // node: new table, ptr: old table, curr: entry currently being moved
+  unsigned int oldsize;
+  int isfirst;    // Keeps track of the first element in the rdchashlistnode_t for each bin in hashtable
+  unsigned int i,index;
+  unsigned int mask;
+
+  ptr = rd_c_table;
+  oldsize = rd_c_size;
+  // rd_c_list is cleared only after calloc succeeds, so a failed resize leaves the read set intact
+
+  if((node = calloc(newsize, sizeof(rdchashlistnode_t))) == NULL) {
+    printf("Calloc error %s %d\n", __FILE__, __LINE__);
+    return 1;
+  }
+  rd_c_list=NULL;             //The list is rebuilt below as entries are rehashed
+  rd_c_table = node;          //Install the new (thread-local) table
+  rd_c_size = newsize;
+  rd_c_threshold = newsize * rd_c_loadfactor;
+  mask=rd_c_mask = (newsize << 4)-1;
+
+  for(i = 0; i < oldsize; i++) {                        //Outer loop for each bin in hash table
+    curr = &ptr[i];
+    isfirst = 1;
+    do {                      //Inner loop to go through linked lists
+      void * key;
+      rdchashlistnode_t *tmp,*next;
+
+      if ((key=curr->key) == 0) {             //Exit inner loop if the first element is empty
+       break;                  //key is 0 for a bin that holds no element
+      }
+      index = (((unsigned INTPTR)key) & mask) >>4;
+      tmp=&node[index];
+      next = curr->next;
+      // Insert into the new table
+      if(tmp->key == 0) {
+       tmp->key = key;
+       tmp->version = curr->version;
+       tmp->lnext=rd_c_list;
+       rd_c_list=tmp;
+      } /*
+          NOTE:  Add this case if you change this...
+          This case currently never happens because of the way things rehash....
+          else if (isfirst) {
+          chashlistnode_t *newnode= calloc(1, sizeof(chashlistnode_t));
+          newnode->key = curr->key;
+          newnode->val = curr->val;
+          newnode->next = tmp->next;
+          tmp->next=newnode;
+          } */
+      else {
+       curr->next=tmp->next;
+       tmp->next=curr;
+       curr->lnext=rd_c_list;
+       rd_c_list=curr;
+      }
+
+      isfirst = 0;
+      curr = next;
+    } while(curr!=NULL);
+  }
+
+  free(ptr);            //Free the memory of the old hash table
+  return 0;
+}
+
+//Delete the entire hash table
+void rd_t_chashDelete() {
+  int i;
+  rdcliststruct_t *ptr=rd_c_structs;
+  while(ptr!=NULL) {
+    rdcliststruct_t *next=ptr->next;
+    free(ptr);
+    ptr=next;
+  }
+  free(rd_c_table);
+  rd_c_table=NULL;
+  rd_c_structs=NULL;
+  rd_c_list=NULL;
+}
+#endif
+
 #ifdef DELAYCOMP
 __thread chashlistnode_t *dc_c_table;
 __thread chashlistnode_t *dc_c_list;
@@ -48,7 +239,7 @@ void dc_t_chashreset() {
       chashlistnode_t *next=tmpptr->lnext;
       if (tmpptr>=ptr&&tmpptr<top) {
        //zero in list
-       tmpptr->key=0;
+       tmpptr->key=NULL;
        tmpptr->next=NULL;
       }
       tmpptr=next;
@@ -247,7 +438,7 @@ void t_chashreset() {
       chashlistnode_t *next=tmpptr->lnext;
       if (tmpptr>=ptr&&tmpptr<top) {
        //zero in list
-       tmpptr->key=0;
+       tmpptr->key=NULL;
        tmpptr->next=NULL;
       }
       tmpptr=next;
index cecb80516f778d5371bf84f479c5ac6a68901416..378f1c98219320a3ab4951f44873d451c7786165 100644 (file)
@@ -17,7 +17,6 @@
 
 #define INLINE    inline __attribute__((always_inline))
 
-
 typedef struct chashlistnode {
   void * key;
   void * val;     //this can be cast to another type or used to point to a larger structure
@@ -49,7 +48,6 @@ unsigned int t_chashResize(unsigned int newsize);
 void t_chashDelete();
 void t_chashreset();
 
-
 extern __thread chashlistnode_t *c_table;
 extern __thread chashlistnode_t *c_list;
 extern __thread unsigned int c_size;
@@ -59,6 +57,38 @@ extern __thread unsigned int c_threshold;
 extern __thread double c_loadfactor;
 extern __thread cliststruct_t *c_structs;
 
+#ifdef READSET
+
+typedef struct rdchashlistnode {
+  void * key;
+  unsigned int version;
+  struct rdchashlistnode *next;
+  struct rdchashlistnode *lnext;
+} rdchashlistnode_t;
+
+typedef struct rdclist {
+  struct rdchashlistnode array[NUMCLIST];
+  int num;
+  struct rdclist *next;
+} rdcliststruct_t;
+
+
+extern __thread rdchashlistnode_t *rd_c_table;
+extern __thread rdchashlistnode_t *rd_c_list;
+extern __thread unsigned int rd_c_size;
+extern __thread unsigned INTPTR rd_c_mask;
+extern __thread unsigned int rd_c_numelements;
+extern __thread unsigned int rd_c_threshold;
+extern __thread double rd_c_loadfactor;
+extern __thread rdcliststruct_t *rd_c_structs;
+
+void rd_t_chashCreate(unsigned int size, double loadfactor);
+void rd_t_chashInsertOnce(void * key, unsigned int val);
+unsigned int rd_t_chashResize(unsigned int newsize);
+void rd_t_chashDelete();
+void rd_t_chashreset();
+#endif
+
 #ifdef DELAYCOMP
 extern __thread chashlistnode_t *dc_c_table;
 extern __thread chashlistnode_t *dc_c_list;
@@ -76,5 +106,4 @@ unsigned int dc_t_chashResize(unsigned int newsize);
 void dc_t_chashDelete();
 void dc_t_chashreset();
 #endif
-
 #endif
index 4df4dc435bcb38370528444022ca0d579a1fcf2c..e140144b3ac774f7260b8b8978b41d7fc603858e 100644 (file)
@@ -33,12 +33,60 @@ void display(threadlist_t *head) {
     while(head != NULL) {
       ptr = head;
       printf("The threadid waiting is = %d\n", ptr->threadid);
-      printf("The mid on which thread present = %d\n", ptr->mid);
+      printf("The mid on which thread present = %s\n", midtoIPString(ptr->mid));
       head = ptr->next;
     }
   }
 }
 
+/* counts the number of nodes */
+unsigned int getListSize(threadlist_t* head)
+{
+  unsigned int size =0;
+
+  while(head) {
+    size++;
+    head = head->next;
+  }
+  return size;
+}
+
+/* Copies the linked list at head into a newly calloc'd array stored in *tPtr. */
+/* Returns the node count; *tPtr is NULL for an empty list. Nothing here frees the array. */
+unsigned int convertToArray(threadlist_t* head,threadlist_t ** tPtr)
+{
+  int size = 0;
+  threadlist_t* walker;
+  threadlist_t* ptr = NULL;
+  int i = 0;
+
+  /* counting number of nodes */
+  walker = head;
+  while(walker) {
+    walker = walker->next;
+    size++;
+  }
+
+  /* allocate only when non-empty: calloc(0, n) may legally return NULL */
+  if(size > 0 && (ptr = (threadlist_t*)calloc(size,sizeof(threadlist_t)))== NULL) {
+    printf("%s -> calloc error in %s\n",__func__,__FILE__);
+    exit(1);
+  }
+
+  walker = head;
+  *tPtr = ptr;
+  i =0;
+
+  /* struct copy: each array element keeps the source node's next pointer */
+  while(walker) {
+    ptr[i++] = *walker;
+    walker = walker->next;
+  }
+
+  return size;
+}
+
+
 /* This function creates a new hash table that stores a mapping between the threadid and
  * a pointer to the thread notify data */
 unsigned int notifyhashCreate(unsigned int size, float loadfactor) {
index 91f9c57d2b4d567632734cde2ab7acaab93d2918..26785294432201882ab02f23eb86fde3bc3bb03e 100644 (file)
@@ -40,6 +40,8 @@ typedef struct notifyhashtable {
 } notifyhashtable_t;
 
 threadlist_t *insNode(threadlist_t *head, unsigned int threadid, unsigned int mid); //Inserts nodes for one object that
+unsigned int getListSize(threadlist_t * head);
+unsigned int convertToArray(threadlist_t* head,threadlist_t **tPtr);
 //needs to send notification to threads waiting on it
 void display(threadlist_t *head); // Displays linked list of nodes for one object
 unsigned int notifyhashCreate(unsigned int size, float loadfactor); //returns 1 if hashtable creation is not successful
diff --git a/Robust/src/Runtime/DSTM/interface_recovery/tlookup.c b/Robust/src/Runtime/DSTM/interface_recovery/tlookup.c
new file mode 100644 (file)
index 0000000..06666d6
--- /dev/null
@@ -0,0 +1,188 @@
+#include "tlookup.h"
+
+thashtable_t tlookup;           //Global Hash table
+
+// Creates the global transid -> decision table with size bins; returns 0 on success, 1 on failure
+unsigned int thashCreate(unsigned int size, float loadfactor) {
+  thashlistnode_t *nodes;
+
+  // Reject an empty table (thashFunction would divide by zero), then allocate
+  // zeroed bins: a head with transid 0 and no chain is treated as empty
+  if(size == 0 || (nodes = calloc(size, sizeof(thashlistnode_t))) == NULL) {
+    printf("Calloc error %s %d\n", __FILE__, __LINE__);
+    return 1;
+  }
+
+  tlookup.table = nodes;
+  tlookup.size = size;
+  tlookup.numelements = 0;       // Initial number of elements in the hash
+  tlookup.loadfactor = loadfactor;
+  //Initialize the pthread_mutex variable
+  pthread_mutex_init(&tlookup.locktable, NULL);
+  return 0;
+}
+
+// Maps a transid to the index of its bin in the global table
+unsigned int thashFunction(unsigned int transid) {
+  return transid % tlookup.size;
+}
+
+// Insert transid and decision mapping into the hash table
+// Returns 0 on success and 1 if no node could be allocated
+unsigned int thashInsert(unsigned int transid, char decision) {
+  int index;
+  thashlistnode_t *bin, *node;
+
+  // Hold the lock for the whole operation: the load check, the element count
+  // and the table pointer are shared state, and a resize done by another
+  // thread frees the array that a pointer read earlier would still refer to
+  pthread_mutex_lock(&tlookup.locktable);
+  if (tlookup.numelements > (tlookup.loadfactor * tlookup.size)) {
+    //Resize Table (result ignored as before: the insert proceeds either way)
+    thashResize(2 * tlookup.size + 1);
+  }
+
+  index = thashFunction(transid);
+  bin = &tlookup.table[index];
+#ifdef DEBUG
+  printf("DEBUG(insert) transid = %d, decision  = %d, index = %d\n",transid, decision, index);
+#endif
+  if(bin->next == NULL && bin->transid == 0) {          // Insert at the first position in the hashtable
+    bin->transid = transid;
+    bin->decision = decision;
+  } else {                              // Insert in the linked list
+    if ((node = calloc(1, sizeof(thashlistnode_t))) == NULL) {
+      printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+      pthread_mutex_unlock(&tlookup.locktable);
+      return 1;
+    }
+    node->transid = transid;
+    node->decision = decision;
+    node->next = bin->next;
+    bin->next = node;
+  }
+
+  tlookup.numelements++;        // counted only once the entry is really stored
+
+  pthread_mutex_unlock(&tlookup.locktable);
+  return 0;
+}
+
+// Return decision for a given transid in the hash table (0 if not found)
+char thashSearch(unsigned int transid) {
+  char decision = 0;
+  thashlistnode_t *node;
+
+  // The table pointer and size are only stable under the lock; a concurrent
+  // resize replaces and frees the array, so nothing is read before locking
+  pthread_mutex_lock(&tlookup.locktable);
+  node = &tlookup.table[thashFunction(transid)];
+  while(node != NULL) {
+    if(node->transid == transid) {
+      decision = node->decision;     // copy it out before releasing the lock
+      break;
+    }
+    node = node->next;
+  }
+  pthread_mutex_unlock(&tlookup.locktable);
+  return decision;
+}
+
+// Remove an entry from the hash table
+// Returns 0 if the entry was removed and 1 if transid is not in the table
+unsigned int thashRemove(unsigned int transid) {
+  thashlistnode_t *head, *curr, *node;
+  thashlistnode_t *prev = NULL;
+
+  // Look at the table only while holding the lock: a concurrent resize
+  // replaces and frees the array the bin pointer would refer to
+  pthread_mutex_lock(&tlookup.locktable);
+  head = &tlookup.table[thashFunction(transid)];
+
+  for (curr = head; curr != NULL; curr = curr->next) {
+    if (curr->transid == transid) {                     // Find a match in the hash table
+      tlookup.numelements--;                    // Decrement the number of elements in the global hashtable
+      if (curr != head) {                       // Regular delete from linked list
+        prev->next = curr->next;
+        free(curr);
+      } else if (curr->next == NULL) {          // Head is the only entry: mark the bin empty
+        curr->transid = 0;
+        curr->decision = 0;
+      } else {                                  // Head with a chain: pull the next entry into the head
+        node = curr->next;
+        curr->transid = node->transid;
+        curr->decision = node->decision;
+        curr->next = node->next;
+        free(node);
+      }
+      pthread_mutex_unlock(&tlookup.locktable);
+      return 0;
+    }
+    prev = curr;
+  }
+  pthread_mutex_unlock(&tlookup.locktable);
+  return 1;
+}
+
+// Resize table: rehash all entries into a new array of newsize bins. The caller must hold
+// tlookup.locktable. Returns 0 on success, 1 if memory runs out (table then stays untouched).
+unsigned int thashResize(unsigned int newsize) {
+  thashlistnode_t *oldtable = tlookup.table, *newtable, *slot, *curr, *next;
+  unsigned int oldsize = tlookup.size;
+  unsigned int i, count = 0;
+
+  if(newsize == 0 || (newtable = calloc(newsize, sizeof(thashlistnode_t))) == NULL) {
+    printf("Calloc error %s %d\n", __FILE__, __LINE__);
+    return 1;
+  }
+  // Pass 1: bin heads. They live inside the old array, so only they can need a new node;
+  // the old table is merely read here, so an allocation failure can be rolled back fully.
+  for(i = 0; i < oldsize; i++) {
+    if(oldtable[i].transid == 0) continue;      //transid = 0 marks an unused head
+    slot = &newtable[oldtable[i].transid % newsize];
+    if(slot->transid != 0) {                    //head taken: chain a new node behind it
+      if((curr = calloc(1, sizeof(thashlistnode_t))) == NULL) {
+        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+        for(i = 0; i < newsize; i++) {          //undo: drop the nodes chained so far
+          for(curr = newtable[i].next; curr != NULL; curr = next) {
+            next = curr->next;
+            free(curr);
+          }
+        }
+        free(newtable);
+        return 1;
+      }
+      curr->next = slot->next;
+      slot->next = curr;
+      slot = curr;
+    }
+    slot->transid = oldtable[i].transid;
+    slot->decision = oldtable[i].decision;
+    count++;
+  }
+
+  // Pass 2: chained entries. They are heap nodes already, so each one is either copied
+  // into an empty head and freed, or relinked as it is; nothing here can fail.
+  for(i = 0; i < oldsize; i++) {
+    for(curr = oldtable[i].next; curr != NULL; curr = next) {
+      next = curr->next;
+      slot = &newtable[curr->transid % newsize];
+      count++;
+      if(slot->transid == 0) {
+        slot->transid = curr->transid;
+        slot->decision = curr->decision;
+        free(curr);
+      } else {
+        curr->next = slot->next;
+        slot->next = curr;
+      }
+    }
+  }
+  tlookup.table = newtable;                     //Update the global hashtable upon resize()
+  tlookup.size = newsize;
+  tlookup.numelements = count;
+  free(oldtable);                               //Free the memory of the old hash table
+  return 0;
+}
+
+
diff --git a/Robust/src/Runtime/DSTM/interface_recovery/tlookup.h b/Robust/src/Runtime/DSTM/interface_recovery/tlookup.h
new file mode 100644 (file)
index 0000000..b2d4f98
--- /dev/null
@@ -0,0 +1,39 @@
+#ifndef _TTLOOKUP_H_
+#define _TTLOOKUP_H_
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <pthread.h>
+
+#define SIMPLE_TTLOOKUP
+
+#define LOADFACTOR 0.5
+#define THASH_SIZE 10000
+
+typedef struct thashlistnode {
+  unsigned int transid;         // key; tlookup.c treats transid 0 in a bin head as "empty"
+  char decision;                // value stored for the transaction by thashInsert
+  struct thashlistnode *next;   // next entry chained in the same bin, NULL at the end
+} thashlistnode_t;
+
+typedef struct thashtable {
+  thashlistnode_t *table;       // points to beginning of hash table
+  unsigned int size;            // number of bins in table
+  unsigned int numelements;     // number of entries currently stored
+  float loadfactor;             // thashInsert resizes once numelements > loadfactor * size
+  pthread_mutex_t locktable;    // locked by thashInsert/thashSearch/thashRemove around table access
+} thashtable_t;
+
+//returns 0 for success and 1 for failure
+unsigned int thashCreate(unsigned int size, float loadfactor);
+//returns 0 for success and 1 for failure
+unsigned int thashInsert(unsigned int transid, char decision);
+//returns the decision stored for transid, 0 if not found
+char thashSearch(unsigned int transid);
+//returns 0 for success and 1 for failure
+unsigned int thashRemove(unsigned int transid);
+
+//helper functions
+unsigned int thashResize(unsigned int newsize);
+unsigned int thashFunction(unsigned int transid);
+#endif
index 05937ffad83a9f307b7ebf563e9359d4de1fffea..614e7b8a6f881cfade7e4145fc9b4f02df1f5735 100644 (file)
@@ -8,7 +8,7 @@
 #define TRANS_SOFT_ABORT    12
 #define TRANS_ABORT         13
 #define TRANS_COMMIT        14
-
+#define TRANS_ABORT_RETRY         15
 
 /* ========================
  * Library header files
@@ -107,6 +107,19 @@ typedef struct objheader {
           } while(1); \
         }}
 
+#define TRANSREADRD(x,y) { /* sets x from y: NULL, the value cached in c_table, or y itself */ \
+    void * inputvalue; \
+    if ((inputvalue=y)==NULL) x=NULL; /* a NULL reference reads as NULL */\
+         else { \
+           chashlistnode_t * cnodetmp=&c_table[(((unsigned INTPTR)inputvalue)&c_mask)>>4]; /* bin head picked from the address bits kept by c_mask */ \
+           do { \
+             if (cnodetmp->key==inputvalue) {x=cnodetmp->val; break;} /* key found: use the value stored for it */ \
+             cnodetmp=cnodetmp->next; \
+             if (cnodetmp==NULL) {if (((struct ___Object___*)inputvalue)->___objstatus___&NEW) {x=inputvalue; break;} else /* chain exhausted: objects flagged NEW are used directly */ \
+                                   {x=inputvalue;rd_t_chashInsertOnce(inputvalue, ((objheader_t *)inputvalue)[-1].version); break;}} /* otherwise also pass the object and its header version to rd_t_chashInsertOnce */ \
+          } while(1); \
+        }}
+
 /* =================================
  * Data structures
  * =================================
@@ -137,6 +150,15 @@ extern __thread threadrec_t *trec;
 extern __thread struct objlist * lockedobjs;
 extern objlockstate_t *objlockscope;
 pthread_mutex_t lockedobjstore;
+
+typedef struct objtypestat {   // element type of typesCausingAbort[] (declared under STMSTATS)
+  int numabort;         // NOTE(review): presumably aborts attributed to this object type - confirm
+  int numaccess;        // NOTE(review): presumably accesses to this object type - confirm
+  int numtrans;    //num of transactions that accessed this object type and aborted
+} objtypestat_t;
+
+/* Variables for probability model */
+#define FACTOR 3
 #endif
 
 
@@ -152,7 +174,7 @@ extern int nSoftAbortCommit;
 #endif
 
 #ifdef STMSTATS
-extern int typesCausingAbort[];
+extern objtypestat_t typesCausingAbort[];
 #endif
 
 
@@ -169,6 +191,9 @@ objheader_t *transCreateObj(void * ptr, unsigned int size);
 unsigned int getNewOID(void);
 void *objstrAlloc(unsigned int size);
 __attribute__((pure)) void *transRead(void *, void *);
+#ifdef READSET
+__attribute__((pure)) void *transReadOnly(void *);
+#endif
 #ifdef DELAYCOMP
 int transCommit(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params);
 int traverseCache(void (*commitmethod)(void *, void *, void *), void * primitives, void * locals, void * params);
@@ -184,9 +209,9 @@ int altalttraverseCache();
 void transAbortProcess(void **, int);
 void randomdelay(int);
 #if defined(STMSTATS)||defined(SOFTABORT)
-int getTotalAbortCount(int, int, void *, int, void*, int*);
-int getTotalAbortCount2(void *, int, void *, int *);
-int getReadAbortCount(int, int, void*, int*);
+int getTotalAbortCount(int, int, void *, int, void*, int*, int*, int, objheader_t*, int*);
+int getTotalAbortCount2(void *, int, void *, int *, int*, int, objheader_t*, int*);
+int getReadAbortCount(int, int, void*, int*, int*, int, objheader_t*, int*);
 #endif
 #ifdef STMSTATS
 objheader_t * needLock(objheader_t *, void *);
index 9c4dc88e290ad51cae6f8a889c37caadc9003a29..771316bb2d60f4a1ac651bc1a9880a015bb09c1b 100644 (file)
@@ -22,7 +22,7 @@
 #include <unistd.h>
 #include <signal.h>
 #include <sys/select.h>
-#define WAIT_TIME 3
+#include "tlookup.h"
 #endif
 
 #define NUM_THREADS 1
@@ -53,6 +53,9 @@ unsigned int *hostIpAddrs;
 int sizeOfHostArray;
 int numHostsInSystem;
 int myIndexInHostArray;
+int waitThreadMid;
+unsigned int waitThreadID; 
+
 unsigned int oidsPerBlock;
 unsigned int oidMin;
 unsigned int oidMax;
@@ -77,6 +80,7 @@ int bytesSent = 0;
 int bytesRecv = 0;
 int totalObjSize = 0;
 
+#ifdef RECOVERY
 /***********************************
  * Global variables for Duplication
  ***********************************/
@@ -85,19 +89,15 @@ int liveHostsValid;
 int numLiveHostsInSystem;      
 int flipBit;                                                           // Used to distribute requests between primary and backup evenly
 unsigned int *locateObjHosts;
-__thread int timeoutFlag;
-extern int leaderFixing;
-extern pthread_mutex_t leaderFixing_mutex;
-extern pthread_mutex_t liveHosts_mutex;
+#endif
 
-unsigned int liveTransactions[25];
+int transRetryFlag;
 unsigned int transIDMax;
 unsigned int transIDMin;
 unsigned int transIDIndex;
-#ifdef DEBUG
 char ip[16];
-#endif
 
+#ifdef RECOVERY
 /******************************
  * Global variables for Paxos
  ******************************/
@@ -109,6 +109,7 @@ unsigned int leader;
 unsigned int origleader;
 unsigned int temp_v_a;
 int paxosRound;
+#endif
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles();
@@ -117,165 +118,177 @@ plistnode_t *sortPiles(plistnode_t *pileptr);
 /*******************************
 * Send and Recv function calls
 *******************************/
-void send_data(int fd, void *buf, int buflen) {
-#ifdef DEBUG
-//     printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
-#endif
-       char *buffer = (char *)(buf);
+// Sends exactly buflen bytes from buf over fd, continuing after partial sends.
+// Returns 0 once everything is sent; RECOVERY builds return -1 on reset/timeout and -2 on other errors.
+int send_data(int fd, void *buf, int buflen) {
+  char *buffer = (char *)(buf);
   int size = buflen;
   int numbytes;
   while (size > 0) {
-               numbytes = send(fd, buffer, size, 0);
-               bytesSent = bytesSent + numbytes;
+#ifdef GDBDEBUG
+GDBSEND1:
+#endif
+    numbytes = send(fd, buffer, size, 0);
+    if( numbytes>0) {
+      bytesSent += numbytes;
+      buffer += numbytes;   // continue after the bytes already sent, not from the start again
+      size -= numbytes;
+    }
 #ifdef RECOVERY
-#ifdef DEBUG
-//             printf("%s-> numbytes: %d\n", __func__, numbytes);
+    else if( numbytes < 0) {
+      // Send returned an error.
+      // Analyze underlying cause
+#ifndef DEBUG
+      printf("%s -> fd : %d errno = %d %s\n",__func__, fd, errno,strerror(errno));
+      fflush(stdout);
 #endif
-               if(errno == ECONNRESET) {       // EINT/EPIPE??; Connection reset, possible disconnected machine
+      if(errno == ECONNRESET || errno == EAGAIN || errno == EWOULDBLOCK) {
+        // machine has failed
+        //
+        // if we see EAGAIN w/o failures, we should record the time
+        // when we start send and finish send see if it is longer
+        // than our threshold
+        //
 #ifdef DEBUG
-                       printf("%s-> errno = ECONNRESET; connection reset\n", __func__);
-                       printf("***SETTING TIMEOUTFLAG***\n");
+        printf("%s -> EAGAIN : %s\n",__func__,(errno == EAGAIN)?"TRUE":"FALSE");
 #endif
-                       errno = 0;
-                       timeoutFlag = 1;
-                       return;
-               }
-               else if(errno == EAGAIN || errno == EWOULDBLOCK) {
+        return -1;
+      } else {
+#ifdef GDBDEBUG
+      if(errno == EINTR)   // interrupted (e.g. by the debugger): retry the send
+        goto GDBSEND1;    
+#endif
+
+
 #ifdef DEBUG
-                       printf("%s-> errno = EAGAIN|EWOULDBLOCK; socket timeout\n", __func__);  
-                       printf("***SETTING TIMEOUTFLAG***\n");
+        printf("%s -> Unexpected ERROR!\n",__func__);
 #endif
-                       errno = 0;
-                       timeoutFlag = 1;
-                       return;
-               }
-               else if(numbytes == -1) {
+        return -2;
+      }
+    }
+    else{
+      // Case : numbytes == 0
+      // // machine has failed -- this case probably doesn't occur in reality
+      //
+
+
+      
 #ifdef DEBUG
-                       printf("%s-> numbytes = -1; socket timeout\n", __func__);       
-                       printf("***SETTING TIMEOUTFLAG***\n");
+      printf("%s -> SHOULD NOT BE HERE\n",__func__);
 #endif
-                       timeoutFlag = 1;
-                       return;
-               }
-#else
-               if (numbytes == -1) {
-                       perror("send");
-                       exit(0);
-               }
+      return -1;
+    }
 #endif
-               buffer += numbytes;
-               size -= numbytes;
-       }
+  } // close while loop
 #ifdef DEBUG
-//     printf("%s-> Exiting\n", __func__);
+  printf("%s-> Exiting\n", __func__);
 #endif
+
+  return 0; // completed sending data
 }
 
-void recv_data(int fd, void *buf, int buflen) {
-#ifdef DEBUG
-//     printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
-#endif
-       char *buffer = (char *)(buf);
-       int size = buflen;
+//Reads exactly buflen bytes from fd into buf and returns 0 when done.
+//Returns negative value if receive cannot be completed because of
+//timeout or machine failure
+int recv_data(int fd, void *buf, int buflen) {
+  char *buffer = (char *)(buf);
+  int size = buflen;
   int numbytes;
-       while (size > 0) {
-       numbytes = recv(fd, buffer, size, 0);
-    bytesRecv = bytesRecv + numbytes;
+  int trycounter = 0;
+  
+  while (size > 0) {
+#ifdef GDBDEBUG
+GDBRECV1:
+#endif
+    numbytes = recv(fd, buffer, size, 0);
+    if (numbytes>0) {
+      bytesRecv += numbytes;   // keep the receive counter maintained, as send_data does for bytesSent
+      buffer += numbytes;
+      size -= numbytes;
+    }
 #ifdef RECOVERY
-#ifdef DEBUG
-//             printf("%s-> numbytes: %d\n", __func__, numbytes);
-#endif
-               if(errno == ECONNRESET) {
-#ifdef DEBUG
-                       printf("%s-> errno = ECONNRESET; connection reset\n", __func__);
-                       printf("***SETTING TIMEOUTFLAG***\n");
+    else if (numbytes<0){ 
+      //Receive returned an error.
+      //Analyze underlying cause
+#ifdef DEBUG
+      printf("%s-> fd : %d errno = %d %s\n", __func__, fd, errno, strerror(errno));    
+#endif
+      if(errno == ECONNRESET || errno == EAGAIN || errno == EWOULDBLOCK) {
+        //machine has failed
+        //if we see EAGAIN w/o failures, we should record the time
+       //when we start read and finish read and see if it is longer
+       //than our threshold
+#ifdef DEBUG
+        printf("%s -> EAGAIN : %s\n",__func__,(errno == EAGAIN)?"TRUE":"FALSE");
+#endif
+        if(errno == EAGAIN) {
+          if(trycounter < 5) {
+#ifndef DEBUG
+            printf("%s -> TRYcounter increases\n",__func__);
+#endif
+            trycounter++;
+            continue;
+          }
+          else
+            return -1;
+        }
+        return -1;
+      } else {
+#ifdef GDBDEBUG
+        if(errno == EINTR)   // interrupted (e.g. by the debugger): retry the receive
+          goto GDBRECV1;
 #endif
-                       errno = 0;
-                       timeoutFlag = 1;
-                       return;
-               }
-               else if(errno == EAGAIN || errno == EWOULDBLOCK) {
+
 #ifdef DEBUG
-                       printf("%s-> errno = EAGAIN|EWOULDBLOCK; socket timeout\n", __func__);  
-                       printf("***SETTING TIMEOUTFLAG***\n");
+        printf("%s -> Unexpected ERROR!\n",__func__);
+        printf("%s-> errno = %d %s\n", __func__, errno, strerror(errno));
 #endif
-                       errno = 0;
-                       timeoutFlag = 1;
-                       return;
-               }
-               else if(numbytes == -1) {
+       return -2;
+      }
+    } else {
+      //Case: numbytes==0
+      //machine has failed -- this case probably doesn't occur in reality
+      //
 #ifdef DEBUG
-                       printf("%s-> numbytes = -1; socket timeout\n", __func__);       
-                       printf("***SETTING TIMEOUTFLAG***\n");
+      printf("%s -> SHOULD NOT BE HERE\n",__func__);
 #endif
-                       timeoutFlag = 1;
-                       return;
-               }
-#else
-               if (numbytes == -1) {
-      perror("recv");
-      exit(0);
+      return -1;
     }
 #endif
-    buffer += numbytes;
-               size -= numbytes;
-       }
-#ifdef DEBUG
-//     printf("%s-> Exiting\n", __func__);
-#endif
-}
-
-void recv_data_block(int fd, void *buf, int buflen) {
-#ifdef DEBUG
-       printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
-#endif
-       char *buffer = (char *)(buf);
-       int size = buflen;
-       int numbytes;
-       while (size > 0) {
-               numbytes = recv(fd, buffer, size, 0);
-#ifdef DEBUG
-               printf("%s-> numbytes: %d\n", __func__, numbytes);
-#endif
-               if(errno == EAGAIN || errno == EWOULDBLOCK) {
-                       errno = 0;
-               }
-               if(numbytes != -1) {
-                       bytesRecv = bytesRecv + numbytes;
-                       buffer += numbytes;
-                       size -= numbytes;
-               }
-       }
+  } //close while loop
 #ifdef DEBUG
-       printf("%s-> Exiting\n", __func__);
+  printf("%s -> fd = %d Exiting\n",__func__,fd);
 #endif
+  return 0; // got all the data
 }
 
 int recv_data_errorcode(int fd, void *buf, int buflen) {
 #ifdef DEBUG
-       printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
+  printf("%s-> Start; fd:%d, buflen:%d\n", __func__, fd, buflen);
 #endif
   char *buffer = (char *)(buf);
   int size = buflen;
-       int numbytes;
-       while (size > 0) {
-               numbytes = recv(fd, buffer, size, 0);
+  int numbytes;
+  while (size > 0) {
+    numbytes = recv(fd, buffer, size, 0);
 #ifdef DEBUG
-               printf("%s-> numbytes: %d\n", __func__, numbytes);
+    printf("%s-> numbytes: %d\n", __func__, numbytes);
 #endif
-               if (numbytes==0)
-                       return 0;
-               else if (numbytes == -1) {
-                       perror("recv_data_errorcode");
-                       return -1;
-               }
-               buffer += numbytes;
-               size -= numbytes;
-       }
+    if (numbytes==0)     // recv() yields 0 once the peer has shut the connection down
+      return 0;          // reported as 0 even if part of the buffer was already filled
+    else if (numbytes == -1) {
+      printf("%s -> ERROR NUMBER = %d %s\n",__func__,errno,strerror(errno));
+      perror("recv_data_errorcode");   // NOTE(review): the printf above may already have changed errno
+      return -1;         // receive failed
+    }
+    
+    buffer += numbytes;
+    size -= numbytes;
+  }
 #ifdef DEBUG
-       printf("%s-> Exiting\n", __func__);
+  printf("%s-> Exiting\n", __func__);
 #endif
-       return 1;
+  return 1;              // all buflen bytes were read
 }
 
 void printhex(unsigned char *ptr, int numBytes) {
@@ -383,7 +396,7 @@ int dstmStartup(const char * option) {
                updateLiveHosts();
                setLocateObjHosts();
                updateLiveHostsCommit();
-               leader = paxos();
+               paxos();
                if(!allHostsLive()) {
                        printf("Not all hosts live. Exiting.\n");
                        exit(-1);
@@ -505,7 +518,6 @@ void transStart() {
 #endif
 }
 
-// Search for an address for a given oid                                                                               
 /*#define INLINE    inline __attribute__((always_inline))
 
 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
@@ -665,29 +677,29 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
     addtransaction(oid);
 #endif
 
-  if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
+    if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
 #ifdef DEBUG
-               printf("%s-> Grab from this machine\n", __func__);
+                 printf("%s-> Grab from this machine\n", __func__);
 #endif
 #ifdef TRANSSTATS
-    nmhashSearch++;
+      nmhashSearch++;
 #endif
-    /* Look up in machine lookup table  and copy  into cache*/
-    GETSIZE(size, objheader);
-    size += sizeof(objheader_t);
-    objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
-    memcpy(objcopy, objheader, size);
-    /* Insert into cache's lookup table */
-    STATUS(objcopy)=0;
-    t_chashInsert(OID(objheader), objcopy);
+      /* Look up in machine lookup table  and copy  into cache*/
+      GETSIZE(size, objheader);
+      size += sizeof(objheader_t);
+      objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
+      memcpy(objcopy, objheader, size);
+      /* Insert into cache's lookup table */
+      STATUS(objcopy)=0;
+      t_chashInsert(OID(objheader), objcopy);
 #ifdef COMPILER
-    return &objcopy[1];
+      return &objcopy[1];
 #else
-    return objcopy;
+      return objcopy;
 #endif
-  } else {
+    } else {
 #ifdef CACHE
-    , TYPE(header)if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
+      , TYPE(header)if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
 #ifdef TRANSSTATS
       nprehashSearch++;
 #endif
@@ -703,44 +715,60 @@ __attribute__((pure)) objheader_t *transRead2(unsigned int oid) {
 #else
                        return objcopy;
 #endif
-               }
+       } 
 #endif
                /* Get the object from the remote location */
+
 #ifdef DEBUG
-                       printf("%s-> Grab from remote machine\n", __func__);
+       printf("%s-> Grab from remote machine\n", __func__);
 #endif
 #ifdef RECOVERY
-               //while(!liveHostsValid) {
-               //}
-               /*if(!liveHostsValid){
-                       sleep(WAIT_TIME);
-               }*/
-               unsigned int mindex = findHost(lhashSearch(oid));
-               machinenumber = locateObjHosts[2*mindex+flipBit];
-               flipBit ^= 1;
-               printf("mindex:%d, oid:%d, machinenumber:%s\n", mindex, oid, midtoIPString(machinenumber));
+    transRetryFlag = 0;
+    unsigned int mindex = findHost(lhashSearch(oid));
+    machinenumber = locateObjHosts[2*mindex+flipBit];
+  
+    if(numLiveHostsInSystem > 1)
+      flipBit ^= 1;
+    else
+      flipBit = 0;
+
+#ifdef DEBUG
+    printf("mindex:%d, oid:%d, machinenumber:%s\n", mindex, oid, midtoIPString(machinenumber));
+#endif
 #else
-               if((machinenumber = lhashSearch(oid)) == 0) {
-                       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
-                       return NULL;
-               }
+    if((machinenumber = lhashSearch(oid)) == 0) {
+      printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
+           return NULL;
+       }
 #endif
-               objcopy = getRemoteObj(machinenumber, oid);
+         objcopy = getRemoteObj(machinenumber, oid);
+#ifdef RECOVERY
+    if(transRetryFlag) {
+      restoreDuplicationState(machinenumber);
+#ifdef DEBUG
+      printf("%s -> Recall transRead2\n",__func__);
+#endif
+      return transRead2(oid);
+    }
+#endif
+   }
 
-               if(objcopy == NULL) {
-                       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
-                       return NULL;
-               } else {
+  if(objcopy == NULL) {
+         printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
+               return NULL;
+       } else {
 #ifdef TRANSSTATS
-                       nRemoteSend++;
+         nRemoteSend++;
 #endif
 #ifdef COMPILER
-                       return &objcopy[1];
+               return &objcopy[1];
 #else
-                       return objcopy;
+               return objcopy;
+#endif
+       }
+#ifdef DEBUG
+  printf("%s -> Finished!!\n",__func__);
 #endif
-               }
-  }
 }
 
 /* This function creates objects in the transaction record */
@@ -776,37 +804,39 @@ plistnode_t *createPiles() {
   unsigned int size = c_size;
 
        for(i = 0; i < size ; i++) {
-               chashlistnode_t * curr = &ptr[i];
+    chashlistnode_t * curr = &ptr[i];
                /* Inner loop to traverse the linked list of the cache lookupTable */
-               while(curr != NULL) {
-                       //if the first bin in hash table is empty
-                       if(curr->key == 0)
-                               break;
-                       headeraddr=(objheader_t *) curr->val;
+    while(curr != NULL) {
+      //if the first bin in hash table is empty
+      if(curr->key == 0)
+        break;
+      headeraddr=(objheader_t *) curr->val;
 
 #if RECOVERY
-                       oid = OID(headeraddr);
+      oid = OID(headeraddr);
 #ifdef DEBUG
                        printf("%s-> oid:%u, version:%d, status:%d, type:%d\n", __func__, OID(headeraddr), headeraddr->version, STATUS(headeraddr), TYPE(headeraddr));
 
                        if (STATUS(headeraddr) & NEW) {  // new/local object
                                printf("%s-> new/local object\n", __func__);
                        } 
-                       else if ((mhashSearch(curr->key) != NULL)) {    //local/nonnew
-                               if(STATUS(headeraddr) & DIRTY) {        // modified
+      else if ((mhashSearch(curr->key) != NULL)) {     //local/nonnew
+        if(STATUS(headeraddr) & DIRTY) {       // modified
                                        printf("%s-> old/local/mod object\n", __func__);
                                }
                                else {  //read
                                        printf("%s-> old/local/read object\n", __func__);
                                }
-                       } else if ((machinenum = lhashSearch(curr->key)) != 0) { // remote/nonnew object
+                       } 
+      else if ((machinenum = lhashSearch(curr->key)) != 0) { // remote/nonnew object
                                if(STATUS(headeraddr) & DIRTY) {                //modified
                                        printf("%s-> remote/local/mod object\n", __func__);
                                }
                                else {  //read
                                        printf("%s-> remote/local/read object\n", __func__);
                                }
-                       } else {
+                       } 
+      else {
                                printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
                                return NULL;
                        }
@@ -830,16 +860,15 @@ plistnode_t *createPiles() {
                        if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
                                machinenum = myIpAddr;
                        } else if ((machinenum = lhashSearch(curr->key)) == 0) {
-                                       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
-                                       return NULL;
-                               }
-
-                       //Make machine groups
-                       pile = pInsert(pile, headeraddr, machinenum, c_numelements);
+        printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
+        return NULL;
+      }
+      //Make machine groups
+      pile = pInsert(pile, headeraddr, machinenum, c_numelements);
 #endif
-                       curr = curr->next;
-               }
-       }
+      curr = curr->next;
+    }
+  }
        return pile;
 }
 #else
@@ -886,17 +915,16 @@ plistnode_t *createPiles() {
 int transCommit() {
   unsigned int tot_bytes_mod, *listmid;
   plistnode_t *pile, *pile_ptr;
-  int trecvcount;
   char treplyretry; /* keeps track of the common response that needs to be sent */
   int firsttime=1;
   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
   char finalResponse;
+  int deadsd = -1;
+  int deadmid = -1;
+  unsigned int transID = getNewTransID();
 
-       int tmpTransIndex = (transIDIndex++)%25;
-       liveTransactions[tmpTransIndex] = getNewTransID();
-       
 #ifdef DEBUG
-       printf("%s-> Start, transID:%d\n", __func__, liveTransactions[tmpTransIndex]);
+  printf("%s -> Starts transCommit\n",__func__);
 #endif
 
 #ifdef ABORTREADERS
@@ -910,17 +938,12 @@ int transCommit() {
     t_chashDelete();
 #ifdef DEBUG
          printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
-         liveTransactions[tmpTransIndex] = 0;
 #endif
     return 1;
   }
 #endif
 
-
   do {
-    trecvcount = 0;
     treplyretry = 0;
 
     /* Look through all the objects in the transaction record and make piles
@@ -976,30 +999,19 @@ int transCommit() {
                        tosend[sockindex].oidcreated = pile->oidcreated;
                        int sd = 0;
                        if(pile->mid != myIpAddr) {
-#ifdef RECOVERY
                                if((sd = getSockWithLock(transRequestSockPool, pile->mid)) < 0) {
-#else 
-                               if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
-#endif
                                        printf("\ntransRequest(): socket create error\n");
                                        free(listmid);
                                        free(tosend);
 #ifdef DEBUG
                                        printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
-                                       liveTransactions[tmpTransIndex] = 0;
 #endif
                                        return 1;
                                }
                                socklist[sockindex] = sd;
                                /* Send bytes of data with TRANS_REQUEST control message */
                                send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
-                               /*if(timeoutFlag) {
-                                               printf("send_data: remote machine dead, line:%d\n", __LINE__);
-                                               timeoutFlag = 0;
-                                               exit(1);
-                                       }*/
+
                                /* Send list of machines involved in the transaction */
                                {
                                        int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
@@ -1020,9 +1032,6 @@ int transCommit() {
                                        free(tosend);
 #ifdef DEBUG
                                        printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
-                                       liveTransactions[tmpTransIndex] = 0;
 #endif
                                        return 1;
                                }
@@ -1038,9 +1047,6 @@ int transCommit() {
                                                free(tosend);
 #ifdef DEBUG
                                                printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
-                                               liveTransactions[tmpTransIndex] = 0;
 #endif
                                                return 1;
                                        }
@@ -1050,6 +1056,11 @@ int transCommit() {
                                        offset+=size;
                                }
                                send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+
+#ifdef RECOVERY
+        /* send transaction id, number of machine involved, machine ids */
+        send_data(sd, &transID, sizeof(unsigned int));
+#endif
                                free(modptr);
                        } else { //handle request locally
                                localReqsock = sockindex;
@@ -1064,19 +1075,15 @@ int transCommit() {
                printf("%s-> Finished sending transaction read/mod objects\n",__func__);
 #endif
                int i;
+
                for(i = 0; i < pilecount; i++) {
-                       printf("i:%d\n", i);
                        if(i == localReqsock)
                                continue;
                        int sd = socklist[i]; 
                        if(sd != 0) {
                                char control;
-                               recv_data(sd, &control, sizeof(char));
-                               /*if(timeoutFlag) {
-                                       printf("recv_data: remote machine dead, timeoutFlag:%d, timeoutFlag:%d, line:%d\n", timeoutFlag, timeoutFlag, __LINE__);
-                                       timeoutFlag = 0;
-                                       exit(1);
-                               }*/
+        int timeout;            // a variable to check if the connection is still alive. if it is -1, then need to transcommit again
+        timeout = recv_data(sd, &control, sizeof(char));
                                //Update common data structure with new ctrl msg
                                getReplyCtrl[i] = control;
                                /* Recv Objects if participant sends TRANS_DISAGREE */
@@ -1084,7 +1091,7 @@ int transCommit() {
 #ifdef CACHE
                                if(control == TRANS_DISAGREE) {
                                        int length;
-                                       recv_data(sd, &length, sizeof(int));
+                                       timeout = recv_data(sd, &length, sizeof(int));
                                        void *newAddr;
                                        pthread_mutex_lock(&prefetchcache_mutex);
                                        if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
@@ -1094,14 +1101,11 @@ int transCommit() {
                                                pthread_mutex_unlock(&prefetchcache_mutex);
 #ifdef DEBUG
                                                printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
-                                               liveTransactions[tmpTransIndex] = 0;
 #endif
                                                return 1;
                                        }
                                        pthread_mutex_unlock(&prefetchcache_mutex);
-                                       recv_data(sd, newAddr, length);
+                                       timeout = recv_data(sd, newAddr, length);
                                        int offset = 0;
                                        while(length != 0) {
                                                unsigned int oidToPrefetch;
@@ -1125,8 +1129,25 @@ int transCommit() {
                                        }
                                } //end of receiving objs
 #endif
+
+#ifdef RECOVERY
+        if(timeout < 0) {
+#ifdef DEBUG
+          printf("%s -> TIMEOUT!!!!!!!\n",__func__);
+#endif
+
+          deadmid = listmid[i];
+          deadsd = sd;
+#ifdef DEBUG
+          printf("%s -> Dead Machine ID : %s\n",__func__,midtoIPString(deadmid));  
+          printf("%s -> Dead SD         : %d\n",__func__,sd);
+#endif
+          getReplyCtrl[i] = TRANS_DISAGREE;
+        }
+#endif
                        }
                }
+
 #ifdef DEBUG
                printf("%s-> Decide final response now\n", __func__);
 #endif
@@ -1137,81 +1158,88 @@ int transCommit() {
                        free(listmid);
 #ifdef DEBUG
                        printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
-                       liveTransactions[tmpTransIndex] = 0;
 #endif
                        return 1;
                }
 #ifdef DEBUG
     printf("%s-> Final Response: %d\n", __func__, (int)finalResponse);
 #endif
+    
                /* Send responses to all machines */
                for(i = 0; i < pilecount; i++) {
                        int sd = socklist[i];
-                       if(sd != 0) {
+
+      if(sd != deadsd) {
+                       if(sd != 0) {
 #ifdef CACHE
-                               if(finalResponse == TRANS_COMMIT) {
-                                       int retval;
-                                       /* Update prefetch cache */
-                                       if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
-                                               printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-                                               free(tosend);
-                                               free(listmid);
-#ifdef DEBUG
-                                               printf("%s-> End, line:%d\n\n", __func__, __LINE__);
+                               if(finalResponse == TRANS_COMMIT) {
+                                       int retval;
+                                       /* Update prefetch cache */
+                                       if((retval = updatePrefetchCache(&(tosend[i]))) != 0) {
+                                               printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+                                                 free(tosend);
+                                               free(listmid);
+#ifdef DEBUG
+                                               printf("%s-> End, line:%d\n\n", __func__, __LINE__);
+#endif
+                                               return 1;
+                                       }
+
+                                       /* Invalidate objects in other machine cache */
+                                       if(tosend[i].f.nummod > 0) {
+                                               if((retval = invalidateObj(&(tosend[i]))) != 0) {
+                                                       printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+                                                       free(tosend);
+                                                       free(listmid);
+#ifdef DEBUG
+                                                       printf("%s-> End, line:%d\n\n", __func__, __LINE__);
+#endif
+                                                       return 1;
+                                               }
+                                       }
+#ifdef ABORTREADERS
+                                       removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+                                       removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
 #endif
-#ifdef RECOVERY
-                                               liveTransactions[tmpTransIndex] = 0;
+                                 }
+#ifdef ABORTREADERS
+                               else if (!treplyretry) {
+                                       removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+                                       removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+                               }
 #endif
-                                               return 1;
-                                       }
-
-
-                                       /* Invalidate objects in other machine cache */
-                                       if(tosend[i].f.nummod > 0) {
-                                               if((retval = invalidateObj(&(tosend[i]))) != 0) {
-                                                       printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-                                                       free(tosend);
-                                                       free(listmid);
+#endif
+          send_data(sd,&finalResponse,sizeof(char));
 #ifdef DEBUG
-                                                       printf("%s-> End, line:%d\n\n", __func__, __LINE__);
+          printf("%s -> Decision Sent to %s\n",__func__,midtoIPString(listmid[i]));
 #endif
+
+      } else {
+                               /* Complete local processing */
 #ifdef RECOVERY
-                                                       liveTransactions[tmpTransIndex] = 0;
+          thashInsert(transID,finalResponse);
 #endif
-                                                       return 1;
-                                               }
-                                       }
-#ifdef ABORTREADERS
-                                       removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
-                                       removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
-#endif
-                               }
+          doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
+
 #ifdef ABORTREADERS
-                               else if (!treplyretry) {
-                                       removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
-                                       removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
-                               }
-#endif
-#endif
-                               send_data(sd, &finalResponse, sizeof(char));
-                       } else {
-                               /* Complete local processing */
-                               doLocalProcess(finalResponse, &(tosend[i]), &transinfo);
+                                 if(finalResponse == TRANS_COMMIT) {
+                                         removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+                                       removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+                               } else if (!treplyretry) {
+                                       removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
+                                       removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
+                                 }
+#endif
+                       }
+      } else {
 #ifdef ABORTREADERS
-                               if(finalResponse == TRANS_COMMIT) {
-                                       removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
-                                       removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
-                               } else if (!treplyretry) {
-                                       removethistransaction(tosend[i].oidmod,tosend[i].f.nummod);
-                                       removethisreadtransaction(tosend[i].objread,tosend[i].f.numread);
-                               }
+        removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
+        removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
 #endif
-                       }
+      }
                }
 
-#ifdef RECOVERY
+
 #ifdef DEBUG
                printf("%s-> Free sockets\n", __func__);
 #endif
@@ -1220,7 +1248,7 @@ int transCommit() {
                          freeSockWithLock(transRequestSockPool, listmid[i], socklist[i]);      
                        }
                }
-#endif         
+
                        /* Free resources */
     free(tosend);
     free(listmid);
@@ -1233,11 +1261,10 @@ int transCommit() {
                        nSoftAbort++;
 #endif
                }
-               /* Retry trans commit procedure during soft_abort case */
-       } while (treplyretry);
+       } while (treplyretry && deadmid != -1);
 
        if(finalResponse == TRANS_ABORT) {
-               //printf("Aborting trans\n");
+
 #ifdef TRANSSTATS
                numTransAbort++;
 #endif
@@ -1248,7 +1275,16 @@ int transCommit() {
          printf("%s-> End, line:%d\n\n", __func__, __LINE__);
 #endif
 #ifdef RECOVERY
-         liveTransactions[tmpTransIndex] = 0;
+    if(deadmid != -1) { /* if deadmid is greater than or equal to 0, 
+                          then there is dead machine. */
+#ifdef DEBUG
+      printf("%s -> Dead machine Detected : %s\n",__func__,midtoIPString(deadmid));
+#endif
+      restoreDuplicationState(deadmid);
+#ifdef DEBUG
+      printf("%s -> Duplication completed\n",__func__);
+#endif
+    }
 #endif
     return TRANS_ABORT;
   } else if(finalResponse == TRANS_COMMIT) {
@@ -1260,9 +1296,6 @@ int transCommit() {
     t_chashDelete();
 #ifdef DEBUG
                                        printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
-                                       liveTransactions[tmpTransIndex] = 0;
 #endif
     return 0;
   } else {
@@ -1270,17 +1303,11 @@ int transCommit() {
     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
 #ifdef DEBUG
        printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
-       liveTransactions[tmpTransIndex] = 0;
 #endif
     exit(-1);
   }
 #ifdef DEBUG
        printf("%s-> End, line:%d\n\n", __func__, __LINE__);
-#endif
-#ifdef RECOVERY
-       liveTransactions[tmpTransIndex] = 0;
 #endif
   return 0;
 }
@@ -1336,15 +1363,22 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha
 
   /* Condition to send TRANS_AGREE */
   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
+#ifdef DEBUG
+    printf("%s -> TRANS_AGREE\n",__func__);
+#endif
     *getReplyCtrl = TRANS_AGREE;
   }
   /* Condition to send TRANS_SOFT_ABORT */
   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
+#ifdef DEBUG
+    printf("%s -> TRANS_SOFT_ABORT\n",__func__);
+#endif
     *getReplyCtrl = TRANS_SOFT_ABORT;
   }
 }
 
 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
+
   if(finalResponse == TRANS_ABORT) {
     if(transAbortProcess(transinfo) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
@@ -1451,58 +1485,96 @@ void *getRemoteObj(unsigned int mnum, unsigned int oid) {
   objheader_t *h;
   void *objcopy = NULL;
 
-  int sd = getSock2(transReadSockPool, mnum);
-  char readrequest[sizeof(char)+sizeof(unsigned int)];
-  readrequest[0] = READ_REQUEST;
-  *((unsigned int *)(&readrequest[1])) = oid;
-  send_data(sd, readrequest, sizeof(readrequest));
+  int sd;
+  int flag;
 
+  if((sd = getSock2(transReadSockPool, mnum)) != -1) {
+    char readrequest[sizeof(char)+sizeof(unsigned int)];
+    readrequest[0] = READ_REQUEST;
+    *((unsigned int *)(&readrequest[1])) = oid;
+    send_data(sd, readrequest, sizeof(readrequest));
+  }
+  else {
+      printf("%s -> creating socket error\n",__func__);
+  }
+  
   /* Read response from the Participant */
-  recv_data(sd, &control, sizeof(char));
+  if(recv_data(sd, &control, sizeof(char)) < 0) {
+      transRetryFlag = 1;
+      return NULL;
+  }
 
   if (control==OBJECT_NOT_FOUND) {
     objcopy = NULL;
   } else if(control==OBJECT_FOUND) {
-    /* Read object if found into local cache */
-    recv_data(sd, &size, sizeof(int));
- objcopy = objstrAlloc(&t_cache, size);
-    recv_data(sd, objcopy, size);
-    STATUS(objcopy)=0;
-    /* Insert into cache's lookup table */
-    t_chashInsert(oid, objcopy);
-#ifdef TRANSSTATS
-    totalObjSize += size;
-#endif
-       }
+  
+  /* Read object if found into local cache */
 
-#ifdef RECOVERY
-       if( detectMachineFailure(mnum) ) { //check for timeouts
-               printf("looking for oid:%d\n", oid);
-               restoreDuplicationState(mnum);  // suspect machine failure, restore state
+  if(recv_data(sd, &size, sizeof(int)) < 0) {
+    transRetryFlag = 1;
+    return NULL;
+  }
 
-               objheader_t *temp;
-               temp = transRead2(oid);         // retry transRead
-#ifdef COMPILER
-               temp -= 1;              // return object w/ objheader
+  objcopy = objstrAlloc(&t_cache, size);
+
+  if(recv_data(sd, objcopy, size) < 0) {
+    transRetryFlag = 1;
+    return NULL;
+  }
+
+  STATUS(objcopy)=0;
+  
+  /* Insert into cache's lookup table */
+  t_chashInsert(oid, objcopy);
+#ifdef TRANSSTATS
+  totalObjSize += size;
 #endif
-               return (void *)temp;
        }
-#endif
        return objcopy;
 }
 
-int detectMachineFailure(unsigned int mid) {
-       if(timeoutFlag == 1) {
+#ifdef RECOVERY
+/* ask machines if they received decision */
+char receiveDecisionFromBackup(unsigned int transID,int nummid,unsigned int *listmid)
+{
+#ifdef DEBUG
+  printf("%s -> Entering\n",__func__);
+#endif
+
+  int sd; // socket id
+  int i;
+  char response;
+  
+  for(i = 0; i < nummid; i++) {
+    if((sd = getSock(transReadSockPool, listmid[i])) < 0) {
+      printf("%s -> socket Error!!\n");
+    }
+    else {
+      char control = ASK_COMMIT;
+
+      send_data(sd,&control, sizeof(char));
+      send_data(sd,&transID, sizeof(unsigned int));
+
+      // return -1 if it didn't receive the response
+      int timeout = recv_data(sd,&response, sizeof(char));
+
+
+      if(timeout == 0 || response > 0)
+        break;  // received response
+
+      // else check next machine
+      freeSock(transReadSockPool, listmid[i],sd);
+     }
+  }
 #ifdef DEBUG
-               printf("%s-> Suspect machine failure: [%s]\n", __func__, midtoIPString(mid));
+  printf("%s -> response : %d\n",__func__,response);
 #endif
-               timeoutFlag = 0;
-               return 1;
-       } 
-       else
-               return 0;
+
+  return (response==-1)?TRANS_ABORT:response;
 }
+#endif
 
+#ifdef RECOVERY
 void restoreDuplicationState(unsigned int deadHost) {
        int sd;
        char ctrl;
@@ -1511,11 +1583,12 @@ void restoreDuplicationState(unsigned int deadHost) {
                sleep(WAIT_TIME);
                return;
        }
+
        if(deadHost == leader)
                paxos();
        
 #ifdef DEBUG
-       printf("%s-> leader?:%s, me?:%d\n", __func__, midtoIPString(leader), (myIpAddr == leader));
+       printf("%s-> leader?:%s, me?:%s\n", __func__, midtoIPString(leader), (myIpAddr == leader)?"LEADER":"NOT LEADER");
 #endif
        
        if(leader == myIpAddr) {
@@ -1523,40 +1596,52 @@ void restoreDuplicationState(unsigned int deadHost) {
                if(!leaderFixing) {
                        leaderFixing = 1;
                        pthread_mutex_unlock(&leaderFixing_mutex);
-                       //fixit
-                       updateLiveHosts();
 
-                       if(!liveHosts[findHost(deadHost)]) {    //confirmed dead
-                               duplicateLostObjects(deadHost);
-                       }
-                       if(updateLiveHostsCommit() != 0) {
-                               printf("error updateLiveHostsCommit()\n");
-                               exit(1);
-                       }
-               pthread_mutex_lock(&leaderFixing_mutex);
-                       leaderFixing = 0;
-                       pthread_mutex_unlock(&leaderFixing_mutex);
+      if(!liveHosts[findHost(deadHost)]) {
+#ifdef DEBUG
+        printf("%s -> already fixed\n",__func__);
+#endif
+        pthread_mutex_lock(&leaderFixing_mutex);
+        leaderFixing =0;
+        pthread_mutex_unlock(&leaderFixing_mutex);
+      }
+      else {
+                       updateLiveHosts();
+        duplicateLostObjects(deadHost);
+                       
+                   if(updateLiveHostsCommit() != 0) {
+                               printf("%s -> error updateLiveHostsCommit()\n",__func__);
+                                 exit(1);
+                       }
+               pthread_mutex_lock(&leaderFixing_mutex);
+                       leaderFixing = 0;
+                         pthread_mutex_unlock(&leaderFixing_mutex);
+      }
                }
                else {
                        pthread_mutex_unlock(&leaderFixing_mutex);
+#ifdef DEBUG
+      printf("%s (REMOTE_RESTORE_DUPLICATED_STATE -> LEADER is already fixing\n",__func__);
+#endif
                        sleep(WAIT_TIME);
-                       //while(leaderFixing);
-                       return;
                }
        }
        else {
-               if((sd = getSock2WithLock(transRequestSockPool, leader)) < 0) {
-                       printf("restoreDuplicationState(): socket create error\n");
+               if((sd = getSockWithLock(transRequestSockPool, leader)) < 0) {
+                       printf("%s -> socket create error\n",__func__);
                        exit(-1);
                }
                ctrl = REMOTE_RESTORE_DUPLICATED_STATE;
                send_data(sd, &ctrl, sizeof(char));
                send_data(sd, &deadHost, sizeof(unsigned int));
-               recv_data(sd, &ctrl, sizeof(char));
+    freeSockWithLock(transRequestSockPool,leader,sd);
          sleep(WAIT_TIME);
-               return;
        }
+
+  printf("%s -> Finished!\n",__func__);
 }
+#endif
+
 
 /*  Commit info for objects modified */
 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
@@ -1686,7 +1771,6 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
   numlocked = transinfo->numlocked;
   oidlocked = transinfo->objlocked;
 
-
 #ifdef DEBUG
        printf("%s-> nummod: %d, numcreated: %d, numlocked: %d\n", __func__, nummod, numcreated, numlocked);
 #endif
@@ -1715,7 +1799,17 @@ int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo) {
     header->version += 1;
     //printf("oid: %u, new header version: %d\n", oidmod[i], header->version);
     if(header->notifylist != NULL) {
+#ifdef DEBUG
+      printf("%s -> type : %d notifylist : %d\n",__func__,TYPE(header),header->notifylist);
+#endif
+#ifdef RECOVERY
+      if(header->isBackup != 0)
+        notifyAll(&header->notifylist, OID(header), header->version);
+      else
+        clearNotifyList(OID(header));
+#else  
       notifyAll(&header->notifylist, OID(header), header->version);
+#endif
     }
   }
   /* If object is newly created inside transaction then commit it */
@@ -2113,6 +2207,7 @@ int processConfigFile() {
   char *token;
   const char *delimiters = " \t\n";
   char *commentBegin;
+  unsigned int i;
   in_addr_t tmpAddr;
 
   configFile = fopen(CONFIG_FILENAME, "r");
@@ -2128,6 +2223,7 @@ int processConfigFile() {
 #ifdef RECOVERY        
        liveHosts = calloc(sizeOfHostArray, sizeof(unsigned int));
        locateObjHosts = calloc(sizeOfHostArray*2, sizeof(unsigned int));
+
   liveHostsValid = 0;
 #endif
 
@@ -2178,9 +2274,13 @@ int processConfigFile() {
        transIDMin = oidMin;
        transIDMax = oidMax;
 
+  waitThreadID = -1;
+  waitThreadMid = -1;
+
   return 0;
 }
 
+#ifdef RECOVERY
 unsigned int getDuplicatedPrimaryMachine(unsigned int mid) {
        int i;
        for(i = 0; i < numHostsInSystem; i++) {
@@ -2211,67 +2311,90 @@ unsigned int getBackupMachine(unsigned int mid) {
        return bmid;
 }
 
+int getStatus(int mid) { /* Returns liveHosts[mid]; mid is used as an index into liveHosts[] and hostIpAddrs[]. */
+#ifdef DEBUG
+  printf("%s -> host %s : %s\n",__func__,midtoIPString(hostIpAddrs[mid]),(liveHosts[mid] == 1)?"LIVE":"DEAD");
+#endif
+  return liveHosts[mid]; /* no range check is applied to mid in this function */
+}
+#endif
+
+#ifdef RECOVERY
 // updates the leader's liveHostArray and locateObj
-void updateLiveHosts() {
+unsigned int updateLiveHosts() { /* Sends RESPOND_LIVE to every host except this one, refreshes liveHosts[] and numLiveHostsInSystem; returns the index of the last host newly marked dead, or -1 (converted to unsigned) if none. */
 #ifdef DEBUG
-  printf("%s-> Entering updateLiveHosts\n", __func__); 
+    printf("%s-> Entering updateLiveHosts\n", __func__);       
 #endif
        // update everyone's list
-       liveHostsValid = 0;
-  //int *tmpLiveHosts = calloc(sizeOfHostArray, sizeof(unsigned int));
-               //foreach in hostipaddrs, ping -> update list of livemachines   
-    //socket connection?
-
-               //liveHosts lock here
-               int sd = 0, i, j, tmpNumLiveHosts = 0;
-               for(i = 0; i < numHostsInSystem; i++) {
-                       if(i == myIndexInHostArray) 
-                       {       
-                               tmpNumLiveHosts++;
-                               continue;
-                       }
-                       for(j = 0; j < 5; j++) {        // hard define num of retries
-                               if((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
-                               printf("updateLiveHosts(): Cannot create socket connection to [%s], attempt %d\n", __func__, midtoIPString(hostIpAddrs[i]), j);
-                                       usleep(1000);
-                                       if(j == 4)
-                                               liveHosts[i] = 0;               
-                                       continue;
-                               }
-        char liverequest[sizeof(char)];
-                               liverequest[0] = RESPOND_LIVE;
-               
-                               send_data(sd, &liverequest[0], sizeof(liverequest));
-                               char response = 0;
-                               recv_data(sd, &response, sizeof(response));
-                               
-                               //try to send msg
-                               //if timeout, dead host
-                               printf("YES received %d\n", response);
-                               if(response == LIVE) {
-                                       printf("must enter here\n");
-                               liveHosts[i] = 1;
-                               tmpNumLiveHosts++;
-                               //locateObjHosts[i*2] = hostIpAddrs[i];
-                               }
-                               else {
-                                       printf("or here\n");
-                                       liveHosts[i] = 0;
-                                       timeoutFlag = 0;
-                               }
-                               break;
-                               
+    liveHostsValid = 0;
+       
+  // for each entry in hostIpAddrs (except our own): ping it and update liveHosts[] from the answer
+  // the ping goes over a socket obtained from transRequestSockPool
+
+  int deadhost = -1; // index of the last host whose liveHosts entry was cleared in this call; -1 if none
+       // NOTE(review): the original note asked for a liveHosts lock here; this function takes none - confirm callers serialize
+       int sd = 0, i, j, tmpNumLiveHosts = 0;
+       for(i = 0; i < numHostsInSystem; i++) {
+    if(i == myIndexInHostArray) 
+               {       
+                       tmpNumLiveHosts++;
+                       continue;
+               }
+               for(j = 0; j < 5; j++) {        // at most 5 attempts to get a socket (hard-coded)
+                       if((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
+#ifdef DEBUG
+               printf("%s -> Cannot create socket connection to [%s], attempt %d\n", __func__, midtoIPString(hostIpAddrs[i]), j);
+#endif
+                               usleep(1000);
+  
+        if(j == 4) { // last attempt failed as well: treat the host as dead
+                         if(liveHosts[i]) {
+            liveHosts[i] = 0;
+            deadhost = i;
+          }
+        }
+                       continue;
+                 }
+      
+      char liverequest[sizeof(char)];
+                       liverequest[0] = RESPOND_LIVE;
+       
+                       send_data(sd, &liverequest[0], sizeof(liverequest));
+      
+                       char response = 0;
+                       int timeout = recv_data(sd, &response, sizeof(response)); // NOTE(review): timeout is never read; liveness is judged from response alone
+                       
+                       // a host counts as live only when it answered LIVE
+                       // any other value (presumably including the initial 0 when nothing arrives - confirm) marks it dead
+                       if(response == LIVE) {
+                       liveHosts[i] = 1;
+                       tmpNumLiveHosts++;
                        }
-                       if(liveHosts[i] == 0)
-                               printf("updateLiveHosts(): cannot make connection to machine %s\n", midtoIPString(hostIpAddrs[i]));
+                       else {
+        if(liveHosts[i]) {
+          liveHosts[i] = 0;
+          deadhost = i;
+        }
+                 }
+                       break; // a socket was obtained, so no further attempts for this host
                }
-               numLiveHostsInSystem = tmpNumLiveHosts;
-               printf("numLiveHostsInSystem:%d\n", numLiveHostsInSystem);
-               //have updated list of live machines
+#ifdef DEBUG
+         if(liveHosts[i] == 0)
+
+                 printf("updateLiveHosts(): cannot make connection to machine %s\n", midtoIPString(hostIpAddrs[i]));
+#endif
+       }
+       numLiveHostsInSystem = tmpNumLiveHosts;
+#ifdef DEBUG
+       printf("numLiveHostsInSystem:%d\n", numLiveHostsInSystem);
+#endif
+       // liveHosts[] and numLiveHostsInSystem now reflect this round of pings
 #ifdef DEBUG   
   printf("%s-> Exiting updateLiveHosts\n", __func__);  
        printHostsStatus();
 #endif
+
+  return deadhost;
 }
 
 int getNumLiveHostsInSystem() {
@@ -2284,58 +2407,44 @@ int getNumLiveHostsInSystem() {
 }
 
 int updateLiveHostsCommit() {
-               int sd = 0, i;
+#ifdef DEBUG      
+  printf("%s -> Enter\n",__func__);
+#endif
+       int sd = 0, i;
        
-               char updaterequest[sizeof(char)+sizeof(int)*numHostsInSystem+sizeof(unsigned int)*(numHostsInSystem*2)];
-               updaterequest[0] = UPDATE_LIVE_HOSTS;
+       char updaterequest[sizeof(char)+sizeof(int)*numHostsInSystem+sizeof(unsigned int)*(numHostsInSystem*2)];
+  
+  updaterequest[0] = UPDATE_LIVE_HOSTS;
                
-               for(i = 0; i < numHostsInSystem; i++) {
-                       *((int *)(&updaterequest[i*4+1])) = liveHosts[i];  // clean this up later
-               }
-
-               for(i = 0; i < numHostsInSystem*2; i++) {
-                       *((unsigned int *)(&updaterequest[i*4+(numHostsInSystem*4)+1])) = locateObjHosts[i];    //ditto
-               }
-
-               //for each machine send data
-               for(i = 1; i < numHostsInSystem; i++) {         // hard define num of retries
-                               if(i == myIndexInHostArray) 
-                                       continue;
-                               if(liveHosts[i] == 1) {
-                                       if((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
-                                       printf("updateLiveHosts(): socket create error, attempt %d\n", i);
-                                               return -1;
-                                       }
-                                       send_data(sd, updaterequest, sizeof(updaterequest));
-                               }
-               }
-               liveHostsValid = 1;
-               printHostsStatus();
-               return 0;
-}
+       for(i = 0; i < numHostsInSystem; i++) {
+               *((int *)(&updaterequest[i*4+1])) = liveHosts[i];  // clean this up later
+       }
 
-/*void updateLocateObjHosts(unsigned int failedmid) {
-       int failedmidIndex = findHost(failedmid);
-       int i = 0, validIndex = 0;
+       for(i = 0; i < numHostsInSystem*2; i++) {
+               *((unsigned int *)(&updaterequest[i*4+(numHostsInSystem*4)+1])) = locateObjHosts[i];    //ditto
+       }
 
-       for(; i < numHostsInSystem; i++) {
-               if(locateObjHosts[(i*2)] == failedmid) {
-                       while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) 
-                               validIndex++;
-                       locateObjHosts[(i*2)] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
-                       validIndex++;
-                       while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) 
-                               validIndex++;
-                       locateObjHosts[(i*2)+1] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
-               }
-               else if(locateObjHosts[(i*2)+1] == failedmid) {
-                       while(liveHosts[(i+validIndex)%numHostsInSystem] == 0) 
-                               validIndex++;
-                       locateObjHosts[(i*2)+1] = hostIpAddrs[(i+validIndex)%numHostsInSystem];
-                       validIndex = 0;
+       //for each machine send data
+        
+       for(i = 0; i < numHostsInSystem; i++) {         // hard define num of retries
+               if(i == myIndexInHostArray) 
+                       continue;
+               if(liveHosts[i] == 1) {
+                       if((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
+                       printf("%s -> socket create error, attempt %d\n",__func__, i);
+                               return -1;
+                       }
+                       send_data(sd, updaterequest, sizeof(updaterequest));
                }
        }
-}*/
+       liveHostsValid = 1;
+#ifdef DEBUG
+       printHostsStatus();
+  printf("%s -> Finish\n",__func__);
+#endif
+
+       return 0;
+}
 
 void setLocateObjHosts() {
        int i = 0, validIndex = 0;
@@ -2371,27 +2480,40 @@ void setLocateObjHosts() {
        }
 }
 
+/* Repoint locateObjHosts after host `mid` died (slot 2k = primary, 2k+1 = backup of host k). */
+void setReLocateObjHosts(int mid)
+{
+  int mIndex = findHost(mid);
+  int backupMachine = getBackupMachine(mid);
+  int newPrimary = getDuplicatedPrimaryMachine(mid);
+  int newPrimaryIndex = findHost(newPrimary);
+  int i;
+  /* findHost() yields a signed index; never write through a negative one (NOTE(review): confirm its not-found value). */
+  if(newPrimaryIndex >= 0) locateObjHosts[2*newPrimaryIndex+1] = backupMachine;
+  if(mIndex >= 0) locateObjHosts[2*mIndex] = newPrimary;
+  /* Redirect every remaining entry that still names the dead host (also covers hosts that died earlier). */
+  for(i=0; i<numHostsInSystem *2; i+=2) {
+    if(locateObjHosts[i] == mid)
+      locateObjHosts[i] = newPrimary;
+    if(locateObjHosts[i+1] == mid)
+      locateObjHosts[i+1] = backupMachine;
+  }
+}
+
+
 //debug function
 void printHostsStatus() {
        int i;
-#ifdef DEBUG
        printf("%s-> *printing live machines and backups*\n", __func__);
-#endif
        for(i = 0; i < numHostsInSystem; i++) {
                if(liveHosts[i]) {
-#ifdef DEBUG
                        printf("%s-> [%s]: LIVE\n", __func__, midtoIPString(hostIpAddrs[i])); 
-#endif
                }
                else {
-#ifdef DEBUG
                        printf("%s-> [%s]: DEAD\n", __func__, midtoIPString(hostIpAddrs[i]));
-#endif
                }
-#ifdef DEBUG
                        printf("%s-> original:\t[%s]\n", __func__, midtoIPString(locateObjHosts[i*2]));
                        printf("%s-> backup:\t[%s]\n", __func__, midtoIPString(locateObjHosts[i*2+1]));
-#endif
        }
 }
 
@@ -2406,59 +2528,66 @@ int allHostsLive() {
 
 void duplicateLostObjects(unsigned int mid){
 
-#ifdef DEBUG
        printf("%s-> Start, mid: [%s]\n", __func__, midtoIPString(mid));  
-#endif
        
        //this needs to be changed.
-       unsigned int backupMid = getBackupMachine(mid);
-       unsigned int originalMid = getDuplicatedPrimaryMachine(mid);
+       unsigned int backupMid = getBackupMachine(mid); // get backup machine of dead machine
+       unsigned int originalMid = getDuplicatedPrimaryMachine(mid); // get primary machine that used deadmachine as backup machine.
 
 #ifdef DEBUG
        printf("%s-> backupMid: [%s], ", __func__, midtoIPString(backupMid));
        printf("originalMid: [%s]\n", midtoIPString(originalMid));
+       printHostsStatus(); 
 #endif
 
-  setLocateObjHosts();
-       printHostsStatus(); 
+  setReLocateObjHosts(mid);
        
        //connect to these machines
        //go through their object store copying necessary (in a transaction)
        //transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
        int sd = 0, i, j, tmpNumLiveHosts = 0;
 
-       if(originalMid == myIpAddr) {
-               originalMid = getPrimaryMachine(mid);
-               printf("originalMid: [%s]\n", midtoIPString(originalMid));
-               duplicateLocalOriginalObjects(originalMid);     
+  /* duplicateLostObjects example
+   * Before M24 dies,
+   * MID        21      24      26
+   * Primary    21      24      26
+   * Backup     26      21      24
+   * After M24 dies,
+   * MID        21      26
+   * Primary   21,24    26
+   * Backup     26      21,24
+   */
+
+       if(originalMid == myIpAddr) {   // copy local machine's backup data, make it as primary data of backup machine.
+               duplicateLocalOriginalObjects(backupMid);       
        }
-       else if((sd = getSock2WithLock(transRequestSockPool, originalMid)) < 0) {
-               printf("updateLiveHosts(): socket create error, attempt %d\n", j);
+       else if((sd = getSockWithLock(transRequestSockPool, originalMid)) < 0) {
+               printf("%s -> socket create error, attempt %d\n", __func__,j);
+    exit(0);
                //usleep(1000);
        }
-       else {
+       else {      // if original is not local
                char duperequest;
                duperequest = DUPLICATE_ORIGINAL;
                send_data(sd, &duperequest, sizeof(char));
 #ifdef DEBUG
-         printf("%s-> Sent DUPLICATE_ORIGINAL request\n", __func__);   
+         printf("%s-> SD : %d  Sent DUPLICATE_ORIGINAL request to %s\n", __func__,sd,midtoIPString(originalMid));      
 #endif
-               originalMid = getPrimaryMachine(mid);
-               printf("originalMid: [%s]\n", midtoIPString(originalMid));
-               send_data(sd, &originalMid, sizeof(unsigned int));
+               send_data(sd, &backupMid, sizeof(unsigned int));
+
+    char response;
+               recv_data(sd, &response, sizeof(char));
 #ifdef DEBUG
-         printf("%s-> Sent originalMid\n", __func__);  
+               printf("%s (DUPLICATE_ORIGINAL) -> Received %s\n", __func__,(response==DUPLICATION_COMPLETE)?"DUPLICATION_COMPLETE":"DUPLICATION_FAIL");
 #endif
-               char response;
-               recv_data_block(sd, &response, sizeof(char));
-               printf("YES! Received %d\n", response);
-               }
 
-       if(backupMid == myIpAddr) {
-               backupMid = getBackupMachine(mid);
-               duplicateLocalBackupObjects(backupMid); 
+    freeSockWithLock(transRequestSockPool, originalMid, sd);
+       }
+
+       if(backupMid == myIpAddr) {   // copy local machine's primary data, and make it as backup data of original machine.
+               duplicateLocalBackupObjects(originalMid);       
        }
-       else if((sd = getSock2WithLock(transRequestSockPool, backupMid)) < 0) {
+       else if((sd = getSockWithLock(transRequestSockPool, backupMid)) < 0) {
                printf("updateLiveHosts(): socket create error, attempt %d\n", j);
                exit(1);
        }
@@ -2467,52 +2596,66 @@ void duplicateLostObjects(unsigned int mid){
                duperequest = DUPLICATE_BACKUP;
                send_data(sd, &duperequest, sizeof(char));
 #ifdef DEBUG
-         printf("%s-> Sent DUPLICATE_BACKUP request\n", __func__);     
+         printf("%s-> SD : %d  Sent DUPLICATE_BACKUP request to %s\n", __func__,sd,midtoIPString(backupMid));  
 #endif
-               backupMid = getBackupMachine(mid);
-               send_data(sd, &backupMid, sizeof(unsigned int));
+               send_data(sd, &originalMid, sizeof(unsigned int));
+
+               char response;
+               recv_data(sd, &response, sizeof(char));
 #ifdef DEBUG
-         printf("%s-> Sent backupMid\n", __func__);    
+               printf("%s (DUPLICATE_BACKUP) -> Received %s\n", __func__,(response==DUPLICATION_COMPLETE)?"DUPLICATION_COMPLETE":"DUPLICATION_FAIL");
 #endif
 
-               char response;
-               recv_data_block(sd, &response, sizeof(char));
-               printf("YES! Received %d\n", response);
+    freeSockWithLock(transRequestSockPool, backupMid, sd);
        }
 
-#ifdef DEBUG
+#ifndef DEBUG
        printf("%s-> End\n", __func__);  
 #endif
 }
 
 void duplicateLocalBackupObjects(unsigned int mid) {
        int tempsize, sd;
+  int i;
        char *dupeptr, ctrl, response;
-
-#ifdef DEBUG
+#ifndef DEBUG
        printf("%s-> Start; backup mid:%s\n", __func__, midtoIPString(mid));  
 #endif
+
        //copy code from dstmserver here
-       tempsize = mhashGetDuplicate(&dupeptr, 1);
+       tempsize = mhashGetDuplicate((void**)&dupeptr, 1);
 
+#ifdef DEBUG
        printf("tempsize:%d, dupeptrfirstvalue:%d\n", tempsize, *((unsigned int *)(dupeptr)));
+#endif
        //send control and dupes after
        ctrl = RECEIVE_DUPES;
        if((sd = getSockWithLock(transRequestSockPool, mid)) < 0) {
                printf("duplicatelocalbackup: socket create error\n");
                //usleep(1000);
        }
-
-       printf("sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", sd, tempsize, *((unsigned int *)(dupeptr)));
+#ifdef DEBUG
+       printf("%s -> sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", __func__,sd, tempsize, *((unsigned int *)(dupeptr)));
+#endif
        send_data(sd, &ctrl, sizeof(char));
        send_data(sd, dupeptr, tempsize);
-       recv_data(sd, &response, sizeof(char));
+       
+  recv_data(sd, &response, sizeof(char));
+  freeSockWithLock(transRequestSockPool,mid,sd);
+
+#ifdef DEBUG
+  printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
+#endif
+
        if(response != DUPLICATION_COMPLETE) {
-               //fail message
+#ifndef DEBUG
+    printf("%s -> DUPLICATION_FAIL\n",__func__);
+#endif
+    exit(-1);
        }
+  free(dupeptr);
 
-       freeSockWithLock(transRequestSockPool, mid, sd);
-#ifdef DEBUG
+#ifndef DEBUG
        printf("%s-> End\n", __func__);  
 #endif
 
@@ -2522,12 +2665,12 @@ void duplicateLocalOriginalObjects(unsigned int mid) {
        int tempsize, sd;
        char *dupeptr, ctrl, response;
 
-#ifdef DEBUG
+#ifndef DEBUG
        printf("%s-> Start\n", __func__);  
 #endif
        //copy code fom dstmserver here
 
-       tempsize = mhashGetDuplicate(&dupeptr, 0);
+       tempsize = mhashGetDuplicate((void**)&dupeptr, 0);
 
        //send control and dupes after
        ctrl = RECEIVE_DUPES;
@@ -2536,23 +2679,38 @@ void duplicateLocalOriginalObjects(unsigned int mid) {
                printf("DUPLICATE_ORIGINAL: socket create error\n");
                //usleep(1000);
        }
+#ifdef DEBUG
        printf("sd:%d, tempsize:%d, dupeptrfirstvalue:%d\n", sd, tempsize, *((unsigned int *)(dupeptr)));
+#endif
 
        send_data(sd, &ctrl, sizeof(char));
        send_data(sd, dupeptr, tempsize);
 
        recv_data(sd, &response, sizeof(char));
+  freeSockWithLock(transRequestSockPool,mid,sd);
+
+#ifdef DEBUG
+  printf("%s ->response : %d  -  %d\n",__func__,response,DUPLICATION_COMPLETE);
+#endif
+
        if(response != DUPLICATION_COMPLETE) {
                //fail message
+#ifndef DEBUG
+    printf("%s -> DUPLICATION_FAIL\n",__func__);
+#endif
+    exit(0);
        }
-       freeSockWithLock(transRequestSockPool, mid, sd);
+  
+  free(dupeptr);
 
-#ifdef DEBUG
+#ifndef DEBUG
        printf("%s-> End\n", __func__);  
 #endif
 
 }
 
+#endif
+
 void addHost(unsigned int hostIp) {
   unsigned int *tmpArray;
   int *tmpliveHostsArray;      
@@ -2567,6 +2725,7 @@ void addHost(unsigned int hostIp) {
     free(hostIpAddrs);
     hostIpAddrs = tmpArray;
 
+#ifdef RECOVERY
                tmpliveHostsArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
                memcpy(tmpliveHostsArray, liveHosts, sizeof(unsigned int) * numHostsInSystem);
     free(liveHosts);
@@ -2576,13 +2735,16 @@ void addHost(unsigned int hostIp) {
                memcpy(tmplocateObjHostsArray, locateObjHosts, sizeof(unsigned int) * numHostsInSystem);
     free(locateObjHosts);
     locateObjHosts = tmplocateObjHostsArray;
-
+#endif
                sizeOfHostArray *= 2;
   }
 
   hostIpAddrs[numHostsInSystem] = hostIp;
+
+#ifdef RECOVERY
   liveHosts[numHostsInSystem] = 0;
   locateObjHosts[numHostsInSystem*2] = hostIp;
+#endif
 
        numHostsInSystem++;
   return;
@@ -2600,13 +2762,16 @@ int findHost(unsigned int hostIp) {
 
 /* This function sends notification request per thread waiting on object(s) whose version
  * changes */
+#ifdef RECOVERY
+int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid, int waitmid) {
+#else
 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
-  int sock,i;
+#endif
+  int psock,i;
   objheader_t *objheader;
-  struct sockaddr_in remoteAddr;
+  struct sockaddr_in premoteAddr;
   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
   char *ptr;
-  int bytesSent;
   int status, size;
   unsigned short version;
   unsigned int oid,mid;
@@ -2615,25 +2780,50 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
   notifydata_t *ndata;
 
+#ifdef RECOVERY
+  int bsock;
+  struct sockaddr_in bremoteAddr;
+#endif
+
   oid = oidarry[0];
+
   if((mid = lhashSearch(oid)) == 0) {
     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
     return;
   }
+#ifdef RECOVERY
+  int pmid = getPrimaryMachine(mid);
+  int bmid = getBackupMachine(mid);
+#else
+  int pmid = mid;
+#endif
 
-  if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+#ifdef RECOVERY
+  if ((psock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
+      (bsock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) {
+#else
+  if ((psock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+#endif
     perror("reqNotify():socket()");
     return -1;
   }
 
-  bzero(&remoteAddr, sizeof(remoteAddr));
-  remoteAddr.sin_family = AF_INET;
-  remoteAddr.sin_port = htons(LISTEN_PORT);
-  remoteAddr.sin_addr.s_addr = htonl(mid);
+  /* for primary machine */
+  bzero(&premoteAddr, sizeof(premoteAddr));
+  premoteAddr.sin_family = AF_INET;
+  premoteAddr.sin_port = htons(LISTEN_PORT);
+  premoteAddr.sin_addr.s_addr = htonl(pmid);
 
+#ifdef RECOVERY
+  /* for backup machine */
+  bzero(&bremoteAddr, sizeof(bremoteAddr));
+  bremoteAddr.sin_family = AF_INET;
+  bremoteAddr.sin_port = htons(LISTEN_PORT);
+  bremoteAddr.sin_addr.s_addr = htonl(bmid);
+#endif
   /* Generate unique threadid */
   threadid++;
-
+  
   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
@@ -2652,19 +2842,34 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
   }
 
   /* Send  number of oids, oidarry, version array, machine id and threadid */
-  if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+#ifdef RECOVERY
+  if ((connect(psock, (struct sockaddr *)&premoteAddr, sizeof(premoteAddr))< 0) || 
+      (connect(bsock, (struct sockaddr *)&bremoteAddr, sizeof(bremoteAddr))< 0)) {
+#else
+  if ((connect(psock, (struct sockaddr *)&premoteAddr, sizeof(premoteAddr))< 0)) {
+#endif
     printf("reqNotify():error %d connecting to %s:%d\n", errno,
-           inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+    inet_ntoa(premoteAddr.sin_addr), LISTEN_PORT);
     free(ndata);
     return -1;
   } else {
+
+#ifdef DEBUG
+    printf("%s -> Pmid = %s\n",__func__,midtoIPString(pmid));
+    printf("%s -> Bmid = %s\n",__func__,midtoIPString(bmid));
+#endif
+
     msg[0] = THREAD_NOTIFY_REQUEST;
+
     *((unsigned int *)(&msg[1])) = numoid;
     /* Send array of oids  */
     size = sizeof(unsigned int);
 
     for(i = 0;i < numoid; i++) {
       oid = oidarry[i];
+#ifdef DEBUG
+      printf("%s -> oid[%d] = %d\n",__func__,i,oidarry[i]);
+#endif
       *((unsigned int *)(&msg[1] + size)) = oid;
       size += sizeof(unsigned int);
     }
@@ -2676,11 +2881,21 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
       size += sizeof(unsigned short);
     }
 
-    *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
+    *((unsigned int *)(&msg[1] + size)) = myIpAddr; 
+    size += sizeof(unsigned int);
     *((unsigned int *)(&msg[1] + size)) = threadid;
-    pthread_mutex_lock(&(ndata->threadnotify));
+#ifdef RECOVERY
+    waitThreadMid = waitmid;
+    waitThreadID = threadid;
+    printf("%s -> This Thread is waiting for %s\n",__func__,midtoIPString(waitmid));
+#endif
+
     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
-    send_data(sock, msg, size);
+    pthread_mutex_lock(&(ndata->threadnotify));
+    send_data(psock, msg, size);
+#ifdef RECOVERY
+    send_data(bsock, msg, size);
+#endif
     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
     pthread_mutex_unlock(&(ndata->threadnotify));
   }
@@ -2688,14 +2903,22 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
   pthread_cond_destroy(&threadcond);
   pthread_mutex_destroy(&threadnotify);
   free(ndata);
-  close(sock);
+  close(psock);
+
+#ifdef RECOVERY
+  close(bsock);
+#endif
+
   return status;
 }
 
 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
   notifydata_t *ndata;
-  int i, objIsFound = 0, index;
+  int i, objIsFound = 0, index = -1;
   void *ptr;
+#ifdef DEBUG
+  printf("%s -> oid = %d   vesion = %d    tid = %d\n",__func__,oid,version,tid);
+#endif
 
   //Look up the tid and call the corresponding pthread_cond_signal
   if((ndata = notifyhashSearch(tid)) == NULL) {
@@ -2704,30 +2927,36 @@ void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
   } else  {
     for(i = 0; i < ndata->numoid; i++) {
       if(ndata->oidarry[i] == oid) {
-       objIsFound = 1;
-       index = i;
+        objIsFound = 1;
+        index = i;
+
       }
     }
     if(objIsFound == 0) {
       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
       return;
-    } else {
-      if(version <= ndata->versionarry[index]) {
-       printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
-       return;
+    } 
+    else {
+      if(version <= ndata->versionarry[index] && version >= 0) {
+             printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
+       return;
       } else {
 #ifdef CACHE
        /* Clear from prefetch cache and free thread related data structure */
-       if((ptr = prehashSearch(oid)) != NULL) {
-         prehashRemove(oid);
-       }
+       if((ptr = prehashSearch(oid)) != NULL) {
+         prehashRemove(oid);
+       }
 #endif
-       pthread_mutex_lock(&(ndata->threadnotify));
-       pthread_cond_signal(&(ndata->threadcond));
-       pthread_mutex_unlock(&(ndata->threadnotify));
+       pthread_mutex_lock(&(ndata->threadnotify));
+       pthread_cond_signal(&(ndata->threadcond));
+       pthread_mutex_unlock(&(ndata->threadnotify));
       }
     }
   }
+
+#ifdef DEBUG
+  printf("%s -> Finished\n",__func__);
+#endif
   return;
 }
 
@@ -2737,10 +2966,18 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
   struct sockaddr_in remoteAddr;
   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
   int sock, status, size, bytesSent;
+#ifdef DEBUG
+  printf("%s -> Entering \n",__func__);
+#endif
 
   while(*head != NULL) {
     ptr = *head;
+
     mid = ptr->mid;
+#ifdef DEBUG
+    printf("%s -> trying to connect MID : %s\n",__func__,midtoIPString(mid));
+#endif
+
     //create a socket connection to that machine
     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
       perror("notifyAll():socket()");
@@ -2758,6 +2995,9 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
       fflush(stdout);
       status = -1;
     } else {
+#ifdef DEBUG
+      printf("%s -> connected\n",__func__);
+#endif
       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
       msg[0] = THREAD_NOTIFY_RESPONSE;
       *((unsigned int *)&msg[1]) = oid;
@@ -2771,13 +3011,18 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
     }
     //close socket
     close(sock);
+
     // Update head
     *head = ptr->next;
     free(ptr);
+#ifdef DEBUG
+    printf("%s -> End notifying MID : %s\n",__func__,midtoIPString(mid));  
+#endif
   }
   return status;
 }
 
+
 void transAbort() {
 #ifdef ABORTREADERS
   removetransactionhash();
@@ -2795,6 +3040,7 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
   tmp = pile;
   //Add oid into a machine that is already present in the pile linked list structure
   while(tmp != NULL) {
+//    printf("tmp->mid = [%s], mid = [%s]\n", midtoIPString(tmp->mid), midtoIPString(mid));
     if (tmp->mid == mid) {
       int tmpsize;
 
@@ -2803,16 +3049,6 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
                                tmp->numcreated++;
                                GETSIZE(tmpsize, headeraddr);
                                tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
-                         /*if(numHostsInSystem > 1) {
-                                       STATUS(headeraddr) = DIRTY;
-                                       //printf("Redo pInsert for oid %d, now modified\n", OID(headeraddr));
-                                       //printf("this machine: %d\n", mid);
-                                       midtoIP(tmp->mid, ip);
-                                       pile = pInsert(tmp, headeraddr, locateBackupMachine(headeraddr), num_objs);
-
-                               //      printf("header version: %d\n", headeraddr->version);
-                                       //printf("Finished Redo pInsert for oid %d, now modified\n", OID(headeraddr));
-                               }*/
                        } else if (STATUS(headeraddr) & DIRTY) {
                                tmp->oidmod[tmp->nummod] = OID(headeraddr);
                                tmp->nummod++;
@@ -2844,23 +3080,11 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
       ptr->numcreated++;
       GETSIZE(tmpsize, headeraddr);
       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
-         /*if(numHostsInSystem > 1) {
-                               STATUS(headeraddr) = DIRTY;
-                                       midtoIP(ptr->mid, ip);
-
-                                       printf("np; ptr->mid: %s, oid: %d, header version: %d\n", ip, OID(headeraddr), headeraddr->version);
-                                       //printf("header version: %d\n", headeraddr->version);
-                               pile = pInsert(tmp, headeraddr, locateBackupMachine(headeraddr), num_objs);
-                                       //printf("header version: %d\n", headeraddr->version);
-                       }*/
          } else if (STATUS(headeraddr) & DIRTY) {
       ptr->oidmod[ptr->nummod] = OID(headeraddr);
       ptr->nummod++;
       GETSIZE(tmpsize, headeraddr);
       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
-                               //printf("Redo oid %d?\n", OID(headeraddr));
-                               /*      midtoIP(ptr->mid, ip);
-                                       printf("np; Redo? ptr->mid: %s, oid: %d, header version: %d\n", ip, OID(headeraddr), headeraddr->version);*/
     } else {
       *((unsigned int *)ptr->objread)=OID(headeraddr);
       offset = sizeof(unsigned int);
@@ -2910,6 +3134,7 @@ plistnode_t *sortPiles(plistnode_t *pileptr) {
        return pileptr;
 }
 
+#ifdef RECOVERY
 /* Paxo Algorithm: 
  * Executes when the known leader has failed.  
  * Guarantees consensus on next leader among all live hosts.  */
@@ -2938,7 +3163,7 @@ int paxos()
        } while (ret == -1);
 
 #ifdef DEBUG
-       printf("\n>> Debug : Leader : [%s]\n", midtoIPString(leader));
+       printf("\n>> Debug : Leader : [%s]\t[%u]\n", midtoIPString(leader),leader);
 #endif
 
        return ret;
@@ -2968,7 +3193,7 @@ int paxosPrepare()
                if(!liveHosts[i]) 
                        continue;
 
-               if ((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
+               if ((sd = getSockWithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
                        printf("paxosPrepare(): socket create error\n");
                        continue;
                }
@@ -2977,12 +3202,11 @@ int paxosPrepare()
 #endif
                send_data(sd, &control, sizeof(char));  
                send_data(sd, &my_n, sizeof(int));
-               recv_data(sd, &control, sizeof(char));
-               if ((sd == -1) || (timeoutFlag == 1)) {
+               int timeout = recv_data(sd, &control, sizeof(char));
+               if ((sd == -1) || (timeout < 0)) {
 #ifdef DEBUG
                        printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i]));
 #endif
-                       timeoutFlag = 0;
                        continue;
                }
 
@@ -3004,6 +3228,8 @@ int paxosPrepare()
                        case PAXOS_PREPARE_REJECT:
                                break;
                }
+    
+    freeSockWithLock(transRequestSockPool,hostIpAddrs[i],sd);
        }
 
 #ifdef DEBUG
@@ -3029,13 +3255,13 @@ int paxosAccept()
 #ifdef DEBUG
        printf("[Accept]...\n");
 #endif
-               
        for (i = 0; i < numHostsInSystem; ++i) {
                control = PAXOS_ACCEPT;
-                       if(!liveHosts[i]) 
+         
+    if(!liveHosts[i]) 
                        continue;
 
-       if ((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
+       if ((sd = getSockWithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
                        printf("paxosAccept(): socket create error\n");
                        continue;
                }
@@ -3044,12 +3270,11 @@ int paxosAccept()
                send_data(sd, &my_n, sizeof(int));
                send_data(sd, &remote_v, sizeof(int));
 
-               recv_data(sd, &control, sizeof(char));
-               if ((sd == -1) || (timeoutFlag == 1)) {
+               int timeout = recv_data(sd, &control, sizeof(char));
+               if ((sd == -1) || (timeout < 0)) {
 #ifdef DEBUG
                        printf("%s-> timeout to machine [%s]\n", __func__, midtoIPString(hostIpAddrs[i]));
 #endif
-                       timeoutFlag = 0; 
                        continue;  
                }
 
@@ -3063,6 +3288,7 @@ int paxosAccept()
 #ifdef DEBUG
                printf(">> Debug : Accept - n_h [%d], n_a [%d], v_a [%s]\n", n_h, n_a, midtoIPString(v_a));
 #endif
+    freeSockWithLock(transRequestSockPool,hostIpAddrs[i],sd);
        }
 
        if (cnt >= (numLiveHostsInSystem / 2)) {
@@ -3084,7 +3310,6 @@ void paxosLearn()
 #endif
 
        control = PAXOS_LEARN;
-       //      transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
 
        for (i = 0; i < numHostsInSystem; ++i) {
                if(!liveHosts[i]) 
@@ -3098,13 +3323,121 @@ void paxosLearn()
 #endif
                        continue;
                }
-               if ((sd = getSock2WithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
+               if ((sd = getSockWithLock(transRequestSockPool, hostIpAddrs[i])) < 0) {
                        continue;
                        //                      printf("paxosLearn(): socket create error, attemp\n");
                }
 
                send_data(sd, &control, sizeof(char));
                send_data(sd, &v_a, sizeof(int));
+
+    freeSockWithLock(transRequestSockPool,hostIpAddrs[i],sd);
+
        }
        //return v_a;
 }
+
+
+/*
+ * If this machine is currently waiting on a remote thread (waitThreadID is
+ * set) and the machine hosting that thread is marked dead in liveHosts[],
+ * clear the notify list of every oid recorded in the pending notify entry,
+ * signal the condition variable of that entry so the waiter can proceed,
+ * and reset waitThreadMid / waitThreadID to -1.
+ *
+ * Takes no arguments and returns nothing; it works on the file-level wait
+ * state. When no notify entry exists for waitThreadID the function returns
+ * immediately and leaves the wait state untouched.
+ */
+void clearDeadThreadsNotification() 
+{
+  int waitThreadIndex;
+  int i;
+  notifydata_t *ndata;
+
+#ifdef DEBUG
+  printf("%s -> Entered\n",__func__);
+#endif
+
+  if(waitThreadID != -1) {
+    printf("%s -> I was waitng for %s\n",__func__,midtoIPString(waitThreadMid));
+    waitThreadIndex = findHost(waitThreadMid);
+
+    /* Never subscript liveHosts[] with a negative "not found" index.
+       NOTE(review): assumes findHost() reports an unknown machine with a
+       negative value -- confirm against its definition. */
+    if(waitThreadIndex >= 0 && liveHosts[waitThreadIndex] == 0) {
+      /* the machine hosting the thread we are waiting for is dead */
+      if((ndata = (notifydata_t*)notifyhashSearch(waitThreadID)) == NULL) {
+        return;
+      }
+
+      for(i = 0; i < ndata->numoid; i++) {
+        clearNotifyList(ndata->oidarry[i]);  // clear thread object's notifylist
+      }
+
+      /* signal while holding the mutex paired with the condition variable */
+      pthread_mutex_lock(&(ndata->threadnotify));
+      pthread_cond_signal(&(ndata->threadcond));
+      pthread_mutex_unlock(&(ndata->threadnotify));
+
+      waitThreadMid = -1;
+      waitThreadID = -1;
+    }
+  }
+
+#ifdef DEBUG
+  printf("%s -> Finished\n",__func__);
+#endif
+}
+
+/*
+ * Request the primary and the backup machines to clear a thread object's
+ * notify list.
+ *
+ * oid : id of the thread object. The machine it maps to is looked up with
+ *       lhashSearch(); the primary and backup are then derived from that
+ *       machine with getPrimaryMachine() / getBackupMachine().
+ *
+ * A fresh TCP connection to LISTEN_PORT is opened to each of the two
+ * machines, and the same message is sent on both: one CLEAR_NOTIFY_LIST
+ * control byte followed by the bytes of oid in this machine's native
+ * representation. Nothing is sent unless BOTH connections succeed.
+ *
+ * Returns nothing; failures are only reported with perror()/printf().
+ * Every socket that was opened is closed on every exit path.
+ */
+void reqClearNotifyList(unsigned int oid)
+{
+  int psock = -1, bsock = -1;
+  int mid,pmid,bmid;
+  unsigned int k;
+  const char *oidbytes = (const char *)&oid;
+  struct sockaddr_in premoteAddr, bremoteAddr;
+  char msg[1 + sizeof(unsigned int)];
+
+  if((mid = lhashSearch(oid)) == 0) {
+    printf("%s -> No such machine found for oid %x\n",__func__,oid);
+    return;
+  }
+
+  pmid = getPrimaryMachine(mid);
+  bmid = getBackupMachine(mid);
+
+  if((psock = socket(AF_INET, SOCK_STREAM, 0)) < 0 ||
+     (bsock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+    perror("reqClearNotifyList() : socket()");
+    goto cleanup;   /* psock may already be open when bsock failed */
+  }
+
+  /* for primary machine */
+  bzero(&premoteAddr, sizeof(premoteAddr));
+  premoteAddr.sin_family = AF_INET;
+  premoteAddr.sin_port = htons(LISTEN_PORT);
+  premoteAddr.sin_addr.s_addr = htonl(pmid);
+
+  /* for backup machine */
+  bzero(&bremoteAddr, sizeof(bremoteAddr));
+  bremoteAddr.sin_family = AF_INET;
+  bremoteAddr.sin_port = htons(LISTEN_PORT);
+  bremoteAddr.sin_addr.s_addr = htonl(bmid);
+
+  /* send message to both the primary and the backup */
+  if((connect(psock, (struct sockaddr *)&premoteAddr, sizeof(premoteAddr)) < 0) ||
+     (connect(bsock, (struct sockaddr *)&bremoteAddr, sizeof(bremoteAddr)) < 0)) {
+    printf("%s -> error in connecting\n",__func__);
+    goto cleanup;
+  }
+
+  printf("%s -> Pmid = %s\n",__func__,midtoIPString(pmid));
+  printf("%s -> Bmid = %s\n",__func__,midtoIPString(bmid));
+
+  msg[0] = CLEAR_NOTIFY_LIST;
+  /* msg + 1 is not aligned for unsigned int, so copy oid byte by byte
+     rather than storing through a casted pointer */
+  for(k = 0; k < sizeof(oid); k++) {
+    msg[1 + k] = oidbytes[k];
+  }
+
+  send_data(psock, msg, sizeof(char) + sizeof(unsigned int));
+  send_data(bsock, msg, sizeof(char) + sizeof(unsigned int));
+
+cleanup:
+  if(psock >= 0)
+    close(psock);
+  if(bsock >= 0)
+    close(bsock);
+}
+   
+
+/*
+ * Look up the status recorded for a machine.
+ *
+ * mid : machine id; it is translated to a host index with findHost().
+ *
+ * Returns the value getStatus() yields for that host index.
+ * NOTE(review): the function name suggests a "dead" test, but the meaning
+ * of the returned value is whatever getStatus() defines -- confirm there.
+ * The index from findHost() is forwarded unchecked.
+ */
+int checkiftheMachineDead(unsigned int mid) {
+  return getStatus(findHost(mid));
+}
+
+#endif