X-Git-Url: http://plrg.eecs.uci.edu/git/?a=blobdiff_plain;ds=sidebyside;f=src%2Fserver%2Fiotquery.cpp;h=39dacab78b0ac104ad4a87fc06edde2325853b9a;hb=f4129f71302f3d6650c07e6559c1093aabc59135;hp=89b98a90f076846f72e3bb51a24e62046880cd36;hpb=18ea5b9b9fd5b062e156245a29ce8157b687045d;p=iotcloud.git diff --git a/src/server/iotquery.cpp b/src/server/iotquery.cpp index 89b98a9..39dacab 100644 --- a/src/server/iotquery.cpp +++ b/src/server/iotquery.cpp @@ -16,122 +16,310 @@ const char * iotcloudroot_str="IOTCLOUD_ROOT"; const char * length_str="CONTENT_LENGTH"; IoTQuery::IoTQuery(FCGX_Request *request) : - request(request), - data(NULL), - directory(NULL), - uri(NULL), - query(NULL), - method(NULL), - iotcloudroot(NULL), - dir(NULL), - length(0), - fd(-1) -{ + request(request), + data(NULL), + directory(NULL), + uri(NULL), + query(NULL), + method(NULL), + iotcloudroot(NULL), + length(0), + oldestentry(0), + newestentry(0), + requestsequencenumber(0), + numqueueentries(DEFAULT_SIZE), + fd(-1) { } - IoTQuery::~IoTQuery() { - if (fd >= 0) { - close(fd); - } - if (directory) - delete directory; - if (data) - delete data; - if (dir != NULL) - closedir(dir); -} - -void IoTQuery::processQuery() { - parseQuery(); - getDirectory(); - readData(); - - if (strncmp(method, "POST", 4) != 0) - return; - - if (directory == NULL || - (dir = opendir(directory)) == NULL) - return; - - if (openMaxFile() < 0) - return; - - flock(fd, LOCK_EX); - - cout << "Content-type: text/html\r\n" - << "\r\n" - << "\n" - << " \n" - << " Hello, World!\n" - << " \n" - << " \n" - << "

Hello, World!

