From d197f7b34c7037ce55f446c952f1b2d25fcec900 Mon Sep 17 00:00:00 2001 From: Brian Demsky Date: Fri, 22 Jul 2016 19:07:13 -0700 Subject: [PATCH] Server Code --- src/java/iotcloud/CloudComm.java | 79 +++++++++++++++++++++++++++ src/java/iotcloud/Slot.java | 24 +++++++++ src/server/iotquery.cpp | 91 +++++++++++++++++++++----------- 3 files changed, 164 insertions(+), 30 deletions(-) create mode 100644 src/java/iotcloud/CloudComm.java create mode 100644 src/java/iotcloud/Slot.java diff --git a/src/java/iotcloud/CloudComm.java b/src/java/iotcloud/CloudComm.java new file mode 100644 index 0000000..18f7f30 --- /dev/null +++ b/src/java/iotcloud/CloudComm.java @@ -0,0 +1,79 @@ +package iotcloud; +import java.io.*; +import java.net.*; +import java.util.Arrays; + +class CloudComm { + String baseurl; + CloudComm(String _baseurl) { + this.baseurl=_baseurl; + } + + private URL buildRequest(boolean isput, long sequencenumber, long maxentries) throws IOException { + String reqstring=isput?"req=putslot":"req=getslot"; + String urlstr=baseurl+"?"+reqstring+"&seq="+sequencenumber; + if (maxentries != 0) + urlstr += "&max="+maxentries; + return new URL(urlstr); + } + + public Slot[] putSlot(Slot slot, int max) throws IOException{ + long sequencenumber=slot.getSequenceNumber(); + URL url=buildRequest(true, sequencenumber, max); + URLConnection con=url.openConnection(); + HttpURLConnection http = (HttpURLConnection) con; + http.setRequestMethod("POST"); + http.setFixedLengthStreamingMode(slot.getBytes().length); + http.setDoOutput(true); + http.connect(); + OutputStream os=http.getOutputStream(); + os.write(slot.getBytes()); + System.out.println(http.getResponseMessage()); + + InputStream is=http.getInputStream(); + DataInputStream dis=new DataInputStream(is); + byte[] resptype=new byte[7]; + dis.readFully(resptype); + if (Arrays.equals(resptype, "getslot".getBytes())) + return processSlots(dis, sequencenumber); + else if (Arrays.equals(resptype, "putslot".getBytes())) + return null; + else + throw new Error("Bad response to putslot"); + } + + public Slot[] getSlots(long sequencenumber) throws IOException { + URL url=buildRequest(false, sequencenumber, 0); + URLConnection con=url.openConnection(); + HttpURLConnection http = (HttpURLConnection) con; + http.setRequestMethod("POST"); + http.connect(); + System.out.println(http.getResponseMessage()); + InputStream is=http.getInputStream(); + + DataInputStream dis=new DataInputStream(is); + byte[] resptype=new byte[7]; + dis.readFully(resptype); + if (!Arrays.equals(resptype, "getslot".getBytes())) + throw new Error("Bad Response: "+new String(resptype)); + else + return processSlots(dis, sequencenumber); + } + + Slot[] processSlots(DataInputStream dis, long sequencenumber) throws IOException { + int numberofslots=dis.readInt(); + int[] sizesofslots=new int[numberofslots]; + Slot[] slots=new Slot[numberofslots]; + System.out.println(numberofslots); + for(int i=0;i"; + } +} diff --git a/src/server/iotquery.cpp b/src/server/iotquery.cpp index 39dacab..9ab3f27 100644 --- a/src/server/iotquery.cpp +++ b/src/server/iotquery.cpp @@ -6,6 +6,8 @@ #include #include #include +#include +#include using namespace std; @@ -28,7 +30,9 @@ IoTQuery::IoTQuery(FCGX_Request *request) : newestentry(0), requestsequencenumber(0), numqueueentries(DEFAULT_SIZE), - fd(-1) { + fd(-1), + reqGetSlot(false), + reqPutSlot(false) { } IoTQuery::~IoTQuery() { @@ -53,26 +57,33 @@ void IoTQuery::decodeQuery() { char * str=new char[len+1]; memcpy(str, query, len+1); char *tok_ptr=str; - + /* Parse commands */ char *command=strsep(&tok_ptr, "&"); - if (strncmp(command, "putslot", 7) == 0) + if (strncmp(command, "req=putslot", 11) == 0) reqPutSlot = true; - if (strncmp(command, "getslot", 7) == 0) + if (strncmp(command, "req=getslot", 11) == 0) reqGetSlot = true; /* Load Sequence Number for request */ char *sequencenumber_str = strsep(&tok_ptr, "&"); - - if (sequencenumber_str != NULL) - requestsequencenumber = strtoll(sequencenumber_str, NULL, 10); - + if (sequencenumber_str != NULL && + strncmp(sequencenumber_str, "seq=", 4) == 0) { + sequencenumber_str = strchr(sequencenumber_str, '='); + if (sequencenumber_str != NULL) { + requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10); + } + } + /* Update size if we get request */ char * numqueueentries_str = tok_ptr; - if (numqueueentries_str != NULL) + if (numqueueentries_str != NULL && + strncmp(numqueueentries_str, "max=", 4) == 0) { + numqueueentries_str = strchr(numqueueentries_str + 1, '='); numqueueentries = strtoll(numqueueentries_str, NULL, 10); - + } + delete str; } @@ -84,7 +95,10 @@ void doWrite(int fd, char *data, long long length) { length -= byteswritten; offset += byteswritten; } else { - cerr << "Bytes not written"; + cerr << "Bytes not written" << endl; + if (byteswritten < 0) { + cerr << strerror(errno) << " error writing slot file" << endl; + } return; } } while(length != 0); @@ -106,6 +120,8 @@ bool doRead(int fd, void *buf, int numbytes) { void IoTQuery::getSlot() { int numrequeststosend = (int)((newestentry-requestsequencenumber)+1); + if (numrequeststosend < 0) + numrequeststosend = 0; long long numbytes = 0; int filesizes[numrequeststosend]; int fdarray[numrequeststosend]; @@ -123,19 +139,22 @@ void IoTQuery::getSlot() { } delete filename; } - const char header[]="getdata"; - long long size=sizeof(header)+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes + const char header[]="getslot"; + long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes char * response = new char[size]; long long offset=0; - memcpy(response, header, sizeof(header)); - offset+=sizeof(header); - memcpy(response + offset, &numrequeststosend, sizeof(numrequeststosend)); + memcpy(response, header, sizeof(header)-1); + offset+=sizeof(header)-1; + int numreq=htonl(numrequeststosend); + cerr << numrequeststosend << " " << numreq << endl; + memcpy(response + offset, &numreq, sizeof(numreq)); offset+=sizeof(numrequeststosend); for(int i=0;i=0) { @@ -160,12 +179,13 @@ void IoTQuery::putSlot() { getSlot(); return; } - + int numberofliveslots=(int) ((newestentry-oldestentry)+1); if (numberofliveslots >= numqueueentries) { //need to drop slot removeOldestSlot(); } + //write slot data out to file char *filename = getSlotFileName(requestsequencenumber); int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR); @@ -174,8 +194,8 @@ void IoTQuery::putSlot() { delete filename; newestentry = requestsequencenumber; // update sequence number updateStatusFile(); // update counts - char command[]="putdata"; - sendResponse(command, sizeof(command)); + char command[]="putslot"; + sendResponse(command, sizeof(command)-1); } void IoTQuery::sendResponse(char * bytes, int len) { @@ -206,25 +226,34 @@ void IoTQuery::processQuery() { getDirectory(); readData(); - if (strncmp(method, "POST", 4) != 0) + if (strncmp(method, "POST", 4) != 0) { + cerr << "Not POST Request" << endl; return; + } if (directory == NULL || - !checkDirectory()) + !checkDirectory()) { + cerr << "Directory " << directory << " does not exist" << endl; return; + } - if (!openStatusFile()) + if (!openStatusFile()) { + cerr << "Failed to open status file" << endl; return; + } flock(fd, LOCK_EX); decodeQuery(); - + if (reqGetSlot) getSlot(); else if (reqPutSlot) putSlot(); - else return; + else { + cerr << "No recognized request" << endl; + return; + } } void IoTQuery::readData() { @@ -246,7 +275,7 @@ void IoTQuery::getQuery() { iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp); char * reqlength = FCGX_GetParam(length_str, request->envp); - if (length) { + if (reqlength) { length=strtoll(reqlength, NULL, 10); } else { length=0; @@ -263,7 +292,7 @@ void IoTQuery::getDirectory() { directory = new char[directory_len]; memcpy(directory, iotcloudroot, rootdir_len); memcpy(directory + rootdir_len, uri, split_len); - directory[directory_len]=0; + directory[directory_len-1]=0; } int doread(int fd, void *ptr, size_t count, off_t offset) { @@ -295,9 +324,11 @@ bool IoTQuery::openStatusFile() { fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR); delete filename; - if (fd < 0) + if (fd < 0) { + cerr << strerror(errno) << " error opening statusfile" << endl; return false; - + } + int size; int needwrite=0; if (doread(fd, &size, sizeof(size), OFFSET_MAX)) -- 2.34.1