Added getMyIpAddr to ip.h, ip.c.
authorerubow <erubow>
Wed, 5 Sep 2007 00:51:34 +0000 (00:51 +0000)
committererubow <erubow>
Wed, 5 Sep 2007 00:51:34 +0000 (00:51 +0000)
Added preliminary code for starting remote threads.
Added code for getting the type of an object outside of a transaction.

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/ip.c
Robust/src/Runtime/DSTM/interface/ip.h
Robust/src/Runtime/DSTM/interface/trans.c

index e3ccbc02f0fb24f49d20eecf5957fbf747869eb0..6e9a481eafa30f7b804c6eaa08db00eb4d17d802 100644 (file)
@@ -33,6 +33,7 @@
 #define TRANS_SOFT_ABORT               20
 #define TRANS_SUCESSFUL                        21
 #define TRANS_PREFETCH_RESPONSE                22
+#define START_REMOTE_THREAD            23
 
 //Control bits for status of objects in Machine pile
 #define OBJ_LOCKED_BUT_VERSION_MATCH   14
@@ -240,5 +241,7 @@ void checkPreCache(prefetchqelem_t *, int *, int, int, unsigned int, int, int, i
 int transPrefetchProcess(transrecord_t *, int **, short);
 void sendPrefetchReq(prefetchpile_t*, int);
 void getPrefetchResponse(int, int);
+unsigned short getObjType(unsigned int oid);
+int startRemoteThread(unsigned int oid, unsigned int mid);
 /* end transactions */
 #endif
index a1136966d520eb703ecd4a0f3e00894bbadd1af2..a28e906faa59825d494ca27d562028acb825bdcc 100644 (file)
@@ -98,6 +98,7 @@ void *dstmAccept(void *acceptfd)
        void *srcObj;
        objheader_t *h;
        trans_commit_data_t transinfo;
+       unsigned short objType;
        
        int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
 
@@ -169,6 +170,20 @@ void *dstmAccept(void *acceptfd)
                                return;
                        }
                        break;
+               case START_REMOTE_THREAD:
+                       retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+                       if (retval <= 0)
+                               perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
+                       else if (retval != sizeof(unsigned int))
+                               printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n",
+                                       retval);
+                       else
+                       { //TODO: execute run method on this global thread object
+                               printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=%d\n", oid);
+                               objType = getObjType(oid);
+                               printf("dstmAccept(): type of object %d is %d\n", oid, objType);
+                       }
+                       break;
 
                default:
                        printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
index d959f49c2a110d1fa69021f25b0fe3c8b022d4c0..bbd8e2f67329f7f9cad69e8df19a56712a6128dd 100644 (file)
@@ -6,6 +6,8 @@
 #include <netdb.h>
 #include <netinet/in.h>
 #include <string.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
 
 #define LISTEN_PORT 2156
 
@@ -54,6 +56,35 @@ int checkServer(int mid, char *machineip) {
        close(tmpsd);
        return 0;
 }
+
+unsigned int getMyIpAddr(const char *interfaceStr)
+{      
+       int sock;
+       struct ifreq interfaceInfo;
+       struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
+
+       memset(&interfaceInfo, 0, sizeof(struct ifreq));
+
+       if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+       {
+               perror("getMyIpAddr():socket()");
+               return 1;
+       }
+
+       strcpy(interfaceInfo.ifr_name, interfaceStr);
+       myAddr->sin_family = AF_INET;
+       
+       if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
+       {
+               perror("getMyIpAddr():ioctl()");
+               return 1;
+       }
+
+       close(sock);
+
+       return ntohl(myAddr->sin_addr.s_addr);
+}
+
 /*
 main() {
        unsigned int mid;
index 4dcc5f76f21c65c48040cab84d79646158087cfb..29db799c741e7ab5f82f53be7709c382a2e28bd2 100644 (file)
@@ -11,5 +11,6 @@ typedef struct ip {
 unsigned int iptoMid(char *);
 void midtoIP(unsigned int, char *);
 int checkServer(int, char *);
+unsigned int getMyIpAddr(const char *interfaceStr);
 
 #endif
index 963f324f083172098214559785c6b6671446ae56..9053965fc9d42dbc219ced0d0fad27fbd4a9113d 100644 (file)
@@ -800,8 +800,7 @@ void *handleLocalReq(void *threadarg) {
                /* Save the oids not found and number of oids not found for later use */
                if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
                        /* Save the oids not found and number of oids not found for later use */
-
-                       oidnotfound[objnotfound] = OID(((objheader_t *)mobj));
+                       oidnotfound[objnotfound] = oid;
                        objnotfound++;
                } else { /* If Obj found in machine (i.e. has not moved) */
                        /* Check if Obj is locked by any previous transaction */
@@ -1463,3 +1462,75 @@ void getPrefetchResponse(int count, int sd) {
                printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
        return;
 }
+
+unsigned short getObjType(unsigned int oid)
+{
+       objheader_t *objheader;
+       unsigned short numoffsets = 0;
+
+       if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL)
+       {
+               if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
+               {
+                       prefetch(1, &oid, &numoffsets, NULL);
+                       pthread_mutex_lock(&pflookup.lock);
+                       while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
+                               pthread_cond_wait(&pflookup.cond, &pflookup.lock);
+                       pthread_mutex_unlock(&pflookup.lock);
+               }
+       }
+
+       return TYPE(objheader);
+}
+
+int startRemoteThread(unsigned int oid, unsigned int mid)
+{
+       int sock;
+       struct sockaddr_in remoteAddr;
+       char msg[1 + sizeof(unsigned int)];
+       int bytesSent;
+       int status;
+
+       if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+       {
+               perror("startRemoteThread():socket()");
+               return -1;
+       }
+
+       bzero(&remoteAddr, sizeof(remoteAddr));
+       remoteAddr.sin_family = AF_INET;
+       remoteAddr.sin_port = htons(LISTEN_PORT);
+       remoteAddr.sin_addr.s_addr = htonl(mid);
+       
+       if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
+       {
+               printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
+                       inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+               status = -1;
+       }
+       else
+       {
+               msg[0] = START_REMOTE_THREAD;
+               memcpy(&msg[1], &oid, sizeof(unsigned int));
+
+               bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0);
+               if (bytesSent < 0)
+               {
+                       perror("startRemoteThread():send()");
+                       status = -1;
+               }
+               else if (bytesSent != 1 + sizeof(unsigned int))
+               {
+                       printf("startRemoteThread(): error, sent %d bytes\n", bytesSent);
+                       status = -1;
+               }
+               else
+               {
+                       status = 0;
+               }
+       }
+
+       close(sock);
+       return status;
+}
+