Server Code
[iotcloud.git] / src / server / iotquery.cpp
index 89b98a90f076846f72e3bb51a24e62046880cd36..9ab3f27aab15d6374060ff86a6d659b8320c7ebc 100644 (file)
@@ -6,6 +6,8 @@
 #include <fcntl.h>
 #include <unistd.h>
 #include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
 
 using namespace std;
 
@@ -16,122 +18,339 @@ const char * iotcloudroot_str="IOTCLOUD_ROOT";
 const char * length_str="CONTENT_LENGTH";
 
 IoTQuery::IoTQuery(FCGX_Request *request) :
-  request(request),
-  data(NULL),
-  directory(NULL),
-  uri(NULL),
-  query(NULL),
-  method(NULL),
-  iotcloudroot(NULL),
-  dir(NULL),
-  length(0),
-  fd(-1)
-{
+       request(request),
+       data(NULL),
+       directory(NULL),
+       uri(NULL),
+       query(NULL),
+       method(NULL),
+       iotcloudroot(NULL),
+       length(0),
+       oldestentry(0),
+       newestentry(0),
+       requestsequencenumber(0),
+       numqueueentries(DEFAULT_SIZE),
+       fd(-1),
+       reqGetSlot(false),
+       reqPutSlot(false) {
 }
 
-
 IoTQuery::~IoTQuery() {
-  if (fd >= 0) {
-    close(fd);
-  }
-  if (directory)
-    delete directory;
-  if (data)
-    delete data;
-  if (dir != NULL)
-    closedir(dir);
-}
-  
-void IoTQuery::processQuery() {
-  parseQuery();
-  getDirectory();
-  readData();
-
-  if (strncmp(method, "POST", 4) != 0)
-    return;
-  
-  if (directory == NULL ||
-      (dir = opendir(directory)) == NULL)
-    return;
-  
-  if (openMaxFile() < 0)
-    return;
-
-  flock(fd, LOCK_EX);
-      
-  cout << "Content-type: text/html\r\n"
-       << "\r\n"
-       << "<html>\n"
-       << "  <head>\n"
-       << "    <title>Hello, World!</title>\n"
-       << "  </head>\n"
-       << "  <body>\n"
-       << "    <h1>Hello, World!</h1>\n"
-       << "  </body>\n";
-  
-  cout << uri_str << " " << uri << "\n";
-  cout << query_str << " " << query << "\n";
-  cout << method_str << " " << method << "\n";
-  cout << iotcloudroot_str << " " << iotcloudroot << "\n";
-  if (data)
-    cout << "[" << data << "]";      
-  
-  
-  
-  cout << "</html>\n";
+       if (fd >= 0)
+               close(fd);
+       if (directory)
+               delete directory;
+       if (data)
+               delete data;
+}
+
+bool IoTQuery::checkDirectory() {
+       struct stat s;
+       int err=stat(directory, &s);
+       if (-1 == err)
+               return false;
+       return S_ISDIR(s.st_mode);
 }
 
+void IoTQuery::decodeQuery() {
+       int len=strlen(query);
+       char * str=new char[len+1];
+       memcpy(str, query, len+1);
+       char *tok_ptr=str;
+       
+       /* Parse commands */
+       char *command=strsep(&tok_ptr, "&");
+       if (strncmp(command, "req=putslot", 11) == 0)
+               reqPutSlot = true;
 
-void IoTQuery::readData() {
-  if (length) {
-    data = new char[length+1];
-    memset(data, 0, length+1);
-    cin.read(data, length);
-  }
-  do {
-    char dummy;
-    cin >> dummy;
-  } while (!cin.eof());
+       if (strncmp(command, "req=getslot", 11) == 0)
+               reqGetSlot = true;
+
+       /* Load Sequence Number for request */
+       char *sequencenumber_str = strsep(&tok_ptr, "&");
+       if (sequencenumber_str != NULL &&
+                       strncmp(sequencenumber_str, "seq=", 4) == 0) {
+               sequencenumber_str = strchr(sequencenumber_str, '=');
+               if (sequencenumber_str != NULL) {
+                       requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+               }
+       }
+       
+       /* Update size if we get request */
+       char * numqueueentries_str = tok_ptr;
+       if (numqueueentries_str != NULL &&
+                       strncmp(numqueueentries_str, "max=", 4) == 0) {
+               numqueueentries_str = strchr(numqueueentries_str + 1, '=');
+               numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+       }
+               
+       delete str;
+}
+
+void doWrite(int fd, char *data, long long length) {
+       long long offset=0;
+       do {
+               long long byteswritten=write(fd, &data[offset], length);
+               if (byteswritten > 0) {
+                       length -= byteswritten;
+                       offset += byteswritten;
+               } else {
+                       cerr << "Bytes not written" << endl;
+                       if (byteswritten < 0) {
+                               cerr << strerror(errno) << " error writing slot file" << endl;
+                       }
+                       return;
+               }
+       } while(length != 0);
+}
+
+bool doRead(int fd, void *buf, int numbytes) {
+       int offset=0;
+       char *ptr=(char *)buf;
+       do {
+               int bytesread=read(fd, ptr+offset, numbytes);
+               if (bytesread > 0) {
+                       offset += bytesread;
+                       numbytes -= bytesread;
+               } else
+                       return false;
+       } while (numbytes!=0);
+       return true;
+}
+
+void IoTQuery::getSlot() {
+       int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
+       if (numrequeststosend < 0)
+               numrequeststosend = 0;
+       long long numbytes = 0;
+       int filesizes[numrequeststosend];
+       int fdarray[numrequeststosend];
+       int index=0;
+       for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+               struct stat st;
+               char *filename=getSlotFileName(seqn);
+               if (stat(filename, &st) == 0) {
+                       fdarray[index]=open(filename, O_RDONLY);
+                       filesizes[index]=st.st_size;
+                       numbytes+=filesizes[index];
+               } else {
+                       fdarray[index]=-1;
+                       filesizes[index]=0;
+               }
+               delete filename;
+       }
+       const char header[]="getslot";
+       long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
+       char * response = new char[size];
+       long long offset=0;
+       memcpy(response, header, sizeof(header)-1);
+       offset+=sizeof(header)-1;
+       int numreq=htonl(numrequeststosend);
+       cerr << numrequeststosend << " " << numreq << endl;
+       memcpy(response + offset, &numreq, sizeof(numreq));
+       offset+=sizeof(numrequeststosend);
+       for(int i=0;i<numrequeststosend;i++) {
+               int filesize=htonl(filesizes[i]);
+               memcpy(response + offset, &filesize, sizeof(filesize));
+               offset+=sizeof(int);
+       }
+       
+       //copy file data
+       for(int i=0;i<numrequeststosend;i++) {
+               if (fdarray[i]>=0) {
+                       doRead(fdarray[i], response+offset, filesizes[i]);
+                       offset+=filesizes[i];
+               }
+       }
+       
+       //write response out
+       sendResponse(response, size);
+
+       //cleanup
+       delete response;
+       for(int i=0;i<numrequeststosend;i++) {
+               if (fdarray[i] >= 0)
+                       close(fdarray[i]);
+       }
+}
+
+void IoTQuery::putSlot() {
+       if (requestsequencenumber!=(newestentry+1)) {
+               getSlot();
+               return;
+       }
+
+       int numberofliveslots=(int) ((newestentry-oldestentry)+1);
+       if (numberofliveslots >=  numqueueentries) {
+               //need to drop slot
+               removeOldestSlot();
+       }
+       
+       //write slot data out to file
+       char *filename = getSlotFileName(requestsequencenumber);
+       int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
+       doWrite(slotfd, data, length);
+       close(slotfd);
+       delete filename;
+       newestentry = requestsequencenumber; // update sequence number
+       updateStatusFile(); // update counts
+       char command[]="putslot";
+       sendResponse(command, sizeof(command)-1);
+}
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+       cout << "Accept-Ranges: bytes\r\n"
+                        << "Content-Length: " << len << "\r\n"
+                        << "\r\n";
+       cout.write(bytes, len); 
+}
+
+char * IoTQuery::getSlotFileName(long long slot) {
+       int directorylen=strlen(directory);
+       char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination
+       snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
+       return filename;
+}
+
+void IoTQuery::removeOldestSlot() {
+       if (oldestentry!=0) {
+               char * filename=getSlotFileName(oldestentry);
+               unlink(filename);
+               delete filename;
+       }
+       oldestentry++;
+}
+
+void IoTQuery::processQuery() {
+       getQuery();
+       getDirectory();
+       readData();
+
+       if (strncmp(method, "POST", 4) != 0) {
+               cerr << "Not POST Request" << endl;
+               return;
+       }
+
+       if (directory == NULL ||
+                       !checkDirectory()) {
+               cerr << "Directory " << directory << " does not exist" << endl;
+               return;
+       }
+
+       if (!openStatusFile()) {
+               cerr << "Failed to open status file" << endl;
+               return;
+       }
+
+       flock(fd, LOCK_EX);
+
+       decodeQuery();
+       
+       if (reqGetSlot)
+               getSlot();
+       else if (reqPutSlot)
+               putSlot();
+       else {
+               cerr << "No recognized request" << endl;
+               return;
+       }
 }
 
