/* Globals defined elsewhere that the prefetch code in this file relies on. */
/* Socket pool passed to getSock2() when outgoing prefetch requests are sent. */
6 extern sockPoolHashTable_t *transPrefetchSockPool;
/* Passed as the `mid` argument of sendRangePrefetchReq() by transPrefetchNew(). */
7 extern unsigned int myIpAddr;
/* Socket pool used with getSockWithLock()/freeSockWithLock() in rangePrefetchReq(). */
8 extern sockPoolHashTable_t *transPResponseSocketPool;
/* Held around prefetchobjstrAlloc() in getRangePrefetchResponse(). */
9 extern pthread_mutex_t prefetchcache_mutex;
/* Its .lock and .cond members are used to broadcast after a prefetched object is stored. */
10 extern prehashtable_t pflookup;
13 // Function for new prefetch call
/*
 * rangePrefetch: serialise one prefetch request into a node obtained from
 * getmemory().
 *
 * oid        - object id written at the start of the node
 * numoffset  - number of entries in `offsets`
 * offsets    - array of `numoffset` shorts copied to the end of the node
 *
 * Node layout as written below:
 *   [unsigned int oid][short numoffset][short offsets[numoffset]]
 *
 * NOTE(review): the declaration/initialisation of `index` and whatever
 * publishes the node to the queue are not visible in this excerpt.
 */
14 void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
15 /* Allocate memory in prefetch queue and push the block there */
// Node size = oid + offset count + the offsets themselves.
16 int qnodesize = sizeof(unsigned int) + sizeof(unsigned short) + numoffset * sizeof(short);
17 char *node = (char *) getmemory(qnodesize);
// Field 1: the oid at the start of the node.
21 ((unsigned int *)node)[0] = oid;
22 index = index + (sizeof(unsigned int));
// Field 2: the number of offsets, stored as a short.
23 *((short *)(node+index)) = numoffset;
24 index = index + (sizeof(short));
// Field 3: the offsets, copied verbatim from the caller's array.
25 memcpy(node+index, offsets, numoffset * sizeof(short));
/*
 * transPrefetchNew: take a node from the prefetch queue, resolve what it can
 * through processLocal(), and send a range-prefetch request for the entries
 * of the resulting per-machine list.
 *
 * NOTE(review): the loop structure around these statements and the return
 * value are not visible in this excerpt.
 */
29 void *transPrefetchNew() {
31 /* Read from prefetch queue */
32 void *node = gettail();
34 /* Check tuples if they are found locally */
// pilehead is the per-machine list built by processLocal().
35 perMcPrefetchList_t* pilehead = processLocal(node);
39 /* Send Prefetch Request */
40 perMcPrefetchList_t *ptr = pilehead;
42 // Get sock from shared pool
// The socket is looked up by the list entry's machine id.
43 int sd = getSock2(transPrefetchSockPool, ptr->mid);
// This machine's address is passed as the request's `mid` argument.
44 sendRangePrefetchReq(ptr, sd, myIpAddr);
48 /* Deallocated pilehead */
49 proPrefetchQDealloc(pilehead);
51 // Deallocate the prefetch queue pile node
/*
 * processLocal: walk the objects named by one prefetch-queue node and build
 * a per-machine list (via insertPrefetch) of the entries that have to be
 * requested from other machines.
 *
 * ptr - queue node; the oid, offset count and offset array are read from it
 *       with GET_OID / GET_NUM_OFFSETS / GET_OFFSETS.
 *
 * dfsList is the traversal stack: dfsList[top] holds an oid and
 * dfsList[top+1] a counter that is compared with GET_RANGE(...) below.
 *
 * NOTE(review): the branch conditions, the updates of `top` and the return
 * statement are not visible in this excerpt; the caller treats the result as
 * the head of the list built here.
 */
56 perMcPrefetchList_t *processLocal(char *ptr) {
57 unsigned int oid = *(GET_OID(ptr));
58 short numoffset = *(GET_NUM_OFFSETS(ptr));
59 short *offsetarray = GET_OFFSETS(ptr);
// Traversal stack sized by the number of offsets (variable-length array).
61 unsigned int dfsList[numoffset];
62 int offstop=numoffset-2;
65 perMcPrefetchList_t *head = NULL;
// Look the base object up through searchObj().
67 objheader_t * header = searchObj(oid);
// Queue the whole request for the machine that lhashSearch() reports for oid.
// NOTE(review): presumably only when header is NULL — the guarding condition is not visible.
70 int machinenum = lhashSearch(oid);
71 insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
78 //Start searching the dfsList
// Next oid to visit at the current depth.
80 oid=getNextOid(header, offsetarray, dfsList, top);
85 header=searchObj(oid);
// Queue the remaining part of the request (offsets from `top` onward)
// for the machine that lhashSearch() reports.
88 int machinenum = lhashSearch(dfsList[top]);
89 insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
90 } else if (top<offstop)
91 //okay to continue going down
96 int machinenum = lhashSearch(dfsList[objindex]);
97 insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
100 //go backwards until we can increment
// Keep backing up while the counter at this level has reached its range.
106 } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
108 header=searchObj(dfsList[top]);
109 //header shouldn't be null unless the object moves away, but allow
110 //ourselves the option to just continue on if we lose the object
// Repeat the back-up step until an object that is still available is found.
111 } while(header==NULL);
/*
 * processRemote: serve a prefetch request received from another machine.
 * Objects reached during the traversal are sent back on `sd` with
 * sendOidFound(); entries that have to be fetched elsewhere are collected
 * into a per-machine list through insertPrefetch().
 *
 * oid         - base object id of the request
 * offsetarray - offsets describing the traversal
 * sd          - socket the found objects are sent on
 * numoffset   - number of entries in offsetarray
 *
 * dfsList is the traversal stack: dfsList[top] holds an oid and
 * dfsList[top+1] a counter that is compared with GET_RANGE(...) below.
 *
 * NOTE(review): the branch conditions, the updates of `top` and the return
 * statement are not visible in this excerpt; the caller treats the result as
 * the head of the list built here.
 */
