--- /dev/null
+#include "iotquery.h"
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <netinet/in.h>
+
+using namespace std;
+
+const char * query_str = "QUERY_STRING";
+const char * uri_str = "REQUEST_URI";
+const char * method_str = "REQUEST_METHOD";
+const char * iotcloudroot_str = "IOTCLOUD_ROOT";
+const char * length_str = "CONTENT_LENGTH";
+
+IoTQuery::IoTQuery(FCGX_Request *request) :
+ request(request),
+ data(NULL),
+ directory(NULL),
+ uri(NULL),
+ query(NULL),
+ method(NULL),
+ iotcloudroot(NULL),
+ length(0),
+ oldestentry(0),
+ newestentry(0),
+ requestsequencenumber(0),
+ numqueueentries(DEFAULT_SIZE),
+ fd(-1),
+ reqGetSlot(false),
+ reqPutSlot(false),
+ reqSetSalt(false),
+ reqGetSalt(false) {
+}
+
+IoTQuery::~IoTQuery() {
+ if (fd >= 0)
+ close(fd);
+ if (directory)
+ delete directory;
+ if (data)
+ delete data;
+}
+
+/**
+ * Returns true if the account directory exists.
+ */
+
+bool IoTQuery::checkDirectory() {
+ struct stat s;
+ int err = stat(directory, &s);
+ if (-1 == err)
+ return false;
+ return S_ISDIR(s.st_mode);
+}
+
+/**
+ * Decodes query string from client. Extracts type of request,
+ * sequence number, and whether the request changes the number of
+ * slots.
+ */
+
+void IoTQuery::decodeQuery() {
+ int len = strlen(query);
+ char * str = new char[len + 1];
+ memcpy(str, query, len + 1);
+ char *tok_ptr = str;
+
+ /* Parse commands */
+ char *command = strsep(&tok_ptr, "&");
+ if (strncmp(command, "req=putslot", 11) == 0)
+ reqPutSlot = true;
+ else if (strncmp(command, "req=getslot", 11) == 0)
+ reqGetSlot = true;
+ else if (strncmp(command, "req=setsalt", 11) == 0)
+ reqSetSalt = true;
+ else if (strncmp(command, "req=getsalt", 11) == 0)
+ reqGetSalt = true;
+
+ /* Load Sequence Number for request */
+ char *sequencenumber_str = strsep(&tok_ptr, "&");
+ if (sequencenumber_str != NULL &&
+ strncmp(sequencenumber_str, "seq=", 4) == 0) {
+ sequencenumber_str = strchr(sequencenumber_str, '=');
+ if (sequencenumber_str != NULL) {
+ requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
+ }
+ }
+
+ /* don't allow a really old sequence number */
+ if (requestsequencenumber < oldestentry)
+ requestsequencenumber = oldestentry;
+
+ /* Update size if we get request */
+ char * numqueueentries_str = tok_ptr;
+ if (numqueueentries_str != NULL &&
+ strncmp(numqueueentries_str, "max=", 4) == 0) {
+ numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
+ numqueueentries = strtoll(numqueueentries_str, NULL, 10);
+ }
+
+ delete str;
+}
+
+/**
+ * Helper function to write data to file.
+ */
+
+void doWrite(int fd, char *data, long long length) {
+ long long offset = 0;
+ do {
+ long long byteswritten = write(fd, &data[offset], length);
+ if (byteswritten > 0) {
+ length -= byteswritten;
+ offset += byteswritten;
+ } else {
+ cerr << "Bytes not written" << endl;
+ if (byteswritten < 0) {
+ cerr << strerror(errno) << " error writing slot file" << endl;
+ }
+ return;
+ }
+ } while (length != 0);
+}
+
+/** Helper function to read data from file. */
+bool doRead(int fd, void *buf, int numbytes) {
+ int offset = 0;
+ char *ptr = (char *)buf;
+ do {
+ int bytesread = read(fd, ptr + offset, numbytes);
+ if (bytesread > 0) {
+ offset += bytesread;
+ numbytes -= bytesread;
+ } else
+ return false;
+ } while (numbytes != 0);
+ return true;
+}
+
+/**
+ * Function that handles a getSlot request.
+ */
+
+void IoTQuery::getSlot() {
+ int numrequeststosend = (int)((newestentry - requestsequencenumber) + 1);
+ if (numrequeststosend < 0)
+ numrequeststosend = 0;
+ long long numbytes = 0;
+ int filesizes[numrequeststosend];
+ int fdarray[numrequeststosend];
+ int index = 0;
+ for (long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+ struct stat st;
+ char *filename = getSlotFileName(seqn);
+ if (stat(filename, &st) == 0) {
+ fdarray[index] = open(filename, O_RDONLY);
+ filesizes[index] = st.st_size;
+ numbytes += filesizes[index];
+ } else {
+ fdarray[index] = -1;
+ filesizes[index] = 0;
+ }
+ delete filename;
+ }
+ const char header[] = "getslot";
+
+ /* Size is the header + the payload + space for number of requests
+ plus sizes of each slot */
+
+ long long size = sizeof(header) - 1 + sizeof(numrequeststosend) + 4 * numrequeststosend + numbytes;
+ char * response = new char[size];
+ long long offset = 0;
+ memcpy(response, header, sizeof(header) - 1);
+ offset += sizeof(header) - 1;
+ int numreq = htonl(numrequeststosend);
+ memcpy(response + offset, &numreq, sizeof(numreq));
+ offset += sizeof(numrequeststosend);
+ for (int i = 0; i < numrequeststosend; i++) {
+ int filesize = htonl(filesizes[i]);
+ memcpy(response + offset, &filesize, sizeof(filesize));
+ offset += sizeof(int);
+ }
+
+ /* Read the file data into the buffer */
+ for (int i = 0; i < numrequeststosend; i++) {
+ if (fdarray[i] >= 0) {
+ doRead(fdarray[i], response + offset, filesizes[i]);
+ offset += filesizes[i];
+ }
+ }
+
+ /* Send the response out to the webserver. */
+ sendResponse(response, size);
+
+ /* Delete the response buffer and close the files. */
+ delete response;
+ for (int i = 0; i < numrequeststosend; i++) {
+ if (fdarray[i] >= 0)
+ close(fdarray[i]);
+ }
+}
+
+/**
+ * The method setSalt handles a setSalt request from the client.
+ */
+void IoTQuery::setSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(saltfd, data, length);
+ char response[0];
+ sendResponse(response, 0);
+ close(saltfd);
+ delete filename;
+}
+
+/**
+ * The method getSalt handles a getSalt request from the client.
+ */
+
+void IoTQuery::getSalt() {
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSaltFileName();
+ int filesize = 0;
+ struct stat st;
+ if (stat(filename, &st) == 0) {
+ filesize = st.st_size;
+ } else {
+ char response[0];
+ sendResponse(response, 0);
+ delete filename;
+ return;
+ }
+ int saltfd = open(filename, O_RDONLY);
+ int responsesize = filesize + sizeof(int);
+ char * response = new char[responsesize];
+ doRead(saltfd, response + sizeof(int), filesize);
+ int n_filesize = htonl(filesize);
+ *((int*) response) = n_filesize;
+ sendResponse(response, responsesize);
+ close(saltfd);
+ delete filename;
+ delete response;
+}
+
+/**
+ * The method putSlot handles a putSlot request from the client
+ */
+
+void IoTQuery::putSlot() {
+ /* Check if the request is stale and send update in that case. This
+ servers as an implicit failure of the request. */
+ if (requestsequencenumber == 150)
+ {
+ if (requestsequencenumber != (newestentry + 1)) {
+
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSlotFileName(requestsequencenumber);
+ unlink(filename);
+ int slotfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(slotfd, data, length);
+ close(slotfd);
+ delete filename;
+
+ getSlot();
+ return;
+ }
+ }
+
+
+
+ if (requestsequencenumber != (newestentry + 1)) {
+ getSlot();
+ return;
+ }
+
+ /* See if we have too many slots and if so, delete the old one */
+ int numberofliveslots = (int) ((newestentry - oldestentry) + 1);
+ if (numberofliveslots >= numqueueentries) {
+ removeOldestSlot();
+ }
+
+ /* Write the slot data we received to a SLOT file */
+ char *filename = getSlotFileName(requestsequencenumber);
+ int slotfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(slotfd, data, length);
+ close(slotfd);
+ delete filename;
+ newestentry = requestsequencenumber;
+
+ /* Update the seuqence numbers and other status file information. */
+ updateStatusFile();
+
+ /* Send response acknowledging success */
+ char command[] = "putslot";
+ sendResponse(command, sizeof(command) - 1);
+}
+
+/**
+ * Method sends response. It wraps in appropriate headers for web
+ * server.
+ */
+
+void IoTQuery::sendResponse(char * bytes, int len) {
+ cout << "Accept-Ranges: bytes\r\n"
+ << "Content-Length: " << len << "\r\n"
+ << "\r\n";
+ cout.write(bytes, len);
+ cout << flush;
+}
+
+/**
+ * Computes the name for a slot file for the given sequence number.
+ */
+
+char * IoTQuery::getSlotFileName(long long seqnum) {
+ int directorylen = strlen(directory);
+
+ /* Size is 19 digits for ASCII representation of a long + 4
+ characters for SLOT string + 1 character for null termination +
+ directory size*/
+
+ char * filename = new char[25 + directorylen];
+ snprintf(filename, 25 + directorylen, "%s/SLOT%lld", directory, seqnum);
+ return filename;
+}
+
+/**
+ * Computes the name for a salt file
+ */
+
+char * IoTQuery::getSaltFileName() {
+ int directorylen = strlen(directory);
+
+ /* Size is 4 characters for SALT string + 1 character for null
+ termination + directory size*/
+
+ char * filename = new char[6 + directorylen];
+ snprintf(filename, 6 + directorylen, "%s/SALT", directory);
+ return filename;
+}
+
+/**
+ * Removes the oldest slot file
+ */
+
+void IoTQuery::removeOldestSlot() {
+ if (oldestentry != 0) {
+ char * filename = getSlotFileName(oldestentry);
+ unlink(filename);
+ delete filename;
+ }
+ oldestentry++;
+}
+
+/**
+ * Processes the query sent to the fastcgi handler.
+ */
+
+void IoTQuery::processQuery() {
+ getQuery();
+ getDirectory();
+ // readData();
+ if (!readData())
+ {
+ cerr << "No Data Available" << endl;
+ return;
+ }
+
+
+ /* Verify that we receive a post request. */
+ if (strncmp(method, "POST", 4) != 0) {
+ cerr << "Not POST Request" << endl;
+ return;
+ }
+
+ /* Make sure the directory is okay. */
+ if (directory == NULL ||
+ !checkDirectory()) {
+ cerr << "Directory " << directory << " does not exist" << endl;
+ return;
+ }
+
+ /* Get queue state from the status file. If it doesn't exist,
+ create it. */
+ if (!openStatusFile()) {
+ cerr << "Failed to open status file" << endl;
+ return;
+ }
+
+ /* Lock status file to keep other requests out. */
+ flock(fd, LOCK_EX);
+
+ /* Decode query. */
+ decodeQuery();
+
+ /* Handle request. */
+ if (reqGetSlot)
+ getSlot();
+ else if (reqPutSlot)
+ putSlot();
+ else if (reqSetSalt)
+ setSalt();
+ else if (reqGetSalt)
+ getSalt();
+ else {
+ cerr << "No recognized request" << endl;
+ return;
+ }
+}
+
+/**
+ * Reads in data for request. This is used for the slot to be
+ * inserted.
+ */
+
+bool IoTQuery::readData() {
+ if (length != 0) {
+ data = new char[length + 1];
+ memset(data, 0, length + 1);
+ cin.read(data, length);
+ }
+
+ do {
+ char dummy;
+ cin >> dummy;
+ } while (!cin.eof());
+
+ if (length != 0)
+ {
+ if (cin.gcount() != length)
+ {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+
+/**
+ * Reads relevant environmental variables to find out the request.
+ */
+
+void IoTQuery::getQuery() {
+ uri = FCGX_GetParam(uri_str, request->envp);
+ query = FCGX_GetParam(query_str, request->envp);
+ method = FCGX_GetParam(method_str, request->envp);
+ iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
+
+ /** We require the content-length header to be sent. */
+ char * reqlength = FCGX_GetParam(length_str, request->envp);
+ if (reqlength) {
+ length = strtoll(reqlength, NULL, 10);
+ } else {
+ length = 0;
+ }
+}
+
+/**
+ * Initializes directory field from environmental variables.
+ */
+
+void IoTQuery::getDirectory() {
+ char * split = strchr((char *)uri, '?');
+ if (split == NULL)
+ return;
+ int split_len = (int) (split - uri);
+ int rootdir_len = strlen(iotcloudroot);
+ int directory_len = split_len + rootdir_len + 1;
+ directory = new char[directory_len];
+ memcpy(directory, iotcloudroot, rootdir_len);
+ memcpy(directory + rootdir_len, uri, split_len);
+ directory[directory_len - 1] = 0;
+}
+
+/**
+ * Helper function that is used to read the status file.
+ */
+
+int doread(int fd, void *ptr, size_t count, off_t offset) {
+ do {
+ size_t bytesread = pread(fd, ptr, count, offset);
+ if (bytesread == count) {
+ return 1;
+ } else if (bytesread == 0) {
+ return 0;
+ }
+ } while (1);
+}
+
+
+/**
+ * Writes the current state to the status file.
+ */
+
+void IoTQuery::updateStatusFile() {
+ pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
+ pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
+ pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
+}
+
+/**
+ * Reads in queue state from the status file. Returns true if
+ * successful.
+ */
+
+bool IoTQuery::openStatusFile() {
+ char statusfile[] = "queuestatus";
+ int len = strlen(directory);
+
+ char * filename = new char[len + sizeof(statusfile) + 2];
+ memcpy(filename, directory, len);
+ filename[len] = '/';
+ memcpy(filename + len + 1, statusfile, sizeof(statusfile));
+ filename[len + sizeof(statusfile) + 1] = 0;
+ fd = open(filename, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
+ delete filename;
+
+ if (fd < 0) {
+ cerr << strerror(errno) << " error opening statusfile" << endl;
+ return false;
+ }
+
+ /* Read in queue size, oldest sequence number, and newest sequence number. */
+ int size;
+ int needwrite = 0;
+ if (doread(fd, &size, sizeof(size), OFFSET_MAX))
+ numqueueentries = size;
+ else
+ needwrite = 1;
+
+ long long entry;
+ if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
+ oldestentry = entry;
+ else
+ needwrite = 1;
+
+ if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
+ newestentry = entry;
+ else
+ needwrite = 1;
+
+ if (needwrite)
+ updateStatusFile();
+
+ return true;
+}
+
+