bug fixes for udp broadcast
authoradash <adash>
Thu, 10 Jul 2008 00:52:54 +0000 (00:52 +0000)
committeradash <adash>
Thu, 10 Jul 2008 00:52:54 +0000 (00:52 +0000)
flags and code added to collect transaction commit+abort statistics

Robust/src/ClassLibrary/Signal.java [new file with mode: 0644]
Robust/src/IR/Flat/BuildCode.java
Robust/src/Main/Main.java
Robust/src/Runtime/DSTM/interface/addUdpEnhance.c
Robust/src/Runtime/DSTM/interface/addUdpEnhance.h
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/Runtime/runtime.c
Robust/src/Runtime/signal.c [new file with mode: 0644]
Robust/src/buildscript

diff --git a/Robust/src/ClassLibrary/Signal.java b/Robust/src/ClassLibrary/Signal.java
new file mode 100644 (file)
index 0000000..85e8832
--- /dev/null
@@ -0,0 +1,8 @@
+public class Signal {
+  public Signal() {
+  }
+  public native void nativeSigAction(); 
+  public void sigAction() {
+    nativeSigAction();
+  }
+}
index f50802880634bb128621cb9445d26902a142b987..fe3dc9a950ee89752dbd8721de24370b20cf594e 100644 (file)
@@ -262,8 +262,10 @@ public class BuildCode {
                outmethod.println("pthread_exit(NULL);");
        }
 
-
+    outmethod.println("printf(\"numTransAbort= %d\\n\", numTransAbort);");
+    outmethod.println("printf(\"numTransCommit= %d\\n\", numTransCommit);");
        outmethod.println("}");
+
     }
 
     /* This method outputs code for each task. */
