Object creation within transaction works now. Yay!
authorerubow <erubow>
Fri, 7 Sep 2007 22:05:25 +0000 (22:05 +0000)
committererubow <erubow>
Fri, 7 Sep 2007 22:05:25 +0000 (22:05 +0000)
Robust/src/Runtime/DSTM/interface/clookup.c
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/mlookup.c
Robust/src/Runtime/DSTM/interface/plookup.c
Robust/src/Runtime/DSTM/interface/plookup.h
Robust/src/Runtime/DSTM/interface/trans.c

index ca7d34c9f5299b8e2b35fb631b13a7a48829d0f0..45ac76c6cbbaa7e2398150010eb27de72e8b72ae 100644 (file)
@@ -46,7 +46,7 @@ unsigned int chashInsert(chashtable_t *table, unsigned int key, void *val) {
        table->numelements++;
        index = chashFunction(table, key);
 #ifdef DEBUG
-       printf("DEBUG -> index = %d, key = %d, val = %x\n", index, key, val);
+       printf("chashInsert(): DEBUG -> index = %d, key = %d, val = %x\n", index, key, val);
 #endif
        if(ptr[index].next == NULL && ptr[index].key == 0) {    // Insert at the first position in the hashtable
                ptr[index].key = key;
index cf85532608355ada140027dc3e43e06f05bdfec3..2574271af720880e874c19ff29c4aa3ba047204a 100644 (file)
@@ -120,6 +120,7 @@ typedef struct fixed_data {
   int mcount;                  /* participant count */
   short numread;               /* no of objects read */
   short nummod;                        /* no of objects modified */
+  short numcreated;            /* no of objects created */
   int sum_bytes;               /* total bytes of modified objects in a transaction */
 } fixed_data_t;
 
@@ -129,6 +130,7 @@ typedef struct trans_req_data {
   unsigned int *listmid;       /* Pointer to array holding list of participants */
   char *objread;               /* Pointer to array holding oid and version number of objects that are only read */ 
   unsigned int *oidmod;                /* Pointer to array holding oids of objects that are modified */
+  unsigned int *oidcreated;            /* Pointer to array holding oids of objects that are newly created */
 } trans_req_data_t;            
 
 /* Structure that holds information of objects that are not found in the participant
@@ -204,7 +206,6 @@ int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
 
 int dstmStartup(const char *);
 void transInit();
-void * dstmalloc(transrecord_t *trans, int size);
 
 void randomdelay(void);
 transrecord_t *transStart();
@@ -217,8 +218,7 @@ int decideResponse(thread_data_array_t *);// Coordinator decides what response t
 char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
 int transAbortProcess(void *, unsigned int *, int, int);
-//int transComProcess(trans_commit_data_t *);
-int transComProcess(void*, unsigned int *, unsigned int *, int, int);
+int transComProcess(void*, unsigned int *, unsigned int *, unsigned int *, int, int, int);
 void prefetch(int, unsigned int *, short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
index 65460bd9a58c3997c0f12f88ea337dbd1c075a81..8aa6e35f3b64e5f2b3b6a2a4f2e38452937ba840 100644 (file)
@@ -311,7 +311,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
                        ptr = modptr;
                        for(i = 0; i< fixed->nummod; i++) {
                                tmp_header = (objheader_t *)ptr;
-                               tmp_header->rcount = 1;
+                               tmp_header->rcount = 0;
                                ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)];
                        }
                        /* Unlock objects that was locked due to this transaction */
@@ -526,7 +526,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
                        return 1;
                }
                /* Change reference count of older address and free space in objstr ?? */
-               header->rcount = 1; //Not sure what would be the val
+               header->rcount = 0;
 
                /* Change ptr address in mhash table */
                mhashRemove(oidmod[i]);
index 2c95263805946da2ed5126158783b0eff1cf3047..a9cca79536745d1babb8c490f7e4f5a3a465fb0e 100644 (file)
@@ -58,7 +58,7 @@ unsigned int mhashInsert(unsigned int key, void *val) {
                        return 1;
                }
                node->key = key;
-               node->val = val ;
+               node->val = val;
                node->next = ptr[index].next;
                ptr[index].next = node;
        }
index 7bf79ba64a8de33a71ede200f2bdd203113591c1..e542ed944f405908bc58a148238f20ed646b33f5 100644 (file)
@@ -21,20 +21,21 @@ plistnode_t *pCreate(int objects) {
                free(pile);
                return NULL;
        }
