Socket I/O code and Example
authorbdemsky <bdemsky>
Thu, 5 Oct 2006 21:35:49 +0000 (21:35 +0000)
committerbdemsky <bdemsky>
Thu, 5 Oct 2006 21:35:49 +0000 (21:35 +0000)
Robust/src/ClassLibrary/ServerSocket.java
Robust/src/ClassLibrary/Socket.java
Robust/src/IR/FlagDescriptor.java
Robust/src/IR/Tree/BuildIR.java
Robust/src/IR/Tree/SemanticCheck.java
Robust/src/Lex/Keyword.java
Robust/src/Lex/Lexer.java
Robust/src/Parse/java14.cup
Robust/src/Runtime/runtime.c
Robust/src/Tests/ServerExample.java [new file with mode: 0644]
Robust/src/buildscripttask

index 58ebc8f0c4deff3e4a9c2f09789ebc4d895c1093..d99ff83e342900315ae8687a0cb383e8b32b4de6 100644 (file)
@@ -1,6 +1,6 @@
 public class ServerSocket {
     /* Socket pending flag */
-    flag SocketPending;    
+    external flag SocketPending;    
     /* File Descriptor */
     int fd;
 
@@ -12,12 +12,12 @@ public class ServerSocket {
     
     public Socket accept() {
        Socket s=new Socket();
-       int newfd=nativeaccept(s, fd);
+       int newfd=nativeaccept(s);
        s.setFD(newfd);
        return s;
     }
 
-    private static native int nativeaccept(Socket s,int fd);
+    private native int nativeaccept(Socket s);
     
     public void close();
 
index 1f0f32c0570e7eaf8bb35cccfbde6ff080fdb536..8ad852d5c7925929b455121c0771f913e319d174 100644 (file)
@@ -1,6 +1,6 @@
 public class Socket {
     /* Data pending flag */
-    flag IOPending;    
+    external flag IOPending;    
     /* File Descriptor */
     int fd;
     
@@ -12,17 +12,17 @@ public class Socket {
     }
 
     public int read(byte[] b) {
-       return nativeRead(b, fd);
+       return nativeRead(b);
     }
     public void write(byte[] b) {
-       nativeWrite(b, fd);
+       nativeWrite(b);
     }
 
-    private native static int nativeRead(byte[] b, int fd);
-    private native static void nativeWrite(byte[] b, int fd);
-    private native static void nativeClose(int fd);
+    private native int nativeRead(byte[] b);
+    private native void nativeWrite(byte[] b);
+    private native void nativeClose();
 
     public void close() {
-       nativeClose(fd);
+       nativeClose();
     }
 }
index f2f6bfd299a147664c766859ee6e99593b306589..f358fb62d22dba51f4428706243568c75a819d7a 100644 (file)
@@ -12,6 +12,15 @@ public class FlagDescriptor extends Descriptor {
        super(identifier);
     }
 
+    private boolean isExternal=false;
+    public void makeExternal() {
+       isExternal=true;
+    }
+
+    public boolean getExternal() {
+       return isExternal;
+    }
+
     public String toString() {
        return "Flag "+getSymbol();
     }
index 52becbd039f212d2bd34f826290f988c89675369..c7bdb9189be4e43886e8590ee40de20962dfd1fc 100644 (file)
@@ -246,7 +246,10 @@ public class BuildIR {
 
     private void parseFlagDecl(ClassDescriptor cn,ParseNode pn) {
        String name=pn.getChild("name").getTerminal();
-       cn.addFlag(new FlagDescriptor(name));
+       FlagDescriptor flag=new FlagDescriptor(name);
+       if (pn.getChild("external")!=null)
+           flag.makeExternal();
+       cn.addFlag(flag);
     }
 
     private void parseFieldDecl(ClassDescriptor cn,ParseNode pn) {
index eb1b5ec2a9cb962ca9daccdf9f1b0a5d6aa78500..a2cd5122e6718195be67c97a1a71d24bf48dcd58 100644 (file)
@@ -116,6 +116,8 @@ public class SemanticCheck {
                //Make sure the flag is declared
                if (flag_d==null)
                    throw new Error("Flag descriptor "+name+" undefined in class: "+cd.getSymbol());
+               if (flag_d.getExternal())
+                   throw new Error("Attempting to modify external flag: "+name);
                flag.setFlag(flag_d);
            }
        }
@@ -505,6 +507,8 @@ public class SemanticCheck {
                //Make sure the flag is declared
                if (flag_d==null)
                    throw new Error("Flag descriptor "+name+" undefined in class: "+cd.getSymbol());
+               if (flag_d.getExternal())
+                   throw new Error("Attempting to modify external flag: "+name);
                flag.setFlag(flag_d);
            }
        }
index f91131aeb28014cdaa18dddb32b4b397d98b668b..bdd7332c59690f1146e6831ed0c1c792b4d546e2 100644 (file)
@@ -66,6 +66,7 @@ class Keyword extends Token {
     key_table.put("while", new Integer(Sym.WHILE));
     //Keywords for failure aware computation
     key_table.put("flag", new Integer(Sym.FLAG));
+    key_table.put("external", new Integer(Sym.EXTERNAL));
     key_table.put("tag", new Integer(Sym.TAG));
     key_table.put("task", new Integer(Sym.TASK));
     key_table.put("taskexit", new Integer(Sym.TASKEXIT));
index 456cd0d1bd28e5a4ea49d8e6a3e4d3c0cd51bb0f..448cbff29ebe84c44d35d76dd614854602040618 100644 (file)
@@ -246,7 +246,7 @@ public class Lexer {
   static final String[] keywords = new String[] {
     "abstract", "assert", "boolean", "break", "byte", "case", "catch", "char",
     "class", "const", "continue", "default", "do", "double", "else", "enum",
-    "extends", "final", "finally", 
+    "extends", "external", "final", "finally", 
     "flag", //keyword for failure aware computation
     "float", "for", "goto", "if", 
     "implements", "import", "instanceof", "int", "interface", "long", 
index 4a6877a4e6612a9fe334220daafa87e57a2a7599..ae8058e558303191588b552147f585db72444a77 100644 (file)
@@ -221,6 +221,7 @@ non terminal ParseNode expression_opt, expression;
 //non terminal ParseNode constant_expression;
 //failure aware computation keywords
 terminal FLAG;
+terminal EXTERNAL;
 terminal TAG;
 terminal TASK;
 terminal TASKEXIT;
@@ -683,6 +684,12 @@ flag_declaration ::=
                ParseNode pn=new ParseNode("flag_declaration");
                pn.addChild("name").addChild(id);
                RESULT=pn;
+       :}      |
+               EXTERNAL FLAG IDENTIFIER:id SEMICOLON {: 
+               ParseNode pn=new ParseNode("flag_declaration");
+               pn.addChild("name").addChild(id);
+               pn.addChild("external");
+               RESULT=pn;
        :}
        ;
 
index cab31e81fe44f325ded124a3541c9322cd345081..7393e7e4cb4ee505b82ac392c422fdb9b3057a2f 100644 (file)
@@ -185,16 +185,16 @@ int maxreadfd;
 struct RuntimeHash *fdtoobject;
 
 void addreadfd(int fd) {
-  if (fd>maxreadfd)
-    fd=maxreadfd;
+  if (fd>=maxreadfd)
+    maxreadfd=fd+1;
   FD_SET(fd, &readfds);
 }
 
 void removereadfd(int fd) {
   FD_CLR(fd, &readfds);
-  if (maxreadfd==fd) {
+  if (maxreadfd==(fd+1)) {
     maxreadfd--;
-    while(!FD_ISSET(maxreadfd, &readfds)&&maxreadfd>0)
+    while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
       maxreadfd--;
   }
 }
@@ -222,54 +222,60 @@ void executetasks() {
   mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
 
   newtask:
-  while(!isEmpty(activetasks)) {
-    struct QueueItem * qi=(struct QueueItem *) getTail(activetasks);
-    struct taskparamdescriptor *tpd=(struct taskparamdescriptor *) qi->objectptr;
-    int i;
-    struct timeval timeout={0,0};
-    fd_set tmpreadfds;
-    int numselect;
-    FD_COPY(&readfds, &tmpreadfds);
-    numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
-    if (numselect>0) {
-      /* Process ready fd's */
-      int fd;
-      for(fd=0;fd<maxreadfd;fd++) {
-       if (FD_ISSET(fd, &readfds)) {
-         /* Set ready flag on object */
-         void * objptr;
-         if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
-           flagorand(objptr,1,0xFFFFFFFF); /* Set the first flag to 1 */
+  while(!isEmpty(activetasks)||(maxreadfd>0)) {
+
+    if (maxreadfd>0) {
+      int i;
+      struct timeval timeout={0,0};
+      fd_set tmpreadfds;
+      int numselect;
+      FD_COPY(&readfds, &tmpreadfds);
+      numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
+      if (numselect>0) {
+       /* Process ready fd's */
+       int fd;
+       for(fd=0;fd<maxreadfd;fd++) {
+         if (FD_ISSET(fd, &tmpreadfds)) {
+           /* Set ready flag on object */
+           void * objptr;
+           if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
+             flagorand(objptr,1,0xFFFFFFFF); /* Set the first flag to 1 */
+           }
          }
        }
       }
     }
-    
-    removeItem(activetasks, qi);
-    
-    for(i=0;i<tpd->task->numParameters;i++) {
-      void * parameter=tpd->parameterArray[i];
-      struct parameterdescriptor * pd=tpd->task->descriptorarray[i];
-      struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
-      if (!RuntimeHashcontainskey(pw->objectset, (int) parameter))
-       goto newtask;
-      taskpointerarray[i]=parameter;
-    }
-    {
-      struct RuntimeHash * forward=allocateRuntimeHash(100);
-      struct RuntimeHash * reverse=allocateRuntimeHash(100);
-      void ** checkpoint=makecheckpoint(tpd->task->numParameters, taskpointerarray, forward, reverse);
-      if (setjmp(error_handler)) {
-       /* Recover */
-       int h;
+
+    if (!isEmpty(activetasks)) {
+      int i;
+      struct QueueItem * qi=(struct QueueItem *) getTail(activetasks);
+      struct taskparamdescriptor *tpd=(struct taskparamdescriptor *) qi->objectptr;
+      removeItem(activetasks, qi);
+      
+      for(i=0;i<tpd->task->numParameters;i++) {
+       void * parameter=tpd->parameterArray[i];
+       struct parameterdescriptor * pd=tpd->task->descriptorarray[i];
+       struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
+       if (!RuntimeHashcontainskey(pw->objectset, (int) parameter))
+         goto newtask;
+       taskpointerarray[i]=parameter;
+      }
+      {
+       struct RuntimeHash * forward=allocateRuntimeHash(100);
+       struct RuntimeHash * reverse=allocateRuntimeHash(100);
+       void ** checkpoint=makecheckpoint(tpd->task->numParameters, taskpointerarray, forward, reverse);
+       if (setjmp(error_handler)) {
+         /* Recover */
+         int h;
 #ifdef DEBUG
-       printf("Recovering\n");
+         printf("Recovering\n");
 #endif
-       genputtable(failedtasks,tpd,tpd);
-       restorecheckpoint(tpd->task->numParameters, taskpointerarray, checkpoint, forward, reverse);
-      } else {
-       /* Actually call task */
-       ((void (*) (void **)) tpd->task->taskptr)(taskpointerarray);
+         genputtable(failedtasks,tpd,tpd);
+         restorecheckpoint(tpd->task->numParameters, taskpointerarray, checkpoint, forward, reverse);
+       } else {
+         /* Actually call task */
+         ((void (*) (void **)) tpd->task->taskptr)(taskpointerarray);
+       }
       }
     }
   }
@@ -302,7 +308,8 @@ void processtasks() {
 
 
 int ___ServerSocket______createSocket____I(struct ___ServerSocket___ * sock, int port) {
-  int fd=socket(AF_INET, SOCK_STREAM, 0);
+  int fd;
+
   int n=1;
   struct sockaddr_in sin;
 
@@ -310,12 +317,21 @@ int ___ServerSocket______createSocket____I(struct ___ServerSocket___ * sock, int
   sin.sin_family = AF_INET;
   sin.sin_port = htons (port);
   sin.sin_addr.s_addr = htonl (INADDR_ANY);
-  
-  if (fd<0)
+  fd=socket(AF_INET, SOCK_STREAM, 0);
+  if (fd<0) {
+#ifdef DEBUG
+    perror(NULL);
+    printf("createSocket error #1\n");
+#endif
     longjmp(error_handler,5);
-  
+  }
+
   if (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof (n)) < 0) {
     close(fd);
+#ifdef DEBUG
+    perror(NULL);
+    printf("createSocket error #2\n");
+#endif
     longjmp(error_handler, 6);
   }
   fcntl(fd, F_SETFD, 1);
@@ -324,47 +340,66 @@ int ___ServerSocket______createSocket____I(struct ___ServerSocket___ * sock, int
   /* bind to port */
   if (bind(fd, (struct sockaddr *) &sin, sizeof(sin))<0) { 
     close (fd);
+#ifdef DEBUG
+    perror(NULL);
+    printf("createSocket error #3\n");
+#endif
     longjmp(error_handler, 7);
   }
 
   /* listen */
-  if (listen(fd, 10)<0) { 
+  if (listen(fd, 5)<0) { 
     close (fd);
+#ifdef DEBUG
+    perror(NULL);
+    printf("createSocket error #4\n");
+#endif
     longjmp(error_handler, 8);
   }
 
   /* Store the fd/socket object mapping */
   RuntimeHashadd(fdtoobject, fd, (int) sock);
   addreadfd(fd);
-
   return fd;
 }
 
-int ___ServerSocket______nativeaccept____L___Socket____I(struct ___Socket___ * sock, int fd) {
+int ___ServerSocket______nativeaccept____L___Socket___(struct ___ServerSocket___ * serversock, struct ___Socket___ * sock) {
   struct sockaddr_in sin;
   unsigned int sinlen=sizeof(sin);
-  int newfd=accept(fd, (struct sockaddr *)&sin, &sinlen);
+  int fd=serversock->___fd___;
+  int newfd;
+  newfd=accept(fd, (struct sockaddr *)&sin, &sinlen);
+
 
   if (newfd<0) { 
+#ifdef DEBUG
+    perror(NULL);
+    printf("acceptSocket error #1\n");
+#endif
     longjmp(error_handler, 9);
   }
   fcntl(newfd, F_SETFL, fcntl(fd, F_GETFL)|O_NONBLOCK);
 
-  RuntimeHashadd(fdtoobject, fd, (int) sock);
-  addreadfd(fd);
+  RuntimeHashadd(fdtoobject, newfd, (int) sock);
+  addreadfd(newfd);
+  flagorand(serversock,0,0xFFFFFFFE);
   return newfd;
 }
 
-void ___Socket______nativeWrite_____AR_C_I(struct ArrayObject * ao, int fd) {
+
+void ___Socket______nativeWrite_____AR_C(struct ___Socket___ * sock, struct ArrayObject * ao) {
+  int fd=sock->___fd___;
   int length=ao->___length___;
   char * charstr=((char *)& ao->___length___)+sizeof(int);
   int bytewritten=write(fd, charstr, length);
   if (bytewritten!=length) {
     printf("ERROR IN NATIVEWRITE\n");
   }
+  flagorand(sock,0,0xFFFFFFFE);
 }
 
-int ___Socket______nativeRead_____AR_C_I(struct ArrayObject * ao, int fd) {
+int ___Socket______nativeRead_____AR_C(struct ___Socket___ * sock, struct ArrayObject * ao) {
+  int fd=sock->___fd___;
   int length=ao->___length___;
   char * charstr=((char *)& ao->___length___)+sizeof(int);
   int byteread=read(fd, charstr, length);
@@ -372,15 +407,18 @@ int ___Socket______nativeRead_____AR_C_I(struct ArrayObject * ao, int fd) {
   if (byteread<0) {
     printf("ERROR IN NATIVEREAD\n");
   }
+  flagorand(sock,0,0xFFFFFFFE);
   return byteread;
 }
 
-void ___Socket______nativeClose____I(int fd) {
+void ___Socket______nativeClose____(struct ___Socket___ * sock) {
+  int fd=sock->___fd___;
   int data;
   RuntimeHashget(fdtoobject, fd, &data);
   RuntimeHashremove(fdtoobject, fd, data);
   removereadfd(fd);
   close(fd);
+  flagorand(sock,0,0xFFFFFFFE);
 }
 #endif
 
diff --git a/Robust/src/Tests/ServerExample.java b/Robust/src/Tests/ServerExample.java
new file mode 100644 (file)
index 0000000..82b52ce
--- /dev/null
@@ -0,0 +1,27 @@
+/* Startup object is generated with the initialstate flag set by the
+ *  system to start the computation up */
+
+task Startup(StartupObject s {initialstate}) {
+    System.printString("Starting\n");
+    ServerSocket ss=new ServerSocket(8000);
+    System.printString("Creating ServerSocket\n");
+    taskexit(s {!initialstate}); /* Turns initial state flag off, so this task won't refire */
+}
+
+task AcceptConnection(ServerSocket ss{SocketPending}) {
+    Socket s=ss.accept();
+    System.printString("Creating Socket\n");
+}
+
+task IncomingIO(Socket s{IOPending}) {
+    byte[] b=new byte[10];
+    int length=s.read(b);
+    byte[] b2=new byte[length];
+    int i;
+    for(i=0;i<length;i++) {
+       b2[i]=b[i];
+    }
+    System.printString("receiving input\n");
+    s.write(b2);
+}
+
index 8aab83e1fba634a32dcdc5fa28e5a5e14a5ed2a1..e485dd832945e448bfd8be2c2dd72b065104da1b 100755 (executable)
@@ -5,4 +5,4 @@ shift
 mkdir tmpbuilddirectory
 java -cp $ROBUSTROOT/../cup/:$ROBUSTROOT Main.Main -classlibrary $ROBUSTROOT/ClassLibrary/ -dir tmpbuilddirectory -struct $MAINFILE -task $@
 #gcc -I$ROBUSTROOT/Runtime -Itmpbuilddirectory -O0 -DTASK -g tmpbuilddirectory/methods.c tmpbuilddirectory/taskdefs.c $ROBUSTROOT/Runtime/runtime.c $ROBUSTROOT/Runtime/Queue.c $ROBUSTROOT/Runtime/SimpleHash.c $ROBUSTROOT/Runtime/checkpoint.c $ROBUSTROOT/Runtime/GenericHashtable.c -o $MAINFILE.bin
-gcc -I$ROBUSTROOT/Runtime -I. -IRuntime/include -Itmpbuilddirectory -O0 -DBOEHM_GC -LRuntime/lib/ -lgc -DTASK -g tmpbuilddirectory/methods.c tmpbuilddirectory/taskdefs.c $ROBUSTROOT/Runtime/runtime.c $ROBUSTROOT/Runtime/Queue.c $ROBUSTROOT/Runtime/SimpleHash.c $ROBUSTROOT/Runtime/checkpoint.c $ROBUSTROOT/Runtime/GenericHashtable.c -o $MAINFILE.bin
\ No newline at end of file
+gcc -I$ROBUSTROOT/Runtime -I. -IRuntime/include -Itmpbuilddirectory -O0 -DBOEHM_GC -LRuntime/lib/ -lgc -DTASK -DDEBUG -g tmpbuilddirectory/methods.c tmpbuilddirectory/taskdefs.c $ROBUSTROOT/Runtime/runtime.c $ROBUSTROOT/Runtime/Queue.c $ROBUSTROOT/Runtime/SimpleHash.c $ROBUSTROOT/Runtime/checkpoint.c $ROBUSTROOT/Runtime/GenericHashtable.c -o $MAINFILE.bin
\ No newline at end of file