runtime changes for ecoop submission
[IRC.git] / Robust / src / Runtime / DSTM / interface / addPrefetchEnhance.c
1 #include "addPrefetchEnhance.h"
2 #include "altprelookup.h"
3
4 extern int numprefetchsites; // Number of prefetch sites
5 extern pfcstats_t *evalPrefetch; //Global array that keeps track of operation mode (ON/OFF) for each prefetch site
6 extern objstr_t *prefetchcache; //Global Prefetch cache
7 extern pthread_mutex_t prefetchcache_mutex; //Mutex to lock Prefetch Cache
8 extern unsigned int myIpAddr;
9
10 //#define LOGEVENTS
11 #ifdef LOGEVENTS
12 extern char bigarray[16*1024*1024];
13 extern int bigindex;
14 #define LOGEVENT(x) { \
15   int tmp=bigindex++;                         \
16   bigarray[tmp]=x;                            \
17   }
18 #else
19 #define LOGEVENT(x)
20 #endif
21
22
23 /* This function creates and initializes the
24  * evalPrefetch global array */
25 pfcstats_t *initPrefetchStats() {
26   pfcstats_t *ptr;
27   if((ptr = calloc(numprefetchsites, sizeof(pfcstats_t))) == NULL) {
28     printf("%s() Calloc error in %s at line %d\n", __func__, __FILE__, __LINE__);
29     return NULL;
30   }
31   int i;
32   /* Enable prefetching at the beginning */
33   for(i=0; i<numprefetchsites; i++) {
34     ptr[i].operMode = 1;
35     ptr[i].callcount = 0;
36     ptr[i].retrycount = RETRYINTERVAL; //N
37     ptr[i].uselesscount = SHUTDOWNINTERVAL; //M
38   }
39   return ptr;
40 }
41
42 int getRetryCount(int siteid) {
43   return evalPrefetch[siteid].retrycount;
44 }
45
46 int getUselessCount(int siteid) {
47   return evalPrefetch[siteid].uselesscount;
48 }
49
50 char getOperationMode(int siteid) {
51   return evalPrefetch[siteid].operMode;
52 }
53
54 /* This function updates counters and mode of operation of a
55  * prefetch site during runtime. When the prefetch call at a site
56  * generates oids that are found/not found in the prefetch cache,
57  * we take action accordingly */
58 void handleDynPrefetching(int numLocal, int ntuples, int siteid) {
59   if(numLocal < ntuples) {
60     /* prefetch not found locally(miss in cache); turn on prefetching*/
61     evalPrefetch[siteid].operMode = 1;
62     evalPrefetch[siteid].uselesscount = SHUTDOWNINTERVAL;
63   } else {
64     //Turn off prefetch site
65     if(getOperationMode(siteid) != 0) {
66       evalPrefetch[siteid].uselesscount--;
67       if(evalPrefetch[siteid].uselesscount <= 0) {
68         LOGEVENT('O');
69         evalPrefetch[siteid].operMode = 0;
70       }
71     }
72   }
73 }
74
75 #if 1
76 /* This function clears from prefetch cache those
77  * entries that caused a transaction abort */
78 void cleanPCache() {
79   unsigned int size = c_size;
80   chashlistnode_t *ptr = c_table;
81   int i;
82   for(i = 0; i < size; i++) {
83     chashlistnode_t *curr = &ptr[i]; //for each entry in the cache lookupTable
84     while(curr != NULL) {
85       if(curr->key == 0)
86         break;
87       objheader_t *header1, *header2;
88       /* Not found in local machine's object store and found in prefetch cache */
89       if((header1 = mhashSearch(curr->key)) == NULL && ((header2 = prehashSearch(curr->key)) != NULL)) {
90         /* Remove from prefetch cache */
91         prehashRemove(curr->key);
92       }
93       curr = curr->next;
94     }
95   }
96 }
97 #else
98 /* This function clears from prefetch cache those
99  * entries that caused a transaction abort */
100 void cleanPCache() {
101   unsigned int size = c_size;
102   struct chashentry *ptr = c_table;
103   int i;
104   for(i = 0; i < size; i++) {
105     struct chashentry *curr = &ptr[i]; //for each entry in the cache lookupTable
106     if(curr->key == 0)
107       continue;
108     objheader_t *header1, *header2;
109     /* Not found in local machine's object store and found in prefetch cache */
110     if((header1 = mhashSearch(curr->key)) == NULL && ((header2 = prehashSearch(curr->key)) != NULL)) {
111       /* Remove from prefetch cache */
112       prehashRemove(curr->key);
113     }
114   }
115 }
116 #endif
117
118 /* This function updates the prefetch cache with
119  * entries from the transaction cache when a
120  * transaction commits
121  * Return -1 on error else returns 0 */
122 int updatePrefetchCache(trans_req_data_t *tdata) {
123   int retval;
124   char oidType;
125   /*//TODO comment it for now because objects read are already in the prefetch cache
126   oidType = 'R';
127   if(tdata->f.numread > 0) {
128     if((retval = copyToCache(tdata->f.numread, (unsigned int *)(tdata->objread), oidType)) != 0) {
129       printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
130       return -1;
131     }
132   }
133   */
134   if(tdata->f.nummod > 0) {
135     oidType = 'M';
136     if((retval = copyToCache(tdata->f.nummod, tdata->oidmod, oidType)) != 0) {
137       printf("%s(): Error in copying objects read at %s, %d\n", __func__, __FILE__, __LINE__);
138       return -1;
139     }
140   }
141   return 0;
142 }
143
144 int copyToCache(int numoid, unsigned int *oidarray, char oidType) {
145   int i;
146   for (i = 0; i < numoid; i++) {
147     unsigned int oid;
148     //if(oidType == 'R') {
149     //  char * objread = (char *) oidarray;
150     //  oid = *((unsigned int *)(objread+(sizeof(unsigned int)+
151     //                                    sizeof(unsigned short))*i));
152     //} else {
153       oid = oidarray[i];
154     //}
155     pthread_mutex_lock(&prefetchcache_mutex);
156     objheader_t * header;
157     if((header = (objheader_t *) t_chashSearch(oid)) == NULL) {
158       printf("%s() obj %x is no longer in transaction cache at %s , %d\n", __func__, oid,__FILE__, __LINE__);
159       fflush(stdout);
160       return -1;
161     }
162     //copy into prefetch cache
163     int size;
164     GETSIZE(size, header);
165     objheader_t * newAddr;
166     if((newAddr = prefetchobjstrAlloc(size + sizeof(objheader_t))) == NULL) {
167       printf("%s(): Error in getting memory from prefetch cache at %s, %d\n", __func__,
168              __FILE__, __LINE__);
169       pthread_mutex_unlock(&prefetchcache_mutex);
170       return -1;
171     }
172     pthread_mutex_unlock(&prefetchcache_mutex);
173     memcpy(newAddr, header, size+sizeof(objheader_t));
174     //Increment version for every modified object
175     if(oidType == 'M') {
176       newAddr->version += 1;
177       newAddr->notifylist = NULL;
178     }
179     STATUS(newAddr)=0;
180
181     //make an entry in prefetch lookup hashtable
182     prehashInsert(oid, newAddr);
183   } //end of for
184   return 0;
185 }