@@ -717,6 +719,8 @@ public class BuildCode {
      * information. */
 
     private void generateSizeArray(PrintWriter outclassdefs) {
+       outclassdefs.print("int numTransAbort = 0;");
+       outclassdefs.print("int numTransCommit = 0;");
        outclassdefs.print("int classsize[]={");
        Iterator it=state.getClassSymbolTable().getDescriptorsIterator();
        cdarray=new ClassDescriptor[state.numClasses()];
index a46cca8253f35ac80d147011014c73325ee38ee4..2fa3e23ec23f36c6f21ce611b773d2565255492d 100644 (file)
@@ -164,6 +164,7 @@ public class Main {
       readSourceFile(state, ClassLibraryPrefix+"gnu/Random.java");
          readSourceFile(state, ClassLibraryPrefix+"Vector.java");
          readSourceFile(state, ClassLibraryPrefix+"Enumeration.java");
+         readSourceFile(state, ClassLibraryPrefix+"Signal.java");
 
 
       if (state.TASK) {
index 8126962c2ecdc1523ff02d3d5fbbe3938889d7f0..79d08be5284ebc4716bba78c501706c0872ad82d 100644 (file)
@@ -2,6 +2,8 @@
 #include <netinet/in.h>
 #include <stdio.h>
 #include <string.h>
+#include <math.h>
+#include <netinet/tcp.h>
 #include "addUdpEnhance.h"
 
 /************************
@@ -66,28 +68,23 @@ int udpInit() {
   return sockfd;
 }
 
+/* Function that listens for udp broadcast messages */
 void *udpListenBroadcast(void *sockfd) {
   pthread_t thread_udpBroadcast;
   struct sockaddr_in servaddr;
-  char readBuffer[MAX_SIZE];
   socklen_t socklen = sizeof(struct sockaddr);
+  char readBuffer[MAX_SIZE];
   int retval;
 
-  memset(readBuffer, 0, MAX_SIZE);
   printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
 
+  memset(readBuffer, 0, MAX_SIZE);
   while(1) {
-    //int bytesRcvd = recvfrom((int)sockfd, readBuffer, 5, 0, NULL, NULL);
-    int bytesRcvd = recvfrom((int)sockfd, readBuffer, strlen(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
-    if(bytesRcvd == 0) {
-      break;
-    }
-
+    int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
     if(bytesRcvd == -1) {
       printf("DEBUG-> Recv Error! \n");
       break;
     }
-
     short status = *((short *) &readBuffer[0]);
     switch (status) {
       case INVALIDATE_OBJS:
@@ -101,68 +98,95 @@ void *udpListenBroadcast(void *sockfd) {
     }
   }
 
-closeconnection:
-    /* Close connection */
-    if(close((int)sockfd) == -1)
-      perror("close");
-    pthread_exit(NULL);
+  /* Close connection */
+  if(close((int)sockfd) == -1)
+    perror("close");
+  pthread_exit(NULL);
 }
 
-/* Function that sends a broadcast to Invalidate objects that
- * have been currently modified */
+/* Function that invalidate objects that
+ * have been currently modified
+ * returns -1 on error and 0 on success */
 int invalidateObj(thread_data_array_t *tdata) {
   struct sockaddr_in clientaddr;
-  //TODO Instead of sending "hello" send modified objects
-  char writeBuffer[MAX_SIZE];
-  //char writeBuffer[] = "hello";
-  const int on = 1;
+  int retval;
 
   bzero(&clientaddr, sizeof(clientaddr));
   clientaddr.sin_family = AF_INET;
   clientaddr.sin_port = htons(UDP_PORT);
   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
-  /* Create Udp Message */
-  int offset = 0;
-  *((short *)&writeBuffer[0]) = INVALIDATE_OBJS;
-  offset += sizeof(short);
-  *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
-  offset += sizeof(short);
-  int i;
-  for(i = 0; i < tdata->buffer->f.nummod; i++) {
-    if(offset == MAX_SIZE) {
-      if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
-        perror("sendto error- ");
-        printf("DEBUG-> sendto error: errorno %d\n", errno);
+  int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+  if(tdata->buffer->f.nummod < maxObjsPerMsg) {
+    /* send single udp msg */
+    int iteration = 0;
+    if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
+      printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+      return -1;
+    }
+  } else {
+    /* Split into several udp msgs */
+    int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
+    if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
+    int i;
+    for(i = 1; i <= maxUdpMsg; i++) {
+      if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
+        printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
         return -1;
       }
-      offset = 0;
-    }
-    /*
-    if(offset >= MAX_SIZE) {
-      printf("DEBUG-> Large number of objects for one udp message\n");
-      return -1;
     }
-    */
+  }
+  return 0;
+}
 
-    *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
-    offset += sizeof(unsigned int);
+/* Function sends a udp broadcast, also distinguishes 
+ * msg size to be sent based on the iteration flag
+ * returns -1 on error and 0 on success */
+int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
+  char writeBuffer[MAX_SIZE];
+  int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+  int offset = 0;
+  *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
+  offset += sizeof(short);
+  if(iteration == 0) { // iteration flag == zero, send single udp msg
+    *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
+    offset += sizeof(short);
+    int i;
+    for(i = 0; i < tdata->buffer->f.nummod; i++) {
+      *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
+      offset += sizeof(unsigned int);
+    }
+  } else { // iteration flag > zero, send multiple udp msg
+    int numObj;
+    if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0) 
+      numObj = maxObjsPerMsg;
+    else  
+      numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
+    *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
+    offset += sizeof(short);
+    int index = (iteration - 1) * maxObjsPerMsg;
+    int i;
+    for(i = 0; i < numObj; i++) {
+      *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
+      offset += sizeof(unsigned int);
+    }
   }
   int n;
-  if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
+  if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
     perror("sendto error- ");
     printf("DEBUG-> sendto error: errorno %d\n", errno);
     return -1;
   }
-  //printf("DEBUG-> Client sending: %d bytes, %s\n", n, writeBuffer);
   return 0;
-}
+} 
 
+/* Function searches given oid in prefetch cache and invalidates obj from cache 
+ * returns -1 on error and 0 on success */
 int invalidateFromPrefetchCache(char *buffer) {
-  int offset = sizeof(int);
+  int offset = sizeof(short);
   /* Read objects sent */
-  int numObjs = *((short *)(buffer+offset)) / sizeof(unsigned int);
+  int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
   int i;
-  for(i = 0; i < numObjs; i++) {
+  for(i = 0; i < numObjsRecv; i++) {
     unsigned int oid;
     oid = *((unsigned int *)(buffer+offset));
     objheader_t *header;
index 7d3d98c1a3d764e6f6db2e99de83410aa51febc8..3a0f9174c69590e8ee0838ebbe15fb12546237de 100644 (file)
@@ -12,7 +12,7 @@
 /*************************
  * Global constants
  ************************/
-#define MAX_SIZE  4000
+#define MAX_SIZE  2000
 
 /********************************
  *  Function Prototypes
@@ -22,4 +22,5 @@ int udpInit();
 void *udpListenBroadcast(void *);
 int invalidateObj(thread_data_array_t *);
 int invalidateFromPrefetchCache(char *); 
+int sendUdpMsg(thread_data_array_t *, struct sockaddr_in *, int);
 #endif
index 51d586c5f17f76100bf1e58f07a64283c8049c30..38434dc61c8a14e6f736df9ad8085ec6f8c2a13f 100644 (file)
@@ -13,7 +13,7 @@ void queueInit(void) {
   /* Intitialize primary queue */
   headoffset=0;
   tailoffset=0;
-  memory=malloc(QSIZE);
+  memory=malloc(QSIZE+sizeof(int));//leave space for -1
   pthread_mutexattr_init(&qlockattr);
   pthread_mutexattr_settype(&qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
   pthread_mutex_init(&qlock, &qlockattr);
@@ -30,13 +30,13 @@ void * getmemory(int size) {
     //Wait for tail to go past new start
     while(tailoffset<tmpoffset)
       ;
-    *((int *)(memory+headoffset))=-1;
+    *((int *)(memory+headoffset))=-1;//safe because we left space
     *((int*)memory)=size+sizeof(int);
     return memory+sizeof(int);
   } else {
     while(headoffset<tailoffset&&tailoffset<tmpoffset)
       ;
-    *((int*)(memory+headoffset))=size+sizeof(int);
+     *((int*)(memory+headoffset))=size+sizeof(int);
     return memory+headoffset+sizeof(int);
   }
 }
@@ -73,7 +73,6 @@ void inctail() {
     tailoffset=tmpoffset;
 }
 
-
 void predealloc() {
   free(memory);
 }
index ea81203ecec6c562f58c4fa785bd3283fd033554..c4139fc0ee4718c391f8302c293d52ebe593b66c 100644 (file)
@@ -41,6 +41,12 @@ sockPoolHashTable_t *transPrefetchSockPool;
 pthread_mutex_t notifymutex;
 pthread_mutex_t atomicObjLock;
 
+/***********************************
+ * Global Variables for statistics
+ **********************************/
+extern int numTransCommit;
+extern int numTransAbort;
+
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
 
@@ -158,6 +164,11 @@ int dstmStartup(const char * option) {
   if (!master)
     threadcount--;
 #endif
+
+#ifdef TRANSSTATS
+  printf("Trans stats is on\n");
+  fflush(stdout);
+#endif
   
   //Initialize socket pool
   transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1);
@@ -424,7 +435,7 @@ int transCommit(transrecord_t *record) {
   thread_data_array_t *thread_data_array;
   local_thread_data_array_t *ltdata;
   int firsttime=1;
-  
+
   do { 
     treplyctrl=0;
     trecvcount = 0; 
@@ -576,6 +587,9 @@ int transCommit(transrecord_t *record) {
   } while (treplyretry);
   
   if(treplyctrl == TRANS_ABORT) {
+#ifdef TRANSSTATS
+    ++numTransAbort;
+#endif
     /* Free Resources */
     objstrDelete(record->cache);
     chashDelete(record->lookupTable);
@@ -584,6 +598,9 @@ int transCommit(transrecord_t *record) {
     free(ltdata);
     return TRANS_ABORT;
   } else if(treplyctrl == TRANS_COMMIT) {
+#ifdef TRANSSTATS
+    ++numTransCommit;
+#endif
     /* Free Resources */
     objstrDelete(record->cache);
     chashDelete(record->lookupTable);
@@ -681,10 +698,10 @@ void *transRequest(void *threadarg) {
       //make an entry in prefetch hash table
       void *oldptr;
       if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
-      prehashRemove(oidToPrefetch);
-      prehashInsert(oidToPrefetch, header);
+        prehashRemove(oidToPrefetch);
+        prehashInsert(oidToPrefetch, header);
       } else {
-      prehashInsert(oidToPrefetch, header);
+        prehashInsert(oidToPrefetch, header);
       }
       length = length - size;
       offset += size;
@@ -790,9 +807,11 @@ void decideResponse(thread_data_array_t *tdata) {
       return;
     }
     /* Invalidate objects in other machine cache */
-    if((retval = invalidateObj(tdata)) != 0) {
-      printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
-      return;
+    if(tdata->buffer->f.nummod > 0) {
+      if((retval = invalidateObj(tdata)) != 0) {
+        printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+        return;
+      }
     }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
@@ -818,10 +837,10 @@ int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
     } else {
       oid = tdata->buffer->oidmod[i];
     }
+    pthread_mutex_lock(&prefetchcache_mutex);
     header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
     //copy object into prefetch cache
     GETSIZE(size, header);
-    pthread_mutex_lock(&prefetchcache_mutex);
     if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
       return -1;
@@ -1275,7 +1294,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
     buf+=sizeof(unsigned int);
     *((unsigned int *)buf) = myIpAddr; 
     buf += sizeof(unsigned int);
-    memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
+    memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
     send_data(sd, oidnoffset, len);
     tmp = tmp->next;
   }
index a5c7842552c8de9331987e0edca23b1acd5d3fa7..03ee0bcfad9fbd1ae386330b669be40dd3d680c7 100644 (file)
@@ -4,7 +4,6 @@
 #include "mem.h"
 #include<fcntl.h>
 #include<errno.h>
-#include<signal.h>
 #include<stdio.h>
 #include "option.h"
 #ifdef DSTM
diff --git a/Robust/src/Runtime/signal.c b/Robust/src/Runtime/signal.c
new file mode 100644 (file)
index 0000000..76ead1a
--- /dev/null
@@ -0,0 +1,27 @@
+#include "runtime.h"
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+extern int numTransAbort;
+extern int numTransCommit;
+
+
+void transStatsHandler(int sig, siginfo_t* info, void *context) {
+#ifdef TRANSSTATS
+  printf("numTransCommit = %d\n", numTransCommit);
+  printf("numTransAbort = %d\n", numTransAbort);
+  exit(0);
+#endif
+}
+
+#ifdef TRANSSTATS
+void CALL00(___Signal______nativeSigAction____) {
+  struct sigaction siga;
+  siga.sa_handler = NULL;
+  siga.sa_flags = SA_SIGINFO;
+  siga.sa_sigaction = &transStatsHandler;
+  sigemptyset(&siga.sa_mask);
+  sigaction(SIGUSR1, &siga, 0);
+}
+#endif
index beb59c1d61050d04a1bed2ba89953f99ad672bb0..6042f0bb5b5bd8e4183fe7c41c16c87d3f8244bf 100755 (executable)
@@ -22,6 +22,7 @@ echo -threadsimulate generate multi-thread simulate version binary
 echo -optional enable optional
 echo -debug generate debug symbols
 echo -prefetch do prefetch analysis
+echo -transstats generates transaction stats on commits and aborts
 echo -webinterface enable web interface
 echo -runtimedebug printout runtime debug messages
 echo "-thread use support for multiple threads"
@@ -46,6 +47,7 @@ NOJAVA=false
 CHECKFLAG=false
 RECOVERFLAG=false
 MULTICOREFLAG=false
+TRANSSTATSFLAG=false
 RAWFLAG=false
 THREADSIMULATEFLAG=false;
 USEDMALLOC=false
@@ -100,6 +102,9 @@ DSMFLAG=true
 elif [[ $1 = '-prefetch' ]]
 then
 JAVAOPTS="$JAVAOPTS -prefetch"
+elif [[ $1 = '-transstats' ]]
+then
+TRANSSTATSFLAG=true
 elif [[ $1 = '-printflat' ]]
 then
 JAVAOPTS="$JAVAOPTS -printflat"
@@ -284,11 +289,16 @@ $ROBUSTROOT/Runtime/SimpleHash.c $ROBUSTROOT/Runtime/option.c \
 $ROBUSTROOT/Runtime/ObjectHash.c \
 $ROBUSTROOT/Runtime/garbage.c $ROBUSTROOT/Runtime/socket.c \
 $ROBUSTROOT/Runtime/math.c \
+$ROBUSTROOT/Runtime/signal.c \
 $ROBUSTROOT/Runtime/GenericHashtable.c $ROBUSTROOT/Runtime/object.c"
 
 if $DSMFLAG
 then
 EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DCOMPILER -DDSTM -I$DSMRUNTIME"
+if $TRANSSTATSFLAG
+then
+EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DTRANSSTATS -DCOMPILER -DDSTM -I$DSMRUNTIME"
+fi
 FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c $DSMRUNTIME/addUdpEnhance.c"
 fi