API Changes
[iotcloud.git] / version2 / src / server / iotquery.cpp
1 #include "iotquery.h"
2 #include <string.h>
3 #include <sys/stat.h>
4 #include <sys/types.h>
5 #include <sys/file.h>
6 #include <fcntl.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <errno.h>
10 #include <netinet/in.h>
11
12 using namespace std;
13
14 const char * query_str="QUERY_STRING";
15 const char * uri_str="REQUEST_URI";
16 const char * method_str="REQUEST_METHOD";
17 const char * iotcloudroot_str="IOTCLOUD_ROOT";
18 const char * length_str="CONTENT_LENGTH";
19
20 IoTQuery::IoTQuery(FCGX_Request *request) :
21         request(request),
22         data(NULL),
23         directory(NULL),
24         uri(NULL),
25         query(NULL),
26         method(NULL),
27         iotcloudroot(NULL),
28         length(0),
29         oldestentry(0),
30         newestentry(0),
31         requestsequencenumber(0),
32         numqueueentries(DEFAULT_SIZE),
33         fd(-1),
34         reqGetSlot(false),
35         reqPutSlot(false),
36         reqSetSalt(false),
37         reqGetSalt(false) {
38 }
39
40 IoTQuery::~IoTQuery() {
41         if (fd >= 0)
42                 close(fd);
43         if (directory)
44                 delete directory;
45         if (data)
46                 delete data;
47 }
48
49 /**
50  *  Returns true if the account directory exists.
51  */
52
53 bool IoTQuery::checkDirectory() {
54         struct stat s;
55         int err=stat(directory, &s);
56         if (-1 == err)
57                 return false;
58         return S_ISDIR(s.st_mode);
59 }
60
61 /**
62  * Decodes query string from client. Extracts type of request,
63  * sequence number, and whether the request changes the number of
64  * slots.
65  */
66
67 void IoTQuery::decodeQuery() {
68         int len=strlen(query);
69         char * str=new char[len+1];
70         memcpy(str, query, len+1);
71         char *tok_ptr=str;
72         
73         /* Parse commands */
74         char *command=strsep(&tok_ptr, "&");
75         if (strncmp(command, "req=putslot", 11) == 0)
76                 reqPutSlot = true;
77         else if (strncmp(command, "req=getslot", 11) == 0)
78                 reqGetSlot = true;
79         else if (strncmp(command, "req=setsalt", 11) == 0)
80                 reqSetSalt = true;
81         else if (strncmp(command, "req=getsalt", 11) == 0)
82                 reqGetSalt = true;
83
84         /* Load Sequence Number for request */
85         char *sequencenumber_str = strsep(&tok_ptr, "&");
86         if (sequencenumber_str != NULL &&
87                         strncmp(sequencenumber_str, "seq=", 4) == 0) {
88                 sequencenumber_str = strchr(sequencenumber_str, '=');
89                 if (sequencenumber_str != NULL) {
90                         requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
91                 }
92         }
93
94         /* don't allow a really old sequence number */
95         if (requestsequencenumber < oldestentry)
96                 requestsequencenumber = oldestentry;
97
98         /* Update size if we get request */
99         char * numqueueentries_str = tok_ptr;
100         if (numqueueentries_str != NULL &&
101                         strncmp(numqueueentries_str, "max=", 4) == 0) {
102                 numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
103                 numqueueentries = strtoll(numqueueentries_str, NULL, 10);
104         }
105
106         delete str;
107 }
108
109 /**
110  * Helper function to write data to file.
111  */
112
113 void doWrite(int fd, char *data, long long length) {
114         long long offset=0;
115         do {
116                 long long byteswritten=write(fd, &data[offset], length);
117                 if (byteswritten > 0) {
118                         length -= byteswritten;
119                         offset += byteswritten;
120                 } else {
121                         cerr << "Bytes not written" << endl;
122                         if (byteswritten < 0) {
123                                 cerr << strerror(errno) << " error writing slot file" << endl;
124                         }
125                         return;
126                 }
127         } while(length != 0);
128 }
129
130 /** Helper function to read data from file. */
131 bool doRead(int fd, void *buf, int numbytes) {
132         int offset=0;
133         char *ptr=(char *)buf;
134         do {
135                 int bytesread=read(fd, ptr+offset, numbytes);
136                 if (bytesread > 0) {
137                         offset += bytesread;
138                         numbytes -= bytesread;
139                 } else
140                         return false;
141         } while (numbytes!=0);
142         return true;
143 }
144
145 /**
146  * Function that handles a getSlot request.
147  */
148
149 void IoTQuery::getSlot() {
150         int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
151         if (numrequeststosend < 0)
152                 numrequeststosend = 0;
153         long long numbytes = 0;
154         int filesizes[numrequeststosend];
155         int fdarray[numrequeststosend];
156         int index=0;
157         for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
158                 struct stat st;
159                 char *filename=getSlotFileName(seqn);
160                 if (stat(filename, &st) == 0) {
161                         fdarray[index]=open(filename, O_RDONLY);
162                         filesizes[index]=st.st_size;
163                         numbytes+=filesizes[index];
164                 } else {
165                         fdarray[index]=-1;
166                         filesizes[index]=0;
167                 }
168                 delete filename;
169         }
170         const char header[]="getslot";
171
172         /* Size is the header + the payload + space for number of requests
173                  plus sizes of each slot */
174
175         long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;
176         char * response = new char[size];
177         long long offset=0;
178         memcpy(response, header, sizeof(header)-1);
179         offset+=sizeof(header)-1;
180         int numreq=htonl(numrequeststosend);
181         memcpy(response + offset, &numreq, sizeof(numreq));
182         offset+=sizeof(numrequeststosend);
183         for(int i=0; i<numrequeststosend; i++) {
184                 int filesize=htonl(filesizes[i]);
185                 memcpy(response + offset, &filesize, sizeof(filesize));
186                 offset+=sizeof(int);
187         }
188
189         /* Read the file data into the buffer */
190         for(int i=0; i<numrequeststosend; i++) {
191                 if (fdarray[i]>=0) {
192                         doRead(fdarray[i], response+offset, filesizes[i]);
193                         offset+=filesizes[i];
194                 }
195         }
196
197         /* Send the response out to the webserver. */
198         sendResponse(response, size);
199
200         /* Delete the response buffer and close the files. */
201         delete response;
202         for(int i=0; i<numrequeststosend; i++) {
203                 if (fdarray[i] >= 0)
204                         close(fdarray[i]);
205         }
206 }
207
208 /**
209  * The method setSalt handles a setSalt request from the client.
210  */
211
212 void IoTQuery::setSalt() {
213         /* Write the slot data we received to a SLOT file */
214         char *filename = getSaltFileName();
215         int saltfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
216         doWrite(saltfd, data, length);
217         char response[0];
218         sendResponse(response, 0);
219         close(saltfd);
220         delete filename;
221 }
222
223 /**
224  * The method getSalt handles a setSalt request from the client.
225  */
226
227 void IoTQuery::getSalt() {
228         /* Write the slot data we received to a SLOT file */
229         char *filename = getSaltFileName();
230         int filesize = 0;
231         struct stat st;
232         if (stat(filename, &st) == 0) {
233                 filesize=st.st_size;
234         } else {
235                 delete filename;
236                 return;
237         }
238         int saltfd = open(filename, O_RDONLY);
239         int responsesize = filesize + sizeof(int);
240         char * response = new char[responsesize];
241         doRead(saltfd, response+ sizeof(int), filesize);
242         int n_filesize=htonl(filesize);
243         *((int*) response) = n_filesize;
244         sendResponse(response, responsesize);
245         close(saltfd);
246         delete filename;
247         delete response;
248 }
249
250 /**
251  *      The method putSlot handles a putSlot request from the client
252  */
253
254 void IoTQuery::putSlot() {
255         /* Check if the request is stale and send update in that case.  This
256                  servers as an implicit failure of the request. */
257         if (requestsequencenumber!=(newestentry+1)) {
258                 getSlot();
259                 return;
260         }
261
262         /* See if we have too many slots and if so, delete the old one */
263         int numberofliveslots=(int) ((newestentry-oldestentry)+1);
264         if (numberofliveslots >=  numqueueentries) {
265                 removeOldestSlot();
266         }
267
268         /* Write the slot data we received to a SLOT file */
269         char *filename = getSlotFileName(requestsequencenumber);
270         int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
271         doWrite(slotfd, data, length);
272         close(slotfd);
273         delete filename;
274         newestentry = requestsequencenumber;
275
276         /* Update the seuqence numbers and other status file information. */
277         updateStatusFile();
278
279         /* Send response acknowledging success */
280         char command[]="putslot";
281         sendResponse(command, sizeof(command)-1);
282 }
283
284 /**
285  * Method sends response.  It wraps in appropriate headers for web
286  * server.
287  */
288
289 void IoTQuery::sendResponse(char * bytes, int len) {
290         cout << "Accept-Ranges: bytes\r\n"
291                          << "Content-Length: " << len << "\r\n"
292                          << "\r\n";
293         cout.write(bytes, len);
294 }
295
296 /**
297  *      Computes the name for a slot file for the given sequence number.
298  */
299
300 char * IoTQuery::getSlotFileName(long long seqnum) {
301         int directorylen=strlen(directory);
302
303         /* Size is 19 digits for ASCII representation of a long + 4
304                  characters for SLOT string + 1 character for null termination +
305                  directory size*/
306
307         char * filename=new char[25+directorylen];
308         snprintf(filename, 25+directorylen, "%s/SLOT%lld", directory, seqnum);
309         return filename;
310 }
311
312 /**
313  *      Computes the name for a salt file
314  */
315
316 char * IoTQuery::getSaltFileName() {
317         int directorylen=strlen(directory);
318
319         /* Size is 4 characters for SALT string + 1 character for null
320                  termination + directory size*/
321
322         char * filename=new char[6+directorylen];
323         snprintf(filename, 6+directorylen, "%s/SALT", directory);
324         return filename;
325 }
326
327 /**
328  *  Removes the oldest slot file
329  */
330
331 void IoTQuery::removeOldestSlot() {
332         if (oldestentry!=0) {
333                 char * filename=getSlotFileName(oldestentry);
334                 unlink(filename);
335                 delete filename;
336         }
337         oldestentry++;
338 }
339
340 /**
341  * Processes the query sent to the fastcgi handler.
342  */
343
344 void IoTQuery::processQuery() {
345         getQuery();
346         getDirectory();
347         readData();
348
349         /* Verify that we receive a post request. */
350         if (strncmp(method, "POST", 4) != 0) {
351                 cerr << "Not POST Request" << endl;
352                 return;
353         }
354
355         /* Make sure the directory is okay. */
356         if (directory == NULL ||
357                         !checkDirectory()) {
358                 cerr << "Directory " << directory << " does not exist" << endl;
359                 return;
360         }
361
362         /* Get queue state from the status file.  If it doesn't exist,
363                  create it. */
364         if (!openStatusFile()) {
365                 cerr << "Failed to open status file" << endl;
366                 return;
367         }
368
369         /* Lock status file to keep other requests out. */
370         flock(fd, LOCK_EX);
371
372         /* Decode query. */
373         decodeQuery();
374         
375         /* Handle request. */
376         if (reqGetSlot)
377                 getSlot();
378         else if (reqPutSlot)
379                 putSlot();
380         else if (reqSetSalt)
381                 setSalt();
382         else if (reqGetSalt)
383                 getSalt();
384         else {
385                 cerr << "No recognized request" << endl;
386                 return;
387         }
388 }
389
390 /**
391  * Reads in data for request.  This is used for the slot to be
392  * inserted.
393  */
394
395 void IoTQuery::readData() {
396         if (length) {
397                 data = new char[length+1];
398                 memset(data, 0, length+1);
399                 cin.read(data, length);
400         }
401         do {
402                 char dummy;
403                 cin >> dummy;
404         } while (!cin.eof());
405 }
406
407
408 /**
409  * Reads relevant environmental variables to find out the request.
410  */
411
412 void IoTQuery::getQuery() {
413         uri = FCGX_GetParam(uri_str, request->envp);
414         query = FCGX_GetParam(query_str, request->envp);
415         method = FCGX_GetParam(method_str, request->envp);
416         iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
417
418         /** We require the content-length header to be sent. */
419         char * reqlength = FCGX_GetParam(length_str, request->envp);
420         if (reqlength) {
421                 length=strtoll(reqlength, NULL, 10);
422         } else {
423                 length=0;
424         }
425 }
426
427 /**
428  *  Initializes directory field from environmental variables.
429  */
430
431 void IoTQuery::getDirectory() {
432         char * split = strchr((char *)uri, '?');
433         if (split == NULL)
434                 return;
435         int split_len = (int) (split-uri);
436         int rootdir_len = strlen(iotcloudroot);
437         int directory_len = split_len + rootdir_len + 1;
438         directory = new char[directory_len];
439         memcpy(directory, iotcloudroot, rootdir_len);
440         memcpy(directory + rootdir_len, uri, split_len);
441         directory[directory_len-1]=0;
442 }
443
444 /**
445  * Helper function that is used to read the status file.
446  */
447
448 int doread(int fd, void *ptr, size_t count, off_t offset) {
449         do {
450                 size_t bytesread=pread(fd, ptr, count, offset);
451                 if (bytesread==count) {
452                         return 1;
453                 } else if (bytesread==0) {
454                         return 0;
455                 }
456         } while(1);
457 }
458
459
460 /**
461  * Writes the current state to the status file.
462  */
463
464 void IoTQuery::updateStatusFile() {
465         pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
466         pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
467         pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
468 }
469
470 /**
471  * Reads in queue state from the status file.  Returns true if
472  * successful.
473  */
474
475 bool IoTQuery::openStatusFile() {
476         char statusfile[]="queuestatus";
477         int len=strlen(directory);
478
479         char * filename=new char[len+sizeof(statusfile)+2];
480         memcpy(filename, directory, len);
481         filename[len]='/';
482         memcpy(filename+len+1, statusfile, sizeof(statusfile));
483         filename[len+sizeof(statusfile)+1]=0;
484         fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
485         delete filename;
486
487         if (fd < 0) {
488                 cerr << strerror(errno) << " error opening statusfile" << endl;
489                 return false;
490         }
491
492         /* Read in queue size, oldest sequence number, and newest sequence number. */
493         int size;
494         int needwrite=0;
495         if (doread(fd, &size, sizeof(size), OFFSET_MAX))
496                 numqueueentries=size;
497         else
498                 needwrite=1;
499
500         long long entry;
501         if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
502                 oldestentry=entry;
503         else
504                 needwrite=1;
505
506         if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
507                 newestentry=entry;
508         else
509                 needwrite=1;
510
511         if (needwrite)
512                 updateStatusFile();
513
514         return true;
515 }
516
517