minor changes for update cache call(where is it called from)
authoradash <adash>
Tue, 5 Aug 2008 08:32:53 +0000 (08:32 +0000)
committeradash <adash>
Tue, 5 Aug 2008 08:32:53 +0000 (08:32 +0000)
Robust/src/Runtime/DSTM/interface/addPrefetchEnhance.c
Robust/src/Runtime/DSTM/interface/addUdpEnhance.c
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/gCollect.c
Robust/src/Runtime/DSTM/interface/trans.c

index a39bc897727e2d4727b2cabdf0fdb48d41be38eb..317f30fcc10ed739fe925f3770d796f00c676886 100644 (file)
@@ -84,23 +84,17 @@ void cleanPCache(thread_data_array_t *tdata) {
  * transaction commits 
  * Return -1 on error else returns 0 */ 
 int updatePrefetchCache(thread_data_array_t* tdata) {
-  plistnode_t *pile = tdata->pilehead;
-  while(pile != NULL) {
-    if(pile->mid != myIpAddr) { //Not local machine
-      int retval;
-      char oidType;
-      oidType = 'R';
-      if((retval = copyToCache(pile->numread, (unsigned int *)(pile->objread), tdata, oidType)) != 0) {
-        printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
-        return -1;
-      }
-      oidType = 'M';
-      if((retval = copyToCache(pile->nummod, pile->oidmod, tdata, oidType)) != 0) {
-        printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
-        return -1;
-      }
-    }
-    pile = pile->next;
+  int retval;
+  char oidType;
+  oidType = 'R';
+  if((retval = copyToCache(tdata->buffer->f.numread, (unsigned int *)(tdata->buffer->objread), tdata, oidType)) != 0) {
+    printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
+    return -1;
+  }
+  oidType = 'M';
+  if((retval = copyToCache(tdata->buffer->f.nummod, tdata->buffer->oidmod, tdata, oidType)) != 0) {
+    printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
+    return -1;
   }
   return 0;
 }
@@ -133,6 +127,7 @@ int copyToCache(int numoid, unsigned int *oidarray, thread_data_array_t *tdata,
     //Increment version for every modified object
     if(oidType == 'M') {
       newAddr->version += 1;
+      newAddr->notifylist = NULL;
     }
     //make an entry in prefetch lookup hashtable
     void *oldptr;
index d5b886213f6467d9e8c6d1c492fd14bd42b79916..aa8eeb82ef36bc9467dbe18cb61c8cdc17b219e2 100644 (file)
@@ -11,6 +11,7 @@
  * Global Variables *
  ***********************/
 int udpSockFd;
+extern unsigned int myIpAddr;
 
 int createUdpSocket() {
   int sockfd;
@@ -116,7 +117,7 @@ int invalidateObj(thread_data_array_t *tdata) {
   clientaddr.sin_family = AF_INET;
   clientaddr.sin_port = htons(UDP_PORT);
   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
-  int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+  int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
   if(tdata->buffer->f.nummod < maxObjsPerMsg) {
     /* send single udp msg */
     int iteration = 0;
@@ -144,10 +145,12 @@ int invalidateObj(thread_data_array_t *tdata) {
  * returns -1 on error and 0 on success */
 int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
   char writeBuffer[MAX_SIZE];
-  int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+  int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
   int offset = 0;
   *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
   offset += sizeof(short);
+  *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
+  offset += sizeof(unsigned int);
   if(iteration == 0) { // iteration flag == zero, send single udp msg
     *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod)); //sizeof msg
     offset += sizeof(short);
@@ -184,18 +187,24 @@ int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int i
  * returns -1 on error and 0 on success */
 int invalidateFromPrefetchCache(char *buffer) {
   int offset = sizeof(short);
-  /* Read objects sent */
-  int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
-  int i;
-  for(i = 0; i < numObjsRecv; i++) {
-    unsigned int oid;
-    oid = *((unsigned int *)(buffer+offset));
-    objheader_t *header;
-    /* Lookup Objects in prefetch cache and remove them */
-    if((header = prehashSearch(oid)) != NULL) {
-      prehashRemove(oid);
+  /* Read mid from msg */
+  unsigned int mid = *((unsigned int *)(buffer+offset));
+  offset += sizeof(unsigned int);
+  //Invalidate only if broadcast if from different machine
+  if(mid != myIpAddr) {
+    /* Read objects sent */
+    int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
+    int i;
+    for(i = 0; i < numObjsRecv; i++) {
+      unsigned int oid;
+      oid = *((unsigned int *)(buffer+offset));
+      objheader_t *header;
+      /* Lookup Objects in prefetch cache and remove them */
+      if((header = prehashSearch(oid)) != NULL) {
+        prehashRemove(oid);
+      }
+      offset += sizeof(unsigned int);
     }
-    offset += sizeof(unsigned int);
   }
   return 0;
 }
index 2fbf05f60dffb7207bb95f4e404c7eb6010cbd16..df979d749dda7cf2a5da7e44e952bbedd64b2650 100644 (file)
@@ -214,7 +214,6 @@ typedef struct thread_data_array {
   char *replyctrl;             /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */
   char *replyretry;            /* Shared variable that keep track if coordinator needs retry */
   transrecord_t *rec;          /* To send modified objects */
-  plistnode_t *pilehead;   /*  Shared variable, ptr to the head of the machine piles for the transaction rec */
 } thread_data_array_t;
 
 
index fed70e2d2013162ab861713e38ef93094611f22f..91fddb565a576e2945c411ab3a2b5b794b4222e6 100644 (file)
@@ -7,7 +7,7 @@ extern prehashtable_t pflookup; //Global prefetch cache  lookup table
 prefetchNodeInfo_t *pNodeInfo; //Global prefetch holding metadata
 
 void initializePCache() {
-  pNodeInfo = calloc(1, sizeof(prefetchNodeInfo_t));
+  pNodeInfo = calloc(1, sizeof(prefetchNodeInfo_t)); //Not freed yet
   pNodeInfo->oldptr = prefetchcache;
   pNodeInfo->newptr = NULL;
   pNodeInfo->num_old_objstr = 1; //for prefetch cache allocated by objstralloc in trans.c file
index b5527c2be573efd1ca69930b98d0e58a5fc1e57d..242e6bce56d7a0b8d7038a880faad75012b8f59c 100644 (file)
@@ -192,10 +192,10 @@ int dstmStartup(const char * option) {
   
   fd=startlistening();
   udpfd = udpInit();
+  pthread_attr_init(&attr);
+  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
   if (master) {
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
     return 1;
   } else {
@@ -537,7 +537,6 @@ int transCommit(transrecord_t *record) {
       thread_data_array[threadnum].replyctrl = &treplyctrl;
       thread_data_array[threadnum].replyretry = &treplyretry;
       thread_data_array[threadnum].rec = record;
-      thread_data_array[threadnum].pilehead = pile_ptr;
       /* If local do not create any extra connection */
       if(pile->mid != myIpAddr) { /* Not local */
        do {
@@ -761,8 +760,15 @@ void *transRequest(void *threadarg) {
   }
   pthread_mutex_unlock(tdata->lock);
 
-  /* Invalidate objects in other machine cache */
   if(*(tdata->replyctrl) == TRANS_COMMIT) {
+    int retval;
+     /* Update prefetch cache */
+    if((retval = updatePrefetchCache(tdata)) != 0) {
+      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
+
+    /* Invalidate objects in other machine cache */
     if(tdata->buffer->f.nummod > 0) {
       if((retval = invalidateObj(tdata)) != 0) {
         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
@@ -771,7 +777,6 @@ void *transRequest(void *threadarg) {
     }
   }
   
-
   /* Send the final response such as TRANS_COMMIT or TRANS_ABORT 
    * to all participants in their respective socket */
   if (sendResponse(tdata, sd) == 0) { 
@@ -788,7 +793,6 @@ void *transRequest(void *threadarg) {
   } else {
     //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
   }
-
   pthread_exit(NULL);
 }
 
@@ -830,12 +834,6 @@ void decideResponse(thread_data_array_t *tdata) {
     /* Send Commit */
     *(tdata->replyctrl) = TRANS_COMMIT;
     *(tdata->replyretry) = 0;
-    int retval;
-    /* Update prefetch cache */
-    if((retval = updatePrefetchCache(tdata)) != 0) {
-      printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
-      return;
-    }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
     *(tdata->replyctrl) = TRANS_ABORT;
@@ -926,7 +924,7 @@ void *handleLocalReq(void *threadarg) {
   unsigned short version;
   void *mobj;
   objheader_t *headptr;
-  
+
   localtdata = (local_thread_data_array_t *) threadarg;
   
   /* Counters and arrays to formulate decision on control message to be sent */
@@ -1035,11 +1033,13 @@ void *handleLocalReq(void *threadarg) {
   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
     if(transAbortProcess(localtdata) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
+      fflush(stdout);
       pthread_exit(NULL);
     }
   } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
     if(transComProcess(localtdata) != 0) {
       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
+      fflush(stdout);
       pthread_exit(NULL);
     }
   }
@@ -1050,7 +1050,6 @@ void *handleLocalReq(void *threadarg) {
   if (localtdata->transinfo->objnotfound != NULL) {
     free(localtdata->transinfo->objnotfound);
   }
-  
   pthread_exit(NULL);
 }
 
@@ -1363,7 +1362,7 @@ int getPrefetchResponse(int sd) {
     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
     /* TODO: For each object not found query DHT for new location and retrieve the object */
     /* Throw an error */
-    printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
+    //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
     //    exit(-1);
   } else {
     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
@@ -1705,8 +1704,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
       printf("notifyAll():error %d connecting to %s:%d\n", errno,
             inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-      status = -1;
       fflush(stdout);
+      status = -1;
     } else {
       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
       msg[0] = THREAD_NOTIFY_RESPONSE;