/* Globals defined in other translation units and only declared here. */
/* Socket pool passed to getSock2() when range-prefetch requests are sent. */
6 extern sockPoolHashTable_t *transPrefetchSockPool;
/* Passed as the mid argument of sendRangePrefetchReq() by transPrefetchNew(). */
7 extern unsigned int myIpAddr;
/* Socket pool used with getSockWithLock()/freeSockWithLock() in rangePrefetchReq(). */
8 extern sockPoolHashTable_t *transPResponseSocketPool;
/* Held around prefetchobjstrAlloc() in getRangePrefetchResponse(). */
9 extern pthread_mutex_t prefetchcache_mutex;
/* Its lock/cond members are used for the broadcast in getRangePrefetchResponse(). */
10 extern prehashtable_t pflookup;
13 // Function for new prefetch call
/*
 * rangePrefetch: packs one prefetch request into a buffer obtained from
 * getmemory().
 *
 * Layout written below: oid (unsigned int) at the start of the node, then
 * numoffset (stored through a short pointer), then numoffset shorts copied
 * from offsets.
 *
 * oid       - object id written at the start of the node
 * numoffset - number of shorts in offsets
 * offsets   - offset array copied verbatim into the node
 *
 * NOTE(review): the declaration of `index` and the end of the function are
 * not visible in this excerpt -- confirm that index starts at 0 and how the
 * node is published to the queue.
 */
14 void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
15 /* Allocate memory in prefetch queue and push the block there */
// Node size = oid field + offset-count field + the offsets themselves.
16 int qnodesize = sizeof(unsigned int) + sizeof(unsigned short) + numoffset * sizeof(short);
// NOTE(review): no NULL check of the getmemory() result is visible here -- TODO confirm it cannot fail.
17 char *node = (char *) getmemory(qnodesize);
// The oid occupies the first sizeof(unsigned int) bytes of the node.
21 ((unsigned int *)node)[0] = oid;
22 index = index + (sizeof(unsigned int));
// The offset count is written at the current index as a short.
23 *((short *)(node+index)) = numoffset;
24 index = index + (sizeof(short));
// The caller's offsets are copied right after the count field.
25 memcpy(node+index, offsets, numoffset * sizeof(short));
/*
 * transPrefetchNew: takes a node from gettail(), passes it to
 * processLocal() to obtain a per-machine prefetch list, sends the list
 * entry pointed to by ptr with sendRangePrefetchReq() on a socket obtained
 * from transPrefetchSockPool, and releases the list with
 * proPrefetchQDealloc().
 *
 * NOTE(review): the loops/conditionals surrounding these statements and the
 * return value are not visible in this excerpt -- confirm against the full
 * file before relying on this summary.
 */
29 void *transPrefetchNew() {
31 /* Read from prefetch queue */
32 void *node = gettail();
34 /* Check tuples if they are found locally */
35 perMcPrefetchList_t* pilehead = processLocal(node);
39 /* Send Prefetch Request */
40 perMcPrefetchList_t *ptr = pilehead;
42 // Get sock from shared pool
// The socket is selected by the target machine id stored in the list entry.
43 int sd = getSock2(transPrefetchSockPool, ptr->mid);
// This machine's address (myIpAddr) is sent along as the mid field of the request.
44 sendRangePrefetchReq(ptr, sd, myIpAddr);
48 /* Deallocate pilehead */
49 proPrefetchQDealloc(pilehead);
51 // Deallocate the prefetch queue pile node
/*
 * processLocal: walks the offset description stored in a prefetch queue
 * node, looking objects up with searchObj() and following references with
 * getNextOid(); objects handed to insertPrefetch() are grouped by the
 * machine number returned from lhashSearch().
 *
 * ptr - queue node; its oid, offset count and offset array are read through
 *       GET_OID, GET_NUM_OFFSETS and GET_OFFSETS.
 *
 * NOTE(review): the conditions guarding each insertPrefetch() call, the
 * loop headers and the return statement are not visible in this excerpt.
 * Presumably insertPrefetch() is reached when searchObj() returns NULL and
 * the function returns `head` -- confirm against the full file.
 */
56 perMcPrefetchList_t *processLocal(char *ptr) {
57 unsigned int oid = *(GET_OID(ptr));
58 short numoffset = *(GET_NUM_OFFSETS(ptr));
59 short *offsetarray = GET_OFFSETS(ptr);
// Scratch array for the traversal; its length comes from the node's offset count.
61 unsigned int dfsList[numoffset];
62 int offstop=numoffset-2;
// Result list; insertPrefetch() updates it through &head.
65 perMcPrefetchList_t *head = NULL;
67 objheader_t * header = searchObj(oid);
// Queue the base oid, with the full offset array, for the machine lhashSearch() reports.
70 int machinenum = lhashSearch(oid);
71 insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
78 //Start searching the dfsList
80 oid=getNextOid(header, offsetarray, dfsList, top);
85 header=searchObj(oid);
// Only the offsets from position `top` onward are forwarded with this oid.
88 int machinenum = lhashSearch(dfsList[top]);
89 insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
90 } else if (top<offstop)
91 //okay to continue going down
96 int machinenum = lhashSearch(dfsList[objindex]);
97 insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
100 //go backwards until we can increment
// Keep unwinding while the counter in dfsList[top+1] equals the range encoded in offsetarray[top+3].
106 } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
// Look the object at this level up again before continuing.
108 header=searchObj(dfsList[top]);
109 //header shouldn't be null unless the object moves away, but allow
110 //ourselves the option to just continue on if we lose the object
111 } while(header==NULL);
/*
 * processRemote: counterpart of processLocal() for a request received from
 * another machine. It performs the same searchObj()/getNextOid() walk, and
 * additionally calls sendOidFound() on socket sd for objects on the visible
 * found paths.
 *
 * oid         - base object id of the request
 * offsetarray - offsets received with the request
 * sd          - socket passed to sendOidFound()
 * numoffset   - number of entries in offsetarray
 *
 * NOTE(review): the conditions selecting between insertPrefetch() and
 * sendOidFound(), and the return statement, are not visible in this
 * excerpt. Presumably the function returns `head` -- confirm.
 */
