10 #include <netinet/in.h>
// Names of the request-environment parameters that IoTQuery::getQuery()
// looks up with FCGX_GetParam().
14 const char * query_str="QUERY_STRING";
15 const char * uri_str="REQUEST_URI";
16 const char * method_str="REQUEST_METHOD";
17 const char * iotcloudroot_str="IOTCLOUD_ROOT";
18 const char * length_str="CONTENT_LENGTH";
// Constructs a query handler around one FCGX request. Only part of the
// member-initializer list is visible in this excerpt.
20 IoTQuery::IoTQuery(FCGX_Request *request) :
// Starts at 0; decodeQuery() overwrites it when the query carries "seq=".
31 requestsequencenumber(0),
// Starts at DEFAULT_SIZE; decodeQuery() ("max=") and openStatusFile() may replace it.
32 numqueueentries(DEFAULT_SIZE),
// Destructor. NOTE(review): its body is not part of this excerpt.
38 IoTQuery::~IoTQuery() {
// Calls stat() on `directory` and reports whether the result describes a
// directory (S_ISDIR).
// NOTE(review): how a failing stat() (`err`) is handled is not visible here.
47 bool IoTQuery::checkDirectory() {
49 int err=stat(directory, &s);
52 return S_ISDIR(s.st_mode);
// Parses `query` into the request type, the request sequence number and an
// optional queue size. The fields are '&'-separated and read in this order:
//   "req=putslot" | "req=getslot",  "seq=<number>",  "max=<number>"
55 void IoTQuery::decodeQuery() {
56 int len=strlen(query);
// Tokenize a private copy: strsep() overwrites delimiters in the buffer it scans.
57 char * str=new char[len+1];
58 memcpy(str, query, len+1);
62 char *command=strsep(&tok_ptr, "&");
63 if (strncmp(command, "req=putslot", 11) == 0)
66 if (strncmp(command, "req=getslot", 11) == 0)
69 /* Load Sequence Number for request */
70 char *sequencenumber_str = strsep(&tok_ptr, "&");
71 if (sequencenumber_str != NULL &&
72 strncmp(sequencenumber_str, "seq=", 4) == 0) {
73 sequencenumber_str = strchr(sequencenumber_str, '=');
74 if (sequencenumber_str != NULL) {
// "+ 1" skips the '=' so that strtoll() starts on the digits.
75 requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
79 //don't allow a really old sequence number
80 if (requestsequencenumber < oldestentry)
81 requestsequencenumber = oldestentry;
83 /* Update size if we get request */
// Whatever follows the sequence-number field is treated as the last field.
84 char * numqueueentries_str = tok_ptr;
85 if (numqueueentries_str != NULL &&
86 strncmp(numqueueentries_str, "max=", 4) == 0) {
87 numqueueentries_str = strchr(numqueueentries_str + 1, '=');
// strchr() leaves the pointer ON the '='. Parsing from there makes strtoll()
// stop immediately and return 0, so skip the '=' exactly as the "seq=" branch does.
88 numqueueentries = strtoll(numqueueentries_str + 1, NULL, 10);
// Writes `length` bytes of `data` to `fd`, calling write() repeatedly so that
// a short write is continued from where the previous call stopped.
94 void doWrite(int fd, char *data, long long length) {
97 long long byteswritten=write(fd, &data[offset], length);
98 if (byteswritten > 0) {
// Account for the bytes that made it out; the rest is attempted next pass.
99 length -= byteswritten;
100 offset += byteswritten;
// Diagnostics for a write() that made no progress; a negative result also
// reports the errno text.
102 cerr << "Bytes not written" << endl;
103 if (byteswritten < 0) {
104 cerr << strerror(errno) << " error writing slot file" << endl;
// The loop finishes once nothing is left to write.
108 } while(length != 0);
// Reads `numbytes` bytes from `fd` into `buf`, repeating read() until the
// outstanding count reaches zero.
// NOTE(review): the return value and the handling of read() errors / EOF are
// on lines not visible in this excerpt.
111 bool doRead(int fd, void *buf, int numbytes) {
113 char *ptr=(char *)buf;
115 int bytesread=read(fd, ptr+offset, numbytes);
118 numbytes -= bytesread;
121 } while (numbytes!=0);
// Answers a "getslot" request: gathers the slot files for sequence numbers
// requestsequencenumber..newestentry and sends them back in one response.
// Response layout as assembled below:
//   "getslot" (7 bytes, no terminating NUL)
//   slot count              (int, network byte order via htonl)
//   one size per slot       (int each, network byte order)
//   the slot file contents, back to back
125 void IoTQuery::getSlot() {
126 int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
// A request beyond the newest entry yields an empty slot list.
127 if (numrequeststosend < 0)
128 numrequeststosend = 0;
129 long long numbytes = 0;
// NOTE(review): arrays with a runtime length are a compiler extension in C++.
130 int filesizes[numrequeststosend];
131 int fdarray[numrequeststosend];
133 for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
135 char *filename=getSlotFileName(seqn);
136 if (stat(filename, &st) == 0) {
// NOTE(review): no check of the open() result is visible in this excerpt.
137 fdarray[index]=open(filename, O_RDONLY);
138 filesizes[index]=st.st_size;
139 numbytes+=filesizes[index];
146 const char header[]="getslot";
147 long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; //header + payload + file count + sizes
148 char * response = new char[size];
// sizeof(header)-1 leaves out the string's terminating NUL.
150 memcpy(response, header, sizeof(header)-1);
151 offset+=sizeof(header)-1;
152 int numreq=htonl(numrequeststosend);
153 cerr << numrequeststosend << " " << numreq << endl;
154 memcpy(response + offset, &numreq, sizeof(numreq));
155 offset+=sizeof(numrequeststosend);
// Size table: one network-order int per slot.
156 for(int i=0;i<numrequeststosend;i++) {
157 int filesize=htonl(filesizes[i]);
158 memcpy(response + offset, &filesize, sizeof(filesize));
// Payload: each slot file is read straight into the response buffer.
163 for(int i=0;i<numrequeststosend;i++) {
165 doRead(fdarray[i], response+offset, filesizes[i]);
166 offset+=filesizes[i];
// NOTE(review): `size` is long long while sendResponse() takes an int length.
171 sendResponse(response, size);
175 for(int i=0;i<numrequeststosend;i++) {
// Handles a "putslot" request: stores the request body as the slot file for
// requestsequencenumber, records it as the newest entry and acknowledges it.
181 void IoTQuery::putSlot() {
// Only the slot that directly follows the newest one passes this check;
// the rejection path is not visible in this excerpt.
182 if (requestsequencenumber!=(newestentry+1)) {
187 int numberofliveslots=(int) ((newestentry-oldestentry)+1);
// Queue already holds numqueueentries slots.
// NOTE(review): presumably an old slot is dropped here - body not visible.
188 if (numberofliveslots >= numqueueentries) {
193 //write slot data out to file
194 char *filename = getSlotFileName(requestsequencenumber);
// NOTE(review): no check of slotfd is visible between open() and doWrite().
195 int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
196 doWrite(slotfd, data, length);
199 newestentry = requestsequencenumber; // update sequence number
200 updateStatusFile(); // update counts
// Reply body is the 7 characters "putslot" (sizeof-1 drops the NUL).
201 char command[]="putslot";
202 sendResponse(command, sizeof(command)-1);
// Writes the response to cout: the header lines shown below, with
// Content-Length set to `len`, followed by `len` raw bytes from `bytes`.
205 void IoTQuery::sendResponse(char * bytes, int len) {
206 cout << "Accept-Ranges: bytes\r\n"
207 << "Content-Length: " << len << "\r\n"
// write() rather than operator<< so that embedded NUL bytes are sent too.
209 cout.write(bytes, len);
// Formats the path "<directory>/SLOT<slot>" into a buffer obtained with new[].
// NOTE(review): the return statement and who frees the buffer are not visible here.
212 char * IoTQuery::getSlotFileName(long long slot) {
213 int directorylen=strlen(directory);
214 char * filename=new char[25+directorylen];//19 digits for long number + 4 characters for SLOT + 1 character for null termination
// Size limit passed to snprintf equals the allocation (24+directorylen+1).
215 snprintf(filename, 24+directorylen+1, "%s/SLOT%lld", directory, slot);
// Operates on the slot file of `oldestentry`; nothing is done while
// oldestentry is 0.
// NOTE(review): what happens to the file is on lines not visible here.
219 void IoTQuery::removeOldestSlot() {
220 if (oldestentry!=0) {
221 char * filename=getSlotFileName(oldestentry);
// Entry point for one request. The checks visible in this excerpt, in order:
// the method must be POST, the target directory must exist, the status file
// must open, and the query must name a recognized request. Each failure is
// logged to cerr.
228 void IoTQuery::processQuery() {
233 if (strncmp(method, "POST", 4) != 0) {
234 cerr << "Not POST Request" << endl;
238 if (directory == NULL ||
// This branch is also taken when directory is NULL. Streaming a null char*
// is undefined behavior (and in practice silences cerr), so log a placeholder.
240 cerr << "Directory " << (directory != NULL ? directory : "(null)") << " does not exist" << endl;
244 if (!openStatusFile()) {
245 cerr << "Failed to open status file" << endl;
258 cerr << "No recognized request" << endl;
// Reads the request body from cin into a freshly allocated `data` buffer.
263 void IoTQuery::readData() {
// One extra byte, zero-filled, so the buffer is NUL-terminated after the read.
265 data = new char[length+1];
266 memset(data, 0, length+1);
267 cin.read(data, length);
// NOTE(review): the body of this loop is not visible; it runs until cin hits EOF.
272 } while (!cin.eof());
// Fetches the request parameters (URI, query string, method, storage root,
// content length) from the FCGX request environment into member fields.
275 void IoTQuery::getQuery() {
276 uri = FCGX_GetParam(uri_str, request->envp);
277 query = FCGX_GetParam(query_str, request->envp);
278 method = FCGX_GetParam(method_str, request->envp);
279 iotcloudroot = FCGX_GetParam(iotcloudroot_str, request->envp);
281 char * reqlength = FCGX_GetParam(length_str, request->envp);
// Content length is parsed as a base-10 integer.
283 length=strtoll(reqlength, NULL, 10);
// Builds `directory` as iotcloudroot followed by the part of the URI that
// precedes the '?'.
289 void IoTQuery::getDirectory() {
290 char * split = strchr((char *)uri, '?');
// Number of URI characters before the '?'.
293 int split_len = (int) (split-uri);
294 int rootdir_len = strlen(iotcloudroot);
// +1 for the terminating NUL written at the end.
295 int directory_len = split_len + rootdir_len + 1;
296 directory = new char[directory_len];
297 memcpy(directory, iotcloudroot, rootdir_len);
298 memcpy(directory + rootdir_len, uri, split_len);
299 directory[directory_len-1]=0;
// Reads `count` bytes at file position `offset` with a single pread() and
// distinguishes a complete read from a read of zero bytes; the values
// returned for each case are on lines not visible here.
302 int doread(int fd, void *ptr, size_t count, off_t offset) {
// NOTE(review): pread() returns ssize_t; stored in a size_t, a -1 error
// becomes a huge value that matches neither comparison below.
304 size_t bytesread=pread(fd, ptr, count, offset);
305 if (bytesread==count) {
307 } else if (bytesread==0) {
// Stores the queue size and the oldest/newest sequence numbers in the status
// file (`fd`), each at its own fixed offset. The pwrite() results are not checked.
313 void IoTQuery::updateStatusFile() {
314 pwrite(fd, &numqueueentries, sizeof(numqueueentries), OFFSET_MAX);
315 pwrite(fd, &oldestentry, sizeof(oldestentry), OFFSET_OLD);
316 pwrite(fd, &newestentry, sizeof(newestentry), OFFSET_NEW);
// Opens (creating it if necessary) the "queuestatus" file inside `directory`
// and loads the stored queue size and oldest/newest entries from it.
319 bool IoTQuery::openStatusFile() {
320 char statusfile[]="queuestatus";
321 int len=strlen(directory);
// Room for directory + one separator byte + "queuestatus" incl. its NUL + 1 spare.
323 char * filename=new char[len+sizeof(statusfile)+2];
324 memcpy(filename, directory, len);
// The name goes one byte past the directory; the byte at [len] is set on a
// line not visible here (presumably a '/').
326 memcpy(filename+len+1, statusfile, sizeof(statusfile));
327 filename[len+sizeof(statusfile)+1]=0;
// Readable and writable by the owner only.
328 fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
332 cerr << strerror(errno) << " error opening statusfile" << endl;
// A stored value replaces the current one only when doread() reports success.
338 if (doread(fd, &size, sizeof(size), OFFSET_MAX))
339 numqueueentries=size;
344 if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
349 if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))