Server Code
authorBrian Demsky <bdemsky@plrg.eecs.uci.edu>
Sat, 23 Jul 2016 02:07:13 +0000 (19:07 -0700)
committerBrian Demsky <bdemsky@plrg.eecs.uci.edu>
Sat, 23 Jul 2016 02:07:41 +0000 (19:07 -0700)
src/java/iotcloud/CloudComm.java [new file with mode: 0644]
src/java/iotcloud/Slot.java [new file with mode: 0644]
src/server/iotquery.cpp

diff --git a/src/java/iotcloud/CloudComm.java b/src/java/iotcloud/CloudComm.java
new file mode 100644 (file)
index 0000000..18f7f30
--- /dev/null
@@ -0,0 +1,79 @@
+package iotcloud;
+import java.io.*;
+import java.net.*;
+import java.util.Arrays;
+
+class CloudComm {
+       String baseurl;
+       CloudComm(String _baseurl) {
+               this.baseurl=_baseurl;
+       }
+       
+       private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException {
+               String reqstring=isput?"req=putslot":"req=getslot";
+               String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber;
+               if (maxentries != 0)
+                       urlstr += "&max="+maxentries;
+               return new URL(urlstr);
+       }
+
+       public Slot[] putSlot(Slot slot, int max) throws IOException{
+               long sequencenumber=slot.getSequenceNumber();
+               URL url=buildRequest(true, sequencenumber, max);
+               URLConnection con=url.openConnection();
+               HttpURLConnection http = (HttpURLConnection) con;
+               http.setRequestMethod("POST");
+               http.setFixedLengthStreamingMode(slot.getBytes().length);
+               http.setDoOutput(true);
+               http.connect();
+               OutputStream os=http.getOutputStream();
+               os.write(slot.getBytes());
+               System.out.println(http.getResponseMessage());
+
+               InputStream is=http.getInputStream();
+               DataInputStream dis=new DataInputStream(is);
+               byte[] resptype=new byte[7];
+               dis.readFully(resptype);
+               if (Arrays.equals(resptype, "getslot".getBytes()))
+                       return processSlots(dis, sequencenumber);
+               else if (Arrays.equals(resptype, "putslot".getBytes()))
+                       return null;
+               else
+                       throw new Error("Bad response to putslot");
+       }
+       
+       public Slot[] getSlots(long sequencenumber) throws IOException {
+               URL url=buildRequest(false, sequencenumber, 0);
+               URLConnection con=url.openConnection();
+               HttpURLConnection http = (HttpURLConnection) con;
+               http.setRequestMethod("POST");
+               http.connect();
+               System.out.println(http.getResponseMessage());
+               InputStream is=http.getInputStream();
+
+               DataInputStream dis=new DataInputStream(is);
+               byte[] resptype=new byte[7];
+               dis.readFully(resptype);
+               if (!Arrays.equals(resptype, "getslot".getBytes()))
+                       throw new Error("Bad Response: "+new String(resptype));
+               else
+                       return processSlots(dis, sequencenumber);
+       }
+               
+       Slot[] processSlots(DataInputStream dis, long sequencenumber) throws IOException {
+               int numberofslots=dis.readInt();
+               int[] sizesofslots=new int[numberofslots];
+               Slot[] slots=new Slot[numberofslots];
+               System.out.println(numberofslots);
+               for(int i=0;i<numberofslots;i++)
+                       sizesofslots[i]=dis.readInt();
+
+               for(int i=0;i<numberofslots;i++) {
+                       byte[] data=new byte[sizesofslots[i]];
+                       dis.readFully(data);
+                       slots[i]=new Slot(sequencenumber+i, data);
+               }
+               dis.close();
+               return slots;
+       }
+}
diff --git a/src/java/iotcloud/Slot.java b/src/java/iotcloud/Slot.java
new file mode 100644 (file)
index 0000000..9dd6c18
--- /dev/null
@@ -0,0 +1,24 @@
+package iotcloud;
+
+class Slot {
+       long seqnum;
+       byte[] bytes;
+       
+       Slot(long _seqnum, byte[] _bytes) {
+               seqnum=_seqnum;
+               bytes=_bytes;
+       }
+
+       
+       long getSequenceNumber() {
+               return seqnum;
+       }
+
+       byte[] getBytes() {
+               return bytes;
+       }
+       
+       public String toString() {
+               return "<"+getSequenceNumber()+", "+new String(getBytes())+">";
+       }
+}
index 39dacab..9ab3f27 100644 (file)
@@ -6,6 +6,8 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
 
 using namespace std;
 
@@ -28,7 +30,9 @@ IoTQuery::IoTQuery(FCGX_Request *request) :
        newestentry(0),
        requestsequencenumber(0),
        numqueueentries(DEFAULT_SIZE),
