prefetch.
commented out array implementation of socket pools
A) allocations always have to traverse to end of list
B) do we need to zero first?? -- need to check about this one, it may be okay
+Status:Verified
+
1) Wrap all receive calls in loops
A) Perhaps the best way is to just define a macro or function call that
does this. Look at GETSIZE macro for example...
+Status:DONE
+
2) Check locking... There is likely a race condition on getObjType().
+Status:DONE
+
3) Receiving object code assume a maximum object size. It is probably
better to:
A) read size in.
B) allocate space for object at its final destination
C) read into the space
+Status:DONE
+
Low priority list
---------------------------------
1) We shouldn't call memcopy for copying fixed-sized structs or primitive
values...just use =
+
+Status: DONE in most places
#define MAX_OBJECTS 20
//Max remote-machine connections
#define NUM_MACHINES 2
+#define LOADFACTOR 0.5
#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
//Transaction id per machine
#define TID_LEN 20
+#define LISTEN_PORT 2156
#include <stdlib.h>
#include "queue.h"
#include "mcpileq.h"
#include "threadnotify.h"
-
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <errno.h>
+#include <time.h>
+#include "sockpool.h"
//bit designations for status field of objheader
#define DIRTY 0x01
struct objstr *next;
} objstr_t;
+typedef struct oidmidpair {
+ unsigned int oid;
+ unsigned int mid;
+} oidmidpair_t;
+
typedef struct transrecord {
objstr_t *cache;
chashtable_t *lookupTable;
/* Coordinator => Machine that initiates the transaction request call for commiting a transaction
* Participant => Machines that host the objects involved in a transaction commit */
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <netdb.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <string.h>
#include "dstm.h"
#include "mlookup.h"
#include "llookup.h"
#include "thread.h"
#endif
-
-#define LISTEN_PORT 2156
#define BACKLOG 10 //max pending connections
#define RECEIVE_BUFFER_SIZE 2048
extern int classsize[];
+extern int numHostsInSystem;
objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
-/**********************************************************
- * Global variables to map socketid and remote mid
- * to resuse sockets
- **************************************************/
-midSocketInfo_t sockArray[NUM_MACHINES];
-int sockCount; //number of connections with all remote machines(one socket per mc)
-int sockIdFound; //track if socket file descriptor is already established
-pthread_mutex_t sockLock = PTHREAD_MUTEX_INITIALIZER; //lock to prevent global sock variables to be inconsistent
+
+sockPoolHashTable_t *transPResponseSocketPool;
/* This function initializes the main objects store and creates the
* global machine and location lookup table */
if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
return 1; //failure
-
- //Initialize mid to socketid mapping array
- int t;
- sockCount = 0;
- for(t = 0; t < NUM_MACHINES; t++) {
- sockArray[t].mid = 0;
- sockArray[t].sockid = 0;
- }
+
+ //Initialize socket pool
+ if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1, LOADFACTOR)) == NULL) {
+ printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
+ return 0;
+ }
return 0;
}
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
+ /*
transinfo.objlocked = NULL;
transinfo.objnotfound = NULL;
transinfo.modptr = NULL;
transinfo.numlocked = 0;
transinfo.numnotfound = 0;
+ */
/* Receive control messages from other machines */
while(1) {
case TRANS_REQUEST:
/* Read transaction request */
+ transinfo.objlocked = NULL;
+ transinfo.objnotfound = NULL;
+ transinfo.modptr = NULL;
+ transinfo.numlocked = 0;
+ transinfo.numnotfound = 0;
if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
pthread_exit(NULL);
transinfo->modptr = modptr;
transinfo->numlocked = *(objlocked);
transinfo->numnotfound = *(objnotfound);
-
+
return control;
}
int prefetchReq(int acceptfd) {
int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
- int length, sd = -1;
+ int length;
char *recvbuffer, *sendbuffer, control;
unsigned int oid, mid;
- short *offsetarry;
objheader_t *header;
struct sockaddr_in remoteAddr;
+ oidmidpair_t oidmid;
do {
recv_data((int)acceptfd, &length, sizeof(int));
if(length != -1) {
- size = length - sizeof(int);
- if((recvbuffer = calloc(1, size)) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- return -1;
- }
- recv_data((int)acceptfd, recvbuffer, size);
- oid = *((unsigned int *) recvbuffer);
- mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
- size = size - (2 * sizeof(unsigned int));
- numoffset = size / sizeof(short);
- if((offsetarry = calloc(numoffset, sizeof(short))) == NULL) {
- printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- free(recvbuffer);
- return -1;
- }
- memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
- free(recvbuffer);
- pthread_mutex_lock(&sockLock);
- sockIdFound = 0;
- pthread_mutex_unlock(&sockLock);
- /* If socket is already established then send data reusing socket */
- for(i = 0; i < NUM_MACHINES; i++) {
- if(sockArray[i].mid == mid) {
- sd = sockArray[i].sockid;
- pthread_mutex_lock(&sockLock);
- sockIdFound = 1;
- pthread_mutex_unlock(&sockLock);
- break;
- }
- }
-
- if(sockIdFound == 0) {
- if(sockCount < NUM_MACHINES) {
- /* Create socket to send information */
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- perror("prefetchReq():socket()");
- return -1;
- }
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
-
- if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- perror("connect");
- printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- close(sd);
- return -1;
- }
- sockArray[sockCount].mid = mid;
- sockArray[sockCount].sockid = sd;
- pthread_mutex_lock(&sockLock);
- sockCount++;
- pthread_mutex_unlock(&sockLock);
- } else {
- //TODO Fix for connecting to more than 2 machines && close socket
- printf("%s(): Error: Currently works for only 2 machines\n", __func__);
- return -1;
- }
- }
+ recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
+ oid = oidmid.oid;
+ mid = oidmid.mid;
+ size = length - sizeof(int) - (2 * sizeof(unsigned int));
+ numoffset = size/sizeof(short);
+ short offsetarry[numoffset];
+ recv_data((int) acceptfd, offsetarry, size);
+
+ int sd = -1;
+ if((sd = getSock(transPResponseSocketPool, mid)) == -1) {
+ printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
+ exit(-1);
+ }
/*Process each oid */
if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
if((sendbuffer = calloc(1, size)) == NULL) {
printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- free(offsetarry);
close(sd);
return -1;
}
control = TRANS_PREFETCH_RESPONSE;
if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- free(offsetarry);
printf("Error: %s() in sending prefetch response at %s, %d\n",
__func__, __FILE__, __LINE__);
close(sd);
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
if((sendbuffer = calloc(1, size)) == NULL) {
printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- free(offsetarry);
close(sd);
return -1;
}
control = TRANS_PREFETCH_RESPONSE;
if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- free(offsetarry);
printf("Error: %s() in sending prefetch response at %s, %d\n",
__func__, __FILE__, __LINE__);
close(sd);
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
if((sendbuffer = calloc(1, size)) == NULL) {
printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
- free(offsetarry);
close(sd);
return -1;
}
control = TRANS_PREFETCH_RESPONSE;
if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- free(offsetarry);
printf("Error: %s() in sending prefetch response at %s, %d\n",
__FILE__, __LINE__);
close(sd);
size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
if((sendbuffer = calloc(1, size)) == NULL) {
printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
- free(offsetarry);
close(sd);
return -1;
}
control = TRANS_PREFETCH_RESPONSE;
if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
- free(offsetarry);
printf("Error: %s() in sending prefetch response at %s, %d\n",
__func__, __FILE__, __LINE__);
close(sd);
}
isArray = 0;
}
- free(offsetarry);
}
+
+ //Release socket
+ int status;
+ if((status = freeSock(transPResponseSocketPool, mid, sd)) == -1) {
+ printf("Error: in releasing socket at %s line %d\n", __FILE__, __LINE__);
+ return -1;
+ }
}
} while (length != -1);
return 0;
#include "sockpool.h"
-sockPoolHashTable_t sockhash;
-
inline int CompareAndSwap(int *a, int oldval, int newval) {
int temp = *a;
if (temp == oldval) {
*s = 0;
}
-int createSockPool(unsigned int size, float loadfactor) {
+sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int size, float loadfactor) {
+ if((sockhash = calloc(1, sizeof(sockPoolHashTable_t))) == NULL) {
+ printf("Calloc error at %s line %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
socknode_t **nodelist;
if ((nodelist = calloc(size, sizeof(socknode_t *))) < 0) {
printf("Calloc error at %s line %d\n", __FILE__, __LINE__);
- return -1;
+ free(sockhash);
+ return NULL;
}
- sockhash.table = nodelist;
- sockhash.inuse = NULL;
- sockhash.size = size;
- sockhash.numelements = 0;
- sockhash.loadfactor = loadfactor;
- InitLock(&sockhash.mylock);
- return 0;
+
+ sockhash->table = nodelist;
+ sockhash->inuse = NULL;
+ sockhash->size = size;
+ sockhash->numelements = 0;
+ sockhash->loadfactor = loadfactor;
+ InitLock(&sockhash->mylock);
+
+ return sockhash;
}
int createNewSocket(unsigned int mid) {
return sd;
}
-int getSock(unsigned int mid) {
- int key = mid%(sockhash.size);
+int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) {
+ int key = mid%(sockhash->size);
+
+ if (sockhash->table[key] == NULL) {
+ int sd;
+ if((sd = createNewSocket(mid)) != -1) {
+ socknode_t *inusenode = calloc(1, sizeof(socknode_t));
+ inusenode->mid = mid;
+ inusenode->sd = sd;
+ insToList(sockhash, inusenode);
+ return sd;
+ } else {
+ return -1;
+ }
+ }
+
+ int midFound = 0;
+ socknode_t *ptr = sockhash->table[key];
+ socknode_t *prev = (socknode_t *) &(sockhash->table[key]);
+ while (ptr != NULL) {
+ if (mid == ptr->mid) {
+ midFound = 1;
+ int sd = ptr->sd;
+ prev = ptr->next;
+ insToList(sockhash, ptr);
+ return sd;
+ }
+ prev = ptr;
+ ptr = ptr->next;
+ }
+
+ if(midFound == 0) {
+ int sd;
+ if((sd = createNewSocket(mid)) != -1) {
+ socknode_t *inusenode = calloc(1, sizeof(socknode_t));
+ inusenode->mid = mid;
+ inusenode->sd = sd;
+ insToList(sockhash, inusenode);
+ return sd;
+ } else {
+ return -1;
+ }
+ }
+ return -1;
+}
+
+int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
+ int key = mid%(sockhash->size);
- Lock(&sockhash.mylock);
- if (sockhash.table[key] == NULL) {
- UnLock(&sockhash.mylock);
+ Lock(&sockhash->mylock);
+ if (sockhash->table[key] == NULL) {
+ UnLock(&sockhash->mylock);
int sd;
if((sd = createNewSocket(mid)) != -1) {
socknode_t *inusenode = calloc(1, sizeof(socknode_t));
inusenode->mid = mid;
inusenode->sd = sd;
- insToList(inusenode);
+ insToListWithLock(sockhash, inusenode);
return sd;
} else {
return -1;
}
}
- UnLock(&sockhash.mylock);
+ UnLock(&sockhash->mylock);
int midFound = 0;
- Lock(&sockhash.mylock);
- socknode_t *ptr = sockhash.table[key];
- socknode_t *prev = (socknode_t *) &(sockhash.table[key]);
+ Lock(&sockhash->mylock);
+ socknode_t *ptr = sockhash->table[key];
+ socknode_t *prev = (socknode_t *) &(sockhash->table[key]);
while (ptr != NULL) {
if (mid == ptr->mid) {
midFound = 1;
int sd = ptr->sd;
prev = ptr->next;
- UnLock(&sockhash.mylock);
- insToList(ptr);
+ UnLock(&sockhash->mylock);
+ insToListWithLock(sockhash, ptr);
return sd;
}
prev = ptr;
ptr = ptr->next;
}
- UnLock(&sockhash.mylock);
+ UnLock(&sockhash->mylock);
+
if(midFound == 0) {
int sd;
if((sd = createNewSocket(mid)) != -1) {
socknode_t *inusenode = calloc(1, sizeof(socknode_t));
inusenode->mid = mid;
inusenode->sd = sd;
- insToList(inusenode);
+ insToListWithLock(sockhash, inusenode);
return sd;
} else {
return -1;
return -1;
}
-void insToList(socknode_t *inusenode) {
- Lock(&sockhash.mylock);
- inusenode->next = sockhash.inuse;
- sockhash.inuse = inusenode;
- UnLock(&sockhash.mylock);
+void insToList(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
+ inusenode->next = sockhash->inuse;
+ sockhash->inuse = inusenode;
+}
+
+void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
+ Lock(&sockhash->mylock);
+ inusenode->next = sockhash->inuse;
+ sockhash->inuse = inusenode;
+ UnLock(&sockhash->mylock);
}
-int freeSock(unsigned int mid, int sd) {
- if(sockhash.inuse != NULL) {
- Lock(&sockhash.mylock);
- socknode_t *ptr = sockhash.inuse;
+int freeSock(sockPoolHashTable_t *sockhash, unsigned int mid, int sd) {
+ if(sockhash->inuse != NULL) {
+ socknode_t *ptr = sockhash->inuse;
+ ptr->mid = mid;
+ ptr->sd = sd;
+ sockhash->inuse = ptr->next;
+ int key = mid%(sockhash->size);
+ ptr->next = sockhash->table[key];
+ sockhash->table[key] = ptr;
+ return 0;
+ }
+ return -1;
+}
+
+int freeSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid, int sd) {
+ if(sockhash->inuse != NULL) {
+ Lock(&sockhash->mylock);
+ socknode_t *ptr = sockhash->inuse;
ptr->mid = mid;
ptr->sd = sd;
- sockhash.inuse = ptr->next;
- int key = mid%(sockhash.size);
- ptr->next = sockhash.table[key];
- sockhash.table[key] = ptr;
- UnLock(&sockhash.mylock);
+ sockhash->inuse = ptr->next;
+ int key = mid%(sockhash->size);
+ ptr->next = sockhash->table[key];
+ sockhash->table[key] = ptr;
+ UnLock(&sockhash->mylock);
return 0;
}
return -1;
}
+
+#if 0
+/ ***************************************/
+* Array Implementation for socket resuse
+* ***************************************/
+
+int num_machines;
+
+sock_pool_t *initSockPool(unsigned int *mid, int machines) {
+ sock_pool_t *sockpool;
+ num_machines = machines;
+ if ((sockpool = calloc(num_machines, sizeof(sock_pool_t))) < 0) {
+ printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__);
+ return NULL;
+ }
+ int i;
+ for (i = 0; i < num_machines; i++) {
+ if ((sockpool[i].sd = calloc(MAX_CONN_PER_MACHINE, sizeof(int))) < 0) {
+ printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__);
+ return NULL;
+ }
+ if ((sockpool[i].inuse = calloc(MAX_CONN_PER_MACHINE, sizeof(char))) < 0) {
+ printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__);
+ return NULL;
+ }
+ sockpool[i].mid = mid[i];
+ int j;
+ for(j = 0; j < MAX_CONN_PER_MACHINE; j++) {
+ sockpool[i].sd[j] = -1;
+ }
+ }
+
+ return sockpool;
+}
+
+int getSock(sock_pool_t *sockpool, unsigned int mid) {
+ int i;
+ for (i = 0; i < num_machines; i++) {
+ if (sockpool[i].mid == mid) {
+ int j;
+ for (j = 0; j < MAX_CONN_PER_MACHINE; j++) {
+ if (sockpool[i].sd[j] != -1 && (sockpool[i].inuse[j] == 0)) {
+ sockpool[i].inuse[j] = 1;
+ return sockpool[i].sd[j];
+ }
+ if (sockpool[i].sd[j] == -1) {
+ //Open Connection
+ int sd;
+ if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ struct sockaddr_in remoteAddr;
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+ printf("%s(): Error %d connecting to %s:%d\n", __func__, errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ close(sd);
+ return -1;
+ }
+ sockpool[i].sd[j] = sd;
+ sockpool[i].inuse[j] = 1;
+ return sockpool[i].sd[j];
+ }
+ }
+ printf("%s()->Error: Less number of MAX_CONN_PER_MACHINE\n", __func__);
+ return -1;
+ }
+ }
+ printf("%s()-> Error: Machine id not found\n", __func__);
+
+ return -1;
+}
+
+int freeSock(sock_pool_t *sockpool, int sd) {
+ int i;
+ for (i = 0; i < num_machines; i++) {
+ int j;
+ for (j = 0; j < MAX_CONN_PER_MACHINE; j++) {
+ if (sockpool[i].sd[j] == sd) {
+ sockpool[i].inuse[j] = 0;
+ return 0;
+ }
+ }
+ }
+ printf("%s() Error: Illegal socket descriptor %d\n", __func__, sd);
+
+ return -1;
+}
+
+#endif
#include "dstm.h"
-#define LOADFACTOR 0.5
-
typedef int SpinLock;
-
typedef struct socknode {
int sd;
unsigned int mid;
SpinLock mylock;
} sockPoolHashTable_t;
-int createSockPool(unsigned int, float);
-int getSock(unsigned int);
-int freeSock(unsigned int, int);
-int deleteSockpool(sockPoolHashTable_t *);
-void insToList(socknode_t *);
+sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int, float);
+int getSock(sockPoolHashTable_t *, unsigned int);
+int getSockWithLock(sockPoolHashTable_t *, unsigned int);
+int freeSock(sockPoolHashTable_t *, unsigned int, int);
+int freeSockWithLock(sockPoolHashTable_t *, unsigned int, int);
+void insToList(sockPoolHashTable_t *, socknode_t *);
+void insToListWithLock(sockPoolHashTable_t *, socknode_t *);
int createNewSocket(unsigned int);
int CompareAndSwap(int *, int, int);
void InitLock(SpinLock *);
void Lock (SpinLock *);
void UnLock (SpinLock *);
+#if 0
+/************************************************
+ * Array Implementation data structures
+ ***********************************************/
+#define MAX_CONN_PER_MACHINE 10
+typedef struct sock_pool {
+ unsigned int mid;
+ int *sd;
+ char *inuse;
+} sock_pool_t;
+
+sock_pool_t *initSockPool(unsigned int *, int);
+int getSock(sock_pool_t *, unsigned int);
+int freeSock(sock_pool_t *, int);
+#endif
+
#endif
#include "prelookup.h"
#include "threadnotify.h"
#include "queue.h"
-#include <pthread.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <sys/types.h>
-#include <unistd.h>
-#include <errno.h>
-#include <time.h>
-#include <string.h>
#ifdef COMPILER
#include "thread.h"
#endif
-#define LISTEN_PORT 2156
#define NUM_THREADS 1
#define PREFETCH_CACHE_SIZE 1048576 //1MB
#define CONFIG_FILENAME "dstm.conf"
/* Global Variables */
extern int classsize[];
extern primarypfq_t pqueue; //Shared prefetch queue
-extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids
objstr_t *prefetchcache; //Global Prefetch cache
pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
unsigned int oidMin;
unsigned int oidMax;
-/************************************************************************
- * Global variables to map socketid and remote mid to
- * reuse sockets for sending prefetches and making remote read requests
- ************************************************************************/
-midSocketInfo_t midSocketArray[NUM_MACHINES];
-int sockCount; //number of connections with all remote machines(one socket per mc)
-int sockIdFound; //track if socket file descriptor is already established
-midSocketInfo_t sockArrayRemoteRead[NUM_MACHINES];
-int sockCountRemoteRead; //number of connections with all remote machines(one socket per mc)
-int sockIdFoundRemoteRead; //track if socket file descriptor is already established
+sockPoolHashTable_t *transReadSockPool;
+sockPoolHashTable_t *transPrefetchSockPool;
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
threadcount--;
#endif
+ //Initialize socket pool
+ if((transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
+ printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ if((transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
+ printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+
dstmInit();
transInit();
-
-
if (master) {
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
} while(retval!=0);
pthread_detach(tPrefetch);
-
- //Initialize mid to socketid mapping array
- sockCount = 0;
- for(t = 0; t < NUM_MACHINES; t++) {
- midSocketArray[t].mid = 0;
- midSocketArray[t].sockid = 0;
- }
-
- //Create and Initialize a pool of threads
- /* Threads are active for the entire period runtime is running */
- for(t = 0; t< NUM_THREADS; t++) {
- do {
- rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
- } while(rc!=0);
- pthread_detach(wthreads[t]);
- }
}
/* This function stops the threads spawned */
control = *(tdata->replyctrl);
send_data(sd, &control, sizeof(char));
- //TODO read missing objects to be used during object migration
- /* If the decided response is due to a soft abort and missing objects at the Participant's side */
+ //TODO read missing objects during object migration
+ /* If response is a soft abort due to missing objects at the Participant's side */
/*
if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
// Read list of objects missing
* */
void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
- int sd, size, val;
+ int size, val;
struct sockaddr_in serv_addr;
char machineip[16];
char control;
objheader_t *h;
- void *objcopy;
+ void *objcopy = NULL;
- int i;
- for(i = 0; i < NUM_MACHINES; i++) {
- if(sockArrayRemoteRead[i].mid == mnum) {
- sd = sockArrayRemoteRead[i].sockid;
- sockIdFoundRemoteRead = 1;
- break;
- }
- }
-
- if(sockIdFoundRemoteRead == 0) {
- if(sockCountRemoteRead < NUM_MACHINES) {
- /* Create socket */
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket\n");
- return NULL;
- }
-
- bzero((char*) &serv_addr, sizeof(serv_addr));
- serv_addr.sin_family = AF_INET;
- serv_addr.sin_port = htons(LISTEN_PORT);
- serv_addr.sin_addr.s_addr = htonl(mnum);
- // Open connection
- if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
- perror("getRemoteObj() Error in connect\n");
- close(sd);
- return NULL;
- }
- sockArrayRemoteRead[sockCountRemoteRead].mid = mnum;
- sockArrayRemoteRead[sockCountRemoteRead].sockid = sd;
- sockCountRemoteRead++;
- } else {
- //TODO Fix for connecting to more than 2 machines && close socket
- printf("%s(): Error: Currently works for two remote machines\n", __func__);
- return NULL;
- }
+ int sd;
+ if((sd = getSock(transReadSockPool, mnum)) == -1) {
+ printf("%s(): Error: no socket id in the pool of sockets at %s, %d\n", __func__, __FILE__, __LINE__);
+ return NULL;
}
char readrequest[sizeof(char)+sizeof(unsigned int)];
switch(control) {
case OBJECT_NOT_FOUND:
- return NULL;
+ objcopy = NULL;
+ break;
case OBJECT_FOUND:
/* Read object if found into local cache */
recv_data(sd, &size, sizeof(int));
break;
default:
printf("Error: in recv response from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
- return NULL;
+ break;
}
+ int status;
+ if((status = freeSock(transReadSockPool, mnum, sd)) == -1) {
+ printf("Error in releasing socket at %s line %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+
return objcopy;
}
int ntuples, slength;
unsigned int *oid;
unsigned short *endoffsets;
- short *arryfields;
+ short *offsets;
/* Check for the case x.y.z and a.b.c are same oids */
ptr = (char *) node;
ntuples = *(GET_NTUPLES(ptr));
oid = GET_PTR_OID(ptr);
endoffsets = GET_PTR_EOFF(ptr, ntuples);
- arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+ offsets = GET_PTR_ARRYFLD(ptr, ntuples);
/* Find offset length for each tuple */
int numoffset[ntuples];
}
index = endoffsets[j -1];
for(count = 0; count < slength; count ++) {
- if (arryfields[k] != arryfields[index]) {
+ if (offsets[k] != offsets[index]) {
break;
}
index++;
objheader_t *objheader;
unsigned short *endoffsets;
short *arryfields;
- prefetchpile_t *head = NULL;
ptr = (char *) node;
ntuples = *(GET_NTUPLES(ptr));
}
/* Make machine groups */
+ prefetchpile_t *head = NULL;
if((head = makePreGroups(node, numoffset)) == NULL) {
printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__);
return NULL;
/* This function is called by the thread calling transPrefetch */
void *transPrefetch(void *t) {
- prefetchqelem_t *qnode;
- prefetchpile_t *pilehead = NULL;
- prefetchpile_t *ptr = NULL, *piletail = NULL;
-
while(1) {
/* lock mutex of primary prefetch queue */
pthread_mutex_lock(&pqueue.qlock);
}
/* dequeue node to create a machine piles and finally unlock mutex */
+ prefetchqelem_t *qnode;
if((qnode = pre_dequeue()) == NULL) {
printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
pthread_mutex_unlock(&pqueue.qlock);
checkPrefetchTuples(qnode);
/* Check if the tuples are found locally, if yes then reduce them further*/
/* and group requests by remote machine ids by calling the makePreGroups() */
+ prefetchpile_t *pilehead = NULL;
if((pilehead = foundLocal(qnode)) == NULL) {
printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
pre_enqueue(qnode);
continue;
}
- ptr = pilehead;
- while(ptr != NULL) {
- if(ptr->next == NULL) {
- piletail = ptr;
- }
- ptr = ptr->next;
- }
-
- /* Lock mutex of pool queue */
- pthread_mutex_lock(&mcqueue.qlock);
- /* Update the pool queue with the new remote machine piles generated per prefetch call */
- mcpileenqueue(pilehead, piletail);
- /* Broadcast signal on machine pile queue */
- pthread_cond_broadcast(&mcqueue.qcond);
- /* Unlock mutex of machine pile queue */
- pthread_mutex_unlock(&mcqueue.qlock);
- /* Deallocate the prefetch queue pile node */
- predealloc(qnode);
- }
-}
-
-/* Each thread in the pool of threads calls this function to establish connection with
- * remote machines, send the prefetch requests and process the reponses from
- * the remote machines .
- * The thread is active throughout the period of runtime */
-
-void *mcqProcess(void *threadid) {
- int tid, i;
- prefetchpile_t *mcpilenode;
- struct sockaddr_in remoteAddr;
- int sd;
-
- tid = (int) threadid;
- while(1) {
-
- sockIdFound = 0;
- /* Lock mutex of mc pile queue */
- pthread_mutex_lock(&mcqueue.qlock);
- /* When mc pile queue is empty, wait */
- while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
- pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
- }
- /* Dequeue node to send remote machine connections*/
- if((mcpilenode = mcpiledequeue()) == NULL) {
- printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&mcqueue.qlock);
- continue;
- }
- /* Unlock mutex */
- pthread_mutex_unlock(&mcqueue.qlock);
-
- /*Initiate connection to remote host and send prefetch request */
- if(mcpilenode->mid != myIpAddr) {
- /* Check to see if socket exists */
- for(i = 0; i < NUM_MACHINES; i++) {
- if(midSocketArray[i].mid == mcpilenode->mid) {
- sendPrefetchReq(mcpilenode, midSocketArray[i].sockid);
- sockIdFound = 1;
- break;
- }
- }
+ // Get sock from shared pool
+ int sd = -1;
+ if((sd = getSock(transPrefetchSockPool, pilehead->mid)) == -1) {
+ printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
+ exit(-1);
+ }
- if(sockIdFound == 0) {
- if(sockCount < NUM_MACHINES) {
- /* Open Socket */
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
- return;
- }
+ /* Send Prefetch Request */
+ prefetchpile_t *ptr = pilehead;
+ while(ptr != NULL) {
+ sendPrefetchReq(ptr, sd);
+ ptr = ptr->next;
+ }
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mcpilenode->mid);
-
- /* Open Connection */
- if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
- printf("%s():error %d connecting to %s:%d\n", __func__, errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- close(sd);
- return;
- }
+ /* Release socket */
+ int status;
+ if((status = freeSock(transPrefetchSockPool, pilehead->mid, sd)) == -1) {
+ printf("Error: In realeasing socket at %s line %d\n", __FILE__, __LINE__);
+ return;
+ }
- midSocketArray[sockCount].mid = mcpilenode->mid;
- midSocketArray[sockCount].sockid = sd;
- sendPrefetchReq(mcpilenode, midSocketArray[sockCount].sockid);
- sockCount++;
- } else {
- //TODO Fix for connecting to more than 2 machines && close socket
- printf("%s(): Error: Currently works for only 2 machines\n", __func__);
- return;
- }
- }
- }
+ /* Deallocated pilehead */
+ mcdealloc(pilehead);
- /* Deallocate the machine queue pile node */
- mcdealloc(mcpilenode);
+ // Deallocate the prefetch queue pile node
+ predealloc(qnode);
}
}
void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
- int i, off, len, endpair, count = 0;
- char machineip[16], control;
+ int off, len, endpair, count = 0;
+ char control;
objpile_t *tmp;
/* Send TRANS_PREFETCH control message */
off += sizeof(unsigned int);
*((unsigned int *)(oidnoffset + off)) = myIpAddr;
off += sizeof(unsigned int);
+ int i;
for(i = 0; i < tmp->numoffset; i++) {
*((short*)(oidnoffset + off)) = tmp->offset[i];
off+=sizeof(short);
else
{
msg[0] = START_REMOTE_THREAD;
- memcpy(&msg[1], &oid, sizeof(unsigned int));
+ *((unsigned int *) &msg[1]) = oid;
send_data(sock, msg, 1 + sizeof(unsigned int));
}
if $DSMFLAG
then
EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DCOMPILER -DDSTM -I$DSMRUNTIME"
-FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c"
+FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c"
fi
if $RECOVERFLAG