-       /*
-       if ((pile->oidread = calloc(objects, sizeof(unsigned int))) == NULL) {
+       if ((pile->oidcreated = calloc(objects, sizeof(unsigned int))) == NULL) {
                printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               free(pile);
+               free(pile->oidmod);
                return NULL;
        }
-       */
        if ((pile->objread = calloc(objects, sizeof(unsigned int) + sizeof(short))) == NULL) {
                printf("Calloc error %s %d\n", __FILE__, __LINE__);
                free(pile);
                free(pile->oidmod);
+               free(pile->oidcreated);
                return NULL;
        }
 
-       pile->nummod = pile->numread = pile->sum_bytes = 0;
+       pile->nummod = pile->numread = pile->numcreated = pile->sum_bytes = 0;
        pile->next = NULL;
        return pile;
 }
@@ -49,12 +50,15 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
        //Add oid into a machine that is already present in the pile linked list structure
        while(tmp != NULL) {
                if (tmp->mid == mid) {
-                       if (STATUS(headeraddr) & DIRTY) {
+                       if (STATUS(headeraddr) & NEW) {
+                               tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
+                               tmp->numcreated = tmp->numcreated + 1;
+                               tmp->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+                       }       else if (STATUS(headeraddr) & DIRTY) {
                                tmp->oidmod[tmp->nummod] = OID(headeraddr);
                                tmp->nummod = tmp->nummod + 1;
                                tmp->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
                        } else {
-               //              tmp->oidread[tmp->numread] = OID(headeraddr);
                                offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
                                *((unsigned int *)(tmp->objread + offset))=OID(headeraddr);
                                offset += sizeof(unsigned int);
@@ -72,12 +76,15 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
                        return NULL;
                }
                ptr->mid = mid;
-               if (STATUS(headeraddr) & DIRTY) {
+               if (STATUS(headeraddr) & NEW) {
+                       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
+                       ptr->numcreated = ptr->numcreated + 1;
+                       ptr->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+               } else if (STATUS(headeraddr) & DIRTY) {
                        ptr->oidmod[ptr->nummod] = OID(headeraddr);
                        ptr->nummod = ptr->nummod + 1;
                        ptr->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
                } else {
-               //      ptr->oidread[ptr->numread] = OID(headeraddr);
                        *((unsigned int *)ptr->objread)=OID(headeraddr);
                        memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
                        ptr->numread = ptr->numread + 1;
@@ -86,6 +93,9 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
                pile = ptr;
        }
 
+       STATUS(headeraddr) &= ~(NEW);
+       STATUS(headeraddr) &= ~(DIRTY);
+
        return pile;
 }
 
index e35c451b3b2fd67b02487b0bf646774df5f59c14..777f0259adca6a644e6ec5dcc1f152c5bbbce17b 100644 (file)
  * participants involved in a transaction. */
 typedef struct plistnode {
        unsigned int mid;
-       int local;              /* Variable that keeps track if this pile is for LOCAL machine */
-       unsigned int *oidmod;   /* Pointer to array containing oids of modified objects */ 
-       unsigned int *oidread;  /* TODO: REMOVE THIS Pointer to array of objects read */
-       int nummod;             /* no of objects read */
-       int numread;            /* no of objects modified */
+       short numread;          /* no of objects modified */
+       short nummod;           /* no of objects read */
+       short numcreated; /* no of objects created */
        int sum_bytes;          /* total bytes of objects modified */
        char *objread;          /* Pointer to array containing oids of objects read and their version numbers*/
+       unsigned int *oidmod;   /* Pointer to array containing oids of modified objects */ 
+       unsigned int *oidcreated;       /* Pointer to array containing oids of newly created objects */ 
        struct plistnode *next;
 } plistnode_t;
 
index 8e86b346043243c6a969be4bd2861d95dde75fca..83bdaa6fcbc9c3d7ff9fac267108609f23e38479 100644 (file)
@@ -33,6 +33,7 @@ extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
 pthread_t tPrefetch;
 extern objstr_t *mainobjstore;
+unsigned int myIpAddr;
 
 plistnode_t *createPiles(transrecord_t *);
 inline int arrayLength(int *array) {
@@ -80,22 +81,14 @@ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfi
        pthread_mutex_unlock(&pqueue.qlock);
 }
 
-/* This function allocates an object on the local machine */
-//FIXME
-
-void * dstmalloc(transrecord_t *trans, int size) {
-  objheader_t * newobj=(objheader_t *)objstrAlloc(trans->cache, size+sizeof(objheader_t));
-  //Need to assign OID
-
-  return newobj;
-}
-
 /* This function starts up the transaction runtime. */
 int dstmStartup(const char * option) {
   pthread_t thread_Listen;
   pthread_attr_t attr;
   int master=strcmp(option, "master")==0;
 
+       myIpAddr = getMyIpAddr("eth0");
+
   dstmInit();
   transInit();
 
@@ -180,7 +173,6 @@ transrecord_t *transStart()
 /* This function finds the location of the objects involved in a transaction
  * and returns the pointer to the object if found in a remote location */
 objheader_t *transRead(transrecord_t *record, unsigned int oid) {      
-       printf("Inside transaction read call\n");
        unsigned int machinenumber;
        objheader_t *tmp, *objheader;
        void *objcopy;
@@ -196,11 +188,9 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
 
        /* Search local transaction cache */
        if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
-               printf("Inside transaction cache \n");
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                /* Look up in machine lookup table  and copy  into cache*/
-               printf("Inside mainobject store \n");
                tmp = mhashSearch(oid);
                size = sizeof(objheader_t)+classsize[TYPE(tmp)];
                objcopy = objstrAlloc(record->cache, size);
@@ -209,7 +199,6 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) {
                chashInsert(record->lookupTable, OID(objheader), objcopy); 
                return(objcopy);
        } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
-               printf("Inside prefetch cache \n");
                found = 1;
                size = sizeof(objheader_t)+classsize[TYPE(tmp)];
                objcopy = objstrAlloc(record->cache, size);
@@ -262,7 +251,7 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
        OID(tmp) = getNewOID();
        TYPE(tmp) = type;
        tmp->version = 1;
-       tmp->rcount = 0; //? not sure how to handle this yet
+       tmp->rcount = 1;
        STATUS(tmp) = NEW;
        chashInsert(record->lookupTable, OID(tmp), tmp);
        return tmp;
@@ -278,7 +267,7 @@ plistnode_t *createPiles(transrecord_t *record) {
        unsigned int machinenum;
        void *localmachinenum;
        objheader_t *headeraddr;
-
+       
        ptr = record->lookupTable->table;
        size = record->lookupTable->size;
 
@@ -291,30 +280,26 @@ plistnode_t *createPiles(transrecord_t *record) {
                                break;
                        }
                        next = curr->next;
-                       //Get machine location for object id
-                       //TODO Check is the object is newly created ...if not then lookup the location table 
 
-                       if ((machinenum = lhashSearch(curr->key)) == 0) {
-                               printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
+                       if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
+                               printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
                                return NULL;
                        }
 
-                       if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
-                               printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
+                       //Get machine location for object id (and whether local or not)
+                       if (STATUS(headeraddr) & NEW || mhashSearch(curr->key) != NULL) {
+                               machinenum = myIpAddr;
+                       } else  if ((machinenum = lhashSearch(curr->key)) == 0) {
+                               printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
                                return NULL;
                        }
+
                        //Make machine groups
                        if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
                                printf("pInsert error %s, %d\n", __FILE__, __LINE__);
                                return NULL;
                        }
 
-                       /* Check if local or not */
-                       if((localmachinenum = mhashSearch(curr->key)) != NULL) { 
-                               /* Set the pile->local flag*/
-                               pile->local = 1; //True i.e. local
-                       }
-
                        curr = next;
                }
        }
@@ -339,126 +324,125 @@ int transCommit(transrecord_t *record) {
        char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
        char localstat = 0;
 
+       do {
 
-       /* Look through all the objects in the transaction record and make piles 
-        * for each machine involved in the transaction*/
-       pile_ptr = pile = createPiles(record);
-
-       /* Create the packet to be sent in TRANS_REQUEST */
+               /* Look through all the objects in the transaction record and make piles 
+                * for each machine involved in the transaction*/
+               pile_ptr = pile = createPiles(record);
 
-       /* Count the number of participants */
-       pilecount = pCount(pile);
+               /* Create the packet to be sent in TRANS_REQUEST */
 
-       /* Create a list of machine ids(Participants) involved in transaction   */
-       if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
-               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-               return 1;
-       }               
-       pListMid(pile, listmid);
+               /* Count the number of participants */
+               pilecount = pCount(pile);
 
-
-       /* Initialize thread variables,
-        * Spawn a thread for each Participant involved in a transaction */
-       pthread_t thread[pilecount];
-       pthread_attr_t attr;                    
-       pthread_cond_t tcond;
-       pthread_mutex_t tlock;
-       pthread_mutex_t tlshrd;
-
-       thread_data_array_t *thread_data_array;
-       thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
-       local_thread_data_array_t *ltdata;
-       if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
-               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
-               return 1;
-       }
-
-       thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
-
-       /* Initialize and set thread detach attribute */
-       pthread_attr_init(&attr);
-       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-       pthread_mutex_init(&tlock, NULL);
-       pthread_cond_init(&tcond, NULL);
-
-       /* Process each machine pile */
-       while(pile != NULL) {
-               //Create transaction id
-               newtid++;
-               if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+               /* Create a list of machine ids(Participants) involved in transaction   */
+               if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
+                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                       return 1;
+               }               
+               pListMid(pile, listmid);
+
+
+               /* Initialize thread variables,
+                * Spawn a thread for each Participant involved in a transaction */
+               pthread_t thread[pilecount];
+               pthread_attr_t attr;                    
+               pthread_cond_t tcond;
+               pthread_mutex_t tlock;
+               pthread_mutex_t tlshrd;
+
+               thread_data_array_t *thread_data_array;
+               thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
+               local_thread_data_array_t *ltdata;
+               if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
                        printf("Calloc error %s, %d\n", __FILE__, __LINE__);
                        return 1;
                }
-               tosend->f.control = TRANS_REQUEST;
-               sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
-               tosend->f.mcount = pilecount;
-               tosend->f.numread = pile->numread;
-               tosend->f.nummod = pile->nummod;
-               tosend->f.sum_bytes = pile->sum_bytes;
-               tosend->listmid = listmid;
-               tosend->objread = pile->objread;
-               tosend->oidmod = pile->oidmod;
-               thread_data_array[threadnum].thread_id = threadnum;
-               thread_data_array[threadnum].mid = pile->mid;
-               thread_data_array[threadnum].pilecount = pilecount;
-               thread_data_array[threadnum].buffer = tosend;
-               thread_data_array[threadnum].recvmsg = rcvd_control_msg;
-               thread_data_array[threadnum].threshold = &tcond;
-               thread_data_array[threadnum].lock = &tlock;
-               thread_data_array[threadnum].count = &trecvcount;
-               thread_data_array[threadnum].replyctrl = &treplyctrl;
-               thread_data_array[threadnum].replyretry = &treplyretry;
-               thread_data_array[threadnum].rec = record;
-               /* If local do not create any extra connection */
-               if(pile->local != 1) { /* Not local */
-                       rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);  
-                       if (rc) {
-                               perror("Error in pthread create\n");
+
+               thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
+
+               /* Initialize and set thread detach attribute */
+               pthread_attr_init(&attr);
+               pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+               pthread_mutex_init(&tlock, NULL);
+               pthread_cond_init(&tcond, NULL);
+
+               /* Process each machine pile */
+               while(pile != NULL) {
+                       //Create transaction id
+                       newtid++;
+                       if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+                               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
                                return 1;
                        }
-               } else { /*Local*/
-                       /*Unset the pile->local flag*/
-                       pile->local = 0;
-                       /*Set flag to identify that Local machine is involved*/
-                       ltdata->tdata = &thread_data_array[threadnum];
-                       ltdata->transinfo = &transinfo;
-                       val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
-                       if (val) {
-                               perror("Error in pthread create\n");
-                               return 1;
+                       tosend->f.control = TRANS_REQUEST;
+                       sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
+                       tosend->f.mcount = pilecount;
+                       tosend->f.numread = pile->numread;
+                       tosend->f.nummod = pile->nummod;
+                       tosend->f.numcreated = pile->numcreated;
+                       tosend->f.sum_bytes = pile->sum_bytes;
+                       tosend->listmid = listmid;
+                       tosend->objread = pile->objread;
+                       tosend->oidmod = pile->oidmod;
+                       tosend->oidcreated = pile->oidcreated;
+                       thread_data_array[threadnum].thread_id = threadnum;
+                       thread_data_array[threadnum].mid = pile->mid;
+                       thread_data_array[threadnum].pilecount = pilecount;
+                       thread_data_array[threadnum].buffer = tosend;
+                       thread_data_array[threadnum].recvmsg = rcvd_control_msg;
+                       thread_data_array[threadnum].threshold = &tcond;
+                       thread_data_array[threadnum].lock = &tlock;
+                       thread_data_array[threadnum].count = &trecvcount;
+                       thread_data_array[threadnum].replyctrl = &treplyctrl;
+                       thread_data_array[threadnum].replyretry = &treplyretry;
+                       thread_data_array[threadnum].rec = record;
+                       /* If local do not create any extra connection */
+                       if(pile->mid != myIpAddr) { /* Not local */
+                               rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);  
+                               if (rc) {
+                                       perror("Error in pthread create\n");
+                                       return 1;
+                               }
+                       } else { /*Local*/
+                               ltdata->tdata = &thread_data_array[threadnum];
+                               ltdata->transinfo = &transinfo;
+                               val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
+                               if (val) {
+                                       perror("Error in pthread create\n");
+                                       return 1;
+                               }
                        }
+                       threadnum++;            
+                       pile = pile->next;
                }
-               threadnum++;            
-               pile = pile->next;
-       }
 
