-int prefetchReq(int acceptfd) {
- int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
- int length, sd;
- char *recvbuffer, *sendbuffer, control;
- unsigned int oid, mid;
- short *offsetarry;
- objheader_t *header;
- struct sockaddr_in remoteAddr;
-
- while((numbytes = recv((int)acceptfd, &length, sizeof(int), 0)) != 0) {
- if(length == -1) { //-1 is special character to represent end of sending oids and offsets
- break;
- } else {
- numbytes = 0;
- size = length - sizeof(int);
- if((recvbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- return -1;
- }
- while(numbytes < size) {
- numbytes += recv((int)acceptfd, recvbuffer+numbytes, size-numbytes, 0);
- }
-
- oid = *((unsigned int *) recvbuffer);
- mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
- size = size - (2 * sizeof(unsigned int));
- numoffset = size / sizeof(short);
- if((offsetarry = calloc(numoffset, sizeof(short))) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- free(recvbuffer);
- return -1;
- }
- memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
- free(recvbuffer);
-
- /* Create socket to send information */
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- perror("prefetchReq():socket()");
- return -1;
- }
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
-
- if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- close(sd);
- return -1;
- }
-
- /*Process each oid */
- if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found in buffer for later use */
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- free(offsetarry);
- close(sd);
- return -1;
- }
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- free(offsetarry);
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- } else { /* Object Found */
- int incr = 0;
- GETSIZE(objsize, header);
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- free(offsetarry);
- close(sd);
- return -1;
- }
- *((int *) (sendbuffer + incr)) = size;
- incr += sizeof(int);
- *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
- incr += sizeof(char);
- *((unsigned int *)(sendbuffer+incr)) = oid;
- incr += sizeof(unsigned int);
- memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- free(offsetarry);
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- /* Calculate the oid corresponding to the offset value */
- for(i = 0 ; i< numoffset ; i++) {
- /* Check for arrays */
- if(TYPE(header) > NUMCLASSES) {
- isArray = 1;
- }
- if(isArray == 1) {
- int elementsize = classsize[TYPE(header)];
- struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
- unsigned short length = ao->___length___;
- /* Check if array out of bounds */
- if(offsetarry[i]< 0 || offsetarry[i] >= length) {
- break;
- }
- oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
- } else {
- oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
- }
-
- if((header = mhashSearch(oid)) == NULL) {
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- free(offsetarry);
- close(sd);
- return -1;
- }
- *((int *) sendbuffer) = size;
- *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
- *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- free(offsetarry);
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- break;
- } else {/* Obj Found */
- int incr = 0;
- GETSIZE(objsize, header);
- size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
- if((sendbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
- free(offsetarry);
- close(sd);
- return -1;
- }
- *((int *) (sendbuffer + incr)) = size;
- incr += sizeof(int);
- *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
- incr += sizeof(char);
- *((unsigned int *)(sendbuffer+incr)) = oid;
- incr += sizeof(unsigned int);
- memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
-
- control = TRANS_PREFETCH_RESPONSE;
- if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- free(offsetarry);
- printf("Error: %s() in sending prefetch response at %s, %d\n",
- __func__, __FILE__, __LINE__);
- close(sd);
- return -1;
- }
- }
- isArray = 0;
- }
- free(offsetarry);
- }
- }
- }
- close(sd);
- return 0;
-}
-
-int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
- int numbytes = 0;
-
- if((numbytes = send(sd, control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- printf("%s() Error: in sending PREFETCH RESPONSE to Coordinator at %s, %d\n", __func__, __FILE__, __LINE__);
- free(sendbuffer);
- return -1;
+int prefetchReq(int acceptfd, struct readstruct * readbuffer) {
+ int i, size, objsize, numoffset = 0;
+ int length;
+ char *recvbuffer, control;
+ unsigned int oid, mid=-1;
+ objheader_t *header;
+ oidmidpair_t oidmid;
+ int sd = -1;
+ while(1) {
+ recv_data_buf((int)acceptfd, readbuffer, &numoffset, sizeof(int));
+ if(numoffset == -1)
+ break;
+ recv_data_buf((int)acceptfd, readbuffer, &oidmid, 2*sizeof(unsigned int));
+ oid = oidmid.oid;
+ if (mid != oidmid.mid) {
+ if (mid!=-1) {
+ freeSockWithLock(transPResponseSocketPool, mid, sd);
+ }
+ mid=oidmid.mid;
+ sd = getSockWithLock(transPResponseSocketPool, mid);
+ }
+ short offsetarry[numoffset];
+ recv_data_buf((int) acceptfd, readbuffer, offsetarry, numoffset*sizeof(short));
+
+ /*Process each oid */
+ if ((header = mhashSearch(oid)) == NULL) { /* Obj not found */
+ /* Save the oids not found in buffer for later use */
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *) (sendbuffer+sizeof(char))) = size;
+ *((char *)(sendbuffer + sizeof(char)+sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char)+sizeof(char))) = oid;
+ send_data(sd, sendbuffer, size+1);
+ } else { /* Object Found */
+ int incr = 1;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ char sendbuffer[size+1];
+ sendbuffer[0]=TRANS_PREFETCH_RESPONSE;
+ *((int *)(sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+ send_data(sd, sendbuffer, size+1);
+
+ /* Calculate the oid corresponding to the offset value */
+ for(i = 0 ; i< numoffset ; i++) {
+ /* Check for arrays */
+ if(TYPE(header) >= NUMCLASSES) {
+ int elementsize = classsize[TYPE(header)];
+ struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
+ unsigned short length = ao->___length___;
+ /* Check if array out of bounds */
+ if(offsetarry[i]< 0 || offsetarry[i] >= length) {
+ break;
+ }
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
+ } else {
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));