-int prefetchReq(int acceptfd) {
- int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
- unsigned int oid, index = 0;
- char *ptr, buffer[PRE_BUF_SIZE];
- void *mobj;
- unsigned int objoid;
- char *header, control;
- objheader_t * head;
-
- /* Repeatedly recv the oid and offset pairs sent for prefetch */
- while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
- count++;
- if(length == -1)
- break;
- sum = 0;
- index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the
- // first 4 bytes are saved to send the
- // size of the buffer (that is computed at the end of the loop)
- oid = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
- numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
- N = numoffset * sizeof(short);
- short offset[numoffset];
- ptr = (char *)&offset;
- /* Recv the offset values per oid */
- do {
- n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0);
- sum += n;
- } while(sum < N && n != 0);
-
- /* Process each oid */
- if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found in buffer for later use */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* send the oid, it's size, it's header and data */
- header = (char *) mobj;
- head = (objheader_t *) header;
- size = sizeof(objheader_t) + sizeof(classsize[TYPE(head)]);
- *(buffer + index) = OBJECT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- memcpy(buffer+index, &size, sizeof(int));
- index += sizeof(int);
- memcpy(buffer + index, header, size);
- index += size;
- /* Calculate the oid corresponding to the offset value */
- for(i = 0 ; i< numoffset ; i++) {
- objoid = *((int *)(header + sizeof(objheader_t) + offset[i]));
- if((header = (char *) mhashSearch(objoid)) == NULL) {
- /* Obj not found, send oid */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- break;
- } else {/* Obj Found */
- /* send the oid, it's size, it's header and data */
- head = (objheader_t *) header;
- size = sizeof(objheader_t) + sizeof(classsize[TYPE(head)]);
- *(buffer + index) = OBJECT_FOUND;
- index += sizeof(char);
- memcpy(buffer+index, &oid, sizeof(unsigned int));
- index += sizeof(unsigned int);
- memcpy(buffer+index, &size, sizeof(int));
- index += sizeof(int);
- memcpy(buffer + index, header, size);
- index += size;
- continue;
- }
- }
- }
- /* Check for overflow in the buffer */
- if (index >= PRE_BUF_SIZE) {
- printf("Char buffer is overflowing\n");
- return 1;
- }
- /* Send Prefetch response control message only once*/
- if(count == 1) {
- control = TRANS_PREFETCH_RESPONSE;
- if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error in sending PREFETCH RESPONSE to Coordinator\n");
- return 1;
- }
- }
-
- /* Add the buffer size into buffer as a parameter */
- memcpy(buffer, &index, sizeof(unsigned int));
- /* Send the entire buffer with its size and oids found and not found */
- if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) {
- perror("Error sending oids found\n");
- return 1;
- }
+int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
+ int i, size, objsize, numoffset = 0;
+ int length;
+ char *recvbuffer, control;
+ unsigned int oid, mid=-1;
+ objheader_t *header;
+ oidmidpair_t oidmid;
+ int sd = -1;
+ while(1) {
+ recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
+ if(numoffset == -1)
+ break;
+ recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
+ oid = oidmid.oid;
+ if (mid != oidmid.mid) {
+ if (mid!=-1) {
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+ }
+ mid=oidmid.mid;
+ sd = getSockWithLock(transPResponseSocketPool, mid);
+ }
+ short offsetarry[numoffset];
+ recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
+
+ /*Process each oid */
+ if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found in buffer for later use */
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *) (sendbuffer+sizeof(char))) = size;
+ *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
+ send_data(sd, sendbuffer, size+1);
+ } else { /* Object Found */
+ int incr = 1;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *)(sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+ send_data(sd, sendbuffer, size+1);
+
+ /* Calculate the oid corresponding to the offset value */
+ for(i = 0 ; i< numoffset ; i++) {
+ /* Check for arrays */
+ if(TYPE(header) >= NUMCLASSES) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ unsigned short length = ao->___length___;
+ /* Check if array out of bounds */
+ if(offsetarry[i]< 0 || offsetarry[i] >= length) {
+ break;
+ }
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
+ } else {
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
+ }
+
+ /* Don't continue if we hit a NULL pointer */
+ if (oid==0)
+ break;
+
+ if((header = mhashSearch(oid)) == NULL) {
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *) (sendbuffer+1)) = size;
+ *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(char)+sizeof(int) + sizeof(char))) = oid;
+
+ send_data(sd, sendbuffer, size+1);
+ break;
+ } else { /* Obj Found */
+ int incr = 1;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *)(sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+ send_data(sd, sendbuffer, size+1);
+ }
+ } //end of for
+ }
+ } //end of while
+ //Release socket
+ if (mid!=-1)
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+ return 0;
+}
+
+void sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
+ send_data(sd, control, sizeof(char));
+ /* Send the buffer with its size */
+ int length = *(size);
+ send_data(sd, sendbuffer, length);
+}
+
+void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
+ objheader_t *header;
+ unsigned int oid;
+ unsigned short newversion;
+ char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
+ int sd;
+ struct sockaddr_in remoteAddr;
+ int bytesSent;
+ int size;
+ int i = 0;
+
+ while(i < numoid) {
+ oid = *(oidarry + i);
+ if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
+ printf("Error: mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ /* Check to see if versions are same */
+checkversion:
+ if (write_trylock(STATUSPTR(header))) { // Can acquire write lock
+ newversion = header->version;
+ if(newversion == *(versionarry + i)) {
+ //Add to the notify list
+ if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
+ printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ write_unlock(STATUSPTR(header));
+ } else {
+ write_unlock(STATUSPTR(header));
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("processReqNotify():socket()");
+ return;
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+ printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ close(sd);
+ return;
+ } else {
+ //Send Update notification
+ msg[0] = THREAD_NOTIFY_RESPONSE;
+ *((unsigned int *)&msg[1]) = oid;
+ size = sizeof(unsigned int);
+ *((unsigned short *)(&msg[1]+size)) = newversion;
+ size += sizeof(unsigned short);
+ *((unsigned int *)(&msg[1]+size)) = threadid;
+ size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
+ send_data(sd, msg, size);
+ }
+ close(sd);