-       /* Free attribute and wait for the other threads */
-       pthread_attr_destroy(&attr);
-       for (i = 0 ;i < pilecount ; i++) {
-               rc = pthread_join(thread[i], NULL);
-               if (rc)
-               {
-                       printf("ERROR return code from pthread_join() is %d\n", rc);
-                       return 1;
+               /* Free attribute and wait for the other threads */
+               pthread_attr_destroy(&attr);
+               for (i = 0 ;i < pilecount ; i++) {
+                       rc = pthread_join(thread[i], NULL);
+                       if (rc)
+                       {
+                               printf("ERROR return code from pthread_join() is %d\n", rc);
+                               return 1;
+                       }
+                       free(thread_data_array[i].buffer);
                }
-               free(thread_data_array[i].buffer);
-       }
 
-       /* Free resources */    
-       pthread_cond_destroy(&tcond);
-       pthread_mutex_destroy(&tlock);
-       free(listmid);
-       pDelete(pile_ptr);
-       free(thread_data_array);
-       free(ltdata);
+               /* Free resources */    
+               pthread_cond_destroy(&tcond);
+               pthread_mutex_destroy(&tlock);
+               free(listmid);
+               pDelete(pile_ptr);
+               free(thread_data_array);
+               free(ltdata);
 
-       /* Retry trans commit procedure if not sucessful in the first try */
-       if(treplyretry == 1) {
                /* wait a random amount of time */
-               randomdelay();
-               /* Retry the commiting transaction again */
-               transCommit(record);
-       }
+               if (treplyretry == 1)
+                       randomdelay();
+
+       /* Retry trans commit procedure if not sucessful in the first try */
+       } while (treplyretry == 1);
 
        return 0;
 }
