get rid of compile errors
[IRC.git] / Robust / src / Runtime / STM / stm.c
1 /* ============================================================
2  * singleTMCommit.c 
3  * - single thread commit on local machine
4  * =============================================================
5  * Copyright (c) 2009, University of California, Irvine, USA.
6  * All rights reserved.
7  * Author: Alokika Dash 
8  *         adash@uci.edu
9  * =============================================================
10  *
11  */
12
13 #include "tm.h"
14 /* Thread transaction variables */
15 __thread objstr_t *t_cache;
16
17
18 /* ==================================================
19  * stmStartup
20  * This function starts up the transaction runtime. 
21  * ==================================================
22  */
23 int stmStartup() {
24   return 0;
25 }
26
27 /* ======================================
28  * objstrCreate
29  * - create an object store of given size
30  * ======================================
31  */
32 objstr_t *objstrCreate(unsigned int size) {
33   objstr_t *tmp;
34   if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
35     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
36     return NULL;
37   }
38   tmp->size = size;
39   tmp->next = NULL;
40   tmp->top = tmp + 1; //points to end of objstr_t structure!
41   return tmp;
42 }
43
44 /* =================================================
45  * transStart
46  * This function initializes things required in the 
47  * transaction start
48  * =================================================
49  */
50 void transStart() {
51   t_cache = objstrCreate(1048576);
52   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
53 }
54
55 /* =======================================================
56  * transCreateObj
57  * This function creates objects in the transaction record 
58  * =======================================================
59  */
60 objheader_t *transCreateObj(unsigned int size) {
61   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
62   OID(tmp) = getNewOID();
63   tmp->version = 1;
64   STATUS(tmp) = NEW;
65   t_chashInsert(OID(tmp), tmp);
66
67 #ifdef COMPILER
68   return &tmp[1]; //want space after object header
69 #else
70   return tmp;
71 #endif
72 }
73
74 /* This functions inserts randowm wait delays in the order of msec
75  * Mostly used when transaction commits retry*/
76 void randomdelay() {
77   struct timespec req;
78   time_t t;
79
80   t = time(NULL);
81   req.tv_sec = 0;
82   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
83   nanosleep(&req, NULL);
84   return;
85 }
86
87 /* ==============================================
88  * objstrAlloc
89  * - allocate space in an object store
90  * ==============================================
91  */
92 void *objstrAlloc(objstr_t **osptr, unsigned int size) {
93   void *tmp;
94   int i=0;
95   objstr_t *store=*osptr;
96   if ((size&7)!=0) {
97     size+=(8-(size&7));
98   }
99
100   for(;i<3;i++) {
101     if (OSFREE(store)>=size) {
102       tmp=store->top;
103       store->top +=size;
104       return tmp;
105     }
106     if ((store=store->next)==NULL)
107       break;
108   }
109
110   {
111     unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE?size:DEFAULT_OBJ_STORE_SIZE;
112     objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
113     void *ptr=&os[1];
114     os->next=store;
115     (*osptr)=os;
116     os->size=newsize;
117     os->top=((char *)ptr)+size;
118     return ptr;
119   }
120 }
121
122 /* =============================================================
123  * transRead
124  * -finds the objects either in main heap
125  * -copies the object into the transaction cache
126  * =============================================================
127  */
128 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
129   unsigned int machinenumber;
130   objheader_t *tmp, *objheader;
131   objheader_t *objcopy;
132   int size;
133
134   /* Read from the main heap */
135   objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t)); 
136   if(read_trylock(STATUSPTR(header))) { //Can further acquire read locks
137     GETSIZE(size, header);
138     size += sizeof(objheader_t);
139     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
140     memcpy(objcopy, header, size);
141     /* Insert into cache's lookup table */
142     STATUS(objcopy)=0;
143     t_chashInsert(OID(header), objcopy);
144 #ifdef COMPILER
145     return &objcopy[1];
146 #else
147     return objcopy;
148 #endif
149   }
150   read_unlock(STATUSPTR(header));
151 }
152
153 /* ================================================================
154  * transCommit
155  * - This function initiates the transaction commit process
156  * - goes through the transaction cache and decides
157  * - a final response 
158  * ================================================================
159  */
160 int transCommit() {
161   char finalResponse;
162   char treplyretry; /* keeps track of the common response that needs to be sent */
163
164   do {
165     treplyretry = 0;
166     /* Look through all the objects in the transaction hash table */
167     finalResponse = traverseCache(&treplyretry);
168     if(finalResponse == TRANS_ABORT) {
169       break;
170     }
171     if(finalResponse == TRANS_COMMIT) {
172       break;
173     }
174     /* wait a random amount of time before retrying to commit transaction*/
175     if(treplyretry && (finalResponse == TRANS_SOFT_ABORT)) {
176       randomdelay();
177     }
178     if(finalResponse != TRANS_ABORT || finalResponse != TRANS_COMMIT || finalResponse != TRANS_SOFT_ABORT) {
179       printf("Error: in %s() Unknown outcome", __func__);
180       exit(-1);
181     }
182     /* Retry trans commit procedure during soft_abort case */
183   } while (treplyretry);
184
185   if(finalResponse == TRANS_ABORT) {
186     /* Free Resources */
187     objstrDelete(t_cache);
188     t_chashDelete();
189     return TRANS_ABORT;
190   } else if(finalResponse == TRANS_COMMIT) {
191     /* Free Resources */
192     objstrDelete(t_cache);
193     t_chashDelete();
194     return 0;
195   } else {
196     //TODO Add other cases
197     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
198     exit(-1);
199   }
200   return 0;
201 }
202
203 /* ==================================================
204  * traverseCache
205  * - goes through the transaction cache and
206  * - decides if a transaction should commit or abort
207  * ==================================================
208  */
209 char traverseCache(char *treplyretry) {
210   /* Create info for newly creately objects */
211   int numcreated=0;
212   unsigned int oidcreated[c_numelements];
213   /* Create info to keep track of objects that can be locked */
214   int numoidrdlocked=0;
215   int numoidwrlocked=0;
216   unsigned int oidrdlocked[c_numelements];
217   unsigned int oidwrlocked[c_numelements];
218   /* Counters to decide final response of this transaction */
219   int vmatch_lock;
220   int vmatch_nolock;
221   int vnomatch;
222   int numoidread;
223   int numoidmod;
224   char response;
225
226   int i;
227   chashlistnode_t *ptr = c_table;
228   /* Represents number of bins in the chash table */
229   unsigned int size = c_size;
230   for(i = 0; i<size; i++) {
231     chashlistnode_t *curr = &ptr[i];
232     /* Inner loop to traverse the linked list of the cache lookupTable */
233     while(curr != NULL) {
234       //if the first bin in hash table is empty
235       if(curr->key == 0)
236         break;
237       objheader_t * headeraddr=(objheader_t *) curr->val;
238       response = decideResponse(headeraddr, oidcreated, &numcreated, oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked,
239                                 &vmatch_lock, &vmatch_nolock, &vnomatch, &numoidmod, &numoidread);
240       if(response == TRANS_ABORT) {
241         *treplyretry = 0;
242         transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
243         return TRANS_ABORT;
244       }
245       curr = curr->next;
246     }
247   } //end of for
248   
249   /* Decide the final response */
250   if(vmatch_nolock == (numoidread + numoidmod)) {
251     *treplyretry = 0;
252     transCommitProcess(oidcreated, &numcreated, oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
253     response = TRANS_COMMIT;
254   }
255   if(vmatch_lock > 0 && vnomatch == 0) {
256     *treplyretry = 1;
257     response = TRANS_SOFT_ABORT;
258   }
259   return response;
260 }
261
262 /* ===========================================================================
263  * decideResponse
264  * - increments counters that keep track of objects read, modified or locked
265  * - updates the oids locked and oids newly created 
266  * ===========================================================================
267  */
268 char decideResponse(objheader_t *headeraddr, unsigned int *oidcreated, int *numcreated, unsigned int* oidrdlocked, int *numoidrdlocked,
269     unsigned int*oidwrlocked, int *numoidwrlocked, int *vmatch_lock, int *vmatch_nolock, int *vnomatch, int *numoidmod, int *numoidread) {
270   unsigned short version = headeraddr->version;
271   unsigned int oid = OID(headeraddr);
272   if(STATUS(headeraddr) & NEW) {
273     oidcreated[(*numcreated)++] = OID(headeraddr);
274   } else if(STATUS(headeraddr) & DIRTY) {
275     (*numoidmod)++;
276     /* Read from the main heap  and compare versions */
277     objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t)); 
278     if(write_trylock(STATUSPTR(header))) { //can aquire write lock
279       if (version == header->version) {/* versions match */
280         /* Keep track of objects locked */
281         (*vmatch_nolock)++;
282         oidwrlocked[(*numoidwrlocked)++] = OID(header);
283       } else { 
284         (*vnomatch)++;
285         oidwrlocked[(*numoidwrlocked)++] = OID(header);
286         return TRANS_ABORT;
287       }
288     } else { /* cannot aquire lock */
289       if(version == header->version) /* versions match */
290         (*vmatch_lock)++;
291       else {
292         (*vnomatch)++;
293         return TRANS_ABORT;
294       }
295     }
296   } else {
297     (*numoidread)++;
298     /* Read from the main heap  and compare versions */
299     objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t)); 
300     if(read_trylock(STATUSPTR(header))) { //can further aquire read locks
301       if(version == header->version) {/* versions match */
302         (*vmatch_nolock)++;
303         oidrdlocked[(*numoidrdlocked)++] = OID(header);
304       } else {
305         (*vnomatch)++;
306         oidrdlocked[(*numoidrdlocked)++] = OID(header);
307         return TRANS_ABORT;
308       }
309     } else { /* cannot aquire lock */
310       if(version == header->version)
311         (*vmatch_lock)++;
312       else {
313         (*vnomatch)++;
314         return TRANS_ABORT;
315       }
316     }
317   }
318   return 0;
319 }
320
321 /* ==================================
322  * transAbortProcess
323  *
324  * =================================
325  */
326 int transAbortProcess(unsigned int *oidrdlocked, int *numoidrdlocked, unsigned int *oidwrlocked, int *numoidwrlocked) {
327   int i;
328   objheader_t *header;
329   /* Release read locks */
330   for(i=0; i< *numoidrdlocked; i++) {
331     /* Read from the main heap */
332     if((header = (objheader_t *)(((char *)(&oidrdlocked[i])) - sizeof(objheader_t))) == NULL) {
333       printf("Error: %s() main heap returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
334       return 1;
335     }
336     read_unlock(STATUSPTR(header));
337   }
338
339   /* Release write locks */
340   for(i=0; i< *numoidwrlocked; i++) {
341     /* Read from the main heap */
342     if((header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t))) == NULL) {
343       printf("Error: %s() main heap returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
344       return 1;
345     }
346     write_unlock(STATUSPTR(header));
347   }
348 }
349
350 /* ==================================
351  * transCommitProcess
352  *
353  * =================================
354  */
355 int transCommmitProcess(unsigned int *oidcreated, int *numoidcreated, unsigned int *oidrdlocked, int *numoidrdlocked,
356                     unsigned int *oidwrlocked, int *numoidwrlocked) {
357   objheader_t *header, *tcptr;
358   void *ptrcreate;
359
360   int i;
361   /* If object is newly created inside transaction then commit it */
362   for (i = 0; i < *numoidcreated; i++) {
363     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
364       printf("Error: %s() chashSearch returned NULL for oid = %x at %s, %d\n", __func__, oidcreated[i], __FILE__, __LINE__);
365       return 1;
366     }
367     int tmpsize;
368     GETSIZE(tmpsize, header);
369     tmpsize += sizeof(objheader_t);
370     /* FIXME Is this correct? */
371 #ifdef PRECISE_GC
372     ptrcreate = mygcmalloc((struct garbagelist *)header, tmpsize);
373 #else
374     ptrcreate = FREEMALLOC(tmpsize);
375 #endif
376     /* Initialize read and write locks */
377     initdsmlocks(STATUSPTR(header));
378     memcpy(ptrcreate, header, tmpsize);
379   }
380
381   /* Copy from transaction cache -> main object store */
382   for (i = 0; i < *numoidwrlocked; i++) {
383     /* Read from the main heap */ 
384     if((header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t))) == NULL) {
385       printf("Error: %s() main heap returns NULL at %s, %d\n", __func__, __FILE__, __LINE__);
386       return 1;
387     }
388     if ((tcptr = ((objheader_t *) t_chashSearch(oidwrlocked[i]))) == NULL) {
389       printf("Error: %s() chashSearch returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
390       return 1;
391     }
392     int tmpsize;
393     GETSIZE(tmpsize, header);
394     char *tmptcptr = (char *) tcptr;
395     {
396       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
397       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
398       dst->___cachedCode___=src->___cachedCode___;
399       dst->___cachedHash___=src->___cachedHash___;
400
401       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
402     }
403
404     header->version += 1;
405     if(header->notifylist != NULL) {
406       notifyAll(&header->notifylist, OID(header), header->version);
407     }
408   }
409   
410   /* Release read locks */
411   for(i=0; i< *numoidrdlocked; i++) {
412     /* Read from the main heap */
413     header = (objheader_t *)(((char *)(&oidrdlocked[i])) - sizeof(objheader_t)); 
414     read_unlock(STATUSPTR(header));
415   }
416
417   /* Release write locks */
418   for(i=0; i< *numoidwrlocked; i++) {
419     header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t)); 
420     write_unlock(STATUSPTR(header));
421   }
422
423   return 0;
424 }
425