10 #include <netinet/in.h>
// Names of the request parameters that IoTQuery::getQuery() passes to
// FCGX_GetParam() when it reads the request environment.
14 const char * query_str="QUERY_STRING";
15 const char * uri_str="REQUEST_URI";
16 const char * method_str="REQUEST_METHOD";
// Parameter holding the root path that getDirectory() prepends to the URI path.
17 const char * iotcloudroot_str="IOTCLOUD_ROOT";
// Parameter parsed by getQuery() into the request body length.
18 const char * length_str="CONTENT_LENGTH";
// Constructs a query handler from a FastCGI request handle.
// NOTE(review): only part of the member initializer list was visible during
// review; the remaining members and the constructor body should be checked.
20 IoTQuery::IoTQuery(FCGX_Request *request) :
// The requested sequence number starts at 0 until decodeQuery() parses one.
31 requestsequencenumber(0),
// The slot limit starts at DEFAULT_SIZE; decodeQuery() and openStatusFile()
// may overwrite it.
32 numqueueentries(DEFAULT_SIZE),
38 IoTQuery::~IoTQuery() {
48 * Returns true if the account directory exists.
// Runs stat() on the `directory` member and reports whether that path is a
// directory.
51 bool IoTQuery::checkDirectory() {
// stat() fills `s`; its return code is captured in err.
53 int err=stat(directory, &s);
// NOTE(review): the handling of err was not visible during review - confirm
// that a failed stat() returns false before s.st_mode is read.
56 return S_ISDIR(s.st_mode);
60 * Decodes query string from client. Extracts type of request,
61 * sequence number, and whether the request changes the number of
// Parses the `query` member, whose fields are separated by `&`:
//   req=<putslot|getslot>  then  seq=<number>  then optionally  max=<number>
// The parsed values are stored in requestsequencenumber and numqueueentries.
// Fields are recognised by prefix match only.
65 void IoTQuery::decodeQuery() {
// Work on a private NUL-terminated copy because strsep() modifies its input.
// NOTE(review): the release of str was not visible during review - confirm it
// is deleted on every path.
66 int len=strlen(query);
67 char * str=new char[len+1];
68 memcpy(str, query, len+1);
// First field selects the request type.
// NOTE(review): tok_ptr is presumably initialised to str in lines not seen.
72 char *command=strsep(&tok_ptr, "&");
73 if (strncmp(command, "req=putslot", 11) == 0)
76 if (strncmp(command, "req=getslot", 11) == 0)
79 /* Load Sequence Number for request */
// strsep() returns NULL once the input is exhausted, hence the NULL test.
80 char *sequencenumber_str = strsep(&tok_ptr, "&");
81 if (sequencenumber_str != NULL &&
82 strncmp(sequencenumber_str, "seq=", 4) == 0) {
// The seq= prefix was just matched, so strchr() always finds `=` here; the
// NULL check that follows is purely defensive.
83 sequencenumber_str = strchr(sequencenumber_str, '=');
84 if (sequencenumber_str != NULL) {
// Base-10 parse of the text after `=`; no end pointer is requested, so
// trailing garbage is silently ignored.
85 requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
89 /* don't allow a really old sequence number */
// Clamp requests that point before the oldest retained slot.
90 if (requestsequencenumber < oldestentry)
91 requestsequencenumber = oldestentry;
93 /* Update size if we get request */
// After two strsep() calls tok_ptr is the unparsed remainder, or NULL when
// there is no third field.
94 char * numqueueentries_str = tok_ptr;
95 if (numqueueentries_str != NULL &&
96 strncmp(numqueueentries_str, "max=", 4) == 0) {
// Safe to add 1: the max= prefix guarantees strchr() finds `=`.
97 numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
// NOTE(review): no range validation of the parsed limit is visible here.
98 numqueueentries = strtoll(numqueueentries_str, NULL, 10);
105 * Helper function to write data to file.
// Writes `length` bytes from `data` to file descriptor `fd`, repeating write()
// until the remaining count reaches zero.
//   fd      descriptor to write to
//   data    start of the buffer
//   length  number of bytes to write
108 void doWrite(int fd, char *data, long long length) {
// Each attempt starts at data[offset] and asks for all remaining bytes.
111 long long byteswritten=write(fd, &data[offset], length);
112 if (byteswritten > 0) {
// Progress was made: shrink the remaining count and advance the cursor.
113 length -= byteswritten;
114 offset += byteswritten;
// Diagnostic path: a message goes to cerr, and a negative result also
// reports strerror(errno).
116 cerr << "Bytes not written" << endl;
117 if (byteswritten < 0) {
118 cerr << strerror(errno) << " error writing slot file" << endl;
// NOTE(review): the statements after the error report were not visible during
// review - confirm the loop cannot spin forever on a persistent write error.
122 } while(length != 0);
125 /** Helper function to read data from file. */
// Reads `numbytes` bytes from `fd` into `buf`, repeating read() until the
// remaining count reaches zero.
// NOTE(review): the return statements were not visible during review -
// presumably true on success and false on error or early end of file.
126 bool doRead(int fd, void *buf, int numbytes) {
// Byte-wise view of the destination so an offset can be added to it.
128 char *ptr=(char *)buf;
// Each attempt continues at ptr+offset and asks for all remaining bytes.
130 int bytesread=read(fd, ptr+offset, numbytes);
// NOTE(review): the checks on bytesread sit in lines not seen - confirm that
// zero and negative results leave the loop.
133 numbytes -= bytesread;
136 } while (numbytes!=0);
141 * Function that handles a getSlot request.
// Handles a getslot request. Builds one response buffer laid out as
//   the 7 bytes `getslot` (no terminator)
//   the slot count, converted with htonl()
//   one size per slot, each converted with htonl()
//   the contents of each slot file, back to back
// for sequence numbers requestsequencenumber through newestentry, and hands
// the buffer to sendResponse().
144 void IoTQuery::getSlot() {
// Inclusive count of slots in the requested range, clamped at zero when the
// client is already ahead of newestentry.
145 int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
146 if (numrequeststosend < 0)
147 numrequeststosend = 0;
148 long long numbytes = 0;
// NOTE(review): arrays with a runtime size are a compiler extension, not
// standard C++, and live on the stack - a large range could overflow it.
149 int filesizes[numrequeststosend];
150 int fdarray[numrequeststosend];
// Walk the range once, opening every slot file and recording its size.
// NOTE(review): `index` is declared in a line not seen during review.
152 for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
154 char *filename=getSlotFileName(seqn);
155 if (stat(filename, &st) == 0) {
// NOTE(review): the result of open() is not checked in the visible lines, and
// st_size is narrowed to int.
156 fdarray[index]=open(filename, O_RDONLY);
157 filesizes[index]=st.st_size;
158 numbytes+=filesizes[index];
// sizeof(header)-1 is used below so the NUL terminator is not transmitted.
165 const char header[]="getslot";
167 /* Size is the header + the payload + space for number of requests
168 plus sizes of each slot */
// The literal 4 per slot matches sizeof(int) only where int is 32 bits wide.
170 long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;
171 char * response = new char[size];
173 memcpy(response, header, sizeof(header)-1);
174 offset+=sizeof(header)-1;
// Slot count in network byte order.
175 int numreq=htonl(numrequeststosend);
176 memcpy(response + offset, &numreq, sizeof(numreq));
177 offset+=sizeof(numrequeststosend);
// Size table: one network-order integer per slot.
178 for(int i=0; i<numrequeststosend; i++) {
179 int filesize=htonl(filesizes[i]);
180 memcpy(response + offset, &filesize, sizeof(filesize));
184 /* Read the file data into the buffer */
185 for(int i=0; i<numrequeststosend; i++) {
// NOTE(review): the bool result of doRead() is discarded here.
187 doRead(fdarray[i], response+offset, filesizes[i]);
188 offset+=filesizes[i];
192 /* Send the response out to the webserver. */
// NOTE(review): sendResponse() takes an int length, so `size` (long long) is
// narrowed at this call.
193 sendResponse(response, size);
195 /* Delete the response buffer and close the files. */
197 for(int i=0; i<numrequeststosend; i++) {
204 * The method putSlot handles a putSlot request from the client
// Handles a putslot request: the slot is accepted only when its sequence
// number is exactly newestentry+1. An accepted slot is written to its own
// SLOT file, newestentry is advanced, and the 7 bytes `putslot` are sent back.
207 void IoTQuery::putSlot() {
208 /* Check if the request is stale and send update in that case. This
209 serves as an implicit failure of the request. */
210 if (requestsequencenumber!=(newestentry+1)) {
215 /* See if we have too many slots and if so, delete the old one */
// Inclusive count of slots currently retained.
216 int numberofliveslots=(int) ((newestentry-oldestentry)+1);
217 if (numberofliveslots >= numqueueentries) {
221 /* Write the slot data we received to a SLOT file */
222 char *filename = getSlotFileName(requestsequencenumber);
// Created if missing, owner read and write only.
// NOTE(review): O_TRUNC is not passed, so an existing longer file would keep
// its trailing bytes - confirm slot files are never rewritten.
223 int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
// NOTE(review): slotfd is not checked for failure in the visible lines.
224 doWrite(slotfd, data, length);
227 newestentry = requestsequencenumber;
229 /* Update the sequence numbers and other status file information. */
232 /* Send response acknowledging success */
// sizeof(command)-1 drops the NUL terminator from the reply.
233 char command[]="putslot";
234 sendResponse(command, sizeof(command)-1);
238 * Method sends response. It wraps in appropriate headers for web
// Writes a response to cout: header lines first, then `len` raw bytes taken
// from `bytes`.
//   bytes  payload buffer, may contain binary data
//   len    number of payload bytes, also reported as Content-Length
242 void IoTQuery::sendResponse(char * bytes, int len) {
// Header lines are terminated with CR LF.
// NOTE(review): the end of this header statement was not visible during
// review - confirm it finishes with the blank line that ends the headers.
243 cout << "Accept-Ranges: bytes\r\n"
244 << "Content-Length: " << len << "\r\n"
// write() rather than the stream insertion operator, so embedded zero bytes
// in the payload are sent unchanged.
246 cout.write(bytes, len);
250 * Computes the name for a slot file for the given sequence number.
// Formats the path of the slot file for `seqnum` as
//   <directory>/SLOT<seqnum>
// into a buffer allocated with new[].
// NOTE(review): the return statement was not visible during review;
// presumably the buffer is returned and the caller must delete[] it.
// Buffer budget: directory length + 1 separator + 4 for SLOT + 19 digits +
// 1 terminator = 25 + directory length, which is also the limit given to
// snprintf() below.
253 char * IoTQuery::getSlotFileName(long long seqnum) {
254 int directorylen=strlen(directory);
256 /* Size is 19 digits for ASCII representation of a long + 4
257 characters for SLOT string + 1 character for null termination +
260 char * filename=new char[25+directorylen];
261 snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, seqnum);
266 * Removes the oldest slot file
// Looks up the slot file that belongs to oldestentry so it can be removed.
// NOTE(review): the removal itself and any update of oldestentry were not
// visible during review.
269 void IoTQuery::removeOldestSlot() {
// An oldestentry of 0 skips the lookup entirely.
270 if (oldestentry!=0) {
// NOTE(review): getSlotFileName() allocates the name with new[] - confirm it
// is released after use.
271 char * filename=getSlotFileName(oldestentry);
279 * Processes the query sent to the fastcgi handler.
// Top-level driver for one request. The visible steps are: require the POST
// method, validate the account directory, open the status file, then
// dispatch the request. Each failed step logs a message to cerr.
// NOTE(review): the early returns and the dispatch itself sit in lines that
// were not visible during review.
282 void IoTQuery::processQuery() {
287 /* Verify that we receive a post request. */
// Prefix match on the first four characters of the method.
288 if (strncmp(method, "POST", 4) != 0) {
289 cerr << "Not POST Request" << endl;
293 /* Make sure the directory is okay. */
// NOTE(review): if this branch is taken because directory is NULL, inserting
// a null char pointer into cerr below is undefined behaviour - confirm.
294 if (directory == NULL ||
296 cerr << "Directory " << directory << " does not exist" << endl;
// openStatusFile() returns false when the status file cannot be opened.
300 /* Get queue state from the status file. If it doesn't exist,
302 if (!openStatusFile()) {
303 cerr << "Failed to open status file" << endl;
307 /* Lock status file to keep other requests out. */
313 /* Handle request. */
319 cerr << "No recognized request" << endl;
325 * Reads in data for request. This is used for the slot to be
// Reads the request body from cin into a freshly allocated `data` buffer of
// length+1 bytes.
329 void IoTQuery::readData() {
// One extra byte, zero-filled, so the buffer stays NUL-terminated after
// `length` bytes are read into it.
// NOTE(review): `length` comes from the request - no validation of it is
// visible before this allocation.
331 data = new char[length+1];
332 memset(data, 0, length+1);
333 cin.read(data, length);
// NOTE(review): the body of this loop was only partly visible during review.
338 } while (!cin.eof());
343 * Reads relevant environmental variables to find out the request.
// Loads the request parameters from the FastCGI environment into the uri,
// query, method, iotcloudroot and length members.
346 void IoTQuery::getQuery() {
// NOTE(review): the lookups are stored without a visible NULL check, while
// later code passes them to strlen() and strncmp() - confirm they are set.
347 uri = FCGX_GetParam(uri_str, request->envp);
348 query = FCGX_GetParam(query_str, request->envp);
349 method = FCGX_GetParam(method_str, request->envp);
350 iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
352 /** We require the content-length header to be sent. */
353 char * reqlength = FCGX_GetParam(length_str, request->envp);
// Base-10 parse of the body length.
// NOTE(review): the guard for a missing header sits in a line not seen.
355 length=strtoll(reqlength, NULL, 10);
362 * Initializes directory field from environmental variables.
// Builds the `directory` member by concatenating iotcloudroot with the part
// of uri that precedes the first question mark.
365 void IoTQuery::getDirectory() {
// Locate the start of the query part of the URI.
// NOTE(review): the handling of a URI without a question mark was not
// visible during review; the pointer subtraction below requires a match.
366 char * split = strchr((char *)uri, '?');
// Number of URI characters before the question mark.
369 int split_len = (int) (split-uri);
370 int rootdir_len = strlen(iotcloudroot);
// Both pieces plus one byte for the terminator.
371 int directory_len = split_len + rootdir_len + 1;
372 directory = new char[directory_len];
// No separator is inserted between the two pieces.
// NOTE(review): the URI path is appended as received - confirm that segments
// such as .. are rejected somewhere before the path is used.
373 memcpy(directory, iotcloudroot, rootdir_len);
374 memcpy(directory + rootdir_len, uri, split_len);
375 directory[directory_len-1]=0;
379 * Helper function that is used to read the status file.
// Reads `count` bytes at file position `offset` from `fd` into `ptr` with a
// single pread() call and classifies the outcome.
// NOTE(review): the values returned for each outcome were not visible during
// review; callers treat a nonzero result as success.
382 int doread(int fd, void *ptr, size_t count, off_t offset) {
// NOTE(review): pread() returns ssize_t; storing it in size_t turns the error
// value -1 into a huge positive number - confirm the final branch copes.
384 size_t bytesread=pread(fd, ptr, count, offset);
// Full read.
385 if (bytesread==count) {
// Nothing was read, which is what a new, empty file yields.
387 } else if (bytesread==0) {
395 * Writes the current state to the status file.
// Stores the three queue state fields in the status file (`fd` member), each
// at its own fixed offset, in their in-memory binary representation.
// NOTE(review): the results of pwrite() are ignored, so short or failed
// writes go unnoticed.
398 void IoTQuery::updateStatusFile() {
399 pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
400 pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
401 pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
405 * Reads in queue state from the status file. Returns true if
// Opens the file named queuestatus inside `directory`, creating it when it
// does not exist, and loads the queue state from it.
409 bool IoTQuery::openStatusFile() {
// sizeof(statusfile) counts the NUL terminator.
410 char statusfile[]="queuestatus";
411 int len=strlen(directory);
// Room for the directory, one byte at index len, the name with its
// terminator, and one spare byte.
413 char * filename=new char[len+sizeof(statusfile)+2];
414 memcpy(filename, directory, len);
// NOTE(review): filename[len] is presumably set to the path separator in a
// line not seen during review.
// The copy below already includes the terminator; the explicit store after
// it writes one further zero, still inside the allocation.
416 memcpy(filename+len+1, statusfile, sizeof(statusfile));
417 filename[len+sizeof(statusfile)+1]=0;
// Read and write access, created if missing, owner permissions only.
418 fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
// NOTE(review): confirm filename is deleted on both the success and the
// failure path.
422 cerr << strerror(errno) << " error opening statusfile" << endl;
426 /* Read in queue size, oldest sequence number, and newest sequence number. */
// Each field is taken from the file only when doread() reports success;
// otherwise the value already held by the member is kept.
// NOTE(review): `size` and `entry` are declared in lines not seen, and so are
// the assignments that follow the last two reads.
429 if (doread(fd, &size, sizeof(size), OFFSET_MAX))
430 numqueueentries=size;
435 if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
440 if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))