@@ -584,17 +568,14 @@ int decideResponse(thread_data_array_t *tdata) {
                                                           written onto the shared array */
                switch(control) {
                        case TRANS_DISAGREE:
-                               printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
                                transdisagree++;
                                break;
 
                        case TRANS_AGREE:
-                               printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
                                transagree++;
                                break;
 
                        case TRANS_SOFT_ABORT:
-                               printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
                                transsoftabort++;
                                break;
                        default:
@@ -606,7 +587,6 @@ int decideResponse(thread_data_array_t *tdata) {
        /* Send Abort */
        if(transdisagree > 0) {
                *(tdata->replyctrl) = TRANS_ABORT;
-               printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
                /* Free resources */
                objstrDelete(tdata->rec->cache);
                chashDelete(tdata->rec->lookupTable);
@@ -614,7 +594,6 @@ int decideResponse(thread_data_array_t *tdata) {
        } else if(transagree == tdata->pilecount){
                /* Send Commit */
                *(tdata->replyctrl) = TRANS_COMMIT;
-               printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
                /* Free resources */
                objstrDelete(tdata->rec->cache);
                chashDelete(tdata->rec->lookupTable);
@@ -623,9 +602,7 @@ int decideResponse(thread_data_array_t *tdata) {
                /* Send Abort in soft abort case followed by retry commiting transaction again*/
                *(tdata->replyctrl) = TRANS_ABORT;
                *(tdata->replyretry) = 1;
-               printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
        } else {
-               printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
                return -1;
        }
 
@@ -716,7 +693,6 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        }
        switch(control) {
                case OBJECT_NOT_FOUND:
-                       printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
                        return NULL;
                case OBJECT_FOUND:
                        /* Read object if found into local cache */
@@ -752,7 +728,7 @@ void *handleLocalReq(void *threadarg) {
        short version;
        char control = 0, *ptr;
        unsigned int oid;
-       unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
+       unsigned int *oidnotfound = NULL, *oidlocked = NULL;
        void *mobj, *modptr;
        objheader_t *headptr, *headeraddr;
        local_thread_data_array_t *localtdata;
@@ -778,6 +754,13 @@ void *handleLocalReq(void *threadarg) {
                memcpy(modptr+offset, headeraddr, size);  
                offset += size;
        }
+       /* Write new objects into the mainobject store */
+       for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) {
+               headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
+               size = sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+               memcpy(modptr+offset, headeraddr, size);  
+               offset += size;
+       }
 
        ptr = modptr;
        offset = 0; //Reset 
@@ -813,7 +796,6 @@ void *handleLocalReq(void *threadarg) {
                                        v_nomatch++;
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
                                }
                        } else {/* If Obj is not locked then lock object */
                                STATUS(((objheader_t *)mobj)) |= LOCK;
@@ -829,7 +811,6 @@ void *handleLocalReq(void *threadarg) {
                                        v_nomatch++;
                                        /* Send TRANS_DISAGREE to Coordinator */
                                        localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
-                                       printf("DEBUG -> Sending TRANS_DISAGREE\n");
                                }
                        }
                }
@@ -838,12 +819,10 @@ void *handleLocalReq(void *threadarg) {
        /* Condition to send TRANS_AGREE */
        if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
                localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
-               printf("DEBUG -> Sending TRANS_AGREE\n");
        }
        /* Condition to send TRANS_SOFT_ABORT */
        if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
                localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
-               printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
                //TODO  currently the only soft abort case that is supported is when object locked by previous
                //transaction => v_matchlock > 0 
                //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported 
@@ -888,15 +867,13 @@ void *handleLocalReq(void *threadarg) {
                        return NULL;
                }
        }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
-               if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->transinfo->numlocked) != 0) {
+               if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) {
                        printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
                        return NULL;
                }
        }
 
        /* Free memory */
