my changes
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "clookup.h"
4 #include "machinepile.h"
5 #include "mlookup.h"
6 #include "llookup.h"
7 #include "plookup.h"
8 #include "prelookup.h"
9 #include "queue.h"
10 #include <pthread.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <netdb.h>
14 #include <netinet/in.h>
15 #include <sys/types.h>
16 #include <unistd.h>
17 #include <errno.h>
18 #include <time.h>
19 #include <string.h>
20 #include <pthread.h>
21
/* Port used when connecting to remote machines (passed to htons() for the
 * socket address in this file). */
22 #define LISTEN_PORT 2156
/* Size, in bytes, of the local char buffers declared by the commit code. */
23 #define RECEIVE_BUFFER_SIZE 2048
/* Number of worker threads created by transInit() and held in wthreads[]. */
24 #define NUM_THREADS 10
/* Size passed to objstrCreate() when the prefetch cache is created. */
25 #define PREFETCH_CACHE_SIZE 1048576 /* 1MB */
26
27 /* Global Variables */
/* Indexed by TYPE(object); added to sizeof(objheader_t) to size an object. */
28 extern int classsize[];
29 extern primarypfq_t pqueue; /* Shared prefetch queue; guarded by pqueue.qlock / pqueue.qcond */
30 extern mcpileq_t mcqueue;  /* Shared queue of prefetch requests sorted by remote machine id */
31 objstr_t *prefetchcache; /* Global prefetch cache, created in transInit() */
32 extern prehashtable_t pflookup; /* Lookup table of the prefetch cache; its lock/cond are used by transRead() */
33 pthread_t wthreads[NUM_THREADS]; /* Worker threads for working on the prefetch queue */
34 pthread_t tPrefetch; /* Primary prefetch thread, started by transInit() */
35 extern objstr_t *mainobjstore; /* Object store that dstmalloc() allocates from */
36
/* Forward declaration: defined further down in this file, used by transCommit(). */
37 plistnode_t *createPiles(transrecord_t *);
/* Returns the number of entries that precede the -1 terminator in `array`.
 *
 * array: -1 terminated list of ints; the caller must guarantee that the
 *        sentinel is present, the scan has no other stopping condition.
 *        A NULL array is reported as length 0.
 *
 * Defined without `inline`: under C99/C11 rules a plain `inline` definition
 * does not emit an external symbol, so any call the compiler chooses not to
 * inline would fail at link time. */
int arrayLength(int *array) {
	int count = 0;

	if (array == NULL) {
		return 0;
	}
	while (array[count] != -1) {
		count++;
	}
	return count;
}
/* Returns the largest of the first `arraylength` entries of `array`.
 *
 * array[0] is read unconditionally, so the array must hold at least one
 * element; when arraylength is 1 or less, that first element is the result.
 *
 * Defined without `inline`: under C99/C11 rules a plain `inline` definition
 * does not emit an external symbol, so any call the compiler chooses not to
 * inline would fail at link time. */
int findmax(int *array, int arraylength) {
	int best = array[0];
	int idx;

	/* Index 0 already seeds `best`, so the scan starts at 1. */
	for (idx = 1; idx < arraylength; idx++) {
		if (array[idx] > best) {
			best = array[idx];
		}
	}
	return best;
}
54 /* This function is a prefetch call generated by the compiler that
55  * populates the shared primary prefetch queue*/
56 void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) {
57         int qnodesize;
58         int len = 0;
59
60         /* Allocate for the queue node*/
61         char *node;
62         qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); 
63         if((node = calloc(1, qnodesize)) == NULL) {
64                 printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
65                 return;
66         }
67         /* Set queue node values */
68         len = sizeof(prefetchqelem_t);
69         memcpy(node + len, &ntuples, sizeof(int));
70         len += sizeof(int);
71         memcpy(node + len, oids, ntuples*sizeof(unsigned int));
72         len += ntuples * sizeof(unsigned int);
73         memcpy(node + len, endoffsets, ntuples*sizeof(short));
74         len += ntuples * sizeof(short);
75         memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
76         /* Lock and insert into primary prefetch queue */
77         pthread_mutex_lock(&pqueue.qlock);
78         pre_enqueue((prefetchqelem_t *)node);
79         pthread_cond_signal(&pqueue.qcond);
80         pthread_mutex_unlock(&pqueue.qlock);
81 }
82
83 static int objid=1;
84 /* This function allocates an object */
85 void * dstmalloc(int size) {
86   objheader_t * newobj=(objheader_t *)objstrAlloc(mainobjstore, size+sizeof(objheader_t));
87   OID(newobj)=objid;
88   newobj->version=1;
89   newobj->rcount=0;
90   STATUS(newobj)=NEW;
91   objid+=2;
92   return newobj;
93 }
94
95 /* This function starts up the transaction runtime. */
96 int dstmStartup(const char * option) {
97   pthread_t thread_Listen;
98   pthread_attr_t attr;
99   int master=strcmp(option, "master")==0;
100
101   dstmInit();
102   transInit();
103
104   if (master) {
105     pthread_attr_init(&attr);
106     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
107     pthread_create(&thread_Listen, &attr, dstmListen, NULL);
108     return 1;
109   } else {
110     dstmListen();
111     return 0;
112   }
113
114 }
115
116
117 /* This function initiates the prefetch thread
118  * A queue is shared between the main thread of execution
119  * and the prefetch thread to process the prefetch call
120  * Call from compiler populates the shared queue with prefetch requests while prefetch thread
121  * processes the prefetch requests */
122 void transInit() {
123         int t, rc;
124         //Create and initialize prefetch cache structure
125         prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
126         //Create prefetch cache lookup table
127         if(prehashCreate(HASH_SIZE, LOADFACTOR))
128                 return; //Failure
129         //Initialize primary shared queue
130         queueInit();
131         //Initialize machine pile w/prefetch oids and offsets shared queue
132         mcpileqInit();
133         //Create the primary prefetch thread 
134         pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
135         //Create and Initialize a pool of threads 
136         /* Threads are active for the entire period runtime is running */
137         for(t = 0; t< NUM_THREADS; t++) {
138                 rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
139                 if (rc) {
140                         printf("Thread create error %s, %d\n", __FILE__, __LINE__);
141                         return;
142                 }
143         }
144 }
145
146 /* This function stops the threads spawned */
147 void transExit() {
148         int t;
149         pthread_cancel(tPrefetch);
150         for(t = 0; t < NUM_THREADS; t++)
151                 pthread_cancel(wthreads[t]);
152
153         return;
154 }
155
/* Sleeps for a delay in the order of milliseconds; mostly used before a
 * transaction commit is retried.
 * The delay is 1,000,000 ns plus (current time in seconds modulo 10,000,000)
 * ns, i.e. between 1 and 11 msec. It is derived from the wall clock, so
 * calls made within the same second sleep for the same time. */