118 perMcPrefetchList_t *processRemote(unsigned int oid, short * offsetarray, int sd, short numoffset) {
// Scratch array for the traversal, sized by the caller-supplied offset count.
120 unsigned int dfsList[numoffset];
// Result list; insertPrefetch() updates it through &head.
123 perMcPrefetchList_t *head = NULL;
125 objheader_t * header = searchObj(oid);
126 int offstop=numoffset-2;
// Queue the base oid, with the full offset array, for the machine lhashSearch() reports.
129 int machinenum = lhashSearch(oid);
130 insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
// Ship the object behind `header` back over sd.
133 sendOidFound(header, oid, sd);
139 //Start searching the dfsList
// `top` is the current depth; the loop runs until it drops below zero.
140 for(top=0; top>=0;) {
141 oid=getNextOid(header, offsetarray, dfsList, top);
147 header=searchObj(oid);
// Only the offsets from position `top` onward are forwarded with this oid.
150 int machinenum = lhashSearch(dfsList[top]);
151 insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
153 sendOidFound(header, oid, sd);
155 //okay to continue going down
159 //send prefetch first
161 int machinenum = lhashSearch(dfsList[objindex]);
162 insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
165 //go backwards until we can increment
// Keep unwinding while the counter in dfsList[top+1] equals the range encoded in offsetarray[top+3].
171 } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
// Look the object at this level up again before continuing.
173 header=searchObj(dfsList[top]);
174 //header shouldn't be null unless the object moves away, but allow
175 //ourselves the option to just continue on if we lose the object
176 } while(header==NULL);
/*
 * searchObj: looks an object up by oid. mhashSearch() is consulted first;
 * the other visible path returns the result of prehashSearch().
 *
 * NOTE(review): the statement executed when mhashSearch() succeeds is not
 * visible in this excerpt -- presumably it returns `header`; confirm.
 */
