add some comments
[iotcloud.git] / src / server / iotquery.cpp
1 #include "iotquery.h"
2 #include <string.h>
3 #include <sys/stat.h>
4 #include <sys/types.h>
5 #include <sys/file.h>
6 #include <fcntl.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <errno.h>
10 #include <netinet/in.h>
11
12 using namespace std;
13
14 const char * query_str="QUERY_STRING";
15 const char * uri_str="REQUEST_URI";
16 const char * method_str="REQUEST_METHOD";
17 const char * iotcloudroot_str="IOTCLOUD_ROOT";
18 const char * length_str="CONTENT_LENGTH";
19
20 IoTQuery::IoTQuery(FCGX_Request *request) :
21         request(request),
22         data(NULL),
23         directory(NULL),
24         uri(NULL),
25         query(NULL),
26         method(NULL),
27         iotcloudroot(NULL),
28         length(0),
29         oldestentry(0),
30         newestentry(0),
31         requestsequencenumber(0),
32         numqueueentries(DEFAULT_SIZE),
33         fd(-1),
34         reqGetSlot(false),
35         reqPutSlot(false) {
36 }
37
38 IoTQuery::~IoTQuery() {
39         if (fd >= 0)
40                 close(fd);
41         if (directory)
42                 delete directory;
43         if (data)
44                 delete data;
45 }
46
47 /**
48  *  Returns true if the account directory exists.
49  */
50
51 bool IoTQuery::checkDirectory() {
52         struct stat s;
53         int err=stat(directory, &s);
54         if (-1 == err)
55                 return false;
56         return S_ISDIR(s.st_mode);
57 }
58
59 /**
60  * Decodes query string from client. Extracts type of request,
61  * sequence number, and whether the request changes the number of
62  * slots.
63  */
64
65 void IoTQuery::decodeQuery() {
66         int len=strlen(query);
67         char * str=new char[len+1];
68         memcpy(str, query, len+1);
69         char *tok_ptr=str;
70
71         /* Parse commands */
72         char *command=strsep(&tok_ptr, "&");
73         if (strncmp(command, "req=putslot", 11) == 0)
74                 reqPutSlot = true;
75
76         if (strncmp(command, "req=getslot", 11) == 0)
77                 reqGetSlot = true;
78
79         /* Load Sequence Number for request */
80         char *sequencenumber_str = strsep(&tok_ptr, "&");
81         if (sequencenumber_str != NULL &&
82                         strncmp(sequencenumber_str, "seq=", 4) == 0) {
83                 sequencenumber_str = strchr(sequencenumber_str, '=');
84                 if (sequencenumber_str != NULL) {
85                         requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
86                 }
87         }
88
89         /* don't allow a really old sequence number */
90         if (requestsequencenumber < oldestentry)
91                 requestsequencenumber = oldestentry;
92
93         /* Update size if we get request */
94         char * numqueueentries_str = tok_ptr;
95         if (numqueueentries_str != NULL &&
96                         strncmp(numqueueentries_str, "max=", 4) == 0) {
97                 numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
98                 numqueueentries = strtoll(numqueueentries_str, NULL, 10);
99         }
100
101         delete str;
102 }
103
104 /**
105  * Helper function to write data to file.
106  */
107
108 void doWrite(int fd, char *data, long long length) {
109         long long offset=0;
110         do {
111                 long long byteswritten=write(fd, &data[offset], length);
112                 if (byteswritten > 0) {
113                         length -= byteswritten;
114                         offset += byteswritten;
115                 } else {
116                         cerr << "Bytes not written" << endl;
117                         if (byteswritten < 0) {
118                                 cerr << strerror(errno) << " error writing slot file" << endl;
119                         }
120                         return;
121                 }
122         } while(length != 0);
123 }
124
125 /** Helper function to read data from file. */
126 bool doRead(int fd, void *buf, int numbytes) {
127         int offset=0;
128         char *ptr=(char *)buf;
129         do {
130                 int bytesread=read(fd, ptr+offset, numbytes);
131                 if (bytesread > 0) {
132                         offset += bytesread;
133                         numbytes -= bytesread;
134                 } else
135                         return false;
136         } while (numbytes!=0);
137         return true;
138 }
139
140 /**
141  * Function that handles a getSlot request.
142  */
143
144 void IoTQuery::getSlot() {
145         int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
146         if (numrequeststosend < 0)
147                 numrequeststosend = 0;
148         long long numbytes = 0;
149         int filesizes[numrequeststosend];
150         int fdarray[numrequeststosend];
151         int index=0;
152         for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
153                 struct stat st;
154                 char *filename=getSlotFileName(seqn);
155                 if (stat(filename, &st) == 0) {
156                         fdarray[index]=open(filename, O_RDONLY);
157                         filesizes[index]=st.st_size;
158                         numbytes+=filesizes[index];
159                 } else {
160                         fdarray[index]=-1;
161                         filesizes[index]=0;
162                 }
163                 delete filename;
164         }
165         const char header[]="getslot";
166
167         /* Size is the header + the payload + space for number of requests
168                  plus sizes of each slot */
169
170         long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;
171         char * response = new char[size];
172         long long offset=0;
173         memcpy(response, header, sizeof(header)-1);
174         offset+=sizeof(header)-1;
175         int numreq=htonl(numrequeststosend);
176         memcpy(response + offset, &numreq, sizeof(numreq));
177         offset+=sizeof(numrequeststosend);
178         for(int i=0; i<numrequeststosend; i++) {
179                 int filesize=htonl(filesizes[i]);
180                 memcpy(response + offset, &filesize, sizeof(filesize));
181                 offset+=sizeof(int);
182         }
183
184         /* Read the file data into the buffer */
185         for(int i=0; i<numrequeststosend; i++) {
186                 if (fdarray[i]>=0) {
187                         doRead(fdarray[i], response+offset, filesizes[i]);
188                         offset+=filesizes[i];
189                 }
190         }
191
192         /* Send the response out to the webserver. */
193         sendResponse(response, size);
194
195         /* Delete the response buffer and close the files. */
196         delete response;
197         for(int i=0; i<numrequeststosend; i++) {
198                 if (fdarray[i] >= 0)
199                         close(fdarray[i]);
200         }
201 }
202
203 /**
204  *      The method putSlot handles a putSlot request from the client
205  */
206
207 void IoTQuery::putSlot() {
208         /* Check if the request is stale and send update in that case.  This
209                  servers as an implicit failure of the request. */
210         if (requestsequencenumber!=(newestentry+1)) {
211                 getSlot();
212                 return;
213         }
214
215         /* See if we have too many slots and if so, delete the old one */
216         int numberofliveslots=(int) ((newestentry-oldestentry)+1);
217         if (numberofliveslots >=  numqueueentries) {
218                 removeOldestSlot();
219         }
220
221         /* Write the slot data we received to a SLOT file */
222         char *filename = getSlotFileName(requestsequencenumber);
223         int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
224         doWrite(slotfd, data, length);
225         close(slotfd);
226         delete filename;
227         newestentry = requestsequencenumber;
228
229         /* Update the seuqence numbers and other status file information. */
230         updateStatusFile();
231
232         /* Send response acknowledging success */
233         char command[]="putslot";
234         sendResponse(command, sizeof(command)-1);
235 }
236
237 /**
238  * Method sends response.  It wraps in appropriate headers for web
239  * server.
240  */
241
242 void IoTQuery::sendResponse(char * bytes, int len) {
243         cout << "Accept-Ranges: bytes\r\n"
244                          << "Content-Length: " << len << "\r\n"
245                          << "\r\n";
246         cout.write(bytes, len);
247 }
248
249 /**
250  *      Computes the name for a slot file for the given sequence number.
251  */
252
253 char * IoTQuery::getSlotFileName(long long seqnum) {
254         int directorylen=strlen(directory);
255
256         /* Size is 19 digits for ASCII representation of a long + 4
257                  characters for SLOT string + 1 character for null termination +
258                  directory size*/
259
260         char * filename=new char[25+directorylen];
261         snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, seqnum);
262         return filename;
263 }
264
265 /**
266  *  Removes the oldest slot file
267  */
268
269 void IoTQuery::removeOldestSlot() {
270         if (oldestentry!=0) {
271                 char * filename=getSlotFileName(oldestentry);
272                 unlink(filename);
273                 delete filename;
274         }
275         oldestentry++;
276 }
277
278 /**
279  * Processes the query sent to the fastcgi handler.
280  */
281
282 void IoTQuery::processQuery() {
283         getQuery();
284         getDirectory();
285         readData();
286
287         /* Verify that we receive a post request. */
288         if (strncmp(method, "POST", 4) != 0) {
289                 cerr << "Not POST Request" << endl;
290                 return;
291         }
292
293         /* Make sure the directory is okay. */
294         if (directory == NULL ||
295                         !checkDirectory()) {
296                 cerr << "Directory " << directory << " does not exist" << endl;
297                 return;
298         }
299
300         /* Get queue state from the status file.  If it doesn't exist,
301                  create it. */
302         if (!openStatusFile()) {
303                 cerr << "Failed to open status file" << endl;
304                 return;
305         }
306
307         /* Lock status file to keep other requests out. */
308         flock(fd, LOCK_EX);
309
310         /* Decode query. */
311         decodeQuery();
312
313         /* Handle request. */
314         if (reqGetSlot)
315                 getSlot();
316         else if (reqPutSlot)
317                 putSlot();
318         else {
319                 cerr << "No recognized request" << endl;
320                 return;
321         }
322 }
323
324 /**
325  * Reads in data for request.  This is used for the slot to be
326  * inserted.
327  */
328
329 void IoTQuery::readData() {
330         if (length) {
331                 data = new char[length+1];
332                 memset(data, 0, length+1);
333                 cin.read(data, length);
334         }
335         do {
336                 char dummy;
337                 cin >> dummy;
338         } while (!cin.eof());
339 }
340
341
342 /**
343  * Reads relevant environmental variables to find out the request.
344  */
345
346 void IoTQuery::getQuery() {
347         uri = FCGX_GetParam(uri_str, request->envp);
348         query = FCGX_GetParam(query_str, request->envp);
349         method = FCGX_GetParam(method_str, request->envp);
350         iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
351
352         /** We require the content-length header to be sent. */
353         char * reqlength = FCGX_GetParam(length_str, request->envp);
354         if (reqlength) {
355                 length=strtoll(reqlength, NULL, 10);
356         } else {
357                 length=0;
358         }
359 }
360
361 /**
362  *  Initializes directory field from environmental variables.
363  */
364
365 void IoTQuery::getDirectory() {
366         char * split = strchr((char *)uri, '?');
367         if (split == NULL)
368                 return;
369         int split_len = (int) (split-uri);
370         int rootdir_len = strlen(iotcloudroot);
371         int directory_len = split_len + rootdir_len + 1;
372         directory = new char[directory_len];
373         memcpy(directory, iotcloudroot, rootdir_len);
374         memcpy(directory + rootdir_len, uri, split_len);
375         directory[directory_len-1]=0;
376 }
377
378 /**
379  * Helper function that is used to read the status file.
380  */
381
382 int doread(int fd, void *ptr, size_t count, off_t offset) {
383         do {
384                 size_t bytesread=pread(fd, ptr, count, offset);
385                 if (bytesread==count) {
386                         return 1;
387                 } else if (bytesread==0) {
388                         return 0;
389                 }
390         } while(1);
391 }
392
393
394 /**
395  * Writes the current state to the status file.
396  */
397
398 void IoTQuery::updateStatusFile() {
399         pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
400         pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
401         pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
402 }
403
404 /**
405  * Reads in queue state from the status file.  Returns true if
406  * successful.
407  */
408
409 bool IoTQuery::openStatusFile() {
410         char statusfile[]="queuestatus";
411         int len=strlen(directory);
412
413         char * filename=new char[len+sizeof(statusfile)+2];
414         memcpy(filename, directory, len);
415         filename[len]='/';
416         memcpy(filename+len+1, statusfile, sizeof(statusfile));
417         filename[len+sizeof(statusfile)+1]=0;
418         fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
419         delete filename;
420
421         if (fd < 0) {
422                 cerr << strerror(errno) << " error opening statusfile" << endl;
423                 return false;
424         }
425
426         /* Read in queue size, oldest sequence number, and newest sequence number. */
427         int size;
428         int needwrite=0;
429         if (doread(fd, &size, sizeof(size), OFFSET_MAX))
430                 numqueueentries=size;
431         else
432                 needwrite=1;
433
434         long long entry;
435         if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
436                 oldestentry=entry;
437         else
438                 needwrite=1;
439
440         if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
441                 newestentry=entry;
442         else
443                 needwrite=1;
444
445         if (needwrite)
446                 updateStatusFile();
447
448         return true;
449 }
450
451