118 perMcPrefetchList_t *processRemote(unsigned int oid, short * offsetarray, int sd, short numoffset) {
// Traversal stack sized by the number of offsets (variable-length array).
120 unsigned int dfsList[numoffset];
123 perMcPrefetchList_t *head = NULL;
125 objheader_t * header = searchObj(oid);
126 int offstop=numoffset-2;
// Forward the whole request to the machine that lhashSearch() reports for oid.
129 int machinenum = lhashSearch(oid);
130 insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
// Ship the base object back to the requester.
132 sendOidFound(header, oid, sd);
138 //Start searching the dfsList
140 oid=getNextOid(header, offsetarray, dfsList, top);
145 header=searchObj(oid);
// Forward the remaining part of the request (offsets from `top` onward).
148 int machinenum = lhashSearch(dfsList[top]);
149 insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
151 sendOidFound(header, oid, sd);
153 //okay to continue going down
157 //send prefetch first
159 int machinenum = lhashSearch(dfsList[objindex]);
160 insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
163 //go backwards until we can increment
// Keep backing up while the counter at this level has reached its range.
169 } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
171 header=searchObj(dfsList[top]);
172 //header shouldn't be null unless the object moves away, but allow
173 //ourselves the option to just continue on if we lose the object
// Retry the back-up step only while the object is missing (header==NULL),
// exactly as processLocal does. Looping on header!=NULL would back up past
// every object that IS present and leave the loop holding a NULL header for
// the next getNextOid() call.
174 } while(header==NULL);
/*
 * searchObj: look an object up by oid, first with mhashSearch() and then
 * with prehashSearch().
 *
 * Returns the header found, or whatever prehashSearch() returns when the
 * first lookup yields NULL.
 * NOTE(review): the declaration of `header` and the body of the `if` are not
 * visible in this excerpt.
 */
