t = time(NULL);
req.tv_sec = 0;
req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
- nanosleep(&req, &rem);
+ //nanosleep(&req, &rem);
+ nanosleep(&req, NULL);
return;
}
rc = gettimeofday(&tp, NULL);
/* Convert from timeval to timespec */
- ts.tv_nsec = tp.tv_usec * 10;
+ ts.tv_nsec = tp.tv_usec * 1000;
/* Search local transaction cache */
if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
free(ltdata);
/* wait a random amount of time */
- if (treplyretry == 1)
+ if (treplyretry == 1)
randomdelay();
/* Retry trans commit procedure if not sucessful in the first try */
pthread_exit(NULL);
}
- printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
/* Send bytes of data with TRANS_REQUEST control message */
if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
perror("Error sending fixed bytes for thread\n");
return;
}
-/* This function sends the final response to remote machines per thread in their respective socket id */
+/* This function sends the final response to remote machines per thread in their respective socket id
+ * It returns a char that is only needed to check the correctness of execution of this function inside
+ * transRequest()*/
char sendResponse(thread_data_array_t *tdata, int sd) {
int n, N, sum, oidcount = 0;
char *ptr, retval = 0;
}
/* Send ack to Coordinator */
- printf("TRANS_SUCCESSFUL\n");
/*Free the pointer */
ptr = NULL;
/* dequeue node to create a machine piles and finally unlock mutex */
if((qnode = pre_dequeue()) == NULL) {
printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&pqueue.qlock);
pthread_exit(NULL);
}
pthread_mutex_unlock(&pqueue.qlock);
/* Dequeue node to send remote machine connections*/
if((mcpilenode = mcpiledequeue()) == NULL) {
printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&mcqueue.qlock);
pthread_exit(NULL);
}
/* Unlock mutex */
prehashInsert(oid, modptr);
} else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) {
/* Add the new object ptr to hash table */
+ prehashRemove(oid);
prehashInsert(oid, modptr);
- } else { /* Do nothing */
+ } else { /* Do nothing: TODO modptr should be reference counted */
;
}
} else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
prehashInsert(oid, modptr);
}
/* Lock the Prefetch Cache look up table*/
- pthread_mutex_lock(&pflookup.lock);
+ //pthread_mutex_lock(&pflookup.lock);
/* Broadcast signal on prefetch cache condition variable */
pthread_cond_broadcast(&pflookup.cond);
/* Unlock the Prefetch Cache look up table*/
- pthread_mutex_unlock(&pflookup.lock);
+ //pthread_mutex_unlock(&pflookup.lock);
} else if(buffer[index] == OBJECT_NOT_FOUND) {
/* Increment it to get the object */
/* TODO: For each object not found query DHT for new location and retrieve the object */