From: joelbandi Date: Fri, 22 Jul 2016 21:28:41 +0000 (-0700) Subject: Merge branch 'master' of ssh://plrg.eecs.uci.edu/home/git/iotcloud X-Git-Url: http://plrg.eecs.uci.edu/git/?p=iotcloud.git;a=commitdiff_plain;h=31fb11ef209d4f4d6d935320796b8e78a55c9722;hp=cda706a6f328cfb85fe7a3ac9c4f23f89c0eca1e Merge branch 'master' of ssh://plrg.eecs.uci.edu/home/git/iotcloud --- diff --git a/src/server/.dir-locals.el b/src/server/.dir-locals.el index ce85e5f..e166a2e 100644 --- a/src/server/.dir-locals.el +++ b/src/server/.dir-locals.el @@ -1 +1,2 @@ -((nil . ((indent-tabs-mode . t)))) \ No newline at end of file +((nil . ((indent-tabs-mode . t)))) + diff --git a/src/server/iotquery.cpp b/src/server/iotquery.cpp index 8fc9fad..39dacab 100644 --- a/src/server/iotquery.cpp +++ b/src/server/iotquery.cpp @@ -23,10 +23,9 @@ IoTQuery::IoTQuery(FCGX_Request *request) : query(NULL), method(NULL), iotcloudroot(NULL), - dir(NULL), length(0), - firstentry(0), - lastentry(0), + oldestentry(0), + newestentry(0), requestsequencenumber(0), numqueueentries(DEFAULT_SIZE), fd(-1) { @@ -39,8 +38,6 @@ IoTQuery::~IoTQuery() { delete directory; if (data) delete data; - if (dir != NULL) - closedir(dir); } bool IoTQuery::checkDirectory() { @@ -69,22 +66,139 @@ void IoTQuery::decodeQuery() { char *sequencenumber_str = strsep(&tok_ptr, "&"); if (sequencenumber_str != NULL) - requestsequencenumber = strtol(sequencenumber_str, NULL, 10); + requestsequencenumber = strtoll(sequencenumber_str, NULL, 10); /* Update size if we get request */ char * numqueueentries_str = tok_ptr; if (numqueueentries_str != NULL) - numqueueentries = strtol(numqueueentries_str, NULL, 10); + numqueueentries = strtoll(numqueueentries_str, NULL, 10); delete str; } +void doWrite(int fd, char *data, long long length) { + long long offset=0; + do { + long long byteswritten=write(fd, &data[offset], length); + if (byteswritten > 0) { + length -= byteswritten; + offset += byteswritten; + } else { + cerr << "Bytes not written"; + return; + } + } while(length != 0); +} + +bool doRead(int fd, void *buf, int numbytes) { + int offset=0; + char *ptr=(char *)buf; + do { + int bytesread=read(fd, ptr+offset, numbytes); + if (bytesread > 0) { + offset += bytesread; + numbytes -= bytesread; + } else + return false; + } while (numbytes!=0); + return true; +} + void IoTQuery::getSlot() { + int numrequeststosend = (int)((newestentry-requestsequencenumber)+1); + long long numbytes = 0; + int filesizes[numrequeststosend]; + int fdarray[numrequeststosend]; + int index=0; + for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) { + struct stat st; + char *filename=getSlotFileName(seqn); + if (stat(filename, &st) == 0) { + fdarray[index]=open(filename, O_RDONLY); + filesizes[index]=st.st_size; + numbytes+=filesizes[index]; + } else { + fdarray[index]=-1; + filesizes[index]=0; + } + delete filename; + } + const char header[]="getdata"; + long long size=sizeof(header)+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes + char * response = new char[size]; + long long offset=0; + memcpy(response, header, sizeof(header)); + offset+=sizeof(header); + memcpy(response + offset, &numrequeststosend, sizeof(numrequeststosend)); + offset+=sizeof(numrequeststosend); + for(int i=0;i=0) { + doRead(fdarray[i], response+offset, filesizes[i]); + offset+=filesizes[i]; + } + } + + //write response out + sendResponse(response, size); + + //cleanup + delete response; + for(int i=0;i= 0) + close(fdarray[i]); + } } void IoTQuery::putSlot() { + if (requestsequencenumber!=(newestentry+1)) { + getSlot(); + return; + } + + int numberofliveslots=(int) ((newestentry-oldestentry)+1); + if (numberofliveslots >= numqueueentries) { + //need to drop slot + removeOldestSlot(); + } + //write slot data out to file + char *filename = getSlotFileName(requestsequencenumber); + int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR); + doWrite(slotfd, data, length); + close(slotfd); + delete filename; + newestentry = requestsequencenumber; // update sequence number + updateStatusFile(); // update counts + char command[]="putdata"; + sendResponse(command, sizeof(command)); +} + +void IoTQuery::sendResponse(char * bytes, int len) { + cout << "Accept-Ranges: bytes\r\n" + << "Content-Length: " << len << "\r\n" + << "\r\n"; + cout.write(bytes, len); +} +char * IoTQuery::getSlotFileName(long long slot) { + int directorylen=strlen(directory); + char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination + snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot); + return filename; +} + +void IoTQuery::removeOldestSlot() { + if (oldestentry!=0) { + char * filename=getSlotFileName(oldestentry); + unlink(filename); + delete filename; + } + oldestentry++; } void IoTQuery::processQuery() { @@ -133,7 +247,7 @@ void IoTQuery::getQuery() { char * reqlength = FCGX_GetParam(length_str, request->envp); if (length) { - length=strtol(reqlength, NULL, 10); + length=strtoll(reqlength, NULL, 10); } else { length=0; } @@ -165,8 +279,8 @@ int doread(int fd, void *ptr, size_t count, off_t offset) { void IoTQuery::updateStatusFile() { pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX); - pwrite(fd, &firstentry, sizeof(firstentry), OFFSET_FIRST); - pwrite(fd, &lastentry, sizeof(lastentry), OFFSET_LAST); + pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD); + pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW); } bool IoTQuery::openStatusFile() { @@ -191,14 +305,14 @@ bool IoTQuery::openStatusFile() { else needwrite=1; - long entry; - if (doread(fd, &entry, sizeof(entry), OFFSET_FIRST)) - firstentry=entry; + long long entry; + if (doread(fd, &entry, sizeof(entry), OFFSET_OLD)) + oldestentry=entry; else needwrite=1; - if (doread(fd, &entry, sizeof(entry), OFFSET_LAST)) - lastentry=entry; + if (doread(fd, &entry, sizeof(entry), OFFSET_NEW)) + newestentry=entry; else needwrite=1; diff --git a/src/server/iotquery.h b/src/server/iotquery.h index 5aba593..6e35f9c 100644 --- a/src/server/iotquery.h +++ b/src/server/iotquery.h @@ -3,12 +3,11 @@ #include #include "fcgio.h" #include "fcgi_stdio.h" -#include #define DEFAULT_SIZE 128 #define OFFSET_MAX 0 -#define OFFSET_FIRST 4 -#define OFFSET_LAST 12 +#define OFFSET_OLD 4 +#define OFFSET_NEW 12 class IoTQuery { public: @@ -17,6 +16,7 @@ class IoTQuery { void processQuery(); private: + void sendResponse(char *data, int length); void getQuery(); void getDirectory(); void readData(); @@ -26,6 +26,8 @@ class IoTQuery { void decodeQuery(); void getSlot(); void putSlot(); + void removeOldestSlot(); + char * getSlotFileName(long long); FCGX_Request * request; char *data; @@ -34,11 +36,10 @@ class IoTQuery { const char * query; const char * method; const char * iotcloudroot; - DIR *dir; - long length; - long firstentry; - long lastentry; - long requestsequencenumber; + long long length; + long long oldestentry; + long long newestentry; + long long requestsequencenumber; int numqueueentries; int fd; bool reqGetSlot;