get rid of unnecessary recv_data , SUCCESSFUL/UNSUCESSFUL ctrl msg
authoradash <adash>
Mon, 22 Sep 2008 22:10:02 +0000 (22:10 +0000)
committeradash <adash>
Mon, 22 Sep 2008 22:10:02 +0000 (22:10 +0000)
add new debug info

Robust/src/Runtime/DSTM/interface/dsmdebug.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index c95695a4b172983019c8173ba94f34963b49a876..0dd0e691328ad527817183bd88f81dcc140fbe7c 100644 (file)
@@ -1,6 +1,8 @@
 #ifndef _DSMDEBUG_H_
 #define _DSMDEBUG_H_
 
+#include <sys/time.h>
+
 #define TABORT1(s) {printf("%s\n", s); fflush(stdout);}
 #define TABORT2(s, msg) {printf("%s(): %s\n", s, msg); fflush(stdout);}
 #define TABORT3(func, s, msg, d) {printf("%s(): %s: for %s = %d\n", func, s, msg, d); fflush(stdout);}
 #define TABORT8(func, s, d) {printf("%s(): %s = %d\n", func, s, d); fflush(stdout);}
 #define TABORT9(func, a, b, c, d, val1, val2, val3) {printf("%s(): %s for %s =%x, %s = %d, %s = %x\n", func, a, b, val1, c, val2, d, val3); fflush(stdout);}
 
+#define ARRAY_SIZE 10100
+#define GETSTARTDELAY(start, count) { \
+  struct timeval tv; \
+  count++; \
+  gettimeofday(&tv, NULL); \
+  start = tv.tv_sec+(tv.tv_usec/1000000.0); \
+}
+
+#define GETSTART(start) { \
+  struct timeval tv; \
+  gettimeofday(&tv, NULL); \
+  start = tv.tv_sec+(tv.tv_usec/1000000.0); \
+}
+
+#define GETENDDELAY(start, end, time) { \
+  struct timeval tv; \
+  gettimeofday(&tv, NULL); \
+  end = tv.tv_sec+(tv.tv_usec/1000000.0); \
+  time = (end-start); \
+}
 
 #endif
index b5314303171d12092644de04ba40b9fd6cd0d554..1e0cac6dea3b5c405ad738d76c000555752f864c 100644 (file)
@@ -24,6 +24,7 @@ pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a
 
 sockPoolHashTable_t *transPResponseSocketPool;
 
+
 /* This function initializes the main objects store and creates the
  * global machine and location lookup table */
 
@@ -353,9 +354,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     printf("Error: In handleTransReq() %s, %d\n", __FILE__, __LINE__);
     return 1;
   }
-
   recv_data((int)acceptfd, &control, sizeof(char));
-
   /* Process the new control message */
   switch(control) {
   case TRANS_ABORT:
@@ -378,10 +377,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
        read_unlock(STATUSPTR(header));
       }
     }
-
-    /* Send ack to Coordinator */
-    sendctrl = TRANS_UNSUCESSFUL;
-    send_data((int)acceptfd, &sendctrl, sizeof(char));
     break;
 
   case TRANS_COMMIT:
@@ -407,7 +402,6 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
     //TODO Use fixed.trans_id  TID since Client may have died
     break;
   }
-
   /* Free memory */
   if (transinfo->objlocked != NULL) {
     free(transinfo->objlocked);
@@ -443,7 +437,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
   /* Process each oid in the machine pile/ group per thread */
   for (i = 0; i < fixed->numread + fixed->nummod; i++) {
     if (i < fixed->numread) { //Objs only read and not modified
-      int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
+     int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
       incr *= i;
       oid = *((unsigned int *)(objread + incr));
       incr += sizeof(unsigned int);
@@ -518,7 +512,6 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
     printf("Error: In decideCtrlMessage() %s, %d\n", __FILE__, __LINE__);
     return 0;
   }
-
   return control;
 }
 
@@ -706,10 +699,6 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
     }
   }
   //TODO Update location lookup table
-
-  /* Send ack to coordinator */
-  control = TRANS_SUCESSFUL;
-  send_data((int)acceptfd, &control, sizeof(char));
   return 0;
 }
 
index 36fdcf1ecd20221b2c866b006ec3d73ce0baa39a..ee4c92953b02525cf29c10be1d574519e68b4807 100644 (file)
@@ -19,6 +19,7 @@
 #define PREFETCH_CACHE_SIZE 1048576 //1MB
 #define CONFIG_FILENAME "dstm.conf"
 
+
 /* Global Variables */
 extern int classsize[];
 pfcstats_t *evalPrefetch;
