changes
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
index 430c12b5d4b712294dce70dcd87a7df41c039e9f..ec852ce3bde1b9d06e9ad856d81f3db32eec4d01 100644 (file)
@@ -63,7 +63,9 @@ __thread struct ___Object___ *revertlist;
 __thread struct timespec exponential_backoff;
 __thread int count_exponential_backoff;
 __thread const int max_exponential_backoff = 1000; // safety limit
+#ifdef SANDBOX
 __thread int trans_allocation_bytes;
+#endif
 
 
 #ifdef ABORTREADERS
@@ -119,8 +121,6 @@ void printhex(unsigned char *, int);
 plistnode_t *createPiles();
 plistnode_t *sortPiles(plistnode_t *pileptr);
 
-
-
 /*******************************
 * Send and Recv function calls
 *******************************/
@@ -318,7 +318,7 @@ inline int findmax(int *array, int arraylength) {
   return max;
 }
 
-//#define INLINEPREFETCH
+#define INLINEPREFETCH
 #define PREFTHRESHOLD 0
 
 /* This function is a prefetch call generated by the compiler that
@@ -554,7 +554,9 @@ void transStart() {
   t_cache = objstrCreate(1048576);
   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
   revertlist=NULL;
+#ifdef SANDBOX
   trans_allocation_bytes = 0;
+#endif
 #ifdef ABORTREADERS
   t_abort=0;
 #endif
@@ -839,11 +841,13 @@ objheader_t *transCreateObj(unsigned int size) {
   tmp->rcount = 1;
   STATUS(tmp) = NEW;
   t_chashInsert(OID(tmp), tmp);
+#ifdef SANDBOX
   trans_allocation_bytes += size;
   /* Validate the read set if allocation is exceeds threshold */
   if(trans_allocation_bytes > MEM_ALLOC_THRESHOLD) {
     check_mem_alloc();
   }
+#endif
 
 #ifdef COMPILER
   return &tmp[1]; //want space after object header
