bug fixes for udp broadcast
[IRC.git] / Robust / src / Runtime / DSTM / interface / addUdpEnhance.c
index 8126962c2ecdc1523ff02d3d5fbbe3938889d7f0..79d08be5284ebc4716bba78c501706c0872ad82d 100644 (file)
@@ -2,6 +2,8 @@
 #include <netinet/in.h>
 #include <stdio.h>
 #include <string.h>
+#include <math.h>
+#include <netinet/tcp.h>
 #include "addUdpEnhance.h"
 
 /************************
@@ -66,28 +68,23 @@ int udpInit() {
   return sockfd;
 }
 
+/* Function that listens for udp broadcast messages */
 void *udpListenBroadcast(void *sockfd) {
   pthread_t thread_udpBroadcast;
   struct sockaddr_in servaddr;
-  char readBuffer[MAX_SIZE];
   socklen_t socklen = sizeof(struct sockaddr);
+  char readBuffer[MAX_SIZE];
   int retval;
 
-  memset(readBuffer, 0, MAX_SIZE);
   printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
 
+  memset(readBuffer, 0, MAX_SIZE);
   while(1) {
-    //int bytesRcvd = recvfrom((int)sockfd, readBuffer, 5, 0, NULL, NULL);
-    int bytesRcvd = recvfrom((int)sockfd, readBuffer, strlen(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
-    if(bytesRcvd == 0) {
-      break;
-    }
-
+    int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
     if(bytesRcvd == -1) {
       printf("DEBUG-> Recv Error! \n");
       break;
     }
-
     short status = *((short *) &readBuffer[0]);
     switch (status) {
       case INVALIDATE_OBJS:
@@ -101,68 +98,95 @@ void *udpListenBroadcast(void *sockfd) {
     }
   }
 
-closeconnection:
-    /* Close connection */
-    if(close((int)sockfd) == -1)
-      perror("close");
-    pthread_exit(NULL);
+  /* Close connection */
+  if(close((int)sockfd) == -1)
+    perror("close");
+  pthread_exit(NULL);
 }
 
-/* Function that sends a broadcast to Invalidate objects that
- * have been currently modified */
+/* Function that invalidate objects that
+ * have been currently modified
+ * returns -1 on error and 0 on success */
 int invalidateObj(thread_data_array_t *tdata) {
   struct sockaddr_in clientaddr;
-  //TODO Instead of sending "hello" send modified objects
-  char writeBuffer[MAX_SIZE];
-  //char writeBuffer[] = "hello";
-  const int on = 1;
+  int retval;
 
   bzero(&clientaddr, sizeof(clientaddr));
   clientaddr.sin_family = AF_INET;
   clientaddr.sin_port = htons(UDP_PORT);
   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
-  /* Create Udp Message */
-  int offset = 0;
-  *((short *)&writeBuffer[0]) = INVALIDATE_OBJS;
-  offset += sizeof(short);
-  *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
-  offset += sizeof(short);
-  int i;
-  for(i = 0; i < tdata->buffer->f.nummod; i++) {
-    if(offset == MAX_SIZE) {
-      if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
-        perror("sendto error- ");
-        printf("DEBUG-> sendto error: errorno %d\n", errno);
+  int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+  if(tdata->buffer->f.nummod < maxObjsPerMsg) {
+    /* send single udp msg */
+    int iteration = 0;
+    if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
+      printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+      return -1;
+    }
+  } else {
+    /* Split into several udp msgs */
+    int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
+    if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
+    int i;
+    for(i = 1; i <= maxUdpMsg; i++) {
+      if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
+        printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
         return -1;
       }
-      offset = 0;
-    }
-    /*
-    if(offset >= MAX_SIZE) {
-      printf("DEBUG-> Large number of objects for one udp message\n");
-      return -1;
     }
-    */
+  }
+  return 0;
+}
 
-    *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
-    offset += sizeof(unsigned int);
+/* Function sends a udp broadcast, also distinguishes 
+ * msg size to be sent based on the iteration flag
+ * returns -1 on error and 0 on success */
+int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
+  char writeBuffer[MAX_SIZE];
+  int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+  int offset = 0;
+  *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
+  offset += sizeof(short);
+  if(iteration == 0) { // iteration flag == zero, send single udp msg
+    *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
+    offset += sizeof(short);
+    int i;
+    for(i = 0; i < tdata->buffer->f.nummod; i++) {
+      *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
+      offset += sizeof(unsigned int);
+    }
+  } else { // iteration flag > zero, send multiple udp msg
+    int numObj;
+    if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0) 
+      numObj = maxObjsPerMsg;
+    else  
+      numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
+    *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
+    offset += sizeof(short);
+    int index = (iteration - 1) * maxObjsPerMsg;
+    int i;
+    for(i = 0; i < numObj; i++) {
+      *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
+      offset += sizeof(unsigned int);
+    }
   }
   int n;
-  if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
+  if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
     perror("sendto error- ");
     printf("DEBUG-> sendto error: errorno %d\n", errno);
     return -1;
   }
-  //printf("DEBUG-> Client sending: %d bytes, %s\n", n, writeBuffer);
   return 0;
-}
+} 
 
+/* Function searches given oid in prefetch cache and invalidates obj from cache 
+ * returns -1 on error and 0 on success */
 int invalidateFromPrefetchCache(char *buffer) {
-  int offset = sizeof(int);
+  int offset = sizeof(short);
   /* Read objects sent */
-  int numObjs = *((short *)(buffer+offset)) / sizeof(unsigned int);
+  int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
   int i;
-  for(i = 0; i < numObjs; i++) {
+  for(i = 0; i < numObjsRecv; i++) {
     unsigned int oid;
     oid = *((unsigned int *)(buffer+offset));
     objheader_t *header;