Minor bug fix for trans abort case
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "clookup.h"
4 #include "machinepile.h"
5 #include "mlookup.h"
6 #include "llookup.h"
7 #include "plookup.h"
8 #include "prelookup.h"
9 #include "queue.h"
10 #include <pthread.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <netdb.h>
14 #include <netinet/in.h>
15 #include <sys/types.h>
16 #include <unistd.h>
17 #include <errno.h>
18 #include <time.h>
19 #include <string.h>
20 #include <pthread.h>
21
22 #define LISTEN_PORT 2156
23 #define RECEIVE_BUFFER_SIZE 2048
24 #define NUM_THREADS 10
25 #define PREFETCH_CACHE_SIZE 1048576 //1MB
26 #define CONFIG_FILENAME "dstm.conf"
27
28 /* Global Variables */
29 extern int classsize[];
30 extern primarypfq_t pqueue; //Shared prefetch queue
31 extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
32 objstr_t *prefetchcache; //Global Prefetch cache
33 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
34 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
35 extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
36 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
37 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
38 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
39 extern objstr_t *mainobjstore;
40 unsigned int myIpAddr;
41 unsigned int *hostIpAddrs;
42 int sizeOfHostArray;
43 int numHostsInSystem;
44 int myIndexInHostArray;
45 unsigned int oidsPerBlock;
46 unsigned int oidMin;
47 unsigned int oidMax;
48
49 plistnode_t *createPiles(transrecord_t *);
/* Returns the number of elements that precede the -1 terminator in array.
 *
 * array must be terminated by an element equal to -1; the terminator itself
 * is not counted.
 *
 * Defined without `inline`: under C99/C11 a plain `inline` definition provides
 * no external symbol, so any call the compiler chooses not to inline fails to
 * link. A normal external definition behaves the same in every C dialect. */
int arrayLength(int *array) {
	int count = 0;

	while (array[count] != -1)
		count++;
	return count;
}
/* Returns the largest of the first arraylength elements of array.
 *
 * array[0] is always read, so the caller must supply at least one element;
 * for arraylength <= 1 the result is array[0].
 *
 * Defined without `inline`: under C99/C11 a plain `inline` definition provides
 * no external symbol, so any call the compiler chooses not to inline fails to
 * link. A normal external definition behaves the same in every C dialect. */
int findmax(int *array, int arraylength) {
	int max = array[0];
	int i;

	/* Element 0 already seeds max, so scanning starts at 1. */
	for (i = 1; i < arraylength; i++) {
		if (array[i] > max)
			max = array[i];
	}
	return max;
}
66 /* This function is a prefetch call generated by the compiler that
67  * populates the shared primary prefetch queue*/
68 void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
69         int qnodesize;
70         int len = 0;
71
72         /* Allocate for the queue node*/
73         char *node;
74         qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); 
75         if((node = calloc(1, qnodesize)) == NULL) {
76                 printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
77                 return;
78         }
79         /* Set queue node values */
80         len = sizeof(prefetchqelem_t);
81         memcpy(node + len, &ntuples, sizeof(int));
82         len += sizeof(int);
83         memcpy(node + len, oids, ntuples*sizeof(unsigned int));
84         len += ntuples * sizeof(unsigned int);
85         memcpy(node + len, endoffsets, ntuples*sizeof(short));
86         len += ntuples * sizeof(short);
87         memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
88         /* Lock and insert into primary prefetch queue */
89         pthread_mutex_lock(&pqueue.qlock);
90         pre_enqueue((prefetchqelem_t *)node);
91         pthread_cond_signal(&pqueue.qcond);
92         pthread_mutex_unlock(&pqueue.qlock);
93 }
94
95 /* This function starts up the transaction runtime. */
96 int dstmStartup(const char * option) {
97   pthread_t thread_Listen;
98   pthread_attr_t attr;
99   int master=option!=NULL && strcmp(option, "master")==0;
100
101         if (processConfigFile() != 0)
102                 return 0; //TODO: return error value, cause main program to exit
103
104   dstmInit();
105   transInit();
106
107   if (master) {
108     pthread_attr_init(&attr);
109     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
110     pthread_create(&thread_Listen, &attr, dstmListen, NULL);
111     return 1;
112   } else {
113     dstmListen();
114     return 0;
115   }
116
117 }
118
119
120 /* This function initiates the prefetch thread
121  * A queue is shared between the main thread of execution
122  * and the prefetch thread to process the prefetch call
123  * Call from compiler populates the shared queue with prefetch requests while prefetch thread
124  * processes the prefetch requests */
125 void transInit() {
126         int t, rc;
127         //Create and initialize prefetch cache structure
128         prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
129
130         /* Initialize attributes for mutex */
131         pthread_mutexattr_init(&prefetchcache_mutex_attr);
132         pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
133         
134         pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
135
136         //Create prefetch cache lookup table
137         if(prehashCreate(HASH_SIZE, LOADFACTOR))
138                 return; //Failure
139         //Initialize primary shared queue
140         queueInit();
141         //Initialize machine pile w/prefetch oids and offsets shared queue
142         mcpileqInit();
143         //Create the primary prefetch thread 
144         pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
145         //Create and Initialize a pool of threads 
146         /* Threads are active for the entire period runtime is running */
147         for(t = 0; t< NUM_THREADS; t++) {
148                 rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
149                 if (rc) {
150                         printf("Thread create error %s, %d\n", __FILE__, __LINE__);
151                         return;
152                 }
153         }
154 }
155
156 /* This function stops the threads spawned */
157 void transExit() {
158         int t;
159         pthread_cancel(tPrefetch);
160         for(t = 0; t < NUM_THREADS; t++)
161                 pthread_cancel(wthreads[t]);
162
163         return;
164 }
165
/* Sleeps for a short delay in the order of milliseconds.
 * Mostly used between retries of a transaction commit.
 *
 * The delay is 1,000,000 ns plus (current time in seconds modulo 10,000,000)
 * ns, i.e. between 1 and 11 msec. */
