--- /dev/null
+#include "mcpileq.h"
+
+mcpileq_t mcqueue;
+
+void mcpileqInit(void) {
+ /* Initialize machine queue that containing prefetch oids and offset values sorted by remote machineid */
+ mcqueue.front = mcqueue.rear = NULL;
+ pthread_mutex_init(&mcqueue.qlock, NULL);
+ pthread_cond_init(&mcqueue.qcond, NULL);
+}
+
+/* Insert to the rear of machine pile queue */
+void mcpileenqueue(prefetchpile_t *node) {
+ if(mcqueue.front == NULL && mcqueue.rear == NULL) {
+ mcqueue.front = mcqueue.rear = node;
+ } else {
+ node->next = NULL;
+ mcqueue.rear->next = node;
+ mcqueue.rear = node;
+ }
+}
+
+/* Return the node pointed to by the front ptr of the queue */
+prefetchpile_t *mcpiledequeue(void) {
+ prefetchpile_t *retnode;
+ if(mcqueue.front == NULL) {
+ printf("Machune pile queue empty: Underfloe %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ retnode = mcqueue.front;
+ mcqueue.front = mcqueue.front->next;
+
+ return retnode;
+}
+
+/* Delete the node pointed to by the front ptr of the queue */
+void delnode() {
+ prefetchpile_t *delnode;
+ if((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
+ printf("The queue is empty: UNDERFLOW %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else if ((mcqueue.front == mcqueue.rear) && mcqueue.front != NULL && mcqueue.rear != NULL) {
+ printf("TEST1\n");
+ free(mcqueue.front);
+ mcqueue.front = mcqueue.rear = NULL;
+ } else {
+ delnode = mcqueue.front;
+ mcqueue.front = mcqueue.front->next;
+ printf("TEST2\n");
+ free(delnode);
+ }
+}
+
+void mcpiledelete(void) {
+ /* Remove each element */
+ while(mcqueue.front != NULL)
+ delqnode();
+ mcqueue.front = mcqueue.rear = NULL;
+}
+
+
+void mcpiledisplay() {
+ int mid;
+
+ prefetchpile_t *tmp = mcqueue.front;
+ while(tmp != NULL) {
+ printf("Remote machine id = %d\n", tmp->mid);
+ tmp = tmp->next;
+ }
+}
+
+
+
+
+
+
+
#include "dstm.h"
#include "ip.h"
#include "clookup.h"
+#include "machinepile.h"
#include "mlookup.h"
#include "llookup.h"
#include "plookup.h"
#define RECEIVE_BUFFER_SIZE 2048
#define NUM_THREADS 10
#define PREFETCH_CACHE_SIZE 1048576 //1MB
-
+/*
#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t)))
#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
+*/
/* Global Variables */
extern int classsize[];
extern primarypfq_t pqueue; // shared prefetch queue
+extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids
pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
pthread_t tPrefetch;
extern objstr_t *mainobjstore;
;
return i;
}
+inline int findmax(int *array, int arraylength) {
+ int max, i;
+ max = array[0];
+ for(i = 0; i < arraylength; i++){
+ if(array[i] > max) {
+ max = array[i];
+ }
+ }
+ return max;
+}
/* This function is a prefetch call generated by the compiler that
* populates the shared primary prefetch queue*/
void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) {
return; //Failure
//Initialize primary shared queue
queueInit();
+ //Initialize machine pile w/prefetch oids and offsets shared queue
+ mcpileqInit();
//Create the primary prefetch thread
pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
//Create and Initialize a pool of threads
}
pthread_mutex_unlock(&pqueue.qlock);
/* Reduce redundant prefetch requests */
- /* Group Requests by where objects are located */
-
+ checkPrefetchTuples(qnode);
+ /* Check if the tuples are found locally, if yes then reduce them further*/
+ /* and group requests by remote machine ids by calling the makePreGroups() */
+ foundLocal(qnode);
+
+ /* Lock mutex of pool queue */
+ pthread_mutex_lock(&mcqueue.qlock);
+ /* Update the pool queue with the new remote machine piles generated per prefetch call */
+
+ /* Broadcast signal on pool queue */
+
+ /* Unlock mutex of pool queue */
}
}
k++;
}
} else {
- printf("i = %d, j = %d\n", i, j);
k = endoffsets[i-1];
index = endoffsets[j-1];
printf("Value of slength = %d\n", slength);
for(count = 0; count < slength; count++) {
- printf("Value of count =%d\n", count);
if(arryfields[k] != arryfields[index]) {
break;
}
index++;
k++;
}
- printf("Value of count =%d\n", count);
}
- printf("The value of sindex = %d\n", sindex);
if(slength == count) {
- printf("DEBUG-> Inside slength if %d\n", sindex);
oid[sindex] = -1;
}
}
* and copy left over offsets into the arrayoffsetfieldarray*/
oid[iter] = objoid;
numoffset[iter] = numoffset[iter] - (i+1);
- if(iter == 0)
- endoffsets[iter] = numoffset[iter];
- else
- endoffsets[iter] = numoffset[iter] + endoffsets[iter - 1];
for(k = 0; k < numoffset[iter] ; k++) {
- arryfields[k] = arryfields[counter+1];
- counter++;
+ arryfields[endoffsets[counter]+k] = arryfields[endoffsets[counter]+k+1];
}
if(flag == 0) {
oid[iter] = -1;
numoffset[iter] = 0;
- endoffsets[iter] = 0;
}
}
+/* This function makes machine piles to be added into the machine pile queue for each prefetch call */
+void makePreGroups(prefetchqelem_t *node, int *numoffset) {
+ char *ptr, *tmp;
+ int ntuples, slength, i, machinenum;
+ int maxoffset;
+ unsigned int *oid;
+ short *endoffsets, *arryfields, *offset;
+ prefetchpile_t *head = NULL;
+
+ /* Check for the case x.y.z and a.b.c are same oids */
+ ptr = (char *) node;
+ ntuples = *(GET_NTUPLES(ptr));
+ oid = GET_PTR_OID(ptr);
+ endoffsets = GET_PTR_EOFF(ptr, ntuples);
+ arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+
+ /* Check for redundant tuples by comparing oids of each tuple */
+ for(i = 0; i < ntuples; i++) {
+ if(oid[i] == -1)
+ continue;
+ /* For each tuple make piles */
+ if ((machinenum = lhashSearch(oid[i])) == 0) {
+ printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+ /* Insert into machine pile */
+ offset = &arryfields[endoffsets[i-1]];
+ insertPile(machinenum, oid[i], numoffset[i], offset, head);
+ }
+
+ return;
+}
+
+
/* This function checks if the oids within the prefetch tuples are available locally.
* If yes then makes the tuple invalid. If no then rearranges oid and offset values in
* the prefetchqelem_t node to represent a new prefetch tuple */
index = endoffsets[i - 1];
for(j = 0 ; j < numoffset[i] ; j++) {
objoid = *(tmp + sizeof(objheader_t) + arryfields[index]);
+ /*If oid found locally then
+ *assign the latest oid found as the new oid
+ *and copy left over offsets into the arrayoffsetfieldarray*/
+ oid[i] = objoid;
+ numoffset[i] = numoffset[i] - (j+1);
+ for(k = 0; k < numoffset[i]; k++)
+ arryfields[endoffsets[j]+ k] = arryfields[endoffsets[j]+k+1];
index++;
/*New offset oid not found */
if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) {
flag = 1;
- checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound);
+ checkPreCache(node, &numoffset, j, numoffset[i], objoid, index, i, oidnfound);
break;
} else
flag = 0;
}
- /*If oid not found locally then
- *assign the latest oid found as the new oid
- *and copy left over offsets into the arrayoffsetfieldarray*/
- oid[i] = objoid;
- numoffset[i] = numoffset[i] - (j+1);
- if(i == 0)
- endoffsets[i] = numoffset[i];
- else
- endoffsets[i] = numoffset[i] - endoffsets[i - 1];
- for(k = 0; k < numoffset[i]; k++) {
- arryfields[k] = arryfields[j+1];
- j++;
- }
+
/*If all offset oids are found locally,make the prefetch tuple invalid */
if(flag == 0) {
oid[i] = -1;
numoffset[i] = 0;
- endoffsets[i] = 0;
}
} else {
oidnfound = 1;
/* Look in Prefetch cache */
- checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound);
+ checkPreCache(node, &numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound);
}
}
+ // Make machine groups
+ makePreGroups(node, numoffset);
}
-void makePreGroups(prefetchqelem_t *node) {
- char *ptr, *tmp;
- int ntuples, slength, i, machinenum;
- unsigned int *oid;
- short *endoffsets, *arryfields;
-
-
- /* Check for the case x.y.z and a.b.c are same oids */
- ptr = (char *) node;
- ntuples = *(GET_NTUPLES(ptr));
- oid = GET_PTR_OID(ptr);
- endoffsets = GET_PTR_EOFF(ptr, ntuples);
- arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
- /* Find offset length for each tuple */
- int numoffset[ntuples];
- numoffset[0] = endoffsets[0];
- for(i = 1; i<ntuples; i++) {
- numoffset[i] = endoffsets[i] - endoffsets[i-1];
- }
-
- /* Check for redundant tuples by comparing oids of each tuple */
- for(i = 0; i < ntuples; i++) {
- if(oid[i] == -1)
- continue;
- /* For each tuple make piles */
- if ((machinenum = lhashSearch(oid[i])) == 0) {
- printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__);
- return;
- }
- }
-
-}
/*This function is called by the thread that processes the
* prefetch request makes piles to prefetch records and prefetches the oids from remote machines */