bug fixes and add machine pile queue DS that saves oids and offsets meant for
authoradash <adash>
Mon, 30 Jul 2007 19:47:14 +0000 (19:47 +0000)
committeradash <adash>
Mon, 30 Jul 2007 19:47:14 +0000 (19:47 +0000)
remote machines

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/machinepile.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/machinepile.h [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/mcpileq.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/mcpileq.h [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/trans.c

index c937a57b7f12aa8d7796e56bc49aa0058a776940..957a68e95bb3b8d81dcfd8438512c78278821ef1 100644 (file)
@@ -5,6 +5,12 @@
 #define MSG_NOSIGNAL 0
 #endif
 
+#define GET_NTUPLES(x)         ((int *)(x + sizeof(prefetchqelem_t)))
+#define GET_PTR_OID(x)         ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
+#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
+#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
+
+
 //Coordinator Messages
 #define READ_REQUEST           1
 #define READ_MULT_REQUEST      2
@@ -42,6 +48,7 @@
 #include <pthread.h>
 #include "clookup.h"
 #include "queue.h"
+#include "mcpileq.h"
 
 #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
 #define TID_LEN 20
@@ -135,27 +142,15 @@ typedef struct objinfo {
 
 //Structure for members within prefetch tuples
 typedef struct member {
-        short offset;
-        short index;
-        struct member *next;
- }trans_member_t;
-
-
-//Structure for prefetching tuples generated by teh compiler
- typedef struct prefetchpile{
-        int mid;
-        int *oids;
-
-        int **numofarrys;
-        struct prefetchpile *next;
- }prefetchpile_t;
-
-//Structure per Oid in the prefetch call
+       short offset;
+       short index;
+       struct member *next;
+}trans_member_t;
 
 /*
 //Structure that holds the compiler generated prefetch data
 typedef struct compprefetchdata {
-       transrecord_t *record;
+transrecord_t *record;
 } compprefetchdata_t;
 */
 
diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.c b/Robust/src/Runtime/DSTM/interface/machinepile.c
new file mode 100644 (file)
index 0000000..58fe1b9
--- /dev/null
@@ -0,0 +1,48 @@
+#include "machinepile.h"
+
+int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) {
+       prefetchpile_t *tmp = head;
+       objpile_t *objnode;
+       unsigned int *oidarray;
+       int ntuples;
+       char found = 0;
+
+       while (tmp != NULL) {
+               if (tmp->mid == mid) { // Found a match with exsisting machine id
+                       if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
+                               printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+                               return -1;
+                       }
+                       /* Fill objpiles DS */
+                       objnode->oid = oid;
+                       objnode->numoffset = numoffset;
+                       objnode->offset = offset;
+                       objnode->next = tmp->objpiles;
+                       tmp->objpiles = objnode;
+                       found = 1;
+                       break;
+               }
+               tmp = tmp->next;
+       }
+       if (!found) {// Not found => insert new mid DS
+               if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) {
+                       printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+                       return -1;
+               }
+               tmp->mid = mid;
+               if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) {
+                       printf("Calloc error: %s %d\n", __FILE__, __LINE__);
+                       return -1;
+               }
+               /* Fill objpiles DS */
+               objnode->oid = oid;
+               objnode->numoffset = numoffset;
+               objnode->offset = offset;
+               objnode->next = tmp->objpiles; // i.e., objnode->next = NULL;
+               tmp->objpiles = objnode;
+               tmp->next = head;
+               head = tmp;
+       }
+       return 0;
+}
+
diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.h b/Robust/src/Runtime/DSTM/interface/machinepile.h
new file mode 100644 (file)
index 0000000..b8ca3d6
--- /dev/null
@@ -0,0 +1,10 @@
+#ifndef _MACHINEPILE_H_
+#define _MACHINEPILE_H_
+
+#include "mcpileq.h"
+#include <stdio.h>
+#include <stdlib.h>
+
+int insertPile(int, unsigned int, short, short *, prefetchpile_t *);
+
+#endif
diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c
new file mode 100644 (file)
index 0000000..bbb608d
--- /dev/null
@@ -0,0 +1,77 @@
+#include "mcpileq.h"
+
+mcpileq_t mcqueue;
+
+void mcpileqInit(void) {
+       /* Initialize machine queue that containing prefetch oids and offset values  sorted by remote machineid */  
+       mcqueue.front = mcqueue.rear = NULL;
+       pthread_mutex_init(&mcqueue.qlock, NULL); 
+       pthread_cond_init(&mcqueue.qcond, NULL); 
+}
+
+/* Insert to the rear of machine pile queue */
+void mcpileenqueue(prefetchpile_t *node) {
+       if(mcqueue.front == NULL && mcqueue.rear == NULL) {
+               mcqueue.front = mcqueue.rear = node;
+       } else {
+               node->next = NULL;
+               mcqueue.rear->next = node;
+               mcqueue.rear = node;
+       }
+}
+
+/* Return the node pointed to by the front ptr of the queue */
+prefetchpile_t *mcpiledequeue(void) {
+       prefetchpile_t *retnode;
+       if(mcqueue.front == NULL) {
+               printf("Machune pile queue empty: Underfloe %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+       retnode = mcqueue.front;
+       mcqueue.front = mcqueue.front->next;
+
+       return retnode;
+}
+
+/* Delete the node pointed to by the front ptr of the queue */
+void delnode() {
+       prefetchpile_t *delnode;
+       if((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
+               printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
+               return;
+       } else if ((mcqueue.front == mcqueue.rear) && mcqueue.front != NULL && mcqueue.rear != NULL) {
+               printf("TEST1\n");
+               free(mcqueue.front);
+               mcqueue.front = mcqueue.rear = NULL;
+       } else {
+               delnode = mcqueue.front;
+               mcqueue.front = mcqueue.front->next;
+               printf("TEST2\n");
+               free(delnode);
+       }
+}
+
+void mcpiledelete(void) {
+       /* Remove each element */
+       while(mcqueue.front != NULL)
+               delqnode();
+       mcqueue.front = mcqueue.rear = NULL;
+}
+
+
+void mcpiledisplay() {
+       int mid;
+
+       prefetchpile_t *tmp = mcqueue.front;
+       while(tmp != NULL) {
+               printf("Remote machine id = %d\n", tmp->mid);
+               tmp = tmp->next;
+       }
+}
+
+
+
+
+
+
+
diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h
new file mode 100644 (file)
index 0000000..7add8de
--- /dev/null
@@ -0,0 +1,37 @@
+#ifndef _MCPILEQ_H_
+#define _MCPILEQ_H_
+
+#include<pthread.h>
+#include<stdio.h>
+#include<stdlib.h>
+#include<string.h>
+
+//Structure to make machine groups when prefetching
+typedef struct objpile { 
+       unsigned int oid;
+       short numoffset;
+       short *offset;
+       struct objpile *next;
+}objpile_t;
+
+//Structure for prefetching tuples generated by the compiler
+typedef struct prefetchpile {
+       int mid;
+       objpile_t *objpiles;
+       struct prefetchpile *next;
+}prefetchpile_t;
+
+typedef struct mcpileq {
+       prefetchpile_t *front, *rear;
+       pthread_mutex_t qlock;
+       pthread_cond_t qcond;
+}mcpileq_t;
+
+void mcpileqInit(void);
+void mcpileenqueue(prefetchpile_t *);
+prefetchpile_t *mcpiledequeue(void);
+void delnode();
+void mcpiledelete();
+void mcpiledisplay();
+
+#endif
index f164528f6e4462ab5af24ddfc68024406f44c99a..298e0d115b250a9320afa7a7b44a9c43bf250147 100644 (file)
@@ -3,13 +3,13 @@
 primarypfq_t pqueue; //Global queue
 
 void queueInit(void) {
-       /* Intitialize primary thread */
+       /* Intitialize primary queue */
        pqueue.front = pqueue.rear = NULL;
        pthread_mutex_init(&pqueue.qlock, NULL);
        pthread_cond_init(&pqueue.qcond, NULL);
 }
 
-/* Removes the first element of the queue */
+/* Delete the node pointed to by the front ptr of the queue */
 void delqnode() {
        prefetchqelem_t *delnode;
        if((pqueue.front == NULL) && (pqueue.rear == NULL)) {
@@ -45,6 +45,7 @@ void enqueue(prefetchqelem_t *qnode) {
        }
 }
 
+/* Return the node pointed to by the front ptr of the queue */
 prefetchqelem_t *dequeue(void) {
        prefetchqelem_t *retnode;
        if (pqueue.front == NULL) {
@@ -52,7 +53,6 @@ prefetchqelem_t *dequeue(void) {
                return NULL;
        }
        retnode = pqueue.front;
-       //TODO make this atomic
        pqueue.front = pqueue.front->next;
 
        return retnode;
index 2bf2d0195700bb2bf506b9ca36a5d47710545447..2ddf7d2693e4772f138c2d492d04afd1c94e617c 100644 (file)
@@ -1,6 +1,7 @@
 #include "dstm.h"
 #include "ip.h"
 #include "clookup.h"
+#include "machinepile.h"
 #include "mlookup.h"
 #include "llookup.h"
 #include "plookup.h"
 #define RECEIVE_BUFFER_SIZE 2048
 #define NUM_THREADS 10
 #define PREFETCH_CACHE_SIZE 1048576 //1MB
-
+/*
 #define GET_NTUPLES(x)         ((int *)(x + sizeof(prefetchqelem_t)))
 #define GET_PTR_OID(x)         ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
 #define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
 #define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
+*/
 
 /* Global Variables */
 extern int classsize[];
 extern primarypfq_t pqueue; // shared prefetch queue
+extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
 pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
 pthread_t tPrefetch;
 extern objstr_t *mainobjstore;
@@ -41,6 +44,16 @@ inline int arrayLength(int *array) {
                ;
        return i;
 }
+inline int findmax(int *array, int arraylength) {
+       int max, i;
+       max = array[0];
+       for(i = 0; i < arraylength; i++){
+               if(array[i] > max) {
+                       max = array[i];
+               }
+       }
+       return max;
+}
 /* This function is a prefetch call generated by the compiler that
  * populates the shared primary prefetch queue*/
 void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) {
@@ -84,6 +97,8 @@ void transInit() {
                        return; //Failure
        //Initialize primary shared queue
        queueInit();
+       //Initialize machine pile w/prefetch oids and offsets shared queue
+       mcpileqInit();
        //Create the primary prefetch thread 
        pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
        //Create and Initialize a pool of threads 
@@ -919,9 +934,19 @@ void *transPrefetch(void *prefdata) {
                }
                pthread_mutex_unlock(&pqueue.qlock);
                /* Reduce redundant prefetch requests */
-               /* Group Requests by where objects are located */
-       
+               checkPrefetchTuples(qnode);
+               /* Check if the tuples are found locally, if yes then reduce them further*/ 
+               /* and group requests by remote machine ids by calling the makePreGroups() */
+               foundLocal(qnode);
+               
+               /* Lock mutex of pool queue */
+               pthread_mutex_lock(&mcqueue.qlock);
+               /* Update the pool queue with the new remote machine piles generated per prefetch call */
+
 
+               /* Broadcast signal on pool queue */
+
+               /* Unlock mutex of pool queue */
 
        }
 }
@@ -984,24 +1009,19 @@ void checkPrefetchTuples(prefetchqelem_t *node) {
                                                k++;
                                        }       
                                } else {
-                                       printf("i = %d, j = %d\n", i, j);
                                        k = endoffsets[i-1];
                                        index = endoffsets[j-1];
                                        printf("Value of slength = %d\n", slength);
                                        for(count = 0; count < slength; count++) {
-                                               printf("Value of count =%d\n", count);
                                                if(arryfields[k] != arryfields[index]) {
                                                        break;
                                                }
                                                index++;
                                                k++;
                                        }
-                                       printf("Value of count =%d\n", count);
                                }
-                               printf("The value of sindex = %d\n", sindex);
 
                                if(slength == count) {
-                                       printf("DEBUG-> Inside slength if %d\n", sindex);
                                        oid[sindex] = -1;
                                }
                        }
@@ -1058,22 +1078,50 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopc
         * and copy left over offsets into the arrayoffsetfieldarray*/
        oid[iter] = objoid;
        numoffset[iter] = numoffset[iter] - (i+1);
-       if(iter == 0)
-               endoffsets[iter] = numoffset[iter];
-       else
-               endoffsets[iter] = numoffset[iter] + endoffsets[iter - 1];
        for(k = 0; k < numoffset[iter] ; k++) {
-               arryfields[k] = arryfields[counter+1];
-               counter++;
+               arryfields[endoffsets[counter]+k] = arryfields[endoffsets[counter]+k+1];
        }
 
        if(flag == 0) {
                oid[iter] = -1;
                numoffset[iter] = 0;
-               endoffsets[iter] = 0;
        }
 }
 
+/* This function makes machine piles to be added into the machine pile queue for each prefetch call */
+void makePreGroups(prefetchqelem_t *node, int *numoffset) {
+       char *ptr, *tmp;
+       int ntuples, slength, i, machinenum;
+       int maxoffset;
+       unsigned int *oid;
+       short *endoffsets, *arryfields, *offset; 
+       prefetchpile_t *head = NULL;
+
+       /* Check for the case x.y.z and a.b.c are same oids */ 
+       ptr = (char *) node;
+       ntuples = *(GET_NTUPLES(ptr));
+       oid = GET_PTR_OID(ptr);
+       endoffsets = GET_PTR_EOFF(ptr, ntuples); 
+       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+
+       /* Check for redundant tuples by comparing oids of each tuple */
+       for(i = 0; i < ntuples; i++) {
+               if(oid[i] == -1)
+                       continue;
+               /* For each tuple make piles */
+               if ((machinenum = lhashSearch(oid[i])) == 0) {
+                       printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
+                       return;
+               }
+               /* Insert into machine pile */
+               offset = &arryfields[endoffsets[i-1]];
+               insertPile(machinenum, oid[i], numoffset[i], offset, head);
+       }
+
+       return;
+}
+
+
 /* This function checks if the oids within the prefetch tuples are available locally.
  * If yes then makes the tuple invalid. If no then rearranges oid and offset values in 
  * the prefetchqelem_t node to represent a new prefetch tuple */
@@ -1110,75 +1158,39 @@ void foundLocal(prefetchqelem_t *node) {
                                index = endoffsets[i - 1];
                        for(j = 0 ; j < numoffset[i] ; j++) {
                                objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
+                               /*If oid found locally then 
+                                *assign the latest oid found as the new oid 
+                                *and copy left over offsets into the arrayoffsetfieldarray*/
+                               oid[i] = objoid;
+                               numoffset[i] = numoffset[i] - (j+1);
+                               for(k = 0; k < numoffset[i]; k++)
+                                       arryfields[endoffsets[j]+ k] = arryfields[endoffsets[j]+k+1];
                                index++;
                                /*New offset oid not found */
                                if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
                                        flag = 1;
-                                       checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
+                                       checkPreCache(node, &numoffset, j, numoffset[i], objoid, index, i, oidnfound); 
                                        break;
                                } else 
                                        flag = 0;
                        }
-                       /*If oid not found locally then 
-                        *assign the latest oid found as the new oid 
-                        *and copy left over offsets into the arrayoffsetfieldarray*/
-                       oid[i] = objoid;
-                       numoffset[i] = numoffset[i] - (j+1);
-                       if(i == 0)
-                               endoffsets[i] = numoffset[i];
-                       else 
-                               endoffsets[i] = numoffset[i] - endoffsets[i - 1];
-                       for(k = 0; k < numoffset[i]; k++) {
-                               arryfields[k] = arryfields[j+1];
-                               j++;
-                       }
+               
                        /*If all offset oids are found locally,make the prefetch tuple invalid */
                        if(flag == 0) {
                                oid[i] = -1;
                                numoffset[i] = 0;
-                               endoffsets[i] = 0;
                        }
                } else {
                        oidnfound = 1;
                        /* Look in Prefetch cache */
-                       checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
+                       checkPreCache(node, &numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); 
                }
 
        }
