79d08be5284ebc4716bba78c501706c0872ad82d
[IRC.git] / Robust / src / Runtime / DSTM / interface / addUdpEnhance.c
1 #include <sys/socket.h>
2 #include <netinet/in.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include <math.h>
6 #include <netinet/tcp.h>
7 #include "addUdpEnhance.h"
8
9 /************************
10  * Global Variables *
11  ***********************/
12 int udpSockFd;
13
14 int createUdpSocket() {
15   int sockfd;
16   struct sockaddr_in clientaddr;
17   const int on = 1;
18
19   if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
20     perror("socket creation failed");
21     return -1;
22   }
23   if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
24     perror("setsockopt - SOL_SOCKET");
25     return -1;
26   }
27   return sockfd;
28 }
29
30 int udpInit() {
31   int sockfd;
32   int setsockflag = 1;
33   struct sockaddr_in servaddr;
34
35   //Create Global Udp Socket
36   if((udpSockFd = createUdpSocket()) < 0) {
37     printf("Error in socket\n");
38   }
39
40   sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
41   if(sockfd < 0) {
42     perror("socket");
43     exit(1);
44   }
45
46   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
47     perror("socket");
48     exit(1);
49   }
50
51 #ifdef MAC 
52   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
53     perror("socket");
54     exit(1);
55   }
56 #endif
57
58   bzero(&servaddr, sizeof(servaddr));
59   servaddr.sin_family = AF_INET;
60   servaddr.sin_port = htons(UDP_PORT);
61   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
62
63   if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
64     perror("bind");
65     exit(1);
66   }
67
68   return sockfd;
69 }
70
71 /* Function that listens for udp broadcast messages */
72 void *udpListenBroadcast(void *sockfd) {
73   pthread_t thread_udpBroadcast;
74   struct sockaddr_in servaddr;
75   socklen_t socklen = sizeof(struct sockaddr);
76   char readBuffer[MAX_SIZE];
77   int retval;
78
79   printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
80
81   memset(readBuffer, 0, MAX_SIZE);
82   while(1) {
83     int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
84     if(bytesRcvd == -1) {
85       printf("DEBUG-> Recv Error! \n");
86       break;
87     }
88     short status = *((short *) &readBuffer[0]);
89     switch (status) {
90       case INVALIDATE_OBJS:
91         if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
92           printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
93           break;
94         }
95         break;
96       default:
97         printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
98     }
99   }
100
101   /* Close connection */
102   if(close((int)sockfd) == -1)
103     perror("close");
104   pthread_exit(NULL);
105 }
106
107 /* Function that invalidate objects that
108  * have been currently modified
109  * returns -1 on error and 0 on success */
110 int invalidateObj(thread_data_array_t *tdata) {
111   struct sockaddr_in clientaddr;
112   int retval;
113
114   bzero(&clientaddr, sizeof(clientaddr));
115   clientaddr.sin_family = AF_INET;
116   clientaddr.sin_port = htons(UDP_PORT);
117   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
118   int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
119   if(tdata->buffer->f.nummod < maxObjsPerMsg) {
120     /* send single udp msg */
121     int iteration = 0;
122     if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
123       printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
124       return -1;
125     }
126   } else {
127     /* Split into several udp msgs */
128     int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
129     if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
130     int i;
131     for(i = 1; i <= maxUdpMsg; i++) {
132       if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
133         printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
134         return -1;
135       }
136     }
137   }
138   return 0;
139 }
140
141 /* Function sends a udp broadcast, also distinguishes 
142  * msg size to be sent based on the iteration flag
143  * returns -1 on error and 0 on success */
144 int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
145   char writeBuffer[MAX_SIZE];
146   int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
147   int offset = 0;
148   *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
149   offset += sizeof(short);
150   if(iteration == 0) { // iteration flag == zero, send single udp msg
151     *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
152     offset += sizeof(short);
153     int i;
154     for(i = 0; i < tdata->buffer->f.nummod; i++) {
155       *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
156       offset += sizeof(unsigned int);
157     }
158   } else { // iteration flag > zero, send multiple udp msg
159     int numObj;
160     if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0) 
161       numObj = maxObjsPerMsg;
162     else  
163       numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
164     *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
165     offset += sizeof(short);
166     int index = (iteration - 1) * maxObjsPerMsg;
167     int i;
168     for(i = 0; i < numObj; i++) {
169       *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
170       offset += sizeof(unsigned int);
171     }
172   }
173   int n;
174   if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
175     perror("sendto error- ");
176     printf("DEBUG-> sendto error: errorno %d\n", errno);
177     return -1;
178   }
179   return 0;
180
181
182 /* Function searches given oid in prefetch cache and invalidates obj from cache 
183  * returns -1 on error and 0 on success */
184 int invalidateFromPrefetchCache(char *buffer) {
185   int offset = sizeof(short);
186   /* Read objects sent */
187   int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
188   int i;
189   for(i = 0; i < numObjsRecv; i++) {
190     unsigned int oid;
191     oid = *((unsigned int *)(buffer+offset));
192     objheader_t *header;
193     /* Lookup Objects in prefetch cache and remove them */
194     if((header = prehashSearch(oid)) != NULL) {
195       prehashRemove(oid);
196     }
197     offset += sizeof(unsigned int);
198   }
199   return 0;
200 }