1 #include <sys/socket.h>
2 #include <netinet/in.h>
6 #include <netinet/tcp.h>
7 #include "addUdpEnhance.h"
9 /************************
11 ***********************/
14 int createUdpSocket() {
16 struct sockaddr_in clientaddr;
19 if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
20 perror("socket creation failed");
23 if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
24 perror("setsockopt - SOL_SOCKET");
33 struct sockaddr_in servaddr;
35 //Create Global Udp Socket
36 if((udpSockFd = createUdpSocket()) < 0) {
37 printf("Error in socket\n");
40 sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
46 if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
52 if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
58 bzero(&servaddr, sizeof(servaddr));
59 servaddr.sin_family = AF_INET;
60 servaddr.sin_port = htons(UDP_PORT);
61 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
63 if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
71 /* Function that listens for udp broadcast messages */
72 void *udpListenBroadcast(void *sockfd) {
73 pthread_t thread_udpBroadcast;
74 struct sockaddr_in servaddr;
75 socklen_t socklen = sizeof(struct sockaddr);
76 char readBuffer[MAX_SIZE];
79 printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
81 memset(readBuffer, 0, MAX_SIZE);
83 int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
85 printf("DEBUG-> Recv Error! \n");
88 short status = *((short *) &readBuffer[0]);
91 if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
92 printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
97 printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
101 /* Close connection */
102 if(close((int)sockfd) == -1)
107 /* Function that invalidate objects that
108 * have been currently modified
109 * returns -1 on error and 0 on success */
110 int invalidateObj(thread_data_array_t *tdata) {
111 struct sockaddr_in clientaddr;
114 bzero(&clientaddr, sizeof(clientaddr));
115 clientaddr.sin_family = AF_INET;
116 clientaddr.sin_port = htons(UDP_PORT);
117 clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
118 int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
119 if(tdata->buffer->f.nummod < maxObjsPerMsg) {
120 /* send single udp msg */
122 if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
123 printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
127 /* Split into several udp msgs */
128 int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
129 if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
131 for(i = 1; i <= maxUdpMsg; i++) {
132 if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
133 printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
141 /* Function sends a udp broadcast, also distinguishes
142 * msg size to be sent based on the iteration flag
143 * returns -1 on error and 0 on success */
144 int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
145 char writeBuffer[MAX_SIZE];
146 int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
148 *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
149 offset += sizeof(short);
150 if(iteration == 0) { // iteration flag == zero, send single udp msg
151 *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
152 offset += sizeof(short);
154 for(i = 0; i < tdata->buffer->f.nummod; i++) {
155 *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
156 offset += sizeof(unsigned int);
158 } else { // iteration flag > zero, send multiple udp msg
160 if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0)
161 numObj = maxObjsPerMsg;
163 numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
164 *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
165 offset += sizeof(short);
166 int index = (iteration - 1) * maxObjsPerMsg;
168 for(i = 0; i < numObj; i++) {
169 *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
170 offset += sizeof(unsigned int);
174 if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
175 perror("sendto error- ");
176 printf("DEBUG-> sendto error: errorno %d\n", errno);
182 /* Function searches given oid in prefetch cache and invalidates obj from cache
183 * returns -1 on error and 0 on success */
184 int invalidateFromPrefetchCache(char *buffer) {
185 int offset = sizeof(short);
186 /* Read objects sent */
187 int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
189 for(i = 0; i < numObjsRecv; i++) {
191 oid = *((unsigned int *)(buffer+offset));
193 /* Lookup Objects in prefetch cache and remove them */
194 if((header = prehashSearch(oid)) != NULL) {
197 offset += sizeof(unsigned int);