Udp invalidation of objects
authoradash <adash>
Tue, 10 Jun 2008 17:17:53 +0000 (17:17 +0000)
committeradash <adash>
Tue, 10 Jun 2008 17:17:53 +0000 (17:17 +0000)
Robust/src/Runtime/DSTM/interface/addUdpEnhance.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/addUdpEnhance.h [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/sockpool.c
Robust/src/Runtime/DSTM/interface/sockpool.h
Robust/src/Runtime/DSTM/interface/trans.c

diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.c
new file mode 100644 (file)
index 0000000..8126962
--- /dev/null
@@ -0,0 +1,176 @@
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <string.h>
+#include "addUdpEnhance.h"
+
+/************************
+ * Global Variables *
+ ***********************/
+int udpSockFd;
+
+int createUdpSocket() {
+  int sockfd;
+  struct sockaddr_in clientaddr;
+  const int on = 1;
+
+  if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
+    perror("socket creation failed");
+    return -1;
+  }
+  if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
+    perror("setsockopt - SOL_SOCKET");
+    return -1;
+  }
+  return sockfd;
+}
+
+int udpInit() {
+  int sockfd;
+  int setsockflag = 1;
+  struct sockaddr_in servaddr;
+
+  //Create Global Udp Socket
+  if((udpSockFd = createUdpSocket()) < 0) {
+    printf("Error in socket\n");
+  }
+
+  sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+  if(sockfd < 0) {
+    perror("socket");
+    exit(1);
+  }
+
+  if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
+    perror("socket");
+    exit(1);
+  }
+
+#ifdef MAC 
+  if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
+    perror("socket");
+    exit(1);
+  }
+#endif
+
+  bzero(&servaddr, sizeof(servaddr));
+  servaddr.sin_family = AF_INET;
+  servaddr.sin_port = htons(UDP_PORT);
+  servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+  if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
+    perror("bind");
+    exit(1);
+  }
+
+  return sockfd;
+}
+
+void *udpListenBroadcast(void *sockfd) {
+  pthread_t thread_udpBroadcast;
+  struct sockaddr_in servaddr;
+  char readBuffer[MAX_SIZE];
+  socklen_t socklen = sizeof(struct sockaddr);
+  int retval;
+
+  memset(readBuffer, 0, MAX_SIZE);
+  printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
+
+  while(1) {
+    //int bytesRcvd = recvfrom((int)sockfd, readBuffer, 5, 0, NULL, NULL);
+    int bytesRcvd = recvfrom((int)sockfd, readBuffer, strlen(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
+    if(bytesRcvd == 0) {
+      break;
+    }
+
+    if(bytesRcvd == -1) {
+      printf("DEBUG-> Recv Error! \n");
+      break;
+    }
+
+    short status = *((short *) &readBuffer[0]);
+    switch (status) {
+      case INVALIDATE_OBJS:
+        if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
+          printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
+          break;
+        }
+        break;
+      default:
+        printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
+    }
+  }
+
+closeconnection:
+    /* Close connection */
+    if(close((int)sockfd) == -1)
+      perror("close");
+    pthread_exit(NULL);
+}
+
+/* Function that sends a broadcast to Invalidate objects that
+ * have been currently modified */
+int invalidateObj(thread_data_array_t *tdata) {
+  struct sockaddr_in clientaddr;
+  //TODO Instead of sending "hello" send modified objects
+  char writeBuffer[MAX_SIZE];
+  //char writeBuffer[] = "hello";
+  const int on = 1;
+
+  bzero(&clientaddr, sizeof(clientaddr));
+  clientaddr.sin_family = AF_INET;
+  clientaddr.sin_port = htons(UDP_PORT);
+  clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
+  /* Create Udp Message */
+  int offset = 0;
+  *((short *)&writeBuffer[0]) = INVALIDATE_OBJS;
+  offset += sizeof(short);
+  *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
+  offset += sizeof(short);
+  int i;
+  for(i = 0; i < tdata->buffer->f.nummod; i++) {
+    if(offset == MAX_SIZE) {
+      if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
+        perror("sendto error- ");
+        printf("DEBUG-> sendto error: errorno %d\n", errno);
+        return -1;
+      }
+      offset = 0;
+    }
+    /*
+    if(offset >= MAX_SIZE) {
+      printf("DEBUG-> Large number of objects for one udp message\n");
+      return -1;
+    }
+    */
+
+    *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
+    offset += sizeof(unsigned int);
+  }
+  int n;
+  if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
+    perror("sendto error- ");
+    printf("DEBUG-> sendto error: errorno %d\n", errno);
+    return -1;
+  }
+  //printf("DEBUG-> Client sending: %d bytes, %s\n", n, writeBuffer);
+  return 0;
+}
+
+int invalidateFromPrefetchCache(char *buffer) {
+  int offset = sizeof(int);
+  /* Read objects sent */
+  int numObjs = *((short *)(buffer+offset)) / sizeof(unsigned int);
+  int i;
+  for(i = 0; i < numObjs; i++) {
+    unsigned int oid;
+    oid = *((unsigned int *)(buffer+offset));
+    objheader_t *header;
+    /* Lookup Objects in prefetch cache and remove them */
+    if((header = prehashSearch(oid)) != NULL) {
+      prehashRemove(oid);
+    }
+    offset += sizeof(unsigned int);
+  }
+  return 0;
+}
diff --git a/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h b/Robust/src/Runtime/DSTM/interface/addUdpEnhance.h
new file mode 100644 (file)
index 0000000..7d3d98c
--- /dev/null
@@ -0,0 +1,25 @@
+#ifndef _UDP_H
+#define _UDP_H
+
+#include "dstm.h"
+
+
+/*******************************
+ * Udp Message structures
+ ******************************/
+#define INVALIDATE_OBJS 101
+
+/*************************
+ * Global constants
+ ************************/
+#define MAX_SIZE  4000
+
+/********************************
+ *  Function Prototypes
+ *******************************/
+int createUdpSocket();
+int udpInit();
+void *udpListenBroadcast(void *);
+int invalidateObj(thread_data_array_t *);
+int invalidateFromPrefetchCache(char *); 
+#endif
index 2bb46769c322c51e1ecc3e8483773d49d23b486b..abfcae6b3a2bb5715399cf394f28fc8e6de50afd 100644 (file)
@@ -53,6 +53,7 @@
 //Transaction id per machine
 #define TID_LEN 20
 #define LISTEN_PORT 2156