-       fd(-1) {
+       fd(-1),
+       reqGetSlot(false),
+       reqPutSlot(false) {
 }
 
 IoTQuery::~IoTQuery() {
@@ -53,26 +57,33 @@ void IoTQuery::decodeQuery() {
        char * str=new char[len+1];
        memcpy(str, query, len+1);
        char *tok_ptr=str;
-
+       
        /* Parse commands */
        char *command=strsep(&tok_ptr, "&");
-       if (strncmp(command, "putslot", 7) == 0)
+       if (strncmp(command, "req=putslot", 11) == 0)
                reqPutSlot = true;
 
-       if (strncmp(command, "getslot", 7) == 0)
+       if (strncmp(command, "req=getslot", 11) == 0)
                reqGetSlot = true;
 
        /* Load Sequence Number for request */
        char *sequencenumber_str = strsep(&tok_ptr, "&");
-
-       if (sequencenumber_str != NULL)
-               requestsequencenumber = strtoll(sequencenumber_str, NULL, 10);
-
+       if (sequencenumber_str != NULL &&
+                       strncmp(sequencenumber_str, "seq=", 4) == 0) {
+               sequencenumber_str = strchr(sequencenumber_str, '=');
+               if (sequencenumber_str != NULL) {
+                       requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+               }
+       }
+       
        /* Update size if we get request */
        char * numqueueentries_str = tok_ptr;
-       if (numqueueentries_str != NULL)
+       if (numqueueentries_str != NULL &&
+                       strncmp(numqueueentries_str, "max=", 4) == 0) {
+               numqueueentries_str = strchr(numqueueentries_str + 1, '=');
                numqueueentries = strtoll(numqueueentries_str, NULL, 10);
-
+       }
+               
        delete str;
 }
 
@@ -84,7 +95,10 @@ void doWrite(int fd, char *data, long long length) {
                        length -= byteswritten;
                        offset += byteswritten;
                } else {
-                       cerr << "Bytes not written";
+                       cerr << "Bytes not written" << endl;
+                       if (byteswritten < 0) {
+                               cerr << strerror(errno) << " error writing slot file" << endl;
+                       }
                        return;
                }
        } while(length != 0);
@@ -106,6 +120,8 @@ bool doRead(int fd, void *buf, int numbytes) {
 
 void IoTQuery::getSlot() {
        int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
+       if (numrequeststosend < 0)
+               numrequeststosend = 0;
        long long numbytes = 0;
        int filesizes[numrequeststosend];
        int fdarray[numrequeststosend];
@@ -123,19 +139,22 @@ void IoTQuery::getSlot() {
                }
                delete filename;
        }
-       const char header[]="getdata";
-       long long size=sizeof(header)+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
+       const char header[]="getslot";
+       long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
        char * response = new char[size];
        long long offset=0;
-       memcpy(response, header, sizeof(header));
-       offset+=sizeof(header);
-       memcpy(response + offset, &numrequeststosend, sizeof(numrequeststosend));
+       memcpy(response, header, sizeof(header)-1);
+       offset+=sizeof(header)-1;
+       int numreq=htonl(numrequeststosend);
+       cerr << numrequeststosend << " " << numreq << endl;
+       memcpy(response + offset, &numreq, sizeof(numreq));
        offset+=sizeof(numrequeststosend);
        for(int i=0;i<numrequeststosend;i++) {
-               memcpy(response + offset, &filesizes[i], sizeof(int));
+               int filesize=htonl(filesizes[i]);
+               memcpy(response + offset, &filesize, sizeof(filesize));
                offset+=sizeof(int);
        }
-
+       
        //copy file data
        for(int i=0;i<numrequeststosend;i++) {
                if (fdarray[i]>=0) {
@@ -160,12 +179,13 @@ void IoTQuery::putSlot() {
                getSlot();
                return;
        }
-       
+
        int numberofliveslots=(int) ((newestentry-oldestentry)+1);
        if (numberofliveslots >=  numqueueentries) {
                //need to drop slot
                removeOldestSlot();
        }
+       
        //write slot data out to file
        char *filename = getSlotFileName(requestsequencenumber);
        int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
@@ -174,8 +194,8 @@ void IoTQuery::putSlot() {
        delete filename;
        newestentry = requestsequencenumber; // update sequence number
        updateStatusFile(); // update counts
-       char command[]="putdata";
-       sendResponse(command, sizeof(command));
+       char command[]="putslot";
+       sendResponse(command, sizeof(command)-1);
 }
 
 void IoTQuery::sendResponse(char * bytes, int len) {
@@ -206,25 +226,34 @@ void IoTQuery::processQuery() {
        getDirectory();
        readData();
 
-       if (strncmp(method, "POST", 4) != 0)
+       if (strncmp(method, "POST", 4) != 0) {
+               cerr << "Not POST Request" << endl;
                return;
+       }
 
        if (directory == NULL ||
-                       !checkDirectory())
+                       !checkDirectory()) {
+               cerr << "Directory " << directory << " does not exist" << endl;
                return;
+       }
 
-       if (!openStatusFile())
+       if (!openStatusFile()) {
+               cerr << "Failed to open status file" << endl;
                return;
+       }
 
        flock(fd, LOCK_EX);
 
        decodeQuery();
-
+       
        if (reqGetSlot)
                getSlot();
        else if (reqPutSlot)
                putSlot();
-       else return;
+       else {
+               cerr << "No recognized request" << endl;
+               return;
+       }
 }
 
 void IoTQuery::readData() {
@@ -246,7 +275,7 @@ void IoTQuery::getQuery() {
        iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
 
        char * reqlength = FCGX_GetParam(length_str, request->envp);
-       if (length) {
+       if (reqlength) {
                length=strtoll(reqlength, NULL, 10);
        } else {
                length=0;
@@ -263,7 +292,7 @@ void IoTQuery::getDirectory() {
        directory = new char[directory_len];
        memcpy(directory, iotcloudroot, rootdir_len);
        memcpy(directory + rootdir_len, uri, split_len);
-       directory[directory_len]=0;
+       directory[directory_len-1]=0;
 }
 
 int doread(int fd, void *ptr, size_t count, off_t offset) {
@@ -295,9 +324,11 @@ bool IoTQuery::openStatusFile() {
        fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
        delete filename;
 
-       if (fd < 0)
+       if (fd < 0) {
+               cerr << strerror(errno) << " error opening statusfile" << endl;
                return false;
-
+       }
+       
        int size;
        int needwrite=0;
        if (doread(fd, &size, sizeof(size), OFFSET_MAX))