8126962c2ecdc1523ff02d3d5fbbe3938889d7f0
[IRC.git] / Robust / src / Runtime / DSTM / interface / addUdpEnhance.c
1 #include <sys/socket.h>
2 #include <netinet/in.h>
3 #include <stdio.h>
4 #include <string.h>
5 #include "addUdpEnhance.h"
6
7 /************************
8  * Global Variables *
9  ***********************/
10 int udpSockFd;
11
12 int createUdpSocket() {
13   int sockfd;
14   struct sockaddr_in clientaddr;
15   const int on = 1;
16
17   if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
18     perror("socket creation failed");
19     return -1;
20   }
21   if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
22     perror("setsockopt - SOL_SOCKET");
23     return -1;
24   }
25   return sockfd;
26 }
27
28 int udpInit() {
29   int sockfd;
30   int setsockflag = 1;
31   struct sockaddr_in servaddr;
32
33   //Create Global Udp Socket
34   if((udpSockFd = createUdpSocket()) < 0) {
35     printf("Error in socket\n");
36   }
37
38   sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
39   if(sockfd < 0) {
40     perror("socket");
41     exit(1);
42   }
43
44   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
45     perror("socket");
46     exit(1);
47   }
48
49 #ifdef MAC 
50   if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
51     perror("socket");
52     exit(1);
53   }
54 #endif
55
56   bzero(&servaddr, sizeof(servaddr));
57   servaddr.sin_family = AF_INET;
58   servaddr.sin_port = htons(UDP_PORT);
59   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
60
61   if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
62     perror("bind");
63     exit(1);
64   }
65
66   return sockfd;
67 }
68
69 void *udpListenBroadcast(void *sockfd) {
70   pthread_t thread_udpBroadcast;
71   struct sockaddr_in servaddr;
72   char readBuffer[MAX_SIZE];
73   socklen_t socklen = sizeof(struct sockaddr);
74   int retval;
75
76   memset(readBuffer, 0, MAX_SIZE);
77   printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
78
79   while(1) {
80     //int bytesRcvd = recvfrom((int)sockfd, readBuffer, 5, 0, NULL, NULL);
81     int bytesRcvd = recvfrom((int)sockfd, readBuffer, strlen(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
82     if(bytesRcvd == 0) {
83       break;
84     }
85
86     if(bytesRcvd == -1) {
87       printf("DEBUG-> Recv Error! \n");
88       break;
89     }
90
91     short status = *((short *) &readBuffer[0]);
92     switch (status) {
93       case INVALIDATE_OBJS:
94         if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
95           printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
96           break;
97         }
98         break;
99       default:
100         printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
101     }
102   }
103
104 closeconnection:
105     /* Close connection */
106     if(close((int)sockfd) == -1)
107       perror("close");
108     pthread_exit(NULL);
109 }
110
111 /* Function that sends a broadcast to Invalidate objects that
112  * have been currently modified */
113 int invalidateObj(thread_data_array_t *tdata) {
114   struct sockaddr_in clientaddr;
115   //TODO Instead of sending "hello" send modified objects
116   char writeBuffer[MAX_SIZE];
117   //char writeBuffer[] = "hello";
118   const int on = 1;
119
120   bzero(&clientaddr, sizeof(clientaddr));
121   clientaddr.sin_family = AF_INET;
122   clientaddr.sin_port = htons(UDP_PORT);
123   clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
124   /* Create Udp Message */
125   int offset = 0;
126   *((short *)&writeBuffer[0]) = INVALIDATE_OBJS;
127   offset += sizeof(short);
128   *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
129   offset += sizeof(short);
130   int i;
131   for(i = 0; i < tdata->buffer->f.nummod; i++) {
132     if(offset == MAX_SIZE) {
133       if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
134         perror("sendto error- ");
135         printf("DEBUG-> sendto error: errorno %d\n", errno);
136         return -1;
137       }
138       offset = 0;
139     }
140     /*
141     if(offset >= MAX_SIZE) {
142       printf("DEBUG-> Large number of objects for one udp message\n");
143       return -1;
144     }
145     */
146
147     *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
148     offset += sizeof(unsigned int);
149   }
150   int n;
151   if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
152     perror("sendto error- ");
153     printf("DEBUG-> sendto error: errorno %d\n", errno);
154     return -1;
155   }
156   //printf("DEBUG-> Client sending: %d bytes, %s\n", n, writeBuffer);
157   return 0;
158 }
159
160 int invalidateFromPrefetchCache(char *buffer) {
161   int offset = sizeof(int);
162   /* Read objects sent */
163   int numObjs = *((short *)(buffer+offset)) / sizeof(unsigned int);
164   int i;
165   for(i = 0; i < numObjs; i++) {
166     unsigned int oid;
167     oid = *((unsigned int *)(buffer+offset));
168     objheader_t *header;
169     /* Lookup Objects in prefetch cache and remove them */
170     if((header = prehashSearch(oid)) != NULL) {
171       prehashRemove(oid);
172     }
173     offset += sizeof(unsigned int);
174   }
175   return 0;
176 }