more bug fixes
[iotcloud.git] / src / server / iotquery.cpp
index d2d80e4177420a100a9900fbdd625289bc910735..8d708cb1edeb1385c2d81a7ef2e3df3ddc511ce4 100644 (file)
@@ -6,6 +6,8 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
 
 using namespace std;
 
@@ -28,7 +30,9 @@ IoTQuery::IoTQuery(FCGX_Request *request) :
        newestentry(0),
        requestsequencenumber(0),
        numqueueentries(DEFAULT_SIZE),
-       fd(-1) {
+       fd(-1),
+       reqGetSlot(false),
+       reqPutSlot(false) {
 }
 
 IoTQuery::~IoTQuery() {
@@ -56,22 +60,33 @@ void IoTQuery::decodeQuery() {
 
        /* Parse commands */
        char *command=strsep(&tok_ptr, "&");
-       if (strncmp(command, "putslot", 7) == 0)
+       if (strncmp(command, "req=putslot", 11) == 0)
                reqPutSlot = true;
 
-       if (strncmp(command, "getslot", 7) == 0)
+       if (strncmp(command, "req=getslot", 11) == 0)
                reqGetSlot = true;
 
        /* Load Sequence Number for request */
        char *sequencenumber_str = strsep(&tok_ptr, "&");
+       if (sequencenumber_str != NULL &&
+                       strncmp(sequencenumber_str, "seq=", 4) == 0) {
+               sequencenumber_str = strchr(sequencenumber_str, '=');
+               if (sequencenumber_str != NULL) {
+                       requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+               }
+       }
 
-       if (sequencenumber_str != NULL)
-               requestsequencenumber = strtoll(sequencenumber_str, NULL, 10);
+       //don't allow a really old sequence number
+       if (requestsequencenumber < oldestentry)
+               requestsequencenumber = oldestentry;
 
        /* Update size if we get request */
        char * numqueueentries_str = tok_ptr;
-       if (numqueueentries_str != NULL)
+       if (numqueueentries_str != NULL &&
+                       strncmp(numqueueentries_str, "max=", 4) == 0) {
+               numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
                numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+       }
 
        delete str;
 }
@@ -84,14 +99,82 @@ void doWrite(int fd, char *data, long long length) {
                        length -= byteswritten;
                        offset += byteswritten;
                } else {
-                       cerr << "Bytes not written";
+                       cerr << "Bytes not written" << endl;
+                       if (byteswritten < 0) {
+                               cerr << strerror(errno) << " error writing slot file" << endl;
+                       }
                        return;
                }
        } while(length != 0);
 }
 