void randomdelay(void)
{
	struct timespec pause;
	time_t now = time(NULL);

	pause.tv_sec = 0;
	pause.tv_nsec = (long)(1000000 + (now % 10000000)); //1-11 msec
	nanosleep(&pause, NULL);
}
180
181 /* This function initializes things required in the transaction start*/
182 transrecord_t *transStart()
183 {
184         transrecord_t *tmp = malloc(sizeof(transrecord_t));
185         tmp->cache = objstrCreate(1048576);
186         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
187 #ifdef COMPILER
188         tmp->revertlist=NULL;
189 #endif
190         return tmp;
191 }
192
193 /* This function finds the location of the objects involved in a transaction
194  * and returns the pointer to the object if found in a remote location */
195 objheader_t *transRead(transrecord_t *record, unsigned int oid) {       
196         unsigned int machinenumber;
197         objheader_t *tmp, *objheader;
198         objheader_t *objcopy;
199         int size, rc, found = 0;
200         void *buf;
201         struct timespec ts;
202         struct timeval tp;
203         
204         rc = gettimeofday(&tp, NULL);
205
206         /* Convert from timeval to timespec */
207         ts.tv_nsec = tp.tv_usec * 1000;
208
209         /* Search local transaction cache */
210         if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
211 #ifdef COMPILER
212           return &objheader[1];
213 #else
214           return objheader;
215 #endif
216         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
217                 /* Look up in machine lookup table  and copy  into cache*/
218                 GETSIZE(size, objheader);
219                 size += sizeof(objheader_t);
220                 //TODO:Lock the local trans cache while copying the object here
221                 objcopy = objstrAlloc(record->cache, size);
222                 memcpy(objcopy, (void *)objheader, size);
223                 /* Insert into cache's lookup table */
224                 chashInsert(record->lookupTable, OID(objheader), objcopy); 
225 #ifdef COMPILER
226                 return &objcopy[1];
227 #else
228                 return objcopy;
229 #endif
230         } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
231                 found = 1;
232                 GETSIZE(size, tmp);
233                 size+=sizeof(objheader_t);
234                 //TODO:Lock the local  trans cache while copying the object here
235                 objcopy = objstrAlloc(record->cache, size);
236                 memcpy(objcopy, (void *)tmp, size);
237                 /* Insert into cache's lookup table */
238                 chashInsert(record->lookupTable, OID(tmp), objcopy); 
239 #ifdef COMPILER
240                 return &objcopy[1];
241 #else
242                 return objcopy;
243 #endif
244         } else {
245                 /*If object not found in prefetch cache then block until object appears in the prefetch cache */
246                 pthread_mutex_lock(&pflookup.lock);
247                 while(!found) {
248                         rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
249                         if(rc == ETIMEDOUT) {
250                                 printf("Wait timed out\n");
251                                 /* Check Prefetch cache again */
252                                 if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
253                                         found = 1;
254                                         GETSIZE(size,tmp);
255                                         size+=sizeof(objheader_t);
256                                         objcopy = objstrAlloc(record->cache, size);
257                                         memcpy(objcopy, (void *)tmp, size);
258                                         chashInsert(record->lookupTable, OID(tmp), objcopy); 
259                                         pthread_mutex_unlock(&pflookup.lock);
260 #ifdef COMPILER
261                                         return &objcopy[1];
262 #else
263                                         return objcopy;
264 #endif
265                                 } else {
266                                         pthread_mutex_unlock(&pflookup.lock);
267                                         break;
268                                 }
269                         }
270                 }
271
272                 /* Get the object from the remote location */
273                 machinenumber = lhashSearch(oid);
274                 objcopy = getRemoteObj(record, machinenumber, oid);
275                 if(objcopy == NULL) {
276                         printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
277                         return NULL;
278                 } else {
279 #ifdef COMPILER
280                   return &objcopy[1];
281 #else
282                   return objcopy;
283 #endif
284                 }
285         }
286 }
287
288 /* This function creates objects in the transaction record */
289 objheader_t *transCreateObj(transrecord_t *record, unsigned int size)
290 {
291   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
292   OID(tmp) = getNewOID();
293   tmp->version = 1;
294   tmp->rcount = 1;
295   STATUS(tmp) = NEW;
296   chashInsert(record->lookupTable, OID(tmp), tmp);
297 #ifdef COMPILER
298   return &tmp[1]; //want space after object header
299 #else
300   return tmp;
301 #endif
302 }
303
304 /* This function creates machine piles based on all machines involved in a
305  * transaction commit request */
306 plistnode_t *createPiles(transrecord_t *record) {
307         int i = 0;
308         unsigned int size;/* Represents number of bins in the chash table */
309         chashlistnode_t *curr, *ptr, *next;
310         plistnode_t *pile = NULL;
311         unsigned int machinenum;
312         void *localmachinenum;
313         objheader_t *headeraddr;
314         
315         ptr = record->lookupTable->table;
316         size = record->lookupTable->size;
317
318         for(i = 0; i < size ; i++) {
319                 curr = &ptr[i];
320                 /* Inner loop to traverse the linked list of the cache lookupTable */
321                 while(curr != NULL) {
322                         //if the first bin in hash table is empty
323                         if(curr->key == 0) {
324                                 break;
325                         }
326                         next = curr->next;
327
328                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
329                                 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
330                                 return NULL;
331                         }
332
333                         //Get machine location for object id (and whether local or not)
334                         if (STATUS(headeraddr) & NEW || mhashSearch(curr->key) != NULL) {
335                                 machinenum = myIpAddr;
336                         } else  if ((machinenum = lhashSearch(curr->key)) == 0) {
337                                 printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
338                                 return NULL;
339                         }
340
341                         //Make machine groups
342                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
343                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
344                                 return NULL;
345                         }
346
347                         curr = next;
348                 }
349         }
350         return pile; 
351 }
352
353 /* This function initiates the transaction commit process
354  * Spawns threads for each of the new connections with Participants 
355  * and creates new piles by calling the createPiles(), 
356  * Sends a transrequest() to each remote machines for objects found remotely 
357  * and calls handleLocalReq() to process objects found locally */
358 int transCommit(transrecord_t *record) {        
359         unsigned int tot_bytes_mod, *listmid;
360         plistnode_t *pile, *pile_ptr;
361         int i, j, rc, val;
362         int pilecount, offset, threadnum, trecvcount;
363         char buffer[RECEIVE_BUFFER_SIZE],control;
364         char transid[TID_LEN];
365         trans_req_data_t *tosend;
366         trans_commit_data_t transinfo;
367         static int newtid = 0;
368         char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
369         char localstat = 0;
370
371         do {
372                 trecvcount = 0;
373                 threadnum = 0;
374
375                 /* Look through all the objects in the transaction record and make piles 
376                  * for each machine involved in the transaction*/
377                 pile_ptr = pile = createPiles(record);
378
379                 /* Create the packet to be sent in TRANS_REQUEST */
380
381                 /* Count the number of participants */
382                 pilecount = pCount(pile);
383
384                 /* Create a list of machine ids(Participants) involved in transaction   */
385                 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
386                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
387                         free(record);
388                         return 1;
389                 }               
390                 pListMid(pile, listmid);
391
392
393                 /* Initialize thread variables,
394                  * Spawn a thread for each Participant involved in a transaction */
395                 pthread_t thread[pilecount];
396                 pthread_attr_t attr;                    
397                 pthread_cond_t tcond;
398                 pthread_mutex_t tlock;
399                 pthread_mutex_t tlshrd;
400
401                 thread_data_array_t *thread_data_array;
402                 if((thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount)) == NULL) {
403                         printf("Malloc error %s, %d\n", __FILE__, __LINE__);
404                         pthread_cond_destroy(&tcond);
405                         pthread_mutex_destroy(&tlock);
406                         pDelete(pile_ptr);
407                         free(listmid);
408                         free(record);
409                         return 1;
410                 }
411
412                 local_thread_data_array_t *ltdata;
413                 if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
414                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
415                         pthread_cond_destroy(&tcond);
416                         pthread_mutex_destroy(&tlock);
417                         pDelete(pile_ptr);
418                         free(listmid);
419                         free(thread_data_array);
420                         free(record);
421                         return 1;
422                 }
423
424                 thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
425
426                 /* Initialize and set thread detach attribute */
427                 pthread_attr_init(&attr);
428                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
429                 pthread_mutex_init(&tlock, NULL);
430                 pthread_cond_init(&tcond, NULL);
431
432                 /* Process each machine pile */
433                 while(pile != NULL) {
434                         //Create transaction id
435                         newtid++;
436                         if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
437                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
438                                 pthread_cond_destroy(&tcond);
439                                 pthread_mutex_destroy(&tlock);
440                                 pDelete(pile_ptr);
441                                 free(listmid);
442                                 free(thread_data_array);
443                                 free(ltdata);
444                                 free(record);
445                                 return 1;
446                         }
447                         tosend->f.control = TRANS_REQUEST;
448                         sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
449                         tosend->f.mcount = pilecount;
450                         tosend->f.numread = pile->numread;
451                         tosend->f.nummod = pile->nummod;
452                         tosend->f.numcreated = pile->numcreated;
453                         tosend->f.sum_bytes = pile->sum_bytes;
454                         tosend->listmid = listmid;
455                         tosend->objread = pile->objread;
456                         tosend->oidmod = pile->oidmod;
457                         tosend->oidcreated = pile->oidcreated;
458                         thread_data_array[threadnum].thread_id = threadnum;
459                         thread_data_array[threadnum].mid = pile->mid;
460                         thread_data_array[threadnum].buffer = tosend;
461                         thread_data_array[threadnum].recvmsg = rcvd_control_msg;
462                         thread_data_array[threadnum].threshold = &tcond;
463                         thread_data_array[threadnum].lock = &tlock;
464                         thread_data_array[threadnum].count = &trecvcount;
465                         thread_data_array[threadnum].replyctrl = &treplyctrl;
466                         thread_data_array[threadnum].replyretry = &treplyretry;
467                         thread_data_array[threadnum].rec = record;
468                         /* If local do not create any extra connection */
469                         if(pile->mid != myIpAddr) { /* Not local */
470                                 rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
471                                 if(rc) {
472                                         perror("Error in pthread create\n");
473                                         pthread_cond_destroy(&tcond);
474                                         pthread_mutex_destroy(&tlock);
475                                         pDelete(pile_ptr);
476                                         free(listmid);
477                                         for (i = 0; i < threadnum; i++)
478                                                 free(thread_data_array[i].buffer);
479                                         free(thread_data_array);
480                                         free(ltdata);
481                                         free(record);
482                                         return 1;
483                                 }
484                         } else { /*Local*/
485                                 ltdata->tdata = &thread_data_array[threadnum];
486                                 ltdata->transinfo = &transinfo;
487                                 val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
488                                 if(val) {
489                                         perror("Error in pthread create\n");
490                                         pthread_cond_destroy(&tcond);
491                                         pthread_mutex_destroy(&tlock);
492                                         pDelete(pile_ptr);
493                                         free(listmid);
494                                         for (i = 0; i < threadnum; i++)
495                                                 free(thread_data_array[i].buffer);
496                                         free(thread_data_array);
497                                         free(ltdata);
498                                         free(record);
499                                         return 1;
500                                 }
501                         }
502
503                         threadnum++;            
504                         pile = pile->next;
505                 }
506
507                 /* Free attribute and wait for the other threads */
508                 pthread_attr_destroy(&attr);
509                 
510                 for (i = 0; i < pilecount; i++) {
511                         rc = pthread_join(thread[i], NULL);
512                         if(rc)
513                         {
514                                 printf("ERROR return code from pthread_join() is %d\n", rc);
515                                 pthread_cond_destroy(&tcond);
516                                 pthread_mutex_destroy(&tlock);
517                                 pDelete(pile_ptr);
518                                 free(listmid);
519                                 for (j = i; j < pilecount; j++)
520                                         free(thread_data_array[j].buffer);
521                                 free(thread_data_array);
522                                 free(ltdata);
523                                 free(record);
524                                 return 1;
525                         }
526                         free(thread_data_array[i].buffer);
527                 }
528         
529
530                 /* Free resources */    
531                 pthread_cond_destroy(&tcond);
532                 pthread_mutex_destroy(&tlock);
533                 free(listmid);
534                 pDelete(pile_ptr);
535                 free(thread_data_array);
536                 free(ltdata);
537
538                 /* wait a random amount of time */
539                 if (treplyretry == 1) 
540                         randomdelay();
541
542         /* Retry trans commit procedure if not sucessful in the first try */
543         } while (treplyretry == 1);
544         
545         /* Free Resources */
546         objstrDelete(record->cache);
547         chashDelete(record->lookupTable);
548         free(record);
549         return 0;
550 }
551
552 /* This function sends information involved in the transaction request 
553  * to participants and accepts a response from particpants.
554  * It calls decideresponse() to decide on what control message 
555  * to send next to participants and sends the message using sendResponse()*/
556 void *transRequest(void *threadarg) {
557         int sd, i, n;
558         struct sockaddr_in serv_addr;
559         struct hostent *server;
560         thread_data_array_t *tdata;
561         objheader_t *headeraddr;
562         char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
563         char machineip[16], retval;
564
565         tdata = (thread_data_array_t *) threadarg;
566
567         /* Send Trans Request */
568         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
569                 perror("Error in socket for TRANS_REQUEST\n");
570                 pthread_exit(NULL);
571         }
572         bzero((char*) &serv_addr, sizeof(serv_addr));
573         serv_addr.sin_family = AF_INET;
574         serv_addr.sin_port = htons(LISTEN_PORT);
575         midtoIP(tdata->mid,machineip);
576         machineip[15] = '\0';
577         serv_addr.sin_addr.s_addr = inet_addr(machineip);
578         /* Open Connection */
579         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
580                 perror("Error in connect for TRANS_REQUEST\n");
581                 close(sd);
582                 pthread_exit(NULL);
583         }
584
585         /* Send bytes of data with TRANS_REQUEST control message */
586         if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
587                 perror("Error sending fixed bytes for thread\n");
588                 close(sd);
589                 pthread_exit(NULL);
590         }
591         /* Send list of machines involved in the transaction */
592         {
593                 int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
594                 if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
595                         perror("Error sending list of machines for thread\n");
596                         close(sd);
597                         pthread_exit(NULL);
598                 }
599         }
600         /* Send oids and version number tuples for objects that are read */
601         {
602                 int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
603                 if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
604                         perror("Error sending tuples for thread\n");
605                         close(sd);
606                         pthread_exit(NULL);
607                 }
608         }
609         /* Send objects that are modified */
610         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
611                 int size;
612                 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
613                 GETSIZE(size,headeraddr);
614                 size+=sizeof(objheader_t);
615                 if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
616                         perror("Error sending obj modified for thread\n");
617                         close(sd);
618                         pthread_exit(NULL);
619                 }
620         }
621
622         /* Read control message from Participant */
623         if((n = read(sd, &control, sizeof(char))) <= 0) {
624                 perror("Error in reading control message from Participant\n");
625                 close(sd);
626                 pthread_exit(NULL);
627         }
628         recvcontrol = control;
629
630         /* Update common data structure and increment count */
631         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
632
633         /* Lock and update count */
634         /* Thread sleeps until all messages from pariticipants are received by coordinator */
635         pthread_mutex_lock(tdata->lock);
636
637         (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
638
639         /* Wake up the threads and invoke decideResponse (once) */
640         if(*(tdata->count) == tdata->buffer->f.mcount) {
641                 decideResponse(tdata); 
642                 pthread_cond_broadcast(tdata->threshold);
643         } else {
644                 pthread_cond_wait(tdata->threshold, tdata->lock);
645         }
646         pthread_mutex_unlock(tdata->lock);
647
648         /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
649          * to all participants in their respective socket */
650         if (sendResponse(tdata, sd) == 0) { 
651                 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
652                 close(sd);
653                 pthread_exit(NULL);
654         }
655
656         /* Close connection */
657         close(sd);
658         pthread_exit(NULL);
659 }
660
661 /* This function decides the reponse that needs to be sent to 
662  * all Participant machines after the TRANS_REQUEST protocol */
663 void decideResponse(thread_data_array_t *tdata) {
664         char control;
665         int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
666                                                                          message to send */
667
668         for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
669                 control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
670                                                            written onto the shared array */
671                 switch(control) {
672                         default:
673                                 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
674                                 /* treat as disagree, pass thru */
675                         case TRANS_DISAGREE:
676                                 transdisagree++;
677                                 break;
678
679                         case TRANS_AGREE:
680                                 transagree++;
681                                 break;
682
683                         case TRANS_SOFT_ABORT:
684                                 transsoftabort++;
685                                 break;
686                 }
687         }
688
689         if(transdisagree > 0) {
690                 /* Send Abort */
691                 *(tdata->replyctrl) = TRANS_ABORT;
692         } else if(transagree == tdata->buffer->f.mcount){
693                 /* Send Commit */
694                 *(tdata->replyctrl) = TRANS_COMMIT;
695         } else { 
696                 /* Send Abort in soft abort case followed by retry commiting transaction again*/
697                 *(tdata->replyctrl) = TRANS_ABORT;
698                 *(tdata->replyretry) = 1;
699         }
700
701         return;
702 }
703 /* This function sends the final response to remote machines per thread in their respective socket id 
704  * It returns a char that is only needed to check the correctness of execution of this function inside
705  * transRequest()*/
706 char sendResponse(thread_data_array_t *tdata, int sd) {
707         int n, N, sum, oidcount = 0;
708         char *ptr, retval = 0;
709         unsigned int *oidnotfound;
710
711         /* If the decided response is due to a soft abort and missing objects at the Participant's side */
712         if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
713                 /* Read list of objects missing */
714                 if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
715                         N = oidcount * sizeof(unsigned int);
716                         if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
717                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
718                                 return 0;
719                         }
720                         ptr = (char *) oidnotfound;
721                         do {
722                                 n = read(sd, ptr+sum, N-sum);
723                                 sum += n;
724                         } while(sum < N && n !=0);
725                 }
726                 retval =  TRANS_SOFT_ABORT;
727         }
728         /* If the decided response is TRANS_ABORT */
729         if(*(tdata->replyctrl) == TRANS_ABORT) {
730                 retval = TRANS_ABORT;
731         } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
732                 retval = TRANS_COMMIT;
733         }
734
735         if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
736                 perror("Error sending ctrl message for participant\n");
737         }
738
739         return retval;
740 }
741
742 /* This function opens a connection, places an object read request to the 
743  * remote machine, reads the control message and object if available  and 
744  * copies the object and its header to the local cache.
745  * TODO replace mnum and midtoIP() with MACHINE_IP address later */ 
746
747 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
748         int sd, size, val;
749         struct sockaddr_in serv_addr;
750         struct hostent *server;
751         char control;
752         char machineip[16];
753         objheader_t *h;
754         void *objcopy;
755
756         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
757                 perror("Error in socket\n");
758                 return NULL;
759         }
760         bzero((char*) &serv_addr, sizeof(serv_addr));
761         serv_addr.sin_family = AF_INET;
762         serv_addr.sin_port = htons(LISTEN_PORT);
763         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
764         midtoIP(mnum,machineip);
765         machineip[15] = '\0';
766         serv_addr.sin_addr.s_addr = inet_addr(machineip);
767         /* Open connection */
768         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
769                 perror("Error in connect\n");
770                 return NULL;
771         }
772         char readrequest[sizeof(char)+sizeof(unsigned int)];
773         readrequest[0] = READ_REQUEST;
774         *((unsigned int *)(&readrequest[1])) = oid;
775         if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
776                 perror("Error sending message\n");
777                 return NULL;
778         }
779
780 #ifdef DEBUG1
781         printf("DEBUG -> ready to rcv ...\n");
782 #endif
783         /* Read response from the Participant */
784         if((val = read(sd, &control, sizeof(char))) <= 0) {
785                 perror("No control response for getRemoteObj sent\n");
786                 return NULL;
787         }
788         switch(control) {
789                 case OBJECT_NOT_FOUND:
790                         return NULL;
791                 case OBJECT_FOUND:
792                         /* Read object if found into local cache */
793                         if((val = read(sd, &size, sizeof(int))) <= 0) {
794                                 perror("No size is read from the participant\n");
795                                 return NULL;
796                         }
797                         objcopy = objstrAlloc(record->cache, size);
798                         if((val = read(sd, objcopy, size)) <= 0) {
799                                 perror("No objects are read from the remote participant\n");
800                                 return NULL;
801                         }
802                         /* Insert into cache's lookup table */
803                         chashInsert(record->lookupTable, oid, objcopy); 
804                         break;
805                 default:
806                         printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
807                         return NULL;
808         }
809         /* Close connection */
810         close(sd);
811         return objcopy;
812 }
813
814 /* This function handles the local objects involved in a transaction commiting process.
815  * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
816  * Note Coordinator = local machine
817  * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
818  * based on common agreement it either commits or aborts the transaction.
819  * It also frees the memory resources */
820 void *handleLocalReq(void *threadarg) {
821         int val, i = 0, size, offset = 0;
822         short version;
823         char control = 0, *ptr;
824         unsigned int oid;
825         unsigned int *oidnotfound = NULL, *oidlocked = NULL;
826         void *mobj, *modptr;
827         objheader_t *headptr, *headeraddr;
828         local_thread_data_array_t *localtdata;
829
830         localtdata = (local_thread_data_array_t *) threadarg;
831
832         /* Counters and arrays to formulate decision on control message to be sent */
833         oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
834         oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
835         int objnotfound = 0, objlocked = 0; 
836         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
837
838         /* modptr points to the beginning of the object store 
839          * created at the Pariticipant */ 
840         pthread_mutex_lock(&mainobjstore_mutex);
841         if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
842                 printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
843                 pthread_mutex_unlock(&mainobjstore_mutex);
844                 pthread_exit(NULL);
845         }
846         pthread_mutex_unlock(&mainobjstore_mutex);
847         /* Write modified objects into the mainobject store */
848         for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
849                 headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
850                 GETSIZE(size,headeraddr);
851                 size+=sizeof(objheader_t);
852                 memcpy((char *)modptr+offset, headeraddr, size);  
853                 offset += size;
854         }
855         /* Write new objects into the mainobject store */
856         for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) {
857                 headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
858                 GETSIZE(size, headeraddr);
859                 size+=sizeof(objheader_t);
860                 memcpy((char *)modptr+offset, headeraddr, size);  
861                 offset += size;
862         }
863
864         ptr = modptr;
865         offset = 0; //Reset 
866
867         /* Process each oid in the machine pile/ group per thread */
868         for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
869                 if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified
870                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
871                         incr *= i;
872                         oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
873                         incr += sizeof(unsigned int);
874                         version = *((short *)(localtdata->tdata->buffer->objread + incr));
875                 } else {//Objs modified
876                         int tmpsize;
877                         headptr = (objheader_t *)ptr;
878                         oid = OID(headptr);
879                         version = headptr->version;
880                         GETSIZE(tmpsize, headptr);
881                         ptr += sizeof(objheader_t) + tmpsize;
882                 }
883
884                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
885
886                 /* Save the oids not found and number of oids not found for later use */
887                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
888                         /* Save the oids not found and number of oids not found for later use */
889                         oidnotfound[objnotfound] = oid;
890                         objnotfound++;
891                 } else { /* If Obj found in machine (i.e. has not moved) */
892                         /* Check if Obj is locked by any previous transaction */
893                         if (STATUS(((objheader_t *)mobj)) & LOCK) {
894                                 if (version == ((objheader_t *)mobj)->version) {      /* If not locked then match versions */ 
895                                         v_matchlock++;
896                                 } else {/* If versions don't match ...HARD ABORT */
897                                         v_nomatch++;
898                                         /* Send TRANS_DISAGREE to Coordinator */
899                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
900                                 }
901                         } else {/* If Obj is not locked then lock object */
902                                 STATUS(((objheader_t *)mobj)) |= LOCK;
903                                 //TODO Remove this for Testing
904                                 //randomdelay(); -- Why is this here.  BCD
905
906                                 /* Save all object oids that are locked on this machine during this transaction request call */
907                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
908                                 objlocked++;
909                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
910                                         v_matchnolock++;
911                                 } else { /* If versions don't match ...HARD ABORT */
912                                         v_nomatch++;
913                                         /* Send TRANS_DISAGREE to Coordinator */
914                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
915                                 }
916                         }
917                 }
918         }
919
920         /* Condition to send TRANS_AGREE */
921         if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
922                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
923         }
924         /* Condition to send TRANS_SOFT_ABORT */
925         if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
926                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
927                 //TODO  currently the only soft abort case that is supported is when object locked by previous
928                 //transaction => v_matchlock > 0 
929                 //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported 
930                 /* Send number of oids not found and the missing oids if objects are missing in the machine */
931                 /* TODO Remember to store the oidnotfound for later use
932                    if(objnotfound != 0) {
933                    int size = sizeof(unsigned int)* objnotfound;
934                    }
935                    */
936         }
937
938         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
939          * if Participant receives a TRANS_COMMIT */
940         localtdata->transinfo->objlocked = oidlocked;
941         localtdata->transinfo->objnotfound = oidnotfound;
942         localtdata->transinfo->modptr = modptr;
943         localtdata->transinfo->numlocked = objlocked;
944         localtdata->transinfo->numnotfound = objnotfound;
945
946         /* Lock and update count */
947         //Thread sleeps until all messages from pariticipants are received by coordinator
948         pthread_mutex_lock(localtdata->tdata->lock);
949         (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
950
951         /* Wake up the threads and invoke decideResponse (once) */
952         if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
953                 decideResponse(localtdata->tdata); 
954                 pthread_cond_broadcast(localtdata->tdata->threshold);
955         } else {
956                 pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
957         }
958         pthread_mutex_unlock(localtdata->tdata->lock);
959
960         /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
961         if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
962                 if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
963                         printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
964                         pthread_exit(NULL);
965                 }
966         }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
967                 if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) {
968                         printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
969                         pthread_exit(NULL);
970                 }
971         }
972
973         /* Free memory */
974         if (localtdata->transinfo->objlocked != NULL) {
975                 free(localtdata->transinfo->objlocked);
976                 localtdata->transinfo->objlocked = NULL;
977         }
978         if (localtdata->transinfo->objnotfound != NULL) {
979                 free(localtdata->transinfo->objnotfound);
980                 localtdata->transinfo->objnotfound = NULL;
981         }
982
983         pthread_exit(NULL);
984 }
985 /* This function completes the ABORT process if the transaction is aborting 
986 */
987 int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
988         char *ptr;
989         int i;
990         objheader_t *tmp_header;
991         void *header;
992
993         /* Set all ref counts as 1 and do garbage collection */
994         ptr = modptr;
995         for(i = 0; i< nummod; i++) {
996                 int tmpsize;
997                 tmp_header = (objheader_t *)ptr;
998                 tmp_header->rcount = 0;
999                 GETSIZE(tmpsize, tmp_header);
1000                 ptr += sizeof(objheader_t) + tmpsize;
1001         }
1002         /* Unlock objects that was locked due to this transaction */
1003         for(i = 0; i< numlocked; i++) {
1004                 if((header = mhashSearch(objlocked[i])) == NULL) {
1005                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1006                         return 1;
1007                 }
1008                 STATUS(((objheader_t *)header)) &= ~(LOCK);
1009         }
1010
1011         /* Send ack to Coordinator */
1012
1013         /*Free the pointer */
1014         ptr = NULL;
1015         return 0;
1016 }
1017
1018 /*This function completes the COMMIT process is the transaction is commiting
1019 */
1020 int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated, unsigned int *objlocked, int nummod, int numcreated, int numlocked) {
1021         objheader_t *header;
1022         int i = 0, offset = 0;
1023         char control;
1024
1025         /* Process each modified object saved in the mainobject store */
1026         for(i = 0; i < nummod; i++) {
1027           int tmpsize;
1028                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1029                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1030                         return 1;
1031                 }
1032                 /* Change reference count of older address and free space in objstr ?? */
1033                 header->rcount = 0;
1034
1035                 /* Change ptr address in mhash table */
1036                 mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary
1037                 mhashInsert(oidmod[i], (((char *)modptr) + offset));
1038                 GETSIZE(tmpsize, header);
1039                 offset += sizeof(objheader_t) + tmpsize;
1040
1041                 /* Update object version number */
1042                 header = (objheader_t *) mhashSearch(oidmod[i]);
1043                 header->version += 1;
1044         }
1045
1046         /*If object is in prefetch cache then update it in prefetch cache */ 
1047
1048
1049         /* If object is newly created inside transaction then commit it */
1050         for (i = 0; i < numcreated; i++)
1051         {
1052                 int tmpsize;
1053                 header = (objheader_t *)(((char *)modptr) + offset);
1054                 mhashInsert(oidcreated[i], (((char *)modptr) + offset));
1055                 GETSIZE(tmpsize, header);
1056                 offset += sizeof(objheader_t) + tmpsize;
1057                 lhashInsert(oidcreated[i], myIpAddr);
1058         }
1059
1060         /* Unlock locked objects */
1061         for(i = 0; i < numlocked; i++) {
1062                 if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
1063                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1064                         return 1;
1065                 }
1066                 STATUS(header) &= ~(LOCK);
1067         }
1068
1069         //TODO Update location lookup table
1070
1071         /* Send ack to Coordinator */
1072         printf("TRANS_SUCCESSFUL\n");
1073         return 0;
1074 }
1075
1076 /* This function checks if the prefetch oids are same and have same offsets  
1077  * for case x.a.b and y.a.b where x and y have same oid's
1078  * or if a.b.c is a subset of x.b.c.d*/ 
1079 /* check for case where the generated request a.y.z or x.y.z.g then 
1080  * prefetch needs to be generated for x.y.z.g  if oid of a and x are same*/
1081 void checkPrefetchTuples(prefetchqelem_t *node) {
1082         int i,j, count,k, sindex, index;
1083         char *ptr, *tmp;
1084         int ntuples, slength;
1085         unsigned int *oid;
1086         short *endoffsets, *arryfields; 
1087
1088         /* Check for the case x.y.z and a.b.c are same oids */ 
1089         ptr = (char *) node;
1090         ntuples = *(GET_NTUPLES(ptr));
1091         oid = GET_PTR_OID(ptr);
1092         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1093         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1094         /* Find offset length for each tuple */
1095         int numoffset[ntuples];
1096         numoffset[0] = endoffsets[0];
1097         for(i = 1; i<ntuples; i++) {
1098                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1099         }
1100         /* Check for redundant tuples by comparing oids of each tuple */
1101         for(i = 0; i < ntuples; i++) {
1102                 if(oid[i] == -1)
1103                         continue;
1104                 for(j = i+1 ; j < ntuples; j++) {
1105                         if(oid[j] == -1)
1106                                 continue;
1107                         /*If oids of tuples match */ 
1108                         if (oid[i] == oid[j]) {
1109                                 /* Find the smallest offset length of two tuples*/
1110                                 if(numoffset[i] >  numoffset[j]){
1111                                         slength = numoffset[j];
1112                                         sindex = j;
1113                                 }
1114                                 else {
1115                                         slength = numoffset[i];
1116                                         sindex = i;
1117                                 }
1118
1119                                 /* Compare the offset values based on the current indices
1120                                  * break if they do not match
1121                                  * if all offset values match then pick the largest tuple*/
1122
1123                                 if(i == 0) {
1124                                         k = 0;
1125                                         index = endoffsets[j -1];
1126                                         for(count = 0; count < slength; count ++) {
1127                                                 if (arryfields[k] != arryfields[index]) { 
1128                                                         break;
1129                                                 }
1130                                                 index++;
1131                                                 k++;
1132                                         }       
1133                                 } else {
1134                                         k = endoffsets[i-1];
1135                                         index = endoffsets[j-1];
1136                                         printf("Value of slength = %d\n", slength);
1137                                         for(count = 0; count < slength; count++) {
1138                                                 if(arryfields[k] != arryfields[index]) {
1139                                                         break;
1140                                                 }
1141                                                 index++;
1142                                                 k++;
1143                                         }
1144                                 }
1145
1146                                 if(slength == count) {
1147                                         oid[sindex] = -1;
1148                                 }
1149                         }
1150                 }
1151         }
1152 }
1153
1154 void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopcount, unsigned int objoid, int index, int iter, int oidnfound) {
1155         char *ptr, *tmp;
1156         int ntuples, i, k, flag;
1157         unsigned int * oid;
1158         short *endoffsets, *arryfields;
1159         objheader_t *header;
1160
1161         ptr = (char *) node;
1162         ntuples = *(GET_NTUPLES(ptr));
1163         oid = GET_PTR_OID(ptr);
1164         endoffsets = GET_PTR_EOFF(ptr, ntuples);
1165         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1166
1167         if(oidnfound == 1) {
1168                 if((header = (objheader_t *) prehashSearch(objoid)) == NULL) {
1169                         return;
1170                 } else { //Found in Prefetch Cache
1171                         //TODO Decide if object is too old, if old remove from cache
1172                         tmp = (char *) header;
1173                         /* Check if any of the offset oid is available in the Prefetch cache */
1174                         for(i = counter; i < loopcount; i++) {
1175                                 objoid = *(tmp + sizeof(objheader_t) + arryfields[counter]);
1176                                 if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
1177                                         flag = 0;
1178                                 } else {
1179                                         flag = 1;
1180                                         break;
1181                                 }
1182                         }
1183                 }
1184         } else {
1185                 for(i = counter; i<loopcount; i++) {
1186                         if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
1187                                 tmp = (char *) header;
1188                                 objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
1189                                 flag = 0;
1190                                 index++;
1191                         } else {
1192                                 flag = 1;
1193                                 break;
1194                         }
1195                 }
1196         }
1197
1198         /* If oid not found locally or in prefetch cache then 
1199          * assign the latest oid found as the new oid 
1200          * and copy left over offsets into the arrayoffsetfieldarray*/
1201         oid[iter] = objoid;
1202         numoffset[iter] = numoffset[iter] - (i+1);
1203         for(k = 0; k < numoffset[iter] ; k++) {
1204                 arryfields[endoffsets[counter]+k] = arryfields[endoffsets[counter]+k+1];
1205         }
1206
1207         if(flag == 0) {
1208                 oid[iter] = -1;
1209                 numoffset[iter] = 0;
1210         }
1211 }
1212
1213 /* This function makes machine piles to be added into the machine pile queue for each prefetch call */
1214 prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
1215         char *ptr, *tmp;
1216         int ntuples, slength, i, machinenum;
1217         int maxoffset;
1218         unsigned int *oid;
1219         short *endoffsets, *arryfields, *offset; 
1220         prefetchpile_t *head = NULL;
1221
1222         /* Check for the case x.y.z and a.b.c are same oids */ 
1223         ptr = (char *) node;
1224         ntuples = *(GET_NTUPLES(ptr));
1225         oid = GET_PTR_OID(ptr);
1226         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1227         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1228
1229         /* Check for redundant tuples by comparing oids of each tuple */
1230         for(i = 0; i < ntuples; i++) {
1231                 if(oid[i] == -1)
1232                         continue;
1233                 /* For each tuple make piles */
1234                 if ((machinenum = lhashSearch(oid[i])) == 0) {
1235                         printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
1236                         return NULL;
1237                 }
1238                 /* Insert into machine pile */
1239                 offset = &arryfields[endoffsets[i-1]];
1240                 insertPile(machinenum, oid[i], numoffset[i], offset, &head);
1241         }
1242         return head;
1243 }
1244
1245
1246 /* This function checks if the oids within the prefetch tuples are available locally.
1247  * If yes then makes the tuple invalid. If no then rearranges oid and offset values in 
1248  * the prefetchqelem_t node to represent a new prefetch tuple */
1249 prefetchpile_t *foundLocal(prefetchqelem_t *node) {
1250         int ntuples,i, j, k, oidnfound = 0, index, flag;
1251         unsigned int *oid;
1252         unsigned int  objoid;
1253         char *ptr, *tmp;
1254         objheader_t *objheader;
1255         short *endoffsets, *arryfields; 
1256         prefetchpile_t *head = NULL;
1257
1258         ptr = (char *) node;
1259         ntuples = *(GET_NTUPLES(ptr));
1260         oid = GET_PTR_OID(ptr);
1261         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1262         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1263         /* Find offset length for each tuple */
1264         int numoffset[ntuples];//Number of offsets for each tuple
1265         numoffset[0] = endoffsets[0];
1266         for(i = 1; i<ntuples; i++) {
1267                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1268         }
1269         for(i = 0; i < ntuples; i++) { 
1270                 if(oid[i] == -1)
1271                         continue;
1272                 /* If object found locally */
1273                 if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) { 
1274                         oidnfound = 0;
1275                         tmp = (char *) objheader;
1276                         /* Find the oid of its offset value */
1277                         if(i == 0) 
1278                                 index = 0;
1279                         else 
1280                                 index = endoffsets[i - 1];
1281                         for(j = 0 ; j < numoffset[i] ; j++) {
1282                                 objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
1283                                 /*If oid found locally then 
1284                                  *assign the latest oid found as the new oid 
1285                                  *and copy left over offsets into the arrayoffsetfieldarray*/
1286                                 oid[i] = objoid;
1287                                 numoffset[i] = numoffset[i] - (j+1);
1288                                 for(k = 0; k < numoffset[i]; k++)
1289                                         arryfields[endoffsets[j]+ k] = arryfields[endoffsets[j]+k+1];
1290                                 index++;
1291                                 /*New offset oid not found */
1292                                 if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
1293                                         flag = 1;
1294                                         checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
1295                                         break;
1296                                 } else 
1297                                         flag = 0;
1298                         }
1299
1300                         /*If all offset oids are found locally,make the prefetch tuple invalid */
1301                         if(flag == 0) {
1302                                 oid[i] = -1;
1303                                 numoffset[i] = 0;
1304                         }
1305                 } else {
1306                         oidnfound = 1;
1307                         /* Look in Prefetch cache */
1308                         checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
1309                 }
1310
1311         }
1312         /* Make machine groups */
1313         head = makePreGroups(node, numoffset);
1314         return head;
1315 }
1316
1317 /* This function is called by the thread calling transPrefetch */
1318 void *transPrefetch(void *t) {
1319         prefetchqelem_t *qnode;
1320         prefetchpile_t *pilehead = NULL;
1321
1322         while(1) {
1323                 /* lock mutex of primary prefetch queue */
1324                 pthread_mutex_lock(&pqueue.qlock);
1325                 /* while primary queue is empty, then wait */
1326                 while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
1327                         pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
1328                 }
1329
1330                 /* dequeue node to create a machine piles and  finally unlock mutex */
1331                 if((qnode = pre_dequeue()) == NULL) {
1332                         printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
1333                         pthread_mutex_unlock(&pqueue.qlock);
1334                         pthread_exit(NULL);
1335                 }
1336                 pthread_mutex_unlock(&pqueue.qlock);
1337                 /* Reduce redundant prefetch requests */
1338                 checkPrefetchTuples(qnode);
1339                 /* Check if the tuples are found locally, if yes then reduce them further*/ 
1340                 /* and group requests by remote machine ids by calling the makePreGroups() */
1341                 pilehead = foundLocal(qnode);
1342
1343                 /* Lock mutex of pool queue */
1344                 pthread_mutex_lock(&mcqueue.qlock);
1345                 /* Update the pool queue with the new remote machine piles generated per prefetch call */
1346                 mcpileenqueue(pilehead);
1347                 /* Broadcast signal on machine pile queue */
1348                 pthread_cond_broadcast(&mcqueue.qcond);
1349                 /* Unlock mutex of  machine pile queue */
1350                 pthread_mutex_unlock(&mcqueue.qlock);
1351                 /* Deallocate the prefetch queue pile node */
1352                 predealloc(qnode);
1353
1354         }
1355 }
1356
1357 /* Each thread in the  pool of threads calls this function to establish connection with
1358  * remote machines, send the prefetch requests and process the reponses from
1359  * the remote machines .
1360  * The thread is active throughout the period of runtime */
1361
1362 void *mcqProcess(void *threadid) {
1363         int tid;
1364         prefetchpile_t *mcpilenode;
1365
1366         tid = (int) threadid;
1367         while(1) {
1368                 /* Lock mutex of mc pile queue */
1369                 pthread_mutex_lock(&mcqueue.qlock);
1370                 /* When mc pile queue is empty, wait */
1371                 while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
1372                         pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
1373                 }
1374                 /* Dequeue node to send remote machine connections*/
1375                 if((mcpilenode = mcpiledequeue()) == NULL) {
1376                         printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
1377                         pthread_mutex_unlock(&mcqueue.qlock);
1378                         pthread_exit(NULL);
1379                 }
1380                 /* Unlock mutex */
1381                 pthread_mutex_unlock(&mcqueue.qlock);
1382
1383                 /*Initiate connection to remote host and send request */ 
1384                 /* Process Request */
1385                 sendPrefetchReq(mcpilenode, tid);
1386
1387                 /* Deallocate the machine queue pile node */
1388                 mcdealloc(mcpilenode);
1389         }
1390 }
1391
1392 void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
1393         int sd, i, offset, off, len, endpair, count = 0;
1394         struct sockaddr_in serv_addr;
1395         struct hostent *server;
1396         char machineip[16], control;
1397         objpile_t *tmp;
1398
1399
1400         /* Send Trans Prefetch Request */
1401         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1402                 perror("Error in socket for TRANS_REQUEST\n");
1403                 return;
1404         }
1405         bzero((char*) &serv_addr, sizeof(serv_addr));
1406         serv_addr.sin_family = AF_INET;
1407         serv_addr.sin_port = htons(LISTEN_PORT);
1408         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
1409         midtoIP(mcpilenode->mid ,machineip);
1410         machineip[15] = '\0';
1411         serv_addr.sin_addr.s_addr = inet_addr(machineip);
1412
1413         /* Open Connection */
1414         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
1415                 perror("Error in connect for TRANS_REQUEST\n");
1416                 close(sd);
1417                 return;
1418         }
1419
1420         /* Send TRANS_PREFETCH control message */
1421         control = TRANS_PREFETCH;
1422         if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
1423                 perror("Error in sending prefetch control\n");
1424                 close(sd);
1425                 return;
1426         }
1427
1428         /* Send Oids and offsets in pairs */
1429         tmp = mcpilenode->objpiles;
1430         while(tmp != NULL) {
1431                 off = offset = 0;
1432                 count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
1433                 len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1434                 char oidnoffset[len];
1435                 memcpy(oidnoffset, &len, sizeof(int));
1436                 off = sizeof(int);
1437                 memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
1438                 off += sizeof(unsigned int);
1439                 for(i = 0; i < tmp->numoffset; i++) {
1440                         memcpy(oidnoffset + off, &tmp->offset[i], sizeof(short));
1441                         off+=sizeof(short);
1442                 }
1443                 if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
1444                         perror("Error sending fixed bytes for thread\n");
1445                         close(sd);
1446                         return;
1447                 }
1448                 tmp = tmp->next;
1449         }
1450
1451         /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1452         endpair = -1;
1453         if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
1454                 perror("Error sending fixed bytes for thread\n");
1455                 close(sd);
1456                 return;
1457         }
1458
1459         /* Get Response from the remote machine */
1460         getPrefetchResponse(count,sd);
1461         close(sd);
1462         return;
1463 }
1464
1465 void getPrefetchResponse(int count, int sd) {
1466         int i = 0, val, n, N, sum, index, objsize;
1467         unsigned int bufsize,oid;
1468         char buffer[RECEIVE_BUFFER_SIZE], control;
1469         char *ptr;
1470         void *modptr, *oldptr;
1471
1472         /* Read  prefetch response from the Remote machine */
1473         if((val = read(sd, &control, sizeof(char))) <= 0) {
1474                 perror("No control response for Prefetch request sent\n");
1475                 return;
1476         }
1477
1478         if(control == TRANS_PREFETCH_RESPONSE) {
1479                 /*For each oid and offset tuple sent as prefetch request to remote machine*/
1480                 while(i < count) {
1481                         sum = 0;
1482                         index = 0;
1483                         /* Read the size of buffer to be received */
1484                         if((N = read(sd, buffer, sizeof(unsigned int))) <= 0) {
1485                                 perror("Size of buffer not recv\n");
1486                                 return;
1487                         }
1488                         memcpy(&bufsize, buffer, sizeof(unsigned int));
1489                         ptr = buffer + sizeof(unsigned int);
1490                         /* Keep receiving the buffer containing oid info */ 
1491                         do {
1492                                 n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
1493                                 sum +=n;
1494                         } while(sum < bufsize && n != 0);
1495                         /* Decode the contents of the buffer */
1496                         index = sizeof(unsigned int);
1497                         while(index < (bufsize - sizeof(unsigned int))) {
1498                                 if(buffer[index] == OBJECT_FOUND) {
1499                                         /* Increment it to get the object */
1500                                         index += sizeof(char);
1501                                         memcpy(&oid, buffer + index, sizeof(unsigned int));
1502                                         index += sizeof(unsigned int);
1503                                         /* For each object found add to Prefetch Cache */
1504                                         memcpy(&objsize, buffer + index, sizeof(int));
1505                                         index+=sizeof(int);
1506                                         pthread_mutex_lock(&prefetchcache_mutex);
1507                                         if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
1508                                                 printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1509                                                 pthread_mutex_unlock(&prefetchcache_mutex);
1510                                                 return;
1511                                         }
1512                                         pthread_mutex_unlock(&prefetchcache_mutex);
1513                                         memcpy(modptr, buffer+index, objsize);
1514                                         index += objsize;
1515                                         /* Insert the oid and its address into the prefetch hash lookup table */
1516                                         /* Do a version comparison if the oid exists */
1517                                         if((oldptr = prehashSearch(oid)) != NULL) {
1518                                                 /* If older version then update with new object ptr */
1519                                                 if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
1520                                                         prehashRemove(oid);
1521                                                         prehashInsert(oid, modptr);
1522                                                 } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { 
1523                                                         /* Add the new object ptr to hash table */
1524                                                         prehashRemove(oid);
1525                                                         prehashInsert(oid, modptr);
1526                                                 } else { /* Do nothing: TODO modptr should be reference counted */
1527                                                         ;
1528                                                 }
1529                                         } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
1530                                                 prehashInsert(oid, modptr);
1531                                         }
1532                                         /* Lock the Prefetch Cache look up table*/
1533                                         //pthread_mutex_lock(&pflookup.lock);
1534                                         /* Broadcast signal on prefetch cache condition variable */ 
1535                                         pthread_cond_broadcast(&pflookup.cond);
1536                                         /* Unlock the Prefetch Cache look up table*/
1537                                         //pthread_mutex_unlock(&pflookup.lock);
1538                                 } else if(buffer[index] == OBJECT_NOT_FOUND) {
1539                                         /* Increment it to get the object */
1540                                         /* TODO: For each object not found query DHT for new location and retrieve the object */
1541                                         index += sizeof(char);
1542                                         memcpy(&oid, buffer + index, sizeof(unsigned int));
1543                                         index += sizeof(unsigned int);
1544                                         /* Throw an error */
1545                                         printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
1546                                         exit(-1);
1547                                 } else {
1548                                         printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
1549                                         return;
1550                                 }
1551                         }
1552
1553                         i++;
1554                 }
1555         } else
1556                 printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
1557         return;
1558 }
1559
1560 unsigned short getObjType(unsigned int oid)
1561 {
1562         objheader_t *objheader;
1563         unsigned short numoffsets = 0;
1564
1565         if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL)
1566         {
1567                 if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
1568                 {
1569                         prefetch(1, &oid, &numoffsets, NULL);
1570                         pthread_mutex_lock(&pflookup.lock);
1571                         while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
1572                         {
1573                                 pthread_cond_wait(&pflookup.cond, &pflookup.lock);
1574                         }
1575                         pthread_mutex_unlock(&pflookup.lock);
1576                 }
1577         }
1578
1579         return TYPE(objheader);
1580 }
1581
1582 int startRemoteThread(unsigned int oid, unsigned int mid)
1583 {
1584         int sock;
1585         struct sockaddr_in remoteAddr;
1586         char msg[1 + sizeof(unsigned int)];
1587         int bytesSent;
1588         int status;
1589
1590         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1591         {
1592                 perror("startRemoteThread():socket()");
1593                 return -1;
1594         }
1595
1596         bzero(&remoteAddr, sizeof(remoteAddr));
1597         remoteAddr.sin_family = AF_INET;
1598         remoteAddr.sin_port = htons(LISTEN_PORT);
1599         remoteAddr.sin_addr.s_addr = htonl(mid);
1600         
1601         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
1602         {
1603                 printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1604                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1605                 status = -1;
1606         }
1607         else
1608         {
1609                 msg[0] = START_REMOTE_THREAD;
1610                 memcpy(&msg[1], &oid, sizeof(unsigned int));
1611
1612                 bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0);
1613                 if (bytesSent < 0)
1614                 {
1615                         perror("startRemoteThread():send()");
1616                         status = -1;
1617                 }
1618                 else if (bytesSent != 1 + sizeof(unsigned int))
1619                 {
1620                         printf("startRemoteThread(): error, sent %d bytes\n", bytesSent);
1621                         status = -1;
1622                 }
1623                 else
1624                 {
1625                         status = 0;
1626                 }
1627         }
1628
1629         close(sock);
1630         return status;
1631 }
1632
1633 //TODO: when reusing oids, make sure they are not already in use!
1634 unsigned int getNewOID(void) {
1635         static unsigned int id = 0xFFFFFFFF;
1636         
1637         id += 2;
1638         if (id > oidMax || id < oidMin)
1639         {
1640                 id = (oidMin | 1);
1641         }
1642         return id;
1643 }
1644
1645 int processConfigFile()
1646 {
1647         FILE *configFile;
1648         const int maxLineLength = 200;
1649         char lineBuffer[maxLineLength];
1650         char *token;
1651         const char *delimiters = " \t\n";
1652         char *commentBegin;
1653         in_addr_t tmpAddr;
1654         
1655         configFile = fopen(CONFIG_FILENAME, "r");
1656         if (configFile == NULL)
1657         {
1658                 printf("error opening %s:\n", CONFIG_FILENAME);
1659                 perror("");
1660                 return -1;
1661         }
1662
1663         numHostsInSystem = 0;
1664         sizeOfHostArray = 8;
1665         hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1666         
1667         while(fgets(lineBuffer, maxLineLength, configFile) != NULL)
1668         {
1669                 commentBegin = strchr(lineBuffer, '#');
1670                 if (commentBegin != NULL)
1671                         *commentBegin = '\0';
1672                 token = strtok(lineBuffer, delimiters);
1673                 while (token != NULL)
1674                 {
1675                         tmpAddr = inet_addr(token);
1676                         if ((int)tmpAddr == -1)
1677                         {
1678                                 printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1679                                 fclose(configFile);
1680                                 return -1;
1681                         }
1682                         else
1683                                 addHost(htonl(tmpAddr));
1684                         token = strtok(NULL, delimiters);
1685                 }
1686         }
1687
1688         fclose(configFile);
1689         
1690         if (numHostsInSystem < 1)
1691         {
1692                 printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1693                 return -1;
1694         }
1695 #ifdef MAC
1696         myIpAddr = getMyIpAddr("en1");
1697 #else
1698         myIpAddr = getMyIpAddr("eth0");
1699 #endif
1700         myIndexInHostArray = findHost(myIpAddr);
1701         if (myIndexInHostArray == -1)
1702         {
1703                 printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1704                 return -1;
1705         }
1706         oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1707         oidMin = oidsPerBlock * myIndexInHostArray;
1708         if (myIndexInHostArray == numHostsInSystem - 1)
1709                 oidMax = 0xFFFFFFFF;
1710         else
1711                 oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1712
1713         return 0;
1714 }
1715
1716 void addHost(unsigned int hostIp)
1717 {
1718         unsigned int *tmpArray;
1719
1720         if (findHost(hostIp) != -1)
1721                 return;
1722
1723         if (numHostsInSystem == sizeOfHostArray)
1724         {
1725                 tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1726                 memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1727                 free(hostIpAddrs);
1728                 hostIpAddrs = tmpArray;
1729         }
1730
1731         hostIpAddrs[numHostsInSystem++] = hostIp;
1732
1733         return;
1734 }
1735
1736 int findHost(unsigned int hostIp)
1737 {
1738         int i;
1739         for (i = 0; i < numHostsInSystem; i++)
1740                 if (hostIpAddrs[i] == hostIp)
1741                         return i;
1742
1743         //not found
1744         return -1;
1745 }
1746