const char * length_str="CONTENT_LENGTH";
IoTQuery::IoTQuery(FCGX_Request *request) :
- request(request),
- data(NULL),
- directory(NULL),
- uri(NULL),
- query(NULL),
- method(NULL),
- iotcloudroot(NULL),
- dir(NULL),
- length(0),
- fd(-1)
-{
+ request(request),
+ data(NULL),
+ directory(NULL),
+ uri(NULL),
+ query(NULL),
+ method(NULL),
+ iotcloudroot(NULL),
+ length(0),
+ oldestentry(0),
+ newestentry(0),
+ requestsequencenumber(0),
+ numqueueentries(DEFAULT_SIZE),
+ fd(-1) {
}
-
IoTQuery::~IoTQuery() {
- if (fd >= 0) {
- close(fd);
- }
- if (directory)
- delete directory;
- if (data)
- delete data;
- if (dir != NULL)
- closedir(dir);
-}
-
-void IoTQuery::processQuery() {
- parseQuery();
- getDirectory();
- readData();
-
- if (strncmp(method, "POST", 4) != 0)
- return;
-
- if (directory == NULL ||
- (dir = opendir(directory)) == NULL)
- return;
-
- if (openMaxFile() < 0)
- return;
-
- flock(fd, LOCK_EX);
-
- cout << "Content-type: text/html\r\n"
- << "\r\n"
- << "<html>\n"
- << " <head>\n"
- << " <title>Hello, World!</title>\n"
- << " </head>\n"
- << " <body>\n"
- << " <h1>Hello, World!</h1>\n"
- << " </body>\n";
-
- cout << uri_str << " " << uri << "\n";
- cout << query_str << " " << query << "\n";
- cout << method_str << " " << method << "\n";
- cout << iotcloudroot_str << " " << iotcloudroot << "\n";
- if (data)
- cout << "[" << data << "]";
-
-
-
- cout << "</html>\n";
+ if (fd >= 0)
+ close(fd);
+ if (directory)
+ delete directory;
+ if (data)
+ delete data;
}
+bool IoTQuery::checkDirectory() {
+ struct stat s;
+ int err=stat(directory, &s);
+ if (-1 == err)
+ return false;
+ return S_ISDIR(s.st_mode);
+}
-void IoTQuery::readData() {
- if (length) {
- data = new char[length+1];
- memset(data, 0, length+1);
- cin.read(data, length);
- }
- do {
- char dummy;
- cin >> dummy;
- } while (!cin.eof());
+void IoTQuery::decodeQuery() {
+ int len=strlen(query);
+ char * str=new char[len+1];
+ memcpy(str, query, len+1);
+ char *tok_ptr=str;
+
+ /* Parse commands */
+ char *command=strsep(&tok_ptr, "&");
+ if (strncmp(command, "putslot", 7) == 0)
+ reqPutSlot = true;
+
+ if (strncmp(command, "getslot", 7) == 0)
+ reqGetSlot = true;
+
+ /* Load Sequence Number for request */
+ char *sequencenumber_str = strsep(&tok_ptr, "&");
+
+ if (sequencenumber_str != NULL)
+ requestsequencenumber = strtoll(sequencenumber_str, NULL, 10);
+
+ /* Update size if we get request */
+ char * numqueueentries_str = tok_ptr;
+ if (numqueueentries_str != NULL)
+ numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+
+ delete str;
+}
+
+void doWrite(int fd, char *data, long long length) {
+ long long offset=0;
+ do {
+ long long byteswritten=write(fd, &data[offset], length);
+ if (byteswritten > 0) {
+ length -= byteswritten;
+ offset += byteswritten;
+ } else {
+ cerr << "Bytes not written";
+ return;
+ }
+ } while(length != 0);
+}
+
+bool doRead(int fd, void *buf, int numbytes) {
+ int offset=0;
+ char *ptr=(char *)buf;
+ do {
+ int bytesread=read(fd, ptr+offset, numbytes);
+ if (bytesread > 0) {
+ offset += bytesread;
+ numbytes -= bytesread;
+ } else
+ return false;
+ } while (numbytes!=0);
+ return true;
+}
+
+void IoTQuery::getSlot() {
+ int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
+ long long numbytes = 0;
+ int filesizes[numrequeststosend];
+ int fdarray[numrequeststosend];
+ int index=0;
+ for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+ struct stat st;
+ char *filename=getSlotFileName(seqn);
+ if (stat(filename, &st) == 0) {
+ fdarray[index]=open(filename, O_RDONLY);
+ filesizes[index]=st.st_size;
+ numbytes+=filesizes[index];
+ } else {
+ fdarray[index]=-1;
+ filesizes[index]=0;
+ }
+ delete filename;
+ }
+ const char header[]="getdata";
+ long long size=sizeof(header)+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
+ char * response = new char[size];
+ long long offset=0;
+ memcpy(response, header, sizeof(header));
+ offset+=sizeof(header);
+ memcpy(response + offset, &numrequeststosend, sizeof(numrequeststosend));
+ offset+=sizeof(numrequeststosend);
+ for(int i=0;i<numrequeststosend;i++) {
+ memcpy(response + offset, &filesizes[i], sizeof(int));
+ offset+=sizeof(int);
+ }
+
+ //copy file data
+ for(int i=0;i<numrequeststosend;i++) {
+ if (fdarray[i]>=0) {
+ doRead(fdarray[i], response+offset, filesizes[i]);
+ offset+=filesizes[i];
+ }
+ }
+
+ //write response out
+ sendResponse(response, size);
+
+ //cleanup
+ delete response;
+ for(int i=0;i<numrequeststosend;i++) {
+ if (fdarray[i] >= 0)
+ close(fdarray[i]);
+ }
+}
+
+void IoTQuery::putSlot() {
+ if (requestsequencenumber!=(newestentry+1)) {
+ getSlot();
+ return;
+ }
+
+ int numberofliveslots=(int) ((newestentry-oldestentry)+1);
+ if (numberofliveslots >= numqueueentries) {
+ //need to drop slot
+ removeOldestSlot();
+ }
+ //write slot data out to file
+ char *filename = getSlotFileName(requestsequencenumber);
+ int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
+ doWrite(slotfd, data, length);
+ close(slotfd);
+ delete filename;
+ newestentry = requestsequencenumber; // update sequence number
+ updateStatusFile(); // update counts
+ char command[]="putdata";
+ sendResponse(command, sizeof(command));
+}
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+ cout << "Accept-Ranges: bytes\r\n"
+ << "Content-Length: " << len << "\r\n"
+ << "\r\n";
+ cout.write(bytes, len);
+}
+
+char * IoTQuery::getSlotFileName(long long slot) {
+ int directorylen=strlen(directory);
+ char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination
+ snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
+ return filename;
+}
+
+void IoTQuery::removeOldestSlot() {
+ if (oldestentry!=0) {
+ char * filename=getSlotFileName(oldestentry);
+ unlink(filename);
+ delete filename;
+ }
+ oldestentry++;
}
+void IoTQuery::processQuery() {
+ getQuery();
+ getDirectory();
+ readData();
+
+ if (strncmp(method, "POST", 4) != 0)
+ return;
+
+ if (directory == NULL ||
+ !checkDirectory())
+ return;
+
+ if (!openStatusFile())
+ return;
-void IoTQuery::parseQuery() {
- uri = FCGX_GetParam(uri_str, request->envp);
- query = FCGX_GetParam(query_str, request->envp);
- method = FCGX_GetParam(method_str, request->envp);
- iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+ flock(fd, LOCK_EX);
- char * reqlength = FCGX_GetParam(length_str, request->envp);
- if (length) {
- length=strtol(reqlength, NULL, 10);
- } else {
- length=0;
- }
+ decodeQuery();
+
+ if (reqGetSlot)
+ getSlot();
+ else if (reqPutSlot)
+ putSlot();
+ else return;
+}
+
+void IoTQuery::readData() {
+ if (length) {
+ data = new char[length+1];
+ memset(data, 0, length+1);
+ cin.read(data, length);
+ }
+ do {
+ char dummy;
+ cin >> dummy;
+ } while (!cin.eof());
+}
+
+void IoTQuery::getQuery() {
+ uri = FCGX_GetParam(uri_str, request->envp);
+ query = FCGX_GetParam(query_str, request->envp);
+ method = FCGX_GetParam(method_str, request->envp);
+ iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+
+ char * reqlength = FCGX_GetParam(length_str, request->envp);
+ if (length) {
+ length=strtoll(reqlength, NULL, 10);
+ } else {
+ length=0;
+ }
}
void IoTQuery::getDirectory() {
- char * split = strchr((char *)uri, '?');
- if (split == NULL)
- return;
- int split_len = (int) (split-uri);
- int rootdir_len = strlen(iotcloudroot);
- int directory_len = split_len + rootdir_len + 1;
- directory = new char[directory_len];
- memcpy(directory, iotcloudroot, rootdir_len);
- memcpy(directory + rootdir_len, uri, split_len);
- directory[directory_len]=0;
-}
-
-int IoTQuery::openMaxFile() {
- char maxfile[]="queuesize";
- int len=strlen(directory);
-
- char * filename=new char[len+sizeof(maxfile)+2];
- memcpy(filename, directory, len);
- filename[len]='/';
- memcpy(filename+len+1, maxfile, sizeof(maxfile));
- filename[len+sizeof(maxfile)+1]=0;
- fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
- delete filename;
- return fd;
+ char * split = strchr((char *)uri, '?');
+ if (split == NULL)
+ return;
+ int split_len = (int) (split-uri);
+ int rootdir_len = strlen(iotcloudroot);
+ int directory_len = split_len + rootdir_len + 1;
+ directory = new char[directory_len];
+ memcpy(directory, iotcloudroot, rootdir_len);
+ memcpy(directory + rootdir_len, uri, split_len);
+ directory[directory_len]=0;
+}
+
+int doread(int fd, void *ptr, size_t count, off_t offset) {
+ do {
+ size_t bytesread=pread(fd, ptr, count, offset);
+ if (bytesread==count) {
+ return 1;
+ } else if (bytesread==0) {
+ return 0;
+ }
+ } while(1);
}
+
+void IoTQuery::updateStatusFile() {
+ pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
+ pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
+ pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
+}
+
+bool IoTQuery::openStatusFile() {
+ char statusfile[]="queuestatus";
+ int len=strlen(directory);
+
+ char * filename=new char[len+sizeof(statusfile)+2];
+ memcpy(filename, directory, len);
+ filename[len]='/';
+ memcpy(filename+len+1, statusfile, sizeof(statusfile));
+ filename[len+sizeof(statusfile)+1]=0;
+ fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
+ delete filename;
+
+ if (fd < 0)
+ return false;
+
+ int size;
+ int needwrite=0;
+ if (doread(fd, &size, sizeof(size), OFFSET_MAX))
+ numqueueentries=size;
+ else
+ needwrite=1;
+
+ long long entry;
+ if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
+ oldestentry=entry;
+ else
+ needwrite=1;
+
+ if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
+ newestentry=entry;
+ else
+ needwrite=1;
+
+ if (needwrite)
+ updateStatusFile();
+
+ return true;
+}
+
+