+       // Make machine groups
+       makePreGroups(node, numoffset);
 }
 
-void makePreGroups(prefetchqelem_t *node) {
-       char *ptr, *tmp;
-       int ntuples, slength, i, machinenum;
-       unsigned int *oid;
-       short *endoffsets, *arryfields; 
-
-
-       /* Check for the case x.y.z and a.b.c are same oids */ 
-       ptr = (char *) node;
-       ntuples = *(GET_NTUPLES(ptr));
-       oid = GET_PTR_OID(ptr);
-       endoffsets = GET_PTR_EOFF(ptr, ntuples); 
-       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
-       /* Find offset length for each tuple */
-       int numoffset[ntuples];
-       numoffset[0] = endoffsets[0];
-       for(i = 1; i<ntuples; i++) {
-               numoffset[i] = endoffsets[i] - endoffsets[i-1];
-       }
-
-       /* Check for redundant tuples by comparing oids of each tuple */
-       for(i = 0; i < ntuples; i++) {
-               if(oid[i] == -1)
-                       continue;
-               /* For each tuple make piles */
-               if ((machinenum = lhashSearch(oid[i])) == 0) {
-                       printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
-                       return;
-               }
-       }
-
-}
 
 /*This function is called by the thread that processes the 
  * prefetch request makes piles to prefetch records and prefetches the oids from remote machines */