d2d80e4177420a100a9900fbdd625289bc910735
[iotcloud.git] / src / server / iotquery.cpp
#include "iotquery.h"
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <unistd.h>
9
10 using namespace std;
11
/* Names of the request environment parameters looked up through FCGX_GetParam. */
const char *query_str        = "QUERY_STRING";   // text after '?' in the request
const char *uri_str          = "REQUEST_URI";    // full request URI
const char *method_str       = "REQUEST_METHOD"; // HTTP verb
const char *iotcloudroot_str = "IOTCLOUD_ROOT";  // prefix prepended to the URI path
const char *length_str       = "CONTENT_LENGTH"; // size of the request body
17
18 IoTQuery::IoTQuery(FCGX_Request *request) :
19         request(request),
20         data(NULL),
21         directory(NULL),
22         uri(NULL),
23         query(NULL),
24         method(NULL),
25         iotcloudroot(NULL),
26         length(0),
27         oldestentry(0),
28         newestentry(0),
29         requestsequencenumber(0),
30         numqueueentries(DEFAULT_SIZE),
31         fd(-1) {
32 }
33
34 IoTQuery::~IoTQuery() {
35         if (fd >= 0)
36                 close(fd);
37         if (directory)
38                 delete directory;
39         if (data)
40                 delete data;
41 }
42
43 bool IoTQuery::checkDirectory() {
44         struct stat s;
45         int err=stat(directory, &s);
46         if (-1 == err)
47                 return false;
48         return S_ISDIR(s.st_mode);
49 }
50
51 void IoTQuery::decodeQuery() {
52         int len=strlen(query);
53         char * str=new char[len+1];
54         memcpy(str, query, len+1);
55         char *tok_ptr=str;
56
57         /* Parse commands */
58         char *command=strsep(&tok_ptr, "&");
59         if (strncmp(command, "putslot", 7) == 0)
60                 reqPutSlot = true;
61
62         if (strncmp(command, "getslot", 7) == 0)
63                 reqGetSlot = true;
64
65         /* Load Sequence Number for request */
66         char *sequencenumber_str = strsep(&tok_ptr, "&");
67
68         if (sequencenumber_str != NULL)
69                 requestsequencenumber = strtoll(sequencenumber_str, NULL, 10);
70
71         /* Update size if we get request */
72         char * numqueueentries_str = tok_ptr;
73         if (numqueueentries_str != NULL)
74                 numqueueentries = strtoll(numqueueentries_str, NULL, 10);
75
76         delete str;
77 }
78
/**
 * Writes exactly `length` bytes from `data` to `fd`, retrying after partial
 * writes and after interruption by a signal.
 *
 * A length of zero (or less) is a no-op. On any other failure the message
 * "Bytes not written" is sent to stderr and the function returns early.
 *
 * @param fd     descriptor open for writing
 * @param data   buffer holding at least `length` bytes
 * @param length number of bytes to write
 */
