more code
[iotcloud.git] / src / server / iotquery.cpp
1 #include "iotquery.h"
2 #include <string.h>
3 #include <sys/stat.h>
4 #include <sys/types.h>
5 #include <sys/file.h>
6 #include <fcntl.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <errno.h>
10 #include <netinet/in.h>
11
12 using namespace std;
13
14 const char * query_str="QUERY_STRING";
15 const char * uri_str="REQUEST_URI";
16 const char * method_str="REQUEST_METHOD";
17 const char * iotcloudroot_str="IOTCLOUD_ROOT";
18 const char * length_str="CONTENT_LENGTH";
19
20 IoTQuery::IoTQuery(FCGX_Request *request) :
21         request(request),
22         data(NULL),
23         directory(NULL),
24         uri(NULL),
25         query(NULL),
26         method(NULL),
27         iotcloudroot(NULL),
28         length(0),
29         oldestentry(0),
30         newestentry(0),
31         requestsequencenumber(0),
32         numqueueentries(DEFAULT_SIZE),
33         fd(-1),
34         reqGetSlot(false),
35         reqPutSlot(false) {
36 }
37
38 IoTQuery::~IoTQuery() {
39         if (fd >= 0)
40                 close(fd);
41         if (directory)
42                 delete directory;
43         if (data)
44                 delete data;
45 }
46
47 bool IoTQuery::checkDirectory() {
48         struct stat s;
49         int err=stat(directory, &s);
50         if (-1 == err)
51                 return false;
52         return S_ISDIR(s.st_mode);
53 }
54
55 void IoTQuery::decodeQuery() {
56         int len=strlen(query);
57         char * str=new char[len+1];
58         memcpy(str, query, len+1);
59         char *tok_ptr=str;
60         
61         /* Parse commands */
62         char *command=strsep(&tok_ptr, "&");
63         if (strncmp(command, "req=putslot", 11) == 0)
64                 reqPutSlot = true;
65
66         if (strncmp(command, "req=getslot", 11) == 0)
67                 reqGetSlot = true;
68
69         /* Load Sequence Number for request */
70         char *sequencenumber_str = strsep(&tok_ptr, "&");
71         if (sequencenumber_str != NULL &&
72                         strncmp(sequencenumber_str, "seq=", 4) == 0) {
73                 sequencenumber_str = strchr(sequencenumber_str, '=');
74                 if (sequencenumber_str != NULL) {
75                         requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
76                 }
77         }
78
79         //don't allow a really old sequence number
80         if (requestsequencenumber < oldestentry)
81                 requestsequencenumber = oldestentry;
82         
83         /* Update size if we get request */
84         char * numqueueentries_str = tok_ptr;
85         if (numqueueentries_str != NULL &&
86                         strncmp(numqueueentries_str, "max=", 4) == 0) {
87                 numqueueentries_str = strchr(numqueueentries_str + 1, '=');
88                 numqueueentries = strtoll(numqueueentries_str, NULL, 10);
89         }
90                 
91         delete str;
92 }
93
94 void doWrite(int fd, char *data, long long length) {
95         long long offset=0;
96         do {
97                 long long byteswritten=write(fd, &data[offset], length);
98                 if (byteswritten > 0) {
99                         length -= byteswritten;
100                         offset += byteswritten;
101                 } else {
102                         cerr << "Bytes not written" << endl;
103                         if (byteswritten < 0) {
104                                 cerr << strerror(errno) << " error writing slot file" << endl;
105                         }
106                         return;
107                 }
108         } while(length != 0);
109 }
110
111 bool doRead(int fd, void *buf, int numbytes) {
112         int offset=0;
113         char *ptr=(char *)buf;
114         do {
115                 int bytesread=read(fd, ptr+offset, numbytes);
116                 if (bytesread > 0) {
117                         offset += bytesread;
118                         numbytes -= bytesread;
119                 } else
120                         return false;
121         } while (numbytes!=0);
122         return true;
123 }
124
125 void IoTQuery::getSlot() {
126         int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
127         if (numrequeststosend < 0)
128                 numrequeststosend = 0;
129         long long numbytes = 0;
130         int filesizes[numrequeststosend];
131         int fdarray[numrequeststosend];
132         int index=0;
133         for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
134                 struct stat st;
135                 char *filename=getSlotFileName(seqn);
136                 if (stat(filename, &st) == 0) {
137                         fdarray[index]=open(filename, O_RDONLY);
138                         filesizes[index]=st.st_size;
139                         numbytes+=filesizes[index];
140                 } else {
141                         fdarray[index]=-1;
142                         filesizes[index]=0;
143                 }
144                 delete filename;
145         }
146         const char header[]="getslot";
147         long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
148         char * response = new char[size];
149         long long offset=0;
150         memcpy(response, header, sizeof(header)-1);
151         offset+=sizeof(header)-1;
152         int numreq=htonl(numrequeststosend);
153         cerr << numrequeststosend << " " << numreq << endl;
154         memcpy(response + offset, &numreq, sizeof(numreq));
155         offset+=sizeof(numrequeststosend);
156         for(int i=0;i<numrequeststosend;i++) {
157                 int filesize=htonl(filesizes[i]);
158                 memcpy(response + offset, &filesize, sizeof(filesize));
159                 offset+=sizeof(int);
160         }
161         
162         //copy file data
163         for(int i=0;i<numrequeststosend;i++) {
164                 if (fdarray[i]>=0) {
165                         doRead(fdarray[i], response+offset, filesizes[i]);
166                         offset+=filesizes[i];
167                 }
168         }
169         
170         //write response out
171         sendResponse(response, size);
172
173         //cleanup
174         delete response;
175         for(int i=0;i<numrequeststosend;i++) {
176                 if (fdarray[i] >= 0)
177                         close(fdarray[i]);
178         }
179 }
180
181 void IoTQuery::putSlot() {
182         if (requestsequencenumber!=(newestentry+1)) {
183                 getSlot();
184                 return;
185         }
186
187         int numberofliveslots=(int) ((newestentry-oldestentry)+1);
188         if (numberofliveslots >=  numqueueentries) {
189                 //need to drop slot
190                 removeOldestSlot();
191         }
192         
193         //write slot data out to file
194         char *filename = getSlotFileName(requestsequencenumber);
195         int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
196         doWrite(slotfd, data, length);
197         close(slotfd);
198         delete filename;
199         newestentry = requestsequencenumber; // update sequence number
200         updateStatusFile(); // update counts
201         char command[]="putslot";
202         sendResponse(command, sizeof(command)-1);
203 }
204
205 void IoTQuery::sendResponse(char * bytes, int len) {
206         cout << "Accept-Ranges: bytes\r\n"
207                          << "Content-Length: " << len << "\r\n"
208                          << "\r\n";
209         cout.write(bytes, len); 
210 }
211
212 char * IoTQuery::getSlotFileName(long long slot) {
213         int directorylen=strlen(directory);
214         char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination
215         snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
216         return filename;
217 }
218
219 void IoTQuery::removeOldestSlot() {
220         if (oldestentry!=0) {
221                 char * filename=getSlotFileName(oldestentry);
222                 unlink(filename);
223                 delete filename;
224         }
225         oldestentry++;
226 }
227
228 void IoTQuery::processQuery() {
229         getQuery();
230         getDirectory();
231         readData();
232
233         if (strncmp(method, "POST", 4) != 0) {
234                 cerr << "Not POST Request" << endl;
235                 return;
236         }
237
238         if (directory == NULL ||
239                         !checkDirectory()) {
240                 cerr << "Directory " << directory << " does not exist" << endl;
241                 return;
242         }
243
244         if (!openStatusFile()) {
245                 cerr << "Failed to open status file" << endl;
246                 return;
247         }
248
249         flock(fd, LOCK_EX);
250
251         decodeQuery();
252         
253         if (reqGetSlot)
254                 getSlot();
255         else if (reqPutSlot)
256                 putSlot();
257         else {
258                 cerr << "No recognized request" << endl;
259                 return;
260         }
261 }
262
263 void IoTQuery::readData() {
264         if (length) {
265                 data = new char[length+1];
266                 memset(data, 0, length+1);
267                 cin.read(data, length);
268         }
269         do {
270                 char dummy;
271                 cin >> dummy;
272         } while (!cin.eof());
273 }
274
275 void IoTQuery::getQuery() {
276         uri = FCGX_GetParam(uri_str, request->envp);
277         query = FCGX_GetParam(query_str, request->envp);
278         method = FCGX_GetParam(method_str, request->envp);
279         iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
280
281         char * reqlength = FCGX_GetParam(length_str, request->envp);
282         if (reqlength) {
283                 length=strtoll(reqlength, NULL, 10);
284         } else {
285                 length=0;
286         }
287 }
288
289 void IoTQuery::getDirectory() {
290         char * split = strchr((char *)uri, '?');
291         if (split == NULL)
292                 return;
293         int split_len = (int) (split-uri);
294         int rootdir_len = strlen(iotcloudroot);
295         int directory_len = split_len + rootdir_len + 1;
296         directory = new char[directory_len];
297         memcpy(directory, iotcloudroot, rootdir_len);
298         memcpy(directory + rootdir_len, uri, split_len);
299         directory[directory_len-1]=0;
300 }
301
302 int doread(int fd, void *ptr, size_t count, off_t offset) {
303         do {
304                 size_t bytesread=pread(fd, ptr, count, offset);
305                 if (bytesread==count) {
306                         return 1;
307                 } else if (bytesread==0) {
308                         return 0;
309                 }
310         } while(1);
311 }
312
313 void IoTQuery::updateStatusFile() {
314         pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
315         pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
316         pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
317 }
318
319 bool IoTQuery::openStatusFile() {
320         char statusfile[]="queuestatus";
321         int len=strlen(directory);
322
323         char * filename=new char[len+sizeof(statusfile)+2];
324         memcpy(filename, directory, len);
325         filename[len]='/';
326         memcpy(filename+len+1, statusfile, sizeof(statusfile));
327         filename[len+sizeof(statusfile)+1]=0;
328         fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
329         delete filename;
330
331         if (fd < 0) {
332                 cerr << strerror(errno) << " error opening statusfile" << endl;
333                 return false;
334         }
335         
336         int size;
337         int needwrite=0;
338         if (doread(fd, &size, sizeof(size), OFFSET_MAX))
339                 numqueueentries=size;
340         else
341                 needwrite=1;
342
343         long long entry;
344         if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
345                 oldestentry=entry;
346         else
347                 needwrite=1;
348
349         if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
350                 newestentry=entry;
351         else
352                 needwrite=1;
353
354         if (needwrite)
355                 updateStatusFile();
356
357         return true;
358 }
359
360