#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
+#include <math.h>
+#include <netinet/tcp.h>
#include "addUdpEnhance.h"
/************************
return sockfd;
}
+/* Function that listens for udp broadcast messages */
void *udpListenBroadcast(void *sockfd) {
pthread_t thread_udpBroadcast;
struct sockaddr_in servaddr;
- char readBuffer[MAX_SIZE];
socklen_t socklen = sizeof(struct sockaddr);
+ char readBuffer[MAX_SIZE];
int retval;
- memset(readBuffer, 0, MAX_SIZE);
printf("Listening on port %d, fd = %d\n", UDP_PORT, (int)sockfd);
+ memset(readBuffer, 0, MAX_SIZE);
while(1) {
- //int bytesRcvd = recvfrom((int)sockfd, readBuffer, 5, 0, NULL, NULL);
- int bytesRcvd = recvfrom((int)sockfd, readBuffer, strlen(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
- if(bytesRcvd == 0) {
- break;
- }
-
+ int bytesRcvd = recvfrom((int)sockfd, readBuffer, sizeof(readBuffer), 0, (struct sockaddr *)&servaddr, &socklen);
if(bytesRcvd == -1) {
printf("DEBUG-> Recv Error! \n");
break;
}
-
short status = *((short *) &readBuffer[0]);
switch (status) {
case INVALIDATE_OBJS:
}
}
-closeconnection:
- /* Close connection */
- if(close((int)sockfd) == -1)
- perror("close");
- pthread_exit(NULL);
+ /* Close connection */
+ if(close((int)sockfd) == -1)
+ perror("close");
+ pthread_exit(NULL);
}
-/* Function that sends a broadcast to Invalidate objects that
- * have been currently modified */
+/* Function that invalidate objects that
+ * have been currently modified
+ * returns -1 on error and 0 on success */
int invalidateObj(thread_data_array_t *tdata) {
struct sockaddr_in clientaddr;
- //TODO Instead of sending "hello" send modified objects
- char writeBuffer[MAX_SIZE];
- //char writeBuffer[] = "hello";
- const int on = 1;
+ int retval;
bzero(&clientaddr, sizeof(clientaddr));
clientaddr.sin_family = AF_INET;
clientaddr.sin_port = htons(UDP_PORT);
clientaddr.sin_addr.s_addr = INADDR_BROADCAST;
- /* Create Udp Message */
- int offset = 0;
- *((short *)&writeBuffer[0]) = INVALIDATE_OBJS;
- offset += sizeof(short);
- *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
- offset += sizeof(short);
- int i;
- for(i = 0; i < tdata->buffer->f.nummod; i++) {
- if(offset == MAX_SIZE) {
- if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
- perror("sendto error- ");
- printf("DEBUG-> sendto error: errorno %d\n", errno);
+ int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+ if(tdata->buffer->f.nummod < maxObjsPerMsg) {
+ /* send single udp msg */
+ int iteration = 0;
+ if((retval = sendUdpMsg(tdata, &clientaddr, iteration)) < 0) {
+ printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ } else {
+ /* Split into several udp msgs */
+ int maxUdpMsg = tdata->buffer->f.nummod/maxObjsPerMsg;
+ if (tdata->buffer->f.nummod%maxObjsPerMsg) maxUdpMsg++;
+ int i;
+ for(i = 1; i <= maxUdpMsg; i++) {
+ if((retval = sendUdpMsg(tdata, &clientaddr, i)) < 0) {
+ printf("%s() error in sending udp message at %s, %d\n", __func__, __FILE__, __LINE__);
return -1;
}
- offset = 0;
- }
- /*
- if(offset >= MAX_SIZE) {
- printf("DEBUG-> Large number of objects for one udp message\n");
- return -1;
}
- */
+ }
+ return 0;
+}
- *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
- offset += sizeof(unsigned int);
+/* Function sends a udp broadcast, also distinguishes
+ * msg size to be sent based on the iteration flag
+ * returns -1 on error and 0 on success */
+int sendUdpMsg(thread_data_array_t *tdata, struct sockaddr_in *clientaddr, int iteration) {
+ char writeBuffer[MAX_SIZE];
+ int maxObjsPerMsg = (MAX_SIZE - sizeof(unsigned int))/sizeof(unsigned int);
+ int offset = 0;
+ *((short *)&writeBuffer[0]) = INVALIDATE_OBJS; //control msg
+ offset += sizeof(short);
+ if(iteration == 0) { // iteration flag == zero, send single udp msg
+ *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * (tdata->buffer->f.nummod));
+ offset += sizeof(short);
+ int i;
+ for(i = 0; i < tdata->buffer->f.nummod; i++) {
+ *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[i];
+ offset += sizeof(unsigned int);
+ }
+ } else { // iteration flag > zero, send multiple udp msg
+ int numObj;
+ if((tdata->buffer->f.nummod - (iteration * maxObjsPerMsg)) > 0)
+ numObj = maxObjsPerMsg;
+ else
+ numObj = tdata->buffer->f.nummod - ((iteration - 1)*maxObjsPerMsg);
+ *((short *) (writeBuffer+offset)) = (short) (sizeof(unsigned int) * numObj);
+ offset += sizeof(short);
+ int index = (iteration - 1) * maxObjsPerMsg;
+ int i;
+ for(i = 0; i < numObj; i++) {
+ *((unsigned int *) (writeBuffer+offset)) = tdata->buffer->oidmod[index+i];
+ offset += sizeof(unsigned int);
+ }
}
int n;
- if((n = sendto(udpSockFd, (const void *) writeBuffer, strlen(writeBuffer), 0, (const struct sockaddr *)&clientaddr, sizeof(clientaddr))) < 0) {
+ if((n = sendto(udpSockFd, (const void *) writeBuffer, sizeof(writeBuffer), 0, (const struct sockaddr *)clientaddr, sizeof(struct sockaddr_in))) < 0) {
perror("sendto error- ");
printf("DEBUG-> sendto error: errorno %d\n", errno);
return -1;
}
- //printf("DEBUG-> Client sending: %d bytes, %s\n", n, writeBuffer);
return 0;
-}
+}
+/* Function searches given oid in prefetch cache and invalidates obj from cache
+ * returns -1 on error and 0 on success */
int invalidateFromPrefetchCache(char *buffer) {
- int offset = sizeof(int);
+ int offset = sizeof(short);
/* Read objects sent */
- int numObjs = *((short *)(buffer+offset)) / sizeof(unsigned int);
+ int numObjsRecv = *((short *)(buffer+offset)) / sizeof(unsigned int);
int i;
- for(i = 0; i < numObjs; i++) {
+ for(i = 0; i < numObjsRecv; i++) {
unsigned int oid;
oid = *((unsigned int *)(buffer+offset));
objheader_t *header;