@@ -929,6 +933,7 @@ plistnode_t *createPiles() {
  * Sends a transrequest() to each remote machines for objects found remotely
  * and calls handleLocalReq() to process objects found locally */
 int transCommit() {
+  //char buffer[30];
   unsigned int tot_bytes_mod, *listmid;
   plistnode_t *pile, *pile_ptr;
   char treplyretry; /* keeps track of the common response that needs to be sent */
@@ -938,6 +943,8 @@ int transCommit() {
 #ifdef SANDBOX
   abortenabled=0;
 #endif
+  struct writestruct writebuffer;
+  writebuffer.offset=0;
 
 #ifdef LOGEVENTS
   int iii;
@@ -1025,18 +1032,18 @@ int transCommit() {
        }
        socklist[sockindex] = sd;
        /* Send bytes of data with TRANS_REQUEST control message */
-       send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
+       send_buf(sd, &writebuffer, &(tosend[sockindex].f), sizeof(fixed_data_t));
 
        /* Send list of machines involved in the transaction */
        {
          int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
-         send_data(sd, tosend[sockindex].listmid, size);
+         send_buf(sd, &writebuffer, tosend[sockindex].listmid, size);
        }
 
        /* Send oids and version number tuples for objects that are read */
        {
          int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
-         send_data(sd, tosend[sockindex].objread, size);
+         send_buf(sd, &writebuffer, tosend[sockindex].objread, size);
        }
 
        /* Send objects that are modified */
@@ -1064,7 +1071,7 @@ int transCommit() {
          memcpy(modptr+offset, headeraddr, size);
          offset+=size;
        }
-       send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
+       forcesend_buf(sd, &writebuffer, modptr, tosend[sockindex].f.sum_bytes);
        free(modptr);
       } else { //handle request locally
        handleLocalReq(&tosend[sockindex], &transinfo, &getReplyCtrl[sockindex]);
@@ -1072,6 +1079,7 @@ int transCommit() {
       sockindex++;
       pile = pile->next;
     } //end of pile processing
+
       /* Recv Ctrl msgs from all machines */
     int i;
     for(i = 0; i < pilecount; i++) {
@@ -1117,6 +1125,7 @@ int transCommit() {
 #endif
       }
     }
+
     /* Decide the final response */
     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, pilecount)) == 0) {
       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
@@ -1125,6 +1134,18 @@ int transCommit() {
       return 1;
     }
 
+#ifdef CACHE
+    if (finalResponse == TRANS_COMMIT) {
+      /* Invalidate objects in other machine cache */
+      int retval;
+      if((retval = invalidateObj(tosend, pilecount,finalResponse,socklist)) != 0) {
+       printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+       free(tosend);
+       free(listmid);
+       return 1;
+      }
+    }
+#endif
     /* Send responses to all machines */
     for(i = 0; i < pilecount; i++) {
       int sd = socklist[i];
@@ -1139,17 +1160,6 @@ int transCommit() {
            free(listmid);
            return 1;
          }
-
-
-         /* Invalidate objects in other machine cache */
-         if(tosend[i].f.nummod > 0) {
-           if((retval = invalidateObj(&(tosend[i]))) != 0) {
-             printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-             free(tosend);
-             free(listmid);
-             return 1;
-           }
-         }
 #ifdef ABORTREADERS
          removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
          removethisreadtransaction(tosend[i].objread, tosend[i].f.numread);
@@ -1186,10 +1196,10 @@ int transCommit() {
     /* wait a random amount of time before retrying to commit transaction*/
     if(treplyretry) {
       treplyretryCount++;
-      if(treplyretryCount >= NUM_TRY_TO_COMMIT)
-        exponentialdelay();
-      else 
-        randomdelay();
+     // if(treplyretryCount >= NUM_TRY_TO_COMMIT)
+     //   exponentialdelay();
+     // else 
+      randomdelay();
 #ifdef TRANSSTATS
       nSoftAbort++;
 #endif
@@ -1197,9 +1207,6 @@ int transCommit() {
     /* Retry trans commit procedure during soft_abort case */
   } while (treplyretry);
 
-  exponential_backoff.tv_sec = 0;
-  exponential_backoff.tv_nsec = (long)(10000);//10 microsec_
-
   if(finalResponse == TRANS_ABORT) {
 #ifdef TRANSSTATS
     LOGEVENT('A');
@@ -1277,7 +1284,7 @@ void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, cha
   transinfo->modptr = NULL;
   transinfo->numlocked = numoidlocked;
   transinfo->numnotfound = numoidnotfound;
-
+  
   /* Condition to send TRANS_AGREE */
   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
     *getReplyCtrl = TRANS_AGREE;
@@ -1296,16 +1303,6 @@ void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_da
       return;
     }
   } else if(finalResponse == TRANS_COMMIT) {
-#ifdef CACHE
-    /* Invalidate objects in other machine cache */
-    if(tdata->f.nummod > 0) {
-      int retval;
-      if((retval = invalidateObj(tdata)) != 0) {
-       printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-       return;
-      }
-    }
-#endif
     if(transComProcess(tdata, transinfo) != 0) {
       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
       fflush(stdout);
@@ -1957,7 +1954,7 @@ int startRemoteThread(unsigned int oid, unsigned int mid) {
   int sock;
   struct sockaddr_in remoteAddr;
   char msg[1 + sizeof(unsigned int)];
-  int bytesSent;
+  //int bytesSent;
   int status;
 
   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
@@ -2094,7 +2091,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
   struct sockaddr_in remoteAddr;
   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
   char *ptr;
-  int bytesSent;
+  //int bytesSent;
   int status, size;
   unsigned short version;
   unsigned int oid,mid;
@@ -2224,7 +2221,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
   unsigned int mid;
   struct sockaddr_in remoteAddr;
   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
-  int sock, status, size, bytesSent;
+  int sock, status, size;
+  //int bytesSent;
 
   while(*head != NULL) {
     ptr = *head;