182 INLINE objheader_t *searchObj(unsigned int oid) {
// First lookup: mhashSearch().
184 if ((header = (objheader_t *)mhashSearch(oid)) != NULL) {
// Fallback lookup: prehashSearch().
187 return prehashSearch(oid);
190 /* Delete perMcPrefetchList_t and everything it points to */
/*
 * node - head of the per-machine list to release; NULL is accepted (the
 *        outer loop simply does not run).
 *
 * The offset arrays referenced by the list entries are not freed here.
 * NOTE(review): the statement that releases each objOffsetPile_t entry is
 * not visible in this excerpt.
 */
191 void proPrefetchQDealloc(perMcPrefetchList_t *node) {
192 while (node != NULL) {
// Remember the current machine entry so it can be freed after advancing.
193 perMcPrefetchList_t * prefetchpile_next_ptr = node;
// Unlink the per-object entries one at a time from the front of the list.
194 while(node->list != NULL) {
195 //offsets aren't owned by us, so we don't free them.
196 objOffsetPile_t * objpile_ptr = node->list;
197 node->list = objpile_ptr->next;
// Advance before freeing so the next pointer is read from live memory.
200 node = prefetchpile_next_ptr->next;
201 free(prefetchpile_next_ptr);
/*
 * insertPrefetch: add the request (oid, offsets[0..numoffset)) to the
 * per-machine list at *head.
 *
 * mid       - machine id the request is destined for
 * oid       - object id being requested
 * numoffset - number of entries in `offsets`
 * offsets   - offset array; the pointer is stored, not copied
 * head      - address of the list head; updated when a new entry is linked in
 *
 * The machine list is kept in ascending mid order and each machine's object
 * list in ascending oid order (see the insertion tests below). Entries with
 * the same oid are compared offset by offset.
 *
 * NOTE(review): several lines (declarations of tmid/toid/i, the linking
 * assignments, returns and some branch bodies) are not visible in this
 * excerpt.
 */
205 void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
206 perMcPrefetchList_t *ptr;
207 objOffsetPile_t *objnode;
208 objOffsetPile_t **tmp;
210 //Loop through the machines
211 for(; 1; head=&((*head)->next)) {
// End of list, or the first machine with a larger id: a new machine entry goes here.
213 if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
// Note: this `tmp` shadows the objOffsetPile_t ** declared above.
214 perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
// The new machine entry starts with a single object entry.
216 objnode = malloc(sizeof(objOffsetPile_t));
217 objnode->offsets = offsets;
219 objnode->numoffset = numoffset;
220 objnode->next = NULL;
// Walk this machine's object list.
232 for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
// End of list, or the first entry with a larger oid: insert before it.
236 if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
237 objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
238 objnode->offsets = offsets;
240 objnode->numoffset = numoffset;
241 objnode->next = *tmp;
// Values of the existing entry, used for the offset-by-offset comparison below.
250 int onumoffset=(*tmp)->numoffset;
251 short * ooffset=(*tmp)->offsets;
253 for(i=0; i<numoffset; i++) {
255 //We've matched, let's just extend the current prefetch
256 (*tmp)->numoffset=numoffset;
257 (*tmp)->offsets=offsets;
260 if (ooffset[i]<offsets[i]) {
262 } else if (ooffset[i]>offsets[i]) {
263 //Place item before the current one
264 objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
265 objnode->offsets = offsets;
267 objnode->numoffset = numoffset;
268 objnode->next = *tmp;
273 //if we get to the end, we're already covered by this prefetch
/*
 * sendRangePrefetchReq: send the object/offset entries of one per-machine
 * list node over `sd`.
 *
 * mcpilenode - list node whose `list` entries are sent
 * sd         - socket to write to
 * mid        - machine id embedded in every entry
 *
 * Wire format produced below:
 *   char  TRANS_PREFETCH
 *   per entry: int numoffset, unsigned int oid, unsigned int mid,
 *              short offsets[numoffset]
 *   int   endpair (terminator)
 *
 * NOTE(review): the declarations of control/len/endpair, the loop over the
 * entries and the pointer advance after numoffset are not visible in this
 * excerpt.
 */
281 void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd, unsigned int mid) {
284 objOffsetPile_t *tmp;
286 /* Send TRANS_PREFETCH control message */
287 control = TRANS_PREFETCH;
288 send_data(sd, &control, sizeof(char));
290 /* Send Oids and offsets in pairs */
291 tmp = mcpilenode->list;
// Entry size: offset count + oid + mid + the offsets.
293 len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
// Build the entry in a stack buffer (variable-length array), then send it in one call.
294 char oidnoffset[len];
295 char *buf=oidnoffset;
296 *((int*)buf) = tmp->numoffset;
298 *((unsigned int *)buf) = tmp->oid;
299 buf+=sizeof(unsigned int);
300 *((unsigned int *)buf) = mid;
301 buf += sizeof(unsigned int);
302 memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
303 send_data(sd, oidnoffset, len);
307 /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
// The terminator is sent as an int.
309 send_data(sd, &endpair, sizeof(int));
/*
 * getRangePrefetchResponse: read one prefetch response from `sd` and act on
 * its control byte.
 *
 * Message as read below: an int total length, then (length - sizeof(int))
 * bytes made of a control char, an unsigned int oid and, for OBJECT_FOUND,
 * the object bytes.
 *
 *  - OBJECT_FOUND: the object bytes are copied into memory obtained from
 *    prefetchobjstrAlloc() (allocated while holding prefetchcache_mutex),
 *    registered in the prefetch hash table, and waiters on pflookup.cond are
 *    woken.
 *  - OBJECT_NOT_FOUND: the oid is read from the buffer.
 *  - anything else: an error message is printed.
 *
 * NOTE(review): declarations of length/oid/ptr/oldptr, the bodies between
 * some of the visible statements and the return values are not visible in
 * this excerpt.
 */
313 int getRangePrefetchResponse(int sd) {
315 recv_data(sd, &length, sizeof(int));
// The length field counts itself, so the rest of the message is length - sizeof(int).
316 int size = length - sizeof(int);
317 char recvbuffer[size];
318 recv_data(sd, recvbuffer, size);
319 char control = *((char *) recvbuffer);
321 if(control == OBJECT_FOUND) {
322 oid = *((unsigned int *)(recvbuffer + sizeof(char)));
// What remains after the control byte and the oid is the object itself.
323 size = size - (sizeof(char) + sizeof(unsigned int));
// The allocation is done under prefetchcache_mutex; the copy happens after unlocking.
324 pthread_mutex_lock(&prefetchcache_mutex);
326 if((ptr = prefetchobjstrAlloc(size)) == NULL) {
327 printf("%s() Error: objstrAlloc error for copying into prefetch cache in line %d at %s\n",
328 __func__, __LINE__, __FILE__);
329 pthread_mutex_unlock(&prefetchcache_mutex);
332 pthread_mutex_unlock(&prefetchcache_mutex);
333 memcpy(ptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
336 /* Insert into prefetch hash lookup table */
338 if((oldptr = prehashSearch(oid)) != NULL) {
// An existing entry is replaced when its version is not newer than the received one.
339 if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
341 prehashInsert(oid, ptr);
344 prehashInsert(oid, ptr);
346 objheader_t *head = prehashSearch(oid);
// Wake every thread waiting on pflookup.cond.
347 pthread_mutex_lock(&pflookup.lock);
348 pthread_cond_broadcast(&pflookup.cond);
349 pthread_mutex_unlock(&pflookup.lock);
350 } else if(control == OBJECT_NOT_FOUND) {
351 oid = *((unsigned int *)(recvbuffer + sizeof(char)));
353 printf("%s() Error: in Decoding the control value %d, %s\n", __func__, __LINE__, __FILE__);
/*
 * rangePrefetchReq: receive prefetch entries from `acceptfd`, serve each one
 * through processRemote(), and forward whatever processRemote() returns to
 * the machines that own those objects.
 *
 * Each entry is read as: int numoffset, {unsigned int oid, unsigned int mid},
 * short offsets[numoffset].
 *
 * NOTE(review): the receive loop, its termination test, the declaration of
 * oidmid, the update of `mid` and the return value are not visible in this
 * excerpt.
 */
358 int rangePrefetchReq(int acceptfd) {
// mid starts at (unsigned)-1 and sd at -1.
359 int numoffset, sd = -1;
360 unsigned int baseoid, mid = -1;
364 recv_data(acceptfd, &numoffset, sizeof(int));
// oid and requesting machine id arrive together as two unsigned ints.
367 recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int));
368 baseoid = oidmid.oid;
// The requester differs from the one the current response socket belongs to:
// hand the old socket back and get one for the new machine.
369 if(mid != oidmid.mid) {
371 freeSockWithLock(transPResponseSocketPool, mid, sd);
373 sd = getSockWithLock(transPResponseSocketPool, mid);
// Offsets are received into a stack buffer (variable-length array).
375 short offsetsarry[numoffset];
376 recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
378 perMcPrefetchList_t * pilehead=processRemote(baseoid, offsetsarry, sd, numoffset);
380 if (pilehead!= NULL) {
381 perMcPrefetchList_t *ptr = pilehead;
383 // Get sock from shared pool
// Note: this `sd` shadows the response socket declared at the top of the function.
384 int sd = getSock2(transPrefetchSockPool, ptr->mid);
// The original requester's machine id is passed along with the forwarded request.
385 sendRangePrefetchReq(ptr, sd, mid);
389 proPrefetchQDealloc(pilehead);
// Return the response socket to the pool.
395 freeSockWithLock(transPResponseSocketPool, mid, sd);
/*
 * getNextOid: return the oid to visit next at depth `top` of the traversal.
 *
 * header      - object the oid is read from
 * offsetarray - traversal description; offsetarray[top+2] is the start
 *               index/offset and offsetarray[top+3] packs range, stride and
 *               stride direction (read with GET_RANGE / GET_STRIDE /
 *               GET_STRIDEINC)
 * dfsList     - traversal stack; dfsList[top] is an oid, dfsList[top+1] the
 *               number of steps already taken at this level
 * top         - current stack position
 *
 * For arrays the element index is start +/- stride*count; for other objects
 * the oid is read at byte offset `startindex` past the object header.
 *
 * NOTE(review): the declaration of currindex and the early-return statements
 * are not visible in this excerpt.
 */
400 unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top) {
401 int startindex= offsetarray[top+2];
402 int currcount = dfsList[top+1];
403 int range = GET_RANGE(offsetarray[top + 3]);
// NOTE(review): uses >= on the assumption that array type ids start at
// NUMCLASSES, so an id equal to NUMCLASSES is an array too — confirm
// against the type table.
405 if(TYPE(header) >= NUMCLASSES) {
407 struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
408 int stride = GET_STRIDE(offsetarray[top + 3])+1;
409 int length = ao->___length___;
411 //Check direction of stride
412 if(GET_STRIDEINC(offsetarray[top + 3])) {
414 currindex=startindex-stride*currcount;
418 //Also have to check whether we will eventually index into array
419 if (currindex>=length) {
420 //Skip to the point that we will index into array
// Smallest step count that brings currindex strictly below length.
// The former (currindex-length-1)/stride+1 fell one step short whenever
// currindex-length was a multiple of stride, leaving currindex >= length.
421 int delta=(currindex-length)/stride+1;
422 if ((delta+currcount)>range)
424 currindex-=delta*stride;
427 //Going positive, compute current index
428 currindex=startindex+stride*currcount;
429 if(currindex >= length)
// Element size comes from the classsize table entry for this type.
433 int elementsize = classsize[TYPE(header)];
434 return *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + elementsize*currindex));
// Logical AND: both conditions are plain truth tests.
438 if(currcount!=0 && range != 0) {
439 //go to the next offset
440 header=searchObj(dfsList[top]);
// Field case: the oid is stored startindex bytes past the object header.
445 return *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
/*
 * sendOidFound: send an OBJECT_FOUND response carrying a copy of the object
 * over `sd`.
 *
 * header - object to send (header plus payload)
 * oid    - object id placed in the message
 * sd     - socket passed to sendPrefetchResponse()
 *
 * Message layout built below:
 *   [int size][char OBJECT_FOUND][unsigned int oid][objheader_t + object bytes]
 * where size is the total message length, including the size field itself.
 *
 * NOTE(review): the declarations of incr/objsize, the advance of incr past
 * the size field and the return value are not visible in this excerpt.
 */
449 int sendOidFound(objheader_t * header, unsigned int oid, int sd) {
// GETSIZE fills objsize for this object.
452 GETSIZE(objsize, header);
453 int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
// The message is assembled in a stack buffer (variable-length array).
454 char sendbuffer[size];
455 *((int *)(sendbuffer + incr)) = size;
457 *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
458 incr += sizeof(char);
459 *((unsigned int *)(sendbuffer + incr)) = oid;
460 incr += sizeof(unsigned int);
// Header and object bytes are copied in one piece.
461 memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
463 char control = TRANS_PREFETCH_RESPONSE;
464 sendPrefetchResponse(sd, &control, sendbuffer, &size);
/*
 * sendOidNotFound: send an OBJECT_NOT_FOUND response for `oid` over `sd`.
 *
 * oid - object id that could not be supplied
 * sd  - socket passed to sendPrefetchResponse()
 *
 * Message layout built below (same field order as sendOidFound):
 *   [int size][char OBJECT_NOT_FOUND][unsigned int oid]
 * where size is the total message length, including the size field itself.
 *
 * NOTE(review): the return statement is not visible in this excerpt.
 */
468 int sendOidNotFound(unsigned int oid, int sd) {
469 int size = sizeof(int) + sizeof(char) + sizeof(unsigned int);
470 char sendbuffer[size];
471 *((int *)sendbuffer) = size;
472 *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
// The oid follows the 1-byte control field, so its offset is
// sizeof(int) + sizeof(char). Skipping sizeof(unsigned int) instead wrote
// past the end of sendbuffer and put the oid where the receiver
// (recvbuffer + sizeof(char) after the length) does not look.
473 *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
474 char control = TRANS_PREFETCH_RESPONSE;
475 sendPrefetchResponse(sd, &control, sendbuffer, &size);