code written, not debugged
[iotcloud.git] / src / server / iotquery.cpp
#include "iotquery.h"
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include <vector>
9
10 using namespace std;
11
/* Names of the FastCGI environment parameters looked up by getQuery(). */
const char *query_str = "QUERY_STRING";
const char *uri_str = "REQUEST_URI";
const char *method_str = "REQUEST_METHOD";
const char *iotcloudroot_str = "IOTCLOUD_ROOT";
const char *length_str = "CONTENT_LENGTH";
17
18 IoTQuery::IoTQuery(FCGX_Request *request) :
19         request(request),
20         data(NULL),
21         directory(NULL),
22         uri(NULL),
23         query(NULL),
24         method(NULL),
25         iotcloudroot(NULL),
26         length(0),
27         oldestentry(0),
28         newestentry(0),
29         requestsequencenumber(0),
30         numqueueentries(DEFAULT_SIZE),
31         fd(-1) {
32 }
33
34 IoTQuery::~IoTQuery() {
35         if (fd >= 0)
36                 close(fd);
37         if (directory)
38                 delete directory;
39         if (data)
40                 delete data;
41 }
42
43 bool IoTQuery::checkDirectory() {
44         struct stat s;
45         int err=stat(directory, &s);
46         if (-1 == err)
47                 return false;
48         return S_ISDIR(s.st_mode);
49 }
50
51 void IoTQuery::decodeQuery() {
52         int len=strlen(query);
53         char * str=new char[len+1];
54         memcpy(str, query, len+1);
55         char *tok_ptr=str;
56
57         /* Parse commands */
58         char *command=strsep(&tok_ptr, "&");
59         if (strncmp(command, "putslot", 7) == 0)
60                 reqPutSlot = true;
61
62         if (strncmp(command, "getslot", 7) == 0)
63                 reqGetSlot = true;
64
65         /* Load Sequence Number for request */
66         char *sequencenumber_str = strsep(&tok_ptr, "&");
67
68         if (sequencenumber_str != NULL)
69                 requestsequencenumber = strtoll(sequencenumber_str, NULL, 10);
70
71         /* Update size if we get request */
72         char * numqueueentries_str = tok_ptr;
73         if (numqueueentries_str != NULL)
74                 numqueueentries = strtoll(numqueueentries_str, NULL, 10);
75
76         delete str;
77 }
78
/*
 * Writes `length` bytes from `data` to `fd`, looping over partial writes.
 * A non-positive length writes nothing.  Interrupted writes (EINTR) are
 * retried; any other failure is reported on cerr and the remaining bytes
 * are left unwritten.
 */
void doWrite(int fd, char *data, long long length) {
	long long offset = 0;
	while (length > 0) {
		ssize_t byteswritten = write(fd, data + offset, length);
		if (byteswritten < 0 && errno == EINTR)
			continue;
		if (byteswritten <= 0) {
			std::cerr << "Bytes not written";
			return;
		}
		offset += byteswritten;
		length -= byteswritten;
	}
}
92
/*
 * Reads exactly `numbytes` bytes from `fd` into `buf`, looping over partial
 * reads and retrying interrupted ones (EINTR).
 *
 * Returns true when all bytes were read (trivially so for numbytes == 0),
 * false on a negative count, on end of file or on a read error.
 */
