1 /* ============================================================
3 * - single thread commit on local machine
4 * =============================================================
5 * Copyright (c) 2009, University of California, Irvine, USA.
9 * =============================================================
14 /* Thread transaction variables */
15 __thread objstr_t *t_cache;
18 /* ==================================================
20 * This function starts up the transaction runtime.
21 * ==================================================
27 /* ======================================
29 * - create an object store of given size
30 * ======================================
32 objstr_t *objstrCreate(unsigned int size) {
34 if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
35 printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
40 tmp->top = tmp + 1; //points to end of objstr_t structure!
44 /* =================================================
46 * This function initializes things required in the
48 * =================================================
51 t_cache = objstrCreate(1048576);
52 t_chashCreate(CHASH_SIZE, CLOADFACTOR);
55 /* =======================================================
57 * This function creates objects in the transaction record
58 * =======================================================
60 objheader_t *transCreateObj(unsigned int size) {
61 objheader_t *tmp = (objheader_t *) objstrAlloc(&t_cache, (sizeof(objheader_t) + size));
62 OID(tmp) = getNewOID();
65 t_chashInsert(OID(tmp), tmp);
68 return &tmp[1]; //want space after object header
74 /* This functions inserts randowm wait delays in the order of msec
75 * Mostly used when transaction commits retry*/
82 req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
83 nanosleep(&req, NULL);
87 /* ==============================================
89 * - allocate space in an object store
90 * ==============================================
92 void *objstrAlloc(objstr_t **osptr, unsigned int size) {
95 objstr_t *store=*osptr;
101 if (OSFREE(store)>=size) {
106 if ((store=store->next)==NULL)
111 unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE?size:DEFAULT_OBJ_STORE_SIZE;
112 objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
117 os->top=((char *)ptr)+size;
122 /* =============================================================
124 * -finds the objects either in main heap
125 * -copies the object into the transaction cache
126 * =============================================================
128 __attribute__((pure)) objheader_t *transRead(unsigned int oid) {
129 unsigned int machinenumber;
130 objheader_t *tmp, *objheader;
131 objheader_t *objcopy;
134 /* Read from the main heap */
135 objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t));
136 if(read_trylock(STATUSPTR(header))) { //Can further acquire read locks
137 GETSIZE(size, header);
138 size += sizeof(objheader_t);
139 objcopy = (objheader_t *) objstrAlloc(&t_cache, size);
140 memcpy(objcopy, header, size);
141 /* Insert into cache's lookup table */
143 t_chashInsert(OID(header), objcopy);
150 read_unlock(STATUSPTR(header));
153 /* ================================================================
155 * - This function initiates the transaction commit process
156 * - goes through the transaction cache and decides
158 * ================================================================
162 char treplyretry; /* keeps track of the common response that needs to be sent */
166 /* Look through all the objects in the transaction hash table */
167 finalResponse = traverseCache(&treplyretry);
168 if(finalResponse == TRANS_ABORT) {
171 if(finalResponse == TRANS_COMMIT) {
174 /* wait a random amount of time before retrying to commit transaction*/
175 if(treplyretry && (finalResponse == TRANS_SOFT_ABORT)) {
178 if(finalResponse != TRANS_ABORT || finalResponse != TRANS_COMMIT || finalResponse != TRANS_SOFT_ABORT) {
179 printf("Error: in %s() Unknown outcome", __func__);
182 /* Retry trans commit procedure during soft_abort case */
183 } while (treplyretry);
185 if(finalResponse == TRANS_ABORT) {
187 objstrDelete(t_cache);
190 } else if(finalResponse == TRANS_COMMIT) {
192 objstrDelete(t_cache);
196 //TODO Add other cases
197 printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
203 /* ==================================================
205 * - goes through the transaction cache and
206 * - decides if a transaction should commit or abort
207 * ==================================================
209 char traverseCache(char *treplyretry) {
210 /* Create info for newly creately objects */
212 unsigned int oidcreated[c_numelements];
213 /* Create info to keep track of objects that can be locked */
214 int numoidrdlocked=0;
215 int numoidwrlocked=0;
216 unsigned int oidrdlocked[c_numelements];
217 unsigned int oidwrlocked[c_numelements];
218 /* Counters to decide final response of this transaction */
227 chashlistnode_t *ptr = c_table;
228 /* Represents number of bins in the chash table */
229 unsigned int size = c_size;
230 for(i = 0; i<size; i++) {
231 chashlistnode_t *curr = &ptr[i];
232 /* Inner loop to traverse the linked list of the cache lookupTable */
233 while(curr != NULL) {
234 //if the first bin in hash table is empty
237 objheader_t * headeraddr=(objheader_t *) curr->val;
238 response = decideResponse(headeraddr, oidcreated, &numcreated, oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked,
239 &vmatch_lock, &vmatch_nolock, &vnomatch, &numoidmod, &numoidread);
240 if(response == TRANS_ABORT) {
242 transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
249 /* Decide the final response */
250 if(vmatch_nolock == (numoidread + numoidmod)) {
252 transCommitProcess(oidcreated, &numcreated, oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
253 response = TRANS_COMMIT;
255 if(vmatch_lock > 0 && vnomatch == 0) {
257 response = TRANS_SOFT_ABORT;
262 /* ===========================================================================
264 * - increments counters that keep track of objects read, modified or locked
265 * - updates the oids locked and oids newly created
266 * ===========================================================================
268 char decideResponse(objheader_t *headeraddr, unsigned int *oidcreated, int *numcreated, unsigned int* oidrdlocked, int *numoidrdlocked,
269 unsigned int*oidwrlocked, int *numoidwrlocked, int *vmatch_lock, int *vmatch_nolock, int *vnomatch, int *numoidmod, int *numoidread) {
270 unsigned short version = headeraddr->version;
271 unsigned int oid = OID(headeraddr);
272 if(STATUS(headeraddr) & NEW) {
273 oidcreated[(*numcreated)++] = OID(headeraddr);
274 } else if(STATUS(headeraddr) & DIRTY) {
276 /* Read from the main heap and compare versions */
277 objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t));
278 if(write_trylock(STATUSPTR(header))) { //can aquire write lock
279 if (version == header->version) {/* versions match */
280 /* Keep track of objects locked */
282 oidwrlocked[(*numoidwrlocked)++] = OID(header);
285 oidwrlocked[(*numoidwrlocked)++] = OID(header);
288 } else { /* cannot aquire lock */
289 if(version == header->version) /* versions match */
298 /* Read from the main heap and compare versions */
299 objheader_t *header = (objheader_t *)(((char *)(&oid)) - sizeof(objheader_t));
300 if(read_trylock(STATUSPTR(header))) { //can further aquire read locks
301 if(version == header->version) {/* versions match */
303 oidrdlocked[(*numoidrdlocked)++] = OID(header);
306 oidrdlocked[(*numoidrdlocked)++] = OID(header);
309 } else { /* cannot aquire lock */
310 if(version == header->version)
321 /* ==================================
324 * =================================
326 int transAbortProcess(unsigned int *oidrdlocked, int *numoidrdlocked, unsigned int *oidwrlocked, int *numoidwrlocked) {
329 /* Release read locks */
330 for(i=0; i< *numoidrdlocked; i++) {
331 /* Read from the main heap */
332 if((header = (objheader_t *)(((char *)(&oidrdlocked[i])) - sizeof(objheader_t))) == NULL) {
333 printf("Error: %s() main heap returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
336 read_unlock(STATUSPTR(header));
339 /* Release write locks */
340 for(i=0; i< *numoidwrlocked; i++) {
341 /* Read from the main heap */
342 if((header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t))) == NULL) {
343 printf("Error: %s() main heap returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
346 write_unlock(STATUSPTR(header));
350 /* ==================================
353 * =================================
355 int transCommmitProcess(unsigned int *oidcreated, int *numoidcreated, unsigned int *oidrdlocked, int *numoidrdlocked,
356 unsigned int *oidwrlocked, int *numoidwrlocked) {
357 objheader_t *header, *tcptr;
361 /* If object is newly created inside transaction then commit it */
362 for (i = 0; i < *numoidcreated; i++) {
363 if ((header = ((objheader_t *) t_chashSearch(oidcreated[i]))) == NULL) {
364 printf("Error: %s() chashSearch returned NULL for oid = %x at %s, %d\n", __func__, oidcreated[i], __FILE__, __LINE__);
368 GETSIZE(tmpsize, header);
369 tmpsize += sizeof(objheader_t);
370 /* FIXME Is this correct? */
372 ptrcreate = mygcmalloc((struct garbagelist *)header, tmpsize);
374 ptrcreate = FREEMALLOC(tmpsize);
376 /* Initialize read and write locks */
377 initdsmlocks(STATUSPTR(header));
378 memcpy(ptrcreate, header, tmpsize);
381 /* Copy from transaction cache -> main object store */
382 for (i = 0; i < *numoidwrlocked; i++) {
383 /* Read from the main heap */
384 if((header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t))) == NULL) {
385 printf("Error: %s() main heap returns NULL at %s, %d\n", __func__, __FILE__, __LINE__);
388 if ((tcptr = ((objheader_t *) t_chashSearch(oidwrlocked[i]))) == NULL) {
389 printf("Error: %s() chashSearch returned NULL at %s, %d\n", __func__, __FILE__, __LINE__);
393 GETSIZE(tmpsize, header);
394 char *tmptcptr = (char *) tcptr;
396 struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
397 struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
398 dst->___cachedCode___=src->___cachedCode___;
399 dst->___cachedHash___=src->___cachedHash___;
401 memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
404 header->version += 1;
405 if(header->notifylist != NULL) {
406 notifyAll(&header->notifylist, OID(header), header->version);
410 /* Release read locks */
411 for(i=0; i< *numoidrdlocked; i++) {
412 /* Read from the main heap */
413 header = (objheader_t *)(((char *)(&oidrdlocked[i])) - sizeof(objheader_t));
414 read_unlock(STATUSPTR(header));
417 /* Release write locks */
418 for(i=0; i< *numoidwrlocked; i++) {
419 header = (objheader_t *)(((char *)(&oidwrlocked[i])) - sizeof(objheader_t));
420 write_unlock(STATUSPTR(header));