7c1b2fc53b3302358a85e95bb8be78418f804226
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #include "dsmlock.h"
14 #include "prefetch.h"
15 #ifdef COMPILER
16 #include "thread.h"
17 #endif
18 #ifdef ABORTREADERS
19 #include "abortreaders.h"
20 #endif
21
22 #define NUM_THREADS 1
23 #define CONFIG_FILENAME "dstm.conf"
24
25
26 /* Global Variables */
27 extern int classsize[];
28 pfcstats_t *evalPrefetch;
29 extern int numprefetchsites; //Global variable containing number of prefetch sites
30 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
31 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
32 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
33 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
34 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
35 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
36 extern objstr_t *mainobjstore;
37 unsigned int myIpAddr;
38 unsigned int *hostIpAddrs;
39 int sizeOfHostArray;
40 int numHostsInSystem;
41 int myIndexInHostArray;
42 unsigned int oidsPerBlock;
43 unsigned int oidMin;
44 unsigned int oidMax;
45
46 sockPoolHashTable_t *transReadSockPool;
47 sockPoolHashTable_t *transPrefetchSockPool;
48 sockPoolHashTable_t *transRequestSockPool;
49 pthread_mutex_t notifymutex;
50 pthread_mutex_t atomicObjLock;
51
52 /***********************************
53  * Global Variables for statistics
54  **********************************/
55 int numTransCommit = 0;
56 int numTransAbort = 0;
57 int nchashSearch = 0;
58 int nmhashSearch = 0;
59 int nprehashSearch = 0;
60 int nRemoteSend = 0;
61 int nSoftAbort = 0;
62 int bytesSent = 0;
63 int bytesRecv = 0;
64
65 void printhex(unsigned char *, int);
66 plistnode_t *createPiles(transrecord_t *);
67 plistnode_t *sortPiles(plistnode_t *pileptr);
68
69 /*******************************
70 * Send and Recv function calls
71 *******************************/
72 void send_data(int fd, void *buf, int buflen) {
73   char *buffer = (char *)(buf);
74   int size = buflen;
75   int numbytes;
76   while (size > 0) {
77     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
78     bytesSent = bytesSent + numbytes;
79     if (numbytes == -1) {
80       perror("send");
81       exit(0);
82     }
83     buffer += numbytes;
84     size -= numbytes;
85   }
86 }
87
88 void recv_data(int fd, void *buf, int buflen) {
89   char *buffer = (char *)(buf);
90   int size = buflen;
91   int numbytes;
92   while (size > 0) {
93     numbytes = recv(fd, buffer, size, 0);
94     bytesRecv = bytesRecv + numbytes;
95     if (numbytes == -1) {
96       perror("recv");
97       exit(0);
98     }
99     buffer += numbytes;
100     size -= numbytes;
101   }
102 }
103
104 int recv_data_errorcode(int fd, void *buf, int buflen) {
105   char *buffer = (char *)(buf);
106   int size = buflen;
107   int numbytes;
108   while (size > 0) {
109     numbytes = recv(fd, buffer, size, 0);
110     if (numbytes==0)
111       return 0;
112     if (numbytes == -1) {
113       perror("recv");
114       return -1;
115     }
116     buffer += numbytes;
117     size -= numbytes;
118   }
119   return 1;
120 }
121
122 void printhex(unsigned char *ptr, int numBytes) {
123   int i;
124   for (i = 0; i < numBytes; i++) {
125     if (ptr[i] < 16)
126       printf("0%x ", ptr[i]);
127     else
128       printf("%x ", ptr[i]);
129   }
130   printf("\n");
131   return;
132 }
133
134 inline int arrayLength(int *array) {
135   int i;
136   for(i=0 ; array[i] != -1; i++)
137     ;
138   return i;
139 }
140
141 inline int findmax(int *array, int arraylength) {
142   int max, i;
143   max = array[0];
144   for(i = 0; i < arraylength; i++) {
145     if(array[i] > max) {
146       max = array[i];
147     }
148   }
149   return max;
150 }
151
152 /* This function is a prefetch call generated by the compiler that
153  * populates the shared primary prefetch queue*/
154 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
155   /* Allocate for the queue node*/
156   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
157   int len;
158   char * node= getmemory(qnodesize);
159   int top=endoffsets[ntuples-1];
160
161   if (node==NULL)
162     return;
163   /* Set queue node values */
164
165   /* TODO: Remove this after testing */
166   evalPrefetch[siteid].callcount++;
167
168   *((int *)(node))=siteid;
169   *((int *)(node + sizeof(int))) = ntuples;
170   len = 2*sizeof(int);
171   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
172   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
173   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
174
175   /* Lock and insert into primary prefetch queue */
176   movehead(qnodesize);
177 }
178
179 /* This function starts up the transaction runtime. */
180 int dstmStartup(const char * option) {
181   pthread_t thread_Listen, udp_thread_Listen;
182   pthread_attr_t attr;
183   int master=option!=NULL && strcmp(option, "master")==0;
184   int fd;
185   int udpfd;
186
187   if (processConfigFile() != 0)
188     return 0; //TODO: return error value, cause main program to exit
189 #ifdef COMPILER
190   if (!master)
191     threadcount--;
192 #endif
193
194 #ifdef TRANSSTATS
195   printf("Trans stats is on\n");
196   fflush(stdout);
197 #endif
198 #ifdef ABORTREADERS
199   initreaderlist();
200 #endif
201
202   //Initialize socket pool
203   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
204   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
205   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
206
207   dstmInit();
208   transInit();
209
210   fd=startlistening();
211   pthread_attr_init(&attr);
212   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
213 #ifdef CACHE
214   udpfd = udpInit();
215   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
216 #endif
217   if (master) {
218     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
219     return 1;
220   } else {
221     dstmListen((void *)fd);
222     return 0;
223   }
224 }
225
226 //TODO Use this later
227 void *pCacheAlloc(objstr_t *store, unsigned int size) {
228   void *tmp;
229   objstr_t *ptr;
230   ptr = store;
231   int success = 0;
232
233   while(ptr->next != NULL) {
234     /* check if store is empty */
235     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
236       tmp = ptr->top;
237       ptr->top += size;
238       success = 1;
239       return tmp;
240     } else {
241       ptr = ptr->next;
242     }
243   }
244
245   if(success == 0) {
246     return NULL;
247   }
248 }
249
250 /* This function initiates the prefetch thread A queue is shared
251  * between the main thread of execution and the prefetch thread to
252  * process the prefetch call Call from compiler populates the shared
253  * queue with prefetch requests while prefetch thread processes the
254  * prefetch requests */
255
256 void transInit() {
257   //Create and initialize prefetch cache structure
258 #ifdef CACHE
259   initializePCache();
260   if((evalPrefetch = initPrefetchStats()) == NULL) {
261     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
262     exit(0);
263   }
264 #endif
265
266   /* Initialize attributes for mutex */
267   pthread_mutexattr_init(&prefetchcache_mutex_attr);
268   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
269
270   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
271   pthread_mutex_init(&notifymutex, NULL);
272   pthread_mutex_init(&atomicObjLock, NULL);
273 #ifdef CACHE
274   //Create prefetch cache lookup table
275   if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
276     printf("ERROR\n");
277     return; //Failure
278   }
279
280   //Initialize primary shared queue
281   queueInit();
282   //Initialize machine pile w/prefetch oids and offsets shared queue
283   mcpileqInit();
284
285   //Create the primary prefetch thread
286   int retval;
287 #ifdef RANGEPREFETCH
288   do {
289     retval=pthread_create(&tPrefetch, NULL, transPrefetchNew, NULL);
290   } while(retval!=0);
291 #else
292   do {
293     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
294   } while(retval!=0);
295 #endif
296   pthread_detach(tPrefetch);
297 #endif
298 }
299
300 /* This function stops the threads spawned */
301 void transExit() {
302 #ifdef CACHE
303   int t;
304   pthread_cancel(tPrefetch);
305   for(t = 0; t < NUM_THREADS; t++)
306     pthread_cancel(wthreads[t]);
307 #endif
308
309   return;
310 }
311
312 /* This functions inserts randowm wait delays in the order of msec
313  * Mostly used when transaction commits retry*/
314 void randomdelay() {
315   struct timespec req;
316   time_t t;
317
318   t = time(NULL);
319   req.tv_sec = 0;
320   req.tv_nsec = (long)(1000 + (t%10000)); //1-11 microsec
321   nanosleep(&req, NULL);
322   return;
323 }
324
325 /* This function initializes things required in the transaction start*/
326 __attribute__((malloc)) transrecord_t *transStart() {
327   transrecord_t *tmp;
328   if((tmp = calloc(1, sizeof(transrecord_t))) == NULL) {
329     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
330     return NULL;
331   }
332   tmp->cache = objstrCreate(1048576);
333   tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
334   //#ifdef COMPILER
335   //  tmp->revertlist=NULL; //Not necessary...already null
336   //#endif
337   return tmp;
338 }
339
340 // Search for an address for a given oid                                                                               
341 /*#define INLINE    inline __attribute__((always_inline))
342
343 INLINE void * chashSearchI(chashtable_t *table, unsigned int key) {
344   //REMOVE HASH FUNCTION CALL TO MAKE SURE IT IS INLINED HERE                                                          
345   chashlistnode_t *node = &table->table[(key & table->mask)>>1];
346
347   do {
348     if(node->key == key) {
349       return node->val;
350     }
351     node = node->next;
352   } while(node != NULL);
353
354   return NULL;
355   }*/
356
357
358 /* This function finds the location of the objects involved in a transaction
359  * and returns the pointer to the object if found in a remote location */
360 __attribute__((pure)) objheader_t *transRead(transrecord_t *record, unsigned int oid) {
361   unsigned int machinenumber;
362   objheader_t *tmp, *objheader;
363   objheader_t *objcopy;
364   int size;
365   void *buf;
366   chashlistnode_t *node;
367   chashtable_t *table=record->lookupTable;
368
369   if(oid == 0) {
370     return NULL;
371   }
372   
373   node= &table->table[(oid & table->mask)>>1];
374   do {
375     if(node->key == oid) {
376 #ifdef TRANSSTATS
377     nchashSearch++;
378 #endif
379 #ifdef COMPILER
380     return &((objheader_t*)node->val)[1];
381 #else
382     return node->val;
383 #endif
384     }
385     node = node->next;
386   } while(node != NULL);
387   
388
389   /*  
390   if((objheader = chashSearchI(record->lookupTable, oid)) != NULL) {
391 #ifdef TRANSSTATS
392     nchashSearch++;
393 #endif
394 #ifdef COMPILER
395     return &objheader[1];
396 #else
397     return objheader;
398 #endif
399   } else 
400   */
401
402 #ifdef ABORTREADERS
403   if (record->abort) {
404     //abort this transaction
405     printf("ABORTING\n");
406     objstrDelete(record->cache);
407     chashDelete(record->lookupTable);
408     _longjmp(record->aborttrans,1);
409   } else
410     addtransaction(oid,record);
411 #endif
412
413   if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
414 #ifdef TRANSSTATS
415     nmhashSearch++;
416 #endif
417     /* Look up in machine lookup table  and copy  into cache*/
418     GETSIZE(size, objheader);
419     size += sizeof(objheader_t);
420     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
421     memcpy(objcopy, objheader, size);
422     /* Insert into cache's lookup table */
423     STATUS(objcopy)=0;
424     chashInsert(record->lookupTable, OID(objheader), objcopy);
425 #ifdef COMPILER
426     return &objcopy[1];
427 #else
428     return objcopy;
429 #endif
430   } else {
431 #ifdef CACHE
432     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
433 #ifdef TRANSSTATS
434       nprehashSearch++;
435 #endif
436       /* Look up in prefetch cache */
437       GETSIZE(size, tmp);
438       size+=sizeof(objheader_t);
439       objcopy = (objheader_t *) objstrAlloc(record->cache, size);
440       memcpy(objcopy, tmp, size);
441       /* Insert into cache's lookup table */
442       chashInsert(record->lookupTable, OID(tmp), objcopy);
443 #ifdef COMPILER
444       return &objcopy[1];
445 #else
446       return objcopy;
447 #endif
448     }
449 #endif
450     /* Get the object from the remote location */
451     if((machinenumber = lhashSearch(oid)) == 0) {
452       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
453       return NULL;
454     }
455     objcopy = getRemoteObj(record, machinenumber, oid);
456
457     if(objcopy == NULL) {
458       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
459       return NULL;
460     } else {
461 #ifdef TRANSSTATS
462       nRemoteSend++;
463 #endif
464 #ifdef COMPILER
465       return &objcopy[1];
466 #else
467       return objcopy;
468 #endif
469     }
470   }
471 }
472
473 /* This function creates objects in the transaction record */
474 objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
475   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
476   OID(tmp) = getNewOID();
477   tmp->version = 1;
478   tmp->rcount = 1;
479   STATUS(tmp) = NEW;
480   chashInsert(record->lookupTable, OID(tmp), tmp);
481
482 #ifdef COMPILER
483   return &tmp[1]; //want space after object header
484 #else
485   return tmp;
486 #endif
487 }
488
489 #if 1
490 /* This function creates machine piles based on all machines involved in a
491  * transaction commit request */
492 plistnode_t *createPiles(transrecord_t *record) {
493   int i;
494   plistnode_t *pile = NULL;
495   unsigned int machinenum;
496   objheader_t *headeraddr;
497   chashlistnode_t * ptr = record->lookupTable->table;
498   /* Represents number of bins in the chash table */
499   unsigned int size = record->lookupTable->size;
500
501   for(i = 0; i < size ; i++) {
502     chashlistnode_t * curr = &ptr[i];
503     /* Inner loop to traverse the linked list of the cache lookupTable */
504     while(curr != NULL) {
505       //if the first bin in hash table is empty
506       if(curr->key == 0)
507         break;
508       headeraddr=(objheader_t *) curr->val;
509
510       //Get machine location for object id (and whether local or not)
511       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
512         machinenum = myIpAddr;
513       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
514         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
515         return NULL;
516       }
517
518       //Make machine groups
519       pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
520       curr = curr->next;
521     }
522   }
523   return pile;
524 }
525 #else
526 /* This function creates machine piles based on all machines involved in a
527  * transaction commit request */
528 plistnode_t *createPiles(transrecord_t *record) {
529   int i;
530   plistnode_t *pile = NULL;
531   unsigned int machinenum;
532   objheader_t *headeraddr;
533   struct chashentry * ptr = record->lookupTable->table;
534   /* Represents number of bins in the chash table */
535   unsigned int size = record->lookupTable->size;
536
537   for(i = 0; i < size ; i++) {
538     struct chashentry * curr = & ptr[i];
539     /* Inner loop to traverse the linked list of the cache lookupTable */
540     //if the first bin in hash table is empty
541     if(curr->key == 0)
542       continue;
543     headeraddr=(objheader_t *) curr->ptr;
544
545     //Get machine location for object id (and whether local or not)
546     if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
547       machinenum = myIpAddr;
548     } else if ((machinenum = lhashSearch(curr->key)) == 0) {
549       printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
550       return NULL;
551     }
552
553     //Make machine groups
554     pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
555   }
556   return pile;
557 }
558 #endif
559
560 /* This function initiates the transaction commit process
561  * Spawns threads for each of the new connections with Participants
562  * and creates new piles by calling the createPiles(),
563  * Sends a transrequest() to each remote machines for objects found remotely
564  * and calls handleLocalReq() to process objects found locally */
565 int transCommit(transrecord_t *record) {
566   unsigned int tot_bytes_mod, *listmid;
567   plistnode_t *pile, *pile_ptr;
568   int trecvcount;
569   char treplyretry; /* keeps track of the common response that needs to be sent */
570   int firsttime=1;
571   trans_commit_data_t transinfo; /* keeps track of objs locked during transaction */
572   char finalResponse;
573
574 #ifdef ABORTREADERS
575   if (record->abort) {
576     //abort this transaction
577     printf("ABORTING TRANSACTION AT COMMIT\n");
578     objstrDelete(record->cache);
579     chashDelete(record->lookupTable);
580     free(record);
581     return 1;
582   }
583 #endif
584
585
586   do {
587     trecvcount = 0;
588     treplyretry = 0;
589
590     /* Look through all the objects in the transaction record and make piles
591      * for each machine involved in the transaction*/
592     if (firsttime) {
593       pile_ptr = pile = createPiles(record);
594       pile_ptr = pile = sortPiles(pile);
595     } else {
596       pile = pile_ptr;
597     }
598     firsttime = 0;
599     /* Create the packet to be sent in TRANS_REQUEST */
600
601     /* Count the number of participants */
602     int pilecount;
603     pilecount = pCount(pile);
604
605     /* Create a list of machine ids(Participants) involved in transaction   */
606     listmid = calloc(pilecount, sizeof(unsigned int));
607     pListMid(pile, listmid);
608
609     /* Create a socket and getReplyCtrl array, initialize */
610     int socklist[pilecount];
611     int loopcount;
612     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
613       socklist[loopcount] = 0;
614     char getReplyCtrl[pilecount];
615     for(loopcount = 0 ; loopcount < pilecount; loopcount++)
616       getReplyCtrl[loopcount] = 0;
617
618     /* Process each machine pile */
619     int sockindex = 0;
620     trans_req_data_t *tosend;
621     tosend = calloc(pilecount, sizeof(trans_req_data_t));
622     while(pile != NULL) {
623       tosend[sockindex].f.control = TRANS_REQUEST;
624       tosend[sockindex].f.mcount = pilecount;
625       tosend[sockindex].f.numread = pile->numread;
626       tosend[sockindex].f.nummod = pile->nummod;
627       tosend[sockindex].f.numcreated = pile->numcreated;
628       tosend[sockindex].f.sum_bytes = pile->sum_bytes;
629       tosend[sockindex].listmid = listmid;
630       tosend[sockindex].objread = pile->objread;
631       tosend[sockindex].oidmod = pile->oidmod;
632       tosend[sockindex].oidcreated = pile->oidcreated;
633       int sd = 0;
634       if(pile->mid != myIpAddr) {
635         if((sd = getSock2WithLock(transRequestSockPool, pile->mid)) < 0) {
636           printf("transRequest(): socket create error\n");
637           free(listmid);
638           free(tosend);
639           return 1;
640         }
641         socklist[sockindex] = sd;
642         /* Send bytes of data with TRANS_REQUEST control message */
643         send_data(sd, &(tosend[sockindex].f), sizeof(fixed_data_t));
644
645         /* Send list of machines involved in the transaction */
646         {
647           int size=sizeof(unsigned int)*(tosend[sockindex].f.mcount);
648           send_data(sd, tosend[sockindex].listmid, size);
649         }
650
651         /* Send oids and version number tuples for objects that are read */
652         {
653           int size=(sizeof(unsigned int)+sizeof(unsigned short))*(tosend[sockindex].f.numread);
654           send_data(sd, tosend[sockindex].objread, size);
655         }
656
657         /* Send objects that are modified */
658         void *modptr;
659         if((modptr = calloc(1, tosend[sockindex].f.sum_bytes)) == NULL) {
660           printf("Calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
661           free(listmid);
662           free(tosend);
663           return 1;
664         }
665         int offset = 0;
666         int i;
667         for(i = 0; i < tosend[sockindex].f.nummod ; i++) {
668           int size;
669           objheader_t *headeraddr;
670           if((headeraddr = chashSearch(record->lookupTable, tosend[sockindex].oidmod[i])) == NULL) {
671             printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
672             free(modptr);
673             free(listmid);
674             free(tosend);
675             return 1;
676           }
677           GETSIZE(size,headeraddr);
678           size+=sizeof(objheader_t);
679           memcpy(modptr+offset, headeraddr, size);
680           offset+=size;
681         }
682         send_data(sd, modptr, tosend[sockindex].f.sum_bytes);
683         free(modptr);
684       } else { //handle request locally
685         handleLocalReq(&tosend[sockindex], &transinfo, record, &getReplyCtrl[sockindex]);
686       }
687       sockindex++;
688       pile = pile->next;
689     } //end of pile processing
690       /* Recv Ctrl msgs from all machines */
691     int i;
692     for(i = 0; i < pilecount; i++) {
693       int sd = socklist[i];
694       if(sd != 0) {
695         char control;
696         recv_data(sd, &control, sizeof(char));
697         //Update common data structure with new ctrl msg
698         getReplyCtrl[i] = control;
699         /* Recv Objects if participant sends TRANS_DISAGREE */
700 #ifdef CACHE
701         if(control == TRANS_DISAGREE) {
702           int length;
703           recv_data(sd, &length, sizeof(int));
704           void *newAddr;
705           pthread_mutex_lock(&prefetchcache_mutex);
706           if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
707             printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
708             free(tosend);
709             free(listmid);
710             pthread_mutex_unlock(&prefetchcache_mutex);
711             return 1;
712           }
713           pthread_mutex_unlock(&prefetchcache_mutex);
714           recv_data(sd, newAddr, length);
715           int offset = 0;
716           while(length != 0) {
717             unsigned int oidToPrefetch;
718             objheader_t * header;
719             header = (objheader_t *)(((char *)newAddr) + offset);
720             oidToPrefetch = OID(header);
721             STATUS(header)=0;
722             int size = 0;
723             GETSIZE(size, header);
724             size += sizeof(objheader_t);
725             //make an entry in prefetch hash table
726             void *oldptr;
727             if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
728               prehashRemove(oidToPrefetch);
729               prehashInsert(oidToPrefetch, header);
730             } else {
731               prehashInsert(oidToPrefetch, header);
732             }
733             length = length - size;
734             offset += size;
735           }
736         } //end of receiving objs
737 #endif
738       }
739     }
740     /* Decide the final response */
741     if((finalResponse = decideResponse(getReplyCtrl, &treplyretry, record, pilecount)) == 0) {
742       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
743       free(tosend);
744       free(listmid);
745       return 1;
746     }
747
748     /* Send responses to all machines */
749     for(i = 0; i < pilecount; i++) {
750       int sd = socklist[i];
751       if(sd != 0) {
752 #ifdef CACHE
753         if(finalResponse == TRANS_COMMIT) {
754           int retval;
755           /* Update prefetch cache */
756           if((retval = updatePrefetchCache(&(tosend[i]), record)) != 0) {
757             printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
758             free(tosend);
759             free(listmid);
760             return 1;
761           }
762
763
764           /* Invalidate objects in other machine cache */
765           if(tosend[i].f.nummod > 0) {
766             if((retval = invalidateObj(&(tosend[i]))) != 0) {
767               printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
768               free(tosend);
769               free(listmid);
770               return 1;
771             }
772           }
773 #ifdef ABORTREADERS
774           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
775           removethisreadtransaction(tosend[i].objread, tosend[i].f.numread, record);
776 #endif
777         }
778 #ifdef ABORTREADERS
779         else if (!treplyretry) {
780           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
781           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread,record);
782         }
783 #endif
784 #endif
785         send_data(sd, &finalResponse, sizeof(char));
786       } else {
787         /* Complete local processing */
788         doLocalProcess(finalResponse, &(tosend[i]), &transinfo, record);
789 #ifdef ABORTREADERS
790         if(finalResponse == TRANS_COMMIT) {
791           removetransaction(tosend[i].oidmod,tosend[i].f.nummod);
792           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread, record);
793         } else if (!treplyretry) {
794           removethistransaction(tosend[i].oidmod,tosend[i].f.nummod,record);
795           removethisreadtransaction(tosend[i].objread,tosend[i].f.numread,record);
796         }
797 #endif
798       }
799     }
800
801     /* Free resources */
802     free(tosend);
803     free(listmid);
804     if (!treplyretry)
805       pDelete(pile_ptr);
806     /* wait a random amount of time before retrying to commit transaction*/
807     if(treplyretry) {
808       randomdelay();
809 #ifdef TRANSSTATS
810       nSoftAbort++;
811 #endif
812     }
813     /* Retry trans commit procedure during soft_abort case */
814   } while (treplyretry);
815
816   if(finalResponse == TRANS_ABORT) {
817     //printf("Aborting trans\n");
818 #ifdef TRANSSTATS
819     numTransAbort++;
820 #endif
821     /* Free Resources */
822     objstrDelete(record->cache);
823     chashDelete(record->lookupTable);
824     free(record);
825     return TRANS_ABORT;
826   } else if(finalResponse == TRANS_COMMIT) {
827 #ifdef TRANSSTATS
828     numTransCommit++;
829 #endif
830     /* Free Resources */
831     objstrDelete(record->cache);
832     chashDelete(record->lookupTable);
833     free(record);
834     return 0;
835   } else {
836     //TODO Add other cases
837     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
838     exit(-1);
839   }
840   return 0;
841 }
842
843 /* This function handles the local objects involved in a transaction
844  * commiting process.  It also makes a decision if this local machine
845  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator */
846 void handleLocalReq(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec, char *getReplyCtrl) {
847   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
848   int numoidnotfound = 0, numoidlocked = 0;
849   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
850   int numread, i;
851   unsigned int oid;
852   unsigned short version;
853
854   /* Counters and arrays to formulate decision on control message to be sent */
855   oidnotfound = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod), sizeof(unsigned int));
856   oidlocked = (unsigned int *) calloc((tdata->f.numread + tdata->f.nummod +1), sizeof(unsigned int)); // calloc additional 1 byte for
857   //setting a divider between read and write locks
858   numread = tdata->f.numread;
859   /* Process each oid in the machine pile/ group per thread */
860   for (i = 0; i < tdata->f.numread + tdata->f.nummod; i++) {
861     if (i < tdata->f.numread) {
862       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
863       incr *= i;
864       oid = *((unsigned int *)(((char *)tdata->objread) + incr));
865       version = *((unsigned short *)(((char *)tdata->objread) + incr + sizeof(unsigned int)));
866       commitCountForObjRead(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
867     } else { // Objects Modified
868       if(i == tdata->f.numread) {
869         oidlocked[numoidlocked++] = -1;
870       }
871       int tmpsize;
872       objheader_t *headptr;
873       headptr = (objheader_t *) chashSearch(rec->lookupTable, tdata->oidmod[i-numread]);
874       if (headptr == NULL) {
875         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
876         return;
877       }
878       oid = OID(headptr);
879       version = headptr->version;
880       commitCountForObjMod(getReplyCtrl, oidnotfound, oidlocked, &numoidnotfound, &numoidlocked, &v_nomatch, &v_matchlock, &v_matchnolock, oid, version);
881     }
882   }
883
884 /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
885  * if Participant receives a TRANS_COMMIT */
886   transinfo->objlocked = oidlocked;
887   transinfo->objnotfound = oidnotfound;
888   transinfo->modptr = NULL;
889   transinfo->numlocked = numoidlocked;
890   transinfo->numnotfound = numoidnotfound;
891
892   /* Condition to send TRANS_AGREE */
893   if(v_matchnolock == tdata->f.numread + tdata->f.nummod) {
894     *getReplyCtrl = TRANS_AGREE;
895   }
896   /* Condition to send TRANS_SOFT_ABORT */
897   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
898     *getReplyCtrl = TRANS_SOFT_ABORT;
899   }
900 }
901
902 void doLocalProcess(char finalResponse, trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *record) {
903   if(finalResponse == TRANS_ABORT) {
904     if(transAbortProcess(transinfo) != 0) {
905       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
906       fflush(stdout);
907       return;
908     }
909   } else if(finalResponse == TRANS_COMMIT) {
910 #ifdef CACHE
911     /* Invalidate objects in other machine cache */
912     if(tdata->f.nummod > 0) {
913       int retval;
914       if((retval = invalidateObj(tdata)) != 0) {
915         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
916         return;
917       }
918     }
919 #endif
920     if(transComProcess(tdata, transinfo, record) != 0) {
921       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
922       fflush(stdout);
923       return;
924     }
925   } else {
926     printf("ERROR...No Decision\n");
927   }
928
929   /* Free memory */
930   if (transinfo->objlocked != NULL) {
931     free(transinfo->objlocked);
932   }
933   if (transinfo->objnotfound != NULL) {
934     free(transinfo->objnotfound);
935   }
936 }
937
938 /* This function decides the reponse that needs to be sent to
939  * all Participant machines after the TRANS_REQUEST protocol */
940 char decideResponse(char *getReplyCtrl, char *treplyretry, transrecord_t *record, int pilecount) {
941   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
942                                                                    message to send */
943   for (i = 0 ; i < pilecount; i++) {
944     char control;
945     control = getReplyCtrl[i];
946     switch(control) {
947     default:
948       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
949
950       /* treat as disagree, pass thru */
951     case TRANS_DISAGREE:
952       transdisagree++;
953       break;
954
955     case TRANS_AGREE:
956       transagree++;
957       break;
958
959     case TRANS_SOFT_ABORT:
960       transsoftabort++;
961       break;
962     }
963   }
964
965   if(transdisagree > 0) {
966     /* Send Abort */
967     *treplyretry = 0;
968     return TRANS_ABORT;
969 #ifdef CACHE
970     /* clear objects from prefetch cache */
971     cleanPCache(record);
972 #endif
973   } else if(transagree == pilecount) {
974     /* Send Commit */
975     *treplyretry = 0;
976     return TRANS_COMMIT;
977   } else {
978     /* Send Abort in soft abort case followed by retry commiting transaction again*/
979     *treplyretry = 1;
980     return TRANS_ABORT;
981   }
982   return 0;
983 }
984
985 /* This function opens a connection, places an object read request to
986  * the remote machine, reads the control message and object if
987  * available and copies the object and its header to the local
988  * cache. */
989
990 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
991   int size, val;
992   struct sockaddr_in serv_addr;
993   char machineip[16];
994   char control;
995   objheader_t *h;
996   void *objcopy = NULL;
997
998   int sd = getSock2(transReadSockPool, mnum);
999   char readrequest[sizeof(char)+sizeof(unsigned int)];
1000   readrequest[0] = READ_REQUEST;
1001   *((unsigned int *)(&readrequest[1])) = oid;
1002   send_data(sd, readrequest, sizeof(readrequest));
1003
1004   /* Read response from the Participant */
1005   recv_data(sd, &control, sizeof(char));
1006
1007   if (control==OBJECT_NOT_FOUND) {
1008     objcopy = NULL;
1009   } else {
1010     /* Read object if found into local cache */
1011     recv_data(sd, &size, sizeof(int));
1012     objcopy = objstrAlloc(record->cache, size);
1013     recv_data(sd, objcopy, size);
1014     STATUS(objcopy)=0;
1015     /* Insert into cache's lookup table */
1016     chashInsert(record->lookupTable, oid, objcopy);
1017   }
1018
1019   return objcopy;
1020 }
1021
1022 /*  Commit info for objects modified */
1023 void commitCountForObjMod(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1024                           int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1025   void *mobj;
1026   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1027   /* Save the oids not found and number of oids not found for later use */
1028   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1029     /* Save the oids not found and number of oids not found for later use */
1030     oidnotfound[*numoidnotfound] = oid;
1031     (*numoidnotfound)++;
1032   } else { /* If Obj found in machine (i.e. has not moved) */
1033     /* Check if Obj is locked by any previous transaction */
1034     if (write_trylock(STATUSPTR(mobj))) { // Can acquire write lock
1035       if (version == ((objheader_t *)mobj)->version) {      /* match versions */
1036         (*v_matchnolock)++;
1037         //Keep track of what is locked
1038         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1039       } else { /* If versions don't match ...HARD ABORT */
1040         (*v_nomatch)++;
1041         /* Send TRANS_DISAGREE to Coordinator */
1042         *getReplyCtrl = TRANS_DISAGREE;
1043
1044         //Keep track of what is locked
1045         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1046         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1047         return;
1048       }
1049     } else { //A lock is acquired some place else
1050       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1051         (*v_matchlock)++;
1052       } else { /* If versions don't match ...HARD ABORT */
1053         (*v_nomatch)++;
1054         /* Send TRANS_DISAGREE to Coordinator */
1055         *getReplyCtrl = TRANS_DISAGREE;
1056         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1057         return;
1058       }
1059     }
1060   }
1061 }
1062
1063 /*  Commit info for objects modified */
1064 void commitCountForObjRead(char *getReplyCtrl, unsigned int *oidnotfound, unsigned int *oidlocked, int *numoidnotfound,
1065                            int *numoidlocked, int *v_nomatch, int *v_matchlock, int *v_matchnolock, unsigned int oid, unsigned short version) {
1066   void *mobj;
1067   /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
1068   /* Save the oids not found and number of oids not found for later use */
1069   if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
1070     /* Save the oids not found and number of oids not found for later use */
1071     oidnotfound[*numoidnotfound] = oid;
1072     (*numoidnotfound)++;
1073   } else { /* If Obj found in machine (i.e. has not moved) */
1074     /* Check if Obj is locked by any previous transaction */
1075     if (read_trylock(STATUSPTR(mobj))) { // Can further acquire read locks
1076       if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
1077         (*v_matchnolock)++;
1078         //Keep track of what is locked
1079         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1080       } else { /* If versions don't match ...HARD ABORT */
1081         (*v_nomatch)++;
1082         /* Send TRANS_DISAGREE to Coordinator */
1083         *getReplyCtrl = TRANS_DISAGREE;
1084         //Keep track of what is locked
1085         oidlocked[(*numoidlocked)++] = OID(((objheader_t *)mobj));
1086         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1087         return;
1088       }
1089     } else { //Has reached max number of readers or some other transaction
1090       //has acquired a lock on this object
1091       if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1092         (*v_matchlock)++;
1093       } else { /* If versions don't match ...HARD ABORT */
1094         (*v_nomatch)++;
1095         /* Send TRANS_DISAGREE to Coordinator */
1096         *getReplyCtrl = TRANS_DISAGREE;
1097         //printf("%s() oid = %d, type = %d\t", __func__, OID(mobj), TYPE((objheader_t *)mobj));
1098         return;
1099       }
1100     }
1101   }
1102 }
1103
1104 /* This function completes the ABORT process if the transaction is aborting */
1105 int transAbortProcess(trans_commit_data_t *transinfo) {
1106   int i, numlocked;
1107   unsigned int *objlocked;
1108   void *header;
1109
1110   numlocked = transinfo->numlocked;
1111   objlocked = transinfo->objlocked;
1112
1113   int useWriteUnlock = 0;
1114   for (i = 0; i < numlocked; i++) {
1115     if(objlocked[i] == -1) {
1116       useWriteUnlock = 1;
1117       continue;
1118     }
1119     if((header = mhashSearch(objlocked[i])) == NULL) {
1120       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1121       return 1;
1122     }
1123     if(!useWriteUnlock) {
1124       read_unlock(STATUSPTR(header));
1125     } else {
1126       write_unlock(STATUSPTR(header));
1127     }
1128   }
1129
1130   return 0;
1131 }
1132
1133 /*This function completes the COMMIT process if the transaction is commiting*/
1134 int transComProcess(trans_req_data_t *tdata, trans_commit_data_t *transinfo, transrecord_t *rec) {
1135   objheader_t *header, *tcptr;
1136   int i, nummod, tmpsize, numcreated, numlocked;
1137   unsigned int *oidmod, *oidcreated, *oidlocked;
1138   void *ptrcreate;
1139
1140   nummod = tdata->f.nummod;
1141   oidmod = tdata->oidmod;
1142   numcreated = tdata->f.numcreated;
1143   oidcreated = tdata->oidcreated;
1144   numlocked = transinfo->numlocked;
1145   oidlocked = transinfo->objlocked;
1146
1147   for (i = 0; i < nummod; i++) {
1148     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1149       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1150       return 1;
1151     }
1152     /* Copy from transaction cache -> main object store */
1153     if ((tcptr = ((objheader_t *) chashSearch(rec->lookupTable, oidmod[i]))) == NULL) {
1154       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1155       return 1;
1156     }
1157     GETSIZE(tmpsize, header);
1158     char *tmptcptr = (char *) tcptr;
1159     {
1160       struct ___Object___ *dst=(struct ___Object___*)((char*)header+sizeof(objheader_t));
1161       struct ___Object___ *src=(struct ___Object___*)((char*)tmptcptr+sizeof(objheader_t));
1162       dst->___cachedCode___=src->___cachedCode___;
1163       dst->___cachedHash___=src->___cachedHash___;
1164
1165       memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
1166     }
1167
1168     header->version += 1;
1169     if(header->notifylist != NULL) {
1170       notifyAll(&header->notifylist, OID(header), header->version);
1171     }
1172   }
1173   /* If object is newly created inside transaction then commit it */
1174   for (i = 0; i < numcreated; i++) {
1175     if ((header = ((objheader_t *) chashSearch(rec->lookupTable, oidcreated[i]))) == NULL) {
1176       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1177       return 1;
1178     }
1179     GETSIZE(tmpsize, header);
1180     tmpsize += sizeof(objheader_t);
1181     pthread_mutex_lock(&mainobjstore_mutex);
1182     if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
1183       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1184       pthread_mutex_unlock(&mainobjstore_mutex);
1185       return 1;
1186     }
1187     pthread_mutex_unlock(&mainobjstore_mutex);
1188     /* Initialize read and write locks */
1189     initdsmlocks(STATUSPTR(header));
1190     memcpy(ptrcreate, header, tmpsize);
1191     mhashInsert(oidcreated[i], ptrcreate);
1192     lhashInsert(oidcreated[i], myIpAddr);
1193   }
1194   /* Unlock locked objects */
1195   int useWriteUnlock = 0;
1196   for(i = 0; i < numlocked; i++) {
1197     if(oidlocked[i] == -1) {
1198       useWriteUnlock = 1;
1199       continue;
1200     }
1201     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1202       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1203       return 1;
1204     }
1205     if(!useWriteUnlock) {
1206       read_unlock(STATUSPTR(header));
1207     } else {
1208       write_unlock(STATUSPTR(header));
1209     }
1210   }
1211   return 0;
1212 }
1213
1214 prefetchpile_t *foundLocal(char *ptr) {
1215   int siteid = *(GET_SITEID(ptr));
1216   int ntuples = *(GET_NTUPLES(ptr));
1217   unsigned int * oidarray = GET_PTR_OID(ptr);
1218   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1219   short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1220   prefetchpile_t * head=NULL;
1221   int numLocal = 0;
1222
1223   int i;
1224   for(i=0; i<ntuples; i++) {
1225     unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1226     unsigned short endindex=endoffsets[i];
1227     unsigned int oid=oidarray[i];
1228     int newbase;
1229     int machinenum;
1230     if (oid==0)
1231       continue;
1232     //Look up fields locally
1233     for(newbase=baseindex; newbase<endindex; newbase++) {
1234       if (!lookupObject(&oid, arryfields[newbase]))
1235         break;
1236       //Ended in a null pointer...
1237       if (oid==0)
1238         goto tuple;
1239     }
1240     //Entire prefetch is local
1241     if (newbase==endindex&&checkoid(oid)) {
1242       numLocal++;
1243       goto tuple;
1244     }
1245     //Add to remote requests
1246     machinenum=lhashSearch(oid);
1247     insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1248 tuple:
1249     ;
1250   }
1251
1252   /* handle dynamic prefetching */
1253   handleDynPrefetching(numLocal, ntuples, siteid);
1254   return head;
1255 }
1256
1257 int checkoid(unsigned int oid) {
1258   objheader_t *header;
1259   if ((header=mhashSearch(oid))!=NULL) {
1260     //Found on machine
1261     return 1;
1262   } else if ((header=prehashSearch(oid))!=NULL) {
1263     //Found in cache
1264     return 1;
1265   } else {
1266     return 0;
1267   }
1268 }
1269
1270 int lookupObject(unsigned int * oid, short offset) {
1271   objheader_t *header;
1272   if ((header=mhashSearch(*oid))!=NULL) {
1273     //Found on machine
1274     ;
1275   } else if ((header=prehashSearch(*oid))!=NULL) {
1276     //Found in cache
1277     ;
1278   } else {
1279     return 0;
1280   }
1281
1282   if(TYPE(header) > NUMCLASSES) {
1283     int elementsize = classsize[TYPE(header)];
1284     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1285     int length = ao->___length___;
1286     /* Check if array out of bounds */
1287     if(offset < 0 || offset >= length) {
1288       //if yes treat the object as found
1289       (*oid)=0;
1290       return 1;
1291     }
1292     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1293     return 1;
1294   } else {
1295     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1296     return 1;
1297   }
1298 }
1299
1300
1301 /* This function is called by the thread calling transPrefetch */
1302 void *transPrefetch(void *t) {
1303   while(1) {
1304     /* read from prefetch queue */
1305     void *node=gettail();
1306     /* Check if the tuples are found locally, if yes then reduce them further*/
1307     /* and group requests by remote machine ids by calling the makePreGroups() */
1308     prefetchpile_t *pilehead = foundLocal(node);
1309
1310     if (pilehead!=NULL) {
1311       // Get sock from shared pool
1312       int sd = getSock2(transPrefetchSockPool, pilehead->mid);
1313
1314       /* Send  Prefetch Request */
1315       prefetchpile_t *ptr = pilehead;
1316       while(ptr != NULL) {
1317         sendPrefetchReq(ptr, sd);
1318         ptr = ptr->next;
1319       }
1320
1321       /* Release socket */
1322       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1323
1324       /* Deallocated pilehead */
1325       mcdealloc(pilehead);
1326     }
1327     // Deallocate the prefetch queue pile node
1328     inctail();
1329   }
1330 }
1331
1332 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1333   objpile_t *tmp;
1334
1335   int size=sizeof(char)+sizeof(int);
1336   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1337     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1338   }
1339
1340   char buft[size];
1341   char *buf=buft;
1342   *buf=TRANS_PREFETCH;
1343   buf+=sizeof(char);
1344
1345   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1346     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1347     *((int*)buf)=len;
1348     buf+=sizeof(int);
1349     *((unsigned int *)buf)=tmp->oid;
1350     buf+=sizeof(unsigned int);
1351     *((unsigned int *)(buf)) = myIpAddr;
1352     buf+=sizeof(unsigned int);
1353     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1354     buf+=tmp->numoffset*sizeof(short);
1355   }
1356   *((int *)buf)=-1;
1357   send_data(sd, buft, size);
1358   return;
1359 }
1360
1361 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1362   int len, endpair;
1363   char control;
1364   objpile_t *tmp;
1365
1366   /* Send TRANS_PREFETCH control message */
1367   control = TRANS_PREFETCH;
1368   send_data(sd, &control, sizeof(char));
1369
1370   /* Send Oids and offsets in pairs */
1371   tmp = mcpilenode->objpiles;
1372   while(tmp != NULL) {
1373     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1374     char oidnoffset[len];
1375     char *buf=oidnoffset;
1376     *((int*)buf) = tmp->numoffset;
1377     buf+=sizeof(int);
1378     *((unsigned int *)buf) = tmp->oid;
1379     buf+=sizeof(unsigned int);
1380     *((unsigned int *)buf) = myIpAddr;
1381     buf += sizeof(unsigned int);
1382     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1383     send_data(sd, oidnoffset, len);
1384     tmp = tmp->next;
1385   }
1386
1387   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1388   endpair = -1;
1389   send_data(sd, &endpair, sizeof(int));
1390
1391   return;
1392 }
1393
1394 int getPrefetchResponse(int sd) {
1395   int length = 0, size = 0;
1396   char control;
1397   unsigned int oid;
1398   void *modptr, *oldptr;
1399
1400   recv_data((int)sd, &length, sizeof(int));
1401   size = length - sizeof(int);
1402   char recvbuffer[size];
1403
1404   recv_data((int)sd, recvbuffer, size);
1405   control = *((char *) recvbuffer);
1406   if(control == OBJECT_FOUND) {
1407     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1408     size = size - (sizeof(char) + sizeof(unsigned int));
1409     pthread_mutex_lock(&prefetchcache_mutex);
1410     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1411       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1412       pthread_mutex_unlock(&prefetchcache_mutex);
1413       return -1;
1414     }
1415     pthread_mutex_unlock(&prefetchcache_mutex);
1416     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1417     STATUS(modptr)=0;
1418
1419     /* Insert the oid and its address into the prefetch hash lookup table */
1420     /* Do a version comparison if the oid exists */
1421     if((oldptr = prehashSearch(oid)) != NULL) {
1422       /* If older version then update with new object ptr */
1423       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1424         prehashRemove(oid);
1425         prehashInsert(oid, modptr);
1426       }
1427     } else { /* Else add the object ptr to hash table*/
1428       prehashInsert(oid, modptr);
1429     }
1430     /* Lock the Prefetch Cache look up table*/
1431     pthread_mutex_lock(&pflookup.lock);
1432     /* Broadcast signal on prefetch cache condition variable */
1433     pthread_cond_broadcast(&pflookup.cond);
1434     /* Unlock the Prefetch Cache look up table*/
1435     pthread_mutex_unlock(&pflookup.lock);
1436   } else if(control == OBJECT_NOT_FOUND) {
1437     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1438     /* TODO: For each object not found query DHT for new location and retrieve the object */
1439     /* Throw an error */
1440     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1441     //    exit(-1);
1442   } else {
1443     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1444   }
1445
1446   return 0;
1447 }
1448
1449 unsigned short getObjType(unsigned int oid) {
1450   objheader_t *objheader;
1451   unsigned short numoffset[] ={0};
1452   short fieldoffset[] ={};
1453
1454   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1455 #ifdef CACHE
1456     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1457 #endif
1458     unsigned int mid = lhashSearch(oid);
1459     int sd = getSock2(transReadSockPool, mid);
1460     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1461     remotereadrequest[0] = READ_REQUEST;
1462     *((unsigned int *)(&remotereadrequest[1])) = oid;
1463     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1464
1465     /* Read response from the Participant */
1466     char control;
1467     recv_data(sd, &control, sizeof(char));
1468
1469     if (control==OBJECT_NOT_FOUND) {
1470       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1471       fflush(stdout);
1472       exit(-1);
1473     } else {
1474       /* Read object if found into local cache */
1475       int size;
1476       recv_data(sd, &size, sizeof(int));
1477 #ifdef CACHE
1478       pthread_mutex_lock(&prefetchcache_mutex);
1479       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1480         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1481         pthread_exit(NULL);
1482       }
1483       pthread_mutex_unlock(&prefetchcache_mutex);
1484       recv_data(sd, objheader, size);
1485       prehashInsert(oid, objheader);
1486       return TYPE(objheader);
1487 #else
1488       char *buffer;
1489       if((buffer = calloc(1, size)) == NULL) {
1490         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1491         fflush(stdout);
1492         return 0;
1493       }
1494       recv_data(sd, buffer, size);
1495       objheader = (objheader_t *)buffer;
1496       unsigned short type = TYPE(objheader);
1497       free(buffer);
1498       return type;
1499 #endif
1500     }
1501 #ifdef CACHE
1502   }
1503 #endif
1504   }
1505   return TYPE(objheader);
1506 }
1507
1508 int startRemoteThread(unsigned int oid, unsigned int mid) {
1509   int sock;
1510   struct sockaddr_in remoteAddr;
1511   char msg[1 + sizeof(unsigned int)];
1512   int bytesSent;
1513   int status;
1514
1515   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1516     perror("startRemoteThread():socket()");
1517     return -1;
1518   }
1519
1520   bzero(&remoteAddr, sizeof(remoteAddr));
1521   remoteAddr.sin_family = AF_INET;
1522   remoteAddr.sin_port = htons(LISTEN_PORT);
1523   remoteAddr.sin_addr.s_addr = htonl(mid);
1524
1525   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1526     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1527            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1528     status = -1;
1529   } else
1530   {
1531     msg[0] = START_REMOTE_THREAD;
1532     *((unsigned int *) &msg[1]) = oid;
1533     send_data(sock, msg, 1 + sizeof(unsigned int));
1534   }
1535
1536   close(sock);
1537   return status;
1538 }
1539
1540 //TODO: when reusing oids, make sure they are not already in use!
1541 static unsigned int id = 0xFFFFFFFF;
1542 unsigned int getNewOID(void) {
1543   id += 2;
1544   if (id > oidMax || id < oidMin) {
1545     id = (oidMin | 1);
1546   }
1547   return id;
1548 }
1549
1550 int processConfigFile() {
1551   FILE *configFile;
1552   const int maxLineLength = 200;
1553   char lineBuffer[maxLineLength];
1554   char *token;
1555   const char *delimiters = " \t\n";
1556   char *commentBegin;
1557   in_addr_t tmpAddr;
1558
1559   configFile = fopen(CONFIG_FILENAME, "r");
1560   if (configFile == NULL) {
1561     printf("error opening %s:\n", CONFIG_FILENAME);
1562     perror("");
1563     return -1;
1564   }
1565
1566   numHostsInSystem = 0;
1567   sizeOfHostArray = 8;
1568   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1569
1570   while(fgets(lineBuffer, maxLineLength, configFile) != NULL) {
1571     commentBegin = strchr(lineBuffer, '#');
1572     if (commentBegin != NULL)
1573       *commentBegin = '\0';
1574     token = strtok(lineBuffer, delimiters);
1575     while (token != NULL) {
1576       tmpAddr = inet_addr(token);
1577       if ((int)tmpAddr == -1) {
1578         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1579         fclose(configFile);
1580         return -1;
1581       } else
1582         addHost(htonl(tmpAddr));
1583       token = strtok(NULL, delimiters);
1584     }
1585   }
1586
1587   fclose(configFile);
1588
1589   if (numHostsInSystem < 1) {
1590     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1591     return -1;
1592   }
1593 #ifdef MAC
1594   myIpAddr = getMyIpAddr("en1");
1595 #else
1596   myIpAddr = getMyIpAddr("eth0");
1597 #endif
1598   myIndexInHostArray = findHost(myIpAddr);
1599   if (myIndexInHostArray == -1) {
1600     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1601     return -1;
1602   }
1603   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1604   oidMin = oidsPerBlock * myIndexInHostArray;
1605   if (myIndexInHostArray == numHostsInSystem - 1)
1606     oidMax = 0xFFFFFFFF;
1607   else
1608     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1609
1610   return 0;
1611 }
1612
1613 void addHost(unsigned int hostIp) {
1614   unsigned int *tmpArray;
1615
1616   if (findHost(hostIp) != -1)
1617     return;
1618
1619   if (numHostsInSystem == sizeOfHostArray) {
1620     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1621     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1622     free(hostIpAddrs);
1623     hostIpAddrs = tmpArray;
1624   }
1625
1626   hostIpAddrs[numHostsInSystem++] = hostIp;
1627
1628   return;
1629 }
1630
1631 int findHost(unsigned int hostIp) {
1632   int i;
1633   for (i = 0; i < numHostsInSystem; i++)
1634     if (hostIpAddrs[i] == hostIp)
1635       return i;
1636
1637   //not found
1638   return -1;
1639 }
1640
1641 /* This function sends notification request per thread waiting on object(s) whose version
1642  * changes */
1643 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1644   int sock,i;
1645   objheader_t *objheader;
1646   struct sockaddr_in remoteAddr;
1647   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1648   char *ptr;
1649   int bytesSent;
1650   int status, size;
1651   unsigned short version;
1652   unsigned int oid,mid;
1653   static unsigned int threadid = 0;
1654   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1655   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1656   notifydata_t *ndata;
1657
1658   oid = oidarry[0];
1659   if((mid = lhashSearch(oid)) == 0) {
1660     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1661     return;
1662   }
1663
1664   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1665     perror("reqNotify():socket()");
1666     return -1;
1667   }
1668
1669   bzero(&remoteAddr, sizeof(remoteAddr));
1670   remoteAddr.sin_family = AF_INET;
1671   remoteAddr.sin_port = htons(LISTEN_PORT);
1672   remoteAddr.sin_addr.s_addr = htonl(mid);
1673
1674   /* Generate unique threadid */
1675   threadid++;
1676
1677   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1678   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1679     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1680     return -1;
1681   }
1682   ndata->numoid = numoid;
1683   ndata->threadid = threadid;
1684   ndata->oidarry = oidarry;
1685   ndata->versionarry = versionarry;
1686   ndata->threadcond = threadcond;
1687   ndata->threadnotify = threadnotify;
1688   if((status = notifyhashInsert(threadid, ndata)) != 0) {
1689     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1690     free(ndata);
1691     return -1;
1692   }
1693
1694   /* Send  number of oids, oidarry, version array, machine id and threadid */
1695   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1696     printf("reqNotify():error %d connecting to %s:%d\n", errno,
1697            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1698     free(ndata);
1699     return -1;
1700   } else {
1701     msg[0] = THREAD_NOTIFY_REQUEST;
1702     *((unsigned int *)(&msg[1])) = numoid;
1703     /* Send array of oids  */
1704     size = sizeof(unsigned int);
1705
1706     for(i = 0;i < numoid; i++) {
1707       oid = oidarry[i];
1708       *((unsigned int *)(&msg[1] + size)) = oid;
1709       size += sizeof(unsigned int);
1710     }
1711
1712     /* Send array of version  */
1713     for(i = 0;i < numoid; i++) {
1714       version = versionarry[i];
1715       *((unsigned short *)(&msg[1] + size)) = version;
1716       size += sizeof(unsigned short);
1717     }
1718
1719     *((unsigned int *)(&msg[1] + size)) = myIpAddr; size += sizeof(unsigned int);
1720     *((unsigned int *)(&msg[1] + size)) = threadid;
1721     pthread_mutex_lock(&(ndata->threadnotify));
1722     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
1723     send_data(sock, msg, size);
1724     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
1725     pthread_mutex_unlock(&(ndata->threadnotify));
1726   }
1727
1728   pthread_cond_destroy(&threadcond);
1729   pthread_mutex_destroy(&threadnotify);
1730   free(ndata);
1731   close(sock);
1732   return status;
1733 }
1734
1735 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
1736   notifydata_t *ndata;
1737   int i, objIsFound = 0, index;
1738   void *ptr;
1739
1740   //Look up the tid and call the corresponding pthread_cond_signal
1741   if((ndata = notifyhashSearch(tid)) == NULL) {
1742     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
1743     return;
1744   } else  {
1745     for(i = 0; i < ndata->numoid; i++) {
1746       if(ndata->oidarry[i] == oid) {
1747         objIsFound = 1;
1748         index = i;
1749       }
1750     }
1751     if(objIsFound == 0) {
1752       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
1753       return;
1754     } else {
1755       if(version <= ndata->versionarry[index]) {
1756         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
1757         return;
1758       } else {
1759 #ifdef CACHE
1760         /* Clear from prefetch cache and free thread related data structure */
1761         if((ptr = prehashSearch(oid)) != NULL) {
1762           prehashRemove(oid);
1763         }
1764 #endif
1765         pthread_mutex_lock(&(ndata->threadnotify));
1766         pthread_cond_signal(&(ndata->threadcond));
1767         pthread_mutex_unlock(&(ndata->threadnotify));
1768       }
1769     }
1770   }
1771   return;
1772 }
1773
1774 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
1775   threadlist_t *ptr;
1776   unsigned int mid;
1777   struct sockaddr_in remoteAddr;
1778   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
1779   int sock, status, size, bytesSent;
1780
1781   while(*head != NULL) {
1782     ptr = *head;
1783     mid = ptr->mid;
1784     //create a socket connection to that machine
1785     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1786       perror("notifyAll():socket()");
1787       return -1;
1788     }
1789
1790     bzero(&remoteAddr, sizeof(remoteAddr));
1791     remoteAddr.sin_family = AF_INET;
1792     remoteAddr.sin_port = htons(LISTEN_PORT);
1793     remoteAddr.sin_addr.s_addr = htonl(mid);
1794     //send Thread Notify response and threadid to that machine
1795     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1796       printf("notifyAll():error %d connecting to %s:%d\n", errno,
1797              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1798       fflush(stdout);
1799       status = -1;
1800     } else {
1801       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
1802       msg[0] = THREAD_NOTIFY_RESPONSE;
1803       *((unsigned int *)&msg[1]) = oid;
1804       size = sizeof(unsigned int);
1805       *((unsigned short *)(&msg[1]+ size)) = version;
1806       size+= sizeof(unsigned short);
1807       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
1808
1809       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
1810       send_data(sock, msg, size);
1811     }
1812     //close socket
1813     close(sock);
1814     // Update head
1815     *head = ptr->next;
1816     free(ptr);
1817   }
1818   return status;
1819 }
1820
1821 void transAbort(transrecord_t *trans) {
1822 #ifdef ABORTREADERS
1823   removetransactionhash(trans->lookupTable, trans);
1824 #endif
1825   objstrDelete(trans->cache);
1826   chashDelete(trans->lookupTable);
1827   free(trans);
1828 }
1829
1830 /* This function inserts necessary information into
1831  * a machine pile data structure */
1832 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
1833   plistnode_t *ptr, *tmp;
1834   int found = 0, offset = 0;
1835
1836   tmp = pile;
1837   //Add oid into a machine that is already present in the pile linked list structure
1838   while(tmp != NULL) {
1839     if (tmp->mid == mid) {
1840       int tmpsize;
1841
1842       if (STATUS(headeraddr) & NEW) {
1843         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
1844         tmp->numcreated++;
1845         GETSIZE(tmpsize, headeraddr);
1846         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1847       } else if (STATUS(headeraddr) & DIRTY) {
1848         tmp->oidmod[tmp->nummod] = OID(headeraddr);
1849         tmp->nummod++;
1850         GETSIZE(tmpsize, headeraddr);
1851         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1852       } else {
1853         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
1854         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
1855         offset += sizeof(unsigned int);
1856         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
1857         tmp->numread++;
1858       }
1859       found = 1;
1860       break;
1861     }
1862     tmp = tmp->next;
1863   }
1864   //Add oid for any new machine
1865   if (!found) {
1866     int tmpsize;
1867     if((ptr = pCreate(num_objs)) == NULL) {
1868       return NULL;
1869     }
1870     ptr->mid = mid;
1871     if (STATUS(headeraddr) & NEW) {
1872       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
1873       ptr->numcreated++;
1874       GETSIZE(tmpsize, headeraddr);
1875       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1876     } else if (STATUS(headeraddr) & DIRTY) {
1877       ptr->oidmod[ptr->nummod] = OID(headeraddr);
1878       ptr->nummod++;
1879       GETSIZE(tmpsize, headeraddr);
1880       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1881     } else {
1882       *((unsigned int *)ptr->objread)=OID(headeraddr);
1883       offset = sizeof(unsigned int);
1884       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
1885       ptr->numread++;
1886     }
1887     ptr->next = pile;
1888     pile = ptr;
1889   }
1890
1891   /* Clear Flags */
1892   STATUS(headeraddr) =0;
1893
1894
1895   return pile;
1896 }
1897
1898 plistnode_t *sortPiles(plistnode_t *pileptr) {
1899   plistnode_t *head, *ptr, *tail;
1900   head = pileptr;
1901   ptr = pileptr;
1902   /* Get tail pointer */
1903   while(ptr!= NULL) {
1904     tail = ptr;
1905     ptr = ptr->next;
1906   }
1907   ptr = pileptr;
1908   plistnode_t *prev = pileptr;
1909   /* Arrange local machine processing at the end of the pile list */
1910   while(ptr != NULL) {
1911     if(ptr != tail) {
1912       if(ptr->mid == myIpAddr && (prev != pileptr)) {
1913         prev->next = ptr->next;
1914         ptr->next = NULL;
1915         tail->next = ptr;
1916         return pileptr;
1917       }
1918       if((ptr->mid == myIpAddr) && (prev == pileptr)) {
1919         prev = ptr->next;
1920         ptr->next = NULL;
1921         tail->next = ptr;
1922         return prev;
1923       }
1924       prev = ptr;
1925     }
1926     ptr = ptr->next;
1927   }
1928   return pileptr;
1929 }