fixed some thread allocation bugs:
authorbdemsky <bdemsky>
Thu, 20 Sep 2007 09:43:59 +0000 (09:43 +0000)
committerbdemsky <bdemsky>
Thu, 20 Sep 2007 09:43:59 +0000 (09:43 +0000)
1) if thread creation fails, retry it...the os always the option just to not cooperate
2) need to either:
a) join a thread or
b) set it as a detached thread

Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 7b27ce8dfb3ee9b76f1b032e36c625884bf483c7..7828afa781e5fe950514c4f966a7fe4951538eb5 100644 (file)
@@ -35,7 +35,6 @@ int dstmInit(void)
        /* Initialize attribute for mutex */
        pthread_mutexattr_init(&mainobjstore_mutex_attr);
        pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
-       //pthread_mutex_init(&mainobjstore_mutex, NULL);
        pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
        if (mhashCreate(HASH_SIZE, LOADFACTOR))
                return 1; //failure
@@ -96,8 +95,12 @@ void *dstmListen()
        printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
        while(1)
        {
-               acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
-               pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
+         int retval;
+         acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
+         do {
+           retval=pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
+         } while(retval!=0);
+         pthread_detach(thread_dstm_accept);
        }
 }
 /* This function accepts a new connection request, decodes the control message in the connection 
index 6e6caba5399a5c0a73ec0880ce05501f32a7b8e7..c7ff1f40124c98c8fc8f522d0224781f646869c4 100644 (file)
@@ -106,7 +106,7 @@ int dstmStartup(const char * option) {
 
   if (master) {
     pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
     pthread_create(&thread_Listen, &attr, dstmListen, NULL);
     return 1;
   } else {
@@ -124,6 +124,7 @@ int dstmStartup(const char * option) {
  * processes the prefetch requests */
 void transInit() {
        int t, rc;
+       int retval;
        //Create and initialize prefetch cache structure
        prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
 
@@ -141,15 +142,19 @@ void transInit() {
        //Initialize machine pile w/prefetch oids and offsets shared queue
        mcpileqInit();
        //Create the primary prefetch thread 
-       pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+       
+       do {
+         retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
+       } while(retval!=0);
+       pthread_detach(tPrefetch);
+
        //Create and Initialize a pool of threads 
        /* Threads are active for the entire period runtime is running */
        for(t = 0; t< NUM_THREADS; t++) {
+         do {
                rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
-               if (rc) {
-                       printf("Thread create error %s, %d\n", __FILE__, __LINE__);
-                       return;
-               }
+         } while(rc!=0);
+         pthread_detach(wthreads[t]);
        }
 }
 
@@ -181,6 +186,7 @@ void randomdelay(void)
 /* This function initializes things required in the transaction start*/
 transrecord_t *transStart()
 {
+  printf("Starting transaction\n");
        transrecord_t *tmp = malloc(sizeof(transrecord_t));
        tmp->cache = objstrCreate(1048576);
        tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
@@ -467,7 +473,9 @@ int transCommit(transrecord_t *record) {
                        thread_data_array[threadnum].rec = record;
                        /* If local do not create any extra connection */
                        if(pile->mid != myIpAddr) { /* Not local */
+                         do {
                                rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);  
+                         } while(rc!=0);
                                if(rc) {
                                        perror("Error in pthread create\n");
                                        pthread_cond_destroy(&tcond);
@@ -484,7 +492,9 @@ int transCommit(transrecord_t *record) {
                        } else { /*Local*/
                                ltdata->tdata = &thread_data_array[threadnum];
                                ltdata->transinfo = &transinfo;
+                               do {
                                val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
+                               } while(val!=0);
                                if(val) {
                                        perror("Error in pthread create\n");
                                        pthread_cond_destroy(&tcond);