-       printf("DEBUG -> Freeing...\n");
-       fflush(stdout);
 
        if (localtdata->transinfo->objlocked != NULL) {
                free(localtdata->transinfo->objlocked);
@@ -917,12 +894,11 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
        objheader_t *tmp_header;
        void *header;
 
-       printf("DEBUG -> Recv TRANS_ABORT\n");
        /* Set all ref counts as 1 and do garbage collection */
        ptr = (char *)modptr;
        for(i = 0; i< nummod; i++) {
                tmp_header = (objheader_t *)ptr;
-               tmp_header->rcount = 1;
+               tmp_header->rcount = 0;
                ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)];
        }
        /* Unlock objects that was locked due to this transaction */
@@ -935,7 +911,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
        }
 
        /* Send ack to Coordinator */
-       printf("DEBUG-> TRANS_SUCCESSFUL\n");
 
        /*Free the pointer */
        ptr = NULL;
@@ -944,7 +919,7 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
 
 /*This function completes the COMMIT process is the transaction is commiting
 */
-int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, int nummod, int numlocked) {
+int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated, unsigned int *objlocked, int nummod, int numcreated, int numlocked) {
        objheader_t *header;
        int i = 0, offset = 0;
        char control;
@@ -956,10 +931,10 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked,
                        return 1;
                }
                /* Change reference count of older address and free space in objstr ?? */
-               header->rcount = 1; //TODO Not sure what would be the val
+               header->rcount = 0;
 
                /* Change ptr address in mhash table */
-               mhashRemove(oidmod[i]);
+               mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary
                mhashInsert(oidmod[i], (((char *)modptr) + offset));
                offset += sizeof(objheader_t) + classsize[TYPE(header)];
 
@@ -968,6 +943,15 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked,
                header->version += 1;
        }
 
+       for (i = 0; i < numcreated; i++)
+       {
+               header = (objheader_t *)(((char *)modptr) + offset);
+               mhashInsert(oidcreated[i], (((char *)modptr) + offset));
+               offset += sizeof(objheader_t) + classsize[TYPE(header)];
+
+               lhashInsert(oidcreated[i], myIpAddr);
+       }
+
        /* Unlock locked objects */
        for(i = 0; i < numlocked; i++) {
                if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
@@ -980,7 +964,6 @@ int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked,
        //TODO Update location lookup table
 
        /* Send ack to Coordinator */
-       printf("DEBUG-> TRANS_SUCESSFUL\n");
        return 0;
 }