void randomdelay(void)
{
	struct timespec remaining;
	struct timespec request;
	time_t now = time(NULL);

	request.tv_nsec = (long)(1000000 + (now%10000000)); //1-11 msec
	request.tv_sec = 0;
	nanosleep(&request, &remaining);
}
169
170 /* This function initializes things required in the transaction start*/
171 transrecord_t *transStart()
172 {
173         transrecord_t *tmp = malloc(sizeof(transrecord_t));
174         tmp->cache = objstrCreate(1048576);
175         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
176 #ifdef COMPILER
177         tmp->revertlist=NULL;
178 #endif
179         return tmp;
180 }
181
182 /* This function finds the location of the objects involved in a transaction
183  * and returns the pointer to the object if found in a remote location */
184 objheader_t *transRead(transrecord_t *record, unsigned int oid) {       
185         printf("Inside transaction read call\n");
186         unsigned int machinenumber;
187         objheader_t *tmp, *objheader;
188         void *objcopy;
189         int size, rc, found = 0;
190         void *buf;
191         struct timespec ts;
192         struct timeval tp;
193         
194         rc = gettimeofday(&tp, NULL);
195
196         /* Convert from timeval to timespec */
197         ts.tv_nsec = tp.tv_usec * 1000;
198
199         /* Search local transaction cache */
200         if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
201                 printf("Inside transaction cache \n");
202                 return(objheader);
203         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
204                 /* Look up in machine lookup table  and copy  into cache*/
205                 printf("Inside mainobject store \n");
206                 tmp = mhashSearch(oid);
207                 size = sizeof(objheader_t)+classsize[TYPE(tmp)];
208                 objcopy = objstrAlloc(record->cache, size);
209                 memcpy(objcopy, (void *)objheader, size);
210                 /* Insert into cache's lookup table */
211                 chashInsert(record->lookupTable, OID(objheader), objcopy); 
212                 return(objcopy);
213         } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
214                 printf("Inside prefetch cache \n");
215                 found = 1;
216                 size = sizeof(objheader_t)+classsize[TYPE(tmp)];
217                 objcopy = objstrAlloc(record->cache, size);
218                 memcpy(objcopy, (void *)tmp, size);
219                 /* Insert into cache's lookup table */
220                 chashInsert(record->lookupTable, OID(tmp), objcopy); 
221                 return(objcopy);
222         } else { /* If not found anywhere, then block until object appears in prefetch cache */
223                 printf("Inside remote machine\n");
224                 pthread_mutex_lock(&pflookup.lock);
225                 while(!found) {
226                         rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
227                         if(rc == ETIMEDOUT) {
228                                 printf("Wait timed out\n");
229                                 /* Check Prefetch cache again */
230                                 if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
231                                         found = 1;
232                                         size = sizeof(objheader_t)+classsize[TYPE(tmp)];
233                                         objcopy = objstrAlloc(record->cache, size);
234                                         memcpy(objcopy, (void *)tmp, size);
235                                         /* Insert into cache's lookup table */
236                                         chashInsert(record->lookupTable, OID(tmp), objcopy); 
237                                         return(objcopy);
238                                 } else {
239                                         pthread_mutex_unlock(&pflookup.lock);
240                                         break;
241                                 }
242                                 pthread_mutex_unlock(&pflookup.lock);
243                         }
244                 }
245                 /* Get the object from the remote location */
246                 machinenumber = lhashSearch(oid);
247                 objcopy = getRemoteObj(record, machinenumber, oid);
248                 if(objcopy == NULL) {
249                         //If object is not found in Remote location
250                         //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
251                         return NULL;
252                 }
253                 else {
254                         //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
255                         return(objcopy);
256                 }
257         } 
258 }
259
260 /* This function creates objects in the transaction record */
261 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
262 {
263         objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
264         OID(tmp) = getNewOID();
265         TYPE(tmp) = type;
266         tmp->version = 1;
267         tmp->rcount = 0; //? not sure how to handle this yet
268         STATUS(tmp) = NEW;
269         chashInsert(record->lookupTable, OID(tmp), tmp);
270         return tmp;
271 }
272
273 /* This function creates machine piles based on all machines involved in a
274  * transaction commit request */
275 plistnode_t *createPiles(transrecord_t *record) {
276         int i = 0;
277         unsigned int size;/* Represents number of bins in the chash table */
278         chashlistnode_t *curr, *ptr, *next;
279         plistnode_t *pile = NULL;
280         unsigned int machinenum;
281         void *localmachinenum;
282         objheader_t *headeraddr;
283
284         ptr = record->lookupTable->table;
285         size = record->lookupTable->size;
286
287         for(i = 0; i < size ; i++) {
288                 curr = &ptr[i];
289                 /* Inner loop to traverse the linked list of the cache lookupTable */
290                 while(curr != NULL) {
291                         //if the first bin in hash table is empty
292                         if(curr->key == 0) {
293                                 break;
294                         }
295                         next = curr->next;
296                         //Get machine location for object id
297
298                         if ((machinenum = lhashSearch(curr->key)) == 0) {
299                                 printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
300                                 return NULL;
301                         }
302
303                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
304                                 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
305                                 return NULL;
306                         }
307                         //Make machine groups
308                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
309                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
310                                 return NULL;
311                         }
312
313                         /* Check if local or not */
314                         if((localmachinenum = mhashSearch(curr->key)) != NULL) { 
315                                 /* Set the pile->local flag*/
316                                 pile->local = 1; //True i.e. local
317                         }
318
319                         curr = next;
320                 }
321         }
322         return pile; 
323 }
324
325 /* This function initiates the transaction commit process
326  * Spawns threads for each of the new connections with Participants 
327  * and creates new piles by calling the createPiles(),
328  * Fills the piles with necesaary information and 
329  * Sends a transrequest() to each pile*/
330 int transCommit(transrecord_t *record) {        
331         unsigned int tot_bytes_mod, *listmid;
332         plistnode_t *pile, *pile_ptr;
333         int i, rc, val;
334         int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0;
335         char buffer[RECEIVE_BUFFER_SIZE],control;
336         char transid[TID_LEN];
337         trans_req_data_t *tosend;
338         trans_commit_data_t transinfo;
339         static int newtid = 0;
340         char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
341         char localstat = 0;
342
343
344         /* Look through all the objects in the transaction record and make piles 
345          * for each machine involved in the transaction*/
346         pile_ptr = pile = createPiles(record);
347
348         /* Create the packet to be sent in TRANS_REQUEST */
349
350         /* Count the number of participants */
351         pilecount = pCount(pile);
352
353         /* Create a list of machine ids(Participants) involved in transaction   */
354         if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
355                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
356                 return 1;
357         }               
358         pListMid(pile, listmid);
359
360
361         /* Initialize thread variables,
362          * Spawn a thread for each Participant involved in a transaction */
363         pthread_t thread[pilecount];
364         pthread_attr_t attr;                    
365         pthread_cond_t tcond;
366         pthread_mutex_t tlock;
367         pthread_mutex_t tlshrd;
368
369         thread_data_array_t *thread_data_array;
370         thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
371         local_thread_data_array_t *ltdata;
372         if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
373                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
374                 return 1;
375         }
376
377         thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
378
379         /* Initialize and set thread detach attribute */
380         pthread_attr_init(&attr);
381         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
382         pthread_mutex_init(&tlock, NULL);
383         pthread_cond_init(&tcond, NULL);
384
385         /* Process each machine pile */
386         while(pile != NULL) {
387                 //Create transaction id
388                 newtid++;
389                 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
390                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
391                         return 1;
392                 }
393                 tosend->f.control = TRANS_REQUEST;
394                 sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
395                 tosend->f.mcount = pilecount;
396                 tosend->f.numread = pile->numread;
397                 tosend->f.nummod = pile->nummod;
398                 tosend->f.sum_bytes = pile->sum_bytes;
399                 tosend->listmid = listmid;
400                 tosend->objread = pile->objread;
401                 tosend->oidmod = pile->oidmod;
402                 thread_data_array[threadnum].thread_id = threadnum;
403                 thread_data_array[threadnum].mid = pile->mid;
404                 thread_data_array[threadnum].pilecount = pilecount;
405                 thread_data_array[threadnum].buffer = tosend;
406                 thread_data_array[threadnum].recvmsg = rcvd_control_msg;
407                 thread_data_array[threadnum].threshold = &tcond;
408                 thread_data_array[threadnum].lock = &tlock;
409                 thread_data_array[threadnum].count = &trecvcount;
410                 thread_data_array[threadnum].replyctrl = &treplyctrl;
411                 thread_data_array[threadnum].replyretry = &treplyretry;
412                 thread_data_array[threadnum].rec = record;
413                 /* If local do not create any extra connection */
414                 if(pile->local != 1) { /* Not local */
415                         rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]);  
416                         if (rc) {
417                                 perror("Error in pthread create\n");
418                                 return 1;
419                         }
420                 } else { /*Local*/
421                         /*Unset the pile->local flag*/
422                         pile->local = 0;
423                         /*Set flag to identify that Local machine is involved*/
424                         ltdata->tdata = &thread_data_array[threadnum];
425                         ltdata->transinfo = &transinfo;
426                         val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata);
427                         if (val) {
428                                 perror("Error in pthread create\n");
429                                 return 1;
430                         }
431                 }
432                 threadnum++;            
433                 pile = pile->next;
434         }
435
436         /* Free attribute and wait for the other threads */
437         pthread_attr_destroy(&attr);
438         for (i = 0 ;i < pilecount ; i++) {
439                 rc = pthread_join(thread[i], NULL);
440                 if (rc)
441                 {
442                         printf("ERROR return code from pthread_join() is %d\n", rc);
443                         return 1;
444                 }
445                 free(thread_data_array[i].buffer);
446         }
447
448         /* Free resources */    
449         pthread_cond_destroy(&tcond);
450         pthread_mutex_destroy(&tlock);
451         free(listmid);
452         pDelete(pile_ptr);
453         free(thread_data_array);
454         free(ltdata);
455
456         /* Retry trans commit procedure if not sucessful in the first try */
457         if(treplyretry == 1) {
458                 /* wait a random amount of time */
459                 randomdelay();
460                 /* Retry the commiting transaction again */
461                 transCommit(record);
462         }
463
464         return 0;
465 }
466
467 /* This function sends information involved in the transaction request and 
468  * accepts a response from particpants.
469  * It calls decideresponse() to decide on what control message 
470  * to send next and sends the message using sendResponse()*/
471 void *transRequest(void *threadarg) {
472         int sd, i, n;
473         struct sockaddr_in serv_addr;
474         struct hostent *server;
475         thread_data_array_t *tdata;
476         objheader_t *headeraddr;
477         char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
478         char machineip[16], retval;
479
480         tdata = (thread_data_array_t *) threadarg;
481
482         /* Send Trans Request */
483         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
484                 perror("Error in socket for TRANS_REQUEST\n");
485                 return NULL;
486         }
487         bzero((char*) &serv_addr, sizeof(serv_addr));
488         serv_addr.sin_family = AF_INET;
489         serv_addr.sin_port = htons(LISTEN_PORT);
490         midtoIP(tdata->mid,machineip);
491         machineip[15] = '\0';
492         serv_addr.sin_addr.s_addr = inet_addr(machineip);
493         /* Open Connection */
494         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
495                 perror("Error in connect for TRANS_REQUEST\n");
496                 return NULL;
497         }
498
499         printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
500         /* Send bytes of data with TRANS_REQUEST control message */
501         if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
502                 perror("Error sending fixed bytes for thread\n");
503                 return NULL;
504         }
505         /* Send list of machines involved in the transaction */
506         {
507                 int size=sizeof(unsigned int)*tdata->pilecount;
508                 if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
509                         perror("Error sending list of machines for thread\n");
510                         return NULL;
511                 }
512         }
513         /* Send oids and version number tuples for objects that are read */
514         {
515                 int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
516                 if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
517                         perror("Error sending tuples for thread\n");
518                         return NULL;
519                 }
520         }
521         /* Send objects that are modified */
522         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
523                 int size;
524                 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
525                 size=sizeof(objheader_t)+classsize[TYPE(headeraddr)];
526                 if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
527                         perror("Error sending obj modified for thread\n");
528                         return NULL;
529                 }
530         }
531
532         /* Read control message from Participant */
533         if((n = read(sd, &control, sizeof(char))) <= 0) {
534                 perror("Error in reading control message from Participant\n");
535                 return NULL;
536         }
537         recvcontrol = control;
538
539         /* Update common data structure and increment count */
540         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
541
542         /* Lock and update count */
543         //Thread sleeps until all messages from pariticipants are received by coordinator
544         pthread_mutex_lock(tdata->lock);
545
546         (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
547
548         /* Wake up the threads and invoke decideResponse (once) */
549         if(*(tdata->count) == tdata->pilecount) {
550                 if (decideResponse(tdata) != 0) { 
551                         printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
552                         pthread_mutex_unlock(tdata->lock);
553                         close(sd);
554                         return NULL;
555                 }
556                 pthread_cond_broadcast(tdata->threshold);
557         } else {
558                 pthread_cond_wait(tdata->threshold, tdata->lock);
559         }
560         pthread_mutex_unlock(tdata->lock);
561
562         /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
563          * to all participants in their respective socket */
564         if (sendResponse(tdata, sd) == 0) { 
565                 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
566                 pthread_mutex_unlock(tdata->lock);
567                 close(sd);
568                 return NULL;
569         }
570
571         /* Close connection */
572         close(sd);
573         pthread_exit(NULL);
574 }
575
576 /* This function decides the reponse that needs to be sent to 
577  * all Participant machines involved in the transaction commit */
578 int decideResponse(thread_data_array_t *tdata) {
579         char control;
580         int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
581                                                                          message to send */
582
583         //Check common data structure 
584         for (i = 0 ; i < tdata->pilecount ; i++) {
585                 /*Switch on response from Participant */
586                 control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
587                                                            written onto the shared array */
588                 switch(control) {
589                         case TRANS_DISAGREE:
590                                 printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
591                                 transdisagree++;
592                                 break;
593
594                         case TRANS_AGREE:
595                                 printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
596                                 transagree++;
597                                 break;
598
599                         case TRANS_SOFT_ABORT:
600                                 printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
601                                 transsoftabort++;
602                                 break;
603                         default:
604                                 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
605                                 return -1;
606                 }
607         }
608
609         /* Decide what control message to send to Participant */        
610         if(transdisagree > 0) {
611                 /* Send Abort */
612                 *(tdata->replyctrl) = TRANS_ABORT;
613                 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
614                 /* Free resources */
615                 objstrDelete(tdata->rec->cache);
616                 chashDelete(tdata->rec->lookupTable);
617                 free(tdata->rec);
618         } else if(transagree == tdata->pilecount){
619                 /* Send Commit */
620                 *(tdata->replyctrl) = TRANS_COMMIT;
621                 printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
622                 /* Free resources */
623                 objstrDelete(tdata->rec->cache);
624                 chashDelete(tdata->rec->lookupTable);
625                 free(tdata->rec);
626         } else if(transsoftabort > 0 && transdisagree == 0) {
627                 /* Send Abort in soft abort case followed by retry commiting transaction again*/
628                 *(tdata->replyctrl) = TRANS_ABORT;
629                 *(tdata->replyretry) = 1;
630                 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
631         } else {
632                 printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
633                 return -1;
634         }
635
636         return 0;
637 }
638 /* This function sends the final response to remote machines per thread in their respective socket id */
639 char sendResponse(thread_data_array_t *tdata, int sd) {
640         int n, N, sum, oidcount = 0;
641         char *ptr, retval = 0;
642         unsigned int *oidnotfound;
643
644         /* If the decided response is due to a soft abort and missing objects at the Participant's side */
645         if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
646                 /* Read list of objects missing */
647                 if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
648                         N = oidcount * sizeof(unsigned int);
649                         if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
650                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
651                         }
652                         ptr = (char *) oidnotfound;
653                         do {
654                                 n = read(sd, ptr+sum, N-sum);
655                                 sum += n;
656                         } while(sum < N && n !=0);
657                 }
658                 retval =  TRANS_SOFT_ABORT;
659         }
660         /* If the decided response is TRANS_ABORT */
661         if(*(tdata->replyctrl) == TRANS_ABORT) {
662                 retval = TRANS_ABORT;
663         } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
664                 retval = TRANS_COMMIT;
665         }
666         /* Send response to the Participant */
667         if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
668                 perror("Error sending ctrl message for participant\n");
669         }
670
671         return retval;
672 }
673
674 /* This function opens a connection, places an object read request to the 
675  * remote machine, reads the control message and object if available  and 
676  * copies the object and its header to the local cache.
677  * TODO replace mnum and midtoIP() with MACHINE_IP address later */ 
678
679 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
680         int sd, size, val;
681         struct sockaddr_in serv_addr;
682         struct hostent *server;
683         char control;
684         char machineip[16];
685         objheader_t *h;
686         void *objcopy;
687
688         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
689                 perror("Error in socket\n");
690                 return NULL;
691         }
692         bzero((char*) &serv_addr, sizeof(serv_addr));
693         serv_addr.sin_family = AF_INET;
694         serv_addr.sin_port = htons(LISTEN_PORT);
695         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
696         midtoIP(mnum,machineip);
697         machineip[15] = '\0';
698         serv_addr.sin_addr.s_addr = inet_addr(machineip);
699         /* Open connection */
700         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
701                 perror("Error in connect\n");
702                 return NULL;
703         }
704         char readrequest[sizeof(char)+sizeof(unsigned int)];
705         readrequest[0] = READ_REQUEST;
706         *((unsigned int *)(&readrequest[1])) = oid;
707         if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
708                 perror("Error sending message\n");
709                 return NULL;
710         }
711
712 #ifdef DEBUG1
713         printf("DEBUG -> ready to rcv ...\n");
714 #endif
715         /* Read response from the Participant */
716         if((val = read(sd, &control, sizeof(char))) <= 0) {
717                 perror("No control response for getRemoteObj sent\n");
718                 return NULL;
719         }
720         switch(control) {
721                 case OBJECT_NOT_FOUND:
722                         printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
723                         return NULL;
724                 case OBJECT_FOUND:
725                         /* Read object if found into local cache */
726                         if((val = read(sd, &size, sizeof(int))) <= 0) {
727                                 perror("No size is read from the participant\n");
728                                 return NULL;
729                         }
730                         objcopy = objstrAlloc(record->cache, size);
731                         if((val = read(sd, objcopy, size)) <= 0) {
732                                 perror("No objects are read from the remote participant\n");
733                                 return NULL;
734                         }
735                         /* Insert into cache's lookup table */
736                         chashInsert(record->lookupTable, oid, objcopy); 
737                         break;
738                 default:
739                         printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
740                         return NULL;
741         }
742         /* Close connection */
743         close(sd);
744         return objcopy;
745 }
746
747 /*This function handles the local trans requests involved in a transaction commiting process
748  * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT
749  * Activates the other nonlocal threads that are waiting for the decision and the
750  * based on common decision by all groups involved in the transaction it 
751  * either commits or aborts the transaction.
752  * It also frees the calloced memory resources
753  */
754
755 void *handleLocalReq(void *threadarg) {
756         int val, i = 0;
757         short version;
758         char control = 0, *ptr;
759         unsigned int oid;
760         unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
761         void *mobj, *modptr;
762         objheader_t *headptr;
763         local_thread_data_array_t *localtdata;
764
765         localtdata = (local_thread_data_array_t *) threadarg;
766
767         /* Counters and arrays to formulate decision on control message to be sent */
768         oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
769         oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
770         oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
771         int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
772         int objmodnotfound = 0, nummodfound = 0;
773
774         /* modptr points to the beginning of the object store 
775          * created at the Pariticipant */ 
776         if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
777                 printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
778                 return NULL;
779         }
780
781         ptr = modptr;
782
783         /* Process each oid in the machine pile/ group per thread */
784         for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
785                 if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified
786                         int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
787                         incr *= i;
788                         oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
789                         incr += sizeof(unsigned int);
790                         version = *((short *)(localtdata->tdata->buffer->objread + incr));
791                 } else {//Objs modified
792                         headptr = (objheader_t *) ptr;
793                         oid = OID(headptr);
794                         oidmod[objmod] = oid;//Array containing modified oids
795                         objmod++;
796                         version = headptr->version;
797                         ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
798                 }
799
800                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
801
802                 /* Save the oids not found and number of oids not found for later use */
803                 if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
804                         /* Save the oids not found and number of oids not found for later use */
805
806                         oidnotfound[objnotfound] = OID(((objheader_t *)mobj));
807                         objnotfound++;
808                 } else { /* If Obj found in machine (i.e. has not moved) */
809                         /* Check if Obj is locked by any previous transaction */
810                         if (STATUS(((objheader_t *)mobj)) & LOCK) {
811                                 if (version == ((objheader_t *)mobj)->version) {      /* If not locked then match versions */ 
812                                         v_matchlock++;
813                                 } else {/* If versions don't match ...HARD ABORT */
814                                         v_nomatch++;
815                                         /* Send TRANS_DISAGREE to Coordinator */
816                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
817                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
818                                         //return tdata->recvmsg[tdata->thread_id].rcv_status;  
819                                 }
820                         } else {/* If Obj is not locked then lock object */
821                                 STATUS(((objheader_t *)mobj)) |= LOCK;
822                                 //TODO Remove this for Testing
823                                 randomdelay();
824
825                                 /* Save all object oids that are locked on this machine during this transaction request call */
826                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
827                                 objlocked++;
828                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
829                                         v_matchnolock++;
830                                 } else { /* If versions don't match ...HARD ABORT */
831                                         v_nomatch++;
832                                         /* Send TRANS_DISAGREE to Coordinator */
833                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
834                                         printf("DEBUG -> Sending TRANS_DISAGREE\n");
835                                         //      return tdata->recvmsg[tdata->thread_id].rcv_status;  
836                                 }
837                         }
838                 }
839         }
840
841         /*Decide the response to be sent to the Coordinator( the local machine in this case)*/
842
843         /* Condition to send TRANS_AGREE */
844         if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
845                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
846                 printf("DEBUG -> Sending TRANS_AGREE\n");
847         }
848         /* Condition to send TRANS_SOFT_ABORT */
849         if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
850                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
851                 printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
852                 //TODO  currently the only soft abort case that is supported is when object locked by previous
853                 //transaction => v_matchlock > 0 
854                 //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported 
855                 /* Send number of oids not found and the missing oids if objects are missing in the machine */
856                 /* TODO Remember to store the oidnotfound for later use
857                    if(objnotfound != 0) {
858                    int size = sizeof(unsigned int)* objnotfound;
859                    }
860                    */
861         }
862
863         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
864          * if Participant receives a TRANS_COMMIT */
865         localtdata->transinfo->objmod = oidmod;
866         localtdata->transinfo->objlocked = oidlocked;
867         localtdata->transinfo->objnotfound = oidnotfound;
868         localtdata->transinfo->modptr = modptr;
869         localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod;
870         localtdata->transinfo->numlocked = objlocked;
871         localtdata->transinfo->numnotfound = objnotfound;
872
873         /*Set flag to show that common data structure for this individual thread has been written to */
874         //*(tdata->localstatus) |= LM_UPDATED;
875
876         /* Lock and update count */
877         //Thread sleeps until all messages from pariticipants are received by coordinator
878         pthread_mutex_lock(localtdata->tdata->lock);
879         (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
880
881         /* Wake up the threads and invoke decideResponse (once) */
882         if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) {
883                 if (decideResponse(localtdata->tdata) != 0) { 
884                         printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
885                         pthread_mutex_unlock(localtdata->tdata->lock);
886                         return NULL;
887                 }
888                 pthread_cond_broadcast(localtdata->tdata->threshold);
889         } else {
890                 pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
891         }
892         pthread_mutex_unlock(localtdata->tdata->lock);
893
894         /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
895         if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
896                 if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) {
897                         printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
898                         return NULL;
899                 }
900         }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
901                 if(transComProcess(localtdata->transinfo) != 0) {
902                         printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
903                         return NULL;
904                 }
905         }
906
907         /* Free memory */
908         printf("DEBUG -> Freeing...\n");
909         fflush(stdout);
910         if (localtdata->transinfo->objmod != NULL) {
911                 free(localtdata->transinfo->objmod);
912                 localtdata->transinfo->objmod = NULL;
913         }
914         if (localtdata->transinfo->objlocked != NULL) {
915                 free(localtdata->transinfo->objlocked);
916                 localtdata->transinfo->objlocked = NULL;
917         }
918         if (localtdata->transinfo->objnotfound != NULL) {
919                 free(localtdata->transinfo->objnotfound);
920                 localtdata->transinfo->objnotfound = NULL;
921         }
922
923         pthread_exit(NULL);
924 }
925 /* This function completes the ABORT process if the transaction is aborting 
926 */
927 int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
928         char *ptr;
929         int i;
930         objheader_t *tmp_header;
931         void *header;
932
933         printf("DEBUG -> Recv TRANS_ABORT\n");
934         /* Set all ref counts as 1 and do garbage collection */
935         ptr = modptr;
936         for(i = 0; i< nummod; i++) {
937                 tmp_header = (objheader_t *)ptr;
938                 tmp_header->rcount = 1;
939                 ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)];
940         }
941         /* Unlock objects that was locked due to this transaction */
942         for(i = 0; i< numlocked; i++) {
943                 header = mhashSearch(objlocked[i]);// find the header address
944                 STATUS(((objheader_t *)header)) &= ~(LOCK);
945         }
946
947         /* Send ack to Coordinator */
948         printf("DEBUG-> TRANS_SUCCESSFUL\n");
949
950         /*Free the pointer */
951         ptr = NULL;
952         return 0;
953 }
954
955 /*This function completes the COMMIT process is the transaction is commiting
956 */
957 int transComProcess(trans_commit_data_t *transinfo) {
958         objheader_t *header;
959         int i = 0, offset = 0;
960         char control;
961
962         printf("DEBUG -> Recv TRANS_COMMIT\n");
963         /* Process each modified object saved in the mainobject store */
964         for(i=0; i<transinfo->nummod; i++) {
965                 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
966                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
967                 }
968                 /* Change reference count of older address and free space in objstr ?? */
969                 header->rcount = 1; //TODO Not sure what would be the val
970
971                 /* Change ptr address in mhash table */
972                 mhashRemove(transinfo->objmod[i]);
973                 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
974                 offset += sizeof(objheader_t) + classsize[TYPE(header)];
975
976                 /* Update object version number */
977                 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
978                 header->version += 1;
979         }
980
981         /* Unlock locked objects */
982         for(i=0; i<transinfo->numlocked; i++) {
983                 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
984                 STATUS(header) &= ~(LOCK);
985         }
986
987         //TODO Update location lookup table
988
989         /* Send ack to Coordinator */
990         printf("DEBUG-> TRANS_SUCESSFUL\n");
991         return 0;
992 }
993
994 /* This function checks if the prefetch oids are same and have same offsets  
995  * for case x.a.b and y.a.b where x and y have same oid's
996  * or if a.b.c is a subset of x.b.c.d*/ 
997 /* check for case where the generated request a.y.z or x.y.z.g then 
998  * prefetch needs to be generated for x.y.z.g  if oid of a and x are same*/
999 void checkPrefetchTuples(prefetchqelem_t *node) {
1000         int i,j, count,k, sindex, index;
1001         char *ptr, *tmp;
1002         int ntuples, slength;
1003         unsigned int *oid;
1004         short *endoffsets, *arryfields; 
1005
1006         /* Check for the case x.y.z and a.b.c are same oids */ 
1007         ptr = (char *) node;
1008         ntuples = *(GET_NTUPLES(ptr));
1009         oid = GET_PTR_OID(ptr);
1010         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1011         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1012         /* Find offset length for each tuple */
1013         int numoffset[ntuples];
1014         numoffset[0] = endoffsets[0];
1015         for(i = 1; i<ntuples; i++) {
1016                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1017         }
1018         /* Check for redundant tuples by comparing oids of each tuple */
1019         for(i = 0; i < ntuples; i++) {
1020                 if(oid[i] == -1)
1021                         continue;
1022                 for(j = i+1 ; j < ntuples; j++) {
1023                         if(oid[j] == -1)
1024                                 continue;
1025                         /*If oids of tuples match */ 
1026                         if (oid[i] == oid[j]) {
1027                                 /* Find the smallest offset length of two tuples*/
1028                                 if(numoffset[i] >  numoffset[j]){
1029                                         slength = numoffset[j];
1030                                         sindex = j;
1031                                 }
1032                                 else {
1033                                         slength = numoffset[i];
1034                                         sindex = i;
1035                                 }
1036
1037                                 /* Compare the offset values based on the current indices
1038                                  * break if they do not match
1039                                  * if all offset values match then pick the largest tuple*/
1040
1041                                 if(i == 0) {
1042                                         k = 0;
1043                                         index = endoffsets[j -1];
1044                                         for(count = 0; count < slength; count ++) {
1045                                                 if (arryfields[k] != arryfields[index]) { 
1046                                                         break;
1047                                                 }
1048                                                 index++;
1049                                                 k++;
1050                                         }       
1051                                 } else {
1052                                         k = endoffsets[i-1];
1053                                         index = endoffsets[j-1];
1054                                         printf("Value of slength = %d\n", slength);
1055                                         for(count = 0; count < slength; count++) {
1056                                                 if(arryfields[k] != arryfields[index]) {
1057                                                         break;
1058                                                 }
1059                                                 index++;
1060                                                 k++;
1061                                         }
1062                                 }
1063
1064                                 if(slength == count) {
1065                                         oid[sindex] = -1;
1066                                 }
1067                         }
1068                 }
1069         }
1070 }
1071
1072 void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopcount, unsigned int objoid, int index, int iter, int oidnfound) {
1073         char *ptr, *tmp;
1074         int ntuples, i, k, flag;
1075         unsigned int * oid;
1076         short *endoffsets, *arryfields;
1077         objheader_t *header;
1078
1079         ptr = (char *) node;
1080         ntuples = *(GET_NTUPLES(ptr));
1081         oid = GET_PTR_OID(ptr);
1082         endoffsets = GET_PTR_EOFF(ptr, ntuples);
1083         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1084
1085         if(oidnfound == 1) {
1086                 if((header = (objheader_t *) prehashSearch(objoid)) == NULL) {
1087                         return;
1088                 } else { //Found in Prefetch Cache
1089                         //TODO Decide if object is too old, if old remove from cache
1090                         tmp = (char *) header;
1091                         /* Check if any of the offset oid is available in the Prefetch cache */
1092                         for(i = counter; i < loopcount; i++) {
1093                                 objoid = *(tmp + sizeof(objheader_t) + arryfields[counter]);
1094                                 if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
1095                                         flag = 0;
1096                                 } else {
1097                                         flag = 1;
1098                                         break;
1099                                 }
1100                         }
1101                 }
1102         } else {
1103                 for(i = counter; i<loopcount; i++) {
1104                         if((header = (objheader_t *)prehashSearch(objoid)) != NULL) {
1105                                 tmp = (char *) header;
1106                                 objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
1107                                 flag = 0;
1108                                 index++;
1109                         } else {
1110                                 flag = 1;
1111                                 break;
1112                         }
1113                 }
1114         }
1115
1116         /* If oid not found locally or in prefetch cache then 
1117          * assign the latest oid found as the new oid 
1118          * and copy left over offsets into the arrayoffsetfieldarray*/
1119         oid[iter] = objoid;
1120         numoffset[iter] = numoffset[iter] - (i+1);
1121         for(k = 0; k < numoffset[iter] ; k++) {
1122                 arryfields[endoffsets[counter]+k] = arryfields[endoffsets[counter]+k+1];
1123         }
1124
1125         if(flag == 0) {
1126                 oid[iter] = -1;
1127                 numoffset[iter] = 0;
1128         }
1129 }
1130
1131 /* This function makes machine piles to be added into the machine pile queue for each prefetch call */
1132 prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
1133         char *ptr, *tmp;
1134         int ntuples, slength, i, machinenum;
1135         int maxoffset;
1136         unsigned int *oid;
1137         short *endoffsets, *arryfields, *offset; 
1138         prefetchpile_t *head = NULL;
1139
1140         /* Check for the case x.y.z and a.b.c are same oids */ 
1141         ptr = (char *) node;
1142         ntuples = *(GET_NTUPLES(ptr));
1143         oid = GET_PTR_OID(ptr);
1144         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1145         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1146
1147         /* Check for redundant tuples by comparing oids of each tuple */
1148         for(i = 0; i < ntuples; i++) {
1149                 if(oid[i] == -1)
1150                         continue;
1151                 /* For each tuple make piles */
1152                 if ((machinenum = lhashSearch(oid[i])) == 0) {
1153                         printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
1154                         return NULL;
1155                 }
1156                 /* Insert into machine pile */
1157                 offset = &arryfields[endoffsets[i-1]];
1158                 insertPile(machinenum, oid[i], numoffset[i], offset, head);
1159         }
1160
1161         return head;
1162 }
1163
1164
1165 /* This function checks if the oids within the prefetch tuples are available locally.
1166  * If yes then makes the tuple invalid. If no then rearranges oid and offset values in 
1167  * the prefetchqelem_t node to represent a new prefetch tuple */
1168 prefetchpile_t *foundLocal(prefetchqelem_t *node) {
1169         int ntuples,i, j, k, oidnfound = 0, index, flag;
1170         unsigned int *oid;
1171         unsigned int  objoid;
1172         char *ptr, *tmp;
1173         objheader_t *objheader;
1174         short *endoffsets, *arryfields; 
1175         prefetchpile_t *head = NULL;
1176
1177         ptr = (char *) node;
1178         ntuples = *(GET_NTUPLES(ptr));
1179         oid = GET_PTR_OID(ptr);
1180         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1181         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1182         /* Find offset length for each tuple */
1183         int numoffset[ntuples];//Number of offsets for each tuple
1184         numoffset[0] = endoffsets[0];
1185         for(i = 1; i<ntuples; i++) {
1186                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1187         }
1188         for(i = 0; i < ntuples; i++) { 
1189                 if(oid[i] == -1)
1190                         continue;
1191                 /* If object found locally */
1192                 if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) { 
1193                         oidnfound = 0;
1194                         tmp = (char *) objheader;
1195                         /* Find the oid of its offset value */
1196                         if(i == 0) 
1197                                 index = 0;
1198                         else 
1199                                 index = endoffsets[i - 1];
1200                         for(j = 0 ; j < numoffset[i] ; j++) {
1201                                 objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
1202                                 /*If oid found locally then 
1203                                  *assign the latest oid found as the new oid 
1204                                  *and copy left over offsets into the arrayoffsetfieldarray*/
1205                                 oid[i] = objoid;
1206                                 numoffset[i] = numoffset[i] - (j+1);
1207                                 for(k = 0; k < numoffset[i]; k++)
1208                                         arryfields[endoffsets[j]+ k] = arryfields[endoffsets[j]+k+1];
1209                                 index++;
1210                                 /*New offset oid not found */
1211                                 if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
1212                                         flag = 1;
1213                                         checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
1214                                         break;
1215                                 } else 
1216                                         flag = 0;
1217                         }
1218
1219                         /*If all offset oids are found locally,make the prefetch tuple invalid */
1220                         if(flag == 0) {
1221                                 oid[i] = -1;
1222                                 numoffset[i] = 0;
1223                         }
1224                 } else {
1225                         oidnfound = 1;
1226                         /* Look in Prefetch cache */
1227                         checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
1228                 }
1229
1230         }
1231         /* Make machine groups */
1232         head = makePreGroups(node, numoffset);
1233         return head;
1234 }
1235
1236 /* This function is called by the thread calling transPrefetch */
1237 void *transPrefetch(void *t) {
1238         //int *offstarray = NULL;
1239         prefetchqelem_t *qnode;
1240         prefetchpile_t *pilehead = NULL;
1241
1242         while(1) {
1243                 /* lock mutex of primary prefetch queue */
1244                 pthread_mutex_lock(&pqueue.qlock);
1245                 /* while primary queue is empty, then wait */
1246                 while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
1247                         pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
1248                 }
1249
1250                 /* dequeue node to create a machine piles and  finally unlock mutex */
1251                 if((qnode = pre_dequeue()) == NULL) {
1252                         printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
1253                         return NULL;
1254                 }
1255                 pthread_mutex_unlock(&pqueue.qlock);
1256                 /* Reduce redundant prefetch requests */
1257                 checkPrefetchTuples(qnode);
1258                 /* Check if the tuples are found locally, if yes then reduce them further*/ 
1259                 /* and group requests by remote machine ids by calling the makePreGroups() */
1260                 pilehead = foundLocal(qnode);
1261
1262                 /* Lock mutex of pool queue */
1263                 pthread_mutex_lock(&mcqueue.qlock);
1264                 /* Update the pool queue with the new remote machine piles generated per prefetch call */
1265                 mcpileenqueue(pilehead);
1266                 /* Broadcast signal on machine pile queue */
1267                 pthread_cond_broadcast(&mcqueue.qcond);
1268                 /* Unlock mutex of  machine pile queue */
1269                 pthread_mutex_unlock(&mcqueue.qlock);
1270                 /* Deallocate the prefetch queue pile node */
1271                 predealloc(qnode);
1272
1273         }
1274 }
1275
1276 /* Each thread in the  pool of threads calls this function to establish connection with
1277  * remote machines, send the prefetch requests and process the reponses from
1278  * the remote machines .
1279  * The thread is active throughout the period of runtime */
1280
1281 void *mcqProcess(void *threadid) {
1282         int tid;
1283         prefetchpile_t *mcpilenode;
1284
1285         tid = (int) threadid;
1286         while(1) {
1287                 /* Lock mutex of mc pile queue */
1288                 pthread_mutex_lock(&mcqueue.qlock);
1289                 /* When mc pile queue is empty, wait */
1290                 while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
1291                         pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
1292                 }
1293                 /* Dequeue node to send remote machine connections*/
1294                 if((mcpilenode = mcpiledequeue()) == NULL) {
1295                         printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
1296                         return NULL;
1297                 }
1298                 /* Unlock mutex */
1299                 pthread_mutex_unlock(&mcqueue.qlock);
1300
1301                 /*Initiate connection to remote host and send request */ 
1302                 /* Process Request */
1303                 sendPrefetchReq(mcpilenode, tid);
1304
1305                 /* Deallocate the machine queue pile node */
1306                 mcdealloc(mcpilenode);
1307         }
1308 }
1309
1310 void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) {
1311         int sd, i, offset, off, len, endpair, numoffsets, count = 0;
1312         struct sockaddr_in serv_addr;
1313         struct hostent *server;
1314         char machineip[16], control;
1315         objpile_t *tmp;
1316
1317
1318         /* Send Trans Prefetch Request */
1319         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1320                 perror("Error in socket for TRANS_REQUEST\n");
1321                 return;
1322         }
1323         bzero((char*) &serv_addr, sizeof(serv_addr));
1324         serv_addr.sin_family = AF_INET;
1325         serv_addr.sin_port = htons(LISTEN_PORT);
1326         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
1327         midtoIP(mcpilenode->mid ,machineip);
1328         machineip[15] = '\0';
1329         serv_addr.sin_addr.s_addr = inet_addr(machineip);
1330
1331         /* Open Connection */
1332         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
1333                 perror("Error in connect for TRANS_REQUEST\n");
1334                 return;
1335         }
1336
1337         /* Send TRANS_PREFETCH control message */
1338         control = TRANS_PREFETCH;
1339         if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
1340                 perror("Error in sending prefetch control\n");
1341                 return;
1342         }
1343
1344         /* Send Oids and offsets in pairs */
1345         tmp = mcpilenode->objpiles;
1346         while(tmp != NULL) {
1347                 off = offset = 0;
1348                 count++;  // Keeps track of the number of oid and offset tuples sent per remote machine
1349                 len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1350                 char oidnoffset[len];
1351                 memcpy(oidnoffset, &len, sizeof(int));
1352                 off = sizeof(int);
1353                 memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int));
1354                 off += sizeof(unsigned int);
1355                 for(i = 0; i < numoffsets; i++) {
1356                         offset = off +  (i * sizeof(short));
1357                         memcpy(oidnoffset + offset, tmp->offset, sizeof(short));
1358                 }
1359                 if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) {
1360                         perror("Error sending fixed bytes for thread\n");
1361                         return;
1362                 }
1363                 tmp = tmp->next;
1364         }
1365
1366         /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1367         endpair = -1;
1368         if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
1369                 perror("Error sending fixed bytes for thread\n");
1370                 return;
1371         }
1372
1373         /* Get Response from the remote machine */
1374         getPrefetchResponse(count,sd);
1375         close(sd);
1376         return;
1377 }
1378
1379 void getPrefetchResponse(int count, int sd) {
1380         int i = 0, val, n, N, sum, index, objsize;
1381         unsigned int bufsize,oid;
1382         char buffer[RECEIVE_BUFFER_SIZE], control;
1383         char *ptr;
1384         void *modptr, *oldptr;
1385
1386         /* Read  prefetch response from the Remote machine */
1387         if((val = read(sd, &control, sizeof(char))) <= 0) {
1388                 perror("No control response for Prefetch request sent\n");
1389                 return;
1390         }
1391
1392         if(control == TRANS_PREFETCH_RESPONSE) {
1393                 /*For each oid and offset tuple sent as prefetch request to remote machine*/
1394                 while(i < count) {
1395                         /* Clear contents of buffer */
1396                         memset(buffer, 0, RECEIVE_BUFFER_SIZE);
1397                         sum = 0;
1398                         index = 0;
1399                         /* Read the size of buffer to be received */
1400                         if((N = read(sd, buffer, sizeof(unsigned int))) <= 0) {
1401                                 perror("Size of buffer not recv\n");
1402                                 return;
1403                         }
1404                         memcpy(&bufsize, buffer, sizeof(unsigned int));
1405                         ptr = buffer + sizeof(unsigned int);
1406                         /* Keep receiving the buffer containing oid info */ 
1407                         do {
1408                                 n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
1409                                 sum +=n;
1410                         } while(sum < bufsize && n != 0);
1411                         /* Decode the contents of the buffer */
1412                         index = sizeof(unsigned int);
1413                         while(index < (bufsize - sizeof(unsigned int))) {
1414                                 if(buffer[index] == OBJECT_FOUND) {
1415                                         /* Increment it to get the object */
1416                                         index += sizeof(char);
1417                                         memcpy(&oid, buffer + index, sizeof(unsigned int));
1418                                         index += sizeof(unsigned int);
1419                                         /* Lock the Prefetch Cache look up table*/
1420                                         pthread_mutex_lock(&pflookup.lock);
1421                                         /* For each object found add to Prefetch Cache */
1422                                         memcpy(&objsize, buffer + index, sizeof(int));
1423                                         if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
1424                                                 printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1425                                                 return;
1426                                         }
1427                                         memcpy(modptr, buffer+index, objsize);
1428                                         index += sizeof(int);
1429                                         /* Insert the oid and its address into the prefetch hash lookup table */
1430                                         /* Do a version comparison if the oid exists */
1431                                         if((oldptr = prehashSearch(oid)) != NULL) {
1432                                                 /* If older version then update with new object ptr */
1433                                                 if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
1434                                                         prehashRemove(oid);
1435                                                         prehashInsert(oid, modptr);
1436                                                 } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { 
1437                                                         /* Add the new object ptr to hash table */
1438                                                         prehashInsert(oid, modptr);
1439                                                 } else { /* Do nothing */
1440                                                         ;
1441                                                 }
1442                                         } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
1443                                                 prehashInsert(oid, modptr);
1444                                         }
1445                                         /* Broadcast signal on prefetch cache condition variable */ 
1446                                         pthread_cond_broadcast(&pflookup.cond);
1447                                         /* Unlock the Prefetch Cache look up table*/
1448                                         pthread_mutex_unlock(&pflookup.lock);
1449                                 } else if(buffer[index] == OBJECT_NOT_FOUND) {
1450                                         /* Increment it to get the object */
1451                                         /* TODO: For each object not found query DHT for new location and retrieve the object */
1452                                         index += sizeof(char);
1453                                         memcpy(&oid, buffer + index, sizeof(unsigned int));
1454                                         index += sizeof(unsigned int);
1455                                         /* Throw an error */
1456                                         printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
1457                                         exit(-1);
1458                                 } else 
1459                                         printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__);
1460                         }
1461
1462                         i++;
1463                 }
1464         } else
1465                 printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
1466         return;
1467 }