+void IoTQuery::readData() {
+       if (length) {
+               data = new char[length+1];
+               memset(data, 0, length+1);
+               cin.read(data, length);
+       }
+       do {
+               char dummy;
+               cin >> dummy;
+       } while (!cin.eof());
+}
 
-void IoTQuery::parseQuery() {
-  uri = FCGX_GetParam(uri_str, request->envp);
-  query = FCGX_GetParam(query_str, request->envp);
-  method = FCGX_GetParam(method_str, request->envp);
-  iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+void IoTQuery::getQuery() {
+       uri = FCGX_GetParam(uri_str, request->envp);
+       query = FCGX_GetParam(query_str, request->envp);
+       method = FCGX_GetParam(method_str, request->envp);
+       iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
 
-  char * reqlength = FCGX_GetParam(length_str, request->envp);
-  if (length) {
-    length=strtol(reqlength, NULL, 10);
-  } else {
-    length=0;
-  }
+       char * reqlength = FCGX_GetParam(length_str, request->envp);
+       if (reqlength) {
+               length=strtoll(reqlength, NULL, 10);
+       } else {
+               length=0;
+       }
 }
 
 void IoTQuery::getDirectory() {
-  char * split = strchr((char *)uri, '?');
-  if (split == NULL)
-    return;
-  int split_len = (int) (split-uri);
-  int rootdir_len = strlen(iotcloudroot);
-  int directory_len = split_len + rootdir_len + 1;
-  directory = new char[directory_len];
-  memcpy(directory, iotcloudroot, rootdir_len);
-  memcpy(directory + rootdir_len, uri, split_len);
-  directory[directory_len]=0;
-}
-
-int IoTQuery::openMaxFile() {
-  char maxfile[]="queuesize";
-  int len=strlen(directory);
-
-  char * filename=new char[len+sizeof(maxfile)+2];
-  memcpy(filename, directory, len);
-  filename[len]='/';
-  memcpy(filename+len+1, maxfile, sizeof(maxfile));
-  filename[len+sizeof(maxfile)+1]=0;
-  fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
-  delete filename;
-  return fd;
+       char * split = strchr((char *)uri, '?');
+       if (split == NULL)
+               return;
+       int split_len = (int) (split-uri);
+       int rootdir_len = strlen(iotcloudroot);
+       int directory_len = split_len + rootdir_len + 1;
+       directory = new char[directory_len];
+       memcpy(directory, iotcloudroot, rootdir_len);
+       memcpy(directory + rootdir_len, uri, split_len);
+       directory[directory_len-1]=0;
 }
+
+int doread(int fd, void *ptr, size_t count, off_t offset) {
+       do {
+               size_t bytesread=pread(fd, ptr, count, offset);
+               if (bytesread==count) {
+                       return 1;
+               } else if (bytesread==0) {
+                       return 0;
+               }
+       } while(1);
+}
+
+void IoTQuery::updateStatusFile() {
+       pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
+       pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
+       pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
+}
+
+bool IoTQuery::openStatusFile() {
+       char statusfile[]="queuestatus";
+       int len=strlen(directory);
+
+       char * filename=new char[len+sizeof(statusfile)+2];
+       memcpy(filename, directory, len);
+       filename[len]='/';
+       memcpy(filename+len+1, statusfile, sizeof(statusfile));
+       filename[len+sizeof(statusfile)+1]=0;
+       fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
+       delete filename;
+
+       if (fd < 0) {
+               cerr << strerror(errno) << " error opening statusfile" << endl;
+               return false;
+       }
+       
+       int size;
+       int needwrite=0;
+       if (doread(fd, &size, sizeof(size), OFFSET_MAX))
+               numqueueentries=size;
+       else
+               needwrite=1;
+
+       long long entry;
+       if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
+               oldestentry=entry;
+       else
+               needwrite=1;
+
+       if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
+               newestentry=entry;
+       else
+               needwrite=1;
+
+       if (needwrite)
+               updateStatusFile();
+
+       return true;
+}
+
+