11 #include<netinet/in.h>
13 #define LISTEN_PORT 2156
14 #define MACHINE_IP "127.0.0.1"
15 #define RECEIVE_BUFFER_SIZE 2048
17 extern int classsize[];
19 transrecord_t *transStart()
21 transrecord_t *tmp = malloc(sizeof(transrecord_t));
22 tmp->cache = objstrCreate(1048576);
23 tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
27 objheader_t *transRead(transrecord_t *record, unsigned int oid)
29 printf("Enter TRANS_READ\n");
30 unsigned int machinenumber;
31 objheader_t *tmp, *objheader;
36 if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
37 printf("DEBUG -> transRead oid %d found local\n", oid);
39 } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
40 //Look up in Machine lookup table and found
41 printf("oid is found in Local mlookup\n");
42 tmp = mhashSearch(oid);
43 size = sizeof(objheader_t)+classsize[tmp->type];
45 objcopy = objstrAlloc(record->cache, size);
46 memcpy(objcopy, (void *)tmp, size);
47 //Insert into cache's lookup table
48 chashInsert(record->lookupTable, objheader->oid, objcopy);
51 //Get the object from the remote location
52 printf("oid is found in remote machine\n");
53 machinenumber = lhashSearch(oid);
54 objcopy = getRemoteObj(record, machinenumber, oid);
56 //If object is not found in Remote location
57 printf("Object not found in Machine %d\n", machinenumber);
65 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
67 objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
68 tmp->oid = getNewOID();
71 tmp->rcount = 0; //? not sure how to handle this yet
74 chashInsert(record->lookupTable, tmp->oid, tmp);
77 //int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
78 int decideResponse(thread_data_array_t *tdata, int sd, int val) {
79 int i, n, N, sum, retval, oidcount = 0;
80 int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
81 char ctrl, control, *ptr;
82 unsigned int *oidnotfound;
86 //Check common data structure
87 for (i = 0 ; i < tdata->pilecount ; i++) {
89 control = tdata->recvmsg[i].rcv_status;
92 printf("DEBUG-> Inside TRANS_DISAGREE\n");
94 //Free transaction records
95 objstrDelete(tdata->rec->cache);
96 chashDelete(tdata->rec->lookupTable);
100 for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
101 if (write(sd, &ctrl, sizeof(char)) < 0) {
102 perror("Error sending ctrl message for participant\n");
109 printf("DEBUG-> Inside TRANS_AGREE\n");
113 case TRANS_SOFT_ABORT:
114 printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
116 /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
117 if ((i == tdata->thread_id) && (val == 0)) {
118 //Read list of objects missing
119 if(read(sd, &oidcount, sizeof(int)) != 0) {
120 //Break if only objs are locked at the Participant side
124 transsoftabortmiss++;
125 N = oidcount * sizeof(unsigned int);
126 if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
127 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
129 ptr = (char *) oidnotfound;
131 n = read(sd, ptr+sum, N-sum);
133 } while(sum < N && n !=0);
139 printf("Participant sent unknown message\n");
143 //Decide what control message to send to Participant
144 if(transagree == tdata->pilecount){
147 printf("Sending TRANS_COMMIT\n");
148 if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
149 perror("Error sending ctrl message for participant\n");
152 //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
155 if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
156 //Send abort but retry commit
157 ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
158 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
159 if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
160 perror("Error sending ctrl message for participant\n");
163 //Sleep and the resend the request
165 //Read new control message from Participant
167 if((n = read(sd, &control, sizeof(char))) <= 0) {
168 perror("No bytes are read for participant\n");
172 //Update common data structure and increment count
173 tdata->recvmsg[tdata->thread_id].rcv_status = control;
175 decideResponse(tdata, sd, val); //Second call to decideResponse(); indicated by parameter val = 1
178 if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
179 //Send abort but retry commit after relooking up objects
181 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
182 if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
183 perror("Error sending ctrl message for participant\n");
188 //update location table
197 void *transRequest(void *threadarg) {
199 struct sockaddr_in serv_addr;
200 struct hostent *server;
201 thread_data_array_t *tdata;
202 objheader_t *headeraddr;
203 //unsigned int *oidnotfound;
204 char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
207 tdata = (thread_data_array_t *) threadarg;
209 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
210 perror("Error in socket for TRANS_REQUEST");
213 bzero((char*) &serv_addr, sizeof(serv_addr));
214 serv_addr.sin_family = AF_INET;
215 serv_addr.sin_port = htons(LISTEN_PORT);
216 //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
217 midtoIP(tdata->mid,machineip);
218 machineip[15] = '\0';
219 serv_addr.sin_addr.s_addr = inet_addr(machineip);
221 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
222 perror("Error in connect for TRANS_REQUEST");
226 //Multiple writes for sending packets of data
227 //Send first few fixed bytes of the TRANS_REQUEST protocol
228 printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
229 // printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
230 // printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
231 if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
232 perror("Error sending fixed bytes for thread");
235 //Send list of machines involved in the transaction
236 // printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
237 if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
238 perror("Error sending list of machines for thread");
241 //Send oids and version number tuples for objects that are read
242 // printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
243 // printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
244 if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
245 perror("Error sending tuples for thread");
248 //Send objects that are modified
249 for(i = 0; i < tdata->buffer->f.nummod ; i++) {
250 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
251 if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
252 perror("Error sending obj modified for thread");
257 //Read message control message from participant side
258 if((n = read(sd, &control, sizeof(char))) <= 0) {
259 perror("Error in reading control message from Participant\n");
262 recvcontrol = control;
264 //Update common data structure and increment count
265 tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
266 //Lock and update count
267 //Thread sleeps until all messages from pariticipants are received by coordinator
268 pthread_mutex_lock(tdata->lock);
271 if(*(tdata->count) == tdata->pilecount) {
272 pthread_cond_broadcast(tdata->threshold);
274 pthread_cond_wait(tdata->threshold, tdata->lock);
277 //process the participant's request
278 if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
279 printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
280 pthread_mutex_unlock(tdata->lock);
283 pthread_mutex_unlock(tdata->lock);
289 int transCommit(transrecord_t *record) {
290 chashlistnode_t *curr, *ptr, *next;
291 unsigned int size;//Represents number of bins in the chash table
292 unsigned int machinenum, tot_bytes_mod, *listmid;
293 objheader_t *headeraddr;
294 plistnode_t *tmp, *pile = NULL;
296 int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
297 char buffer[RECEIVE_BUFFER_SIZE],control;
298 char transid[TID_LEN];
299 static int newtid = 0;
300 trans_req_data_t *tosend;
302 ptr = record->lookupTable->table;
303 size = record->lookupTable->size;
304 //Look through all the objects in the cache and make piles
305 for(i = 0; i < size ;i++) {
307 //Inner loop to traverse the linked list of the cache lookupTable
308 while(curr != NULL) {
309 //if the first bin in hash table is empty
314 //Get machine location for object id
316 if ((machinenum = lhashSearch(curr->key)) == 0) {
317 printf("Error: No such machine\n");
321 //TODO only for debug
323 if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
324 printf("Error: No such oid\n");
327 //Make machine groups
328 if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
329 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
336 //Create the packet to be sent in TRANS_REQUEST
338 pilecount = pCount(pile); //Keeps track of the number of participants
340 //Thread related variables
341 pthread_t thread[pilecount]; //Create threads for each participant
343 pthread_cond_t tcond;
344 pthread_mutex_t tlock;
345 pthread_mutex_t tlshrd;
346 //thread_data_array_t thread_data_array[pilecount];
347 thread_data_array_t *thread_data_array;
349 thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
351 thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants
353 //Initialize and set thread detach attribute
354 pthread_attr_init(&attr);
355 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
356 pthread_mutex_init(&tlock, NULL);
357 pthread_cond_init(&tcond, NULL);
359 //Keep track of list of machine ids per transaction
360 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
361 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
365 pListMid(pile, listmid);
366 //Process each machine group
368 //Create transaction id
370 //trans_req_data_t *tosend;
371 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
372 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
375 tosend->f.control = TRANS_REQUEST;
376 sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
377 tosend->f.mcount = pilecount;
378 tosend->f.numread = tmp->numread;
379 tosend->f.nummod = tmp->nummod;
380 tosend->f.sum_bytes = tmp->sum_bytes;
381 tosend->listmid = listmid;
382 tosend->objread = tmp->objread;
383 tosend->oidmod = tmp->oidmod;
384 thread_data_array[numthreads].thread_id = numthreads;
385 thread_data_array[numthreads].mid = tmp->mid;
386 thread_data_array[numthreads].pilecount = pilecount;
387 thread_data_array[numthreads].buffer = tosend;
388 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
389 thread_data_array[numthreads].threshold = &tcond;
390 thread_data_array[numthreads].lock = &tlock;
391 thread_data_array[numthreads].count = &trecvcount;
392 thread_data_array[numthreads].rec = record;
394 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);
396 perror("Error in pthread create");
406 // Free attribute and wait for the other threads
407 pthread_attr_destroy(&attr);
408 for (i = 0 ;i < pilecount ; i++) {
409 rc = pthread_join(thread[i], NULL);
412 printf("ERROR return code from pthread_join() is %d\n", rc);
418 pthread_cond_destroy(&tcond);
419 pthread_mutex_destroy(&tlock);
420 // for(i = 0 ;i< pilecount ;i++) {
428 //mnun will be used to represent the machine IP address later
429 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
431 struct sockaddr_in serv_addr;
432 struct hostent *server;
438 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
439 perror("Error in socket\n");
442 bzero((char*) &serv_addr, sizeof(serv_addr));
443 serv_addr.sin_family = AF_INET;
444 serv_addr.sin_port = htons(LISTEN_PORT);
445 //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
446 midtoIP(mnum,machineip);
447 machineip[15] = '\0';
448 serv_addr.sin_addr.s_addr = inet_addr(machineip);
450 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
451 perror("Error in connect\n");
454 char readrequest[sizeof(char)+sizeof(unsigned int)];
455 readrequest[0] = READ_REQUEST;
456 *((unsigned int *)(&readrequest[1])) = oid;
457 if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
458 perror("Error sending message\n");
463 printf("DEBUG -> ready to rcv ...\n");
465 //Read response from the Participant
466 if((val = read(sd, &control, sizeof(char))) <= 0) {
467 perror("No control response for getRemoteObj sent\n");
471 case OBJECT_NOT_FOUND:
475 if((val = read(sd, &size, sizeof(int))) <= 0) {
476 perror("No size is read from the participant\n");
479 objcopy = objstrAlloc(record->cache, size);
480 if((val = read(sd, objcopy, size)) <= 0) {
481 perror("No objects are read from the remote participant\n");
484 //Insert into cache's lookup table
485 chashInsert(record->lookupTable, oid, objcopy);
488 printf("Error in recv request from participant on a READ_REQUEST\n");