\n" - << " \n"; - - cout << uri_str << " " << uri << "\n"; - cout << query_str << " " << query << "\n"; - cout << method_str << " " << method << "\n"; - cout << iotcloudroot_str << " " << iotcloudroot << "\n"; - if (data) - cout << "[" << data << "]"; - - - - cout << "\n"; + if (fd >= 0) + close(fd); + if (directory) + delete directory; + if (data) + delete data; } +bool IoTQuery::checkDirectory() { + struct stat s; + int err=stat(directory, &s); + if (-1 == err) + return false; + return S_ISDIR(s.st_mode); +} -void IoTQuery::readData() { - if (length) { - data = new char[length+1]; - memset(data, 0, length+1); - cin.read(data, length); - } - do { - char dummy; - cin >> dummy; - } while (!cin.eof()); +void IoTQuery::decodeQuery() { + int len=strlen(query); + char * str=new char[len+1]; + memcpy(str, query, len+1); + char *tok_ptr=str; + + /* Parse commands */ + char *command=strsep(&tok_ptr, "&"); + if (strncmp(command, "putslot", 7) == 0) + reqPutSlot = true; + + if (strncmp(command, "getslot", 7) == 0) + reqGetSlot = true; + + /* Load Sequence Number for request */ + char *sequencenumber_str = strsep(&tok_ptr, "&"); + + if (sequencenumber_str != NULL) + requestsequencenumber = strtoll(sequencenumber_str, NULL, 10); + + /* Update size if we get request */ + char * numqueueentries_str = tok_ptr; + if (numqueueentries_str != NULL) + numqueueentries = strtoll(numqueueentries_str, NULL, 10); + + delete str; +} + +void doWrite(int fd, char *data, long long length) { + long long offset=0; + do { + long long byteswritten=write(fd, &data[offset], length); + if (byteswritten > 0) { + length -= byteswritten; + offset += byteswritten; + } else { + cerr << "Bytes not written"; + return; + } + } while(length != 0); +} + +bool doRead(int fd, void *buf, int numbytes) { + int offset=0; + char *ptr=(char *)buf; + do { + int bytesread=read(fd, ptr+offset, numbytes); + if (bytesread > 0) { + offset += bytesread; + numbytes -= bytesread; + } else + return false; + } while (numbytes!=0); + return true; +} + +void IoTQuery::getSlot() { + int numrequeststosend = (int)((newestentry-requestsequencenumber)+1); + long long numbytes = 0; + int filesizes[numrequeststosend]; + int fdarray[numrequeststosend]; + int index=0; + for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) { + struct stat st; + char *filename=getSlotFileName(seqn); + if (stat(filename, &st) == 0) { + fdarray[index]=open(filename, O_RDONLY); + filesizes[index]=st.st_size; + numbytes+=filesizes[index]; + } else { + fdarray[index]=-1; + filesizes[index]=0; + } + delete filename; + } + const char header[]="getdata"; + long long size=sizeof(header)+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes + char * response = new char[size]; + long long offset=0; + memcpy(response, header, sizeof(header)); + offset+=sizeof(header); + memcpy(response + offset, &numrequeststosend, sizeof(numrequeststosend)); + offset+=sizeof(numrequeststosend); + for(int i=0;i=0) { + doRead(fdarray[i], response+offset, filesizes[i]); + offset+=filesizes[i]; + } + } + + //write response out + sendResponse(response, size); + + //cleanup + delete response; + for(int i=0;i= 0) + close(fdarray[i]); + } +} + +void IoTQuery::putSlot() { + if (requestsequencenumber!=(newestentry+1)) { + getSlot(); + return; + } + + int numberofliveslots=(int) ((newestentry-oldestentry)+1); + if (numberofliveslots >= numqueueentries) { + //need to drop slot + removeOldestSlot(); + } + //write slot data out to file + char *filename = getSlotFileName(requestsequencenumber); + int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR); + doWrite(slotfd, data, length); + close(slotfd); + delete filename; + newestentry = requestsequencenumber; // update sequence number + updateStatusFile(); // update counts + char command[]="putdata"; + sendResponse(command, sizeof(command)); +} + +void IoTQuery::sendResponse(char * bytes, int len) { + cout << "Accept-Ranges: bytes\r\n" + << "Content-Length: " << len << "\r\n" + << "\r\n"; + cout.write(bytes, len); +} + +char * IoTQuery::getSlotFileName(long long slot) { + int directorylen=strlen(directory); + char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination + snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot); + return filename; +} + +void IoTQuery::removeOldestSlot() { + if (oldestentry!=0) { + char * filename=getSlotFileName(oldestentry); + unlink(filename); + delete filename; + } + oldestentry++; } +void IoTQuery::processQuery() { + getQuery(); + getDirectory(); + readData(); + + if (strncmp(method, "POST", 4) != 0) + return; + + if (directory == NULL || + !checkDirectory()) + return; + + if (!openStatusFile()) + return; -void IoTQuery::parseQuery() { - uri = FCGX_GetParam(uri_str, request->envp); - query = FCGX_GetParam(query_str, request->envp); - method = FCGX_GetParam(method_str, request->envp); - iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp); + flock(fd, LOCK_EX); - char * reqlength = FCGX_GetParam(length_str, request->envp); - if (length) { - length=strtol(reqlength, NULL, 10); - } else { - length=0; - } + decodeQuery(); + + if (reqGetSlot) + getSlot(); + else if (reqPutSlot) + putSlot(); + else return; +} + +void IoTQuery::readData() { + if (length) { + data = new char[length+1]; + memset(data, 0, length+1); + cin.read(data, length); + } + do { + char dummy; + cin >> dummy; + } while (!cin.eof()); +} + +void IoTQuery::getQuery() { + uri = FCGX_GetParam(uri_str, request->envp); + query = FCGX_GetParam(query_str, request->envp); + method = FCGX_GetParam(method_str, request->envp); + iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp); + + char * reqlength = FCGX_GetParam(length_str, request->envp); + if (length) { + length=strtoll(reqlength, NULL, 10); + } else { + length=0; + } } void IoTQuery::getDirectory() { - char * split = strchr((char *)uri, '?'); - if (split == NULL) - return; - int split_len = (int) (split-uri); - int rootdir_len = strlen(iotcloudroot); - int directory_len = split_len + rootdir_len + 1; - directory = new char[directory_len]; - memcpy(directory, iotcloudroot, rootdir_len); - memcpy(directory + rootdir_len, uri, split_len); - directory[directory_len]=0; -} - -int IoTQuery::openMaxFile() { - char maxfile[]="queuesize"; - int len=strlen(directory); - - char * filename=new char[len+sizeof(maxfile)+2]; - memcpy(filename, directory, len); - filename[len]='/'; - memcpy(filename+len+1, maxfile, sizeof(maxfile)); - filename[len+sizeof(maxfile)+1]=0; - fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR); - delete filename; - return fd; + char * split = strchr((char *)uri, '?'); + if (split == NULL) + return; + int split_len = (int) (split-uri); + int rootdir_len = strlen(iotcloudroot); + int directory_len = split_len + rootdir_len + 1; + directory = new char[directory_len]; + memcpy(directory, iotcloudroot, rootdir_len); + memcpy(directory + rootdir_len, uri, split_len); + directory[directory_len]=0; +} + +int doread(int fd, void *ptr, size_t count, off_t offset) { + do { + size_t bytesread=pread(fd, ptr, count, offset); + if (bytesread==count) { + return 1; + } else if (bytesread==0) { + return 0; + } + } while(1); } + +void IoTQuery::updateStatusFile() { + pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX); + pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD); + pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW); +} + +bool IoTQuery::openStatusFile() { + char statusfile[]="queuestatus"; + int len=strlen(directory); + + char * filename=new char[len+sizeof(statusfile)+2]; + memcpy(filename, directory, len); + filename[len]='/'; + memcpy(filename+len+1, statusfile, sizeof(statusfile)); + filename[len+sizeof(statusfile)+1]=0; + fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR); + delete filename; + + if (fd < 0) + return false; + + int size; + int needwrite=0; + if (doread(fd, &size, sizeof(size), OFFSET_MAX)) + numqueueentries=size; + else + needwrite=1; + + long long entry; + if (doread(fd, &entry, sizeof(entry), OFFSET_OLD)) + oldestentry=entry; + else + needwrite=1; + + if (doread(fd, &entry, sizeof(entry), OFFSET_NEW)) + newestentry=entry; + else + needwrite=1; + + if (needwrite) + updateStatusFile(); + + return true; +} + +