Fixed : crashing due to pile creation
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
#include "dstm.h"
#include "ip.h"
#include "clookup.h"
#include "machinepile.h"
#include "mlookup.h"
#include "llookup.h"
#include "plookup.h"
#include "prelookup.h"
#include "queue.h"
#include <sys/types.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <errno.h>
#include <pthread.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
21
22 #define LISTEN_PORT 2156
23 #define RECEIVE_BUFFER_SIZE 2048
24 #define NUM_THREADS 10
25 #define PREFETCH_CACHE_SIZE 1048576 //1MB
26 #define CONFIG_FILENAME "dstm.conf"
27
28 /* Global Variables */
29 extern int classsize[];
30 extern primarypfq_t pqueue; //Shared prefetch queue
31 extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
32 objstr_t *prefetchcache; //Global Prefetch cache
33 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
34 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
35 extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
36 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
37 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
38 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
39 extern objstr_t *mainobjstore;
40 unsigned int myIpAddr;
41 unsigned int *hostIpAddrs;
42 int sizeOfHostArray;
43 int numHostsInSystem;
44 int myIndexInHostArray;
45 unsigned int oidsPerBlock;
46 unsigned int oidMin;
47 unsigned int oidMax;
48
49 plistnode_t *createPiles(transrecord_t *);
/* Return the number of elements in a -1 terminated int array
 * (the terminator itself is not counted).
 *
 * Declared without `inline`: under C99/C11 rules a non-static `inline`
 * definition is only an inline definition and emits no external symbol, so
 * any call the compiler chose not to inline (e.g. at -O0) failed to link
 * unless some other declaration supplied one. */
int arrayLength(int *array) {
	int i = 0;

	while (array[i] != -1)
		i++;
	return i;
}
/* Return the largest of the first `arraylength` elements of `array`.
 * Precondition: arraylength >= 1 (array[0] is read unconditionally).
 *
 * Declared without `inline` for the same reason as arrayLength(): a
 * non-static C99 inline definition emits no external symbol, so non-inlined
 * calls would fail to link. */
int findmax(int *array, int arraylength) {
	int max = array[0];
	int i;

	/* array[0] already seeds max, so start the scan at index 1 */
	for (i = 1; i < arraylength; i++) {
		if (array[i] > max)
			max = array[i];
	}
	return max;
}
66 /* This function is a prefetch call generated by the compiler that
67  * populates the shared primary prefetch queue*/
68 void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
69         int qnodesize;
70         int len = 0;
71
72         /* Allocate for the queue node*/
73         char *node;
74         qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); 
75         if((node = calloc(1, qnodesize)) == NULL) {
76                 printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
77                 return;
78         }
79         /* Set queue node values */
80         len = sizeof(prefetchqelem_t);
81         memcpy(node + len, &ntuples, sizeof(int));
82         len += sizeof(int);
83         memcpy(node + len, oids, ntuples*sizeof(unsigned int));
84         len += ntuples * sizeof(unsigned int);
85         memcpy(node + len, endoffsets, ntuples*sizeof(short));
86         len += ntuples * sizeof(short);
87         memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
88         /* Lock and insert into primary prefetch queue */
89         pthread_mutex_lock(&pqueue.qlock);
90         pre_enqueue((prefetchqelem_t *)node);
91         pthread_cond_signal(&pqueue.qcond);
92         pthread_mutex_unlock(&pqueue.qlock);
93 }
94
95 /* This function starts up the transaction runtime. */
96 int dstmStartup(const char * option) {
97   pthread_t thread_Listen;
98   pthread_attr_t attr;
99   int master=option!=NULL && strcmp(option, "master")==0;
100
101         if (processConfigFile() != 0)
102                 return 0; //TODO: return error value, cause main program to exit
103
104   dstmInit();
105   transInit();
106
107   if (master) {
108     pthread_attr_init(&attr);
109     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
110     pthread_create(&thread_Listen, &attr, dstmListen, NULL);
111     return 1;
112   } else {
113     dstmListen();
114     return 0;
115   }
116
117 }
118
119
120 /* This function initiates the prefetch thread
121  * A queue is shared between the main thread of execution
122  * and the prefetch thread to process the prefetch call
123  * Call from compiler populates the shared queue with prefetch requests while prefetch thread
124  * processes the prefetch requests */
125 void transInit() {
126         int t, rc;
127         //Create and initialize prefetch cache structure
128         prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
129
130         /* Initialize attributes for mutex */
131         pthread_mutexattr_init(&prefetchcache_mutex_attr);
132         pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
133         
134         pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
135
136         //Create prefetch cache lookup table
137         if(prehashCreate(HASH_SIZE, LOADFACTOR))
138                 return; //Failure
139         //Initialize primary shared queue
140         queueInit();
141         //Initialize machine pile w/prefetch oids and offsets shared queue
142         mcpileqInit();
143         //Create the primary prefetch thread 
144         pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
145         //Create and Initialize a pool of threads 
146         /* Threads are active for the entire period runtime is running */
147         for(t = 0; t< NUM_THREADS; t++) {
148                 rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
149                 if (rc) {
150                         printf("Thread create error %s, %d\n", __FILE__, __LINE__);
151                         return;
152                 }
153         }
154 }
155
156 /* This function stops the threads spawned */
157 void transExit() {
158         int t;
159         pthread_cancel(tPrefetch);
160         for(t = 0; t < NUM_THREADS; t++)
161                 pthread_cancel(wthreads[t]);
162
163         return;
164 }
165
/* Sleep for a pseudo-random 1-11 msec.
 * Used as back-off before a transaction commit is retried.
 *
 * The jitter comes from the sub-second part of the wall clock. The previous
 * version used time(NULL), which has one-second resolution, so every caller
 * within the same second slept for exactly the same time and retrying
 * transactions stayed in lock-step. */
