d28ba1f7f18952b4a5e7870d71fda6c4dccb9799
[IRC.git] / Robust / src / Runtime / DSTM / interface / prefetch.c
1 #include "prefetch.h"
2 #include "prelookup.h"
3 #include "sockpool.h"
4 #include "gCollect.h"
5
6 extern sockPoolHashTable_t *transPrefetchSockPool;
7 extern unsigned int myIpAddr;
8 extern sockPoolHashTable_t *transPResponseSocketPool;
9 extern pthread_mutex_t prefetchcache_mutex;
10 extern prehashtable_t pflookup;
11
12
13 // Function for new prefetch call
14 void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
15   /* Allocate memory in prefetch queue and push the block there */
16   int qnodesize = sizeof(unsigned int) + sizeof(unsigned short) + numoffset * sizeof(short);
17   char *node = (char *) getmemory(qnodesize);
18   if(node == NULL)
19     return;
20   int index = 0;
21   ((unsigned int *)node)[0] = oid;
22   index = index + (sizeof(unsigned int));
23   *((short *)(node+index)) = numoffset;
24   index = index + (sizeof(short));
25   memcpy(node+index, offsets, numoffset * sizeof(short));
26   movehead(qnodesize);
27 }
28
29 void *transPrefetchNew() {
30   while(1) {
31     /* Read from prefetch queue */
32     void *node = gettail();
33
34     /* Check tuples if they are found locally */
35     perMcPrefetchList_t* pilehead = processLocal(node);
36
37     if (pilehead!=NULL) {
38
39       /* Send  Prefetch Request */
40       perMcPrefetchList_t *ptr = pilehead;
41       while(ptr != NULL) {
42         // Get sock from shared pool
43         int sd = getSock2(transPrefetchSockPool, ptr->mid);
44         sendRangePrefetchReq(ptr, sd, myIpAddr);
45         ptr = ptr->next;
46       }
47
48       /* Deallocated pilehead */
49       proPrefetchQDealloc(pilehead);
50     }
51     // Deallocate the prefetch queue pile node
52     inctail();
53   }
54 }
55
56 perMcPrefetchList_t *processLocal(char *ptr) {
57   unsigned int oid = *(GET_OID(ptr));
58   short numoffset = *(GET_NUM_OFFSETS(ptr));
59   short *offsetarray = GET_OFFSETS(ptr);
60   int top;
61   unsigned int dfsList[numoffset];
62   int offstop=numoffset-2;
63
64   /* Initialize */
65   perMcPrefetchList_t *head = NULL;
66
67   objheader_t * header = searchObj(oid);
68   if (header==NULL) {
69     //forward prefetch
70     int machinenum = lhashSearch(oid);
71     insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
72     return head;
73   }
74   dfsList[0]=oid;
75   dfsList[1]=0;
76
77
78   //Start searching the dfsList
79   for(top=0; top>=0;) {
80     oid=getNextOid(header, offsetarray, dfsList, top);
81     if (oid&1) {
82       top+=2;
83       dfsList[top]=oid;
84       dfsList[top+1]=0;
85       header=searchObj(oid);
86       if (header==NULL) {
87         //forward prefetch
88         int machinenum = lhashSearch(dfsList[top]);
89         insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
90       } else if (top<offstop)
91         //okay to continue going down
92         continue;
93     } else if (oid==2) {
94       //send prefetch first
95       int objindex=top+2;
96       int machinenum = lhashSearch(dfsList[objindex]);
97       insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
98     }
99     //oid is 0
100     //go backwards until we can increment
101     do {
102       do {
103         top-=2;
104         if (top<0)
105           return head;
106       } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
107
108       header=searchObj(dfsList[top]);
109       //header shouldn't be null unless the object moves away, but allow
110       //ourselves the option to just continue on if we lose the object
111     } while(header==NULL);
112     //increment
113     dfsList[top+1]++;
114   }
115   return head;
116 }
117
118 perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int sd, short numoffset) {
119   int top;
120   unsigned int dfsList[numoffset];
121
122   /* Initialize */
123   perMcPrefetchList_t *head = NULL;
124
125   objheader_t * header = searchObj(oid);
126   int offstop=numoffset-2;
127   if (header==NULL) {
128     //forward prefetch
129     int machinenum = lhashSearch(oid);
130     insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
131     return head;
132   } else {
133     sendOidFound(header, oid, sd);
134   }
135
136   dfsList[0]=oid;
137   dfsList[1]=0;
138
139   //Start searching the dfsList
140   for(top=0; top>=0;) {
141     oid=getNextOid(header, offsetarray, dfsList, top);
142
143     if (oid&1) {
144       top+=2;
145       dfsList[top]=oid;
146       dfsList[top+1]=0;
147       header=searchObj(oid);
148       if (header==NULL) {
149         //forward prefetch
150         int machinenum = lhashSearch(dfsList[top]);
151         insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
152       } else {
153         sendOidFound(header, oid, sd);
154         if (top<offstop)
155           //okay to continue going down
156           continue;
157       }
158     } else if (oid==2) {
159       //send prefetch first
160       int objindex=top+2;
161       int machinenum = lhashSearch(dfsList[objindex]);
162       insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
163     }
164     //oid is 0
165     //go backwards until we can increment
166     do {
167       do {
168         top-=2;
169         if (top<0)
170           return head;
171       } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
172
173       header=searchObj(dfsList[top]);
174       //header shouldn't be null unless the object moves away, but allow
175       //ourselves the option to just continue on if we lose the object
176     } while(header==NULL);
177     //increment
178     dfsList[top+1]++;
179   }
180   return head;
181 }
182
183
184 INLINE objheader_t *searchObj(unsigned int oid) {
185   objheader_t *header;
186   if ((header = (objheader_t *)mhashSearch(oid)) != NULL) {
187     return header;
188   } else
189     return prehashSearch(oid);
190 }
191
192 /* Delete perMcPrefetchList_t and everything it points to */
193 void proPrefetchQDealloc(perMcPrefetchList_t *node) {
194   while (node != NULL) {
195     perMcPrefetchList_t * prefetchpile_next_ptr = node;
196     while(node->list != NULL) {
197       //offsets aren't owned by us, so we don't free them.
198       objOffsetPile_t * objpile_ptr = node->list;
199       node->list = objpile_ptr->next;
200       free(objpile_ptr);
201     }
202     node = prefetchpile_next_ptr->next;
203     free(prefetchpile_next_ptr);
204   }
205 }
206
207 void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
208   perMcPrefetchList_t *ptr;
209   objOffsetPile_t *objnode;
210   objOffsetPile_t **tmp;
211
212   char ptr1[50];
213   midtoIP(mid, ptr1);
214   //Loop through the machines
215   for(; 1; head=&((*head)->next)) {
216     int tmid;
217     if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
218       perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
219       tmp->mid = mid;
220       objnode =  malloc(sizeof(objOffsetPile_t));
221       objnode->offsets = offsets;
222       objnode->oid = oid;
223       objnode->numoffset = numoffset;
224       objnode->next = NULL;
225       tmp->list = objnode;
226       tmp->next = *head;
227       *head=tmp;
228       return;
229     }
230
231     //keep looking
232     if (tmid < mid)
233       continue;
234
235     //found mid list
236     for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
237       int toid;
238       int matchstatus;
239
240       if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
241         objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
242         objnode->offsets = offsets;
243         objnode->oid = oid;
244         objnode->numoffset = numoffset;
245         objnode->next = *tmp;
246         *tmp = objnode;
247         return;
248       }
249       if (toid < oid)
250         continue;
251
252       /* Fill list DS */
253       int i;
254       int onumoffset=(*tmp)->numoffset;
255       short * ooffset=(*tmp)->offsets;
256
257       for(i=0; i<numoffset; i++) {
258         if (i>onumoffset) {
259           //We've matched, let's just extend the current prefetch
260           (*tmp)->numoffset=numoffset;
261           (*tmp)->offsets=offsets;
262           return;
263         }
264         if (ooffset[i]<offsets[i]) {
265           goto oidloop;
266         } else if (ooffset[i]>offsets[i]) {
267           //Place item before the current one
268           objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
269           objnode->offsets = offsets;
270           objnode->oid = oid;
271           objnode->numoffset = numoffset;
272           objnode->next = *tmp;
273           *tmp = objnode;
274           return;
275         }
276       }
277       //if we get to the end, we're already covered by this prefetch
278       return;
279 oidloop:
280       ;
281     }
282   }
283 }
284
285 void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd, unsigned int mid) {
286   int len, endpair;
287   char control;
288   objOffsetPile_t *tmp;
289
290   /* Send TRANS_PREFETCH control message */
291   control = TRANS_PREFETCH;
292   send_data(sd, &control, sizeof(char));
293
294   /* Send Oids and offsets in pairs */
295   tmp = mcpilenode->list;
296   while(tmp != NULL) {
297     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
298     char oidnoffset[len];
299     char *buf=oidnoffset;
300     *((int*)buf) = tmp->numoffset;
301     buf+=sizeof(int);
302     *((unsigned int *)buf) = tmp->oid;
303     buf+=sizeof(unsigned int);
304     *((unsigned int *)buf) = mid;
305     buf += sizeof(unsigned int);
306     memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
307     send_data(sd, oidnoffset, len);
308     tmp = tmp->next;
309   }
310
311   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
312   endpair = -1;
313   send_data(sd, &endpair, sizeof(int));
314   return;
315 }
316
317 int getRangePrefetchResponse(int sd) {
318   int length = 0;
319   recv_data(sd, &length, sizeof(int));
320   int size = length - sizeof(int);
321   char recvbuffer[size];
322   recv_data(sd, recvbuffer, size);
323   char control = *((char *) recvbuffer);
324   unsigned int oid;
325   if(control == OBJECT_FOUND) {
326     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
327     size = size - (sizeof(char) + sizeof(unsigned int));
328     pthread_mutex_lock(&prefetchcache_mutex);
329     void *ptr;
330     if((ptr = prefetchobjstrAlloc(size)) == NULL) {
331       printf("%s() Error: objstrAlloc error for copying into prefetch cache in line %d at %s\n",
332              __func__, __LINE__, __FILE__);
333       pthread_mutex_unlock(&prefetchcache_mutex);
334       return -1;
335     }
336     pthread_mutex_unlock(&prefetchcache_mutex);
337     memcpy(ptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
338     STATUS(ptr)=0;
339
340     /* Insert into prefetch hash lookup table */
341     void * oldptr;
342     if((oldptr = prehashSearch(oid)) != NULL) {
343       if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
344         prehashRemove(oid);
345         prehashInsert(oid, ptr);
346       }
347     } else {
348       prehashInsert(oid, ptr);
349     }
350     objheader_t *head = prehashSearch(oid);
351     pthread_mutex_lock(&pflookup.lock);
352     pthread_cond_broadcast(&pflookup.cond);
353     pthread_mutex_unlock(&pflookup.lock);
354   } else if(control == OBJECT_NOT_FOUND) {
355     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
356   } else {
357     printf("%s() Error: in Decoding the control value %d, %s\n", __func__, __LINE__, __FILE__);
358   }
359   return 0;
360 }
361
362 int rangePrefetchReq(int acceptfd) {
363   int numoffset, sd = -1;
364   unsigned int baseoid, mid = -1;
365   oidmidpair_t oidmid;
366
367   while (1) {
368     recv_data(acceptfd, &numoffset, sizeof(int));
369     if(numoffset == -1)
370       break;
371     recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int));
372     baseoid = oidmid.oid;
373     if(mid != oidmid.mid) {
374       if(mid!= -1)
375         freeSockWithLock(transPResponseSocketPool, mid, sd);
376       mid = oidmid.mid;
377       sd = getSockWithLock(transPResponseSocketPool, mid);
378     }
379     short offsetsarry[numoffset];
380     recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
381
382     perMcPrefetchList_t * pilehead=processRemote(baseoid, offsetsarry, sd, numoffset);
383
384     if (pilehead!= NULL) {
385       perMcPrefetchList_t *ptr = pilehead;
386       while(ptr != NULL) {
387         // Get sock from shared pool
388         int sd = getSock2(transPrefetchSockPool, ptr->mid);
389         sendRangePrefetchReq(ptr, sd, mid);
390         ptr = ptr->next;
391       }
392
393       proPrefetchQDealloc(pilehead);
394     }
395   }
396
397   //Release socket
398   if(mid!=-1)
399     freeSockWithLock(transPResponseSocketPool, mid, sd);
400   return 0;
401 }
402
403
404 unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top) {
405   int startindex= offsetarray[top+2];
406   int currcount = dfsList[top+1];
407   int range = GET_RANGE(offsetarray[top + 3]);
408
409   if(TYPE(header) > NUMCLASSES) {
410     //Array case
411     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
412     int stride = GET_STRIDE(offsetarray[top + 3])+1;
413     int length = ao->___length___;
414     int currindex;
415     //Check direction of stride
416     if(GET_STRIDEINC(offsetarray[top + 3])) {
417       //Negative
418       currindex=startindex-stride*currcount;
419       if (currindex<0)
420         return 0;
421
422       //Also have to check whether we will eventually index into array
423       if (currindex>=length) {
424         //Skip to the point that we will index into array
425         int delta=(currindex-length-1)/stride+1; //-1, +1 is to make sure that it rounds up
426         if ((delta+currcount)>range)
427           return 0;
428         currindex-=delta*stride;
429       }
430     } else {
431       //Going positive, compute current index
432       currindex=startindex+stride*currcount;
433       if(currindex >= length)
434         return 0;
435     }
436
437     int elementsize = classsize[TYPE(header)];
438     return *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + elementsize*currindex));
439   } else {
440     //handle fields
441
442     if(currcount!=0 & range != 0) {
443       //go to the next offset
444       header=searchObj(dfsList[top+2]);
445       if (header==NULL)
446         return 2;
447     }
448
449     return *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
450   }
451 }
452
453 int sendOidFound(objheader_t * header, unsigned int oid, int sd) {
454   int incr = 0;
455   int objsize;
456   GETSIZE(objsize, header);
457   int size  = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
458   char sendbuffer[size];
459   *((int *)(sendbuffer + incr)) = size;
460   incr += sizeof(int);
461   *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
462   incr += sizeof(char);
463   *((unsigned int *)(sendbuffer + incr)) = oid;
464   incr += sizeof(unsigned int);
465   memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
466
467   char control = TRANS_PREFETCH_RESPONSE;
468   sendPrefetchResponse(sd, &control, sendbuffer, &size);
469   return 0;
470 }
471
472 int sendOidNotFound(unsigned int oid, int sd) {
473   int size  = sizeof(int) + sizeof(char) + sizeof(unsigned int);
474   char sendbuffer[size];
475   *((int *)sendbuffer) = size;
476   *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
477   *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(unsigned int))) = oid;
478   char control = TRANS_PREFETCH_RESPONSE;
479   sendPrefetchResponse(sd, &control, sendbuffer, &size);
480   return 0;
481 }