@@ -481,6 +482,7 @@ int transCommit(transrecord_t *record) {
   local_thread_data_array_t *ltdata;
   int firsttime=1;
 
+
   do {
     treplyctrl=0;
     trecvcount = 0;
@@ -496,7 +498,6 @@ int transCommit(transrecord_t *record) {
     else
       pile=pile_ptr;
     firsttime=0;
-
     /* Create the packet to be sent in TRANS_REQUEST */
 
     /* Count the number of participants */
@@ -592,9 +593,9 @@ int transCommit(transrecord_t *record) {
       threadnum++;
       pile = pile->next;
     }
+
     /* Free attribute and wait for the other threads */
     pthread_attr_destroy(&attr);
-
     for (i = 0; i < threadnum; i++) {
       rc = pthread_join(thread[i], NULL);
       if(rc) {
@@ -610,7 +611,6 @@ int transCommit(transrecord_t *record) {
       }
       free(thread_data_array[i].buffer);
     }
-
     /* Free resources */
     pthread_cond_destroy(&tcond);
     pthread_mutex_destroy(&tlock);
@@ -675,7 +675,6 @@ void *transRequest(void *threadarg) {
   char machineip[16], retval;
 
   tdata = (thread_data_array_t *) threadarg;
-
   if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
     printf("transRequest(): socket create error\n");
     pthread_exit(NULL);
@@ -697,6 +696,12 @@ void *transRequest(void *threadarg) {
   }
 
   /* Send objects that are modified */
+  void *modptr;
+  if((modptr = calloc(1, tdata->buffer->f.sum_bytes)) == NULL) {
+    printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+    pthread_exit(NULL);
+  }
+  int offset = 0;
   for(i = 0; i < tdata->buffer->f.nummod ; i++) {
     int size;
     if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
@@ -705,11 +710,14 @@ void *transRequest(void *threadarg) {
     }
     GETSIZE(size,headeraddr);
     size+=sizeof(objheader_t);
-    send_data(sd, headeraddr, size);
+    memcpy(modptr+offset, headeraddr, size);
+    offset+=size;
   }
-
+  send_data(sd, modptr, tdata->buffer->f.sum_bytes);
+  free(modptr);
   /* Read control message from Participant */
   recv_data(sd, &control, sizeof(char));
+
   /* Recv Objects if participant sends TRANS_DISAGREE */
 #ifdef CACHE
   if(control == TRANS_DISAGREE) {
@@ -746,7 +754,6 @@ void *transRequest(void *threadarg) {
     }
   }
 #endif
-
   recvcontrol = control;
   /* Update common data structure and increment count */
   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
@@ -766,29 +773,6 @@ void *transRequest(void *threadarg) {
   }
   pthread_mutex_unlock(tdata->lock);
 
-  /* clear objects from prefetch cache */
-  /*
-     if(*(tdata->replyctrl) == TRANS_ABORT) {
-     int i;
-     for(i=0; i<tdata->buffer->f.nummod; i++) {
-      unsigned int oid = tdata->buffer->oidmod[i];
-      objheader_t *header;
-      if((header = prehashSearch(oid)) != NULL) {
-        prehashRemove(oid);
-      }
-     }
-     for(i=0; i<tdata->buffer->f.numread; i++) {
-      char *objread = tdata->buffer->objread;
-      unsigned int oid = *((unsigned int *)(objread+(sizeof(unsigned int) +
-                  sizeof(unsigned short))*i));
-      objheader_t *header;
-      if((header = prehashSearch(oid)) != NULL) {
-        prehashRemove(oid);
-      }
-     }
-     }
-   */
-
 #ifdef CACHE
   if(*(tdata->replyctrl) == TRANS_COMMIT) {
     int retval;
@@ -814,16 +798,6 @@ void *transRequest(void *threadarg) {
     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
     pthread_exit(NULL);
   }
-
-  recv_data((int)sd, &control, sizeof(char));
-
-  if(control == TRANS_UNSUCESSFUL) {
-    //printf("DEBUG-> TRANS_ABORTED\n");
-  } else if(control == TRANS_SUCESSFUL) {
-    //printf("DEBUG-> TRANS_SUCCESSFUL\n");
-  } else {
-    //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
-  }
   pthread_exit(NULL);
 }
 
@@ -833,7 +807,6 @@ void decideResponse(thread_data_array_t *tdata) {
   char control;
   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
                                                                   message to send */
-
   for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
     control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
                                               written onto the shared array */
@@ -868,14 +841,6 @@ void decideResponse(thread_data_array_t *tdata) {
     /* Send Commit */
     *(tdata->replyctrl) = TRANS_COMMIT;
     *(tdata->replyretry) = 0;
-#ifdef CACHE
-#if 0
-    /* Turn prefetching on */
-    int i;
-    for (i=0; i<numprefetchsites; i++)
-      evalPrefetch[i].operMode = 1;
-#endif
-#endif
   } else {
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
     *(tdata->replyctrl) = TRANS_ABORT;
@@ -908,7 +873,6 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
     /* If the decided response is TRANS_COMMIT */
     retval = TRANS_COMMIT;
   }
-
   return retval;
 }
 
@@ -964,9 +928,7 @@ void *handleLocalReq(void *threadarg) {
   int numread, i;
   unsigned int oid;
   unsigned short version;
-
   localtdata = (local_thread_data_array_t *) threadarg;
-
   /* Counters and arrays to formulate decision on control message to be sent */
   oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
   oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
@@ -1030,7 +992,6 @@ void *handleLocalReq(void *threadarg) {
     pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
   }
   pthread_mutex_unlock(localtdata->tdata->lock);
-
   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT) {
     if(transAbortProcess(localtdata) != 0) {
       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
@@ -1054,6 +1015,7 @@ void *handleLocalReq(void *threadarg) {
       pthread_exit(NULL);
     }
   }
+
   /* Free memory */
   if (localtdata->transinfo->objlocked != NULL) {
     free(localtdata->transinfo->objlocked);
@@ -1062,7 +1024,6 @@ void *handleLocalReq(void *threadarg) {
     free(localtdata->transinfo->objnotfound);
   }
   pthread_exit(NULL);
-
 }
 
 /*  Commit info for objects modified */