1 /* ============================================================
3 * - single thread commit on local machine
4 * =============================================================
5 * Copyright (c) 2009, University of California, Irvine, USA.
9 * =============================================================
15 /* Thread transaction variables */
16 __thread objstr_t *t_cache;
17 __thread objstr_t *t_reserve;
18 __thread struct objlist * newobjs;
21 int numTransCommit = 0;
22 int numTransAbort = 0;
24 int nSoftAbortCommit = 0;
25 int nSoftAbortAbort = 0;
29 /* ==================================================
31 * This function starts up the transaction runtime.
32 * ==================================================
38 /* ======================================
40 * - create an object store of given size
41 * ======================================
43 objstr_t *objstrCreate(unsigned int size) {
45 if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
46 printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
51 tmp->top = tmp + 1; //points to end of objstr_t structure!
56 while(t_cache->next!=NULL) {
57 objstr_t *next=t_cache->next;
58 t_cache->next=t_reserve;
62 t_cache->top=t_cache+1;
65 //free entire list, starting at store
66 void objstrDelete(objstr_t *store) {
68 while (store != NULL) {
76 /* =================================================
78 * This function initializes things required in the
80 * =================================================
83 //Transaction start is currently free...commit and aborting is not
86 /* =======================================================
88 * This function creates objects in the transaction record
89 * =======================================================
91 objheader_t *transCreateObj(void * ptr, unsigned int size) {
92 objheader_t *tmp = mygcmalloc(ptr, (sizeof(objheader_t) + size));
93 objheader_t *retval=&tmp[1];
94 tmp->lock=RW_LOCK_BIAS;
97 // don't insert into table
98 if (newobjs->offset<MAXOBJLIST) {
99 newobjs->objs[newobjs->offset++]=retval;
101 struct objlist *tmp=malloc(sizeof(struct objlist));
107 return retval; //want space after object header
110 /* This functions inserts randowm wait delays in the order of msec
111 * Mostly used when transaction commits retry*/
112 void randomdelay(int softaborted) {
116 gettimeofday(&t,NULL);
119 req.tv_nsec = (long)((t.tv_usec)%(1<<softaborted))<<1; //1-11 microsec
120 nanosleep(&req, NULL);
124 /* ==============================================
126 * - allocate space in an object store
127 * ==============================================
129 void *objstrAlloc(unsigned int size) {
132 objstr_t *store=t_cache;
138 if (OSFREE(store)>=size) {
143 if ((store=store->next)==NULL)
148 unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE?size:DEFAULT_OBJ_STORE_SIZE;
149 objstr_t **otmp=&t_reserve;
151 while((ptr=*otmp)!=NULL) {
152 if (ptr->size>=newsize) {
157 ptr->top=((char *)(&ptr[1]))+size;
162 objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
167 os->top=((char *)nptr)+size;
172 /* =============================================================
174 * -finds the objects either in main heap
175 * -copies the object into the transaction cache
176 * =============================================================
178 __attribute__((pure)) void *transRead(void * oid) {
179 objheader_t *tmp, *objheader;
180 objheader_t *objcopy;
183 //quick case for new objects
184 if (((struct ___Object___ *)oid)->___objstatus___ & NEW)
187 /* Read from the main heap */
189 objheader_t *header = (objheader_t *)(((char *)oid) - sizeof(objheader_t));
190 GETSIZE(size, header);
191 size += sizeof(objheader_t);
192 objcopy = (objheader_t *) objstrAlloc(size);
193 memcpy(objcopy, header, size);
194 /* Insert into cache's lookup table */
196 t_chashInsert(oid, &objcopy[1]);
201 struct objlist *ptr=newobjs;
202 while(ptr->next!=NULL) {
203 struct objlist *tmp=ptr->next;
211 /* ================================================================
213 * - This function initiates the transaction commit process
214 * - goes through the transaction cache and decides
216 * ================================================================
223 /* Look through all the objects in the transaction hash table */
225 if (c_numelements<(c_size>>3))
226 finalResponse= alttraverseCache();
228 finalResponse= traverseCache();
229 if(finalResponse == TRANS_ABORT) {
241 if(finalResponse == TRANS_COMMIT) {
253 /* wait a random amount of time before retrying to commit transaction*/
254 if(finalResponse == TRANS_SOFT_ABORT) {
260 //retry if to many soft aborts
266 randomdelay(softaborted);
268 printf("Error: in %s() Unknown outcome", __func__);
274 /* ==================================================
276 * - goes through the transaction cache and
277 * - decides if a transaction should commit or abort
278 * ==================================================
280 int traverseCache() {
281 /* Create info to keep track of objects that can be locked */
282 int numoidrdlocked=0;
283 int numoidwrlocked=0;
284 void * rdlocked[200];
285 void * wrlocked[200];
290 if (c_numelements<200) {
291 oidrdlocked=rdlocked;
292 oidwrlocked=wrlocked;
294 int size=c_numelements*sizeof(void*);
295 oidrdlocked=malloc(size);
296 oidwrlocked=malloc(size);
298 chashlistnode_t *ptr = c_table;
299 /* Represents number of bins in the chash table */
300 unsigned int size = c_size;
301 for(i = 0; i<size; i++) {
302 chashlistnode_t *curr = &ptr[i];
303 /* Inner loop to traverse the linked list of the cache lookupTable */
304 while(curr != NULL) {
305 //if the first bin in hash table is empty
306 if(curr->key == NULL)
308 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
310 unsigned int version = headeraddr->version;
311 objheader_t *header=(objheader_t *) (((char *)curr->key)-sizeof(objheader_t));
313 if(STATUS(headeraddr) & DIRTY) {
314 /* Read from the main heap and compare versions */
315 if(write_trylock(&header->lock)) { //can aquire write lock
316 if (version == header->version) {/* versions match */
317 /* Keep track of objects locked */
318 oidwrlocked[numoidwrlocked++] = OID(header);
320 oidwrlocked[numoidwrlocked++] = OID(header);
321 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
324 } else { /* cannot aquire lock */
325 if(version == header->version) {
329 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
334 /* Read from the main heap and compare versions */
335 if(read_trylock(&header->lock)) { //can further acquire read locks
336 if(version == header->version) {/* versions match */
337 oidrdlocked[numoidrdlocked++] = OID(header);
339 oidrdlocked[numoidrdlocked++] = OID(header);
340 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
343 } else { /* cannot aquire lock */
344 if(version == header->version) {
347 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
357 /* Decide the final response */
359 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
360 return TRANS_SOFT_ABORT;
362 transCommitProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
367 /* ==================================================
369 * - goes through the transaction cache and
370 * - decides if a transaction should commit or abort
371 * ==================================================
373 int alttraverseCache() {
374 /* Create info to keep track of objects that can be locked */
375 int numoidrdlocked=0;
376 int numoidwrlocked=0;
377 void * rdlocked[200];
378 void * wrlocked[200];
383 if (c_numelements<200) {
384 oidrdlocked=rdlocked;
385 oidwrlocked=wrlocked;
387 int size=c_numelements*sizeof(void*);
388 oidrdlocked=malloc(size);
389 oidwrlocked=malloc(size);
391 chashlistnode_t *curr = c_list;
392 /* Inner loop to traverse the linked list of the cache lookupTable */
393 while(curr != NULL) {
394 //if the first bin in hash table is empty
395 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
397 unsigned int version = headeraddr->version;
398 objheader_t *header=(objheader_t *) (((char *)curr->key)-sizeof(objheader_t));
400 if(STATUS(headeraddr) & DIRTY) {
401 /* Read from the main heap and compare versions */
402 if(write_trylock(&header->lock)) { //can aquire write lock
403 if (version == header->version) {/* versions match */
404 /* Keep track of objects locked */
405 oidwrlocked[numoidwrlocked++] = OID(header);
407 oidwrlocked[numoidwrlocked++] = OID(header);
408 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
411 } else { /* cannot aquire lock */
412 if(version == header->version) {
416 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
421 /* Read from the main heap and compare versions */
422 if(read_trylock(&header->lock)) { //can further aquire read locks
423 if(version == header->version) {/* versions match */
424 oidrdlocked[numoidrdlocked++] = OID(header);
426 oidrdlocked[numoidrdlocked++] = OID(header);
427 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
430 } else { /* cannot aquire lock */
431 if(version == header->version) {
434 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
443 /* Decide the final response */
445 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
446 return TRANS_SOFT_ABORT;
448 transCommitProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
454 /* ==================================
457 * =================================
459 int transAbortProcess(void **oidrdlocked, int *numoidrdlocked, void **oidwrlocked, int *numoidwrlocked) {
462 /* Release read locks */
463 for(i=0; i< *numoidrdlocked; i++) {
464 /* Read from the main heap */
465 header = (objheader_t *)(((char *)(oidrdlocked[i])) - sizeof(objheader_t));
466 read_unlock(&header->lock);
469 /* Release write locks */
470 for(i=0; i< *numoidwrlocked; i++) {
471 /* Read from the main heap */
472 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
473 write_unlock(&header->lock);
475 if (c_numelements>=200) {
481 /* ==================================
484 * =================================
486 int transCommitProcess(void ** oidrdlocked, int *numoidrdlocked,
487 void ** oidwrlocked, int *numoidwrlocked) {
491 struct objlist *ptr=newobjs;
496 ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
501 /* Copy from transaction cache -> main object store */
502 for (i = 0; i < *numoidwrlocked; i++) {
503 /* Read from the main heap */
504 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
506 GETSIZE(tmpsize, header);
507 struct ___Object___ *dst=(struct ___Object___*)oidwrlocked[i];
508 struct ___Object___ *src=t_chashSearch(oidwrlocked[i]);
509 dst->___cachedCode___=src->___cachedCode___;
510 dst->___cachedHash___=src->___cachedHash___;
511 memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
512 header->version += 1;
515 /* Release read locks */
516 for(i=0; i< *numoidrdlocked; i++) {
517 /* Read from the main heap */
518 header = (objheader_t *)(((char *)(oidrdlocked[i])) - sizeof(objheader_t));
519 read_unlock(&header->lock);
522 /* Release write locks */
523 for(i=0; i< *numoidwrlocked; i++) {
524 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
525 write_unlock(&header->lock);
527 if (c_numelements>=200) {