bool doRead(int fd, void *buf, int numbytes) {
	/* A negative count would otherwise turn into a huge size_t for read(). */
	if (numbytes < 0)
		return false;

	char *ptr = static_cast<char *>(buf);
	int offset = 0;
	while (numbytes > 0) {
		ssize_t bytesread = read(fd, ptr + offset, numbytes);
		if (bytesread < 0 && errno == EINTR)
			continue;
		if (bytesread <= 0)
			return false;
		offset += static_cast<int>(bytesread);
		numbytes -= static_cast<int>(bytesread);
	}
	return true;
}
106
107 void IoTQuery::getSlot() {
108         int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
109         long long numbytes = 0;
110         int filesizes[numrequeststosend];
111         int fdarray[numrequeststosend];
112         int index=0;
113         for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
114                 struct stat st;
115                 char *filename=getSlotFileName(seqn);
116                 if (stat(filename, &st) == 0) {
117                         fdarray[index]=open(filename, O_RDONLY);
118                         filesizes[index]=st.st_size;
119                         numbytes+=filesizes[index];
120                 } else {
121                         fdarray[index]=-1;
122                         filesizes[index]=0;
123                 }
124                 delete filename;
125         }
126         const char header[]="getdata";
127         long long size=sizeof(header)+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
128         char * response = new char[size];
129         long long offset=0;
130         memcpy(response, header, sizeof(header));
131         offset+=sizeof(header);
132         memcpy(response + offset, &numrequeststosend, sizeof(numrequeststosend));
133         offset+=sizeof(numrequeststosend);
134         for(int i=0;i<numrequeststosend;i++) {
135                 memcpy(response + offset, &filesizes[i], sizeof(int));
136                 offset+=sizeof(int);
137         }
138
139         //copy file data
140         for(int i=0;i<numrequeststosend;i++) {
141                 if (fdarray[i]>=0) {
142                         doRead(fdarray[i], response+offset, filesizes[i]);
143                         offset+=filesizes[i];
144                 }
145         }
146         
147         //write response out
148         sendResponse(response, size);
149
150         //cleanup
151         delete response;
152         for(int i=0;i<numrequeststosend;i++) {
153                 if (fdarray[i] >= 0)
154                         close(fdarray[i]);
155         }
156 }
157
158 void IoTQuery::putSlot() {
159         if (requestsequencenumber!=(newestentry+1)) {
160                 getSlot();
161                 return;
162         }
163         
164         int numberofliveslots=(int) ((newestentry-oldestentry)+1);
165         if (numberofliveslots >=  numqueueentries) {
166                 //need to drop slot
167                 removeOldestSlot();
168         }
169         //write slot data out to file
170         char *filename = getSlotFileName(requestsequencenumber);
171         int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
172         doWrite(slotfd, data, length);
173         close(slotfd);
174         delete filename;
175         newestentry = requestsequencenumber; // update sequence number
176         updateStatusFile(); // update counts
177         char command[]="putdata";
178         sendResponse(command, sizeof(command));
179 }
180
181 void IoTQuery::sendResponse(char * bytes, int len) {
182         cout << "Accept-Ranges: bytes\r\n"
183                          << "Content-Length: " << len << "\r\n"
184                          << "\r\n";
185         cout.write(bytes, len); 
186 }
187
188 char * IoTQuery::getSlotFileName(long long slot) {
189         int directorylen=strlen(directory);
190         char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination
191         snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
192         return filename;
193 }
194
195 void IoTQuery::removeOldestSlot() {
196         if (oldestentry!=0) {
197                 char * filename=getSlotFileName(oldestentry);
198                 unlink(filename);
199                 delete filename;
200         }
201         oldestentry++;
202 }
203
204 void IoTQuery::processQuery() {
205         getQuery();
206         getDirectory();
207         readData();
208
209         if (strncmp(method, "POST", 4) != 0)
210                 return;
211
212         if (directory == NULL ||
213                         !checkDirectory())
214                 return;
215
216         if (!openStatusFile())
217                 return;
218
219         flock(fd, LOCK_EX);
220
221         decodeQuery();
222
223         if (reqGetSlot)
224                 getSlot();
225         else if (reqPutSlot)
226                 putSlot();
227         else return;
228 }
229
230 void IoTQuery::readData() {
231         if (length) {
232                 data = new char[length+1];
233                 memset(data, 0, length+1);
234                 cin.read(data, length);
235         }
236         do {
237                 char dummy;
238                 cin >> dummy;
239         } while (!cin.eof());
240 }
241
242 void IoTQuery::getQuery() {
243         uri = FCGX_GetParam(uri_str, request->envp);
244         query = FCGX_GetParam(query_str, request->envp);
245         method = FCGX_GetParam(method_str, request->envp);
246         iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
247
248         char * reqlength = FCGX_GetParam(length_str, request->envp);
249         if (length) {
250                 length=strtoll(reqlength, NULL, 10);
251         } else {
252                 length=0;
253         }
254 }
255
256 void IoTQuery::getDirectory() {
257         char * split = strchr((char *)uri, '?');
258         if (split == NULL)
259                 return;
260         int split_len = (int) (split-uri);
261         int rootdir_len = strlen(iotcloudroot);
262         int directory_len = split_len + rootdir_len + 1;
263         directory = new char[directory_len];
264         memcpy(directory, iotcloudroot, rootdir_len);
265         memcpy(directory + rootdir_len, uri, split_len);
266         directory[directory_len]=0;
267 }
268
/*
 * Reads exactly `count` bytes from `fd` at file position `offset` into
 * `ptr`, without moving the file position.  Partial reads are continued
 * and interrupted reads (EINTR) retried.
 *
 * Returns 1 when all bytes were read, 0 when the file ends first or the
 * read fails (the buffer may then hold partial data).
 */
int doread(int fd, void *ptr, size_t count, off_t offset) {
	char *dst = static_cast<char *>(ptr);
	size_t done = 0;
	while (done < count) {
		/* Keep the result signed so an error (-1) is recognisable. */
		ssize_t bytesread = pread(fd, dst + done, count - done, offset + done);
		if (bytesread < 0) {
			if (errno == EINTR)
				continue;
			return 0;
		}
		if (bytesread == 0)
			return 0;
		done += static_cast<size_t>(bytesread);
	}
	return 1;
}
279
280 void IoTQuery::updateStatusFile() {
281         pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
282         pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
283         pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
284 }
285
286 bool IoTQuery::openStatusFile() {
287         char statusfile[]="queuestatus";
288         int len=strlen(directory);
289
290         char * filename=new char[len+sizeof(statusfile)+2];
291         memcpy(filename, directory, len);
292         filename[len]='/';
293         memcpy(filename+len+1, statusfile, sizeof(statusfile));
294         filename[len+sizeof(statusfile)+1]=0;
295         fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
296         delete filename;
297
298         if (fd < 0)
299                 return false;
300
301         int size;
302         int needwrite=0;
303         if (doread(fd, &size, sizeof(size), OFFSET_MAX))
304                 numqueueentries=size;
305         else
306                 needwrite=1;
307
308         long long entry;
309         if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
310                 oldestentry=entry;
311         else
312                 needwrite=1;
313
314         if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
315                 newestentry=entry;
316         else
317                 needwrite=1;
318
319         if (needwrite)
320                 updateStatusFile();
321
322         return true;
323 }
324
325