#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
using namespace std;
query(NULL),
method(NULL),
iotcloudroot(NULL),
- dir(NULL),
length(0),
- firstentry(0),
- lastentry(0),
+ oldestentry(0),
+ newestentry(0),
requestsequencenumber(0),
numqueueentries(DEFAULT_SIZE),
- fd(-1) {
+ fd(-1),
+ reqGetSlot(false),
+ reqPutSlot(false) {
}
IoTQuery::~IoTQuery() {
delete directory;
if (data)
delete data;
- if (dir != NULL)
- closedir(dir);
}
bool IoTQuery::checkDirectory() {
char * str=new char[len+1];
memcpy(str, query, len+1);
char *tok_ptr=str;
-
+
/* Parse commands */
char *command=strsep(&tok_ptr, "&");
- if (strncmp(command, "putslot", 7) == 0)
+ if (strncmp(command, "req=putslot", 11) == 0)
reqPutSlot = true;
- if (strncmp(command, "getslot", 7) == 0)
+ if (strncmp(command, "req=getslot", 11) == 0)
reqGetSlot = true;
/* Load Sequence Number for request */
char *sequencenumber_str = strsep(&tok_ptr, "&");
-
- if (sequencenumber_str != NULL)
- requestsequencenumber = strtol(sequencenumber_str, NULL, 10);
-
+ if (sequencenumber_str != NULL &&
+ strncmp(sequencenumber_str, "seq=", 4) == 0) {
+ sequencenumber_str = strchr(sequencenumber_str, '=');
+ if (sequencenumber_str != NULL) {
+ requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+ }
+ }
+
/* Update size if we get request */
char * numqueueentries_str = tok_ptr;
- if (numqueueentries_str != NULL)
- numqueueentries = strtol(numqueueentries_str, NULL, 10);
-
+ if (numqueueentries_str != NULL &&
+ strncmp(numqueueentries_str, "max=", 4) == 0) {
+ numqueueentries_str = strchr(numqueueentries_str + 1, '=');
+ numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+ }
+
delete str;
}
-void IoTQuery::getSlot() {
+void doWrite(int fd, char *data, long long length) {
+ long long offset=0;
+ do {
+ long long byteswritten=write(fd, &data[offset], length);
+ if (byteswritten > 0) {
+ length -= byteswritten;
+ offset += byteswritten;
+ } else {
+ cerr << "Bytes not written" << endl;
+ if (byteswritten < 0) {
+ cerr << strerror(errno) << " error writing slot file" << endl;
+ }
+ return;
+ }
+ } while(length != 0);
+}
+bool doRead(int fd, void *buf, int numbytes) {
+ int offset=0;
+ char *ptr=(char *)buf;
+ do {
+ int bytesread=read(fd, ptr+offset, numbytes);
+ if (bytesread > 0) {
+ offset += bytesread;
+ numbytes -= bytesread;
+ } else
+ return false;
+ } while (numbytes!=0);
+ return true;
+}
+
+void IoTQuery::getSlot() {
+ int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
+ if (numrequeststosend < 0)
+ numrequeststosend = 0;
+ long long numbytes = 0;
+ int filesizes[numrequeststosend];
+ int fdarray[numrequeststosend];
+ int index=0;
+ for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+ struct stat st;
+ char *filename=getSlotFileName(seqn);
+ if (stat(filename, &st) == 0) {
+ fdarray[index]=open(filename, O_RDONLY);
+ filesizes[index]=st.st_size;
+ numbytes+=filesizes[index];
+ } else {
+ fdarray[index]=-1;
+ filesizes[index]=0;
+ }
+ delete filename;
+ }
+ const char header[]="getslot";
+ long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
+ char * response = new char[size];
+ long long offset=0;
+ memcpy(response, header, sizeof(header)-1);
+ offset+=sizeof(header)-1;
+ int numreq=htonl(numrequeststosend);
+ cerr << numrequeststosend << " " << numreq << endl;
+ memcpy(response + offset, &numreq, sizeof(numreq));
+ offset+=sizeof(numrequeststosend);
+ for(int i=0;i<numrequeststosend;i++) {
+ int filesize=htonl(filesizes[i]);
+ memcpy(response + offset, &filesize, sizeof(filesize));
+ offset+=sizeof(int);
+ }
+
+ //copy file data
+ for(int i=0;i<numrequeststosend;i++) {
+ if (fdarray[i]>=0) {
+ doRead(fdarray[i], response+offset, filesizes[i]);
+ offset+=filesizes[i];
+ }
+ }
+
+ //write response out
+ sendResponse(response, size);
+
+ //cleanup
+ delete response;
+ for(int i=0;i<numrequeststosend;i++) {
+ if (fdarray[i] >= 0)
+ close(fdarray[i]);
+ }
}
void IoTQuery::putSlot() {
+ if (requestsequencenumber!=(newestentry+1)) {
+ getSlot();
+ return;
+ }
+ int numberofliveslots=(int) ((newestentry-oldestentry)+1);
+ if (numberofliveslots >= numqueueentries) {
+ //need to drop slot
+ removeOldestSlot();
+ }
+
+ //write slot data out to file
+ char *filename = getSlotFileName(requestsequencenumber);
+ int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
+ doWrite(slotfd, data, length);
+ close(slotfd);
+ delete filename;
+ newestentry = requestsequencenumber; // update sequence number
+ updateStatusFile(); // update counts
+ char command[]="putslot";
+ sendResponse(command, sizeof(command)-1);
+}
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+ cout << "Accept-Ranges: bytes\r\n"
+ << "Content-Length: " << len << "\r\n"
+ << "\r\n";
+ cout.write(bytes, len);
+}
+
+char * IoTQuery::getSlotFileName(long long slot) {
+ int directorylen=strlen(directory);
+ char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination
+ snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
+ return filename;
+}
+
+void IoTQuery::removeOldestSlot() {
+ if (oldestentry!=0) {
+ char * filename=getSlotFileName(oldestentry);
+ unlink(filename);
+ delete filename;
+ }
+ oldestentry++;
}
void IoTQuery::processQuery() {
getDirectory();
readData();
- if (strncmp(method, "POST", 4) != 0)
+ if (strncmp(method, "POST", 4) != 0) {
+ cerr << "Not POST Request" << endl;
return;
+ }
if (directory == NULL ||
- !checkDirectory())
+ !checkDirectory()) {
+ cerr << "Directory " << directory << " does not exist" << endl;
return;
+ }
- if (!openStatusFile())
+ if (!openStatusFile()) {
+ cerr << "Failed to open status file" << endl;
return;
+ }
flock(fd, LOCK_EX);
decodeQuery();
-
+
if (reqGetSlot)
getSlot();
else if (reqPutSlot)
putSlot();
- else return;
+ else {
+ cerr << "No recognized request" << endl;
+ return;
+ }
}
void IoTQuery::readData() {
iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
char * reqlength = FCGX_GetParam(length_str, request->envp);
- if (length) {
- length=strtol(reqlength, NULL, 10);
+ if (reqlength) {
+ length=strtoll(reqlength, NULL, 10);
} else {
length=0;
}
directory = new char[directory_len];
memcpy(directory, iotcloudroot, rootdir_len);
memcpy(directory + rootdir_len, uri, split_len);
- directory[directory_len]=0;
+ directory[directory_len-1]=0;
}
int doread(int fd, void *ptr, size_t count, off_t offset) {
void IoTQuery::updateStatusFile() {
pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
- pwrite(fd, &firstentry, sizeof(firstentry), OFFSET_FIRST);
- pwrite(fd, &lastentry, sizeof(lastentry), OFFSET_LAST);
+ pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
+ pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
}
bool IoTQuery::openStatusFile() {
fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
delete filename;
- if (fd < 0)
+ if (fd < 0) {
+ cerr << strerror(errno) << " error opening statusfile" << endl;
return false;
-
+ }
+
int size;
int needwrite=0;
if (doread(fd, &size, sizeof(size), OFFSET_MAX))
else
needwrite=1;
- long entry;
- if (doread(fd, &entry, sizeof(entry), OFFSET_FIRST))
- firstentry=entry;
+ long long entry;
+ if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
+ oldestentry=entry;
else
needwrite=1;
- if (doread(fd, &entry, sizeof(entry), OFFSET_LAST))
- lastentry=entry;
+ if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
+ newestentry=entry;
else
needwrite=1;