bug fixes for udp broadcast
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index ea81203ecec6c562f58c4fa785bd3283fd033554..c4139fc0ee4718c391f8302c293d52ebe593b66c 100644 (file)
@@ -41,6 +41,12 @@ sockPoolHashTable_t *transPrefetchSockPool;
 pthread_mutex_t notifymutex;
 pthread_mutex_t atomicObjLock;
 
+/***********************************
+ * Global Variables for statistics
+ **********************************/
+extern int numTransCommit;
+extern int numTransAbort;
+
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
 
@@ -158,6 +164,11 @@ int dstmStartup(const char * option) {
   if (!master)
     threadcount--;
 #endif
+
+#ifdef TRANSSTATS
+  printf("Trans stats is on\n");
+  fflush(stdout);
+#endif
   
   //Initialize socket pool
   transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
@@ -424,7 +435,7 @@ int transCommit(transrecord_t *record) {
   thread_data_array_t *thread_data_array;
   local_thread_data_array_t *ltdata;
   int firsttime=1;
-  
+
   do { 
     treplyctrl=0;
     trecvcount = 0; 
@@ -576,6 +587,9 @@ int transCommit(transrecord_t *record) {
   } while (treplyretry);
   
   if(treplyctrl == TRANS_ABORT) {
+#ifdef TRANSSTATS
+    ++numTransAbort;
+#endif
     /* Free Resources */
     objstrDelete(record->cache);
     chashDelete(record->lookupTable);
@@ -584,6 +598,9 @@ int transCommit(transrecord_t *record) {
     free(ltdata);
     return TRANS_ABORT;
   } else if(treplyctrl == TRANS_COMMIT) {
+#ifdef TRANSSTATS
+    ++numTransCommit;
+#endif
     /* Free Resources */
     objstrDelete(record->cache);
     chashDelete(record->lookupTable);
@@ -681,10 +698,10 @@ void *transRequest(void *threadarg) {
       //make an entry in prefetch hash table
       void *oldptr;
       if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
-      prehashRemove(oidToPrefetch);
-      prehashInsert(oidToPrefetch, header);
+        prehashRemove(oidToPrefetch);
+        prehashInsert(oidToPrefetch, header);
       } else {
-      prehashInsert(oidToPrefetch, header);
+        prehashInsert(oidToPrefetch, header);
       }
       length = length - size;
       offset += size;
@@ -790,9 +807,11 @@ void decideResponse(thread_data_array_t *tdata) {
       return;
     }
     /* Invalidate objects in other machine cache */
-    if((retval = invalidateObj(tdata)) != 0) {
-      printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-      return;
+    if(tdata->buffer->f.nummod > 0) {
+      if((retval = invalidateObj(tdata)) != 0) {
+        printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+        return;
+      }
     }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
@@ -818,10 +837,10 @@ int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
     } else {
       oid = tdata->buffer->oidmod[i];
     }
+    pthread_mutex_lock(&prefetchcache_mutex);
     header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
     //copy object into prefetch cache
     GETSIZE(size, header);
-    pthread_mutex_lock(&prefetchcache_mutex);
     if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
       return -1;
@@ -1275,7 +1294,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
     buf+=sizeof(unsigned int);
     *((unsigned int *)buf) = myIpAddr; 
     buf += sizeof(unsigned int);
-    memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
+    memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
     send_data(sd, oidnoffset, len);
     tmp = tmp->next;
   }