new files for STM
[IRC.git] / Robust / src / Runtime / STM / stm.c
1 /* ============================================================
2  * singleTMCommit.c 
3  * - single thread commit on local machine
4  * =============================================================
5  * Copyright (c) 2009, University of California, Irvine, USA.
6  * All rights reserved.
7  * Author: Alokika Dash 
8  *         adash@uci.edu
9  * =============================================================
10  *
11  */
12
13 #include "tm.h"
14
15 /* =======================
16  * Global variables
17  * ======================
18  */
19 unsigned int oidMin;
20 unsigned int oidMax;
21 extern int classsize[];
22 /* Thread transaction variables */
23 __thread objstr_t *t_cache;
24
25
26 /* ==================================================
27  * dstmStartup
28  * This function starts up the transaction runtime. 
29  * ==================================================
30  */
31 int stmStartup() {
32   oidMax = 0xFFFFFFFF;
33   oidMin = 0;
34
35   return 0;
36 }
37
38 /* ======================================
39  * objstrCreate
40  * - create an object store of given size
41  * ======================================
42  */
43 objstr_t *objstrCreate(unsigned int size) {
44   objstr_t *tmp;
45   if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
46     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
47     return NULL;
48   }
49   tmp->size = size;
50   tmp->next = NULL;
51   tmp->top = tmp + 1; //points to end of objstr_t structure!
52   return tmp;
53 }
54
55 /* =================================================
56  * transStart
57  * This function initializes things required in the 
58  * transaction start
59  * =================================================
60  */
61 void transStart() {
62   t_cache = objstrCreate(1048576);
63   t_chashCreate(CHASH_SIZE, CLOADFACTOR);
64 }
65
66 /* =======================================================
67  * transCreateObj
68  * This function creates objects in the transaction record 
69  * =======================================================
70  */
71 objheader_t *transCreateObj(unsigned int size) {
72   objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
73   OID(tmp) = getNewOID();
74   tmp->version = 1;
75   tmp->rcount = 1;
76   STATUS(tmp) = NEW;
77   t_chashInsert(OID(tmp), tmp);
78
79 #ifdef COMPILER
80   return &tmp[1]; //want space after object header
81 #else
82   return tmp;
83 #endif
84 }
85
86 //TODO: when reusing oids, make sure they are not already in use!
87 static unsigned int id = 0xFFFFFFFF;
88 unsigned int getNewOID(void) {
89   id += 2;
90   if (id > oidMax || id < oidMin) {
91     id = (oidMin | 1);
92   }
93   return id;
94 }
95
96 /* This functions inserts randowm wait delays in the order of msec
97  * Mostly used when transaction commits retry*/
98 void randomdelay() {
99   struct timespec req;
100   time_t t;
101
102   t = time(NULL);
103   req.tv_sec = 0;
104   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
105   nanosleep(&req, NULL);
106   return;
107 }
108
109 /* ==============================================
110  * objstrAlloc
111  * - allocate space in an object store
112  * ==============================================
113  */
114 void *objstrAlloc(objstr_t **osptr, unsigned int size) {
115   void *tmp;
116   int i=0;
117   objstr_t *store=*osptr;
118   if ((size&7)!=0) {
119     size+=(8-(size&7));
120   }
121
122   for(;i<3;i++) {
123     if (OSFREE(store)>=size) {
124       tmp=store->top;
125       store->top +=size;
126       return tmp;
127     }
128     if ((store=store->next)==NULL)
129       break;
130   }
131
132   {
133     unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE?size:DEFAULT_OBJ_STORE_SIZE;
134     objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
135     void *ptr=&os[1];
136     os->next=store;
137     (*osptr)=os;
138     os->size=newsize;
139     os->top=((char *)ptr)+size;
140     return ptr;
141   }
142 }
143
144 /* =============================================================
145  * transRead
146  * -finds the objects either in transaction cache or main heap
147  * -copies the object into the transaction cache
148  * =============================================================
149  */
150 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
151   int size;
152   objheader_t *objcopy;
153   chashlistnode_t *node;
154
155   if(oid == 0) {
156     return NULL;
157   }
158   
159   /* Read from the transaction cache */
160   node= &c_table[(oid & c_mask)>>1];
161   do {
162     if(node->key == oid) {
163 #ifdef COMPILER
164     return &((objheader_t*)node->val)[1];
165 #else
166     return node->val;
167 #endif
168     }
169     node = node->next;
170   } while(node != NULL);
171
172   /* Read from the main heap */
173   objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t)); 
174   if(read_trylock(STATUSPTR(header))) { //Can further acquire read locks
175     GETSIZE(size, header);
176     size += sizeof(objheader_t);
177     objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
178     memcpy(objcopy, header, size);
179     /* Insert into cache's lookup table */
180     STATUS(objcopy)=0;
181     t_chashInsert(OID(header), objcopy);
182 #ifdef COMPILER
183     return &objcopy[1];
184 #else
185     return objcopy;
186 #endif
187   }
188   read_unlock(STATUSPTR(header));
189 }
190
191 /* ================================================================
192  * transCommit
193  * - This function initiates the transaction commit process
194  * - goes through the transaction cache and decides
195  * - a final response 
196  * ================================================================
197  */
198 int transCommit() {
199   char finalResponse;
200   char treplyretry; /* keeps track of the common response that needs to be sent */
201
202   do {
203     treplyretry = 0;
204     /* Look through all the objects in the transaction hash table */
205     finalResponse = traverseCache(&treplyretry);
206     if(finalResponse == TRANS_ABORT) {
207       break;
208     }
209     if(finalResponse == TRANS_COMMIT) {
210       break;
211     }
212     /* wait a random amount of time before retrying to commit transaction*/
213     if(treplyretry && (finalResponse == TRANS_SOFT_ABORT)) {
214       randomdelay();
215     }
216     if(finalResponse != TRANS_ABORT || finalResponse != TRANS_COMMIT || finalResponse != TRANS_SOFT_ABORT) {
217       printf("Error: in %s() Unknown outcome", __func__);
218       exit(-1);
219     }
220     /* Retry trans commit procedure during soft_abort case */
221   } while (treplyretry);
222
223   if(finalResponse == TRANS_ABORT) {
224     /* Free Resources */
225     objstrDelete(t_cache);
226     t_chashDelete();
227     return TRANS_ABORT;
228   } else if(finalResponse == TRANS_COMMIT) {
229     /* Free Resources */
230     objstrDelete(t_cache);
231     t_chashDelete();
232     return 0;
233   } else {
234     //TODO Add other cases
235     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
236     exit(-1);
237   }
238   return 0;
239 }
240
241 /* ==================================================
242  * traverseCache
243  * - goes through the transaction cache and
244  * - decides if a transaction should commit or abort
245  * ==================================================
246  */
247 char traverseCache(char *treplyretry) {
248   /* Create info for newly creately objects */
249   int numcreated=0;
250   unsigned int oidcreated[c_numelements];
251   /* Create info to keep track of objects that can be locked */
252   int numoidrdlocked=0;
253   int numoidwrlocked=0;
254   unsigned int oidrdlocked[c_numelements];
255   unsigned int oidwrlocked[c_numelements];
256   /* Counters to decide final response of this transaction */
257   int vmatch_lock;
258   int vmatch_nolock;
259   int vnomatch;
260   int numoidread;
261   int numoidmod;
262   char response;
263
264   int i;
265   chashlistnode_t *ptr = c_table;
266   /* Represents number of bins in the chash table */
267   unsigned int size = c_size;
268   for(i = 0; i<size; i++) {
269     chashlistnode_t *curr = &ptr[i];
270     /* Inner loop to traverse the linked list of the cache lookupTable */
271     while(curr != NULL) {
272       //if the first bin in hash table is empty
273       if(curr->key == 0)
274         break;
275       objheader_t * headeraddr=(objheader_t *) curr->val;
276       response = decideResponse(headeraddr, oidcreated, &numcreated, oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked,
277                                 &vmatch_lock, &vmatch_nolock, &vnomatch, &numoidmod, &numoidread);
278       if(response == TRANS_ABORT) {
279         *treplyretry = 0;
280         transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
281         return TRANS_ABORT;
282       }
283       curr = curr->next;
284     }
285   } //end of for
286   
287   /* Decide the final response */
288   if(vmatch_nolock == (numoidread + numoidmod)) {
289     *treplyretry = 0;
290     transCommitProcess(oidcreated, &numcreated, oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
291     response = TRANS_COMMIT;
292   }
293   if(vmatch_lock > 0 && vnomatch == 0) {
294     *treplyretry = 1;
295     response = TRANS_SOFT_ABORT;
296   }
297   return response;
298 }
299
300 /* ===========================================================================
301  * decideResponse
302  * - increments counters that keep track of objects read, modified or locked
303  * - updates the oids locked and oids newly created 
304  * ===========================================================================
305  */
306 char decideResponse(objheader_t *headeraddr, unsigned int *oidcreated, int *numcreated, unsigned int* oidrdlocked, int *numoidrdlocked,
307     unsigned int*oidwrlocked, int *numoidwrlocked, int *vmatch_lock, int *vmatch_nolock, int *vnomatch, int *numoidmod, int *numoidread) {
308   unsigned short version = headeraddr->version;
309   unsigned int oid = OID(headeraddr);
310   if(STATUS(headeraddr) & NEW) {
311     oidcreated[(*numcreated)++] = OID(headeraddr);
312   } else if(STATUS(headeraddr) & DIRTY) {
313     (*numoidmod)++;
314     /* Read from the main heap  and compare versions */
315     objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t)); 
316     if(write_trylock(STATUSPTR(header))) { //can aquire write lock
317       if (version == header->version) {/* versions match */
318         /* Keep track of objects locked */
319         (*vmatch_nolock)++;
320         oidwrlocked[(*numoidwrlocked)++] = OID(header);
321       } else { 
322         (*vnomatch)++;
323         oidwrlocked[(*numoidwrlocked)++] = OID(header);
324         return TRANS_ABORT;
325       }
326     } else { /* cannot aquire lock */
327       if(version == header->version) /* versions match */
328         (*vmatch_lock)++;
329       else {
330         (*vnomatch)++;
331         return TRANS_ABORT;
332       }
333     }
334   } else {
335     (*numoidread)++;
336     /* Read from the main heap  and compare versions */
337     objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t)); 
338     if(read_trylock(STATUSPTR(header))) { //can further aquire read locks
339       if(version == header->version) {/* versions match */
340         (*vmatch_nolock)++;
341         oidrdlocked[(*numoidrdlocked)++] = OID(header);
342       } else {
343         (*vnomatch)++;
344         oidrdlocked[(*numoidrdlocked)++] = OID(header);
345         return TRANS_ABORT;
346       }
347     } else { /* cannot aquire lock */
348       if(version == header->version)
349         (*vmatch_lock)++;
350       else {
351         (*vnomatch)++;
352         return TRANS_ABORT;
353       }
354     }
355   }
356   return 0;
357 }
358
359 /* ==================================
360  * transAbortProcess
361  *
362  * =================================
363  */
364 int transAbortProcess(unsigned int *oidrdlocked, int *numoidrdlocked, unsigned int *oidwrlocked, int *numoidwrlocked) {
365   int i;
366   objheader_t *header;
367   /* Release read locks */
368   for(i=0; i< *numoidrdlocked; i++) {
369     /* Read from the main heap */
370     if((header = (objheader_t *)(((char *)(&oidrdlocked[i])) - sizeof(objheader_t))) == NULL) {
371       printf("Error: %s() main heap returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
372       return 1;
373     }
374     read_unlock(STATUSPTR(header));
375   }
376
377   /* Release write locks */
378   for(i=0; i< *numoidwrlocked; i++) {
379     /* Read from the main heap */
380     if((header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t))) == NULL) {
381       printf("Error: %s() main heap returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
382       return 1;
383     }
384     write_unlock(STATUSPTR(header));
385   }
386 }
387
388 /* ==================================
389  * transCommitProcess
390  *
391  * =================================
392  */
393 int transCommmitProcess(unsigned int *oidcreated, int *numoidcreated, unsigned int *oidrdlocked, int *numoidrdlocked,
394                     unsigned int *oidwrlocked, int *numoidwrlocked) {
395   objheader_t *header, *tcptr;
396   void *ptrcreate;
397
398   int i;
399   /* If object is newly created inside transaction then commit it */
400   for (i = 0; i < *numoidcreated; i++) {
401     if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
402       printf("Error: %s() chashSearch returned NULL for oid = %x at %s, %d\n", __func__, oidcreated[i], __FILE__, __LINE__);
403       return 1;
404     }
405     int tmpsize;
406     GETSIZE(tmpsize, header);
407     tmpsize += sizeof(objheader_t);
408     /* FIXME Is this correct? */
409 #ifdef PRECISE_GC
410     ptrcreate = mygcmalloc((struct garbagelist *)header, tmpsize);
411 #else
412     ptrcreate = FREEMALLOC(tmpsize);
413 #endif
414     /* Initialize read and write locks */
415     initdsmlocks(STATUSPTR(header));
416     memcpy(ptrcreate, header, tmpsize);
417   }
418
419   /* Copy from transaction cache -> main object store */
420   for (i = 0; i < *numoidwrlocked; i++) {
421     /* Read from the main heap */ 
422     if((header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t))) == NULL) {
423       printf("Error: %s() main heap returns NULL at %s, %d\n", __func__, __FILE__, __LINE__);
424       return 1;
425     }
426     if ((tcptr = ((objheader_t *) t_chashSearch(oidwrlocked[i]))) == NULL) {
427       printf("Error: %s() chashSearch returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
428       return 1;
429     }
430     int tmpsize;
431     GETSIZE(tmpsize, header);
432     char *tmptcptr = (char *) tcptr;
433     {
434       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
435       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
436       dst->___cachedCode___=src->___cachedCode___;
437       dst->___cachedHash___=src->___cachedHash___;
438
439       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
440     }
441
442     header->version += 1;
443     if(header->notifylist != NULL) {
444       notifyAll(&header->notifylist, OID(header), header->version);
445     }
446   }
447   
448   /* Release read locks */
449   for(i=0; i< *numoidrdlocked; i++) {
450     /* Read from the main heap */
451     header = (objheader_t *)(((char *)(&oidrdlocked[i])) - sizeof(objheader_t)); 
452     read_unlock(STATUSPTR(header));
453   }
454
455   /* Release write locks */
456   for(i=0; i< *numoidwrlocked; i++) {
457     header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t)); 
458     write_unlock(STATUSPTR(header));
459   }
460
461   return 0;
462 }
463