Clean up code
[IRC.git] / Robust / src / Runtime / DSTM / interface / trans.c
1 #include "dstm.h"
2 #include "ip.h"
3 #include "clookup.h"
4 #include "mlookup.h"
5 #include "llookup.h"
6 #include "plookup.h"
7 #include<pthread.h>
8 #include<sys/types.h>
9 #include<sys/socket.h>
10 #include<netdb.h>
11 #include<netinet/in.h>
12 #include <sys/types.h>
13 #include <unistd.h>
14 #include <time.h>
15
16 #define LISTEN_PORT 2156
17 #define MACHINE_IP "127.0.0.1"
18 #define RECEIVE_BUFFER_SIZE 2048
19
20 extern int classsize[];
21 plistnode_t *createPiles(transrecord_t *);
/* Sleeps for a short interval derived from the current wall-clock time
 * (1 msec base plus up to roughly 10 msec). transCommit() calls this
 * before retrying a commit. */
void randomdelay(void)
{
        struct timespec request, remaining;
        time_t now = time(NULL);

        /* The delay is sub-second, so only the nanosecond field is non-zero */
        request.tv_sec = 0;
        request.tv_nsec = (long)(1000000 + (now % 10000000)); //1-11 msec
        nanosleep(&request, &remaining);
}
34
35 transrecord_t *transStart()
36 {
37         transrecord_t *tmp = malloc(sizeof(transrecord_t));
38         tmp->cache = objstrCreate(1048576);
39         tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
40         return tmp;
41 }
42
43 /* This function finds the location of the objects involved in a transaction
44  * and returns the pointer to the object if found in a remote location */
45 objheader_t *transRead(transrecord_t *record, unsigned int oid)
46 {       
47         unsigned int machinenumber;
48         objheader_t *tmp, *objheader;
49         void *objcopy;
50         int size;
51         void *buf;
52                 /* Search local cache */
53         if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
54                 //printf("DEBUG -> transRead oid %d found local\n", oid);
55                 return(objheader);
56         } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
57                 /* Look up in machine lookup table  and copy  into cache*/
58                 //printf("oid is found in Local machinelookup\n");
59                 tmp = mhashSearch(oid);
60                 size = sizeof(objheader_t)+classsize[tmp->type];
61                 objcopy = objstrAlloc(record->cache, size);
62                 memcpy(objcopy, (void *)tmp, size);
63                 /* Insert into cache's lookup table */
64                 chashInsert(record->lookupTable, objheader->oid, objcopy); 
65                 return(objcopy);
66         } else { /* If not found in machine look up */
67                 /* Get the object from the remote location */
68                 machinenumber = lhashSearch(oid);
69                 objcopy = getRemoteObj(record, machinenumber, oid);
70                 if(objcopy == NULL) {
71                         //If object is not found in Remote location
72                         //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
73                         return NULL;
74                 }
75                 else {
76                         //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
77                         return(objcopy);
78                 }
79         } 
80 }
81 /* This function creates objects in the transaction record */
82 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
83 {
84         objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
85         tmp->oid = getNewOID();
86         tmp->type = type;
87         tmp->version = 1;
88         tmp->rcount = 0; //? not sure how to handle this yet
89         tmp->status = 0;
90         tmp->status |= NEW;
91         chashInsert(record->lookupTable, tmp->oid, tmp);
92         return tmp;
93 }
94
95 plistnode_t *createPiles(transrecord_t *record) {
96         int i = 0;
97         unsigned int size;/* Represents number of bins in the chash table */
98         chashlistnode_t *curr, *ptr, *next;
99         plistnode_t *pile = NULL;
100         unsigned int machinenum;
101         objheader_t *headeraddr;
102
103         ptr = record->lookupTable->table;
104         size = record->lookupTable->size;
105
106         for(i = 0; i < size ; i++) {
107                 curr = &ptr[i];
108                 /* Inner loop to traverse the linked list of the cache lookupTable */
109                 while(curr != NULL) {
110                         //if the first bin in hash table is empty
111                         if(curr->key == 0) {
112                                 break;
113                         }
114                         next = curr->next;
115                         //Get machine location for object id
116                         
117                         if ((machinenum = lhashSearch(curr->key)) == 0) {
118                                printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
119                                return NULL;
120                         }
121                                         
122                         if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
123                                 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
124                                 return NULL;
125                         }
126                         //Make machine groups
127                         if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
128                                 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
129                                 return NULL;
130                         }
131                         curr = next;
132                 }
133         }
134
135         return pile; 
136 }
137 /* This function initiates the transaction commit process
138  * Spawns threads for each of the new connections with Participants 
139  * by creating new piles,
140  * Fills the piles with necesaary information and 
141  * Sends a transrequest() to each pile*/
142 int transCommit(transrecord_t *record) {        
143         unsigned int tot_bytes_mod, *listmid;
144         plistnode_t *pile;
145         int i, rc;
146         int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
147         char buffer[RECEIVE_BUFFER_SIZE],control;
148         char transid[TID_LEN];
149         trans_req_data_t *tosend;
150         static int newtid = 0;
151         char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
152
153         /* Look through all the objects in the transaction record and make piles 
154          * for each machine involved in the transaction*/
155         pile = createPiles(record);
156
157         /* Create the packet to be sent in TRANS_REQUEST */
158
159         /* Count the number of participants */
160         pilecount = pCount(pile);
161                 
162         /* Create a list of machine ids(Participants) involved in transaction   */
163         if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
164                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
165                 return 1;
166         }               
167         pListMid(pile, listmid);
168         
169
170         /* Initialize thread variables,
171          * Spawn a thread for each Participant involved in a transaction */
172         pthread_t thread[pilecount];
173         pthread_attr_t attr;                    
174         pthread_cond_t tcond;
175         pthread_mutex_t tlock;
176         pthread_mutex_t tlshrd;
177         
178         thread_data_array_t *thread_data_array;
179         thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
180         thread_response_t rcvd_control_msg[pilecount];  /* Shared thread array that keeps track of responses of participants */
181
182         /* Initialize and set thread detach attribute */
183         pthread_attr_init(&attr);
184         pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
185         pthread_mutex_init(&tlock, NULL);
186         pthread_cond_init(&tcond, NULL);
187         
188         /* Process each machine pile */
189         while(pile != NULL) {
190                 //Create transaction id
191                 newtid++;
192                 //trans_req_data_t *tosend;
193                 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
194                         printf("Calloc error %s, %d\n", __FILE__, __LINE__);
195                         return 1;
196                 }
197                 tosend->f.control = TRANS_REQUEST;
198                 sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
199                 tosend->f.mcount = pilecount;
200                 tosend->f.numread = pile->numread;
201                 tosend->f.nummod = pile->nummod;
202                 tosend->f.sum_bytes = pile->sum_bytes;
203                 tosend->listmid = listmid;
204                 tosend->objread = pile->objread;
205                 tosend->oidmod = pile->oidmod;
206                 thread_data_array[numthreads].thread_id = numthreads;
207                 thread_data_array[numthreads].mid = pile->mid;
208                 thread_data_array[numthreads].pilecount = pilecount;
209                 thread_data_array[numthreads].buffer = tosend;
210                 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
211                 thread_data_array[numthreads].threshold = &tcond;
212                 thread_data_array[numthreads].lock = &tlock;
213                 thread_data_array[numthreads].count = &trecvcount;
214                 thread_data_array[numthreads].replyctrl = &treplyctrl;
215                 thread_data_array[numthreads].replyretry = &treplyretry;
216                 thread_data_array[numthreads].rec = record;
217
218                 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);  
219                 if (rc) {
220                         perror("Error in pthread create");
221                         return 1;
222                 }               
223                 numthreads++;           
224                 //TODO frees 
225                 pile = pile->next;
226         }
227
228         /* Free attribute and wait for the other threads */
229         pthread_attr_destroy(&attr);
230         for (i = 0 ;i < pilecount ; i++) {
231                 rc = pthread_join(thread[i], NULL);
232                 if (rc)
233                 {
234                         printf("ERROR return code from pthread_join() is %d\n", rc);
235                         return 1;
236                 }
237         }
238         
239         /* Free resources */    
240         pthread_cond_destroy(&tcond);
241         pthread_mutex_destroy(&tlock);
242         free(tosend);
243         free(listmid);
244         pDelete(pile);
245
246         /* Retry trans commit procedure if not sucessful in the first try */
247         if(treplyretry == 1) {
248                 /* wait a random amount of time */
249                 randomdelay();
250                 //sleep(1);
251                 /* Retry the commiting transaction again */
252                 transCommit(record);
253         }       
254         
255         return 0;
256 }
257
258 /* This function sends information involved in the transaction request and 
259  * accepts a response from particpants.
260  * It calls decideresponse() to decide on what control message 
261  * to send next and sends the message using sendResponse()*/
262 void *transRequest(void *threadarg) {
263         int sd, i, n;
264         struct sockaddr_in serv_addr;
265         struct hostent *server;
266         thread_data_array_t *tdata;
267         objheader_t *headeraddr;
268         char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
269         char machineip[16], retval;
270
271         tdata = (thread_data_array_t *) threadarg;
272
273         /* Send Trans Request */
274         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
275                 perror("Error in socket for TRANS_REQUEST\n");
276                 return NULL;
277         }
278         bzero((char*) &serv_addr, sizeof(serv_addr));
279         serv_addr.sin_family = AF_INET;
280         serv_addr.sin_port = htons(LISTEN_PORT);
281         midtoIP(tdata->mid,machineip);
282         machineip[15] = '\0';
283         serv_addr.sin_addr.s_addr = inet_addr(machineip);
284         /* Open Connection */
285         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
286                 perror("Error in connect for TRANS_REQUEST\n");
287                 return NULL;
288         }
289         
290         printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
291         /* Send bytes of data with TRANS_REQUEST control message */
292         if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
293                 perror("Error sending fixed bytes for thread\n");
294                 return NULL;
295         }
296         /* Send list of machines involved in the transaction */
297         {
298           int size=sizeof(unsigned int)*tdata->pilecount;
299           if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
300             perror("Error sending list of machines for thread\n");
301             return NULL;
302           }
303         }
304         /* Send oids and version number tuples for objects that are read */
305         {
306           int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
307           if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
308             perror("Error sending tuples for thread\n");
309             return NULL;
310           }
311         }
312         /* Send objects that are modified */
313         for(i = 0; i < tdata->buffer->f.nummod ; i++) {
314           int size;
315           headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
316           size=sizeof(objheader_t)+classsize[headeraddr->type];
317           if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
318             perror("Error sending obj modified for thread\n");
319             return NULL;
320           }
321         }
322
323         /* Read control message from Participant */
324         if((n = read(sd, &control, sizeof(char))) <= 0) {
325                 perror("Error in reading control message from Participant\n");
326                 return NULL;
327         }
328         recvcontrol = control;
329         
330         /* Update common data structure and increment count */
331         tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
332
333         /* Lock and update count */
334         //Thread sleeps until all messages from pariticipants are received by coordinator
335         pthread_mutex_lock(tdata->lock);
336
337         (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
338
339         /* Wake up the threads and invoke decideResponse (once) */
340         if(*(tdata->count) == tdata->pilecount) {
341                 if (decideResponse(tdata) != 0) { 
342                         printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
343                         pthread_mutex_unlock(tdata->lock);
344                         close(sd);
345                         return NULL;
346                 }
347                 pthread_cond_broadcast(tdata->threshold);
348         } else {
349                 pthread_cond_wait(tdata->threshold, tdata->lock);
350         }       
351
352         pthread_mutex_unlock(tdata->lock);
353
354         /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
355          * to all participants in their respective socket */
356         if (sendResponse(tdata, sd) == 0) { 
357                 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
358                 pthread_mutex_unlock(tdata->lock);
359                 close(sd);
360                 return NULL;
361         }
362
363         /* Close connection */
364         close(sd);
365         pthread_exit(NULL);
366 }
367
368 /* This function decides the reponse that needs to be sent to 
369  * all Participant machines involved in the transaction commit */
370 int decideResponse(thread_data_array_t *tdata) {
371         char control;
372         int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
373                                                                          message to send */
374
375         //Check common data structure 
376         for (i = 0 ; i < tdata->pilecount ; i++) {
377                 /*Switch on response from Participant */
378                 control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
379                                                            written onto the shared array */
380                 switch(control) {
381                         case TRANS_DISAGREE:
382                                 printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
383                                 transdisagree++;
384                                 break;
385
386                         case TRANS_AGREE:
387                                 printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
388                                 transagree++;
389                                 break;
390                                 
391                         case TRANS_SOFT_ABORT:
392                                 printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
393                                 transsoftabort++;
394                                 break;
395                         default:
396                                 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
397                                 return -1;
398                 }
399         }
400         
401         /* Decide what control message to send to Participant */        
402         if(transdisagree > 0) {
403                 /* Send Abort */
404                 *(tdata->replyctrl) = TRANS_ABORT;
405                 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
406                 objstrDelete(tdata->rec->cache);
407                 chashDelete(tdata->rec->lookupTable);
408                 free(tdata->rec);
409         } else if(transagree == tdata->pilecount){
410                 /* Send Commit */
411                 *(tdata->replyctrl) = TRANS_COMMIT;
412                 printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
413                 objstrDelete(tdata->rec->cache);
414                 chashDelete(tdata->rec->lookupTable);
415                 free(tdata->rec);
416         } else if(transsoftabort > 0 && transdisagree == 0) {
417                 /* Send Abort in soft abort case followed by retry commiting transaction again*/
418                 *(tdata->replyctrl) = TRANS_ABORT;
419                 *(tdata->replyretry) = 1;
420                 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
421         } else {
422                 printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
423                 return -1;
424         }
425         
426         return 0;
427 }
428 /* This function sends the final response to all threads in their respective socket id */
429 char sendResponse(thread_data_array_t *tdata, int sd) {
430         int n, N, sum, oidcount = 0;
431         char *ptr, retval = 0;
432         unsigned int *oidnotfound;
433
434         /* If the decided response is due to a soft abort and missing objects at the Participant's side */
435         if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
436                 /* Read list of objects missing */
437                 if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
438                         N = oidcount * sizeof(unsigned int);
439                         if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
440                                 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
441                         }
442                         ptr = (char *) oidnotfound;
443                         do {
444                                 n = read(sd, ptr+sum, N-sum);
445                                 sum += n;
446                         } while(sum < N && n !=0);
447                 }
448                 retval =  TRANS_SOFT_ABORT;
449         }
450         /* If the decided response is TRANS_ABORT */
451         if(*(tdata->replyctrl) == TRANS_ABORT) {
452                 retval = TRANS_ABORT;
453         }
454         /* If the decided response is TRANS_COMMIT */
455         if(*(tdata->replyctrl) == TRANS_COMMIT) {
456                 retval = TRANS_COMMIT;
457         }
458         /* Send response to the Participant */
459         if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
460                 perror("Error sending ctrl message for participant\n");
461         }
462
463         return retval;
464 }
465
466 /* This function opens a connection, places an object read request to the 
467  * remote machine, reads the control message and object if available  and 
468  * copies the object and its header to the local cache.
469  * TODO replace mnum and midtoIP() with MACHINE_IP address later */ 
470
471 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
472         int sd, size, val;
473         struct sockaddr_in serv_addr;
474         struct hostent *server;
475         char control;
476         char machineip[16];
477         objheader_t *h;
478         void *objcopy;
479
480         if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
481                 perror("Error in socket\n");
482                 return NULL;
483         }
484         bzero((char*) &serv_addr, sizeof(serv_addr));
485         serv_addr.sin_family = AF_INET;
486         serv_addr.sin_port = htons(LISTEN_PORT);
487         //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
488         midtoIP(mnum,machineip);
489         machineip[15] = '\0';
490         serv_addr.sin_addr.s_addr = inet_addr(machineip);
491         /* Open connection */
492         if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
493                 perror("Error in connect\n");
494                 return NULL;
495         }
496         char readrequest[sizeof(char)+sizeof(unsigned int)];
497         readrequest[0] = READ_REQUEST;
498         *((unsigned int *)(&readrequest[1])) = oid;
499         if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
500                 perror("Error sending message\n");
501                 return NULL;
502         }
503
504 #ifdef DEBUG1
505         printf("DEBUG -> ready to rcv ...\n");
506 #endif
507         /* Read response from the Participant */
508         if((val = read(sd, &control, sizeof(char))) <= 0) {
509                 perror("No control response for getRemoteObj sent\n");
510                 return NULL;
511         }
512         switch(control) {
513                 case OBJECT_NOT_FOUND:
514                         printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
515                         return NULL;
516                 case OBJECT_FOUND:
517                         /* Read object if found into local cache */
518                         if((val = read(sd, &size, sizeof(int))) <= 0) {
519                                 perror("No size is read from the participant\n");
520                                 return NULL;
521                         }
522                         objcopy = objstrAlloc(record->cache, size);
523                         if((val = read(sd, objcopy, size)) <= 0) {
524                                 perror("No objects are read from the remote participant\n");
525                                 return NULL;
526                         }
527                         /* Insert into cache's lookup table */
528                         chashInsert(record->lookupTable, oid, objcopy); 
529                         break;
530                 default:
531                         printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
532                         return NULL;
533         }
534         /* Close connection */
535         close(sd);
536         return objcopy;
537 }