delete data;
}
+/**
+ * Returns true if the account directory exists.
+ */
+
bool IoTQuery::checkDirectory() {
struct stat s;
int err=stat(directory, &s);
return S_ISDIR(s.st_mode);
}
+/**
+ * Decodes query string from client. Extracts type of request,
+ * sequence number, and whether the request changes the number of
+ * slots.
+ */
+
void IoTQuery::decodeQuery() {
int len=strlen(query);
char * str=new char[len+1];
}
}
- //don't allow a really old sequence number
+ /* don't allow a really old sequence number */
if (requestsequencenumber < oldestentry)
requestsequencenumber = oldestentry;
delete str;
}
+/**
+ * Helper function to write data to file.
+ */
+
void doWrite(int fd, char *data, long long length) {
long long offset=0;
do {
} while(length != 0);
}
+/** Helper function to read data from file. */
bool doRead(int fd, void *buf, int numbytes) {
int offset=0;
char *ptr=(char *)buf;
return true;
}
+/**
+ * Function that handles a getSlot request.
+ */
+
void IoTQuery::getSlot() {
int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
if (numrequeststosend < 0)
delete filename;
}
const char header[]="getslot";
- long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
+
+ /* Size is the header + the payload + space for number of requests
+ plus sizes of each slot */
+
+ long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;
char * response = new char[size];
long long offset=0;
memcpy(response, header, sizeof(header)-1);
offset+=sizeof(int);
}
- //copy file data
+ /* Read the file data into the buffer */
for(int i=0; i<numrequeststosend; i++) {
if (fdarray[i]>=0) {
doRead(fdarray[i], response+offset, filesizes[i]);
}
}
- //write response out
+ /* Send the response out to the webserver. */
sendResponse(response, size);
- //cleanup
+ /* Delete the response buffer and close the files. */
delete response;
for(int i=0; i<numrequeststosend; i++) {
if (fdarray[i] >= 0)
}
}
+/**
+ * The method putSlot handles a putSlot request from the client
+ */
+
void IoTQuery::putSlot() {
+ /* Check if the request is stale and send update in that case. This
+ servers as an implicit failure of the request. */
if (requestsequencenumber!=(newestentry+1)) {
getSlot();
return;
}
+ /* See if we have too many slots and if so, delete the old one */
int numberofliveslots=(int) ((newestentry-oldestentry)+1);
if (numberofliveslots >= numqueueentries) {
- //need to drop slot
removeOldestSlot();
}
- //write slot data out to file
+ /* Write the slot data we received to a SLOT file */
char *filename = getSlotFileName(requestsequencenumber);
int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
doWrite(slotfd, data, length);
close(slotfd);
delete filename;
- newestentry = requestsequencenumber; // update sequence number
- updateStatusFile(); // update counts
+ newestentry = requestsequencenumber;
+
+ /* Update the seuqence numbers and other status file information. */
+ updateStatusFile();
+
+ /* Send response acknowledging success */
char command[]="putslot";
sendResponse(command, sizeof(command)-1);
}
+/**
+ * Method sends response. It wraps in appropriate headers for web
+ * server.
+ */
+
void IoTQuery::sendResponse(char * bytes, int len) {
cout << "Accept-Ranges: bytes\r\n"
<< "Content-Length: " << len << "\r\n"
cout.write(bytes, len);
}
-char * IoTQuery::getSlotFileName(long long slot) {
+/**
+ * Computes the name for a slot file for the given sequence number.
+ */
+
+char * IoTQuery::getSlotFileName(long long seqnum) {
int directorylen=strlen(directory);
- char * filename=new char[25+directorylen]; //19 digits for long number + 4 characters for SLOT + 1 character for null termination
- snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
+
+ /* Size is 19 digits for ASCII representation of a long + 4
+ characters for SLOT string + 1 character for null termination +
+ directory size*/
+
+ char * filename=new char[25+directorylen];
+ snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, seqnum);
return filename;
}
+/**
+ * Removes the oldest slot file
+ */
+
void IoTQuery::removeOldestSlot() {
if (oldestentry!=0) {
char * filename=getSlotFileName(oldestentry);
oldestentry++;
}
+/**
+ * Processes the query sent to the fastcgi handler.
+ */
+
void IoTQuery::processQuery() {
getQuery();
getDirectory();
readData();
+ /* Verify that we receive a post request. */
if (strncmp(method, "POST", 4) != 0) {
cerr << "Not POST Request" << endl;
return;
}
+ /* Make sure the directory is okay. */
if (directory == NULL ||
!checkDirectory()) {
cerr << "Directory " << directory << " does not exist" << endl;
return;
}
+ /* Get queue state from the status file. If it doesn't exist,
+ create it. */
if (!openStatusFile()) {
cerr << "Failed to open status file" << endl;
return;
}
+ /* Lock status file to keep other requests out. */
flock(fd, LOCK_EX);
+ /* Decode query. */
decodeQuery();
+ /* Handle request. */
if (reqGetSlot)
getSlot();
else if (reqPutSlot)
}
}
+/**
+ * Reads in data for request. This is used for the slot to be
+ * inserted.
+ */
+
void IoTQuery::readData() {
if (length) {
data = new char[length+1];
} while (!cin.eof());
}
+
+/**
+ * Reads relevant environmental variables to find out the request.
+ */
+
void IoTQuery::getQuery() {
uri = FCGX_GetParam(uri_str, request->envp);
query = FCGX_GetParam(query_str, request->envp);
method = FCGX_GetParam(method_str, request->envp);
iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+ /** We require the content-length header to be sent. */
char * reqlength = FCGX_GetParam(length_str, request->envp);
if (reqlength) {
length=strtoll(reqlength, NULL, 10);
}
}
+/**
+ * Initializes directory field from environmental variables.
+ */
+
void IoTQuery::getDirectory() {
char * split = strchr((char *)uri, '?');
if (split == NULL)
directory[directory_len-1]=0;
}
+/**
+ * Helper function that is used to read the status file.
+ */
+
int doread(int fd, void *ptr, size_t count, off_t offset) {
do {
size_t bytesread=pread(fd, ptr, count, offset);
} while(1);
}
+
+/**
+ * Writes the current state to the status file.
+ */
+
void IoTQuery::updateStatusFile() {
pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
}
+/**
+ * Reads in queue state from the status file. Returns true if
+ * successful.
+ */
+
bool IoTQuery::openStatusFile() {
char statusfile[]="queuestatus";
int len=strlen(directory);
return false;
}
+ /* Read in queue size, oldest sequence number, and newest sequence number. */
int size;
int needwrite=0;
if (doread(fd, &size, sizeof(size), OFFSET_MAX))