changes to copy remote objs read into prefetch cache
[IRC.git] / Robust / src / Runtime / DSTM / interface / addPrefetchEnhance.c
1 #include "addPrefetchEnhance.h"
2 #include "prelookup.h"
3
4 extern int numprefetchsites; // Number of prefetch sites
5 extern pfcstats_t *evalPrefetch; //Global array that keeps track of operation mode (ON/OFF) for each prefetch site
6 extern objstr_t *prefetchcache; //Global Prefetch cache
7 extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
8 extern unsigned int myIpAddr;
9
10 //#define LOGEVENTS
11 #ifdef LOGEVENTS
12 extern char bigarray[16*1024*1024];
13 extern int bigindex;
14 #define LOGEVENT(x) { \
15   int tmp=bigindex++;                         \
16   bigarray[tmp]=x;                            \
17   }
18 #else
19 #define LOGEVENT(x)
20 #endif
21
22
23 /* This function creates and initializes the
24  * evalPrefetch global array */
25 pfcstats_t *initPrefetchStats() {
26   pfcstats_t *ptr;
27   if((ptr = calloc(numprefetchsites, sizeof(pfcstats_t))) == NULL) {
28     printf("%s() Calloc error in %s at line %d\n", __func__, __FILE__, __LINE__);
29     return NULL;
30   }
31   int i;
32   /* Enable prefetching at the beginning */
33   for(i=0; i<numprefetchsites; i++) {
34     ptr[i].operMode = 1;
35     ptr[i].callcount = 0;
36     ptr[i].retrycount = RETRYINTERVAL; //N
37     ptr[i].uselesscount = SHUTDOWNINTERVAL; //M
38   }
39   return ptr;
40 }
41
42 int getRetryCount(int siteid) {
43   return evalPrefetch[siteid].retrycount;
44 }
45
46 int getUselessCount(int siteid) {
47   return evalPrefetch[siteid].uselesscount;
48 }
49
50 char getOperationMode(int siteid) {
51   return evalPrefetch[siteid].operMode;
52 }
53
54 /* This function updates counters and mode of operation of a
55  * prefetch site during runtime. When the prefetch call at a site
56  * generates oids that are found/not found in the prefetch cache,
57  * we take action accordingly */
58 void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
59   if(numLocal < ntuples) {
60     /* prefetch not found locally(miss in cache) */
61     evalPrefetch[siteid].operMode = 1;
62     evalPrefetch[siteid].uselesscount = SHUTDOWNINTERVAL;
63   } else {
64     if(getOperationMode(siteid) != 0) {
65       evalPrefetch[siteid].uselesscount--;
66       if(evalPrefetch[siteid].uselesscount <= 0) {
67         LOGEVENT('O');
68         evalPrefetch[siteid].operMode = 0;
69       }
70     }
71   }
72 }
73
74 #if 1
75 /* This function clears from prefetch cache those
76  * entries that caused a transaction abort */
77 void cleanPCache() {
78   unsigned int size = c_size;
79   chashlistnode_t *ptr = c_table;
80   int i;
81   for(i = 0; i < size; i++) {
82     chashlistnode_t *curr = &ptr[i]; //for each entry in the cache lookupTable
83     while(curr != NULL) {
84       if(curr->key == 0)
85         break;
86       objheader_t *header1, *header2;
87       /* Not found in local machine's object store and found in prefetch cache */
88       if((header1 = mhashSearch(curr->key)) == NULL && ((header2 = prehashSearch(curr->key)) != NULL)) {
89         /* Remove from prefetch cache */
90         prehashRemove(curr->key);
91       }
92       curr = curr->next;
93     }
94   }
95 }
96 #else
97 /* This function clears from prefetch cache those
98  * entries that caused a transaction abort */
99 void cleanPCache() {
100   unsigned int size = c_size;
101   struct chashentry *ptr = c_table;
102   int i;
103   for(i = 0; i < size; i++) {
104     struct chashentry *curr = &ptr[i]; //for each entry in the cache lookupTable
105     if(curr->key == 0)
106       continue;
107     objheader_t *header1, *header2;
108     /* Not found in local machine's object store and found in prefetch cache */
109     if((header1 = mhashSearch(curr->key)) == NULL && ((header2 = prehashSearch(curr->key)) != NULL)) {
110       /* Remove from prefetch cache */
111       prehashRemove(curr->key);
112     }
113   }
114 }
115 #endif
116
117 /* This function updates the prefetch cache with
118  * entries from the transaction cache when a
119  * transaction commits
120  * Return -1 on error else returns 0 */
121 int updatePrefetchCache(trans_req_data_t *tdata) {
122   int retval;
123   char oidType;
124   /*//TODO comment it for now because objects read are already in the prefetch cache
125   oidType = 'R';
126   if(tdata->f.numread > 0) {
127     if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) {
128       printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
129       return -1;
130     }
131   }
132   */
133   if(tdata->f.nummod > 0) {
134     oidType = 'M';
135     if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, oidType)) != 0) {
136       printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
137       return -1;
138     }
139   }
140   return 0;
141 }
142
143 int copyToCache(int numoid, unsigned int *oidarray, char oidType) {
144   int i;
145   for (i = 0; i < numoid; i++) {
146     unsigned int oid;
147     //if(oidType == 'R') {
148     //  char * objread = (char *) oidarray;
149     //  oid = *((unsigned int *)(objread+(sizeof(unsigned int)+
150     //                                    sizeof(unsigned short))*i));
151     //} else {
152       oid = oidarray[i];
153     //}
154     pthread_mutex_lock(&prefetchcache_mutex);
155     objheader_t * header;
156     if((header = (objheader_t *) t_chashSearch(oid)) == NULL) {
157       printf("%s() obj %x is no longer in transaction cache at %s , %d\n", __func__, oid,__FILE__, __LINE__);
158       fflush(stdout);
159       return -1;
160     }
161     //copy into prefetch cache
162     int size;
163     GETSIZE(size, header);
164     objheader_t * newAddr;
165     if((newAddr = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
166       printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
167              __FILE__, __LINE__);
168       pthread_mutex_unlock(&prefetchcache_mutex);
169       return -1;
170     }
171     pthread_mutex_unlock(&prefetchcache_mutex);
172     memcpy(newAddr, header, size+sizeof(objheader_t));
173     //Increment version for every modified object
174     if(oidType == 'M') {
175       newAddr->version += 1;
176       newAddr->notifylist = NULL;
177     }
178     //make an entry in prefetch lookup hashtable
179     void *oldptr;
180     if((oldptr = prehashSearch(oid)) != NULL) {
181       prehashRemove(oid);
182       prehashInsert(oid, newAddr);
183     } else {
184       prehashInsert(oid, newAddr);
185     }
186   } //end of for
187   return 0;
188 }