changes
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "machinepile.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include "prelookup.h"
8 #include "threadnotify.h"
9 #include "queue.h"
10 #include "addUdpEnhance.h"
11 #include "addPrefetchEnhance.h"
12 #include "gCollect.h"
13 #ifdef COMPILER
14 #include "thread.h"
15 #endif
16
17 #define NUM_THREADS 1
18 #define PREFETCH_CACHE_SIZE 1048576 //1MB
19 #define CONFIG_FILENAME "dstm.conf"
20
21 /* Global Variables */
22 extern int classsize[];
23 pfcstats_t *evalPrefetch;
24 extern int numprefetchsites; //Global variable containing number of prefetch sites
25 extern pthread_mutex_t mainobjstore_mutex; // Mutex to lock main Object store
26 objstr_t *prefetchcache; //Global Prefetch cache
27 pthread_mutex_t prefetchcache_mutex; // Mutex to lock Prefetch Cache
28 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
29 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
30 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
31 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
32 extern objstr_t *mainobjstore;
33 unsigned int myIpAddr;
34 unsigned int *hostIpAddrs;
35 int sizeOfHostArray;
36 int numHostsInSystem;
37 int myIndexInHostArray;
38 unsigned int oidsPerBlock;
39 unsigned int oidMin;
40 unsigned int oidMax;
41
42 sockPoolHashTable_t *transReadSockPool;
43 sockPoolHashTable_t *transPrefetchSockPool;
44 sockPoolHashTable_t *transRequestSockPool;
45 pthread_mutex_t notifymutex;
46 pthread_mutex_t atomicObjLock;
47
48 /***********************************
49  * Global Variables for statistics
50  **********************************/
51 int numTransCommit = 0;
52 int numTransAbort = 0;
53 int nchashSearch = 0;
54 int nmhashSearch = 0;
55 int nprehashSearch = 0;
56 int nRemoteSend = 0;
57
58 void printhex(unsigned char *, int);
59 plistnode_t *createPiles(transrecord_t *);
60
61 /*******************************
62 * Send and Recv function calls
63 *******************************/
64 void send_data(int fd, void *buf, int buflen) {
65   char *buffer = (char *)(buf);
66   int size = buflen;
67   int numbytes;
68   while (size > 0) {
69     numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
70     if (numbytes == -1) {
71       perror("send");
72       exit(0);
73     }
74     buffer += numbytes;
75     size -= numbytes;
76   }
77 }
78
79 void recv_data(int fd, void *buf, int buflen) {
80   char *buffer = (char *)(buf);
81   int size = buflen;
82   int numbytes;
83   while (size > 0) {
84     numbytes = recv(fd, buffer, size, 0);
85     if (numbytes == -1) {
86       perror("recv");
87       exit(0);
88     }
89     buffer += numbytes;
90     size -= numbytes;
91   }
92 }
93
94 int recv_data_errorcode(int fd, void *buf, int buflen) {
95   char *buffer = (char *)(buf);
96   int size = buflen;
97   int numbytes;
98   while (size > 0) {
99     numbytes = recv(fd, buffer, size, 0);
100     if (numbytes==0)
101       return 0;
102     if (numbytes == -1) {
103       perror("recv");
104       return -1;
105     }
106     buffer += numbytes;
107     size -= numbytes;
108   }
109   return 1;
110 }
111
112 void printhex(unsigned char *ptr, int numBytes) {
113   int i;
114   for (i = 0; i < numBytes; i++) {
115     if (ptr[i] < 16)
116       printf("0%x ", ptr[i]);
117     else
118       printf("%x ", ptr[i]);
119   }
120   printf("\n");
121   return;
122 }
123
124 inline int arrayLength(int *array) {
125   int i;
126   for(i=0 ; array[i] != -1; i++)
127     ;
128   return i;
129 }
130
131 inline int findmax(int *array, int arraylength) {
132   int max, i;
133   max = array[0];
134   for(i = 0; i < arraylength; i++){
135     if(array[i] > max) {
136       max = array[i];
137     }
138   }
139   return max;
140 }
141
142 /* This function is a prefetch call generated by the compiler that
143  * populates the shared primary prefetch queue*/
144 void prefetch(int siteid, int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
145   /* Allocate for the queue node*/
146   int qnodesize = 2*sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short);
147   int len;
148   char * node= getmemory(qnodesize);
149   int top=endoffsets[ntuples-1];
150
151   if (node==NULL)
152     return;
153   /* Set queue node values */
154
155   /* TODO: Remove this after testing */
156   evalPrefetch[siteid].callcount++;
157
158   *((int *)(node))=siteid;
159   *((int *)(node + sizeof(int))) = ntuples;
160   len = 2*sizeof(int);
161   memcpy(node+len, oids, ntuples*sizeof(unsigned int));
162   memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
163   memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
164
165   /* Lock and insert into primary prefetch queue */
166   movehead(qnodesize);
167 }
168
169 /* This function starts up the transaction runtime. */
170 int dstmStartup(const char * option) {
171   pthread_t thread_Listen, udp_thread_Listen;
172   pthread_attr_t attr;
173   int master=option!=NULL && strcmp(option, "master")==0;
174   int fd;
175   int udpfd;
176
177   if (processConfigFile() != 0)
178     return 0; //TODO: return error value, cause main program to exit
179 #ifdef COMPILER
180   if (!master)
181     threadcount--;
182 #endif
183
184 #ifdef TRANSSTATS
185   printf("Trans stats is on\n");
186   fflush(stdout);
187 #endif
188
189   //Initialize socket pool
190   transReadSockPool = createSockPool(transReadSockPool, DEFAULTSOCKPOOLSIZE);
191   transPrefetchSockPool = createSockPool(transPrefetchSockPool, DEFAULTSOCKPOOLSIZE);
192   transRequestSockPool = createSockPool(transRequestSockPool, DEFAULTSOCKPOOLSIZE);
193
194   dstmInit();
195   transInit();
196
197   fd=startlistening();
198   pthread_attr_init(&attr);
199   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
200 #ifdef CACHE
201   udpfd = udpInit();
202   pthread_create(&udp_thread_Listen, &attr, udpListenBroadcast, (void*)udpfd);
203 #endif
204   if (master) {
205     pthread_create(&thread_Listen, &attr, dstmListen, (void*)fd);
206     return 1;
207   } else {
208     dstmListen((void *)fd);
209     return 0;
210   }
211 }
212
213 //TODO Use this later
214 void *pCacheAlloc(objstr_t *store, unsigned int size) {
215   void *tmp;
216   objstr_t *ptr;
217   ptr = store;
218   int success = 0;
219
220   while(ptr->next != NULL) {
221     /* check if store is empty */
222     if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
223       tmp = ptr->top;
224       ptr->top += size;
225       success = 1;
226       return tmp;
227     } else {
228       ptr = ptr->next;
229     }
230   }
231
232   if(success == 0) {
233     return NULL;
234   }
235 }
236
237 /* This function initiates the prefetch thread A queue is shared
238  * between the main thread of execution and the prefetch thread to
239  * process the prefetch call Call from compiler populates the shared
240  * queue with prefetch requests while prefetch thread processes the
241  * prefetch requests */
242
243 void transInit() {
244   //Create and initialize prefetch cache structure
245 #ifdef CACHE
246   prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
247   initializePCache();
248   if((evalPrefetch = initPrefetchStats()) == NULL) {
249     printf("%s() Error allocating memory at %s, %d\n", __func__, __FILE__, __LINE__);
250     exit(0);
251   }
252 #endif
253
254   /* Initialize attributes for mutex */
255   pthread_mutexattr_init(&prefetchcache_mutex_attr);
256   pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
257
258   pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
259   pthread_mutex_init(&notifymutex, NULL);
260   pthread_mutex_init(&atomicObjLock, NULL);
261 #ifdef CACHE
262   //Create prefetch cache lookup table
263   if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
264     printf("ERROR\n");
265     return; //Failure
266   }
267
268   //Initialize primary shared queue
269   queueInit();
270   //Initialize machine pile w/prefetch oids and offsets shared queue
271   mcpileqInit();
272
273   //Create the primary prefetch thread
274   int retval;
275   do {
276     retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
277   } while(retval!=0);
278   pthread_detach(tPrefetch);
279 #endif
280 }
281
282 /* This function stops the threads spawned */
283 void transExit() {
284 #ifdef CACHE
285   int t;
286   pthread_cancel(tPrefetch);
287   for(t = 0; t < NUM_THREADS; t++)
288     pthread_cancel(wthreads[t]);
289 #endif
290
291   return;
292 }
293
294 /* This functions inserts randowm wait delays in the order of msec
295  * Mostly used when transaction commits retry*/
296 void randomdelay() {
297   struct timespec req;
298   time_t t;
299
300   t = time(NULL);
301   req.tv_sec = 0;
302   req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
303   nanosleep(&req, NULL);
304   return;
305 }
306
307 /* This function initializes things required in the transaction start*/
308 transrecord_t *transStart() {
309   transrecord_t *tmp;
310   if((tmp = calloc(1, sizeof(transrecord_t))) == NULL){
311     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
312     return NULL;
313   }
314   tmp->cache = objstrCreate(1048576);
315   tmp->lookupTable = chashCreate(CHASH_SIZE, CLOADFACTOR);
316 #ifdef COMPILER
317   tmp->revertlist=NULL;
318 #endif
319   return tmp;
320 }
321
322 /* This function finds the location of the objects involved in a transaction
323  * and returns the pointer to the object if found in a remote location */
324 objheader_t *transRead(transrecord_t *record, unsigned int oid) {
325   unsigned int machinenumber;
326   objheader_t *tmp, *objheader;
327   objheader_t *objcopy;
328   int size;
329   void *buf;
330
331   if(oid == 0) {
332     return NULL;
333   }
334
335   if((objheader = chashSearch(record->lookupTable, oid)) != NULL){
336 #ifdef TRANSSTATS
337     nchashSearch++;
338 #endif
339     /* Search local transaction cache */
340 #ifdef COMPILER
341     return &objheader[1];
342 #else
343     return objheader;
344 #endif
345   } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
346 #ifdef TRANSSTATS
347     nmhashSearch++;
348 #endif
349     /* Look up in machine lookup table  and copy  into cache*/
350     GETSIZE(size, objheader);
351     size += sizeof(objheader_t);
352     objcopy = (objheader_t *) objstrAlloc(record->cache, size);
353     memcpy(objcopy, objheader, size);
354     /* Insert into cache's lookup table */
355     STATUS(objcopy)=0;
356     chashInsert(record->lookupTable, OID(objheader), objcopy);
357 #ifdef COMPILER
358     return &objcopy[1];
359 #else
360     return objcopy;
361 #endif
362   } else {
363 #ifdef CACHE
364     if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) {
365 #ifdef TRANSSTATS
366       nprehashSearch++;
367 #endif
368       /* Look up in prefetch cache */
369       GETSIZE(size, tmp);
370       size+=sizeof(objheader_t);
371       objcopy = (objheader_t *) objstrAlloc(record->cache, size);
372       memcpy(objcopy, tmp, size);
373       /* Insert into cache's lookup table */
374       chashInsert(record->lookupTable, OID(tmp), objcopy);
375 #ifdef COMPILER
376       return &objcopy[1];
377 #else
378       return objcopy;
379 #endif
380     }
381 #endif
382     /* Get the object from the remote location */
383     if((machinenumber = lhashSearch(oid)) == 0) {
384       printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
385       return NULL;
386     }
387     objcopy = getRemoteObj(record, machinenumber, oid);
388
389     if(objcopy == NULL) {
390       printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
391       return NULL;
392     } else {
393 #ifdef TRANSSTATS
394       nRemoteSend++;
395 #endif
396       STATUS(objcopy)=0;
397 #ifdef COMPILER
398       return &objcopy[1];
399 #else
400       return objcopy;
401 #endif
402     }
403   }
404 }
405
406 /* This function creates objects in the transaction record */
407 objheader_t *transCreateObj(transrecord_t *record, unsigned int size) {
408   objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
409   OID(tmp) = getNewOID();
410   tmp->version = 1;
411   tmp->rcount = 1;
412   STATUS(tmp) = NEW;
413   chashInsert(record->lookupTable, OID(tmp), tmp);
414
415 #ifdef COMPILER
416   return &tmp[1]; //want space after object header
417 #else
418   return tmp;
419 #endif
420 }
421
422 /* This function creates machine piles based on all machines involved in a
423  * transaction commit request */
424 plistnode_t *createPiles(transrecord_t *record) {
425   int i;
426   plistnode_t *pile = NULL;
427   unsigned int machinenum;
428   objheader_t *headeraddr;
429   chashlistnode_t * ptr = record->lookupTable->table;
430   /* Represents number of bins in the chash table */
431   unsigned int size = record->lookupTable->size;
432
433   for(i = 0; i < size ; i++) {
434     chashlistnode_t * curr = &ptr[i];
435     /* Inner loop to traverse the linked list of the cache lookupTable */
436     while(curr != NULL) {
437       //if the first bin in hash table is empty
438       if(curr->key == 0)
439         break;
440
441       if ((headeraddr = (objheader_t *) chashSearch(record->lookupTable, curr->key)) == NULL) {
442         printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
443         return NULL;
444       }
445
446       //Get machine location for object id (and whether local or not)
447       if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
448         machinenum = myIpAddr;
449       } else if ((machinenum = lhashSearch(curr->key)) == 0) {
450         printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
451         return NULL;
452       }
453
454       //Make machine groups
455       pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements);
456       curr = curr->next;
457     }
458   }
459   return pile;
460 }
461
462 /* This function initiates the transaction commit process
463  * Spawns threads for each of the new connections with Participants
464  * and creates new piles by calling the createPiles(),
465  * Sends a transrequest() to each remote machines for objects found remotely
466  * and calls handleLocalReq() to process objects found locally */
467 int transCommit(transrecord_t *record) {
468   unsigned int tot_bytes_mod, *listmid;
469   plistnode_t *pile, *pile_ptr;
470   int i, j, rc, val;
471   int pilecount, offset, threadnum, trecvcount;
472   char control;
473   char transid[TID_LEN];
474   trans_req_data_t *tosend;
475   trans_commit_data_t transinfo;
476   static int newtid = 0;
477   char treplyctrl, treplyretry; /* keeps track of the common response that needs to be sent */
478   thread_data_array_t *thread_data_array;
479   local_thread_data_array_t *ltdata;
480   int firsttime=1;
481
482   do {
483     treplyctrl=0;
484     trecvcount = 0;
485     threadnum = 0;
486     treplyretry = 0;
487     thread_data_array = NULL;
488     ltdata = NULL;
489
490     /* Look through all the objects in the transaction record and make piles
491      * for each machine involved in the transaction*/
492     if (firsttime)
493       pile_ptr = pile = createPiles(record);
494     else
495       pile=pile_ptr;
496     firsttime=0;
497
498     /* Create the packet to be sent in TRANS_REQUEST */
499
500     /* Count the number of participants */
501     pilecount = pCount(pile);
502
503     /* Create a list of machine ids(Participants) involved in transaction       */
504     listmid = calloc(pilecount, sizeof(unsigned int));
505     pListMid(pile, listmid);
506
507
508     /* Initialize thread variables,
509      * Spawn a thread for each Participant involved in a transaction */
510     pthread_t thread[pilecount];
511     pthread_attr_t attr;
512     pthread_cond_t tcond;
513     pthread_mutex_t tlock;
514     pthread_mutex_t tlshrd;
515
516     thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t));
517     ltdata = calloc(1, sizeof(local_thread_data_array_t));
518
519     thread_response_t rcvd_control_msg[pilecount];      /* Shared thread array that keeps track of responses of participants */
520
521     /* Initialize and set thread detach attribute */
522     pthread_attr_init(&attr);
523     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
524     pthread_mutex_init(&tlock, NULL);
525     pthread_cond_init(&tcond, NULL);
526
527     /* Process each machine pile */
528     while(pile != NULL) {
529       //Create transaction id
530       newtid++;
531       tosend = calloc(1, sizeof(trans_req_data_t));
532       tosend->f.control = TRANS_REQUEST;
533       sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
534       tosend->f.mcount = pilecount;
535       tosend->f.numread = pile->numread;
536       tosend->f.nummod = pile->nummod;
537       tosend->f.numcreated = pile->numcreated;
538       tosend->f.sum_bytes = pile->sum_bytes;
539       tosend->listmid = listmid;
540       tosend->objread = pile->objread;
541       tosend->oidmod = pile->oidmod;
542       tosend->oidcreated = pile->oidcreated;
543       thread_data_array[threadnum].thread_id = threadnum;
544       thread_data_array[threadnum].mid = pile->mid;
545       thread_data_array[threadnum].buffer = tosend;
546       thread_data_array[threadnum].recvmsg = rcvd_control_msg;
547       thread_data_array[threadnum].threshold = &tcond;
548       thread_data_array[threadnum].lock = &tlock;
549       thread_data_array[threadnum].count = &trecvcount;
550       thread_data_array[threadnum].replyctrl = &treplyctrl;
551       thread_data_array[threadnum].replyretry = &treplyretry;
552       thread_data_array[threadnum].rec = record;
553       /* If local do not create any extra connection */
554       if(pile->mid != myIpAddr) { /* Not local */
555         do {
556           rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
557         } while(rc!=0);
558         if(rc) {
559           perror("Error in pthread create\n");
560           pthread_cond_destroy(&tcond);
561           pthread_mutex_destroy(&tlock);
562           pDelete(pile_ptr);
563           free(listmid);
564           for (i = 0; i < threadnum; i++)
565             free(thread_data_array[i].buffer);
566           free(thread_data_array);
567           free(ltdata);
568           return 1;
569         }
570       } else { /*Local*/
571         ltdata->tdata = &thread_data_array[threadnum];
572         ltdata->transinfo = &transinfo;
573         do {
574           val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
575         } while(val!=0);
576         if(val) {
577           perror("Error in pthread create\n");
578           pthread_cond_destroy(&tcond);
579           pthread_mutex_destroy(&tlock);
580           pDelete(pile_ptr);
581           free(listmid);
582           for (i = 0; i < threadnum; i++)
583             free(thread_data_array[i].buffer);
584           free(thread_data_array);
585           free(ltdata);
586           return 1;
587         }
588       }
589
590       threadnum++;
591       pile = pile->next;
592     }
593     /* Free attribute and wait for the other threads */
594     pthread_attr_destroy(&attr);
595
596     for (i = 0; i < threadnum; i++) {
597       rc = pthread_join(thread[i], NULL);
598       if(rc){
599         printf("Error: return code from pthread_join() is %d\n", rc);
600         pthread_cond_destroy(&tcond);
601         pthread_mutex_destroy(&tlock);
602         pDelete(pile_ptr);
603         free(listmid);
604         for (j = i; j < threadnum; j++) {
605           free(thread_data_array[j].buffer);
606         }
607         return 1;
608       }
609       free(thread_data_array[i].buffer);
610     }
611
612     /* Free resources */
613     pthread_cond_destroy(&tcond);
614     pthread_mutex_destroy(&tlock);
615     free(listmid);
616
617     if (!treplyretry)
618       pDelete(pile_ptr);
619
620     /* wait a random amount of time before retrying to commit transaction*/
621     if(treplyretry) {
622       free(thread_data_array);
623       free(ltdata);
624       randomdelay();
625     }
626
627     /* Retry trans commit procedure during soft_abort case */
628   } while (treplyretry);
629
630   if(treplyctrl == TRANS_ABORT) {
631 #ifdef TRANSSTATS
632     numTransAbort++;
633 #endif
634     /* Free Resources */
635     objstrDelete(record->cache);
636     chashDelete(record->lookupTable);
637     free(record);
638     free(thread_data_array);
639     free(ltdata);
640     return TRANS_ABORT;
641   } else if(treplyctrl == TRANS_COMMIT) {
642 #ifdef TRANSSTATS
643     numTransCommit++;
644 #endif
645     /* Free Resources */
646     objstrDelete(record->cache);
647     chashDelete(record->lookupTable);
648     free(record);
649     free(thread_data_array);
650     free(ltdata);
651     return 0;
652   } else {
653     //TODO Add other cases
654     printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
655     exit(-1);
656   }
657   return 0;
658 }
659
660 /* This function sends information involved in the transaction request
661  * to participants and accepts a response from particpants.
662  * It calls decideresponse() to decide on what control message
663  * to send next to participants and sends the message using sendResponse()*/
664 void *transRequest(void *threadarg) {
665   int sd, i, n;
666   struct sockaddr_in serv_addr;
667   thread_data_array_t *tdata;
668   objheader_t *headeraddr;
669   char control, recvcontrol;
670   char machineip[16], retval;
671
672   tdata = (thread_data_array_t *) threadarg;
673
674   if((sd = getSock2WithLock(transRequestSockPool, tdata->mid)) < 0) {
675     printf("transRequest(): socket create error\n");
676     pthread_exit(NULL);
677   }
678
679   /* Send bytes of data with TRANS_REQUEST control message */
680   send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
681
682   /* Send list of machines involved in the transaction */
683   {
684     int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
685     send_data(sd, tdata->buffer->listmid, size);
686   }
687
688   /* Send oids and version number tuples for objects that are read */
689   {
690     int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
691     send_data(sd, tdata->buffer->objread, size);
692   }
693
694   /* Send objects that are modified */
695   for(i = 0; i < tdata->buffer->f.nummod ; i++) {
696     int size;
697     if((headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i])) == NULL) {
698       printf("%s() Error: No such oid %s, %d\n", __func__, __FILE__, __LINE__);
699       pthread_exit(NULL);
700     }
701     GETSIZE(size,headeraddr);
702     size+=sizeof(objheader_t);
703     send_data(sd, headeraddr, size);
704   }
705
706   /* Read control message from Participant */
707   recv_data(sd, &control, sizeof(char));
708   /* Recv Objects if participant sends TRANS_DISAGREE */
709 #ifdef CACHE
710   if(control == TRANS_DISAGREE) {
711     int length;
712     recv_data(sd, &length, sizeof(int));
713     void *newAddr;
714     pthread_mutex_lock(&prefetchcache_mutex);
715     if ((newAddr = prefetchobjstrAlloc((unsigned int)length)) == NULL) {
716       printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
717       pthread_exit(NULL);
718     }
719     pthread_mutex_unlock(&prefetchcache_mutex);
720     recv_data(sd, newAddr, length);
721     int offset = 0;
722     while(length != 0) {
723       unsigned int oidToPrefetch;
724       objheader_t * header;
725       header = (objheader_t *)(((char *)newAddr) + offset);
726       oidToPrefetch = OID(header);
727       int size = 0;
728       GETSIZE(size, header);
729       size += sizeof(objheader_t);
730       //make an entry in prefetch hash table
731       void *oldptr;
732       if((oldptr = prehashSearch(oidToPrefetch)) != NULL) {
733         prehashRemove(oidToPrefetch);
734         prehashInsert(oidToPrefetch, header);
735       } else {
736         prehashInsert(oidToPrefetch, header);
737       }
738       length = length - size;
739       offset += size;
740     }
741   }
742 #endif
743
744   recvcontrol = control;
745   /* Update common data structure and increment count */
746   tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
747
748   /* Lock and update count */
749   /* Thread sleeps until all messages from pariticipants are received by coordinator */
750   pthread_mutex_lock(tdata->lock);
751
752   (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
753
754   /* Wake up the threads and invoke decideResponse (once) */
755   if(*(tdata->count) == tdata->buffer->f.mcount) {
756     decideResponse(tdata);
757     pthread_cond_broadcast(tdata->threshold);
758   } else {
759     pthread_cond_wait(tdata->threshold, tdata->lock);
760   }
761   pthread_mutex_unlock(tdata->lock);
762
763   /* clear objects from prefetch cache */
764   /*
765      if(*(tdata->replyctrl) == TRANS_ABORT) {
766      int i;
767      for(i=0; i<tdata->buffer->f.nummod; i++) {
768       unsigned int oid = tdata->buffer->oidmod[i];
769       objheader_t *header;
770       if((header = prehashSearch(oid)) != NULL) {
771         prehashRemove(oid);
772       }
773      }
774      for(i=0; i<tdata->buffer->f.numread; i++) {
775       char *objread = tdata->buffer->objread;
776       unsigned int oid = *((unsigned int *)(objread+(sizeof(unsigned int) +
777                   sizeof(unsigned short))*i));
778       objheader_t *header;
779       if((header = prehashSearch(oid)) != NULL) {
780         prehashRemove(oid);
781       }
782      }
783      }
784    */
785
786 #ifdef CACHE
787   if(*(tdata->replyctrl) == TRANS_COMMIT) {
788     int retval;
789     /* Update prefetch cache */
790     if((retval = updatePrefetchCache(tdata)) != 0) {
791       printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
792       return;
793     }
794
795     /* Invalidate objects in other machine cache */
796     if(tdata->buffer->f.nummod > 0) {
797       if((retval = invalidateObj(tdata)) != 0) {
798         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
799         return;
800       }
801     }
802   }
803 #endif
804
805   /* Send the final response such as TRANS_COMMIT or TRANS_ABORT
806    * to all participants in their respective socket */
807   if (sendResponse(tdata, sd) == 0) {
808     printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
809     pthread_exit(NULL);
810   }
811
812   recv_data((int)sd, &control, sizeof(char));
813
814   if(control == TRANS_UNSUCESSFUL) {
815     //printf("DEBUG-> TRANS_ABORTED\n");
816   } else if(control == TRANS_SUCESSFUL) {
817     //printf("DEBUG-> TRANS_SUCCESSFUL\n");
818   } else {
819     //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
820   }
821   pthread_exit(NULL);
822 }
823
824 /* This function decides the reponse that needs to be sent to
825  * all Participant machines after the TRANS_REQUEST protocol */
826 void decideResponse(thread_data_array_t *tdata) {
827   char control;
828   int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
829                                                                    message to send */
830
831   for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
832     control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
833                                                written onto the shared array */
834     switch(control) {
835     default:
836       printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
837
838       /* treat as disagree, pass thru */
839     case TRANS_DISAGREE:
840       transdisagree++;
841       break;
842
843     case TRANS_AGREE:
844       transagree++;
845       break;
846
847     case TRANS_SOFT_ABORT:
848       transsoftabort++;
849       break;
850     }
851   }
852
853   if(transdisagree > 0) {
854     /* Send Abort */
855     *(tdata->replyctrl) = TRANS_ABORT;
856     *(tdata->replyretry) = 0;
857 #ifdef CACHE
858     /* clear objects from prefetch cache */
859     cleanPCache(tdata);
860 #endif
861   } else if(transagree == tdata->buffer->f.mcount){
862     /* Send Commit */
863     *(tdata->replyctrl) = TRANS_COMMIT;
864     *(tdata->replyretry) = 0;
865 #ifdef CACHE
866 #if 0
867     /* Turn prefetching on */
868     int i;
869     for (i=0; i<numprefetchsites; i++)
870       evalPrefetch[i].operMode = 1;
871 #endif
872 #endif
873   } else {
874     /* Send Abort in soft abort case followed by retry commiting transaction again*/
875     *(tdata->replyctrl) = TRANS_ABORT;
876     *(tdata->replyretry) = 1;
877   }
878   return;
879 }
880
881 /* This function sends the final response to remote machines per
882  * thread in their respective socket id It returns a char that is only
883  * needed to check the correctness of execution of this function
884  * inside transRequest()*/
885
886 char sendResponse(thread_data_array_t *tdata, int sd) {
887   int n, size, sum, oidcount = 0, control;
888   char *ptr, retval = 0;
889   unsigned int *oidnotfound;
890
891   control = *(tdata->replyctrl);
892   send_data(sd, &control, sizeof(char));
893
894   //TODO read missing objects during object migration
895   /* If response is a soft abort due to missing objects at the
896      Participant's side */
897
898   /* If the decided response is TRANS_ABORT */
899   if(*(tdata->replyctrl) == TRANS_ABORT) {
900     retval = TRANS_ABORT;
901   } else if(*(tdata->replyctrl) == TRANS_COMMIT) {
902     /* If the decided response is TRANS_COMMIT */
903     retval = TRANS_COMMIT;
904   }
905
906   return retval;
907 }
908
909 /* This function opens a connection, places an object read request to
910  * the remote machine, reads the control message and object if
911  * available and copies the object and its header to the local
912  * cache. */
913
914 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
915   int size, val;
916   struct sockaddr_in serv_addr;
917   char machineip[16];
918   char control;
919   objheader_t *h;
920   void *objcopy = NULL;
921
922   int sd = getSock2(transReadSockPool, mnum);
923   char readrequest[sizeof(char)+sizeof(unsigned int)];
924   readrequest[0] = READ_REQUEST;
925   *((unsigned int *)(&readrequest[1])) = oid;
926   send_data(sd, readrequest, sizeof(readrequest));
927
928   /* Read response from the Participant */
929   recv_data(sd, &control, sizeof(char));
930
931   if (control==OBJECT_NOT_FOUND) {
932     objcopy = NULL;
933   } else {
934     /* Read object if found into local cache */
935     recv_data(sd, &size, sizeof(int));
936     objcopy = objstrAlloc(record->cache, size);
937     recv_data(sd, objcopy, size);
938     /* Insert into cache's lookup table */
939     chashInsert(record->lookupTable, oid, objcopy);
940   }
941
942   return objcopy;
943 }
944
945 /* This function handles the local objects involved in a transaction
946  * commiting process.  It also makes a decision if this local machine
947  * sends AGREE or DISAGREE or SOFT_ABORT to coordinator.  Note
948  * Coordinator = local machine It wakes up the other threads from
949  * remote participants that are waiting for the coordinator's decision
950  * and based on common agreement it either commits or aborts the
951  * transaction.  It also frees the memory resources */
952
953 void *handleLocalReq(void *threadarg) {
954   unsigned int *oidnotfound = NULL, *oidlocked = NULL;
955   local_thread_data_array_t *localtdata;
956   int numoidnotfound = 0, numoidlocked = 0;
957   int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
958   int numread, i;
959   unsigned int oid;
960   unsigned short version;
961   void *mobj;
962   objheader_t *headptr;
963
964   localtdata = (local_thread_data_array_t *) threadarg;
965
966   /* Counters and arrays to formulate decision on control message to be sent */
967   oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
968   oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
969
970   numread = localtdata->tdata->buffer->f.numread;
971   /* Process each oid in the machine pile/ group per thread */
972   for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
973     if (i < localtdata->tdata->buffer->f.numread) {
974       int incr = sizeof(unsigned int) + sizeof(unsigned short); // Offset that points to next position in the objread array
975       incr *= i;
976       oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
977       version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
978     } else { // Objects Modified
979       int tmpsize;
980       headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
981       if (headptr == NULL) {
982         printf("Error: handleLocalReq() returning NULL, no such oid %s, %d\n", __FILE__, __LINE__);
983         return NULL;
984       }
985       oid = OID(headptr);
986       version = headptr->version;
987     }
988     /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
989
990     /* Save the oids not found and number of oids not found for later use */
991     if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
992       /* Save the oids not found and number of oids not found for later use */
993       oidnotfound[numoidnotfound] = oid;
994       numoidnotfound++;
995     } else { /* If Obj found in machine (i.e. has not moved) */
996       /* Check if Obj is locked by any previous transaction */
997       if (test_and_set(STATUSPTR(mobj))) {
998         if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */
999           v_matchlock++;
1000         } else { /* If versions don't match ...HARD ABORT */
1001           v_nomatch++;
1002           /* Send TRANS_DISAGREE to Coordinator */
1003           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1004           break;
1005         }
1006       } else {
1007         //we're locked
1008         /* Save all object oids that are locked on this machine during this transaction request call */
1009         oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
1010         numoidlocked++;
1011         if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1012           v_matchnolock++;
1013         } else { /* If versions don't match ...HARD ABORT */
1014           v_nomatch++;
1015           /* Send TRANS_DISAGREE to Coordinator */
1016           localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1017           break;
1018         }
1019       }
1020     }
1021   } // End for
1022     /* Condition to send TRANS_AGREE */
1023   if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
1024     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
1025   }
1026   /* Condition to send TRANS_SOFT_ABORT */
1027   if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
1028     localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
1029   }
1030
1031   /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1032    * if Participant receives a TRANS_COMMIT */
1033   localtdata->transinfo->objlocked = oidlocked;
1034   localtdata->transinfo->objnotfound = oidnotfound;
1035   localtdata->transinfo->modptr = NULL;
1036   localtdata->transinfo->numlocked = numoidlocked;
1037   localtdata->transinfo->numnotfound = numoidnotfound;
1038   /* Lock and update count */
1039   //Thread sleeps until all messages from pariticipants are received by coordinator
1040   pthread_mutex_lock(localtdata->tdata->lock);
1041   (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
1042
1043   /* Wake up the threads and invoke decideResponse (once) */
1044   if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
1045     decideResponse(localtdata->tdata);
1046     pthread_cond_broadcast(localtdata->tdata->threshold);
1047   } else {
1048     pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
1049   }
1050   pthread_mutex_unlock(localtdata->tdata->lock);
1051   if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
1052     if(transAbortProcess(localtdata) != 0) {
1053       printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1054       fflush(stdout);
1055       pthread_exit(NULL);
1056     }
1057   } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
1058 #ifdef CACHE
1059     /* Invalidate objects in other machine cache */
1060     if(localtdata->tdata->buffer->f.nummod > 0) {
1061       int retval;
1062       if((retval = invalidateObj(localtdata->tdata)) != 0) {
1063         printf("Error: %s() in invalidating Objects %s, %d\n", __func__, __FILE__, __LINE__);
1064         return;
1065       }
1066     }
1067 #endif
1068     if(transComProcess(localtdata) != 0) {
1069       printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1070       fflush(stdout);
1071       pthread_exit(NULL);
1072     }
1073   }
1074   /* Free memory */
1075   if (localtdata->transinfo->objlocked != NULL) {
1076     free(localtdata->transinfo->objlocked);
1077   }
1078   if (localtdata->transinfo->objnotfound != NULL) {
1079     free(localtdata->transinfo->objnotfound);
1080   }
1081   pthread_exit(NULL);
1082 }
1083
1084 /* This function completes the ABORT process if the transaction is aborting */
1085 int transAbortProcess(local_thread_data_array_t  *localtdata) {
1086   int i, numlocked;
1087   unsigned int *objlocked;
1088   void *header;
1089
1090   numlocked = localtdata->transinfo->numlocked;
1091   objlocked = localtdata->transinfo->objlocked;
1092
1093   for (i = 0; i < numlocked; i++) {
1094     if((header = mhashSearch(objlocked[i])) == NULL) {
1095       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1096       return 1;
1097     }
1098     UnLock(STATUSPTR(header));
1099   }
1100
1101   return 0;
1102 }
1103
1104 /*This function completes the COMMIT process if the transaction is commiting*/
1105 int transComProcess(local_thread_data_array_t  *localtdata) {
1106   objheader_t *header, *tcptr;
1107   int i, nummod, tmpsize, numcreated, numlocked;
1108   unsigned int *oidmod, *oidcreated, *oidlocked;
1109   void *ptrcreate;
1110
1111   nummod = localtdata->tdata->buffer->f.nummod;
1112   oidmod = localtdata->tdata->buffer->oidmod;
1113   numcreated = localtdata->tdata->buffer->f.numcreated;
1114   oidcreated = localtdata->tdata->buffer->oidcreated;
1115   numlocked = localtdata->transinfo->numlocked;
1116   oidlocked = localtdata->transinfo->objlocked;
1117
1118   for (i = 0; i < nummod; i++) {
1119     if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1120       printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1121       return 1;
1122     }
1123     /* Copy from transaction cache -> main object store */
1124     if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
1125       printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1126       return 1;
1127     }
1128     GETSIZE(tmpsize, header);
1129     char *tmptcptr = (char *) tcptr;
1130     memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
1131     header->version += 1;
1132     if(header->notifylist != NULL) {
1133       notifyAll(&header->notifylist, OID(header), header->version);
1134     }
1135   }
1136   /* If object is newly created inside transaction then commit it */
1137   for (i = 0; i < numcreated; i++) {
1138     if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
1139       printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
1140       return 1;
1141     }
1142     GETSIZE(tmpsize, header);
1143     tmpsize += sizeof(objheader_t);
1144     pthread_mutex_lock(&mainobjstore_mutex);
1145     if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
1146       printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1147       pthread_mutex_unlock(&mainobjstore_mutex);
1148       return 1;
1149     }
1150     pthread_mutex_unlock(&mainobjstore_mutex);
1151     memcpy(ptrcreate, header, tmpsize);
1152     mhashInsert(oidcreated[i], ptrcreate);
1153     lhashInsert(oidcreated[i], myIpAddr);
1154   }
1155   /* Unlock locked objects */
1156   for(i = 0; i < numlocked; i++) {
1157     if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1158       printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1159       return 1;
1160     }
1161     UnLock(STATUSPTR(header));
1162   }
1163
1164   return 0;
1165 }
1166
1167 prefetchpile_t *foundLocal(char *ptr) {
1168   int siteid = *(GET_SITEID(ptr));
1169   int ntuples = *(GET_NTUPLES(ptr));
1170   unsigned int * oidarray = GET_PTR_OID(ptr);
1171   unsigned short * endoffsets = GET_PTR_EOFF(ptr, ntuples);
1172   short * arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1173   prefetchpile_t * head=NULL;
1174   int numLocal = 0;
1175
1176   int i;
1177   for(i=0; i<ntuples; i++) {
1178     unsigned short baseindex=(i==0) ? 0 : endoffsets[i-1];
1179     unsigned short endindex=endoffsets[i];
1180     unsigned int oid=oidarray[i];
1181     int newbase;
1182     int machinenum;
1183     if (oid==0)
1184       continue;
1185     //Look up fields locally
1186     for(newbase=baseindex; newbase<endindex; newbase++) {
1187       if (!lookupObject(&oid, arryfields[newbase]))
1188         break;
1189       //Ended in a null pointer...
1190       if (oid==0)
1191         goto tuple;
1192     }
1193     //Entire prefetch is local
1194     if (newbase==endindex&&checkoid(oid)){
1195       numLocal++;
1196       goto tuple;
1197     }
1198     //Add to remote requests
1199     machinenum=lhashSearch(oid);
1200     insertPile(machinenum, oid, endindex-newbase, &arryfields[newbase], &head);
1201 tuple:
1202     ;
1203   }
1204
1205   /* handle dynamic prefetching */
1206   handleDynPrefetching(numLocal, ntuples, siteid);
1207   return head;
1208 }
1209
1210 int checkoid(unsigned int oid) {
1211   objheader_t *header;
1212   if ((header=mhashSearch(oid))!=NULL) {
1213     //Found on machine
1214     return 1;
1215   } else if ((header=prehashSearch(oid))!=NULL) {
1216     //Found in cache
1217     return 1;
1218   } else {
1219     return 0;
1220   }
1221 }
1222
1223 int lookupObject(unsigned int * oid, short offset) {
1224   objheader_t *header;
1225   if ((header=mhashSearch(*oid))!=NULL) {
1226     //Found on machine
1227     ;
1228   } else if ((header=prehashSearch(*oid))!=NULL) {
1229     //Found in cache
1230     ;
1231   } else {
1232     return 0;
1233   }
1234
1235   if(TYPE(header) > NUMCLASSES) {
1236     int elementsize = classsize[TYPE(header)];
1237     struct ArrayObject *ao = (struct ArrayObject *) (((char *)header) + sizeof(objheader_t));
1238     int length = ao->___length___;
1239     /* Check if array out of bounds */
1240     if(offset < 0 || offset >= length) {
1241       //if yes treat the object as found
1242       (*oid)=0;
1243       return 1;
1244     }
1245     (*oid) = *((unsigned int *)(((char *)ao) + sizeof(struct ArrayObject) + (elementsize*offset)));
1246     return 1;
1247   } else {
1248     (*oid) = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset));
1249     return 1;
1250   }
1251 }
1252
1253
1254 /* This function is called by the thread calling transPrefetch */
1255 void *transPrefetch(void *t) {
1256   while(1) {
1257     /* lock mutex of primary prefetch queue */
1258     void *node=gettail();
1259     /* Check if the tuples are found locally, if yes then reduce them further*/
1260     /* and group requests by remote machine ids by calling the makePreGroups() */
1261     prefetchpile_t *pilehead = foundLocal(node);
1262
1263     if (pilehead!=NULL) {
1264       // Get sock from shared pool
1265       int sd = getSock2(transPrefetchSockPool, pilehead->mid);
1266
1267       /* Send  Prefetch Request */
1268       prefetchpile_t *ptr = pilehead;
1269       while(ptr != NULL) {
1270         sendPrefetchReq(ptr, sd);
1271         ptr = ptr->next;
1272       }
1273
1274       /* Release socket */
1275       //        freeSock(transPrefetchSockPool, pilehead->mid, sd);
1276
1277       /* Deallocated pilehead */
1278       mcdealloc(pilehead);
1279     }
1280     // Deallocate the prefetch queue pile node
1281     inctail();
1282   }
1283 }
1284
1285 void sendPrefetchReqnew(prefetchpile_t *mcpilenode, int sd) {
1286   objpile_t *tmp;
1287
1288   int size=sizeof(char)+sizeof(int);
1289   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1290     size += sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1291   }
1292
1293   char buft[size];
1294   char *buf=buft;
1295   *buf=TRANS_PREFETCH;
1296   buf+=sizeof(char);
1297
1298   for(tmp=mcpilenode->objpiles; tmp!=NULL; tmp=tmp->next) {
1299     int len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1300     *((int*)buf)=len;
1301     buf+=sizeof(int);
1302     *((unsigned int *)buf)=tmp->oid;
1303     buf+=sizeof(unsigned int);
1304     *((unsigned int *)(buf)) = myIpAddr;
1305     buf+=sizeof(unsigned int);
1306     memcpy(buf, tmp->offset, tmp->numoffset*sizeof(short));
1307     buf+=tmp->numoffset*sizeof(short);
1308   }
1309   *((int *)buf)=-1;
1310   send_data(sd, buft, size);
1311   return;
1312 }
1313
1314 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1315   int len, endpair;
1316   char control;
1317   objpile_t *tmp;
1318
1319   /* Send TRANS_PREFETCH control message */
1320   control = TRANS_PREFETCH;
1321   send_data(sd, &control, sizeof(char));
1322
1323   /* Send Oids and offsets in pairs */
1324   tmp = mcpilenode->objpiles;
1325   while(tmp != NULL) {
1326     len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1327     char oidnoffset[len];
1328     char *buf=oidnoffset;
1329     *((int*)buf) = tmp->numoffset;
1330     buf+=sizeof(int);
1331     *((unsigned int *)buf) = tmp->oid;
1332     buf+=sizeof(unsigned int);
1333     *((unsigned int *)buf) = myIpAddr;
1334     buf += sizeof(unsigned int);
1335     memcpy(buf, tmp->offset, (tmp->numoffset)*sizeof(short));
1336     send_data(sd, oidnoffset, len);
1337     tmp = tmp->next;
1338   }
1339
1340   /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1341   endpair = -1;
1342   send_data(sd, &endpair, sizeof(int));
1343
1344   return;
1345 }
1346
1347 int getPrefetchResponse(int sd) {
1348   int length = 0, size = 0;
1349   char control;
1350   unsigned int oid;
1351   void *modptr, *oldptr;
1352
1353   recv_data((int)sd, &length, sizeof(int));
1354   size = length - sizeof(int);
1355   char recvbuffer[size];
1356
1357   recv_data((int)sd, recvbuffer, size);
1358   control = *((char *) recvbuffer);
1359   if(control == OBJECT_FOUND) {
1360     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1361     size = size - (sizeof(char) + sizeof(unsigned int));
1362     pthread_mutex_lock(&prefetchcache_mutex);
1363     if ((modptr = prefetchobjstrAlloc(size)) == NULL) {
1364       printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1365       pthread_mutex_unlock(&prefetchcache_mutex);
1366       return -1;
1367     }
1368     pthread_mutex_unlock(&prefetchcache_mutex);
1369     memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1370     STATUS(modptr)=0;
1371
1372     /* Insert the oid and its address into the prefetch hash lookup table */
1373     /* Do a version comparison if the oid exists */
1374     if((oldptr = prehashSearch(oid)) != NULL) {
1375       /* If older version then update with new object ptr */
1376       if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1377         prehashRemove(oid);
1378         prehashInsert(oid, modptr);
1379       }
1380     } else { /* Else add the object ptr to hash table*/
1381       prehashInsert(oid, modptr);
1382     }
1383     /* Lock the Prefetch Cache look up table*/
1384     pthread_mutex_lock(&pflookup.lock);
1385     /* Broadcast signal on prefetch cache condition variable */
1386     pthread_cond_broadcast(&pflookup.cond);
1387     /* Unlock the Prefetch Cache look up table*/
1388     pthread_mutex_unlock(&pflookup.lock);
1389   } else if(control == OBJECT_NOT_FOUND) {
1390     oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1391     /* TODO: For each object not found query DHT for new location and retrieve the object */
1392     /* Throw an error */
1393     //printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1394     //    exit(-1);
1395   } else {
1396     printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1397   }
1398
1399   return 0;
1400 }
1401
1402 unsigned short getObjType(unsigned int oid) {
1403   objheader_t *objheader;
1404   unsigned short numoffset[] ={0};
1405   short fieldoffset[] ={};
1406
1407   if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) {
1408 #ifdef CACHE
1409     if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) {
1410 #endif
1411     unsigned int mid = lhashSearch(oid);
1412     int sd = getSock2(transReadSockPool, mid);
1413     char remotereadrequest[sizeof(char)+sizeof(unsigned int)];
1414     remotereadrequest[0] = READ_REQUEST;
1415     *((unsigned int *)(&remotereadrequest[1])) = oid;
1416     send_data(sd, remotereadrequest, sizeof(remotereadrequest));
1417
1418     /* Read response from the Participant */
1419     char control;
1420     recv_data(sd, &control, sizeof(char));
1421
1422     if (control==OBJECT_NOT_FOUND) {
1423       printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
1424       fflush(stdout);
1425       exit(-1);
1426     } else {
1427       /* Read object if found into local cache */
1428       int size;
1429       recv_data(sd, &size, sizeof(int));
1430 #ifdef CACHE
1431       pthread_mutex_lock(&prefetchcache_mutex);
1432       if ((objheader = prefetchobjstrAlloc(size)) == NULL) {
1433         printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
1434         pthread_exit(NULL);
1435       }
1436       pthread_mutex_unlock(&prefetchcache_mutex);
1437       recv_data(sd, objheader, size);
1438       prehashInsert(oid, objheader);
1439       return TYPE(objheader);
1440 #else
1441       char *buffer;
1442       if((buffer = calloc(1, size)) == NULL) {
1443         printf("%s() Calloc Error %s at line %d\n", __func__, __FILE__, __LINE__);
1444         fflush(stdout);
1445         return 0;
1446       }
1447       recv_data(sd, buffer, size);
1448       objheader = (objheader_t *)buffer;
1449       unsigned short type = TYPE(objheader);
1450       free(buffer);
1451       return type;
1452 #endif
1453     }
1454 #ifdef CACHE
1455   }
1456 #endif
1457   }
1458   return TYPE(objheader);
1459 }
1460
1461 int startRemoteThread(unsigned int oid, unsigned int mid) {
1462   int sock;
1463   struct sockaddr_in remoteAddr;
1464   char msg[1 + sizeof(unsigned int)];
1465   int bytesSent;
1466   int status;
1467
1468   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1469     perror("startRemoteThread():socket()");
1470     return -1;
1471   }
1472
1473   bzero(&remoteAddr, sizeof(remoteAddr));
1474   remoteAddr.sin_family = AF_INET;
1475   remoteAddr.sin_port = htons(LISTEN_PORT);
1476   remoteAddr.sin_addr.s_addr = htonl(mid);
1477
1478   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1479     printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1480            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1481     status = -1;
1482   } else
1483   {
1484     msg[0] = START_REMOTE_THREAD;
1485     *((unsigned int *) &msg[1]) = oid;
1486     send_data(sock, msg, 1 + sizeof(unsigned int));
1487   }
1488
1489   close(sock);
1490   return status;
1491 }
1492
1493 //TODO: when reusing oids, make sure they are not already in use!
1494 static unsigned int id = 0xFFFFFFFF;
1495 unsigned int getNewOID(void) {
1496   id += 2;
1497   if (id > oidMax || id < oidMin){
1498     id = (oidMin | 1);
1499   }
1500   return id;
1501 }
1502
1503 int processConfigFile() {
1504   FILE *configFile;
1505   const int maxLineLength = 200;
1506   char lineBuffer[maxLineLength];
1507   char *token;
1508   const char *delimiters = " \t\n";
1509   char *commentBegin;
1510   in_addr_t tmpAddr;
1511
1512   configFile = fopen(CONFIG_FILENAME, "r");
1513   if (configFile == NULL){
1514     printf("error opening %s:\n", CONFIG_FILENAME);
1515     perror("");
1516     return -1;
1517   }
1518
1519   numHostsInSystem = 0;
1520   sizeOfHostArray = 8;
1521   hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1522
1523   while(fgets(lineBuffer, maxLineLength, configFile) != NULL){
1524     commentBegin = strchr(lineBuffer, '#');
1525     if (commentBegin != NULL)
1526       *commentBegin = '\0';
1527     token = strtok(lineBuffer, delimiters);
1528     while (token != NULL){
1529       tmpAddr = inet_addr(token);
1530       if ((int)tmpAddr == -1){
1531         printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1532         fclose(configFile);
1533         return -1;
1534       } else
1535         addHost(htonl(tmpAddr));
1536       token = strtok(NULL, delimiters);
1537     }
1538   }
1539
1540   fclose(configFile);
1541
1542   if (numHostsInSystem < 1){
1543     printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1544     return -1;
1545   }
1546 #ifdef MAC
1547   myIpAddr = getMyIpAddr("en1");
1548 #else
1549   myIpAddr = getMyIpAddr("eth0");
1550 #endif
1551   myIndexInHostArray = findHost(myIpAddr);
1552   if (myIndexInHostArray == -1){
1553     printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1554     return -1;
1555   }
1556   oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1557   oidMin = oidsPerBlock * myIndexInHostArray;
1558   if (myIndexInHostArray == numHostsInSystem - 1)
1559     oidMax = 0xFFFFFFFF;
1560   else
1561     oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1562
1563   return 0;
1564 }
1565
1566 void addHost(unsigned int hostIp) {
1567   unsigned int *tmpArray;
1568
1569   if (findHost(hostIp) != -1)
1570     return;
1571
1572   if (numHostsInSystem == sizeOfHostArray){
1573     tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1574     memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1575     free(hostIpAddrs);
1576     hostIpAddrs = tmpArray;
1577   }
1578
1579   hostIpAddrs[numHostsInSystem++] = hostIp;
1580
1581   return;
1582 }
1583
1584 int findHost(unsigned int hostIp) {
1585   int i;
1586   for (i = 0; i < numHostsInSystem; i++)
1587     if (hostIpAddrs[i] == hostIp)
1588       return i;
1589
1590   //not found
1591   return -1;
1592 }
1593
1594 /* This function sends notification request per thread waiting on object(s) whose version
1595  * changes */
1596 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1597   int sock,i;
1598   objheader_t *objheader;
1599   struct sockaddr_in remoteAddr;
1600   char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1601   char *ptr;
1602   int bytesSent;
1603   int status, size;
1604   unsigned short version;
1605   unsigned int oid,mid;
1606   static unsigned int threadid = 0;
1607   pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1608   pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1609   notifydata_t *ndata;
1610
1611   oid = oidarry[0];
1612   if((mid = lhashSearch(oid)) == 0) {
1613     printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1614     return;
1615   }
1616
1617   if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1618     perror("reqNotify():socket()");
1619     return -1;
1620   }
1621
1622   bzero(&remoteAddr, sizeof(remoteAddr));
1623   remoteAddr.sin_family = AF_INET;
1624   remoteAddr.sin_port = htons(LISTEN_PORT);
1625   remoteAddr.sin_addr.s_addr = htonl(mid);
1626
1627   /* Generate unique threadid */
1628   threadid++;
1629
1630   /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1631   if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1632     printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1633     return -1;
1634   }
1635   ndata->numoid = numoid;
1636   ndata->threadid = threadid;
1637   ndata->oidarry = oidarry;
1638   ndata->versionarry = versionarry;
1639   ndata->threadcond = threadcond;
1640   ndata->threadnotify = threadnotify;
1641   if((status = notifyhashInsert(threadid, ndata)) != 0) {
1642     printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1643     free(ndata);
1644     return -1;
1645   }
1646
1647   /* Send  number of oids, oidarry, version array, machine id and threadid */
1648   if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1649     printf("reqNotify():error %d connecting to %s:%d\n", errno,
1650            inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1651     free(ndata);
1652     return -1;
1653   } else {
1654     msg[0] = THREAD_NOTIFY_REQUEST;
1655     *((unsigned int *)(&msg[1])) = numoid;
1656     /* Send array of oids  */
1657     size = sizeof(unsigned int);
1658     {
1659       i = 0;
1660       while(i < numoid) {
1661         oid = oidarry[i];
1662         *((unsigned int *)(&msg[1] + size)) = oid;
1663         size += sizeof(unsigned int);
1664         i++;
1665       }
1666     }
1667
1668     /* Send array of version  */
1669     {
1670       i = 0;
1671       while(i < numoid) {
1672         version = versionarry[i];
1673         *((unsigned short *)(&msg[1] + size)) = version;
1674         size += sizeof(unsigned short);
1675         i++;
1676       }
1677     }
1678
1679     *((unsigned int *)(&msg[1] + size)) = myIpAddr;
1680     size += sizeof(unsigned int);
1681     *((unsigned int *)(&msg[1] + size)) = threadid;
1682     pthread_mutex_lock(&(ndata->threadnotify));
1683     size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
1684     send_data(sock, msg, size);
1685     pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
1686     pthread_mutex_unlock(&(ndata->threadnotify));
1687   }
1688
1689   pthread_cond_destroy(&threadcond);
1690   pthread_mutex_destroy(&threadnotify);
1691   free(ndata);
1692   close(sock);
1693   return status;
1694 }
1695
1696 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
1697   notifydata_t *ndata;
1698   int i, objIsFound = 0, index;
1699   void *ptr;
1700
1701   //Look up the tid and call the corresponding pthread_cond_signal
1702   if((ndata = notifyhashSearch(tid)) == NULL) {
1703     printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
1704     return;
1705   } else  {
1706     for(i = 0; i < ndata->numoid; i++) {
1707       if(ndata->oidarry[i] == oid){
1708         objIsFound = 1;
1709         index = i;
1710       }
1711     }
1712     if(objIsFound == 0){
1713       printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
1714       return;
1715     } else {
1716       if(version <= ndata->versionarry[index]){
1717         printf("threadNotify(): New version %d has not changed since last version for oid = %d, %s, %d\n", version, oid, __FILE__, __LINE__);
1718         return;
1719       } else {
1720 #ifdef CACHE
1721         /* Clear from prefetch cache and free thread related data structure */
1722         if((ptr = prehashSearch(oid)) != NULL) {
1723           prehashRemove(oid);
1724         }
1725 #endif
1726         pthread_cond_signal(&(ndata->threadcond));
1727       }
1728     }
1729   }
1730   return;
1731 }
1732
1733 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
1734   threadlist_t *ptr;
1735   unsigned int mid;
1736   struct sockaddr_in remoteAddr;
1737   char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
1738   int sock, status, size, bytesSent;
1739
1740   while(*head != NULL) {
1741     ptr = *head;
1742     mid = ptr->mid;
1743     //create a socket connection to that machine
1744     if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1745       perror("notifyAll():socket()");
1746       return -1;
1747     }
1748
1749     bzero(&remoteAddr, sizeof(remoteAddr));
1750     remoteAddr.sin_family = AF_INET;
1751     remoteAddr.sin_port = htons(LISTEN_PORT);
1752     remoteAddr.sin_addr.s_addr = htonl(mid);
1753     //send Thread Notify response and threadid to that machine
1754     if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1755       printf("notifyAll():error %d connecting to %s:%d\n", errno,
1756              inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1757       fflush(stdout);
1758       status = -1;
1759     } else {
1760       bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
1761       msg[0] = THREAD_NOTIFY_RESPONSE;
1762       *((unsigned int *)&msg[1]) = oid;
1763       size = sizeof(unsigned int);
1764       *((unsigned short *)(&msg[1]+ size)) = version;
1765       size+= sizeof(unsigned short);
1766       *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
1767
1768       size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
1769       send_data(sock, msg, size);
1770     }
1771     //close socket
1772     close(sock);
1773     // Update head
1774     *head = ptr->next;
1775     free(ptr);
1776   }
1777   return status;
1778 }
1779
1780 void transAbort(transrecord_t *trans) {
1781   objstrDelete(trans->cache);
1782   chashDelete(trans->lookupTable);
1783   free(trans);
1784 }
1785
1786 /* This function inserts necessary information into
1787  * a machine pile data structure */
1788 plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
1789   plistnode_t *ptr, *tmp;
1790   int found = 0, offset = 0;
1791
1792   tmp = pile;
1793   //Add oid into a machine that is already present in the pile linked list structure
1794   while(tmp != NULL) {
1795     if (tmp->mid == mid) {
1796       int tmpsize;
1797
1798       if (STATUS(headeraddr) & NEW) {
1799         tmp->oidcreated[tmp->numcreated] = OID(headeraddr);
1800         tmp->numcreated++;
1801         GETSIZE(tmpsize, headeraddr);
1802         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1803       } else if (STATUS(headeraddr) & DIRTY) {
1804         tmp->oidmod[tmp->nummod] = OID(headeraddr);
1805         tmp->nummod++;
1806         GETSIZE(tmpsize, headeraddr);
1807         tmp->sum_bytes += sizeof(objheader_t) + tmpsize;
1808       } else {
1809         offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
1810         *((unsigned int *)(((char *)tmp->objread) + offset))=OID(headeraddr);
1811         offset += sizeof(unsigned int);
1812         *((short *)(((char *)tmp->objread) + offset)) = headeraddr->version;
1813         tmp->numread++;
1814       }
1815       found = 1;
1816       break;
1817     }
1818     tmp = tmp->next;
1819   }
1820   //Add oid for any new machine
1821   if (!found) {
1822     int tmpsize;
1823     if((ptr = pCreate(num_objs)) == NULL) {
1824       return NULL;
1825     }
1826     ptr->mid = mid;
1827     if (STATUS(headeraddr) & NEW) {
1828       ptr->oidcreated[ptr->numcreated] = OID(headeraddr);
1829       ptr->numcreated++;
1830       GETSIZE(tmpsize, headeraddr);
1831       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1832     } else if (STATUS(headeraddr) & DIRTY) {
1833       ptr->oidmod[ptr->nummod] = OID(headeraddr);
1834       ptr->nummod++;
1835       GETSIZE(tmpsize, headeraddr);
1836       ptr->sum_bytes += sizeof(objheader_t) + tmpsize;
1837     } else {
1838       *((unsigned int *)ptr->objread)=OID(headeraddr);
1839       offset = sizeof(unsigned int);
1840       *((short *)(((char *)ptr->objread) + offset)) = headeraddr->version;
1841       ptr->numread++;
1842     }
1843     ptr->next = pile;
1844     pile = ptr;
1845   }
1846
1847   /* Clear Flags */
1848   STATUS(headeraddr) =0;
1849
1850   return pile;
1851 }