void doWrite(int fd, char *data, long long length) {
	long long offset = 0;
	while (length > 0) {
		ssize_t byteswritten = write(fd, &data[offset], length);
		if (byteswritten > 0) {
			length -= byteswritten;
			offset += byteswritten;
		} else if (byteswritten < 0 && errno == EINTR) {
			// Interrupted before anything was written; try again.
			continue;
		} else {
			std::cerr << "Bytes not written";
			return;
		}
	}
}
92
93 void IoTQuery::getSlot() {
94         
95 }
96
97 void IoTQuery::putSlot() {
98         if (requestsequencenumber!=(newestentry+1)) {
99                 getSlot();
100                 return;
101         }
102         
103         int numberofliveslots=(int) ((newestentry-oldestentry)+1);
104         if (numberofliveslots >=  numqueueentries) {
105                 //need to drop slot
106                 removeOldestSlot();
107         }
108         //write slot data out to file
109         char *filename = getSlotFileName(requestsequencenumber);
110         int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
111         doWrite(slotfd, data, length);
112         close(slotfd);
113         delete filename;
114         newestentry = requestsequencenumber; // update sequence number
115         updateStatusFile(); // update counts
116 }
117
118 char * IoTQuery::getSlotFileName(long long slot) {
119         int directorylen=strlen(directory);
120         char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination
121         snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
122         return filename;
123 }
124
125
126
127 void IoTQuery::removeOldestSlot() {
128         if (oldestentry!=0) {
129                 char * filename=getSlotFileName(oldestentry);
130                 unlink(filename);
131                 delete filename;
132         }
133         oldestentry++;
134 }
135
136 void IoTQuery::processQuery() {
137         getQuery();
138         getDirectory();
139         readData();
140
141         if (strncmp(method, "POST", 4) != 0)
142                 return;
143
144         if (directory == NULL ||
145                         !checkDirectory())
146                 return;
147
148         if (!openStatusFile())
149                 return;
150
151         flock(fd, LOCK_EX);
152
153         decodeQuery();
154
155         if (reqGetSlot)
156                 getSlot();
157         else if (reqPutSlot)
158                 putSlot();
159         else return;
160 }
161
162 void IoTQuery::readData() {
163         if (length) {
164                 data = new char[length+1];
165                 memset(data, 0, length+1);
166                 cin.read(data, length);
167         }
168         do {
169                 char dummy;
170                 cin >> dummy;
171         } while (!cin.eof());
172 }
173
174 void IoTQuery::getQuery() {
175         uri = FCGX_GetParam(uri_str, request->envp);
176         query = FCGX_GetParam(query_str, request->envp);
177         method = FCGX_GetParam(method_str, request->envp);
178         iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
179
180         char * reqlength = FCGX_GetParam(length_str, request->envp);
181         if (length) {
182                 length=strtoll(reqlength, NULL, 10);
183         } else {
184                 length=0;
185         }
186 }
187
188 void IoTQuery::getDirectory() {
189         char * split = strchr((char *)uri, '?');
190         if (split == NULL)
191                 return;
192         int split_len = (int) (split-uri);
193         int rootdir_len = strlen(iotcloudroot);
194         int directory_len = split_len + rootdir_len + 1;
195         directory = new char[directory_len];
196         memcpy(directory, iotcloudroot, rootdir_len);
197         memcpy(directory + rootdir_len, uri, split_len);
198         directory[directory_len]=0;
199 }
200
/**
 * Reads exactly `count` bytes from `fd` starting at `offset`.
 *
 * Short reads are continued from where they stopped and reads interrupted
 * by a signal are retried.
 *
 * @param fd     descriptor open for reading
 * @param ptr    destination buffer of at least `count` bytes
 * @param count  number of bytes wanted
 * @param offset file offset to start reading from
 * @return 1 when all `count` bytes were read; 0 when end-of-file was
 *         reached first or the read failed
 */
int doread(int fd, void *ptr, size_t count, off_t offset) {
	char *buffer = static_cast<char *>(ptr);
	size_t total = 0;
	while (total < count) {
		// pread() returns a signed value; -1 signals an error.
		ssize_t bytesread = pread(fd, buffer + total, count - total, offset + total);
		if (bytesread > 0) {
			total += bytesread;
		} else if (bytesread == 0) {
			return 0;	// end of file before count bytes
		} else if (errno != EINTR) {
			return 0;	// unrecoverable error
		}
	}
	return 1;
}
211
212 void IoTQuery::updateStatusFile() {
213         pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
214         pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
215         pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
216 }
217
218 bool IoTQuery::openStatusFile() {
219         char statusfile[]="queuestatus";
220         int len=strlen(directory);
221
222         char * filename=new char[len+sizeof(statusfile)+2];
223         memcpy(filename, directory, len);
224         filename[len]='/';
225         memcpy(filename+len+1, statusfile, sizeof(statusfile));
226         filename[len+sizeof(statusfile)+1]=0;
227         fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
228         delete filename;
229
230         if (fd < 0)
231                 return false;
232
233         int size;
234         int needwrite=0;
235         if (doread(fd, &size, sizeof(size), OFFSET_MAX))
236                 numqueueentries=size;
237         else
238                 needwrite=1;
239
240         long long entry;
241         if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
242                 oldestentry=entry;
243         else
244                 needwrite=1;
245
246         if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
247                 newestentry=entry;
248         else
249                 needwrite=1;
250
251         if (needwrite)
252                 updateStatusFile();
253
254         return true;
255 }
256
257