1 /* ============================================================
3 * - single thread commit on local machine
4 * =============================================================
5 * Copyright (c) 2009, University of California, Irvine, USA.
9 * =============================================================
15 /* =======================
17 * ======================
21 extern int classsize[];
22 /* Thread transaction variables */
23 __thread objstr_t *t_cache;
26 /* ==================================================
28 * This function starts up the transaction runtime.
29 * ==================================================
38 /* ======================================
40 * - create an object store of given size
41 * ======================================
43 objstr_t *objstrCreate(unsigned int size) {
45 if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
46 printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
51 tmp->top = tmp + 1; //points to end of objstr_t structure!
55 /* =================================================
57 * This function initializes things required in the
59 * =================================================
62 t_cache = objstrCreate(1048576);
63 t_chashCreate(CHASH_SIZE, CLOADFACTOR);
66 /* =======================================================
68 * This function creates objects in the transaction record
69 * =======================================================
71 objheader_t *transCreateObj(unsigned int size) {
72 objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
73 OID(tmp) = getNewOID();
77 t_chashInsert(OID(tmp), tmp);
80 return &tmp[1]; //want space after object header
86 //TODO: when reusing oids, make sure they are not already in use!
87 static unsigned int id = 0xFFFFFFFF;
88 unsigned int getNewOID(void) {
90 if (id > oidMax || id < oidMin) {
96 /* This functions inserts randowm wait delays in the order of msec
97 * Mostly used when transaction commits retry*/
104 req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
105 nanosleep(&req, NULL);
109 /* ==============================================
111 * - allocate space in an object store
112 * ==============================================
114 void *objstrAlloc(objstr_t **osptr, unsigned int size) {
117 objstr_t *store=*osptr;
123 if (OSFREE(store)>=size) {
128 if ((store=store->next)==NULL)
133 unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE?size:DEFAULT_OBJ_STORE_SIZE;
134 objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
139 os->top=((char *)ptr)+size;
144 /* =============================================================
146 * -finds the objects either in transaction cache or main heap
147 * -copies the object into the transaction cache
148 * =============================================================
150 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
152 objheader_t *objcopy;
153 chashlistnode_t *node;
159 /* Read from the transaction cache */
160 node= &c_table[(oid & c_mask)>>1];
162 if(node->key == oid) {
164 return &((objheader_t*)node->val)[1];
170 } while(node != NULL);
172 /* Read from the main heap */
173 objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t));
174 if(read_trylock(STATUSPTR(header))) { //Can further acquire read locks
175 GETSIZE(size, header);
176 size += sizeof(objheader_t);
177 objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
178 memcpy(objcopy, header, size);
179 /* Insert into cache's lookup table */
181 t_chashInsert(OID(header), objcopy);
188 read_unlock(STATUSPTR(header));
191 /* ================================================================
193 * - This function initiates the transaction commit process
194 * - goes through the transaction cache and decides
196 * ================================================================
200 char treplyretry; /* keeps track of the common response that needs to be sent */
204 /* Look through all the objects in the transaction hash table */
205 finalResponse = traverseCache(&treplyretry);
206 if(finalResponse == TRANS_ABORT) {
209 if(finalResponse == TRANS_COMMIT) {
212 /* wait a random amount of time before retrying to commit transaction*/
213 if(treplyretry && (finalResponse == TRANS_SOFT_ABORT)) {
216 if(finalResponse != TRANS_ABORT || finalResponse != TRANS_COMMIT || finalResponse != TRANS_SOFT_ABORT) {
217 printf("Error: in %s() Unknown outcome", __func__);
220 /* Retry trans commit procedure during soft_abort case */
221 } while (treplyretry);
223 if(finalResponse == TRANS_ABORT) {
225 objstrDelete(t_cache);
228 } else if(finalResponse == TRANS_COMMIT) {
230 objstrDelete(t_cache);
234 //TODO Add other cases
235 printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
241 /* ==================================================
243 * - goes through the transaction cache and
244 * - decides if a transaction should commit or abort
245 * ==================================================
247 char traverseCache(char *treplyretry) {
248 /* Create info for newly creately objects */
250 unsigned int oidcreated[c_numelements];
251 /* Create info to keep track of objects that can be locked */
252 int numoidrdlocked=0;
253 int numoidwrlocked=0;
254 unsigned int oidrdlocked[c_numelements];
255 unsigned int oidwrlocked[c_numelements];
256 /* Counters to decide final response of this transaction */
265 chashlistnode_t *ptr = c_table;
266 /* Represents number of bins in the chash table */
267 unsigned int size = c_size;
268 for(i = 0; i<size; i++) {
269 chashlistnode_t *curr = &ptr[i];
270 /* Inner loop to traverse the linked list of the cache lookupTable */
271 while(curr != NULL) {
272 //if the first bin in hash table is empty
275 objheader_t * headeraddr=(objheader_t *) curr->val;
276 response = decideResponse(headeraddr, oidcreated, &numcreated, oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked,
277 &vmatch_lock, &vmatch_nolock, &vnomatch, &numoidmod, &numoidread);
278 if(response == TRANS_ABORT) {
280 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
287 /* Decide the final response */
288 if(vmatch_nolock == (numoidread + numoidmod)) {
290 transCommitProcess(oidcreated, &numcreated, oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
291 response = TRANS_COMMIT;
293 if(vmatch_lock > 0 && vnomatch == 0) {
295 response = TRANS_SOFT_ABORT;
300 /* ===========================================================================
302 * - increments counters that keep track of objects read, modified or locked
303 * - updates the oids locked and oids newly created
304 * ===========================================================================
306 char decideResponse(objheader_t *headeraddr, unsigned int *oidcreated, int *numcreated, unsigned int* oidrdlocked, int *numoidrdlocked,
307 unsigned int*oidwrlocked, int *numoidwrlocked, int *vmatch_lock, int *vmatch_nolock, int *vnomatch, int *numoidmod, int *numoidread) {
308 unsigned short version = headeraddr->version;
309 unsigned int oid = OID(headeraddr);
310 if(STATUS(headeraddr) & NEW) {
311 oidcreated[(*numcreated)++] = OID(headeraddr);
312 } else if(STATUS(headeraddr) & DIRTY) {
314 /* Read from the main heap and compare versions */
315 objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t));
316 if(write_trylock(STATUSPTR(header))) { //can aquire write lock
317 if (version == header->version) {/* versions match */
318 /* Keep track of objects locked */
320 oidwrlocked[(*numoidwrlocked)++] = OID(header);
323 oidwrlocked[(*numoidwrlocked)++] = OID(header);
326 } else { /* cannot aquire lock */
327 if(version == header->version) /* versions match */
336 /* Read from the main heap and compare versions */
337 objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t));
338 if(read_trylock(STATUSPTR(header))) { //can further aquire read locks
339 if(version == header->version) {/* versions match */
341 oidrdlocked[(*numoidrdlocked)++] = OID(header);
344 oidrdlocked[(*numoidrdlocked)++] = OID(header);
347 } else { /* cannot aquire lock */
348 if(version == header->version)
359 /* ==================================
362 * =================================
364 int transAbortProcess(unsigned int *oidrdlocked, int *numoidrdlocked, unsigned int *oidwrlocked, int *numoidwrlocked) {
367 /* Release read locks */
368 for(i=0; i< *numoidrdlocked; i++) {
369 /* Read from the main heap */
370 if((header = (objheader_t *)(((char *)(&oidrdlocked[i])) - sizeof(objheader_t))) == NULL) {
371 printf("Error: %s() main heap returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
374 read_unlock(STATUSPTR(header));
377 /* Release write locks */
378 for(i=0; i< *numoidwrlocked; i++) {
379 /* Read from the main heap */
380 if((header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t))) == NULL) {
381 printf("Error: %s() main heap returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
384 write_unlock(STATUSPTR(header));
388 /* ==================================
391 * =================================
393 int transCommmitProcess(unsigned int *oidcreated, int *numoidcreated, unsigned int *oidrdlocked, int *numoidrdlocked,
394 unsigned int *oidwrlocked, int *numoidwrlocked) {
395 objheader_t *header, *tcptr;
399 /* If object is newly created inside transaction then commit it */
400 for (i = 0; i < *numoidcreated; i++) {
401 if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
402 printf("Error: %s() chashSearch returned NULL for oid = %x at %s, %d\n", __func__, oidcreated[i], __FILE__, __LINE__);
406 GETSIZE(tmpsize, header);
407 tmpsize += sizeof(objheader_t);
408 /* FIXME Is this correct? */
410 ptrcreate = mygcmalloc((struct garbagelist *)header, tmpsize);
412 ptrcreate = FREEMALLOC(tmpsize);
414 /* Initialize read and write locks */
415 initdsmlocks(STATUSPTR(header));
416 memcpy(ptrcreate, header, tmpsize);
419 /* Copy from transaction cache -> main object store */
420 for (i = 0; i < *numoidwrlocked; i++) {
421 /* Read from the main heap */
422 if((header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t))) == NULL) {
423 printf("Error: %s() main heap returns NULL at %s, %d\n", __func__, __FILE__, __LINE__);
426 if ((tcptr = ((objheader_t *) t_chashSearch(oidwrlocked[i]))) == NULL) {
427 printf("Error: %s() chashSearch returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
431 GETSIZE(tmpsize, header);
432 char *tmptcptr = (char *) tcptr;
434 struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
435 struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
436 dst->___cachedCode___=src->___cachedCode___;
437 dst->___cachedHash___=src->___cachedHash___;
439 memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
442 header->version += 1;
443 if(header->notifylist != NULL) {
444 notifyAll(&header->notifylist, OID(header), header->version);
448 /* Release read locks */
449 for(i=0; i< *numoidrdlocked; i++) {
450 /* Read from the main heap */
451 header = (objheader_t *)(((char *)(&oidrdlocked[i])) - sizeof(objheader_t));
452 read_unlock(STATUSPTR(header));
455 /* Release write locks */
456 for(i=0; i< *numoidwrlocked; i++) {
457 header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t));
458 write_unlock(STATUSPTR(header));