+#define UDP_PORT 2158
 
 
 #include <stdlib.h>
index 98a537ef20c5f7ee664c68a3b6532ce930b10255..fed260bba249686abbce9f8d52e1fb71e52d29b0 100644 (file)
@@ -35,7 +35,6 @@ inline void Lock(volatile unsigned int *s) {
   }
 }
 
-
 sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int size) {
   if((sockhash = calloc(1, sizeof(sockPoolHashTable_t))) == NULL) {
     printf("Calloc error at %s line %d\n", __FILE__, __LINE__);
@@ -78,14 +77,13 @@ int createNewSocket(unsigned int mid) {
   return sd;
 }
 
-
 int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
   socknode_t **ptr;
   int key = mid%(sockhash->size);
   int sd;
   
   Lock(&sockhash->mylock);
-  ptr=&sockhash->table[key];
+  ptr=&(sockhash->table[key]);
   
   while(*ptr!=NULL) {
     if (mid == (*ptr)->mid) {
@@ -114,7 +112,7 @@ int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) {
   int key = mid%(sockhash->size);
   int sd;
   
-  ptr=&sockhash->table[key];
+  ptr=&(sockhash->table[key]);
   
   while(*ptr!=NULL) {
     if (mid == (*ptr)->mid) {
@@ -142,7 +140,7 @@ int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) {
   int key = mid%(sockhash->size);
   int sd;
   
-  ptr=&sockhash->table[key];
+  ptr=&(sockhash->table[key]);
   
   while(*ptr!=NULL) {
     if (mid == (*ptr)->mid) {
@@ -160,7 +158,6 @@ int getSock2(sockPoolHashTable_t *sockhash, unsigned int mid) {
   }
 }
 
-
 void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
     Lock(&sockhash->mylock);
     inusenode->next = sockhash->inuse;
index c85d7da826a9bdde5bcf5d46fbf3863da0279f9f..be392e47ce4b534d37f614e6729848b44e828209 100644 (file)
@@ -2,6 +2,7 @@
 #define _SOCKPOOL_H_
 
 #include "dstm.h"
+#include "ip.h"
 
 int test_and_set(volatile unsigned int *addr);
 void UnLock(volatile unsigned int *addr);
index d078e0c42b993aa6dbe69b2964fded9779539a11..ea81203ecec6c562f58c4fa785bd3283fd033554 100644 (file)
@@ -8,6 +8,7 @@
 #include "prelookup.h"
 #include "threadnotify.h"
 #include "queue.h"
+#include "addUdpEnhance.h"
 #ifdef COMPILER
 #include "thread.h"
 #endif
@@ -145,10 +146,11 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short
 
 /* This function starts up the transaction runtime. */
 int dstmStartup(const char * option) {
-  pthread_t thread_Listen;
+  pthread_t thread_Listen, udp_thread_Listen;
   pthread_attr_t attr;
   int master=option!=NULL && strcmp(option, "master")==0;
   int fd;
+  int udpfd;
 
   if (processConfigFile() != 0)
     return 0; //TODO: return error value, cause main program to exit
@@ -165,6 +167,8 @@ int dstmStartup(const char * option) {
   transInit();
   
   fd=startlistening();
+  udpfd = udpInit();
+  pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
   if (master) {
     pthread_attr_init(&attr);
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
@@ -617,11 +621,10 @@ void *transRequest(void *threadarg) {
   bzero((char*) &serv_addr, sizeof(serv_addr));
   serv_addr.sin_family = AF_INET;
   serv_addr.sin_port = htons(LISTEN_PORT);
-  midtoIP(tdata->mid,machineip);
-  machineip[15] = '\0';
-  serv_addr.sin_addr.s_addr = inet_addr(machineip);
+  serv_addr.sin_addr.s_addr = htonl(tdata->mid);
+
   /* Open Connection */
-  if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+  if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
     perror("Error in connect for TRANS_REQUEST\n");
     close(sd);
     pthread_exit(NULL);
@@ -676,7 +679,13 @@ void *transRequest(void *threadarg) {
       GETSIZE(size, header);
       size += sizeof(objheader_t);
       //make an entry in prefetch hash table
+      void *oldptr;
+      if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
+      prehashRemove(oidToPrefetch);
+      prehashInsert(oidToPrefetch, header);
+      } else {
       prehashInsert(oidToPrefetch, header);
+      }
       length = length - size;
       offset += size;
     }
@@ -780,6 +789,11 @@ void decideResponse(thread_data_array_t *tdata) {
       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
       return;
     }
+    /* Invalidate objects in other machine cache */
+    if((retval = invalidateObj(tdata)) != 0) {
+      printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
+      return;
+    }
   } else { 
     /* Send Abort in soft abort case followed by retry commiting transaction again*/
     *(tdata->replyctrl) = TRANS_ABORT;
@@ -800,7 +814,7 @@ int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
     int size;
     unsigned int oid;
     if(oidType == 'R') {
-      oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)); 
+      oid = *((unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i)); 
     } else {
       oid = tdata->buffer->oidmod[i];
     }
@@ -815,11 +829,18 @@ int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
     pthread_mutex_unlock(&prefetchcache_mutex);
     memcpy(newAddr, header, (size + sizeof(objheader_t)));
     //make an entry in prefetch hash table
-    prehashInsert(oid, newAddr);
+    void *oldptr;
+    if((oldptr = prehashSearch(oid)) != NULL) {
+      prehashRemove(oid);
+      prehashInsert(oid, newAddr);
+    } else {
+      prehashInsert(oid, newAddr);
+    }
   }
   return 0;
 }
 
+
 /* This function sends the final response to remote machines per
  * thread in their respective socket id It returns a char that is only
  * needed to check the correctness of execution of this function