void randomdelay(void)
{
	struct timespec req, rem;
	struct timeval now;

	gettimeofday(&now, NULL);
	req.tv_sec = 0;
	/* 1 msec base + (0..9999 usec) of jitter, expressed in nanoseconds */
	req.tv_nsec = 1000000L + (long)(now.tv_usec % 10000) * 1000L;
	nanosleep(&req, &rem);
}
179
180 /* This function initializes things required in the transaction start*/
181 transrecord_t *transStart()
182 {
183         transrecord_t *tmp = malloc(sizeof(transrecord_t));
184         tmp->cache = objstrCreate(1048576);
185         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
186 #ifdef COMPILER
187         tmp->revertlist=NULL;
188 #endif
189         return tmp;
190 }
191
192 /* This function finds the location of the objects involved in a transaction
193  * and returns the pointer to the object if found in a remote location */
194 objheader_t *transRead(transrecord_t *record, unsigned int oid) {       
195         unsigned int machinenumber;
196         objheader_t *tmp, *objheader;
197         objheader_t *objcopy;
198         int size, rc, found = 0;
199         void *buf;
200         struct timespec ts;
201         struct timeval tp;
202         
203         rc = gettimeofday(&tp, NULL);
204
205         /* Convert from timeval to timespec */
206         ts.tv_nsec = tp.tv_usec * 10;
207
208         /* Search local transaction cache */
209         if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
210 #ifdef COMPILER
211           return &objheader[1];
212 #else
213           return objheader;
214 #endif
215         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
216                 /* Look up in machine lookup table  and copy  into cache*/
217                 GETSIZE(size, objheader);
218                 size += sizeof(objheader_t);
219                 //TODO:Lock the local trans cache while copying the object here
220                 objcopy = objstrAlloc(record->cache, size);
221                 memcpy(objcopy, (void *)objheader, size);
222                 /* Insert into cache's lookup table */
223                 chashInsert(record->lookupTable, OID(objheader), objcopy); 
224 #ifdef COMPILER
225                 return &objcopy[1];
226 #else
227                 return objcopy;
228 #endif
229         } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
230                 found = 1;
231                 GETSIZE(size, tmp);
232                 size+=sizeof(objheader_t);
233                 //TODO:Lock the local  trans cache while copying the object here
234                 objcopy = objstrAlloc(record->cache, size);
235                 memcpy(objcopy, (void *)tmp, size);
236                 /* Insert into cache's lookup table */
237                 chashInsert(record->lookupTable, OID(tmp), objcopy); 
238 #ifdef COMPILER
239                 return &objcopy[1];
240 #else
241                 return objcopy;
242 #endif
243         } else {
244                 /*If object not found in prefetch cache then block until object appears in the prefetch cache */
245                 pthread_mutex_lock(&pflookup.lock);
246                 while(!found) {
247                         rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
248                         if(rc == ETIMEDOUT) {
249                                 printf("Wait timed out\n");
250                                 /* Check Prefetch cache again */
251                                 if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
252                                         found = 1;
253                                         GETSIZE(size,tmp);
254                                         size+=sizeof(objheader_t);
255                                         objcopy = objstrAlloc(record->cache, size);
256                                         memcpy(objcopy, (void *)tmp, size);
257                                         chashInsert(record->lookupTable, OID(tmp), objcopy); 
258                                         pthread_mutex_unlock(&pflookup.lock);
259 #ifdef COMPILER
260                                         return &objcopy[1];
261 #else
262                                         return objcopy;
263 #endif
264                                 } else {
265                                         pthread_mutex_unlock(&pflookup.lock);
266                                         break;
267                                 }
268                         }
269                 }
270
271                 /* Get the object from the remote location */
272                 machinenumber = lhashSearch(oid);
273                 objcopy = getRemoteObj(record, machinenumber, oid);
274                 if(objcopy == NULL) {
275                         printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
276                         return NULL;
277                 } else {
278 #ifdef COMPILER
279                   return &objcopy[1];
280 #else
281                   return objcopy;
282 #endif
283                 }
284         }
285 }
286
287 /* This function creates objects in the transaction record */
288 objheader_t *transCreateObj(transrecord_t *record, unsigned int size)
289 {
290   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
291   OID(tmp) = getNewOID();
292   tmp->version = 1;
293   tmp->rcount = 1;
294   STATUS(tmp) = NEW;
295   chashInsert(record->lookupTable, OID(tmp), tmp);
296 #ifdef COMPILER
297   return &tmp[1]; //want space after object header
298 #else
299   return tmp;
300 #endif
301 }
302
303 /* This function creates machine piles based on all machines involved in a
304  * transaction commit request */
305 plistnode_t *createPiles(transrecord_t *record) {
306         int i = 0;
307         unsigned int size;/* Represents number of bins in the chash table */
308         chashlistnode_t *curr, *ptr, *next;
309         plistnode_t *pile = NULL;
310         unsigned int machinenum;
311         void *localmachinenum;
312         objheader_t *headeraddr;
313         
314         ptr = record->lookupTable->table;
315         size = record->lookupTable->size;
316
317         for(i = 0; i < size ; i++) {
318                 curr = &ptr[i];
319                 /* Inner loop to traverse the linked list of the cache lookupTable */
320                 while(curr != NULL) {
321                         //if the first bin in hash table is empty
322                         if(curr->key == 0) {
323                                 break;
324                         }
325                         next = curr->next;
326
327                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
328                                 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
329                                 return NULL;
330                         }
331
332                         //Get machine location for object id (and whether local or not)
333                         if (STATUS(headeraddr) & NEW || mhashSearch(curr->key) != NULL) {
334                                 machinenum = myIpAddr;
335                         } else  if ((machinenum = lhashSearch(curr->key)) == 0) {
336                                 printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
337                                 return NULL;
338                         }
339
340                         //Make machine groups
341                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
342                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
343                                 return NULL;
344                         }
345
346                         curr = next;
347                 }
348         }
349         return pile; 
350 }
351
352 /* This function initiates the transaction commit process
353  * Spawns threads for each of the new connections with Participants 
354  * and creates new piles by calling the createPiles(), 
355  * Sends a transrequest() to each remote machines for objects found remotely 
356  * and calls handleLocalReq() to process objects found locally */
357 int transCommit(transrecord_t *record) {        
358         unsigned int tot_bytes_mod, *listmid;
359         plistnode_t *pile, *pile_ptr;
360         int i, j, rc, val;
361         int pilecount, offset, threadnum, trecvcount;
362         char buffer[RECEIVE_BUFFER_SIZE],control;
363         char transid[TID_LEN];
364         trans_req_data_t *tosend;
365         trans_commit_data_t transinfo;
366         static int newtid = 0;
367         char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
368         char localstat = 0;
369
370         do {
371                 trecvcount = 0;
372                 threadnum = 0;
373
374                 /* Look through all the objects in the transaction record and make piles 
375                  * for each machine involved in the transaction*/
376                 pile_ptr = pile = createPiles(record);
377
378                 /* Create the packet to be sent in TRANS_REQUEST */
379
380                 /* Count the number of participants */
381                 pilecount = pCount(pile);
382
383                 /* Create a list of machine ids(Participants) involved in transaction   */
384                 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
385                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
386                         free(record);
387                         return 1;
388                 }               
389                 pListMid(pile, listmid);
390
391
392                 /* Initialize thread variables,
393                  * Spawn a thread for each Participant involved in a transaction */
394                 pthread_t thread[pilecount];
395                 pthread_attr_t attr;                    
396                 pthread_cond_t tcond;
397                 pthread_mutex_t tlock;
398                 pthread_mutex_t tlshrd;
399
400                 thread_data_array_t *thread_data_array;
401                 if((thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount)) == NULL) {
402                         printf("Malloc error %s, %d\n", __FILE__, __LINE__);
403                         pthread_cond_destroy(&tcond);
404                         pthread_mutex_destroy(&tlock);
405                         pDelete(pile_ptr);
406                         free(listmid);
407                         free(record);
408                         return 1;
409                 }
410
411                 local_thread_data_array_t *ltdata;
412                 if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
413                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
414                         pthread_cond_destroy(&tcond);
415                         pthread_mutex_destroy(&tlock);
416                         pDelete(pile_ptr);
417                         free(listmid);
418                         free(thread_data_array);
419                         free(record);
420                         return 1;
421                 }
422
423                 thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
424
425                 /* Initialize and set thread detach attribute */
426                 pthread_attr_init(&attr);
427                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
428                 pthread_mutex_init(&tlock, NULL);
429                 pthread_cond_init(&tcond, NULL);
430
431                 /* Process each machine pile */
432                 while(pile != NULL) {
433                         //Create transaction id
434                         newtid++;
435                         if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
436                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
437                                 pthread_cond_destroy(&tcond);
438                                 pthread_mutex_destroy(&tlock);
439                                 pDelete(pile_ptr);
440                                 free(listmid);
441                                 free(thread_data_array);
442                                 free(ltdata);
443                                 free(record);
444                                 return 1;
445                         }
446                         tosend->f.control = TRANS_REQUEST;
447                         sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
448                         tosend->f.mcount = pilecount;
449                         tosend->f.numread = pile->numread;
450                         tosend->f.nummod = pile->nummod;
451                         tosend->f.numcreated = pile->numcreated;
452                         tosend->f.sum_bytes = pile->sum_bytes;
453                         tosend->listmid = listmid;
454                         tosend->objread = pile->objread;
455                         tosend->oidmod = pile->oidmod;
456                         tosend->oidcreated = pile->oidcreated;
457                         thread_data_array[threadnum].thread_id = threadnum;
458                         thread_data_array[threadnum].mid = pile->mid;
459                         thread_data_array[threadnum].buffer = tosend;
460                         thread_data_array[threadnum].recvmsg = rcvd_control_msg;
461                         thread_data_array[threadnum].threshold = &tcond;
462                         thread_data_array[threadnum].lock = &tlock;
463                         thread_data_array[threadnum].count = &trecvcount;
464                         thread_data_array[threadnum].replyctrl = &treplyctrl;
465                         thread_data_array[threadnum].replyretry = &treplyretry;
466                         thread_data_array[threadnum].rec = record;
467                         /* If local do not create any extra connection */
468                         if(pile->mid != myIpAddr) { /* Not local */
469                                 rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
470                                 if(rc) {
471                                         perror("Error in pthread create\n");
472                                         pthread_cond_destroy(&tcond);
473                                         pthread_mutex_destroy(&tlock);
474                                         pDelete(pile_ptr);
475                                         free(listmid);
476                                         for (i = 0; i < threadnum; i++)
477                                                 free(thread_data_array[i].buffer);
478                                         free(thread_data_array);
479                                         free(ltdata);
480                                         free(record);
481                                         return 1;
482                                 }
483                         } else { /*Local*/
484                                 ltdata->tdata = &thread_data_array[threadnum];
485                                 ltdata->transinfo = &transinfo;
486                                 val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
487                                 if(val) {
488                                         perror("Error in pthread create\n");
489                                         pthread_cond_destroy(&tcond);
490                                         pthread_mutex_destroy(&tlock);
491                                         pDelete(pile_ptr);
492                                         free(listmid);
493                                         for (i = 0; i < threadnum; i++)
494                                                 free(thread_data_array[i].buffer);
495                                         free(thread_data_array);
496                                         free(ltdata);
497                                         free(record);
498                                         return 1;
499                                 }
500                         }
501
502                         threadnum++;            
503                         pile = pile->next;
504                 }
505
506                 /* Free attribute and wait for the other threads */
507                 pthread_attr_destroy(&attr);
508                 
509                 for (i = 0; i < pilecount; i++) {
510                         rc = pthread_join(thread[i], NULL);
511                         if(rc)
512                         {
513                                 printf("ERROR return code from pthread_join() is %d\n", rc);
514                                 pthread_cond_destroy(&tcond);
515                                 pthread_mutex_destroy(&tlock);
516                                 pDelete(pile_ptr);
517                                 free(listmid);
518                                 for (j = i; j < pilecount; j++)
519                                         free(thread_data_array[j].buffer);
520                                 free(thread_data_array);
521                                 free(ltdata);
522                                 free(record);
523                                 return 1;
524                         }
525                         free(thread_data_array[i].buffer);
526                 }
527         
528
529                 /* Free resources */    
530                 pthread_cond_destroy(&tcond);
531                 pthread_mutex_destroy(&tlock);
532                 free(listmid);
533                 pDelete(pile_ptr);
534                 free(thread_data_array);
535                 free(ltdata);
536
537                 /* wait a random amount of time */
538                 if (treplyretry == 1)
539                         randomdelay();
540
541         /* Retry trans commit procedure if not sucessful in the first try */
542         } while (treplyretry == 1);
543         
544         /* Free Resources */
545         objstrDelete(record->cache);
546         chashDelete(record->lookupTable);
547         free(record);
548         return 0;
549 }
550
551 /* This function sends information involved in the transaction request 
552  * to participants and accepts a response from particpants.
553  * It calls decideresponse() to decide on what control message 
554  * to send next to participants and sends the message using sendResponse()*/
555 void *transRequest(void *threadarg) {
556         int sd, i, n;
557         struct sockaddr_in serv_addr;
558         struct hostent *server;
559         thread_data_array_t *tdata;
560         objheader_t *headeraddr;
561         char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
562         char machineip[16], retval;
563
564         tdata = (thread_data_array_t *) threadarg;
565
566         /* Send Trans Request */
567         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
568                 perror("Error in socket for TRANS_REQUEST\n");
569                 pthread_exit(NULL);
570         }
571         bzero((char*) &serv_addr, sizeof(serv_addr));
572         serv_addr.sin_family = AF_INET;
573         serv_addr.sin_port = htons(LISTEN_PORT);
574         midtoIP(tdata->mid,machineip);
575         machineip[15] = '\0';
576         serv_addr.sin_addr.s_addr = inet_addr(machineip);
577         /* Open Connection */
578         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
579                 perror("Error in connect for TRANS_REQUEST\n");
580                 close(sd);
581                 pthread_exit(NULL);
582         }
583
584         printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
585         /* Send bytes of data with TRANS_REQUEST control message */
586         if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
587                 perror("Error sending fixed bytes for thread\n");
588                 close(sd);
589                 pthread_exit(NULL);
590         }
591         /* Send list of machines involved in the transaction */
592         {
593                 int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
594                 if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
595                         perror("Error sending list of machines for thread\n");
596                         close(sd);
597                         pthread_exit(NULL);
598                 }
599         }
600         /* Send oids and version number tuples for objects that are read */
601         {
602                 int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
603                 if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
604                         perror("Error sending tuples for thread\n");
605                         close(sd);
606                         pthread_exit(NULL);
607                 }
608         }
609         /* Send objects that are modified */
610         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
611                 int size;
612                 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
613                 GETSIZE(size,headeraddr);
614                 size+=sizeof(objheader_t);
615                 if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
616                         perror("Error sending obj modified for thread\n");
617                         close(sd);
618                         pthread_exit(NULL);
619                 }
620         }
621
622         /* Read control message from Participant */
623         if((n = read(sd, &control, sizeof(char))) <= 0) {
624                 perror("Error in reading control message from Participant\n");
625                 close(sd);
626                 pthread_exit(NULL);
627         }
628         recvcontrol = control;
629
630         /* Update common data structure and increment count */
631         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
632
633         /* Lock and update count */
634         /* Thread sleeps until all messages from pariticipants are received by coordinator */
635         pthread_mutex_lock(tdata->lock);
636
637         (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
638
639         /* Wake up the threads and invoke decideResponse (once) */
640         if(*(tdata->count) == tdata->buffer->f.mcount) {
641                 decideResponse(tdata); 
642                 pthread_cond_broadcast(tdata->threshold);
643         } else {
644                 pthread_cond_wait(tdata->threshold, tdata->lock);
645         }
646         pthread_mutex_unlock(tdata->lock);
647
648         /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
649          * to all participants in their respective socket */
650         if (sendResponse(tdata, sd) == 0) { 
651                 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
652                 close(sd);
653                 pthread_exit(NULL);
654         }
655
656         /* Close connection */
657         close(sd);
658         pthread_exit(NULL);
659 }
660
661 /* This function decides the reponse that needs to be sent to 
662  * all Participant machines after the TRANS_REQUEST protocol */
663 void decideResponse(thread_data_array_t *tdata) {
664         char control;
665         int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
666                                                                          message to send */
667
668         for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
669                 control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
670                                                            written onto the shared array */
671                 switch(control) {
672                         default:
673                                 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
674                                 /* treat as disagree, pass thru */
675                         case TRANS_DISAGREE:
676                                 transdisagree++;
677                                 break;
678
679                         case TRANS_AGREE:
680                                 transagree++;
681                                 break;
682
683                         case TRANS_SOFT_ABORT:
684                                 transsoftabort++;
685                                 break;
686                 }
687         }
688
689         if(transdisagree > 0) {
690                 /* Send Abort */
691                 *(tdata->replyctrl) = TRANS_ABORT;
692         } else if(transagree == tdata->buffer->f.mcount){
693                 /* Send Commit */
694                 *(tdata->replyctrl) = TRANS_COMMIT;
695         } else { 
696                 /* Send Abort in soft abort case followed by retry commiting transaction again*/
697                 *(tdata->replyctrl) = TRANS_ABORT;
698                 *(tdata->replyretry) = 1;
699         }
700
701         return;
702 }
703 /* This function sends the final response to remote machines per thread in their respective socket id */
704 char sendResponse(thread_data_array_t *tdata, int sd) {
705         int n, N, sum, oidcount = 0;
706         char *ptr, retval = 0;
707         unsigned int *oidnotfound;
708
709         /* If the decided response is due to a soft abort and missing objects at the Participant's side */
710         if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
711                 /* Read list of objects missing */
712                 if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
713                         N = oidcount * sizeof(unsigned int);
714                         if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
715                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
716                                 return 0;
717                         }
718                         ptr = (char *) oidnotfound;
719                         do {
720                                 n = read(sd, ptr+sum, N-sum);
721                                 sum += n;
722                         } while(sum < N && n !=0);
723                 }
724                 retval =  TRANS_SOFT_ABORT;
725         }
726         /* If the decided response is TRANS_ABORT */
727         if(*(tdata->replyctrl) == TRANS_ABORT) {
728                 retval = TRANS_ABORT;
729         } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
730                 retval = TRANS_COMMIT;
731         }
732
733         if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
734                 perror("Error sending ctrl message for participant\n");
735         }
736
737         return retval;
738 }
739
740 /* This function opens a connection, places an object read request to the 
741  * remote machine, reads the control message and object if available  and 
742  * copies the object and its header to the local cache.
743  * TODO replace mnum and midtoIP() with MACHINE_IP address later */ 
744
745 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
746         int sd, size, val;
747         struct sockaddr_in serv_addr;
748         struct hostent *server;
749         char control;
750         char machineip[16];
751         objheader_t *h;
752         void *objcopy;
753
754         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
755                 perror("Error in socket\n");
756                 return NULL;
757         }
758         bzero((char*) &serv_addr, sizeof(serv_addr));
759         serv_addr.sin_family = AF_INET;
760         serv_addr.sin_port = htons(LISTEN_PORT);
761         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
762         midtoIP(mnum,machineip);
763         machineip[15] = '\0';
764         serv_addr.sin_addr.s_addr = inet_addr(machineip);
765         /* Open connection */
766         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
767                 perror("Error in connect\n");
768                 return NULL;
769         }
770         char readrequest[sizeof(char)+sizeof(unsigned int)];
771         readrequest[0] = READ_REQUEST;
772         *((unsigned int *)(&readrequest[1])) = oid;
773         if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
774                 perror("Error sending message\n");
775                 return NULL;
776         }
777
778 #ifdef DEBUG1
779         printf("DEBUG -> ready to rcv ...\n");
780 #endif
781         /* Read response from the Participant */
782         if((val = read(sd, &control, sizeof(char))) <= 0) {
783                 perror("No control response for getRemoteObj sent\n");
784                 return NULL;
785         }
786         switch(control) {
787                 case OBJECT_NOT_FOUND:
788                         return NULL;
789                 case OBJECT_FOUND:
790                         /* Read object if found into local cache */
791                         if((val = read(sd, &size, sizeof(int))) <= 0) {
792                                 perror("No size is read from the participant\n");
793                                 return NULL;
794                         }
795                         objcopy = objstrAlloc(record->cache, size);
796                         if((val = read(sd, objcopy, size)) <= 0) {
797                                 perror("No objects are read from the remote participant\n");
798                                 return NULL;
799                         }
800                         /* Insert into cache's lookup table */
801                         chashInsert(record->lookupTable, oid, objcopy); 
802                         break;
803                 default:
804                         printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
805                         return NULL;
806         }
807         /* Close connection */
808         close(sd);
809         return objcopy;
810 }
811
812 /* This function handles the local objects involved in a transaction commiting process.
813  * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
814  * Note Coordinator = local machine
815  * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
816  * based on common agreement it either commits or aborts the transaction.
817  * It also frees the memory resources */
818 void *handleLocalReq(void *threadarg) {
819         int val, i = 0, size, offset = 0;
820         short version;
821         char control = 0, *ptr;
822         unsigned int oid;
823         unsigned int *oidnotfound = NULL, *oidlocked = NULL;
824         void *mobj, *modptr;
825         objheader_t *headptr, *headeraddr;
826         local_thread_data_array_t *localtdata;
827
828         localtdata = (local_thread_data_array_t *) threadarg;
829
830         /* Counters and arrays to formulate decision on control message to be sent */
831         oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
832         oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
833         int objnotfound = 0, objlocked = 0; 
834         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
835
836         /* modptr points to the beginning of the object store 
837          * created at the Pariticipant */ 
838         pthread_mutex_lock(&mainobjstore_mutex);
839         if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
840                 printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
841                 pthread_mutex_unlock(&mainobjstore_mutex);
842                 pthread_exit(NULL);
843         }
844         pthread_mutex_unlock(&mainobjstore_mutex);
845         /* Write modified objects into the mainobject store */
846         for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
847                 headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
848                 GETSIZE(size,headeraddr);
849                 size+=sizeof(objheader_t);
850                 memcpy((char *)modptr+offset, headeraddr, size);  
851                 offset += size;
852         }
853         /* Write new objects into the mainobject store */
854         for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) {
855                 headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
856                 GETSIZE(size, headeraddr);
857                 size+=sizeof(objheader_t);
858                 memcpy((char *)modptr+offset, headeraddr, size);  
859                 offset += size;
860         }
861
862         ptr = modptr;
863         offset = 0; //Reset 
864
865         /* Process each oid in the machine pile/ group per thread */
866         for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
867                 if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified
868                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
869                         incr *= i;
870                         oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
871                         incr += sizeof(unsigned int);
872                         version = *((short *)(localtdata->tdata->buffer->objread + incr));
873                 } else {//Objs modified
874                         int tmpsize;
875                         headptr = (objheader_t *)ptr;
876                         oid = OID(headptr);
877                         version = headptr->version;
878                         GETSIZE(tmpsize, headptr);
879                         ptr += sizeof(objheader_t) + tmpsize;
880                 }
881
882                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
883
884                 /* Save the oids not found and number of oids not found for later use */
885                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
886                         /* Save the oids not found and number of oids not found for later use */
887                         oidnotfound[objnotfound] = oid;
888                         objnotfound++;
889                 } else { /* If Obj found in machine (i.e. has not moved) */
890                         /* Check if Obj is locked by any previous transaction */
891                         if (STATUS(((objheader_t *)mobj)) & LOCK) {
892                                 if (version == ((objheader_t *)mobj)->version) {      /* If not locked then match versions */ 
893                                         v_matchlock++;
894                                 } else {/* If versions don't match ...HARD ABORT */
895                                         v_nomatch++;
896                                         /* Send TRANS_DISAGREE to Coordinator */
897                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
898                                 }
899                         } else {/* If Obj is not locked then lock object */
900                                 STATUS(((objheader_t *)mobj)) |= LOCK;
901                                 //TODO Remove this for Testing
902                                 //randomdelay(); -- Why is this here.  BCD
903
904                                 /* Save all object oids that are locked on this machine during this transaction request call */
905                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
906                                 objlocked++;
907                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
908                                         v_matchnolock++;
909                                 } else { /* If versions don't match ...HARD ABORT */
910                                         v_nomatch++;
911                                         /* Send TRANS_DISAGREE to Coordinator */
912                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
913                                 }
914                         }
915                 }
916         }
917
918         /* Condition to send TRANS_AGREE */
919         if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
920                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
921         }
922         /* Condition to send TRANS_SOFT_ABORT */
923         if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
924                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
925                 //TODO  currently the only soft abort case that is supported is when object locked by previous
926                 //transaction => v_matchlock > 0 
927                 //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported 
928                 /* Send number of oids not found and the missing oids if objects are missing in the machine */
929                 /* TODO Remember to store the oidnotfound for later use
930                    if(objnotfound != 0) {
931                    int size = sizeof(unsigned int)* objnotfound;
932                    }
933                    */
934         }
935
936         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
937          * if Participant receives a TRANS_COMMIT */
938         localtdata->transinfo->objlocked = oidlocked;
939         localtdata->transinfo->objnotfound = oidnotfound;
940         localtdata->transinfo->modptr = modptr;
941         localtdata->transinfo->numlocked = objlocked;
942         localtdata->transinfo->numnotfound = objnotfound;
943
944         /* Lock and update count */
945         //Thread sleeps until all messages from pariticipants are received by coordinator
946         pthread_mutex_lock(localtdata->tdata->lock);
947         (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
948
949         /* Wake up the threads and invoke decideResponse (once) */
950         if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
951                 decideResponse(localtdata->tdata); 
952                 pthread_cond_broadcast(localtdata->tdata->threshold);
953         } else {
954                 pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
955         }
956         pthread_mutex_unlock(localtdata->tdata->lock);
957
958         /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
959         if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
960                 if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
961                         printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
962                         pthread_exit(NULL);
963                 }
964         }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
965                 if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) {
966                         printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
967                         pthread_exit(NULL);
968                 }
969         }
970
971         /* Free memory */
972         if (localtdata->transinfo->objlocked != NULL) {
973                 free(localtdata->transinfo->objlocked);
974                 localtdata->transinfo->objlocked = NULL;
975         }
976         if (localtdata->transinfo->objnotfound != NULL) {
977                 free(localtdata->transinfo->objnotfound);
978                 localtdata->transinfo->objnotfound = NULL;
979         }
980
981         pthread_exit(NULL);
982 }
983 /* This function completes the ABORT process if the transaction is aborting 
984 */
985 int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
986         char *ptr;
987         int i;
988         objheader_t *tmp_header;
989         void *header;
990
991         /* Set all ref counts as 1 and do garbage collection */
992         ptr = modptr;
993         for(i = 0; i< nummod; i++) {
994                 int tmpsize;
995                 tmp_header = (objheader_t *)ptr;
996                 tmp_header->rcount = 0;
997                 GETSIZE(tmpsize, tmp_header);
998                 ptr += sizeof(objheader_t) + tmpsize;
999         }
1000         /* Unlock objects that was locked due to this transaction */
1001         for(i = 0; i< numlocked; i++) {
1002                 if((header = mhashSearch(objlocked[i])) == NULL) {
1003                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1004                         return 1;
1005                 }
1006                 STATUS(((objheader_t *)header)) &= ~(LOCK);
1007         }
1008
1009         /* Send ack to Coordinator */
1010         printf("TRANS_SUCCESSFUL\n");
1011
1012         /*Free the pointer */
1013         ptr = NULL;
1014         return 0;
1015 }
1016
1017 /*This function completes the COMMIT process is the transaction is commiting
1018 */
1019 int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated, unsigned int *objlocked, int nummod, int numcreated, int numlocked) {
1020         objheader_t *header;
1021         int i = 0, offset = 0;
1022         char control;
1023
1024         /* Process each modified object saved in the mainobject store */
1025         for(i = 0; i < nummod; i++) {
1026           int tmpsize;
1027                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1028                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1029                         return 1;
1030                 }
1031                 /* Change reference count of older address and free space in objstr ?? */
1032                 header->rcount = 0;
1033
1034                 /* Change ptr address in mhash table */
1035                 mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary
1036                 mhashInsert(oidmod[i], (((char *)modptr) + offset));
1037                 GETSIZE(tmpsize, header);
1038                 offset += sizeof(objheader_t) + tmpsize;
1039
1040                 /* Update object version number */
1041                 header = (objheader_t *) mhashSearch(oidmod[i]);
1042                 header->version += 1;
1043         }
1044
1045         /*If object is in prefetch cache then update it in prefetch cache */ 
1046
1047
1048         /* If object is newly created inside transaction then commit it */
1049         for (i = 0; i < numcreated; i++)
1050         {
1051                 int tmpsize;
1052                 header = (objheader_t *)(((char *)modptr) + offset);
1053                 mhashInsert(oidcreated[i], (((char *)modptr) + offset));
1054                 GETSIZE(tmpsize, header);
1055                 offset += sizeof(objheader_t) + tmpsize;
1056                 lhashInsert(oidcreated[i], myIpAddr);
1057         }
1058
1059         /* Unlock locked objects */
1060         for(i = 0; i < numlocked; i++) {
1061                 if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
1062                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1063                         return 1;
1064                 }
1065                 STATUS(header) &= ~(LOCK);
1066         }
1067
1068         //TODO Update location lookup table
1069
1070         /* Send ack to Coordinator */
1071         printf("TRANS_SUCCESSFUL\n");
1072         return 0;
1073 }
1074
1075 /* This function checks if the prefetch oids are same and have same offsets  
1076  * for case x.a.b and y.a.b where x and y have same oid's
1077  * or if a.b.c is a subset of x.b.c.d*/ 
1078 /* check for case where the generated request a.y.z or x.y.z.g then 
1079  * prefetch needs to be generated for x.y.z.g  if oid of a and x are same*/
1080 void checkPrefetchTuples(prefetchqelem_t *node) {
1081         int i,j, count,k, sindex, index;
1082         char *ptr, *tmp;
1083         int ntuples, slength;
1084         unsigned int *oid;
1085         short *endoffsets, *arryfields; 
1086
1087         /* Check for the case x.y.z and a.b.c are same oids */ 
1088         ptr = (char *) node;
1089         ntuples = *(GET_NTUPLES(ptr));
1090         oid = GET_PTR_OID(ptr);
1091         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1092         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1093         /* Find offset length for each tuple */
1094         int numoffset[ntuples];
1095         numoffset[0] = endoffsets[0];
1096         for(i = 1; i<ntuples; i++) {
1097                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1098         }
1099         /* Check for redundant tuples by comparing oids of each tuple */
1100         for(i = 0; i < ntuples; i++) {
1101                 if(oid[i] == -1)
1102                         continue;
1103                 for(j = i+1 ; j < ntuples; j++) {
1104                         if(oid[j] == -1)
1105                                 continue;
1106                         /*If oids of tuples match */ 
1107                         if (oid[i] == oid[j]) {
1108                                 /* Find the smallest offset length of two tuples*/
1109                                 if(numoffset[i] >  numoffset[j]){
1110                                         slength = numoffset[j];
1111                                         sindex = j;
1112                                 }
1113                                 else {
1114                                         slength = numoffset[i];
1115                                         sindex = i;
1116                                 }
1117
1118                                 /* Compare the offset values based on the current indices
1119                                  * break if they do not match
1120                                  * if all offset values match then pick the largest tuple*/
1121
1122                                 if(i == 0) {
1123                                         k = 0;
1124                                         index = endoffsets[j -1];
1125                                         for(count = 0; count < slength; count ++) {
1126                                                 if (arryfields[k] != arryfields[index]) { 
1127                                                         break;
1128                                                 }
1129                                                 index++;
1130                                                 k++;
1131                                         }       
1132                                 } else {
1133                                         k = endoffsets[i-1];
1134                                         index = endoffsets[j-1];
1135                                         printf("Value of slength = %d\n", slength);
1136                                         for(count = 0; count < slength; count++) {
1137                                                 if(arryfields[k] != arryfields[index]) {
1138                                                         break;
1139                                                 }
1140                                                 index++;
1141                                                 k++;
1142                                         }
1143                                 }
1144
1145                                 if(slength == count) {
1146                                         oid[sindex] = -1;
1147                                 }
1148                         }
1149                 }
1150         }
1151 }
1152
1153 void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopcount, unsigned int objoid, int index, int iter, int oidnfound) {
1154         char *ptr, *tmp;
1155         int ntuples, i, k, flag;
1156         unsigned int * oid;
1157         short *endoffsets, *arryfields;
1158         objheader_t *header;
1159
1160         ptr = (char *) node;
1161         ntuples = *(GET_NTUPLES(ptr));
1162         oid = GET_PTR_OID(ptr);
1163         endoffsets = GET_PTR_EOFF(ptr, ntuples);
1164         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1165
1166         if(oidnfound == 1) {
1167                 if((header = (objheader_t *) prehashSearch(objoid)) == NULL) {
1168                         return;
1169                 } else { //Found in Prefetch Cache
1170                         //TODO Decide if object is too old, if old remove from cache
1171                         tmp = (char *) header;
1172                         /* Check if any of the offset oid is available in the Prefetch cache */
1173                         for(i = counter; i < loopcount; i++) {
1174                                 objoid = *(tmp + sizeof(objheader_t) + arryfields[counter]);
1175                                 if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
1176                                         flag = 0;
1177                                 } else {
1178                                         flag = 1;
1179                                         break;
1180                                 }
1181                         }
1182                 }
1183         } else {
1184                 for(i = counter; i<loopcount; i++) {
1185                         if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
1186                                 tmp = (char *) header;
1187                                 objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
1188                                 flag = 0;
1189                                 index++;
1190                         } else {
1191                                 flag = 1;
1192                                 break;
1193                         }
1194                 }
1195         }
1196
1197         /* If oid not found locally or in prefetch cache then 
1198          * assign the latest oid found as the new oid 
1199          * and copy left over offsets into the arrayoffsetfieldarray*/
1200         oid[iter] = objoid;
1201         numoffset[iter] = numoffset[iter] - (i+1);
1202         for(k = 0; k < numoffset[iter] ; k++) {
1203                 arryfields[endoffsets[counter]+k] = arryfields[endoffsets[counter]+k+1];
1204         }
1205
1206         if(flag == 0) {
1207                 oid[iter] = -1;
1208                 numoffset[iter] = 0;
1209         }
1210 }
1211
1212 /* This function makes machine piles to be added into the machine pile queue for each prefetch call */
1213 prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
1214         char *ptr, *tmp;
1215         int ntuples, slength, i, machinenum;
1216         int maxoffset;
1217         unsigned int *oid;
1218         short *endoffsets, *arryfields, *offset; 
1219         prefetchpile_t *head = NULL;
1220
1221         /* Check for the case x.y.z and a.b.c are same oids */ 
1222         ptr = (char *) node;
1223         ntuples = *(GET_NTUPLES(ptr));
1224         oid = GET_PTR_OID(ptr);
1225         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1226         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1227
1228         /* Check for redundant tuples by comparing oids of each tuple */
1229         for(i = 0; i < ntuples; i++) {
1230                 if(oid[i] == -1)
1231                         continue;
1232                 /* For each tuple make piles */
1233                 if ((machinenum = lhashSearch(oid[i])) == 0) {
1234                         printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
1235                         return NULL;
1236                 }
1237                 /* Insert into machine pile */
1238                 offset = &arryfields[endoffsets[i-1]];
1239                 insertPile(machinenum, oid[i], numoffset[i], offset, &head);
1240         }
1241         return head;
1242 }
1243
1244
1245 /* This function checks if the oids within the prefetch tuples are available locally.
1246  * If yes then makes the tuple invalid. If no then rearranges oid and offset values in 
1247  * the prefetchqelem_t node to represent a new prefetch tuple */
1248 prefetchpile_t *foundLocal(prefetchqelem_t *node) {
1249         int ntuples,i, j, k, oidnfound = 0, index, flag;
1250         unsigned int *oid;
1251         unsigned int  objoid;
1252         char *ptr, *tmp;
1253         objheader_t *objheader;
1254         short *endoffsets, *arryfields; 
1255         prefetchpile_t *head = NULL;
1256
1257         ptr = (char *) node;
1258         ntuples = *(GET_NTUPLES(ptr));
1259         oid = GET_PTR_OID(ptr);
1260         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1261         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1262         /* Find offset length for each tuple */
1263         int numoffset[ntuples];//Number of offsets for each tuple
1264         numoffset[0] = endoffsets[0];
1265         for(i = 1; i<ntuples; i++) {
1266                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1267         }
1268         for(i = 0; i < ntuples; i++) { 
1269                 if(oid[i] == -1)
1270                         continue;
1271                 /* If object found locally */
1272                 if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) { 
1273                         oidnfound = 0;
1274                         tmp = (char *) objheader;
1275                         /* Find the oid of its offset value */
1276                         if(i == 0) 
1277                                 index = 0;
1278                         else 
1279                                 index = endoffsets[i - 1];
1280                         for(j = 0 ; j < numoffset[i] ; j++) {
1281                                 objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
1282                                 /*If oid found locally then 
1283                                  *assign the latest oid found as the new oid 
1284                                  *and copy left over offsets into the arrayoffsetfieldarray*/
1285                                 oid[i] = objoid;
1286                                 numoffset[i] = numoffset[i] - (j+1);
1287                                 for(k = 0; k < numoffset[i]; k++)
1288                                         arryfields[endoffsets[j]+ k] = arryfields[endoffsets[j]+k+1];
1289                                 index++;
1290                                 /*New offset oid not found */
1291                                 if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
1292                                         flag = 1;
1293                                         checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
1294                                         break;
1295                                 } else 
1296                                         flag = 0;
1297                         }
1298
1299                         /*If all offset oids are found locally,make the prefetch tuple invalid */
1300                         if(flag == 0) {
1301                                 oid[i] = -1;
1302                                 numoffset[i] = 0;
1303                         }
1304                 } else {
1305                         oidnfound = 1;
1306                         /* Look in Prefetch cache */
1307                         checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
1308                 }
1309
1310         }
1311         /* Make machine groups */
1312         head = makePreGroups(node, numoffset);
1313         return head;
1314 }
1315
1316 /* This function is called by the thread calling transPrefetch */
1317 void *transPrefetch(void *t) {
1318         prefetchqelem_t *qnode;
1319         prefetchpile_t *pilehead = NULL;
1320
1321         while(1) {
1322                 /* lock mutex of primary prefetch queue */
1323                 pthread_mutex_lock(&pqueue.qlock);
1324                 /* while primary queue is empty, then wait */
1325                 while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
1326                         pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
1327                 }
1328
1329                 /* dequeue node to create a machine piles and  finally unlock mutex */
1330                 if((qnode = pre_dequeue()) == NULL) {
1331                         printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
1332                         pthread_exit(NULL);
1333                 }
1334                 pthread_mutex_unlock(&pqueue.qlock);
1335                 /* Reduce redundant prefetch requests */
1336                 checkPrefetchTuples(qnode);
1337                 /* Check if the tuples are found locally, if yes then reduce them further*/ 
1338                 /* and group requests by remote machine ids by calling the makePreGroups() */
1339                 pilehead = foundLocal(qnode);
1340
1341                 /* Lock mutex of pool queue */
1342                 pthread_mutex_lock(&mcqueue.qlock);
1343                 /* Update the pool queue with the new remote machine piles generated per prefetch call */
1344                 mcpileenqueue(pilehead);
1345                 /* Broadcast signal on machine pile queue */
1346                 pthread_cond_broadcast(&mcqueue.qcond);
1347                 /* Unlock mutex of  machine pile queue */
1348                 pthread_mutex_unlock(&mcqueue.qlock);
1349                 /* Deallocate the prefetch queue pile node */
1350                 predealloc(qnode);
1351
1352         }
1353 }
1354
1355 /* Each thread in the  pool of threads calls this function to establish connection with
1356  * remote machines, send the prefetch requests and process the reponses from
1357  * the remote machines .
1358  * The thread is active throughout the period of runtime */
1359
1360 void *mcqProcess(void *threadid) {
1361         int tid;
1362         prefetchpile_t *mcpilenode;
1363
1364         tid = (int) threadid;
1365         while(1) {
1366                 /* Lock mutex of mc pile queue */
1367                 pthread_mutex_lock(&mcqueue.qlock);
1368                 /* When mc pile queue is empty, wait */
1369                 while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
1370                         pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
1371                 }
1372                 /* Dequeue node to send remote machine connections*/
1373                 if((mcpilenode = mcpiledequeue()) == NULL) {
1374                         printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
1375                         pthread_exit(NULL);
1376                 }
1377                 /* Unlock mutex */
1378                 pthread_mutex_unlock(&mcqueue.qlock);
1379
1380                 /*Initiate connection to remote host and send request */ 
1381                 /* Process Request */
1382                 sendPrefetchReq(mcpilenode, tid);
1383
1384                 /* Deallocate the machine queue pile node */
1385                 mcdealloc(mcpilenode);
1386         }
1387 }
1388
1389 void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
1390         int sd, i, offset, off, len, endpair, count = 0;
1391         struct sockaddr_in serv_addr;
1392         struct hostent *server;
1393         char machineip[16], control;
1394         objpile_t *tmp;
1395
1396
1397         /* Send Trans Prefetch Request */
1398         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1399                 perror("Error in socket for TRANS_REQUEST\n");
1400                 return;
1401         }
1402         bzero((char*) &serv_addr, sizeof(serv_addr));
1403         serv_addr.sin_family = AF_INET;
1404         serv_addr.sin_port = htons(LISTEN_PORT);
1405         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
1406         midtoIP(mcpilenode->mid ,machineip);
1407         machineip[15] = '\0';
1408         serv_addr.sin_addr.s_addr = inet_addr(machineip);
1409
1410         /* Open Connection */
1411         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
1412                 perror("Error in connect for TRANS_REQUEST\n");
1413                 close(sd);
1414                 return;
1415         }
1416
1417         /* Send TRANS_PREFETCH control message */
1418         control = TRANS_PREFETCH;
1419         if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
1420                 perror("Error in sending prefetch control\n");
1421                 close(sd);
1422                 return;
1423         }
1424
1425         /* Send Oids and offsets in pairs */
1426         tmp = mcpilenode->objpiles;
1427         while(tmp != NULL) {
1428                 off = offset = 0;
1429                 count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
1430                 len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1431                 char oidnoffset[len];
1432                 memcpy(oidnoffset, &len, sizeof(int));
1433                 off = sizeof(int);
1434                 memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
1435                 off += sizeof(unsigned int);
1436                 for(i = 0; i < tmp->numoffset; i++) {
1437                         memcpy(oidnoffset + off, &tmp->offset[i], sizeof(short));
1438                         off+=sizeof(short);
1439                 }
1440                 if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
1441                         perror("Error sending fixed bytes for thread\n");
1442                         close(sd);
1443                         return;
1444                 }
1445                 tmp = tmp->next;
1446         }
1447
1448         /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1449         endpair = -1;
1450         if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
1451                 perror("Error sending fixed bytes for thread\n");
1452                 close(sd);
1453                 return;
1454         }
1455
1456         /* Get Response from the remote machine */
1457         getPrefetchResponse(count,sd);
1458         close(sd);
1459         return;
1460 }
1461
1462 void getPrefetchResponse(int count, int sd) {
1463         int i = 0, val, n, N, sum, index, objsize;
1464         unsigned int bufsize,oid;
1465         char buffer[RECEIVE_BUFFER_SIZE], control;
1466         char *ptr;
1467         void *modptr, *oldptr;
1468
1469         /* Read  prefetch response from the Remote machine */
1470         if((val = read(sd, &control, sizeof(char))) <= 0) {
1471                 perror("No control response for Prefetch request sent\n");
1472                 return;
1473         }
1474
1475         if(control == TRANS_PREFETCH_RESPONSE) {
1476                 /*For each oid and offset tuple sent as prefetch request to remote machine*/
1477                 while(i < count) {
1478                         sum = 0;
1479                         index = 0;
1480                         /* Read the size of buffer to be received */
1481                         if((N = read(sd, buffer, sizeof(unsigned int))) <= 0) {
1482                                 perror("Size of buffer not recv\n");
1483                                 return;
1484                         }
1485                         memcpy(&bufsize, buffer, sizeof(unsigned int));
1486                         ptr = buffer + sizeof(unsigned int);
1487                         /* Keep receiving the buffer containing oid info */ 
1488                         do {
1489                                 n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
1490                                 sum +=n;
1491                         } while(sum < bufsize && n != 0);
1492                         /* Decode the contents of the buffer */
1493                         index = sizeof(unsigned int);
1494                         while(index < (bufsize - sizeof(unsigned int))) {
1495                                 if(buffer[index] == OBJECT_FOUND) {
1496                                         /* Increment it to get the object */
1497                                         index += sizeof(char);
1498                                         memcpy(&oid, buffer + index, sizeof(unsigned int));
1499                                         index += sizeof(unsigned int);
1500                                         /* For each object found add to Prefetch Cache */
1501                                         memcpy(&objsize, buffer + index, sizeof(int));
1502                                         index+=sizeof(int);
1503                                         pthread_mutex_lock(&prefetchcache_mutex);
1504                                         if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
1505                                                 printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1506                                                 pthread_mutex_unlock(&prefetchcache_mutex);
1507                                                 return;
1508                                         }
1509                                         pthread_mutex_unlock(&prefetchcache_mutex);
1510                                         memcpy(modptr, buffer+index, objsize);
1511                                         index += objsize;
1512                                         /* Insert the oid and its address into the prefetch hash lookup table */
1513                                         /* Do a version comparison if the oid exists */
1514                                         if((oldptr = prehashSearch(oid)) != NULL) {
1515                                                 /* If older version then update with new object ptr */
1516                                                 if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
1517                                                         prehashRemove(oid);
1518                                                         prehashInsert(oid, modptr);
1519                                                 } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { 
1520                                                         /* Add the new object ptr to hash table */
1521                                                         prehashInsert(oid, modptr);
1522                                                 } else { /* Do nothing */
1523                                                         ;
1524                                                 }
1525                                         } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
1526                                                 prehashInsert(oid, modptr);
1527                                         }
1528                                         /* Lock the Prefetch Cache look up table*/
1529                                         pthread_mutex_lock(&pflookup.lock);
1530                                         /* Broadcast signal on prefetch cache condition variable */ 
1531                                         pthread_cond_broadcast(&pflookup.cond);
1532                                         /* Unlock the Prefetch Cache look up table*/
1533                                         pthread_mutex_unlock(&pflookup.lock);
1534                                 } else if(buffer[index] == OBJECT_NOT_FOUND) {
1535                                         /* Increment it to get the object */
1536                                         /* TODO: For each object not found query DHT for new location and retrieve the object */
1537                                         index += sizeof(char);
1538                                         memcpy(&oid, buffer + index, sizeof(unsigned int));
1539                                         index += sizeof(unsigned int);
1540                                         /* Throw an error */
1541                                         printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
1542                                         exit(-1);
1543                                 } else {
1544                                         printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
1545                                         return;
1546                                 }
1547                         }
1548
1549                         i++;
1550                 }
1551         } else
1552                 printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
1553         return;
1554 }
1555
1556 unsigned short getObjType(unsigned int oid)
1557 {
1558         objheader_t *objheader;
1559         unsigned short numoffsets = 0;
1560
1561         if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL)
1562         {
1563                 if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
1564                 {
1565                         prefetch(1, &oid, &numoffsets, NULL);
1566                         pthread_mutex_lock(&pflookup.lock);
1567                         while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
1568                         {
1569                                 pthread_cond_wait(&pflookup.cond, &pflookup.lock);
1570                         }
1571                         pthread_mutex_unlock(&pflookup.lock);
1572                 }
1573         }
1574
1575         return TYPE(objheader);
1576 }
1577
1578 int startRemoteThread(unsigned int oid, unsigned int mid)
1579 {
1580         int sock;
1581         struct sockaddr_in remoteAddr;
1582         char msg[1 + sizeof(unsigned int)];
1583         int bytesSent;
1584         int status;
1585
1586         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1587         {
1588                 perror("startRemoteThread():socket()");
1589                 return -1;
1590         }
1591
1592         bzero(&remoteAddr, sizeof(remoteAddr));
1593         remoteAddr.sin_family = AF_INET;
1594         remoteAddr.sin_port = htons(LISTEN_PORT);
1595         remoteAddr.sin_addr.s_addr = htonl(mid);
1596         
1597         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
1598         {
1599                 printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1600                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1601                 status = -1;
1602         }
1603         else
1604         {
1605                 msg[0] = START_REMOTE_THREAD;
1606                 memcpy(&msg[1], &oid, sizeof(unsigned int));
1607
1608                 bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0);
1609                 if (bytesSent < 0)
1610                 {
1611                         perror("startRemoteThread():send()");
1612                         status = -1;
1613                 }
1614                 else if (bytesSent != 1 + sizeof(unsigned int))
1615                 {
1616                         printf("startRemoteThread(): error, sent %d bytes\n", bytesSent);
1617                         status = -1;
1618                 }
1619                 else
1620                 {
1621                         status = 0;
1622                 }
1623         }
1624
1625         close(sock);
1626         return status;
1627 }
1628
1629 //TODO: when reusing oids, make sure they are not already in use!
1630 unsigned int getNewOID(void) {
1631         static unsigned int id = 0xFFFFFFFF;
1632         
1633         id += 2;
1634         if (id > oidMax || id < oidMin)
1635         {
1636                 id = (oidMin | 1);
1637         }
1638         return id;
1639 }
1640
1641 int processConfigFile()
1642 {
1643         FILE *configFile;
1644         const int maxLineLength = 200;
1645         char lineBuffer[maxLineLength];
1646         char *token;
1647         const char *delimiters = " \t\n";
1648         char *commentBegin;
1649         in_addr_t tmpAddr;
1650         
1651         configFile = fopen(CONFIG_FILENAME, "r");
1652         if (configFile == NULL)
1653         {
1654                 printf("error opening %s:\n", CONFIG_FILENAME);
1655                 perror("");
1656                 return -1;
1657         }
1658
1659         numHostsInSystem = 0;
1660         sizeOfHostArray = 8;
1661         hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1662         
1663         while(fgets(lineBuffer, maxLineLength, configFile) != NULL)
1664         {
1665                 commentBegin = strchr(lineBuffer, '#');
1666                 if (commentBegin != NULL)
1667                         *commentBegin = '\0';
1668                 token = strtok(lineBuffer, delimiters);
1669                 while (token != NULL)
1670                 {
1671                         tmpAddr = inet_addr(token);
1672                         if ((int)tmpAddr == -1)
1673                         {
1674                                 printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1675                                 fclose(configFile);
1676                                 return -1;
1677                         }
1678                         else
1679                                 addHost(htonl(tmpAddr));
1680                         token = strtok(NULL, delimiters);
1681                 }
1682         }
1683
1684         fclose(configFile);
1685         
1686         if (numHostsInSystem < 1)
1687         {
1688                 printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1689                 return -1;
1690         }
1691 #ifdef MAC
1692         myIpAddr = getMyIpAddr("en1");
1693 #else
1694         myIpAddr = getMyIpAddr("eth0");
1695 #endif
1696         myIndexInHostArray = findHost(myIpAddr);
1697         if (myIndexInHostArray == -1)
1698         {
1699                 printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1700                 return -1;
1701         }
1702         oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1703         oidMin = oidsPerBlock * myIndexInHostArray;
1704         if (myIndexInHostArray == numHostsInSystem - 1)
1705                 oidMax = 0xFFFFFFFF;
1706         else
1707                 oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1708
1709         return 0;
1710 }
1711
1712 void addHost(unsigned int hostIp)
1713 {
1714         unsigned int *tmpArray;
1715
1716         if (findHost(hostIp) != -1)
1717                 return;
1718
1719         if (numHostsInSystem == sizeOfHostArray)
1720         {
1721                 tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1722                 memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1723                 free(hostIpAddrs);
1724                 hostIpAddrs = tmpArray;
1725         }
1726
1727         hostIpAddrs[numHostsInSystem++] = hostIp;
1728
1729         return;
1730 }
1731
1732 int findHost(unsigned int hostIp)
1733 {
1734         int i;
1735         for (i = 0; i < numHostsInSystem; i++)
1736                 if (hostIpAddrs[i] == hostIp)
1737                         return i;
1738
1739         //not found
1740         return -1;
1741 }
1742