+bool doRead(int fd, void *buf, int numbytes) {
+       int offset=0;
+       char *ptr=(char *)buf;
+       do {
+               int bytesread=read(fd, ptr+offset, numbytes);
+               if (bytesread > 0) {
+                       offset += bytesread;
+                       numbytes -= bytesread;
+               } else
+                       return false;
+       } while (numbytes!=0);
+       return true;
+}
+
 void IoTQuery::getSlot() {
-       
+       int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
+       if (numrequeststosend < 0)
+               numrequeststosend = 0;
+       long long numbytes = 0;
+       int filesizes[numrequeststosend];
+       int fdarray[numrequeststosend];
+       int index=0;
+       for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+               struct stat st;
+               char *filename=getSlotFileName(seqn);
+               if (stat(filename, &st) == 0) {
+                       fdarray[index]=open(filename, O_RDONLY);
+                       filesizes[index]=st.st_size;
+                       numbytes+=filesizes[index];
+               } else {
+                       fdarray[index]=-1;
+                       filesizes[index]=0;
+               }
+               delete filename;
+       }
+       const char header[]="getslot";
+       long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         //header + payload + file count + sizes
+       char * response = new char[size];
+       long long offset=0;
+       memcpy(response, header, sizeof(header)-1);
+       offset+=sizeof(header)-1;
+       int numreq=htonl(numrequeststosend);
+       memcpy(response + offset, &numreq, sizeof(numreq));
+       offset+=sizeof(numrequeststosend);
+       for(int i=0; i<numrequeststosend; i++) {
+               int filesize=htonl(filesizes[i]);
+               memcpy(response + offset, &filesize, sizeof(filesize));
+               offset+=sizeof(int);
+       }
+
+       //copy file data
+       for(int i=0; i<numrequeststosend; i++) {
+               if (fdarray[i]>=0) {
+                       doRead(fdarray[i], response+offset, filesizes[i]);
+                       offset+=filesizes[i];
+               }
+       }
+
+       //write response out
+       sendResponse(response, size);
+
+       //cleanup
+       delete response;
+       for(int i=0; i<numrequeststosend; i++) {
+               if (fdarray[i] >= 0)
+                       close(fdarray[i]);
+       }
 }
 
 void IoTQuery::putSlot() {
@@ -99,31 +182,39 @@ void IoTQuery::putSlot() {
                getSlot();
                return;
        }
-       
+
        int numberofliveslots=(int) ((newestentry-oldestentry)+1);
        if (numberofliveslots >=  numqueueentries) {
                //need to drop slot
                removeOldestSlot();
        }
+
        //write slot data out to file
        char *filename = getSlotFileName(requestsequencenumber);
        int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
        doWrite(slotfd, data, length);
        close(slotfd);
        delete filename;
-       newestentry = requestsequencenumber; // update sequence number
-       updateStatusFile(); // update counts
+       newestentry = requestsequencenumber;                                                                                                                                                                                                                                                                                                                                                                                                                                    // update sequence number
+       updateStatusFile();                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             // update counts
+       char command[]="putslot";
+       sendResponse(command, sizeof(command)-1);
+}
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+       cout << "Accept-Ranges: bytes\r\n"
+                        << "Content-Length: " << len << "\r\n"
+                        << "\r\n";
+       cout.write(bytes, len);
 }
 
 char * IoTQuery::getSlotFileName(long long slot) {
        int directorylen=strlen(directory);
-       char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination
+       char * filename=new char[25+directorylen];                                                                                                                                                                                                                                                                                                                                                                                                                                                                              //19 digits for long number + 4 characters for SLOT + 1 character for null termination
        snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
        return filename;
 }
 
-
-
 void IoTQuery::removeOldestSlot() {
        if (oldestentry!=0) {
                char * filename=getSlotFileName(oldestentry);
@@ -138,15 +229,21 @@ void IoTQuery::processQuery() {
        getDirectory();
        readData();
 
-       if (strncmp(method, "POST", 4) != 0)
+       if (strncmp(method, "POST", 4) != 0) {
+               cerr << "Not POST Request" << endl;
                return;
+       }
 
        if (directory == NULL ||
-                       !checkDirectory())
+                       !checkDirectory()) {
+               cerr << "Directory " << directory << " does not exist" << endl;
                return;
+       }
 
-       if (!openStatusFile())
+       if (!openStatusFile()) {
+               cerr << "Failed to open status file" << endl;
                return;
+       }
 
        flock(fd, LOCK_EX);
 
@@ -156,7 +253,10 @@ void IoTQuery::processQuery() {
                getSlot();
        else if (reqPutSlot)
                putSlot();
-       else return;
+       else {
+               cerr << "No recognized request" << endl;
+               return;
+       }
 }
 
 void IoTQuery::readData() {
@@ -178,7 +278,7 @@ void IoTQuery::getQuery() {
        iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
 
        char * reqlength = FCGX_GetParam(length_str, request->envp);
-       if (length) {
+       if (reqlength) {
                length=strtoll(reqlength, NULL, 10);
        } else {
                length=0;
@@ -195,7 +295,7 @@ void IoTQuery::getDirectory() {
        directory = new char[directory_len];
        memcpy(directory, iotcloudroot, rootdir_len);
        memcpy(directory + rootdir_len, uri, split_len);
-       directory[directory_len]=0;
+       directory[directory_len-1]=0;
 }
 
 int doread(int fd, void *ptr, size_t count, off_t offset) {
@@ -227,8 +327,10 @@ bool IoTQuery::openStatusFile() {
        fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
        delete filename;
 
-       if (fd < 0)
+       if (fd < 0) {
+               cerr << strerror(errno) << " error opening statusfile" << endl;
                return false;
+       }
 
        int size;
        int needwrite=0;