1 #include <sys/socket.h>
2 #include <netinet/in.h>
6 #include <netinet/tcp.h>
7 #include "addUdpEnhance.h"
8 #include "altprelookup.h"
10 #include "abortreaders.h"
13 /************************
15 ***********************/
17 extern unsigned int myIpAddr;
/* Creates an IPv4 UDP socket and enables SO_BROADCAST on it.
 * Failures of socket() and setsockopt() are reported through perror().
 * NOTE(review): the declarations of sockfd and on, the handling that follows
 * each perror(), and the return statement are not visible in this excerpt --
 * confirm the return convention against the full file. */
19 int createUdpSocket() {
/* NOTE(review): clientaddr is not referenced in the visible lines. */
21 struct sockaddr_in clientaddr;
/* Datagram socket over IPv4/UDP; a negative descriptor means failure. */
24 if((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
25 perror("socket creation failed");
/* Permit sending datagrams to a broadcast address on this socket. */
28 if((setsockopt(sockfd, SOL_SOCKET, SO_BROADCAST, &on, sizeof(on))) < 0) {
29 perror("setsockopt - SOL_SOCKET");
/* NOTE(review): the header of the enclosing function is not visible in this
 * excerpt. The visible statements create the global sending socket
 * (udpSockFd) and a second UDP socket (sockfd) that is bound to UDP_PORT. */
/* Local address the second socket is bound to; filled in before bind(). */
38 struct sockaddr_in servaddr;
40 //Create the global UDP socket (udpSockFd) through createUdpSocket()
41 if((udpSockFd = createUdpSocket()) < 0) {
42 printf("Error in socket\n");
/* Second UDP socket; this is the descriptor passed to bind() below.
 * NOTE(review): no check of this socket() result is visible here. */
45 sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
/* NOTE(review): the two SO_REUSEADDR calls below are identical in the visible
 * lines; check the full file to see whether one of them is redundant or was
 * meant to set a different option. */
51 if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
57 if(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &setsockflag, sizeof(setsockflag)) < 0) {
/* Bind to UDP_PORT on every local interface (INADDR_ANY), with port and
 * address converted to network byte order. */
63 bzero(&servaddr, sizeof(servaddr));
64 servaddr.sin_family = AF_INET;
65 servaddr.sin_port = htons(UDP_PORT);
66 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
68 if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
76 /* Receives UDP broadcast messages on the descriptor passed in sockfd and
 * inspects the status code stored in the first two bytes of each message.
 * sockfd carries the socket descriptor by value inside the void pointer; it
 * is cast back to int wherever it is used.
 * NOTE(review): the void *(void *) signature suggests a pthread start
 * routine -- confirm against the caller. The receive loop, the dispatch on
 * status and the return statement are not visible in this excerpt. */
77 void *udpListenBroadcast(void *sockfd) {
/* NOTE(review): thread_udpBroadcast is not referenced in the visible lines. */
78 pthread_t thread_udpBroadcast;
/* Filled in by recvfrom() with the sender's address. */
79 struct sockaddr_in servaddr;
/* In/out length argument for recvfrom(). */
80 socklen_t socklen = sizeof(struct sockaddr);
/* Receive buffer; one datagram of at most MAX_SIZE bytes is read into it. */
81 char readBuffer[MAX_SIZE];
84 printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
/* Read one datagram into readBuffer. */
87 int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
89 printf("DEBUG-> Recv Error! \n");
/* The first two bytes of the message hold the control/status code. */
92 short status = *((short *) &readBuffer[0]);
/* Pass the whole message to the prefetch-cache invalidation routine; a
 * non-zero result is reported as an error. */
95 if((retval = invalidateFromPrefetchCache(readBuffer))!= 0) {
96 printf("Error: In invalidateFromPrefetchCache() at %s, %d\n", __FILE__, __LINE__);
/* Presumably reached when status matches none of the handled codes; the
 * dispatch itself is not visible here. */
102 printf("Error: Cannot regcognize the status in file %s, at line %d\n", __FILE__, __LINE__);
106 /* Close the socket descriptor */
107 if(close((int)sockfd) == -1)
112 /* Broadcasts an invalidation for the objects modified by a transaction.
 * Adds up f.nummod over the pilecount entries of tdata, builds the broadcast
 * destination address (INADDR_BROADCAST, UDP_PORT) and passes everything on
 * to sendUdpMsg(); finalresponse and socklist are forwarded unchanged.
 * Returns -1 on error and 0 on success (per the original comment; the
 * return statements are not visible in this excerpt). */
115 int invalidateObj(trans_req_data_t *tdata, int pilecount, char finalresponse, int *socklist) {
/* NOTE(review): start and end are not referenced in the visible lines. */
116 struct timeval start, end;
/* Destination address for the broadcast. */
117 struct sockaddr_in clientaddr;
/* Total number of modified objects across all entries of tdata. */
121 for(i=0; i<pilecount; i++) {
122 nummod+=tdata[i].f.nummod;
124 bzero(&clientaddr, sizeof(clientaddr));
125 clientaddr.sin_family = AF_INET;
126 clientaddr.sin_port = htons(UDP_PORT);
127 clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
/* Number of unsigned int object ids that fit in one MAX_SIZE message after
 * reserving 2*sizeof(unsigned int) bytes.
 * NOTE(review): this value is not used in the visible lines of this
 * function; sendUdpMsg() computes the same quantity itself. */
128 int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
129 /* Send the invalidation; a negative result from sendUdpMsg() is an error */
130 if((retval = sendUdpMsg(tdata, pilecount, nummod, &clientaddr, finalresponse, socklist)) < 0) {
131 printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
137 /* Sends the ids of the modified objects to clientaddr over udpSockFd,
 * putting at most maxObjsPerMsg ids in one datagram.
 * Message layout as written below:
 *   short         INVALIDATE_OBJS control code
 *   unsigned int  myIpAddr (identifies the sending machine)
 *   short         payload length in bytes (sizeof(unsigned int) * numtosend)
 *   unsigned int  object ids, numtosend of them
 * Values are stored in host byte order; no conversion is visible.
 * NOTE(review): the loop that repeats the send while nummod is positive, the
 * initial values of offset, i, j and sentmsgs, the use of finalresponse and
 * socklist, and the return statements are not visible in this excerpt.
 * Returns -1 on error and 0 on success (per the original comment). */
140 int sendUdpMsg(trans_req_data_t *tdata, int pilecount, int nummod, struct sockaddr_in *clientaddr, char finalresponse, int *socklist) {
141 char writeBuffer[MAX_SIZE];
/* Number of unsigned int ids that fit in one message after reserving
 * 2*sizeof(unsigned int) bytes. */
142 int maxObjsPerMsg = (MAX_SIZE - 2*sizeof(unsigned int))/sizeof(unsigned int);
146 *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
147 offset += sizeof(short);
/* NOTE(review): if offset started at 0 this unsigned int store lands at byte
 * offset 2 of a char array, i.e. it is not naturally aligned -- confirm the
 * target platforms tolerate that. */
148 *((unsigned int *)(writeBuffer+offset)) = myIpAddr; //mid sending invalidation
149 offset += sizeof(unsigned int);
/* Cap the number of ids in this datagram at maxObjsPerMsg. */
152 int numtosend=nummod>maxObjsPerMsg ? maxObjsPerMsg : nummod;
/* localoffset is the write cursor for this datagram; offset keeps marking
 * the end of the fixed header. */
153 int localoffset=offset;
/* Payload length in bytes; localoffset equals offset at this point, so the
 * store goes to the cursor position. */
155 *((short *)(writeBuffer+offset)) = (short) (sizeof(unsigned int) * numtosend);
156 localoffset += sizeof(short);
/* Neither loop initialises its index here, so i and j keep the values they
 * had on entry (presumably to resume where the previous datagram stopped). */
158 for(; j < pilecount; j++) {
159 for(; i < tdata[j].f.nummod; i++) {
160 *((unsigned int *) (writeBuffer+localoffset)) = tdata[j].oidmod[i]; //copy objects
161 localoffset += sizeof(unsigned int);
/* Checks whether numtosend ids have now been copied. */
162 if ((++sentmsgs)==numtosend) {
/* Send the first localoffset bytes of writeBuffer to clientaddr. */
170 if(sendto(udpSockFd, (const void *) writeBuffer, localoffset, 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in)) < 0) {
171 perror("sendto error- ");
172 printf("DEBUG-> sendto error: errorno %d\n", errno);
/* Ids still left to send after this datagram. */
175 nummod= nummod - numtosend;
180 /* Parses an invalidation message and marks each listed object that is found
 * in the prefetch cache as DIRTY.
 * buffer points at the start of the message: a short control code, the
 * sender's unsigned int machine id, a short payload length in bytes, then
 * the unsigned int object ids.
 * Messages whose machine id equals myIpAddr are not processed.
 * NOTE(review): the declarations of i, oid and header and the return
 * statements are not visible in this excerpt.
 * Returns -1 on error and 0 on success (per the original comment). */
182 int invalidateFromPrefetchCache(char *buffer) {
/* Skip the control code at the start of the message. */
183 int offset = sizeof(short);
184 /* Read mid from msg */
185 unsigned int mid = *((unsigned int *)(buffer+offset));
186 offset += sizeof(unsigned int);
187 //Invalidate only if the broadcast is from a different machine
188 if(mid != myIpAddr) {
189 /* Read the number of objects sent: payload byte length / id size.
 * NOTE(review): this count comes straight from the received message and no
 * check against the buffer size is visible here -- confirm it is validated
 * before the ids are read. */
190 int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
191 offset += sizeof(short);
/* Pass the array of received ids and its length to removetransaction(). */
194 removetransaction((unsigned int *)(buffer+offset), numObjsRecv);
196 for(i = 0; i < numObjsRecv; i++) {
198 oid = *((unsigned int *)(buffer+offset));
200 /* Look the object up in the prefetch cache */
201 if(((header = prehashSearch(oid)) != NULL)) {
202 //Keep the object in the cache but flag it as DIRTY
203 STATUS(header)=DIRTY;
204 //prehashRemove(oid);
/* Advance to the next id in the message. */
206 offset += sizeof(unsigned int);