From 872b410315508a73421f2eb3d0e527c7bd743576 Mon Sep 17 00:00:00 2001 From: erubow Date: Wed, 5 Sep 2007 00:51:34 +0000 Subject: [PATCH] Added getMyIpAddr to ip.h, ip.c. Added preliminary code for starting remote threads. Added code for getting the type of an object outside of a transaction. --- Robust/src/Runtime/DSTM/interface/dstm.h | 3 + .../src/Runtime/DSTM/interface/dstmserver.c | 15 ++++ Robust/src/Runtime/DSTM/interface/ip.c | 31 ++++++++ Robust/src/Runtime/DSTM/interface/ip.h | 1 + Robust/src/Runtime/DSTM/interface/trans.c | 75 ++++++++++++++++++- 5 files changed, 123 insertions(+), 2 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index e3ccbc02..6e9a481e 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -33,6 +33,7 @@ #define TRANS_SOFT_ABORT 20 #define TRANS_SUCESSFUL 21 #define TRANS_PREFETCH_RESPONSE 22 +#define START_REMOTE_THREAD 23 //Control bits for status of objects in Machine pile #define OBJ_LOCKED_BUT_VERSION_MATCH 14 @@ -240,5 +241,7 @@ void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, i int transPrefetchProcess(transrecord_t *, int **, short); void sendPrefetchReq(prefetchpile_t*, int); void getPrefetchResponse(int, int); +unsigned short getObjType(unsigned int oid); +int startRemoteThread(unsigned int oid, unsigned int mid); /* end transactions */ #endif diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index a1136966..a28e906f 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -98,6 +98,7 @@ void *dstmAccept(void *acceptfd) void *srcObj; objheader_t *h; trans_commit_data_t transinfo; + unsigned short objType; int fd_flags = fcntl((int)acceptfd, F_GETFD), size; @@ -169,6 +170,20 @@ void *dstmAccept(void *acceptfd) return; } break; + case START_REMOTE_THREAD: + retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0); + if (retval <= 0) + perror("dstmAccept(): error receiving START_REMOTE_THREAD msg"); + else if (retval != sizeof(unsigned int)) + printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n", + retval); + else + { //TODO: execute run method on this global thread object + printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=%d\n", oid); + objType = getObjType(oid); + printf("dstmAccept(): type of object %d is %d\n", oid, objType); + } + break; default: printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control); diff --git a/Robust/src/Runtime/DSTM/interface/ip.c b/Robust/src/Runtime/DSTM/interface/ip.c index d959f49c..bbd8e2f6 100644 --- a/Robust/src/Runtime/DSTM/interface/ip.c +++ b/Robust/src/Runtime/DSTM/interface/ip.c @@ -6,6 +6,8 @@ #include #include #include +#include +#include #define LISTEN_PORT 2156 @@ -54,6 +56,35 @@ int checkServer(int mid, char *machineip) { close(tmpsd); return 0; } + +unsigned int getMyIpAddr(const char *interfaceStr) +{ + int sock; + struct ifreq interfaceInfo; + struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr; + + memset(&interfaceInfo, 0, sizeof(struct ifreq)); + + if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) + { + perror("getMyIpAddr():socket()"); + return 1; + } + + strcpy(interfaceInfo.ifr_name, interfaceStr); + myAddr->sin_family = AF_INET; + + if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0) + { + perror("getMyIpAddr():ioctl()"); + return 1; + } + + close(sock); + + return ntohl(myAddr->sin_addr.s_addr); +} + /* main() { unsigned int mid; diff --git a/Robust/src/Runtime/DSTM/interface/ip.h b/Robust/src/Runtime/DSTM/interface/ip.h index 4dcc5f76..29db799c 100644 --- a/Robust/src/Runtime/DSTM/interface/ip.h +++ b/Robust/src/Runtime/DSTM/interface/ip.h @@ -11,5 +11,6 @@ typedef struct ip { unsigned int iptoMid(char *); void midtoIP(unsigned int, char *); int checkServer(int, char *); +unsigned int getMyIpAddr(const char *interfaceStr); #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 963f324f..9053965f 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -800,8 +800,7 @@ void *handleLocalReq(void *threadarg) { /* Save the oids not found and number of oids not found for later use */ if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ /* Save the oids not found and number of oids not found for later use */ - - oidnotfound[objnotfound] = OID(((objheader_t *)mobj)); + oidnotfound[objnotfound] = oid; objnotfound++; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ @@ -1463,3 +1462,75 @@ void getPrefetchResponse(int count, int sd) { printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__); return; } + +unsigned short getObjType(unsigned int oid) +{ + objheader_t *objheader; + unsigned short numoffsets = 0; + + if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) + { + if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) + { + prefetch(1, &oid, &numoffsets, NULL); + pthread_mutex_lock(&pflookup.lock); + while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) + pthread_cond_wait(&pflookup.cond, &pflookup.lock); + pthread_mutex_unlock(&pflookup.lock); + } + } + + return TYPE(objheader); +} + +int startRemoteThread(unsigned int oid, unsigned int mid) +{ + int sock; + struct sockaddr_in remoteAddr; + char msg[1 + sizeof(unsigned int)]; + int bytesSent; + int status; + + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) + { + perror("startRemoteThread():socket()"); + return -1; + } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) + { + printf("startRemoteThread():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + status = -1; + } + else + { + msg[0] = START_REMOTE_THREAD; + memcpy(&msg[1], &oid, sizeof(unsigned int)); + + bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0); + if (bytesSent < 0) + { + perror("startRemoteThread():send()"); + status = -1; + } + else if (bytesSent != 1 + sizeof(unsigned int)) + { + printf("startRemoteThread(): error, sent %d bytes\n", bytesSent); + status = -1; + } + else + { + status = 0; + } + } + + close(sock); + return status; +} + -- 2.34.1