e9d2d90b693843941dd682d8460ad782ee9b3181
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
#include "dstm.h"
#include "ip.h"
#include "clookup.h"
#include "machinepile.h"
#include "mlookup.h"
#include "llookup.h"
#include "plookup.h"
#include "prelookup.h"
#include "threadnotify.h"
#include "queue.h"
#include <sys/types.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#ifdef COMPILER
#include "thread.h"
#endif
24
25 #define LISTEN_PORT 2156
26 #define NUM_THREADS 1
27 #define PREFETCH_CACHE_SIZE 1048576 //1MB
28 #define NUM_MACHINES 2
29 #define CONFIG_FILENAME "dstm.conf"
30
31 /* Global Variables */
32 extern int classsize[];
33 extern primarypfq_t pqueue; //Shared prefetch queue
34 extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
35 objstr_t *prefetchcache; //Global Prefetch cache
36 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
37 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
38 extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
39 extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
40 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
41 pthread_t tPrefetch;            /* Primary Prefetch thread that processes the prefetch queue */
42 extern objstr_t *mainobjstore;
43 unsigned int myIpAddr;
44 unsigned int *hostIpAddrs;
45 int sizeOfHostArray;
46 int numHostsInSystem;
47 int myIndexInHostArray;
48 unsigned int oidsPerBlock;
49 unsigned int oidMin;
50 unsigned int oidMax;
51
52 /* Global variables to track mapping of socketid and remote mid */
53 midSocketInfo_t midSocketArray[NUM_MACHINES];
54 int sockCount = 0;
55 int sockIdFound;
56
57 void printhex(unsigned char *, int);
58 plistnode_t *createPiles(transrecord_t *);
59
60 void printhex(unsigned char *ptr, int numBytes)
61 {
62         int i;
63         for (i = 0; i < numBytes; i++)
64         {
65                 if (ptr[i] < 16)
66                         printf("0%x ", ptr[i]);
67                 else
68                         printf("%x ", ptr[i]);
69         }
70         printf("\n");
71         return;
72 }
73
74 inline int arrayLength(int *array) {
75         int i;
76         for(i=0 ;array[i] != -1; i++)
77                 ;
78         return i;
79 }
80 inline int findmax(int *array, int arraylength) {
81         int max, i;
82         max = array[0];
83         for(i = 0; i < arraylength; i++){
84                 if(array[i] > max) {
85                         max = array[i];
86                 }
87         }
88         return max;
89 }
90 /* This function is a prefetch call generated by the compiler that
91  * populates the shared primary prefetch queue*/
92 void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) {
93         int qnodesize;
94         int len = 0;
95         int i, rc;
96
97         /* Allocate for the queue node*/
98         char *node;
99         if(ntuples > 0) {
100                 qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); 
101                 if((node = calloc(1, qnodesize)) == NULL) {
102                         printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
103                         return;
104                 }
105                 /* Set queue node values */
106                 len = sizeof(prefetchqelem_t);
107                 memcpy(node + len, &ntuples, sizeof(int));
108                 len += sizeof(int);
109                 memcpy(node + len, oids, ntuples*sizeof(unsigned int));
110                 len += ntuples * sizeof(unsigned int);
111                 memcpy(node + len, endoffsets, ntuples*sizeof(unsigned short));
112                 len += ntuples * sizeof(unsigned short);
113                 memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short));
114                 /* Lock and insert into primary prefetch queue */
115                 pthread_mutex_lock(&pqueue.qlock);
116                 pre_enqueue((prefetchqelem_t *)node);
117                 pthread_cond_signal(&pqueue.qcond);
118                 pthread_mutex_unlock(&pqueue.qlock);
119         }
120 }
121
122 /* This function starts up the transaction runtime. */
123 int dstmStartup(const char * option) {
124         pthread_t thread_Listen;
125         pthread_attr_t attr;
126         int master=option!=NULL && strcmp(option, "master")==0;
127
128         if (processConfigFile() != 0)
129                 return 0; //TODO: return error value, cause main program to exit
130 #ifdef COMPILER
131         if (!master)
132           threadcount--;
133 #endif
134
135         dstmInit();
136         transInit();
137
138
139
140         if (master) {
141                 pthread_attr_init(&attr);
142                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
143                 pthread_create(&thread_Listen, &attr, dstmListen, NULL);
144                 return 1;
145         } else {
146                 dstmListen();
147                 return 0;
148         }
149
150 }
151
152 //TODO Use this later
153 void *pCacheAlloc(objstr_t *store, unsigned int size) {
154         void *tmp;
155         objstr_t *ptr;
156         ptr = store;
157         int success = 0;
158
159         while(ptr->next != NULL) {
160                 /* check if store is empty */
161                 if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) {
162                         tmp = ptr->top;
163                         ptr->top += size;
164                         success = 1;
165                         return tmp;
166                 } else {
167                         ptr = ptr-> next;
168                 }
169         }
170
171         if(success == 0) {
172                 return NULL;
173         }
174 }
175
176 /* This function initiates the prefetch thread
177  * A queue is shared between the main thread of execution
178  * and the prefetch thread to process the prefetch call
179  * Call from compiler populates the shared queue with prefetch requests while prefetch thread
180  * processes the prefetch requests */
181 void transInit() {
182         int t, rc;
183         int retval;
184         //Create and initialize prefetch cache structure
185         prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
186         //prefetchcache->next = objstrCreate(PREFETCH_CACHE_SIZE);
187         //prefetchcache->next->next = objstrCreate(PREFETCH_CACHE_SIZE);
188
189         /* Initialize attributes for mutex */
190         pthread_mutexattr_init(&prefetchcache_mutex_attr);
191         pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
192         
193         pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
194
195         //Create prefetch cache lookup table
196         if(prehashCreate(HASH_SIZE, LOADFACTOR))
197                 return; //Failure
198
199         //Initialize primary shared queue
200         queueInit();
201         //Initialize machine pile w/prefetch oids and offsets shared queue
202         mcpileqInit();
203
204         //Create the primary prefetch thread 
205         do {
206           retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
207         } while(retval!=0);
208         pthread_detach(tPrefetch);
209
210         //Initialize mid to socketid mapping array
211         for(t = 0; t < NUM_MACHINES; t++) {
212                 midSocketArray[t].mid = 0;
213                 midSocketArray[t].sockid = 0;
214         }
215
216         //Create and Initialize a pool of threads 
217         /* Threads are active for the entire period runtime is running */
218         for(t = 0; t< NUM_THREADS; t++) {
219           do {
220                 rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
221           } while(rc!=0);
222           pthread_detach(wthreads[t]);
223         }
224 }
225
226 /* This function stops the threads spawned */
227 void transExit() {
228         int t;
229         pthread_cancel(tPrefetch);
230         for(t = 0; t < NUM_THREADS; t++)
231                 pthread_cancel(wthreads[t]);
232
233         return;
234 }
235
236 /* This functions inserts randowm wait delays in the order of msec
237  * Mostly used when transaction commits retry*/
238 void randomdelay()
239 {
240         struct timespec req;
241         time_t t;
242
243         t = time(NULL);
244         req.tv_sec = 0;
245         req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
246         nanosleep(&req, NULL);
247         return;
248 }
249
250 /* This function initializes things required in the transaction start*/
251 transrecord_t *transStart()
252 {
253         transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
254         tmp->cache = objstrCreate(1048576);
255         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
256 #ifdef COMPILER
257         tmp->revertlist=NULL;
258 #endif
259         return tmp;
260 }
261
262 /* This function finds the location of the objects involved in a transaction
263  * and returns the pointer to the object if found in a remote location */
264 objheader_t *transRead(transrecord_t *record, unsigned int oid) {
265         unsigned int machinenumber;
266         objheader_t *tmp, *objheader;
267         objheader_t *objcopy;
268         int size, rc, found = 0;
269         void *buf;
270         struct timespec ts;
271         struct timeval tp;
272
273         if(oid == 0) {
274                 printf("Error: %s, %d oid is NULL \n", __FILE__, __LINE__);
275                 return NULL;
276         }
277         
278         rc = gettimeofday(&tp, NULL);
279
280         /* 1ms delay */
281         tp.tv_usec += 1000;
282         if (tp.tv_usec >= 1000000)
283         {
284                 tp.tv_usec -= 1000000;
285                 tp.tv_sec += 1;
286         }
287         /* Convert from timeval to timespec */
288         ts.tv_sec = tp.tv_sec;
289         ts.tv_nsec = tp.tv_usec * 1000;
290
291         /* Search local transaction cache */
292         if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
293
294 #ifdef COMPILER
295           return &objheader[1];
296 #else
297           return objheader;
298 #endif
299         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
300                 /* Look up in machine lookup table  and copy  into cache*/
301                 GETSIZE(size, objheader);
302                 size += sizeof(objheader_t);
303                 objcopy = (objheader_t *) objstrAlloc(record->cache, size);
304                 memcpy(objcopy, objheader, size);
305                 /* Insert into cache's lookup table */
306                 chashInsert(record->lookupTable, OID(objheader), objcopy); 
307 #ifdef COMPILER
308                 return &objcopy[1];
309 #else
310                 return objcopy;
311 #endif
312         } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
313                 GETSIZE(size, tmp);
314                 size+=sizeof(objheader_t);
315                 objcopy = (objheader_t *) objstrAlloc(record->cache, size);
316                 memcpy(objcopy, tmp, size);
317                 /* Insert into cache's lookup table */
318                 chashInsert(record->lookupTable, OID(tmp), objcopy); 
319 #ifdef COMPILER
320                 return &objcopy[1];
321 #else
322                 return objcopy;
323 #endif
324         } else {
325                 /*If object not found in prefetch cache then block until object appears in the prefetch cache */
326                 /*
327                 pthread_mutex_lock(&pflookup.lock);
328                 while(!found) {
329                         rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
330                         // Check Prefetch cache again 
331                         if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
332                                 found = 1;
333                                 GETSIZE(size,tmp);
334                                 size+=sizeof(objheader_t);
335                                 objcopy = (objheader_t *) objstrAlloc(record->cache, size);
336                                 memcpy(objcopy, tmp, size);
337                                 chashInsert(record->lookupTable, OID(tmp), objcopy); 
338                                 pthread_mutex_unlock(&pflookup.lock);
339 #ifdef COMPILER
340                                 return &objcopy[1];
341 #else
342                                 return objcopy;
343 #endif
344                         } else if (rc == ETIMEDOUT) {
345                                 pthread_mutex_unlock(&pflookup.lock);
346                                 break;
347                         }
348                 }
349                 */
350
351                 /* Get the object from the remote location */
352                 if((machinenumber = lhashSearch(oid)) == 0) {
353                         printf("Error: %s() No machine found for oid =% %s,%dx\n",__func__, machinenumber, __FILE__, __LINE__);
354                         return NULL;
355                 }
356                 objcopy = getRemoteObj(record, machinenumber, oid);
357                 
358                 if(objcopy == NULL) {
359                         printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
360                         return NULL;
361                 } else {
362
363 #ifdef COMPILER
364                         return &objcopy[1];
365 #else
366                         return objcopy;
367 #endif
368                 }
369         }
370 }
371
372 /* This function creates objects in the transaction record */
373 objheader_t *transCreateObj(transrecord_t *record, unsigned int size)
374 {
375         objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
376         tmp->notifylist = NULL;
377         OID(tmp) = getNewOID();
378         tmp->version = 1;
379         tmp->rcount = 1;
380         STATUS(tmp) = NEW;
381         chashInsert(record->lookupTable, OID(tmp), tmp);
382
383 #ifdef COMPILER
384         return &tmp[1]; //want space after object header
385 #else
386         return tmp;
387 #endif
388 }
389
390 /* This function creates machine piles based on all machines involved in a
391  * transaction commit request */
392 plistnode_t *createPiles(transrecord_t *record) {
393         int i = 0;
394         unsigned int size;/* Represents number of bins in the chash table */
395         chashlistnode_t *curr, *ptr, *next;
396         plistnode_t *pile = NULL;
397         unsigned int machinenum;
398         void *localmachinenum;
399         objheader_t *headeraddr;
400
401         ptr = record->lookupTable->table;
402         size = record->lookupTable->size;
403
404         for(i = 0; i < size ; i++) {
405                 curr = &ptr[i];
406                 /* Inner loop to traverse the linked list of the cache lookupTable */
407                 while(curr != NULL) {
408                         //if the first bin in hash table is empty
409                         if(curr->key == 0) {
410                                 break;
411                         }
412                         next = curr->next;
413
414                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
415                                 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
416                                 return NULL;
417                         }
418
419                         //Get machine location for object id (and whether local or not)
420                         if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) {
421                                 machinenum = myIpAddr;
422                         } else  if ((machinenum = lhashSearch(curr->key)) == 0) {
423                                 printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
424                                 return NULL;
425                         }
426
427                         //Make machine groups
428                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
429                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
430                                 return NULL;
431                         }
432
433                         curr = next;
434                 }
435         }
436         return pile; 
437 }
438
439 /* This function initiates the transaction commit process
440  * Spawns threads for each of the new connections with Participants 
441  * and creates new piles by calling the createPiles(), 
442  * Sends a transrequest() to each remote machines for objects found remotely 
443  * and calls handleLocalReq() to process objects found locally */
444 int transCommit(transrecord_t *record) {        
445         unsigned int tot_bytes_mod, *listmid;
446         plistnode_t *pile, *pile_ptr;
447         int i, j, rc, val;
448         int pilecount, offset, threadnum = 0, trecvcount = 0;
449         char control;
450         char transid[TID_LEN];
451         trans_req_data_t *tosend;
452         trans_commit_data_t transinfo;
453         static int newtid = 0;
454         char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
455         char localstat = 0;
456         thread_data_array_t *thread_data_array;
457         local_thread_data_array_t *ltdata;
458
459         do { 
460                 trecvcount = 0; 
461                 threadnum = 0; 
462                 treplyretry = 0;
463                 thread_data_array = NULL;
464                 ltdata = NULL;
465
466                 /* Look through all the objects in the transaction record and make piles 
467                  * for each machine involved in the transaction*/
468                 pile_ptr = pile = createPiles(record);
469
470                 /* Create the packet to be sent in TRANS_REQUEST */
471
472                 /* Count the number of participants */
473                 pilecount = pCount(pile);
474
475                 /* Create a list of machine ids(Participants) involved in transaction   */
476                 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
477                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
478                         return 1;
479                 }               
480                 pListMid(pile, listmid);
481
482
483                 /* Initialize thread variables,
484                  * Spawn a thread for each Participant involved in a transaction */
485                 pthread_t thread[pilecount];
486                 pthread_attr_t attr;                    
487                 pthread_cond_t tcond;
488                 pthread_mutex_t tlock;
489                 pthread_mutex_t tlshrd;
490
491                 if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
492                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
493                         pthread_cond_destroy(&tcond);
494                         pthread_mutex_destroy(&tlock);
495                         pDelete(pile_ptr);
496                         free(listmid);
497                         return 1;
498                 }
499
500                 if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
501                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
502                         pthread_cond_destroy(&tcond);
503                         pthread_mutex_destroy(&tlock);
504                         pDelete(pile_ptr);
505                         free(listmid);
506                         free(thread_data_array);
507                         return 1;
508                 }
509
510                 thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
511
512                 /* Initialize and set thread detach attribute */
513                 pthread_attr_init(&attr);
514                 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
515                 pthread_mutex_init(&tlock, NULL);
516                 pthread_cond_init(&tcond, NULL);
517
518                 /* Process each machine pile */
519                 while(pile != NULL) {
520                         //Create transaction id
521                         newtid++;
522                         if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
523                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
524                                 pthread_cond_destroy(&tcond);
525                                 pthread_mutex_destroy(&tlock);
526                                 pDelete(pile_ptr);
527                                 free(listmid);
528                                 free(thread_data_array);
529                                 free(ltdata);
530                                 return 1;
531                         }
532                         tosend->f.control = TRANS_REQUEST;
533                         sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
534                         tosend->f.mcount = pilecount;
535                         tosend->f.numread = pile->numread;
536                         tosend->f.nummod = pile->nummod;
537                         tosend->f.numcreated = pile->numcreated;
538                         tosend->f.sum_bytes = pile->sum_bytes;
539                         tosend->listmid = listmid;
540                         tosend->objread = pile->objread;
541                         tosend->oidmod = pile->oidmod;
542                         tosend->oidcreated = pile->oidcreated;
543                         thread_data_array[threadnum].thread_id = threadnum;
544                         thread_data_array[threadnum].mid = pile->mid;
545                         thread_data_array[threadnum].buffer = tosend;
546                         thread_data_array[threadnum].recvmsg = rcvd_control_msg;
547                         thread_data_array[threadnum].threshold = &tcond;
548                         thread_data_array[threadnum].lock = &tlock;
549                         thread_data_array[threadnum].count = &trecvcount;
550                         thread_data_array[threadnum].replyctrl = &treplyctrl;
551                         thread_data_array[threadnum].replyretry = &treplyretry;
552                         thread_data_array[threadnum].rec = record;
553                         /* If local do not create any extra connection */
554                         if(pile->mid != myIpAddr) { /* Not local */
555                                 do {
556                                         rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
557                                 } while(rc!=0);
558                                 if(rc) {
559                                         perror("Error in pthread create\n");
560                                         pthread_cond_destroy(&tcond);
561                                         pthread_mutex_destroy(&tlock);
562                                         pDelete(pile_ptr);
563                                         free(listmid);
564                                         for (i = 0; i < threadnum; i++)
565                                                 free(thread_data_array[i].buffer);
566                                         free(thread_data_array);
567                                         free(ltdata);
568                                         return 1;
569                                 }
570                         } else { /*Local*/
571                                 ltdata->tdata = &thread_data_array[threadnum];
572                                 ltdata->transinfo = &transinfo;
573                                 do {
574                                         val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
575                                 } while(val!=0);
576                                 if(val) {
577                                         perror("Error in pthread create\n");
578                                         pthread_cond_destroy(&tcond);
579                                         pthread_mutex_destroy(&tlock);
580                                         pDelete(pile_ptr);
581                                         free(listmid);
582                                         for (i = 0; i < threadnum; i++)
583                                                 free(thread_data_array[i].buffer);
584                                         free(thread_data_array);
585                                         free(ltdata);
586                                         return 1;
587                                 }
588                         }
589
590                         threadnum++;            
591                         pile = pile->next;
592                 }
593                 /* Free attribute and wait for the other threads */
594                 pthread_attr_destroy(&attr);
595
596                 for (i = 0; i < threadnum; i++) {
597                         rc = pthread_join(thread[i], NULL);
598                         if(rc)
599                         {
600                                 printf("Error: return code from pthread_join() is %d\n", rc);
601                                 pthread_cond_destroy(&tcond);
602                                 pthread_mutex_destroy(&tlock);
603                                 pDelete(pile_ptr);
604                                 free(listmid);
605                                 for (j = i; j < threadnum; j++) {
606                                         free(thread_data_array[j].buffer);
607                                 }
608                                 return 1;
609                         }
610                         free(thread_data_array[i].buffer);
611                 }
612
613                 /* Free resources */    
614                 pthread_cond_destroy(&tcond);
615                 pthread_mutex_destroy(&tlock);
616                 free(listmid);
617                 pDelete(pile_ptr);
618
619                 /* wait a random amount of time before retrying to commit transaction*/
620                 if(treplyretry == 1) {
621                         free(thread_data_array);
622                         free(ltdata);
623                         randomdelay();
624                 }
625
626         /* Retry trans commit procedure during soft_abort case */
627         } while (treplyretry == 1);
628
629
630         if(treplyctrl == TRANS_ABORT) {
631                 /* Free Resources */
632                 objstrDelete(record->cache);
633                 chashDelete(record->lookupTable);
634                 free(record);
635                 free(thread_data_array);
636                 free(ltdata);
637                 return TRANS_ABORT;
638         } else if(treplyctrl == TRANS_COMMIT) {
639                 /* Free Resources */
640                 objstrDelete(record->cache);
641                 chashDelete(record->lookupTable);
642                 free(record);
643                 free(thread_data_array);
644                 free(ltdata);
645                 return 0;
646         } else {
647                 //TODO Add other cases
648                 printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
649                 exit(-1);
650         }
651
652         return 0;
653 }
654
655 /* This function sends information involved in the transaction request 
656  * to participants and accepts a response from particpants.
657  * It calls decideresponse() to decide on what control message 
658  * to send next to participants and sends the message using sendResponse()*/
659 void *transRequest(void *threadarg) {
660         int sd, i, n;
661         struct sockaddr_in serv_addr;
662         thread_data_array_t *tdata;
663         objheader_t *headeraddr;
664         char control, recvcontrol;
665         char machineip[16], retval;
666
667         tdata = (thread_data_array_t *) threadarg;
668
669         /* Send Trans Request */
670         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
671                 perror("Error in socket for TRANS_REQUEST\n");
672                 pthread_exit(NULL);
673         }
674         bzero((char*) &serv_addr, sizeof(serv_addr));
675         serv_addr.sin_family = AF_INET;
676         serv_addr.sin_port = htons(LISTEN_PORT);
677         midtoIP(tdata->mid,machineip);
678         machineip[15] = '\0';
679         serv_addr.sin_addr.s_addr = inet_addr(machineip);
680         /* Open Connection */
681         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
682                 perror("Error in connect for TRANS_REQUEST\n");
683                 close(sd);
684                 pthread_exit(NULL);
685         }
686
687         /* Send bytes of data with TRANS_REQUEST control message */
688         if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
689                 perror("Error sending fixed bytes for thread\n");
690                 close(sd);
691                 pthread_exit(NULL);
692         }
693
694         /* Send list of machines involved in the transaction */
695         {
696                 int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
697                 if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
698                         perror("Error sending list of machines for thread\n");
699                         close(sd);
700                         pthread_exit(NULL);
701                 }
702         }
703
704         /* Send oids and version number tuples for objects that are read */
705         {
706                 int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
707                 
708                 if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
709                         perror("Error sending tuples for thread\n");
710                         close(sd);
711                         pthread_exit(NULL);
712                 }
713         }
714
715         /* Send objects that are modified */
716         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
717                 int size;
718                 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
719                 GETSIZE(size,headeraddr);
720                 size+=sizeof(objheader_t);
721                 if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
722                         perror("Error sending obj modified for thread\n");
723                         close(sd);
724                         pthread_exit(NULL);
725                 }
726         }
727
728         /* Read control message from Participant */
729         if((n = read(sd, &control, sizeof(char))) <= 0) {
730                 perror("Error in reading control message from Participant\n");
731                 close(sd);
732                 pthread_exit(NULL);
733         }
734
735         recvcontrol = control;
736
737         /* Update common data structure and increment count */
738         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
739
740         /* Lock and update count */
741         /* Thread sleeps until all messages from pariticipants are received by coordinator */
742         pthread_mutex_lock(tdata->lock);
743
744         (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
745
746         /* Wake up the threads and invoke decideResponse (once) */
747         if(*(tdata->count) == tdata->buffer->f.mcount) {
748                 decideResponse(tdata); 
749                 pthread_cond_broadcast(tdata->threshold);
750         } else {
751                 pthread_cond_wait(tdata->threshold, tdata->lock);
752         }
753         pthread_mutex_unlock(tdata->lock);
754
755         /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
756          * to all participants in their respective socket */
757         if (sendResponse(tdata, sd) == 0) { 
758                 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
759                 close(sd);
760                 pthread_exit(NULL);
761         }
762
763         do {
764            retval = recv((int)sd, &control, sizeof(char), 0);
765         } while (retval < sizeof(char));
766
767         if(control == TRANS_UNSUCESSFUL) {
768                 //printf("DEBUG-> TRANS_ABORTED\n");
769         } else if(control == TRANS_SUCESSFUL) {
770                 //printf("DEBUG-> TRANS_SUCCESSFUL\n");
771         } else {
772                 //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control);
773         }
774
775         /* Close connection */
776         close(sd);
777         pthread_exit(NULL);
778 }
779
780 /* This function decides the reponse that needs to be sent to 
781  * all Participant machines after the TRANS_REQUEST protocol */
782 void decideResponse(thread_data_array_t *tdata) {
783         char control;
784         int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
785                                                                          message to send */
786
787         for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
788                 control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
789                                                            written onto the shared array */
790                 switch(control) {
791                         default:
792                                 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
793                                 /* treat as disagree, pass thru */
794                         case TRANS_DISAGREE:
795                                 transdisagree++;
796                                 break;
797
798                         case TRANS_AGREE:
799                                 transagree++;
800                                 break;
801
802                         case TRANS_SOFT_ABORT:
803                                 transsoftabort++;
804                                 break;
805                 }
806         }
807
808         if(transdisagree > 0) {
809                 /* Send Abort */
810                 *(tdata->replyctrl) = TRANS_ABORT;
811                 *(tdata->replyretry) = 0;
812                 /* clear objects from prefetch cache */
813                 for (i = 0; i < tdata->buffer->f.numread; i++)
814                         prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
815                 for (i = 0; i < tdata->buffer->f.nummod; i++)
816                         prehashRemove(tdata->buffer->oidmod[i]);
817         } else if(transagree == tdata->buffer->f.mcount){
818                 /* Send Commit */
819                 *(tdata->replyctrl) = TRANS_COMMIT;
820                 *(tdata->replyretry) = 0;
821         } else { 
822                 /* Send Abort in soft abort case followed by retry commiting transaction again*/
823                 *(tdata->replyctrl) = TRANS_ABORT;
824                 *(tdata->replyretry) = 1;
825         }
826
827         return;
828 }
829 /* This function sends the final response to remote machines per thread in their respective socket id 
830  * It returns a char that is only needed to check the correctness of execution of this function inside
831  * transRequest()*/
832 char sendResponse(thread_data_array_t *tdata, int sd) {
833         int n, N, sum, oidcount = 0, control;
834         char *ptr, retval = 0;
835         unsigned int *oidnotfound;
836
837         control = *(tdata->replyctrl);
838         if (send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
839                 perror("Error sending ctrl message for participant\n");
840                 return 0;
841         }
842
843         //FIXME read missing objects 
844         /* If the decided response is due to a soft abort and missing objects at the Participant's side */
845         /*
846         if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
847                 // Read list of objects missing  
848                 if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
849                         N = oidcount * sizeof(unsigned int);
850                         if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
851                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
852                                 return 0;
853                         }
854                         ptr = (char *) oidnotfound;
855                         do {
856                                 n = read(sd, ptr+sum, N-sum);
857                                 sum += n;
858                         } while(sum < N && n !=0);
859                 }
860                 retval =  TRANS_SOFT_ABORT;
861         }
862         */
863
864         /* If the decided response is TRANS_ABORT */
865         if(*(tdata->replyctrl) == TRANS_ABORT) {
866                 retval = TRANS_ABORT;
867         } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ 
868                 retval = TRANS_COMMIT;
869         }
870         
871         return retval;
872 }
873
874 /* This function opens a connection, places an object read request to the 
875  * remote machine, reads the control message and object if available  and 
876  * copies the object and its header to the local cache.
877  * */ 
878
879 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
880         int sd, size, val;
881         struct sockaddr_in serv_addr;
882         char machineip[16];
883         char control;
884         objheader_t *h;
885         void *objcopy;
886
887         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
888                 perror("Error in socket\n");
889                 return NULL;
890         }
891
892         bzero((char*) &serv_addr, sizeof(serv_addr));
893         serv_addr.sin_family = AF_INET;
894         serv_addr.sin_port = htons(LISTEN_PORT);
895         midtoIP(mnum,machineip);
896         machineip[15] = '\0';
897         serv_addr.sin_addr.s_addr = inet_addr(machineip);
898
899         // Open connection 
900         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
901                 perror("getRemoteObj() Error in connect\n");
902                 return NULL;
903         }
904
905         char readrequest[sizeof(char)+sizeof(unsigned int)];
906         readrequest[0] = READ_REQUEST;
907         *((unsigned int *)(&readrequest[1])) = oid;
908         if (send(sd, readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
909                 perror("getRemoteObj(): error sending message\n");
910                 return NULL;
911         }
912
913         /* Read response from the Participant */
914         if((val = read(sd, &control, sizeof(char))) <= 0) {
915                 printf("getRemoteObj(): error no response, %d\n", val);
916                 return NULL;
917         }
918
919         switch(control) {
920                 case OBJECT_NOT_FOUND:
921                         return NULL;
922                 case OBJECT_FOUND:
923                         /* Read object if found into local cache */
924                         if((val = read(sd, &size, sizeof(int))) <= 0) {
925                                 perror("getRemoteObj(): error in reading size\n");
926                                 return NULL;
927                         }
928                         objcopy = objstrAlloc(record->cache, size);
929                         int sum = 0;
930                         while (sum < size) {
931                                 sum += read(sd, (char *)objcopy+sum, size-sum);
932                         }
933                         /* Insert into cache's lookup table */
934                         chashInsert(record->lookupTable, oid, objcopy); 
935                         break;
936                 default:
937                         printf("Error: in recv response from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
938                         return NULL;
939         }
940
941         //Close connection 
942         close(sd);
943         return objcopy;
944 }
945
946 /* This function handles the local objects involved in a transaction commiting process.
947  * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
948  * Note Coordinator = local machine
949  * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
950  * based on common agreement it either commits or aborts the transaction.
951  * It also frees the memory resources */
952 void *handleLocalReq(void *threadarg) {
953         unsigned int *oidnotfound = NULL, *oidlocked = NULL;
954         local_thread_data_array_t *localtdata;
955         int objnotfound = 0, objlocked = 0; 
956         int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
957         int numread, i;
958         unsigned int oid;
959         unsigned short version;
960         void *mobj;
961         objheader_t *headptr;
962
963         localtdata = (local_thread_data_array_t *) threadarg;
964
965         /* Counters and arrays to formulate decision on control message to be sent */
966         oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
967         oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
968
969         numread = localtdata->tdata->buffer->f.numread;
970         /* Process each oid in the machine pile/ group per thread */
971         for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
972                 if (i < localtdata->tdata->buffer->f.numread) {
973                         int incr = sizeof(unsigned int) + sizeof(unsigned short);// Offset that points to next position in the objread array
974                         incr *= i;
975                         oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr));
976                         version = *((unsigned short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int)));
977                 } else { // Objects Modified
978                         int tmpsize;
979                         headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
980                         if (headptr == NULL) {
981                                 printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__);
982                                 return NULL;
983                         }
984                         oid = OID(headptr);
985                         version = headptr->version;
986                 }
987                 /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
988
989                 /* Save the oids not found and number of oids not found for later use */
990                 if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
991                         /* Save the oids not found and number of oids not found for later use */
992                         oidnotfound[objnotfound] = oid;
993                         objnotfound++;
994                 } else { /* If Obj found in machine (i.e. has not moved) */
995                         /* Check if Obj is locked by any previous transaction */
996                         if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
997                                 if (version == ((objheader_t *)mobj)->version) {      /* If locked then match versions */ 
998                                         v_matchlock++;
999                                 } else {/* If versions don't match ...HARD ABORT */
1000                                         v_nomatch++;
1001                                         /* Send TRANS_DISAGREE to Coordinator */
1002                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1003                                 }
1004                         } else {/* If Obj is not locked then lock object */
1005                                 STATUS(((objheader_t *)mobj)) |= LOCK;
1006                                 /* Save all object oids that are locked on this machine during this transaction request call */
1007                                 oidlocked[objlocked] = OID(((objheader_t *)mobj));
1008                                 objlocked++;
1009                                 if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
1010                                         v_matchnolock++;
1011                                 } else { /* If versions don't match ...HARD ABORT */
1012                                         v_nomatch++;
1013                                         /* Send TRANS_DISAGREE to Coordinator */
1014                                         localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
1015                                 }
1016                         }
1017                 }
1018         } // End for
1019         /* Condition to send TRANS_AGREE */
1020         if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
1021                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
1022         }
1023         /* Condition to send TRANS_SOFT_ABORT */
1024         if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
1025                 localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
1026         }
1027
1028         /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
1029          * if Participant receives a TRANS_COMMIT */
1030         localtdata->transinfo->objlocked = oidlocked;
1031         localtdata->transinfo->objnotfound = oidnotfound;
1032         localtdata->transinfo->modptr = NULL;
1033         localtdata->transinfo->numlocked = objlocked;
1034         localtdata->transinfo->numnotfound = objnotfound;
1035         /* Lock and update count */
1036         //Thread sleeps until all messages from pariticipants are received by coordinator
1037         pthread_mutex_lock(localtdata->tdata->lock);
1038         (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
1039
1040         /* Wake up the threads and invoke decideResponse (once) */
1041         if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
1042                 decideResponse(localtdata->tdata); 
1043                 pthread_cond_broadcast(localtdata->tdata->threshold);
1044         } else {
1045                 pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
1046         }
1047         pthread_mutex_unlock(localtdata->tdata->lock);
1048         if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
1049                 if(transAbortProcess(localtdata) != 0) {
1050                         printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
1051                         pthread_exit(NULL);
1052                 }
1053         } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
1054                 if(transComProcess(localtdata) != 0) {
1055                         printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
1056                         pthread_exit(NULL);
1057                 }
1058         }
1059         /* Free memory */
1060         if (localtdata->transinfo->objlocked != NULL) {
1061                 free(localtdata->transinfo->objlocked);
1062         }
1063         if (localtdata->transinfo->objnotfound != NULL) {
1064                 free(localtdata->transinfo->objnotfound);
1065         }
1066
1067         pthread_exit(NULL);
1068 }
1069
1070 /* This function completes the ABORT process if the transaction is aborting */
1071 int transAbortProcess(local_thread_data_array_t  *localtdata) {
1072         int i, numlocked;
1073         unsigned int *objlocked;
1074         void *header;
1075
1076         numlocked = localtdata->transinfo->numlocked;
1077         objlocked = localtdata->transinfo->objlocked;
1078
1079         for (i = 0; i < numlocked; i++) {
1080                 if((header = mhashSearch(objlocked[i])) == NULL) {
1081                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1082                         return 1;
1083                 }
1084                 STATUS(((objheader_t *)header)) &= ~(LOCK);
1085         }
1086
1087         return 0;
1088 }
1089
1090 /*This function completes the COMMIT process is the transaction is commiting*/
1091 int transComProcess(local_thread_data_array_t  *localtdata) {
1092         objheader_t *header, *tcptr;
1093         int i, nummod, tmpsize, numcreated, numlocked;
1094         unsigned int *oidmod, *oidcreated, *oidlocked;
1095         void *ptrcreate;
1096
1097         nummod = localtdata->tdata->buffer->f.nummod;
1098         oidmod = localtdata->tdata->buffer->oidmod;
1099         numcreated = localtdata->tdata->buffer->f.numcreated;
1100         oidcreated = localtdata->tdata->buffer->oidcreated;
1101         numlocked = localtdata->transinfo->numlocked;
1102         oidlocked = localtdata->transinfo->objlocked;
1103
1104         for (i = 0; i < nummod; i++) {
1105                 if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
1106                         printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1107                         return 1;
1108                 }
1109                 /* Copy from transaction cache -> main object store */
1110                 if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
1111                         printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1112                         return 1;
1113                 }
1114                 GETSIZE(tmpsize, header);
1115                 pthread_mutex_lock(&mainobjstore_mutex);
1116                 memcpy((char*)header+sizeof(objheader_t), (char *)tcptr+ sizeof(objheader_t), tmpsize);
1117                 header->version += 1;
1118                 if(header->notifylist != NULL) {
1119                         notifyAll(&header->notifylist, OID(header), header->version);
1120                 }
1121                 pthread_mutex_unlock(&mainobjstore_mutex);
1122         }
1123         /* If object is newly created inside transaction then commit it */
1124         for (i = 0; i < numcreated; i++) {
1125                 if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
1126                         printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
1127                         return 1;
1128                 }
1129                 GETSIZE(tmpsize, header);
1130                 tmpsize += sizeof(objheader_t);
1131                 pthread_mutex_lock(&mainobjstore_mutex);
1132                 if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
1133                         printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__);
1134                         pthread_mutex_unlock(&mainobjstore_mutex);
1135                         return 1;
1136                 }
1137                 pthread_mutex_unlock(&mainobjstore_mutex);
1138                 memcpy(ptrcreate, header, tmpsize);
1139                 mhashInsert(oidcreated[i], ptrcreate);
1140                 lhashInsert(oidcreated[i], myIpAddr);
1141         }
1142         /* Unlock locked objects */
1143         for(i = 0; i < numlocked; i++) {
1144                 if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
1145                         printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
1146                         return 1;
1147                 }
1148                 STATUS(header) &= ~(LOCK);
1149         }
1150
1151         return 0;
1152 }
1153
1154 /* This function checks if the prefetch oids are same and have same offsets  
1155  * for case x.a.b and y.a.b where x and y have same oid's
1156  * or if a.b.c is a subset of x.b.c.d*/ 
1157 /* check for case where the generated request a.y.z or x.y.z.g then 
1158  * prefetch needs to be generated for x.y.z.g  if oid of a and x are same*/
1159 void checkPrefetchTuples(prefetchqelem_t *node) {
1160         int i,j, count,k, sindex, index;
1161         char *ptr, *tmp;
1162         int ntuples, slength;
1163         unsigned int *oid;
1164         unsigned short *endoffsets;
1165         short *arryfields; 
1166
1167         /* Check for the case x.y.z and a.b.c are same oids */ 
1168         ptr = (char *) node;
1169         ntuples = *(GET_NTUPLES(ptr));
1170         oid = GET_PTR_OID(ptr);
1171         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1172         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1173         
1174         /* Find offset length for each tuple */
1175         int numoffset[ntuples];
1176         numoffset[0] = endoffsets[0];
1177         for(i = 1; i<ntuples; i++) {
1178                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1179         }
1180         /* Check for redundant tuples by comparing oids of each tuple */
1181         for(i = 0; i < ntuples; i++) {
1182                 if(oid[i] == 0)
1183                         continue;
1184                 for(j = i+1 ; j < ntuples; j++) {
1185                         if(oid[j] == 0)
1186                                 continue;
1187                         /*If oids of tuples match */ 
1188                         if (oid[i] == oid[j]) {
1189                                 /* Find the smallest offset length of two tuples*/
1190                                 if(numoffset[i] >  numoffset[j]){
1191                                         slength = numoffset[j];
1192                                         sindex = j;
1193                                 }
1194                                 else {
1195                                         slength = numoffset[i];
1196                                         sindex = i;
1197                                 }
1198
1199                                 /* Compare the offset values based on the current indices
1200                                  * break if they do not match
1201                                  * if all offset values match then pick the largest tuple*/
1202
1203                                 if(i == 0) {
1204                                         k = 0;
1205                                 } else {
1206                                         k = endoffsets[i-1];
1207                                 }
1208                                 index = endoffsets[j -1];
1209                                 for(count = 0; count < slength; count ++) {
1210                                         if (arryfields[k] != arryfields[index]) { 
1211                                                 break;
1212                                         }
1213                                         index++;
1214                                         k++;
1215                                 }       
1216                                 if(slength == count) {
1217                                         oid[sindex] = 0;
1218                                 }
1219                         }
1220                 }
1221         }
1222 }
1223 /* This function makes machine piles to be added into the machine pile queue for each prefetch call */
1224 prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) {
1225         char *ptr;
1226         int ntuples, i, machinenum, count=0;
1227         unsigned int *oid;
1228         unsigned short *endoffsets;
1229         short *arryfields, *offset; 
1230         prefetchpile_t *head = NULL, *tmp = NULL;
1231
1232         /* Check for the case x.y.z and a.b.c are same oids */ 
1233         ptr = (char *) node;
1234         ntuples = *(GET_NTUPLES(ptr));
1235         oid = GET_PTR_OID(ptr);
1236         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1237         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1238
1239         if((head = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
1240                 printf("Calloc error: %s %d\n", __FILE__, __LINE__);
1241                 return NULL;
1242         }
1243
1244         /* Check for redundant tuples by comparing oids of each tuple */
1245         for(i = 0; i < ntuples; i++) {
1246                 if(oid[i] == 0){
1247                         if(head->next != NULL) {
1248                                 if((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
1249                                         printf("Calloc error: %s %d\n", __FILE__, __LINE__);
1250                                         return NULL;
1251                                 }
1252                                 tmp->mid = myIpAddr;
1253                                 tmp->next = head;
1254                                 head = tmp;
1255                         } else {
1256                                 head->mid = myIpAddr;
1257                         }
1258                         continue;
1259                 }
1260                 /* For each tuple make piles */
1261                 if ((machinenum = lhashSearch(oid[i])) == 0) {
1262                         printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
1263                         return NULL;
1264                 }
1265                 /* Insert into machine pile */
1266                 if(i == 0){
1267                         offset = &arryfields[0];
1268                 } else {
1269                         offset = &arryfields[endoffsets[i-1]];
1270                 }
1271
1272                 if((head = insertPile(machinenum, oid[i], numoffset[i], offset, head)) == NULL){
1273                         printf("Error: Couldn't create a pile %s, %d\n", __FILE__, __LINE__);
1274                         return NULL;
1275                 }
1276         }
1277
1278         return head;
1279 }
1280
1281 prefetchpile_t *foundLocal(prefetchqelem_t *node) {
1282         int ntuples,i, j, k, oidnfound = 0, arryfieldindex,nextarryfieldindex, flag = 0, val;
1283         unsigned int *oid;
1284         int isArray;
1285         char *ptr, *tmp;
1286         objheader_t *objheader;
1287         unsigned short *endoffsets;
1288         short *arryfields; 
1289         prefetchpile_t *head = NULL;
1290
1291         ptr = (char *) node;
1292         ntuples = *(GET_NTUPLES(ptr));
1293         oid = GET_PTR_OID(ptr);
1294         endoffsets = GET_PTR_EOFF(ptr, ntuples); 
1295         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1296
1297         /* Find offset length for each tuple */
1298         int numoffset[ntuples];//Number of offsets for each tuple
1299         numoffset[0] = endoffsets[0];
1300         for(i = 1; i<ntuples; i++) {
1301                 numoffset[i] = endoffsets[i] - endoffsets[i-1];
1302         }
1303
1304         for(i = 0; i < ntuples; i++) { 
1305                 if(oid[i] == 0){
1306                         if(i == 0) {
1307                                 arryfieldindex = 0;
1308                                 nextarryfieldindex =  endoffsets[0];
1309                         }else {
1310                                 arryfieldindex = endoffsets[i-1];
1311                                 nextarryfieldindex =  endoffsets[i];
1312                         }
1313                         numoffset[i] = 0;
1314                         endoffsets[0] = val = numoffset[0];
1315                         for(k = 1; k < ntuples; k++) {
1316                                 val = val + numoffset[k];
1317                                 endoffsets[k] = val; 
1318                         }
1319                         
1320                         for(k = 0; k<endoffsets[ntuples-1]; k++) {
1321                                 arryfields[arryfieldindex+k] = arryfields[nextarryfieldindex+k];
1322                         }
1323                         continue;
1324                 }
1325
1326                 /* If object found locally */
1327                 if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) { 
1328                         isArray = 0;
1329                         tmp = (char *) objheader;
1330                         int orgnumoffset = numoffset[i];
1331                         if(i == 0) {
1332                                 arryfieldindex = 0;
1333                         }else {
1334                                 arryfieldindex = endoffsets[i-1];
1335                         }
1336
1337                         for(j = 0; j<orgnumoffset; j++) {
1338                                 unsigned int objoid = 0;
1339                                 /* Check for arrays  */
1340                                 if(TYPE(objheader) > NUMCLASSES) {
1341                                         isArray = 1;
1342                                 }
1343                                 if(isArray == 1) {
1344                                         int elementsize = classsize[TYPE(objheader)];
1345                                         struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t));
1346                                         unsigned short length = ao->___length___;
1347                                         /* Check if array out of bounds */
1348                                         if(arryfields[arryfieldindex] < 0 || arryfields[arryfieldindex]>= length) {
1349                                                 break;
1350                                         }
1351                                         objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
1352                                 } else {
1353                                         objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
1354                                 }
1355                                 //Update numoffset array
1356                                 numoffset[i] = numoffset[i] - 1;
1357                                 //Update oid array
1358                                 oid[i] = objoid;
1359                                 //Update endoffset array
1360                                 endoffsets[0] = val = numoffset[0];
1361                                 for(k = 1; k < ntuples; k++) {
1362                                         val = val + numoffset[k];
1363                                         endoffsets[k] = val; 
1364                                 }
1365                                 //Update arrayfields array
1366                                 for(k = 0; k < endoffsets[ntuples-1]; k++) {
1367                                         arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1];
1368                                 }
1369                                 if((objheader = (objheader_t*) mhashSearch(oid[i])) == NULL) {
1370                                         flag = 1;
1371                                         checkPreCache(node, numoffset, oid[i], i); 
1372                                         break;
1373                                 }
1374                                 tmp = (char *) objheader;
1375                                 isArray = 0;
1376                         }
1377                         /*If all offset oids are found locally,make the prefetch tuple invalid */
1378                         if(flag == 0) {
1379                                 oid[i] = 0;
1380                         }
1381                 } else {
1382                         /* Look in Prefetch cache */
1383                         checkPreCache(node, numoffset, oid[i],i); 
1384                 }
1385         }
1386         
1387         /* Make machine groups */
1388         if((head = makePreGroups(node, numoffset)) == NULL) {
1389                 printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__);
1390                 return NULL;
1391         }
1392
1393         return head;
1394 }
1395
1396 void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, int index) {
1397         char *ptr, *tmp;
1398         int ntuples, i, k, flag=0, isArray =0, arryfieldindex, val;
1399         unsigned int * oid;
1400         unsigned short *endoffsets;
1401         short *arryfields;
1402         objheader_t *header;
1403
1404         ptr = (char *) node;
1405         ntuples = *(GET_NTUPLES(ptr));
1406         oid = GET_PTR_OID(ptr);
1407         endoffsets = GET_PTR_EOFF(ptr, ntuples);
1408         arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
1409
1410         if((header = (objheader_t *) prehashSearch(objoid)) == NULL) {
1411                 return;
1412         } else { //Found in Prefetch Cache
1413                 //TODO Decide if object is too old, if old remove from cache
1414                 tmp = (char *) header;
1415                 int loopcount = numoffset[index];       
1416                 if(index == 0)
1417                         arryfieldindex = 0;
1418                 else
1419                         arryfieldindex = endoffsets[(index - 1)];
1420                 // Check if any of the offset oid is available in the Prefetch cache
1421                 for(i = 0; i < loopcount; i++) {
1422                         /* Check for arrays  */
1423                         if(TYPE(header) > NUMCLASSES) {
1424                                 isArray = 1;
1425                         }
1426                         if(isArray == 1) {
1427                                 int elementsize = classsize[TYPE(header)];
1428                                 objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
1429                         } else {
1430                                 objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
1431                         }
1432                         //Update numoffset array
1433                         numoffset[index] = numoffset[index] - 1;
1434                         //Update oid array
1435                         oid[index] = objoid;
1436                         //Update endoffset array
1437                         endoffsets[0] = val = numoffset[0];
1438                         for(k = 1; k < ntuples; k++) {
1439                                 val = val + numoffset[k];
1440                                 endoffsets[k] = val; 
1441                         }
1442                         //Update arrayfields array
1443                         for(k = 0; k < endoffsets[ntuples-1]; k++) {
1444                                 arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1];
1445                         }
1446                         if((header = (objheader_t *)prehashSearch(oid[index])) != NULL) {
1447                                 tmp = (char *) header;
1448                                 isArray = 0;
1449                         } else {
1450                                 flag = 1;
1451                                 break;
1452                         }
1453                 }
1454         }
1455         //Found in the prefetch cache
1456         if(flag == 0 && (numoffset[index] == 0)) {
1457                 oid[index] = 0;
1458         }
1459 }
1460
1461
1462
1463 /* This function is called by the thread calling transPrefetch */
1464 void *transPrefetch(void *t) {
1465         prefetchqelem_t *qnode;
1466         prefetchpile_t *pilehead = NULL;
1467         prefetchpile_t *ptr = NULL, *piletail = NULL;
1468
1469         while(1) {
1470                 /* lock mutex of primary prefetch queue */
1471                 pthread_mutex_lock(&pqueue.qlock);
1472                 /* while primary queue is empty, then wait */
1473                 while((pqueue.front == NULL) && (pqueue.rear == NULL)) {
1474                         pthread_cond_wait(&pqueue.qcond, &pqueue.qlock);
1475                 }
1476
1477                 /* dequeue node to create a machine piles and  finally unlock mutex */
1478                 if((qnode = pre_dequeue()) == NULL) {
1479                         printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
1480                         pthread_mutex_unlock(&pqueue.qlock);
1481                         continue;
1482                 }
1483                 pthread_mutex_unlock(&pqueue.qlock);
1484                                 
1485                 /* Reduce redundant prefetch requests */
1486                 checkPrefetchTuples(qnode);
1487                 /* Check if the tuples are found locally, if yes then reduce them further*/ 
1488                 /* and group requests by remote machine ids by calling the makePreGroups() */
1489                 if((pilehead = foundLocal(qnode)) == NULL) {
1490                         printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
1491                         pre_enqueue(qnode);
1492                         continue;
1493                 }
1494
1495                 ptr = pilehead;
1496                 while(ptr != NULL) {
1497                         if(ptr->next == NULL) {
1498                                 piletail = ptr;
1499                         } 
1500                         ptr = ptr->next;
1501                 }
1502
1503                 /* Lock mutex of pool queue */
1504                 pthread_mutex_lock(&mcqueue.qlock);
1505                 /* Update the pool queue with the new remote machine piles generated per prefetch call */
1506                 mcpileenqueue(pilehead, piletail);
1507                 /* Broadcast signal on machine pile queue */
1508                 pthread_cond_broadcast(&mcqueue.qcond);
1509                 /* Unlock mutex of  machine pile queue */
1510                 pthread_mutex_unlock(&mcqueue.qlock);
1511                 /* Deallocate the prefetch queue pile node */
1512                 predealloc(qnode);
1513         }
1514 }
1515
1516 /* Each thread in the  pool of threads calls this function to establish connection with
1517  * remote machines, send the prefetch requests and process the reponses from
1518  * the remote machines .
1519  * The thread is active throughout the period of runtime */
1520
1521 void *mcqProcess(void *threadid) {
1522         int tid, i;
1523         prefetchpile_t *mcpilenode;
1524         struct sockaddr_in remoteAddr;
1525         int sd;
1526
1527         tid = (int) threadid;
1528         while(1) {
1529
1530                 sockIdFound = 0;
1531                 /* Lock mutex of mc pile queue */
1532                 pthread_mutex_lock(&mcqueue.qlock);
1533                 /* When mc pile queue is empty, wait */
1534                 while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
1535                         pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
1536                 }
1537                 /* Dequeue node to send remote machine connections*/
1538                 if((mcpilenode = mcpiledequeue()) == NULL) {
1539                         printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
1540                         pthread_mutex_unlock(&mcqueue.qlock);
1541                         continue;
1542                 }
1543                 /* Unlock mutex */
1544                 pthread_mutex_unlock(&mcqueue.qlock);
1545
1546                 /*Initiate connection to remote host and send prefetch request */ 
1547                 if(mcpilenode->mid != myIpAddr) {
1548                         /* Check to see if socket exists */
1549                         for(i = 0; i < NUM_MACHINES; i++) {
1550                                 if(midSocketArray[i].mid == mcpilenode->mid) {
1551                                         sendPrefetchReq(mcpilenode, midSocketArray[i].sockid);
1552                                         sockIdFound = 1;
1553                                         break;
1554                                 }
1555                         }
1556
1557                         if(sockIdFound == 0) {
1558                                 if(sockCount < NUM_MACHINES) {
1559                                         /* Open Socket */
1560                                         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1561                                                 printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
1562                                                 return;
1563                                         }
1564
1565                                         bzero(&remoteAddr, sizeof(remoteAddr));
1566                                         remoteAddr.sin_family = AF_INET;
1567                                         remoteAddr.sin_port = htons(LISTEN_PORT);
1568                                         remoteAddr.sin_addr.s_addr = htonl(mcpilenode->mid);
1569
1570                                         /* Open Connection */
1571                                         if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
1572                                                 printf("%s():error %d connecting to %s:%d\n", __func__, errno,
1573                                                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1574                                                 close(sd);
1575                                                 return;
1576                                         }
1577
1578                                         midSocketArray[sockCount].mid = mcpilenode->mid;
1579                                         midSocketArray[sockCount].sockid = sd;
1580                                         sendPrefetchReq(mcpilenode, midSocketArray[sockCount].sockid);
1581                                         sockCount++;
1582                                 } else {
1583                                         //TODO Fix for connecting to more than 2 machines && close socket
1584                                         printf("%s(): Error: Currently works for only 2 machines\n", __func__);
1585                                         return;
1586                                 }
1587                         }
1588                 }
1589
1590                 /* Deallocate the machine queue pile node */
1591                 mcdealloc(mcpilenode);
1592         }
1593 }
1594
1595 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
1596         int i, off, len, endpair, count = 0;
1597         char machineip[16], control;
1598         objpile_t *tmp;
1599
1600         /* Send TRANS_PREFETCH control message */
1601         control = TRANS_PREFETCH;
1602         if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
1603                 perror("sendPrefetchReq() Sending TRANS_PREFETCH");
1604                 return;
1605         }
1606
1607         /* Send Oids and offsets in pairs */
1608         tmp = mcpilenode->objpiles;
1609         while(tmp != NULL) {
1610                 off = 0;
1611                 count++;  /* Keeps track of the number of oid and offset tuples sent per remote machine */
1612                 len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
1613                 char oidnoffset[len];
1614                 bzero(oidnoffset, len);
1615                 *((int*)oidnoffset) = len;
1616                 off = sizeof(int);
1617                 *((unsigned int *)(oidnoffset + off)) = tmp->oid;
1618                 off += sizeof(unsigned int);
1619                 *((unsigned int *)(oidnoffset + off)) = myIpAddr; 
1620                 off += sizeof(unsigned int);
1621                 for(i = 0; i < tmp->numoffset; i++) {
1622                         *((short*)(oidnoffset + off)) = tmp->offset[i];
1623                         off+=sizeof(short);
1624                 }
1625                 if (send(sd, oidnoffset, len , MSG_NOSIGNAL) < len) {
1626                         perror("Sending oids and offsets");
1627                         return;
1628                 }
1629                 
1630                 tmp = tmp->next;
1631         }
1632
1633         /* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
1634         endpair = -1;
1635         if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
1636                 perror("Error sending endpair\n");
1637                 return;
1638         }
1639
1640         return;
1641 }
1642
1643 int getPrefetchResponse(int sd) {
1644         int numbytes = 0, length = 0, size = 0;
1645         char *recvbuffer, control;
1646         unsigned int oid;
1647         void *modptr, *oldptr;
1648
1649         if((numbytes = recv((int)sd, &length, sizeof(int), 0)) <= 0) {
1650                 printf("%s() Error: in receiving length at %s, %d\n", __func__, __FILE__, __LINE__);
1651                 return -1;
1652         } else {
1653                 numbytes = 0;
1654                 size = length - sizeof(int);
1655                 if((recvbuffer = calloc(1, size)) == NULL) {
1656                         printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
1657                         return -1;
1658                 }
1659                 while(numbytes < size) {
1660                         numbytes += recv((int)sd, recvbuffer+numbytes, size-numbytes, 0);
1661                 }
1662                 
1663                 control = *((char *) recvbuffer);
1664                 if(control == OBJECT_FOUND) {
1665                         numbytes = 0;
1666                         oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1667                         size = size - (sizeof(char) + sizeof(unsigned int));
1668                         pthread_mutex_lock(&prefetchcache_mutex);
1669                         if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
1670                                 printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
1671                                 pthread_mutex_unlock(&prefetchcache_mutex);
1672                                 free(recvbuffer);
1673                                 return -1;
1674                         }
1675                         pthread_mutex_unlock(&prefetchcache_mutex);
1676                         memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
1677
1678                         /* Insert the oid and its address into the prefetch hash lookup table */
1679                         /* Do a version comparison if the oid exists */
1680                         if((oldptr = prehashSearch(oid)) != NULL) {
1681                                 /* If older version then update with new object ptr */
1682                                 if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
1683                                         prehashRemove(oid);
1684                                         prehashInsert(oid, modptr);
1685                                 } else {
1686                                         /* TODO modptr should be reference counted */
1687                                 }
1688                         } else {/* Else add the object ptr to hash table*/
1689                                 prehashInsert(oid, modptr);
1690                         }
1691                         /* Lock the Prefetch Cache look up table*/
1692                         pthread_mutex_lock(&pflookup.lock);
1693                         /* Broadcast signal on prefetch cache condition variable */ 
1694                         pthread_cond_broadcast(&pflookup.cond);
1695                         /* Unlock the Prefetch Cache look up table*/
1696                         pthread_mutex_unlock(&pflookup.lock);
1697                 } else if(control == OBJECT_NOT_FOUND) {
1698                         oid = *((unsigned int *)(recvbuffer + sizeof(char)));
1699                         /* TODO: For each object not found query DHT for new location and retrieve the object */
1700                         /* Throw an error */
1701                         printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
1702                         free(recvbuffer);
1703                         exit(-1);
1704                 } else {
1705                         printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
1706                 }
1707                 free(recvbuffer);
1708         }
1709
1710         return 0;
1711 }
1712
1713 unsigned short getObjType(unsigned int oid)
1714 {
1715         objheader_t *objheader;
1716         unsigned short numoffset[] ={0};
1717         short fieldoffset[] ={};
1718
1719         if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL)
1720         {
1721                 if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
1722                 {
1723                         prefetch(1, &oid, numoffset, fieldoffset);
1724                         pthread_mutex_lock(&pflookup.lock);
1725                         while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
1726                         {
1727                                 pthread_cond_wait(&pflookup.cond, &pflookup.lock);
1728                         }
1729                         pthread_mutex_unlock(&pflookup.lock);
1730                 }
1731         }
1732
1733         return TYPE(objheader);
1734 }
1735
1736 int startRemoteThread(unsigned int oid, unsigned int mid)
1737 {
1738         int sock;
1739         struct sockaddr_in remoteAddr;
1740         char msg[1 + sizeof(unsigned int)];
1741         int bytesSent;
1742         int status;
1743
1744         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1745         {
1746                 perror("startRemoteThread():socket()");
1747                 return -1;
1748         }
1749
1750         bzero(&remoteAddr, sizeof(remoteAddr));
1751         remoteAddr.sin_family = AF_INET;
1752         remoteAddr.sin_port = htons(LISTEN_PORT);
1753         remoteAddr.sin_addr.s_addr = htonl(mid);
1754         
1755         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
1756         {
1757                 printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
1758                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1759                 status = -1;
1760         }
1761         else
1762         {
1763                 msg[0] = START_REMOTE_THREAD;
1764                 memcpy(&msg[1], &oid, sizeof(unsigned int));
1765
1766                 bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0);
1767                 if (bytesSent < 0)
1768                 {
1769                         perror("startRemoteThread():send()");
1770                         status = -1;
1771                 }
1772                 else if (bytesSent != 1 + sizeof(unsigned int))
1773                 {
1774                         printf("startRemoteThread(): error, sent %d bytes\n", bytesSent);
1775                         status = -1;
1776                 }
1777                 else
1778                 {
1779                         status = 0;
1780                 }
1781         }
1782
1783         close(sock);
1784         return status;
1785 }
1786
1787 //TODO: when reusing oids, make sure they are not already in use!
1788 unsigned int getNewOID(void) {
1789         static unsigned int id = 0xFFFFFFFF;
1790         
1791         id += 2;
1792         if (id > oidMax || id < oidMin)
1793         {
1794                 id = (oidMin | 1);
1795         }
1796         return id;
1797 }
1798
1799 int processConfigFile()
1800 {
1801         FILE *configFile;
1802         const int maxLineLength = 200;
1803         char lineBuffer[maxLineLength];
1804         char *token;
1805         const char *delimiters = " \t\n";
1806         char *commentBegin;
1807         in_addr_t tmpAddr;
1808         
1809         configFile = fopen(CONFIG_FILENAME, "r");
1810         if (configFile == NULL)
1811         {
1812                 printf("error opening %s:\n", CONFIG_FILENAME);
1813                 perror("");
1814                 return -1;
1815         }
1816
1817         numHostsInSystem = 0;
1818         sizeOfHostArray = 8;
1819         hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int));
1820         
1821         while(fgets(lineBuffer, maxLineLength, configFile) != NULL)
1822         {
1823                 commentBegin = strchr(lineBuffer, '#');
1824                 if (commentBegin != NULL)
1825                         *commentBegin = '\0';
1826                 token = strtok(lineBuffer, delimiters);
1827                 while (token != NULL)
1828                 {
1829                         tmpAddr = inet_addr(token);
1830                         if ((int)tmpAddr == -1)
1831                         {
1832                                 printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token);
1833                                 fclose(configFile);
1834                                 return -1;
1835                         }
1836                         else
1837                                 addHost(htonl(tmpAddr));
1838                         token = strtok(NULL, delimiters);
1839                 }
1840         }
1841
1842         fclose(configFile);
1843         
1844         if (numHostsInSystem < 1)
1845         {
1846                 printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME);
1847                 return -1;
1848         }
1849 #ifdef MAC
1850         myIpAddr = getMyIpAddr("en1");
1851 #else
1852         myIpAddr = getMyIpAddr("eth0");
1853 #endif
1854         myIndexInHostArray = findHost(myIpAddr);
1855         if (myIndexInHostArray == -1)
1856         {
1857                 printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME);
1858                 return -1;
1859         }
1860         oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1;
1861         oidMin = oidsPerBlock * myIndexInHostArray;
1862         if (myIndexInHostArray == numHostsInSystem - 1)
1863                 oidMax = 0xFFFFFFFF;
1864         else
1865                 oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1;
1866
1867         return 0;
1868 }
1869
1870 void addHost(unsigned int hostIp)
1871 {
1872         unsigned int *tmpArray;
1873
1874         if (findHost(hostIp) != -1)
1875                 return;
1876
1877         if (numHostsInSystem == sizeOfHostArray)
1878         {
1879                 tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int));
1880                 memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem);
1881                 free(hostIpAddrs);
1882                 hostIpAddrs = tmpArray;
1883         }
1884
1885         hostIpAddrs[numHostsInSystem++] = hostIp;
1886
1887         return;
1888 }
1889
1890 int findHost(unsigned int hostIp)
1891 {
1892         int i;
1893         for (i = 0; i < numHostsInSystem; i++)
1894                 if (hostIpAddrs[i] == hostIp)
1895                         return i;
1896
1897         //not found
1898         return -1;
1899 }
1900
1901 /* This function sends notification request per thread waiting on object(s) whose version 
1902  * changes */
1903 int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
1904         int sock,i;
1905         objheader_t *objheader;
1906         struct sockaddr_in remoteAddr;
1907         char msg[1 + numoid * (sizeof(unsigned short) + sizeof(unsigned int)) +  3 * sizeof(unsigned int)];
1908         char *ptr;
1909         int bytesSent;
1910         int status, size;
1911         unsigned short version;
1912         unsigned int oid,mid;
1913         static unsigned int threadid = 0;
1914         pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification
1915         pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
1916         notifydata_t *ndata;
1917
1918         //FIXME currently all oids belong to one machine
1919         oid = oidarry[0];
1920         if((mid = lhashSearch(oid)) == 0) {
1921                 printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
1922                 return;
1923         }
1924
1925         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
1926                 perror("reqNotify():socket()");
1927                 return -1;
1928         }
1929
1930         bzero(&remoteAddr, sizeof(remoteAddr));
1931         remoteAddr.sin_family = AF_INET;
1932         remoteAddr.sin_port = htons(LISTEN_PORT);
1933         remoteAddr.sin_addr.s_addr = htonl(mid);
1934
1935         /* Generate unique threadid */
1936         threadid++;
1937
1938         /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
1939         if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
1940                 printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
1941                 return -1;
1942         }
1943         ndata->numoid = numoid;
1944         ndata->threadid = threadid;
1945         ndata->oidarry = oidarry;
1946         ndata->versionarry = versionarry;
1947         ndata->threadcond = threadcond;
1948         ndata->threadnotify = threadnotify;
1949         if((status = notifyhashInsert(threadid, ndata)) != 0) {
1950                 printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
1951                 free(ndata);
1952                 return -1; 
1953         }
1954         
1955         /* Send  number of oids, oidarry, version array, machine id and threadid */     
1956         if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
1957                 printf("reqNotify():error %d connecting to %s:%d\n", errno,
1958                                 inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
1959                 free(ndata);
1960                 return -1;
1961         } else {
1962                 msg[0] = THREAD_NOTIFY_REQUEST;
1963                 *((unsigned int *)(&msg[1])) = numoid;
1964                 /* Send array of oids  */
1965                 size = sizeof(unsigned int);
1966                 {
1967                         i = 0;
1968                         while(i < numoid) {
1969                                 oid = oidarry[i];
1970                                 *((unsigned int *)(&msg[1] + size)) = oid;
1971                                 size += sizeof(unsigned int);
1972                                 i++;
1973                         }
1974                 }
1975
1976                 /* Send array of version  */
1977                 {
1978                         i = 0;
1979                         while(i < numoid) {
1980                                 version = versionarry[i];
1981                                 *((unsigned short *)(&msg[1] + size)) = version;
1982                                 size += sizeof(unsigned short);
1983                                 i++;
1984                         }
1985                 }
1986
1987                 *((unsigned int *)(&msg[1] + size)) = myIpAddr;
1988                 size += sizeof(unsigned int);
1989                 *((unsigned int *)(&msg[1] + size)) = threadid;
1990
1991                 pthread_mutex_lock(&(ndata->threadnotify));
1992                 bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int) , 0);
1993                 if (bytesSent < 0){
1994                         perror("reqNotify():send()");
1995                         status = -1;
1996                 } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int)){
1997                         printf("reNotify(): error, sent %d bytes %s, %d\n", bytesSent, __FILE__, __LINE__);
1998                         status = -1;
1999                 } else {
2000                         status = 0;
2001                 }
2002                 
2003                 pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
2004                 pthread_mutex_unlock(&(ndata->threadnotify));
2005         }
2006
2007         pthread_cond_destroy(&threadcond);
2008         pthread_mutex_destroy(&threadnotify);
2009         free(ndata);
2010         close(sock);
2011         return status;
2012 }
2013
2014 void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
2015         notifydata_t *ndata;
2016         int i, objIsFound = 0, index;
2017         void *ptr;
2018
2019         //Look up the tid and call the corresponding pthread_cond_signal
2020         if((ndata = notifyhashSearch(tid)) == NULL) {
2021                 printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
2022                 return;
2023         } else  {
2024                 for(i = 0; i < ndata->numoid; i++) {
2025                         if(ndata->oidarry[i] == oid){
2026                                 objIsFound = 1;
2027                                 index = i;
2028                         }
2029                 }
2030                 if(objIsFound == 0){
2031                         printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__);
2032                         return;
2033                 } else {
2034                         if(version <= ndata->versionarry[index]){
2035                                 printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__);
2036                                 return;
2037                         } else {
2038                                 /* Clear from prefetch cache and free thread related data structure */
2039                                 if((ptr = prehashSearch(oid)) != NULL) {
2040                                         prehashRemove(oid);
2041                                 }
2042                                 pthread_cond_signal(&(ndata->threadcond));
2043                         }
2044                 }
2045         }
2046         return;
2047 }
2048
2049 int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
2050         threadlist_t *ptr;
2051         unsigned int mid;
2052         struct sockaddr_in remoteAddr;
2053         char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
2054         int sock, status, size, bytesSent;
2055
2056         while(*head != NULL) {
2057                 ptr = *head;
2058                 mid = ptr->mid; 
2059                 //create a socket connection to that machine
2060                 if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
2061                         perror("notifyAll():socket()");
2062                         return -1;
2063                 }
2064
2065                 bzero(&remoteAddr, sizeof(remoteAddr));
2066                 remoteAddr.sin_family = AF_INET;
2067                 remoteAddr.sin_port = htons(LISTEN_PORT);
2068                 remoteAddr.sin_addr.s_addr = htonl(mid);
2069                 //send Thread Notify response and threadid to that machine
2070                 if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
2071                         printf("notifyAll():error %d connecting to %s:%d\n", errno,
2072                                         inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
2073                         status = -1;
2074                 } else {
2075                         bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
2076                         msg[0] = THREAD_NOTIFY_RESPONSE;
2077                         *((unsigned int *)&msg[1]) = oid;
2078                         size = sizeof(unsigned int);
2079                         *((unsigned short *)(&msg[1]+ size)) = version;
2080                         size+= sizeof(unsigned short);
2081                         *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
2082
2083                         bytesSent = send(sock, msg, (1 + 2*sizeof(unsigned int) + sizeof(unsigned short)), 0);
2084                         if (bytesSent < 0){
2085                                 perror("notifyAll():send()");
2086                                 status = -1;
2087                         } else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){
2088                                 printf("notifyAll(): error, sent %d bytes %s, %d\n", 
2089                                                 bytesSent, __FILE__, __LINE__);
2090                                 status = -1;
2091                         } else {
2092                                 status = 0;
2093                         }
2094                 }
2095                 //close socket
2096                 close(sock);
2097                 // Update head
2098                 *head = ptr->next;
2099                 free(ptr);
2100         }
2101         return status;
2102 }
2103
2104 void transAbort(transrecord_t *trans) {
2105         objstrDelete(trans->cache);
2106         chashDelete(trans->lookupTable);
2107         free(trans);
2108 }