/* Initialize attribute for mutex */
pthread_mutexattr_init(&mainobjstore_mutex_attr);
pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
- //pthread_mutex_init(&mainobjstore_mutex, NULL);
pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
if (mhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
while(1)
{
- acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
- pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
+ int retval;
+ acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+ do {
+ retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
+ } while(retval!=0);
+ pthread_detach(thread_dstm_accept);
}
}
/* This function accepts a new connection request, decodes the control message in the connection
if (master) {
pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&thread_Listen, &attr, dstmListen, NULL);
return 1;
} else {
* processes the prefetch requests */
void transInit() {
int t, rc;
+ int retval;
//Create and initialize prefetch cache structure
prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
//Initialize machine pile w/prefetch oids and offsets shared queue
mcpileqInit();
//Create the primary prefetch thread
- pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+
+ do {
+ retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+ } while(retval!=0);
+ pthread_detach(tPrefetch);
+
//Create and Initialize a pool of threads
/* Threads are active for the entire period runtime is running */
for(t = 0; t< NUM_THREADS; t++) {
+ do {
rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
- if (rc) {
- printf("Thread create error %s, %d\n", __FILE__, __LINE__);
- return;
- }
+ } while(rc!=0);
+ pthread_detach(wthreads[t]);
}
}
/* This function initializes things required in the transaction start*/
transrecord_t *transStart()
{
+ printf("Starting transaction\n");
transrecord_t *tmp = malloc(sizeof(transrecord_t));
tmp->cache = objstrCreate(1048576);
tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
thread_data_array[threadnum].rec = record;
/* If local do not create any extra connection */
if(pile->mid != myIpAddr) { /* Not local */
+ do {
rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
+ } while(rc!=0);
if(rc) {
perror("Error in pthread create\n");
pthread_cond_destroy(&tcond);
} else { /*Local*/
ltdata->tdata = &thread_data_array[threadnum];
ltdata->transinfo = &transinfo;
+ do {
val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
+ } while(val!=0);
if(val) {
perror("Error in pthread create\n");
pthread_cond_destroy(&tcond);