184 INLINE objheader_t *searchObj(unsigned int oid) {
186 if ((header = (objheader_t *)mhashSearch(oid)) != NULL) {
// Fall back to the prefetch lookup table.
189 return prehashSearch(oid);
192 /* Delete perMcPrefetchList_t and everything it points to */
/*
 * node - head of the per-machine list to release; NULL is accepted because
 *        the outer loop simply does not run.
 *
 * NOTE(review): the statement that releases each objOffsetPile_t after it
 * is unlinked is not visible in this excerpt -- presumably
 * free(objpile_ptr); confirm.
 */
193 void proPrefetchQDealloc(perMcPrefetchList_t *node) {
194 while (node != NULL) {
// Remember the current machine entry so it can be freed after advancing.
195 perMcPrefetchList_t * prefetchpile_next_ptr = node;
196 while(node->list != NULL) {
197 //offsets aren't owned by us, so we don't free them.
// Unlink the first object entry from this machine's list.
198 objOffsetPile_t * objpile_ptr = node->list;
199 node->list = objpile_ptr->next;
// Step to the next machine entry before releasing the current one.
202 node = prefetchpile_next_ptr->next;
203 free(prefetchpile_next_ptr);
/*
 * insertPrefetch: records that `oid`, together with `offsets`, should be
 * prefetched from machine `mid`, updating the per-machine list at *head.
 *
 * mid       - machine id the entry belongs to
 * oid       - object id to prefetch
 * numoffset - number of entries in offsets
 * offsets   - offset array; only the pointer is stored, the data is not copied
 * head      - address of the list head pointer
 *
 * The visible comparisons stop at the first machine entry whose mid is
 * greater than `mid`, and at the first object entry whose oid is greater
 * than `oid`, which suggests both lists are kept in ascending order.
 *
 * NOTE(review): the statements that link new nodes in, the equality
 * branches and the returns are not visible in this excerpt -- confirm the
 * ordering claim against the full file.
 * NOTE(review): none of the malloc() results are NULL-checked in the
 * visible lines.
 */
207 void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
208 perMcPrefetchList_t *ptr;
209 objOffsetPile_t *objnode;
210 objOffsetPile_t **tmp;
214 //Loop through the machines
215 for(; 1; head=&((*head)->next)) {
// End of list, or first machine entry with a larger id: a new machine entry is created here.
217 if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
// NOTE(review): this inner `tmp` shadows the objOffsetPile_t **tmp declared above.
218 perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
// The new machine entry starts with a single object entry.
220 objnode = malloc(sizeof(objOffsetPile_t));
221 objnode->offsets = offsets;
223 objnode->numoffset = numoffset;
224 objnode->next = NULL;
// Walk this machine's object entries.
236 for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
// End of list, or first object entry with a larger oid: insert in front of *tmp.
240 if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
241 objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
242 objnode->offsets = offsets;
244 objnode->numoffset = numoffset;
245 objnode->next = *tmp;
// Compare the existing entry's offsets with the new ones element by element.
254 int onumoffset=(*tmp)->numoffset;
255 short * ooffset=(*tmp)->offsets;
257 for(i=0; i<numoffset; i++) {
259 //We've matched, let's just extend the current prefetch
260 (*tmp)->numoffset=numoffset;
261 (*tmp)->offsets=offsets;
264 if (ooffset[i]<offsets[i]) {
266 } else if (ooffset[i]>offsets[i]) {
267 //Place item before the current one
268 objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
269 objnode->offsets = offsets;
271 objnode->numoffset = numoffset;
272 objnode->next = *tmp;
277 //if we get to the end, we're already covered by this prefetch
/*
 * sendRangePrefetchReq: writes a TRANS_PREFETCH request for one machine
 * entry to socket sd.
 *
 * Wire format produced by the visible code: one control byte
 * (TRANS_PREFETCH); then, per object entry, a record of numoffset (int),
 * oid (unsigned int), mid (unsigned int) and numoffset shorts; finally
 * sizeof(int) bytes taken from `endpair`.
 *
 * mcpilenode - machine entry whose object list is sent
 * sd         - socket to write to
 * mid        - machine id embedded in every record
 *
 * NOTE(review): the loop over the object list, the pointer advance after
 * numoffset is written, and the declarations of control/len/endpair are
 * not visible in this excerpt.
 */
285 void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd, unsigned int mid) {
288 objOffsetPile_t *tmp;
290 /* Send TRANS_PREFETCH control message */
291 control = TRANS_PREFETCH;
292 send_data(sd, &control, sizeof(char));
294 /* Send Oids and offsets in pairs */
295 tmp = mcpilenode->list;
// Record length: offset count + oid + mid + the offsets themselves.
297 len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
// Variable-length stack buffer for this one record.
298 char oidnoffset[len];
299 char *buf=oidnoffset;
300 *((int*)buf) = tmp->numoffset;
302 *((unsigned int *)buf) = tmp->oid;
303 buf+=sizeof(unsigned int);
304 *((unsigned int *)buf) = mid;
305 buf += sizeof(unsigned int);
306 memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
307 send_data(sd, oidnoffset, len);
311 /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
// NOTE(review): the comment above says "char", but sizeof(int) bytes are sent; the value of endpair is not visible here.
313 send_data(sd, &endpair, sizeof(int));
/*
 * getRangePrefetchResponse: reads one prefetch response from socket sd.
 *
 * The message starts with an int length that counts itself; the remaining
 * length - sizeof(int) bytes begin with a control byte followed by the oid.
 * For OBJECT_FOUND the bytes after the oid are copied into memory from
 * prefetchobjstrAlloc(), stored in the prefetch hash table with
 * prehashInsert(), and waiters on pflookup.cond are woken. For
 * OBJECT_NOT_FOUND only the oid is decoded in the visible lines. Any other
 * control value is reported with printf().
 *
 * NOTE(review): `length` arrives from the peer and directly sizes a
 * variable-length array; no bounds check is visible in this excerpt.
 * NOTE(review): return values and several branch bodies are not visible.
 */
317 int getRangePrefetchResponse(int sd) {
319 recv_data(sd, &length, sizeof(int));
// The length field includes its own sizeof(int) bytes.
320 int size = length - sizeof(int);
321 char recvbuffer[size];
322 recv_data(sd, recvbuffer, size);
323 char control = *((char *) recvbuffer);
325 if(control == OBJECT_FOUND) {
326 oid = *((unsigned int *)(recvbuffer + sizeof(char)));
// What remains after the control byte and the oid is the object payload.
327 size = size - (sizeof(char) + sizeof(unsigned int));
// The allocation from the prefetch cache is done under prefetchcache_mutex.
328 pthread_mutex_lock(&prefetchcache_mutex);
330 if((ptr = prefetchobjstrAlloc(size)) == NULL) {
331 printf("%s() Error: objstrAlloc error for copying into prefetch cache in line %d at %s\n",
332 __func__, __LINE__, __FILE__);
333 pthread_mutex_unlock(&prefetchcache_mutex);
336 pthread_mutex_unlock(&prefetchcache_mutex);
337 memcpy(ptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
340 /* Insert into prefetch hash lookup table */
342 if((oldptr = prehashSearch(oid)) != NULL) {
// An existing entry is replaced only when its version is not newer than the received one.
343 if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
345 prehashInsert(oid, ptr);
348 prehashInsert(oid, ptr);
350 objheader_t *head = prehashSearch(oid);
// Wake every thread waiting on pflookup.cond.
351 pthread_mutex_lock(&pflookup.lock);
352 pthread_cond_broadcast(&pflookup.cond);
353 pthread_mutex_unlock(&pflookup.lock);
354 } else if(control == OBJECT_NOT_FOUND) {
355 oid = *((unsigned int *)(recvbuffer + sizeof(char)));
357 printf("%s() Error: in Decoding the control value %d, %s\n", __func__, __LINE__, __FILE__);
/*
 * rangePrefetchReq: handles range-prefetch request records arriving on
 * acceptfd. Each record is decoded as numoffset (int), an oid/mid pair
 * (two unsigned ints) and numoffset shorts, then passed to processRemote().
 * Any list processRemote() returns is forwarded with
 * sendRangePrefetchReq() and released with proPrefetchQDealloc().
 *
 * The response socket `sd` comes from transPResponseSocketPool and is
 * swapped when the requester's mid differs from the one currently held.
 *
 * NOTE(review): the loop that reads successive records, the end-of-list
 * check, the assignment of the new mid and the return value are not
 * visible in this excerpt.
 * NOTE(review): numoffset comes from the network and sizes a
 * variable-length array; no range check is visible.
 */
362 int rangePrefetchReq(int acceptfd) {
363 int numoffset, sd = -1;
// mid starts as (unsigned)-1.
364 unsigned int baseoid, mid = -1;
368 recv_data(acceptfd, &numoffset, sizeof(int));
// oid and mid are received together as two consecutive unsigned ints.
371 recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int));
372 baseoid = oidmid.oid;
373 if(mid != oidmid.mid) {
// Release the socket held for the previous requester before fetching a new one.
375 freeSockWithLock(transPResponseSocketPool, mid, sd);
377 sd = getSockWithLock(transPResponseSocketPool, mid);
379 short offsetsarry[numoffset];
380 recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
382 perMcPrefetchList_t * pilehead=processRemote(baseoid, offsetsarry, sd, numoffset);
384 if (pilehead!= NULL) {
385 perMcPrefetchList_t *ptr = pilehead;
387 // Get sock from shared pool
// NOTE(review): this inner `sd` shadows the response socket declared at the top of the function.
388 int sd = getSock2(transPrefetchSockPool, ptr->mid);
// The original requester's mid is passed along with the forwarded request.
389 sendRangePrefetchReq(ptr, sd, mid);
393 proPrefetchQDealloc(pilehead);
// Hand the response socket back to its pool.
399 freeSockWithLock(transPResponseSocketPool, mid, sd);
/*
 * getNextOid: reads the next object id to visit at traversal depth `top`.
 *
 * header      - header of the object at the current depth
 * offsetarray - offset description; offsetarray[top+2] is the start
 *               index/byte offset, and offsetarray[top+3] is decoded with
 *               GET_RANGE, GET_STRIDE and GET_STRIDEINC
 * dfsList     - traversal state; dfsList[top+1] is the current count
 * top         - current depth
 *
 * When TYPE(header) > NUMCLASSES the object is treated as an array: the
 * element index is startindex -/+ stride*currcount depending on
 * GET_STRIDEINC, and the unsigned int at that element is returned.
 * Otherwise the unsigned int at byte offset startindex past the object
 * header is returned.
 *
 * NOTE(review): what is returned when the index falls outside the array or
 * the range is exhausted is not visible in this excerpt.
 */
404 unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top) {
405 int startindex= offsetarray[top+2];
406 int currcount = dfsList[top+1];
407 int range = GET_RANGE(offsetarray[top + 3]);
409 if(TYPE(header) > NUMCLASSES) {
// The array object starts directly after the object header.
411 struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
// The encoded stride is stored minus one.
412 int stride = GET_STRIDE(offsetarray[top + 3])+1;
413 int length = ao->___length___;
415 //Check direction of stride
416 if(GET_STRIDEINC(offsetarray[top + 3])) {
// A set STRIDEINC flag means the index decreases with each step.
418 currindex=startindex-stride*currcount;
422 //Also have to check whether we will eventually index into array
423 if (currindex>=length) {
424 //Skip to the point that we will index into array
425 int delta=(currindex-length-1)/stride+1; //-1, +1 is to make sure that it rounds up
426 if ((delta+currcount)>range)
428 currindex-=delta*stride;
431 //Going positive, compute current index
432 currindex=startindex+stride*currcount;
433 if(currindex >= length)
// Element size is looked up per type; the element is read as an unsigned int oid.
437 int elementsize = classsize[TYPE(header)];
438 return *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + elementsize*currindex));
// NOTE(review): bitwise & on two comparison results; it yields the same truth value as && here.
442 if(currcount!=0 & range != 0) {
443 //go to the next offset
444 header=searchObj(dfsList[top+2]);
// NOTE(review): the lines between the lookup above and this dereference are not visible -- confirm a NULL header is handled.
449 return *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
/*
 * sendOidFound: builds an OBJECT_FOUND reply for `oid` and passes it to
 * sendPrefetchResponse() on socket sd with control TRANS_PREFETCH_RESPONSE.
 *
 * Buffer layout: total size (int), OBJECT_FOUND (char), oid (unsigned int),
 * then the object header followed by objsize bytes of object data.
 *
 * header - object to send; objsize is obtained from it with GETSIZE
 * oid    - id placed in the reply
 * sd     - socket passed to sendPrefetchResponse()
 *
 * NOTE(review): the declarations of objsize/incr, the increment after the
 * size field and the return statement are not visible in this excerpt.
 */
453 int sendOidFound(objheader_t * header, unsigned int oid, int sd) {
456 GETSIZE(objsize, header);
// Total message size, including the leading size field itself.
457 int size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
458 char sendbuffer[size];
459 *((int *)(sendbuffer + incr)) = size;
461 *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
462 incr += sizeof(char);
463 *((unsigned int *)(sendbuffer + incr)) = oid;
464 incr += sizeof(unsigned int);
// Header and object body are copied as one contiguous region.
465 memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
467 char control = TRANS_PREFETCH_RESPONSE;
468 sendPrefetchResponse(sd, &control, sendbuffer, &size);
/*
 * sendOidNotFound: builds an OBJECT_NOT_FOUND reply for `oid` and passes it
 * to sendPrefetchResponse() on socket sd with control
 * TRANS_PREFETCH_RESPONSE.
 *
 * Buffer layout: total size (int), OBJECT_NOT_FOUND (char), oid (unsigned
 * int) -- the same prefix sendOidFound() writes, and the layout
 * getRangePrefetchResponse() decodes (oid directly after the control byte).
 *
 * oid - id of the object that could not be found
 * sd  - socket passed to sendPrefetchResponse()
 */
472 int sendOidNotFound(unsigned int oid, int sd) {
473 int size = sizeof(int) + sizeof(char) + sizeof(unsigned int);
474 char sendbuffer[size];
475 *((int *)sendbuffer) = size;
476 *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
// The oid follows the one-byte control code. The previous offset,
// sizeof(int) + sizeof(unsigned int), skipped a full unsigned int instead
// of one char, so the store ran past the end of sendbuffer and the oid
// was not where the receiver reads it.
477 *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
478 char control = TRANS_PREFETCH_RESPONSE;
479 sendPrefetchResponse(sd, &control, sendbuffer, &size);