1) tweak hash tables
[IRC.git] / Robust / src / Runtime / DSTM / interface / prefetch.c
1 #include "prefetch.h"
2 #include "prelookup.h"
3 #include "sockpool.h"
4 #include "gCollect.h"
5
6 extern sockPoolHashTable_t *transPrefetchSockPool;
7 extern unsigned int myIpAddr;
8 extern sockPoolHashTable_t *transPResponseSocketPool;
9 extern pthread_mutex_t prefetchcache_mutex;
10 extern prehashtable_t pflookup;
11
12
13 // Function for new prefetch call
14 void rangePrefetch(unsigned int oid, short numoffset, short *offsets) {
15   /* Allocate memory in prefetch queue and push the block there */
16   int qnodesize = sizeof(unsigned int) + sizeof(unsigned short) + numoffset * sizeof(short);
17   char *node = (char *) getmemory(qnodesize);
18   if(node == NULL)
19     return;
20   int index = 0;
21   ((unsigned int *)node)[0] = oid;
22   index = index + (sizeof(unsigned int));
23   *((short *)(node+index)) = numoffset;
24   index = index + (sizeof(short));
25   memcpy(node+index, offsets, numoffset * sizeof(short));
26   movehead(qnodesize);
27 }
28
29 void *transPrefetchNew() {
30   while(1) {
31     /* Read from prefetch queue */
32     void *node = gettail();
33
34     /* Check tuples if they are found locally */
35     perMcPrefetchList_t* pilehead = processLocal(node);
36
37     if (pilehead!=NULL) {
38
39       /* Send  Prefetch Request */
40       perMcPrefetchList_t *ptr = pilehead;
41       while(ptr != NULL) {
42         // Get sock from shared pool
43         int sd = getSock2(transPrefetchSockPool, ptr->mid);
44         sendRangePrefetchReq(ptr, sd, myIpAddr);
45         ptr = ptr->next;
46       }
47
48       /* Deallocated pilehead */
49       proPrefetchQDealloc(pilehead);
50     }
51     // Deallocate the prefetch queue pile node
52     inctail();
53   }
54 }
55
56 perMcPrefetchList_t *processLocal(char *ptr) {
57   unsigned int oid = *(GET_OID(ptr));
58   short numoffset = *(GET_NUM_OFFSETS(ptr));
59   short *offsetarray = GET_OFFSETS(ptr);
60   int top;
61   unsigned int dfsList[numoffset];
62   int offstop=numoffset-2;
63
64   /* Initialize */
65   perMcPrefetchList_t *head = NULL;
66
67   objheader_t * header = searchObj(oid);
68   if (header==NULL) {
69     //forward prefetch
70     int machinenum = lhashSearch(oid);
71     insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
72     return head;
73   }
74   dfsList[0]=oid;
75   dfsList[1]=0;
76   
77   
78   //Start searching the dfsList
79   for(top=0;top>=0;) {
80     oid=getNextOid(header, offsetarray, dfsList, top);
81     if (oid&1) {
82       top+=2;
83       dfsList[top]=oid;
84       dfsList[top+1]=0;
85       header=searchObj(oid);
86       if (header==NULL) {
87         //forward prefetch
88         int machinenum = lhashSearch(dfsList[top]);
89         insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
90       } else if (top<offstop)
91         //okay to continue going down
92         continue;
93     } else if (oid==2) {
94       //send prefetch first
95       int objindex=top+2;
96       int machinenum = lhashSearch(dfsList[objindex]);
97       insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
98     }
99     //oid is 0
100     //go backwards until we can increment
101     do {
102       do {
103         top-=2;
104         if (top<0)
105           return head;
106       } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
107       
108       header=searchObj(dfsList[top]);
109       //header shouldn't be null unless the object moves away, but allow
110       //ourselves the option to just continue on if we lose the object
111     } while(header==NULL);
112     //increment
113     dfsList[top+1]++;
114   }
115   return head;
116 }
117
118 perMcPrefetchList_t *processRemote(unsigned int oid,  short * offsetarray, int sd, short numoffset) {
119   int top;
120   unsigned int dfsList[numoffset];
121
122   /* Initialize */
123   perMcPrefetchList_t *head = NULL;
124
125   objheader_t * header = searchObj(oid);
126   int offstop=numoffset-2;
127   if (header==NULL) {
128     //forward prefetch
129     int machinenum = lhashSearch(oid);
130     insertPrefetch(machinenum, oid, numoffset, offsetarray, &head);
131   } else {
132     sendOidFound(header, oid, sd);
133   }
134   
135   dfsList[0]=oid;
136   dfsList[1]=0;
137   
138   //Start searching the dfsList
139   for(top=0;top>=0;) {
140     oid=getNextOid(header, offsetarray, dfsList, top);
141     if (oid&1) {
142       top+=2;
143       dfsList[top]=oid;
144       dfsList[top+1]=0;
145       header=searchObj(oid);
146       if (header==NULL) {
147         //forward prefetch
148         int machinenum = lhashSearch(dfsList[top]);
149         insertPrefetch(machinenum, dfsList[top], numoffset-top, &offsetarray[top], &head);
150       } else {
151         sendOidFound(header, oid, sd);
152         if (top<offstop)
153           //okay to continue going down
154           continue;
155       }
156     } else if (oid==2) {
157       //send prefetch first
158       int objindex=top+2;
159       int machinenum = lhashSearch(dfsList[objindex]);
160       insertPrefetch(machinenum, dfsList[objindex], numoffset-top, &offsetarray[top], &head);
161     }
162     //oid is 0
163     //go backwards until we can increment
164     do {
165       do {
166         top-=2;
167         if (top<0)
168           return head;
169       } while(dfsList[top+1] == GET_RANGE(offsetarray[top + 3]));
170       
171       header=searchObj(dfsList[top]);
172       //header shouldn't be null unless the object moves away, but allow
173       //ourselves the option to just continue on if we lose the object
174     } while(header!=NULL);
175     //increment
176     dfsList[top+1]++;
177   }
178   return head;
179 }
180
181
182 INLINE objheader_t *searchObj(unsigned int oid) {
183   objheader_t *header;
184   if ((header = (objheader_t *)mhashSearch(oid)) != NULL) {
185     return header;
186   } else 
187     return prehashSearch(oid);
188 }
189
190 /* Delete perMcPrefetchList_t and everything it points to */
191 void proPrefetchQDealloc(perMcPrefetchList_t *node) {
192   while (node != NULL) {
193     perMcPrefetchList_t * prefetchpile_next_ptr = node;
194     while(node->list != NULL) {
195       //offsets aren't owned by us, so we don't free them.
196       objOffsetPile_t * objpile_ptr = node->list;
197       node->list = objpile_ptr->next;
198       free(objpile_ptr);
199     }
200     node = prefetchpile_next_ptr->next;
201     free(prefetchpile_next_ptr);
202   }
203 }
204
205 void insertPrefetch(int mid, unsigned int oid, short numoffset, short *offsets, perMcPrefetchList_t **head) {
206   perMcPrefetchList_t *ptr;
207   objOffsetPile_t *objnode;
208   objOffsetPile_t **tmp;
209
210   //Loop through the machines
211   for(; 1; head=&((*head)->next)) {
212     int tmid;
213     if ((*head)==NULL||(tmid=(*head)->mid)>mid) {
214       perMcPrefetchList_t * tmp = (perMcPrefetchList_t *) malloc(sizeof(perMcPrefetchList_t));
215       tmp->mid = mid;
216       objnode =  malloc(sizeof(objOffsetPile_t));
217       objnode->offsets = offsets;
218       objnode->oid = oid;
219       objnode->numoffset = numoffset;
220       objnode->next = NULL;
221       tmp->list = objnode;
222       tmp->next = *head;
223       *head=tmp;
224       return;
225     }
226
227     //keep looking
228     if (tmid < mid)
229       continue;
230
231     //found mid list
232     for(tmp=&((*head)->list); 1; tmp=&((*tmp)->next)) {
233       int toid;
234       int matchstatus;
235
236       if ((*tmp)==NULL||((toid=(*tmp)->oid)>oid)) {
237         objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
238         objnode->offsets = offsets;
239         objnode->oid = oid;
240         objnode->numoffset = numoffset;
241         objnode->next = *tmp;
242         *tmp = objnode;
243         return;
244       }
245       if (toid < oid)
246         continue;
247
248       /* Fill list DS */
249       int i;
250       int onumoffset=(*tmp)->numoffset;
251       short * ooffset=(*tmp)->offsets;
252
253       for(i=0; i<numoffset; i++) {
254         if (i>onumoffset) {
255           //We've matched, let's just extend the current prefetch
256           (*tmp)->numoffset=numoffset;
257           (*tmp)->offsets=offsets;
258           return;
259         }
260         if (ooffset[i]<offsets[i]) {
261           goto oidloop;
262         } else if (ooffset[i]>offsets[i]) {
263           //Place item before the current one
264           objnode = (objOffsetPile_t *) malloc(sizeof(objOffsetPile_t));
265           objnode->offsets = offsets;
266           objnode->oid = oid;
267           objnode->numoffset = numoffset;
268           objnode->next = *tmp;
269           *tmp = objnode;
270           return;
271         }
272       }
273       //if we get to the end, we're already covered by this prefetch
274       return;
275 oidloop:
276       ;
277     }
278   }
279 }
280
281 void sendRangePrefetchReq(perMcPrefetchList_t *mcpilenode, int sd, unsigned int mid) {
282   int len, endpair;
283   char control;
284   objOffsetPile_t *tmp;
285
286   /* Send TRANS_PREFETCH control message */
287   control = TRANS_PREFETCH;
288   send_data(sd, &control, sizeof(char));
289
290   /* Send Oids and offsets in pairs */
291   tmp = mcpilenode->list;
292   while(tmp != NULL) {
293     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
294     char oidnoffset[len];
295     char *buf=oidnoffset;
296     *((int*)buf) = tmp->numoffset;
297     buf+=sizeof(int);
298     *((unsigned int *)buf) = tmp->oid;
299     buf+=sizeof(unsigned int);
300     *((unsigned int *)buf) = mid;
301     buf += sizeof(unsigned int);
302     memcpy(buf, tmp->offsets, (tmp->numoffset)*sizeof(short));
303     send_data(sd, oidnoffset, len);
304     tmp = tmp->next;
305   }
306
307   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
308   endpair = -1;
309   send_data(sd, &endpair, sizeof(int));
310   return;
311 }
312
313 int getRangePrefetchResponse(int sd) {
314   int length = 0;
315   recv_data(sd, &length, sizeof(int));
316   int size = length - sizeof(int);
317   char recvbuffer[size];
318   recv_data(sd, recvbuffer, size);
319   char control = *((char *) recvbuffer);
320   unsigned int oid;
321   if(control == OBJECT_FOUND) {
322     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
323     size = size - (sizeof(char) + sizeof(unsigned int));
324     pthread_mutex_lock(&prefetchcache_mutex);
325     void *ptr;
326     if((ptr = prefetchobjstrAlloc(size)) == NULL) {
327       printf("%s() Error: objstrAlloc error for copying into prefetch cache in line %d at %s\n",
328              __func__, __LINE__, __FILE__);
329       pthread_mutex_unlock(&prefetchcache_mutex);
330       return -1;
331     }
332     pthread_mutex_unlock(&prefetchcache_mutex);
333     memcpy(ptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
334     STATUS(ptr)=0;
335
336     /* Insert into prefetch hash lookup table */
337     void * oldptr;
338     if((oldptr = prehashSearch(oid)) != NULL) {
339       if(((objheader_t *)oldptr)->version <= ((objheader_t *)ptr)->version) {
340         prehashRemove(oid);
341         prehashInsert(oid, ptr);
342       }
343     } else {
344       prehashInsert(oid, ptr);
345     }
346     objheader_t *head = prehashSearch(oid);
347     pthread_mutex_lock(&pflookup.lock);
348     pthread_cond_broadcast(&pflookup.cond);
349     pthread_mutex_unlock(&pflookup.lock);
350   } else if(control == OBJECT_NOT_FOUND) {
351     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
352   } else {
353     printf("%s() Error: in Decoding the control value %d, %s\n", __func__, __LINE__, __FILE__);
354   }
355   return 0;
356 }
357
358 int rangePrefetchReq(int acceptfd) {
359   int numoffset, sd = -1;
360   unsigned int baseoid, mid = -1;
361   oidmidpair_t oidmid;
362
363   while (1) {
364     recv_data(acceptfd, &numoffset, sizeof(int));
365     if(numoffset == -1)
366       break;
367     recv_data(acceptfd, &oidmid, 2*sizeof(unsigned int));
368     baseoid = oidmid.oid;
369     if(mid != oidmid.mid) {
370       if(mid!= -1)
371         freeSockWithLock(transPResponseSocketPool, mid, sd);
372       mid = oidmid.mid;
373       sd = getSockWithLock(transPResponseSocketPool, mid);
374     }
375     short offsetsarry[numoffset];
376     recv_data(acceptfd, offsetsarry, numoffset*sizeof(short));
377
378     perMcPrefetchList_t * pilehead=processRemote(baseoid, offsetsarry, sd, numoffset);
379
380     if (pilehead!= NULL) {
381       perMcPrefetchList_t *ptr = pilehead;
382       while(ptr != NULL) {
383         // Get sock from shared pool
384         int sd = getSock2(transPrefetchSockPool, ptr->mid);
385         sendRangePrefetchReq(ptr, sd, mid);
386         ptr = ptr->next;
387       }
388
389       proPrefetchQDealloc(pilehead);
390     }
391   }
392
393   //Release socket
394   if(mid!=-1)
395     freeSockWithLock(transPResponseSocketPool, mid, sd);
396   return 0;
397 }
398
399
400 unsigned int getNextOid(objheader_t * header, short * offsetarray, unsigned int *dfsList, int top) {
401   int startindex= offsetarray[top+2];
402   int currcount = dfsList[top+1];
403   int range = GET_RANGE(offsetarray[top + 3]);
404   
405   if(TYPE(header) > NUMCLASSES) {
406     //Array case
407     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
408     int stride = GET_STRIDE(offsetarray[top + 3])+1;
409     int length = ao->___length___;
410     int currindex;
411     //Check direction of stride
412     if(GET_STRIDEINC(offsetarray[top + 3])) {
413       //Negative
414       currindex=startindex-stride*currcount;
415       if (currindex<0)
416         return 0;
417
418       //Also have to check whether we will eventually index into array
419       if (currindex>=length) {
420         //Skip to the point that we will index into array
421         int delta=(currindex-length-1)/stride+1; //-1, +1 is to make sure that it rounds up
422         if ((delta+currcount)>range)
423           return 0;
424         currindex-=delta*stride;
425       }
426     } else {
427       //Going positive, compute current index
428       currindex=startindex+stride*currcount;
429       if(currindex >= length)
430         return 0;
431     }
432
433     int elementsize = classsize[TYPE(header)];
434     return *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + elementsize*currindex));
435   } else { 
436     //handle fields
437
438     if(currcount!=0 & range != 0) { 
439       //go to the next offset
440       header=searchObj(dfsList[top]);
441       if (header==NULL)
442         return 2;
443     }
444
445     return *((unsigned int *)(((char *)header) + sizeof(objheader_t) + startindex));
446   }
447 }
448
449 int sendOidFound(objheader_t * header, unsigned int oid, int sd) {
450   int incr = 0;
451   int objsize;
452   GETSIZE(objsize, header);
453   int size  = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
454   char sendbuffer[size];
455   *((int *)(sendbuffer + incr)) = size;
456   incr += sizeof(int);
457   *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
458   incr += sizeof(char);
459   *((unsigned int *)(sendbuffer + incr)) = oid;
460   incr += sizeof(unsigned int);
461   memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
462
463   char control = TRANS_PREFETCH_RESPONSE;
464   sendPrefetchResponse(sd, &control, sendbuffer, &size);
465   return 0;
466 }
467
468 int sendOidNotFound(unsigned int oid, int sd) {
469   int size  = sizeof(int) + sizeof(char) + sizeof(unsigned int);
470   char sendbuffer[size];
471   *((int *)sendbuffer) = size;
472   *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
473   *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(unsigned int))) = oid;
474   char control = TRANS_PREFETCH_RESPONSE;
475   sendPrefetchResponse(sd, &control, sendbuffer, &size);
476   return 0;
477 }